ody / amqp
AMQP module for ODY framework
Requires
- php: >=8.3
- ody/process: ^0.1.2
- ody/task: ^0.1.2
- php-amqplib/php-amqplib: ^3.7
This package is auto-updated.
Last update: 2025-04-06 18:54:47 UTC
README
This package provides RabbitMQ integration for the ODY framework, allowing you to easily implement asynchronous messaging patterns in your application.
Installation
composer require ody/amqp
Configuration
First, publish the configuration file:
php ody publish:config ody/amqp
This will create a config/amqp.php
file where you can configure your RabbitMQ connections:
return [ 'enable' => true, 'default' => [ 'host' => env('RABBITMQ_HOST', 'localhost'), 'port' => env('RABBITMQ_PORT', 5672), 'user' => env('RABBITMQ_USER', 'guest'), 'password' => env('RABBITMQ_PASSWORD', 'guest'), 'vhost' => env('RABBITMQ_VHOST', '/'), 'concurrent' => [ 'limit' => 10, // Max concurrent consumers per process ], 'params' => [ 'connection_timeout' => 3.0, 'read_write_timeout' => 3.0, 'heartbeat' => 60, 'keepalive' => true, ], ], // You can define multiple connection pools 'analytics' => [ 'host' => 'analytics-rabbitmq', // Other connection settings ], // Connection pooling configuration 'pool' => [ 'enable' => true, 'max_connections' => 20, 'max_channels_per_connection' => 20, 'max_idle_time' => 60, // seconds ], 'producer' => [ 'paths' => ['app/Producers'], 'retry' => [ 'max_attempts' => 3, 'initial_interval' => 1000, // ms 'multiplier' => 2.0, 'max_interval' => 10000, // ms ], ], 'consumer' => [ 'paths' => ['app/Consumers'], 'prefetch_count' => 10, 'auto_declare' => true, ], 'process' => [ 'enable' => true, 'max_consumers' => 10, 'auto_restart' => true, ] ];
Setting Up Docker
For local development, you can use Docker to run RabbitMQ:
docker run -d --name rabbitmq \ -p 5672:5672 \ -p 15672:15672 \ -e RABBITMQ_DEFAULT_USER=admin \ -e RABBITMQ_DEFAULT_PASS=password \ rabbitmq:3-management
Or use docker-compose:
version: '3.8' services: rabbitmq: image: rabbitmq:3-management container_name: rabbitmq ports: - "5672:5672" # AMQP protocol port - "15672:15672" # Management UI port environment: - RABBITMQ_DEFAULT_USER=admin - RABBITMQ_DEFAULT_PASS=password volumes: - rabbitmq_data:/var/lib/rabbitmq - rabbitmq_log:/var/log/rabbitmq volumes: rabbitmq_data: rabbitmq_log:
Connection Pooling
The framework implements connection pooling to optimize RabbitMQ connections by reusing existing connections and channels instead of creating new ones for each operation. This significantly improves performance in high-throughput scenarios.
Pool Configuration
Configure the connection pool in your config/amqp.php
file:
'pool' => [ 'enable' => true, // Enable/disable connection pooling 'max_connections' => 20, // Maximum connections in the pool 'max_channels_per_connection' => 20,// Maximum channels per connection 'max_idle_time' => 60, // Maximum idle time in seconds ],
Usage
Connection pooling works transparently with the existing AMQP API:
// The publish method automatically uses pooled connections AMQP::publish(UserCreatedProducer::class, [ $user->id, $user->email, $user->username ]); // For advanced usage, you can directly access pooled resources $connection = AMQP::getPooledConnection(); $channel = AMQP::getPooledChannel();
Creating Producers
Producers send messages to RabbitMQ. Create a producer class in your app/Producers
directory:
<?php namespace App\Producers; use Ody\AMQP\Attributes\Producer; use Ody\AMQP\Message\ProducerMessage; #[Producer(exchange: 'user_events', routingKey: 'user.created', type: 'topic')] class UserCreatedProducer extends ProducerMessage { public function __construct( private int $userId, private string $email, private string $username ) { $this->payload = [ 'user_id' => $this->userId, 'email' => $this->email, 'username' => $this->username, 'created_at' => date('Y-m-d H:i:s') ]; } }
Creating Consumers
Consumers receive and process messages from RabbitMQ. Create a consumer class in your app/Consumers
directory:
<?php namespace App\Consumers; use Ody\AMQP\Attributes\Consumer; use Ody\AMQP\Message\ConsumerMessage; use Ody\AMQP\Message\Result; use PhpAmqpLib\Message\AMQPMessage; #[Consumer( exchange: 'user_events', routingKey: 'user.created', queue: 'welcome_email_queue', type: 'topic' )] class WelcomeEmailConsumer extends ConsumerMessage { public function consumeMessage(array $data, AMQPMessage $message): Result { try { // Process the message $userId = $data['user_id']; $email = $data['email']; $username = $data['username']; // Send welcome email // $this->emailService->sendWelcomeEmail($email, $username); // Log success error_log("Welcome email sent to $email for user $userId"); // Acknowledge the message return Result::ACK; } catch (\Exception $e) { // Log the error error_log("Failed to send welcome email: " . $e->getMessage()); // Reject and requeue the message for retry return Result::REQUEUE; } } }
Publishing Messages
To publish messages in your application code:
use Ody\AMQP\AMQP; use App\Producers\UserCreatedProducer; // In your controller or service public function registerUser(array $userData) { // Create user in database $user = $this->userRepository->create($userData); // Publish event AMQP::publish(UserCreatedProducer::class, [ $user->id, // userId $user->email, // email $user->username // username ]); return $user; }
Delayed Messages
You can publish messages with a delay:
// Send a reminder after 24 hours AMQP::publishDelayed(ReminderProducer::class, [ $user->id, 'Your trial is about to expire' ], 86400000); // 24 hours in milliseconds
Working with Topic Exchanges
Topic exchanges provide flexible routing:
// Producer #[Producer(exchange: 'notifications', routingKey: 'user.123.email', type: 'topic')] class UserNotificationProducer extends ProducerMessage { /* ... */ } // Consumer with wildcards #[Consumer( exchange: 'notifications', routingKey: 'user.*.email', // Match any user ID queue: 'email_notifications', type: 'topic' )] class EmailNotificationConsumer extends ConsumerMessage { /* ... */ }
Message Results
Consumers should return one of these results:
Result::ACK
: Acknowledge the message (successfully processed)Result::NACK
: Negative acknowledgment (failed to process, don't requeue)Result::REQUEUE
: Reject and requeue the message (retry later)Result::DROP
: Reject the message and drop it (don't retry)
public function consumeMessage(array $data, AMQPMessage $message): Result { try { // Process message return Result::ACK; } catch (\Exception $e) { // Decide based on error type if ($e instanceof TemporaryFailureException) { return Result::REQUEUE; // Retry later } // Permanent failure return Result::DROP; // Don't retry } }
Monitoring and Management
RabbitMQ provides a management UI available at http://localhost:15672/
(default username/password is the one you
configured).
You can use this interface to:
- Monitor queues and exchanges
- View message rates and consumer activity
- Manage exchanges, queues, and bindings
- View connections and channels
Connection Pool Monitoring
To monitor the connection pool, you can use:
// Get connection pool statistics $connectionStats = \Ody\AMQP\AMQPConnectionPool::getStats(); // Get channel pool statistics $channelStats = \Ody\AMQP\AMQPChannelPool::getStats();
This returns information about active connections, their state, and channel distribution.
For more advanced usage and configuration options, refer to the full documentation or explore the RabbitMQ official documentation.