elvenpath/laravel-pub-sub-adapter

pkg:composer/elvenpath/laravel-pub-sub-adapter

dev-main 2025-09-30 04:49 UTC

This package is auto-updated.

Last update: 2025-09-30 04:49:29 UTC


README

A publish-subscribe messaging layer for Laravel applications that uses either Kafka or Redis as the message transport, selected through the driver setting.

Features

  • Configurable Transport: Choose between Kafka or Redis as your message bus driver
  • Attribute-based Configuration: Simple declaration using PHP 8 attributes
  • Auto-discovery: Automatic handler and publisher discovery
  • Message Factory: Automatic serialization/deserialization with complex type support
  • Priority Handlers: Control handler execution order with priorities
  • Artisan Commands: Built-in CLI tools for message management
  • PHPStan Max Level: Full static analysis support
  • Well Tested: Comprehensive test coverage

Requirements

  • PHP ^8.2
  • Laravel ^10.0 or ^11.0
  • ext-json
  • ext-kafka (optional - only required when using Kafka driver)
  • mateusjunges/laravel-kafka (optional - only required when using Kafka driver)

Installation

Install the package via Composer:

# For Redis-only usage
composer require elvenpath/laravel-pub-sub-adapter

# For Kafka support, also install:
composer require mateusjunges/laravel-kafka
# And ensure php-kafka extension is installed

The service provider will be automatically registered via Laravel's package discovery.

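Note: If you plan to use Kafka, install the mateusjunges/laravel-kafka package and make sure the Kafka PHP extension it depends on is available (laravel-kafka is built on the rdkafka extension). Redis-only usage needs no additional Composer packages beyond a Redis client; see Redis Driver Requirements below.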

Choosing Your Message Transport

This package supports two message transport options:

  • Redis: Simple pub/sub messaging, perfect for real-time notifications and lightweight messaging
  • Kafka: Enterprise-grade streaming platform with durability, partitioning, and replay capabilities

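Choose Redis for simpler use cases, or Kafka for high-throughput, mission-critical messaging systems. Keep in mind that Redis pub/sub does not persist messages: anything published while no subscriber is connected is lost.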

Redis Driver Requirements

For Redis support, you need one of the following:

  1. PhpRedis Extension (Recommended for production):

    # Install via PECL
    pecl install redis
  2. Predis Package (Pure PHP, no extension required):

    composer require predis/predis

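Laravel selects the Redis client from the redis.client option in config/database.php (the REDIS_CLIENT environment variable, which defaults to phpredis), so set REDIS_CLIENT=predis when using Predis. Configure your Redis connection in config/database.php as described in the Laravel documentation.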
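As an illustration, a dedicated connection for messaging could sit next to Laravel's default one. The connection name pubsub matches the default of MESSAGING_REDIS_CONNECTION in this package's config; the host, port, and database values are the stock Laravel environment keys and are assumptions about your setup, not requirements of the package.

```php
// config/database.php (excerpt): a sketch, not a layout the package mandates
'redis' => [
    'client' => env('REDIS_CLIENT', 'phpredis'),

    'default' => [
        'host' => env('REDIS_HOST', '127.0.0.1'),
        'password' => env('REDIS_PASSWORD'),
        'port' => env('REDIS_PORT', '6379'),
        'database' => env('REDIS_DB', '0'),
    ],

    // Dedicated connection used for pub/sub traffic
    'pubsub' => [
        'host' => env('REDIS_HOST', '127.0.0.1'),
        'password' => env('REDIS_PASSWORD'),
        'port' => env('REDIS_PORT', '6379'),
        'database' => env('REDIS_DB', '0'),
    ],
],
```

A separate connection keeps a long-running subscriber from tying up the connection your cache or queue uses.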

Publishing Configuration

Publish the configuration file:

php artisan vendor:publish --provider="LaravelMessaging\MessagingServiceProvider"

This will create a config/messaging.php configuration file.

Configuration

Basic Configuration

Configure your message bus in config/messaging.php:

return [
    // Message bus driver: 'kafka' or 'redis'
    'driver' => env('MESSAGING_DRIVER', 'kafka'),

    'service' => [
        'name' => env('MESSAGING_SERVICE_NAME', env('APP_NAME', 'Laravel Messaging')),
        'instance' => env('MESSAGING_SERVICE_INSTANCE', 'default'),
    ],

    // Kafka Configuration
    'kafka' => [],

    // Redis Configuration
    'redis' => [
        'connection' => env('MESSAGING_REDIS_CONNECTION', 'pubsub'),
    ],

    // Handler discovery
    'handler_discovery' => [
        'enabled' => env('MESSAGING_HANDLER_DISCOVERY', true),
        'scan_paths' => [
            app_path('Handlers') => 'App\\Handlers',
            app_path('Messaging/Handlers') => 'App\\Messaging\\Handlers',
        ],
    ],

    // Publisher discovery
    'publisher_discovery' => [
        'enabled' => env('MESSAGING_PUBLISHER_DISCOVERY', true),
        'scan_paths' => [
            app_path('Services') => 'App\\Services',
            app_path('Http/Controllers') => 'App\\Http\\Controllers',
            app_path('Jobs') => 'App\\Jobs',
            app_path('Messaging/Publishers') => 'App\\Messaging\\Publishers',
        ],
    ],

    // Legacy handler subscriptions
    'subscriptions' => [
        // Example:
        // [
        //     'topic' => 'user.events',
        //     'handler' => \App\Handlers\UserHandler::class,
        //     'priority' => 100,
        // ],
    ],
];
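Each scan_paths entry maps a directory to the namespace its classes live in. The package's discovery code is not shown here, so the following is only a sketch of how such a mapping can turn a file path into a class name; classNameFromPath is a hypothetical helper and the paths are made up.

```php
<?php declare(strict_types=1);

/**
 * Hypothetical helper: derive a fully qualified class name from a file
 * found under one of the configured scan paths.
 */
function classNameFromPath(string $file, string $basePath, string $baseNamespace): ?string
{
    $basePath = rtrim($basePath, '/');

    // Only PHP files located under the scan path qualify.
    if (!str_starts_with($file, $basePath . '/') || !str_ends_with($file, '.php')) {
        return null;
    }

    // Strip the base path and the ".php" suffix, then swap the separators.
    $relative = substr($file, strlen($basePath) + 1, -4);

    return rtrim($baseNamespace, '\\') . '\\' . str_replace('/', '\\', $relative);
}

echo classNameFromPath(
    '/var/www/app/Handlers/Billing/InvoicePaidHandler.php',
    '/var/www/app/Handlers',
    'App\\Handlers'
), PHP_EOL;
// prints "App\Handlers\Billing\InvoicePaidHandler"
```

Under this reading, classes outside the listed directories are never found, so a handler or publisher in another namespace would need its own scan_paths entry.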

Environment Variables

Add these to your .env file. The published config defaults MESSAGING_DRIVER to kafka, so set it explicitly when using Redis:

# Message Bus Configuration
MESSAGING_DRIVER=redis

# Service Configuration
MESSAGING_SERVICE_NAME="My App"
MESSAGING_SERVICE_INSTANCE=default

# Redis Configuration (if using Redis)
MESSAGING_REDIS_CONNECTION=pubsub

# Discovery Configuration
MESSAGING_HANDLER_DISCOVERY=true
MESSAGING_PUBLISHER_DISCOVERY=true

Usage

Creating Messages

Messages are simple PHP classes that extend BaseMessage:

<?php declare(strict_types=1);

namespace App\Messages;

use LaravelMessaging\Messaging\BaseMessage;

class UserRegisteredMessage extends BaseMessage
{
    public function __construct(
        public readonly int $userId,
        public readonly string $email,
        public readonly string $name,
        public readonly \DateTimeImmutable $registeredAt
    ) {
        parent::__construct();
    }

    public static function getMessageType(): string
    {
        return 'user.registered';
    }
}

Creating Message Handlers

Handlers process incoming messages. Use the #[MessageHandler] attribute to configure them:

<?php declare(strict_types=1);

namespace App\Handlers;

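use App\Mail\WelcomeEmail; // your own Mailable; not provided by this package
use App\Messages\UserRegisteredMessage;
use Illuminate\Support\Facades\Mail;
use LaravelMessaging\Messaging\Handler\MessageHandler;
use LaravelMessaging\Messaging\Handler\MessageHandlerInterface;
use LaravelMessaging\Messaging\MessageInterface;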

#[MessageHandler(
    messageTypes: ['user.registered'],
    priority: 100,
    enabled: true
)]
class SendWelcomeEmailHandler implements MessageHandlerInterface
{
    public function handle(MessageInterface $message): bool
    {
        if (!$message instanceof UserRegisteredMessage) {
            return false;
        }

        // Send welcome email logic here
        Mail::to($message->email)->send(new WelcomeEmail($message->name));

        return true;
    }

    public static function getHandledMessageTypes(): array
    {
        return ['user.registered'];
    }
}

Handler Attributes

  • messageTypes: Array of message types this handler processes
  • priority: Higher priority handlers execute first (default: 100)
  • enabled: Toggle handler on/off (default: true)
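To show how attribute-based configuration can be read at runtime, here is a self-contained sketch using PHP's reflection API. The MessageHandler class below is a stand-in with the three options listed above and their documented defaults; the package's real attribute class and discovery logic may differ, and readHandlerConfig is a hypothetical helper.

```php
<?php declare(strict_types=1);

// Stand-in for the package's attribute; the real class may differ.
#[Attribute(Attribute::TARGET_CLASS)]
final class MessageHandler
{
    public function __construct(
        public readonly array $messageTypes,
        public readonly int $priority = 100,
        public readonly bool $enabled = true,
    ) {}
}

#[MessageHandler(messageTypes: ['user.registered'], priority: 150)]
final class ExampleHandler {}

/** Returns the attribute instance for a class, or null if it has none. */
function readHandlerConfig(string $class): ?MessageHandler
{
    $attributes = (new ReflectionClass($class))->getAttributes(MessageHandler::class);

    return $attributes === [] ? null : $attributes[0]->newInstance();
}

$config = readHandlerConfig(ExampleHandler::class);
printf(
    "%s priority=%d enabled=%s\n",
    implode(',', $config->messageTypes),
    $config->priority,
    var_export($config->enabled, true)
);
// prints "user.registered priority=150 enabled=true"
```

The enabled option was omitted on ExampleHandler, so the constructor default applies.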

Creating Publishers

Publishers send messages to the message bus:

<?php declare(strict_types=1);

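// Note: App\Publishers is not among the default publisher_discovery scan_paths;
// add it to config/messaging.php or move this class under a scanned namespace.
namespace App\Publishers;

use App\Messages\UserRegisteredMessage;
use App\Models\User; // default Laravel user model location
use LaravelMessaging\Messaging\Bus\MessageBusInterface;
use LaravelMessaging\Messaging\Publisher\MessagePublisher;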

#[MessagePublisher(
    messageTypes: ['user.registered'],
    description: 'Publishes user registration events',
    topics: ['user-events'],
    enabled: true
)]
class UserEventPublisher
{
    public function __construct(
        private readonly MessageBusInterface $messageBus
    ) {}

    public function publishUserRegistered(User $user): void
    {
        $message = new UserRegisteredMessage(
            userId: $user->id,
            email: $user->email,
            name: $user->name,
            registeredAt: new \DateTimeImmutable()
        );

        $this->messageBus->publish($message);
    }
}

Multiple Handlers per Message

You can have multiple handlers for the same message type with different priorities:

#[MessageHandler(messageTypes: ['order.created'], priority: 200)]
class UpdateInventoryHandler implements MessageHandlerInterface
{
    // Runs first (priority 200)
}

#[MessageHandler(messageTypes: ['order.created'], priority: 100)]
class SendOrderConfirmationHandler implements MessageHandlerInterface
{
    // Runs second (priority 100)
}

#[MessageHandler(messageTypes: ['order.created'], priority: 50)]
class LogOrderHandler implements MessageHandlerInterface
{
    // Runs last (priority 50)
}
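The ordering rule can be sketched with a small in-memory registry. This is an illustration only: InMemoryHandlerRegistry is hypothetical, its method names merely mirror those shown under Handler Registry Access, and plain closures stand in for handler objects.

```php
<?php declare(strict_types=1);

// Hypothetical in-memory registry; not the package's implementation.
final class InMemoryHandlerRegistry
{
    /** @var array<string, list<array{handler: callable, priority: int}>> */
    private array $handlers = [];

    public function registerHandler(string $messageType, callable $handler, int $priority = 100): void
    {
        $this->handlers[$messageType][] = ['handler' => $handler, 'priority' => $priority];
    }

    /** @return list<callable> handlers sorted from highest to lowest priority */
    public function getHandlersForMessageType(string $messageType): array
    {
        $entries = $this->handlers[$messageType] ?? [];

        // Descending sort; usort is stable since PHP 8.0, so handlers with
        // equal priority keep their registration order.
        usort($entries, static fn (array $a, array $b): int => $b['priority'] <=> $a['priority']);

        return array_column($entries, 'handler');
    }
}

$registry = new InMemoryHandlerRegistry();
$registry->registerHandler('order.created', static fn (): string => 'log', 50);
$registry->registerHandler('order.created', static fn (): string => 'inventory', 200);
$registry->registerHandler('order.created', static fn (): string => 'confirmation', 100);

$order = array_map(
    static fn (callable $handler): string => $handler(),
    $registry->getHandlersForMessageType('order.created')
);
echo implode(' > ', $order), PHP_EOL;
// prints "inventory > confirmation > log"
```

Registration order does not matter here; only the priority value decides which handler runs first.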

Console Commands

Discover Messages

List all registered handlers and publishers:

# Discover all publishers and handlers
php artisan messages:discover

# Show only publishers
php artisan messages:discover --publishers

# Show only handlers
php artisan messages:discover --handlers

# Filter by message type
php artisan messages:discover --type=user.registered

# Show message flow mapping
php artisan messages:discover --flow

Consume Messages

Start consuming messages from the configured bus:

# Consume from all configured topics
php artisan messages:consume

# Consume from specific topic
php artisan messages:consume --topic=user.events

Publish Message

Publish a message via CLI:

# Publish with custom data
php artisan messages:publish user.registered \
    --data='{"userId":123,"email":"user@example.com"}' \
    --type="user.registered"

# Publish multiple messages
php artisan messages:publish test.topic --count=5

# Publish simple test message
php artisan messages:publish test.topic

Subscribe to Redis

For Redis pub/sub, start the subscriber:

# Subscribe to all configured Redis topics
php artisan messages:redis-subscribe

# Subscribe to specific topic
php artisan messages:redis-subscribe --topic=user.events

# Subscribe with timeout (in seconds)
php artisan messages:redis-subscribe --timeout=3600

Advanced Usage

Custom Message Bus

Implement your own message bus by extending AbstractMessageBus:

<?php declare(strict_types=1);

namespace App\Messaging;

use LaravelMessaging\Messaging\Bus\AbstractMessageBus;
use LaravelMessaging\Messaging\MessageInterface;

class CustomMessageBus extends AbstractMessageBus
{
    public function publish(MessageInterface $message): void
    {
        // Your custom publishing logic
    }

    public function consume(array $options = []): void
    {
        // Your custom consuming logic
    }
}

Message with Complex Types

The message factory handles complex type conversions automatically:

class ComplexMessage extends BaseMessage
{
    public function __construct(
        public readonly array $items,
        public readonly ?string $optional,
        public readonly CustomObject $customObject,
        public readonly \DateTime $createdAt
    ) {
        parent::__construct();
    }
}
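The README does not show how the factory performs these conversions. One common approach, sketched here under that caveat, is to inspect constructor parameter types with reflection and convert each decoded JSON value accordingly. The hydrate function and the Address and OrderPlaced classes are hypothetical.

```php
<?php declare(strict_types=1);

final class Address
{
    public function __construct(public readonly string $city) {}
}

final class OrderPlaced
{
    public function __construct(
        public readonly array $items,
        public readonly ?string $note,
        public readonly Address $address,
        public readonly DateTimeImmutable $createdAt,
    ) {}
}

// Hypothetical hydrator: rebuilds an object from decoded JSON by looking at
// constructor parameter types. The package's factory may work differently.
function hydrate(string $class, array $data): object
{
    $args = [];
    $constructor = (new ReflectionClass($class))->getConstructor();

    foreach ($constructor?->getParameters() ?? [] as $param) {
        $name = $param->getName();
        $type = $param->getType();
        $value = $data[$name] ?? null;

        if ($value === null) {
            // Missing or null values are only accepted for nullable parameters.
            if ($type !== null && !$type->allowsNull()) {
                throw new InvalidArgumentException("Missing value for \"{$name}\"");
            }
            $args[$name] = null;
            continue;
        }

        if ($type instanceof ReflectionNamedType && !$type->isBuiltin()) {
            $target = $type->getName();
            if (in_array($target, [DateTime::class, DateTimeImmutable::class], true)) {
                $value = new $target($value);      // date string -> date object
            } elseif (is_array($value)) {
                $value = hydrate($target, $value); // nested object
            }
        }

        $args[$name] = $value;
    }

    // String keys are unpacked as named arguments.
    return new $class(...$args);
}

$json = '{"items":["a","b"],"address":{"city":"Oslo"},"createdAt":"2025-01-02T03:04:05+00:00"}';
$message = hydrate(OrderPlaced::class, json_decode($json, true, 512, JSON_THROW_ON_ERROR));

echo $message->address->city, ' ', $message->createdAt->format('Y-m-d'), PHP_EOL;
// prints "Oslo 2025-01-02"
```

The note field is absent from the JSON and the parameter is nullable, so it is hydrated as null.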

Conditional Handler Registration

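Disable handlers based on environment or configuration. PHP attribute arguments must be constant expressions, so function calls such as env() or config() are not allowed there. The example below assumes your application defines a global APP_ENV constant (Laravel does not define one by default):

#[MessageHandler(
    messageTypes: ['payment.processed'],
    enabled: APP_ENV !== 'testing'  // Disable in tests; APP_ENV must be a defined constant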
)]
class ChargePaymentHandler implements MessageHandlerInterface
{
    // Handler implementation
}
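The following standalone sketch shows why a constant works in this position: PHP evaluates attribute arguments lazily, when the attribute is instantiated through reflection, so a constant defined during bootstrap is available by then. The define() call and the stand-in MessageHandler attribute are assumptions for illustration; the package's attribute class may differ.

```php
<?php declare(strict_types=1);

// Assumption: the application defines this constant early in its bootstrap.
define('APP_ENV', getenv('APP_ENV') ?: 'production');

// Stand-in for the package's attribute.
#[Attribute(Attribute::TARGET_CLASS)]
final class MessageHandler
{
    public function __construct(
        public readonly array $messageTypes,
        public readonly int $priority = 100,
        public readonly bool $enabled = true,
    ) {}
}

#[MessageHandler(messageTypes: ['payment.processed'], enabled: APP_ENV !== 'testing')]
final class ChargePaymentHandler {}

// The constant expression is resolved here, not when the class is declared.
$attribute = (new ReflectionClass(ChargePaymentHandler::class))
    ->getAttributes(MessageHandler::class)[0];
$config = $attribute->newInstance();

echo 'enabled: ', var_export($config->enabled, true), PHP_EOL;
```

Running the script with APP_ENV=testing in the environment reports the handler as disabled; any other value leaves it enabled.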

Handler Registry Access

Access the handler registry directly:

use LaravelMessaging\Messaging\Handler\MessageHandlerRegistry;

$registry = app(MessageHandlerRegistry::class);

// Get all handlers for a message type
$handlers = $registry->getHandlersForMessageType('user.registered');

// Register a handler manually
$registry->registerHandler('user.registered', $handlerInstance, 100);

// Get all registered message types
$types = $registry->getRegisteredMessageTypes();

Testing

Unit Testing Handlers

<?php declare(strict_types=1);

namespace Tests\Unit\Handlers;

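use App\Handlers\SendWelcomeEmailHandler;
use App\Mail\WelcomeEmail; // your own Mailable; not provided by this package
use App\Messages\UserRegisteredMessage;
use Illuminate\Support\Facades\Mail;
use Tests\TestCase;

class SendWelcomeEmailHandlerTest extends TestCase
{
    public function test_handler_processes_user_registered_message(): void
    {
        // Swap the mailer for a fake so no real email is sent
        Mail::fake();

        $handler = new SendWelcomeEmailHandler();

        $message = new UserRegisteredMessage(
            userId: 1,
            email: 'test@example.com',
            name: 'Test User',
            registeredAt: new \DateTimeImmutable()
        );

        $result = $handler->handle($message);

        $this->assertTrue($result);
        // Assert email was sent (use assertQueued if the mailable is queued)
        Mail::assertSent(WelcomeEmail::class);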
    }
}

Testing Publishers

public function test_publisher_sends_message_to_bus(): void
{
    $messageBus = $this->createMock(MessageBusInterface::class);
    $messageBus->expects($this->once())
        ->method('publish')
        ->with($this->isInstanceOf(UserRegisteredMessage::class));

    $publisher = new UserEventPublisher($messageBus);

    $user = User::factory()->create();
    $publisher->publishUserRegistered($user);
}

Development

Running Tests

# Run all tests
make test

# With coverage
make coverage

Code Style

# Fix code style
make style-check

Static Analysis

# Run PHPStan
make static-analysis

Examples

Check the /examples directory for complete working examples:

  • ActivityLogMessage.php - Example message class
  • ActivityLogMessageHandler.php - Example handler
  • ActivityLogPublisher.php - Example publisher
  • UserRegisteredMessage.php - User registration message
  • UserRegisteredHandler.php - User registration handler

Contributing

Contributions are welcome! Please ensure:

  1. All tests pass (make test)
  2. Code style is fixed (make style-check)
  3. Static analysis passes (make static-analysis)
  4. New features include tests
  5. Documentation is updated

License

This package is open-sourced software licensed under the MIT license.

Support

For issues and questions, please use the GitHub issue tracker.

Credits

Changelog

Please see CHANGELOG for recent changes.