rabbitmq怎么用的(RabbitMQ 入门教程(PHP) 实现延迟功能)

wufei123 发布于 2023-11-13 阅读(595)

php教程全集

php 使用rabbitmq-delayed-message-exchange插件实现延迟功能1.安装3.6.x下载地址3.7.x下载地址下载后解压,并将其拷贝至(使用Linux Deb游戏ian/RPM部署)rabbitmq服务器目录:

/usr/local/rabbitmq/plugins中( windows安装目录\rabbitmq_server-version\plugins ).游戏2.启用插件使用命令rabbitmq-plugins enable rabbitmq_delayed_message_exchang

启用插件rabbitmq-plugins enable rabbit游戏mq_delayed_message_exchang输出如下:The following plugins have been enabled: rabbitmq_delayed_mes游戏sage_exchange

通过rabbitmq-plugins list查看已安装列表,如下:[ ] rabbitmq_delayed_message_exchange 20171215-3.6.x3游戏.机制解释安装插件后会生成新的Exchange类型

x-delayed-message,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列中,而是存储在mnesia(一个分布式数据系统游戏)表中,检测消息延迟时间,如达到可投递时间时并将其通过

x-delayed-type类型标记的交换机类型投递至目标队列4.php实现过程消费者 delay_consumer2.php: delayed_游戏exchange_test, queueName => delayed_queue_test, routeKey => delayed_route_test, 游戏 ); $connectConfig = array( host => localhost, port => 5672, logi游戏n => guest, password => guest, vhost => / ); //var_dump(extension_l游戏oaded(amqp)); //exit(); try { $conn = new AMQPConnection($connectConfig); 游戏 $conn->connect(); if (!$conn->isConnected()) { //die(Conexiune esuata); 游戏 //TODO 记录日志 echo rabbit-mq 连接错误:, json_encode($connectConfig); exit(); }游戏 $channel = new AMQPChannel($conn); if (!$channel->isConnected()) { // die(C游戏onnection through channel failed); //TODO 记录日志 echo rabbit-mq Connection through cha游戏nnel failed:, json_encode($connectConfig); exit(); } $exchange = new AMQPExc游戏hange($channel); //$exchange->setFlags(AMQP_DURABLE);//声明一个已存在的交换器的,如果不存在将抛出异常,这个一般用在consume游戏端 $exchange->setName($params[exchangeName]); $exchange->setType(x-delayed-message); 游戏//x-delayed-message类型 /*RabbitMQ常用的Exchange Type有三种:fanout、direct、topic。

fanout:把所有发送到该Exchan游戏ge的消息投递到所有与它绑定的队列中 direct:把消息投递到那些binding key与routing key完全匹配的队列中 topic:将消息路由到bind游戏ing key与routing key模式匹配的队列中。

*/ $exchange->setArgument(x-delayed-type,direct); $excha游戏nge->declareExchange(); //$channel->startTransaction(); $queue = new AMQPQueue($ch游戏annel); $queue->setName($params[queueName]); $queue->setFlags(AMQP_DURABLE); 游戏 $queue->declareQueue(); //绑定 $queue->bind($params[exchangeName], $params[routeKey]游戏); } catch(Exception $e) { echo $e->getMessage(); exit(); } 游戏 function callback(AMQPEnvelope $message) { global $queue; if ($message) { $游戏body = $message->getBody(); echo 接收时间:.date("Y-m-d H:i:s", time()). PHP_EOL; echo 接收游戏内容:.$body . PHP_EOL; //为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息 $queue->ack($message->g游戏etDeliveryTag()); } else { echo no message . PHP_EOL; } } /游戏/$queue->consume(callback); 第一种消费方式,但是会阻塞,程序一直会卡在此处 //第二种消费方式,非阻塞 /*$start = time()游戏; while(true) { $message = $queue->get(); if(!empty($message)) 游戏 { echo $message->getBody(); $queue->ack($message->getDeliveryTag()); //应答,代表该消息已经游戏消费 $end = time(); echo

