1. 创建直连交换机 : delay_exchange
2. 创建延迟队列: delay_queue, 延迟绑定键: delay_key

作用: 存放延迟消费的消息, 即自动过期但还没过期的消息

自带的x-message-ttl参数,使得整个队列的消息过期时间相同, 但是为了灵活设置, 改用其他参数方式实现:

参数:

{"x-dead-letter-exchange", "delay_exchange"},
{"x-dead-letter-routing-key", "work_key"}

x-dead-letter-exchange 消息过期后,消息要进入的交换机,俗称死信交换机,

Optional name of an exchange to which messages will be republished if they are rejected or expire.

x-dead-letter-routing-key 消息过期后,进入死信交换机的routing-key, 再根据这个key将消息放入不同的队列

Optional replacement routing key to use when a message is dead-lettered. If this is not set, the message's original routing key will be used.
3. 绑定 直连交换机delay_exchange 与 延迟队列delay_queue 与 延迟delay_key

把新消息都投入到延迟队列内, 消息一过期就自动按照 消息参数 转发到下一流程

4. 创建 临时工作队列: next_queue队列名随机, 工作绑定键: work_key

存放要立即消费执行的任务

5. 绑定 直连交换机delay_exchange 与 临时工作队列next_queue 与 工作绑定键work_key

把过期的消息, 即延迟任务 都存入工作队列

6. 消费掉 临时工作队列 next_queue


投递任务

<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 发送
$config = [
    'host' => '',
    'port' => 5672,
    'user' => '',
    'password' => '',
    'vhost' => '/',

    'locale' => 'en_US',
];
$connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password'], $config['vhost'],
    false, 'AMQPLAIN', null, $config['locale'], 3.0, 3.0, null,
    true, 60, 0, null);
$channel = $connection->channel();

$exchange = 'delay_exchange';
$sourceRoutingKey = 'delay_key';


// 直连交换机(direct):  对路由绑定键routing key进行精确匹配,从而确定消息该分发到哪个队列
// 创建 持久化的交换机
$channel->exchange_declare($exchange, 'direct', false, true, false);

$msg = new AMQPMessage(igbinary_serialize(microtime(true)), [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,  // 消息持久化2, 默认不持久化1
    'expiration' => 3000 // 3秒后触发
]);
$channel->basic_publish($msg, $exchange, $sourceRoutingKey); // 消息发给交换机, 此时交换机缓存中 有大量消息没有被转发给 队列

echo " [x] Sent 'Hello World!'\n";

$channel->close();
$connection->close();

延迟消费任务

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 接收
$config = [
    'host' => '',
    'port' => 5672,
    'user' => '',
    'password'=> '',
    'vhost' => '/',
    'locale' => 'en_US',
];
$connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'],  $config['password'], $config['vhost'],
    false, 'AMQPLAIN', null, $config['locale'], 3.0, 3.0, null,
    true, 60, 0, null);

$channel = $connection->channel();

$exchange = 'delay_exchange';

$sourceQueue = 'delay_queue';
$sourceRoutingKey = 'delay_key';

$nextRoutingKey = 'work_key';
$nextQueue = 'tmpQueue';

$channel->exchange_declare($exchange, 'direct', false, true, false);

// 延迟队列
$arguments = new \PhpAmqpLib\Wire\AMQPTable();
$arguments->set("x-dead-letter-exchange", $exchange);
$arguments->set("x-dead-letter-routing-key", $nextRoutingKey);
$channel->queue_declare($sourceQueue, false, true, false, false, false, $arguments);
$channel->queue_bind($sourceQueue, $exchange, $sourceRoutingKey); // 把交换机的一手数据传到该延迟队列

// 工作队列
list($nextQueue, ,) = $channel->queue_declare("", false, true, false, false);
$channel->queue_bind($nextQueue, $exchange, $nextRoutingKey); // 把交换机的过期数据传到该临时工作队列,消费之



echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function (AMQPMessage $msg) {
    $tmp = json_encode(igbinary_unserialize($msg->body),JSON_UNESCAPED_UNICODE);
    echo 'time:', microtime(true);
    echo ' [x] Received ', $tmp,
    ', json size:', strlen($tmp),
    ', igbianry size:',$msg->body_size, "\n";

    //因为消息是持久化的,所以要求确认消息,消费者发回ack
    // 如果不ack,则该socket连接永远也不会接收到新的msg!!服务端误以为你挂起了 Ready->Unacked->[Ready或acked]
    echo 'delivery_tag:', $msg->delivery_info['delivery_tag'],PHP_EOL;
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_qos(null, 1, null); // 基础流量控制, 公平调度, 默认不控制

// 消费端的 no_ack权限 改为 必须false才能使用ACk机制!!!
$channel->basic_consume($nextQueue, '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

标签: none

添加新评论