RabbitMQ + Yii

13.08.2013
Работа с очередями через RabbitMQ в Yii-проектах

Отложенное исполнение задач и работа с очередью  - не бином Ньютона, особых сложностей тут нет. Но некоторые вещи забываются, да и вообще полезно иметь руководство на всякий случай.

Известны два решения для работы с очередью - Gearman и RabbitMQ. В том виде, в котором приходилось их использовать, оба брокера примерно одинаковы. Тем не менее, считается, что RabbitMQ "круче".

Собственно вот как можно его использовать в проектах, основанных на Yii.

1. Установка 

> git clone git://github.com/alanxz/rabbitmq-c.git
> cd rabbitmq-c
> git submodule init
> git submodule update
> autoreconf -i && ./configure && make && sudo make install

Здесь могут быть проблемы: autoreconf - возможно потребуется доустановить pkg-config (aptitude install pkg-config).

Теперь сам модуль php-amqp: 

> wget <a href="http://pecl.php.net/get/amqp">http://pecl.php.net/get/amqp</a> -O amqp.tar.gz
> tar -zxvf amqp.tar.gz
> cd amqp-1.0.7
> phpize
> ./configure --with-amqp
> make
> make install

Прописываем в php.ini новое расширение: extension=amqp.so.

И, собственно, сервер:

> echo 'deb <a href="http://www.rabbitmq.com/debian/">http://www.rabbitmq.com/debian/</a> testing main' >> /etc/apt/sources.list
> wget&nbsp;http://www.rabbitmq.com/rabbitmq-signing-key-public.asc
> sudo apt-key add rabbitmq-signing-key-public.asc
> aptitude update
> aptitude install rabbitmq-server

Web-панель для управления сервером (RabbitMQ management plugin):

> rabbitmq-plugins enable rabbitmq_management

Пpи помощи утилиты командной строки rabbitmqctl добавляем пользователей (add_user) и прописываем их в нужные группы (set_user_tags). Иногда этот плагин удобен, можно реже использовать rabbitmqctl и командную строку.

2. Компонент для Yii

В нашей работе с очередью процесс будет определяться двумя параметрами для указания очереди и обменника и тремя для подключения к самому брокеру очереди.

Создадим расширение для Yii, при помощи которого можно будет получать и ставить задачи в очередь.

Добавляем в конфиге в секцию components:

'mq' => array(
  'class' => 'ext.amqp',
  'host'  => '127.0.0.1',
  'login'  => 'guest',
  'password'  => 'guest',
  'exchangeName'  => 'test-exchange',
  'queueName'  => 'test-queue',
),

Сам класс расширения создадим в extensions с именем amqp.php. Исходник, например такой:

class amqp extends CApplicationComponent {
    private $_host;
    private $_login;
    private $_password;
    private $_exchangeName;
    private $_queueName;

    private $_connection;
    private $_channel;
    private $_exchange;
    private $_queue;

    public function init()
    {
        parent::init();
    }

    public function __construct() { }

    public function __destruct() {
        if(  $this->_connection ) {
            $this->_connection->disconnect();
        }
    }

    private function getConnection() {
        if( !$this->_connection ) {
            $this->_connection = new AMQPConnection();
            $this->_connection->setLogin($this->_login);
            $this->_connection->setPassword($this->_password);
            $this->_connection->setHost($this->_host);

            $this->_connection->connect();
            if (!$this->_connection->isConnected()) {
                $this->_connection = null;
                return;
            }
        }
        return $this->_connection;
    }

    private function getExchange() {
        if( !$this->_exchange ) {
            $this->_exchange   = new AMQPExchange( $this->getChannel() );
            $this->_exchange->setName( $this->_exchangeName );
            $this->_exchange->setType('fanout');
            $this->_exchange->declareExchange();
        }
        return $this->_exchange;
    }

    private function getChannel() {
        if( !$this->_channel ) {
            $this->_channel    = new AMQPChannel($this->getConnection());
        }
        return $this->_channel;
    }

    private function getQueue() {
        if( !$this->_queue ) {
            $this->_queue      = new AMQPQueue( $this->getChannel() );
            $this->_queue->setName( $this->_queueName );
            $this->_queue->declareQueue();
            $this->_queue->bind( $this->_exchangeName, $this->_queueName );
        }
        return $this->_queue;
    }

    public function schedule($task)
    {
        try {
            $this->getQueue(); // вдруг очередь еще не объявлена?
            if( !$this->getExchange()->publish( (string)$task, $this->_queueName ) ) {
                throw new Exception('Message not sent!');
            }
        }
        catch( Exception $e ) {
            die( $e->getMessage() );
        }
    }

    public function process( ) {
      $this->getExchange(); // вдруг обменник еще не объявлен?
      while ($envelope = $this->getQueue()->get(AMQP_AUTOACK)) {
        echo $envelope->getBody(); // здесь некая полезная работа происходит. имеет смысл ее реализацию вынести в отдельное расширение
      }
    }

    public function setHost( $_host ) {
        $this->_host = $_host;
    }
    public function setLogin( $_login ) {
        $this->_login = $_login;
    }
    public function setPassword( $_password ) {
        $this->_password = $_password;
    }
    public function setExchangeName( $_exchangeName ) {
        $this->_exchangeName = $_exchangeName;
    }
    public function setQueueName( $_queueName ) {
        $this->_queueName = $_queueName;
    }
}

Постановка заданий в очередь требует некоторого внимания: надо удостовериться, что обменник и очередь существуют.

Для этого перед каждой операцией на постановку задания в очередь декларируем и очередь и обменник, хуже не будет.

Для того, чтобы добавить задание в очередь при каком-то действии, можно использовать такой код:

Yii::app()->mq->schedule( 'важное задание' );

На исполнение иногда имеет смысл передавать сериализованные объекты и вызывать их методы потом, при обработке очереди.

Обработку очереди проводим при помощи всегда запущенного процесса. Если использовать supervisor для контроля, то, говорят, лучшая практика - постоянно убивать обработчик и постоянно поднимать его заново.

Тогда код консольной команды может быть, например, таким:

class MQCommand extends CConsoleCommand
{
    public function run( $args ) {
        $method = isset($args[0]) ? $args[0] : '';
        if( $method ) {
            $this->$method();
        }
    }
    public function work() {
        for ( $i = 1; $i < 10; ++$i ) {
            Yii::app()->mq->process();
        }
    }
}

Запускать так:

> php yiic mq work

Все оказалось просто и нам не пришлось ставить сторонних расширений 2GIS или еще чьих-то.


UPD: 

Как правильно замечают в комментах, в методе getQueue имеет смысл зашить инициализацию обменника. Тогда нам не потребуется следить за его объявлением в методе process. Кроме того, несмотря на то, что в текущей версии ничего особенно плохого не произойдет, если привязать очередь к еще несуществующему обменнику (обменник мы все равно объявим. пусть позднее - но точно ли все, что послано в очередь придет по адресу я не проверял), но логичнее использовать исправленный код для getQueue:

private function getQueue() {
        if( !$this->_queue ) {
            $this->getExchange();
            $this->_queue      = new AMQPQueue( $this->getChannel() );
            $this->_queue->setName( $this->_queueName );
            $this->_queue->declareQueue();
            $this->_queue->bind( $this->_exchangeName, $this->_queueName );
        }
        return $this->_queue;
    }

И можно убрать $this->getExchange(); из метода process