. ($end - $start); exit(); } else { //echo游戏 message not found . PHP_EOL; } }*/ //注意:这里需要注意的是这个方法:$queue->consume,queue对象有两个方法可用于取消游戏息:consume和get。

前者是阻塞的,无消息时会被挂起,适合循环中使用;后者则是非阻塞的,取消息时有则取,无则返回false //就是说用了consume之后,会同步阻塞,该程序常游戏驻内存,不能用nginx,apache调用 $action = 2; if($action == 1){ $queue->consume(callba游戏ck); //第一种消费方式,但是会阻塞,程序一直会卡在此处 }else{ //第二种消费方式,非阻塞 $start = time(); 游戏 while(true) { $message = $queue->get(); if(!empty($message)) { 游戏 echo 接收时间:.date("Y-m-d H:i:s", time()). PHP_EOL; echo 接收内容:.$message->getBody().PHP_EOL;游戏 $queue->ack($message->getDeliveryTag()); //应答,代表该消息已经消费 $end = time(); echo游戏 运行时间:.($end - $start).秒.PHP_EOL; //exit(); } else { //echo 游戏message not found . PHP_EOL; } } }。

生产者delay_publisher2.php: delayed_exchange游戏_test, queueName => delayed_queue_test, routeKey => delayed_route_test, ); 游戏 $connectConfig = array( host => localhost, port => 5672, login => gu游戏est, password => guest, vhost => / ); //var_dump(extension_loaded(a游戏mqp)); 判断是否加载amqp扩展 //exit(); try { $conn = new AMQPConnection($connectConfi游戏g); $conn->connect(); if (!$conn->isConnected()) { //die(Conexiune esuata); 游戏 //TODO 记录日志 echo rabbit-mq 连接错误:, json_encode($connectConfig); exit(); 游戏 } $channel = new AMQPChannel($conn); if (!$channel->isConnected()) { // d游戏ie(Connection through channel failed); //TODO 记录日志 echo rabbit-mq Connection through游戏 channel failed:, json_encode($connectConfig); exit(); } $exchange = new AMQ游戏PExchange($channel); $exchange->setName($params[exchangeName]); $exchange->setType(x游戏-delayed-message); //x-delayed-message类型 /*RabbitMQ常用的Exchange Type有三种:fanout、direct、topic。

f游戏anout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中 direct:把消息投递到那些binding key与routing key完全匹配的队列中 游戏 topic:将消息路由到binding key与routing key模式匹配的队列中。

*/ $exchange->setArgument(x-delayed-type,dire游戏ct); $exchange->declareExchange(); //$channel->startTransaction(); //Rabbit游戏MQ不容许声明2个相同名称、配置不同的Queue,否则报错 $queue = new AMQPQueue($channel); $queue->setName($par游戏ams[queueName]); $queue->setFlags(AMQP_DURABLE); $queue->declareQueue(); //游戏绑定队列和交换机 $queue->bind($params[exchangeName], $params[routeKey]); //$channel->commitT游戏ransaction(); } catch(Exception $e) { } for($i=5;$i>0;$i--){ //生成消游戏息 echo 发送时间:.date("Y-m-d H:i:s", time()).PHP_EOL; echo i=.$i.,延迟.$i.秒.PHP_EOL; 游戏 $message = json_encode([order_id=>time(),i=>$i]); $exchange->publish($message, $params[ro游戏uteKey], AMQP_NOPARAM, [headers=>[x-delay=> 1000*$i]]); sleep(2); } $conn->d游戏isconnect();。

对于代码来讲,首先对于消费者核心代码$exchange->setType(x-delayed-message); //x-delayed-message类型 游戏$exchange->setArgument(x-delayed-type,direct);

生产者核心代码$exchange = new AMQPExchange($channel); 游戏 $exchange->setName($params[exchangeName]); $exchange->setType(x-delayed-message); //x-delay游戏ed-message类型 $exchange->setArgument(x-delayed-type,direct); $exchange->declareExchan游戏ge();

使用方法:先运行delay_consumer1.php,再运行delay_publisher1.php运行效果:

以上内容希望帮助到大家,更多PHP大厂PDF面试文档,PHP进阶架构视频资料,游戏PHP精彩好文免费获取可以关注公众号:PHP开源社区,或者访问:2021金三银四大厂面试真题集锦,必看!四年精华PHP技术文章整理合集——PHP框架篇

四年精华PHP技术文合集——微服务架构篇四年精华P游戏HP技术文合集——分布式架构篇四年精华PHP技术文合集——高并发场景篇四年精华PHP技术文章整理合集——数据库篇

亲爱的读者们,感谢您花时间阅读本文。如果您对本文有任何疑问或建议,请随时联系我。我非常乐游戏意与您交流。

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。