SpringBoot结合RedisStream实现消息队列Demo SpringBoot 中使⽤Redis Stream 实现消息监听
Demo环境
JDK8
Maven3.6.3
springboot2.4.3
POM依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="maven.apache/POM/4.0.0" xmlns:xsi="www.w3/2001/XMLSchema-instance"
xsi:schemaLocation="maven.apache/POM/4.0.0 maven.apache/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>vip.huhailong</groupId>
<artifactId>redismq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>redismq</name>
<description>ba redis stream mq</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>感谢有你作文
<groupId>org.springframework.boot</groupId>
飞机简笔
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>禁毒手抄报简单又漂亮
这⾥是⼀个简单的Demo,所以关于redis的⼀些序列化配置就省略了。
配置监听消息类
配置监听消息类,这⾥类需要实现StreamListener接⼝,该接⼝下只有⼀个要实现的⽅法——onMessage⽅法,代码:package distool;
slf4j.Slf4j;
import org.tion.stream.MapRecord;
import org.dis.stream.StreamListener;
import org.springframework.stereotype.Component;
/**
* @author Huhailong
科林斯地峡
* @Description 监听消息
* @Date 2021/3/10.
*/
@Slf4j
@Component
public class ListenerMessage implements StreamListener<String, MapRecord<String, String, String>> {
@Override
public void onMessage(MapRecord<String, String, String> entries) {
log.info("接受到来⾃redis的消息");
System.out.println("message id "+Id());
System.out.println("stream "+Stream());
System.out.println("body "+Value());
}
}
配置完该类后我们再创建⼀个类将该监听器注⼊进去,代码:
package fig;
import lombok.var;
import org.springframework.beans.factory.annotation.Autowired;
import t.annotation.Bean;
打印机开关import t.annotation.Configuration;
import org.tion.RedisConnectionFactory;
import org.tion.stream.Consumer;
import org.tion.stream.ReadOfft;
import org.tion.stream.StreamOfft;
女人鼻子大代表什么
import org.dis.stream.StreamMessageListenerContainer;
import org.dis.stream.Subscription;
import distool.ListenerMessage;
import java.time.Duration;
/**
* @author Huhailong
* @Description
* @Date 2021/3/12.
*/
工程质量检测
@Configuration
public class RedisStreamConfig {
@Autowired
private ListenerMessage streamListener;
@Bean
契诃夫变色龙public Subscription subscription(RedisConnectionFactory factory){
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1))
.build();
var listenerContainer = ate(factory,options);
var subscription = iveAutoAck(Consumer.from("mygroup","huhailong"),
listenerContainer.start();
return subscription;
}
}
代码分析:
1. ⾸先将我们实现了StreamListener的监听器类注⼊。
2. subscription⽅法返回的是⼀个Subscription类型,它是与当前正在运⾏任务的链接,可以理解为订阅的链接,它有俩个⽅法
1. boolean await(Duration timeout): 当订阅变为活动或超时时,同步阻塞呼叫将返回
2. boolean isActive(): 如果当前正在订阅则为true
3. 代码中的var是使⽤了Lombok的可变局部变量。主要是为了⽅便
4. StreamMessageListenerContainer: 消息侦听容器,不能在外部实现。创建后,StreamMessageListenerContainer可以订阅Redis流并
使⽤传⼊的消息。 StreamMessageListenerContainer允许多个流读取请求,并为每个读取请求返回⼀个Subscription句柄。取消订阅最终将终⽌后台轮询。使⽤键和值序列化器转换消息以⽀持各种序列化策略。
5. __StreamMessageListenerContainerOptions: __ 它是上⾯4的选项,代码中pollTimeout表⽰轮询超时时间
6. create⽅法使⽤给定的RedisConnectionFactory和上⾯的配置选项创建侦听器。
7. 接下来使⽤receiveAutoAck创建⼀个新订阅,注意这⾥接受到消息后会被⾃动的确认,如果不想⾃动确认请使⽤其他的创建订阅⽅
式。该⽅法共有三个参数
1. 消费组 consumer group ,它不能为null (Consumer类型)
2. stream offt ,stream的偏移量(StreamOfft 类型)
3. listener 不能为null (StreamListener<K,V> 类型)
代码中表⽰消费者来⾃名称为mygroup的组,消费者名称为huhailong,这⾥偏移量设置为了lastConsumed,它表⽰读取ID⼤于消费者组使⽤的最后⼀个元素的所有新到达的元素。
现在就可以运⾏项⽬来验证了,将项⽬运⾏起来后通过终端给对应key的stream添加⼀条消息
> XADD mystream * message springboot
这时可以看到spring boot的控制台打印除了⼀下消息:
message id 1615532778588-0
stream mystream
body {message=springboot}
说明侦听成功,它会⼀直处于监听状态,只要对应key的stream添加了新的消息都会被侦听到,到此也就简单的实现了消息队列功能。