gdx / p-service-bus
PServiceBus
Installs: 61 161
Dependents: 4
Suggesters: 0
Security: 0
Stars: 3
Forks: 2
pkg:composer/gdx/p-service-bus
Requires
- php: >=8.4
- ext-json: *
- ext-pcntl: *
- doctrine/instantiator: ^1.4 || ^2.0
- prewk/result: ^3.1.0
- psr/log: ^3.0|^2.0|^1.0
- ramsey/uuid: ^4.5
- symfony/console: *
- symfony/expression-language: *
Requires (Dev)
- ext-ffi: *
- aws/aws-sdk-php: ^3.351.11
- bunny/bunny: ^v0.5.6
- doctrine/orm: ^2.20.5
- enqueue/dsn: ^0.10.26
- guzzlehttp/guzzle: ^7.9.3
- idealo/php-rdkafka-ffi: ^0.6.0
- php-standard-library/psalm-plugin: ^2.3.0
- phpunit/phpunit: ^11.5.28
- react/async: ^v3.0 || ^4.0
- rector/rector: ^2.1.2
- roave/security-advisories: dev-master
- symfony/cache: *
- vimeo/psalm: ^7.0.0-beta11
Suggests
- ext-ffi: Allows using Kafka as transport
- aws/aws-sdk-php: Allows using SQS and/or SNS
- bunny/bunny: Allows using RabbitMq as transport
- doctrine/orm: If you want to use it with Doctrine.
- enqueue/dsn: Allows using SQS, SNS and Kafka transports
- idealo/php-rdkafka-ffi: Allows using Kafka as transport
- react/async: Allows using RabbitMq as transport with async support
- dev-master
- 2.6.0
- 2.5.1
- 2.5.0
- 2.4.0
- 2.3.1
- 2.3.0
- 2.2.2
- 2.2.1
- 2.2.0
- 2.1.1
- 2.1.0
- 2.0.4
- 2.0.3
- 2.0.2
- 2.0.1
- 2.0.0
- 1.13.0
- 1.12.1
- 1.12.0
- 1.11.0
- 1.10.1
- 1.10.0
- 1.9.0
- 1.8.0
- 1.7.5
- 1.7.4
- 1.7.3
- 1.7.2
- 1.7.1
- 1.7.0
- 1.6.4
- 1.6.3
- 1.6.2
- 1.6.1
- 1.6.0
- 1.5.1
- 1.5.0
- 1.4.1
- 1.4.0
- 1.3.0
- 1.2.0
- 1.1.1
- 1.1.0
- 1.0.0
- 0.17.0
- 0.16.2
- 0.16.1
- 0.16.0
- 0.15.0
- 0.14.3
- 0.14.2
- 0.14.1
- 0.14.0
- 0.13.3
- 0.13.2
- 0.13.1
- 0.13.0
- 0.12.0
- 0.11.2
- 0.11.1
- 0.11.0
- 0.10.0
- 0.9.0
- 0.8.5
- 0.8.4
- 0.8.3
- 0.8.2
- 0.8.1
- 0.8.0
- 0.7.4
- 0.7.3
- 0.7.2
- 0.7.1
- 0.7.0
- 0.6.5
- 0.6.4
- 0.6.3
- 0.6.2
- 0.6.1
- 0.6.0
- 0.5.6
- 0.5.4
- 0.5.3
- 0.5.2
- 0.5.1
- 0.5.0
- 0.4.16
- 0.4.15
- 0.4.14
- 0.4.13
- 0.4.12
- 0.4.11
- 0.4.10
- 0.4.9
- 0.4.8
- 0.4.7
- 0.4.6
- 0.4.5
- 0.4.4
- 0.4.3
- 0.4.2
- 0.4.1
- 0.4.0
- 0.3.10
- 0.3.9
- 0.3.8
- 0.3.7
- 0.3.6
- 0.3.5
- 0.3.4
- 0.3.2
- 0.3.1
- 0.3.0
- 0.2.0
- dev-Kafka-AWS-iam
- dev-add-attempt-message-option
This package is auto-updated.
Last update: 2025-10-14 13:13:49 UTC
README
Service Bus for PHP inspired by NServiceBus.
You can read the principles of usage and why we need it in their documentation :)
The documentation here is still rough. Ask in the issues if you need help.
Telegram group: https://t.me/PServiceBus
Symfony: https://packagist.org/packages/gdx/p-service-bus-symfony-bundle
Laravel: https://packagist.org/packages/gdx/p-service-bus-laravel-package
Simple concept
Extended concept
Installation
composer require gdx/p-service-bus
Usage
So far there are no great examples.
Please look for examples in https://gitlab.com/GDXbsv/pservicebus/-/tree/master/TestApp
or in the Symfony bundle https://packagist.org/packages/gdx/p-service-bus-symfony-bundle
How to start to use it in your project:
- Initialization example https://gitlab.com/GDXbsv/pservicebus/-/blob/master/src/Setup.php
- How I do it in tests https://gitlab.com/GDXbsv/pservicebus/-/blob/master/tests/Integration/IntegrationTestCase.php#L45
Features
- Saga/Aggregate: consumes commands/events and produces events.
- Bus allows you to send a command or publish an event.
- CoroutineBus allows you to send multiple commands or publish multiple events.
- Doctrine integration (Saga persistence, transactional messages (OutBox pattern), onlyOnce control)
- Automatically initializes all the resources for you
- ServiceBus as the main entry point
Init
To initialize all the required resources, run the command
p-service-bus:init
Send/Publish command/event
\GDXbsv\PServiceBus\Bus\ServiceBus implements all the Bus interfaces.
If you have a couple of messages, use Bus:
#command
$bus->send(new TestCommand());
#event
$bus->publish(new TestEvent());
If you have many messages, use CoroutineBus:
#command
$coroutine = $coroutineBus->sendCoroutine();
$coroutine->send(new TestCommand(1), CommandOptions::record());
$coroutine->send(new TestCommand(2), CommandOptions::record());
$coroutine->finish();
#event
$coroutine = $coroutineBus->publishCoroutine();
$coroutine->publish(new TestEvent(1), EventOptions::record());
$coroutine->publish(new TestEvent(2), EventOptions::record());
$coroutine->finish();
Consume
To start consuming, run the command
p-service-bus:transport:consume memory
where:
- memory is the transport name
Message Key
Message key allows you to control message routing and ensure order preservation for related messages. It's supported by Kafka (as partition key) and SQS FIFO queues (as MessageGroupId).
Setting Message Key
There are two ways to set the message key:
1. Via MessageOptions (will be set for all external messages and for each message+handler):
use GDXbsv\PServiceBus\Message\CommandOptions;
$bus->send(
new OrderCreatedCommand($orderId),
CommandOptions::record()->withMessageKey($customerId)
);
2. Via Handle attribute (will be set only for internal message+handler):
#[Handle(transportName: 'kafka', messageKey: 'static-key')]
public function handleOrder(OrderCreatedCommand $command, MessageHandleContext $context): void
{
// All messages handled here will use 'static-key' as message key
}
Priority: the Handle attribute's messageKey has higher priority and will override the messageKey set in MessageOptions.
Transport-Specific Behavior
- Kafka: Message key becomes the partition key. Messages with the same key are sent to the same partition, guaranteeing order preservation.
- SQS FIFO: Message key becomes MessageGroupId. Messages with the same group ID are processed in order within FIFO queues (.fifo suffix required).
- Other transports: Message key may be ignored or used differently depending on transport capabilities.
Handlers
You can make any method a handler, just use the PHP Attribute. You can set:
- retries: the number of retries before the message goes to the DLQ (not supported by SQS; for SQS configure the queue itself via the DSN string)
- timeoutSec: the initial delay before the message is processed the first time (max 15 min for SQS)
- retriesTimeoutExpression: a custom formula to calculate the delay for each retry. We pass retries_count into it. For the syntax see https://symfony.com/doc/current/reference/formats/expression_language.html
- messageKey: message routing key for order preservation (see the Message Key section above)
You can also set all of these options with MessageOptions; the Handle attribute will override them. This is not recommended, so it is not documented.
<?php declare(strict_types=1);
use GDXbsv\PServiceBus\Bus\Handling\Handle;
use GDXbsv\PServiceBus\Bus\Handling\MessageHandleContext;
/**
* @internal
*/
final class Handlers
{
public string $result = '';
#[Handle('memory')]
public function handleCommand(TestMultiHandlersCommand $command, MessageHandleContext $context): void
{
$this->result .= '||' . $command->name;
}
/**
* 5 tries = initial try + 4 retries
*
* Retry no | Delay
* --------- | -------------
* 1 | 0 h 5 min
* 2 | 0 h 25 min
* 3 | 2 h 5 min
* 4 | 10 h 25 min
*
* after all retries -> push to DLQ after 10s
*/
#[Handle(transportName: 'memory', retries: 5, timeoutSec: 100, retriesTimeoutExpression: '(retries_count > 4) ? 10 : (60 * (5 ** retries_count))')]
public function anyNameFunction(Test1Event $event, MessageHandleContext $context): void
{
$this->result .= '||' . $event->name;
}
}
Transport Attributes
The MessageHandleContext now provides access to transport-specific attributes and retry information through the transportAttributes property:
#[Handle('sqs')]
public function handleSqsMessage(MyEvent $event, MessageHandleContext $context): void
{
// Access retry attempt count (0-based)
$attemptNumber = $context->transportAttributes->attempt;
// Access transport-specific attributes
$attributes = $context->transportAttributes->attributes;
// SQS attributes
$messageId = $attributes['MessageId'] ?? null;
$receiptHandle = $attributes['ReceiptHandle'] ?? null;
$sentTimestamp = $attributes['SentTimestamp'] ?? null;
// Handle based on attempt count
if ($attemptNumber === 0) {
// First attempt
$this->handleFirstAttempt($event);
} else {
// Retry attempt
$this->handleRetry($event, $attemptNumber);
}
}
External events
Send events outside to subscribed clients (for example via SNS), or receive events from outside systems we have subscribed to (for example via SNS).
<?php
declare(strict_types=1);
use GDXbsv\PServiceBus\Message\ExternalIn;
use GDXbsv\PServiceBus\Message\ExternalOut;
/**
* @internal
* @immutable
* @psalm-immutable
*/
#[ExternalOut(transportName: 'memory-external', externalName: 'test.external_1_event')]
final class ExternalOutEvent
{
}
#[ExternalIn(transportName: 'memory-external', externalName: 'test.external_1_event')]
final class ExternalInEvent
{
}
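A minimal usage sketch, assuming both classes above are registered with the bus: publishing works as for any other event, and the ExternalOut attribute additionally pushes the event to the memory-external transport under the external name. Incoming messages with that external name are mapped back to ExternalInEvent and dispatched to ordinary handlers; the handler below, including its transport name, is an illustrative assumption rather than documented API.
use GDXbsv\PServiceBus\Bus\Handling\Handle;
use GDXbsv\PServiceBus\Bus\Handling\MessageHandleContext;

// publish as usual; the ExternalOut attribute takes care of the external routing
$bus->publish(new ExternalOutEvent());

final class ExternalEventHandlers
{
    // the transport name is an assumption; use whichever transport the external message arrives on
    #[Handle('memory-external')]
    public function onExternalInEvent(ExternalInEvent $event, MessageHandleContext $context): void
    {
        // react to the event received from the outside world
    }
}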
Replay
Sometimes something goes wrong and you want to replay certain events. For this, use the Replay attribute.
<?php
declare(strict_types=1);
use GDXbsv\PServiceBus\Message\EventOptions;
use GDXbsv\PServiceBus\Message\Message;
use GDXbsv\PServiceBus\Message\Replay\Replay;
use GDXbsv\PServiceBusTestApp\Handling\Test1Event;
/**
* @internal
* @immutable
* @psalm-immutable
*
* @psalm-import-type ReplayOutput from \GDXbsv\PServiceBus\Message\Replay\Replaying
*/
final class ReplayForEvent
{
/**
* @return ReplayOutput
*/
#[Replay(replayName: 'testReplay')]
public function anyName(): \Traversable {
for ($i=1; $i<=5; ++$i) {
yield new Message(new Test1Event(), EventOptions::record());
}
}
}
Then use the command to start the replay
p-service-bus:message:replay testReplay "\GDXbsv\PServiceBusTestApp\Handling\Handlers::handle2Event1" memory
where:
- testReplay is the replay name from the Replay attribute
- "\GDXbsv\PServiceBusTestApp\Handling\Handlers::handle2Event1" is className + :: + methodName
- memory is the transport name
Saga
Inspired: https://docs.particular.net/nservicebus/sagas/
This is a long-living process for when you have to react to multiple events to make some decision, or simply when your data and messages should be transactionally bound together with the outbox pattern.
Example with doctrine:
<?php declare(strict_types=1);
use GDXbsv\PServiceBus\Bus\Handling\Handle;
use GDXbsv\PServiceBus\Id;
use GDXbsv\PServiceBus\Message\TimeSpan;
use GDXbsv\PServiceBus\Saga\MessageSagaContext;
use GDXbsv\PServiceBus\Saga\Saga;
use GDXbsv\PServiceBus\Saga\SagaContext;
use GDXbsv\PServiceBus\Saga\SagaPropertyMapper;
use Doctrine\ORM\Mapping as ORM;
use GDXbsv\PServiceBusTestApp\Saga\TestSagaCommand;
use GDXbsv\PServiceBusTestApp\Saga\TestSagaMapStringCommand;
use GDXbsv\PServiceBusTestApp\Saga\TestsSagaInEvent;
use GDXbsv\PServiceBusTestApp\Saga\TestsSagaOutputEvent;
// the following imports are assumed namespaces for classes used below but missing from the original snippet
use GDXbsv\PServiceBus\Saga\SagaCreateMapper;
use GDXbsv\PServiceBusTestApp\Saga\TestSagaCreateCommand;
use GDXbsv\PServiceBusTestApp\Saga\TestSagaRemoveCommand;
/**
* @final
*/
#[ORM\Entity]
final class TestSaga extends Saga
{
#[ORM\Column(type: 'id', nullable: false)]
#[ORM\Id]
private Id $id;
#[ORM\Column(type: 'string', nullable: false)]
private ?string $string;
#[ORM\Column(type: 'string', nullable: true)]
private ?string $value;
/**
* @param Id<static> $id
*/
private function __construct(Id $id, string $string)
{
$this->id = $id;
$this->string = $string;
}
public static function configureHowToCreateSaga(SagaCreateMapper $mapper): void
{
$mapper
->toMessage(
// do not forget to create a handling function for the case when the saga already exists, so the saga knows we wait for this message
function (TestSagaCreateCommand $command, MessageSagaContext $context) {
return new self(new Id($command->id), $command->string);
}
);
}
public static function configureHowToFindSaga(SagaPropertyMapper $mapper): void
{
$mapper
// Find saga by id
->mapSaga(new \ReflectionProperty(TestSaga::class, 'id'))
->toMessage(
function (TestSagaCommand $command, MessageSagaContext $context) {
return new Id($command->id);
}
)
->toMessage(
function (TestsSagaInEvent $message, MessageSagaContext $context) {
return new Id($message->string);
}
);
$mapper
// Find saga by string property
->mapSaga(new \ReflectionProperty(TestSaga::class, 'string'))
->toMessage(
function (TestSagaMapStringCommand $command, MessageSagaContext $context) {
return $command->string;
}
);
}
/** We have to tell the saga that we wait for this message, because the saga could already exist */
#[Handle('memory', 3)]
public function testSagaCreateCommand(TestSagaCreateCommand $command, SagaContext $context)
{
$this->string = $command->string;
}
/** We can remove saga after all */
#[Handle('memory', 3)]
public function testRemove(TestSagaRemoveCommand $command, SagaContext $context)
{
$this->markAsComplete();
}
#[Handle('memory', 3)]
public function testHandlerFunction(TestSagaCommand $command, SagaContext $context)
{
$this->string = $command->string;
$context->timeout(new TestsSagaOutputEvent('testHandlerFunction'), TimeSpan::fromSeconds(0));
}
#[Handle('memory', 3)]
public function testListeningFunction(TestsSagaInEvent $event, SagaContext $context)
{
$this->string = $event->string;
$this->value = $event->value;
$context->publish(new TestsSagaOutputEvent('testListeningFunction'));
}
#[Handle('memory', 3)]
public function handleTestSagaMapStringCommand(
TestSagaMapStringCommand $command,
SagaContext $context
) {
$context->publish(new TestsSagaOutputEvent($this->id->toString()));
}
}
where:
- configureHowToCreateSaga describes how to create the saga (WARNING: handlers should exist for all creation messages)
- configureHowToFindSaga describes how to find the saga
- #[Handle('memory', 3)] marks the methods as handlers
You can create custom finders by using the #[SagaFind] attribute, for example:
<?php declare(strict_types=1);
use Doctrine\ORM\EntityManager;
use GDXbsv\PServiceBus\Message\MessageOptions;
use GDXbsv\PServiceBus\Saga\SagaFind;
use GDXbsv\PServiceBusTestApp\Saga\CustomDoctrineSearchEvent;
use GDXbsv\PServiceBusTestApp\Saga\TestSaga;
/**
* @internal
* @immutable
* @psalm-immutable
*/
final class CustomDoctrineSagaFinder
{
public function __construct(
private EntityManager $em
) {
}
#[SagaFind]
public function findByMultipleFields(
CustomDoctrineSearchEvent $event,
MessageOptions $messageOptions
): TestSaga {
$qb = $this->em->createQueryBuilder();
$qb
->from(TestSaga::class, 'saga')
->select('saga')
->where($qb->expr()->eq('saga.string', ':propertyValue'))
->setParameter(':propertyValue', $event->string);
$saga = $qb->getQuery()->getSingleResult();
return $saga;
}
}
Transport
Available transports: InMemory, RabbitMQ (Bunny), SQS, SNS, Kafka, and CompositeTransport.
You can create custom transports by implementing these interfaces:
interface Transport
{
/**
* @return \Generator<int, void, Envelope|null, void>
*/
public function sending(): \Generator;
/**
* @return \Generator<int, Envelope, Result\Ok<null, mixed>|Result\Err<mixed, \Exception>, void>
*/
public function receive(int $limit = 0): \Generator;
public function stop(): void;
}
interface TransportSynchronisation
{
public function sync(): void;
}
Please pay attention that the sync() method for an external bus MUST subscribe you to the external message name if you want this to happen automatically.
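As a minimal sketch of a custom transport, here is an illustrative array-backed implementation of the Transport interface above. The import paths for Transport and Envelope and the requeue-on-error behaviour are assumptions for the example, not documented API:
<?php declare(strict_types=1);

use GDXbsv\PServiceBus\Transport\Envelope;  // assumed namespace
use GDXbsv\PServiceBus\Transport\Transport; // assumed namespace
use Prewk\Result\Err;
use Prewk\Result\Ok;

final class ArrayTransport implements Transport
{
    /** @var list<Envelope> */
    private array $queue = [];
    private bool $stopped = false;

    /** The bus sends envelopes into this generator; we simply buffer them. */
    public function sending(): \Generator
    {
        while (true) {
            /** @var Envelope|null $envelope */
            $envelope = yield;
            if ($envelope !== null) {
                $this->queue[] = $envelope;
            }
        }
    }

    /** Yield buffered envelopes to the bus and read back an Ok/Err result for each one. */
    public function receive(int $limit = 0): \Generator
    {
        $handled = 0;
        while (!$this->stopped && ($limit === 0 || $handled < $limit)) {
            $envelope = array_shift($this->queue);
            if ($envelope === null) {
                return; // nothing left to consume
            }
            /** @var Ok<null, mixed>|Err<mixed, \Exception> $result */
            $result = yield $envelope;
            if ($result instanceof Err) {
                $this->queue[] = $envelope; // naive retry: put the envelope back
            }
            ++$handled;
        }
    }

    public function stop(): void
    {
        $this->stopped = true;
    }
}
A real transport would talk to a broker instead of an array, and would typically also implement TransportSynchronisation so that sync() can create the queues, topics and subscriptions it needs.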
CompositeTransport
CompositeTransport allows you to combine multiple transports and send messages to all of them simultaneously. This is useful for:
- Broadcasting messages to multiple message brokers
- Synchronizing data across different systems
- Migration scenarios where you need to publish to both old and new infrastructure
Note: CompositeTransport only supports sending messages. Receiving and synchronization must be done from individual transports independently.
use GDXbsv\PServiceBus\Transport\CompositeTransport;
$sqsTransport = SqsTransport::ofDsn('sqs+http://...');
$kafkaTransport = KafkaFactory::createInternalFromDsn('kafka://...');
$composite = new CompositeTransport([$sqsTransport, $kafkaTransport]);
SQS transport
Example DSN or config (a construction sketch follows the options table below):
sqs+http://key:secret@aws:4100/123456789012?region=eu-west-1&retries=3&visibilityTimeout=30&waitSeconds=20&waitBetweenLoopsSeconds=40&messagesBatch=10&preload=true&tags[name]=value&tags[name2]=value2&assume=arn%3Aaws%3Aiam%3A%3A123456789012%3Arole%2Fxaccounts3access&queue=QueueName
Options:
Option | Description | Default |
---|---|---|
region | AWS region | |
retries | Retries before DLQ (Dead Letter Queue) during the creation of the queue | 3 |
visibilityTimeout | How long consumed messages will be blocked from next attempt (sec) | 30 |
waitSeconds | Long polling - how long will AWS wait to collect a batch of messages | 20 |
waitBetweenLoopsSeconds | If no messages, how long will we sleep between attempts | 40 |
messagesBatch | How many messages will we consume from AWS with one request | 10 |
preload | Asynchronously load the next messages while working on the previous ones | true |
tags[name] | Add tags to the queue during the creation | |
assume | The AWS role which we want to assume | |
queue | The actual queue name | |
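A minimal construction sketch, reusing the SqsTransport::ofDsn factory shown in the CompositeTransport example above; the import path and the endpoint are assumptions/placeholders:
use GDXbsv\PServiceBus\Transport\Sqs\SqsTransport; // assumed namespace

$sqsTransport = SqsTransport::ofDsn(
    'sqs+http://key:secret@aws:4100/123456789012'
    . '?region=eu-west-1&retries=3&visibilityTimeout=30&waitSeconds=20&messagesBatch=10&queue=QueueName'
);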
Message Key for FIFO Queues
SQS FIFO queues support the messageKey feature (see the Message Key section for usage). For FIFO queues (a short sketch follows below):
- messageKey is mapped to MessageGroupId
- Messages with the same MessageGroupId are processed in order
- MessageDeduplicationId is automatically set (uses the message ID)
- The queue name must end with .fifo
- Enable content-based deduplication on the queue or ensure unique message IDs
Note: Standard (non-FIFO) SQS queues ignore the messageKey parameter.
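A short sketch combining the FIFO points above, assuming that pointing the DSN's queue parameter at a .fifo queue is all the transport needs; the queue name, endpoint and identifiers are placeholders:
use GDXbsv\PServiceBus\Message\CommandOptions;

// FIFO queue: note the required .fifo suffix in the queue parameter
$fifoTransport = SqsTransport::ofDsn(
    'sqs+http://key:secret@aws:4100/123456789012?region=eu-west-1&queue=OrderQueue.fifo'
);

// messages sharing a messageKey end up in the same MessageGroupId and are processed in order
$bus->send(
    new OrderCreatedCommand($orderId),
    CommandOptions::record()->withMessageKey($customerId)
);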
RabbitMQ (Bunny) Transport
RabbitMQ transport provides reliable message queuing with AMQP protocol. There are two transport types:
- Internal Transport: Direct queue-to-queue messaging with built-in retry and DLQ support
- External Transport: Exchange-based pub/sub for inter-service communication
Message Key Usage
The messageKey parameter is stored in message headers for tracking and correlation purposes. RabbitMQ queues maintain FIFO (first-in-first-out) order by default, ensuring messages are processed in the order they are received.
Example:
use GDXbsv\PServiceBus\Message\MessageOptions;
$bus->send(
new OrderCommand($orderId),
MessageOptions::record()->withMessageKey('customer-123')
);
The messageKey is preserved in headers throughout the message lifecycle and can be accessed in handlers via $envelope->headers['message_key'].
Additional Information
See https://gitlab.com/GDXbsv/pservicebus/-/tree/master/src/Transport/Bunny
SQS-SNS transport
For the SQS-SNS transports, SNS is used only for external messages.
See https://gitlab.com/GDXbsv/pservicebus/-/tree/master/src/Transport/Sqs
See https://gitlab.com/GDXbsv/pservicebus/-/tree/master/src/Transport/Sns
Kafka Transport
Kafka transport provides high-throughput, distributed messaging with both internal and external transports.
Internal Transport DSN
Example DSN:
kafka://broker1:9092,broker2:9092/my-topic?group.id=my-consumer-group&partitions.amount=3&replication.factor=3&auto.offset.reset=earliest&enable.transactions=true
With SASL authentication:
kafka://user:pass@broker1:9092,broker2:9092/my-topic?group.id=my-group&partitions.amount=3
DSN Options
Option | Description | Default | Required |
---|---|---|---|
broker list | Comma-separated list of Kafka brokers (host:port) | - | Yes |
topic (path) | Topic name for internal messages | - | Yes |
username (DSN user) | SASL username (set via DSN: kafka://user:pass@broker/topic ) | - | No |
password (DSN password) | SASL password (set via DSN: kafka://user:pass@broker/topic ) | - | No |
partitions.amount | Number of partitions for the topic | - | Yes |
group.id | Consumer group identifier | p-service-bus | No |
replication.factor | Replication factor for the topic | 3 | No |
auto.offset.reset | Where to start consuming: earliest (from beginning) or latest (only new messages) | earliest | No |
enable.transactions | Enable transactional message production (true/false) | false | No |
transactional.id | Custom transactional ID (auto-generated if not provided when transactions enabled) | auto | No |
socket.timeout.ms | Socket timeout in milliseconds | - | No |
queue.buffering.max.messages | Maximum number of messages in producer queue | - | No |
queue.buffering.max.ms | Maximum time messages can be buffered before sending (ms) | - | No |
batch.num.messages | Maximum number of messages in a batch | - | No |
SSL Support
Use the kafka+ssl:// scheme for SSL connections:
kafka+ssl://broker1:9093/my-topic?partitions.amount=3&ssl.ca.location=/path/to/ca-cert.pem
With SASL authentication over SSL:
kafka+ssl://user:pass@broker1:9093/my-topic?partitions.amount=3&ssl.ca.location=/path/to/ca-cert.pem
SSL Options:
Option | Description | Required for SSL |
---|---|---|
ssl.ca.location | Path to the CA certificate file (PEM format) for SSL/TLS | Yes |
Note: The ssl.ca.location parameter is mandatory when using the kafka+ssl:// scheme. It specifies the path to the CA certificate bundle used to verify the broker's certificate.
AWS MSK IAM Authentication
PServiceBus supports AWS MSK (Managed Streaming for Kafka) with IAM authentication using SASL/OAUTHBEARER mechanism. This allows you to use AWS IAM roles and policies to control access to your Kafka clusters without managing separate credentials.
Requirements:
- AWS SDK for PHP: composer require aws/aws-sdk-php
- Use the kafka+ssl:// scheme (IAM authentication requires SSL/TLS)
- AWS credentials configured (via environment variables, IAM role, or explicit credentials)
DSN Format:
kafka+ssl://broker.kafka.region.amazonaws.com:9098/my-topic?partitions.amount=3&group.id=my-group&aws.iam.auth=true&aws.region=us-east-1
AWS MSK IAM Options:
Option | Description | Required | Default |
---|---|---|---|
aws.iam.auth | Enable AWS IAM authentication (must be true for MSK IAM) | Yes | false |
aws.region | AWS region where your MSK cluster is located (e.g., us-east-1 , eu-west-1 ) | Yes | - |
aws.assume.role.arn | ARN of IAM role to assume for authentication (optional, for cross-account access) | No | - |
aws.iam.session_token | AWS session token (for temporary credentials, rarely needed) | No | - |
Authentication Methods:
Default Credentials (Recommended for EC2/ECS/Lambda):
kafka+ssl://b-1.mycluster.xxxxx.kafka.us-east-1.amazonaws.com:9098/orders?partitions.amount=3&aws.iam.auth=true&aws.region=us-east-1
Uses AWS credentials from the environment (IAM instance profile, environment variables, or ~/.aws/credentials).
Explicit Credentials:
kafka+ssl://ACCESS_KEY:SECRET_KEY@b-1.mycluster.xxxxx.kafka.us-east-1.amazonaws.com:9098/orders?partitions.amount=3&aws.iam.auth=true&aws.region=us-east-1
Uses provided access key and secret key in the DSN
With Session Token (Temporary Credentials):
kafka+ssl://ACCESS_KEY:SECRET_KEY@broker:9098/orders?partitions.amount=3&aws.iam.auth=true&aws.region=us-east-1&aws.iam.session_token=SESSION_TOKEN
Assume Role (Cross-Account or Role Chaining):
kafka+ssl://broker:9098/orders?partitions.amount=3&aws.iam.auth=true&aws.region=us-east-1&aws.assume.role.arn=arn:aws:iam::123456789012:role/KafkaAccessRole
Assumes the specified IAM role for authentication
Complete Example:
use GDXbsv\PServiceBus\Transport\Kafka\KafkaFactory;
// Using default AWS credentials (EC2 instance profile or environment)
$transport = KafkaFactory::createInternalFromDsn(
'kafka+ssl://b-1.mycluster.xxxxx.kafka.us-east-1.amazonaws.com:9098,b-2.mycluster.xxxxx.kafka.us-east-1.amazonaws.com:9098/orders?group.id=order-processor&partitions.amount=5&replication.factor=3&aws.iam.auth=true&aws.region=us-east-1'
);
// With assume role for cross-account access
$transport = KafkaFactory::createInternalFromDsn(
'kafka+ssl://b-1.mycluster.xxxxx.kafka.us-east-1.amazonaws.com:9098/orders?group.id=processor&partitions.amount=3&aws.iam.auth=true&aws.region=us-east-1&aws.assume.role.arn=arn:aws:iam::987654321098:role/CrossAccountKafkaAccess'
);
// With explicit credentials (not recommended for production)
$transport = KafkaFactory::createInternalFromDsn(
'kafka+ssl://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY@b-1.mycluster.xxxxx.kafka.us-east-1.amazonaws.com:9098/orders?group.id=processor&partitions.amount=3&aws.iam.auth=true&aws.region=us-east-1'
);
Required IAM Permissions:
Your IAM role or user must have permissions to connect to the MSK cluster. Example policy:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kafka-cluster:Connect",
"kafka-cluster:DescribeCluster"
],
"Resource": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/*"
},
{
"Effect": "Allow",
"Action": [
"kafka-cluster:*Topic*",
"kafka-cluster:WriteData",
"kafka-cluster:ReadData"
],
"Resource": "arn:aws:kafka:us-east-1:123456789012:topic/my-cluster/*"
},
{
"Effect": "Allow",
"Action": [
"kafka-cluster:AlterGroup",
"kafka-cluster:DescribeGroup"
],
"Resource": "arn:aws:kafka:us-east-1:123456789012:group/my-cluster/*"
}
]
}
How It Works:
- When aws.iam.auth=true is set, the transport uses the SASL/OAUTHBEARER mechanism
- AWS credentials are obtained from the specified source (environment, DSN, or assumed role)
- A signed URL is generated using AWS Signature Version 4 for the Kafka cluster
- The signed URL is encoded and used as the OAuth token
- Tokens are automatically refreshed before expiration (default: 900 seconds)
Troubleshooting:
- Error: "authentication failure": Verify IAM permissions and AWS credentials
- Error: "broker transport failure": Check broker address and port (should be 9098 for IAM)
- Error: "invalid argument": Ensure
kafka+ssl://
scheme is used withaws.iam.auth=true
- Token refresh issues: Tokens are cached and refreshed automatically; ensure credentials haven't expired
Notes:
- IAM authentication requires the broker to support SASL/OAUTHBEARER (AWS MSK with IAM enabled)
- SSL/TLS is mandatory for IAM authentication (kafka+ssl:// scheme required)
- Token lifetime is 15 minutes, automatically refreshed
- Both Producer and Consumer are authenticated during construction
- Admin operations (topic creation, metadata) are also authenticated
Examples
Basic setup:
use GDXbsv\PServiceBus\Transport\Kafka\KafkaFactory;
$transport = KafkaFactory::createInternalFromDsn(
'kafka://localhost:9092/orders?partitions.amount=3'
);
With transactions (exactly-once semantics):
$transport = KafkaFactory::createInternalFromDsn(
'kafka://localhost:9092/orders?partitions.amount=3&enable.transactions=true'
);
Multiple brokers with SSL:
$transport = KafkaFactory::createInternalFromDsn(
'kafka+ssl://broker1:9093,broker2:9093/orders?partitions.amount=5&replication.factor=3'
);
Performance tuning:
$transport = KafkaFactory::createInternalFromDsn(
'kafka://localhost:9092/high-throughput?group.id=processor&partitions.amount=10&queue.buffering.max.messages=100000&queue.buffering.max.ms=50&batch.num.messages=10000'
);
External Transport
For external messages (pub/sub across services), use KafkaFactory::createExternalFromDsn():
use GDXbsv\PServiceBus\Transport\Kafka\KafkaFactory;
$externalTransport = KafkaFactory::createExternalFromDsn(
'kafka://localhost:9092/external-events?group.id=external-consumer&partitions.amount=3',
messageNameMapOut: ['com.myapp.users.created' => 'UserCreated'],
messageNameMapIn: ['com.another_app.users.validated' => 'UserValidated']
);
Key Features
- Transactions: Enable with enable.transactions=true for exactly-once delivery semantics
- Consumer Groups: Use group.id to scale consumers horizontally
- Partitioning: Set partitions.amount to control parallelism
- Offset Management: Control where to start consuming with auto.offset.reset
- High Throughput: Tune batching and buffering parameters for performance
- Partition Key: Kafka uses the messageKey (see the Message Key section) as the partition key. Messages with the same key are sent to the same partition, guaranteeing order preservation. See the main Message Key section for usage examples.
Notes
- Dead Letter Queue (DLQ) topics are automatically created with the _DL suffix
- Retries are not yet implemented in the Kafka transport
- Manual commit is used for reliability (enable.auto.commit=false)
- Partition EOF detection is enabled by default
See https://gitlab.com/GDXbsv/pservicebus/-/tree/master/src/Transport/Kafka