Kafka常见的导致重复消费原因和解决⽅案
问题分析
导致kafka的重复消费问题原因在于,已经消费了数据,但是offt没来得及提交(⽐如Kafka没有或者不知道该数据已经被消费)。
总结以下场景导致Kakfa重复消费:
原因1:强⾏kill线程,导致消费后的数据,offt没有提交(消费系统宕机、重启等)。
原因2:设置offt为⾃动提交,关闭kafka时,如果在clo之前,调⽤ consumer.unsubscribe() 则有可能部分offt没提交,下次重启会重复消费。
例如:
try{ consumer.unsubscribe();}catch(Exception e){}
try{ consumer.clo();}catch(Exception e){}
上⾯代码会导致部分offt没提交,下次启动时会重复消费。
解决⽅法:设置offt⾃动提交为fal
整合了Spring配置的修改如下配置
spring配置:
able-auto-commit=fal
sumer.auto-offt-ret=latest
整合了API⽅式的修改it为fal
API配置:
Properties props =new Properties();
props.put("bootstrap.rvers","localhost:9092");
props.put("group.id","test");
props.put("it","fal");
⼀旦设置了 it 为 true,Kafka 会保证在开始调⽤ poll ⽅法时,提交上次 poll 返回的所有消息。从顺序上来说,poll ⽅法的逻辑是先提交上⼀批消息的位移,再处理下⼀批消息,因此它能保证不出现消费丢失的情况。
原因3:(重复消费最常见的原因):消费后的数据,当offt还没有提交时,partition就断开连接。⽐如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的ssion timeout时间(0.10.x版本默认是30秒),那么就会re-blance重平衡,此时有⼀定⼏率offt没提交,会导致重平衡后重复消费。
原因4:当消费者重新分配partition的时候,可能出现从头开始消费的情况,导致重发问题。
原因5:当消费者消费的速度很慢的时候,可能在⼀个ssion周期内还未完成,导致⼼跳机制检测报告出问题。
原因6:并发很⼤,可能在规定的时间(ssion.time.out默认30s)内没有消费完,就会可能导致reblance重平衡,导致⼀部分offt⾃动提交失败,然后重平衡后重复消费
问题描述:
我们系统压测过程中出现下⾯问题:异常rebalance,⽽且平均间隔3到5分钟就会触发rebalance,分析⽇志发现⽐较严重。错误⽇志如下:
08-09 11:01:11 131 pool-7-thread-3 ERROR [] - commit failed
org.apache.sumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subquent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the ssion timeout or by reducing the maximum size of batches returned in poll() with ds.
at
org.apache.sumer.internals.ConsumerCoordinator.ndOfftCommitRequest(ConsumerCoordinator.j ava:713) ~[MsgAgent-jar-with-dependencies.jar:na] at
org.apache.sumer.itOfftsSync(ConsumerCoordinator.java:59
6) ~[MsgAgent-jar-with-dependencies.jar:na] at
org.apache.itSync(KafkaConsumer.java:1218) ~[MsgAgent-jar-with-dependencies.jar:na] MsgConsumer.run(MsgCo
nsumer.java:121) ~[MsgAgent-jar-with-dependencies.jar:na] at urrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[na:1.8.0_161] at urrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]
这个错误的意思是,消费者在处理完⼀批poll的消息后,在同步提交偏移量给broker时报的错。初步分析⽇志是由于当前消费者线程消费的分区已经被broker给回收了,因为kafka认为这个消费者死了,那么为什么呢?
问题分析:
这⾥就涉及到问题是消费者在创建时会有⼀个属性max.poll.interval.ms(默认间隔时间为300s),
该属性意思为kafka消费者在每⼀轮poll()调⽤之间的最⼤延迟,消费者在获取更多记录之前可以空闲的时间量的上限。如果此超时时间期满之前poll()没有被再次调⽤,则消费者被视为失败,并且分组将重新平衡,以便将分区重新分配给别的成员。
处理重复数据
因为offt此时已经不准确,⽣产环境不能直接去修改offt偏移量。
所以重新指定了⼀个消费组(group.id=order_consumer_group),然后指定auto-offt-ret=latest这样我就只需要重启我的服务了,⽽不需要动kafka和zookeeper了!
#consumer
up-id=order_consumer_group
sumer.key-derializer=StringDerializer
sumer.value-derializer=StringDerializer
able-auto-commit=fal
sumer.auto-offt-ret=latest
注:如果你想要消费者从头开始消费某个topic的全量数据,可以重新指定⼀个全新的group.id=new_group,然后指定auto-offt-ret=earliest即可。