byjg / message-queue-client
Minimal PHP foundation for building any message-queue driver.
Fund package maintenance!
byjg
Installs: 8 443
Dependents: 3
Suggesters: 0
Security: 0
Stars: 2
Watchers: 1
Forks: 1
Open Issues: 1
pkg:composer/byjg/message-queue-client
Requires (Dev)
- phpunit/phpunit: ^9.6
- vimeo/psalm: ^5.9
README
A minimal PHP foundation for building message queue drivers. Features low-code publishing and consumption, decoupled components (Messages, Queues, and Connectors), and easy connector implementation.
Features
- Low code to publish and consume messages
- Messages, Queues and Connector objects are decoupled
- Easy to implement new connectors
┌─────────────────┐ ┌────────────────────────┐
│ │ │ Envelope │
│ │ │ │
│ │ │ │
│ │ │ ┌─────────────────┐ │
│ │ publish() │ │ Pipe │ │
│ ├─────────────────▶│ └─────────────────┘ │
│ │ │ ┌─────────────────┐ │
│ │ │ │ Message │ │
│ │ │ └─────────────────┘ │
│ │ │ │
│ │ └────────────────────────┘
│ Connector │
│ │
│ │
│ │ consume() ┌─────────────────┐
│ │◀────────────────────│ Pipe │
│ │ └─────────────────┘
│ │
│ │
│ │
└─────────────────┘
Code Structure
| Component | Description | Location |
|---|---|---|
| Message | Represents a message payload with properties | \ByJG\MessageQueueClient\Message |
| Pipe | Abstraction representing a queue or topic destination with optional properties | \ByJG\MessageQueueClient\Connector\Pipe |
| Envelope | Combines a Message with its destination Pipe | \ByJG\MessageQueueClient\Envelope |
| ConnectorInterface | Interface for message queue implementations | \ByJG\MessageQueueClient\Connector\ConnectorInterface |
| ConnectorFactory | Factory for creating connector instances | \ByJG\MessageQueueClient\Connector\ConnectorFactory |
| ConsumerClientTrait | Helper for implementing consumer clients | \ByJG\MessageQueueClient\ConsumerClientTrait |
| ConsumerClientInterface | Interface for consumer client implementations | \ByJG\MessageQueueClient\ConsumerClientInterface |
Implemented Connectors
| Connector | URL / Documentation | Composer Package |
|---|---|---|
| Mock | docs/mock-connector.md | - |
| RabbitMQ | https://github.com/byjg/rabbitmq-client | byjg/rabbitmq-client |
| Redis | https://github.com/byjg/redis-queue-client | byjg/redis-queue-client |
Usage
Publish
<?php // Register the connector and associate with a scheme use ByJG\MessageQueueClient\Connector\ConnectorFactory; use ByJG\MessageQueueClient\Connector\Pipe; use ByJG\MessageQueueClient\Envelope; use ByJG\MessageQueueClient\Message; use ByJG\MessageQueueClient\MockConnector; use ByJG\Util\Uri; ConnectorFactory::registerConnector(MockConnector::class); // Create a connector $connector = ConnectorFactory::create(new Uri("mock://local")); // Create a queue $pipe = new Pipe("test"); $pipe->withDeadLetter(new Pipe("dlq_test")); // Create a message $message = new Message("Hello World"); // Publish the message into the queue $connector->publish(new Envelope($pipe, $message));
Consume
<?php // Register the connector and associate with a scheme use ByJG\MessageQueueClient\Connector\ConnectorFactory; use ByJG\MessageQueueClient\Connector\Pipe; use ByJG\MessageQueueClient\Envelope; use ByJG\MessageQueueClient\Message; use ByJG\MessageQueueClient\MockConnector; use ByJG\Util\Uri; ConnectorFactory::registerConnector(MockConnector::class); // Create a connector $connector = ConnectorFactory::create(new Uri("mock://local")); // Create a queue $pipe = new Pipe("test"); $pipe->withDeadLetter(new Pipe("dlq_test")); // Connect to the queue and wait to consume the message $connector->consume( $pipe, // Queue name function (Envelope $envelope) { // Callback function to process the message echo "Process the message"; echo $envelope->getMessage()->getBody(); return Message::ACK; }, function (Envelope $envelope, $ex) { // Callback function to process the failed message echo "Process the failed message"; echo $ex->getMessage(); return Message::REQUEUE; } );
The consume method will wait for a message and call the callback function to process the message. If there is no message in the queue, the method will wait until a message arrives.
If you want to exit the consume method, just return Message::ACK | Message::EXIT from the callback function.
Possible return values from the callback function:
Message::ACK- Acknowledge the message and remove from the queueMessage::NACK- Not acknowledge the message and remove from the queue. If the queue has a dead letter queue, the message will be sent to the dead letter queue.Message::REQUEUE- Requeue the messageMessage::EXIT- Exit the consume method
Consumer Client
You can simplify the consume method by using the ConsumerClientTrait. See more details in the docs/consumer-client-trait.md.
Connectors
The connectors are the classes responsible to connect to the message queue server and send/receive messages.
All connector have the following interface:
<?php interface ConnectorInterface { public static function schema(): array; public function setUp(Uri $uri): void; public function getDriver(): mixed; public function publish(Envelope $envelope): void; public function consume(Pipe $pipe, \Closure $onReceive, \Closure $onError, ?string $identification = null): void; }
There is no necessary call the method getDriver() because the method publish() and consume() will call it automatically.
Use the method getDriver() only if you need to access the connection directly.
Documentation
Core Components
- Pipe Class - Represents a message queue or topic
- Message Class - Represents a message that can be published or consumed
- Envelope Class - Encapsulates a message with its destination pipe
Connectors
- Connector Interface - Interface for message queue connectors
- Connector Factory - Factory for creating connector instances
- Mock Connector - Simple connector for testing
Helpers
- Consumer Client Trait - Helper for implementing consumer clients
Dependencies
flowchart TD
byjg/message-queue-client --> byjg/uri
Loading