Elasticarch使⽤update_by_query
elasticarch中有⼀个⽅法是批量修改,就是先查询出需要修改的索引记录,然后批量修改。这个本来没什么,但是使⽤过的都知道,⽤java来调⽤这个⽅法很别扭。
⼀般来说,我们使⽤elasticarch,都建议使⽤Java Rest Client,就是RestHighLevelClient这个api。这⾥得从Java Client和Java Rest Client说起了,低版本的elasticarch提供了ElasticarchClient的实现,TransportClient,⼀个传输客户端。在6.4.x 版本中,需要⽤TransportClient来构建ElasticarchClient,并且这个client才是实现本⽂update_by_query所需的client。
因为我们要使⽤UpdateByQueryRequestBuilder,所以必须使⽤ElasticarchClient,⽽这个client只能通过TransportClient来构建,在6.4.x版本中,我们还需要引⼊transport依赖:
<dependency>
<groupId>org.elasticarch.client</groupId>
<artifactId>transport</artifactId>
<version>6.4.0</version>
</dependency>
因为RestClient⽆法满⾜构建条件。transportclient连接elasticarch,使⽤的端⼝是9300,⽽不是和restclient⼀样使⽤的是http 连接的9200。
对⽐⼀下restclient构建:
elasticarch开启了两种不同类型的服务端⼝,transportclient与elasticarch更加的解耦合。
前⾯提到了要使⽤UpdateByQueryRequestBuilder,就必须使⽤transport依赖。⽽改依赖⾥⾯就是⼀个简单的实现类PreBuiltTransportClient。
前⾯废话这么多,⽆⾮就是现在elasticarch版本多,依赖版本也多,⽽且有的api被丢弃,有的api发⽣了改变,让我们很难捉摸。david beckham
下⾯我们通过⼀个简单的⽰例,了解⼀下update_by_query,⾸先通过⼯具以命令的⽅式看看执⾏结果:这⾥构建⼀个
index=students,type=student的索引,有3条记录,每条记录有⼀个age字段均为18。
严格英文
refresher这⾥通过执⾏查询然后修改操作,将student的age全部修改为32:
操作执⾏成功,受影响的记录有3条。再次查看所有的索引记录:
以上这个步骤是通过命令的⽅式验证了_update_by_query的可⾏性,我们现在通过java代码的⽅式来实现这种操作,前⾯说了,这个操作需要⽤到ElasticarchClient,⽽ElasticarchClient需要通过TransportClient来构建。这⾥直接给出源代码,我的l依赖是这样的:
java代码:
实战口译 林超伦
lasticarch;
import java.InetAddress;ishare
import java.util.HashMap;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticarch.client.ElasticarchClient;
import ings.Settings;
import ansport.TransportAddress;
import org.elasticarch.index.query.QueryBuilders;twist是什么意思
import org.index.BulkByScrollRespon;
import org.index.UpdateByQueryAction;
import org.index.UpdateByQueryRequestBuilder;
import org.elasticarch.script.Script;
import org.elasticarch.script.ScriptType;
import ansport.client.PreBuiltTransportClient;
@SuppressWarnings("unchecked")
public class UpdateByQueryApp {
private static final Logger log = Logger(UpdateByQueryApp.class);
private static ElasticarchClient client = null;
static{
try {
client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(
new ByName("127.0.0.1"),9300)
);
} catch (Exception e) {
e.printStackTrace();
wild style
}
}
public static void update(){
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction
.wRequestBuilder(client);
Map<String, Object> params = new HashMap<String, Object>();
params.put("age", 18);
ScriptType type = ScriptType.INLINE;
String lang = "painless";
String code = "ctx._source.age=params.age";
Script script = new Script(type, lang, code, params);
BulkByScrollRespon respon = updateByQuery.source("students").script(script)
.Query("age", "32"))
.abortOnVersionConflict(fal)
.get();
log.info("update : "+Updated());
beckman}
public static void main(String[] args) {
update();
}
多血症}
前⾯我们通过命令的⽅式将所有students索引记录的age修改为了32,这⾥我们就将所有age=32的记录,全部修改为age=18。运⾏程序,控制台打印信息如下:
2019-09-04 21:15:02 - org.elasticarch.plugins.PluginsService.logPluginInfo [main] [INFO ] - no modules loaded
2019-09-04 21:15:02 - org.elasticarch.plugins.PluginsService.logPluginInfo [main] [INFO ] - loaded plugin [org.index.ReindexPlugin] 2019-09-04 21:15:02 - org.elasticarch.plugins.PluginsService.logPluginInfo [main] [INFO ] - loaded plugin [org.elasticarch.join.ParentJoinPlugin]
2019-09-04 21:15:02 - org.elasticarch.plugins.PluginsService.logPluginInfo [main] [INFO ] - loaded plugin [org.elasticarch.percolator.PercolatorPlugin] 2019-09-04 21:15:02 - org.elasticarch.plugins.PluginsService.logPluginInfo [main] [INFO ] - loaded plugin [org.elasticarch.script.mustache.MustachePlugin] 2019-09-04 21:15:02 - org.elasticarch.plugins.PluginsService.logPluginInfo [main] [INFO ] - loaded plugin [ansport.Netty4Plugin]
2019-09-04 21:15:05 - lasticarch.UpdateByQueryApp.update [main] [INFO ] - update : 3
表明批量修改成功,可以查看记录:umd
需要注意的是,transportclient和restclient他们构建时所需的端⼝是不⼀样的,分别是9300和9200。