首页 > 作文

如何通过RabbitMq实现动态定时任务详解

更新时间:2023-04-04 14:28:15 阅读: 评论:0

目录
一、需求背景二、方案思考(1)需求大致分析(2)可尝试的方案三、通过rabbitmq实现延时任务并间接实现动态定时任务。(1)通过死信的方式实现延时信息消费(2)通过mq延时插件实现延时任务(重点)四、mq延时任务插件实现动态定时任务(1)安装延时插件(2)编码实现(3)流程图总结

一、需求背景

定时任务的需求所谓是数不胜数,其中实现方式也是百花齐放,用得最多的大概率为springboot中的 @scheduled(cron = “0 0 1 1 * ?”) 注解,或者是定时任务xxl-job框架,这两者我接触的比较多,除此之外还有,quartz 、elastic-job、但这两个在分布式领域而言,和xxl-jobb比较,xxl-job更为受欢迎。无论是这些框架或者是springboot自带的定时任务组件,基本上都能满足固定定时任务的需求。而我们今天讨论的是动态定时任务的实现。

动态定时任务的需求其实在现实生活中随处可见,如花费到期多久之后发送信息提醒用户?每当我走过老师窗前时间间隔是多少。又或者客户下单之后多久提醒商家发货,提醒的频率又是多少…。这样的需求还有很多。今天我们针对此类需求进行探讨。

二、方案思考

(1)需求大致分析

对于此类需求相比于传统的定时任务无非多了可控性, 其可控性包括了定时任务开始和结束时间的可控性,周期性可控性,只要解决了这两个问题,实际上此类的需求也就迎刃而解了。

(2)可尝试的方案

前面提供的方案只做文字探索性描述。

2.1、 采用重写springboot 的定时框架,从数据库中读取cron表达式来实现可控性周期。

其本质是通过如下线程进行动态定时任务的创建,从而实现对应的周期可控性。

threadpooltaskscheduler threadpooltaskscheduler = new threadpooltaskscheduler();

其具体的细节不再说,其存在的痛点包括了

1 . 需要另外逻辑去实现可控性开始时间和结束时间

2. 此任务开启的入参是corn表达式,需要另外的逻辑将其进行转化,太过于猥琐

2.2、采用时间线程池

时间线程池我忘记叫什么,他是可以指定开始时间,周期时间的,相对而言,比第一种方案来得更为直观,其我考虑到的痛点如下。其实上面那种方案也是有这个痛点的。

多节点,多服务的服务部署情况下,无法实现高可用特性需要编写过多的逻辑来管理任务线程,如果不够谨慎,有可能造成内存浪费。

2.3、采用延时操作

简单言之,实际上只要实现了延时操作 便是实现了动态的开始时间以及周期性运行,可以利用其递归的概念实现所谓的动态周期。

redis 队列来实现延时

redis的体量本身定位就不高,在数据量(任务量)过大时,对redis的压力也很大,redis不一定扛得住。但其实通过redis来实现延时消息这样的成功案列还是有很多的。在这里就不细说了。

rabbitmq实现延时消息。

通过mq实现延时消息是本文的重点,在标题三会细说。

三、通过rabbitmq实现延时任务并间接实现动态定时任务。

(1)通过死信的方式实现延时信息消费

通过创建死信队列来实现延时任务,然后再通过递归思想实现对应的逻辑,就可以实现对应的动态延时任务,但是这个会存在以下下几个痛点。

队列顺序消费

通过死信,我们确实可以动态的控制消息的消费时间,但是消息在队列里面,如果队列里面存在多个信息任务,前一个未到消费时间,后一个已经到了消费时间,这就好导致了,即使后面任务信息消费时间到了,却没法被消费的问题。解决方法,对队列进行排序逻辑,但如果这样做的话,就有点猥琐了。

开销过大。

对于通过死信来实现延时消息,网上有挺多优秀的博客介绍,在此就不做说明了。

(2)通过mq延时插件实现延时任务(重点)

使用延时插件需要mq在3.6以上(实际上我在尝试下载的时候并未发现git上有对应3.6的插件,所以还是选择较高的版本比较好)。

四、mq延时任务插件实现动态定时任务

(1)安装延时插件

这里不做过多说明,重点在于编码的实现,主要步骤如下。

去官网下载对应版本的插件,地址为下载地址

插件名字为rabbitmq_delayed_message_exchange

将插件放到mq插件目录下,然后cmd命令解压网(网上有命令),然后重启mq服务。大概就这样的一减肥心得个过程。

(2)编码实现

创建队列

