albertwill/yii2-rabbitmq

Maintained fork of mikemadisonweb/yii2-rabbitmq that fixes duplicate multi-connection registration issues and improves worker process robustness.

Type:yii2-extension

pkg:composer/albertwill/yii2-rabbitmq

v2.9.3 2025-12-30 09:21 UTC

This package is auto-updated.

Last update: 2025-12-30 09:23:00 UTC


README

Wrapper based on php-amqplib library to incorporate messaging in your Yii2 application via RabbitMQ. Inspired by RabbitMqBundle for Symfony framework.

Note: This is a fork of mikemadisonweb/yii2-rabbitmq with the following enhancements:

  • Multi-connection isolation: Each connection independently manages its queues, exchanges, and bindings
  • Automatic reconnection: Consumer and Producer automatically handle connection failures and reconnect
  • Semaphore support: Control concurrent consumer instances in Kubernetes auto-scaling scenarios

This documentation applies to version 2.*, which requires PHP >=7.0. For legacy applications on PHP >=5.4, please use the previous version of this extension.

Installation

The preferred way to install this extension is through composer.

Either run

php composer.phar require albertwill/yii2-rabbitmq

or add

"albertwill/yii2-rabbitmq": "^2.8"

to the require section of your composer.json file.

Configuration

This extension facilitates the creation of RabbitMQ producers and consumers to meet your specific needs.

Note: If you plan to use the Semaphore feature, you need to configure both a Redis component and an application ID.

Required Configuration for Semaphore:

Yii::$app->id is used to generate unique semaphore keys, which avoids conflicts between different projects sharing the same Redis instance. Both the Redis component and Yii::$app->id are required for the Semaphore feature.

// config/web.php or config/console.php
return [
    'id' => 'my_app_id',  // Required for Semaphore feature
    'name' => 'My Application',
    
    'components' => [
        'redis' => [
            'class' => 'yii\redis\Connection',
            'hostname' => 'localhost',
            'port' => 6379,
            'database' => 0,
            // 'password' => 'your_password',  // if needed
        ],
        // ... other components ...
    ],
    // ... other config ...
];

Important: If Yii::$app->id is not set or empty, the Semaphore feature will throw an InvalidConfigException. The semaphore key is automatically generated as: rabbitmq:semaphore:{app_id}:{consumer_name} to ensure uniqueness across different projects and consumers.

This is an example basic config:

<?php
return [
    // should be in common.php
    'components'    => [
        // ...
        'rabbitmq' => [
            'class' => \mikemadisonweb\rabbitmq\Configuration::class,
            'connections' => [
                [
                    // You can pass these parameters as a single `url` option: https://www.rabbitmq.com/uri-spec.html
                    'host' => 'YOUR_HOSTNAME',
                    'port' => '5672',
                    'user' => 'YOUR_USERNAME',
                    'password' => 'YOUR_PASSWORD',
                    'vhost' => '/',
                ]
                // When multiple connections are used, you need to specify a `name` option for each one and reference them in the producer and consumer configuration blocks.
                // Each connection will independently manage its queues, exchanges, and bindings.
            ],
            'exchanges' => [
                [
                    'name' => 'YOUR_EXCHANGE_NAME',
                    'type' => 'direct'
                    // Refer to Defaults section for all possible options
                ],
            ],
            'queues' => [
                [
                    'name' => 'YOUR_QUEUE_NAME',
                    // Queue can be configured here the way you want it:
                    //'durable' => true,
                    //'auto_delete' => false,
                ],
                [
                    'name' => 'YOUR_ANOTHER_QUEUE_NAME',
                ],
            ],
            'bindings' => [
                [
                    'queue' => 'YOUR_QUEUE_NAME',
                    'exchange' => 'YOUR_EXCHANGE_NAME',
                    'routing_keys' => ['YOUR_ROUTING_KEY'],
                ],
            ],
            'producers' => [
                [
                    'name' => 'YOUR_PRODUCER_NAME',
                ],
            ],
            'consumers' => [
                [
                    'name' => 'YOUR_CONSUMER_NAME',
                    // Every consumer should define one or more callbacks for corresponding queues
                    'callbacks' => [
                        // queue name => callback class name
                        'YOUR_QUEUE_NAME' => \path\to\YourConsumer::class,
                    ],
                ],
            ],
        ],
    ],
];
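
The comments about multiple connections in the config above could translate into a fragment like the following. This is a sketch rather than a verified configuration: the connection names `default_mq` and `reports_mq` and the hostnames are placeholders, and the `connection` key is taken from the defaults listed in the Options section. How exchanges, queues and bindings are tied to a particular connection is not shown here.

```php
<?php
// Sketch only: connection names and hostnames are placeholders.
return [
    'components' => [
        'rabbitmq' => [
            'class' => \mikemadisonweb\rabbitmq\Configuration::class,
            'connections' => [
                [
                    'name' => 'default_mq',
                    'host' => 'mq-1.example.internal',
                    'user' => 'YOUR_USERNAME',
                    'password' => 'YOUR_PASSWORD',
                ],
                [
                    'name' => 'reports_mq',
                    'host' => 'mq-2.example.internal',
                    'user' => 'YOUR_USERNAME',
                    'password' => 'YOUR_PASSWORD',
                ],
            ],
            'producers' => [
                // This producer publishes through the first connection
                ['name' => 'YOUR_PRODUCER_NAME', 'connection' => 'default_mq'],
            ],
            'consumers' => [
                [
                    'name' => 'YOUR_CONSUMER_NAME',
                    // This consumer reads through the second connection
                    'connection' => 'reports_mq',
                    'callbacks' => [
                        'YOUR_QUEUE_NAME' => \path\to\YourConsumer::class,
                    ],
                ],
            ],
        ],
    ],
];
```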

To use this extension you should be familiar with the basic concepts of RabbitMQ. If you are not confident in your knowledge I suggest reading this article.

Each value in 'callbacks' can be a class name or the name of a service registered in the dependency injection container. Starting from Yii version 2.0.11 you can configure your container like this:

<?php
use yii\di\Instance;

return [
    // ...
    'container' => [
        'definitions' => [],
        'singletons' => [
            'rabbitmq.import-data.consumer' => [
                [
                    'class' => \path\to\YourConsumer::class,
                ],
                [
                    // If dependency is needed
                    'some-dependency' => Instance::of('dependency-service-name'),
                ],
            ],
        ],
    ],
];

If you need several consumers, you can list the respective entries in the configuration, but that requires a separate worker (daemon process) for each consumer. That can be perfectly fine, but if you are dealing with small queues whose messages are consumed quickly, you may want to group them into one worker. To do so, list several callbacks in one consumer's config, and a single worker will run your business logic for multiple queues.

Make sure that all queues and exchanges are referenced in the corresponding bindings; setting up correct message routing is up to you.
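
Grouping queues into one worker might look like the fragment below. It reuses the queue names from the basic config; `YourAnotherConsumer` is a hypothetical second callback class, and `YOUR_ANOTHER_QUEUE_NAME` would still need its own entry under 'bindings' to receive messages.

```php
'consumers' => [
    [
        'name' => 'YOUR_CONSUMER_NAME',
        'callbacks' => [
            // queue name => callback class name; one worker serves both queues
            'YOUR_QUEUE_NAME' => \path\to\YourConsumer::class,
            'YOUR_ANOTHER_QUEUE_NAME' => \path\to\YourAnotherConsumer::class, // hypothetical class
        ],
    ],
],
```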

Lifecycle events

Several lifecycle events are implemented: before_consume, after_consume, before_publish, after_publish. You can use them for any additional work that needs to happen before or after a message is consumed or published. For example, because a consumer is a long-running process, you can make sure Yii notices when the database connection has been closed by a timeout:

<?php
// config/main.php
return [
    // ...
    'components'    => [
        // ...
        'rabbitmq'  => [
            // ...
            'on before_consume' => function ($event) {
                if (\Yii::$app->has('db') && \Yii::$app->db->isActive) {
                    try {
                        \Yii::$app->db->createCommand('SELECT 1')->query();
                    } catch (\yii\db\Exception $exception) {
                        \Yii::$app->db->close();
                    }
                }
            },
        ],
        // ...
    ],
];

Logger

Last but not least is the logger configuration, which is also optional:

<?php
// config/main.php
return [
    // ...
    'components'    => [
        // ...
        'rabbitmq'  => [
            // ...
            'logger' => [
                'log' => true,
                'category' => 'application',
                'print_console' => false,
                'system_memory' => false,
            ],
        ],
        // ...
    ],
];

The logger is disabled by default. When enabled, it writes messages to the main application log, or to your own log target if you specify the corresponding category name. The 'print_console' option gives you additional information while debugging a consumer in your console.

Example

Simple setup of Yii2 basic template with the RabbitMQ extension is available here. Feel free to experiment with it and debug your existing configuration in an isolated manner.

Console commands

The extension provides several console commands:

  • rabbitmq/consume - Run a consumer
  • rabbitmq/declare-all - Create RabbitMQ exchanges, queues and bindings based on configuration
  • rabbitmq/declare-exchange - Create the exchange listed in configuration
  • rabbitmq/declare-queue - Create the queue listed in configuration
  • rabbitmq/delete-all - Delete all RabbitMQ exchanges and queues that are defined in configuration
  • rabbitmq/delete-exchange - Delete the exchange
  • rabbitmq/delete-queue - Delete the queue
  • rabbitmq/publish - Publish a message from STDIN to the queue
  • rabbitmq/purge-queue - Delete all messages from the queue

To start a consumer:

yii rabbitmq/consume YOUR_CONSUMER_NAME

You can use a process control system such as Supervisor to restart the consumer process and keep your worker running continuously.

Message limit

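Because a PHP daemon, especially one built on a framework, may be prone to memory leaks, it can be reasonable to stop the consumer once it reaches a memory limit or after it has consumed a given number of messages:

--memoryLimit, -l: memory limit (defaults to 0)
--messagesLimit, -m: number of messages to consume before stopping (defaults to 0)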

Auto-declare

By default the extension is configured in auto-declare mode, which means that every time a message is published, exchanges, queues and bindings are checked and created if missing. If performance matters to your application, you should disable this feature in the configuration and use the console commands to declare and delete the routing schema yourself.
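
Turning auto-declare off could look like the fragment below; the `auto_declare` key and its default of `true` come from the Options section. With it disabled, the routing schema would have to be created explicitly, for example by running rabbitmq/declare-all during deployment.

```php
'rabbitmq' => [
    // ... connections, exchanges, queues, bindings ...
    'auto_declare' => false, // default is true
],
```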

Usage

The consumer worker reads messages from the queue and, for each one, executes a callback and passes the message to it.

Consume

For a class to be used as a callback, it must implement ConsumerInterface:

<?php

namespace components\rabbitmq;

use mikemadisonweb\rabbitmq\components\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class YourConsumer implements ConsumerInterface
{
    /**
     * @param AMQPMessage $msg
     * @return bool
     */
    public function execute(AMQPMessage $msg)
    {
        $data = $msg->body;
        // Apply your business logic here
        
        return ConsumerInterface::MSG_ACK;
    }
}

You can publish any data type (object, int, array, etc.). Although RabbitMQ transfers the payload as a string, in the consumer $msg->body holds your data in the same type it was sent.
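
The type preservation described above relies on a serializer/deserializer pair; the defaults in the Options section are PHP's `serialize` and `unserialize`. The stand-alone sketch below shows only that round trip, with no broker or extension code involved, and the payload is an arbitrary example.

```php
<?php
// Stand-alone illustration of the default 'serialize' / 'unserialize' pair.
$payload = ['dataset_id' => 657, 'linked_datasets' => []];

// What travels through the broker is a plain string.
$wire = serialize($payload);

// What a consumer callback would see after deserialization.
$received = unserialize($wire);

var_dump(is_string($wire));       // bool(true)
var_dump($received === $payload); // bool(true)
```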

Return codes

The return code controls how the broker processes the message next:

  • ConsumerInterface::MSG_ACK - Acknowledge message (mark as processed) and drop it from the queue
  • ConsumerInterface::MSG_REJECT - Reject and drop message from the queue
  • ConsumerInterface::MSG_REJECT_REQUEUE - Reject and requeue message in RabbitMQ

Publish

Here is an example of how to publish a message:

$producer = \Yii::$app->rabbitmq->getProducer('YOUR_PRODUCER_NAME');
$msg = serialize(['dataset_id' => 657, 'linked_datasets' => []]);
$producer->publish($msg, 'YOUR_EXCHANGE_NAME', 'YOUR_ROUTING_KEY');

The routing key (third parameter) is optional, which can be useful for fanout exchanges.

By default, the connection to the broker is established only when a message is published; the extension does not try to connect on every HTTP request if there is no need to.

Automatic Reconnection

Both Consumer and Producer support automatic reconnection when connection failures occur. You can configure reconnection behavior:

'producers' => [
    [
        'name' => 'YOUR_PRODUCER_NAME',
        'max_reconnect_attempts' => 3,  // Maximum reconnection attempts (default: 3)
        'reconnect_delay' => 2,         // Delay between reconnection attempts in seconds (default: 2)
    ],
],
'consumers' => [
    [
        'name' => 'YOUR_CONSUMER_NAME',
        'callbacks' => [
            'YOUR_QUEUE_NAME' => \path\to\YourConsumer::class,
        ],
        'max_reconnect_attempts' => 3,  // Maximum reconnection attempts (default: 3)
        'reconnect_delay' => 2,         // Delay between reconnection attempts in seconds (default: 2)
    ],
],

When a connection failure is detected, the extension will automatically:

  • Close the old connection
  • Attempt to reconnect (up to max_reconnect_attempts times)
  • Wait reconnect_delay seconds between attempts
  • Retry the operation after successful reconnection

For Consumers, the reconnection process preserves the semaphore (if configured) to maintain proper concurrency control.
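
The reconnection steps above can be pictured with a generic retry helper. This is a minimal sketch and not the extension's internal code: `runWithReconnect` and its parameters are hypothetical, `\RuntimeException` stands in for whatever exception signals a connection failure, and a retry that fails again after reconnecting is counted as a used attempt.

```php
<?php
/**
 * Illustrative retry helper (hypothetical, not part of the extension).
 *
 * @param callable $operation   work that may throw on connection failure
 * @param callable $reconnect   closes the old connection and opens a new one
 * @param int      $maxAttempts mirrors 'max_reconnect_attempts'
 * @param float    $delay       mirrors 'reconnect_delay' (seconds)
 */
function runWithReconnect(callable $operation, callable $reconnect, int $maxAttempts = 3, float $delay = 2.0)
{
    try {
        return $operation();
    } catch (\RuntimeException $e) {
        $lastError = $e;
    }

    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        // Wait between attempts, not before the first one
        if ($attempt > 1 && $delay > 0) {
            usleep((int) round($delay * 1000000));
        }
        try {
            $reconnect();
            return $operation(); // retry after a successful reconnection
        } catch (\RuntimeException $e) {
            $lastError = $e;
        }
    }

    throw $lastError; // all attempts used up
}

// Usage: the operation fails twice, then succeeds on the second reconnect.
$calls = 0;
$reconnects = 0;
$result = runWithReconnect(
    function () use (&$calls) {
        $calls++;
        if ($calls < 3) {
            throw new \RuntimeException('connection lost');
        }
        return 'published';
    },
    function () use (&$reconnects) {
        $reconnects++;
    },
    3,
    0.0 // no delay, to keep the example fast
);
```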

Semaphore Support (Kubernetes Auto-scaling)

This extension supports semaphore-based concurrency control, which is useful in Kubernetes auto-scaling scenarios to limit the number of concurrent consumer instances.

What is Semaphore? Semaphore is a distributed concurrency control mechanism that uses Redis to coordinate multiple consumer processes. It ensures that no more than a specified number of consumer instances are running simultaneously, which is crucial in containerized environments like Kubernetes where auto-scaling can create multiple pod instances.

How it works:

  1. When a consumer starts, it attempts to acquire a semaphore slot from Redis
  2. If the limit is reached, the consumer waits (based on acquire_sleep configuration)
  3. Once acquired, the consumer runs and periodically sends heartbeats to refresh the TTL
  4. When the consumer stops (normally or due to error), it releases the semaphore slot
  5. Other waiting consumers can then acquire the released slot

Configuration Example:

'rabbitmq' => [
    // ... other config ...
    'semaphore' => [
        'type' => \mikemadisonweb\rabbitmq\components\semaphore\HashSemaphore::class,  // or IncrSemaphore::class
        'redis_component_name' => 'redis',  // Redis component name in Yii2
        'limit' => 10,                      // Maximum concurrent consumers (default: -1, disabled)
        'ttl' => 300,                       // Semaphore TTL in seconds (default: 300)
        'acquire_sleep' => 60,              // Sleep interval when acquisition fails in seconds (default: 60)
    ],
    'consumers' => [
        [
            'name' => 'YOUR_CONSUMER_NAME',
            'callbacks' => [
                'YOUR_QUEUE_NAME' => \path\to\YourConsumer::class,
            ],
            'semaphore' => [
                'limit' => 5,  // Override global limit for this consumer
                'ttl' => 600,  // Override global TTL for this consumer
            ],
        ],
    ],
],
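
One way to read the per-consumer override is as an overlay on the global settings. The helper below is a hypothetical sketch of that reading, assuming that a `null` value means "inherit from the global block" (the per-consumer defaults in the Options section are all `null`); the extension's actual merge logic may differ.

```php
<?php
/**
 * Hypothetical helper: overlay per-consumer semaphore settings on the
 * global ones, treating null as "not set".
 */
function effectiveSemaphoreConfig(array $global, array $consumer): array
{
    // Drop entries the consumer did not set
    $overrides = array_filter($consumer, function ($value) {
        return $value !== null;
    });

    // Later string keys win, so consumer values replace global ones
    return array_merge($global, $overrides);
}

$global = [
    'redis_component_name' => 'redis',
    'limit' => 10,
    'ttl' => 300,
    'acquire_sleep' => 60,
];
$consumer = ['type' => null, 'limit' => 5, 'ttl' => 600, 'acquire_sleep' => null];

$effective = effectiveSemaphoreConfig($global, $consumer);
// limit and ttl come from the consumer; acquire_sleep stays at the global 60
```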

Semaphore Types:

  1. HashSemaphore (Recommended for most cases)

    • Uses Redis Set to track unique tokens for each consumer instance
    • Each consumer gets a unique token, allowing better tracking and debugging
    • Better for scenarios where you need to identify which specific instances are running
    • Slightly more Redis memory usage but provides better observability
  2. IncrSemaphore (Better performance)

    • Uses Redis INCR/DECR to maintain a simple counter
    • Lower memory footprint and slightly better performance
    • Suitable for high-throughput scenarios where instance tracking is not needed
    • Simpler implementation, less Redis operations
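
The difference between the two approaches can be illustrated with in-memory stand-ins. These classes are hypothetical and only mimic the idea (a set of unique tokens versus a bare counter); the extension itself keeps this state in Redis, and TTL and heartbeat handling are left out.

```php
<?php
// In-memory stand-ins for illustration only; the extension uses Redis.

/** Tracks one unique token per holder, similar in spirit to HashSemaphore. */
final class TokenSetSemaphore
{
    private $limit;
    private $tokens = [];

    public function __construct(int $limit)
    {
        $this->limit = $limit;
    }

    /** @return string|null token on success, null when the limit is reached */
    public function acquire()
    {
        if (count($this->tokens) >= $this->limit) {
            return null;
        }
        $token = 'tok_' . bin2hex(random_bytes(8));
        $this->tokens[$token] = true;
        return $token;
    }

    public function release(string $token)
    {
        unset($this->tokens[$token]);
    }

    /** @return string[] tokens of the current holders */
    public function holders(): array
    {
        return array_keys($this->tokens);
    }
}

/** Keeps only a counter, similar in spirit to IncrSemaphore. */
final class CounterSemaphore
{
    private $limit;
    private $count = 0;

    public function __construct(int $limit)
    {
        $this->limit = $limit;
    }

    public function acquire(): bool
    {
        if ($this->count >= $this->limit) {
            return false;
        }
        $this->count++;
        return true;
    }

    public function release()
    {
        if ($this->count > 0) {
            $this->count--;
        }
    }
}

$set = new TokenSetSemaphore(2);
$a = $set->acquire();
$b = $set->acquire();
$c = $set->acquire(); // null: both slots are taken
$set->release($a);
$d = $set->acquire(); // succeeds again after the release
```

The token variant can report who holds a slot via `holders()`, which is the observability benefit mentioned above; the counter variant stores a single integer and cannot.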

Configuration Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| type | string | HashSemaphore::class | Semaphore implementation class (HashSemaphore::class or IncrSemaphore::class) |
| redis_component_name | string | 'redis' | Name of the Redis component in Yii2 application |
| limit | int | -1 | Maximum concurrent consumer instances. When <= 0, semaphore is disabled |
| ttl | int | 300 | Time-to-live in seconds. Semaphore key expires after this time if no heartbeat |
| acquire_sleep | int | 60 | Sleep interval (seconds) when semaphore acquisition fails. Minimum value is 1 |

Semaphore Key Format: The semaphore key is automatically generated as: rabbitmq:semaphore:{app_id}:{consumer_name} to avoid conflicts between different projects and consumers.

Important: The {app_id} part comes from Yii::$app->id, which must be configured in your application config. If it's not set, the Semaphore feature will throw an InvalidConfigException. This ensures that different applications sharing the same Redis instance don't interfere with each other's semaphore keys.

Example:

  • If Yii::$app->id = 'my-app' and consumer name is 'import-consumer'
  • The semaphore key will be: rabbitmq:semaphore:my-app:import-consumer
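
The key format can be reproduced with a few lines of plain PHP. `semaphoreKey` is a hypothetical helper written for this illustration, and `\InvalidArgumentException` stands in for the InvalidConfigException that the extension is documented to throw.

```php
<?php
/**
 * Hypothetical helper that mirrors the documented key format.
 * A plain InvalidArgumentException stands in for InvalidConfigException.
 */
function semaphoreKey(string $appId, string $consumerName): string
{
    if ($appId === '') {
        throw new \InvalidArgumentException('Application id must not be empty.');
    }

    return sprintf('rabbitmq:semaphore:%s:%s', $appId, $consumerName);
}

echo semaphoreKey('my-app', 'import-consumer'), PHP_EOL;
// prints: rabbitmq:semaphore:my-app:import-consumer
```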

Lifecycle:

  • Acquire: Consumer attempts to acquire a semaphore slot when it starts
  • Heartbeat: Consumer periodically sends heartbeats to refresh the TTL (prevents expiration due to long-running processes)
  • Release: Consumer releases the semaphore slot when it stops (normal exit or error)

Important Notes:

  • When limit <= 0, semaphore control is disabled
  • The semaphore is automatically acquired when the consumer starts and released when it stops
  • During reconnection, the semaphore state is preserved to maintain proper concurrency control
  • If a consumer crashes without releasing the semaphore, it will expire after ttl seconds
  • acquire_sleep must be at least 1 second. The system will continuously retry acquiring the semaphore until successful or interrupted by a signal

Recommended Configuration Strategy for acquire_sleep:

The acquire_sleep parameter controls how long a consumer waits before retrying when it fails to acquire a semaphore. Proper configuration is crucial to balance Redis load and consumer responsiveness.

Performance Considerations:

  • Based on performance tests, Redis can handle approximately 1000-1100 semaphore operations per second
  • Each acquire() operation takes about 0.9-1.0 milliseconds
  • The system automatically adds ±20% random jitter to acquire_sleep to prevent thundering herd problems
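
To see what ±20% jitter means for a given acquire_sleep, the bounds can be computed directly. The functions below are an illustrative sketch with hypothetical names; how the extension actually draws its random value is not documented here, so a uniform distribution is assumed.

```php
<?php
/**
 * Lower and upper bound of the wait time for a +/- jitter fraction.
 * Illustrative only; names are hypothetical.
 */
function jitterBounds(float $acquireSleep, float $fraction = 0.2): array
{
    return [$acquireSleep * (1 - $fraction), $acquireSleep * (1 + $fraction)];
}

/** One wait time drawn uniformly from within those bounds (assumption). */
function jitteredSleep(float $acquireSleep, float $fraction = 0.2): float
{
    list($min, $max) = jitterBounds($acquireSleep, $fraction);

    return $min + ($max - $min) * (mt_rand() / mt_getrandmax());
}

list($low, $high) = jitterBounds(15);
printf("%.1f-%.1f seconds\n", $low, $high); // prints: 12.0-18.0 seconds
```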

Calculation Formula:

Minimum acquire_sleep = (Total Workers - Limit) / Redis Capacity
Recommended acquire_sleep = Minimum × 1.5 to 2.0 (safety margin)

Where:

  • Total Workers: Estimated total number of consumer processes
  • Limit: Semaphore limit configured
  • Redis Capacity: ~1000 operations/second (conservative estimate)
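
The formula above can be turned into a small calculator. This is a sketch with hypothetical function names; the default capacity of 1000 operations per second is the conservative estimate quoted above, and the 1-second floor reflects the documented minimum for acquire_sleep.

```php
<?php
/**
 * Minimum acquire_sleep (seconds): waiting workers divided by Redis capacity.
 */
function minimumAcquireSleep(int $totalWorkers, int $limit, float $redisCapacity = 1000.0): float
{
    $waiting = max(0, $totalWorkers - $limit);

    return $waiting / $redisCapacity;
}

/**
 * Recommended range: minimum x 1.5 to minimum x 2.0, never below 1 second.
 */
function recommendedAcquireSleep(int $totalWorkers, int $limit, float $redisCapacity = 1000.0): array
{
    $minimum = minimumAcquireSleep($totalWorkers, $limit, $redisCapacity);

    return [max(1.0, $minimum * 1.5), max(1.0, $minimum * 2.0)];
}

// 10,000 workers with a limit of 100 leave 9,900 workers waiting
$minimum = minimumAcquireSleep(10000, 100);
list($from, $to) = recommendedAcquireSleep(10000, 100);
printf("minimum %.1f s, recommended %.2f-%.2f s\n", $minimum, $from, $to);
// prints: minimum 9.9 s, recommended 14.85-19.80 s
```

The scale table and scenarios in this section quote somewhat wider ranges than the strict 1.5 to 2.0 multiple, so treat the computed range as a starting point.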

Configuration Recommendations by Scale:

| Total Workers | Limit | Recommended acquire_sleep | Notes |
| --- | --- | --- | --- |
| < 1,000 | Any | 5-10 seconds | Small scale, low Redis load |
| 1,000 - 5,000 | < 500 | 10-20 seconds | Medium scale |
| 5,000 - 10,000 | < 1,000 | 15-30 seconds | Large scale, need to control retry frequency |
| > 10,000 | < 2,000 | 20-60 seconds | Very large scale, must strictly control |

Example Scenarios:

  1. 10,000 workers, limit = 100:

    • Waiting workers: 9,900
    • Minimum: 9,900 / 1000 ≈ 10 seconds
    • Recommended: 15-30 seconds
    • With ±20% jitter: actual wait time ranges from 12-18 seconds (for 15s) or 24-36 seconds (for 30s)
  2. 10,000 workers, limit = 1,000:

    • Waiting workers: 9,000
    • Minimum: 9,000 / 1000 = 9 seconds
    • Recommended: 10-20 seconds
  3. 10,000 workers, limit = 5,000:

    • Waiting workers: 5,000
    • Minimum: 5,000 / 1000 = 5 seconds
    • Recommended: 5-15 seconds

Best Practices:

  • Monitor Redis QPS: Adjust acquire_sleep based on actual Redis load monitoring
  • Consider Heartbeat Operations: Remember that heartbeat operations also consume Redis capacity
  • Network Latency: If Redis is in a remote network, consider increasing acquire_sleep slightly
  • Dynamic Adjustment: Adjust based on actual runtime conditions rather than fixed values
  • Random Jitter: The system automatically adds ±20% random jitter to spread retry requests and reduce Redis pressure spikes

Special Cases:

  • Fast Response Required: Use smaller values (1-10 seconds) but ensure Redis load is acceptable
  • Resource Constrained: If Redis performance is limited or network latency is high, use larger values (30-60 seconds)
  • Test Environment: Can use minimum value (1 second) for faster testing

Use Cases:

  • Kubernetes Auto-scaling: Limit concurrent consumer pods to prevent resource exhaustion
  • Resource Management: Control database connections or API rate limits
  • Cost Optimization: Limit concurrent processing to stay within cloud service quotas

Options

All configuration options:

$rabbitmq_defaults = [
        'auto_declare' => true,
        'connections' => [
            [
                'name' => self::DEFAULT_CONNECTION_NAME,
                'type' => AMQPLazyConnection::class,
                'url' => null,
                'host' => null,
                'port' => 5672,
                'user' => 'guest',
                'password' => 'guest',
                'vhost' => '/',
                'connection_timeout' => 3,
                'read_write_timeout' => 3,
                'ssl_context' => null,
                'keepalive' => false,
                'heartbeat' => 0,
                'channel_rpc_timeout' => 0.0
            ],
        ],
        'exchanges' => [
            [
                'name' => null,
                'type' => null,
                'passive' => false,
                'durable' => true,
                'auto_delete' => false,
                'internal' => false,
                'nowait' => false,
                'arguments' => null,
                'ticket' => null,
            ],
        ],
        'queues' => [
            [
                'name' => '',
                'passive' => false,
                'durable' => true,
                'exclusive' => false,
                'auto_delete' => false,
                'nowait' => false,
                'arguments' => null,
                'ticket' => null,
            ],
        ],
        'bindings' => [
            [
                'exchange' => null,
                'queue' => null,
                'to_exchange' => null,
                'routing_keys' => [],
            ],
        ],
        'producers' => [
            [
                'name' => null,
                'connection' => self::DEFAULT_CONNECTION_NAME,
                'safe' => true,
                'content_type' => 'text/plain',
                'delivery_mode' => 2,
                'serializer' => 'serialize',
                'max_reconnect_attempts' => 3,  // Maximum reconnection attempts
                'reconnect_delay' => 2,           // Delay between reconnection attempts (seconds)
            ],
        ],
        'consumers' => [
            [
                'name' => null,
                'connection' => self::DEFAULT_CONNECTION_NAME,
                'callbacks' => [],
                'qos' => [
                    'prefetch_size' => 0,
                    'prefetch_count' => 0,
                    'global' => false,
                ],
                'idle_timeout' => 60,            // Idle timeout in seconds
                'idle_timeout_exit_code' => null, // Exit code when idle timeout occurs
                'proceed_on_exception' => false,
                'max_reconnect_attempts' => 3,     // Maximum reconnection attempts
                'reconnect_delay' => 2,           // Delay between reconnection attempts (seconds)
                'deserializer' => 'unserialize',
                'semaphore' => [
                    'type' => null,              // Semaphore type (must be a subclass of Semaphore)
                    'limit' => null,             // Semaphore limit (must be positive integer if used)
                    'ttl' => null,               // Semaphore TTL in seconds
                    'acquire_sleep' => null,     // Sleep interval when acquisition fails in seconds
                ],
            ],
        ],
        'logger' => [
            'log' => false,
            'category' => 'application',
            'print_console' => true,
            'system_memory' => false,
        ],
        'semaphore' => [
            'type' => HashSemaphore::class,      // Default semaphore type
            'redis_component_name' => 'redis',   // Redis component name
            'limit' => -1,                       // Default limit (-1 means disabled)
            'ttl' => 300,                        // Default TTL in seconds
            'acquire_sleep' => 60,               // Default sleep interval in seconds
        ],
    ];

Exchange

For example, to declare an exchange you should provide name and type for it.

| parameter | required | type | default | comments |
| --- | --- | --- | --- | --- |
| name | yes | string | | The exchange name consists of a non-empty sequence of these characters: letters, digits, hyphen, underscore, period, or colon. |
| type | yes | string | | Type of the exchange, possible values are direct, fanout, topic and headers. |
| passive | no | boolean | false | If set to true, the server will reply with Declare-Ok if the exchange already exists with the same name, and raise an error if not. The client can use this to check whether an exchange exists without modifying the server state. When set, all other method fields except name and no-wait are ignored. A declare with both passive and no-wait has no effect. |
| durable | no | boolean | true | Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged if/when a server restarts. |
| auto_delete | no | boolean | false | If set to true, the exchange is deleted when no queues are bound to it anymore. |
| internal | no | boolean | false | An internal exchange may not be used directly by publishers, but only when bound to other exchanges. |
| nowait | no | boolean | false | The client may send the next request immediately after the first one, without waiting for the reply. |
| arguments | no | array | null | A set of arguments for the declaration. |
| ticket | no | integer | null | Access ticket |

A good use case for the arguments parameter is setting up a dead-letter exchange.

Queue

For a queue declaration, all parameters are optional. If you do not provide a name for your queue, the server will generate a unique name for you:

| parameter | required | type | default | comments |
| --- | --- | --- | --- | --- |
| name | no | string | '' | The queue name can be empty, or a sequence of these characters: letters, digits, hyphen, underscore, period, or colon. |
| passive | no | boolean | false | If set to true, the server will reply with Declare-Ok if the queue already exists with the same name, and raise an error if not. |
| durable | no | boolean | true | Durable queues remain active when a server restarts. Non-durable queues (transient queues) are purged if/when a server restarts. |
| auto_delete | no | boolean | false | If set to true, the queue is deleted when all consumers have finished using it. |
| exclusive | no | boolean | false | Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes. Passive declaration of an exclusive queue by other connections is not allowed. |
| nowait | no | boolean | false | The client may send the next request immediately after the first one, without waiting for the reply. |
| arguments | no | array | null | A set of arguments for the declaration. |
| ticket | no | integer | null | Access ticket |

A complete explanation of the options, their defaults, and other valuable details can be found in the AMQP 0-9-1 Reference Guide.

Beware that not all of these options can be changed on the fly, that is, after the queue or exchange has already been created. Attempting to do so results in an error.

Breaking Changes

In version 2.* this extension was completely rewritten internally and can be considered brand new. The key differences from version 1.* are:

  • PHP version 7.0 and above required
  • Configuration format changed
  • All extension components are loaded automatically using Yii2 bootstrapping
  • Different connection types supported
  • All extension components are registered in DIC as singletons
  • Routing component added to control schema in broker
  • Queue and exchange default options changed
  • Console commands are joined into one controller class which is added automatically and doesn't need to be configured
  • New console commands added to manipulate the routing schema
  • All data types are supported for message payload
  • Consumer handles control signals in a predictable manner

License

FOSSA Status