php使用消息队列rabbitmq

消息队列

顾名思义,消息队列就是消息传送接收的队列,既然是队列就是先进先出,前面的消息消费或者接收之后,后面的消息才能被接收或者消费。

消息队列的用途

  • 代码解耦
  • 异步通知
  • 流量削峰
  • 日志处理

代码解耦

通过消息队列,将彼此紧密关联的代码拆分开,彼此只需要依赖消息队列即可,不需要处理彼此之间的依赖。

异步通知

所有的消息只要发送到消息队列之后,就认为消息投递成功,至于消息接收者什么时候接收和处理消息,生产者不需要关心。也就是消息被消费了是异步通知到生产者的。

流量削峰

并发的时候,我们为了尽可能提高吞吐量,我们需要将耗时的操作尽量异步操作,比如发送短信,发送邮件,同步通知等内容,我们把他们放到消息队列中之后,我们的应用的吞吐量会大大提升。

日志处理

每个应用每时每刻都在产生大量的日志。而我们可能会有多个应用,面对着海量的日志,我们需要一个可以高吞吐量的存储分析系统,而消息队列就可以帮助我们进行日志处理。

RabbitMQ安装

安装erlang

rabbitmq的安装依赖erlang环境,所以我们需要先安装erlang环境,配置环境变量,当可以在命令行控制台通过erl访问eshell的时候,表明我们的erlang环境安装成功。

安装rabbitmq

在erlang安装之后,我们可以安装rabbitmq,rabbitmq的安装也是比较简单的,我们只需要去官网下载二进制文件包,解压之后,将sbin目录配置到环境变量中即可。当我们通过控制台输入rabbitmq-server可以启动的时候,表明我们的rabbitmq安装配置成功。

PHP配置RabbitMQ依赖

  • 安装socket扩展
  • 使用composer进行rabbitmq依赖安装
composer require "php-amqplib/php-amqplib"

PHP操作rabbitmq的简单示例

生产者

<?php

require_once __DIR__ . "/vendor/autoload.php";

//引入连接和消息类
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

//创建socket连接
$connection = new AMQPStreamConnection("localhost", 5672, 'guest', 'guest');

//创建连接频道
$channel = $connection->channel();

//声明队列
$channel->queue_declare('hello', false, false, false, false);

//创建消息
$msg = new AMQPMessage("测试中文~");

//发送消息到队列
$channel->basic_publish($msg, '', 'hello');

echo " [x] Sent 'Hello World!'\n";

//关闭连接
$channel->close();
$connection->close();

消费者

<?php

require_once __DIR__ . '/vendor/autoload.php';

//引入连接类
use PhpAmqpLib\Connection\AMQPStreamConnection;

//创建连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

//创建频道
$channel = $connection->channel();

//声明队列
$channel->queue_declare('hello', false, false, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";
};

//监听队列,消费消息
$channel->basic_consume('hello', '', false, true, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}