chevere / workflow
Declarative workflow engine for PHP with automatic dependency resolution, sync/async job execution, and type-safe response chaining.
pkg:composer/chevere/workflow
Requires
- php: ^8.1
- amphp/parallel: ^2.2
- chevere/action: ^2.0.0
- chevere/caller: ^1.0
- chevere/data-structure: ^1.0.1
- chevere/parameter: ^1.2.0
- chevere/regex: ^1.0.1
Requires (Dev)
- chevere/var-dump: ^2.0.6
- phpstan/phpstan: ^2.0
- phpunit/phpunit: ^10.5
- symplify/easy-coding-standard: ^12.2
This package is auto-updated.
Last update: 2026-02-10 17:22:21 UTC
Summary
Workflow is a PHP library for building and executing multi-step procedures with automatic dependency resolution. Define independent jobs that can run synchronously or asynchronously, pass data between them using typed responses, and let the engine handle execution order automatically.
Key features:
- Declarative job definitions - Define what to do, not how to orchestrate it
- Automatic dependency graph - Jobs execute in optimal order based on their dependencies
- Sync and async execution - Mix blocking and non-blocking jobs freely
- Type-safe responses - Access job outputs with full type safety
- Conditional execution - Run jobs based on variables or previous responses
- Built-in retry policies - Handle transient failures automatically
- Testable components - Each job is independently testable
Installing
Workflow is available through Packagist and the repository source is at chevere/workflow.
composer require chevere/workflow
Quick Start
Here's a minimal example to get you started:

    use function Chevere\Workflow\{workflow, sync, variable, run};

    // 1. Define a workflow with jobs
    $workflow = workflow(
        greet: sync(
            fn(string $name): string => "Hello, {$name}!",
            name: variable('username')
        )
    );

    // 2. Run with variables
    $result = run($workflow, username: 'World');

    // 3. Get typed responses
    echo $result->response('greet')->string();
    // Output: Hello, World!

Core Concepts
Workflow is built around four main concepts:
| Concept | Description |
|---|---|
| Job | A unit of work (Action class or Closure) that produces a response |
| Variable | External input provided when running the workflow |
| Response | Reference to output from a previous job |
| Graph | Automatic execution order based on job dependencies |
How It Works
- You define jobs using sync() or async() functions
- Jobs declare their inputs: literal values, variable() references, or response() from other jobs
- The engine builds a dependency graph automatically
- Jobs execute in optimal order (parallel when possible)
- Access typed responses after execution
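
The graph-building step above can be pictured as grouping jobs into levels, where a job joins a level once all of its dependencies sit in earlier levels. The following is a minimal plain-PHP sketch of that idea, not the library's implementation; `executionLevels()` and the job names are hypothetical and exist only for illustration.

```php
<?php

/**
 * Groups jobs into execution levels from a map of job name => dependency names.
 * Hypothetical helper for illustration; not part of chevere/workflow.
 *
 * @param array<string, list<string>> $dependencies
 * @return list<list<string>>
 */
function executionLevels(array $dependencies): array
{
    $levels = [];
    $done = [];
    $pending = $dependencies;
    while ($pending !== []) {
        // A job is ready once every one of its dependencies is already placed.
        $ready = [];
        foreach ($pending as $job => $needs) {
            if (array_diff($needs, $done) === []) {
                $ready[] = $job;
            }
        }
        if ($ready === []) {
            // Nothing can progress: a cycle or a reference to an unknown job.
            throw new LogicException('Circular or unknown dependency');
        }
        foreach ($ready as $job) {
            unset($pending[$job]);
        }
        $done = array_merge($done, $ready);
        $levels[] = $ready;
    }

    return $levels;
}

$levels = executionLevels([
    'user' => [],
    'settings' => [],
    'notify' => ['user', 'settings'],
]);
// $levels is [['user', 'settings'], ['notify']]
```

Jobs that share a level have no dependencies on each other, which is what makes them candidates for parallel execution.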
Functions Reference
| Function | Purpose |
|---|---|
| workflow() | Create a workflow from named jobs |
| sync() | Create a synchronous (blocking) job |
| async() | Create an asynchronous (non-blocking) job |
| variable() | Declare a runtime variable |
| response() | Reference another job's output |
| run() | Execute a workflow with variables |
Jobs
Jobs are the building blocks of a workflow. Each job wraps an executable unit (Action or Closure) and declares its input arguments.
Creating Jobs with Closures
Use closures for simple, inline operations:

    use function Chevere\Workflow\{workflow, sync, async, variable, response, run};

    $workflow = workflow(
        // Simple calculation
        add: sync(
            fn(int $a, int $b): int => $a + $b,
            a: 10,
            b: variable('number')
        ),
        // Format the result
        format: sync(
            fn(int $sum): string => "Sum: {$sum}",
            sum: response('add')
        )
    );

    $result = run($workflow, number: 5);
    echo $result->response('format')->string(); // Sum: 15

Creating Jobs with Action Classes
For complex or reusable logic, use Action classes:

    use Chevere\Action\Action;

    class FetchUser extends Action
    {
        public function __invoke(int $userId): array
        {
            // Fetch user from database
            return ['id' => $userId, 'name' => 'John', 'email' => 'john@example.com'];
        }
    }

    class SendEmail extends Action
    {
        public function __invoke(string $email, string $subject): bool
        {
            // Send email logic
            return true;
        }
    }

    $workflow = workflow(
        user: sync(
            FetchUser::class,
            userId: variable('id')
        ),
        notify: sync(
            SendEmail::class,
            email: response('user', 'email'),
            subject: 'Welcome!'
        )
    );

    $result = run($workflow, id: 123);

Sync vs Async Jobs
Synchronous jobs (sync) block execution until complete. Use for operations that must run in sequence:

    workflow(
        first: sync(ActionA::class),  // Runs first
        second: sync(ActionB::class), // Waits for first
        third: sync(ActionC::class),  // Waits for second
    );
    // Graph: first → second → third

Asynchronous jobs (async) run concurrently when they have no dependencies:

    workflow(
        resize1: async(ResizeImage::class, size: 'thumb'),
        resize2: async(ResizeImage::class, size: 'medium'),
        resize3: async(ResizeImage::class, size: 'large'),
        store: sync(StoreFiles::class, files: response('resize1'), ...)
    );
    // Graph: [resize1, resize2, resize3] → store

Job Arguments
Jobs accept three types of arguments:

    workflow(
        example: sync(
            MyAction::class,
            literal: 'fixed value',        // Literal value
            dynamic: variable('userInput'), // Runtime variable
            chained: response('otherJob'),  // Previous job output
        )
    );

Variables
Variables are placeholders for values provided at runtime. Declare them with variable():

    $workflow = workflow(
        job1: sync(
            SomeAction::class,
            name: variable('userName'),
            age: variable('userAge')
        )
    );

    // Provide values when running
    $result = run($workflow, userName: 'Alice', userAge: 30);

All declared variables must be provided when running the workflow.
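
The rule that every declared variable must be supplied can be sketched as a set difference between declared names and provided names. This is a plain-PHP illustration only; `missingVariables()` is a hypothetical helper, and how the library itself reports a missing variable is not shown here.

```php
<?php

/**
 * Returns the declared variable names that were not provided.
 * Hypothetical helper for illustration; not part of chevere/workflow.
 *
 * @param list<string> $declared
 * @param array<string, mixed> $provided
 * @return list<string>
 */
function missingVariables(array $declared, array $provided): array
{
    // array_values() reindexes the result so it is a plain list.
    return array_values(array_diff($declared, array_keys($provided)));
}

$missing = missingVariables(
    ['userName', 'userAge'],
    ['userName' => 'Alice']
);
// $missing is ['userAge']
```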
Responses
Use response() to pass output from one job to another. This automatically establishes a dependency.

    $workflow = workflow(
        fetch: sync(
            FetchData::class,
            url: variable('endpoint')
        ),
        process: sync(
            ProcessData::class,
            data: response('fetch') // Gets entire response from 'fetch'
        ),
        extract: sync(
            ExtractField::class,
            value: response('fetch', 'items') // Gets 'items' key from response
        )
    );

Accessing Nested Response Keys
When a job returns an array, access specific keys:

    response('user')            // Entire response
    response('user', 'id')      // $response['id']
    response('user', 'profile') // $response['profile']

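
Key access of this kind amounts to walking into the job's array output one key at a time. The sketch below shows that lookup in plain PHP, assuming an array-shaped output; `pick()` and the sample data are hypothetical and do not reflect how the library resolves keys internally.

```php
<?php

/**
 * Walks into a job's array output one key at a time.
 * Hypothetical helper for illustration; not part of chevere/workflow.
 */
function pick(mixed $output, string ...$keys): mixed
{
    foreach ($keys as $key) {
        if (! is_array($output) || ! array_key_exists($key, $output)) {
            throw new OutOfBoundsException("Missing key: {$key}");
        }
        $output = $output[$key];
    }

    return $output;
}

$user = ['id' => 123, 'profile' => ['city' => 'Santiago']];

$whole = pick($user);                  // Entire output
$id = pick($user, 'id');               // $user['id']
$city = pick($user, 'profile', 'city'); // $user['profile']['city']
```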
Execution Graph
The workflow engine automatically builds an execution graph based on job dependencies. Jobs without dependencies run in parallel (when using async), while dependent jobs wait for their dependencies.

    $workflow = workflow(
        // Independent async jobs run in parallel
        thumb: async(ImageResize::class, size: 'thumb', file: variable('image')),
        medium: async(ImageResize::class, size: 'medium', file: variable('image')),
        large: async(ImageResize::class, size: 'large', file: variable('image')),
        // Sync job waits for all above
        store: sync(
            StoreFiles::class,
            thumb: response('thumb'),
            medium: response('medium'),
            large: response('large')
        )
    );

    // View the execution graph
    $graph = $workflow->jobs()->graph()->toArray();
    // [
    //     ['thumb', 'medium', 'large'], // Level 0: parallel
    //     ['store']                     // Level 1: after dependencies
    // ]

graph TD
thumb --> store
medium --> store
large --> store
Running Workflows
Execute a workflow with the run() function:

    use function Chevere\Workflow\run;

    // Basic execution
    $result = run($workflow, var1: 'value1', var2: 'value2');

    // With dependency injection container
    $result = run($workflow, $container, var1: 'value1');

Accessing Responses
The run result provides type-safe access to job responses:

    $result = run($workflow, ...);

    // Get typed responses
    $result->response('jobName')->string(); // string
    $result->response('jobName')->int();    // int
    $result->response('jobName')->float();  // float
    $result->response('jobName')->bool();   // bool
    $result->response('jobName')->array();  // array

    // Access array keys directly
    $result->response('jobName', 'key')->string();
    $result->response('jobName', 'nested', 'key')->int();

Check Skipped Jobs
When using conditional execution, check which jobs were skipped:

    if ($result->skip()->contains('optionalJob')) {
        // Job was skipped
    }

Conditional Execution
Use withRunIf() to run a job only when conditions are met:

    $workflow = workflow(
        compress: sync(
            CompressImage::class,
            file: variable('file')
        )->withRunIf(
            variable('shouldCompress'),    // Must be truthy
            response('validate', 'isValid') // Must be truthy
        )
    );

    $result = run($workflow,
        file: '/path/to/image.jpg',
        shouldCompress: true
    );

All conditions must be truthy for the job to run. Skipped jobs are tracked in $result->skip().
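
The all-conditions-truthy rule can be sketched in plain PHP as a short-circuiting check over already-resolved condition values. `shouldRun()`, the job names, and the sample values are hypothetical; the sketch only mirrors the rule stated above, not the library's internals.

```php
<?php

/**
 * Returns true only when every resolved condition value is truthy.
 * Hypothetical helper for illustration; not part of chevere/workflow.
 */
function shouldRun(mixed ...$conditions): bool
{
    foreach ($conditions as $condition) {
        if (! $condition) {
            return false; // One falsy value is enough to skip the job.
        }
    }

    return true;
}

// Resolved condition values per job (assumed sample data).
$jobs = [
    'compress' => [true, 'yes'],
    'watermark' => [true, 0],
];

$skipped = [];
foreach ($jobs as $name => $conditions) {
    if (! shouldRun(...$conditions)) {
        $skipped[] = $name;
    }
}
// $skipped is ['watermark']
```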
Explicit Dependencies
While response() creates implicit dependencies, use withDepends() for explicit control:

    $workflow = workflow(
        setup: sync(SetupAction::class),
        process: sync(
            ProcessAction::class,
            data: variable('input')
        )->withDepends('setup') // Wait for setup even without using its response
    );

Retry Policy
Configure automatic retries for jobs that may fail transiently:

    $workflow = workflow(
        fetch: sync(
            FetchFromApi::class,
            url: variable('endpoint')
        )->withRetry(
            timeout: 300,   // Max 300 seconds total
            maxAttempts: 5, // Try up to 5 times
            delay: 10       // Wait 10 seconds between attempts
        )
    );

| Parameter | Type | Default | Description |
|---|---|---|---|
| timeout | int<0, max> | 0 | Max execution time in seconds (0 = unlimited) |
| maxAttempts | int<1, max> | 1 | Total attempts including initial |
| delay | int<0, max> | 0 | Seconds between retries (0 = immediate) |
Retry delays use non-blocking sleep, making them safe for async runtimes.
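
To make the maxAttempts and delay semantics concrete, here is a minimal plain-PHP retry loop. It is a sketch under simplifying assumptions: `retry()` is a hypothetical helper, it uses blocking `sleep()` rather than the non-blocking sleep mentioned above, and it leaves out the timeout parameter entirely.

```php
<?php

/**
 * Calls $job until it succeeds or $maxAttempts is reached.
 * Hypothetical helper for illustration; not part of chevere/workflow.
 * Uses blocking sleep() for brevity and ignores any total timeout.
 */
function retry(callable $job, int $maxAttempts = 1, int $delay = 0): mixed
{
    $attempt = 0;
    while (true) {
        $attempt++;
        try {
            return $job($attempt);
        } catch (Throwable $e) {
            if ($attempt >= $maxAttempts) {
                throw $e; // Attempts exhausted: surface the last failure.
            }
            if ($delay > 0) {
                sleep($delay);
            }
        }
    }
}

$calls = 0;
$value = retry(
    function (int $attempt) use (&$calls): string {
        $calls++;
        if ($attempt < 3) {
            throw new RuntimeException('Transient failure');
        }

        return "ok on attempt {$attempt}";
    },
    maxAttempts: 5
);
// $value is 'ok on attempt 3' and $calls is 3
```

With the default of one attempt, the first failure is rethrown immediately, which matches "total attempts including initial" in the table above.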
Exception Handling
When a job fails, a WorkflowException wraps the original exception:

    use Chevere\Workflow\Exceptions\WorkflowException;

    try {
        $result = run($workflow, ...);
    } catch (WorkflowException $e) {
        echo $e->name;      // Name of the failed job
        echo $e->job;       // Job instance
        echo $e->throwable; // Original exception
    }

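
The wrapping pattern can be sketched in plain PHP as an exception that carries the failed job's name alongside the original throwable. `JobFailure` and `runJob()` are hypothetical stand-ins written for this illustration; the real WorkflowException may be structured differently.

```php
<?php

/**
 * Hypothetical stand-in for illustration; the real WorkflowException
 * lives in Chevere\Workflow\Exceptions and may differ.
 */
final class JobFailure extends RuntimeException
{
    public function __construct(
        public readonly string $name,
        public readonly Throwable $throwable
    ) {
        // Keep the original exception reachable through getPrevious() too.
        parent::__construct(
            "Job {$name} failed: " . $throwable->getMessage(),
            0,
            $throwable
        );
    }
}

/** Runs a job and wraps any failure with the job's name. */
function runJob(string $name, callable $job): mixed
{
    try {
        return $job();
    } catch (Throwable $e) {
        throw new JobFailure($name, $e);
    }
}

try {
    runJob('validate', fn () => throw new InvalidArgumentException('Invalid input'));
} catch (JobFailure $e) {
    $failedJob = $e->name;     // 'validate'
    $original = $e->throwable; // The InvalidArgumentException
}
```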
Using WorkflowTrait
For class-based workflow management, use WorkflowTrait:

    use Chevere\Workflow\Traits\WorkflowTrait;
    use function Chevere\Workflow\{workflow, sync, variable, response};

    class OrderProcessor
    {
        use WorkflowTrait;

        public function process(int $orderId): void
        {
            $workflow = workflow(
                validate: sync(ValidateOrder::class, id: variable('orderId')),
                charge: sync(ChargePayment::class, order: response('validate')),
                fulfill: sync(FulfillOrder::class, order: response('charge'))
            );
            $this->execute($workflow, orderId: $orderId);
        }

        public function getResult(): string
        {
            return $this->run()->response('fulfill')->string();
        }
    }

Testing
Testing Actions
Test your Action classes independently:

    use PHPUnit\Framework\TestCase;

    class FetchUserTest extends TestCase
    {
        public function testFetchUser(): void
        {
            $action = new FetchUser();
            $result = $action(userId: 123);
            $this->assertSame(123, $result['id']);
            $this->assertArrayHasKey('name', $result);
        }
    }

Testing Workflow Graph
Verify execution order:

    public function testWorkflowGraph(): void
    {
        $workflow = workflow(
            a: async(ActionA::class),
            b: async(ActionB::class),
            c: sync(ActionC::class, x: response('a'), y: response('b'))
        );
        $graph = $workflow->jobs()->graph()->toArray();
        $this->assertSame([['a', 'b'], ['c']], $graph);
    }

Testing Responses
Test complete workflow execution:

    public function testWorkflowResponses(): void
    {
        $result = run($workflow, input: 'test');
        $this->assertSame('expected', $result->response('job1')->string());
        $this->assertSame(42, $result->response('job2', 'count')->int());
    }

Testing Exceptions
Use ExpectWorkflowExceptionTrait for error scenarios:

    use Chevere\Workflow\Traits\ExpectWorkflowExceptionTrait;

    class WorkflowExceptionTest extends TestCase
    {
        use ExpectWorkflowExceptionTrait;

        public function testJobFailure(): void
        {
            $this->expectWorkflowException(
                closure: fn() => run($workflow, input: 'invalid'),
                exception: InvalidArgumentException::class,
                job: 'validate',
                message: 'Invalid input provided'
            );
        }
    }

Real-World Examples
Image Processing Pipeline

    $workflow = workflow(
        // Parallel image resizing
        thumb: async(
            ImageResize::class,
            file: variable('image'),
            width: 150,
            height: 150
        ),
        medium: async(
            ImageResize::class,
            file: variable('image'),
            width: 800
        ),
        // Store after all resizing completes
        store: sync(
            StoreFiles::class,
            thumb: response('thumb'),
            medium: response('medium'),
            directory: variable('outputDir')
        )
    );

    $result = run($workflow,
        image: '/uploads/photo.jpg',
        outputDir: '/processed/'
    );

User Registration Flow

    $workflow = workflow(
        validate: sync(
            ValidateRegistration::class,
            email: variable('email'),
            password: variable('password')
        ),
        createUser: sync(
            CreateUser::class,
            data: response('validate')
        ),
        sendWelcome: async(
            SendWelcomeEmail::class,
            user: response('createUser')
        ),
        logEvent: async(
            LogRegistration::class,
            userId: response('createUser', 'id')
        )
    );

Conditional Processing

    $workflow = workflow(
        analyze: sync(
            AnalyzeContent::class,
            content: variable('text')
        ),
        translate: sync(
            TranslateContent::class,
            text: variable('text'),
            targetLang: variable('lang')
        )->withRunIf(
            variable('needsTranslation')
        ),
        publish: sync(
            PublishContent::class,
            content: response('analyze'),
            translated: response('translate')
        )
    );

    $result = run($workflow,
        text: 'Hello world',
        lang: 'es',
        needsTranslation: true
    );

Demo
Run the included examples:

    php demo/hello-world.php    # Basic workflow
    php demo/chevere.php        # Chained jobs
    php demo/closure.php        # Using closures
    php demo/sync-vs-async.php  # Performance comparison
    php demo/image-resize.php   # Parallel processing
    php demo/run-if.php         # Conditional execution

See the demo directory for all examples.
Documentation
Documentation is available at chevere.org/packages/workflow.
For a comprehensive introduction, read Workflow for PHP on Rodolfo's blog.
License
Copyright Rodolfo Berrios A.
This software is licensed under the Apache License, Version 2.0. See LICENSE for the full license text.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.