RabbitMQ: Отложеная доставка и время жизни пакета
Собрал наконец в одном классе, всё что мне нужно от RabbitMQ: а именно отправка сообщения в очередь, с отложеной доставкой, плюс уничтожение сообщения по прошествии какого то времени «не доставки».
Ну да, казалось бы элементарные вещи, но на PHP примеров для этого случая не сказать что уж слишком много как оказалось..
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
<?php //Необходимые классы use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; class TRabbit extends TLog{ public $host=""; public $login=""; public $password=""; public $port=""; public $connection=null; public static $isInstance=false; // настройки касающиеся очереди public $passive=false; // может использоваться для проверки того, инициирован ли обмен, без того, чтобы изменять состояние сервера public $durable=true; // убедимся, что RabbitMQ никогда не потеряет очередь при падении - очередь переживёт перезагрузку брокера public $exclusive=false; // используется только одним соединением, и очередь будет удалена при закрытии соединения public $autodelete=false; // очередь удаляется, когда отписывается последний подписчик public $ttl=3000; // время жизни пакета, через которое оно при недоставке дропнется,в милисекундах public function __destruct(){ if (self::$isInstance==false) { $this->connection->close(); } } public function __construct(){ self::$isInstance=true; $this->host= TConfig::GetConfigKeyValue("rabbitmq_host"); $this->login= TConfig::GetConfigKeyValue("rabbitmq_login"); $this->host= "localhost"; $this->password= TConfig::GetConfigKeyValue("rabbitmq_password"); $this->password="guest"; $this->port= TConfig::GetConfigKeyValue("rabbitmq_port"); $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->login, $this->password); } /** * Поставить в очередь сообщение * @global type $db * @param type $queue - имя очереди * @param type $message - сообщение в json формате * @param type $delay - отсрочка доставки в секундах (необязательно) * @param type $ttl - время жизни пакета, через которое оно при недоставке дропнется * @return type */ public function Producer($queue,$message,$delay=0,$ttl=null) { global $db; try { $answer = AnswerStrucNext(false, "Failed to queue a RabbitMQ message"); $channel = $this->connection->channel(); $channel->exchange_declare( 'delay', 'x-delayed-message', false, /* не создается очереь если нет её */ $this->durable, /* гарантированная доставка */ false, /* удаление очереди если пустая и нет слушателей */ false, /* internal */ false, /* не ждать ответа от сервера (т. е. продолжать получение) */ [ 'x-delayed-type' => ['S', 'direct'] ]); if (is_null($ttl)) $ttl=$this->ttl; $channel->queue_declare($queue, $this->passive, $this->durable, $this->exclusive, $this->autodelete,false, array( "x-message-ttl" => array("I", $ttl) ) ); $channel->queue_bind($queue, "delay", $queue); if ($delay==0){ $msg = new AMQPMessage($message); } else { $msg = new AMQPMessage($message, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); $msg->set('application_headers', new AMQPTable(['x-delay' => $delay*1000])); }; $channel->basic_publish($msg,"delay",$queue); $channel->close(); $answer = AnswerStrucNext(false, "ok"); } catch(Exception $err) { $this->InsertLog(TLog::PT_PLAIN,"Ошибка постановки в очередь $queue (RabbitMQ)", TLog::R_Error, $message); }; return $answer; } } |