RocketMQTAG过滤原理

更新时间:2023-07-05 18:14:33 阅读: 评论:0

RocketMQTAG过滤原理
RocketMQ TAG 过滤原理
RocketMQ 消息过滤分成TAG过滤和SQL Filter顾虑,其中 SQL Filter是在服务端处理,可以减轻客户端的处理压⼒,语法⽐较灵活,实现⽅式也相对复杂⼀些。Tags过滤实现⽐较简单,主要是在客户端实现。
订阅TOPIC
当⽤户订阅⼀个topic准备消费的时候,调⽤MQPushConsumer的subscribe⽅法
public void subscribe(String topic, String subExpression) throws MQClientException {
try {
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(ConsumerGroup(),
topic, subExpression);
if (this.mQClientFactory != null) {
this.mQClientFactory.ndHeartbeatToAllBrokerWithLock();
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
这⾥⾮常简单,将⽤户传递进来的订阅信息包装成SubscriptionData之后放置到⼀个map当中,并不会做太多处理。
消息消费
DefaultMQPushConsumerImpl#pullMessage⽅法中,通过pull请求到broker端拉取信息,在broker响应的时候,调⽤pullcallback回调处理拉取的消息
协作学习
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
//这⾥对获得的消息做拦截,TAG过滤也在这⾥处理
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.MessageQueue(), pullResult,
subscriptionData);
.......
}
在拉取到消息之后,第⼀件事情就是调⽤DefaultMQPushConsumerImpl.this.pullAPIWrapper的processPullResult⽅法,在这个⽅法中会对Tags进⾏过滤
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
复工复产施工方案final SubscriptionData subscriptionData) {
PullResultExt pullResultExt = (PullResultExt) pullResult;
this.updatePullFromWhichNode(mq, SuggestWhichBrokerId());
//查找到了对应的消息
if (PullStatus.FOUND == PullStatus()) {
ByteBuffer byteBuffer = ByteBuffer.MessageBinary());
//拿到全量的拉取的消息
List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
List<MessageExt> msgListFilterAgain = msgList;
//如果是需要做过滤
if (!TagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {指尖上的爱
//开始做过滤,如果是对应的,就消费,否则就直接废弃了
msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
for (MessageExt msg : msgList) {
性生活多久一次才正常if (Tags() != null) {
if (TagsSet().Tags())) {
msgListFilterAgain.add(msg);
}
}
}
}
没有英文怎么说
if (this.hasHook()) {
FilterMessageContext filterMessageContext = new FilterMessageContext();
西葫芦怎么吃filterMessageContext.tUnitMode(unitMode);
filterMessageContext.tMsgList(msgListFilterAgain);
}
长大的烦恼//处理和事务相关的东西
for (MessageExt msg : msgListFilterAgain) {
String traFlag = Property(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parBoolean(traFlag)) {
msg.Property(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
}
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
}上海的高校
pullResultExt.tMsgFoundList(msgListFilterAgain);
}
pullResultExt.tMessageBinary(null);
return pullResult;
}
在这⾥,如果订阅的时候有关注Tags,在消息到达的时候,按照订阅的tags进⾏过滤,⽤户监听消息的时候就只会收到⾃⼰关注的tag。⼩结
总体来说,rocketmq的tag过滤实现⽐较简单,broker端也不会关注tag,完全由客户端处理。Tag过滤语法简单,灵活性也⽐较差,⽐较适合过滤场景简单且客户端对计算资源不是很敏感的⽤户。如果希望更为复杂消息过滤功能可以尝试使⽤Sql Filter。

本文发布于:2023-07-05 18:14:33,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/82/1080276.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:过滤   消息   处理   时候   客户端   拉取   订阅   信息
相关文章
留言与评论(共有 0 条评论)
   
验证码:
推荐文章
排行榜
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图