首页 > 作文

PHP使用Redis的List(列表)命令实现消息队列

更新时间:2023-04-08 12:36:46 阅读: 评论:0

使用redis的list(列表)命令实现消息队列,生产者使用lpush命令发布消息,消费者使用rpoplpush命令获取消息,同时将消息放入监听队列,如果处理超时,监听者将把消息弹回消息队列

1.用到的list(列表)命令

命令作用lpush将一个或多个值插入到列表头部rpoplpush弹出列表最后一个值,同时插入到另一个列表头部,并返回该值lrem删除列表内的给定值lindex按索引获取列表内的值

2.队列的组成

名称职责生产者发布消息消费者获取并处理消息监听者监听超时的消息,弹回原消息队列,确保消费者挂掉后或处理失败后消息能被其他消费者处理

3.php实现代码

生产者producter.php

<?phptry {    //声明消息队列-list的键名    $queuekey = 'testqueuekey';    $redis = new redis();    $redis->connect('ip', 6379);    //向列表高中语文题中push10条消息    for ($i = 0;$i < 10;$i++){        //为消息生成唯一标识        $uniqid = uniqid(mt_rand(10000, 99999).getmypid().memory_get_usage(), true);        $ret = $redis->lpush($queuekey, json_encode(array('uniqid' => $uniqid, 'key' => 'key-'.$i, 'value' => 'data')));        var_dump($ret);    }} catch (exception $e){    echo $e->getmessage();}

消费者consumer.php

<?phptry {    //声明消息队列-list的键名    $queuekey = 'testqueuekey';    //声明监听者队列-list的键名    $watchqueuekey = 'watchqueuekey';    $redis = new redis();    $redis->connect('ip', 6379);    //队列先进先出,弹出最先加入的消息,同时放入监听队列    while (true){        $ret = $redis->rpoplpush($queuekey, $watchqueuekey);        if ($ret === fal){            sleep(1);        } el {            $retarray = json_decode($ret, true);            //将唯一id写入缓存设置有效期            $redis->tex($retarray['uniqid'], 60, 0);            //模拟失败            $rand = mt_rand(0,9);            if ($rand < 3){                echo "failure:".$ret."\n";            } el {                //todo                //处理成功移除消息                $redis->lrem($watchqueuekey, $ret, 0);                echo "success:".$ret."\n";            }        }    }} catch (exception $e){    echo $e->getmessage();}

监听者watcher.php

<?phptry {    //声明消息队列-list的键名    $queuekey = 'testqueuekey';    //声明监听者队列-list的键名    $watchqueuekey = 'watchqueuekey';    $redis = new redis();    $redis->connect('ip', 6379);    while (true){        //取出列表尾部的一个值        $ret = $redis->lindex($watchqueuekey, -1);        //如果不存在则休眠1秒        if ($ret === fal){            sleep(1);        } el {            $retarray = json_decode($ret, true);            $idcache = $redis->get($retarray['uniqid']);            if ($idcache === fal){                //如果已过期,表示任务超时,弹回原队列                $redis->rpoplpush($watchqueuekey, $queuekey);                echo "rpoplpush:".$ret."\n";            } el {                //处理中,继续等待                sleep(1);            }        }    }} catch (exception $e){    echo $e->getmessage();}

4.执行队列

开启监听者php watcher.php
开启消费者php consumer.php
执行生产者php producter.php
生产者输出

int(1)int(2)int(3)int(4)int(5)int(6)int(7)int(8)int(9)int(10)

监听者输出

rpoplpush:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"}rpoplpush:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"}rpoplpush:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"}rpoplpush:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}rpoplpush:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"}rpoplpush:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}

消费者输出

success:{"uniqid":"47280267323557445c4bde640dbfb4.78962728","key":"key-0","value":"data"}failure:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"}success:{"uniqid":"39394267323642245c4bde640de992.34641654","key":"key-2","value":"data"}success:{"uniqid":"41335267323642245c4bde640df980.38466514","key":"key-3","value":"data"}failure:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"}failure:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"}failure:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}success:{"uniqid":"43817267323642245c4bde640ec189.44008738","key":"key-7","value":"data"}success:{"uniqid":"69276267323642245c4bde640ecb91.04877522","key":"ke番禺实验中学y-8","value":"data"}failure:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"}success:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"}success:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"}success:{"uni国内时政qid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"}failure:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}success:{"uniqid":"83293267323642陈阿娇的结局245c4bde640ed753.04622366","read过去分词key":"key-9","value":"data"}success:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}

我们看到消费者第一次执行时失败的消息,超时后又被弹回了消息队列,消费者有了再次执行的机会,监听者的职责就是确保消费者执行失败或挂掉后消息还能再弹回原队列得到再次执行

转自:

本文发布于:2023-04-08 12:36:44,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/zuowen/bb9670d0a3045e56635f2acdcac98ff2.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

本文word下载地址:PHP使用Redis的List(列表)命令实现消息队列.doc

本文 PDF 下载地址:PHP使用Redis的List(列表)命令实现消息队列.pdf

标签:队列   消息   消费者   生产者
相关文章
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图