mix8872 / rabbitmq-rpc
Json RPC implementation over rabbitmq
Requires
- needle-project/laravel-rabbitmq: dev-master
This package is auto-updated.
Last update: 2025-04-07 14:35:06 UTC
README
Способ позволяет выполнять функции с аргументами на удаленном сервисе (или группе сервисов).
Зависимости:
- php 8.*
- php-amqplib
needle-project/laravel-rabbitmq
,
используется форкgit@github.com:mix8872/laravel-rabbitmq.git
в котором сняты ограничения на максимальное время работы и количество запросов
Принцип работы
Для выполнения процедур используется json формат сообщения rmq, который содержит следующие поля:
- 'request_id' => 'string|required' | ID запроса - добавляется автоматически
- 'reply_to' => 'string|required' | имя очереди rmq для ответа - добавляется автоматически из
config('app.name')
на эту очередь должен быть подписан отправитель запроса, чтоб получить ответ или уведомление об ошибке - 'action' => 'string|required_without:error|regex:/[a-z]+.[a-z]+/ui' | формат поля action: 'псевдоним_класса.метод'
псевдоним_класса - ключ массива в конфиге laravel_rabbitmq.rpc - 'attributes' => 'array' | массив атрибутов [ключ => значение]
- 'error' => 'string|required_without:action' | ответ ошибкой на предыдущий запрос
- 'reply_for' => 'string|required_with:error' | ИД предыдущего запроса, обязателен, если отвечаем ошибкой
Подписчик RMQ получает сообщение, по полю action
ищет нужный метод и выполняет его, подставляя в него аргументы
из поля attributes
.
Все сообщения в RabbitMQ шифруются через Crypt::encryptString
, таким образом во всех связанных сервисах
должен быть указан один и тот же APP_KEY
.
Конфигурирование
В разделе queues
задать все необходимые очереди, например:
... 'queues' => [ '<имя_сервиса>' => [ 'connection' => 'grch', 'name' => '<имя_сервиса>', 'attributes' => [ 'durable' => true, 'bind' => [ ['exchange' => 'GRCHExchanger', 'routing_key' => 'projectsData'], // подписываем очередь на обновления проектов ['exchange' => 'GRCHExchanger', 'routing_key' => 'usersData'], // подписываем очередь на обновления пользователей ['exchange' => 'GRCHExchanger', 'routing_key' => config('app.name')], // собственная очередь сервиса ], ], ], ], ...
В разделе consumers
указать:
... 'consumers' => [ '<имя_сервиса>' => [ 'queue' => '<имя_сервиса>', 'message_processor' => RMQRpcProcessor::class, ], ], ...
Добавить в конфиг laravel_rabbitmq
блок rpc
-> processors
, в котором задать массив обработчиков в формате
<псевдоним> => <класс>
, например:
... 'rpc' => [ 'processors' => [ 'users' => UsersProcessor::class, 'projects' => ProjectsProcessor::class, ], ], ...
Использование
Создаем обработчики
Создаем классы обработчиков и заполняем их в конфиг.
Каждому обработчику должен быть назначен уникальный псевдоним, который будет использоваться для вызова процедур.
Класс обработчика должен содержать публичные методы.
Каждый публичный метод может быть вызван через rpc.
Допускается использование статических методов.
Если запрошенный метод является статическим - он будет выполнен напрямую,
иначе будет создан экземпляр класса и выполнен запрошенный метод у объекта.
Отправка сообщения
Выполнение процедуры
Например, отправка команды на создание проекта:
RMQRpcPublisher::make() ->action('projects.create') ->attributes([ 'project' => $project->toArray(), ]) ->publish('projectsData');
где:
- 'projects.create' - экшн, projects - псевдоним класса, create - метод класса
- 'project' - аргумент для передачи в метод класса
- 'projectsData' - routing key, для отправки в очереди, подписанные на этот маршрут
Ответ с ошибкой
RMQRpcPublisher::make() ->error(<текст_ошибки>) ->replyFor(<request_id_из_предыдущего_запроса>) ->publish(<reply_to_из_предыдущего_запроса>);
Запускаем слушатель rmq:
php artisan rabbitmq:consume <имя_сервиса> --time=-1 --messages=-1
, где:
- --time=-1 - без ограничения по времени
- --messages=-1 - без ограничения по количеству сообщений