<?php
//Необходимые классы
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class TRabbit extends TLog{
public $host="";
public $login="";
public $password="";
public $port="";
public $connection=null;
public static $isInstance=false;
// настройки касающиеся очереди
public $passive=false; // может использоваться для проверки того, инициирован ли обмен, без того, чтобы изменять состояние сервера
public $durable=true; // убедимся, что RabbitMQ никогда не потеряет очередь при падении - очередь переживёт перезагрузку брокера
public $exclusive=false; // используется только одним соединением, и очередь будет удалена при закрытии соединения
public $autodelete=false; // очередь удаляется, когда отписывается последний подписчик
public $ttl=3000; // время жизни пакета, через которое оно при недоставке дропнется,в милисекундах
public function __destruct(){
if (self::$isInstance==false) {
$this->connection->close();
}
}
public function __construct(){
self::$isInstance=true;
$this->host= TConfig::GetConfigKeyValue("rabbitmq_host");
$this->login= TConfig::GetConfigKeyValue("rabbitmq_login");
$this->host= "localhost";
$this->password= TConfig::GetConfigKeyValue("rabbitmq_password");
$this->password="guest";
$this->port= TConfig::GetConfigKeyValue("rabbitmq_port");
$this->connection = new AMQPStreamConnection($this->host, $this->port, $this->login, $this->password);
}
/**
* Поставить в очередь сообщение
* @global type $db
* @param type $queue - имя очереди
* @param type $message - сообщение в json формате
* @param type $delay - отсрочка доставки в секундах (необязательно)
* @param type $ttl - время жизни пакета, через которое оно при недоставке дропнется
* @return type
*/
public function Producer($queue,$message,$delay=0,$ttl=null) {
global $db;
try {
$answer = AnswerStrucNext(false, "Failed to queue a RabbitMQ message");
$channel = $this->connection->channel();
$channel->exchange_declare(
'delay',
'x-delayed-message',
false, /* не создается очереь если нет её */
$this->durable, /* гарантированная доставка */
false, /* удаление очереди если пустая и нет слушателей */
false, /* internal */
false, /* не ждать ответа от сервера (т. е. продолжать получение) */
[
'x-delayed-type' => ['S', 'direct']
]);
if (is_null($ttl)) $ttl=$this->ttl;
$channel->queue_declare($queue, $this->passive, $this->durable, $this->exclusive, $this->autodelete,false,
array(
"x-message-ttl" => array("I", $ttl)
)
);
$channel->queue_bind($queue, "delay", $queue);
if ($delay==0){
$msg = new AMQPMessage($message);
} else {
$msg = new AMQPMessage($message, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$msg->set('application_headers', new AMQPTable(['x-delay' => $delay*1000]));
};
$channel->basic_publish($msg,"delay",$queue);
$channel->close();
$answer = AnswerStrucNext(false, "ok");
} catch(Exception $err) {
$this->InsertLog(TLog::PT_PLAIN,"Ошибка постановки в очередь $queue (RabbitMQ)", TLog::R_Error, $message);
};
return $answer;
}
}