fereydooni/laravel-concurrency

A robust concurrent processing system for Laravel inspired by Go's concurrency model

dev-master 2025-04-24 13:48 UTC

This package is auto-updated.

Last update: 2025-04-24 13:58:26 UTC


README

A robust concurrent processing system for Laravel inspired by Go's concurrency model (goroutines and channels) using PHP attributes.

Features

  • Concurrent task execution using PHP attributes
  • Channel-based communication between tasks
  • Task orchestration with priorities and dependencies
  • Graceful error handling and retries
  • Monitoring of task status and performance
  • Flexible configuration options

Requirements

  • PHP 8.1+
  • Laravel 10.x

Installation

composer require fereydooni/laravel-concurrency

Publish the configuration file:

php artisan vendor:publish --provider="Fereydooni\ConcurrentFlow\ConcurrentFlowServiceProvider"

Run the migrations:

php artisan migrate

Usage

Defining Concurrent Tasks

Use the ConcurrentTask attribute to define methods that should be executed concurrently:

use Fereydooni\ConcurrentFlow\Attributes\ConcurrentTask;

class DataProcessor
{
    #[ConcurrentTask(name: 'process_user_data', priority: 'high', maxRetries: 3)]
    public function processUserData($data)
    {
        // Process data
        return $processedData;
    }
}

Using Channels for Communication

Use the Channel attribute to define methods that produce or consume data via channels:

use Fereydooni\ConcurrentFlow\Attributes\Channel;

class DataProcessor
{
    #[Channel(name: 'user_data', buffer: 100)]
    public function processAndSend($data)
    {
        // Process data and send to channel
        return $processedData;
    }
}

Running Tasks and Using Channels

use Fereydooni\ConcurrentFlow\ConcurrencyManager;

// Get the concurrency manager
$manager = app(ConcurrencyManager::class);

// Create and run a task
$task = $manager->createTask(DataProcessor::class, 'processUserData', [$userData]);
$manager->runTask($task);

// Send data to a channel
$manager->sendToChannel('user_data', $someData);

// Receive data from a channel
$result = $manager->receiveFromChannel('user_data');

Task Orchestration

// Run multiple tasks in parallel
$tasks = [
    $manager->createTask(DataProcessor::class, 'processUserData', [$userData1]),
    $manager->createTask(DataProcessor::class, 'processUserData', [$userData2]),
];
$results = $manager->runParallel($tasks);

// Set up task dependencies
$task1 = $manager->createTask(DataProcessor::class, 'step1', [$data]);
$task2 = $manager->createTask(DataProcessor::class, 'step2', []);
$manager->addDependency($task2, $task1); // task2 depends on task1
$manager->runWithDependencies($task2);

Configuration

The package configuration file (concurrent-flow.php) allows you to customize:

  • Concurrency driver (pcntl, queue)
  • Default channel buffer size
  • Default task priority
  • Task timeout settings
  • Storage backend for channels

Error Handling

Tasks will be retried based on the maxRetries setting. All task executions are logged in the database for monitoring and debugging.

License

The MIT License (MIT). Please see License File for more information.