nimbly / syndicate
Simple queue consumer framework that supports message dispatching.
Requires
- php: >=7.2
- ext-json: *
- psr/container: ^1.0
Requires (Dev)
- aws/aws-sdk-php: ~3.0
- google/cloud-pubsub: ^1.7
- iron-io/iron_mq: ^4.0
- microsoft/azure-storage-queue: ^1.3
- nimbly/carton: ^1.0
- pda/pheanstalk: ~3.0
- php-coveralls/php-coveralls: ^2.1
- phploc/phploc: ^5.0
- phpunit/phpunit: ^8.0
- predis/predis: ~1.0
- squizlabs/php_codesniffer: ^3.4
- symfony/var-dumper: ^4.2
- vimeo/psalm: ^3.1
Suggests
- aws/aws-sdk-php: To add support for AWS Simple Queue Service (SQS).
- google/cloud-pubsub: To add support for Google Cloud Pubsub.
- microsoft/azure-storage-queue: To add support for Microsoft Azure Storage Queue.
- pda/pheanstalk: To add support for Beanstalkd queue.
- predis/predis: To add support for Redis based queue and pub/sub functionality.
README
Install
composer require nimbly/syndicate
Basic usage
Create Queue instance
    use Aws\Sqs\SqsClient;

    $queue = new Syndicate\Queue\Sqs(
        "https://queue.url",
        new SqsClient([
            'version' => 'latest',
            'region' => 'us-west-2'
        ])
    );
Listen on queue
Listening is a blocking call and runs in an infinite loop. Your callback will be triggered when a new Message has arrived.
    $queue->listen(function(Message $message): void {

        /**
         *
         *  Process the message...
         *
         */

        // Delete the message from Queue.
        $message->delete();

    });
Setting a custom serializer and deserializer
By default, Syndicate assumes your messages are JSON and will attempt to auto serialize/deserialize accordingly.
However, if your messages are in some other format, you may supply your own serializer and deserializer callbacks.
The serializer is applied to all outgoing message payloads.
    $queue->setSerializer(function($message): string {

        return \json_encode($message);

    });
The deserializer callback is applied to all incoming message payloads.
For example, to handle deserializing a message payload from SQS that was forwarded by SNS, you could pass in the following deserializer callback.
    $queue->setDeserializer(function($payload) {

        $payload = \json_decode($payload);

        if( \property_exists($payload, "Type") &&
            $payload->Type === "Notification" ){
            return \json_decode($payload->Message);
        }

        return $payload;

    });
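The unwrapping logic can be tried without a live queue. The following is a minimal sketch that runs the same kind of callback as a standalone closure against two sample payloads. The sample payloads and the extra `is_object` guard are assumptions added for illustration and are not part of Syndicate.

```php
<?php
declare(strict_types=1);

// Same unwrapping idea as the deserializer above, as a standalone closure.
$deserializer = function (string $payload) {
    $decoded = \json_decode($payload);

    // A message forwarded by SNS arrives in an envelope whose "Type" is
    // "Notification" and whose "Message" holds the original body as a string.
    if (\is_object($decoded)
        && \property_exists($decoded, 'Type')
        && $decoded->Type === 'Notification') {
        return \json_decode($decoded->Message);
    }

    return $decoded;
};

// Hypothetical sample payloads, for illustration only.
$direct = \json_encode(['eventName' => 'UserRegistered', 'userId' => 42]);
$viaSns = \json_encode(['Type' => 'Notification', 'Message' => $direct]);

$a = $deserializer($direct);
$b = $deserializer($viaSns);

// Both paths yield the same decoded event object.
echo $a->eventName, ' ', $b->eventName, PHP_EOL;
```

Either way the handler downstream sees the same object shape, so routing code does not need to know whether a message passed through SNS.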
Shutting down the Queue
You may shut down the queue by calling the shutdown() method.
The Queue instance responds to PCNTL signals safely: it will not interrupt a Message that is being processed. You can install signal handlers in your code to shut down the service cleanly.
    \pcntl_signal(
        SIGINT,
        function() use ($queue): void {
            Log::info("[SIGNAL] Shutting down queue.");
            $queue->shutdown();
        }
    );
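The idea of finishing the current Message before stopping can be sketched with a flag that is only checked between messages. This is an illustration of the general pattern, not Syndicate's implementation; `SketchWorker` and its methods are hypothetical. In plain PHP, handlers registered with `pcntl_signal` only run once signals are dispatched, for example after `pcntl_async_signals(true)` or a call to `pcntl_signal_dispatch()`; whether Syndicate does this for you is not stated here, so the sketch enables it explicitly.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for a queue listener; not part of Syndicate.
final class SketchWorker
{
    /** @var bool */
    private $running = true;

    public function shutdown(): void
    {
        // Only flip a flag; the loop stops once the current message is done.
        $this->running = false;
    }

    public function run(iterable $messages, callable $callback): int
    {
        $processed = 0;
        foreach ($messages as $message) {
            if (!$this->running) {
                break;
            }
            $callback($message);
            $processed++;
        }
        return $processed;
    }
}

$worker = new SketchWorker();

// Signal handling needs ext-pcntl, so it is guarded here.
if (\function_exists('pcntl_async_signals')) {
    \pcntl_async_signals(true);
    \pcntl_signal(SIGINT, function () use ($worker): void {
        $worker->shutdown();
    });
}

// Simulate a shutdown request arriving while message "b" is being processed.
$seen = [];
$count = $worker->run(['a', 'b', 'c'], function (string $m) use ($worker, &$seen): void {
    $seen[] = $m;
    if ($m === 'b') {
        $worker->shutdown();
    }
});
```

Message "b" is processed to completion and "c" is never started, which is the behaviour the section describes.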
Routing and Dispatching
Using the Dispatcher and Router, you can have your messages passed off to specific Handlers. How you route is up to you and the message format.
Commonly, a message will contain a message type or event name - these are prime candidates for keys to routing.
Router
Create a new Router instance by passing in a callable route resolver and an array of key and value pairs as route definitions.
Route resolver
The route resolver is responsible for taking the incoming Message instance and finding a matching route to dispatch the Message to.
The dispatcher will loop through all configured routes and call the resolver with the Message and a route.
The resolver must simply return a bool value indicating whether the message matches the given route.
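The matching loop described above can be sketched as follows. This is an assumption about the general shape, not the library's code: `sketchMatchRoute` is a hypothetical helper, and a plain object stands in for the Message so the example runs on its own.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: return the handler of the first route whose key
 * the resolver accepts, or null when nothing matches.
 */
function sketchMatchRoute(callable $resolver, array $routes, $message)
{
    foreach ($routes as $routeKey => $handler) {
        if ($resolver($message, (string) $routeKey) === true) {
            return $handler;
        }
    }

    return null;
}

// The resolver only answers "does this message belong to this route key?".
$resolver = function (object $message, string $routeKey): bool {
    return $message->eventName === $routeKey;
};

// Placeholder handler values; real definitions would be callables or strings.
$routes = ['UserLoggedOff' => 'handlerA', 'UserRegistered' => 'handlerB'];

$message = (object) ['eventName' => 'UserRegistered'];
$matched = sketchMatchRoute($resolver, $routes, $message);
$unmatched = sketchMatchRoute($resolver, $routes, (object) ['eventName' => 'Unknown']);
```

Keeping the resolver a pure yes/no function means the routing key can come from anywhere in the message, such as a type field, an attribute, or a topic name.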
Route definitions
The route definitions are an array of key/value pairs mapping any key you want to either a callable, a string in the format of Full\Namespace\ClassName@methodName, or an array of the above.
    $router = new Router(function(Message $message, string $routeKey): bool {

        return $message->getPayload()->eventName == $routeKey;

    }, [

        'UserLoggedOff' => function(Message $message): void {
            // Do some session cleanup stuff...
        },

        'UserRegistered' => '\App\Handlers\UserHandler@userRegistered',

        'UserClosedAccount' => [
            '\App\Handlers\UserHandler@userAccountClosed',
            '\App\Handlers\NotificationHandler@userAccountClosed'
        ]

    ]);
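To make the three definition forms concrete, here is a minimal sketch of how a definition could be flattened into a list of callables. It is not taken from Syndicate's source: `sketchNormalizeHandlers` and `SketchUserHandler` are hypothetical, and the class is instantiated directly, where a real dispatcher might use a container instead.

```php
<?php
declare(strict_types=1);

// Hypothetical handler class used only for this illustration.
final class SketchUserHandler
{
    public function userRegistered(string $event): string
    {
        return "registered:{$event}";
    }
}

/**
 * Hypothetical helper: turn one route definition (callable, "Class@method"
 * string, or an array of those) into a flat list of callables.
 */
function sketchNormalizeHandlers($definition): array
{
    // An array that is not itself callable is treated as a list of definitions.
    if (\is_array($definition) && !\is_callable($definition)) {
        $handlers = [];
        foreach ($definition as $item) {
            $handlers = \array_merge($handlers, sketchNormalizeHandlers($item));
        }
        return $handlers;
    }

    if (\is_string($definition) && \strpos($definition, '@') !== false) {
        [$class, $method] = \explode('@', $definition, 2);
        // Instantiated directly here; a container could supply the instance.
        return [[new $class(), $method]];
    }

    if (\is_callable($definition)) {
        return [$definition];
    }

    throw new \InvalidArgumentException('Unsupported handler definition.');
}

$handlers = sketchNormalizeHandlers([
    'SketchUserHandler@userRegistered',
    function (string $event): string {
        return "closure:{$event}";
    },
]);

// Every handler attached to the route is invoked in order.
$results = [];
foreach ($handlers as $handler) {
    $results[] = $handler('UserRegistered');
}
```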
Dispatcher
Create a new Dispatcher instance by passing the Router instance.
$dispatcher = new Dispatcher($router);
PSR-11 Container support
The Dispatcher can accept a PSR-11 compliant ContainerInterface instance to be used during dependency resolution when dispatching a matched message to a handler.
$dispatcher = new Dispatcher( $router, $container );
Or you can call the setContainer method directly.
$dispatcher->setContainer($container);
The Dispatcher will attempt to resolve any dependencies your handler requires, including the Syndicate\Message instance.
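One common way to implement this kind of resolution is to reflect on the handler's parameters and look each type up in the container. The sketch below illustrates that technique under stated assumptions; it is not Syndicate's implementation. `SketchContainer` only mimics the `has()` and `get()` methods of PSR-11 so the example needs no packages, and `SketchMessage`, `SketchLogger`, and `sketchResolveArguments` are hypothetical names.

```php
<?php
declare(strict_types=1);

// Hypothetical minimal container mimicking PSR-11's has() and get().
final class SketchContainer
{
    /** @var array<string,object> */
    private $items = [];

    public function set(string $id, object $item): void
    {
        $this->items[$id] = $item;
    }

    public function has(string $id): bool
    {
        return isset($this->items[$id]);
    }

    public function get(string $id)
    {
        return $this->items[$id];
    }
}

// Hypothetical stand-ins for a message and a service.
final class SketchMessage
{
    public $payload = 'hello';
}

final class SketchLogger
{
    public $lines = [];

    public function info(string $line): void
    {
        $this->lines[] = $line;
    }
}

/**
 * Hypothetical helper: build the argument list for a handler by matching
 * each parameter's declared type against the message or the container.
 */
function sketchResolveArguments(\Closure $handler, object $message, SketchContainer $container): array
{
    $arguments = [];
    foreach ((new \ReflectionFunction($handler))->getParameters() as $parameter) {
        $type = $parameter->getType();
        $name = $type instanceof \ReflectionNamedType ? $type->getName() : null;

        if ($name !== null && $message instanceof $name) {
            $arguments[] = $message;
        } elseif ($name !== null && $container->has($name)) {
            $arguments[] = $container->get($name);
        } else {
            throw new \RuntimeException("Cannot resolve parameter \${$parameter->getName()}.");
        }
    }

    return $arguments;
}

$container = new SketchContainer();
$logger = new SketchLogger();
$container->set(SketchLogger::class, $logger);

// The handler declares what it needs; the resolver supplies it.
$handler = function (SketchMessage $message, SketchLogger $logger): void {
    $logger->info("Handled {$message->payload}");
};

$handler(...sketchResolveArguments($handler, new SketchMessage(), $container));
```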
Add a default handler
If the Router cannot resolve a route for the Message, the Dispatcher will attempt to pass the message off to its default handler.
The default handler can be set as a callable and accepts the Message instance.
    $dispatcher->setDefaultHandler(function(Message $message): void {

        Log::critical("No route defined for {$message->getPayload()->eventName}!");
        $message->release();

    });
If the Message cannot be dispatched and no default handler was given, a DispatchException will be thrown.
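The fallback order described above (matched handlers first, then the default handler, otherwise an exception) can be sketched like this. The names `sketchDispatch` and `SketchDispatchException` are hypothetical and stand in for the library's own classes, whose internals are not shown here.

```php
<?php
declare(strict_types=1);

// Hypothetical exception standing in for the library's DispatchException.
final class SketchDispatchException extends \RuntimeException
{
}

/**
 * Hypothetical dispatch step: run the matched handlers, fall back to the
 * default handler, and throw when neither is available.
 */
function sketchDispatch(array $handlers, ?callable $defaultHandler, $message): void
{
    if ($handlers === []) {
        if ($defaultHandler === null) {
            throw new SketchDispatchException('No handler for message.');
        }

        $defaultHandler($message);
        return;
    }

    foreach ($handlers as $handler) {
        $handler($message);
    }
}

// With a default handler, an unroutable message is still handled.
$log = [];
sketchDispatch([], function ($m) use (&$log): void {
    $log[] = "default:{$m}";
}, 'Unknown');

// Without one, the caller has to deal with the exception.
$thrown = false;
try {
    sketchDispatch([], null, 'Unknown');
} catch (SketchDispatchException $e) {
    $thrown = true;
}
```

Because listening runs in a long-lived loop, an uncaught exception would end the worker, so setting a default handler or catching the exception in the listen callback is worth considering.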
Using the Dispatcher with the Queue
    $queue->listen(function(Message $message) use ($dispatcher): void {

        $dispatcher->dispatch($message);

    });