RocketMQ常见问题排查思路
RocketMQ消费者订阅了tag,需要注意什么?
在RocketMQ中,⼀个消费组能同时订阅多个 tag,但⼀个消费组的不同消费者不能分开订阅不同的tag,即同⼀个消费组的订阅关系必须保持⼀样。例如:常见错误使⽤⽅式同⼀个项⽬中,⼀段消费代码订阅tagA,然后拷贝到这段代码再更改为tagB。
正确⽤法:
public void subscribe(){
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("arch_online_test_consumer");
consumer.subscribe("arch_online_test","tag1 || tag2 || tag3");
}
错误⽤法:
public class SubscribeTest {
public void subscribeA(){
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("arch_online_test_consumer");
anglerconsumer.subscribe("arch_online_test","tag1");
}
public void subscribeB(){
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("arch_online_test_consumer");
福州翻译consumer.subscribe("arch_online_test","tag2");
}
}
发现⼤量的RocketMQ client ⼤量的info⽇志输出,如何禁⽤?
答: 尝试以下设置,项⽬中使⽤了Slf4j
1、可以配置RocketmqClient的logger设置优先级为warn
2、也可以通过-Drocketmq.client.logUSlf4j=fal 和 -Drocketmq.client.logLevel=WARN 关闭MQ客户端使⽤Slf4j并提⾼⽇志等级
项⽬中没有使⽤Slf4j,可以通过-Drocketmq.client.logLevel=WARN调⾼⽇志等级。
我的服务消费后需要调⽤第三⽅接⼝,别⼈的接⼝调⽤有限制,Rocketmq消费可以限流吗? RocketMQ本⾝没有类似每秒消费多少条数据的精确限流,我们可以结合Sentienl来实现:
private String KEY = "arch"; // 资源名称由topic和消费组构成
giantspublic static void main(String[] args) throws InterruptedException, MQClientException {
initFlowControlRule(); // Sentinel流控规则
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("arch_consumer");
consumer.tNamesrvAddr("");
consumer.subscribe("arch_topic", "*");
consumer.tConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) {
Entry entry = null;spring是什么意思
try {
<(KEY); // 定义资源
entry = (KEY, EntryType.OUT);
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
} catch (BlockException ex) {
// Blocked.被限流后消息重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
} finally {
考研各科考试时间if (entry != null) {
}
}
condchance
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
面板英文
});
consumer.start();
如何快速学习韩语System.out.printf("Consumer Started.%n");
}
private static void initFlowControlRule() {
FlowRule rule = new FlowRule();
rule.tResource(KEY);
rule.tCount(5);// 每秒通过5条消息
rule.tGrade(RuleConstant.FLOW_GRADE_QPS);
rule.tControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER);
rule.tMaxQueueingTimeMs(5 * 1000); // 排队超时时间5秒
FlowRuleManager.loadRules(Collections.singletonList(rule));
}
RocketMQ默认延迟等级有18个,我可以扩增吗?
可以的,但是不建议扩增太多等级,可以通过修改broker属性messageDelayLevel来实现,注意修改了后需要重启broker.
Caud by: ption.RemotingTimeoutException: wait respon on the channel <10.58.218.151:10910> timeout, 3000(ms)
1、这个错误⽣产者发送消息的出现⽐较频繁,发送超时;
2、业务层⾯:
先确认是⼀台机器有问题,还是多台有问题; 如果是只有⼀台有问题,⼤概率是业务服务或者机器有问题;
确认同时间段内段其它远程服务是否异常:⽐如dubbo调⽤、zookeeper调⽤、mysql调⽤等;
服务层⾯: 查看时间段内的JVM相关指标,⽐如GC次数、GC耗时等
机器层⾯: 通过falcon查看机器的load、cpu busy、disk io util、swap等
3、MQ层⾯:
如果发⽣的时间段,很多业务出现该情况,⼤概率是Broker问题
查看机器等load、cpu busy、disk io util 、swap、net等;
公司⽬前没有捕捉⽹络抖动的⼯具和平台,如果真发⽣的⽹络层⾯的抖动,是很难排查的. 下⾯就是⼀个经典的案例:
【图⽚】
Not found the consumer group consumer stats, becau return offt table is empty, maybe the consumer not consume any message
检查⽣产者、消费者使⽤客户端版本是否相同
新增消费组,消费起点如何设置
通过设置consumer.tConsumerFromWhere属性可以解决, 注意此属性是有在group第⼀次消费时⽣效,后续都是延续上⼀次消费进度offt进⾏消费.
消费被订阅了, 但是没有消费(offt过⼩)
根据messageID进⾏查询具体某个消息时, 会出现以上提⽰. 说明当前消息还没有被某个group消费,并且当前消息offt⼩于maxofft时,会有以上提⽰
确认是不是顺序消费,消费失败阻塞后续消费了?
producer运⾏起来发送消息时抛出异常: No route info of this topic
1、broker上不存在该topic, 创建该topic即可; 注意: 测试环境topic都是⾃动创建, 偶尔会出现创建异常情况, 使⽤⼿动创建保证创建成功;
2、broker没有正确链接到name rver上;
3、producer没有正确连接到name rver上;
最佳实践建议
1、消费端幂等性验证
rocketmq⽆法避免消息重复(Exactly-Once). 建议采取消息Id、业务唯⼀标识字段做幂等性
2、消费速度慢处理⽅式
1) 提⾼消费并⾏度
2) 跳过⾮重要消息
3) 优化消息消费过程
3、其它
1) 订阅组与topic多对⼀, 避免⼀对多
2) 顺序消息注意异常处理, 使⽤ack⽅式替代
3) 不建议阻塞监听器, 会导致阻塞线程池, 并最终线程池耗尽⽆法消费
4) 3.x版本消息重塑需要停⽌该group左右消费者应⽤, 否则不⽣效
5) 建议使⽤push模式, 没有特殊需求属性值尽量保证保持默认配置
================
控制台查询message,messageTrack提⽰ xxxgroup订阅了,但是被过滤掉了
⽐较消息和group的tag是否匹配。注意*的问题,只有在配置group订阅的时候*才有全匹配的意思,在消息在只是表⽰tag是字符串"*"
控制台查询message,messageTrack提⽰ xxxgroup订阅了,但是没有消费(Offt⼩)
表⽰还没有消费到这⼀条,可以查询消费者进度对⽐核实下
消息没有被消费。
先根据msgid查询消息所在queue和对应偏移,然后查询消费者偏移⽐较⼤⼩。如果消费者偏移⼩,说明还没消费。如果消费者偏移⼤说明已经越过,可以通过tag判断。也有可能是消费失败但是消费者没有记录异常。far的比较级
消费者⽆法正常消费
消费者启动但是⼀直有积压⽆法消费可能有⼀下⼏种可能
通group订阅信息不⼀致导致:详情
InstanceName 冲突:InstanceName是队列负载均衡算法的计算依赖,相同的InstanceName会导致队列分配混乱导致部分队列消费混乱
多起的的客户端:经常发现业务在项⽬中多起了客户端,可以先查询客户端连接然后与业务核实
消费队列不均匀
指观察消费队列 ⼀部分差值正常,⼀部分差值不正常有很多挤压。
卑鄙的我2电影
先观察消费者队列的分配情况,之后观察消费者连接的InstanceName 是否有冲突
异常:RemotingTimeoutException: wait respon on the channel <xxx> timeout, xxx(ms)
这个异常本质就是rpc调⽤超时,rocket的rpc请求都是异步的。这个异常表⽰发送成功后等待响应超过了最⼤等待时间。可能是有⼈发送了⼤消息,或者rocket发⽣了波动。可以适当的调整最⼤超时时间
tSendMsgTimeout
===================
上图中,⼀个 MQ 对应有两个消费者,他们是在同⼀个 Group1 中,起初⼤家都只有 Topic1,这时候是正常消费的。但如果在第⼀个消费者⾥⾯加⼊⼀个 Topic2,这时候是⽆法消费或消费不正常了。
这是 RocketMQ 本⾝的机制引起的问题,需要在第⼆个消费者⾥⾯加⼊ Topic2 才能正常消费。
为什么
因为broker 端通过group的订阅配置来构建consumeQueue 。
group 订阅配置存储在
ketmq.broker.BrokerController的SubscriptionGroupManager属性中
消费消费者启动后会定时发送⼼跳,⼼跳中会带上⾃⼰的订阅配置