August Rush

一个还在努力成长的小火汁!

游龙当归海,海不迎我自来也。

We create our own demons.

You can reach me at augustrush0923@gmail.com
Python更新Elasticsearch数据三种方法
发布:2023年03月02日 | 作者:augustrush | 阅读量: 1620

全局更新

在Elasticsearch中,通过指定文档的_id,使用Elasticsearch自带的index api可以实现插入一条新document,如果该_id已存在,则将直接更新该document

因此,通过该index API来对已有的文档实现更新,其实是进行了一次 reindex 的操作。

# es.index(index=index_name, doc_type="doc", id=id, body=dict())

def reindex_update_es(index_name: str, _id, data_dict: dict):
    # Creates or updates a document in an index.
    try:
        res = es_obj.index(
            index=index_name,
            doc_type="_doc",
            id=_id,
            body=data_dict
        )
        if not res or (res.get('_shards') and res.get('_shards').get('failed') != 0):
            return None
        else:
            return res.get('_id')
    except Exception as e:
        print('index data error: %s' % e)
    return None

因为是 reindex 过程,如果通过这种方法修改, 当数据量或者 document 很大的时候,效率非常的低。


局部更新

Elasticsearch中的update Api支持根据用户提供的脚本去实现更新

update更新操作允许ES获得某个指定的文档,可以通过脚本等操作对该文档进行更新。

可以把它看成是先删除再索引的原子操作,只是省略了返回的过程,这样即节省了来回传输的网络流量,也避免了中间时间造成的文档修改冲突。

在 Python 中可以直接通过包装好的接口来更新:

es.update(index="supermarket", doc_type="doc", id="tkOLi4IBGBS9mFAEdkef", body={"doc": {"title": "MacBook Pro 14", "price": 12999.0}})

注意 body 参数,我们需要添加 doc 来指定修改的内容

添加字段:

es.update(index="supermarket", doc_type="doc", id="tkOLi4IBGBS9mFAEdkef", body={"doc": {"title": "MacBook Pro 14", "price": 12999.0, "category": "Apple"}})


按条件检索后更新

update_by_query,顾名思义,这种更新方式,即通过查询再更新。

该方法的优点是可以指定某些数据,然后达到更新的目的

在 ES 中,我们通过 update_by_query 中的 queryscript 来实现先查询再更新的机制

body = {
    "query": {
        "bool":
            {
                "must": [
                    {"term": {"category.keyword": "小米"}}
                ]
                }
    },
    "script": {
        "inline": "ctx._source.category = params.category",
        "params": {
            "category": "xiaomi"
        },
        "lang": "painless"
    }
}
es_obj.update_by_query(index="supermarket", doc_type="_doc", body=body)

在上面的操作中:query 字段,表示我们要查询的条件,根据该条件找到对应的数据 script 字段包含以下关键字:

  • source 是将要执行的脚本内容;
  • lang 表示的是当前脚本的语言;
  • param 则是脚本执行的参数;


批量更新

在实际需求中,面对最多的还是批量更新

当然你也可以通过 for 循环一条一条来更新,不过这种方法效率太低了。

尤其是面对数据量很大的时候,那真的是急死人..

好在 ES 有提供批量操作的接口 bulk

在 Python 中可以直接导入使用

from elasticsearch.helpers import bulk


def bulk_update_es(index_name: str, data_list: list):
    actions = []
    for item in data_list:
        _id = item.get('id')
        doc = item.get('doc')
        index_action = {
            "_op_type": "update",
            "_index": index_name,
            "_type": "_doc",
            "_id": _id,
            "doc": doc,
        }
        actions.append(index_action)
    if actions:
        bulk(es_obj, actions)


data_list = [
    {
        "id": "sUNui4IBGBS9mFAECUdX",
        "doc": {"title": "Apple Watch 8", "category": "Apple", "price": 3999.0}
    },
    {
        "id": "1003",
        "doc": {"title": "Apple Watch Ultra", "category": "Apple", "price": 5999.0}
    },
    {
        "id": "1004",
        "doc": {"title": "Mac Mini", "category": "Apple", "price": 4299.0}
    },
]
bulk_update_es("supermarket", data_list)

可以看到有个 doc 的参数,和上面介绍的 update 方法类似,doc中的值便是我们需要修改的字段内容

_op_type 为操作类型为update,表明是更新的操作

以该种方式组合的 index_action 组成数组,通过 bulk 便能实现批量更新 !



  • 标签云

  • 支付宝扫码支持一下

  • 微信扫码支持一下



基于Nginx+Supervisord+uWSGI+Django1.11.1+Python3.6.5构建

京ICP备20007446号-1 & 豫公网安备 41100202000460号

网站地图 & RSS | Feed