mqtt消息机制介绍(以mosquitto为例)
正⽂:
⽂章⽬录
1 序⾔
基于研究多进程设备中,主进程和其他业务进程间,消息分发处理机制 的考虑,记录如下,故⽂章重点在服务端消息机制的处理⽅⾯,mqtt协议本⾝本⾮重点
2 概述
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是⼀种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,是⼀个基于客户端-服务器的消息发布/订阅传输协议。
MQTT最⼤优点在于,以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为⼀种低开销、低带宽占⽤的即时通讯协议,轻量、简单、开放和易于实现的,这些特点使它适⽤范围⾮常⼴泛,在物联⽹、⼩型设备、移动应⽤等⽅⾯有较⼴泛的应⽤。
3 协议介绍
MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种⾝份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
[外链图⽚转存失败,源站可能有防盗链机制,建议将图⽚保存下来直接上传(img-GinBs8T8-1607485570622)
(file:///C:\Urs\256737\AppData\Local\Temp\msohtmlclip1\01\clip_image002.jpg)]
MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:
(1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
(2)payload,可以理解为消息的内容,是指订阅者具体要使⽤的内容。
MQTT会构建底层⽹络传输:它将建⽴客户端到服务器的连接,提供两者之间的⼀个有序的、⽆损的、基于字节流的双向传输。当应⽤数据通过MQTT⽹络发送时,MQTT会把与之相关的服务质量(QoS)和主题名(Topic)相关连。
3.1 客户端 Client
⼀个使⽤MQTT协议的应⽤程序或者设备,它总是建⽴到服务器的⽹络连接。客户端可以:
(1)发布其他客户端可能会订阅的信息;
(2)订阅其它客户端发布的消息;
(3)退订或删除应⽤程序的消息;
(4)断开与服务器连接。
3.2 服务端 Broker
MQTT服务器以称为"消息代理"(Broker),可以是⼀个应⽤程序或⼀台设备。它是位于消息发布者和订阅者之间,它可以:
(1)接受来⾃客户的⽹络连接;
(2)接受客户发布的应⽤信息;
(3)处理来⾃客户端的订阅和退订请求;
(4)向订阅的客户转发应⽤程序消息。
3.3 订阅、主题、会话导医台
订阅(Subscription):
包含主题筛选器(Topic Filter)和最⼤服务质量(QoS)。订阅会与⼀个会话(Session)关联。⼀个会话可以包含多个订阅。每⼀个会话中的每个订阅都有⼀个不同的主题筛选器。
会话(Session):
每个客户端与服务器建⽴连接后就是⼀个会话,客户端和服务器之间有状态交互。会话存在于⼀个⽹络之间,也可能在客户端和服务器之间跨越多个连续的⽹络连接。
主题名(Topic Name):
连接到⼀个应⽤程序消息的标签,该标签与服务器的订阅相匹配。服务器会将消息发送给订阅所匹配标签的每个客户端。
主题筛选器(Topic Filter):
⼀个对主题名通配符筛选器,在订阅表达式中使⽤,表⽰订阅所匹配到的多个主题。
负载(Payload):消息订阅者所具体接收的内容。
3.4 服务质量QoS
MQTT⽀持三种消息发布服务质量(QoS):
⾄多⼀次(QoS==0),消息发布完全依赖底层 TCP/IP ⽹络。会发⽣消息丢失或重复。这⼀级别可⽤于如下情况,环境传感器数据,丢失⼀次读记录⽆所谓,因为不久后还会有第⼆次发送。
⾄少⼀次(QoS==1),确保消息到达,但消息重复可能会发⽣。
只有⼀次(QoS==2),确保消息到达⼀次。这⼀级别可⽤于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。⼩型传输,开销很⼩(固定长度的头部是 2 字节),协议交换最⼩化,以降低⽹络流量。
4 消息机制
以Mosquitto-0.1为例,简要剖析mqtt-broker的消息处理机制。
巴啦啦小魔仙美雪
⼀个mqtt - broker要完成哪些任务?
作为并发服务器,维护多个客户端的TCP链路
处理客户端mqtt connect、disconnect、subscribe、publish、ping等请求
处理消息持久化即消息永久保存问题、处理不同QoS消息
Mosquitto-0.1是怎样完成broker要完成的任务的?
使⽤plect处理客户端并发
使⽤结构mqtt3_context维护每个客户端信息
使⽤sqlite3创建表客户端clients、订阅sbus、消息messages、持久化retain,来实现对客户端-订阅/发布/持久化-消息的管理
4.1 概述
Mosquitto V0.1版本,实现了独⽴、完整的MQTT V3.1协议的服务端(broker)。源码⾏数约3000⾏,使⽤C语⾔编写,.c⽂件13个,broker使⽤其中的10个⽂件。因为mosquitto基于sqlite3,其编译
链接和运⾏,需要libsqlite3.so 。
⽂件名主要函数描述
conf.c mqtt3_config_read读取并解析配置⽂件
context.c mqtt3_context_initmqtt3_context_cleanup 提供mqtt3_context的初始化和清理接⼝。
mqtt3_context结构含socket fd,客户端id,最后⼀次收发时间,保活时间等
参数。
memory.c mqtt3_callocmqtt3_free等提供内存分配和使⽤接⼝
databa.c mqtt3_db_openmqtt3_db_clo_mqtt3_db_tables_create_mqtt3_db_statement_preparemqtt3_db_XXX_inrt
等
提供mqtt相关sqlite3数据库
操作接⼝
net.c mqtt3_socket_listen提供TCP socket接⼝mosquito.c handle_read⼊⼝函数所在⽂件
raw_nd.c mqtt3_raw_publishmqtt3_raw_puback等提供mqtt原始报⽂发送接⼝
氧气raw_nd_client.c mqtt3_raw_connectmqtt3_raw_disconnectmqtt3_raw_subscribemqtt3_raw_unsubscribe 提供客户端mqtt原始报⽂发送接⼝,broker不使⽤该⽂
件
raw_nd_rver.c mqtt3_raw_connack提供connect ack发送接⼝
read_handle.c mqtt3_handle_publishmqtt3_handle_pubackmqtt3_handle_pingreq等提供socket上读⼊数据处理
接⼝
read_handle_client.c mqtt3_handle_connackmqtt3_handle_subackmqtt3_handle_unsuback 提供客户端socket上读⼊数据处理接⼝,broker不使⽤
该⽂件
read_handle_rver.c mqtt3_handle_connectmqtt3_handle_disconnectmqtt3_handle_subscribemqtt3_handle_unsubscribe
提供mqtt
conn/disconn/sub/unsub
命令处理接⼝
util.c mqtt3_command_to_string提供⼯具,未使⽤⽂件名主要函数描述
⽂件调⽤关系
mosquito.c - - > conf.c
mosquito.c - - > read_handle_rver.c - - > context.c
|- - > databa.c
|- - > raw_nd.c
|- - > raw_nd_rver.c
|- - > net.c
mosquito.c - - > read_handle.c
|- - > databa.c
| - - > net.c
郭玉成mosquito.c - - > memory.c
mosquito.c - - > databa.c留学贷款
4.2 数据结构
4.2.1 数据库表
mosquitto启动后共创建5个表:
主要⽤于版本信息的config
CREATE TABLE config ([key]TEXT PRIMARY KEY,value TEXT);
记录客户端信息的clients
CREATE TABLE clients (
sock INTEGER,
id TEXT PRIMARY KEY,
clean_start INTEGER,
will INTEGER,
will_retain INTEGER,
牛筋面
will_qos INTEGER,
will_topic TEXT,
will_message TEXT,
last_mid INTEGER
)
;
订阅信息subs
CREATE TABLE subs ( client_id TEXT, sub TEXT, qos INTEGER);
持久化消息retain
CREATE TABLE retain (
sub TEXT,
qos INTEGER,
payloadlen INTEGER,
payload BLOB
);
客户端消息messages
CREATE TABLE messages (
client_id TEXT,
timestamp INTEGER,
direction INTEGER,
status INTEGER,
mid INTEGER,
香菇炒dup INTEGER,
qos INTEGER,
retain INTEGER,
sub TEXT,
payloadlen INTEGER,
payload BLOB
);
4.2.2 mqtt3_config系统全局配置
CREATE TABLE messages (
client_id TEXT,
timestamp INTEGER,
direction INTEGER,
status INTEGER,
mid INTEGER,
dup INTEGER,
qos INTEGER,
retain INTEGER,
sub TEXT,
payloadlen INTEGER,
payload BLOB
);
4.2.3 mqtt3_context客户端上下⽂
typedef struct _mqtt3_context{
int sock;
time_t last_msg_in;
time_t last_msg_out;
uint16_t keepalive;
bool clean_start;
char*id;
} mqtt3_context;
**mqtt3_context****⽅法:**mqtt3_context_init:初始化,mqtt3_context_cleanup:关闭socket;如果clean_start置上,则还需要删除subs、messages、clients中与对应客户端有关的⾏
4.2.4 mqtt3_msg_status消息状态
typedef enum{
ms_invalid =0,
ms_publish =1,
ms_publish_puback =2,
ms_wait_puback =3,
ms_publish_pubrec =4,
ms_wait_pubrec =5,
ms_rend_pubrel =6,
ms_wait_pubrel =7,
ms_rend_pubcomp =8,
ms_wait_pubcomp =9
} mqtt3_msg_status;
4.3 算法和处理流程
4.3.1 系统初始化
mqtt3_config_read
读取系统配置⽂件,完成mqtt3_config的初始化(未提供配置时使⽤默认配置,例如默认监听端⼝1883)
mqtt3_db_open
1. 根据系统配置打开/创建数据库,并调⽤_mqtt3_db_tables_create使⽤CREATE TABLE IF NOT EXISTS的⽅式创建5个表。
2. 调⽤_mqtt3_db_invalidate_sockets初始化客户端socket: UPDATE clients SET sock=-1
mqtt3_socket_listen
开启TCP监听(默认端⼝1883)。歌声嘹亮
系统初始化完成后,进⼊处理循环。
4.3.2 Broker socket新连接处理
收到新的TCP connect之后,accept并且调⽤mqtt3_context_init进⾏context初始化
4.3.3 Broker socket 读操作及处理
TCP socket读数据在handle_read()中根据mqtt报⽂类型进⾏处理。Broker可能收到的mqtt报⽂有connect、publish、puback、pubrec、pubrel、pubcomp、subscribe、unsubscribe、ping、disconnect共10种(不含错误的报⽂)。
1) mqtt connect处理(ack)
【协议定义】客户端的connect应该回复ack
【处理】mqtt3_handle_connect为处理客户端mqtt connect的函数,最重要的是两个任务:
a\设置context id域;
b\将(发起mqtt connect的)客户端信息插⼊表clients。
context->id = client_id;
mqtt3_db_client_inrt(context, will, will_retain, will_qos, will_topic, ill_message);
【⽰例】查看sqlite3数据库clients表
2) mqtt disconnect处理(no ack)
【协议定义】disconnect消息:e0 00,不需要应答。
【处理】 mqtt3_handle_disconnect调⽤mqtt3_socket_clo