juniorfontenele / laravel-rabbitmq
A Laravel package for RabbitMQ
Fund package maintenance!
juniorfontenele
Requires
- php: ^8.3
- illuminate/support: ^12
- php-amqplib/php-amqplib: ^3.7
Requires (Dev)
- driftingly/rector-laravel: ^2.0
- fakerphp/faker: ^1.24
- larastan/larastan: ^3.3
- laravel/framework: ^12
- laravel/pint: ^1.21
- laravel/tinker: ^2.10
- nunomaduro/collision: ^8.8
- orchestra/testbench: ^10.1
- pestphp/pest: ^3.8
- pestphp/pest-plugin-arch: ^3.1
- pestphp/pest-plugin-laravel: ^3.1
- phpstan/extension-installer: ^1.4
- phpstan/phpstan-deprecation-rules: ^2.0
- phpstan/phpstan-phpunit: ^2.0
- rector/rector: ^2.0
- spatie/laravel-ray: ^1.40
- spatie/ray: ^1.41
This package is auto-updated.
Last update: 2025-04-06 19:30:32 UTC
README
A robust Laravel package for integrating with RabbitMQ, providing support for multiple connections, exchanges, queues, and consumers. This package simplifies the process of working with RabbitMQ in your Laravel applications.
Features
- Multiple RabbitMQ connections support
- SSL/TLS connection support
- Easy configuration of exchanges and queues
- Flexible consumer system
- Built-in retry mechanism
- Command-line worker
- Event-driven architecture
Requirements
- PHP 8.3 or higher
- Laravel 12.0 or higher
- RabbitMQ server
- php-amqplib/php-amqplib ^3.7
Installation
You can install the package via composer:
composer require juniorfontenele/laravel-rabbitmq
After installing, publish the configuration file:
php artisan vendor:publish --tag=config --provider="JuniorFontenele\LaravelRabbitMQ\Providers\LaravelRabbitMQServiceProvider"
Configuration
Basic Environment Configuration
Add the following variables to your .env
file:
RABBITMQ_HOST=localhost RABBITMQ_PORT=5672 RABBITMQ_USER=guest RABBITMQ_PASSWORD=guest RABBITMQ_VHOST=/ RABBITMQ_SSL=false
For SSL connections, you can also configure:
RABBITMQ_SSL=true RABBITMQ_SSL_CAFILE=/path/to/ca.pem RABBITMQ_SSL_CERTFILE=/path/to/cert.pem RABBITMQ_SSL_KEYFILE=/path/to/key.pem RABBITMQ_SSL_VERIFY_PEER=true
Configuration Structure
The published configuration file (config/rabbitmq.php
) contains sections for:
Connections
Define your RabbitMQ server connections:
'connections' => [ 'default' => [ 'host' => env('RABBITMQ_HOST', 'localhost'), 'port' => env('RABBITMQ_PORT', 5672), 'user' => env('RABBITMQ_USER', 'guest'), 'password' => env('RABBITMQ_PASSWORD', 'guest'), 'vhost' => env('RABBITMQ_VHOST', '/'), 'ssl' => [ 'enabled' => env('RABBITMQ_SSL', false), 'cafile' => env('RABBITMQ_SSL_CAFILE', null), 'local_cert' => env('RABBITMQ_SSL_CERTFILE', null), 'local_key' => env('RABBITMQ_SSL_KEYFILE', null), 'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true), ], ], // Add more connections as needed ]
Exchanges
Configure your RabbitMQ exchanges:
'exchanges' => [ 'default' => [ 'connection' => 'default', // Connection to use 'name' => 'app.default', // Exchange name 'type' => 'direct', // Type: direct, topic, fanout, headers 'passive' => false, // Don't create, error if doesn't exist 'durable' => true, // Survive broker restart 'auto_delete' => false, // Delete when no queues bound 'internal' => false, // No direct publishing 'arguments' => [], // Additional arguments ], ]
Queues
Configure your RabbitMQ queues:
'queues' => [ 'default' => [ 'exchange' => 'default', // Exchange to bind to 'name' => 'default_queue', // Queue name 'routing_key' => 'default_queue', // Routing key 'consumer_tag' => 'consumer_tag', // Consumer identifier 'passive' => false, // Don't create, error if doesn't exist 'durable' => true, // Survive broker restart 'exclusive' => false, // Only one connection can use 'auto_delete' => false, // Delete when no consumers 'arguments' => [], // Additional arguments 'prefetch' => [ 'count' => 1, // Messages to prefetch 'size' => 0, // Total size in bytes ], 'retry' => [ 'enabled' => true, // Enable retry mechanism 'max_attempts' => 3, // Maximum retry attempts 'delay' => 60000, // Delay between retries (ms) ], ], ]
Worker
Configure worker behavior:
'worker' => [ 'memory_limit' => 128, // Memory limit in MB 'timeout' => 60, // Wait timeout in seconds 'sleep' => 3, // Sleep when no message (seconds) 'max_jobs' => 0, // Max jobs (0 = unlimited) 'tries' => 1, // Default retry attempts ],
Usage
Publishing Messages
You can publish messages using the RabbitMQ facade:
use JuniorFontenele\LaravelRabbitMQ\Facades\RabbitMQ; // Basic publishing RabbitMQ::publish('default', [ 'id' => 1, 'message' => 'Hello, RabbitMQ!' ]); // Publishing with options RabbitMQ::publish('notifications', $data, [ 'message_id' => uniqid(), 'correlation_id' => $correlationId, 'headers' => ['priority' => 'high'], 'routing_key' => 'notifications.email' ]);
Consuming Messages
Creating a Consumer
Create a custom consumer by extending the base Consumer
class:
<?php namespace App\System\RabbitMQ\Consumers; use JuniorFontenele\LaravelRabbitMQ\Consumer; use PhpAmqpLib\Message\AMQPMessage; class NotificationsConsumer extends Consumer { /** * Process the message. * * @param AMQPMessage $message * @return void */ public function consume(AMQPMessage $message): void { $data = json_decode($message->getBody(), true); // Process the message // ... // Acknowledge the message after successful processing $message->ack(); } }
Registering a Consumer
Register your consumer in a service provider:
<?php namespace App\Providers; use App\System\RabbitMQ\Consumers\NotificationsConsumer; use Illuminate\Support\ServiceProvider; use JuniorFontenele\LaravelRabbitMQ\RabbitMQManager; class AppServiceProvider extends ServiceProvider { public function boot(RabbitMQManager $manager) { // Register consumer for notifications queue $manager->registerConsumer('notifications', NotificationsConsumer::class); } }
Starting a Worker
You can start a worker via command line:
# Basic usage php artisan rabbitmq:work default # With options php artisan rabbitmq:work notifications \ --memory=256 \ --timeout=120 \ --sleep=5 \ --max-jobs=1000 \ --tries=3 \ --once
Or programmatically:
use JuniorFontenele\LaravelRabbitMQ\Worker; $worker = app(Worker::class); $worker->work('notifications', [ 'memory_limit' => 256, 'timeout' => 120, 'max_jobs' => 1000, 'verbose' => true ]);
Listening for Events
The package dispatches the following events:
rabbitmq.processing
: Before processing a messagerabbitmq.processed
: After successful processingrabbitmq.failed
: When processing fails
You can listen for these events in your EventServiceProvider
:
protected $listen = [ 'rabbitmq.processing' => [ \App\Listeners\LogProcessingMessage::class, ], 'rabbitmq.processed' => [ \App\Listeners\LogProcessedMessage::class, ], 'rabbitmq.failed' => [ \App\Listeners\LogFailedMessage::class, ], ];
Or using the event facade:
use Illuminate\Support\Facades\Event; Event::listen('rabbitmq.failed', function($message, $queue, $exception) { Log::error("Failed to process message", [ 'queue' => $queue, 'error' => $exception->getMessage() ]); });
Error Handling
The package includes a built-in retry mechanism:
- When a consumer throws an exception, the
failed
method is called - The message retry count is checked against the configured maximum attempts
- If retries are available, the message is requeued
- If max retries are reached, the message is rejected (not requeued)
- A
rabbitmq.failed
event is dispatched
You can customize this behavior by overriding the failed
method in your consumer:
public function failed(AMQPMessage $message, Throwable $exception): void { // Custom failure handling Log::error("Custom failure handling", [ 'message' => $message->getBody(), 'exception' => $exception->getMessage() ]); // Call parent implementation or handle completely custom parent::failed($message, $exception); }
Best Practices
- Use durable exchanges and queues for important messages
- Configure prefetch count appropriately for your workload
- Implement proper error handling in consumers
- Use meaningful routing keys for better message routing
- Configure retry policies based on your use case
- Monitor memory usage and set appropriate limits
- Use correlation IDs to track related messages
- Implement dead letter exchanges for failed messages
Testing
The package includes tests that you can run:
composer test
You can also run tests with coverage:
composer test-coverage
To test your own implementation, you can mock the RabbitMQManager
in your tests:
use JuniorFontenele\LaravelRabbitMQ\RabbitMQManager; use JuniorFontenele\LaravelRabbitMQ\Facades\RabbitMQ; // Mock the RabbitMQManager $this->mock(RabbitMQManager::class, function ($mock) { $mock->shouldReceive('publish') ->once() ->with('notifications', ['test' => 'data'], []) ->andReturn(null); }); // Call your code that uses the RabbitMQ facade RabbitMQ::publish('notifications', ['test' => 'data']);
Changelog
Please see CHANGELOG for more information on what has changed recently.
Contributing
Please see CONTRIBUTING for details.
Security Vulnerabilities
Please review our security policy on how to report security vulnerabilities.
Credits
License
The MIT License (MIT). Please see License File for more information.