Mqtt实现发布订阅-简单案例
Mqtt实现发布订阅-简单案例
当你看到这边⽂章是想必你也是刚开始学习mqtt,也许你对于什么是mqtt还不是太了解,建议去把mqtt的(点击跳转)看⼀下,了解清楚后再来学习,效果会更好
正式学习之前还需要搭建⼀个mqtt的服务器,本⽂⽤的是apache-apollo
1.所需要的maven依赖
<dependency>
<groupId&lip.paho</groupId>
<artifactId&lip.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
1.实现⼀个mqtt客户端类
ity;
lip.paho.client.mqttv3.*;
lip.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author
* @date 2020-11-25 14:59
* @Description:
*/
public class MyMqttClient {
private static final Logger LOGGER = Logger(MyMqttClient.class);
private static MqttClient client;
private String host;
private String urname;
private String password;
private String clientId;
private int timeout;
private int keepalive;
public static MqttClient getClient(){
return client;
}
public static void tClient(MqttClient client){
吴哥窟
MyMqttClient.client = client;
}
public MyMqttClient(String host, String urname, String password, String clientId,int timeout,int keepalive){
this.host = host;
this.urname = urname;
this.password = password;
this.clientId = clientId;
this.timeout = timeout;
this.keepalive = keepalive;
}
/**
/**
* 设置连接属性
*
* @param urname
* @param password
* @param timeout
* @param keepalive
* @return
*/
public MqttConnectOptions tMqttConnectOptions(String urname, String password,int timeout,int keepalive){
列提纲的格式
MqttConnectOptions options =new MqttConnectOptions();
//⽤户名
options.tUrName(urname);
//密码
options.CharArray());
// 设置超时时间单位为秒
options.tConnectionTimeout(timeout);
// 设置会话⼼跳时间单位为秒服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个⽅法并没有重连的机制 options.tKeepAliveInterval(keepalive);
// 设置是否清空ssion,这⾥如果设置为fal表⽰服务器会保留客户端的连接记录,设置为true表⽰每次连接到服务器都以新的⾝份连接 options.tCleanSession(fal);
//设置断开后重新连接
options.tAutomaticReconnect(true);
return options;
}
/**
* 连接mqtt服务端,得到MqttClient连接对象
*/
public void connect()throws MqttException {
if(client == null){桂花元宵
//clientid即连接MQTT的客户端ID,⼀般以唯⼀标识符表⽰,MemoryPersistence设置clientid的保存形式,默认为以内存保存
client =new MqttClient(host, clientId,new MemoryPersistence());
client.tCallback(new PushCallback(MyMqttClient.this));
}
MqttConnectOptions mqttConnectOptions =tMqttConnectOptions(urname, password, timeout, keepalive);
if(!client.isConnected()){
}el{
client.disconnect();
}
}
/**
* 发布,默认qos为0,⾮持久化
*
* @param pushMessage
* @param topic
*/
public void publish(String pushMessage, String topic){
publish(pushMessage, topic,0,fal);
}
/**
* 发布消息
*
* @param pushMessage
* @param topic
* @param qos
* @param retained:留存
*/
public void publish(String pushMessage, String topic,int qos,boolean retained){
MqttMessage message =new MqttMessage();
message.Bytes());
message.tQos(qos);
//retained为true表⽰会去取之前还未消费的数据,为fal只取最新接收的消息
message.tRetained(retained);
MqttTopic mqttTopic = Client().getTopic(topic);
MqttTopic mqttTopic = Client().getTopic(topic);
if(null == mqttTopic){
<("topic is not exist");
}
MqttDeliveryToken token;//Delivery:配送
synchronized(this){//注意:这⾥⼀定要同步,否则,在多线程publish的情况下,线程会发⽣死锁
try{
token = mqttTopic.publish(message);//也是发送到执⾏队列中,等待执⾏线程执⾏,将消息发送到消息中间件 token.waitForCompletion(1000L);
}catch(MqttPersistenceException e){
e.printStackTrace();
}catch(MqttException e){
e.printStackTrace();
}
}
}
/**
* 订阅某个主题,qos默认为0
*
* @param topic
*/
public void subscribe(String topic){
subscribe(topic,0);
}
/**
* 订阅某个主题
*
* @param topic
* @param qos 0:最多⼀次、1:最少⼀次、2:只有⼀次
*/
public void subscribe(String topic,int qos){
try{
}catch(MqttException e){
e.printStackTrace();
}
}
吉他谱周杰伦>重视的反义词}
3. mqtt回调类
ity;
lip.paho.client.mqttv3.IMqttDeliveryToken;
lip.paho.client.mqttv3.MqttCallback;
lip.paho.client.mqttv3.MqttException;
lip.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author
* @date 2020-11-25 14:53
* @Description:
*/
public class PushCallback implements MqttCallback {
private static final Logger LOGGER = Logger(PushCallback.class);
private MyMqttClient myMqttClient;
public PushCallback(MyMqttClient myMqttClient){
}
/**
* 连接丢失:⼀般⽤与重连
*
* @param throwable
*/
@Override
public void connectionLost(Throwable throwable){
long reconnectTimes =1;
while(true){
try{
Client().isConnected()){
LOGGER.warn("mqtt reconnect success end");
return;
}
LOGGER.warn("mqtt reconnect times = {} ", reconnectTimes++);
}catch(MqttException e){
<("", e);
}
try{
Thread.sleep(1000);
}catch(InterruptedException e1){
//e1.printStackTrace();
}
}
}
/**
* subscribe后得到的消息会执⾏到这⾥⾯
*
色衰爱弛* @param topic
* @param message
* @throws Exception
*/
@Override
天堂小屋public void messageArrived(String topic, MqttMessage message)throws Exception {
System.out.println("主题:"+ topic +",接收消息内容 : "+new Payload())); }
/**
隔篁竹* (发布)publish后会执⾏到这⾥,发送状态
*
* @param iMqttDeliveryToken
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken){
}
}
4.测试发布、订阅
ller;
ity.MyMqttClient;
import org.junit.Test;
/**
* @author
* @date 2020-11-25 16:04
* @Description:
*/
public class TestMqtt {
/**
* 分级发布与订阅
*
* @throws Exception
*/
String topicName1 ="topic7";
//订阅
@Test
public void testTopic()throws Exception {
MyMqttClient myMqttClient =new MyMqttClient("tcp://127.0.0.1:61613","admin","password","mqttClient2",10,20); t();
myMqttClient.subscribe(topicName1);
while(true);
}
//发布
@Test
public void testTopic2()throws Exception {
MyMqttClient myMqttClient =new MyMqttClient("tcp://127.0.0.1:61613","admin","password","mqttClient4",10,20); t();
int count =10000;
for(int i =0; i < count; i++){
myMqttClient.publish(topicName1 +"发送消息"+i, topicName1,1,true);
}
}
}
本⽂参考了⽹上⼏篇博客,最后亲⾃编写测试通过
参考: