thesis / amqp
Async (fiber based) client for AMQP 0.9.1
Requires
- php: ^8.3
- ext-filter: *
- amphp/amp: ^3.0
- amphp/pipeline: ^1.2
- amphp/socket: ^2.3
- revolt/event-loop: ^1.0
- thesis/amp-bridge: ^0.1.0
- thesis/byte-buffer: ^0.1.0
- thesis/byte-order: ^0.2.0
- thesis/byte-reader: ^0.3.1
- thesis/byte-reader-writer: ^0.1.0
- thesis/byte-writer: ^0.2.1
- thesis/endian: ^0.1.0
Requires (Dev)
- bamarni/composer-bin-plugin: ^1.8
- ergebnis/composer-normalize: ^2.45
- phpunit/phpunit: ^11.5
- symfony/var-dumper: ^7.2
Last update: 2025-05-24 05:56:28 UTC
README
A pure asynchronous (fiber-based), strictly typed, full-featured PHP driver for the AMQP 0.9.1 protocol.
Features
- Full support for AMQP 0.9.1
- Supports the AMQP URI specification
- Publish messages in batch
- Automatic processing of acknowledgements in Publisher Confirms mode
- Automatic processing of returns in Publisher Confirms mode with the mandatory flag enabled
- A more convenient way to work with transactions
- Consume messages as a concurrent iterator
- Consume messages in batch
Installation
composer require thesis/amqp
Usage
- Configuration
- Client
- Channel
- Publish a message
- Safety
- Consume a batch of messages
- Confirms
- Explicit returns
- License
Configuration
Configuration can be created from a DSN that follows the AMQP URI spec.
<?php

declare(strict_types=1);

use Thesis\Amqp\Config;

$config = Config::fromURI('amqp://guest:guest@localhost:5672/');
Multiple addresses are supported. The client will connect to the first available amqp server host.
<?php

declare(strict_types=1);

use Thesis\Amqp\Config;

$config = Config::fromURI('amqp://guest:guest@localhost:5672,localhost:5673/');
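To illustrate what "the first available" means, here is a minimal, self-contained sketch of in-order failover. It does not use the library: connectToFirstAvailable and the fake connector are hypothetical names invented for this illustration, and the library's actual connection logic may differ.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of thesis/amqp): tries each address in order
 * and returns the first result the connector produces without throwing.
 *
 * @template T
 * @param non-empty-list<string> $urls
 * @param callable(string): T $connect
 * @return T
 */
function connectToFirstAvailable(array $urls, callable $connect): mixed
{
    $lastError = null;

    foreach ($urls as $url) {
        try {
            return $connect($url);
        } catch (RuntimeException $e) {
            $lastError = $e; // remember the failure and try the next address
        }
    }

    throw new RuntimeException('No AMQP server is reachable', 0, $lastError);
}

// Fake connector standing in for a real socket: only port 5673 "responds".
$fakeConnect = static function (string $url): string {
    if (!str_ends_with($url, ':5673')) {
        throw new RuntimeException("Connection refused: {$url}");
    }

    return "connected to {$url}";
};

$result = connectToFirstAvailable(['localhost:5672', 'localhost:5673'], $fakeConnect);
```

In this sketch the first address is refused, so the second one is used; if every address fails, the last error is attached to the thrown exception.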
From an array (for example, if you keep the configuration of your application as an array).
<?php

declare(strict_types=1);

use Thesis\Amqp\Config;

$config = Config::fromArray([
    'scheme' => 'amqp',
    'urls' => ['localhost:5672'],
    'user' => 'guest',
    'password' => 'guest',
]);
From the primary constructor.
<?php

declare(strict_types=1);

use Thesis\Amqp\Config;

$config = new Config(
    urls: ['localhost:5672'],
    user: 'guest',
    vhost: '/test',
    authMechanisms: ['plain', 'amqplain'],
);
If the original amqp server settings remain unchanged, you can use Config::default().
<?php

declare(strict_types=1);

use Thesis\Amqp\Config;

$config = Config::default(); // amqp://guest:guest@localhost:5672/
Client
The client is the connection facade to the amqp server. It is responsible for connecting to and disconnecting from the server; disconnecting also closes all channels.
It is not necessary to explicitly connect to work with the client. The connection will be established when the first channel is created.
<?php

declare(strict_types=1);

use Thesis\Amqp\Config;
use Thesis\Amqp\Client;

$client = new Client(Config::default());

// your code here

$client->disconnect();
Channel
A new channel can be obtained only from the client.
<?php

declare(strict_types=1);

use Thesis\Amqp\Config;
use Thesis\Amqp\Client;

$client = new Client(Config::default());
$channel = $client->channel();

$channel->close();
$client->disconnect();
- If you are terminating an application, you don't have to call $channel->close(), because $client->disconnect() will close all channels anyway.
- However, you should not leave unused channels open for the lifetime of the application; otherwise you may exhaust the open channel limit from the channel_max setting.
- After closing a channel yourself or getting a Thesis\Amqp\Exception\ChannelWasClosed exception, you cannot use the channel; open a new one.
Publish a message
There are notable changes here compared to other libraries.
- First, the message is an object.
- Secondly, all special headers like correlationId, expiration, messageId and so on are placed in the properties of this object, so you don't have to pass them through user headers and remember how keys should be named.
<?php

declare(strict_types=1);

use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
use Thesis\Amqp\Message;
use Thesis\Amqp\DeliveryMode;
use Thesis\Time\TimeSpan;

$client = new Client(Config::default());
$channel = $client->channel();

$channel->publish(new Message(
    body: '...',
    headers: ['x' => 'y'],
    contentType: 'application/json',
    contentEncoding: 'json',
    deliveryMode: DeliveryMode::Persistent,
    expiration: TimeSpan::fromSeconds(5),
));
Safety
It is safe to call nack/reject after ack, or to call them concurrently. Operations will be ordered and processed only once.
For example, you want to call nack on any error and ack only on successful cases. Then you can write the code as follows:
<?php

declare(strict_types=1);

use Thesis\Amqp\Config;
use Thesis\Amqp\Client;
use Thesis\Amqp\DeliveryMessage;
use Thesis\Amqp\Channel;

$client = new Client(Config::default());
$channel = $client->channel();

$handler = function (DeliveryMessage $delivery): void {
    // handle the delivery with an \Exception
    $delivery->nack();
};

$delivery = $channel->get('test');
\assert($delivery !== null);

try {
    $handler($delivery);
} finally {
    $delivery->ack();
}
Here, the ack in the finally block will only be sent if neither nack, reject, nor ack was called in the $handler.
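The "processed only once" rule can be pictured with a small stand-alone model. This is only a sketch under the assumption that the first settlement wins and later calls are ignored; SettleOnceDelivery is a hypothetical class written for this illustration, not the library's DeliveryMessage, and the real implementation is asynchronous.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical stand-in for a delivery (not the library's DeliveryMessage):
 * the first settlement wins, later calls are silently ignored.
 */
final class SettleOnceDelivery
{
    /** @var list<string> frames that would be written to the channel */
    public array $sentFrames = [];

    private bool $settled = false;

    public function ack(): void
    {
        $this->settle('basic.ack');
    }

    public function nack(): void
    {
        $this->settle('basic.nack');
    }

    public function reject(): void
    {
        $this->settle('basic.reject');
    }

    private function settle(string $frame): void
    {
        if ($this->settled) {
            return; // already acked, nacked or rejected
        }

        $this->settled = true;
        $this->sentFrames[] = $frame;
    }
}

$failed = new SettleOnceDelivery();
try {
    $failed->nack(); // the handler hit an error and nacked
} finally {
    $failed->ack(); // ignored: the delivery is already settled
}

$succeeded = new SettleOnceDelivery();
try {
    // the handler finished without settling the delivery
} finally {
    $succeeded->ack(); // sent: nothing settled the delivery before
}
```

In the model, the failed delivery records only a basic.nack and the successful one records only a basic.ack, which mirrors the try/finally pattern above.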
Consume a batch of messages
Although AMQP doesn't have a native way to receive messages in batches, we can achieve this using two operations: basic.qos(count: N) and basic.ack(multiple: true) on the last message. basic.qos limits the number of messages the AMQP server can push to our consumer, and this number should match the batch size. basic.ack(multiple: true) allows us to send a single acknowledgment for the entire batch. You don’t need to implement this yourself; it's included with this library.
Simply use Channel::consumeBatch and pass a callback. As an argument, you’ll receive a ConsumeBatch instance, on which you can call ack or nack.
Note that you don’t need to call these functions on individual DeliveryMessage instances, only on the ConsumeBatch!
However, since it may take a while to fill a batch, you can specify a timeout. This way, you'll receive a non-empty batch either when the required number of messages is collected or when the timer expires, whichever comes first.
See the example: you'll see two batches there. One will arrive immediately because the queue already contains enough messages, and the second will arrive after a 1-second wait, consisting of just 3 messages.
Since basic.qos(count: N) is a crucial requirement for implementing batching, the consumeBatch and consumeBatchIterator methods call it automatically.
You don’t need to call Channel::qos yourself!
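The "count or timeout, whichever comes first" rule can be simulated without a broker. The sketch below is not the library's implementation: collectBatches is a hypothetical function, arrival times are plain numbers instead of real timers, and it assumes the timer starts when the first message of a batch arrives.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical simulation of batching by size or timeout.
 *
 * @param list<array{float, string}> $arrivals [arrival time in seconds, body], sorted by time
 * @return list<array{flushedAt: float, messages: list<string>}>
 */
function collectBatches(array $arrivals, int $count, float $timeout): array
{
    $batches = [];
    $current = [];
    $deadline = null;

    foreach ($arrivals as [$time, $body]) {
        // The timer expired before this message arrived: flush the partial batch.
        if ($current !== [] && $time >= $deadline) {
            $batches[] = ['flushedAt' => $deadline, 'messages' => $current];
            $current = [];
        }

        if ($current === []) {
            $deadline = $time + $timeout; // assumption: timer starts with the first message
        }

        $current[] = $body;

        // The batch is full: flush it immediately.
        if (count($current) === $count) {
            $batches[] = ['flushedAt' => $time, 'messages' => $current];
            $current = [];
        }
    }

    // No more messages: the remaining partial batch is flushed by the timer.
    if ($current !== []) {
        $batches[] = ['flushedAt' => $deadline, 'messages' => $current];
    }

    return $batches;
}

// 13 messages are already in the queue at time 0; batch size 10, timeout 1 second.
$arrivals = array_map(static fn (int $i): array => [0.0, "message {$i}"], range(1, 13));
$batches = collectBatches($arrivals, count: 10, timeout: 1.0);
```

With these inputs the simulation yields a full batch of 10 at time 0 and a partial batch of 3 when the 1-second timer expires, matching the scenario described above.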
Confirms
There are notable changes here compared to other libraries. Instead of a callback API through which you could handle confirmations, you get a PublishConfirmation object that can be waited on in non-blocking mode via await.
<?php

declare(strict_types=1);

use Thesis\Amqp\Client;
use Thesis\Amqp\Message;
use Thesis\Amqp\Config;

require_once __DIR__ . '/../../vendor/autoload.php';

$client = new Client(Config::default());
$channel = $client->channel();
$channel->confirmSelect();

$confirmation = $channel->publish(new Message('...'), routingKey: 'test');
var_dump($confirmation?->await());

$client->disconnect();
PublishConfirmation::await will return a PublishResult enum that can be in one of the following states: Acked, Nacked, Canceled, Waiting, Unrouted.
Since confirmations can return in batches, there is no need to wait for each confirmation in turn. Instead, you can publish many messages and wait for the confirmations at the end. If you are lucky, the amqp server will confirm several messages at once, or even the entire batch with a single confirmation.
<?php

declare(strict_types=1);

use Thesis\Amqp\Client;
use Thesis\Amqp\PublishConfirmation;
use Thesis\Amqp\Message;
use Thesis\Amqp\Config;

require_once __DIR__ . '/../../vendor/autoload.php';

$client = new Client(Config::fromURI('amqp://thesis:secret@localhost:5673/'));
$channel = $client->channel();
$channel->confirmSelect();

$confirmations = [];

for ($i = 0; $i < 100; ++$i) {
    $confirmation = $channel->publish(new Message('...'), routingKey: 'test');
    \assert($confirmation !== null);

    $confirmations[] = $confirmation;
}

PublishConfirmation::awaitAll($confirmations);

$client->disconnect();
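Why one confirmation can cover many messages: in confirm mode the broker numbers published messages with delivery tags starting at 1, and a basic.ack with multiple: true confirms every tag up to and including the one it carries. The following stand-alone sketch models that bookkeeping; applyAck and the string states are hypothetical and only illustrate the protocol rule, not the library's internals.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: applies one basic.ack frame to the pending confirmations.
 *
 * @param array<int, string> $pending delivery tag => 'waiting' | 'acked'
 * @return array<int, string>
 */
function applyAck(array $pending, int $deliveryTag, bool $multiple): array
{
    foreach (array_keys($pending) as $tag) {
        // multiple: true covers every tag up to the given one, otherwise only that tag.
        $covered = $multiple ? $tag <= $deliveryTag : $tag === $deliveryTag;
        if ($covered) {
            $pending[$tag] = 'acked';
        }
    }

    return $pending;
}

// Five messages published in confirm mode; delivery tags start at 1.
$pending = array_fill(1, 5, 'waiting');

// One frame with multiple: true confirms tags 1, 2 and 3 at once.
$pending = applyAck($pending, deliveryTag: 3, multiple: true);
$afterMultiple = $pending;

// A frame with multiple: false confirms exactly one tag.
$pending = applyAck($pending, deliveryTag: 5, multiple: false);
```

After the two frames, tag 4 is the only one still waiting, which is why awaiting all confirmations at the end is usually cheaper than awaiting each publish in turn.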
Explicit returns
In the AMQP messaging system, it’s possible for a published message to have no destination. This is acceptable in some scenarios such as the Publish-Subscribe pattern, where it’s fine for events to go unhandled, but not in others. For example, in the Command pattern every message is expected to be processed.
To detect and react to such delivery failures, you must publish messages with the mandatory flag enabled. This tells the AMQP server to return any message that cannot be routed to at least one queue.
However, there’s a challenge: returned messages are delivered asynchronously in a separate thread of execution (not an OS thread) and are not associated with the original publishing request. This means the publisher has no immediate way of knowing whether a message was routed or returned. In some cases, you may want to know this synchronously, so that you can:
- Log the message;
- Store the message in the DB;
- Automatically declare the required topology (e.g., queues or bindings) and republish.
To support this use case, the library provides a mechanism based on publisher confirms and a custom header:
- Enable publisher confirm mode;
- Set the mandatory flag when publishing.
The library will add a special header X-Thesis-Mandatory-Id. This allows the library to correlate any returned message with its original publish request.
If the message is unroutable, the library will return PublishResult::Unrouted.
<?php

declare(strict_types=1);

use Thesis\Amqp\Client;
use Thesis\Amqp\Config;
use Thesis\Amqp\Message;
use Thesis\Amqp\PublishResult;

require_once __DIR__ . '/../vendor/autoload.php';

$client = new Client(Config::fromURI('amqp://thesis:secret@localhost:5673'));
$channel = $client->channel();
$channel->confirmSelect();

$confirmation = $channel->publish(new Message('abz'), routingKey: 'xxx', mandatory: true);

if ($confirmation?->await() === PublishResult::Unrouted) {
    // handle use case
}
This mechanism only works if publisher confirms are enabled. Without them, the library cannot track which messages were successfully routed to queues, because no confirmation frame will be received.
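One way to picture the correlation is the stand-alone model below. It is a guess at the general idea, not the library's code: ReturnCorrelator and SketchResult are hypothetical names, the id format is invented, and the sketch assumes the returned message arrives before its confirmation (the order RabbitMQ documents for unroutable mandatory messages).

```php
<?php

declare(strict_types=1);

// Hypothetical result type for this sketch; the library's own enum is PublishResult.
enum SketchResult
{
    case Waiting;
    case Acked;
    case Unrouted;
}

final class ReturnCorrelator
{
    /** @var array<string, SketchResult> correlation id => current result */
    private array $results = [];

    /** @var array<int, string> delivery tag => correlation id */
    private array $idsByTag = [];

    /**
     * Registers a mandatory publish and returns the headers to send with it.
     *
     * @return array<string, string>
     */
    public function publish(int $deliveryTag): array
    {
        $id = "publish-{$deliveryTag}"; // invented format; any unique value would do
        $this->results[$id] = SketchResult::Waiting;
        $this->idsByTag[$deliveryTag] = $id;

        return ['X-Thesis-Mandatory-Id' => $id];
    }

    /** Called for basic.return: the returned message carries its original headers. */
    public function onReturn(array $headers): void
    {
        $id = $headers['X-Thesis-Mandatory-Id'] ?? null;
        if (is_string($id) && isset($this->results[$id])) {
            $this->results[$id] = SketchResult::Unrouted;
        }
    }

    /** Called for basic.ack: only publishes that were not returned become Acked. */
    public function onAck(int $deliveryTag): void
    {
        $id = $this->idsByTag[$deliveryTag] ?? null;
        if ($id !== null && $this->results[$id] === SketchResult::Waiting) {
            $this->results[$id] = SketchResult::Acked;
        }
    }

    public function result(int $deliveryTag): SketchResult
    {
        return $this->results[$this->idsByTag[$deliveryTag]];
    }
}

$correlator = new ReturnCorrelator();
$routedHeaders = $correlator->publish(1);
$unroutedHeaders = $correlator->publish(2);

// The broker returns message 2 (no queue is bound), then confirms both publishes.
$correlator->onReturn($unroutedHeaders);
$correlator->onAck(1);
$correlator->onAck(2);
```

The confirmation is what closes the loop in this model: once the ack for a tag arrives, the publish is known to be either routed or returned, which is why the mechanism depends on publisher confirms.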
License
The MIT License (MIT). Please see License File for more information.