RabbitMQ: отложенные сообщения

Задача: организовать доставку сообщения «слушателю» спустя какое-то время, а не сразу

Решение: одним из способов реализации является использование модуля rabbitmq-delayed-message-exchange.

Установка для Ubuntu:

  1. Скачать подходящую версию модуля для версии вашего RabbitMQ в формате *.ez
  2. Положить его в папку (в моём случае) /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.9/plugins
  3. Включить плагин
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Использование:

Пример постановки в очередь:



define('WUO_ROOT', dirname(__FILE__));

require_once WUO_ROOT.'/../vendor/autoload.php';
// загружаем классы
spl_autoload_register(function ($class_name) {
    require_once WUO_ROOT.'/../class/'.$class_name.'.php';
});

//Необходимые классы
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

//Создаем соединение
$connection = new AMQPStreamConnection('22.357.184.20', 5672, 'цсцаучсцу', 'сысыуксыук');

//Берем канал и декларируем в нем новую очередь, первый аргумент - название
$channel = $connection->channel();

// Очередь
$channel->queue_declare('wss_messages', false, true, false, false);

// Отправляю 5 сообщений
for ($i=0;$i<6;$i++){
    $msg = new AMQPMessage("Hello All №".$i, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);		
    $msg->set('application_headers', new AMQPTable(['x-delay' => 5000]));
    $channel->basic_publish($msg, 'main');
};
echo "Messages sended\n";
$channel->close();
$connection->close();

Пример слушателя:



define('WUO_ROOT', dirname(__FILE__));
require_once WUO_ROOT.'/../vendor/autoload.php';
// загружаем классы
spl_autoload_register(function ($class_name) {
    require_once WUO_ROOT.'/../class/'.$class_name.'.php';
});

//Необходимые классы
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

//Создаем соединение
$connection = new AMQPStreamConnection('22.53.14.120', 5672, 'уцмцу', 'мцукмцукм');
$channel = $connection->channel();

$channel->queue_declare('wss_messages', false, true, false, false);

$callback = function ($msg) {
        echo $msg->body;
        echo PHP_EOL;
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('wss_messages', '', false, true, false, false, $callback);

while (count($channel->callbacks)) {
        $channel->wait();
}
$channel->close();
$connection->close();