导航菜单
首页 > 网络编程 > PHP编程 » 正文

PHP+RabbitMQ实现消息队列的完整代码

后台-系统设置-扩展变量-手机广告位-内容正文顶部
这篇文章主要给大家介绍了关于利用PHP+RabbitMQ实现消息队列的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用PHP具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧

前言

为什么使用RabbitMq而不是ActiveMq或者RocketMq?

首先,从业务上来讲,我并不要求消息的100%接受率,并且,我需要结合php开发,RabbitMq相较RocketMq,延迟较低(微妙级)。至于ActiveMq,貌似问题较多。RabbitMq对各种语言的支持较好,所以选择RabbitMq。

先安装PHP对应的RabbitMQ,这里用的是 php_amqp 不同的扩展实现方式会有细微的差异.

php扩展地址: http://pecl.php.net/package/amqp

具体以官网为准  http://www.rabbitmq.com/getstarted.html

介绍

  • config.php 配置信息
  • BaseMQ.php MQ基类
  • ProductMQ.php 生产者类
  • ConsumerMQ.php 消费者类
  • Consumer2MQ.php 消费者2(可有多个)

config.php

 <?php return [  //配置  'host' => [   'host' => '127.0.0.1',   'port' => '5672',   'login' => 'guest',   'password' => 'guest',   'vhost'=>'/',  ],  //交换机  'exchange'=>'word',  //路由  'routes' => [], ];

BaseMQ.php

 <?php /**  * Created by PhpStorm.  * User: pc  * Date: 2018/12/13  * Time: 14:11  */  namespace MyObjSummary\rabbitMQ;  /** Member  *  AMQPChannel  *  AMQPConnection  *  AMQPEnvelope  *  AMQPExchange  *  AMQPQueue  * Class BaseMQ  * @package MyObjSummary\rabbitMQ  */ class BaseMQ {  /** MQ Channel   * @var \AMQPChannel   */  public $AMQPChannel ;   /** MQ Link   * @var \AMQPConnection   */  public $AMQPConnection ;   /** MQ Envelope   * @var \AMQPEnvelope   */  public $AMQPEnvelope ;   /** MQ Exchange   * @var \AMQPExchange   */  public $AMQPExchange ;   /** MQ Queue   * @var \AMQPQueue   */  public $AMQPQueue ;   /** conf   * @var   */  public $conf ;   /** exchange   * @var   */  public $exchange ;   /** link   * BaseMQ constructor.   * @throws \AMQPConnectionException   */  public function __construct()  {   $conf = require 'config.php' ;   if(!$conf)    throw new \AMQPConnectionException('config error!');   $this->conf  = $conf['host'] ;   $this->exchange = $conf['exchange'] ;   $this->AMQPConnection = new \AMQPConnection($this->conf);   if (!$this->AMQPConnection->connect())    throw new \AMQPConnectionException("Cannot connect to the broker!\n");  }   /**   * close link   */  public function close()  {   $this->AMQPConnection->disconnect();  }   /** Channel   * @return \AMQPChannel   * @throws \AMQPConnectionException   */  public function channel()  {   if(!$this->AMQPChannel) {    $this->AMQPChannel = new \AMQPChannel($this->AMQPConnection);   }   return $this->AMQPChannel;  }   /** Exchange   * @return \AMQPExchange   * @throws \AMQPConnectionException   * @throws \AMQPExchangeException   */  public function exchange()  {   if(!$this->AMQPExchange) {    $this->AMQPExchange = new \AMQPExchange($this->channel());    $this->AMQPExchange->setName($this->exchange);   }   return $this->AMQPExchange ;  }   /** queue   * @return \AMQPQueue   * @throws \AMQPConnectionException   * @throws \AMQPQueueException   */  public function queue()  {   if(!$this->AMQPQueue) {    $this->AMQPQueue = new \AMQPQueue($this->channel());   }   return $this->AMQPQueue ;  }   /** Envelope   * @return \AMQPEnvelope   */  public function envelope()  {   if(!$this->AMQPEnvelope) {    $this->AMQPEnvelope = new \AMQPEnvelope();   }   return $this->AMQPEnvelope;  } }

ProductMQ.php

 <?php //生产者 P namespace MyObjSummary\rabbitMQ; require 'BaseMQ.php'; class ProductMQ extends BaseMQ {  private $routes = ['hello','word']; //路由key   /**   * ProductMQ constructor.   * @throws \AMQPConnectionException   */  public function __construct()  {   parent::__construct();  }   /** 只控制发送成功 不接受消费者是否收到   * @throws \AMQPChannelException   * @throws \AMQPConnectionException   * @throws \AMQPExchangeException   */  public function run()  {   //频道   $channel = $this->channel();   //创建交换机对象   $ex = $this->exchange();   //消息内容   $message = 'product message '.rand(1,99999);   //开始事务   $channel->startTransaction();   $sendEd = true ;   foreach ($this->routes as $route) {    $sendEd = $ex->publish($message, $route) ;    echo "Send Message:".$sendEd."\n";   }   if(!$sendEd) {    $channel->rollbackTransaction();   }   $channel->commitTransaction(); //提交事务   $this->close();   die ;  } } try{  (new ProductMQ())->run(); }catch (\Exception $exception){  var_dump($exception->getMessage()) ; } 
        
收藏此文 赞一个 ( ) 打赏本站

如果本文对你有所帮助请打赏本站

  • 打赏方法如下:
  • 支付宝打赏
    支付宝扫描打赏
    微信打赏
    微信扫描打赏
后台-系统设置-扩展变量-手机广告位-内容正文底部

相关推荐:

留言与评论(共有 0 条评论)
   
验证码:
二维码