The missing piece between MQTT and a SQL database in an M2M landscape
Message Queue Telemetry Transport (MQTT) is awesome when it comes to Machine-to-Machine (M2M) communication. Thanks to its publish/subscribe pattern, it offers great scalability even with thousands of connected devices.
Classic M2M landscape
The picture above shows a classic M2M landscape with a few publishers and a few subscribers.
Talking from the perspective of a provider of M2M services (which you are when you are hosting your own broker for home automation or your applications), you typically have additional needs to generate added value for yourself or your customer. So let’s say you want to store all MQTT publishes which are broadcast to the broker in a SQL database for later analysis.
The concrete use case
So in our concrete use case we want to store every message in a SQL database. Let’s say we want to store them in MySQL/MariaDB. The following simple database schema will be used:
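The original schema diagram is not reproduced here, but a minimal schema that matches the insert statement used later in this post could look like this (the column names follow the `INSERT` statement; the `id` column and the concrete column types are assumptions you may want to adjust):

```sql
-- Sketch: one row per received MQTT message
CREATE TABLE `Messages` (
  `id`                 BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
  `message`            BLOB            NOT NULL, -- raw MQTT payload
  `topic`              VARCHAR(255)    NOT NULL, -- topic the message was published to
  `quality_of_service` TINYINT         NOT NULL, -- QoS level 0, 1 or 2
  PRIMARY KEY (`id`)
);
```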
Implementation with a wildcard subscriber
The easiest way to achieve the storage is to add an additional client which subscribes to the wildcard topic (which happens to be # in MQTT). This ensures that the client receives all messages which are distributed by the broker. The client can then persist each message to the MySQL database as it arrives.
This would look like this:
M2M Landscape with Wildcard Subscriber
We chose to implement the client with . For brevity only the relevant callback part on message arrival is shown here. The full source code can be found .
private static final String SQL_INSERT = "INSERT INTO `Messages` (`message`,`topic`,`quality_of_service`) VALUES (?,?,?)";
......

@Override
public void messageArrived(MqttTopic topic, MqttMessage message) throws Exception {
    //Let's assume we have a prepared statement with the SQL.
    try {
        statement.setBytes(1, message.getPayload());
        statement.setString(2, topic.getName());
        statement.setInt(3, message.getQos());
        //Ok, let's persist to the database
        statement.executeUpdate();
    } catch (SQLException e) {
        log.error("Error while inserting", e);
    }
}
So we essentially just implemented the messageArrived method, which is called every time a new message arrives. Then we persist the message with a plain old JDBC PreparedStatement. That’s all.
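For completeness, the surrounding client setup could look roughly like this with the Eclipse Paho Java client (the broker URL, client id and callback class name are placeholders, and the sketch assumes a running broker):

```java
// Sketch: connect a wildcard subscriber with Eclipse Paho
// "tcp://localhost:1883" and "sql-persister" are placeholder values.
MqttClient client = new MqttClient("tcp://localhost:1883", "sql-persister");
client.setCallback(new SqlPersistenceCallback()); // our MqttCallback implementation from above
client.connect();
client.subscribe("#", 2); // wildcard topic, requested QoS 2
```

The subscription to # is the only MQTT-specific part; everything else is plain client plumbing.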
Gotchas and Limitations
This approach works well in some scenarios but has some downsides. Some of the challenges we will face with that approach could be:
What happens if the wildcard subscriber disconnects? What happens when it reconnects?
Isn’t the wildcard subscriber some kind of bottleneck?
Do we need different wildcard subscribers when we want to integrate a second database?
Is there a way to ensure that each message will be sent only once?
Let’s look into these questions in more detail.
What happens on subscriber disconnect or reconnect?
A tough problem is how to handle disconnects of the wildcard subscriber. In a nutshell: every message the broker distributes while the wildcard subscriber is disconnected will never be received by it. In our case that means we cannot persist those messages to the database.
Another challenge is retained messages. Retained messages are stored at the broker and are delivered by the broker whenever a client subscribes to the topic of the retained message. The challenge here is that these messages should not be written to our database again, because we most likely already received them with a “normal” publish. To avoid this shortcoming, the wildcard subscriber could be implemented with clean session = false, so the broker remembers all subscriptions for the client and does not redeliver retained messages on every reconnect.
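With the Paho client, disabling the clean session flag is a matter of connect options (a sketch; the client instance is assumed to be created as before, and the broker only queues missed messages for QoS 1/2 subscriptions under the same client id):

```java
// Sketch: connect with a persistent session so the broker keeps the
// subscription and queues missed QoS 1/2 messages while we are offline.
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
client.connect(options);
```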
Isn’t the wildcard subscriber some kind of bottleneck?
Short answer: Yes, most likely.
Slightly longer answer: It depends. In scenarios with very low message throughput, a wildcard subscriber poses no performance problem. But when you are dealing with thousands, tens of thousands or even hundreds of thousands of publishing clients, there is a chance that the client library cannot handle the load or will throttle the overall system throughput. Another key factor is that every message from the broker to the wildcard subscriber has to go over the network, which can result in unnecessary traffic. It is of course possible to launch the subscribing client on the same machine as the broker. This solves the traffic problem, but then the broker and the subscriber share the same system resources and both applications carry the messaging overhead, which is not optimal. This is even more serious in a clustered broker environment.
Do we need different wildcard subscribers when we want to integrate a second database?
It depends on your use case and your expected message throughput. If, for example, all your writes to the different databases are blocking, you will probably hit the bottleneck problem earlier than with just one integrated database. To distribute the database load, it could be a smart idea to have different subscribers for different databases. If your actions are non-blocking, you could handle this with one wildcard subscriber.
Is there a way to ensure that each message will be sent only once?
This can only be achieved when all publishers publish with MQTT Quality of Service level 2, which guarantees that each message is delivered exactly once to the broker. If the subscriber also subscribes with Quality of Service 2, it is guaranteed that every message arrives exactly once at the subscriber. This approach has two problems: it is unlikely that you can ensure that all publishers send with Quality of Service 2, and with Quality of Service 2 it is much harder to scale.
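On the publishing side, requesting the exactly-once guarantee is a one-line setting with the Paho client (a sketch; the topic and payload are made-up example values):

```java
// Sketch: publish a message with QoS 2 ("exactly once") using Eclipse Paho.
// Topic and payload are hypothetical example values.
MqttMessage message = new MqttMessage("23.5".getBytes(StandardCharsets.UTF_8));
message.setQos(2);
client.publish("home/livingroom/temperature", message);
```

The catch described above remains: this setting must be made by every single publisher, which you usually do not control.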
Implementation with HiveMQ’s plugin system
To overcome these problems, we designed the HiveMQ MQTT broker with a powerful plugin system. This plugin system allows one to hook into HiveMQ with custom code, extending the broker with additional functionality to enable deep integration into existing systems or to implement individual use cases in an elegant and simple manner. Let us see how the SQL integration can be solved with the HiveMQ plugin system.
M2M Landscape HiveMQ Plugin System
In this scenario, the plugin system of HiveMQ takes care of persisting the messages. No subscriber (and no publisher) is aware of the persistence mechanism, which essentially solves all the problems we identified. But let us first look at how this is implemented:
Implementation
public class MessageStoreCallback implements OnPublishReceivedCallback {

    private static final String SQL_INSERT = "INSERT INTO `Messages` (`message`,`topic`,`quality_of_service`) VALUES (?,?,?)";

    private final BoneCP connectionPool;

    @Inject
    public MessageStoreCallback(final BoneCP connectionPool) {
        this.connectionPool = connectionPool;
    }

    @Override
    public void onPublishReceived(PUBLISH publish, String clientId) throws OnPublishReceivedException {
        try {
            final Connection connection = connectionPool.getConnection();
            final PreparedStatement preparedStatement = connection.prepareStatement(SQL_INSERT);
            preparedStatement.setBytes(1, publish.getPayload());
            preparedStatement.setString(2, publish.getTopic());
            preparedStatement.setInt(3, publish.getQoS().getQosNumber());
            preparedStatement.executeUpdate();
            preparedStatement.close();
            connection.close();
        } catch (SQLException e) {
            throw new OnPublishReceivedException(e, false); //We do not disconnect the publishing client here
        }
    }
}
When looking at the code, we can see that it is almost identical to the wildcard subscriber implementation. The difference is that we get much more information about the publish message than before. We can access all attributes a publish message consists of (such as the retained and duplicate flags) and we get information about the client that published the message. This enables finer control over what we want to persist (what about only persisting messages from a specific client?). Additionally, it is possible to disconnect a client when something wrong or illegal was published. This can be achieved with the OnPublishReceivedException.
For better performance we inject a BoneCP connection pool to get the database connections. Since all plugins can hook into HiveMQ and reuse its components via Dependency Injection, optimal testability for plugins is ensured. It is of course possible to write plugins without Dependency Injection; however, it is not recommended.
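The pool itself might be configured along these lines (a sketch; the JDBC URL, user name and password are placeholders, and a running MySQL/MariaDB instance is assumed):

```java
// Sketch: creating a BoneCP connection pool for the plugin.
// JDBC URL and credentials are placeholder values.
BoneCPConfig config = new BoneCPConfig();
config.setJdbcUrl("jdbc:mysql://localhost:3306/mqtt");
config.setUsername("hivemq");
config.setPassword("secret");
BoneCP connectionPool = new BoneCP(config); // throws SQLException if the database is unreachable
```

Handing this pool to the Dependency Injection container is what makes the `@Inject` constructor in the callback work.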
Key benefits
All the problems we identified with Wildcard subscribers are solved with the plugin system:
No messages are lost since the broker takes care of the message handling.
There is no bottleneck: all plugin executions are completely asynchronous and do not slow down the broker.
We can choose to write different plugins for different use cases (e.g. a second database), but we do not need to.
Every plugin execution for a message occurs exactly once, so we do not have to worry about duplicate handling.
These benefits also hold for a clustered HiveMQ environment. With the HiveMQ plugin system we are not only able to write MQTT messages to a MySQL database in an efficient way, we can also use the same mechanism to integrate HiveMQ into an existing software landscape. It is easy to integrate an Enterprise Service Bus (ESB), call REST APIs, integrate your billing system or even publish new MQTT messages when specific messages occur.
Summary
We discussed two ways to handle the storage of MQTT messages in an existing SQL database. We looked at the downsides of using wildcard subscriber MQTT clients and why that approach does not scale well. We learned that the HiveMQ plugin system solves these problems and allows you to deeply integrate the HiveMQ broker with existing systems (which happens to be a SQL database in our example).
More information about the plugin system will follow soon! Don’t hesitate to contact us if you want to learn more about how HiveMQ and its plugin system can help you.
As a final note, it is worth mentioning that a SQL database can become a bottleneck quite quickly at high message throughput. We recommend using a NoSQL store for such tasks, but this will be discussed in a follow-up blog post.