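Flink errors when writing to Elasticsearch: OOM (out of memory) and abnormally closed connections
Recently a project at my company needed Flink to insert into Elasticsearch frequently and in real time. While writing to Elasticsearch, the job hit an OOM (out of memory) exception as well as a connection-interrupted error.
The errors were: 1. Caused by: java.lang.IllegalStateException: I/O reactor has been shut down (the connection was closed abnormally).
2. java.lang.OutOfMemoryError: Direct buffer memory (OOM, out of memory).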
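First, the connection-interrupted exception. Many people online say it happens because the ES client's close method was called: the client is closed before its requests have finished, the connection becomes unusable for later requests, and this exception is thrown.
However, my code initially used the native Elasticsearch 7.12 client to run the insert requests and never called close, so the exception probably had a different cause. Later I switched to the sink wrapped by Flink, which has to be closed manually.
Of course, before tackling this problem, make sure the code itself runs correctly; otherwise some other exception may be what closes the connection.
To resolve this exception we did the following:
Use Flink's ElasticsearchSink instead of the native ES client to run the insert requests. (The native client may work too, but in our tests the Flink sink behaved better and failed less often.)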
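Then set the parameters:
1. Set the timeouts: requestBuilder.setConnectTimeout(60000); requestBuilder.setSocketTimeout(60000); both timeouts are set to one minute here.
2. Set the bulk flush limits and the flush interval: esSinkBuilder.setBulkFlushMaxActions(1); esSinkBuilder.setBulkFlushMaxSizeMb(1);
esSinkBuilder.setBulkFlushInterval(1); // flush interval set to 1 millisecond
3. Set the number of I/O threads:
IOReactorConfig.custom().setIoThreadCount(5).build();
esSinkBuilder.setFailureHandler(new RetryRequestFailureHandler()); // handles failed Elasticsearch requests
The sink sets up a request each time it runs, so it has to be closed: if (build != null) build.close();
Elasticsearch 7.12 is used here with login authentication enabled.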
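To make the bulk flush settings in item 2 more concrete, here is a minimal plain-Java sketch of how an action-count threshold drives flushing. `BulkBufferSketch` is a hypothetical stand-in written for this illustration; it is not Flink's or Elasticsearch's bulk processor, and it ignores the size and interval limits.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a bulk buffer; not the real bulk processor. */
final class BulkBufferSketch {
    private final int maxActions;
    private final List<String> pending = new ArrayList<>();
    private int flushCount = 0;

    BulkBufferSketch(int maxActions) {
        this.maxActions = maxActions;
    }

    /** Buffers one action and flushes as soon as the action threshold is reached. */
    void add(String action) {
        pending.add(action);
        if (pending.size() >= maxActions) {
            flush();
        }
    }

    /** Sends everything buffered as one bulk request (here it is only counted). */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flushCount++;
        pending.clear();
    }

    int flushCount() {
        return flushCount;
    }

    int pendingSize() {
        return pending.size();
    }

    public static void main(String[] args) {
        BulkBufferSketch perRecord = new BulkBufferSketch(1);
        BulkBufferSketch batched = new BulkBufferSketch(100);
        for (int i = 0; i < 250; i++) {
            perRecord.add("doc-" + i);
            batched.add("doc-" + i);
        }
        System.out.println("maxActions=1   -> bulk requests: " + perRecord.flushCount());
        System.out.println("maxActions=100 -> bulk requests: " + batched.flushCount()
                + ", still buffered: " + batched.pendingSize());
    }
}
```

With a threshold of 1, every record becomes its own bulk request, which is the behavior the settings above ask for. A larger threshold sends fewer, larger requests, but leaves records waiting in the buffer until the next flush.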
The complete code is as follows:
`import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import org.apache.http.HttpHost;

// operator is the Flink data stream
SingleOutputStreamOperator<JSONObject> operator;
// Elasticsearch addresses
List<HttpHost> esAddress = getEsAddress("locolhost1:9200,locolhost2:9200,locolhost3:9200,locolhost4:9200,locolhost5:9200");
// getEsAddress helper: parses a comma-separated host list into HttpHost objects
public static List<HttpHost> getEsAddress(String hosts) throws MalformedURLException {
    String[] hostList = hosts.split(",");
    List<HttpHost> address = new ArrayList<>();
    for (String host : hostList) {
        if (host.startsWith("http")) {
            // full URL form, e.g. http://host:9200
            URL url = new URL(host);
            address.add(new HttpHost(url.getHost(), url.getPort()));
        } else {
            // host:port form
            String[] parts = host.split(":", 2);
            if (parts.length > 1) {
                address.add(new HttpHost(parts[0], Integer.parseInt(parts[1])));
            } else {
                throw new MalformedURLException("invalid elasticsearch hosts format");
            }
        }
    }
    return address;
}
// Elasticsearch insert request
ESSinkUtil.addSink(esAddress, 1, 8, operator, new ElasticsearchSinkFunction<JSONObject>() {
    @Override
    public void process(JSONObject metric, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
        requestIndexer.add(Requests.indexRequest()
                .index(INDEX)
                // use the record's "id" field as the document id
                .id(metric.getString("id"))
                // serialize the record as the JSON document source
                .source(JSON.toJSONString(metric), XContentType.JSON));
    }
});
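// Elasticsearch sink wrapped by Flink
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.elasticsearch.client.RestClientBuilder;

public static <T> void addSink(List<HttpHost> hosts, int bulkFlushMaxActions, int parallelism,
        SingleOutputStreamOperator<T> data, ElasticsearchSinkFunction<T> func) {
    try {
        ElasticsearchSink.Builder<T> esSinkBuilder = new ElasticsearchSink.Builder<>(hosts, func);
        esSinkBuilder.setBulkFlushMaxActions(bulkFlushMaxActions); // maximum number of actions per bulk request
        esSinkBuilder.setBulkFlushMaxSizeMb(1); // maximum size of buffered data per bulk request, in MB
        esSinkBuilder.setBulkFlushInterval(1); // flush interval, in milliseconds
        esSinkBuilder.setFailureHandler(new RetryRequestFailureHandler()); // handles failed Elasticsearch requests
        // configure a custom HTTP client
        esSinkBuilder.setRestClientFactory(new RestClientFactory() {
            @Override
            public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
                final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(USER, PASSWORD));
                restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        //httpClientBuilder.disableAuthCaching();
                        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                        return httpClientBuilder.setDefaultIOReactorConfig(
                                IOReactorConfig.custom()
                                        .setIoThreadCount(5) // use 5 I/O threads
                                        .build());
                    }
                }).setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
                    @Override
                    public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestBuilder) {
                        requestBuilder.setConnectTimeout(5000); // connect timeout, in milliseconds
                        requestBuilder.setSocketTimeout(60000); // socket timeout, in milliseconds
                        requestBuilder.setConnectionRequestTimeout(10000); // timeout for obtaining a pooled connection
                        return requestBuilder;
                    }
                });
            }
        });
        //todo: xpack security
        ElasticsearchSink<T> build = esSinkBuilder.build();
        data.addSink(build).setParallelism(parallelism);
        if (build != null) build.close(); // build must be closed once it has been used
    } catch (Exception e) {
        e.printStackTrace();
    }
}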
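// Handles failed Elasticsearch requests
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Optional;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.ExceptionUtils;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

public class RetryRequestFailureHandler implements ActionRequestFailureHandler {
    public RetryRequestFailureHandler() {
    }

    @Override
    public void onFailure(ActionRequest actionRequest, Throwable throwable, int i, RequestIndexer requestIndexer) throws Throwable {
        if (ExceptionUtils.findThrowable(throwable, EsRejectedExecutionException.class).isPresent()) {
            // the request was rejected (queue full): add it back so it is retried
            requestIndexer.add(new ActionRequest[]{actionRequest});
        } else {
            if (ExceptionUtils.findThrowable(throwable, SocketTimeoutException.class).isPresent()) {
                // socket timeout: drop the request without failing the sink
                return;
            } else {
                Optional<IOException> exp = ExceptionUtils.findThrowable(throwable, IOException.class);
                if (exp.isPresent()) {
                    IOException ioExp = exp.get();
                    if (ioExp != null && ioExp.getMessage() != null && ioExp.getMessage().contains("max retry timeout")) {
                        // retry timeout reached: drop the request without failing the sink
                        return;
                    }
                }
            }
            // anything else fails the sink
            throw throwable;
        }
    }
}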
// Below: creating the client connection with the native Elasticsearch client
// (also needs org.elasticsearch.client.RestClient and org.elasticsearch.client.RestHighLevelClient)
private static RestHighLevelClient client; // shared client instance, created on first use

public static RestHighLevelClient client() {
    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
    credentialsProvider.setCredentials(AuthScope.ANY,
            new UsernamePasswordCredentials(USER, PASSWORD)); // ES user name and password (the default user name is elastic)
    // create the ES client with user name and password
    try {
        if (null == client) {
            client = new RestHighLevelClient(RestClient.builder(
                    new HttpHost(PRODUCE_HOST, PORT, SCHEMA),
                    new HttpHost(PRODUCE_HOST2, PORT, SCHEMA),
                    new HttpHost(PRODUCE_HOST3, PORT, SCHEMA),
                    new HttpHost(PRODUCE_HOST4, PORT, SCHEMA),
                    new HttpHost(PRODUCE_HOST5, PORT, SCHEMA))
                    // asynchronous HTTP client configuration
                    .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                        @Override
                        public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                            //httpClientBuilder.disableAuthCaching();
                            httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                            return httpClientBuilder.setDefaultIOReactorConfig(
                                    IOReactorConfig.custom()
                                            .setIoThreadCount(5)
                                            .build());
                        }
                    })
                    .setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
                        @Override
                        public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestBuilder) {
                            requestBuilder.setConnectTimeout(5000);
                            requestBuilder.setSocketTimeout(60000);
                            requestBuilder.setConnectionRequestTimeout(10000);
                            return requestBuilder;
                        }
                    })
            );
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return client;
}`
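The failure handler in the code above decides what to do by searching the exception's cause chain for particular types. Below is a minimal plain-Java sketch of that decision logic with no Flink or Elasticsearch dependencies. The names `CauseChainSketch`, `Decision` and `classify` are made up for this illustration, and `java.util.concurrent.RejectedExecutionException` only stands in for Elasticsearch's rejection exception.

```java
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Optional;
import java.util.concurrent.RejectedExecutionException;

/** Illustration only: mirrors the handler's branching, not the Flink API. */
final class CauseChainSketch {
    enum Decision { RETRY, DROP, RETHROW }

    /** Returns the first throwable in the cause chain that is an instance of the given type. */
    static <T extends Throwable> Optional<T> findThrowable(Throwable error, Class<T> type) {
        Throwable current = error;
        int depth = 0;
        // the depth limit guards against a cyclic cause chain
        while (current != null && depth < 64) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
            current = current.getCause();
            depth++;
        }
        return Optional.empty();
    }

    /** Same order of checks as the handler: rejection, socket timeout, retry timeout, everything else. */
    static Decision classify(Throwable failure) {
        if (findThrowable(failure, RejectedExecutionException.class).isPresent()) {
            return Decision.RETRY;
        }
        if (findThrowable(failure, SocketTimeoutException.class).isPresent()) {
            return Decision.DROP;
        }
        Optional<IOException> io = findThrowable(failure, IOException.class);
        if (io.isPresent() && io.get().getMessage() != null
                && io.get().getMessage().contains("max retry timeout")) {
            return Decision.DROP;
        }
        return Decision.RETHROW;
    }

    public static void main(String[] args) {
        Throwable timeout = new RuntimeException("bulk failed", new SocketTimeoutException("read timed out"));
        Throwable reactor = new IllegalStateException("I/O reactor has been shut down");
        System.out.println("wrapped socket timeout -> " + classify(timeout));
        System.out.println("reactor shut down      -> " + classify(reactor));
    }
}
```

Note that under this logic the "I/O reactor has been shut down" error is not one of the tolerated cases, so it still fails the sink. The handler only softens rejections and timeouts.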
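Next, the OOM problem. Our starting point was to adjust the parallelism in the code so that enough slots are available (8 for now), and then to increase the memory in the environment configuration. Generally speaking, an out-of-memory error points to a memory leak.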
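When chasing a "Direct buffer memory" error it can help to see how much direct (off-heap) memory the JVM is actually holding. The following is a minimal sketch using the standard `BufferPoolMXBean`. The class name `DirectBufferUsageSketch` is made up for this illustration, and running such a check inside the Flink job (for example from a periodic log statement) is an assumption, not something the original setup did.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

/** Illustration only: reads the JVM's direct buffer pool usage. */
final class DirectBufferUsageSketch {
    /** Bytes currently held by the JVM's "direct" buffer pool, or -1 if the pool is not exposed. */
    static long directMemoryUsed() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long before = directMemoryUsed();
        // allocate 8 MB off-heap, the kind of memory that "Direct buffer memory" refers to
        ByteBuffer buffer = ByteBuffer.allocateDirect(8 * 1024 * 1024);
        long after = directMemoryUsed();
        System.out.println("direct memory before: " + before + " bytes");
        System.out.println("direct memory after allocating " + buffer.capacity() + " bytes: " + after + " bytes");
    }
}
```

If the reported value keeps climbing towards the JVM's direct memory limit while the job runs, that suggests buffers are being held or leaked. If it is stable but close to the limit, raising the off-heap memory in the environment configuration is the more likely fix.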
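It may also be that the code itself throws too many exceptions and that this is what brings the program down. We went through the code, found the places where exceptions could occur, and fixed them.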
Next, give Flink a restart strategy.
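The post does not say which restart strategy was chosen. Assuming a fixed-delay strategy (which Flink 1.x exposes as `RestartStrategies.fixedDelayRestart` in the DataStream API), the behavior can be pictured with this plain-Java sketch. `FixedDelayRestartSketch` and `runWithRestarts` are hypothetical names used only here; the code imitates the idea and does not call Flink.

```java
/** Illustration only: imitates a fixed-delay restart policy, without Flink. */
final class FixedDelayRestartSketch {
    /**
     * Runs the job and restarts it after a fixed delay whenever it fails,
     * up to maxRestarts times. Returns the number of restarts that were needed.
     */
    static int runWithRestarts(Runnable job, int maxRestarts, long delayMillis) {
        int restarts = 0;
        while (true) {
            try {
                job.run();
                return restarts;
            } catch (RuntimeException failure) {
                if (restarts >= maxRestarts) {
                    throw failure; // restart budget exhausted: the job fails for good
                }
                restarts++;
                sleepQuietly(delayMillis);
            }
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting to restart", e);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // a job that fails on its first two runs and then succeeds
        int restarts = runWithRestarts(() -> {
            if (++calls[0] <= 2) {
                throw new IllegalStateException("simulated failure " + calls[0]);
            }
        }, 3, 100L);
        System.out.println("job succeeded after " + restarts + " restart(s)");
    }
}
```

A restart strategy like this hides transient failures such as a dropped connection, but it does not cure a job that fails every time. That is why the code and memory fixes above come first.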
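Once all of the above was in place, the Flink errors disappeared. Before that, the job could never get going: as soon as it was submitted through the UI, it turned red and failed.
Writing this up took some effort, so if it solved your problem, don't forget to leave a comment and a like.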