Rocketmqtopic管理及自动创建分析

更新时间:2023-07-25 04:46:50 阅读: 评论:0

Rocketmqtopic管理及⾃动创建分析
rocketmq topic 信息是保存在broker 上
1. borker会定期保存到store/config 下topics.json
2. 每次broker启动会默认创建系统级topic,其中就有TBW102,此topic的route(配置)将作为ndmessage时候未创建topic的默
认route
3. 禁⽌⾃动创建topic⽅法:可以配置broker的配置autoCreateTopicEnable=fal 不创建默认topic达到关闭
// MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
if(BrokerConfig().isAutoCreateTopicEnable()){
雅思报名考试String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
ankaraTopicConfig topicConfig =new TopicConfig(topic);
this.systemTopicList.add(topic);
topicConfig.tReadQueueNums(BrokerConfig()
.getDefaultTopicQueueNums());
topicConfig.tWriteQueueNums(BrokerConfig()
.getDefaultTopicQueueNums());
男士如何保养int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
topicConfig.tPerm(perm);
}
4. 或者通过修改TBW102 PERM为0并且需要注意每个新broker增加的时候都需要设置为0(可以通过复⽤已经存在的broker存储信
息),也可以达到同样⽬的。
master之间不会同步topic,namesrv上的topic 只作为查询使⽤
nd message 中topic相关关键代码在DefaultMQProducerImpl的 ndDefaultImpl⽅法中, 调⽤:
TopicPublishInfo topicPublishInfo =Topic());
如果寻找不到当前topic,使⽤默认topic配置进⾏发送。
tryToFindTopicPublishInfo代码分析如下:
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic){
//先看看本地是否有
TopicPublishInfo topicPublishInfo =(topic);
if(null== topicPublishInfo ||!topicPublishInfo.ok()){
mitel
//从namesrv寻找,新topic将寻找不到,topicPublishInfo.isHaveTopicRouterInfo()为空
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo =(topic);
}
if(topicPublishInfo.isHaveTopicRouterInfo()|| topicPublishInfo.ok()){
return topicPublishInfo;
}el{
//新topic,没有找到route,进⼊此处
shinshin
//  defaultMQProducer -TOPIC TBW102
// Configuration_Client.md createTopicKey                  | TBW102          | When a message is nt, topics that do not exist on the rver are automatically c reated and a Key is specified that can be ud to configure the default route to the topic where the message is nt.|
///    DefaultMQProducer.java private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
//    public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102"; // Will be created at broker when isAutoCreateTopicEnable
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic,true,this.defaultMQProducer);
topicPublishInfo =(topic);
return topicPublishInfo;
}
公务员考试复习资料
}
⼀路到发送消息
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
ndResult =MQClientAPIImpl().ndMessage(
四级包过brokerAddr,
msg,
requestHeader,
timeout - costTimeSync,
puncher
communicationMode,
context,
this);
portray
发送前没有创建topic,有broker在收到消息后负责创建,broker的具体实现在SendMessageProcessor类的
private RemotingCommand ndMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext ndMessageContext,
final SendMessageRequestHeader requestHeader)throws
⽅法中调⽤AbstractSendMessageProcessor.msgCheck的过程中实现创建
log.warn("the topic {} not exist, producer: {}", Topic(), ctx.channel().remoteAddress());            topicConfig =TopicConfigManager().c
reateTopicInSendMessageMethod(                Topic(),
RemotingHelper.parChannelRemoteAddr(ctx.channel()),
if(null== topicConfig){
Topic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)){
topicConfig =entrepreneur
TopicConfigManager().createTopicInSendMessageBackMethod(                            Topic(),1, PermName.PERM_WRITE | PermName.PERM_READ,                            topicSysFlag);
}
}
if(null== topicConfig){
respon.tCode(ResponCode.TOPIC_NOT_EXIST);
respon.tRemark("topic["+ Topic()+"] not exist, apply first plea!"
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return respon;
}

本文发布于:2023-07-25 04:46:50,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/78/1115854.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:创建   默认   考试   保存   时候
相关文章
留言与评论(共有 0 条评论)
   
验证码:
推荐文章
排行榜
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图