Flink写入数据到ElasticSearch(ElasticSearch详细使用指南及采坑记录)

更新时间:2023-06-26 15:35:57 阅读: 评论:0

Flink写⼊数据到ElasticSearch(ElasticSearch详细使⽤指南及采坑记录)⼀、ElasticSearchSink介绍
在使⽤Flink进⾏数据的处理的时候,⼀个必要步骤就是需要将计算的结果进⾏存储或导出,Flink中这个过程称为Sink,官⽅我们提供了常⽤的⼏种Sink Connector,例如:
Apache Kafka
京剧四大名旦分别是Elasticarch
Elasticarch 2x
Hadoop FileSystem
这篇就选取其中⼀个常⽤的ElasticarchSink来进⾏介绍,并讲解⼀下⽣产环境中使⽤时的⼀些注意点,以及其内部实现机制。
⼆、使⽤⽅式
a、添加pom依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticarch2_2.10</artifactId>
形容快乐的成语
<version>1.3.1</version>
</dependency>
根据⾃⼰所⽤的filnk版本以及es版本对上⾯的版本号进⾏调整
b、实现对应代码
DataStream<String> input = ...;
Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");
离开反义词//该配置表⽰批量写⼊ES时的记录条数
config.put("bulk.flush.max.actions", "1");
List<InetSocketAddress> transportAddress = new ArrayList<>();
transportAddress.add(new ByName("127.0.0.1"), 9300));
transportAddress.add(new ByName("10.2.3.1"), 9300));
input.addSink(new ElasticarchSink<>(config, transportAddress, new ElasticarchSinkFunction<String>() {
public IndexRequest createIndexRequest(String element) {
Map<String, String> json = new HashMap<>();
//将需要写⼊ES的字段依次添加到Map当中
json.put("data", element);
return Requests.indexRequest()
.index("my-index")
.type("my-type")
狗狗旅馆
.source(json);
}
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
}));
c、扩展配置
经过上⾯的代码已经实现了⼀个基础版的EsSink,但是上述代码当ES集群出现波动的时候,由于不具备重试机制则有可能出现丢数据的情况。⽣产环境中为了实现数据完整性,我们需要添加⼀些失败重试配置,来实现写⼊失败情况下的容错处理,常⽤的失败重试配置有:
//1、⽤来表⽰是否开启重试机制
config.put("bulk.able", "true");
//2、重试策略,⼜可以分为以下两种类型
//a、指数型,表⽰多次重试之间的时间间隔按照指数⽅式进⾏增长。eg:2 -> 4 -> 8 ...
config.put("bulk.pe", "EXPONENTIAL");
//b、常数型,表⽰多次重试之间的时间间隔为固定常数。eg:2 -> 2 -> 2 ...
config.put("bulk.pe", "CONSTANT");
lucky什么意思//3、进⾏重试的时间间隔。对于指数型则表⽰起始的基数
config.put("bulk.flush.backoff.delay", "2");
//4、失败重试的次数
config.put("bulk.ies", "3");
其他的⼀些配置:
bulk.flush.max.actions: 批量写⼊时的最⼤写⼊条数
bulk.flush.max.size.mb: 批量写⼊时的最⼤数据量
bulk.flush.interval.ms: 批量写⼊的时间间隔,配置后则会按照该时间间隔严格执⾏,⽆视上⾯的两个批量写⼊配置
三、失败处理器
写⼊ES的时候很多时候由于ES集群队列满了,或者节点挂掉,经常会导致写⼊操作执⾏失败。考虑到这样的失败写⼊场景,EsSink为⽤户提供了失败处理器机制,创建Sink对象的时候,同时可以传⼊⼀个失败处理器,⼀旦出现写⼊失败的情况则会回调所传⼊的处理器⽤于错误恢复。具体的⽤法为:
DataStream<String> input = ...;
input.addSink(new ElasticarchSink<>(
四级分数构成config, transportAddress,
new ElasticarchSinkFunction<String>() {...},
new ActionRequestFailureHandler() {
@Override
void onFailure(ActionRequest action,
Throwable failure,
int restStatusCode,
RequestIndexer indexer) throw Throwable {
if (ainsThrowable(failure, EsRejectedExecutionException.class)) {
// 将失败请求继续加⼊队列,后续进⾏重试写⼊
indexer.add(action);
} el if (ainsThrowable(failure, ElasticarchParException.class)) {
/
/ 添加⾃定义的处理逻辑
} el {
throw failure;
}
}
}));
霸气动漫头像男如果仅仅只是想做失败重试,也可以直接使⽤官⽅提供的默认的 RetryRejectedExecutionFailureHandler ,该处理器会对EsRejectedExecutionException 导致到失败写⼊做重试处理。
四、其他注意点
1、EsSink代码块不能使⽤try-catch-Exception来捕捉
之前在使⽤EsSink的时候,为了防⽌某次写⼊失败造成程序中断,对ElasticarchSinkFunction的 pr
ocess() ⽅法使⽤try-catch-exception语句块进⾏了捕捉,但实际运⾏的时候发现程序跑着跑着还是被⼀个 EsRejectedException 异常中断掉了。让⼈奇怪的是明明对异常进⾏了捕捉,为什么这个异常还是能够抛出来,下来通过查看源码发现,如果在初始化EsSink对象的时候没有传⼊ActionRequestFailureHandler 则会使⽤默认的 ActionRequestFailureHandler ,这个处理器的源码如下:
public class NoOpFailureHandler implements ActionRequestFailureHandler {
private static final long rialVersionUID = 737941343410827885L;
@Override
public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
// 这⾥抛出的是⼀个throwable
throw failure;
}
可以看到,在发⽣异常的时候,默认的处理器会将异常包装成⼀个 Throw 对象抛出,这就是直接使⽤ try-Exception ⽆法捕捉到的原因。
解决⽅法:
实现⾃⼰的失败处理器消化掉异常
使⽤ throw 来捕捉异常
该问题⼀定要重点注意,负责会导致实时任务终⽌掉!
2、失败重试机制依赖于checkpoint
如果想要使⽤EsSink的失败重试机制,则需要通过ableCheckpoint()⽅法来开启Flink任务对checkpoint的⽀持,如果没有开启checkpoint机制的话,则失败重试策略是⽆法⽣效的。这个是通过跟踪 ElasticarchSinkBa 类源码的时候发现的,核⼼的代码如下:
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {绿豆芽炒肉丝
/
/ no initialization needed
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkErrorAndRethrow();
//如果没有开启checkPoint机制,则该变量为fal,也就导致下⾯的flush重试代码不会执⾏到
if (flushOnCheckpoint) {
do {
//失败重试的时机是发⽣在程序在打checkpoint的时候
bulkProcessor.flush();
checkErrorAndRethrow();
} while (() != 0);
}
}
3、经验学习
可以通过第⼆点贴出的源码发现,虽然EsSink实现了 CheckpointedFunction 接⼝,并且重写了checkPoint的相关⽅法,但其并没有墨守成规的利⽤checkpoint定义的那样利⽤State机制⽤于故障恢复。⽽是利⽤了checkpoint的空壳,定时执⾏的框架来实现了⾃⼰的⼀套失败重试机制。这⼀点很值得我们借鉴,很多知识点要学会活学活⽤,他⼭之⽯可以攻⽟

本文发布于:2023-06-26 15:35:57,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/82/1044697.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:失败   重试   机制   时候   实现   处理器   配置   时间
相关文章
留言与评论(共有 0 条评论)
   
验证码:
推荐文章
排行榜
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图