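RocketMQ topic management and automatic topic creation analysis
RocketMQ topic information is stored on the broker.
1. The broker periodically persists it to topics.json under store/config.
2. Every time the broker starts, it creates the system-level topics by default, one of which is TBW102. The route (configuration) of this topic serves as the default route when sendMessage is called for a topic that has not been created yet.
3. To disable automatic topic creation, set autoCreateTopicEnable=false in the broker configuration. The broker then does not create the default topic, which turns the feature off: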
    // MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
    if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
        String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
        TopicConfig topicConfig = new TopicConfig(topic);
        this.systemTopicList.add(topic);
        topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
            .getDefaultTopicQueueNums());
        topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
            .getDefaultTopicQueueNums());
        // PERM_INHERIT lets newly created topics copy this topic's configuration
        int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
        topicConfig.setPerm(perm);
    }
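4. Alternatively, set the PERM of TBW102 to 0. Note that every newly added broker must also be set to 0 (this can be done by reusing the stored data of an existing broker). This achieves the same result.
Masters do not synchronize topics with each other, and the topics on namesrv are used only for queries.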
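The permission value built in step 3 and the PERM of 0 in step 4 can be illustrated with a small bitmask sketch. TopicPerm is a hypothetical stand-in written for this example, not the RocketMQ class; its bit values are assumed to mirror those of PermName.

```java
// Hypothetical stand-in for RocketMQ's PermName; the bit values are assumed to mirror it.
final class TopicPerm {
    static final int PERM_INHERIT = 0x1;      // other topics may inherit this topic's config
    static final int PERM_WRITE = 0x1 << 1;   // producers may write
    static final int PERM_READ = 0x1 << 2;    // consumers may read

    static boolean isInherited(int perm) {
        return (perm & PERM_INHERIT) == PERM_INHERIT;
    }

    static boolean isWriteable(int perm) {
        return (perm & PERM_WRITE) == PERM_WRITE;
    }

    static boolean isReadable(int perm) {
        return (perm & PERM_READ) == PERM_READ;
    }

    public static void main(String[] args) {
        // Same combination the broker assigns to TBW102 when auto creation is enabled.
        int defaultPerm = PERM_INHERIT | PERM_READ | PERM_WRITE;
        System.out.println(defaultPerm);              // 7
        System.out.println(isInherited(defaultPerm)); // true
        // With PERM set to 0 nothing can be inherited, so no topic can be derived from it.
        System.out.println(isInherited(0));           // false
    }
}
```

Under these assumptions, a TBW102 whose PERM is 0 is neither readable, writable, nor inheritable, which is why it has the same effect as not creating the default topic at all.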
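The key topic-related code for sending a message is in the sendDefaultImpl method of DefaultMQProducerImpl, which calls:
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
If the current topic cannot be found, the default topic configuration is used to send the message.
The code of tryToFindTopicPublishInfo is analyzed below: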
    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        // Check the local cache first
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            // Look the topic up on namesrv; a new topic will not be found there,
            // so topicPublishInfo.isHaveTopicRouterInfo() stays false
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            // New topic with no route found: execution reaches this branch
            // defaultMQProducer: default topic is TBW102
            // Configuration_Client.md: createTopicKey | TBW102 | When a message is sent, topics that do not exist on the server are automatically created and a Key is specified that can be used to configure the default route to the topic where the message is sent.|
            // DefaultMQProducer.java: private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
            // public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102"; // Will be created at broker when isAutoCreateTopicEnable
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }
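The lookup order described above (local cache, then namesrv, then the route of the default topic) can be sketched in isolation. This is a simplified model under stated assumptions, not the client implementation: RouteLookupSketch, its maps, and the broker addresses are hypothetical, and a route is reduced to a single broker address string.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the producer-side route lookup.
final class RouteLookupSketch {
    static final String DEFAULT_TOPIC = "TBW102";

    // Stand-in for namesrv: topic name -> broker address.
    private final Map<String, String> nameServerRoutes = new HashMap<>();
    // Stand-in for the producer's local route cache.
    private final Map<String, String> localCache = new HashMap<>();

    RouteLookupSketch(Map<String, String> routesOnNameServer) {
        nameServerRoutes.putAll(routesOnNameServer);
    }

    /** Returns the broker address to send to, or null if no route can be found. */
    String findRoute(String topic) {
        // 1. Check the local cache first.
        String route = localCache.get(topic);
        if (route == null) {
            // 2. Ask namesrv for the topic itself.
            route = nameServerRoutes.get(topic);
        }
        if (route == null) {
            // 3. Unknown topic: borrow the route of the default topic.
            route = nameServerRoutes.get(DEFAULT_TOPIC);
        }
        if (route != null) {
            localCache.put(topic, route);
        }
        return route;
    }

    public static void main(String[] args) {
        Map<String, String> routes = new HashMap<>();
        routes.put(DEFAULT_TOPIC, "broker-a:10911");
        routes.put("orders", "broker-b:10911");
        RouteLookupSketch lookup = new RouteLookupSketch(routes);
        System.out.println(lookup.findRoute("orders"));    // broker-b:10911
        System.out.println(lookup.findRoute("new-topic")); // broker-a:10911
    }
}
```

In this model, a topic that namesrv does not know is simply sent to whichever broker serves TBW102; if no broker registered TBW102, the lookup yields nothing.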
The call chain then continues down to the actual send:
    brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
        brokerAddr,
        msg,
        requestHeader,
        timeout - costTimeSync,
        communicationMode,
        context,
        this);
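The topic is not created before sending; the broker creates it after receiving the message. On the broker side, this is implemented in the following method of the SendMessageProcessor class:
    private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
        final RemotingCommand request,
        final SendMessageContext sendMessageContext,
        final SendMessageRequestHeader requestHeader) throws
The topic is created while this method calls AbstractSendMessageProcessor.msgCheck: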
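    log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());
    // Try to create the topic from the default topic's configuration
    topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
        requestHeader.getTopic(),
        requestHeader.getDefaultTopic(),
        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
        requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
    if (null == topicConfig) {
        // Retry topics are created separately, with a single queue
        if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            topicConfig =
                this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                    requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
        }
    }
    if (null == topicConfig) {
        // Creation was not possible: reject the message
        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
        response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
            + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
        return response;
    }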
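The broker-side decision can be summarized in a self-contained sketch. It is a simplified model under stated assumptions rather than the broker code: BrokerTopicSketch, its topic table, and the boolean result (false standing in for TOPIC_NOT_EXIST) are hypothetical, a topic is reduced to its permission bits, and the bit values are assumed to mirror PermName.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of topic auto creation on the broker.
final class BrokerTopicSketch {
    static final String DEFAULT_TOPIC = "TBW102";
    static final int PERM_INHERIT = 0x1;
    static final int PERM_WRITE = 0x1 << 1;
    static final int PERM_READ = 0x1 << 2;

    // Stand-in for the broker's topic table: topic name -> permission bits.
    private final Map<String, Integer> topicTable = new ConcurrentHashMap<>();

    BrokerTopicSketch(boolean autoCreateTopicEnable) {
        // The default topic only exists when auto creation is enabled.
        if (autoCreateTopicEnable) {
            topicTable.put(DEFAULT_TOPIC, PERM_INHERIT | PERM_READ | PERM_WRITE);
        }
    }

    void setPerm(String topic, int perm) {
        topicTable.put(topic, perm);
    }

    /** Returns true if the message is accepted, false for the TOPIC_NOT_EXIST case. */
    boolean checkTopic(String topic) {
        if (topicTable.containsKey(topic)) {
            return true;
        }
        Integer defaultPerm = topicTable.get(DEFAULT_TOPIC);
        if (defaultPerm == null || (defaultPerm & PERM_INHERIT) == 0) {
            // No default topic, or it may not be inherited: nothing to create from.
            return false;
        }
        // Create the topic from the default topic, dropping the inherit bit.
        topicTable.put(topic, defaultPerm & ~PERM_INHERIT);
        return true;
    }

    public static void main(String[] args) {
        System.out.println(new BrokerTopicSketch(true).checkTopic("new-topic"));  // true
        System.out.println(new BrokerTopicSketch(false).checkTopic("new-topic")); // false
        BrokerTopicSketch locked = new BrokerTopicSketch(true);
        locked.setPerm(DEFAULT_TOPIC, 0);
        System.out.println(locked.checkTopic("new-topic"));                       // false
    }
}
```

The three calls in main correspond to the cases covered in this article: auto creation enabled, autoCreateTopicEnable=false, and TBW102 with PERM set to 0.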