接下来看一下创建队列及接收消息的TP5.1示例:
需要提前启动消费者,绑定交换机与队列,并指定路由键
1.测试多消费者监听同一个事件,打开2个消费者,启动生产者,发现2个消费者可以同时收到消息
2.测试消费者挂掉时,消息是否丢失,停掉一个消费者1,启动生产者,再启动消费者1,发现还可以读取到数据,说明数据没有丢失
/** * 生产者 * @param * @return true * @throws \Exception * @use php think call CrazyBoxService_send */ public static function send(){ Log::record("send start"); Log::save(); $ex_name = 'amq.direct'; $routing_key = 'rk'; $queue = 'q'; //队列需要提前建好 $message = array(); $message['order_id'] = 12345; $host = '172.16.0.133'; $port = 5672; $vhost = "vtest"; $login = "test"; $pass = "test123"; $body = json_encode($message); try { $connection = new \AMQPConnection([ 'host' => $host, 'port' => $port, 'vhost' => $vhost, 'login' => $login, 'password' => $pass, ]); $connection->connect(); $channel = new \AMQPChannel($connection); $exchange = new \AMQPExchange($channel); $exchange->setName($ex_name); $exchange->setType(AMQP_EX_TYPE_DIRECT); $exchange->setFlags(AMQP_DURABLE); //持久化 $exchange->declareExchange(); //发送消息可以不指定队列,不指定队列时,如果没有绑定的队列,消息会丢失,所以需要提前有绑定的队列(队列创建以后,消费者提前启动起来,否则消息会丢失),这种情况所有绑定的队列的消费者都能接收到消息 $rs = $exchange->publish($body, $routing_key); Log::record("send params=".$body." result=".$rs); Log::save(); } catch (\AMQPConnectionException $e) { Log::record($e->getMessage); Log::save(); } $connection->disconnect(); } /** * 消费者测试1 * @return true * @throws \Exception * @use php think call CrazyBoxService_rev1 */ public static function rev1(){ self::rev("q1"); } /** * 消费者测试2 * @return true * @throws \Exception * @use php think call CrazyBoxService_rev2 */ public static function rev2(){ self::rev("q2"); } public static function rev3(){ self::rev("q3"); } /** * 消费者 提前建好q1 q2 2个队列 * @param $queue 队列名字 * @return true * @throws \Exception * @use php think call CrazyBoxService_send */ private static function rev($queue){ Log::record("rev start"); Log::save(); $ex_name = 'amq.direct'; $routing_key = 'rk'; $host = '172.16.0.133'; $port = 5672; $vhost = "vtest"; $login = "test"; $pass = "test123"; try { $connection = new \AMQPConnection([ 'host' => $host, 'port' => $port, 'vhost' => $vhost, 'login' => $login, 'password' => $pass, ]); $connection->connect(); $channel = new \AMQPChannel($connection); $exchange = new \AMQPExchange($channel); $exchange->setName($ex_name); $exchange->setType(AMQP_EX_TYPE_DIRECT); $exchange->setFlags(AMQP_DURABLE); //持久化 $exchange->declareExchange(); $q = new \AMQPQueue($channel); //设置队列名字 如果不存在则添加 $q->setName($queue); // AMQP_DURABLE | AMQP_AUTODELETE $q->setFlags(AMQP_DURABLE); // echo 'Message Total: ' . $q->declareQueue() . "\r\n"; //绑定交换机与队列,并指定路由键 $q->bind($ex_name, $routing_key); Log::record($queue." waiting for msg..."); Log::save(); //阻塞模式接收消息 AMQP_NOPARAM $q->consume(function($envelope, $queue){ Log::record("processMessage start"); Log::save(); $msg = $envelope->getBody(); Log::record("processMessage body=".$msg); Log::save(); // $queue->ack($envelope->getDeliveryTag()); // 手动发送ACK应答 return true; }, AMQP_AUTOACK); //自动ACK应答 } catch (\AMQPConnectionException $e) { Log::record($e->getMessage); Log::save(); } $connection->disconnect(); }
接下来看一下创建队列及接收消息的Laravel示例:
创建队列发送消息:
public function mq_send(){ $ex_name = 'amq.direct'; $routing_key = 'EOSETH'; $queue = 'tEOSETH'; $message = array(); $message['order_id'] = 12345; $body = json_encode($message); try { $connection = new \AMQPConnection([ 'host' => env("RABBIT_HOST"), 'port' => env("RABBIT_PORT"), 'vhost' => env("RABBIT_VHOST"), 'login' => env("RABBIT_LOGIN"), 'password' => env("RABBIT_PASS"), ]); $connection->connect(); $channel = new \AMQPChannel($connection); $exchange = new \AMQPExchange($channel); $exchange->setName($ex_name); $exchange->setType(AMQP_EX_TYPE_DIRECT); $exchange->setFlags(AMQP_DURABLE); //持久化 $exchange->declareExchange(); //发送消息可以不指定队列,但是指定队列可以防止队列不存在时创建一个 $q = new \AMQPQueue($channel); //设置队列名字 如果不存在则添加 $q->setName($queue); // AMQP_DURABLE | AMQP_AUTODELETE $q->setFlags(AMQP_DURABLE); // echo 'Message Total: ' . $q->declareQueue() . "\r\n"; //绑定交换机与队列,并指定路由键 $q->bind($ex_name, $routing_key); $rs = $exchange->publish($body, $routing_key); logger("put_order params=".$body." result=".$rs); } catch (\AMQPConnectionException $e) { logger($e->getMessage()); } $connection->disconnect(); }
创建队列消费者:
public function mq_rev(){ $ex_name = 'amq.direct'; $routing_key = 'EOSETH'; $queue = 'tEOSETH'; try { $connection = new \AMQPConnection([ 'host' => env("RABBIT_HOST"), 'port' => env("RABBIT_PORT"), 'vhost' => env("RABBIT_VHOST"), 'login' => env("RABBIT_LOGIN"), 'password' => env("RABBIT_PASS"), ]); $connection->connect(); $channel = new \AMQPChannel($connection); $exchange = new \AMQPExchange($channel); $exchange->setName($ex_name); $exchange->setType(AMQP_EX_TYPE_DIRECT); $exchange->setFlags(AMQP_DURABLE); //持久化 $exchange->declareExchange(); $q = new \AMQPQueue($channel); //设置队列名字 如果不存在则添加 $q->setName($queue); // AMQP_DURABLE | AMQP_AUTODELETE $q->setFlags(AMQP_DURABLE); // echo 'Message Total: ' . $q->declareQueue() . "\r\n"; //绑定交换机与队列,并指定路由键 $q->bind($ex_name, $routing_key); var_dump("Waiting for message…"); // while(TRUE) { //阻塞模式接收消息 AMQP_NOPARAM // $q->consume(array($this,'processMessage')); $q->consume(array($this,'processMessage'), AMQP_AUTOACK); //自动ACK应答 // } } catch (\AMQPConnectionException $e) { logger($e->getMessage()); } $connection->disconnect(); } function processMessage($envelope, $queue) { logger("processMessage start"); $msg = $envelope->getBody(); logger("processMessage body=".$msg); // $queue->ack($envelope->getDeliveryTag()); // 手动发送ACK应答 return true; }
交换机既可以由消息发送端创建,也可以由消息消费者创建。
创建一个队列后,需要将队列绑定到交换机上,队列才能工作,routingkey也是在这里指定的。有的资料上写成bindingkey,其实一回事儿,弄两个名词反倒容易混淆。
消息的处理,是有两种方式:
A,一次性。用 $q->get([...]),不管取到取不到消息都会立即返回,一般情况下使用轮询处理消息队列就要用这种方式;
B,阻塞。用 $q->consum( callback, [...] ) 程序会进入持续侦听状态,每收到一个消息就会调用callback指定的函数一次,直到某个callback函数返回FALSE才结束。
关于callback,这里多说几句: PHP的call_back是支持使用数组的,比如: $c = new MyClass(); $c->counter = 100; $q->consume( array($c,'myfunc') ) 这样就可以调用自己写的处理类。MyClass中myfunc的参数定义,与上例中processMessage一样就行。
在上述示例中,使用的$routingkey = '', 意味着接收全部的消息。我们可以将其改为 $routingkey = 'key_1',可以看到结果中仅有设置routingkey为key_1的内容了。
注意: routingkey = 'key_1' 与 routingkey = 'key_2' 是两个不同的队列。假设: client1 与 client2 都连接到 key_1 的队列上,一个消息被client1处理之后,就不会被client2处理。而 routingkey = '' 是另类,client_all绑定到 '' 上,将消息全都处理后,client1和client2上也就没有消息了。
在程序设计上,需要规划好exchange的名称,以及如何使用key区分开不同类型的标记,在消息产生的地方插入发送消息代码。后端处理,可以针对每一个key启动一个或多个client,以提高消息处理的实时性。
《本文》有 0 条评论