车联网网关架构MQTT篇(含MQTT消费者源码)

更新时间:2023-05-17 08:51:44 阅读: 评论:0

车联⽹⽹关架构MQTT篇(含MQTT消费者源码)
MQTT简述爬山可以减肥吗
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是⼀种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议。该协议构基于TCP/IP协议之上,由IBM于1999年发布。MQTT最⼤优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为⼀种低开销、低带宽占⽤的即时通讯协议,使其在物联⽹、⼩型设备、移动应⽤等⽅⾯有较⼴泛的应⽤。
这⾥我不对MQTT协议做过多的说明,⽹上有很多优秀的⽂章。
背景
公司致⼒于移动资产(车辆,集装箱等)监控产品研发与制造已有⼗⼏年历程,完成了传统的GPS定位器到货物运输管控智能电⼦锁产品的研发与制造的蜕变。产品通信协议也从传统的TCP/IP,蓝⽛,逐渐的向NB-IOT,Loar,MQTT等新型物联⽹通信协议转变。本⽂就是针对如果在传统设备通信⽹关的基础上完成对MQTT协议的集成,并与现有的平台完成对接。
⽹关架构调整
整个系统架构看起来⽐较乱,但是我们将MQTT拆分出来,并粗化平台⽅⾯的架构,MQTT架构可以简化如下:
其中MQTT Broker我们可以看作RabbitMQ与Kafka消息中间件,因为RabbitMQ与Kafka本⾝⽀持MQTT协议,设备可以直接接⼊,这样以来集成MQTT的设备是不是就变得⾮常简单。我们只需要做好MQTT消费者程序,对设备传输过来的数据进⾏解析,然后根据业务进⾏存储。另外平台或者APP也可以直接订阅MQ⾥⾯的数据,进⾏实时数据展⽰与报警消息推送等。
这⾥多说⼀点,为了使得消费者或者应⽤平台更⽅便的订阅到⾃⼰想要的数据,就需要MQTT设备定义好消息推送的主题与QoS服务质量。MQTT主题与QoS
Topic定义:
设备发布主题:Raw/协议标识(如JT-808)/设备Id
设备订阅主题:Instruction/设备Id
我可以通过订阅主题Raw/协议标识/#或者Raw/协议标识/+来订阅某个协议类型的主题,这样消费者可以通过订阅的主题来匹配协议解析⽅法,减少了消费者解析时的难度。
也可以通过订阅主题Raw/协议标识/设备Id来获取我只关注的某个设备数据。
设备订阅主题平台发送的主题Instruction/设备Id来实现远程指令的交互。
QoS定义:
⾸先我们先了解QoS0,1,2的区别
QoS 0:最多⼀次
“即发即弃”,⼀个消息不会被接收端应答,也不会被发送者存储并再发送。
QoS 1:⾄少⼀次
保证信息将会被⾄少发送⼀次给接受者,但是消息也可能被发送两次甚⾄更多次。
PUBLISH 与PUBACK的关联是通过⽐较数据包中的packet identifier完成的。如果在特定的时间内(timeout)发送端没有收到PUBACK 应答,那么发送者会重新发送PUBLISH消息。如果接受者接收到QoS为1 的消息,它会⽴即处理这⾥消息,⽐如把这个包发送给订阅该主题的接收端,并回复PUBACK包。
The duplicate(DUP)flag,⽤来标记PUBLISH 被重新分发的情况。仅仅是为了内部使⽤的⽬的,并且当QoS 为1 是不会被broker 或者client处理。接受者都会发送PUBACK消息,⽽不管DUP flag。
QoS 2:确保⼀次送达
如果接收端接收到了⼀个QoS 的PUBLISH消息,它将相应地处理PUBLISH消息,并通过PUBREC消息向发送⽅确认。直到他发出⼀个PUBCOMP包为⽌,接收端都保存这个包packet identifier。这⼀点很重要,因为它避免了⼆次处理同⼀个PUBLISH包。 当发送者接收到PUBREC的时候,它可以放弃最开始的publish了,因为它已经知道另⼀端已经接收到消息,他将保存PUBREC并且回复PUBREL。当接收端接收到PUBREL,它就可以丢弃所有该包的存储状态并回复PUBCOMP。当发送端接收到PUBCOMP时也会做同样的处理。当整个流程结束的时候,所有的参与者都确定消息被正确的发送和送达了。⽆论什么时候,⼀个包丢失了,发送端有责任在特定时间后重新发送最后⼀次发送的消息,接收端有责任响应每⼀个指令消息。
特别注意:RabbitMQ暂不⽀持QoS2!如果我们的⽹关中消息中间件使⽤的时RabbitMQ,在定义设备发布时QoS时只能定义为0或者1,因为设备的很多消息(如报警),设备需要得到发送成功的状态,所以设备发布消息时的QoS我们最好选择QoS1。
MQTT消费者实现
1. 启动客户端
坎坷拼音
/// 启动客户端
家庭作业
/// </summary>
public static void Start()
{
try
{
runState = true;盐酸二氧丙嗪颗粒
System.Threading.Thread thread = new System.Threading.Thread(new System.Threading.ThreadStart(DoWork));
thread.Start();
}
catch (Exception ex)
{
Log.Instance.Error("MqttClass.Start():" + ex.Message);
}
}
/// <summary>
/// 初始化MQTTClient
/// </summary>
private static void DoWork()
{
running = true;
Log.Instance.Show("Work >>Begin");
try
{
var factory = new MqttFactory();
mqttClient = factory.CreateMqttClient() as MqttClient;
options = new MqttClientOptionsBuilder()
.WithTcpServer(ServerUrl, Port)
.WithCredentials(UrId, Password)
.WithClientId(Guid.NewGuid().ToString().Substring(0, 23))//在MQTT 3.1版,每个客户ID不能超过23字节
.Build();
mqttClient.ConnectAsync(options);
mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(new Func<MqttClientConnectedEventArgs, Task>(Connected));
mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(new Func<MqttClientDisconnectedEventArgs, Task>(Disconnected));                mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(new Action<MqttApplicationMessageReceive                while (IsRun())
{
System.Threading.Thread.Sleep(100);
}
}
catch (Exception exp)
{
Console.WriteLine(exp);
}
Log.Instance.Show("Work >>End");
running = fal;
runState = fal;
}
其中有⼀个细节值得注意,在MQTT3.1版本,ClientId长度不能超过23字节,所以我上⾯对Guid进⾏了截取。
2.连接服务器并进⾏主题订阅
/// 连接服务器并按标题订阅内容
/// </summary>
/// <param name="e"></param>
/// <returns></returns>
private static async Task Connected(MqttClientConnectedEventArgs e)
{
try
{
描写学习的句子
List<MqttTopicFilter> listTopic = new List<MqttTopicFilter>();
if (listTopic.Count() <= 0)
{
var topicFilterBulder = new TopicFilterBuilder().WithTopic(Topic).Build();                    listTopic.Add(topicFilterBulder);
}
await mqttClient.SubscribeAsync(listTopic.ToArray());
Log.Instance.Show("Connected >>Subscrib:" + Topic + " >>Success");
}
洗衣服catch (Exception ex)
{
Log.Instance.Error("Connected:" + ex.Message);
}
}
3.如果断开则进⾏重连
/// <summary>
/// 失去连接触发事件
/// </summary>
/
// <param name="e"></param>
/// <returns></returns>
private static async Task Disconnected(MqttClientDisconnectedEventArgs e)
{
成长回忆录
try
{
Console.WriteLine("Disconnected >>Disconnected Server");
await Task.Delay(TimeSpan.FromSeconds(5));
米糕怎么做
try
{
await mqttClient.ConnectAsync(options);
}
catch (Exception exp)
{
Console.WriteLine("Disconnected >>Exception " + exp.Message);
}
}
catch (Exception exp)
{
Console.WriteLine(exp.Message);
}
}
4.接收消息触发事件

本文发布于:2023-05-17 08:51:44,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/82/665548.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:消息   设备   主题   发送   订阅   协议   接收端   架构
相关文章
留言与评论(共有 0 条评论)
   
验证码:
推荐文章
排行榜
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图