首页 > 作文

PHP+RabbitMQ实现消息队列的完整代码

更新时间:2023-04-07 11:35:41 阅读: 评论:0

前言

为什么使用rabbitmq而不是activemq或者rocketmq?

首先,从业务上来讲,我并不要求消息的100%接受率,并且,我需要结合php开发,rabbitmq相较rocketmq,延迟较低(微妙级)。至于activemq,貌似问题较多。rabbitmq对各种语言的支持较好,所以选择rabbitmq。

先安装php对应的rabbitmq,这里用的是 php_amqp 不什么叠翠同的扩展实现方式会有细微的差异.

php扩展地址: http://pecl.php.net/package/amqp

具体以官网为准 /d/file/titlepic/getstarted.html

介绍

config.php 配置信息bamq.php mq基类productmq.php 生产者类consumermq.php 消费者类consumer2mq.php 消费者2(可有多个)

config.php

 <?php return [  //配置  'host' => [   'host' => '127.0.0.1',   'port' => '5672',   'login' => 'guest',   'password' => 'guest',   'vhost'=>'/',  ],  //交换机  'exchange'=>'word',  //路由  'routes' => [], ];

bamq.php

 <?php /**  * created by phpstorm.  * ur: pc  * date: 2018/12/13  * time: 14:11  */  namespace myobjsummary\rabbitmq;  /** member  *  amqpchannel  *  amqpconnection  *  amqpenvelope  *  amqpexchange  *  amqpqueue  * class bamq  * @package myobjsummary\rabbitmq  */ class bamq {  /** mq channel   * @var \amqpchannel   */  public $amqpchannel ;   /** mq link   * @var \amqpconnection   */  public $amqpconnection ;   /** mq envelope   * @var \amqpenvelope   */  public $amqpenvelope ;   /** mq exchange   * @var \amqpexchange   */  public $amqpexchange ;   /** mq queue   * @var \amqpqueue   */  public $amqpqueue ;   /** conf   * @var   */  public $conf ;   /** exchange   * @var   */  public $exchange ;   /** link   * bamq constructor.   * @throws \amqpconnectionexception   */  public function __construct()  {   $conf = require 'config.php' ;   if(!$conf)    throw new \amqpconnectionexception('config err走群众路线心得体会or!');   $this->conf  = $conf['h篮球多少钱ost'] ;   $this->exchange = $conf[乞巧节的风俗'exchange'] ;   $this->amqpconnection = new \amqpconnection($this->conf);   if (!$this->amqpconnection->connect())    throw new \amqpconnectionexception("cannot connect to the broker!\n");  }   /**   * clo link   */  public function clo()  {   $this->amqpconnection->disconnect();  }   /** channel   * @return \amqpchannel   * @throws \amqpconnectionexception   */  public function channel()  {   if(!$this->amqpchannel) {    $this->amqpchannel = new \amqpchannel($this->amqpconnection);   }   return $this->amqpchannel;  }   /** exchange   * @return \amqpexchange   * @throws \amqpconnectionexception   * @throws \amqpexchangeexception   */  public function exchange()  {   if(!$this->amqpexchange) {    $this->amqpexchange = new \amqpexchange($this->channel());    $this->amqpexchange->tname人事档案整理($this->exchange);   }   return $this->amqpexchange ;  }   /** queue   * @return \amqpqueue   * @throws \amqpconnectionexception   * @throws \amqpqueueexception   */  public function queue()  {   if(!$this->amqpqueue) {    $this->amqpqueue = new \amqpqueue($this->channel());   }   return $this->amqpqueue ;  }   /** envelope   * @return \amqpenvelope   */  public function envelope()  {   if(!$this->amqpenvelope) {    $this->amqpenvelope = new \amqpenvelope();   }   return $this->amqpenvelope;  } }

productmq.php

 <?php //生产者 p namespace myobjsummary\rabbitmq; require 'bamq.php'; class productmq extends bamq {  private $routes = ['hello','word']; //路由key   /**   * productmq constructor.   * @throws \amqpconnectionexception   */  public function __construct()  {   parent::__construct();  }   /** 只控制发送成功 不接受消费者是否收到   * @throws \amqpchannelexception   * @throws \amqpconnectionexception   * @throws \amqpexchangeexception   */  public function run()  {   //频道   $channel = $this->channel();   //创建交换机对象   $ex = $this->exchange();   //消息内容   $message = 'product message '.rand(1,99999);   //开始事务   $channel->starttransaction();   $nded = true ;   foreach ($this->routes as $route) {    $nded = $ex->publish($message, $route) ;    echo "nd message:".$nded."\n";   }   if(!$nded) {    $channel->rollbacktransaction();   }   $channel->committransaction(); //提交事务   $this->clo();   die ;  } } try{  (new productmq())->run(); }catch (\exception $exception){  var_dump($exception->getmessage()) ; }

consumermq.php

 <?php //消费者 c namespace myobjsummary\rabbitmq; require 'bamq.php'; class consumermq extends bamq {  private $q_name = 'hello'; //队列名  private $route = 'hello'; //路由key   /**   * consumermq constructor.   * @throws \amqpconnectionexception   */  public function __construct()  {   parent::__construct();  }   /** 接受消息 如果终止 重连时会有消息   * @throws \amqpchannelexception   * @throws \amqpconnectionexception   * @throws \amqpexchangeexception   * @throws \amqpqueueexception   */  public function run()  {    //创建交换机   $ex = $this->exchange();   $ex->ttype(amqp_ex_type_direct); //direct类型   $ex->tflags(amqp_durable); //持久化   //echo "exchange status:".$ex->declare()."\n";    //创建队列   $q = $this->queue();   //var_dump($q->declare());exit();   $q->tname($this->q_name);   $q->tflags(amqp_durable); //持久化   //echo "message total:".$q->declarequeue()."\n";    //绑定交换机与队列,并指定路由键   echo 'queue bind: '.$q->bind($this->exchange, $this->route)."\n";    //阻塞模式接收消息   echo "message:\n";   while(true){    $q->consume(function ($envelope,$queue){     $msg = $envelope->getbody();     echo $msg."\n"; //处理消息     $queue->ack($envelope->getdeliverytag()); //手动发送ack应答    });    //$q->consume('processmessage', amqp_autoack); //自动ack应答   }   $this->clo();  } } try{  (new consumermq)->run(); }catch (\exception $exception){  var_dump($exception->getmessage()) ; }

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对www.887551.com的支持。

本文发布于:2023-04-07 11:35:23,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/zuowen/3f49e1576f0c2df52eb8d216a55e6064.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

本文word下载地址:PHP+RabbitMQ实现消息队列的完整代码.doc

本文 PDF 下载地址:PHP+RabbitMQ实现消息队列的完整代码.pdf

标签:消息   路由   交换机   队列
相关文章
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图