RocketMQ延迟消息的使⽤与分析
⽂章⽬录
延迟消息的使⽤
使⽤⽐较简单,指定message的DelayTimeLevel即可。⽰例代码如下:
Message msg =new Message("DelayTopicTest","TagA",("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET));
//设置延迟级别,注意这⾥的3不是代表延迟3s
msg.tDelayTimeLevel(3);
SendResult ndResult = producer.nd(msg);
关于元旦的英语作文⽬前rockatmq⽀持的延迟时间有:
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
以上⽀持的延迟时间在msg.tDelayTimeLevel对应的级别依次是1,2,3。。。。
实现原理
延迟队列的核⼼思路是:所有的延迟消息由producer发出之后,都会存放到同⼀个topic(SCHEDULE_TOPIC_XXXX)下,不同的延迟级别会对应不同的队列序号,当延迟时间到之后,由定时线程读取转换为普通的消息存的真实指定的topic下,此时对于consumer端此消息才可见,从⽽被consumer消费。
延迟消息存放的结构如下图所⽰:
consumequeue
├── SCHEDULE_TOPIC_XXXX
│├── 0
││└── 00000000000000000000
│├── 1
││└── 00000000000000000000
│├── 2
││└── 00000000000000000000
│├── 3
││└── 00000000000000000000
│├── 4
simple││└── 00000000000000000000
.....
.....
├── DelayTopicTest
│├── 0
││└── 00000000000000000000
│├── 1
││└── 00000000000000000000
│├── 2
││└── 00000000000000000000
│└── 3
│└── 00000000000000000000
其中不同的延迟级别放在不同的队列序号下(queueId=delayLevel-1)。每⼀个延迟级别对应的延迟消息转换为普通消息的位置标识存放在~/store/config/delayOfft.json⽂件内。
key为对应的延迟级别,value对应不同延迟级别转换为普通消息的offt值。
{
"offtTable":{1:1,2:1,3:11,4:1,5:1,6:1,7:1,8:1,9:1,10:1,11:1,12:1,13:1,14:1,15:0,16:0,17:0,18:0}
}
源码分析
⼊⼝:ScheduleMessageService.start
broker启动的时候会调⽤此⽅法。
public void start(){
//1. 根据⽀持的各种延迟级别,添加不同延迟时间的TimeTask
for(Map.Entry<Integer, Long> entry :Set()){
Integer level = Key();
received什么意思Long timeDelay = Value();
//每⼀个延迟级别对应已经读取为普通消息的offt值
Long offt =(level);
if(null == offt){
offt =0L;
}
if(timeDelay != null){
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offt),1000L);
}
}
//2. 添加⼀个10s执⾏⼀次的TimeTask
this.timer.scheduleAtFixedRate(new TimerTask(){
@Override
public void run(){
try{
venture是什么意思
ScheduleMessageService.this.persist();
}catch(Throwable e){
<("scheduleAtFixedRate flush exception", e);
}
}
},10000,MessageStoreConfig().getFlushDelayOfftInterval());
}
从以上源码基本可以得出⼏个结论:
1、通过jdk⾃带的Timer类开启⼀个timer定时器,在这个timer类添加了多个TimeTask。其中不同的延迟级别都对应
DeliverDelayedMessageTimerTask的不同实例。
2、TimeTask分为两类:DeliverDelayedMessageTimerTask(每秒执⾏1次)和 ScheduleMessageService.this.persist()(每
10秒是执⾏⼀次)
3、每⼀个延迟级别对应⼀个offt,这个offt是⼲嘛的呢?(先抛结论:这个offt的值代表每个级别的延迟队列已经转换为普通
消息的位置)
两类TimeTask的作⽤
1、DeliverDelayedMessageTimerTask
作⽤:扫描延迟消息队列(SCHEDULE_TOPIC_XXXX)的消息,将该延迟消息转换为指定的topic的消息。新托福真题
核⼼代码:uteOnTimeup
出国签证需要多少钱
public void executeOnTimeup(){
/
/读取队列SCHEDULE_TOPIC_XXXX,其中不同的延迟级别对应不同的队列id(queueId=delayLevel-1)
ConsumeQueue cq =ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(“SCHEDULE_TOPIC_XXXX”,delayLevel2QueueId(d elayLevel));
long failScheduleOfft = offt;
if(cq != null){
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offt);
if(bufferCQ != null){
try{
long nextOfft = offt;
int i =0;
ConsumeQueueExt.CqExtUnit cqExtUnit =new ConsumeQueueExt.CqExtUnit();
//循环读取延迟消息
for(; i < Size(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE){
long offtPy = ByteBuffer().getLong();
nextOfft = offt +(i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown =tDeliverTimestamp(now, tagsCode)- now;
//只有当延迟消息发送的时间在当前时间之前才处理,否则此消息应该延迟后再处理
if(countdown <=0){
//根据offt值读取SCHEDULE_TOPIC_XXXX队列的消息
MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOfft(offtPy, sizePy);
if(msgExt != null){
try{
//将读取的消息转换为真实topic的消息(也就是普通消息)
MessageExtBrokerInner msgInner =ssageTimeup(msgExt);
//存放此消息
PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore.putMessage(msgInner);
}catch(Exception e){
}
}
}el{
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOfft),countdown);
ScheduleMessageService.this.updateOfft(this.delayLevel, nextOfft);
return;
}
}
//计算下⼀次读取延迟队列的offt,是定时任务下⼀次从该位置读取延时消息人琴俱亡翻译
nextOfft = offt +(i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOfft), DELAY_FOR_A_WHILE );
//将下⼀次读取延迟队列的offt存放到⼀个缓存map中
ScheduleMessageService.this.updateOfft(this.delayLevel, nextOfft);
return;
}
}
el{
美国名校公开课long cqMinOfft = cq.getMinOfftInQueue();
if(offt < cqMinOfft){
failScheduleOfft = cqMinOfft;
}
}
}
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,failScheduleOfft), DELAY_FOR_A_WHILE) ;
}
以上贴的代码较长,我做了⼀点精简,这⾥我梳理下⼤概思路:
1、读取不同延迟级别对应的延迟消息;
2、取得对应延迟级别读取的开始位置offt;
3、将延迟消息转换为指定topic的普通消息并存放起来。
4、修改下⼀次读取的offt值(修改的只是缓存),并指定下⼀次转换延迟消息的timetask。
astrazeneca2、ScheduleMessageService.this.persist()
将延迟队列扫描处理的进度offt持久化到delayOfft.json⽂件中。
public synchronized void persist(){
//读取offtTable缓存的延迟队列的值
String jsonString =de(true);
if(jsonString != null){
//读取delayOfft.json的⽂件地址
String fileName =figFilePath();
try{
//持久化到delayOfft.json⽂件中
MixAll.string2File(jsonString, fileName);
}catch(IOException e){
高考机器人
<("persist file "+ fileName +" exception", e);
}
}
}
总结
通过源码分析我们其实明⽩了,延迟消息相⽐普通消息只不过是在broker多了⼀层消息topic的转换,对于消息的发送和消费和普通消息没有什么差异。
但这⾥有⼀点要注意:RocketMQ的延迟消息本⾝有⼀个很⼤的缺点,熟悉java⾃带的Timer类的⼩伙伴应该知道⼀个timer对应只有⼀个线程,然后来处理不同的timeTask,⽽RockerMQ本⾝也确实只new了⼀个Timer,也就是说当同时发送的延迟消息过多的时候⼀个线程处理速度⼀定是有瓶颈的,因此在实际项⽬中使⽤延迟消息⼀定不要过多依赖,只能作为⼀个辅助⼿段。