Problems Encountered When Using Flink (continuously updated)
1. Flink fails to start
Checking the JobManager log:
WARN org.apache.flink.runtime.webmonitor.JobManagerRetriever - Failed to retrieve leader gateway and port.
akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@t-sha1-flk-01:6123/), Path(/user/jobmanager)]
at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
at java.lang.Thread.run(Thread.java:748)
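Solution: the hostnames in /etc/hosts were all lowercase, but the Flink configuration files (flink-conf.yaml, masters, slaves) used uppercase hostnames. Change the hostnames in the Flink configuration files to lowercase, or use IP addresses instead.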
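As a quick pre-start sanity check, the sketch below compares the hostnames used in the Flink config files (for example in masters, slaves, or jobmanager.rpc.address in flink-conf.yaml) against the names in /etc/hosts and flags any that match only when case is ignored. It is a standalone illustration, not part of Flink. The class name and the sample IP address are hypothetical, and a real check would read the actual files instead of inline strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Hypothetical helper (not part of Flink): flags hostnames from the Flink
// config files that match an /etc/hosts entry only when case is ignored.
class HostnameCaseCheck {

    // Collects hostnames and aliases from /etc/hosts-style lines ("IP name [alias...]").
    static Set<String> hostsNames(List<String> hostsLines) {
        Set<String> names = new HashSet<>();
        for (String line : hostsLines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                continue; // skip blank lines and comments
            }
            String[] fields = trimmed.split("\\s+");
            for (int i = 1; i < fields.length; i++) { // fields[0] is the IP address
                names.add(fields[i]);
            }
        }
        return names;
    }

    // Returns configured hostnames that do not appear verbatim in /etc/hosts
    // but do appear when compared case-insensitively.
    static List<String> caseMismatches(List<String> configured, Set<String> known) {
        Set<String> lowerKnown = new HashSet<>();
        for (String name : known) {
            lowerKnown.add(name.toLowerCase(Locale.ROOT));
        }
        List<String> mismatches = new ArrayList<>();
        for (String host : configured) {
            if (!known.contains(host) && lowerKnown.contains(host.toLowerCase(Locale.ROOT))) {
                mismatches.add(host);
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        // Sample data: the IP address is made up; the hostname is from the log above.
        Set<String> known = hostsNames(Arrays.asList("10.0.0.1 t-sha1-flk-01"));
        System.out.println(caseMismatches(Arrays.asList("T-SHA1-FLK-01", "t-sha1-flk-01"), known));
    }
}
```

Any name this reports is a candidate for the mismatch described above. A name missing from /etc/hosts entirely is not reported here, because that is a different problem.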
2. Job exits after running for a while
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 4 for operator Compute By Event Time (1/12).}
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 4 for operator Compute By Event Time (1/12).
... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=7061809 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
... 5 more
Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
... 5 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=7061809 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
... 7 more
Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=7061809 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.checkSize(MemCheckpointStreamFactory.java:64)
at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetBytes(MemCheckpointStreamFactory.java:144)
at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetHandle(MemCheckpointStreamFactory.java:125)
at org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:351)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329)
at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:372)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
... 1 more
[CIRCULAR REFERENCE:java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=7061809 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.]
Solution:
By default, state is stored in memory. Switch to storing it in HDFS by setting the following in flink-conf.yaml:
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://t-sha1-flk-01:9000/flink-checkpoints
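The error message already contains the arithmetic. The memory-backed state backend rejected a snapshot of Size=7061809 bytes because its limit is maxSize=5242880 bytes, which is 5 × 1024 × 1024, or 5 MiB. The sketch below reproduces that comparison in plain Java. It only mirrors the check behind the message and does not call Flink; the class and method names are illustrative.

```java
// Illustrative re-creation of the size check behind the error message;
// this is not Flink's own class.
class MemoryStateLimit {

    // The maxSize reported in the log: 5 MiB.
    static final long MAX_STATE_BYTES = 5L * 1024 * 1024;

    // True if a snapshot of the given size would exceed the limit above.
    static boolean exceedsLimit(long stateBytes) {
        return stateBytes > MAX_STATE_BYTES;
    }

    public static void main(String[] args) {
        long observed = 7061809L; // Size=... from the log
        System.out.printf("limit=%d, observed=%d, exceeds=%b, ratio=%.2f%n",
                MAX_STATE_BYTES, observed, exceedsLimit(observed),
                (double) observed / MAX_STATE_BYTES);
    }
}
```

The snapshot was roughly 1.35 times the limit, so it could not fit in memory-backed state. In job code, the same switch to file-system state can also be made on the StreamExecutionEnvironment in Flink 1.x, for example with env.setStateBackend(new FsStateBackend("hdfs://t-sha1-flk-01:9000/flink-checkpoints")). That call needs the Flink dependencies and is not part of the sketch above.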
3. Repeated restarts after running for a long time
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 1488 for operator Compute By Event Time -> (MonitorData, MonitorDataMapping, MonitorSamplingData) (6/6).}
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 1488 for operator Compute By Event Time -> (MonitorData, MonitorDataMapping, MonitorSamplingData) (6/6).
... 6 more
Caused by: java.util.concurrent.ExecutionException: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /flink-checkpoints/8c274785f1ab027e6146a59364be645f/chk-1488/2c612f30-c57d-4ede-9025-9554ca11fd12 could only be replicated to 0 nodes instead of minReplication (=1). There are 3 datanode(s) running and no node(s) are excluded in this operation.
at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1628)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3121)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3045)
at org.apache.hadoop.hdfs.rver.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:725)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:493)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
at org.apache...
Checking the HDFS log:
WARN org.apache.hadoop.hdfs.protocol.BlockStoragePolicy:
Failed to place enough replicas: expected size is 2 but only 0 storage types can be selected
(replication=3, selected=[], unavailable=[DISK], removed=[DISK, DISK],
policy=BlockStoragePolicy{HOT:7, storageTypes=[DISK], creationFallbacks=[], replicationFallbacks=[ARCHIVE]})
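My Flink setup uses HDFS to store checkpoints. After Flink restarted, the old checkpoints were no longer needed, so I deleted them manually; I am not sure whether that is related. To stop the exceptions, I restarted Flink and HDFS, and after the restart no more exceptions appeared.
However, the HDFS log still contained the following warning (the path is not given in URI format):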
WARN org.apache.hadoop.hdfs.server.common.Util:
Path /mnt/hadoop/dfs/name should be specified as a URI in configuration files.
Please update hdfs configuration.
It turned out to be a configuration error. This entry in hdfs-site.xml:
<property>
<name>dfs.namenode.name.dir</name>
<value>/mnt/hadoop/dfs/name</value>
</property>
should be changed to:
<property>
<name>dfs.namenode.name.dir</name>
<value>file:/mnt/hadoop/dfs/name</value>
</property>
That resolved the problem. The root cause was most likely the incorrect configuration in hdfs-site.xml.
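The warning asks for the directory to be given as a URI. The practical difference between the two values is that only the second one carries a scheme (file:). The plain-Java sketch below makes that visible by parsing both values with java.net.URI. It is not Hadoop's own validation code, and the class name is hypothetical.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical checker (not Hadoop code): treats a configured storage
// directory as well-formed only if it parses as a URI that has a scheme.
class DirUriCheck {

    static boolean hasScheme(String value) {
        try {
            return new URI(value).getScheme() != null;
        } catch (URISyntaxException e) {
            return false; // not even a syntactically valid URI
        }
    }

    public static void main(String[] args) {
        for (String v : new String[] {"/mnt/hadoop/dfs/name", "file:/mnt/hadoop/dfs/name"}) {
            System.out.println(v + " -> has scheme: " + hasScheme(v));
        }
    }
}
```

The bare path parses as a URI without a scheme, which is the form the warning complains about. The file:/ form (and the equivalent file:/// form) both report the scheme "file".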
4. Unable to load native-hadoop library for your platform
When Flink starts, it sometimes logs the following warning:
WARN org.apache.hadoop.util.NativeCodeLoader
- Unable to load native-hadoop library for your platform...
using builtin-java classes where applicable
Solution: edit /etc/profile and add:
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib"
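This did not solve the problem. Note that libhadoop.so actually lives in $HADOOP_HOME/lib/native (see the hadoop checknative output below), whereas the HADOOP_OPTS line above points java.library.path at $HADOOP_HOME/lib.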
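To see whether the JVM can find libhadoop.so at all, you can list the directories on java.library.path and look for the file. The sketch below does this with plain JDK calls. The file name libhadoop.so matches the hadoop checknative output below, and the class name is hypothetical. The sketch only checks that the file is present; Hadoop's NativeCodeLoader actually tries to load the library, which can still fail for other reasons.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic: reports which entries of a library path
// contain a given native library file.
class NativeLibFinder {

    static List<String> dirsContaining(String libraryPath, String fileName) {
        List<String> hits = new ArrayList<>();
        if (libraryPath == null || libraryPath.isEmpty()) {
            return hits;
        }
        for (String dir : libraryPath.split(File.pathSeparator)) {
            if (!dir.isEmpty() && new File(dir, fileName).isFile()) {
                hits.add(dir);
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        String path = System.getProperty("java.library.path");
        List<String> hits = dirsContaining(path, "libhadoop.so");
        System.out.println(hits.isEmpty()
                ? "libhadoop.so not found on java.library.path: " + path
                : "libhadoop.so found in: " + hits);
    }
}
```

For the result to reflect what Flink sees, run it with the same -Djava.library.path value that the Flink JVM receives.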
5. hadoop checknative -a
WARN bzip2.Bzip2Factory: Failed to load/initialize native-bzip2 library system-native, will use pure-Java version
INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
Native library checking:
hadoop: true /usr/hadoop-2.7.3/lib/native/libhadoop.so.1.0.0
zlib: true /lib64/libz.so.1
snappy: false
lz4: true revision:99
bzip2: false
openssl: false Cannot load libcrypto.so (libcrypto.so: cannot open shared object file: No such file or directory)!
INFO util.ExitUtil: Exiting with status 1
Solution:
cd /usr/lib64/
ln -s libcrypto.so.1.0.1e libcrypto.so
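If this fix has to run from a provisioning tool rather than a shell, the same step can be written in Java with java.nio.file.Files.createSymbolicLink. The sketch below creates the unversioned libcrypto.so link next to the versioned file, and only when the link does not exist yet. The directory and version string come from the commands above; the class name is hypothetical. Like ln -s, it needs write permission on /usr/lib64.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper equivalent to: cd <dir> && ln -s <versioned> <linkName>
class LibLink {

    // Creates dir/linkName -> versionedName, using a relative target the same
    // way ln -s does when run inside dir. Returns false if linkName already exists.
    static boolean ensureLink(Path dir, String versionedName, String linkName) throws IOException {
        Path link = dir.resolve(linkName);
        if (Files.exists(link, LinkOption.NOFOLLOW_LINKS)) {
            return false;
        }
        if (!Files.exists(dir.resolve(versionedName))) {
            throw new IOException("missing target: " + dir.resolve(versionedName));
        }
        Files.createSymbolicLink(link, Paths.get(versionedName));
        return true;
    }

    public static void main(String[] args) throws IOException {
        boolean created = ensureLink(Paths.get("/usr/lib64"), "libcrypto.so.1.0.1e", "libcrypto.so");
        System.out.println(created ? "link created" : "libcrypto.so already present");
    }
}
```

After the link exists, running hadoop checknative -a again should show whether openssl can now be loaded.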
6. TaskManager exits
After Flink had been running for a while, a TaskManager exited. I captured a heap dump of the TaskManager with jvisualvm and analyzed it with MAT. The result:
One instance of "org.apache.flink.runtime.io.network.buffer.NetworkBufferPool"
loaded by "sun.misc.Launcher$AppClassLoader @ 0x6c01de310" occupies 403,429,704 (76.24%) bytes.
The memory is accumulated in one instance of "java.lang.Object[]" loaded by "<system class loader>".
Keywords
sun.misc.Launcher$AppClassLoader @ 0x6c01de310
java.lang.Object[]
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool
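This pointed to the network buffer pool running short. I found an article describing almost the same situation, which also used InfluxDB as a sink. Closing the InfluxDB connection in the sink's close() method solved the problem.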
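The lifecycle fix can be sketched without the Flink or InfluxDB dependencies. In the sketch below, LifecycleSink is a hypothetical stand-in for the open/invoke/close methods of Flink's RichSinkFunction, and FakeClient is a hypothetical stand-in for an InfluxDB client. The point is only that the client created in open() is released in close(), and that a second close() call is harmless.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the open/invoke/close lifecycle of a Flink sink.
interface LifecycleSink<T> {
    void open() throws Exception;
    void invoke(T value) throws Exception;
    void close() throws Exception;
}

// Hypothetical stand-in for a database client that holds network resources.
class FakeClient implements AutoCloseable {
    final List<String> written = new ArrayList<>();
    boolean closed = false;

    void write(String point) {
        if (closed) throw new IllegalStateException("client already closed");
        written.add(point);
    }

    @Override
    public void close() {
        closed = true;
    }
}

class ClosingSink implements LifecycleSink<String> {
    FakeClient client;

    @Override
    public void open() {
        client = new FakeClient(); // create the connection once per sink instance
    }

    @Override
    public void invoke(String value) {
        client.write(value);
    }

    @Override
    public void close() {
        if (client != null) { // release the connection when the task ends
            client.close();
            client = null;
        }
    }

    public static void main(String[] args) {
        ClosingSink sink = new ClosingSink();
        sink.open();
        sink.invoke("cpu value=0.5");
        FakeClient c = sink.client;
        sink.close();
        System.out.println("written=" + c.written + ", closed=" + c.closed);
    }
}
```

In a real job the same structure applies. The client is created in open(), and close() releases it even if the job is cancelled, so connections do not accumulate across task restarts.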
In addition, $FLINK_HOME/conf/flink-conf.yaml contains settings for the TaskManager network stack. I have not adjusted them yet:
# The number of buffers for the network stack.
#
# taskmanager.network.numberOfBuffers: 2048
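To size that setting, the Flink 1.x documentation gives a rule of thumb of roughly #slots-per-TM² × #TMs × 4 buffers. Each buffer is one memory segment, and taskmanager.memory.segment-size defaults to 32 KiB. The sketch below just does that arithmetic. The slot and TaskManager counts in main are hypothetical, not this cluster's.

```java
// Illustrative arithmetic for the Flink 1.x network-buffer rule of thumb.
class NetworkBufferSizing {

    static final long SEGMENT_BYTES = 32 * 1024; // default memory segment size

    // Rule-of-thumb buffer count: slotsPerTm^2 * numTms * 4.
    static long recommendedBuffers(int slotsPerTm, int numTms) {
        return (long) slotsPerTm * slotsPerTm * numTms * 4;
    }

    // Memory taken by the given number of buffers, in bytes.
    static long bufferBytes(long buffers) {
        return buffers * SEGMENT_BYTES;
    }

    public static void main(String[] args) {
        int slotsPerTm = 4; // hypothetical
        int numTms = 3;     // hypothetical
        long buffers = recommendedBuffers(slotsPerTm, numTms);
        System.out.printf("buffers=%d (~%d MiB)%n", buffers, bufferBytes(buffers) >> 20);
    }
}
```

At the default segment size, the 2048 buffers in the commented-out line come to 2048 × 32 KiB = 64 MiB.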
7. Kafka partition leader switch causes Flink to restart
Symptoms:
7.1 Flink restarted, and the log shows:
java.lang.Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition.
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:280)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
7.2 The Kafka controller log shows:
INFO [SessionExpirationListener on 10], ZK expired; shut down all controller components and
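try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)
7.3 Set the retries parameter
With the retries parameter set, Flink no longer restarts when a Kafka partition leader switch occurs. Instead, the producer retries the send up to 3 times: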
kafkaProducerConfig
{
"bootstrap.servers": "192.169.2.20:9093,192.169.2.21:9093,192.169.2.22:9093",
"retries": 3
}
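In Java job code, these producer settings are usually passed as a java.util.Properties object, which the Flink Kafka producer constructors accept. The sketch below builds that object with the broker list from above. retries is the standard Kafka producer setting; the class name is hypothetical.

```java
import java.util.Properties;

// Hypothetical helper that builds the producer configuration shown above.
class KafkaProducerProps {

    static Properties build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers",
                "192.169.2.20:9093,192.169.2.21:9093,192.169.2.22:9093");
        // Let the producer retry failed sends (for example a
        // NotLeaderForPartitionException during a leader switch) up to 3 times
        // before the error reaches the Flink sink.
        props.setProperty("retries", "3");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build());
    }
}
```

The resulting object would then be passed to the Flink Kafka producer constructor along with the topic name and serialization schema. With retries above 0 and more than one in-flight request per connection, Kafka may reorder records on retry. Setting max.in.flight.requests.per.connection to 1 avoids this at some cost in throughput.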