elvenpath / laravel-pub-sub-adapter
pkg:composer/elvenpath/laravel-pub-sub-adapter
Requires
- php: ^8.2
- ext-json: *
- laravel/framework: ^10.0|^11.0|^12.0
Requires (Dev)
- friendsofphp/php-cs-fixer: ^v3.75.0
- mockery/mockery: ^1.6
- orchestra/testbench: ^9.14
- phpstan/phpstan: ^1.10
- phpunit/phpunit: ^11.0
Suggests
- ext-kafka: Required when using Kafka message bus driver
- ext-redis: For better performance with Redis driver (alternative to predis/predis)
- mateusjunges/laravel-kafka: Required when using Kafka message bus driver (^2.5)
- predis/predis: For Redis support without PHP extension (alternative to ext-redis)
This package is auto-updated.
Last update: 2025-09-30 04:49:29 UTC
README
A robust publish-subscribe messaging system for Laravel applications supporting either Kafka or Redis as the message transport (configured via driver setting).
Features
- Configurable Transport: Choose between Kafka or Redis as your message bus driver
- Attribute-based Configuration: Simple declaration using PHP 8 attributes
- Auto-discovery: Automatic handler and publisher discovery
- Message Factory: Automatic serialization/deserialization with complex type support
- Priority Handlers: Control handler execution order with priorities
- Artisan Commands: Built-in CLI tools for message management
- PHPStan Max Level: Full static analysis support
- Well Tested: Comprehensive test coverage
Requirements
- PHP ^8.2
- Laravel ^10.0, ^11.0 or ^12.0
- ext-json
- ext-kafka (optional - only required when using Kafka driver)
- mateusjunges/laravel-kafka (optional - only required when using Kafka driver)
Installation
Install the package via Composer:
# For Redis-only usage
composer require elvenpath/laravel-pub-sub-adapter

# For Kafka support, also install:
composer require mateusjunges/laravel-kafka
# And ensure php-kafka extension is installed
The service provider will be automatically registered via Laravel's package discovery.
Note: If you plan to use Kafka, you'll need to install the mateusjunges/laravel-kafka package and ensure the php-kafka extension is available. For Redis-only usage, no Kafka dependencies are required; you only need a Redis client (see Redis Driver Requirements).
Choosing Your Message Transport
This package supports two message transport options:
- Redis: Simple pub/sub messaging, perfect for real-time notifications and lightweight messaging
- Kafka: Enterprise-grade streaming platform with durability, partitioning, and replay capabilities
Choose Redis for simpler use cases, or Kafka for high-throughput, mission-critical messaging systems.
Redis Driver Requirements
For Redis support, you need one of the following:
- PhpRedis Extension (Recommended for production):
  # Install via PECL
  pecl install redis
- Predis Package (Pure PHP, no extension required):
  composer require predis/predis
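Laravel selects the Redis client from the client option in config/database.php (the REDIS_CLIENT environment variable, phpredis by default), so set it to predis if you installed Predis instead of the extension. Configure your Redis connection in config/database.php as per the Laravel documentation.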
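As a rough illustration, a dedicated connection could be added to the redis section of config/database.php as sketched below. The connection name pubsub is an assumption chosen to match the MESSAGING_REDIS_CONNECTION default used in this README; the host, port and password keys follow Laravel's stock Redis configuration.

```php
<?php

// config/database.php (excerpt only, not a complete file).
// The 'pubsub' connection name is an assumption that matches the
// MESSAGING_REDIS_CONNECTION default shown in this README.
return [
    'redis' => [
        // 'phpredis' uses the PECL extension, 'predis' the pure PHP package
        'client' => env('REDIS_CLIENT', 'phpredis'),

        'pubsub' => [
            'host' => env('REDIS_HOST', '127.0.0.1'),
            'password' => env('REDIS_PASSWORD'),
            'port' => env('REDIS_PORT', '6379'),
            'database' => env('REDIS_DB', '0'),
        ],
    ],
];
```

Keeping pub/sub traffic on its own connection means a blocking subscriber does not tie up the connection used for cache or queues.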
Publishing Configuration
Publish the configuration file:
php artisan vendor:publish --provider="LaravelMessaging\MessagingServiceProvider"
This will create a config/messaging.php configuration file.
Configuration
Basic Configuration
Configure your message bus in config/messaging.php:

return [
    // Message bus driver: 'kafka' or 'redis'
    'driver' => env('MESSAGING_DRIVER', 'kafka'),

    'service' => [
        'name' => env('MESSAGING_SERVICE_NAME', env('APP_NAME', 'Laravel Messaging')),
        'instance' => env('MESSAGING_SERVICE_INSTANCE', 'default'),
    ],

    // Kafka Configuration
    'kafka' => [],

    // Redis Configuration
    'redis' => [
        'connection' => env('MESSAGING_REDIS_CONNECTION', 'pubsub'),
    ],

    // Handler discovery
    'handler_discovery' => [
        'enabled' => env('MESSAGING_HANDLER_DISCOVERY', true),
        'scan_paths' => [
            app_path('Handlers') => 'App\\Handlers',
            app_path('Messaging/Handlers') => 'App\\Messaging\\Handlers',
        ],
    ],

    // Publisher discovery
    'publisher_discovery' => [
        'enabled' => env('MESSAGING_PUBLISHER_DISCOVERY', true),
        'scan_paths' => [
            app_path('Services') => 'App\\Services',
            app_path('Http/Controllers') => 'App\\Http\\Controllers',
            app_path('Jobs') => 'App\\Jobs',
            app_path('Messaging/Publishers') => 'App\\Messaging\\Publishers',
        ],
    ],

    // Legacy handler subscriptions
    'subscriptions' => [
        // Example:
        // [
        //     'topic' => 'user.events',
        //     'handler' => App\Handlers\UserHandler::class,
        //     'priority' => 100,
        // ],
    ],
];
Environment Variables
Add these to your .env file:

# Message Bus Configuration
MESSAGING_DRIVER=redis

# Service Configuration
MESSAGING_SERVICE_NAME="My App"
MESSAGING_SERVICE_INSTANCE=default

# Redis Configuration (if using Redis)
MESSAGING_REDIS_CONNECTION=pubsub

# Discovery Configuration
MESSAGING_HANDLER_DISCOVERY=true
MESSAGING_PUBLISHER_DISCOVERY=true
Usage
Creating Messages
Messages are simple PHP classes that extend BaseMessage:

<?php

declare(strict_types=1);

namespace App\Messages;

use LaravelMessaging\Messaging\BaseMessage;

class UserRegisteredMessage extends BaseMessage
{
    public function __construct(
        public readonly int $userId,
        public readonly string $email,
        public readonly string $name,
        public readonly \DateTimeImmutable $registeredAt
    ) {
        parent::__construct();
    }

    public static function getMessageType(): string
    {
        return 'user.registered';
    }
}
Creating Message Handlers
Handlers process incoming messages. Use the #[MessageHandler] attribute to configure them:

<?php

declare(strict_types=1);

namespace App\Handlers;

use App\Mail\WelcomeEmail; // assumed location of the application's mailable
use App\Messages\UserRegisteredMessage;
use Illuminate\Support\Facades\Mail;
use LaravelMessaging\Messaging\Handler\MessageHandler;
use LaravelMessaging\Messaging\Handler\MessageHandlerInterface;
use LaravelMessaging\Messaging\MessageInterface;

#[MessageHandler(
    messageTypes: ['user.registered'],
    priority: 100,
    enabled: true
)]
class SendWelcomeEmailHandler implements MessageHandlerInterface
{
    public function handle(MessageInterface $message): bool
    {
        // Ignore message types this handler does not understand
        if (!$message instanceof UserRegisteredMessage) {
            return false;
        }

        // Send welcome email logic here
        Mail::to($message->email)->send(new WelcomeEmail($message->name));

        return true;
    }

    public static function getHandledMessageTypes(): array
    {
        return ['user.registered'];
    }
}
Handler Attributes
- messageTypes: Array of message types this handler processes
- priority: Higher priority handlers execute first (default: 100)
- enabled: Toggle handler on/off (default: true)
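To make the attribute options above more concrete, here is a minimal sketch of how attribute values of this kind can be read with PHP's reflection API. DemoMessageHandler, DemoWelcomeHandler and readHandlerConfig are hypothetical stand-ins written for this example; the package's own discovery code may work differently.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for the package's #[MessageHandler] attribute,
// using the defaults documented above (priority 100, enabled true).
#[Attribute(Attribute::TARGET_CLASS)]
final class DemoMessageHandler
{
    /** @param list<string> $messageTypes */
    public function __construct(
        public readonly array $messageTypes,
        public readonly int $priority = 100,
        public readonly bool $enabled = true,
    ) {}
}

#[DemoMessageHandler(messageTypes: ['user.registered'])]
final class DemoWelcomeHandler {}

/** Returns the attribute instance for a class, or null if it has none. */
function readHandlerConfig(string $class): ?DemoMessageHandler
{
    $attributes = (new ReflectionClass($class))
        ->getAttributes(DemoMessageHandler::class);

    return $attributes === [] ? null : $attributes[0]->newInstance();
}

$config = readHandlerConfig(DemoWelcomeHandler::class);
```

Here $config carries the declared message types plus the default priority and enabled values, which is the information a discovery step needs to register the handler.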
Creating Publishers
Publishers send messages to the message bus:
<?php

declare(strict_types=1);

namespace App\Publishers;

use App\Messages\UserRegisteredMessage;
use App\Models\User; // assumed location of the application's User model
use LaravelMessaging\Messaging\Bus\MessageBusInterface;
use LaravelMessaging\Messaging\Publisher\MessagePublisher;

#[MessagePublisher(
    messageTypes: ['user.registered'],
    description: 'Publishes user registration events',
    topics: ['user-events'],
    enabled: true
)]
class UserEventPublisher
{
    public function __construct(
        private readonly MessageBusInterface $messageBus
    ) {}

    public function publishUserRegistered(User $user): void
    {
        // Build the message from the freshly registered user
        $message = new UserRegisteredMessage(
            userId: $user->id,
            email: $user->email,
            name: $user->name,
            registeredAt: new \DateTimeImmutable()
        );

        $this->messageBus->publish($message);
    }
}
Multiple Handlers per Message
You can have multiple handlers for the same message type with different priorities:
#[MessageHandler(messageTypes: ['order.created'], priority: 200)]
class UpdateInventoryHandler implements MessageHandlerInterface
{
    // Runs first (priority 200)
}

#[MessageHandler(messageTypes: ['order.created'], priority: 100)]
class SendOrderConfirmationHandler implements MessageHandlerInterface
{
    // Runs second (priority 100)
}

#[MessageHandler(messageTypes: ['order.created'], priority: 50)]
class LogOrderHandler implements MessageHandlerInterface
{
    // Runs last (priority 50)
}
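The ordering rule (higher priority runs first) can be illustrated with a plain descending sort. This is only a sketch of the rule, not the package's dispatcher; orderByPriority is a hypothetical helper and the handlers are represented by their class names.

```php
<?php

declare(strict_types=1);

/**
 * Orders handler entries so the highest priority comes first.
 *
 * @param list<array{handler: string, priority: int}> $entries
 * @return list<string> handler names in execution order
 */
function orderByPriority(array $entries): array
{
    // Descending sort: compare $b to $a so larger priorities move to the front
    usort(
        $entries,
        static fn (array $a, array $b): int => $b['priority'] <=> $a['priority']
    );

    return array_column($entries, 'handler');
}

$order = orderByPriority([
    ['handler' => 'LogOrderHandler', 'priority' => 50],
    ['handler' => 'UpdateInventoryHandler', 'priority' => 200],
    ['handler' => 'SendOrderConfirmationHandler', 'priority' => 100],
]);
```

With these inputs $order lists UpdateInventoryHandler, then SendOrderConfirmationHandler, then LogOrderHandler. How the package orders handlers that share the same priority is not stated in this README, so avoid relying on it.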
Console Commands
Discover Messages
List all registered handlers and publishers:
# Discover all publishers and handlers
php artisan messages:discover

# Show only publishers
php artisan messages:discover --publishers

# Show only handlers
php artisan messages:discover --handlers

# Filter by message type
php artisan messages:discover --type=user.registered

# Show message flow mapping
php artisan messages:discover --flow
Consume Messages
Start consuming messages from the configured bus:
# Consume from all configured topics
php artisan messages:consume

# Consume from specific topic
php artisan messages:consume --topic=user.events
Publish Message
Publish a message via CLI:
# Publish with custom data
php artisan messages:publish user.registered \
    --data='{"userId":123,"email":"user@example.com"}' \
    --type="user.registered"

# Publish multiple messages
php artisan messages:publish test.topic --count=5

# Publish simple test message
php artisan messages:publish test.topic
Subscribe to Redis
For Redis pub/sub, start the subscriber:
# Subscribe to all configured Redis topics
php artisan messages:redis-subscribe

# Subscribe to specific topic
php artisan messages:redis-subscribe --topic=user.events

# Subscribe with timeout (in seconds)
php artisan messages:redis-subscribe --timeout=3600
Advanced Usage
Custom Message Bus
Implement your own message bus by extending AbstractMessageBus:

<?php

declare(strict_types=1);

namespace App\Messaging;

use LaravelMessaging\Messaging\Bus\AbstractMessageBus;
use LaravelMessaging\Messaging\MessageInterface;

class CustomMessageBus extends AbstractMessageBus
{
    public function publish(MessageInterface $message): void
    {
        // Your custom publishing logic
    }

    public function consume(array $options = []): void
    {
        // Your custom consuming logic
    }
}
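One possible use for a custom bus is an in-memory transport for local experiments or tests. The sketch below shows the publish/consume idea in plain PHP. DemoMessage, DemoPing and InMemoryBus are hypothetical stand-ins that do not extend the package's classes, whose real method signatures may differ.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-ins; the package's MessageInterface and
// AbstractMessageBus may expose different methods.
interface DemoMessage
{
    public static function getMessageType(): string;
}

final class DemoPing implements DemoMessage
{
    public static function getMessageType(): string
    {
        return 'demo.ping';
    }
}

final class InMemoryBus
{
    /** @var list<DemoMessage> */
    private array $queue = [];

    /** @var array<string, list<callable>> */
    private array $handlers = [];

    public function subscribe(string $messageType, callable $handler): void
    {
        $this->handlers[$messageType][] = $handler;
    }

    public function publish(DemoMessage $message): void
    {
        $this->queue[] = $message;
    }

    /** Drains the queue and returns how many handler calls succeeded. */
    public function consume(): int
    {
        $handled = 0;
        while (($message = array_shift($this->queue)) !== null) {
            foreach ($this->handlers[$message::getMessageType()] ?? [] as $handler) {
                $handled += $handler($message) ? 1 : 0;
            }
        }

        return $handled;
    }
}

$bus = new InMemoryBus();
$bus->subscribe('demo.ping', static fn (DemoMessage $m): bool => true);
$bus->publish(new DemoPing());
$bus->publish(new DemoPing());
$handled = $bus->consume();
```

A real implementation would put the same logic into the publish() and consume() methods of a class that extends AbstractMessageBus.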
Message with Complex Types
The message factory handles complex type conversions automatically:
class ComplexMessage extends BaseMessage
{
    public function __construct(
        public readonly array $items,
        public readonly ?string $optional,
        public readonly CustomObject $customObject,
        public readonly \DateTime $createdAt
    ) {
        parent::__construct();
    }
}
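To give a feel for what such conversions involve, the sketch below round-trips a message with an array, a nullable string and a date through JSON. It is not the package's message factory: DemoOrderPlaced, encodeMessage and decodeMessage are hypothetical, and the choice of ISO 8601 for dates is an assumption.

```php
<?php

declare(strict_types=1);

// Hypothetical message used only for this sketch.
final class DemoOrderPlaced
{
    /** @param list<string> $items */
    public function __construct(
        public readonly array $items,
        public readonly ?string $note,
        public readonly DateTimeImmutable $createdAt,
    ) {}
}

/** Encodes public properties as JSON, writing dates as ISO 8601 strings. */
function encodeMessage(object $message): string
{
    $data = [];
    foreach (get_object_vars($message) as $name => $value) {
        $data[$name] = $value instanceof DateTimeInterface
            ? $value->format(DateTimeInterface::ATOM)
            : $value;
    }

    return json_encode($data, JSON_THROW_ON_ERROR);
}

/** Rebuilds a message by matching JSON keys to constructor parameter names. */
function decodeMessage(string $class, string $json): object
{
    $data = json_decode($json, true, 512, JSON_THROW_ON_ERROR);
    $args = [];
    foreach ((new ReflectionMethod($class, '__construct'))->getParameters() as $param) {
        $value = $data[$param->getName()] ?? null;
        $type = $param->getType();
        // Turn date strings back into objects when the parameter expects one
        if ($value !== null
            && $type instanceof ReflectionNamedType
            && !$type->isBuiltin()
            && is_a($type->getName(), DateTimeInterface::class, true)
        ) {
            $value = new DateTimeImmutable($value);
        }
        $args[$param->getName()] = $value;
    }

    // String keys are unpacked as named arguments
    return new $class(...$args);
}

$original = new DemoOrderPlaced(
    ['sku-1', 'sku-2'],
    null,
    new DateTimeImmutable('2025-01-02T03:04:05+00:00')
);
$copy = decodeMessage(DemoOrderPlaced::class, encodeMessage($original));
```

Nested custom objects, such as the CustomObject property above, would need an extra recursive step that this sketch leaves out.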
Conditional Handler Registration
Disable handlers based on environment or configuration:
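// Attribute arguments must be constant expressions, so helpers such as env()
// or config() cannot be called here. This example assumes your application
// defines APP_ENV as a PHP constant.
#[MessageHandler(
    messageTypes: ['payment.processed'],
    enabled: APP_ENV !== 'testing' // Disable in tests
)]
class ChargePaymentHandler implements MessageHandlerInterface
{
    // Handler implementation
}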
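Because attribute arguments are limited to constant expressions, a condition like the one above only works when APP_ENV exists as a real PHP constant, which Laravel does not define by itself. The sketch below shows the mechanism in isolation; DemoHandlerFlag and DemoChargeHandler are hypothetical stand-ins, and defining APP_ENV with define() is an assumption about how an application might provide it.

```php
<?php

declare(strict_types=1);

// Assumption: the application defines APP_ENV as a real PHP constant,
// for example early in its bootstrap.
define('APP_ENV', 'testing');

// Hypothetical stand-in for the package's attribute.
#[Attribute(Attribute::TARGET_CLASS)]
final class DemoHandlerFlag
{
    public function __construct(public readonly bool $enabled = true) {}
}

// The expression is stored with the class and evaluated only when the
// attribute is instantiated through reflection.
#[DemoHandlerFlag(enabled: APP_ENV !== 'testing')]
final class DemoChargeHandler {}

$flag = (new ReflectionClass(DemoChargeHandler::class))
    ->getAttributes(DemoHandlerFlag::class)[0]
    ->newInstance();
```

With APP_ENV set to testing, $flag->enabled is false. If the constant is missing when the attribute is instantiated, PHP throws an Error for the undefined constant.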
Handler Registry Access
Access the handler registry directly:
use LaravelMessaging\Messaging\Handler\MessageHandlerRegistry;

$registry = app(MessageHandlerRegistry::class);

// Get all handlers for a message type
$handlers = $registry->getHandlersForMessageType('user.registered');

// Register a handler manually
$registry->registerHandler('user.registered', $handlerInstance, 100);

// Get all registered message types
$types = $registry->getRegisteredMessageTypes();
Testing
Unit Testing Handlers
<?php

declare(strict_types=1);

namespace Tests\Unit\Handlers;

use App\Handlers\SendWelcomeEmailHandler;
use App\Mail\WelcomeEmail; // assumed location of the application's mailable
use App\Messages\UserRegisteredMessage;
use Illuminate\Support\Facades\Mail;
use Tests\TestCase;

class SendWelcomeEmailHandlerTest extends TestCase
{
    public function test_handler_processes_user_registered_message(): void
    {
        // Fake the mailer so no real email is sent
        Mail::fake();

        $handler = new SendWelcomeEmailHandler();
        $message = new UserRegisteredMessage(
            userId: 1,
            email: 'test@example.com',
            name: 'Test User',
            registeredAt: new \DateTimeImmutable()
        );

        $result = $handler->handle($message);

        $this->assertTrue($result);

        // Assert email was sent (use Mail::assertQueued() if the mailable is queued)
        Mail::assertSent(WelcomeEmail::class);
    }
}
Testing Publishers
public function test_publisher_sends_message_to_bus(): void
{
    $messageBus = $this->createMock(MessageBusInterface::class);
    $messageBus->expects($this->once())
        ->method('publish')
        ->with($this->isInstanceOf(UserRegisteredMessage::class));

    $publisher = new UserEventPublisher($messageBus);
    $user = User::factory()->create();

    $publisher->publishUserRegistered($user);
}
Development
Running Tests
# Run all tests
make test

# With coverage
make coverage
Code Style
# Fix code style
make style-check
Static Analysis
# Run PHPStan
make static-analysis
Examples
Check the /examples directory for complete working examples:
- ActivityLogMessage.php - Example message class
- ActivityLogMessageHandler.php - Example handler
- ActivityLogPublisher.php - Example publisher
- UserRegisteredMessage.php - User registration message
- UserRegisteredHandler.php - User registration handler
Contributing
Contributions are welcome! Please ensure:
- All tests pass (make test)
- Code style is fixed (make style-check)
- Static analysis passes (make static-analysis)
- New features include tests
- Documentation is updated
License
This package is open-sourced software licensed under the MIT license.
Support
For issues and questions, please use the GitHub issue tracker.
Credits
Changelog
Please see CHANGELOG for recent changes.