springboot整合mqtt
1、引⼊依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
2、配置项
mqtt:
rverURIs: tcp://127.0.0.1:1883 #mqtt服务地址
urname: abc
password: abc
keepAliveInterval:3 #⼼跳单位秒
connectionTimeout:5 #连接超时时间单位秒
cleanSession:fal #是否清除ssion
async:true #是否异步发送
producer: #⽣产者配置
defaultQos:1 #消息质量0:最多⼀次传输(可能丢包)1:⾄少⼀次传输,(可能重包)2:保证只有⼀次传输 defaultRetained:fal #是否保留消息
clientId: rver_outbound #⽣产者连接id
defaultTopic: notification #默认订阅
consumer: #消费者配置
defaultTopic: notification #默认订阅
clientId: rver_inbound #消费者连接id
3、MQTT配置类
@Configuration
@Slf4j
public class MqttConfig {
@Value("${mqtt.rverURIs}")
private String rverURIs;
@Value("${mqtt.urname}")
private String urname;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.producer.clientId}")
private String producerClientId;
@Value("${sumer.clientId}")
private String consumerClientId;
/
/⼼跳频率
@Value("${mqtt.keepAliveInterval}")
private int keepAliveInterval;
//超时时间
@Value("${tionTimeout}")
private int connectionTimeout;
//⽣产者默认订阅
@Value("${mqtt.producer.defaultTopic}")
private String producerDefaultTopic;
//消费者默认订阅
@Value("${sumer.defaultTopic}")
private String consumerDefaultTopic;
//默认消息质量
//默认消息质量
@Value("${mqtt.producer.defaultQos}")
private int defaultProducerQos;
//默认订阅
@Value("${mqtt.producer.defaultRetained}")
private boolean defaultRetained;
//接收消息管道
public static final String INBOUND_CHANNEL ="rver_inbound";
/**
* Mqtt配置项
* @return {@lip.paho.client.mqttv3.MqttConnectOptions}
*/
@Bean
public MqttConnectOptions getMqttConnectOptions(){
MqttConnectOptions options =new MqttConnectOptions();
// 设置是否清空ssion,这⾥如果设置为fal表⽰服务器会保留客户端的连接记录,
// 这⾥设置为true表⽰每次连接到服务器都以新的⾝份连接
options.tCleanSession(true);
// ⽤户名
options.tUrName(urname);
/
/ 密码
options.CharArray());
// mqtt服务url
options.tServerURIs(StringUtils.split(rverURIs,","));
// 设置超时时间单位为秒
options.tConnectionTimeout(connectionTimeout);
// 设置会话⼼跳时间单位为秒服务器会每隔1.5*keepAliveInterval秒的时间向客户端发送⼼跳判断客户端是否在线,但这个⽅法并没有重连的机制 options.tKeepAliveInterval(keepAliveInterval);
// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
//options.tWill("willTopic", WILL_DATA, 2, fal);
return options;
}
/**
* MQTT客户端
* @return {@link org.springframework.MqttPahoClientFactory}
*/
@Bean
public MqttPahoClientFactory mqttClientFactory(){
DefaultMqttPahoClientFactory factory =new DefaultMqttPahoClientFactory();
factory.tConnectionOptions(getMqttConnectOptions());
return factory;
}
/
**
* mqtt消息管道(⽣产者)
* @return {@link ssaging.MessageChannel}
*/
@Bean(name = IMqttSender.OUTBOUND_CHANNEL)
public MessageChannel outboundChannel(){
return new DirectChannel();
}
/**
* mqtt消息处理器(⽣产者)
* @return {@link ssaging.MessageHandler}
*/
@Bean
@ServiceActivator(inputChannel = IMqttSender.OUTBOUND_CHANNEL)
public MessageHandler getMqttProducer(){
MqttPahoMessageHandler messageHandler =new MqttPahoMessageHandler(producerClientId,mqttClientFactory());
messageHandler.tAsync(true);
messageHandler.tDefaultTopic(producerDefaultTopic);
//messageHandler.tDefaultRetained(defaultRetained);
messageHandler.tDefaultQos(defaultProducerQos);
return messageHandler;
}
}
/**
* 消费者如果不需要订阅消息,可不⽤配置下⾯
*/
/**
* MQTT信息通道(消费者)
* @return {@link ssaging.MessageChannel}
*/
@Bean(name = INBOUND_CHANNEL)
public MessageChannel mqttInboundChannel(){
return new DirectChannel();
}
/**
* MQTT消息订阅绑定(消费者)
* @return {@link org.MessageProducer}
*/
@Bean
public MessageProducer inbound(){
// 可以同时消费(订阅)多个Topic
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(
consumerClientId,mqttClientFactory(),
StringUtils.split(consumerDefaultTopic,","));
adapter.tCompletionTimeout(5000);
adapter.tConverter(new DefaultPahoMessageConverter());
adapter.tQos(1);
// 设置订阅通道
adapter.tOutputChannel(mqttInboundChannel());
return adapter;
}
/**
* MQTT消息处理器(消费者)订阅的消息将会在这⾥打印
*
* @return {@link ssaging.MessageHandler}
*/
@Bean
@ServiceActivator(inputChannel = INBOUND_CHANNEL)
public MessageHandler handler(){
return new MessageHandler(){
@Override
public void handleMessage(Message<?> message)throws MessagingException { log.info("--------------------接收到订阅消息--------------------");
log.info("{}", Payload());
}
};
}
}
4、MQTT消息发送接⼝
@Component
@MessagingGateway(defaultRequestChannel = IMqttSender.OUTBOUND_CHANNEL)
public interface IMqttSender {
//发送消息管道
String OUTBOUND_CHANNEL ="rver_outbound";
/**
* 发送信息到MQTT服务器
* @param data 发送的⽂本
*/
void ndToMqtt(String data);
/**
* 发送信息到MQTT服务器
* @param topic 主题
* @param payload 消息主体
*/
void ndToMqtt(@Header(MqttHeaders.TOPIC) String topic,
String payload);
/**
* 发送信息到MQTT服务器
* @param topic 主题
* @param qos 对消息处理的⼏种机制。<br> 0 表⽰的是订阅者没收到消息不会再次发送,消息会丢失。<br>
* 1 表⽰的是会尝试重试,⼀直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。<br>
* 2 多了⼀次去重的动作,确保订阅者收到的消息有⼀次。
* @param payload 消息主体
*/
void ndToMqtt(@Header(MqttHeaders.TOPIC) String topic,
@Header(MqttHeaders.QOS)int qos,
String payload);
}
注意事项:@MessagingGateway注解不会被@ComponentScan当做普通的组件扫描。放在Application同级⽬录及⼦⽬录时,会被默认扫描。如果需要将IMqttSender放在Application类的⽬录外,需要在Application类中配置@IntegrationComponentScan进⾏扫描。
5、发送消息到MQTT服务
@ActiveProfiles("test")
@SpringBootTest
public class MqttTest {
@Resource
private IMqttSender mqttSender;
@Test
void contextLoads(){
mqttSender.ndToMqtt("测试发布消息");//默认topic 默认qos
mqttSender.ndToMqtt("testTopic","测试发布消息");//⾃定义topic 默认qos
mqttSender.ndToMqtt("testTopic",1,"测试发布消息");//⾃定义topic ⾃定义qos
}
}