ody/amqp

AMQP module for ODY framework

0.2.0 2025-04-06 18:22 UTC

This package is auto-updated.

Last update: 2025-04-06 18:54:47 UTC


README

This package provides RabbitMQ integration for the ODY framework, allowing you to easily implement asynchronous messaging patterns in your application.

Installation

composer require ody/amqp

Configuration

First, publish the configuration file:

php ody publish:config ody/amqp

This will create a config/amqp.php file where you can configure your RabbitMQ connections:

return [
    'enable' => true,
    
    'default' => [
        'host' => env('RABBITMQ_HOST', 'localhost'),
        'port' => env('RABBITMQ_PORT', 5672),
        'user' => env('RABBITMQ_USER', 'guest'),
        'password' => env('RABBITMQ_PASSWORD', 'guest'),
        'vhost' => env('RABBITMQ_VHOST', '/'),
        
        'concurrent' => [
            'limit' => 10,  // Max concurrent consumers per process
        ],
        
        'params' => [
            'connection_timeout' => 3.0,
            'read_write_timeout' => 3.0,
            'heartbeat' => 60,
            'keepalive' => true,
        ],
    ],
    
    // You can define multiple connection pools
    'analytics' => [
        'host' => 'analytics-rabbitmq',
        // Other connection settings
    ],
    
    // Connection pooling configuration
    'pool' => [
        'enable' => true,
        'max_connections' => 20,
        'max_channels_per_connection' => 20,
        'max_idle_time' => 60,  // seconds
    ],
    
    'producer' => [
        'paths' => ['app/Producers'],
        'retry' => [
            'max_attempts' => 3,
            'initial_interval' => 1000,  // ms
            'multiplier' => 2.0,
            'max_interval' => 10000,  // ms
        ],
    ],
    
    'consumer' => [
        'paths' => ['app/Consumers'],
        'prefetch_count' => 10,
        'auto_declare' => true,
    ],
    
    'process' => [
        'enable' => true,
        'max_consumers' => 10,
        'auto_restart' => true,
    ]
];

Setting Up Docker

For local development, you can use Docker to run RabbitMQ:

docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=password \
  rabbitmq:3-management

Or use docker-compose:

version: '3.8'

services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    ports:
      - "5672:5672"   # AMQP protocol port
      - "15672:15672" # Management UI port
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=password
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
      - rabbitmq_log:/var/log/rabbitmq

volumes:
  rabbitmq_data:
  rabbitmq_log:

Connection Pooling

The framework implements connection pooling to optimize RabbitMQ connections by reusing existing connections and channels instead of creating new ones for each operation. This significantly improves performance in high-throughput scenarios.

Pool Configuration

Configure the connection pool in your config/amqp.php file:

'pool' => [
    'enable' => true,                   // Enable/disable connection pooling
    'max_connections' => 20,            // Maximum connections in the pool
    'max_channels_per_connection' => 20,// Maximum channels per connection
    'max_idle_time' => 60,              // Maximum idle time in seconds
],

Usage

Connection pooling works transparently with the existing AMQP API:

// The publish method automatically uses pooled connections
AMQP::publish(UserCreatedProducer::class, [
    $user->id,
    $user->email,
    $user->username
]);

// For advanced usage, you can directly access pooled resources
$connection = AMQP::getPooledConnection();
$channel = AMQP::getPooledChannel();

Creating Producers

Producers send messages to RabbitMQ. Create a producer class in your app/Producers directory:

<?php

namespace App\Producers;

use Ody\AMQP\Attributes\Producer;
use Ody\AMQP\Message\ProducerMessage;

#[Producer(exchange: 'user_events', routingKey: 'user.created', type: 'topic')]
class UserCreatedProducer extends ProducerMessage
{
    public function __construct(
        private int $userId,
        private string $email,
        private string $username
    ) {
        $this->payload = [
            'user_id' => $this->userId,
            'email' => $this->email,
            'username' => $this->username,
            'created_at' => date('Y-m-d H:i:s')
        ];
    }
}

