RabbitMQ 快速开始

参考:

http://php-amqplib.github.io/php-amqplib/

https://www.rabbitmq.com/amqp-0-9-1-reference.html#exchange.declare.reserved-1

http://rabbitmq.mr-ping.com/AMQP/AMQP_0-9-1_Model_Explained.html

生产者

<?php

declare(strict_types=1);

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Producer: publishes ten numbered messages with routing key 'spike.message'
// and one final message with routing key 'spike.error' to the topic exchange
// 'spike_exchange' on the 'spike' virtual host.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', 'spike');
$channel = null;

try {
    // Channel ids are local to the connection, so let the library allocate a
    // free one instead of hard-coding a number.
    $channel = $connection->channel();

    $channel->exchange_declare(
        'spike_exchange', // exchange: exchange name
        'topic',          // type: exchange type, e.g. fanout, direct, topic
        false,            // passive
        false,            // durable: whether the exchange is persisted
        false             // auto_delete: whether the exchange is deleted automatically
        // remaining arguments (e.g. alternate-exchange) keep their defaults
    );

    $data = "spike's message";

    // A publisher does not strictly need to know about the queue, but declaring
    // it here makes sure it exists before the first message is sent.
    $channel->queue_declare(
        'spike_queue', // queue
        false,         // passive
        false,         // durable
        false,         // exclusive
        false          // auto_delete
    );

    // A topic exchange discards messages that match no binding. Bind the queue
    // here as well so that messages published before the consumer has ever run
    // are kept. Re-declaring the same binding later is harmless.
    foreach (['spike.message', 'spike.error'] as $routingKey) {
        $channel->queue_bind(
            'spike_queue',    // queue
            'spike_exchange', // exchange
            $routingKey       // routing_key
        );
    }

    // One message object is reused; only its body changes between publishes.
    $msg = new AMQPMessage();

    for ($i = 0; $i < 10; $i++) {
        $body = $data . '-' . $i;
        $msg->setBody($body);

        $channel->basic_publish(
            $msg,             // PhpAmqpLib\Message\AMQPMessage $msg
            'spike_exchange', // exchange
            'spike.message'   // routing_key
        );
        echo ' [x] Sent spike.message:', $body, "\n";

        // Pause 300 ms between messages.
        usleep(300 * 1000);
    }

    $msg->setBody('spike ok!!');
    $channel->basic_publish(
        $msg,             // PhpAmqpLib\Message\AMQPMessage $msg
        'spike_exchange', // exchange
        'spike.error'     // routing_key
    );
} finally {
    // Release broker resources even when a declare or publish call throws.
    if ($channel !== null) {
        $channel->close();
    }
    $connection->close();
}

消费者

<?php

declare(strict_types=1);

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Consumer: binds 'spike_queue' to the topic exchange 'spike_exchange' for the
// routing keys 'spike.message' and 'spike.error', then prints and acknowledges
// every message it receives.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', 'spike');
$channel = null;

try {
    // Channel ids are local to the connection, so let the library allocate a
    // free one instead of hard-coding a number.
    $channel = $connection->channel();

    $channel->exchange_declare('spike_exchange', 'topic', false, false, false);

    // queue_declare returns [queue name, message count, consumer count];
    // only the name is needed here.
    [$queue_name] = $channel->queue_declare('spike_queue', false, false, false, false);

    echo $queue_name, "\n";

    // Each binding key is matched against the routing key of published messages.
    foreach (['spike.message', 'spike.error'] as $binding_key) {
        $channel->queue_bind(
            $queue_name,      // queue
            'spike_exchange', // exchange
            $binding_key      // routing_key
        );
    }

    echo " [*] Waiting... To exit press CTRL+C\n";

    // Prints the routing key and body of a delivery, then acknowledges it.
    $callback = static function (AMQPMessage $msg): void {
        // Pause 100 ms per message before handling it.
        usleep(100 * 1000);
        echo ' [x] ', $msg->getRoutingKey(), ':', $msg->getBody(), "\n";
        // Manual ack is required because basic_consume is called with no_ack = false.
        $msg->ack();
    };

    // prefetch_count = 1: the broker sends at most one unacknowledged message
    // to this consumer at a time.
    $channel->basic_qos(null, 1, null);

    $channel->basic_consume(
        $queue_name,      // queue
        'spike_recveive', // consumer_tag
        false,            // no_local
        false,            // no_ack
        false,            // exclusive
        false,            // nowait
        $callback         // callback
    );

    // Block and dispatch deliveries for as long as a consumer is registered.
    while ($channel->is_consuming()) {
        $channel->wait();
    }
} finally {
    // Release broker resources even when the consume loop ends with an exception.
    if ($channel !== null) {
        $channel->close();
    }
    $connection->close();
}

点赞

发表评论

邮箱地址不会被公开。 必填项已用*标注