RocketMQ(⼗):rocketMQ的幂等性是什么?解决了什么问题?幂等性的两
种解决⽅。。。
前⾔:(简单聊聊)
rocketMq为啥要花这么多精⼒去学习,可能就和现在⽐较⽕的注册中⼼:nacos⼀样,可能是⼀种前景或者是⼀种趋势罢了,现在技术迭代太快,今天学了hibernate,明天出mybatis,学了mybatis,出来plus,学完spring,出来cloud,总之学就完了,技多不压⾝,学不等于会,但是学可以扩充⾃⼰的思想可以知道更多的解决思路和解决⽅案,有了解决⽅案和思路,剩下的就是版本迭代,百度撑腰,各种⼤佬指点,最后终极版就是成型的“项⽬”了。瞎扯⼀下,不喜勿喷;
1.上图先?【⼀个订单发了多个优惠券,这不是亏本买卖了?】
这图画的有点乱,稍安勿躁,⼀个个的来捋顺:
1.1:订单系统发送mq的时候会重复吗?
假设⽤户在⽀付成功之后,我们的订单系统收到了⼀个⽀付成功的消息,这个是没有问题的,但是不知道是⽹络波段还是什么原因,订单系统处理的速度有点慢,导致⽀付系统和订单系统之间的请求出现了超时,此时有可能⽀付系统再次重试调⽤了订单系统通知这个订单⽀付成功了,然后订单系统再次发送了⼀个订单⽀付成功的消息,也就是重复推送了两次消息,前提第⼀条消息是发送成功的,但是⽹络波动,导致请求⽹络超时,⾃动调⽤订单系统中的重试机制,那么此时再次发送⼀条消息过去就形成了重复发送消息;
1.2:优惠券系统的家常便饭,请求超时,版本迭代重启机器等等
假设优惠券系统已经拿到了⼀条订单⽀付成功的消息,然后已经进⾏处理了,说明已经对这个订单发送了⼀张优惠券,此时应该返回⼀个CONSUME_SUCCESS的状态,然后提交消费进度offt到broker的,但是不巧的是,刚刚发送完优惠券,还没有来得及提交消息的offt到broker,优惠券系统突然重启,这个时候还没有将这条消息的offt给broker,broker并不知道已经处理完了这条消息,然后优惠券系统重启之后,broker就会再次把这条消息交给你,再⼀次尽⼼处理,然后再⼀次发送优惠券,导致重复发送了两次优惠券。
…等等案例,都可能会出现重复消费的事件发⽣!
2.1:幂等性概念:
解决了上⾯的问题:其实就是⽤来避免对同⼀个请求或者是同⼀条消息进⾏重复处理的机制,所谓的幂等,意思就是例如有你有⼀个接⼝,然后如果别⼈对⼀次请求重试了很多次,来调⽤你的接⼝,你就必须保证⾃⼰系统的数据是正常的,不能多出来⼀些重复的数据,也就是幂等性;解决MQ消息重复问题关键就是引⼊幂等性机制《概念》
2.1:发送消息到mq的时候如何保证幂等性?
两种常⽤的解决⽅案:
1.业务判断法:
举个例⼦:当⽀付系统重试调⽤订单系统结构的接⼝是,需要发送⼀个请求到mq取,查询当前mq是否存在针对这个订单的⽀付信息,如果mq告诉你,针对id=1000的这个订单⽀付成功消息,已经写⼊进来了,那么订单系统就可以不⽤再发送这条消息到mq了,业务判断法的核⼼在于,消息肯定是存在于mq中的,到底是否已经发送,只有mq知道,如果mq中有,就不发了,如果没有就发,所以当订单系统的接⼝被重试调⽤的时候,这个接⼝上来就应该发送请求到mq中取查询⼀下,⽐如订单id=1000这个订单的⽀付成功消息,mq中有⽆?
基于redis缓存的幂等性机制状态判断法:这个⽅法的核⼼在于,需要引⼊⼀个redis缓存来存储是否发送过消息的状态,如果成功发送了⼀个消息到mq中,得去redis写⼀条数据,标记这个消息已经发送了,重复调⽤的时候在redis缓存中查询⼀下,这个订单的⽀付消息是否已经发送了,**缺陷:**redis的状态判断法,没法完全做到幂等性,打个⽐⽅:⽀付系统发送请求给订单系统,然后已经发送消息到mq中取了,但是订单系统突然崩了,还没来得及写⼊到redis中,就⽆法判断了;
2.2:优惠券系统如何保证消息处理的幂等性呢?
基于业务判断法即可,优惠券系统每次拿到⼀条消息给⽤户发⼀张优惠券,实际上就是向数据库插⼊⼀条数据,如果优惠券系统从mq 中拿到两条或者是多条重复的数据,只需要从优惠券数据库查询⼀下,⽐如id=1100的订单,是否已经发放过优惠卷了,是否有优惠卷的记录,有就不发了呗,这样就可以简单⾼效的解决消息的重复发送处理了;对于mq的消息重复问题,可以完成mq可以接收重复的消息,不会对系统的核⼼数据直接造成影响,但是关键要保证从mq获取消息的时候,必须要保证消息不能重复处理,⼀般采⽤的是业务判断法;
3.死信队列(就是咋咋都没处理的消息)【还是⽼规矩,上个图先】
1.如果优惠券系统的数据库宕机,会咋样?
《爱咋咋没⼯夫伺候了还!》
2.数据库宕机的时候,是否可以返回CONSUME_SUCCESS?
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context
){
//这⾥对获取到的msg订单消息进⾏处理
//例如增加⼏份,发送优惠卷,通知发货等等
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS
}
})
⼀旦返回成功,下⼀次就会处理下⼀批消息,但是这批消息实际上是没有处理成功的,所以消息就直接丢失了;
3.如果对消息的处理有异常,可以返回reconsume_later状态
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context
){
try{
//这⾥对获取到的msg订单消息进⾏处理
//例如增加⼏份,发送优惠卷,通知发货等等
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
catch(Exception e){
//如果因为数据库宕机,对消息处理失败了
//返回⼀个稍后重试消费状态later
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
})
如果消息处理失败了,就返回RESONSUME_LATER状态,让rocketMQ稍后再重新将这批消息给我,稍后重试对这批消息进⾏处理;
简单来说rocketmq会有⼀个针对consumerGroup的重试队列,如果返回了RECONSUME_LATER状态,他就会将这批消息放到这个消费组的重试队列中,假设消费组的名称“VoucherConsumerGroup”,那么它会有⼀
个“%RERAY%VoucherConsumerGroup”这个名字的重试队列,过⼀段时间之后,重试队列中的消息会再次给消费者进⾏消费,如果再次处理失败,⼜返回了reconsume_later,那么再过⼀段时间⼜会重新进⼊重试,默认最多是可以重试16次,每次重试的时间不⼀样,间隔时间可以如下配置:messageDelayLevel = 1s 5s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
5.如果连续重试16次还是⽆法处理消息?怎么办?
如果连续重试16次还是⽆法处理成功,这个时候就要进⼊另外⼀个队列了,叫做死信队列,所谓的死信队列,顾名思义就是死掉的消息放⼊这个队列中,死信队列名**“%DLQ%VoucherConsumerGroup”**,在rocketMQ的管理后台上是可以看到的,
6.对死信队列中的消息,如何处理?
具体看使⽤场景,例如专门开⼀个后台的线程,就是定于“%DLQ%VoucherConsumerGroup”这个死
信队列,对死信队列中的消息还是⼀直不停的进⾏重试机制。
7.消息处理失败场景下的⽅案总结
消费者底层⼀些依赖可能有故障了,⽐如数据库宕机,缓存宕机之类的,此时就没有办法完成消息的处理了,那么就可以通过⼀些返回状态去让消息进⼊rocketMQ⾃带的重试队列,同时如果反复重试还是不⾏,就可以让消息进⼊rocketMQ⾃带的死信队列,后续针对死信队列中的消息进⾏单独处理就可以了。
这个章节分享的差不多了,谢谢⼤家的耐⼼观看,表达错误的地⽅,还望评论交流;