gdx/p-service-bus

PServiceBus

pkg:composer/gdx/p-service-bus

2.6.0 2025-10-14 15:01 UTC

Service Bus for PHP inspired by NServiceBus.

You can read about the usage principles and why such a bus is needed in their documentation :)

Documentation is bad. Ask in issues if you need help.

Telegram group: https://t.me/PServiceBus

Symfony: https://packagist.org/packages/gdx/p-service-bus-symfony-bundle

Laravel: https://packagist.org/packages/gdx/p-service-bus-laravel-package

Simple concept

[Diagram: PServiceBus Simple]

Extended concept

[Diagram: PServiceBus]

Installation

composer require gdx/p-service-bus

Usage

There are no polished examples yet.

Please look for examples in https://gitlab.com/GDXbsv/pservicebus/-/tree/master/TestApp

Or in the Symfony bundle: https://packagist.org/packages/gdx/p-service-bus-symfony-bundle

How to start using it in your project:

Features

  • A Saga/Aggregate consumes a command or event and produces events.
  • Bus lets you send a command or publish an event.
  • CoroutineBus lets you send multiple commands or publish multiple events.
  • Doctrine integration (Saga persistence, transactional messages (Outbox pattern), onlyOnce control).
  • All required resources are initialized for you automatically.
  • ServiceBus is the main entry point.

Init

p-service-bus:init

Send/Publish command/event

\GDXbsv\PServiceBus\Bus\ServiceBus implements all the Bus interfaces.

If you have only a couple of messages, use Bus:

#command
$bus->send(new TestCommand());
#event
$bus->publish(new TestEvent());

If you have many messages, use CoroutineBus:

#command
$coroutine = $coroutineBus->sendCoroutine();
$coroutine->send(new TestCommand(1), CommandOptions::record());
$coroutine->send(new TestCommand(2), CommandOptions::record());
$coroutine->finish();
#event
$coroutine = $coroutineBus->publishCoroutine();
$coroutine->publish(new TestEvent(1), EventOptions::record());
$coroutine->publish(new TestEvent(2), EventOptions::record());
$coroutine->finish();

Consume

To start consuming, run the command:

p-service-bus:transport:consume memory

where:

  • memory is the transport name

Message Key

Message key allows you to control message routing and ensure order preservation for related messages. It's supported by Kafka (as partition key) and SQS FIFO queues (as MessageGroupId).

Setting Message Key

There are two ways to set the message key:

1. Via MessageOptions (the key is set for all external messages and for every message + handler pair):

use GDXbsv\PServiceBus\Message\CommandOptions;

$bus->send(
    new OrderCreatedCommand($orderId),
    CommandOptions::record()->withMessageKey($customerId)
);

2. Via the Handle attribute (the key is set only for the internal message + handler pair):

#[Handle(transportName: 'kafka', messageKey: 'static-key')]
public function handleOrder(OrderCreatedCommand $command, MessageHandleContext $context): void
{
    // All messages handled here will use 'static-key' as message key
}

Priority: Handle attribute's messageKey has higher priority and will override the messageKey set in MessageOptions.

Transport-Specific Behavior

  • Kafka: Message key becomes the partition key. Messages with the same key are sent to the same partition, guaranteeing order preservation.
  • SQS FIFO: Message key becomes MessageGroupId. Messages with the same group ID are processed in order within FIFO queues (.fifo suffix required).
  • Other transports: Message key may be ignored or used differently depending on transport capabilities.
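
To see why a shared key preserves order, it can help to picture key-based routing as a deterministic hash. The sketch below is only an illustration under stated assumptions: partitionFor() is a hypothetical helper, and crc32 modulo the partition count stands in for whatever partitioner the Kafka client actually uses.

```php
<?php declare(strict_types=1);

/**
 * Hypothetical helper: maps a message key to a partition index.
 * Illustration only; the real partitioner lives in the Kafka client.
 */
function partitionFor(string $messageKey, int $partitions): int
{
    if ($partitions < 1) {
        throw new \InvalidArgumentException('At least one partition is required.');
    }

    // crc32() is deterministic, so equal keys always yield equal partitions.
    return crc32($messageKey) % $partitions;
}

$first = partitionFor('customer-123', 3);
$second = partitionFor('customer-123', 3);

var_dump($first === $second); // bool(true)
```

Because every message for customer-123 lands on the same partition, a single consumer reads them in the order they were written.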

Handlers

You can turn any method into a handler by adding the PHP attribute #[Handle]. You can set:

  • retries: the number of retries before the message goes to the DLQ (not supported by SQS; for SQS, configure the queue itself via the DSN string)
  • timeoutSec: the initial delay before the message is processed for the first time (max 15 min for SQS)
  • retriesTimeoutExpression: a custom formula to calculate the delay for each retry. The variable retries_count is passed in. For the syntax, see https://symfony.com/doc/current/reference/formats/expression_language.html
  • messageKey: the message routing key for order preservation (see the Message Key section above)

You can also set all of these options via MessageOptions; values set on the handler override them. This is not recommended and therefore not documented.

<?php declare(strict_types=1);

use GDXbsv\PServiceBus\Bus\Handling\Handle;
use GDXbsv\PServiceBus\Bus\Handling\MessageHandleContext;

/**
 * @internal
 */
final class Handlers
{
    public string $result = '';

    #[Handle('memory')]
    public function handleCommand(TestMultiHandlersCommand $command, MessageHandleContext $context): void
    {
        $this->result .= '||' . $command->name;
    }

    /**
     * 5 tries = initial try + 4 retries
     *
     * Retry no  | Delay
     * --------- | -------------
     *      1    |  0 h  5 min
     *      2    |  0 h 25 min
     *      3    |  2 h  5 min
     *      4    | 10 h 25 min
     *
     * after all retries -> push to DLQ after 10s
     */
    #[Handle(transportName: 'memory', retries: 5, timeoutSec: 100, retriesTimeoutExpression: '(retries_count > 4) ? 10 : (60 * (5 ** retries_count))')]
    public function anyNameFunction(Test1Event $event, MessageHandleContext $context): void
    {
        $this->result .= '||' . $event->name;
    }
}
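
The delay table in the docblock above can be checked by evaluating the same formula in plain PHP. This is a minimal sketch: retryDelaySeconds() is a hypothetical name, and it assumes retries_count is 1 for the first retry, as the table suggests.

```php
<?php declare(strict_types=1);

/** Plain-PHP equivalent of the retriesTimeoutExpression above (delay in seconds). */
function retryDelaySeconds(int $retriesCount): int
{
    return $retriesCount > 4 ? 10 : 60 * (5 ** $retriesCount);
}

foreach ([1, 2, 3, 4, 5] as $retry) {
    printf("retry %d -> %d s\n", $retry, retryDelaySeconds($retry));
}
// retry 1 -> 300 s    (5 min)
// retry 2 -> 1500 s   (25 min)
// retry 3 -> 7500 s   (2 h 5 min)
// retry 4 -> 37500 s  (10 h 25 min)
// retry 5 -> 10 s
```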

Transport Attributes

The MessageHandleContext now provides access to transport-specific attributes and retry information through the transportAttributes property:

#[Handle('sqs')]
public function handleSqsMessage(MyEvent $event, MessageHandleContext $context): void
{
    // Access retry attempt count (0-based)
    $attemptNumber = $context->transportAttributes->attempt;
    
    // Access transport-specific attributes
    $attributes = $context->transportAttributes->attributes;
    
    // SQS attributes
    $messageId = $attributes['MessageId'] ?? null;
    $receiptHandle = $attributes['ReceiptHandle'] ?? null;
    $sentTimestamp = $attributes['SentTimestamp'] ?? null;
    
    // Handle based on attempt count
    if ($attemptNumber === 0) {
        // First attempt
        $this->handleFirstAttempt($event);
    } else {
        // Retry attempt
        $this->handleRetry($event, $attemptNumber);
    }
}

External events

External events are either sent out to subscribed external clients (for example, via SNS) or received from external sources you are subscribed to (for example, an SNS topic).

<?php
declare(strict_types=1);

use GDXbsv\PServiceBus\Message\ExternalIn;
use GDXbsv\PServiceBus\Message\ExternalOut;

/**
 * @internal
 * @immutable
 * @psalm-immutable
 */
#[ExternalOut(transportName: 'memory-external', externalName: 'test.external_1_event')]
final class ExternalOutEvent
{
}

#[ExternalIn(transportName: 'memory-external', externalName: 'test.external_1_event')]
final class ExternalInEvent
{
}

Replay

Sometimes something goes wrong and you want to replay certain events. Use the Replay attribute for this.

<?php
declare(strict_types=1);

use GDXbsv\PServiceBus\Message\EventOptions;
use GDXbsv\PServiceBus\Message\Message;
use GDXbsv\PServiceBus\Message\Replay\Replay;
use GDXbsv\PServiceBusTestApp\Handling\Test1Event;

/**
 * @internal
 * @immutable
 * @psalm-immutable
 *
 * @psalm-import-type ReplayOutput from \GDXbsv\PServiceBus\Message\Replay\Replaying
 */
final class ReplayForEvent
{
    /**
     * @return ReplayOutput
     */
    #[Replay(replayName: 'testReplay')]
    public function anyName(): \Traversable {
        for ($i=1; $i<=5; ++$i) {
            yield new Message(new Test1Event(), EventOptions::record());
        }
    }
}

Then use this command to start the replay:

p-service-bus:message:replay testReplay "\GDXbsv\PServiceBusTestApp\Handling\Handlers::handle2Event1" memory

where:

  • testReplay is the replay name from the attribute
  • "\GDXbsv\PServiceBusTestApp\Handling\Handlers::handle2Event1" is className + :: + methodName
  • memory is the transport name

Saga

Inspired by: https://docs.particular.net/nservicebus/sagas/

A saga is a long-lived process that reacts to multiple events before making a decision. It is also useful when your data and messages must be bound together transactionally using the outbox pattern.

Example with doctrine:

<?php declare(strict_types=1);

use GDXbsv\PServiceBus\Bus\Handling\Handle;
use GDXbsv\PServiceBus\Id;
use GDXbsv\PServiceBus\Message\TimeSpan;
use GDXbsv\PServiceBus\Saga\MessageSagaContext;
use GDXbsv\PServiceBus\Saga\Saga;
use GDXbsv\PServiceBus\Saga\SagaContext;
use GDXbsv\PServiceBus\Saga\SagaPropertyMapper;
use Doctrine\ORM\Mapping as ORM;
use GDXbsv\PServiceBusTestApp\Saga\TestSagaCommand;
use GDXbsv\PServiceBusTestApp\Saga\TestSagaMapStringCommand;
use GDXbsv\PServiceBusTestApp\Saga\TestsSagaInEvent;
use GDXbsv\PServiceBusTestApp\Saga\TestsSagaOutputEvent;


/**
 * @final
 */
#[ORM\Entity]
final class TestSaga extends Saga
{
    #[ORM\Column(type: 'id', nullable: false)]
    #[ORM\Id]
    private Id $id;
    #[ORM\Column(type: 'string', nullable: false)]
    private ?string $string;
    #[ORM\Column(type: 'string', nullable: true)]
    private ?string $value;

    /**
     * @param Id<static> $id
     */
    private function __construct(Id $id, string $string)
    {
        $this->id = $id;
        $this->string = $string;
    }
    
    public static function configureHowToCreateSaga(SagaCreateMapper $mapper): void
    {
        $mapper
            ->toMessage(
                // Do not forget to add a handler for this message: it covers the case where the saga already exists and tells the saga to expect the message.
                function (TestSagaCreateCommand $command, MessageSagaContext $context) {
                    return new self(new Id($command->id), $command->string);
                }
            );
    }

    public static function configureHowToFindSaga(SagaPropertyMapper $mapper): void
    {
        $mapper
            // Find saga by id
            ->mapSaga(new \ReflectionProperty(TestSaga::class, 'id'))
            ->toMessage(
                function (TestSagaCommand $command, MessageSagaContext $context) {
                    return new Id($command->id);
                }
            )
            ->toMessage(
                function (TestsSagaInEvent $message, MessageSagaContext $context) {
                    return new Id($message->string);
                }
            );
        $mapper
            // Find saga by string property
            ->mapSaga(new \ReflectionProperty(TestSaga::class, 'string'))
            ->toMessage(
                function (TestSagaMapStringCommand $command, MessageSagaContext $context) {
                    return $command->string;
                }
            );
    }
    
    /** The saga must declare that it expects this message, because the saga may already exist when it arrives */
    #[Handle('memory', 3)]
    public function testSagaCreateCommand(TestSagaCreateCommand $command, SagaContext $context)
    {
        $this->string = $command->string;
    }
    
    /** We can remove saga after all */
    #[Handle('memory', 3)]
    public function testRemove(TestSagaRemoveCommand $command, SagaContext $context)
    {
        $this->markAsComplete();
    }

    #[Handle('memory', 3)]
    public function testHandlerFunction(TestSagaCommand $command, SagaContext $context)
    {
        $this->string = $command->string;
        $context->timeout(new TestsSagaOutputEvent('testHandlerFunction'), TimeSpan::fromSeconds(0));
    }

    #[Handle('memory', 3)]
    public function testListeningFunction(TestsSagaInEvent $event, SagaContext $context)
    {
        $this->string = $event->string;
        $this->value = $event->value;
        $context->publish(new TestsSagaOutputEvent('testListeningFunction'));
    }

    #[Handle('memory', 3)]
    public function handleTestSagaMapStringCommand(
        TestSagaMapStringCommand $command,
        SagaContext $context
    ) {
        $context->publish(new TestsSagaOutputEvent($this->id->toString()));
    }
}

where:

  • configureHowToCreateSaga describes how to create the saga (WARNING: handlers must exist for all creation messages)
  • configureHowToFindSaga describes how to find the saga
  • #[Handle('memory', 3)] marks methods as handlers

You can create custom finders by using #[SagaFind] attribute, for example:

<?php declare(strict_types=1);

use Doctrine\ORM\EntityManager;
use GDXbsv\PServiceBus\Message\MessageOptions;
use GDXbsv\PServiceBus\Saga\SagaFind;
use GDXbsv\PServiceBusTestApp\Saga\CustomDoctrineSearchEvent;
use GDXbsv\PServiceBusTestApp\Saga\TestSaga;

/**
 * @internal
 * @immutable
 * @psalm-immutable
 */
final class CustomDoctrineSagaFinder
{
    public function __construct(
        private EntityManager $em
    ) {
    }

    #[SagaFind]
    public function findByMultipleFields(
        CustomDoctrineSearchEvent $event,
        MessageOptions $messageOptions
    ): TestSaga {
        $qb = $this->em->createQueryBuilder();
        $qb
            ->from(TestSaga::class, 'saga')
            ->select('saga')
            ->where($qb->expr()->eq('saga.string', ':propertyValue'))
            ->setParameter(':propertyValue', $event->string);
        $saga = $qb->getQuery()->getSingleResult();

        return $saga;
    }
}

Transport

Available transports: InMemory, RabbitMQ (Bunny), SQS, SNS, Kafka, and CompositeTransport.

You can create custom transports by implementing these interfaces:

interface Transport
{
    /**
     * @return \Generator<int, void, Envelope|null, void>
     */
    public function sending(): \Generator;

    /**
     * @return \Generator<int, Envelope, Result\Ok<null, mixed>|Result\Err<mixed, \Exception>, void>
     */
    public function receive(int $limit = 0): \Generator;

    public function stop(): void;
}
interface TransportSynchronisation
{
    public function sync(): void;
}

Note that for an external bus, the sync() method MUST subscribe you to the external message names if you want the subscription to happen automatically.
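
The return type of sending() describes a generator that yields nothing and receives envelopes through \Generator::send(). The sketch below shows only that PHP mechanism, not the library's implementation: DemoEnvelope and CollectingSender are hypothetical stand-ins, and treating null as "end of batch" is an assumption inferred from the Envelope|null type, not something the README states.

```php
<?php declare(strict_types=1);

// Hypothetical stand-in for the library's Envelope class.
final class DemoEnvelope
{
    public function __construct(public readonly string $payload)
    {
    }
}

// Hypothetical sender that only collects envelopes in memory.
final class CollectingSender
{
    /** @var list<DemoEnvelope> */
    public array $sent = [];

    /** @return \Generator<int, void, DemoEnvelope|null, void> */
    public function sending(): \Generator
    {
        while (true) {
            $envelope = yield; // paused here until the caller calls send()
            if ($envelope === null) {
                return; // assumption: null ends the batch
            }
            $this->sent[] = $envelope;
        }
    }
}

$sender = new CollectingSender();
$coroutine = $sender->sending();
$coroutine->send(new DemoEnvelope('first'));
$coroutine->send(new DemoEnvelope('second'));
$coroutine->send(null);

echo count($sender->sent), "\n"; // 2
```

A real transport would hand each envelope to its broker client instead of appending it to an array.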

CompositeTransport

CompositeTransport allows you to combine multiple transports and send messages to all of them simultaneously. This is useful for:

  • Broadcasting messages to multiple message brokers
  • Synchronizing data across different systems
  • Migration scenarios where you need to publish to both old and new infrastructure

Note: CompositeTransport only supports sending messages. Receiving and synchronization must be done from individual transports independently.

use GDXbsv\PServiceBus\Transport\CompositeTransport;

$sqsTransport = SqsTransport::ofDsn('sqs+http://...');
$kafkaTransport = KafkaFactory::createInternalFromDsn('kafka://...');

$composite = new CompositeTransport([$sqsTransport, $kafkaTransport]);

SQS transport

Example DSN or config: sqs+http://key:secret@aws:4100/123456789012?region=eu-west-1&retries=3&visibilityTimeout=30&waitSeconds=20&waitBetweenLoopsSeconds=40&messagesBatch=10&preload=true&tags[name]=value&tags[name2]=value2&assume=arn%3Aaws%3Aiam%3A%3A123456789012%3Arole%2Fxaccounts3access&queue=QueueName

Options:

| Option | Description | Default |
| --- | --- | --- |
| region | AWS region | |
| retries | Retries before DLQ (Dead Letter Queue) during the creation of the queue | 3 |
| visibilityTimeout | How long consumed messages will be blocked from next attempt (sec) | 30 |
| waitSeconds | Long polling - how long will AWS wait to collect a batch of messages | 20 |
| waitBetweenLoopsSeconds | If no messages, how long will we sleep between attempts | 40 |
| messagesBatch | How many messages will we consume from AWS with one request | 10 |
| preload | Asynchronously load the next messages while working on the previous ones | true |
| tags[name] | Add tags to the queue during the creation | |
| assume | The AWS role which we want to assume | |
| queue | The actual queue name | |
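
To see how the pieces of such a DSN separate, it can be split with PHP's built-in parse_url() and parse_str(). This is only a sketch of the DSN's shape using a shortened version of the example; it is not necessarily how the library itself parses the string.

```php
<?php declare(strict_types=1);

$dsn = 'sqs+http://key:secret@aws:4100/123456789012?region=eu-west-1&retries=3'
    . '&tags[name]=value&tags[name2]=value2&queue=QueueName';

$parts = parse_url($dsn);
parse_str($parts['query'], $options);

echo $parts['scheme'], "\n";           // sqs+http
echo $parts['port'], "\n";             // 4100
echo ltrim($parts['path'], '/'), "\n"; // 123456789012
echo $options['region'], "\n";         // eu-west-1
echo $options['tags']['name2'], "\n";  // value2
```

Note that the tags[name] options arrive as a nested array, and that every option value is a string until it is cast.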

Message Key for FIFO Queues

SQS FIFO queues support the messageKey feature (see Message Key section for usage). For FIFO queues:

  • messageKey is mapped to MessageGroupId
  • Messages with the same MessageGroupId are processed in order
  • MessageDeduplicationId is automatically set (uses message ID)
  • Queue name must end with .fifo
  • Enable content-based deduplication on the queue or ensure unique message IDs

Note: Standard (non-FIFO) SQS queues ignore the messageKey parameter.

RabbitMQ (Bunny) Transport

RabbitMQ transport provides reliable message queuing with AMQP protocol. There are two transport types:

  • Internal Transport: Direct queue-to-queue messaging with built-in retry and DLQ support
  • External Transport: Exchange-based pub/sub for inter-service communication

Message Key Usage

The messageKey parameter is stored in message headers for tracking and correlation purposes. RabbitMQ queues maintain FIFO (first-in-first-out) order by default, ensuring messages are processed in the order they are received.

Example:

use GDXbsv\PServiceBus\Message\MessageOptions;

$bus->send(
    new OrderCommand($orderId),
    MessageOptions::record()->withMessageKey('customer-123')
);

The messageKey is preserved in headers throughout the message lifecycle and can be accessed in handlers via $envelope->headers['message_key'].

Additional Information

See https://gitlab.com/GDXbsv/pservicebus/-/tree/master/src/Transport/Bunny

SQS-SNS transport

For SQS-SNS transports SNS is only for external messages.

See https://gitlab.com/GDXbsv/pservicebus/-/tree/master/src/Transport/Sqs

See https://gitlab.com/GDXbsv/pservicebus/-/tree/master/src/Transport/Sns

Kafka Transport

Kafka transport provides high-throughput, distributed messaging with both internal and external transports.

Internal Transport DSN

Example DSN:

kafka://broker1:9092,broker2:9092/my-topic?group.id=my-consumer-group&partitions.amount=3&replication.factor=3&auto.offset.reset=earliest&enable.transactions=true

With SASL authentication:

kafka://user:pass@broker1:9092,broker2:9092/my-topic?group.id=my-group&partitions.amount=3

DSN Options

| Option | Description | Default | Required |
| --- | --- | --- | --- |
| broker list | Comma-separated list of Kafka brokers (host:port) | - | Yes |
| topic (path) | Topic name for internal messages | - | Yes |
| username (DSN user) | SASL username (set via DSN: kafka://user:pass@broker/topic) | - | No |
| password (DSN password) | SASL password (set via DSN: kafka://user:pass@broker/topic) | - | No |
| partitions.amount | Number of partitions for the topic | - | Yes |
| group.id | Consumer group identifier | p-service-bus | No |
| replication.factor | Replication factor for the topic | 3 | No |
| auto.offset.reset | Where to start consuming: earliest (from beginning) or latest (only new messages) | earliest | No |
| enable.transactions | Enable transactional message production (true/false) | false | No |
| transactional.id | Custom transactional ID (auto-generated if not provided when transactions enabled) | auto | No |
| socket.timeout.ms | Socket timeout in milliseconds | - | No |
| queue.buffering.max.messages | Maximum number of messages in producer queue | - | No |
| queue.buffering.max.ms | Maximum time messages can be buffered before sending (ms) | - | No |
| batch.num.messages | Maximum number of messages in a batch | - | No |
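
When the options come from configuration, the DSN can be assembled instead of hand-written. The helper below is a hypothetical sketch and not part of the library; it relies only on PHP's http_build_query(), which leaves the dots in option names intact.

```php
<?php declare(strict_types=1);

/**
 * Hypothetical helper, not part of the library.
 *
 * @param list<string>              $brokers host:port pairs
 * @param array<string, string|int> $options query options from the table above
 */
function buildKafkaDsn(array $brokers, string $topic, array $options): string
{
    // http_build_query() keeps the dots in option names such as "group.id".
    return sprintf('kafka://%s/%s?%s', implode(',', $brokers), $topic, http_build_query($options));
}

echo buildKafkaDsn(
    ['broker1:9092', 'broker2:9092'],
    'my-topic',
    ['group.id' => 'my-consumer-group', 'partitions.amount' => 3, 'enable.transactions' => 'true'],
), "\n";
// kafka://broker1:9092,broker2:9092/my-topic?group.id=my-consumer-group&partitions.amount=3&enable.transactions=true
```

Boolean options are passed as the strings 'true' or 'false' here, because http_build_query() would otherwise render a PHP true as 1.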

SSL Support

Use kafka+ssl:// scheme for SSL connections:

kafka+ssl://broker1:9093/my-topic?partitions.amount=3&ssl.ca.location=/path/to/ca-cert.pem

With SASL authentication over SSL:

kafka+ssl://user:pass@broker1:9093/my-topic?partitions.amount=3&ssl.ca.location=/path/to/ca-cert.pem

SSL Options:

| Option | Description | Required for SSL |
| --- | --- | --- |
| ssl.ca.location | Path to the CA certificate file (PEM format) for SSL/TLS | Yes |

Note: The ssl.ca.location parameter is mandatory when using kafka+ssl:// scheme. It specifies the path to the CA certificate bundle used to verify the broker's certificate.

AWS MSK IAM Authentication

PServiceBus supports AWS MSK (Managed Streaming for Kafka) with IAM authentication using SASL/OAUTHBEARER mechanism. This allows you to use AWS IAM roles and policies to control access to your Kafka clusters without managing separate credentials.

Requirements:

  • AWS SDK for PHP: composer require aws/aws-sdk-php
  • Use kafka+ssl:// scheme (IAM authentication requires SSL/TLS)
  • AWS credentials configured (via environment variables, IAM role, or explicit credentials)

DSN Format:

kafka+ssl://broker.kafka.region.amazonaws.com:9098/my-topic?partitions.amount=3&group.id=my-group&aws.iam.auth=true&aws.region=us-east-1

AWS MSK IAM Options:

| Option | Description | Required | Default |
| --- | --- | --- | --- |
| aws.iam.auth | Enable AWS IAM authentication (must be true for MSK IAM) | Yes | false |
| aws.region | AWS region where your MSK cluster is located (e.g., us-east-1, eu-west-1) | Yes | - |
| aws.assume.role.arn | ARN of IAM role to assume for authentication (optional, for cross-account access) | No | - |
| aws.iam.session_token | AWS session token (for temporary credentials, rarely needed) | No | - |

Authentication Methods:

  1. Default Credentials (Recommended for EC2/ECS/Lambda):

    kafka+ssl://b-1.mycluster.xxxxx.kafka.us-east-1.amazonaws.com:9098/orders?partitions.amount=3&aws.iam.auth=true&aws.region=us-east-1
    

    Uses AWS credentials from the environment (IAM instance profile, environment variables, or ~/.aws/credentials)

  2. Explicit Credentials:

    kafka+ssl://ACCESS_KEY:SECRET_KEY@b-1.mycluster.xxxxx.kafka.us-east-1.amazonaws.com:9098/orders?partitions.amount=3&aws.iam.auth=true&aws.region=us-east-1
    

    Uses provided access key and secret key in the DSN

  3. With Session Token (Temporary Credentials):

    kafka+ssl://ACCESS_KEY:SECRET_KEY@broker:9098/orders?partitions.amount=3&aws.iam.auth=true&aws.region=us-east-1&aws.iam.session_token=SESSION_TOKEN
    
  4. Assume Role (Cross-Account or Role Chaining):

    kafka+ssl://broker:9098/orders?partitions.amount=3&aws.iam.auth=true&aws.region=us-east-1&aws.assume.role.arn=arn:aws:iam::123456789012:role/KafkaAccessRole
    

    Assumes the specified IAM role for authentication

Complete Example:

use GDXbsv\PServiceBus\Transport\Kafka\KafkaFactory;

// Using default AWS credentials (EC2 instance profile or environment)
$transport = KafkaFactory::createInternalFromDsn(
    'kafka+ssl://b-1.mycluster.xxxxx.kafka.us-east-1.amazonaws.com:9098,b-2.mycluster.xxxxx.kafka.us-east-1.amazonaws.com:9098/orders?group.id=order-processor&partitions.amount=5&replication.factor=3&aws.iam.auth=true&aws.region=us-east-1'
);

// With assume role for cross-account access
$transport = KafkaFactory::createInternalFromDsn(
    'kafka+ssl://b-1.mycluster.xxxxx.kafka.us-east-1.amazonaws.com:9098/orders?group.id=processor&partitions.amount=3&aws.iam.auth=true&aws.region=us-east-1&aws.assume.role.arn=arn:aws:iam::987654321098:role/CrossAccountKafkaAccess'
);

// With explicit credentials (not recommended for production)
$transport = KafkaFactory::createInternalFromDsn(
    'kafka+ssl://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY@b-1.mycluster.xxxxx.kafka.us-east-1.amazonaws.com:9098/orders?group.id=processor&partitions.amount=3&aws.iam.auth=true&aws.region=us-east-1'
);

Required IAM Permissions:

Your IAM role or user must have permissions to connect to the MSK cluster. Example policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": "arn:aws:kafka:us-east-1:123456789012:topic/my-cluster/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": "arn:aws:kafka:us-east-1:123456789012:group/my-cluster/*"
        }
    ]
}

How It Works:

  1. When aws.iam.auth=true is set, the transport uses SASL/OAUTHBEARER mechanism
  2. AWS credentials are obtained from the specified source (environment, DSN, or assumed role)
  3. A signed URL is generated using AWS Signature Version 4 for the Kafka cluster
  4. The signed URL is encoded and used as the OAuth token
  5. Tokens are automatically refreshed before expiration (default: 900 seconds)
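
Step 5 can be pictured as a simple expiry check. In this sketch only the 900-second lifetime comes from the list above; the function name tokenNeedsRefresh() and the 60-second safety margin are hypothetical and may differ from what the transport does internally.

```php
<?php declare(strict_types=1);

/**
 * Hypothetical check: refresh once the token is within $marginSec of expiring.
 * Timestamps are Unix seconds.
 */
function tokenNeedsRefresh(int $issuedAt, int $now, int $lifetimeSec = 900, int $marginSec = 60): bool
{
    return $now >= $issuedAt + $lifetimeSec - $marginSec;
}

var_dump(tokenNeedsRefresh(issuedAt: 0, now: 100)); // bool(false)
var_dump(tokenNeedsRefresh(issuedAt: 0, now: 850)); // bool(true)
```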

Troubleshooting:

  • Error: "authentication failure": Verify IAM permissions and AWS credentials
  • Error: "broker transport failure": Check broker address and port (should be 9098 for IAM)
  • Error: "invalid argument": Ensure kafka+ssl:// scheme is used with aws.iam.auth=true
  • Token refresh issues: Tokens are cached and refreshed automatically; ensure credentials haven't expired

Notes:

  • IAM authentication requires the broker to support SASL/OAUTHBEARER (AWS MSK with IAM enabled)
  • SSL/TLS is mandatory for IAM authentication (kafka+ssl:// scheme required)
  • Token lifetime is 15 minutes, automatically refreshed
  • Both Producer and Consumer are authenticated during construction
  • Admin operations (topic creation, metadata) are also authenticated

Examples

Basic setup:

use GDXbsv\PServiceBus\Transport\Kafka\KafkaFactory;

$transport = KafkaFactory::createInternalFromDsn(
    'kafka://localhost:9092/orders?partitions.amount=3'
);

With transactions (exactly-once semantics):

$transport = KafkaFactory::createInternalFromDsn(
    'kafka://localhost:9092/orders?partitions.amount=3&enable.transactions=true'
);

Multiple brokers with SSL:

$transport = KafkaFactory::createInternalFromDsn(
    'kafka+ssl://broker1:9093,broker2:9093/orders?partitions.amount=5&replication.factor=3&ssl.ca.location=/path/to/ca-cert.pem'
);

Performance tuning:

$transport = KafkaFactory::createInternalFromDsn(
    'kafka://localhost:9092/high-throughput?group.id=processor&partitions.amount=10&queue.buffering.max.messages=100000&queue.buffering.max.ms=50&batch.num.messages=10000'
);

External Transport

For external messages (pub/sub across services), use KafkaFactory::createExternalFromDsn():

use GDXbsv\PServiceBus\Transport\Kafka\KafkaFactory;

$externalTransport = KafkaFactory::createExternalFromDsn(
    'kafka://localhost:9092/external-events?group.id=external-consumer&partitions.amount=3',
    messageNameMapOut: ['com.myapp.users.created' => 'UserCreated'],
    messageNameMapIn: ['com.another_app.users.validated' => 'UserValidated']
);

Key Features

  • Transactions: Enable with enable.transactions=true for exactly-once delivery semantics
  • Consumer Groups: Use group.id to scale consumers horizontally
  • Partitioning: Set partitions.amount to control parallelism
  • Offset Management: Control where to start consuming with auto.offset.reset
  • High Throughput: Tune batching and buffering parameters for performance
  • Partition Key: Kafka uses the messageKey as the partition key. Messages with the same key are sent to the same partition, guaranteeing order preservation. See the main Message Key section for usage examples.

Notes

  • Dead Letter Queue (DLQ) topics are automatically created with _DL suffix
  • Retries are not yet implemented in Kafka transport
  • Manual commit is used for reliability (enable.auto.commit=false)
  • Partition EOF detection is enabled by default

See https://gitlab.com/GDXbsv/pservicebus/-/tree/master/src/Transport/Kafka