由于php不支持多线程,但是作为一个完善的系统,有很多操作都是需要异步完成的。为了完成这些异步操作,我们做了一个基于redis队列任务系统。
大家知道,一个消息队列处理系统主要分经典名言名句为两大部分:消费者和生产者。
在我们的系统中,主系统作为生产者,任务系统作为消费者。
具体的工作流程如下:
1、主系统将需要需要处理的任务名称+任务参数push到队列中。
2、任务系统实时的对任务队列进行pop,pop出来一个任务就fork一个子进程,由子进程完成具体的任务逻辑。
具体代码如下:
1 /** 2 * 启动守护进程 3 */ 4 public function runaction() { 5 tools::log_message('error', 'daemon/run' . ' | action: restart', 'daemon-'); 6 while (true) { 7 $this->fork_process(); 8 } 9 exit;10 }11 12 /**13 * 创建子进程14 */15 private function fork_process() {16 $ppid = getmypid();17 $pid = pcntl_fork();18 if ($pid == 0) {//子进程19 $pid = posix_getpid();20 //echo "* process {$pid} was created \n\n";21 $this->mq_process();22 exit;23 } el {//主进程24 $pid = pcntl_wait($status, wuntraced); //取得子进程结束状态25 if (pcntl_wifexited($status)) {26 //echo "\n\n* sub process: {$pid} exited with {$status}";27 //tools::log_message('info', 'daemon/run succ' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid );28 } el {29 tools::log_message('error', 'daemon/run fail' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid, 'daemon-');30 }31 }32 }33 34 /**35 * 业务任务队列处理36 */37 private function mq_process() {38 $data_pop = $this->masterredis->rpop($this->redis_list_key);39 $data = json_decode($data_pop, 1);40 if (!$data) {41 return fal;42 }43 $worker = '_task_' . $data['worker'];44 $class_name = ist($data['class']) ? $data['class'] : 'taskpromodel';45 $params = $data['params'];46 $class = new $class_name();47 $class->$worker($params);48 return true;49 }
这是一个简单的任务处理系统。
通过这个任务系统帮助我们实现了异步,到目前为止已经稳定运行了将近一年。
但很可惜,它是一个单进程的系统。它是一直在不断的fork,如果有任务就处理,没有任务就跳过。
这样很稳定。
但问题有两个:一是不断地fork、pop会浪费服务器资源,二是不支持并发!
第一个问题还好,但第二个问题就很严重。
当主系统 同时 抛过来大量的任务时,任务的处理时间就会无限的拉长。
为了解决并发的问题,我们计划做一个更加高效强壮的队里处理系统。
因为在php7之前不支持多线程,所以我们采用多进程。
从网上找了不少资料,大多所谓的多进程都是n个进程同时在后台运行。
显然这是不合适的。
我的预想是:每pop出一个任务就fork一个任务,任务执行完成后子进程结束。
这个问题很简单,那就是每fork一个子进程就自增一次。而当子进程执行完成就自减一次。
自增没有问题,我们就在主进程中操作就完了。那么该如何自减呢?
可能你会说,当然是在子进程中啊。但这里你需要注意:当fork的时候是从主进程复制了一份资源给子进程,这就意味着你无法在子进程中操作主进程中的计数器!
所以,这里就需要了解一个知识点:信号。
具体的可以自行google,这里直接看代码。
1 // 关于手机的英语作文install signal handler for dead kids2 pcntl_signal(sigchld, array($this, "sig_handler"));
这就安装了一个信号处理器。当然还缺少一点。
declare(ticks = 1);
declare是一个控制结构语句,具体的用法也请去google。
这句代码的意思就是每执行一条低级语句就调用一次信号处理器。
这样,每当子进程结束的时候就会调用信号处理器,我们就可以在信号处理器中进行自减。
在多进程开发中,如果处理不当就会导致进程残留。
为了解决进程残留,必须得将子进程回收。
1000元能投资什么那么如何对子进程进行回收就是一个技术点了。
在pcntl的demo中,包括很多博文中都是说在主进程中回收子进程。
但我们是基于redis的brpop的,而brpop是阻塞的。
这就导致一个问题:当执行n个任务之后,任务系统空闲的时候主进程是阻塞的,而在发生阻塞的时候子进程还在执行,所以就无法完成最后几个子进程的进程回收。。。
这里本来一直很纠结,但当我将信号处理器搞定之后就也很简单了。
进程回收也放到信号处理器中去。
pcntl是一个进程处理的扩展,但很可惜它对多进程的支持非常乏力。
所以这里采用swoole扩展中的process。
具体代码如下:
1 declare(ticks = 1); 2 class jobdaemoncontroller extends yaf_controller_abstract{ 3 4 u trait_redis; 5 6 private $maxprocess = 800; 7 private $child; 8 private $masterredis; 9 private $redis_task_wing = 'task:wing'; //待处理队列10 11 public function init(){12 // install signal handler for dead kids13 pcntl_signal(sigchld, array($this, "sig_handler"));14 t_time_limit(0);15 ini_t('default_socket_timeout', -1); //队列处理不超时,解决redis报错:read error on connection16 }17 18 private function redis_client(){19 $rds = new redis();20 $rds->connect('redis.master.host',6379);21 return $rds;22 }23 24 public function process(swoole_process $worker){// 第一个处理25 $globals['worker'] = $worker;26 swoole_event_add($worker->pipe, function($pipe) {27 $worker = $globals['worker'];28 $recv = $worker->read(); //nd data to master29 30 sleep(rand(1, 3));31 echo "from master: $recv\n";32 $worker->exit(0);33 });34 exit;35 }36 37 public function testaction(){38 for ($i = 0; $i < 10000; $i++){39 $data = [40 'abc' => $i,41 'timestamp' => time().rand(100,999)42 ];43 $this->masterredis->lpush($this->redis_task_wing, json_encode($data));44 }45 exit;46 }47 48 public function runaction(){49 while (1){50 // echo "\t now we de have $this->child child process\n";51 if ($this->child < $this->maxprocess){52 $rds = $this->redis_client();53 $data_pop = $rds->brpop($this->redis_task_wing, 3);//无任务时,阻塞等待54 if (!$data_pop){55 continue;56 }57 感人的事迹 echo "\t starting new child | now we de have $this->child child process\n";58 $this->child++;59 $process = new swoole_process([$this, 'process']);60 $process->write(json_encode($data_pop));61 $pid = $process->start();62 }63 党员的权利和义务思想汇报 }64 }65 66 private function sig_handler($signo) {67 // echo "recive: $signo \r\n";68 switch ($signo) {69 ca sigchld:70 while($ret = swoole_process::wait(fal)) {71 // echo "pid={$ret['pid']}\n";72 $this->child--;73 }74 }75 }76 }
最终,经过测试,单核1g的服务器执行1到3秒的任务可以做到800的并发。
本文发布于:2023-04-07 22:01:22,感谢您对本站的认可!
本文链接:https://www.wtabcd.cn/fanwen/zuowen/512c549ee595a772e986232fbc5bc020.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文word下载地址:Swoole和Redis实现的并发队列处理系统.doc
本文 PDF 下载地址:Swoole和Redis实现的并发队列处理系统.pdf
留言与评论(共有 0 条评论) |