Redisson延迟队列执⾏流程源码分析⼀下
在实际分布式项⽬中延迟任务⼀般不会使⽤JDK⾃带的延迟队列,因为它是基于JVM内存存储,没有持久化操作,所以当服务重启后就会丢失任务。フリフレ
decentralize
在项⽬中可以使⽤MQ死信队列或redisson延迟队列进⾏处理延迟任务,本篇⽂章将讲述redisson延迟队列的使⽤demo和其执⾏源码。
demo⽰例
通过脚⼿架创建⼀个简易springboot项⽬,引⼊redisson的maven依赖,并简单配置redisson连接属性。
<!-- redisson引⽤ -->
<dependency>port什么意思
<groupId&disson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.6</version>
</dependency>
@Configuration
public class RedissonConfig {
@Value("${dis.host}")
private String host;
@Value("${dis.port}")
private String port;
/**
* 获取redissonClient实例
*
* @return
* @throws Exception
*/
@Bean
public RedissonClient getRedisson() {楚门的世界英文影评
Config config = new Config();
String address = "redis://" + host + ":" + port;
config.uSingleServer().tAddress(address);
ate(config);
}
}
复制代码
定义⼀个redisson延迟队列插⼊和获取任务处理类RedissonQueueHandle,通过控制spring的bean加载周期开启独⽴线程获取延迟任务。
这⾥获取延迟任务使⽤了三种⽅法,除了第⼀种阻塞式获取任务⽅法外,其他两种⽅法都不是百分⽐按照延迟参数获取到任务,因为是时间间隔定时循环获取延迟任务。
/**
* redisson延迟队列处理器
*
* @author zrh
*/
@Slf4j
@Component
未完待续英文public class RedissonQueueHandle implements InitializingBean {
private final RBlockingQueue<RedisDataEntity<?>> queue;
private final RDelayedQueue<RedisDataEntity<?>> delayedQueue;
public RedissonQueueHandle (RedissonClient client) {
public RedissonQueueHandle (RedissonClient client) {
this.queue = BlockingQueue("redisson:queue");
this.delayedQueue = DelayedQueue(queue);
}
@Override
public void afterPropertiesSet () {
// 开⼀个线程阻塞式获取任务
thread();
/
/ 使⽤netty时间轮循环获取任务
// watchDog(new HashedWheelTimer());
// 使⽤线程池定时获取任务
// schedule();
}
private void thread () {
new Thread(() -> {
while (true) {
try {
RedisDataEntity entity = queue.take();
log.info("本次获取数据:{},耗时:{}", entity, System.currentTimeMillis() - Time());
} catch (Exception e) {
}
}
}, "zrh").start();
}
private void watchDog (final HashedWheelTimer timer) {
RedisDataEntity entity = queue.poll();
if (null != entity) {
log.info("本次获取数据:{},耗时:{}", entity, System.currentTimeMillis() - Time());
}
watchDog(timer);
}, 3, TimeUnit.SECONDS);
}
private void schedule () {
RedisDataEntity entity = queue.poll();
if (null != entity) {
log.info("本次获取数据:{},耗时:{}", entity, System.currentTimeMillis() - Time());
}
}, 5, 5, TimeUnit.SECONDS);
}
/
**
* 放⼊redis,定时过期
*
* @param entity
*/
lead
public void offer (RedisDataEntity entity) {
秘密花园2try {
delayedQueue.offer(entity, Expire(), TimeUnit.MILLISECONDS);
} catch (Exception e) {
<("放⼊redis延迟队列异常", e);
}
}
}
复制代码
放⼊redisson延迟队列可以是字符串也可以是对象RedisDataEntity,因为有进⾏IO磁盘存储操作,所以必须实现Serializable序列化
接⼝。
/**
* @Author: ZRH
* @Date: 2022/1/10 11:54
*/
@Data
public class RedisDataEntity<T> implements Serializable {
/**
* 数据
*/
private final T data;
/**
* 过期时间(单位:毫秒)
*/
private final Long expire;
/**
* 添加时间
*/
private final Long time;
public RedisDataEntity (T data, Long expire, Long time) { this.data = data;
this.time = time;
}
}
复制代码
然后开⼀个插⼊数据接⼝:
* @Author: ZRH
英语阅读网* @Date: 2022/1/10 11:45
*/
@Slf4j
@RestController
public class IndexController {
private final RedissonQueueHandle redisHandle;
public IndexController (RedissonQueueHandle redisHandle) {
}
@PostMapping("redissonQueue")
public String redissonQueue (@RequestParam String data, @RequestParam Long expire) {
RedisDataEntity entity = new RedisDataEntity(data, expire, System.currentTimeMillis());
log.info("本次添加数据:{}", entity);
redisHandle.offer(entity);
return "ok";
}
}
juice的复数
访问接⼝设置延迟30秒:localhost:8802/redissonQueue?data=a&expire=30000,打印结果如下
2022-01-14 14:21:52.140 INFO 10808 --- [nio-8802-exec-1] c.ller.IndexController : 本次添加数据:RedisDataEntity(data=a, expire=30000, time= 2022-01-14 14:21:52.887 INFO 10808 --- [nio-8802-exec-2] c.ller.IndexController : 本次添加数据:RedisDataEntity(data=a, expire=30000, time= 2022-01-14 14:22:22.240 INFO 10808 --- [ zrh] c.dis.RedissonQueueHandle : 本次获取数据:RedisDataEntity(data=a, expire=30000, time=1 2022-01-14 14:22:22.914 INFO 10808 --- [ zrh] c.dis.RedissonQueueHandle : 本次获取数据:RedisDataEntity(data=a, expire=30000, time=1
复制代码
初始执⾏流程源码解析
redisson延迟队列最终都是和redis服务进⾏交互的,那可以使⽤monitor命令查看redis中执⾏了哪些命令,这样对了解其执⾏流程有很⼤帮助。
上图是项⽬启动时,对redis发送的⼏个指令
"SUBSCRIBE":订阅队列"redisson_delay_queue_channel:{redisson:queue}",⾥⾯有个定时任务通过该队列获取数据
"zrangebyscore":获取"redisson_delay_queue_timeout:{redisson:queue}"集合中排序score值在0到
1642148406748(当前时间戳)内的前100元素
"zrange":获取"redisson_delay_queue_timeout:{redisson:queue}"集合中第⼀个元素,⽤于获取下⼀个元素的到期时间
"BLPOP":取出并移除"redisson:queue"列表⾥的第⼀个元素,如果没有元素就⼀直等待阻塞。所以这⾥会阻塞着
"rpush":如果指令"zrangebyscore"获取到了元素,那就将元素推送到队列redisson:queue内
"lrem":如果指令"zrangebyscore"获取到了元素,那就删除队列"redisson_delay_queue:{redisson:queue}内元素为v的第⼀个
元素
SUBSCRIBE指令
国王的演讲字幕进⼊RedissonDelayedQueue延迟队列的构造函数,⾥⾯就有上述执⾏指令的lua脚本命令(为了不影响篇幅删了⼀部分代码,下
同):
protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String nam super(codec, commandExecutor, name);
// list结构,⽤于延迟队列的订阅发布
channelName = prefixName("redisson_delay_queue_channel", getRawName());
// list结构,存放元素原始顺序
queueName = prefixName("redisson_delay_queue", getRawName());
// zt结构,存放未到期元素,并按照过期时间进⾏排好序
timeoutSetName = prefixName("redisson_delay_queue_timeout", getRawName());
QueueTransferTask task = new ConnectionManager()) {
@Override
protected RFuture<Long> pushTaskAsync() {
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
"local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
+ "if #expiredValues > 0 then "
+ "for i, v in ipairs(expiredValues) do "
+ "local randomId, value = struct.unpack('dLc0', v);"
+ "redis.call('rpush', KEYS[1], value);"
+ "redis.call('lrem', KEYS[3], 1, v);"
+ "end; "
+ "redis.call('zrem', KEYS[2], unpack(expiredValues));"
+ "end; "
// get startTime from scheduler queue head task
+ "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
+ "if v[1] ~= nil then "
+ "return v[2]; "
+ "end "
+ "return nil;",
Arrays.<Object>asList(getRawName(), timeoutSetName, queueName),
System.currentTimeMillis(), 100);
}
@Override
protected RTopic getTopic() {
ateRaw(LongCodec.INSTANCE, commandExecutor, channelName);
}
};
queueTransferService.schedule(queueName, task);
this.queueTransferService = queueTransferService;
}
复制代码
继续跟进queueTransferService.schedule(queueName, task)⽅法,因为第⼀次进⼊tasks集合,所以最后执⾏start()⽅法:
......
private final ConcurrentMap<String, QueueTransferTask> tasks = new ConcurrentHashMap<>();
public synchronized void schedule(String name, QueueTransferTask task) {
QueueTransferTask oldTask = tasks.putIfAbnt(name, task);
if (oldTask == null) {
task.start();
} el {
oldTask.incUsage();
}
}
复制代码
进⼊QueueTransferTask,继续跟进schedulerTopic.addListener(...)⽅法: