consilience / laravel-message-flow
Use Laravel queues to push messages between aplications.
Installs: 857
Dependents: 0
Suggesters: 0
Security: 0
Stars: 5
Watchers: 2
Forks: 0
Open Issues: 1
pkg:composer/consilience/laravel-message-flow
Requires
- php: ^8.4
- illuminate/support: ^10.0|^11.0|^12.0
Requires (Dev)
- orchestra/testbench: ^9.0|^10.0
- phpunit/phpunit: ^10.0|^11.0
This package is auto-updated.
Last update: 2026-02-15 23:02:39 UTC
README
Laravel Message Flow
- Laravel Message Flow
- Installation
- Setting Up a Shared Queue (Redis Example)
- Artisan Commands
- Testing
Overview
Laravel Message Flow is a lightweight messaging system for passing structured data between Laravel applications. It is built entirely on Laravel's queue system — if two applications can connect to the same queue (Redis, database, SQS, or any other driver), they can exchange messages.
There are no external dependencies beyond Laravel itself. No message broker to install, no new protocols to learn — just queue connections you already know how to configure.
The original use-case was to replace fragile webhooks between a suite of applications with something easier to set up, monitor and maintain.
Concepts
Messages
A message is any JSON-serialisable data. Each message carries three things:
- A UUID that uniquely identifies the message across applications.
- A name used for routing (which queue to send to) and handling (which observer processes it on the receiving side).
- A payload — the JSON data itself, portable and self-contained with no dependency on models or classes in the source application.
Outbound Flow
Sending a message is as simple as creating a MessageFlowOut model instance.
An Eloquent observer detects the new record and dispatches it through a
configurable routing pipeline that determines which queue connection and
queue name to use, then pushes the message onto that queue.
The outbound message is stored in the message_flow_out table throughout
this process, giving you a local audit trail of what was sent.
Inbound Flow
On the receiving application, a standard Laravel queue worker picks up the
job and writes it to the local message_flow_in table as a MessageFlowIn
model. Your application then handles the message via an Eloquent observer —
the same pattern you already use for model events.
Once processed, the inbound message can be marked as complete, failed,
or deleted, depending on your application's needs.
Routing Pipeline
The outbound routing pipeline is a sequence of configurable pipe stages
(using Laravel's Pipeline) that each message passes through before being
dispatched. The default pipeline routes the message to a queue based on its
name (via config mappings), dispatches it, and optionally cleans up the
local record.
You have full control over this pipeline. You can add custom routing logic,
logging, duplicate detection, or anything else — each stage is a simple
class implementing a handle method.
Message Statuses
Both inbound and outbound messages track their lifecycle through a status
column:
Outbound (MessageFlowOut):
| Status | Meaning |
|---|---|
new |
Created, waiting to be routed and dispatched |
queued |
Successfully dispatched to the queue |
complete |
Fully processed (dispatched, optionally acknowledged) |
failed |
Could not be dispatched — can be retried |
Inbound (MessageFlowIn):
| Status | Meaning |
|---|---|
new |
Received, waiting for the application to handle |
complete |
Successfully processed by the application |
failed |
Processing failed — can be retried |
Failed messages are never automatically deleted, so they can be inspected
and retried. Completed messages can be cleaned up with the
message-flow:purge artisan command or by enabling the
DeleteCompleteMessage pipe in the outbound routing pipeline.
Optionally, acknowledgement messages can be sent in the reverse direction to confirm end-to-end delivery:
sequenceDiagram
box Application A
participant AO as Out Model
participant AI as In Model
end
box Shared Queue Driver
participant QtoB as to-b
participant QtoA as to-a
end
box Application B
participant BI as In Model
participant BO as Out Model
participant BA as Jobs
end
Note over AO: Create message
activate AO
AO->>QtoB: Message
activate QtoB
QtoB->>BI: Message
deactivate QtoB
activate BI
BI->>BO: Create ack message
activate BO
BI->>BA: Dispatch to handler
activate BA
BA->>BA: Process message
BA->>BI: Mark complete / delete
deactivate BA
deactivate BI
BO->>QtoA: Ack message
activate QtoA
deactivate BO
QtoA->>AI: Ack message
deactivate QtoA
activate AI
AI->>AO: Delete original message
deactivate AI
deactivate AO
Loading
Configuration Overview
| Role | What to configure |
|---|---|
| All apps | Shared Redis entry + queue entry (Steps 1-2), identical on every app |
| Sender | Outbound routing in message-flow.php (Step 3) |
| Receiver | Queue worker + Eloquent observer (Step 4) |
For two-way communication, each app is both sender and receiver — see Two-Way Communication below.
Installation
Requirements
- PHP
8.4or higher - Laravel
10,11or12
Install Using Composer
composer require consilience/laravel-message-flow
Publish Migrations and Config
php artisan vendor:publish \
--provider="Consilience\Laravel\MessageFlow\MessageFlowServiceProvider"
You can then run php artisan migrate to migrate the database.
Setting Up a Shared Queue (Redis Example)
This example uses Redis, but any queue driver supported by Laravel will work. The same four steps apply regardless of driver.
The Three Config Layers
Laravel uses the word "connection" at several levels, which can be confusing. Setting up a shared queue touches three config files, each at a different layer:
| Layer | Config file | What it names |
|---|---|---|
| Redis server | config/database.php |
How to reach a Redis instance and which key prefix to use |
| Queue | config/queue.php |
A named queue that uses a particular Redis server entry as its backend |
| Message Flow | config/message-flow.php |
Which named queue to push messages onto |
No changes to your existing Laravel defaults are needed — we just add new entries alongside them.
Step 1 — Redis Server Entry
config/database.php
Add a Redis entry with a fixed prefix so that every application sharing the queue sees the same keys, regardless of each app's global Redis prefix:
'redis' => [ // Your existing options and entries stay unchanged. // Shared Redis entry for message flow: 'message-flow-redis' => [ 'url' => env('REDIS_URL'), 'host' => env('REDIS_HOST', '127.0.0.1'), 'password' => env('REDIS_PASSWORD', null), 'port' => env('REDIS_PORT', '6379'), 'database' => env('REDIS_DB', '0'), 'prefix' => 'message-flow:', ], ],
Step 2 — Queue Entry
config/queue.php
Add a queue entry that uses the Redis entry from Step 1 as its backend:
'connections' => [ // ... 'message-flow-queue' => [ 'driver' => 'redis', 'connection' => 'message-flow-redis', // ← Redis entry from Step 1 'queue' => 'default', 'retry_after' => 90, 'block_for' => null, ], ],
Steps 1 and 2 are identical on every application sharing the same message flow. They define the shared transport — not who sends or receives.
Step 3 — Sender: Outbound Routing
config/message-flow.php
Tell Message Flow which queue name to push outbound messages onto. Name each queue after the receiver — this keeps things unambiguous regardless of how many senders push to it:
'out' => [ 'name-mappings' => [ 'default' => [ 'queue-connection' => 'message-flow-queue', // ← queue entry from Step 2 'queue-name' => 'to-warehouse', // ← what the receiver listens on ], ], ],
Different message names can route to different receivers:
'name-mappings' => [ 'stock-update' => [ 'queue-connection' => 'message-flow-queue', 'queue-name' => 'to-warehouse', ], 'invoice-ready' => [ 'queue-connection' => 'message-flow-queue', 'queue-name' => 'to-billing', ], ],
Send a message by creating a MessageFlowOut record:
use Consilience\Laravel\MessageFlow\Models\MessageFlowOut; MessageFlowOut::create([ 'name' => 'stock-update', 'payload' => ['sku' => 'ABC-123', 'qty' => 50], ]);
An Eloquent observer dispatches the message through the routing pipeline automatically — no additional code needed on the sender side.
Step 4 — Receiver: Worker and Observer
Start a worker listening on the queue name that senders push to:
php artisan queue:work message-flow-queue --queue=to-warehouse
The first argument (message-flow-queue) is the queue entry from Step 2.
The --queue flag is the queue name from the sender's Step 3 config.
Create an observer to handle incoming messages:
php artisan make:observer MessageFlowObserver \
--model='Consilience\Laravel\MessageFlow\Models\MessageFlowIn'
namespace App\Observers; use Consilience\Laravel\MessageFlow\Models\MessageFlowIn; class MessageFlowObserver { public function created(MessageFlowIn $message): void { if (! $message->isNew()) { return; } // Process the message payload. // ... $message->setComplete()->save(); // Or on failure: // $message->setFailed()->save(); } }
Register the observer in a service provider:
use Consilience\Laravel\MessageFlow\Models\MessageFlowIn; use App\Observers\MessageFlowObserver; public function boot(): void { MessageFlowIn::observe(MessageFlowObserver::class); }
Two-Way Communication
When two applications need to send messages to each other, both act as sender and receiver. The shared infrastructure (Steps 1–2) stays identical on both apps. Each app just needs:
- Its own outbound routing (Step 3) — pointing to the other app's inbound queue
- Its own queue worker (Step 4) — listening on its own inbound queue
Name each queue after the receiver. This makes it clear who consumes which queue, regardless of how many senders push to it.
| Core app | Fulfilment app | |
|---|---|---|
| Redis entry (Step 1) | message-flow-redis |
message-flow-redis (identical) |
| Queue entry (Step 2) | message-flow-queue |
message-flow-queue (identical) |
| Sends to (Step 3) | queue-name: 'to-fulfilment' |
queue-name: 'to-core' |
| Worker listens on (Step 4) | --queue=to-core |
--queue=to-fulfilment |
Core's config/message-flow.php:
'name-mappings' => [ 'default' => [ 'queue-connection' => 'message-flow-queue', 'queue-name' => 'to-fulfilment', ], ],
Fulfilment's config/message-flow.php:
'name-mappings' => [ 'default' => [ 'queue-connection' => 'message-flow-queue', 'queue-name' => 'to-core', ], ],
Both apps share a single Redis entry and a single queue entry —
only the outbound queue-name and the worker's --queue flag differ.
For one-way messaging, only the sender needs Step 3 and only the receiver needs Step 4. For two-way, each app does both.
Artisan Commands
This package introduces a few new artisan commands:
Create Message
This command allows you to create a new outbound message.
php artisan message-flow:create-message \
--name='routing-name' \
--payload='{"json":"payload"}' \
--status=new
If no options are provided, the name will be default, the status new and
the payload an empty object.
List Messages
This command will list the messages currently in the cache tables. These are messages that are being sent, or have been sent and have not yet been deleted. They are also messages that have been received and also not been deleted.
php artisan message-flow:list-messages \
--direction={inbound|outbound} \
--status={new|complete|failed|other} \
--uuid={uuid-of-message} \
--limit=20 \
--page=1 \
--process
The status and uuid options can take multiple values.
The limit option sets the number of records returned.
This is effectively the page size.
The page option specifies which page (of size limit) to display.
Page numbers start at 1 for the first page.
The process option will dispatch jobs for messages that have not yet been processed.
For outbound messages that will be matching messages in the new or failed states.
This will generally only be needed for testing or kicking off failed observers.
For inbound messages in the new state, this will fire the eloquent created event
to kick the custom observers into action.
With the -v option, the payload will be included in the listing.
Some payloads may be large.
Purge Messages
This command deletes old messages from the inbound and/or outbound cache tables.
By default it purges records with a complete status that were last updated
more than 30 days ago.
php artisan message-flow:purge \
--days=30 \
--hours=0 \
--direction={inbound|outbound|both} \
--status={complete|failed} \
--dry-run
The --days and --hours options are combined to set the age threshold.
For example, --days=1 --hours=12 purges records older than 36 hours,
and --days=0 --hours=6 purges records older than 6 hours.
The --direction option defaults to both. Abbreviations are accepted
(e.g. --direction=in or --direction=out).
The --status option can be specified multiple times to purge records
in more than one status. It defaults to complete if not specified.
The --dry-run option shows how many records would be deleted without
actually deleting them.
Running manually
Purge all completed messages older than 30 days from both tables:
php artisan message-flow:purge
Purge completed and failed inbound messages older than 7 days:
php artisan message-flow:purge --days=7 --direction=inbound --status=complete --status=failed
Preview what would be deleted:
php artisan message-flow:purge --days=14 --dry-run
Scheduling
To run the purge automatically, add it to your application's scheduler.
In routes/console.php (Laravel 11+):
use Illuminate\Support\Facades\Schedule; Schedule::command('message-flow:purge --days=30')->daily();
Or in a service provider's boot() method:
$this->app->booted(function () { $schedule = app(\Illuminate\Console\Scheduling\Schedule::class); $schedule->command('message-flow:purge --days=30')->daily(); });
Testing
Tests use Orchestra Testbench and PHPUnit:
composer test
TODO
- Names and routing (advanced config).
- Outbound pipeline (advanced config).
- Tests to complete.
- A simple quickstart example with two applications talking to each other.