参考:
http://php-amqplib.github.io/php-amqplib/
https://www.rabbitmq.com/amqp-0-9-1-reference.html#exchange.declare.reserved-1
http://rabbitmq.mr-ping.com/AMQP/AMQP_0-9-1_Model_Explained.html
生产者
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// Producer: declares the topology, then publishes ten 'spike.message'
// messages followed by one 'spike.error' message to 'spike_exchange'.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', 'spike');
$channel = null;

try {
    // Let the library allocate a free channel id. AMQP 0-9-1 restricts usable
    // channel numbers to 1..channel-max as negotiated with the broker, so a
    // hard-coded high id (previously 10087) may be rejected.
    $channel = $connection->channel();

    $channel->exchange_declare(
        'spike_exchange', // exchange: exchange name
        'topic',          // type: exchange type, commonly fanout, direct or topic
        false,            // passive
        false,            // durable: whether the exchange is persisted
        false             // auto_delete: whether the exchange is deleted automatically
        // arguments: further structured options, e.g. alternate-exchange
    );

    // A publisher does not strictly need to know about the queue, but declaring
    // it here lets messages be retained when the consumer has not started yet.
    $channel->queue_declare(
        'spike_queue', // queue
        false,         // passive
        false,         // durable
        false,         // exclusive
        false          // auto_delete
    );

    // Declaring the queue is not enough on its own: an exchange discards
    // messages whose routing key matches no binding. Bind the same keys the
    // consumer uses so nothing published before its first run is lost.
    // Re-declaring an identical binding is harmless.
    foreach (['spike.message', 'spike.error'] as $routingKey) {
        $channel->queue_bind(
            'spike_queue',    // queue
            'spike_exchange', // exchange
            $routingKey       // routing_key
        );
    }

    $data = "spike's message";

    // One message object is reused; only its body changes between publishes.
    $msg = new AMQPMessage();

    for ($i = 0; $i < 10; $i++) {
        $body = $data . '-' . $i;
        $msg->setBody($body);
        $channel->basic_publish(
            $msg,             // PhpAmqpLib\Message\AMQPMessage $msg
            'spike_exchange', // exchange
            'spike.message'   // routing_key
        );
        echo ' [x] Sent spike.message:', $body, "\n";
        usleep(300 * 1000); // pause 300 ms between messages
    }

    // Final message, routed with the 'spike.error' key.
    $msg->setBody('spike ok!!');
    $channel->basic_publish(
        $msg,             // PhpAmqpLib\Message\AMQPMessage $msg
        'spike_exchange', // exchange
        'spike.error'     // routing_key
    );
} finally {
    // Always release the channel and the connection, even when declaring or
    // publishing throws.
    if ($channel !== null) {
        $channel->close();
    }
    $connection->close();
}
消费者
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// Consumer: binds 'spike_queue' to 'spike_exchange' for both routing keys and
// prints every delivered message, acknowledging each one explicitly.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', 'spike');
$channel = null;

try {
    // Let the library allocate a free channel id. The previous call passed
    // `$channel_id=10086`, which is an assignment expression (not a named
    // argument) and hard-coded an id that may exceed the negotiated channel-max.
    $channel = $connection->channel();

    $channel->exchange_declare('spike_exchange', 'topic', false, false, false);

    // queue_declare returns the queue name as its first element; use it below
    // instead of repeating the literal.
    [$queue_name] = $channel->queue_declare('spike_queue', false, false, false, false);
    echo $queue_name, "\n";

    // A binding key is matched against the routing key of published messages;
    // bind one key per kind of message this consumer wants to receive.
    foreach (['spike.message', 'spike.error'] as $binding_key) {
        $channel->queue_bind(
            $queue_name,      // queue
            'spike_exchange', // exchange
            $binding_key      // routing_key
        );
    }

    echo " [*] Waiting... To exit press CTRL+C\n";

    // Prints "<routing key>:<body>" for each delivery after a 100 ms pause,
    // then acknowledges it (required because no_ack is false below).
    $callback = static function (\PhpAmqpLib\Message\AMQPMessage $msg): void {
        usleep(100 * 1000);
        echo ' [x] ', $msg->getRoutingKey(), ':', $msg->getBody(), "\n";
        $msg->ack();
    };

    // prefetch_count = 1: at most one unacknowledged message at a time.
    $channel->basic_qos(null, 1, null);

    $channel->basic_consume(
        $queue_name,      // queue
        'spike_recveive', // consumer_tag
        false,            // no_local
        false,            // no_ack: false means deliveries must be acked manually
        false,            // exclusive
        false,            // nowait
        $callback         // callback
    );

    // Block on the channel for as long as a consumer is registered.
    while ($channel->is_consuming()) {
        $channel->wait();
    }
} finally {
    // Always release the channel and the connection, even when setup or
    // message handling throws.
    if ($channel !== null) {
        $channel->close();
    }
    $connection->close();
}