h4or/threadables

A modern PHP library for parallel execution using process forking with shared memory IPC

pkg:composer/h4or/threadables

dev-main 2026-01-30 20:19 UTC

This package is not auto-updated.

Last update: 2026-01-30 20:37:44 UTC


A modern, elegant PHP library for parallel execution using process forking. Run code in separate processes with shared memory communication, graceful shutdown, and comprehensive error handling.

Features

  • 🚀 Simple API - Start threads with a single line of code
  • 🔄 Shared Memory - Communicate between parent and child processes
  • ⏱️ Timeouts - Automatic killing of long-running threads
  • 🛑 Graceful Shutdown - Signal threads to stop cleanly
  • 📡 Signal Handling - Auto-handles SIGTERM, SIGINT, SIGHUP in child processes
  • 📊 Progress Tracking - Monitor thread progress in real-time
  • 🏊 Thread Pools - Batch operations with concurrency limits
  • 🔒 Thread-Safe - Mutex-protected shared memory access
  • ⚡ Parallel Map - Process arrays in parallel with ease
  • 🔌 Pluggable Serializers - Use PHP serialize, JSON, or custom formats
  • 🧹 Memory Cleanup - Automatic cleanup with leak detection utilities

Requirements

  • PHP 8.3+
  • Linux/Unix (process forking via pcntl)
  • PHP Extensions: pcntl, shmop, sysvsem

Installation

composer require h4or/threadables

Quick Start

Basic Thread

use H4or\Threadables\Thread;

// Run code in a separate process (non-blocking)
$handle = Thread::run(function ($ctx) {
    $ctx->set('result', 'Hello from thread!');
});

// Do other work while thread runs...

$handle->join(); // Wait for completion
echo $handle->get('result'); // "Hello from thread!"

Blocking Execution

$handle = Thread::runBlocking(function ($ctx) {
    $ctx->set('answer', 42);
});

echo $handle->get('answer'); // 42

Parallel Map

// Process items in parallel
$results = Thread::map([1, 2, 3, 4, 5], function ($item, $ctx) {
    $ctx->set('result', $item * 2);
});
// $results = [2, 4, 6, 8, 10]

Core Concepts

Thread Handle

Starting a thread returns a ThreadHandle, which you use to inspect and control the child process:

$handle = Thread::run(fn($ctx) => sleep(5));

$handle->isAlive();        // Check if running
$handle->join();           // Wait for completion
$handle->stop();           // Request graceful shutdown
$handle->kill();           // Force kill immediately
$handle->getExitCode();    // Get exit code (0 = success)
$handle->getFailure();     // Get exception details if failed
$handle->getRuntime();     // Get execution time in seconds

Shared Data

Threads communicate via shared memory:

$handle = Thread::run(function ($ctx) {
    // Write individual fields
    $ctx->set('status', 'processing');
    $ctx->set('progress', 50);
    
    // Write entire data structure
    $ctx->write(['items' => [1, 2, 3], 'count' => 3]);
});

// Parent can read while thread runs
while ($handle->isAlive()) {
    echo "Progress: " . $handle->get('progress', 0) . "%\n";
    usleep(100000);
}

$handle->join();
$data = $handle->read(); // Get all data
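
To make the shared-memory idea concrete, here is a minimal sketch of a semaphore-guarded, length-prefixed memory slot built directly on the shmop and sysvsem extensions listed under Requirements. It is an illustration of the general technique, not the library's actual implementation; the helpers withLock(), shmWrite() and shmRead() and the storage layout are assumptions made for this example.

```php
<?php
// Sketch only: a length-prefixed, semaphore-guarded shared memory slot.
// Helper names and the [length][payload] layout are hypothetical.

/** Run $fn while holding the System V semaphore. */
function withLock(SysvSemaphore $sem, callable $fn): mixed
{
    sem_acquire($sem);
    try {
        return $fn();
    } finally {
        sem_release($sem);
    }
}

/** Store a value as [4-byte big-endian length][serialized payload]. */
function shmWrite(Shmop $shm, SysvSemaphore $sem, mixed $value): void
{
    $payload = serialize($value);
    if (4 + strlen($payload) > shmop_size($shm)) {
        throw new LengthException('Payload does not fit in the segment');
    }
    withLock($sem, fn() => shmop_write($shm, pack('N', strlen($payload)) . $payload, 0));
}

/** Read the value back; a zero length prefix means nothing was written yet. */
function shmRead(Shmop $shm, SysvSemaphore $sem): mixed
{
    return withLock($sem, function () use ($shm) {
        $length = unpack('N', shmop_read($shm, 0, 4))[1];
        return $length === 0 ? null : unserialize(shmop_read($shm, 4, $length));
    });
}

$key = random_int(0x10000, 0x7FFFFFFF);         // arbitrary IPC key for the demo
$shm = shmop_open($key, 'c', 0600, 64 * 1024);  // 64 KB segment
$sem = sem_get($key, 1, 0600);
if ($shm === false || $sem === false) {
    throw new RuntimeException('Could not allocate IPC resources');
}

shmWrite($shm, $sem, ['status' => 'processing', 'progress' => 50]);
$data = shmRead($shm, $sem);

// Release the System V resources so nothing is left behind in `ipcs`.
shmop_delete($shm);
sem_remove($sem);
```

The length prefix lets a reader know how many bytes to unserialize, and the semaphore keeps a reader from seeing a half-written payload.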

Graceful Shutdown

Threads can check for shutdown requests:

$handle = Thread::run(function ($ctx) {
    while (!$ctx->shouldStop()) {
        // Do work...
        usleep(100000);
    }
    $ctx->set('status', 'stopped gracefully');
});

sleep(1);
$handle->stop(); // Request shutdown
$handle->join();

Error Handling

Exceptions in threads are captured:

$handle = Thread::runBlocking(function ($ctx) {
    throw new RuntimeException('Something went wrong');
});

if ($handle->failed()) {
    $failure = $handle->getFailure();
    echo $failure->message;  // "Something went wrong"
    echo $failure->class;    // "RuntimeException"
    echo $failure->file;     // "/path/to/file.php"
    echo $failure->line;     // 42
    echo $failure->trace;    // Full stack trace
}
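
An exception object cannot always cross a process boundary as-is, so one plausible approach is to flatten it into scalars first. The sketch below shows that idea using the same field names as the example above; describeFailure() is a hypothetical helper written for illustration and is not part of the library.

```php
<?php
// Sketch only: flatten a Throwable into scalars so it can be serialized and
// handed to another process. describeFailure() is a hypothetical helper.

function describeFailure(Throwable $e): array
{
    return [
        'message' => $e->getMessage(),
        'class'   => $e::class,
        'file'    => $e->getFile(),
        'line'    => $e->getLine(),
        'trace'   => $e->getTraceAsString(),
    ];
}

try {
    throw new RuntimeException('Something went wrong');
} catch (Throwable $e) {
    $failure = describeFailure($e);
}

// Only strings and integers remain, so the record survives serialize()
// even when the original exception references closures or resources.
$restored = unserialize(serialize($failure));
```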

Timeouts

Automatically kill threads that run too long:

$handle = Thread::runWithTimeout(function ($ctx) {
    sleep(60); // Long operation
}, timeoutSeconds: 5.0);

if ($handle->wasTimedOut()) {
    echo "Thread was killed after 5 seconds";
}
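
For readers curious how a timeout on a forked process can be enforced, the following is a minimal sketch using pcntl and posix directly. It assumes the posix extension is available (it is not listed under Requirements), and runWithDeadline() is a hypothetical helper, not the library's implementation of runWithTimeout().

```php
<?php
// Sketch only: enforce a deadline on a forked child by polling and, if the
// deadline passes, sending SIGKILL. runWithDeadline() is hypothetical.

function runWithDeadline(callable $task, float $timeoutSeconds): array
{
    $pid = pcntl_fork();
    if ($pid === -1) {
        throw new RuntimeException('fork failed');
    }
    if ($pid === 0) {
        $task();   // child: do the work, then exit without returning to the caller
        exit(0);
    }

    $deadline = microtime(true) + $timeoutSeconds;
    $timedOut = false;

    // Parent: poll without blocking until the child has been reaped.
    while (pcntl_waitpid($pid, $status, WNOHANG) === 0) {
        if (!$timedOut && microtime(true) >= $deadline) {
            posix_kill($pid, SIGKILL);   // cannot be caught or ignored by the child
            $timedOut = true;
        }
        usleep(10_000);
    }

    return [
        'timedOut' => $timedOut,
        // A killed child has no exit code, only a terminating signal.
        'exitCode' => pcntl_wifexited($status) ? pcntl_wexitstatus($status) : null,
    ];
}

$slow = runWithDeadline(fn() => sleep(30), 0.2);
$fast = runWithDeadline(fn() => usleep(1_000), 5.0);
```

The parent keeps polling after sending SIGKILL so the child is always reaped and never left as a zombie.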

Thread Pools

For batch operations with concurrency control:

use H4or\Threadables\Support\ThreadPool;

// Map with concurrency limit (max 3 threads at once)
$results = ThreadPool::map(
    items: $largeArray,
    callback: fn($item, $ctx) => $ctx->set('result', process($item)),
    concurrency: 3
);

// Run multiple tasks
$handles = ThreadPool::run([
    fn($ctx) => $ctx->set('result', 'Task 1'),
    fn($ctx) => $ctx->set('result', 'Task 2'),
    fn($ctx) => $ctx->set('result', 'Task 3'),
], concurrency: 2);

// Batch operations
ThreadPool::joinAll($handles);
ThreadPool::killAll($handles);
ThreadPool::requestShutdownAll($handles);

// Query status
$running = ThreadPool::getRunning($handles);
$completed = ThreadPool::getCompleted($handles);
$succeeded = ThreadPool::getSucceeded($handles);
$failed = ThreadPool::getFailed($handles);
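
The concurrency limit can be pictured as a simple rule: never fork a new worker while the number of unreaped children equals the limit. The sketch below implements that rule with plain pcntl calls; runLimited() is a hypothetical helper for illustration and says nothing about how ThreadPool is written internally.

```php
<?php
// Sketch only: cap the number of simultaneously running forked workers.
// runLimited() is a hypothetical helper, not the library's ThreadPool.

/**
 * @param array<int, callable> $tasks
 * @return array{exitCodes: array<int, int>, peak: int}
 */
function runLimited(array $tasks, int $concurrency): array
{
    $running = [];     // pid => task index
    $exitCodes = [];   // task index => exit code
    $peak = 0;

    // Block until one child finishes, then record its exit code.
    $reapOne = function () use (&$running, &$exitCodes): void {
        $pid = pcntl_wait($status);
        if ($pid <= 0) {
            throw new RuntimeException('pcntl_wait failed');
        }
        if (isset($running[$pid])) {
            $exitCodes[$running[$pid]] = pcntl_wexitstatus($status);
            unset($running[$pid]);
        }
    };

    foreach ($tasks as $index => $task) {
        // At the limit: wait for a slot to free up before forking again.
        while (count($running) >= $concurrency) {
            $reapOne();
        }
        $pid = pcntl_fork();
        if ($pid === -1) {
            throw new RuntimeException('fork failed');
        }
        if ($pid === 0) {
            $task();
            exit(0);
        }
        $running[$pid] = $index;
        $peak = max($peak, count($running));
    }

    // Drain the workers that are still running.
    while ($running !== []) {
        $reapOne();
    }
    ksort($exitCodes);

    return ['exitCodes' => $exitCodes, 'peak' => $peak];
}

$tasks = array_fill(0, 5, fn() => usleep(50_000));
$report = runLimited($tasks, concurrency: 2);
```

With five tasks and a limit of two, $report['peak'] is 2 and every task reports exit code 0.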

Class-Based Threads

For reusable thread logic, extend Threadable:

use H4or\Threadables\Threadable;
use H4or\Threadables\Core\ThreadContext;

class DataProcessor extends Threadable
{
    public function __construct(
        private readonly array $data
    ) {}

    public function execute(ThreadContext $context): void
    {
        $results = [];
        
        foreach ($this->data as $i => $item) {
            if ($context->shouldStop()) {
                break;
            }
            
            $results[] = $this->processItem($item);
            $context->set('progress', ($i + 1) / count($this->data) * 100);
        }
        
        $context->set('result', $results);
    }

    private function processItem(mixed $item): mixed
    {
        // Your processing logic
        return $item;
    }
}

// Usage
$processor = new DataProcessor([1, 2, 3, 4, 5]);
$handle = $processor->run();
$handle->join();
$results = $handle->get('result');

Signal Handling

Child processes automatically handle POSIX signals for graceful shutdown:

// Signals (SIGTERM, SIGINT, SIGHUP) are auto-handled in child processes.
// When a signal is received, $context->shouldStop() returns true.

$handle = Thread::run(function ($ctx) {
    while (!$ctx->shouldStop()) {
        // This loop will exit gracefully on Ctrl+C or kill signal
        doWork();
        usleep(100000);
    }
    $ctx->set('result', 'Gracefully stopped');
});

// In parent: handle Ctrl+C to stop threads
pcntl_async_signals(true);
pcntl_signal(SIGINT, function () use ($handle) {
    $handle->stop(); // stop() is the graceful-shutdown call documented for ThreadHandle
});
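
As background, the usual pcntl pattern for turning a signal into a "please stop" flag looks roughly like the sketch below. It assumes the posix extension for posix_kill() and uses a plain variable in place of shouldStop(); it illustrates the general mechanism and is not taken from the library's source.

```php
<?php
// Sketch only: turn SIGTERM into a stop flag inside a forked child.
// General pcntl pattern, not the library's internals.

pcntl_async_signals(true);   // deliver signals without manual dispatch calls

$stopRequested = false;
// Install the handler before forking so the child inherits it and there is
// no window in which SIGTERM would still trigger the default action.
pcntl_signal(SIGTERM, function () use (&$stopRequested): void {
    $stopRequested = true;
});

$pid = pcntl_fork();
if ($pid === -1) {
    throw new RuntimeException('fork failed');
}

if ($pid === 0) {
    // Child: work in small steps and check the flag between them.
    $deadline = microtime(true) + 10.0;   // safety net so the demo cannot hang
    while (!$stopRequested && microtime(true) < $deadline) {
        usleep(10_000);
    }
    exit($stopRequested ? 0 : 1);   // 0 means "stopped on request"
}

// Parent: restore the default handler, then ask the child to stop.
pcntl_signal(SIGTERM, SIG_DFL);
posix_kill($pid, SIGTERM);
pcntl_waitpid($pid, $status);

$exitedCleanly = pcntl_wifexited($status) && pcntl_wexitstatus($status) === 0;
```

Because the child only checks the flag between work steps, each step should be short enough that shutdown feels prompt.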

Custom Serializers

By default, PHP's serialize() is used for shared memory data. You can use JSON for portable, human-readable data:

use H4or\Threadables\Support\SynchronizedMemory;
use H4or\Threadables\Serializers\JsonSerializer;

// Create memory with JSON serializer
$memory = new SynchronizedMemory(
    serializer: new JsonSerializer()
);

// Or with custom JSON options
$memory = new SynchronizedMemory(
    serializer: new JsonSerializer(
        encodeFlags: JSON_PRETTY_PRINT | JSON_THROW_ON_ERROR,
        decodeFlags: JSON_THROW_ON_ERROR,
        depth: 128
    )
);

Available serializers:

  • PhpSerializer (default): Full PHP type support, validates non-serializable types
  • JsonSerializer: Portable format, human-readable, loses class information

You can create custom serializers by implementing SerializerInterface:

use H4or\Threadables\Contracts\SerializerInterface;

class MsgpackSerializer implements SerializerInterface
{
    public function serialize(mixed $value): string
    {
        return msgpack_pack($value);
    }

    public function unserialize(string $data): mixed
    {
        return msgpack_unpack($data);
    }
}
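
As a further illustration, a serializer can also wrap another format, for example to compress payloads so more data fits in a fixed-size segment. The sketch below declares a local stand-in interface with the two methods shown above so that it runs without the library installed; CompressedSerializer is a hypothetical class and assumes the zlib extension.

```php
<?php
// Sketch only: a compressing serializer. The interface below is a local
// stand-in mirroring the two methods shown in this README; in real use you
// would implement H4or\Threadables\Contracts\SerializerInterface instead.

interface SerializerInterface
{
    public function serialize(mixed $value): string;
    public function unserialize(string $data): mixed;
}

/** Hypothetical example class; requires the zlib extension. */
final class CompressedSerializer implements SerializerInterface
{
    public function __construct(private readonly int $level = 6) {}

    public function serialize(mixed $value): string
    {
        $compressed = gzcompress(serialize($value), $this->level);
        if ($compressed === false) {
            throw new RuntimeException('Compression failed');
        }
        return $compressed;
    }

    public function unserialize(string $data): mixed
    {
        $raw = @gzuncompress($data);
        if ($raw === false) {
            throw new UnexpectedValueException('Payload is not valid zlib data');
        }
        return unserialize($raw);
    }
}

$serializer = new CompressedSerializer();
$original = ['rows' => array_fill(0, 500, 'repetitive value')];

$packed = $serializer->serialize($original);
$roundTripped = $serializer->unserialize($packed);
$bytesSaved = strlen(serialize($original)) - strlen($packed);
```

Compression trades CPU time for space, so it mainly pays off for large, repetitive payloads.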

Memory Cleanup

The library automatically cleans up shared memory resources. For debugging or manual cleanup:

use H4or\Threadables\Thread;

// Check active resources in current process
echo Thread::getActiveResourceCount(); // e.g., 3

// List all system IPC resources (runs `ipcs -m`)
echo Thread::listSystemResources();

// Clean up orphaned resources by known keys (e.g., after a crash)
Thread::cleanupOrphaned($memoryKey, $mutexKey);

API Reference

Thread (Static Factory)

Method                                                     Description
run(Closure, ?int $memorySize)                             Start non-blocking thread
runBlocking(Closure, ?int $memorySize)                     Start and wait for completion
runWithTimeout(Closure, float $timeout, ?int $memorySize)  Start with auto-kill timeout
map(array, Closure, ?int $memorySize)                      Process array in parallel

ThreadHandle

Method                       Description
isAlive() / running()        Check if thread is running
isFinished()                 Check if thread has completed
join() / await()             Wait for thread to complete
stop(bool $force = false)    Request shutdown or force kill
kill()                       Force kill immediately
pause() / resume()           Pause/resume thread execution
get(string $key, $default)   Read shared data field
set(string $key, $value)     Write shared data field
read()                       Read all shared data
write(mixed $data)           Write all shared data
getResult($default)          Shorthand for get('result', $default)
getFailure()                 Get ThreadFailure if failed
succeeded() / failed()       Check success/failure status
getExitCode()                Get process exit code
wasTimedOut()                Check if killed by timeout
getRuntime()                 Get execution time in seconds
getState()                   Get current state string
setName(string) / getName()  Set/get thread name
toArray()                    Get complete thread info as array

ThreadContext (Inside Thread)

Method                           Description
shouldStop()                     Check if shutdown requested
get(string $key, $default)       Read shared data field
set(string $key, $value)         Write shared data field
read()                           Read all shared data
write(mixed $data)               Write all shared data
increment(string $key, int $by)  Atomically increment counter
decrement(string $key, int $by)  Atomically decrement counter
has(string $key)                 Check if key exists
forget(string $key)              Remove a key
clear()                          Clear all shared data
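
To show why an atomic increment needs a lock, the sketch below keeps a counter in shared memory and increments it from two forked workers, holding a semaphore around each read-modify-write. It uses shmop, sysvsem and pcntl directly; incrementShared() is a hypothetical helper and is not the library's implementation of increment().

```php
<?php
// Sketch only: a lock-protected counter in shared memory, incremented from
// two forked workers. Not the library's implementation of increment().

$key = random_int(0x10000, 0x7FFFFFFF);   // arbitrary IPC key for the demo
$shm = shmop_open($key, 'c', 0600, 8);
// auto_release is off; the lock is always released explicitly in finally.
$sem = sem_get($key, 1, 0600, false);
if ($shm === false || $sem === false) {
    throw new RuntimeException('Could not allocate IPC resources');
}
shmop_write($shm, pack('J', 0), 0);       // 64-bit big-endian counter, starts at 0

/** Hypothetical helper: read, add, and write back while holding the lock. */
function incrementShared(Shmop $shm, SysvSemaphore $sem, int $by = 1): int
{
    sem_acquire($sem);
    try {
        $value = unpack('J', shmop_read($shm, 0, 8))[1] + $by;
        shmop_write($shm, pack('J', $value), 0);
        return $value;
    } finally {
        sem_release($sem);
    }
}

$workers = [];
for ($w = 0; $w < 2; $w++) {
    $pid = pcntl_fork();
    if ($pid === -1) {
        throw new RuntimeException('fork failed');
    }
    if ($pid === 0) {
        // Child: 1000 increments, each one a locked read-modify-write.
        for ($i = 0; $i < 1000; $i++) {
            incrementShared($shm, $sem);
        }
        exit(0);
    }
    $workers[] = $pid;
}

foreach ($workers as $pid) {
    pcntl_waitpid($pid, $status);
}

$total = unpack('J', shmop_read($shm, 0, 8))[1];

// Release the System V resources.
shmop_delete($shm);
sem_remove($sem);
```

Without the semaphore, two workers could read the same value and both write back value + 1, losing an update.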

ThreadPool

Method                                                   Description
map(array, Closure, int $concurrency, ?int $memorySize)  Parallel map with limit
run(array $tasks, int $concurrency, ?int $memorySize)    Run tasks with limit
joinAll(array $handles)                                  Wait for all to complete
killAll(array $handles)                                  Kill all threads
requestShutdownAll(array $handles)                       Request graceful shutdown
waitAll(array $handles, float $timeout)                  Wait with timeout
waitAny(array $handles, ?float $timeout)                 Wait for first completion
getRunning(array $handles)                               Get running threads
getCompleted(array $handles)                             Get completed threads
getSucceeded(array $handles)                             Get successful threads
getFailed(array $handles)                                Get failed threads
countRunning(array $handles)                             Count running threads
allCompleted(array $handles)                             Check if all done
anyRunning(array $handles)                               Check if any running
allSucceeded(array $handles)                             Check if all succeeded
anyFailed(array $handles)                                Check if any failed

Examples

The examples/ directory contains 22 runnable examples:

php examples/01_basic_thread.php          # Basic thread execution
php examples/02_blocking_execution.php    # Blocking execution
php examples/03_shared_data.php           # Shared data communication
php examples/04_graceful_shutdown.php     # Graceful shutdown handling
php examples/05_error_handling.php        # Exception handling
php examples/06_class_based_threadable.php # Class-based threads
php examples/07_parallel_map.php          # Parallel array processing
php examples/08_concurrency_limit.php     # Concurrency limiting
php examples/09_thread_pool_operations.php # Thread pool batch operations
php examples/10_timeouts.php              # Timeout handling
php examples/11_progress_tracking.php     # Progress bar tracking
php examples/12_debugging.php             # Debug info and toArray()
php examples/13_custom_memory_size.php    # Custom shared memory size
php examples/14_multiple_progress_bars.php # Multiple thread monitoring
php examples/15_signal_handling.php       # POSIX signal handling (Ctrl+C)
php examples/16_custom_serializers.php    # Custom serialization formats
php examples/17_parallel_http.php         # Real-world HTTP fetching
php examples/18_parallel_files.php        # Parallel file processing
php examples/19_atomic_counters.php       # Atomic increment/decrement
php examples/20_producer_consumer.php     # Producer-consumer pattern
php examples/21_retry_with_backoff.php    # Retry with exponential backoff
php examples/22_memory_cleanup.php        # Memory cleanup & debugging

Memory Considerations

Shared memory defaults to 64 KB. For larger payloads, request a bigger segment:

// Set memory size per thread
$handle = Thread::run(fn($ctx) => $ctx->write($largeData), memorySize: 1024 * 1024);

// For class-based threads
$threadable = new MyThreadable();
$threadable->setSharedMemorySize(1024 * 1024); // 1MB
$handle = $threadable->run();

// Check if data will fit
if ($handle->wouldFit($data)) {
    $handle->write($data);
}
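
If you want a rough feel for how much room a payload needs before choosing a memory size, measuring its serialized length is a reasonable first approximation. The sketch below does that with PHP's serialize(); fitsInSegment() is a hypothetical helper, and the library's wouldFit() may account for headers or a different serializer, so treat the numbers as estimates.

```php
<?php
// Sketch only: estimate whether a value fits a fixed-size segment.
// fitsInSegment() is hypothetical; wouldFit() may compute this differently.

const DEFAULT_SEGMENT_BYTES = 64 * 1024;   // 64 KB default stated in this README

function fitsInSegment(mixed $data, int $segmentBytes = DEFAULT_SEGMENT_BYTES): bool
{
    // strlen() counts bytes, which is what matters for shared memory.
    return strlen(serialize($data)) <= $segmentBytes;
}

$small = ['status' => 'ok'];
$large = str_repeat('x', 100_000);   // roughly 100 KB once serialized

$smallFits = fitsInSegment($small);
$largeFits = fitsInSegment($large);
$largeFitsInOneMegabyte = fitsInSegment($large, 1024 * 1024);
```

Here the small array fits the default segment, while the 100,000-byte string only fits once the budget is raised to 1 MB.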

Development

# Run tests
composer test

# Run with coverage
composer test:coverage

# Static analysis
composer analyse

# Format code
composer format

License

MIT License. See LICENSE.md for details.

Credits