gdx / p-service-bus
- php: >=8.2
- ext-json: *
- ext-pcntl: *
- doctrine/instantiator: ^1.4 || ^2.0
- prewk/result: ^3.1.0
- psr/log: ^3.0|^2.0|^1.0
- ramsey/uuid: ^4.5
- symfony/console: *
- symfony/expression-language: *
Requires (Dev)
- aws/aws-sdk-php: ^3.285.0
- bunny/bunny: ^0.5.0
- doctrine/orm: ^2.10.1
- enqueue/dsn: ^0.10.8
- guzzlehttp/guzzle: ^7.3
- php-standard-library/psalm-plugin: ^2.2.1
- phpunit/phpunit: ^10.2
- react/async: ^v3.0 || ^4.0
- rector/rector: ^1.0.0
- roave/security-advisories: dev-master
- symfony/cache: *
- vimeo/psalm: ^6.0.0
- aws/aws-sdk-php: Allows to use SQS and/or SNS
- bunny/bunny: Allows to use RabbitMq as transport
- doctrine/orm: If you want to use it with Doctrine.
- enqueue/dsn: Allows to use SQS and/or SNS
- react/async: Allows to use RabbitMq as transport
- dev-master
- 2.2.1
- 2.2.0
- 2.1.1
- 2.1.0
- 2.0.4
- 2.0.3
- 2.0.2
- 2.0.1
- 2.0.0
- 1.13.0
- 1.12.1
- 1.12.0
- 1.11.0
- 1.10.1
- 1.10.0
- 1.9.0
- 1.8.0
- 1.7.5
- 1.7.4
- 1.7.3
- 1.7.2
- 1.7.1
- 1.7.0
- 1.6.4
- 1.6.3
- 1.6.2
- 1.6.1
- 1.6.0
- 1.5.1
- 1.5.0
- 1.4.1
- 1.4.0
- 1.3.0
- 1.2.0
- 1.1.1
- 1.1.0
- 1.0.0
- 0.17.0
- 0.16.2
- 0.16.1
- 0.16.0
- 0.15.0
- 0.14.3
- 0.14.2
- 0.14.1
- 0.14.0
- 0.13.3
- 0.13.2
- 0.13.1
- 0.13.0
- 0.12.0
- 0.11.2
- 0.11.1
- 0.11.0
- 0.10.0
- 0.9.0
- 0.8.5
- 0.8.4
- 0.8.3
- 0.8.2
- 0.8.1
- 0.8.0
- 0.7.4
- 0.7.3
- 0.7.2
- 0.7.1
- 0.7.0
- 0.6.5
- 0.6.4
- 0.6.3
- 0.6.2
- 0.6.1
- 0.6.0
- 0.5.6
- 0.5.4
- 0.5.3
- 0.5.2
- 0.5.1
- 0.5.0
- 0.4.16
- 0.4.15
- 0.4.14
- 0.4.13
- 0.4.12
- 0.4.11
- 0.4.10
- 0.4.9
- 0.4.8
- 0.4.7
- 0.4.6
- 0.4.5
- 0.4.4
- 0.4.3
- 0.4.2
- 0.4.1
- 0.4.0
- 0.3.10
- 0.3.9
- 0.3.8
- 0.3.7
- 0.3.6
- 0.3.5
- 0.3.4
- 0.3.2
- 0.3.1
- 0.3.0
- 0.2.0
This package is auto-updated.
Last update: 2025-03-01 00:43:56 UTC
Service Bus for PHP inspired by NServiceBus.
You can read the principals of usage and why we need it from their documentation :)
Documentation is bad. Ask in issues if you need help.
Telegram group:
Simple concept
Extended concept
composer require gdx/p-service-bus
So far no great examples
Please look For examples in
Or to symfony bundle
How to start to use it in your project:
- Initialization example
- How I do it in tests
- Saga/Aggregate consume command/event produce event.
- Bus allow to send command or publish event.
- CoroutineBus allow to send multiple commands or publish multiple events
- Doctrine integration (Saga persistence, transactional messages(OutBox pattern), onlyOnce control)
- Automatically init all the resources for you
- ServiceBus as main entry point
Send/Publish command/event
implements all the Bus interfaces.
If you have a couple of message use Bus
$bus->send(new TestCommand());
$bus->publish(new TestEvent());
If you have many messages use CoroutineBus
$coroutine = $coroutineBus->sendCoroutine();
$coroutine->send(new TestCommand(1), CommandOptions::record());
$coroutine->send(new TestCommand(2), CommandOptions::record());
$coroutine = $coroutineBus->publishCoroutine();
$coroutine->publish(new TestEvent(1), EventOptions::record());
$coroutine->publish(new TestEvent(2), EventOptions::record());
To start consuming run the command
p-service-bus:transport:consume memory
- memory is transport name
You can make any method as handler just use PHP Attribute You can set:
- retries amount of retries before message will go in DLQ (does not support by SQS, for sqs configure the queue itself with DSN string)
- timeoutSec the initial delay before the message will be precessed the first time. (max 15 min for SQS)
- retriesTimeoutExpression custom formula to calculate delay for each retry. We pass inside
. Syntax see here
You can set all of these options with MessageOption. Handler will override them. It is not recommended, so not documented.
<?php declare(strict_types=1);
use GDXbsv\PServiceBus\Bus\Handling\Handle;
use GDXbsv\PServiceBus\Bus\Handling\MessageHandleContext;
* @internal
final class Handlers
public string $result = '';
public function handleCommand(TestMultiHandlersCommand $command, MessageHandleContext $context): void
$this->result .= '||' . $command->name;
* 5 tries = initial try + 4 retries
* Retry no | Delay
* --------- | -------------
* 1 | 0 h 5 min
* 2 | 0 h 25 min
* 3 | 2 h 5 min
* 4 | 10 h 25 min
* after all retries -> push to DLQ after 10s
#[Handle(transportName: 'memory', retries: 5, timeoutSec: 100, retriesTimeoutExpression: '(retries_count > 4) ? 10 : (60 * (5 ** retries_count))')]
public function anyNameFunction(Test1Event $event, MessageHandleContext $context): void
$this->result .= '||' . $event->name;
External events
Send outside to subscribed clients (for example from SNS). Or receive from outside where we subscribed (for example to SNS).
use GDXbsv\PServiceBus\Message\ExternalIn;
use GDXbsv\PServiceBus\Message\ExternalOut;
* @internal
* @immutable
* @psalm-immutable
#[ExternalOut(transportName: 'memory-external', externalName: 'test.external_1_event')]
final class ExternalOutEvent
#[ExternalIn(transportName: 'memory-external', externalName: 'test.external_1_event')]
final class ExternalInEvent
Sometimes something goes wrong and you want to replay certain events. For this use replay annotations.
use GDXbsv\PServiceBus\Message\EventOptions;
use GDXbsv\PServiceBus\Message\Message;
use GDXbsv\PServiceBus\Message\Replay\Replay;
use GDXbsv\PServiceBusTestApp\Handling\Test1Event;
* @internal
* @immutable
* @psalm-immutable
* @psalm-import-type ReplayOutput from \GDXbsv\PServiceBus\Message\Replay\Replaying
final class ReplayForEvent
* @return ReplayOutput
#[Replay(replayName: 'testReplay')]
public function anyName(): \Traversable {
for ($i=1; $i<=5; ++$i) {
yield new Message(new Test1Event(), EventOptions::record());
Then use command to start replay
p-service-bus:message:replay testReplay "\GDXbsv\PServiceBusTestApp\Handling\Handlers::handle2Event1" memory
- testReplay is replay name from Attribute
- "\GDXbsv\PServiceBusTestApp\Handling\Handlers::handle2Event1" is className + :: + methodName
- memory is transport name
This is a long living process when you have to react on multiple events to make some decision. Or just when you data and message should be transactionally binded together with outbox pattern.
Example with doctrine:
<?php declare(strict_types=1);
use GDXbsv\PServiceBus\Bus\Handling\Handle;
use GDXbsv\PServiceBus\Id;
use GDXbsv\PServiceBus\Message\TimeSpan;
use GDXbsv\PServiceBus\Saga\MessageSagaContext;
use GDXbsv\PServiceBus\Saga\Saga;
use GDXbsv\PServiceBus\Saga\SagaContext;
use GDXbsv\PServiceBus\Saga\SagaPropertyMapper;
use Doctrine\ORM\Mapping as ORM;
use GDXbsv\PServiceBusTestApp\Saga\TestSagaCommand;
use GDXbsv\PServiceBusTestApp\Saga\TestSagaMapStringCommand;
use GDXbsv\PServiceBusTestApp\Saga\TestsSagaInEvent;
use GDXbsv\PServiceBusTestApp\Saga\TestsSagaOutputEvent;
* @final
final class TestSaga extends Saga
#[ORM\Column(type: 'id', nullable: false)]
private Id $id;
#[ORM\Column(type: 'string', nullable: false)]
private ?string $string;
#[ORM\Column(type: 'string', nullable: true)]
private ?string $value;
* @param Id<static> $id
private function __construct(Id $id, string $string)
$this->id = $id;
$this->string = $string;
public static function configureHowToCreateSaga(SagaCreateMapper $mapper): void
// do not forget to create handling function in a case if saga exists and to let saga know that we wait this message
function (TestSagaCreateCommand $command, MessageSagaContext $context) {
return new self(new Id($command->id), $command->string);
public static function configureHowToFindSaga(SagaPropertyMapper $mapper): void
// Find saga by id
->mapSaga(new \ReflectionProperty(TestSaga::class, 'id'))
function (TestSagaCommand $command, MessageSagaContext $context) {
return new Id($command->id);
function (TestsSagaInEvent $message, MessageSagaContext $context) {
return new Id($message->string);
// Find saga by string propery
->mapSaga(new \ReflectionProperty(TestSaga::class, 'string'))
function (TestSagaMapStringCommand $command, MessageSagaContext $context) {
return $command->string;
/** We have to tell saga we wait this message, or saga could already exist */
#[Handle('memory', 3)]
public function testSagaCreateCommand(TestSagaCreateCommand $command, SagaContext $context)
$this->string = $command->string;
/** We can remove saga after all */
#[Handle('memory', 3)]
public function testRemove(TestSagaRemoveCommand $command, SagaContext $context)
#[Handle('memory', 3)]
public function testHandlerFunction(TestSagaCommand $command, SagaContext $context)
$this->string = $command->string;
$context->timeout(new TestsSagaOutputEvent('testHandlerFunction'), TimeSpan::fromSeconds(0));
#[Handle('memory', 3)]
public function testListeningFunction(TestsSagaInEvent $event, SagaContext $context)
$this->string = $event->string;
$this->value = $event->value;
$context->publish(new TestsSagaOutputEvent('testListeningFunction'));
#[Handle('memory', 3)]
public function handleTestSagaMapStringCommand(
TestSagaMapStringCommand $command,
SagaContext $context
) {
$context->publish(new TestsSagaOutputEvent($this->id->toString()));
describe how to create saga (WARNING: Handlers should exist for all creation messages)configureHowToFindSaga
describe how to find saga#[Handle('memory', 3)]
set methods as handlers
You can create custom finders by using #[SagaFind]
attribute, for example:
<?php declare(strict_types=1);
use Doctrine\ORM\EntityManager;
use GDXbsv\PServiceBus\Message\MessageOptions;
use GDXbsv\PServiceBus\Saga\SagaFind;
use GDXbsv\PServiceBusTestApp\Saga\CustomDoctrineSearchEvent;
use GDXbsv\PServiceBusTestApp\Saga\TestSaga;
* @internal
* @immutable
* @psalm-immutable
final class CustomDoctrineSagaFinder
public function __construct(
private EntityManager $em
) {
public function findByMultipleFields(
CustomDoctrineSearchEvent $event,
MessageOptions $messageOptions
): TestSaga {
$qb = $this->em->createQueryBuilder();
->from(TestSaga::class, 'saga')
->where($qb->expr()->eq('saga.string', ':propertyValue'))
->setParameter(':propertyValue', $event->string);
$saga = $qb->getQuery()->getSingleResult();
return $saga;
So far implemented only InMemoryTransport and RabbitMq(BunnyTransport) and SQS and SNS. But you can adapt any of yours by implementing 2 interfaces
interface Transport
* @return \Generator<int, void, Envelope|null, void>
public function sending(): \Generator;
* @return \Generator<int, Envelope, Result\Ok<null, mixed>|Result\Err<mixed, \Exception>, void>
public function receive(int $limit = 0): \Generator;
public function stop(): void;
interface TransportSynchronisation
public function sync(): void;
Please pay attention that sync()
method for an external bus MUST subscribe you on external message name if you want it
to happen automatically.
SQS transport
Examle DSN or config
Option | Description | Default |
region | AWS region | |
retries | Retries before DLQ (Dead Letter Queue) during the creation of the queue | 3 |
visibilityTimeout | How long consumed messages will be blocked from next attempt (sec) | 30 |
waitSeconds | Long polling - how long will AWS wait to collect a batch of messages | 20 |
waitBetweenLoopsSeconds | If no messages, how long will we sleep between attempts | 40 |
messagesBatch | How many messages will we consume from AWS with one request | 10 |
preload | Asynchronously load the next messages while working on the previous ones | true |
tags[name] | Add tags to the queue during the creation | |
assume | The AWS role which we want to assume | |
queue | The actual queue name |
bunny transport
For bunny transport it is different internal and external transports. External use exchanges and pub/sub.
SQS-SNS transport
For SQS-SNS transports SNS is only for external messages.
See See