这里只弄了对应的核心代码,大致就是创建延时交换机,延时队列,以及绑定器,对应的key,value如下

    public static final string delay_exchange = "delay.exchange";    public static final string delay_route_key = "delay.routekey";    public static final string delay_queue = "delay.queue";    /**     * 延时交换机     * @return 延时交换机     */    @bean    public customexchange delayexchange() {        map<string, object> arguments = new hashmap<>(1);        arguments.put("x-delayed-type", "direct");        return new customexchange(delay_exchange,"x-delayed-message",true,fal,arguments);    }    /**     * mq已经安装了延时插件使用,否则得使用延时插件     * @return 延时发送队列。     */    @bean    public queue delayqueue() {        return new queue(delay_queue,true,fal,fal);    }    /**     * 延时绑定区     * @return 延时绑定区     */    @bean    public binding dela小脚丫ybind() {        return bindingbuilder.bind(this.delayqueue()).to(this.delayexchange()).with(delay_route_key).noargs();    }

生产者

这里写得比较随意,也直接使外国人的生活习惯用了lombok,还直接用了 @rvice ,有点草率,主要为了让读者看得清晰。还用了hutool工具类的jsonutil。

可以清晰的看到主方法里面需要传一个integer类型的入参,这个时间我将其转换成了秒,其mq实际入参为毫秒,所以读者不要被误导。入参time通俗的讲就是这个消息多久之后被消费。不需要在乎顺序。

package com.linkyoyo.bill.mq.impl;import cn.hutool.core.util.objectutil;import cn.hutool.json.jsonobject;import cn.hutool.json.jsonutil;import com.linkyoyo.bill.bo.workorderdelaynmailactionbo;import com.linkyoyo.bill.config.rabbitmqconfig;import com.linkyoyo.bill.mq.delaynderrvice;import linterjectionombok.allargsconstructor;import lombok.requiredargsconstructor;import lombok.extern.slf4j.slf4j;import org.springframework.amqp.rabbit.core.rabbittemplate;import org.springframework.scheduling.annotation.async;import org.springframework.stereotype.rvice;/** * 延时发送 * @author 邹 [295006967@qq.com] * @date 2022/1/4 20:33 */@slf4j@requiredargsconstructor@allargsconstructor@rvicepublic class delaynderrviceimpl implements delaynderrvice {    private final rabbittemplate rabbittemplate;    @override    @async    public void ndmessagebydelay(jsonobject message, integer time) {        if(objectutil.isnull(message) || objectutil.isnull(time)) {            return;        }        rabbittemplate.convertandnd(rabbitmqconfig.delay_exchange, rabbitmqconfig.delay_route_key, message, msg -> {            msg.getmessageproperties().theader("x-delay", time * 1000);            return msg;        });        log.info("延时发送成功:延时周期时间{}毫秒,消息内容为{}", time * 1000, message);    }    @override    public void ndmessagebydelay(workorderdelaynmailactionbo actionbo) {        integer aftercond = actionbo.getaftercond();        if(objectutil.isnull(aftercond)) {            aftercond = 0;        }        this.ndmessagebydelay(jsonutil.parobj(actionbo), aftercond);    }}

消费者

消费者的demo不太好写,只是做了一个简单的伪代码。 以定时任务发邮箱为例

1- 消费者线程开始,先执行发邮箱任务

2- 发送完邮箱之后判断是否还需要发邮箱,如果需要,就再通过生产者发送延时邮箱 此时可以指定下一次消费的时间,以此流程走下去便是一套动态任务的流程实现。可以参考后续的流程图。

这样就实现一个简易的定时任务发送邮箱的逻辑

private final delaynderrvice delaynderrvice;    @rabbithandler    @rabbitlistener(queues = rabbitmqconfig.delay_queue)    public void delayconsumer(message message) {        //业务逻辑        this.ndmail(workorderdelaynmailactionbo);        // 判断是否需要递归执行定时任务(实际上就是使用生产者再发一次延时消息,确认下一次消费)        if(需要进行定时任务) {             this.nddelaymessagetomq(workorderdelaynmailactionbo);        }        log.info("信息为:{}", message.getbody());    }

大致流程就这么多了,以下是整套步骤流闭环程图

(3)流程图

总结

到此这篇关于如何通过rabbitmq实现动态定时任务的文章就介绍到这了,更多相关rabbitmq实现动态定时任务内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!

本文发布于:2023-04-04 14:28:14,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/zuowen/105c210cb1d1c4a4854754d97956c78c.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

本文word下载地址:如何通过RabbitMq实现动态定时任务详解.doc

本文 PDF 下载地址:如何通过RabbitMq实现动态定时任务详解.pdf

标签:插件   队列   可控性   时间
相关文章
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图