Creating Consumers

Consumers receive and process messages from RabbitMQ. Create a consumer class in your app/Consumers directory:

<?php

namespace App\Consumers;

use Ody\AMQP\Attributes\Consumer;
use Ody\AMQP\Message\ConsumerMessage;
use Ody\AMQP\Message\Result;
use PhpAmqpLib\Message\AMQPMessage;

#[Consumer(
    exchange: 'user_events',
    routingKey: 'user.created',
    queue: 'welcome_email_queue',
    type: 'topic'
)]
class WelcomeEmailConsumer extends ConsumerMessage
{
    public function consumeMessage(array $data, AMQPMessage $message): Result
    {
        try {
            // Process the message
            $userId = $data['user_id'];
            $email = $data['email'];
            $username = $data['username'];
            
            // Send welcome email
            // $this->emailService->sendWelcomeEmail($email, $username);
            
            // Log success
            error_log("Welcome email sent to $email for user $userId");
            
            // Acknowledge the message
            return Result::ACK;
        } catch (\Exception $e) {
            // Log the error
            error_log("Failed to send welcome email: " . $e->getMessage());
            
            // Reject and requeue the message for retry
            return Result::REQUEUE;
        }
    }
}

Publishing Messages

To publish messages in your application code:

use Ody\AMQP\AMQP;
use App\Producers\UserCreatedProducer;

// In your controller or service
public function registerUser(array $userData)
{
    // Create user in database
    $user = $this->userRepository->create($userData);
    
    // Publish event
    AMQP::publish(UserCreatedProducer::class, [
        $user->id,              // userId
        $user->email,           // email
        $user->username         // username
    ]);
    
    return $user;
}

Delayed Messages

You can publish messages with a delay:

// Send a reminder after 24 hours
AMQP::publishDelayed(ReminderProducer::class, [
    $user->id,
    'Your trial is about to expire'
], 86400000); // 24 hours in milliseconds

Working with Topic Exchanges

Topic exchanges provide flexible routing:

// Producer
#[Producer(exchange: 'notifications', routingKey: 'user.123.email', type: 'topic')]
class UserNotificationProducer extends ProducerMessage { /* ... */ }

// Consumer with wildcards
#[Consumer(
    exchange: 'notifications',
    routingKey: 'user.*.email',  // Match any user ID
    queue: 'email_notifications',
    type: 'topic'
)]
class EmailNotificationConsumer extends ConsumerMessage { /* ... */ }

Message Results

Consumers should return one of these results:

  • Result::ACK: Acknowledge the message (successfully processed)
  • Result::NACK: Negative acknowledgment (failed to process, don't requeue)
  • Result::REQUEUE: Reject and requeue the message (retry later)
  • Result::DROP: Reject the message and drop it (don't retry)
public function consumeMessage(array $data, AMQPMessage $message): Result
{
    try {
        // Process message
        return Result::ACK;
    } catch (\Exception $e) {
        // Decide based on error type
        if ($e instanceof TemporaryFailureException) {
            return Result::REQUEUE;  // Retry later
        }
        
        // Permanent failure
        return Result::DROP;  // Don't retry
    }
}

Monitoring and Management

RabbitMQ provides a management UI available at http://localhost:15672/ (default username/password is the one you configured).

You can use this interface to:

  • Monitor queues and exchanges
  • View message rates and consumer activity
  • Manage exchanges, queues, and bindings
  • View connections and channels

Connection Pool Monitoring

To monitor the connection pool, you can use:

// Get connection pool statistics
$connectionStats = \Ody\AMQP\AMQPConnectionPool::getStats();

// Get channel pool statistics
$channelStats = \Ody\AMQP\AMQPChannelPool::getStats();

This returns information about active connections, their state, and channel distribution.

For more advanced usage and configuration options, refer to the full documentation or explore the RabbitMQ official documentation.