rabbitmq延迟队列
1. 创建直连交换机 : delay_exchange
2. 创建延迟队列: delay_queue, 延迟绑定键: delay_key
作用: 存放延迟消费的消息, 即自动过期但还没过期的消息
自带的x-message-ttl参数,使得整个队列的消息过期时间相同, 但是为了灵活设置, 改用其他参数方式实现:
参数:
{"x-dead-letter-exchange", "delay_exchange"},
{"x-dead-letter-routing-key", "work_key"}
x-dead-letter-exchange 消息过期后,消息要进入的交换机,俗称死信交换机,
Optional name of an exchange to which messages will be republished if they are rejected or expire.
x-dead-letter-routing-key 消息过期后,进入死信交换机的routing-key, 再根据这个key将消息放入不同的队列
Optional replacement routing key to use when a message is dead-lettered. If this is not set, the message's original routing key will be used.
3. 绑定 直连交换机delay_exchange 与 延迟队列delay_queue 与 延迟delay_key
把新消息都投入到延迟队列内, 消息一过期就自动按照 消息参数 转发到下一流程
4. 创建 临时工作队列: next_queue队列名随机, 工作绑定键: work_key
存放要立即消费执行的任务
5. 绑定 直连交换机delay_exchange 与 临时工作队列next_queue 与 工作绑定键work_key
把过期的消息, 即延迟任务 都存入工作队列
6. 消费掉 临时工作队列 next_queue
投递任务
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// 发送
$config = [
'host' => '',
'port' => 5672,
'user' => '',
'password' => '',
'vhost' => '/',
'locale' => 'en_US',
];
$connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password'], $config['vhost'],
false, 'AMQPLAIN', null, $config['locale'], 3.0, 3.0, null,
true, 60, 0, null);
$channel = $connection->channel();
$exchange = 'delay_exchange';
$sourceRoutingKey = 'delay_key';
// 直连交换机(direct): 对路由绑定键routing key进行精确匹配,从而确定消息该分发到哪个队列
// 创建 持久化的交换机
$channel->exchange_declare($exchange, 'direct', false, true, false);
$msg = new AMQPMessage(igbinary_serialize(microtime(true)), [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, // 消息持久化2, 默认不持久化1
'expiration' => 3000 // 3秒后触发
]);
$channel->basic_publish($msg, $exchange, $sourceRoutingKey); // 消息发给交换机, 此时交换机缓存中 有大量消息没有被转发给 队列
echo " [x] Sent 'Hello World!'\n";
$channel->close();
$connection->close();
延迟消费任务
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// 接收
$config = [
'host' => '',
'port' => 5672,
'user' => '',
'password'=> '',
'vhost' => '/',
'locale' => 'en_US',
];
$connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password'], $config['vhost'],
false, 'AMQPLAIN', null, $config['locale'], 3.0, 3.0, null,
true, 60, 0, null);
$channel = $connection->channel();
$exchange = 'delay_exchange';
$sourceQueue = 'delay_queue';
$sourceRoutingKey = 'delay_key';
$nextRoutingKey = 'work_key';
$nextQueue = 'tmpQueue';
$channel->exchange_declare($exchange, 'direct', false, true, false);
// 延迟队列
$arguments = new \PhpAmqpLib\Wire\AMQPTable();
$arguments->set("x-dead-letter-exchange", $exchange);
$arguments->set("x-dead-letter-routing-key", $nextRoutingKey);
$channel->queue_declare($sourceQueue, false, true, false, false, false, $arguments);
$channel->queue_bind($sourceQueue, $exchange, $sourceRoutingKey); // 把交换机的一手数据传到该延迟队列
// 工作队列
list($nextQueue, ,) = $channel->queue_declare("", false, true, false, false);
$channel->queue_bind($nextQueue, $exchange, $nextRoutingKey); // 把交换机的过期数据传到该临时工作队列,消费之
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function (AMQPMessage $msg) {
$tmp = json_encode(igbinary_unserialize($msg->body),JSON_UNESCAPED_UNICODE);
echo 'time:', microtime(true);
echo ' [x] Received ', $tmp,
', json size:', strlen($tmp),
', igbianry size:',$msg->body_size, "\n";
//因为消息是持久化的,所以要求确认消息,消费者发回ack
// 如果不ack,则该socket连接永远也不会接收到新的msg!!服务端误以为你挂起了 Ready->Unacked->[Ready或acked]
echo 'delivery_tag:', $msg->delivery_info['delivery_tag'],PHP_EOL;
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_qos(null, 1, null); // 基础流量控制, 公平调度, 默认不控制
// 消费端的 no_ack权限 改为 必须false才能使用ACk机制!!!
$channel->basic_consume($nextQueue, '', false, false, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();