Java实现对ES数据的新增,删除,修改,及合并

更新时间:2023-07-18 08:11:50 阅读: 评论:0

Java实现对ES数据的新增,删除,修改,及合并Java实现对ES数据的新增,删除,修改,及合并
新增数据
代码:
@Autowired
private RestHighLevelClient client;
/**
* @description ES写⼊数据
* @author zae
* @date 2022/1/13 14:40
* @param index 索引库
* @param dataList 数据集合(size为插⼊数据的条数)
*/
public void inrtEsData(String index,List<Map<String,Object>> dataList) {
BulkProcessor bulkProcessor = null;
try {
bulkProcessor = getBulkProcessor(client);
for(Map<String,Object> dataMap:dataList){
bulkProcessor.add(new IndexRequest(index).source(dataMap));
}
// 将数据刷新到ES中
bulkProcessor.flush();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
boolean terminatedFlag = bulkProcessor.awaitClo(150L,
TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private BulkProcessor getBulkProcessor(RestHighLevelClient client) {
BulkProcessor bulkProcessor = null;
try {
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
logger.info("Try to inrt data number : "
+ request.numberOfActions());
}
@Override
public void afterBulk(long executionId, BulkRequest request,
BulkRespon respon) {
logger.info("************** Success inrt data number : "
+ request.numberOfActions() + " , id: " +
executionId);
}
@Override
public void afterBulk(long executionId, BulkRequest request,
Throwable failure) {
<("Bulk is unsuccess : " + failure + ", executionId: " + executionId);
看听学第一册}
};
BiConsumer<BulkRequest, ActionListener<BulkRespon>> bulkConsumer =
(request, bulkListener) -> client
.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer,
the climblistener);
builder.tBulkActions(5000);//刷新条数
builder.tBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB));// 刷新⼤⼩
builder.tConcurrentRequests(10);//并发线程数
builder.tFlushInterval(TimeValue.timeValueSeconds(100L));// 时间频率
builder.stantBackoff(TimeValue.timeValueSeconds(1L), 3));//重试补偿策略
bulkProcessor = builder.build();
} catch (Exception e) {
e.printStackTrace();
try {
bulkProcessor.awaitClo(100L, TimeUnit.SECONDS);
} catch (Exception e1) {
<(e1.getMessage());
}
}
return bulkProcessor;
}
注意点:
传⼊数据中的List中的每个Map为⼀条数据,Map中的key为字段Field,value为值。
假如在插⼊数据前index库中没有定义Field及映射关系,在插⼊数据时会⾃动新增字段Field,字段的类型会默认映射为value值的类型。
假如新插⼊的数据和之前第⼀次插⼊的数据类型不⼀致,(相同的key但是value的类型不⼀样),这样在执⾏代码时可能不会报错失败,但是实际上并没有插⼊数据成功。
删除数据
代码
/**
* @description ES数据删除
* @author zae
* @date 2022/1/13 17:14
* @param index 索引库
* @param id 数据id
*/
public void delete(String index,String id){
DeleteRequest deleteRequest = new DeleteRequest(index,id);
DeleteRespon respon = null;
try {
respon = client.delete(deleteRequest,RequestOptions.DEFAULT);
} catch (IOException e) {
e.printStackTrace();
}
System.out.println(respon);
expensive的同义词}
我的天用英语怎么说
删除的话直接指定索引库和要删除的数据的id进⾏删除,⾄于数据的id怎么获取,参考下⾯更新数据中的代码。
更新数据
直接修改
/**
* @description ES数据更新
* @author zae
* @date 2022/1/13 16:10
* @param index 索引库
* @param key 字段名
* @param value 更新后的值
* @param id  需要修改的那条数据的id
*/
public void update(String index,String key,Object value,String id){
aliastry {
UpdateRequest request = new UpdateRequest();
request.index(index) //索引名
良医妙药.id(id)//id
.doc(
XContentFactory.jsonBuilder()
.startObject()
.field(key, value)//要修改的字段及字段值
.endObject()
)
;
UpdateRespon respon= client.update(request,RequestOptions.DEFAULT);
System.out.println(respon.status());
} catch (Exception e) {
e.printStackTrace();
}
}
更新数据需要传⼊指定的索引库,需要修改的字段名称,修改后的value值,以及代表那条数据的唯⼀id
关于唯⼀id怎么获得,可以参考以下代码。
public void lectAndUpdate(){
int count = 0;
//step1:根据条件获取需要修改的数据的id
SearchHit[] archHits = esDeal.lectIdByKey(PRODUCT_DEV_INDEX, "name");
// step2:遍历更新查询出来的数据
for (SearchHit archHit : archHits) {
// 获取到单条记录的id
String id = Id();
//调⽤更新数据的核⼼⽅法
esDeal.update(PRODUCT_DEV_INDEX,"name","张三",id);
count++;
}
System.out.println("⼀共更新了"+count+"条数据");
}
public SearchHit[] lectIdByKey(String index,String key){
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.istsQuery(key));
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(boolQueryBuilder)
.trackTotalHits(true)
.size(22);
SearchRequest archRequest = new SearchRequest()
.indices(index)
.source(sourceBuilder);
defaultgateway
try {
SearchRespon archRespon = client.arch(archRequest, RequestOptions.DEFAULT);
SearchHit[] hits = Hits().getHits();
return hits;
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
数据合并
数据合并其实也是更新的⼀种,关键在于怎么提取及封装需要更新的数据,根据业务场景的不同达到合并的效果,以上⾯的图⽚为例,我需要将以上数据的20004.value⾥⾯的值合并上新的数据却也保留着原有的数据。
@Test
public void lectAndUpdate(){
int count = 0;
//step1:根据条件获取需要修改的数据的id
日语 培训SearchHit[] archHits = esDeal.lectIdByKey(PRODUCT_DEV_INDEX, "20004.value");
for (SearchHit archHit : archHits) {
经验交流会发言稿
String id = Id();
// step2:组装数据
Map<String, Object> sourceAsMap = SourceAsMap();
List<Map<String,Object>> mapList = (List<Map<String, Object>>) ("20004");
Map<String, Object> mapData = (0);
// 获取之前20004.value的数据,并添加上新的数据
List value = (List) ("value");
value.add("120021");
value.add("3990993");
mapData.put("value",value);
/
datagrid
/ step3:根据id更新数据
esDeal.update(PRODUCT_DEV_INDEX,"20004",mapList,id);
count++;
}
System.out.println("⼀共更新了"+count+"条数据");
}
备注:
代码中调⽤的lectIdByKey()以及update()在更新数据的第⼀部分都有代码,直接使⽤就好。根据以上代码更新后的数据为:
在更新20004.value的值时,不需要保持和20004.value的原有数据类型⼀致,更新成其他类型也是可以的,这点和插⼊key:value的数据是有区别的。

本文发布于:2023-07-18 08:11:50,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/90/180973.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:数据   修改   需要   获取   代码   新增
相关文章
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图