参考:
http://php-amqplib.github.io/php-amqplib/
https://www.rabbitmq.com/amqp-0-9-1-reference.html#exchange.declare.reserved-1
http://rabbitmq.mr-ping.com/AMQP/AMQP_0-9-1_Model_Explained.html
生产者
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', 'spike'); $channel = $connection->channel(10087); $channel->exchange_declare( 'spike_exchange', // exchange:交换机名称 'topic', // type:交换机类型,常见的如fanout、direct、topic false, // passive false, // durable:设置是否持久化 false, // autoDelete:设置是否自动删除 // argument:其他一些结构化参数,比如alternate-exchange ); $data = "spike's message"; //发送方其实不需要设置队列, 不过对于持久化有关,建议执行该行 $channel->queue_declare( 'spike_queue', // queue false, // passive false, // durable false, // exclusive false, // auto_delete ); $msg = new AMQPMessage(); for ($i=0; $i < 10; $i++) { $msg->setBody($data.'-'.$i); $channel->basic_publish( $msg, // PhpAmqpLib\Message\AMQPMessage $msg 'spike_exchange', //exchange = '', 'spike.message' // routing_key ); echo ' [x] Sent spike.message:', $data.'-'.$i, "\n"; usleep(300 * 1000); } $msg->setBody('spike ok!!'); $channel->basic_publish( $msg, // PhpAmqpLib\Message\AMQPMessage $msg 'spike_exchange', //exchange = '', 'spike.error' // routing_key ); $channel->close(); $connection->close();
消费者
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', 'spike'); $channel = $connection->channel($channel_id=10086); $channel->exchange_declare('spike_exchange', 'topic', false, false, false); list($queue_name, ,) = $channel->queue_declare("spike_queue", false, false, false, false); echo $queue_name; $binding_key = ''; //binding_key 对应 routing_key 可绑定 $channel->queue_bind( 'spike_queue', // queue 'spike_exchange', // exchange 'spike.message' // routing_key ); $channel->queue_bind( 'spike_queue', // queue 'spike_exchange', // exchange 'spike.error' // routing_key ); echo " [*] Waiting... To exit press CTRL+C\n"; $callback = function ($msg) { usleep(100 * 1000); echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n"; $msg->ack(); }; $channel->basic_qos(null, 1, null); $channel->basic_consume( 'spike_queue', // queue 'spike_recveive', // consumer_tag false, // no_local false, // no_ack false, // exclusive false, // nowait $callback //callback ); while ($channel->is_consuming()) { $channel->wait(); } $channel->close(); $connection->close();