grazulex / laravel-flowpipe
Composable, traceable and declarative Flow Pipelines for Laravel. A modern alternative to Laravel's Pipeline, with support for conditional steps, nested flows, tracing, validation, and more.
Requires
- php: ^8.3
- illuminate/contracts: ^12.0
- illuminate/support: ^12.19
- nesbot/carbon: ^3.10
- symfony/yaml: ^7.3
Requires (Dev)
- doctrine/dbal: ^4.2
- larastan/larastan: ^3.4
- laravel/pint: ^1.22
- orchestra/testbench: ^10.0
- pestphp/pest: ^3.8
- pestphp/pest-plugin-laravel: ^3.2
- rector/rector: ^2.0
This package is auto-updated.
Last update: 2025-07-16 15:29:46 UTC
README
Composable, traceable and declarative Flow Pipelines for Laravel. A modern alternative to Laravel's Pipeline, with support for conditional steps, nested flows, tracing, validation, and more.
Features
โจ Fluent API - Chainable, expressive syntax
๐ Flexible Steps - Support for closures, classes, and custom steps
๐ฏ Conditional Logic - Built-in conditional step execution with dot notation
๐ Tracing & Debugging - Track execution flow and performance
๐งช Test-Friendly - Built-in test tracer for easy testing
๐ Laravel Integration - Seamless service provider integration
โก Performance - Optimized for speed and memory efficiency
๐ YAML Flows - Define flows in YAML for easy configuration
๐จ Artisan Commands - Full CLI support for flow management
โ
Flow Validation - Validate flow definitions with comprehensive error reporting
๐ Export & Documentation - Export to JSON, Mermaid, and Markdown
๐ Step Groups - Reusable, named collections of steps
๐ฏ Nested Flows - Create isolated sub-workflows for complex logic
๐ก๏ธ Advanced Error Handling - Comprehensive error handling with retry, fallback, and compensation strategies
๐ Retry Strategies - Exponential and linear backoff, custom retry logic
๐ฏ Fallback Patterns - Graceful degradation with fallback mechanisms
๐ง Compensation - Automatic rollback and cleanup operations
๐จ Composite Strategies - Combine multiple error handling approaches
Requirements
- PHP 8.3+
- Laravel 12.0+
Installation
Install the package via Composer:
composer require grazulex/laravel-flowpipe
The service provider will be automatically registered thanks to Laravel's package auto-discovery.
Quick Start
Basic Pipeline
use Grazulex\LaravelFlowpipe\Flowpipe; $result = Flowpipe::make() ->send('Hello World') ->through([ fn($data, $next) => $next(strtoupper($data)), fn($data, $next) => $next(str_replace(' ', '-', $data)), fn($data, $next) => $next($data . '!'), ]) ->thenReturn(); // Result: "HELLO-WORLD!"
Error Handling with Retry
use Grazulex\LaravelFlowpipe\Flowpipe; // Exponential backoff retry $result = Flowpipe::make() ->send(['api_url' => 'https://api.example.com/data']) ->exponentialBackoff(3, 100, 2.0) // 3 attempts, 100ms base delay, 2x multiplier ->through([ fn($data, $next) => $next(callExternalAPI($data['api_url'])), fn($data, $next) => $next(processAPIResponse($data)), ]) ->thenReturn(); // Linear backoff retry $result = Flowpipe::make() ->send($userData) ->linearBackoff(3, 100, 50) // 3 attempts, 100ms base + 50ms increment ->through([ fn($data, $next) => $next(saveToDatabase($data)), ]) ->thenReturn();
Fallback Strategies
use Grazulex\LaravelFlowpipe\Flowpipe; // Simple fallback with default value $result = Flowpipe::make() ->send(['user_id' => 123]) ->withFallback(fn($payload, $error) => ['cached_data' => true, 'user_id' => $payload['user_id']]) ->through([ fn($data, $next) => $next(fetchUserProfile($data['user_id'])), ]) ->thenReturn(); // Exception-specific fallback $result = Flowpipe::make() ->send($orderData) ->fallbackOnException(NetworkException::class, fn($payload, $error) => getCachedOrderData($payload)) ->through([ fn($data, $next) => $next(fetchOrderFromAPI($data)), ]) ->thenReturn();
Compensation (Rollback) Strategies
use Grazulex\LaravelFlowpipe\Flowpipe; // Automatic rollback on failure $result = Flowpipe::make() ->send($transactionData) ->withCompensation(function ($payload, $error, $context) { // Rollback the transaction rollbackTransaction($payload['transaction_id']); return array_merge($payload, ['rolled_back' => true]); }) ->through([ fn($data, $next) => $next(processTransaction($data)), ]) ->thenReturn(); // Exception-specific compensation $result = Flowpipe::make() ->send($paymentData) ->compensateOnException(PaymentException::class, fn($payload, $error) => refundPayment($payload)) ->through([ fn($data, $next) => $next(chargePayment($data)), ]) ->thenReturn();
Composite Error Handling
use Grazulex\LaravelFlowpipe\ErrorHandling\Strategies\CompositeStrategy; use Grazulex\LaravelFlowpipe\ErrorHandling\Strategies\RetryStrategy; use Grazulex\LaravelFlowpipe\ErrorHandling\Strategies\FallbackStrategy; // Combine multiple strategies $compositeStrategy = CompositeStrategy::make() ->retry(RetryStrategy::exponentialBackoff(3, 100, 2.0)) ->fallback(FallbackStrategy::withDefault(['status' => 'cached'])); $result = Flowpipe::make() ->send($data) ->withErrorHandler($compositeStrategy) ->through([ fn($data, $next) => $next(unreliableOperation($data)), ]) ->thenReturn();
Conditional Steps
use Grazulex\LaravelFlowpipe\Steps\ConditionalStep; use Grazulex\LaravelFlowpipe\Contracts\Condition; class IsActiveCondition implements Condition { public function evaluate(mixed $payload): bool { return is_array($payload) && ($payload['active'] ?? false); } } $result = Flowpipe::make() ->send(['active' => true, 'name' => 'John']) ->through([ fn($data, $next) => $next($data['name']), ConditionalStep::when( new IsActiveCondition(), fn($name, $next) => $next(strtoupper($name)) ), ConditionalStep::unless( new IsActiveCondition(), fn($name, $next) => $next(strtolower($name)) ), ]) ->thenReturn(); // Result: "JOHN"
Step Groups & Nested Flows
Laravel Flowpipe supports reusable step groups and nested flows for better organization and modularity.
Step Groups
Define reusable groups of steps:
use Grazulex\LaravelFlowpipe\Flowpipe; // Define reusable step groups Flowpipe::group('text-processing', [ fn($data, $next) => $next(trim($data)), fn($data, $next) => $next(strtoupper($data)), fn($data, $next) => $next(str_replace(' ', '-', $data)), ]); Flowpipe::group('validation', [ fn($data, $next) => $next(strlen($data) > 0 ? $data : throw new InvalidArgumentException('Empty data')), fn($data, $next) => $next(preg_match('/^[A-Z-]+$/', $data) ? $data : throw new InvalidArgumentException('Invalid format')), ]); // Use groups in flows $result = Flowpipe::make() ->send(' hello world ') ->useGroup('text-processing') ->useGroup('validation') ->through([ fn($data, $next) => $next($data . '!'), ]) ->thenReturn(); // Result: "HELLO-WORLD!"
Nested Flows
Create isolated sub-workflows:
$result = Flowpipe::make() ->send('hello world') ->nested([ // This nested flow runs independently fn($data, $next) => $next(strtoupper($data)), fn($data, $next) => $next(str_replace(' ', '-', $data)), ]) ->through([ // Main flow continues with nested result fn($data, $next) => $next($data . '!'), ]) ->thenReturn(); // Result: "HELLO-WORLD!"
Combining Groups and Nested Flows
// Define processing groups Flowpipe::group('user-validation', [ fn($user, $next) => $next(filter_var($user['email'], FILTER_VALIDATE_EMAIL) ? $user : throw new InvalidArgumentException('Invalid email')), fn($user, $next) => $next(strlen($user['name']) > 0 ? $user : throw new InvalidArgumentException('Name required')), ]); Flowpipe::group('notifications', [ fn($user, $next) => $next(array_merge($user, ['email_sent' => true])), fn($user, $next) => $next(array_merge($user, ['logged' => true])), ]); $result = Flowpipe::make() ->send(['email' => 'user@example.com', 'name' => 'John Doe']) ->useGroup('user-validation') ->nested([ // Complex processing in isolation fn($user, $next) => $next(array_merge($user, ['id' => uniqid()])), fn($user, $next) => $next(array_merge($user, ['created_at' => now()])), ]) ->useGroup('notifications') ->thenReturn(); // Result: Complete user array with validation, processing, and notifications
YAML Flow Definitions
Create flow definitions in YAML for easy configuration, including groups and nested flows:
# flow_definitions/user_processing.yaml flow: UserProcessingFlow description: Process user data with validation and notifications send: name: "John Doe" email: "john@example.com" is_active: true steps: # Use a pre-defined group - type: group name: user-validation # Create a nested flow - type: nested steps: - type: closure action: append value: "_processed" - condition: field: is_active operator: equals value: true then: - type: closure action: uppercase else: - type: closure action: lowercase # Use another group - type: group name: notifications
Define groups in separate YAML files:
# groups/user-validation.yaml group: user-validation description: Validate user data steps: - type: closure action: validate_email - type: closure action: validate_name
Artisan Commands
Laravel Flowpipe comes with powerful Artisan commands:
# List all available flows php artisan flowpipe:list php artisan flowpipe:list --detailed # Validate flow definitions php artisan flowpipe:validate --all php artisan flowpipe:validate --path=user-registration.yaml php artisan flowpipe:validate --all --format=json # Run a flow php artisan flowpipe:run user_processing php artisan flowpipe:run user_processing --payload='{"name":"John","email":"john@example.com"}' # Export flows to different formats with enhanced group colors php artisan flowpipe:export user_processing --format=json php artisan flowpipe:export user_processing --format=mermaid php artisan flowpipe:export user_processing --format=md --output=docs/user_processing.md # Export groups with enhanced color styling php artisan flowpipe:export user-validation --type=group --format=mermaid php artisan flowpipe:export notifications --type=group --format=md # Create new flows php artisan flowpipe:make-flow NewUserFlow --template=basic php artisan flowpipe:make-flow ComplexFlow --template=conditional php artisan flowpipe:make-flow AdvancedFlow --template=advanced # Generate step classes php artisan flowpipe:make-step ProcessUserStep
Enhanced Mermaid Export with Group Colors
Laravel Flowpipe now supports enhanced Mermaid diagrams with rich color coding for different step types:
- Groups: Blue theme (๐ฆ Group elements)
- Nested Flows: Green theme (๐ Nested elements)
- Conditional Steps: Orange theme (โ Conditional elements)
- Transform Steps: Pink theme (๐ Transform elements)
- Validation Steps: Green theme (โ Validation elements)
- Cache Steps: Yellow theme (๐พ Cache elements)
- Batch Steps: Purple theme (๐ Batch elements)
- Retry Steps: Red theme (๐ Retry elements)
Documentation
For detailed documentation, examples, and advanced usage, please see:
- ๐ Full Documentation
- ๐ฏ Examples
- ๐ง Configuration
- ๐งช Testing
- ๐จ Artisan Commands
- ๐ก๏ธ Error Handling
- ๏ฟฝ Queue Integration
- ๏ฟฝ๐ Error Handling Usage Guide
- ๐ฏ Usage Examples
Examples
Basic Text Processing
$result = Flowpipe::make() ->send(' hello world ') ->through([ fn($text, $next) => $next(trim($text)), fn($text, $next) => $next(ucwords($text)), fn($text, $next) => $next(str_replace(' ', '-', $text)), ]) ->thenReturn(); // Result: "Hello-World"
User Registration Flow
use App\Flowpipe\Steps\ValidateUserStep; use App\Flowpipe\Steps\SendWelcomeEmailStep; use App\Flowpipe\Steps\AddToCrmStep; use Grazulex\LaravelFlowpipe\Steps\ConditionalStep; use Grazulex\LaravelFlowpipe\Contracts\Condition; use Grazulex\LaravelFlowpipe\Tracer\BasicTracer; class IsActiveCondition implements Condition { public function evaluate(mixed $payload): bool { return is_array($payload) && ($payload['is_active'] ?? false); } } $user = Flowpipe::make() ->send($userData) ->through([ new ValidateUserStep(), ConditionalStep::when( new IsActiveCondition(), new SendWelcomeEmailStep() ), ConditionalStep::when( new IsActiveCondition(), new AddToCrmStep() ), ]) ->withTracer(new BasicTracer()) ->thenReturn();
Complex Conditional Logic
use Grazulex\LaravelFlowpipe\Steps\ConditionalStep; use Grazulex\LaravelFlowpipe\Contracts\Condition; class IsAdminCondition implements Condition { public function evaluate(mixed $payload): bool { return is_array($payload) && ($payload['role'] ?? '') === 'admin'; } } class IsActiveCondition implements Condition { public function evaluate(mixed $payload): bool { return is_array($payload) && ($payload['active'] ?? false); } } $result = Flowpipe::make() ->send(['user' => ['role' => 'admin', 'active' => true]]) ->through([ fn($data, $next) => $next($data['user']), ConditionalStep::when( new IsAdminCondition(), fn($user, $next) => $next(array_merge($user, ['permissions' => ['read', 'write', 'delete']])) ), ConditionalStep::when( new IsActiveCondition(), fn($user, $next) => $next(array_merge($user, ['status' => 'enabled'])) ), ConditionalStep::unless( new IsActiveCondition(), fn($user, $next) => $next(array_merge($user, ['status' => 'disabled'])) ), ]) ->thenReturn();
Error Handling in Production Workflows
use Grazulex\LaravelFlowpipe\ErrorHandling\Strategies\CompositeStrategy; use Grazulex\LaravelFlowpipe\ErrorHandling\Strategies\RetryStrategy; use Grazulex\LaravelFlowpipe\ErrorHandling\Strategies\FallbackStrategy; use Grazulex\LaravelFlowpipe\ErrorHandling\Strategies\CompensationStrategy; // Production-ready order processing with comprehensive error handling $orderResult = Flowpipe::make() ->send($orderData) // Step 1: Validate order with fallback ->withFallback(function ($payload, $error) { Log::warning('Order validation failed, using basic validation', [ 'order_id' => $payload['order_id'], 'error' => $error->getMessage() ]); return array_merge($payload, ['validation_mode' => 'basic']); }) ->through([ fn($data, $next) => $next(validateOrder($data)), ]) // Step 2: Process payment with retry and compensation ->withErrorHandler( CompositeStrategy::make() ->retry(RetryStrategy::exponentialBackoff(3, 200, 2.0)) ->compensate(CompensationStrategy::make(function ($payload, $error, $context) { // Rollback any partial payment processing if (isset($payload['payment_intent_id'])) { cancelPaymentIntent($payload['payment_intent_id']); } return array_merge($payload, ['payment_cancelled' => true]); })) ) ->through([ fn($data, $next) => $next(processPayment($data)), ]) // Step 3: Update inventory with fallback to manual processing ->withFallback(function ($payload, $error) { // Queue for manual inventory processing QueueManualInventoryUpdate::dispatch($payload); return array_merge($payload, ['inventory_queued' => true]); }) ->through([ fn($data, $next) => $next(updateInventory($data)), ]) // Step 4: Send confirmation with retry ->exponentialBackoff(3, 100, 2.0) ->through([ fn($data, $next) => $next(sendOrderConfirmation($data)), ]) ->thenReturn();
Custom Error Handling Strategies
use Grazulex\LaravelFlowpipe\ErrorHandling\Strategies\RetryStrategy; // Custom retry logic based on exception type $customRetryStrategy = RetryStrategy::make(5, 100, function ($exception, $attempt) { // Only retry network errors if ($exception instanceof NetworkException) { return true; } // Retry rate limit errors with exponential backoff if ($exception instanceof RateLimitException) { sleep(pow(2, $attempt)); // Custom backoff return $attempt <= 3; } // Don't retry validation errors if ($exception instanceof ValidationException) { return false; } return $attempt <= 2; // Default retry for other errors }); $result = Flowpipe::make() ->send($data) ->withRetryStrategy($customRetryStrategy) ->through([ fn($data, $next) => $next(complexApiCall($data)), ]) ->thenReturn();
Testing
Laravel Flowpipe includes a dedicated test tracer for easy testing:
use Grazulex\LaravelFlowpipe\Tracer\TestTracer; public function test_user_processing_flow() { $tracer = new TestTracer(); $result = Flowpipe::make() ->send(['name' => 'John']) ->through([ fn($data, $next) => $next(strtoupper($data['name'])), ]) ->withTracer($tracer) ->thenReturn(); $this->assertEquals('JOHN', $result); $this->assertCount(1, $tracer->count()); }
Performance
Laravel Flowpipe is optimized for performance:
- Lazy Evaluation: Steps are only executed when needed
- Memory Efficient: Minimal memory footprint
- Traceable: Optional tracing with minimal overhead
- Cacheable: Flow definitions can be cached for better performance
API Reference
Flowpipe Methods
make()
- Create a new flowpipe instancesend($data)
- Set initial datathrough(array $steps)
- Add steps to the pipelineuseGroup(string $name)
- Add a predefined group to the pipelinenested(array $steps)
- Create a nested flowcache($key, $ttl, $store)
- Add cache stepretry($maxAttempts, $delayMs, $shouldRetry)
- Add retry steprateLimit($key, $maxAttempts, $decayMinutes, $keyGenerator)
- Add rate limit steptransform($transformer)
- Add transform stepvalidate($rules, $messages, $customAttributes)
- Add validation stepbatch($batchSize, $preserveKeys)
- Add batch stepwithTracer(Tracer $tracer)
- Add a tracerthenReturn()
- Execute and return resultcontext()
- Get flow context
Error Handling Methods
withErrorHandler(ErrorHandlerStrategy $strategy, int $maxAttempts = 3)
- Add custom error handlerwithRetryStrategy(RetryStrategy $strategy)
- Add retry strategywithFallback(Closure $fallbackHandler, ?Closure $shouldFallback = null)
- Add fallback handlingwithCompensation(Closure $compensationHandler, ?Closure $shouldCompensate = null)
- Add compensation handlingwithCompositeErrorHandler(array $strategies = [])
- Add composite error handlingexponentialBackoff(int $maxAttempts = 3, int $baseDelayMs = 100, float $multiplier = 2.0, ?Closure $shouldRetry = null)
- Add exponential backoff retrylinearBackoff(int $maxAttempts = 3, int $baseDelayMs = 100, int $increment = 100, ?Closure $shouldRetry = null)
- Add linear backoff retryfallbackOnException(string $exceptionClass, Closure $fallbackHandler)
- Add exception-specific fallbackcompensateOnException(string $exceptionClass, Closure $compensationHandler)
- Add exception-specific compensation
Static Methods
group(string $name, array $steps)
- Define a reusable step grouphasGroup(string $name)
- Check if a group existsgetGroups()
- Get all registered groupsclearGroups()
- Clear all registered groups (useful for testing)
Conditional Steps
ConditionalStep::when($condition, $step)
- Execute step when condition is trueConditionalStep::unless($condition, $step)
- Execute step when condition is false
Error Handling Strategies
RetryStrategy
RetryStrategy::make(int $maxAttempts = 3, int $delayMs = 100, ?Closure $shouldRetry = null, ?Closure $delayCalculator = null)
- Basic retryRetryStrategy::exponentialBackoff(int $maxAttempts = 3, int $baseDelayMs = 100, float $multiplier = 2.0, ?Closure $shouldRetry = null)
- Exponential backoffRetryStrategy::linearBackoff(int $maxAttempts = 3, int $baseDelayMs = 100, int $increment = 100, ?Closure $shouldRetry = null)
- Linear backoffRetryStrategy::forException(string $exceptionClass, int $maxAttempts = 3, int $delayMs = 100)
- Exception-specific retry
FallbackStrategy
FallbackStrategy::make(Closure $fallbackHandler, ?Closure $shouldFallback = null)
- Custom fallbackFallbackStrategy::withDefault(mixed $defaultValue, ?Closure $shouldFallback = null)
- Default value fallbackFallbackStrategy::withTransform(Closure $transformer, ?Closure $shouldFallback = null)
- Transform fallbackFallbackStrategy::withPayload(mixed $fallbackPayload, ?Closure $shouldFallback = null)
- Payload fallbackFallbackStrategy::forException(string $exceptionClass, Closure $fallbackHandler)
- Exception-specific fallback
CompensationStrategy
CompensationStrategy::make(Closure $compensationHandler, ?Closure $shouldCompensate = null)
- Basic compensationCompensationStrategy::rollback(Closure $rollbackHandler, ?Closure $shouldCompensate = null)
- Rollback compensationCompensationStrategy::cleanup(Closure $cleanupHandler, ?Closure $shouldCompensate = null)
- Cleanup compensationCompensationStrategy::forException(string $exceptionClass, Closure $compensationHandler)
- Exception-specific compensation
CompositeStrategy
CompositeStrategy::make(array $strategies = [])
- Create composite strategyCompositeStrategy::addStrategy(ErrorHandlerStrategy $strategy)
- Add strategy to compositeCompositeStrategy::retry(RetryStrategy $strategy)
- Add retry strategyCompositeStrategy::fallback(FallbackStrategy $strategy)
- Add fallback strategyCompositeStrategy::compensate(CompensationStrategy $strategy)
- Add compensation strategy
Tracer Methods
trace($stepClass, $before, $after, $duration)
- Trace step executionall()
- Get all trace logssteps()
- Get all step namescount()
- Get number of traced stepsfirstStep()
- Get first step namelastStep()
- Get last step nameclear()
- Clear all traces
Contributing
Please see CONTRIBUTING.md for details.
Security
If you discover any security-related issues, please email jms@grazulex.be instead of using the issue tracker.
Changelog
Please see RELEASES.md for more information on what has changed recently.
License
The MIT License (MIT). Please see License File for more information.
Credits
Support
- ๐ Report Issues
- ๐ฌ Discussions
- ๐ Documentation
Laravel Flowpipe is a modern, powerful alternative to Laravel's built-in Pipeline with enhanced features for complex workflow management.