RocketMQ Cluster: Fixing "No route info of this topic" (Reposted)
1. Problem Description
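While running simulated failure tests against a RocketMQ cluster, I used the following test environment:
1. Two Linux servers, each with 64 GB of memory and 32 CPU cores, running RocketMQ 4.2.0.
2. Each server hosts one NameServer, one master broker, and one slave broker (the two servers act as master and slave for each other).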
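The cross master/slave layout above can be sketched as a small Java model that computes which master brokers can still accept writes after one server is killed. The broker name testMQ1 comes from later in this article; testMQ2, server1 and server2 are hypothetical names. The model assumes, as in RocketMQ 4.2.x master/slave mode without automatic failover, that a slave is never promoted to master.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the two-server test deployment: each server runs one
// master broker plus the slave of the other server's master (cross master/slave).
// "testMQ1" appears later in the article; "testMQ2" and the host names are assumed.
class BrokerProcess {
    final String brokerName; // a master and its slave share the same brokerName
    final boolean isMaster;
    final String host;

    BrokerProcess(String brokerName, boolean isMaster, String host) {
        this.brokerName = brokerName;
        this.isMaster = isMaster;
        this.host = host;
    }
}

class CrossDeployment {
    static List<BrokerProcess> twoServers() {
        return Arrays.asList(
                new BrokerProcess("testMQ1", true, "server1"),   // master of pair 1
                new BrokerProcess("testMQ2", false, "server1"),  // slave of pair 2
                new BrokerProcess("testMQ2", true, "server2"),   // master of pair 2
                new BrokerProcess("testMQ1", false, "server2")); // slave of pair 1
    }

    // Assumption: as in RocketMQ 4.2.x master/slave mode, a slave is not promoted
    // automatically, so only masters on surviving hosts can still accept writes.
    static List<String> writableAfterHostFailure(List<BrokerProcess> brokers, String failedHost) {
        List<String> writable = new ArrayList<>();
        for (BrokerProcess b : brokers) {
            if (b.isMaster && !b.host.equals(failedHost)) {
                writable.add(b.brokerName);
            }
        }
        return writable;
    }

    public static void main(String[] args) {
        // Killing every process on server1 leaves only testMQ2's master writable.
        System.out.println(writableAfterHostFailure(twoServers(), "server1")); // prints [testMQ2]
    }
}
```

So any topic that lives only on the failed server's master loses every writable target, even though that master's slave keeps running on the other server.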
When simulating the failure of one RocketMQ server (forcibly killing every process on that server), the RocketMQ producer application threw the following exception:
ption.MQClientException: No route info of this topic, xxx
at ketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:564)
at ketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1069)
at ketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1023)
at ketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:214)
mq.MQProductor.sendDelayMsg(MQProductor.java:144)
mq.MQProductor.sendSessionDelayMsg(MQProductor.java:126)
ssion.SessionUtil.start(SessionUtil.java:83)
ssion.SessionUtil.checkAndStart(SessionUtil.java:105)
connhandler.WebSocketFrameHandler.doPong(WebSocketFrameHandler.java:88)
connhandler.WebSocketFrameHandler.channelRead0(WebSocketFrameHandler.java:55)
connhandler.WebSocketFrameHandler.channelRead0(WebSocketFrameHandler.java:33)
channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
dec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
dec.http.websocketx.Utf8FrameValidator.channelRead(Utf8FrameValidator.java:77)
channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
connhandler.WsWebSocketServerProtocolHandler$1.channelRead(WsWebSocketServerProtocolHandler.java:125)
channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
dec.sions.WebSocketServerExtensionHandler.channelRead(WebSocketServerExtensionHandler.java:102)
channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
dec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
dec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
channel.nio.ad(AbstractNioByteChannel.java:129)
channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:610)
channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:551)
channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:465)
channel.nio.NioEventLoop.run(NioEventLoop.java:437)
urrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
urrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)
2. Analysis and Solution
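For a clustered service, it is unreasonable that a single point of failure stops producers from sending messages, so I spent a long time looking for a solution. Many people online say that starting the broker with autoCreateTopicEnable=true solves the problem; I tested it and it did not help at all. Taking the exception message literally, my guess was that the topic's route information disappeared once that broker went down. That led to a second guess: when a RocketMQ client first sends messages to a topic that does not exist yet, one broker is chosen at random to create and store the topic.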
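The hypothesis can be expressed as a toy Java model. It is only a sketch of the author's guess, not RocketMQ's actual topic auto-creation logic: the first send to an unknown topic stores it on one randomly chosen live master, later sends reuse that broker, and once that broker fails the send raises the same "No route info of this topic" message. AutoCreateHypothesis, demoTopic and testMQ2 are hypothetical names, and IllegalStateException stands in for MQClientException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

// Toy model of the hypothesis above, NOT RocketMQ's real implementation:
// the first send to an unknown topic creates it on one randomly chosen live master,
// and (per the behavior observed in the article) it is never re-created elsewhere.
class AutoCreateHypothesis {
    private final List<String> masters;
    private final Set<String> down = new HashSet<>();
    private final Map<String, Set<String>> topicHosts = new HashMap<>(); // topic -> brokers storing it
    private final Random random;

    AutoCreateHypothesis(List<String> masters, long seed) {
        this.masters = masters;
        this.random = new Random(seed); // seeded so a run is reproducible
    }

    void fail(String broker) {
        down.add(broker);
    }

    Set<String> hostsOf(String topic) {
        return topicHosts.getOrDefault(topic, new HashSet<>());
    }

    // Returns the broker that accepted the message; IllegalStateException stands in
    // for the client's MQClientException.
    String send(String topic) {
        Set<String> hosts = topicHosts.get(topic);
        if (hosts == null) {
            List<String> live = new ArrayList<>();
            for (String m : masters) {
                if (!down.contains(m)) {
                    live.add(m);
                }
            }
            if (live.isEmpty()) {
                throw new IllegalStateException("No route info of this topic, " + topic);
            }
            hosts = new HashSet<>();
            hosts.add(live.get(random.nextInt(live.size()))); // created on ONE broker only
            topicHosts.put(topic, hosts);
        }
        for (String h : hosts) {
            if (!down.contains(h)) {
                return h;
            }
        }
        // The only broker holding the topic is down and nothing re-creates it.
        throw new IllegalStateException("No route info of this topic, " + topic);
    }

    public static void main(String[] args) {
        AutoCreateHypothesis model = new AutoCreateHypothesis(Arrays.asList("testMQ1", "testMQ2"), 7L);
        String first = model.send("demoTopic");
        System.out.println("topic stored on: " + model.hostsOf("demoTopic"));
        model.fail(first);
        try {
            model.send("demoTopic");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // No route info of this topic, demoTopic
        }
    }
}
```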
To verify this guess, I used mqadmin to view the topic's route information:
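As the figure above clearly shows, this topic exists only on testMQ1. Once that broker fails, the topic no longer exists anywhere in RocketMQ, and the producer does not automatically re-create it on the other broker. (I suspect that re-initializing or restarting the producer might create the topic on the other broker, but this is an unverified guess: it does not meet the application's requirements, because having to re-initialize or restart the producer whenever this exception occurs is unacceptable. Anyone interested can check whether the guess is correct.)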
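To make the route reading concrete, here is a simplified Java sketch of how route data translates into writable targets. It assumes a route is a map from brokerName to the registered broker ids, where brokerId 0 denotes the master and ids greater than 0 denote slaves (RocketMQ's convention), and that a producer only sends to brokers whose master is present. Whether the dead broker disappears from the route entirely or only its slave remains, the result is the same: no writable broker for this topic. TopicRouteCheck and its methods are hypothetical, not RocketMQ APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Simplified, hypothetical view of a topic's route: brokerName -> broker ids that are
// currently registered. In RocketMQ, brokerId 0 is the master and ids > 0 are slaves.
class TopicRouteCheck {
    static final long MASTER_ID = 0L;

    // Assumption: a producer only sends to brokers whose master is present,
    // so a topic hosted only by dead masters has no writable route.
    static List<String> writableBrokers(Map<String, Set<Long>> route) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Set<Long>> e : new TreeMap<>(route).entrySet()) { // sorted for stable output
            if (e.getValue().contains(MASTER_ID)) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    static Set<Long> ids(long... values) {
        Set<Long> s = new HashSet<>();
        for (long v : values) {
            s.add(v);
        }
        return s;
    }

    public static void main(String[] args) {
        Map<String, Set<Long>> before = new HashMap<>();
        before.put("testMQ1", ids(0, 1)); // topic only on testMQ1: master + slave registered
        Map<String, Set<Long>> after = new HashMap<>();
        after.put("testMQ1", ids(1));     // hypothetical: testMQ1's master killed, slave still registered
        System.out.println(writableBrokers(before)); // [testMQ1]
        System.out.println(writableBrokers(after));  // []
    }
}
```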
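Since the producer's automatic topic creation cannot meet this requirement, the approach here is to create the topics we need manually. I used mqadmin to create the topic on the master brokers of both Linux servers, so that if one goes down, the other still has the topic and can keep serving requests. The creation command is as follows: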
Checking the topic's route information again now shows that both master brokers contain the topic:
Running the single-point failure test again, everything works perfectly with no exceptions. Problem solved!
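The fix can be sketched the same way: create the topic on every master broker up front, then check that losing any one master still leaves a live host. The article creates the topics with mqadmin; the Java class below is only a hypothetical stand-in for that step, with made-up names (ManualTopicPlacement, demoTopic, autoCreated, testMQ2). The underReplicated helper is an illustrative audit that flags topics stored on fewer masters than required; it expects placement data transcribed by hand and does not parse mqadmin output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Toy model of the fix: create each topic explicitly on every master broker
// (the article does this with mqadmin), so losing one master still leaves a route.
// Class and method names are hypothetical, not RocketMQ APIs.
class ManualTopicPlacement {
    private final Map<String, Set<String>> topicHosts = new TreeMap<>(); // topic -> masters
    private final Set<String> down = new HashSet<>();

    // Stand-in for running the topic-creation command once per master broker.
    void createOnAllMasters(String topic, List<String> masters) {
        topicHosts.computeIfAbsent(topic, t -> new TreeSet<>()).addAll(masters);
    }

    void fail(String broker) {
        down.add(broker);
    }

    // Masters that still hold the topic and are alive; empty means "no route".
    List<String> liveHosts(String topic) {
        List<String> live = new ArrayList<>();
        for (String h : topicHosts.getOrDefault(topic, new TreeSet<>())) {
            if (!down.contains(h)) {
                live.add(h);
            }
        }
        return live;
    }

    // Audit helper: topics stored on fewer than minMasters brokers, i.e. topics
    // that a single broker failure could make unroutable.
    static List<String> underReplicated(Map<String, Set<String>> placement, int minMasters) {
        List<String> risky = new ArrayList<>();
        for (Map.Entry<String, Set<String>> e : new TreeMap<>(placement).entrySet()) {
            if (e.getValue().size() < minMasters) {
                risky.add(e.getKey());
            }
        }
        return risky;
    }

    public static void main(String[] args) {
        ManualTopicPlacement cluster = new ManualTopicPlacement();
        cluster.createOnAllMasters("demoTopic", Arrays.asList("testMQ1", "testMQ2"));
        cluster.fail("testMQ1");
        System.out.println(cluster.liveHosts("demoTopic")); // [testMQ2]

        Map<String, Set<String>> placement = new TreeMap<>();
        placement.put("autoCreated", new TreeSet<>(Arrays.asList("testMQ1")));
        placement.put("demoTopic", new TreeSet<>(Arrays.asList("testMQ1", "testMQ2")));
        System.out.println(underReplicated(placement, 2)); // [autoCreated]
    }
}
```

Setting minMasters to the number of master brokers matches the setup described here, where every master carries every topic the producers use.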