juniorfontenele / laravel-rabbitmq
A Laravel package for RabbitMQ
Requires
- php: ^8.3
- illuminate/support: ^12
- php-amqplib/php-amqplib: ^3.7
- phpseclib/phpseclib: ~3.0
Requires (Dev)
- driftingly/rector-laravel: ^2.0
- fakerphp/faker: ^1.24
- larastan/larastan: ^3.3
- laravel/framework: ^12
- laravel/pint: ^1.22
- laravel/tinker: ^2.10
- nunomaduro/collision: ^8.8
- orchestra/testbench: ^10.1
- pestphp/pest: ^3.8
- pestphp/pest-plugin-arch: ^3.1
- pestphp/pest-plugin-laravel: ^3.1
- phpstan/extension-installer: ^1.4
- phpstan/phpstan-deprecation-rules: ^2.0
- phpstan/phpstan-phpunit: ^2.0
- rector/rector: ^2.0
- spatie/laravel-ray: ^1.40
- spatie/ray: ^1.41
This package is auto-updated.
Last update: 2025-04-21 08:29:14 UTC
README
A robust Laravel package for integrating with RabbitMQ, providing support for multiple connections, exchanges, queues, and consumers. This package simplifies the process of working with RabbitMQ in your Laravel applications.
Features
- Multiple RabbitMQ connections support
- SSL/TLS connection support
- Easy configuration of exchanges and queues
- Flexible consumer system
- Built-in retry mechanism
- Command-line worker
- Event-driven architecture
Requirements
- PHP 8.3 or higher
- Laravel 12.0 or higher
- RabbitMQ server
- php-amqplib/php-amqplib ^3.7
Installation
You can install the package via composer:
composer require juniorfontenele/laravel-rabbitmq
After installing, publish the configuration file:
php artisan rabbitmq:install
Configuration
Basic Environment Configuration
Add the following variables to your .env file:
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USER=guest
RABBITMQ_PASSWORD=guest
RABBITMQ_VHOST=/
RABBITMQ_SSL=false
For SSL connections, you can also configure:
RABBITMQ_SSL=true
RABBITMQ_SSL_CAFILE=/path/to/ca.pem
RABBITMQ_SSL_CERTFILE=/path/to/cert.pem
RABBITMQ_SSL_KEYFILE=/path/to/key.pem
RABBITMQ_SSL_VERIFY_PEER=true
Configuration Structure
The published configuration file (config/rabbitmq.php) contains sections for:
Connections
Define your RabbitMQ server connections:
'connections' => [
    'default' => [
        'host' => env('RABBITMQ_HOST', 'localhost'),
        'port' => env('RABBITMQ_PORT', 5672),
        'user' => env('RABBITMQ_USER', 'guest'),
        'password' => env('RABBITMQ_PASSWORD', 'guest'),
        'vhost' => env('RABBITMQ_VHOST', '/'),
        'ssl' => [
            'enabled' => env('RABBITMQ_SSL', false),
            'cafile' => env('RABBITMQ_SSL_CAFILE', null),
            'local_cert' => env('RABBITMQ_SSL_CERTFILE', null),
            'local_key' => env('RABBITMQ_SSL_KEYFILE', null),
            'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
        ],
    ],
    // Add more connections as needed
]
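The "Add more connections as needed" placeholder can be filled with a second entry that follows the same shape. The fragment below is a minimal sketch, assuming a hypothetical connection named analytics and hypothetical RABBITMQ_ANALYTICS_* environment variables; neither is defined by the package.

```php
'connections' => [
    'default' => [
        // ... same keys as the default connection
    ],

    // Hypothetical second connection; the name and env keys are assumptions.
    'analytics' => [
        'host' => env('RABBITMQ_ANALYTICS_HOST', 'localhost'),
        'port' => env('RABBITMQ_ANALYTICS_PORT', 5672),
        'user' => env('RABBITMQ_ANALYTICS_USER', 'guest'),
        'password' => env('RABBITMQ_ANALYTICS_PASSWORD', 'guest'),
        'vhost' => env('RABBITMQ_ANALYTICS_VHOST', '/'),
        'ssl' => [
            'enabled' => false,
            'cafile' => null,
            'local_cert' => null,
            'local_key' => null,
            'verify_peer' => true,
        ],
    ],
],
```

An exchange would then select this connection through its connection key, for example 'connection' => 'analytics'.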
Exchanges
Configure your RabbitMQ exchanges:
'exchanges' => [
    'default' => [
        'connection' => 'default',  // Connection to use
        'name' => 'app.default',    // Exchange name
        'type' => 'direct',         // Type: direct, topic, fanout, headers
        'passive' => false,         // Don't create, error if doesn't exist
        'durable' => true,          // Survive broker restart
        'auto_delete' => false,     // Delete when no queues bound
        'internal' => false,        // No direct publishing
        'arguments' => [],          // Additional arguments
    ],
]
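The type option decides how the broker compares a message's routing key with each binding. For topic exchanges, AMQP patterns use * for exactly one dot-separated word and # for zero or more words. The sketch below reproduces that rule in plain PHP so the behavior can be tried without a broker. topicMatches() and matchWords() are hypothetical helpers, not part of this package or php-amqplib; in a real deployment RabbitMQ performs the matching itself.

```php
<?php

/**
 * Illustrative only: mimics how a topic exchange compares a routing key
 * with a binding pattern. Hypothetical helper, not a package API.
 */
function topicMatches(string $pattern, string $routingKey): bool
{
    $patternWords = $pattern === '' ? [] : explode('.', $pattern);
    $keyWords = $routingKey === '' ? [] : explode('.', $routingKey);

    return matchWords($patternWords, $keyWords);
}

function matchWords(array $pattern, array $key): bool
{
    if ($pattern === []) {
        // Pattern exhausted: match only if the key is exhausted too.
        return $key === [];
    }

    $head = array_shift($pattern);

    if ($head === '#') {
        // '#' may absorb zero or more words: try every possible split.
        for ($i = 0; $i <= count($key); $i++) {
            if (matchWords($pattern, array_slice($key, $i))) {
                return true;
            }
        }

        return false;
    }

    if ($key === []) {
        return false;
    }

    $word = array_shift($key);

    // '*' matches exactly one word; anything else must match literally.
    return ($head === '*' || $head === $word) && matchWords($pattern, $key);
}

var_dump(topicMatches('user.*', 'user.created'));    // bool(true)
var_dump(topicMatches('user.*', 'user.created.v2')); // bool(false)
var_dump(topicMatches('user.#', 'user.created.v2')); // bool(true)
```

A direct exchange, the default in the configuration above, skips pattern matching and delivers only when the routing key equals the binding key exactly.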
Queues
Configure your RabbitMQ queues:
'queues' => [
    'default' => [
        'exchange' => 'default',          // Exchange to bind to
        'name' => 'default_queue',        // Queue name
        'routing_key' => 'default_queue', // Routing key
        'consumer_tag' => 'consumer_tag', // Consumer identifier
        'passive' => false,               // Don't create, error if doesn't exist
        'durable' => true,                // Survive broker restart
        'exclusive' => false,             // Only one connection can use
        'auto_delete' => false,           // Delete when no consumers
        'arguments' => [],                // Additional arguments
        'prefetch' => [
            'count' => 1,                 // Messages to prefetch
            'size' => 0,                  // Total size in bytes
        ],
        'retry' => [
            'enabled' => true,            // Enable retry mechanism
            'max_attempts' => 3,          // Maximum retry attempts
            'delay' => 60000,             // Delay between retries (ms)
        ],
    ],
]
Worker
Configure worker behavior:
'worker' => [
    'memory_limit' => 128, // Memory limit in MB
    'timeout' => 60,       // Wait timeout in seconds
    'sleep' => 3,          // Sleep when no message (seconds)
    'max_jobs' => 0,       // Max jobs (0 = unlimited)
    'tries' => 1,          // Default retry attempts
],
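The memory_limit and max_jobs options bound how long a worker process lives. The sketch below shows one way such limits could translate into a stop check inside a worker loop. shouldStopWorker() is a hypothetical helper written for illustration; the package's own Worker class may evaluate these options differently.

```php
<?php

/**
 * Hypothetical stop check for a long-running worker loop.
 *
 * @param int $processed     Jobs handled so far
 * @param int $maxJobs       Job cap; 0 means unlimited
 * @param int $memoryLimitMb Memory cap in megabytes
 * @param int $memoryBytes   Current usage, e.g. from memory_get_usage(true)
 */
function shouldStopWorker(int $processed, int $maxJobs, int $memoryLimitMb, int $memoryBytes): bool
{
    $overJobCap = $maxJobs > 0 && $processed >= $maxJobs;
    $overMemoryCap = ($memoryBytes / 1024 / 1024) >= $memoryLimitMb;

    return $overJobCap || $overMemoryCap;
}

$mb = 1024 * 1024;

// Defaults from the config above: 128 MB limit, unlimited jobs.
var_dump(shouldStopWorker(500, 0, 128, 64 * $mb));     // bool(false)
var_dump(shouldStopWorker(1000, 1000, 128, 64 * $mb)); // bool(true)
var_dump(shouldStopWorker(10, 0, 128, 130 * $mb));     // bool(true)
```

Stopping on these limits is only useful when a process supervisor restarts the worker afterwards.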
Usage
Publishing Messages
You can publish messages using the RabbitMQ facade:
use JuniorFontenele\LaravelRabbitMQ\Facades\RabbitMQ;
The package provides a standardized EventMessage class that helps with formatting and handling messages:
use JuniorFontenele\LaravelRabbitMQ\Facades\RabbitMQ;
use JuniorFontenele\LaravelRabbitMQ\Messages\EventMessage;

// Create a standardized event message
$message = EventMessage::make('user.created', [
    'id' => 123,
    'name' => 'John Doe',
    'email' => 'john@example.com'
])
    ->routingKey('user.events')
    ->messageId(uniqid())
    ->correlationId('request-123');

// Publish the event message
RabbitMQ::publish('default', $message);
The EventMessage automatically includes:
- Timestamp
- Application name
- Hostname
- Event name
- Payload data
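To make the list above concrete, the sketch below assembles a comparable envelope in plain PHP. It is an illustration only: buildEnvelope() is a hypothetical function, and the key names (event, payload, timestamp, app, hostname) are assumptions, since the README does not document the serialized layout. Inspect a real EventMessage to see the actual structure.

```php
<?php

/**
 * Hypothetical envelope builder. Key names are assumptions made for
 * illustration and may not match what EventMessage actually emits.
 */
function buildEnvelope(string $event, array $payload, string $appName): array
{
    return [
        'event' => $event,
        'payload' => $payload,
        'timestamp' => date(DATE_ATOM),
        'app' => $appName,
        'hostname' => gethostname() ?: 'unknown',
    ];
}

$envelope = buildEnvelope('user.created', ['id' => 123], 'my-app');

echo json_encode($envelope, JSON_PRETTY_PRINT), PHP_EOL;
```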
Consuming Messages
Creating a Consumer
Create a custom consumer by extending the base Consumer class:
<?php

namespace App\Consumers;

use JuniorFontenele\LaravelRabbitMQ\Consumer;
use JuniorFontenele\LaravelRabbitMQ\Messages\EventMessage;
use PhpAmqpLib\Message\AMQPMessage;

class NotificationsConsumer extends Consumer
{
    /**
     * Process the message.
     *
     * @param AMQPMessage $message
     * @return void
     */
    public function consume(AMQPMessage $message): void
    {
        // Parse message as a standard EventMessage
        try {
            $eventMessage = EventMessage::tryFrom($message);

            // Access standardized event data
            $event = $eventMessage->getEvent();
            $payload = $eventMessage->getPayload();
            $messageId = $eventMessage->getMessageId();
            $correlationId = $eventMessage->getCorrelationId();

            // Process based on event type
            match($event) {
                'user.created' => $this->handleUserCreated($payload),
                'user.updated' => $this->handleUserUpdated($payload),
                default => $this->handleUnknownEvent($event, $payload),
            };

            // Or process as raw data
            $data = json_decode($message->getBody(), true);

            // Process the message
            // ...
        } catch (\Exception $e) {
            // Handle error parsing the message
            $this->failed($message, $e);

            return;
        }

        // Acknowledge the message after successful processing
        $message->ack();
    }

    protected function handleUserCreated(array $payload): void
    {
        // Handle user created event
    }

    protected function handleUserUpdated(array $payload): void
    {
        // Handle user updated event
    }

    protected function handleUnknownEvent(string $event, array $payload): void
    {
        // Handle unknown event
    }
}
Registering a Consumer
Register your consumer in a service provider:
<?php

namespace App\Providers;

use App\Consumers\NotificationsConsumer;
use Illuminate\Support\ServiceProvider;
use JuniorFontenele\LaravelRabbitMQ\RabbitMQManager;

class AppServiceProvider extends ServiceProvider
{
    public function boot(RabbitMQManager $manager)
    {
        // Register consumer for notifications queue
        $manager->registerConsumer('notifications', NotificationsConsumer::class);
    }
}
Consumer auto-discovery
You can also auto-register consumers by placing them in the App\Consumers folder. Each consumer must extend the base JuniorFontenele\LaravelRabbitMQ\Consumer class and use a StudlyCase class name derived from the queue name, e.g. NotificationsConsumer for the notifications queue. The package will discover and register them automatically.
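The naming convention can be sketched as a small function that turns a queue name into the class name auto-discovery would look for. consumerClassFor() is a hypothetical helper that approximates Laravel's Str::studly() in plain PHP. How the package treats multi-word queue names such as order_events is an assumption here; only the notifications case is confirmed by the README.

```php
<?php

/**
 * Hypothetical helper: derive the consumer class name expected for a queue.
 * Approximates StudlyCase conversion; not part of the package API.
 */
function consumerClassFor(string $queue, string $namespace = 'App\\Consumers'): string
{
    // Treat dashes and underscores as word separators.
    $words = str_replace(['-', '_'], ' ', $queue);

    // Capitalize each word, then remove the spaces between them.
    $studly = str_replace(' ', '', ucwords($words));

    return $namespace . '\\' . $studly . 'Consumer';
}

echo consumerClassFor('notifications'), PHP_EOL; // App\Consumers\NotificationsConsumer
echo consumerClassFor('order_events'), PHP_EOL;  // App\Consumers\OrderEventsConsumer
```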
Starting a Worker
You can start a worker via command line:
# Basic usage
php artisan rabbitmq:work default

# With options
php artisan rabbitmq:work notifications \
    --memory=256 \
    --timeout=120 \
    --sleep=5 \
    --max-jobs=1000 \
    --tries=3 \
    --once
Or programmatically:
use JuniorFontenele\LaravelRabbitMQ\Worker;

$worker = app(Worker::class);

$worker->work('notifications', [
    'memory_limit' => 256,
    'timeout' => 120,
    'max_jobs' => 1000,
    'verbose' => true
]);
Listening for Events
The package dispatches the following events:
Message Events
- rabbitmq.processing: Before processing a message
  - Parameters: AMQPMessage $message, string $queue
- rabbitmq.processed: After successful processing
  - Parameters: AMQPMessage $message, string $queue
- rabbitmq.failed: When processing fails
  - Parameters: AMQPMessage $message, string $queue, Throwable $exception
Worker Events
- rabbitmq.worker.starting: When worker starts processing a queue
  - Parameters: string $queue
- rabbitmq.worker.started: After worker has started
  - Parameters: string $queue
- rabbitmq.worker.restarting: When worker is restarting
  - Parameters: string $queue
- rabbitmq.worker.stopping: When worker is about to stop
  - Parameters: string $stopType ('hard', 'soft', or 'stop'), int $status
- rabbitmq.worker.stopped: When worker has stopped
  - Parameters: string $queue
- rabbitmq.worker.timeout: When worker times out waiting for messages
  - Parameters: string $queue, AMQPTimeoutException $exception, bool $shouldSleep
- rabbitmq.worker.error: When an error occurs during worker execution
  - Parameters: string $queue, Throwable $exception
- rabbitmq.worker.signal: When worker receives a system signal
  - Parameters: int $signalNumber
- rabbitmq.worker.alarm: When worker receives an alarm signal
  - Parameters: int $signalNumber, int $memoryUsage
- rabbitmq.worker.failed: When worker fails to start
  - Parameters: string $queue, array $options, Throwable $exception
You can listen for these events in your EventServiceProvider:
protected $listen = [
    'rabbitmq.processing' => [
        \App\Listeners\LogProcessingMessage::class,
    ],
    'rabbitmq.processed' => [
        \App\Listeners\LogProcessedMessage::class,
    ],
    'rabbitmq.failed' => [
        \App\Listeners\LogFailedMessage::class,
    ],
    'rabbitmq.worker.starting' => [
        \App\Listeners\LogWorkerStarting::class,
    ],
    'rabbitmq.worker.stopped' => [
        \App\Listeners\LogWorkerStopped::class,
    ],
];
Or using the Event facade:
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Log;

Event::listen('rabbitmq.failed', function ($message, $queue, $exception) {
    Log::error("Failed to process message", [
        'queue' => $queue,
        'error' => $exception->getMessage()
    ]);
});
Error Handling
The package includes a built-in retry mechanism:
- When a consumer throws an exception, the failed method is called
- The message retry count is checked against the configured maximum attempts
- If retries are available, the message is requeued
- If max retries are reached, the message is rejected (not requeued)
- A rabbitmq.failed event is dispatched
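The requeue-or-reject decision in the steps above can be sketched as a pure function. retryDecision() is a hypothetical helper, and it assumes the retry count includes the attempt that just failed. The package's internal bookkeeping is not described in this README and may count differently.

```php
<?php

/**
 * Hypothetical decision helper for the retry flow described above.
 * Returns 'requeue' while attempts remain, otherwise 'reject'.
 *
 * @param int  $attempts     Attempts made so far, including the failed one
 * @param int  $maxAttempts  Value of retry.max_attempts
 * @param bool $retryEnabled Value of retry.enabled
 */
function retryDecision(int $attempts, int $maxAttempts, bool $retryEnabled = true): string
{
    if ($retryEnabled && $attempts < $maxAttempts) {
        return 'requeue';
    }

    return 'reject';
}

// With max_attempts = 3: attempts 1 and 2 requeue, attempt 3 rejects.
foreach ([1, 2, 3] as $attempt) {
    echo $attempt, ' => ', retryDecision($attempt, 3), PHP_EOL;
}
```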
You can customize this behavior by overriding the failed method in your consumer:
public function failed(AMQPMessage $message, Throwable $exception): void
{
    // Custom failure handling (requires Illuminate\Support\Facades\Log and Throwable to be imported)
    Log::error("Custom failure handling", [
        'message' => $message->getBody(),
        'exception' => $exception->getMessage()
    ]);

    // Call the parent implementation to keep the built-in retry logic,
    // or omit this call to handle the failure entirely yourself
    parent::failed($message, $exception);
}
Best Practices
- Use durable exchanges and queues for important messages
- Configure prefetch count appropriately for your workload
- Implement proper error handling in consumers
- Use meaningful routing keys for better message routing
- Configure retry policies based on your use case
- Monitor memory usage and set appropriate limits
- Use correlation IDs to track related messages
- Implement dead letter exchanges for failed messages
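For the dead letter recommendation, RabbitMQ reads the target exchange from the queue arguments x-dead-letter-exchange and, optionally, x-dead-letter-routing-key. The fragment below is a sketch that assumes the package forwards the arguments array unchanged to the queue declaration, which this README does not confirm. The names app.dlx and default_queue.dead are made up for illustration.

```php
'queues' => [
    'default' => [
        // ... other options as in the Queues section
        'arguments' => [
            // Standard RabbitMQ queue arguments; the values are hypothetical names.
            'x-dead-letter-exchange' => 'app.dlx',
            'x-dead-letter-routing-key' => 'default_queue.dead',
        ],
    ],
],
```

With such a setup, messages rejected without requeue are routed to the dead letter exchange instead of being dropped. The exchange itself still has to be declared, and a queue that already exists with different arguments must be deleted and recreated, because RabbitMQ refuses to redeclare a queue with changed arguments.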
Testing
The package includes tests that you can run:
composer test
To test your own implementation, you can mock the RabbitMQManager in your tests:
use JuniorFontenele\LaravelRabbitMQ\RabbitMQManager;
use JuniorFontenele\LaravelRabbitMQ\Facades\RabbitMQ;

// Mock the RabbitMQManager
$this->mock(RabbitMQManager::class, function ($mock) {
    $mock->shouldReceive('publish')
        ->once()
        ->with('notifications', ['test' => 'data'], [])
        ->andReturn(null);
});

// Call your code that uses the RabbitMQ facade
RabbitMQ::publish('notifications', ['test' => 'data']);
Changelog
Please see CHANGELOG for more information on what has changed recently.
Contributing
Please see CONTRIBUTING for details.
Security Vulnerabilities
Please review our security policy on how to report security vulnerabilities.
Credits
License
The MIT License (MIT). Please see License File for more information.