使用redis的list(列表)命令实现消息队列,生产者使用lpush命令发布消息,消费者使用rpoplpush命令获取消息,同时将消息放入监听队列,如果处理超时,监听者将把消息弹回消息队列
生产者producter.php
<?phptry { //声明消息队列-list的键名 $queuekey = 'testqueuekey'; $redis = new redis(); $redis->connect('ip', 6379); //向列表高中语文题中push10条消息 for ($i = 0;$i < 10;$i++){ //为消息生成唯一标识 $uniqid = uniqid(mt_rand(10000, 99999).getmypid().memory_get_usage(), true); $ret = $redis->lpush($queuekey, json_encode(array('uniqid' => $uniqid, 'key' => 'key-'.$i, 'value' => 'data'))); var_dump($ret); }} catch (exception $e){ echo $e->getmessage();}
消费者consumer.php
<?phptry { //声明消息队列-list的键名 $queuekey = 'testqueuekey'; //声明监听者队列-list的键名 $watchqueuekey = 'watchqueuekey'; $redis = new redis(); $redis->connect('ip', 6379); //队列先进先出,弹出最先加入的消息,同时放入监听队列 while (true){ $ret = $redis->rpoplpush($queuekey, $watchqueuekey); if ($ret === fal){ sleep(1); } el { $retarray = json_decode($ret, true); //将唯一id写入缓存设置有效期 $redis->tex($retarray['uniqid'], 60, 0); //模拟失败 $rand = mt_rand(0,9); if ($rand < 3){ echo "failure:".$ret."\n"; } el { //todo //处理成功移除消息 $redis->lrem($watchqueuekey, $ret, 0); echo "success:".$ret."\n"; } } }} catch (exception $e){ echo $e->getmessage();}
监听者watcher.php
<?phptry { //声明消息队列-list的键名 $queuekey = 'testqueuekey'; //声明监听者队列-list的键名 $watchqueuekey = 'watchqueuekey'; $redis = new redis(); $redis->connect('ip', 6379); while (true){ //取出列表尾部的一个值 $ret = $redis->lindex($watchqueuekey, -1); //如果不存在则休眠1秒 if ($ret === fal){ sleep(1); } el { $retarray = json_decode($ret, true); $idcache = $redis->get($retarray['uniqid']); if ($idcache === fal){ //如果已过期,表示任务超时,弹回原队列 $redis->rpoplpush($watchqueuekey, $queuekey); echo "rpoplpush:".$ret."\n"; } el { //处理中,继续等待 sleep(1); } } }} catch (exception $e){ echo $e->getmessage();}
开启监听者php watcher.php
开启消费者php consumer.php
执行生产者php producter.php
生产者输出
int(1)int(2)int(3)int(4)int(5)int(6)int(7)int(8)int(9)int(10)
监听者输出
rpoplpush:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"}rpoplpush:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"}rpoplpush:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"}rpoplpush:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}rpoplpush:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"}rpoplpush:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
消费者输出
success:{"uniqid":"47280267323557445c4bde640dbfb4.78962728","key":"key-0","value":"data"}failure:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"}success:{"uniqid":"39394267323642245c4bde640de992.34641654","key":"key-2","value":"data"}success:{"uniqid":"41335267323642245c4bde640df980.38466514","key":"key-3","value":"data"}failure:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"}failure:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"}failure:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}success:{"uniqid":"43817267323642245c4bde640ec189.44008738","key":"key-7","value":"data"}success:{"uniqid":"69276267323642245c4bde640ecb91.04877522","key":"ke番禺实验中学y-8","value":"data"}failure:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"}success:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"}success:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"}success:{"uni国内时政qid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"}failure:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}success:{"uniqid":"83293267323642陈阿娇的结局245c4bde640ed753.04622366","read过去分词key":"key-9","value":"data"}success:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
我们看到消费者第一次执行时失败的消息,超时后又被弹回了消息队列,消费者有了再次执行的机会,监听者的职责就是确保消费者执行失败或挂掉后消息还能再弹回原队列得到再次执行
转自:
本文发布于:2023-04-08 12:36:44,感谢您对本站的认可!
本文链接:https://www.wtabcd.cn/fanwen/zuowen/bb9670d0a3045e56635f2acdcac98ff2.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文word下载地址:PHP使用Redis的List(列表)命令实现消息队列.doc
本文 PDF 下载地址:PHP使用Redis的List(列表)命令实现消息队列.pdf
留言与评论(共有 0 条评论) |