RabbitMQ: Отложеная доставка и время жизни пакета

Собрал наконец в одном классе, всё что мне нужно от RabbitMQ: а именно отправка сообщения в очередь, с отложеной доставкой, плюс уничтожение сообщения по прошествии какого то времени «не доставки».

Ну да, казалось бы элементарные вещи, но на PHP примеров для этого случая не сказать что уж слишком много как оказалось..

<?php

//Необходимые классы
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;


class TRabbit extends TLog{ 
    public $host="";
    public $login="";
    public $password="";
    public $port="";
    public $connection=null;
    public static $isInstance=false; 
    // настройки касающиеся очереди
    public $passive=false;      // может использоваться для проверки того, инициирован ли обмен, без того, чтобы изменять состояние сервера
    public $durable=true;       // убедимся, что RabbitMQ никогда не потеряет очередь при падении - очередь переживёт перезагрузку брокера
    public $exclusive=false;    // используется только одним соединением, и очередь будет удалена при закрытии соединения
    public $autodelete=false;   // очередь удаляется, когда отписывается последний подписчик
    public $ttl=3000;           // время жизни пакета, через которое оно при недоставке дропнется,в милисекундах
            
    public function __destruct(){
        if (self::$isInstance==false) {        
          $this->connection->close();  
        }
    }    
    public function __construct(){
        self::$isInstance=true;
        $this->host= TConfig::GetConfigKeyValue("rabbitmq_host");
        $this->login= TConfig::GetConfigKeyValue("rabbitmq_login");
        $this->host= "localhost";
        
        $this->password= TConfig::GetConfigKeyValue("rabbitmq_password");
        $this->password="guest";
        $this->port= TConfig::GetConfigKeyValue("rabbitmq_port");        
        $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->login, $this->password);        
    }    
    /**
     * Поставить в очередь сообщение
     * @global type $db
     * @param type $queue   - имя очереди
     * @param type $message - сообщение в json формате
     * @param type $delay   - отсрочка доставки в секундах (необязательно)
     * @param type $ttl     - время жизни пакета, через которое оно при недоставке дропнется
     * @return type
     */
    public function  Producer($queue,$message,$delay=0,$ttl=null) {  
     global $db;
        try {    
          $answer = AnswerStrucNext(false, "Failed to queue a RabbitMQ message");             
             $channel = $this->connection->channel();      
            $channel->exchange_declare(
                'delay',
                'x-delayed-message',
                false,            /* не создается очереь если нет её */
                $this->durable,   /* гарантированная доставка */
                false,            /* удаление очереди если пустая и нет слушателей */
                false,            /* internal */
                false,            /* не ждать ответа от сервера (т. е. продолжать получение) */
                [
                    'x-delayed-type' => ['S', 'direct']                    
                ]);
             if (is_null($ttl)) $ttl=$this->ttl;
             $channel->queue_declare($queue, $this->passive, $this->durable, $this->exclusive, $this->autodelete,false,
                array(
                        "x-message-ttl" => array("I", $ttl)
                    )                     
                );
             $channel->queue_bind($queue, "delay", $queue);
               if ($delay==0){
                  $msg = new AMQPMessage($message);		        
               } else {
                   $msg = new AMQPMessage($message, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);		
                   $msg->set('application_headers', new AMQPTable(['x-delay' => $delay*1000]));        
               };
          $channel->basic_publish($msg,"delay",$queue);                  
          $channel->close();
          $answer = AnswerStrucNext(false, "ok");             
       } catch(Exception $err) {
           $this->InsertLog(TLog::PT_PLAIN,"Ошибка постановки в очередь $queue (RabbitMQ)", TLog::R_Error, $message);
       };          
     return $answer;
    }
}

RabbitMQ: отложенные сообщения

Задача: организовать доставку сообщения «слушателю» спустя какое-то время, а не сразу

Решение: одним из способов реализации является использование модуля rabbitmq-delayed-message-exchange.

Установка для Ubuntu:

  1. Скачать подходящую версию модуля для версии вашего RabbitMQ в формате *.ez
  2. Положить его в папку (в моём случае) /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.9/plugins
  3. Включить плагин
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Использование:

Пример постановки в очередь:



define('WUO_ROOT', dirname(__FILE__));

require_once WUO_ROOT.'/../vendor/autoload.php';
// загружаем классы
spl_autoload_register(function ($class_name) {
    require_once WUO_ROOT.'/../class/'.$class_name.'.php';
});

//Необходимые классы
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

//Создаем соединение
$connection = new AMQPStreamConnection('22.357.184.20', 5672, 'цсцаучсцу', 'сысыуксыук');

//Берем канал и декларируем в нем новую очередь, первый аргумент - название
$channel = $connection->channel();

// Очередь
$channel->queue_declare('wss_messages', false, true, false, false);

// Отправляю 5 сообщений
for ($i=0;$i<6;$i++){
    $msg = new AMQPMessage("Hello All №".$i, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);		
    $msg->set('application_headers', new AMQPTable(['x-delay' => 5000]));
    $channel->basic_publish($msg, 'main');
};
echo "Messages sended\n";
$channel->close();
$connection->close();

Пример слушателя:



define('WUO_ROOT', dirname(__FILE__));
require_once WUO_ROOT.'/../vendor/autoload.php';
// загружаем классы
spl_autoload_register(function ($class_name) {
    require_once WUO_ROOT.'/../class/'.$class_name.'.php';
});

//Необходимые классы
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

//Создаем соединение
$connection = new AMQPStreamConnection('22.53.14.120', 5672, 'уцмцу', 'мцукмцукм');
$channel = $connection->channel();

$channel->queue_declare('wss_messages', false, true, false, false);

$callback = function ($msg) {
        echo $msg->body;
        echo PHP_EOL;
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('wss_messages', '', false, true, false, false, $callback);

while (count($channel->callbacks)) {
        $channel->wait();
}
$channel->close();
$connection->close();