mqtt获取所有topic_MQTT——订阅报⽂
我们已经把相关的连接报⽂搞定了。笔者想来想去还是决定先讲解⼀下订阅报⽂(SUBSCRIBE )。如果传统的通信⽅式是客户端和服务端之间⼀般就直接传输信息。但是MQTT的通信⽅式是通过发布/订阅的⽅式进⾏的。笔者不知道他是否跟设计模式中的发布订阅模式有没有关系。可是他们思想却有⼀点相似之处。
客户端知道服务上有很多个主题。就好⽐如说有很多消息的分类⼀样⼦。有社会新闻、体育讲坛等。那么客户端只要找到⾃⼰感兴趣的进⾏订阅就可以了。⼀个客户端可以向服务器订阅多个主题。⽽所谓的发布就是客户端对不同的主题进⾏发布信息。即好⽐如新闻的发布者⼀样⼦。这个时候只要订阅这个主题的客户端就可以接收到来⾃服务端的新闻。我们的⼿机常常会接收到⼀些推送的信息。事实上有很多App应⽤都是⽤MQTT协议来进⾏的。所以不难看出服务端主要是负责客户端和客户端的之间信息的传输和信息管理。⼤⾄如图下
注意:发布者也是客户端。订阅者也是客户端
主题(Topic )
如果主题只是⼀个字符串值的话,那么显然会⽐较单调。这样⼦功能也显得⽐较⽆⼒。所以在主题上⾯
就了所谓的分隔符和通配符的说法(个⼈想法)。分隔符的意思就是让主题可以分层次。就好如说主题“体育讲坛/篮球/NBA”。看到这样⼦的主题,请问⼀下你还有什么不明⽩的话。是不是感觉很有层次感。剩下只有⼀个问题?如果我们订阅了主题“体育讲坛/篮球/NBA”,并向主题“体育讲坛/篮球”发布⼀个信息。那么已经订阅主题“体育讲坛/篮球/NBA”的客户端们是不是可以接受到信息呢?反过来讲如果我们订阅了主题“体育讲坛/篮球”,向主题“体育讲坛/篮球/NBA”发信息,客户端们是否⼜能接受信息呢?
笔者就以HiveMQ作服务器来做⼀下上⾯的⼩实验。如下
客验结果显然是失败的——订阅主题“体育讲坛/篮球/NBA”的客户端根本收不到来⾃主题“体育讲坛/篮球“的发布信息。说明分隔符就是⽤于主题名的分层次。没有别的意思。
通过上⾯的实验我们知道如果想要收到NBA就是必须订阅主题“体育讲坛/篮球/NBA”。可是总是有⼀些⼈只要是篮球的新闻有喜欢。怎么办。通配符的功能就出来了。通配符有俩种——"+"和“#”。+为单层的通配符。表⽰当前这⼀层的全都合⾮。这样⼦以上⾯的说到的例⼦来做实验。我们订阅⼀个主题为“体育讲坛/篮球/+”。按照理解的意思就是只要是在“体育讲坛/篮球”的信息都是我们想要的。结果如下
我们可以看到笔者在“体育讲坛/篮球/NBA”和“体育讲坛/篮球/ABC”各发布了信息。结果他都能收到。
那么如果我们对主题“体育讲坛/篮球”或是主题“体育讲坛/篮球/NBA/福州专场”发布信息呢?笔者试过了很可惜都不⾏。
记得我们上⾯说到有⼀些⼈只要跟篮球相关的都喜欢。可是如果使⽤通配符“+”是可以接近我们的要求。注意是接近。“+”通配符只是表⽰当前⼀层的。从当前的第⼆层就不⾏了。⽽本⾝的层也不算。就像上⾯的。只有篮球下的⼦⼀层才是合⾮的。讲到这⾥⼤家⼀定会想到⽤“#“通配符试试。没有错。“#“通配符就是表⽰当前本⾝和下⾯⼦层所有。如下
实验的结果很终满⾜了。
对于主题,在⽂档中有⼀个要求——主题不能以 ”#“ "+" "$" 为开头。对于”#“ ” +“的话,⼤家都好理解。那么”$“⼜是什么⿁。在⽂档我们可以看到这样⼦的字符"$SYS"。事实上他们是想说”$“开头的主题⼀般⽤于系统内部的⼀些主题。你们可以去找⼀些第三⽅的MQTT服务器。都会有很多以”$“开头的主题。cold
第六次反法同盟
SUBSCRIBE报⽂
通过上⾯的介绍。笔者想你们⼀定对MQTT通信⽅式有了⼀定的概念。⽽本章的订阅报⽂就是⽤于告诉服务器我想要什么的主题了。通过前⾯⼏章的了解。我们知道报⽂的固定报头是少了的。笔者就以MQTT 3.1.1来介绍吧。如下
SUBSCRIBE报⽂的INT值是8。所以对应的⼆进制为1000。后⾯的DUP QOS RETAIN对应是0010。其中QOS是必须是01。对订阅者来讲,他⼀定希望⾃⼰的订阅是成功的。所以订阅报⽂的QOS是01就相当好理解了。如果不理解QOS是什么的话,请看⼀下前⾯⼏章。
订阅报⽂也有可变报头,可变报头只有⼀个消息ID。消息ID是从客端端开始分配的。笔者为什么样⼦认为呢?主要是看到客户端在发布信息的时候就要求消息ID。所以笔者才会觉得消息ID在客户端进⾏分配的。当然也不是什么报⽂都会消息ID的。但是有消息ID⼀般QOS⼤于0。
订阅报⽂的有效载荷⾥⾯存在了相关的订阅订题列表。前⾯说过可以⽀持⼀个客户端多个订阅。列表⾥⾯每有⼀主题项只有俩个值。⼀个表⽰主题名,⼀个表⽰服务质量要求(Requested QoS)。这⾥的服务质量要求(Requested QoS)和 固定报头的服务质量的值是⼀样⼦。但是⽤意却是不⼀样⼦。这⾥是指这个订阅者接收这主题的服务质量最⼤等级。举个列⼦吧。笔者订阅了⼀个主题主题“体育讲坛/篮
球/NBA”,同时他的服务质量要求(Requested QoS)的值为1。这个时候有⼀个发布者在这个主题上发布⼀个服务质QOS为2。笔者还是可以收到这个发布者发来的信息。只是信息的服务质量QOS却变为1了。要明⽩QOS(1)和QOS(2)的执⾏⾏为是不样⼦的。这个后⾯章节会讲到。当然如果发布者在这个主题上发布⼀个服务质QOS为0。这就没有什么区别了。如下
对于有效载荷笔者这⾥就不多讲解了。也没有什么可说的。看⽂档的图⽚就够了如下。
宏观上:
微观上:
列表出我们可以看他订阅了俩个主题。⼀个主题”a/b“,⼀个主题”c/b“。上⾯列出⼤概的图⽚(宏观上)和⽐较细的图⽚(微观上)。如果看不懂也没有关系。笔者接下来会⽤代码来抓⼀包看看。相信在对照⼀下就明⽩列表出画的是什么。
域名抢注现在让我们好好想想当服务器接收到来⾃客户端的订阅报⽂的时候要做些什么样⼦的反应呢?⾸先我们要明⽩如果服务端接收到⼀个订阅报⽂,第⼀步想到⼀定是查看订阅报⽂的格式是不是正确的。相关的主题名是不是为空的。主题名的写法是不是⾮法。这些⼀定离不开。当然对应的⼀些共有的验证笔者就不说了。⼀切没有问题的情况下,服务器会去看⼀下当前订阅者前⾯有没有订阅过相同的主题。如果有就替换当前的。如果没有就创建⼀下新的。然后服务器在根据当前主题查找⼀下符合保留的信息。如果有,就发送给当前的订阅者。然后发送⼀个订阅报⽂确定(SUBACK )。当然这前后没有规定。先发送⼀个订阅报⽂确定(SUBACK ),在处理保留的信息也是可以的。
注意:在发送符合保留的信息就要对QOS进⾏处理。上⾯笔者也讲过了。
SUBACK 报⽂
当服务端处理SUBSCRIBE报⽂的时候,都会⽣成⼀个SUBACK 报⽂来回应订阅者。笔者这⾥不想对他太过的讲解。他的内容也很简单。如下
对于SUBACK 报⽂的可变报头⾥⾯也只有⼀个消息ID。⽽且跟SUBSCRIBE报⽂的消息ID是⼀样⼦的。有效载何的内容存放是订阅主题的服务质量要求(Requested QoS)。笔者在MQTT 3.1 ⽂档时⾯可以看到有多个主题的列⼦。可是在MQTT 3.1.1⾥⾯却没有。那么笔者就把MQTT 3.1.1的放在下⾥吧。读者们可以⾃⾏查看。
上⾯列表⾥⾯显⽰返回码,事实上是主题相关的服务质量要求(Requested QoS)。所以就可以知道他可以会返回四个值。如下
QOS 0:0x00探亲假>孤单的背影图片
QOS 1:0x01
QOS2 :0x02
Failure :0x80
田亮跳水
代码实现
有了上⾯的了解之后,笔者就想在通过⼀些代码来加深理解。当然重新写那是不可能的。笔者就⽤上⼀章的代码。并加上订阅报⽂相关的处理。如下
1 private voidonSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) {2
3 if (!ted) {
4 ctx.clo();
5 return;
6 }
7 int messageId =msg.variableHeader().messageId();8
9 List requestSubscriptions =msg.payload().topicSubscriptions();10明星图
11 for(MqttTopicSubscription subscription : requestSubscriptions) {12
13 if(StringUtils.picName())) {14 ctx.clo();15 return;16 }17 }18
19 List grantedQosLevels = new ArrayList();20
21 requestSubscriptions.forEach(subscription ->{22 if (picName().startsWith("$"))
grantedQosLevels.add(MqttQoS.FAILURE.value());23 elgrantedQosLevels.add(subscription.qualityOfService().value());24 });25
26
27 BrokerSessionHelper.ndMessage(28 ctx,wMessage(30 new
MqttFixedHeader(MqttMessageType.SUBACK, fal, MqttQoS.AT_MOST_ONCE, fal, 0),31 MqttMessageIdVariableHeader.from(messageId),32 newMqttSubAckPayload(grantedQosLevels)),33 this.clientId,34 messageId,35 true);36
37 for (int i = 0; i < requestSubscriptions.size(); i++) {38
39 MqttQoS grantedQoS =MqttQoS.(i));40 String topic
=(i).topicName();41
42 //1。查看以前有没有订阅过相同的主题,如果有就替换。43 //2。查看有没有符合的保留信息,有发送44 //读者们⾃⾏去实现。是要⽤redis,还是要⽤sqllite⾃去实现。
45
46 }47 }
订阅报⽂的实现并不难。难就在对于对保留信息的处理。还有就是服务端要对当前的客户端的订阅进⾏保留。那么笔者这边做的事情⽐较简单。主要是为了学习查看相关的报⽂格式。但是笔者还是要列出来⼀下。如下
1.判断是否发⽣过连接。即是连接报⽂的处理。如果没有的话,断开连接。
if (!ted) {
ctx.clo();return;
}
2.获得报⽂的消息ID和相关的订阅主题。判断主题不为空。当然你也可⾃定义主题的验证合法规则。笔者这⾥就不多说了。
int messageId =msg.variableHeader().messageId();
List requestSubscriptions =msg.payload().topicSubscriptions();for(MqttTopicSubscription subscription : requestSubscriptions) {if(StringUtils.picName())) {
ctx.clo();return;
}
}
3.获得相关主题的服务质量要求,⽤于返回码和处理保留的消息。并返回SUBACK报⽂
1 List grantedQosLevels = new ArrayList();2
3 requestSubscriptions.forEach(subscription ->{
4 if (picName().startsWith("$"))
grantedQosLevels.add(MqttQoS.FAILURE.value());5 elgrantedQosLevels.add(subscription.qualityOfService().value());6 });7
8
9 BrokerSessionHelper.ndMessage(10 ctx,wMessage(12 new
MqttFixedHeader(MqttMessageType.SUBACK, fal, MqttQoS.AT_MOST_ONCE, fal, 0),13 MqttMessageIdVariableHeader.from(messageId),14 newMqttSubAckPayload(grantedQosLevels)),15 this.clientId,16 messageId,17 true);
4.处理保留的信息。这⾥笔者并没实现。因为这⾥要接合相关的数据库或是NOSQL。所以这⾥笔者没有去做。因这⾥太多的东西的。⽽且不同的⼈实现和想法也不⼀样⼦。所以笔者就没有列出来。
for (int i = 0; i < requestSubscriptions.size(); i++) {扭扭体操
MqttQoS grantedQoS=MqttQoS.(i));
String (i).topicName();//1。查看以前有没有订阅过相同的主题,如果有就替换。//2。查看有没有符合的保留信息,有发送//读者们⾃⾏去实现。是要⽤redis,还是要⽤sqllite⾃去实现。
}
笔者把相关的抓到的报⽂格列出来。如下
SUBSCRIBE报⽂:
笔者已经把SUBSCRIBE报⽂的各个部分⽤不同的颜⾊标出耿了。其中的黄⾊线表⽰下同主题的长度。就是上⾯微观图⽚⾥⾯的MSB和LSB。其他的也没有什么。 只是要注意最后⼀个值也就是服务质量要求(Requested QoS)。笔者这边是1。所以最后的⼆进制是00000001。
SUBACK 报⽂:
我们可以看到SUBACK 报⽂的消息ID和SUBSCRIBE报⽂的消息是⼀样⼦的。还有就是记得最后的服务质量要求。