从最开始的使用redis实现的单进程消费的异步任务系统到加入swoole的多进程消费模式,现在,我们的异步任务系统终于又能迈进一步。
因为有了前面两个简单系统的经验,这回基于rabbitmq的异步任务系统设计的的更加完善,包括多进程消费,异常重试等。
消费端架构图
从图中可以看到,我们这个系统是一个基于事件的异步任务系统。就是说当一个事件产生时,生产者将事件抛给调度器,调度器负责查询事件下有哪些任务,然后将这些任务丢到相应的队列中,最后由消费者消费任务队列中的任务。
在整个系统中主要分为三大部分
1.事件生产者,即产生消息事件的一方。
2.任务调度器(scheduler),负责注册事件并调度任务。
3.消费者(worker),负责消费任务队列中的任务。
事件生产者很简单,在业务系统中直上海城市管理学院接调用即可,代码如下。
<?phprequire_once dir.'/../autoload.php';u asynclib\ebats\event;try{ $event = new event('order_paied'); //定义事件 $event->toptions(['order_id' => 'fb138020392193312']); //事件产生的参数 $event->publish();}catch (exception $exc){ echo $exc->getmessage();}
调度器主要做两件事,一是注册事件,另一个是调度任务。
注册事件代码如下:
//注册事件eventmanager::register('order_create', 'cloorder', 'demo', 10);//关闭未付款订单(延迟任务)eventmanager::register('order_paied', 'v财务管理实训报告irtualshipping', 'demo'); //虚拟商品自动发货
这样就注册了两个事件,事件下各有一个任务。
具体调度部分代码很简单,就不多赘述,有兴趣的可以去看代码。
重头戏来了,一个异步任务系统最重要的就是消费端了,现在让我们来看下worker的流程图。
一个完整的消费进程
可以看到,在这里我们采用了两个交换器和两个队列,一个负责处理正常的任务即ntask,另一个负责处理需要延迟执行的任务即dtask。简单描述下一个任务的生命周期。
1、task产生,进入正常任务的交换器exchange[ebats_core_ntask]
2、交换器根据topic将任务分发到对应的队列中
3、子进程ntask阻塞等待成功获取到task,并执行该任务
4、执行失败,需要重试时抛出retryexception,不需要重试时抛出taskexception
5、子进程ntask捕获到重试异常将任务抛给延迟任务的交换器exchange[ebats_core_dtask]
6、将任务执行信息回调给上层开发者以便保存查看
1、子进程dtask阻塞等待成功获取到task,并执行该任务
2、执行失败,需要重试时抛出retryexception,不需要重试时抛出taskexception
3、子进程dtask捕获到重试异常将任务抛给延迟任务的交换器exchange[ebats_core_dtask]
4、将任务执行信息回调给上层开发者以便保存查看
消费者代码如下:
require_once dir.'/../autoload.php';require_once dir.'/task/taskdemomodel.php';us寝室装扮e asynclib春夜洛城闻笛的意思\ebats\worker; //执行结果回调函数$callback = function ($topic, $taskid, $taskname, $params, $timeu, $message){ };$worker = new worker($callback); //支持多进程消费默认为1$worker->tqueue('demo'); //队列名和事件的topic一一对应$worker->run();
一般来说这是一个基于事件的任务系统,那么能不能直接产生任务呢。答案是肯定的。
只需要创建一个自定义调度器,由您自行实现调度逻辑,最终生成一个任务即可。代码如下:
<?phprequire_once dir.'/../autoload.php';u asynclib\ebats\task;u asynclib\core\consumer;u asynclib\amq\exchangetypes;u asynclib\exception\exceptioninterface; /** * 本示例演示了如何创建一个自定义调度器,开发者可以根据自身需求开发自己的任务调度器 */try{ $worker = new consumer(); $worker->texchange('order_fanout', exchangetypes::topic); $worker->tqueue('shzf_order_paied', ['*.*.wait_ller_nd_goods']); $worker->run(function($key, $msg){ $order_data = json_encode($msg); echo " [$key] $order_data \n"; task::create('demo', 'orderasync', $msg);//创建任务,之后消息将作为参数由任务接管处理 })职场秘事;}catch (exceptioninterface $exc){ echo $exc->getmessage();}
这样,当接收到消息时就会产生一个orderasync的任务,您只需要启动一个用来消费这个topic的worker即可。
也许你会觉得这里直接写业务逻辑的代码就可以了,实际上也确实可以。当你可以忍受一个进程慢慢消费的时候是可以这样做的。但大多数情况下我们还是希望它能够尽快的消费掉,所以建议这里只负责创建任务,具体任务的业务逻辑由worker去执行。
以上就是基于rabbitmq和swoole实现的一个完整的异步任务系统的详细内容
视频教学:九年架构师讲解php+swoole+rabbitmq实现异步任务多进程消费
更多内容请访问
怎么从一名码农成为架构师的必看知识点:目录大全(持续更新)50w年薪挑战!
本文发布于:2023-04-08 09:20:23,感谢您对本站的认可!
本文链接:https://www.wtabcd.cn/fanwen/zuowen/cfdd48e158c6e98e8809d0e30238fa4a.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文word下载地址:基于RabbitMQ和Swoole实现的一个完整的异步任务系统.doc
本文 PDF 下载地址:基于RabbitMQ和Swoole实现的一个完整的异步任务系统.pdf
留言与评论(共有 0 条评论) |