Elasticarch使⽤updateByQuery批量更新数据Elasticarch 使⽤updateByQuery批量更新数据
对于批量更新数据,通常我们有2种做法
1. 按照更新的条件,从es查询出所有待更新的记录的id,然后根据id再通过Bulk.Builder接⼝完成批量更新
2. 直接使⽤updateByQuery接⼝完成批量更新的操作
{
"mappings":{
"_doc":{
"properties":{
"uid":{
"type":"keyword"
},
zoom是什么意思
"urname":{
"type":"text"
},
"age":{
"type":"integer"
},
"like":{
"type":"keyword"
}
}
}
}
}
然后插⼊⼏条测试数据,查询得到以下4条数据
localhost:9200/ur/_arch
{
"hits":{
"total":4,
"max_score":1,
"hits":[
{
"_index":"ur",
victoria silvstedt
"_type":"_doc",
"_id":"MV2uAXgB4vfr7WLWaK3Y",
"_score":1,
"_source":{
"uid":4,
"urname":"⼩红",
"age":20,
"like":"篮球"
}
},
{
hammer是什么意思
"_index":"ur",
"_type":"_doc",
"_id":"MF2uAXgB4vfr7WLWK63s",
"_score":1,
"_source":{
"uid":3,
"urname":"⼩明",
"age":20,
"like":"篮球"
}
},
{
"_index":"ur",
放纵是什么意思"_type":"_doc",
"_id":"Ll2tAXgB4vfr7WLWcq1P",
"_score":1,
"_source":{
"uid":1,
"urname":"张三",
"age":18,
"like":"乒乓球"
}
},
{
"_index":"ur",
"_type":"_doc",
"_id":"L12tAXgB4vfr7WLW162C",
"_score":1,
"_source":{
"uid":2,
"urname":"李四",
"age":19,
"like":"⽻⽑球"
}
}
]
}
}
测试数据准备好后,我们的需求是把"age=20"的记录, “like"修改成"⽹球”1,按第⼀种更新⽅式,先查出es的id,再使⽤Bulk.Builder来更新:
@Test
public void testUpdateByQuery1()throws Exception{
HttpClientConfig.Builder builder =new HttpClientConfig.Builder("localhost:9200/");
JestClientFactory jestClientFactory =new JestClientFactory();
2019年河北高考一分一段表jestClientFactory.tHttpClientConfig(builder.build());
// 获取到jestClient对象
JestClient jestClient = Object();
尚孔教育// 构造查询条件
TermQueryBuilder termQueryBuilder = Query("age",20);
SearchSourceBuilder archSourceBuilder =new SearchSourceBuilder().query(termQueryBuilder).fetchSource(fal);
//fetchSource设置为fal表⽰不查询⽂档内容,只会返回id
Search.Builder arch =new Search.String()).addIndex("ur").addType("_doc");
SearchResult result = ute(arch.build());
System.out.println(result);
wirelesslan
}
打印输出的内容为:
Result:{"took":45,"timed_out":fal,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":2,"max_score":1.0,"hits":[{"_index":"ur","_type":" _doc","_id":"MV2uAXgB4vfr7WLWaK3Y","_score":1.0},{"_index":"ur","_type":"_doc","_id":"MF2uAXgB4vfr7WLWK63s","_score":1.0}]}}, isSucceeded:true , respon code:200, error message: null
可以看到已经查询出条件age=20的两条记录,并且只返回了es的id,有了id我们就能根据Bulk.Builder来进⾏批量更新:
@Test
public void testUpdateByQuery1()throws Exception{
HttpClientConfig.Builder builder =new HttpClientConfig.Builder("localhost:9200/");
JestClientFactory jestClientFactory =new JestClientFactory();
jestClientFactory.tHttpClientConfig(builder.build());
南昌超越英语学校// 获取到jestClient对象
JestClient jestClient = Object();
// 构造查询条件
TermQueryBuilder termQueryBuilder = Query("age",20);
SearchSourceBuilder archSourceBuilder =new SearchSourceBuilder().query(termQueryBuilder).fetchSource(fal);
Search.Builder arch =new Search.String()).addIndex("ur").addType("_doc");
SearchResult result = ute(arch.build());
List<String> esIdList = Hits(Map.class).stream().map(hit -> hit.id).List());
Bulk.Builder bulk =new Bulk.Builder();
esIdList.forEach(esId ->{
Map<String,Object> beanMap =new HashMap<>();
beanMap.put("like","⽹球");// 把like修改为⽹球
Map<String,Object> doc =new HashMap<>();
doc.put("doc", beanMap);
Update.Builder update =new Update.Builder(doc).id(esId);// 指定更新的id
bulk.addAction(update.build());
});
bulk.defaultIndex("ur").defaultType("_doc");//指定更新的索引名和type
BulkResult result1 = ute(bulk.build());
System.out.println(result1);
}
"total":4,
"max_score":1.0,
"hits":[
{
"_index":"ur",
"_type":"_doc",
"_id":"MV2uAXgB4vfr7WLWaK3Y",
"_score":1.0,
"_source":{
familiarize"uid":4,
"urname":"⼩红",
"age":20,
"like":"⽹球"
}
},
{
"_index":"ur",
"_type":"_doc",
"_id":"MF2uAXgB4vfr7WLWK63s",
"_score":1.0,
"_source":{
"uid":3,
"urname":"⼩明",
"age":20,
"like":"⽹球"
}
},
{
"_index":"ur",
"_type":"_doc",
"_id":"Ll2tAXgB4vfr7WLWcq1P",
"_score":1.0,
"_source":{
"uid":1,
"urname":"张三",
"age":18,
"like":"乒乓球"
}
},
{
"_index":"ur",
"_type":"_doc",
"_id":"L12tAXgB4vfr7WLW162C",
"_score":1.0,
"_source":{
"uid":2,
"urname":"李四",
"age":19,
"like":"⽻⽑球"
}
}
]
}
}
发现age=20的两条记录的like字段已经从“篮球”更改为“⽹球”
2,直接使⽤updateByQuery接⼝更新
从官⽹update_by_query可以看到,需要使⽤脚本来更新,并且要求的格式为:
"source":"ctx._unt++",
"lang":"painless"
},
"query":{
"term":{
"ur.id":"kimchy"
}
}
}
所以我们也要按这个格式来构造更新语句,在java代码⾥,我们可以使⽤XContentBuilder来构造json格式的对象
@Test
public void testUpdateByQuery2()throws Exception {
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery().Query("age",20));
XContentBuilder xContentBuilder =jsonBuilder()
.startObject()
.field("query", queryBuilder)
.startObject("script")
.field("lang","painless")
.field("source","ctx._source.like='⽹球'")
.endObject().endObject()
;
xContentBuilder.flush();
final String payload =((ByteArrayOutputStream) OutputStream()).toString("UTF-8");
System.out.println(payload);
}
输出的内容为:
{
"query":{
"bool":{
"filter":[
{
"term":{
"age":{prehistorical
"value":20,
"boost":1
}
}
}
],
"adjust_pure_negative":true,
"boost":1
}
},
"script":{
"lang":"painless",
"source":"ctx._source.like='⽹球'"
}
}
可以看到通过这种⽅式构造的json数据,符合update_by_query的格式,接下来就使⽤updateByQuery的api来更新: