Redisson延时队列原理详解

更新时间:2023-07-16 22:41:38 阅读: 评论:0

Redisson延时队列原理详解
花了⼀天研究了下Redisson 的延时队列,RBlockingQueue ,RDelayedQueue 。 ⽹上没⼀个说清楚的,⽽且都是说轮询redis的zt,都是错误的! 让我来纠正,如果我有错的也可指出。
Demo⽤法
public static void main(String[] args) throws InterruptedException, UnsupportedEncodingException {
Config config = new Config();
config.uSingleServer().tAddress("redis://172.29.2.10:7000");
RedissonClient redisson = ate(config);
RBlockingQueue<String> blockingQueue = BlockingQueue("dest_queue1");
RDelayedQueue<String> delayedQueue = DelayedQueue(blockingQueue);
new Thread() {
public void run() {
while(true) {
try {
//阻塞队列有数据就返回,否则wait
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
}.start();
for(int i=1;i<=5;i++) {
/
/ 向阻塞队列放⼊数据
delayedQueue.offer("fffffffff"+i, 13, TimeUnit.SECONDS);
}
}
上⾯构造了Redisson 阻塞延时队列,然后向⾥⾯塞了5条数据,都是13秒后到期。我们先不启动程序,先打开redis执⾏:
[root@localhost redis-cluster]# redis-cli -c -p 7000 -h 172.29.2.10 --raw
172.29.2.10:7000> monitor
OK
monitor 命令可以监控redis执⾏了哪些命令,注意线上不要乱搞,耗性能的。然后我们启动程序,观察redis执⾏命令情况,这⾥分为三个阶段:
第⼀介段:客户端程序启动,offer⽅法执⾏之前,redis服务会收到如下redis命令:
1610452446.652126 [0 172.29.2.194:65025] "SUBSCRIBE" "redisson_delay_queue_channel:{dest_queue1}"
1610452446.672009 [0 lua] "zrangebyscore" "redisson_delay_queue_timeout:{dest_queue1}" "0" "1610452442403" "limit" "0" "2"
1610452446.672018 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0" "WITHSCORES"
q版恐龙1610452446.673896 [0 172.29.2.194:65034] "BLPOP" "dest_queue1" "0"
SUBSCRIBE
这⾥订阅了⼀个固定的队列 redisson_delay_queue_channel:{dest_queue1}, 就是为了开启进程⾥⾯的延时任务,很重要,redisson 延时取数据都靠它了。后⾯会说。
zrangebyscore
zrangebyscore⽤法扫盲
>> zrangebyscore key min max [WITHSCORES] [LIMIT offt count]
自杀的图片
分页获取指定区间内(min - max),带有分数值(可选)的有序集成员的列表。
redisson_delay_queue_timeout:{dest_queue1} 是⼀个zt,当有延时数据存⼊Redisson队列时,就会在此队列中插⼊ 数据,排序分
数为延时的时间戳。
zrangebyscore就是取出前2条(源码是100条,如下图)过了当前时间的数据。如果取的是0的话就执⾏下⾯的zrange, 这⾥程序刚启
动肯定是0(除⾮是之前的队列数据没有取完)。之所以在刚启动时 这样取数据就是为了把上次进程宕机后没发完的数据发完。
zrange
取出第⼀个数,也就是判断上⾯的还有不有下⼀页。
BLPOP
移出并获取 dest_queue1 列表的第⼀个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为⽌ , 这⾥显然没有元素 ,
就会⼀直阻塞。
第⼆介段:执⾏offer向Redisson队列设置值
这个阶段我们发现redis⼲了下⾯事情:
1610452446.684465 [0 lua] "zadd" "redisson_delay_queue_timeout:{dest_queue1}" "1610452455407" ":\xdf\x0eO\x8c\xa7\xd4C\r\x00\x00\x00\x00\x00\x0 1610452446.684480 [0 lua] "rpush" "redisson_delay_queue:{dest_queue1}" ":\xdf\x0eO\x8c\xa7\xd4C\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff1" 1610452446.684492 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0"
1610452446.684498 [0 lua] "publish" "redisson_delay_queue_channel:{dest_queue1}" "1610452455407"
1610452446.687922 [0 lua] "zadd" "redisson_delay_queue_timeout:{dest_queue1}" "1610452455422" "e\xfd\xfe?j?\xdbC\r\x00\x00\x00\x00\x00\x00\x00\x04>\nff 1610452446.687943 [0 lua] "rpush" "redisson_delay_queue:{dest_queue1}" "e\xfd\xfe?j?\xdbC\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff2"
1610452446.687958 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0"
端午节吃粽子
1610452446.690478 [0 lua] "zadd" "redisson_delay_queue_timeout:{dest_queue1}" "1610452455424" "\x80J\x01j\x11\xee\xda\xc3\r\x00\x00\x00\x00\x00\x00\x0 1610452446.690492 [0 lua] "rpush" "redisson_delay_queue:{dest_queue1}" "\x80J\x01j\x11\xee\xda\xc3\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff3" 1610452446.690502 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0"
1610452446.692661 [0 lua] "zadd" "redisson_delay_queue_timeout:{dest_queue1}" "1610452455427" "v\xb5\xd0r\xb48\xd4\xc3\r\x00\x00\x00\x00\x00\x00\x00\x 1610452446.692674 [0 lua] "rpush" "redisson_delay_queue:{dest_queue1}" "v\xb5\xd0r\xb48\xd4\xc3\r\x00\x00\x00\x00\x00
\x00\x00\x04>\nfffffffff4" 1610452446.692683 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0"
1610452446.696054 [0 lua] "zadd" "redisson_delay_queue_timeout:{dest_queue1}" "1610452455429" "\xe7\a\x8b\xee\t-\x94C\r\x00\x00\x00\x00\x00\x00\x00\x0 1610452446.696081 [0 lua] "rpush" "redisson_delay_queue:{dest_queue1}" "\xe7\a\x8b\xee\t-\x94C\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff5" 1610452446.696098 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0"
我们客户端是设置了5条数据。上⾯也可以看出来。
zadd
往我们zt⾥⾯设置 数据截⽌的时间戳(当前执⾏的时间戳+延时的时间毫秒值),内容为我们的ffffff1 ,不过特殊编码了,加了点什
么,不⽤管。
rpush
同步⼀份数据到list队列,这⾥也不知道⼲嘛的,先放到这⾥。
zrange+publish
取出排序好的第⼀个数据,也就是最临近要触发的数据,然后发送通知 (之前订阅了的客户端,可能是微服务就有多个客户端),内容为
将要触发的时间。客户端收到通知后,就在⾃⼰进程⾥⾯开启延时任务(HashedWheelTimer),到时间后就可以从redis取数据发送。
后⾯⼜是我们的5条循环的设置数据
第三介段:到延时时间取redis数据
1610452459.680953 [0 lua] "zrangebyscore" "redisson_delay_queue_timeout:{dest_queue1}" "0" "1610452455416" "limit" "0" "2"
1610452459.680967 [0 lua] "rpush" "dest_queue1" "\x04>\nfffffffff1"
1610452459.680976 [0 lua] "lrem" "redisson_delay_queue:{dest_queue1}" "1" ":\xdf\x0eO\x8c\xa7\xd4C\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff1" 1610452459.680984 [0 lua] "zrem" "redisson_delay_queue_timeout:{dest_queue1}" ":\xdf\x0eO\x8c\xa7\xd4C\r\x00\x00\x00\x00\
x00\x00\x00\x04>\nfffffffff1" 1610452459.680991 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0" "WITHSCORES" // 判断是否有值
1610452459.745813 [0 lua] "zrangebyscore" "redisson_delay_queue_timeout:{dest_queue1}" "0" "1610452455480" "limit" "0" "2"
1610452459.745829 [0 lua] "rpush" "dest_queue1" "\x04>\nfffffffff2"
1610452459.745837 [0 lua] "lrem" "redisson_delay_queue:{dest_queue1}" "1" "e\xfd\xfe?j?\xdbC\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff2" 1610452459.745845 [0 lua] "rpush" "dest_queue1" "\x04>\nfffffffff3"
1610452459.745848 [0 lua] "lrem" "redisson_delay_queue:{dest_queue1}" "1" "\x80J\x01j\x11\xee\xda\xc3\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff3" 1610452459.745855 [0 lua] "zrem" "redisson_delay_queue_timeout:{dest_queue1}" "e\xfd\xfe?j?\xdbC\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff2" "\x80J\x01j 1610452459.745864 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0" "WITHSCORES"
1610452459.756909 [0 172.29.2.194:65026] "BLPOP" "dest_queue1" "0"
1610452459.758092 [0 lua] "zrangebyscore" "redisson_delay_queue_timeout:{dest_queue1}" "0" "1610452455493" "limit" "0" "2"
秦振华
1610452459.758108 [0 lua] "rpush" "dest_queue1" "\x04>\nfffffffff4"
1610452459.758114 [0 lua] "lrem" "redisson_delay_queue:{dest_queue1}" "1" "v\xb5\xd0r\xb48\xd4\xc3\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff4" 1610452459.758121 [0 lua] "rpush" "dest_queue1" "\x04>\nfffffffff5"
1610452459.758124 [0 lua] "lrem" "redisson_delay_queue:{dest_queue1}" "1" "\xe7\a\x8b\xee\t-\x94C\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff5" 1610452459.758133 [0 lua] "zrem" "redisson_delay_queue_timeout:{dest_queue1}" "v\xb5\xd0r\xb48\xd4\xc3\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff4" "\xe7 1610452459.758143 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0" "WITHSCORES"
1610452459.759030 [0 172.29.2.194:65037] "BLPOP" "dest_queue1" "0"
1610452459.760933 [0 172.29.2.194:65036] "BLPOP" "dest_queue1" "0"
1610452459.763913 [0 172.29.2.194:65038] "BLPOP" "dest_queue1" "0"
1610452459.765999 [0 172.29.2.194:65039] "BLPOP" "dest_queue1" "0"
这个阶段是由客户端进程⾥⾯的延时任务执⾏的,延时任务是在第⼆阶段构造的,已经说了(通过redis的订阅/发布实现)。
zrangebyscore
取出前2条到时间的数据,第⼀阶段已说。
rpush
将上⾯取到的数据push到阻塞队列,注意我们第⼀阶段已经监听了这个阻塞队列
"BLPOP" "dest_queue1" "0"
所以这⾥就会通知客户端取数据。
lrem + zrem
将取完的数据删掉。
zrange
取zt第⼀个数据,有的话继续上⾯逻辑取数据,否则进⼊下⾯。
BLPOP
继续监听这个阻塞队列。以便下次⽤。
⼩结⼀下
客户端启动,redisson先订阅⼀个key,同时 BLPOP key 0 ⽆限监听⼀个阻塞队列(等⾥⾯有数据了就返回)。
当有数据put时,redisson先把数据放到⼀个zt集合(按延时到期时间的时间戳为分数排序),同时发布上⾯订阅的key,发布内容为数据到期的timeout,此时客户端进程开启⼀个延时任务,延时时间为发布的timeout。
客户端进程的延时任务到了时间执⾏,从zt分页取出过了当前时间的数据,然后将数据rpush到第⼀步的阻塞队列⾥。然后将当前数据从zt移除,取完之后,⼜执⾏ BLPOP key 0 ⽆限监听⼀个阻塞队列。
上⼀步客户端监听的阻塞队列返回取到数据,回调到 RBlockingQueue 的 take⽅法。于是,我们就收到了数据。
村妇女主任⼤致原理就是这样,redisson不是通过轮询zt的,将延时任务执⾏放到进程⾥⾯实现,只有到时间才会取redis zt。
redisson⾥⾯还有很多异常,重试机制 没讲。毕竟时间就⼀天,没法全部吃透。有了这些原理,我相信你也能实现⼀个属于⾃⼰的redisson延时队列了。
redisson延时队列优化
由于我在线上使⽤了redisson延时队列,在数据量⼩的时候表现很佳也很稳定,但我们瞬时流量特别⼤,发⽣了到了延时时间了还给我延时⼗⼏分钟的情况,这个是我万万没想到的。
这个是我测试的情况
当我设置了14511条数据到redisson延时队列时,取出来的时间在本⾝的延时时间上还延时了198636多毫秒,⽽且时间随着数据增加⽽增加。 我们线上是微信⾃动发消息业务,这样会导致你跟你⼥朋友推晚安消息,结果她第⼆天早上收到,然后她就认为你勾搭上了美国的妞,和你分⼿,并暗暗⾃喜到⽼娘早就有外遇了,就盼着这天了。
为了你的性福,于是我想到了⼀个优化的法⼦,构建多个redisson队列,类似cluster模式。需要⾃⼰开发。
温和造句我把⼀个redisson延时队列实例放到了⼀数组⾥⾯,然后put值的时候采⽤轮训的负载均衡模式,put和take都是采⽤线程池,结果收到了很理想的效果,下⾯是测试结果:
10万多条数据,真实延时时间最⼤33399毫秒,已经表现很好了,毕竟我开发环境redis特别垃圾。
要代码的也可以加我qq:657455400 ,最好是⾃⼰实现,也很简单。
兄弟(妹⼦)不要以为就这样江湖再见了,我还要烦你⼀点时间。
上⾯介绍了进程⾥⾯的延时任务都是⼀笔带过,下⾯来讲讲下它的原理。redisson使⽤的是netty⾥⾯的延时任
务 ioty.util.HashedWheelTimer
HashedWheelTimer 实现原理
HashedWheelTimer本质是⼀种类似延迟任务队列的实现,适⽤于对时效性不⾼的,可快速执⾏的,⼤量这样的“⼩”任务,能够做到⾼性能,低消耗
redisson是在这⾥⽤的 tion.MasterSlaveConnectionManager
// 初始化  timer
protected void initTimer(MasterSlaveServersConfig config) {
int[] timeouts = new int[]{RetryInterval(), Timeout()};
Arrays.sort(timeouts);
int minTimeout = timeouts[0];
if (minTimeout % 100 != 0) {
minTimeout = (minTimeout % 100) / 2;二姑娘的生存之道
} el if (minTimeout == 100) {
minTimeout = 50;
} el {
minTimeout = 100;
}
罗技g100// minTimeout 为100
timer = new HashedWheelTimer(new DefaultThreadFactory("redisson-timer"), minTimeout, TimeUnit.MILLISECONDS, 1024, fal);
connectionWatcher = new IdleConnectionWatcher(this, config);
subscribeService = new PublishSubscribeService(this, config);
}
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
try {
wTimeout(task, delay, unit);
} catch (IllegalStateException e) {
if (isShuttingDown()) {
return DUMMY_TIMEOUT;
}
throw e;
}
}

本文发布于:2023-07-16 22:41:38,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/89/1084310.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:延时   数据   队列   时间
相关文章
留言与评论(共有 0 条评论)
   
验证码:
推荐文章
排行榜
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图