RabbitMQ: отложенные сообщения
Задача: организовать доставку сообщения «слушателю» спустя какое-то время, а не сразу
Решение: одним из способов реализации является использование модуля rabbitmq-delayed-message-exchange.
Установка для Ubuntu:
- Скачать подходящую версию модуля для версии вашего RabbitMQ в формате *.ez
- Положить его в папку (в моём случае) /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.9/plugins
- Включить плагин
1 |
rabbitmq-plugins enable rabbitmq_delayed_message_exchange |
Использование:
Пример постановки в очередь:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
define('WUO_ROOT', dirname(__FILE__)); require_once WUO_ROOT.'/../vendor/autoload.php'; // загружаем классы spl_autoload_register(function ($class_name) { require_once WUO_ROOT.'/../class/'.$class_name.'.php'; }); //Необходимые классы use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; //Создаем соединение $connection = new AMQPStreamConnection('22.357.184.20', 5672, 'цсцаучсцу', 'сысыуксыук'); //Берем канал и декларируем в нем новую очередь, первый аргумент - название $channel = $connection->channel(); // Очередь $channel->queue_declare('wss_messages', false, true, false, false); // Отправляю 5 сообщений for ($i=0;$i<6;$i++){ $msg = new AMQPMessage("Hello All №".$i, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); $msg->set('application_headers', new AMQPTable(['x-delay' => 5000])); $channel->basic_publish($msg, 'main'); }; echo "Messages sended\n"; $channel->close(); $connection->close(); |
Пример слушателя:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
define('WUO_ROOT', dirname(__FILE__)); require_once WUO_ROOT.'/../vendor/autoload.php'; // загружаем классы spl_autoload_register(function ($class_name) { require_once WUO_ROOT.'/../class/'.$class_name.'.php'; }); //Необходимые классы use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; //Создаем соединение $connection = new AMQPStreamConnection('22.53.14.120', 5672, 'уцмцу', 'мцукмцукм'); $channel = $connection->channel(); $channel->queue_declare('wss_messages', false, true, false, false); $callback = function ($msg) { echo $msg->body; echo PHP_EOL; }; $channel->basic_qos(null, 1, null); $channel->basic_consume('wss_messages', '', false, true, false, false, $callback); while (count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); |