ElasticarchScroll(游标)API详解
今天我们来探讨⼀下Elasticarch Scroll API,在这之前我们先回顾⼀下数据库的知识。
1. 相关数据库知识(帮助理解)
传统数据库游标:游标(cursor)是系统为⽤户开设的⼀个数据缓冲区,存放SQL语句的执⾏结果。每个游标区都有⼀个名字,⽤户可以⽤SQL语句逐⼀从游标中获取记录,并赋给主变量,交由主语⾔进⼀步处理。就本质⽽⾔,游标实际上是⼀种能从包括多条数据记录的结果集中每次提取⼀条记录的机制。
游标是⼀段私有的SQL⼯作区,也就是⼀段内存区域,⽤于暂时存放受SQL语句影响到的数据。通俗理解就是将受影响的数据暂时放到了⼀个内存区域的虚表中,⽽这个虚表就是游标。艾米 怀恩豪斯
2. 为什么使⽤Elasticarch Scroll
当Elasticarch响应请求时,它必须确定docs的顺序,排列响应结果。如果请求的页数较少(假设每页20个docs), Elasticarch不会有什么问题,但是如果页数较⼤时,⽐如请求第20页,Elasticarch不得不取出第1页到第20页的所有docs,再去除第1页到第19页的docs,得到第20页的docs。
解决的⽅法就是使⽤Scroll。因为Elasticarch要做⼀些操作(确定之前页数的docs)为每⼀次请求,所
新东方前途出国官网以,我们可以让Elasticarch储存这些信息为之后的查询请求。这样做的缺点是,我们不能永远的储存这些信息,因为存储资源是有限的。所以Elasticarch中可以设定我们需要存储这些信息的时长。
3. 如何使⽤ Elasticarch Scroll
我们只需在普通的query后加上scroll的参数例如: curl -XGET localhost:9200/bank/account/_arch?pretty&scroll=2m -d {“query”: {“match_all”:{}}} 其中“2m” 代表2分钟,是需要保存的时长(数字+单位,具体单位表⽰见表1)。
Table 1. 时间单位对照表
Time Units
y Year
M Month
w Week
d Day
h Hour
m Minute
s Second
上⾯的查询语句返回如下结果:
{
"_scroll_id": : " cXVlcnlUaGVuRmV0Y2g7NTs5MTM6aDEySHRHNVpScVNiN2VUZVV6QV9xdzs5MTQ6aDEySHRHNVpScVNiN2VUZVV6QV9xdzs5MTU6aDEySH RHNVpScVNiN2VUZVV6QV9xdzs5MTc6aDEySHRHNVpScVNiN2VUZVV6QV9xdzs5MTY6aDEySHRHNVpScVNiN2VUZVV6QV9xdzswOw==",
"took" : 3,
"timed_out" : fal,
"_shards" : {
"total" : 5,
"successful" : 5,
"failed" : 0
}…
观察返回结果可以发现,新增加了“_scroll_id”部分。这是⼀个在稍后查询中需要⽤到的句柄。之后的查询语句如下:
托尔斯泰简介curl –XGET 'localhost:9200/_arch/scroll?
scroll=2m&pretty&scroll_id=cXVlcnlUaGVuRmV0Y2g7NTs5MTM6aDEySHRHNVpScVNiN2VUZVV6QV9xdzs5MTQ6aDEySHRHNVpScVNiN2VUZVV6QV9xdzs5MT U6aDEySHRHNVpScVNiN2VUZVV6QV9xdzs5MTc6aDEySHRHNVpScVNiN2VUZVV6QV9xdzs5MTY6aDEySHRHNVpScVNiN2VUZVV6QV9xdzswOw=='
expo在上⾯语句的返回结果中,⼜会包含“_scroll_id”部分,在每次的查询中都会返回⼀个新的 “_scroll_id”(只有最新返回的“_scroll_id”是有效的),再次的查询只能使⽤最新的“_scroll_id”。注:如果查询中包含聚合,只有最初的查询结果是聚合结果。
3.1. Scanning Scroll API
如果只对查询结果感兴趣⽽不关⼼结果的顺序,可以使⽤更⾼效的scanning scroll。使⽤⽅法⾮常简单,只需在查询语句后加
上“arch_type=scan”即可。
curl -XGET 'localhost:9200/bank/account/_arch?pretty&scroll=2m&arch_type=scan' -d '{"size":3,"query":{"match_all":{}}}'
curl –XGET 'localhost:9200/_arch/scroll?scroll=2m&pretty&scroll_id=c2Nhbjs1OzkzMzpoMTJIdEc1WlJxU2I3ZVRlVXpBX3F3OzkzNzpoMTJIdEc1WlJxU2I3ZVRlV XpBX3F3OzkzNDpoMTJIdEc1WlJxU2I3ZVRlVXpBX3F3OzkzNjpoMTJIdEc1WlJxU2I3ZVRlVXpBX3F3OzkzNTpoMTJIdEc1WlJxU2I3ZVRlVXpBX3F3OzE7dG90YW xfaGl0czoxMDAwOw==
A scanning scroll 查询与 a standard scroll 查询有⼏点不同:
1. A scanning scroll 查询结果没有排序,结果的顺序是doc⼊库时的顺序;
2. A scanning scroll 查询不⽀持聚合
3. A scanning scroll最初的查询结果的“hits”列表中不会包含结果
4. A scanning scroll 最初的查询中如果设定了“size”,如下:
curl -XGET 'localhost:9200/bank/account/_arch?pretty&scroll=2m&arch_type=scan' -d '{"size":3,"query":{"match_all":{}}}'
这个“size”是设定每个分⽚(shard)的数量,也就是说如果设定size=3,⽽有5个shard,每次返回结果的最⼤值就是3*5=15。
3.2. 清除Scroll API
1. 在scroll arch过程中保存的信息在超时后会⾃动删除,但是我们也可以通过clear scroll API来⼿动删除。如下:
curl –XDELETE 'localhost:9200/_arch/scroll -d 'c2Nhbjs2OzM0NDg1ODpzRlBLc0FXNlNyNm5JWUc1'
2. 如果要删除多个,可以⽤逗号隔开,也可以通过下⾯语句全部删除
curl –XDELETE 'localhost:9200/_arch/scroll/_all'
4. Elasticarch 客户端TransportClient
高二英语练习册答案TestTransportClient.java(使⽤TransportClient代码⽰例)
// 实例化ES配置信息
Settings ttings = ingsBuilder()
.put("ansport.sniff", true)
.put("cluster.name", "hansight_cluster")
.put("node.name","node1").build();
// 实例化ES客户端
Client client = new TransportClient(ttings)
.addTransportAddress(new InetSocketTransportAddress("XXX.XXX.XXX.XXX", 9300));
// 设置Scroll参数,执⾏查询并返回结果
SearchRespon scrollResp = client.prepareSearch("bank")
.
tTypes("account")
.tSearchType(SearchType.SCAN)
.tScroll(new TimeValue(20000))
.tSize(3).execute().actionGet();
4.1. TransportClient#execute() 过程
下⾯先介绍⼀下execute()的过程,整个过程分两⼤步骤:
第⼀步:nd request
在设置完各种arch request属性后,返回的是⼀个ActionRequestBuilder的对象(本例中是SearchRequestBuilder对象),该类位于:org.elasticarch.action包下(SearchRequestBuilder位于:org.elasticarch.action.arch下)
在execute(…)⽅法中,创建了⼀个ActionListener(PlainListenableActionFuture是ActionListener的实现类)。那么这个ActionListener 的作⽤是什么呢?我们从源码中找到关于ActionListener的说明:“A l
istener for action respons or failures”,是⽤来监听action的执⾏结果(响应或者失败)的。创建源码如下:
public ListenableActionFutureexecute() {
PlainListenableActionFuturefuture = new PlainListenableActionFuture<>(request.listenerThreaded(), threadPool);
execute(future);
return future;
}
listener已经创建好了,但是我们还没有对应要监听的action,于是开始构建相应的action,在org.elasticarch.client.support. AbstractClient的arch⽅法中我们找到了构建过程:
@Override
public void arch(final SearchRequest request, final ActionListenerlistener) {
execute(SearchAction.INSTANCE, request, listener);
}
在源码的SearchAction.INSTANCE这⼀步构建了我们本次arch所需的action。那么action是⼲嘛的呢?action在源码中的说明
为“Ba action. Supports building the Request through a RequestBuilder”,可见它是⽤来创建请求的,本例中我们就是创建了⼀个arch的请求,并⽤listener来监听它的执⾏结果。
然后再通过action的proxy来调⽤执⾏ansport. TransportService的ndRequest⽅法将请求发送到rver端。
第⼆步:rvice respon
rvice端收到request后经过什么操作我们暂且不管(在rvice端中会详细说明)。Service处理完成后把结果返回到客户端。⾸先介绍⼀个⾮常重要的类:ansportty包下的MessageChannelHandler。服务端的respon返回后⾸先到了MessageChannelHandler的messageReceived(…)⽅法:
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
…
if (TransportStatus.isRequest(status)) {…}
el {
TransportResponHandler handler = ve(requestId);
// ignore if its null, the adapter logs it
if (handler != null) {
if (TransportStatus.isError(status)) {
handlerResponError(wrappedStream, handler);
} el {
Channel(), wrappedStream, handler);
}
} el {
// if its null, skip tho bytes
}
…
英语转换器regulation}
然后从缓存中⼜取得了这个这次请求的handler。通过调⽤handler.handleRespon(respon)这⼀步从⽽把respon返回到了future对象中(注:future=this.client.prepareSearch(“bank”).tTypes(“account”).tScroll(new TimeValue(20000)).tSize(3).execute())。⾄此execute()操作完成。
4.2. actionGet()操作
根据execute()操作返回的future对象直接调⽤actionGet()就获得结果。
SearchRespon scrollResp = future. actionGet();
5. Elasticarch Scroll 服务端
Elasticarch scroll ⾸次查询和之后的查询流程有所不同,设置了scan属性的查询和没设置该属性的查询流程也不同,client设置的request的属性不同,决定了Service端的处理流程。
⾸先MessageChannelHandlerd的messageReceived⽅法接受到请求,在handleRequest⽅法中获得请求相应的handler。这⾥需要注意的是,根据请求的不同,获取到的handler也是不⼀样的,例如第⼀次scroll arch的handler是
org.elasticarch.action.arch.TransportSearchAction .TransportHandler,⽽之后的scroll query的handler就变成
org.elasticarch.action.arch.TransportSearchScrollAction.TransportHandler。
之后再调⽤handler的messageReceived的⽅法,下⾯我们看⼀下这个⽅法:
public void messageReceived(SearchRequest request, final TransportChannel channel) throws Exception {
// no need for a threaded listener
chinkrequest.listenerThreaded(fal);
execute(request, new ActionListener() {
@Override
public void onRespon(SearchRespon result) {
try {
channel.ndRespon(result);
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.ndRespon(e);
} catch (Exception e1) {
logger.warn("Failed to nd respon for arch", e1);
}
}
});
}
这个⽅法包含两个参数request和channel,request顾名思义,它包含了我们请求的信息,channel意
思是通道,⽽它的作⽤也确实类似于通道,它可以把操作结果(成功返回请求结果,失败则返回失败信息)返回给客户端。
下⾯我们再研究⼀下具体的操作类org.elasticarch.pe.TransportSearchQueryThenFetchAction(我们以first scroll arch为例说明)。它的业务逻辑主要靠其内部类AsyncAction来完成。通过调⽤performFirstPha⽅法(其实是⽗类BaAsyncAction的⽅法)来进⾏first scroll arch,这个过程分三步:
第⼀步:query(ndExecuteFirstPha这个⽅法);
第⼆步:fetch(onFirstPhaResult这个⽅法);
第三步:最后将获得的结果通过回调Respon(respon)将结果返回给客户端。
6. 关于scroll_id的⼏点补充
scroll_id是在Elasticarch rver端产⽣的。
6.1. 在哪⼀步产⽣的?
在fetch刚刚结束的时候,即TransportAction的各种⼦类,在本⽂例⼦中就是TransportSearchQueryThenFetchAction,在调⽤TransportSearchQueryThenFetchAction.AsyncAction. innerFinishHim()⽅法中:
void innerFinishHim() throws Exception {
InternalSearchRespon internalRespon = (sortedShardList, firstResults, fetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.archType(), firstResults, null);
}
}
调⽤了TransportSearchHelper的buildScrollId⽅法,来创建了scroll_id
6.2. scroll_id的含义
先让我们来看⼀下TransportSearchHelper的buildScrollId⽅法:
public static String buildScrollId(SearchType archType, AtomicArray archPhaResults, @Nullable Map attributes) throws IOException {
if (archType == SearchType.DFS_QUERY_THEN_FETCH || archType == SearchType.QUERY_THEN_FETCH) {
return buildScrollId(PardScrollId.QUERY_THEN_FETCH_TYPE, archPhaResults, attributes);
}
}
genetically...
public static String buildScrollId(String type, AtomicArray archPhaResults, @Nullable Map attrib
utes) throws IOException {
...
deBytes(bytesRef.bytes, bytesRef.offt, bytesRef.length, Ba64.URL_SAFE);
}
可以看到buildScrollId包含三个参数:
Type:String,查询的类型,PardScrollId.QUERY_THEN_FETCH_TYPE=queryThenFetch;
archPhaResults:结果信息
attributes:查询条件参数
可见⽣成的scroll_id中包含了这⼀次查询的查询⽅式以及⼀些结果信息,由于每次查询的结果不同,所以⽣成的scroll_id也不同,这也是Elasticarch API要求每次scroll查询要⽤最新 scroll_id的原因。
下⾯我们解码⼀个scroll_id来看看它的内容:
String scrollId = "cXVlcnlUaGVuRmV0Y2g7NTsxMTU0OmgxMkh0RzVaUnFTYjdlVGVVekFfcXc7MTE1MzpoMTJIdEc1WlJxU2I3ZVRlVXpBX3F3OzExNTY6aD EySHRHNVpScVNiN2VUZVV6QV9xdzsxMTU1OmgxMkh0RzVaUnFTYjdlVGVVekFfcXc7MTE1NzpoMTJIdEc1WlJxU2I3ZVRlVXpBX3F3OzA7";
BytesRef scroll_id_bytes = new BytesRef();
scroll_id_bytes.bytes = Ba64.decode(scrollId);
CharsRef chars = new CharsRef();
UnicodeUtil.UTF8toUTF16(scroll_id_bytes.bytes, 0, scroll_id_bytes.bytes.length, chars);
System.out.println("================");
StringBuffer sb = new StringBuffer();
for (char c : chars.chars) {
sb.append(c);
}
System.out.println(sb);
System.out.println("================");
运⾏结果如下:四级怎么算分
================
queryThenFetch;5;1154:h12HtG5ZRqSb7eTeUzA_qw;1153:h12HtG5ZRqSb7eTeUzA_qw;1156:h12HtG5ZRqSb7eTeUzA_qw;1155:h12HtG5ZRqSb7eTeUzA_q w;1157:h12HtG5ZRqSb7eTeUzA_qw;0;
================