在Elasticsearch中,通过指定文档的_id
,使用Elasticsearch
自带的index api
可以实现插入一条新document
,如果该_id
已存在,则将直接更新该document
。
因此,通过该index API
来对已有的文档实现更新,其实是进行了一次 reindex
的操作。
# es.index(index=index_name, doc_type="doc", id=id, body=dict())
def reindex_update_es(index_name: str, _id, data_dict: dict):
# Creates or updates a document in an index.
try:
res = es_obj.index(
index=index_name,
doc_type="_doc",
id=_id,
body=data_dict
)
if not res or (res.get('_shards') and res.get('_shards').get('failed') != 0):
return None
else:
return res.get('_id')
except Exception as e:
print('index data error: %s' % e)
return None
因为是 reindex
过程,如果通过这种方法修改, 当数据量或者 document 很大的时候,效率非常的低。
Elasticsearch中的update Api
支持根据用户提供的脚本去实现更新
update
更新操作允许ES获得某个指定的文档,可以通过脚本等操作对该文档进行更新。
可以把它看成是先删除再索引的原子操作,只是省略了返回的过程,这样即节省了来回传输的网络流量,也避免了中间时间造成的文档修改冲突。
在 Python 中可以直接通过包装好的接口来更新:
es.update(index="supermarket", doc_type="doc", id="tkOLi4IBGBS9mFAEdkef", body={"doc": {"title": "MacBook Pro 14", "price": 12999.0}})
注意 body
参数,我们需要添加 doc
来指定修改的内容
添加字段:
es.update(index="supermarket", doc_type="doc", id="tkOLi4IBGBS9mFAEdkef", body={"doc": {"title": "MacBook Pro 14", "price": 12999.0, "category": "Apple"}})
update_by_query,顾名思义,这种更新方式,即通过查询再更新。
该方法的优点是可以指定某些数据,然后达到更新的目的
在 ES 中,我们通过 update_by_query
中的 query
和 script
来实现先查询再更新的机制
body = {
"query": {
"bool":
{
"must": [
{"term": {"category.keyword": "小米"}}
]
}
},
"script": {
"inline": "ctx._source.category = params.category",
"params": {
"category": "xiaomi"
},
"lang": "painless"
}
}
es_obj.update_by_query(index="supermarket", doc_type="_doc", body=body)
在上面的操作中:query
字段,表示我们要查询的条件,根据该条件找到对应的数据 script
字段包含以下关键字:
在实际需求中,面对最多的还是批量更新
当然你也可以通过 for 循环一条一条来更新,不过这种方法效率太低了。
尤其是面对数据量很大的时候,那真的是急死人..
好在 ES 有提供批量操作的接口 bulk
在 Python 中可以直接导入使用
from elasticsearch.helpers import bulk
def bulk_update_es(index_name: str, data_list: list):
actions = []
for item in data_list:
_id = item.get('id')
doc = item.get('doc')
index_action = {
"_op_type": "update",
"_index": index_name,
"_type": "_doc",
"_id": _id,
"doc": doc,
}
actions.append(index_action)
if actions:
bulk(es_obj, actions)
data_list = [
{
"id": "sUNui4IBGBS9mFAECUdX",
"doc": {"title": "Apple Watch 8", "category": "Apple", "price": 3999.0}
},
{
"id": "1003",
"doc": {"title": "Apple Watch Ultra", "category": "Apple", "price": 5999.0}
},
{
"id": "1004",
"doc": {"title": "Mac Mini", "category": "Apple", "price": 4299.0}
},
]
bulk_update_es("supermarket", data_list)
可以看到有个 doc 的参数,和上面介绍的 update 方法类似,doc中的值便是我们需要修改的字段内容
_op_type 为操作类型为update,表明是更新的操作
以该种方式组合的 index_action 组成数组,通过 bulk 便能实现批量更新 !
基于Nginx+Supervisord+uWSGI+Django1.11.1+Python3.6.5构建