h4or / threadables
A modern PHP library for parallel execution using process forking with shared memory IPC
pkg:composer/h4or/threadables
Requires
- php: ^8.3
- symfony/uid: ^7.0
- zhgzhg/gphpthread: ^1.0.8
Requires (Dev)
- laravel/pint: ^1.0
- phpstan/phpstan: ^2.1
- phpunit/phpunit: ^10.3.2
- spatie/ray: ^1.28
This package is not auto-updated.
Last update: 2026-01-30 20:37:44 UTC
README
A modern, elegant PHP library for parallel execution using process forking. Run code in separate processes with shared memory communication, graceful shutdown, and comprehensive error handling.
Features
- Simple API - Start threads with a single line of code
- Shared Memory - Communicate between parent and child processes
- Timeouts - Automatic killing of long-running threads
- Graceful Shutdown - Signal threads to stop cleanly
- Signal Handling - Auto-handles SIGTERM, SIGINT, SIGHUP in child processes
- Progress Tracking - Monitor thread progress in real-time
- Thread Pools - Batch operations with concurrency limits
- Thread-Safe - Mutex-protected shared memory access
- Parallel Map - Process arrays in parallel with ease
- Pluggable Serializers - Use PHP serialize, JSON, or custom formats
- Memory Cleanup - Automatic cleanup with leak detection utilities
Requirements
- PHP 8.3+
- Linux/Unix (process forking via pcntl)
- PHP Extensions: pcntl, shmop, sysvsem
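Before installing, it can help to confirm that the CLI build of PHP meets the requirements listed above. The following is a minimal sketch using only core PHP functions; the helper name `missingExtensions` is made up for this illustration and is not part of the library.

```php
<?php
declare(strict_types=1);

/**
 * Return the names of the given extensions that are not loaded.
 * (Hypothetical helper for illustration; not part of h4or/threadables.)
 *
 * @param list<string> $required
 * @return list<string>
 */
function missingExtensions(array $required): array
{
    return array_values(array_filter(
        $required,
        static fn (string $name): bool => !extension_loaded($name)
    ));
}

// Extension names taken from the Requirements list.
$missing = missingExtensions(['pcntl', 'shmop', 'sysvsem']);

if (PHP_VERSION_ID < 80300) {
    echo 'PHP 8.3+ is required, found ' . PHP_VERSION . PHP_EOL;
}

echo $missing === []
    ? 'All required extensions are loaded.' . PHP_EOL
    : 'Missing extensions: ' . implode(', ', $missing) . PHP_EOL;
```

Run it with the same `php` binary that will execute your threads, since web and CLI builds often load different extensions.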
Installation
composer require h4or/threadables
Quick Start
Basic Thread
use H4or\Threadables\Thread;

// Run code in a separate process (non-blocking)
$handle = Thread::run(function ($ctx) {
    $ctx->set('result', 'Hello from thread!');
});

// Do other work while thread runs...

$handle->join(); // Wait for completion
echo $handle->get('result'); // "Hello from thread!"
Blocking Execution
$handle = Thread::runBlocking(function ($ctx) {
    $ctx->set('answer', 42);
});

echo $handle->get('answer'); // 42
Parallel Map
// Process items in parallel
$results = Thread::map([1, 2, 3, 4, 5], function ($item, $ctx) {
    $ctx->set('result', $item * 2);
});

// $results = [2, 4, 6, 8, 10]
Core Concepts
Thread Handle
When you start a thread, you get a ThreadHandle that lets you:
$handle = Thread::run(fn($ctx) => sleep(5));

$handle->isAlive();     // Check if running
$handle->join();        // Wait for completion
$handle->stop();        // Request graceful shutdown
$handle->kill();        // Force kill immediately
$handle->getExitCode(); // Get exit code (0 = success)
$handle->getFailure();  // Get exception details if failed
$handle->getRuntime();  // Get execution time in seconds
Shared Data
Threads communicate via shared memory:
$handle = Thread::run(function ($ctx) {
    // Write individual fields
    $ctx->set('status', 'processing');
    $ctx->set('progress', 50);

    // Write entire data structure
    $ctx->write(['items' => [1, 2, 3], 'count' => 3]);
});

// Parent can read while thread runs
while ($handle->isAlive()) {
    echo "Progress: " . $handle->get('progress', 0) . "%\n";
    usleep(100000);
}

$handle->join();
$data = $handle->read(); // Get all data
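To make the idea of parent/child communication through shared memory more concrete, here is a bare-bones sketch using only the `pcntl` and `shmop` extensions. It is an illustration of the general technique, not the library's actual implementation: the function name `forkAndShare`, the random key, and the 4-byte length prefix are all assumptions made for this example, and it has no mutex because the parent reads only after the child has exited.

```php
<?php
declare(strict_types=1);

// Illustration only: NOT how h4or/threadables is implemented internally.
function forkAndShare(int $size = 1024): array
{
    foreach (['pcntl', 'shmop'] as $ext) {
        if (!extension_loaded($ext)) {
            throw new RuntimeException("Missing extension: {$ext}");
        }
    }

    // Create a fresh System V segment ('n' fails if the key is already in use).
    $key = random_int(0x10000, 0x7fffffff);
    $shm = shmop_open($key, 'n', 0600, $size);
    if ($shm === false) {
        throw new RuntimeException('Could not create shared memory segment');
    }

    $pid = pcntl_fork();
    if ($pid === -1) {
        shmop_delete($shm);
        throw new RuntimeException('Fork failed');
    }

    if ($pid === 0) {
        // Child: serialize a result and write it with a 4-byte length prefix.
        $payload = serialize(['progress' => 100, 'result' => 'done']);
        if (4 + strlen($payload) > $size) {
            exit(1); // Payload would not fit in the segment.
        }
        shmop_write($shm, pack('N', strlen($payload)) . $payload, 0);
        exit(0);
    }

    // Parent: wait for the child, then read what it wrote.
    pcntl_waitpid($pid, $status);
    if (!pcntl_wifexited($status) || pcntl_wexitstatus($status) !== 0) {
        shmop_delete($shm);
        throw new RuntimeException('Child did not finish cleanly');
    }

    $length = unpack('N', shmop_read($shm, 0, 4))[1];
    $data = unserialize(shmop_read($shm, 4, $length), ['allowed_classes' => false]);

    shmop_delete($shm); // Remove the segment so it does not leak.

    return $data;
}

var_export(forkAndShare());
```

Reading while the child is still running, as the library example above does, additionally needs a lock around each read and write, which is what the mutex-protected access listed under Features provides.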
Graceful Shutdown
Threads can check for shutdown requests:
$handle = Thread::run(function ($ctx) {
    while (!$ctx->shouldStop()) {
        // Do work...
        usleep(100000);
    }
    $ctx->set('status', 'stopped gracefully');
});

sleep(1);
$handle->stop(); // Request shutdown
$handle->join();
Error Handling
Exceptions in threads are captured:
$handle = Thread::runBlocking(function ($ctx) {
    throw new RuntimeException('Something went wrong');
});

if ($handle->failed()) {
    $failure = $handle->getFailure();
    echo $failure->message; // "Something went wrong"
    echo $failure->class;   // "RuntimeException"
    echo $failure->file;    // "/path/to/file.php"
    echo $failure->line;    // 42
    echo $failure->trace;   // Full stack trace
}
Timeouts
Automatically kill threads that run too long:
$handle = Thread::runWithTimeout(function ($ctx) {
    sleep(60); // Long operation
}, timeoutSeconds: 5.0);

if ($handle->wasTimedOut()) {
    echo "Thread was killed after 5 seconds";
}
Thread Pools
For batch operations with concurrency control:
use H4or\Threadables\Support\ThreadPool;

// Map with concurrency limit (max 3 threads at once)
$results = ThreadPool::map(
    items: $largeArray,
    callback: fn($item, $ctx) => $ctx->set('result', process($item)),
    concurrency: 3
);

// Run multiple tasks
$handles = ThreadPool::run([
    fn($ctx) => $ctx->set('result', 'Task 1'),
    fn($ctx) => $ctx->set('result', 'Task 2'),
    fn($ctx) => $ctx->set('result', 'Task 3'),
], concurrency: 2);

// Batch operations
ThreadPool::joinAll($handles);
ThreadPool::killAll($handles);
ThreadPool::requestShutdownAll($handles);

// Query status
$running = ThreadPool::getRunning($handles);
$completed = ThreadPool::getCompleted($handles);
$succeeded = ThreadPool::getSucceeded($handles);
$failed = ThreadPool::getFailed($handles);
Class-Based Threads
For reusable thread logic, extend Threadable:
use H4or\Threadables\Threadable;
use H4or\Threadables\Core\ThreadContext;

class DataProcessor extends Threadable
{
    public function __construct(
        private readonly array $data
    ) {}

    public function execute(ThreadContext $context): void
    {
        $results = [];

        foreach ($this->data as $i => $item) {
            if ($context->shouldStop()) {
                break;
            }

            $results[] = $this->processItem($item);
            $context->set('progress', ($i + 1) / count($this->data) * 100);
        }

        $context->set('result', $results);
    }

    private function processItem(mixed $item): mixed
    {
        // Your processing logic
        return $item;
    }
}

// Usage
$processor = new DataProcessor([1, 2, 3, 4, 5]);
$handle = $processor->run();
$handle->join();
$results = $handle->get('result');
Signal Handling
Child processes automatically handle POSIX signals for graceful shutdown:
// Signals (SIGTERM, SIGINT, SIGHUP) are auto-handled in child processes.
// When a signal is received, $context->shouldStop() returns true.
$handle = Thread::run(function ($ctx) {
    while (!$ctx->shouldStop()) {
        // This loop will exit gracefully on Ctrl+C or kill signal
        doWork();
        usleep(100000);
    }
    $ctx->set('result', 'Gracefully stopped');
});

// In parent: handle Ctrl+C to stop threads
pcntl_async_signals(true);
pcntl_signal(SIGINT, function () use ($handle) {
    $handle->requestShutdown();
});
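For readers curious how a signal can turn into a "should stop" flag, the following sketch shows the general pattern with plain `pcntl` and `posix` calls. It is an assumption-laden illustration rather than the library's code: the function name `runStoppableChild` and the `$stopRequested` flag are invented here, and the library's own mechanism may differ.

```php
<?php
declare(strict_types=1);

// Illustration only: a signal handler flips a flag that the work loop polls.
function runStoppableChild(): int
{
    foreach (['pcntl', 'posix'] as $ext) {
        if (!extension_loaded($ext)) {
            throw new RuntimeException("Missing extension: {$ext}");
        }
    }

    pcntl_async_signals(true);

    $stopRequested = false;

    // Install the handler before forking so the child inherits it
    // and cannot miss a signal that arrives very early.
    pcntl_signal(SIGTERM, function (int $signal) use (&$stopRequested): void {
        $stopRequested = true;
    });

    $pid = pcntl_fork();
    if ($pid === -1) {
        throw new RuntimeException('Fork failed');
    }

    if ($pid === 0) {
        // Child: keep working until the flag is set, then exit cleanly.
        while (!$stopRequested) {
            usleep(10_000);
        }
        exit(0);
    }

    // Parent: ask the child to stop and wait for it.
    posix_kill($pid, SIGTERM);
    pcntl_waitpid($pid, $status);

    // Restore the default SIGTERM behaviour in the parent.
    pcntl_signal(SIGTERM, SIG_DFL);

    // A clean exit reports the child's exit code; -1 means it was killed.
    return pcntl_wifexited($status) ? pcntl_wexitstatus($status) : -1;
}

echo 'Child exit code: ' . runStoppableChild() . PHP_EOL;
```

The key point is that the child is never interrupted mid-operation: the handler only records the request, and the loop decides when it is safe to leave.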
Custom Serializers
By default, PHP's serialize() is used for shared memory data. You can use JSON for portable, human-readable data:
use H4or\Threadables\Support\SynchronizedMemory;
use H4or\Threadables\Serializers\JsonSerializer;

// Create memory with JSON serializer
$memory = new SynchronizedMemory(
    serializer: new JsonSerializer()
);

// Or with custom JSON options
$memory = new SynchronizedMemory(
    serializer: new JsonSerializer(
        encodeFlags: JSON_PRETTY_PRINT | JSON_THROW_ON_ERROR,
        decodeFlags: JSON_THROW_ON_ERROR,
        depth: 128
    )
);
Available serializers:
- PhpSerializer (default): Full PHP type support, validates non-serializable types
- JsonSerializer: Portable format, human-readable, loses class information
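The difference between the two formats listed above can be seen with core PHP functions alone. This sketch does not use the library's serializer classes; the `Point` class is a throwaway example type, and the round trips call `serialize()`/`json_encode()` directly on the assumption that the built-in serializers wrap those functions.

```php
<?php
declare(strict_types=1);

// Throwaway value object used only for this comparison.
final class Point
{
    public function __construct(public int $x = 0, public int $y = 0) {}
}

$point = new Point(3, 4);

// Native serialization restores an instance of the original class.
$viaPhp = unserialize(serialize($point));

// JSON keeps only the public data; decoding yields a plain array.
$viaJson = json_decode(
    json_encode($point, JSON_THROW_ON_ERROR),
    true,
    512,
    JSON_THROW_ON_ERROR
);

var_dump($viaPhp instanceof Point); // bool(true)
var_dump(is_array($viaJson));       // bool(true)
```

If a thread needs to hand objects back to the parent, the PHP format is the safer default; JSON suits plain scalars and arrays that other tools may also need to read.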
You can create custom serializers by implementing SerializerInterface:
use H4or\Threadables\Contracts\SerializerInterface;

class MsgpackSerializer implements SerializerInterface
{
    public function serialize(mixed $value): string
    {
        return msgpack_pack($value);
    }

    public function unserialize(string $data): mixed
    {
        return msgpack_unpack($data);
    }
}
Memory Cleanup
The library automatically cleans up shared memory resources. For debugging or manual cleanup:
use H4or\Threadables\Thread;

// Check active resources in current process
echo Thread::getActiveResourceCount(); // e.g., 3

// List all system IPC resources (runs `ipcs -m`)
echo Thread::listSystemResources();

// Clean up orphaned resources by known keys (e.g., after a crash)
Thread::cleanupOrphaned($memoryKey, $mutexKey);
API Reference
Thread (Static Factory)
| Method | Description |
|---|---|
| run(Closure, ?int $memorySize) | Start non-blocking thread |
| runBlocking(Closure, ?int $memorySize) | Start and wait for completion |
| runWithTimeout(Closure, float $timeout, ?int $memorySize) | Start with auto-kill timeout |
| map(array, Closure, ?int $memorySize) | Process array in parallel |
ThreadHandle
| Method | Description |
|---|---|
| isAlive() / running() | Check if thread is running |
| isFinished() | Check if thread has completed |
| join() / await() | Wait for thread to complete |
| stop(bool $force = false) | Request shutdown or force kill |
| kill() | Force kill immediately |
| pause() / resume() | Pause/resume thread execution |
| get(string $key, $default) | Read shared data field |
| set(string $key, $value) | Write shared data field |
| read() | Read all shared data |
| write(mixed $data) | Write all shared data |
| getResult($default) | Shorthand for get('result', $default) |
| getFailure() | Get ThreadFailure if failed |
| succeeded() / failed() | Check success/failure status |
| getExitCode() | Get process exit code |
| wasTimedOut() | Check if killed by timeout |
| getRuntime() | Get execution time in seconds |
| getState() | Get current state string |
| setName(string) / getName() | Set/get thread name |
| toArray() | Get complete thread info as array |
ThreadContext (Inside Thread)
| Method | Description |
|---|---|
| shouldStop() | Check if shutdown requested |
| get(string $key, $default) | Read shared data field |
| set(string $key, $value) | Write shared data field |
| read() | Read all shared data |
| write(mixed $data) | Write all shared data |
| increment(string $key, int $by) | Atomically increment counter |
| decrement(string $key, int $by) | Atomically decrement counter |
| has(string $key) | Check if key exists |
| forget(string $key) | Remove a key |
| clear() | Clear all shared data |
ThreadPool
| Method | Description |
|---|---|
| map(array, Closure, int $concurrency, ?int $memorySize) | Parallel map with limit |
| run(array $tasks, int $concurrency, ?int $memorySize) | Run tasks with limit |
| joinAll(array $handles) | Wait for all to complete |
| killAll(array $handles) | Kill all threads |
| requestShutdownAll(array $handles) | Request graceful shutdown |
| waitAll(array $handles, float $timeout) | Wait with timeout |
| waitAny(array $handles, ?float $timeout) | Wait for first completion |
| getRunning(array $handles) | Get running threads |
| getCompleted(array $handles) | Get completed threads |
| getSucceeded(array $handles) | Get successful threads |
| getFailed(array $handles) | Get failed threads |
| countRunning(array $handles) | Count running threads |
| allCompleted(array $handles) | Check if all done |
| anyRunning(array $handles) | Check if any running |
| allSucceeded(array $handles) | Check if all succeeded |
| anyFailed(array $handles) | Check if any failed |
Examples
The examples/ directory contains 22 runnable examples:
php examples/01_basic_thread.php            # Basic thread execution
php examples/02_blocking_execution.php      # Blocking execution
php examples/03_shared_data.php             # Shared data communication
php examples/04_graceful_shutdown.php       # Graceful shutdown handling
php examples/05_error_handling.php          # Exception handling
php examples/06_class_based_threadable.php  # Class-based threads
php examples/07_parallel_map.php            # Parallel array processing
php examples/08_concurrency_limit.php       # Concurrency limiting
php examples/09_thread_pool_operations.php  # Thread pool batch operations
php examples/10_timeouts.php                # Timeout handling
php examples/11_progress_tracking.php       # Progress bar tracking
php examples/12_debugging.php               # Debug info and toArray()
php examples/13_custom_memory_size.php      # Custom shared memory size
php examples/14_multiple_progress_bars.php  # Multiple thread monitoring
php examples/15_signal_handling.php         # POSIX signal handling (Ctrl+C)
php examples/16_custom_serializers.php      # Custom serialization formats
php examples/17_parallel_http.php           # Real-world HTTP fetching
php examples/18_parallel_files.php          # Parallel file processing
php examples/19_atomic_counters.php         # Atomic increment/decrement
php examples/20_producer_consumer.php       # Producer-consumer pattern
php examples/21_retry_with_backoff.php      # Retry with exponential backoff
php examples/22_memory_cleanup.php          # Memory cleanup & debugging
Memory Considerations
Shared memory has a default size of 64KB. For larger data:
// Set memory size per thread
$handle = Thread::run(fn($ctx) => $ctx->write($largeData), memorySize: 1024 * 1024);

// For class-based threads
$threadable = new MyThreadable();
$threadable->setSharedMemorySize(1024 * 1024); // 1MB
$handle = $threadable->run();

// Check if data will fit
if ($handle->wouldFit($data)) {
    $handle->write($data);
}
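When choosing a `memorySize`, a rough estimate of the serialized payload can be computed before starting the thread. The sketch below uses only core PHP and assumes the default PHP serializer; the helper names and the 1 KB headroom are hypothetical, since the library's own framing overhead is not documented here.

```php
<?php
declare(strict_types=1);

// Default segment size stated above (64 KB).
const DEFAULT_SEGMENT_BYTES = 64 * 1024;

/** Size in bytes of the value once passed through PHP's serialize(). */
function serializedSize(mixed $data): int
{
    return strlen(serialize($data));
}

/**
 * Suggest a segment size: payload plus headroom, rounded up to a
 * multiple of 64 KB. The headroom value is a guess, not a library constant.
 */
function suggestMemorySize(mixed $data, int $headroomBytes = 1024): int
{
    $needed = serializedSize($data) + $headroomBytes;
    $blocks = intdiv($needed + DEFAULT_SEGMENT_BYTES - 1, DEFAULT_SEGMENT_BYTES);

    return $blocks * DEFAULT_SEGMENT_BYTES;
}

$largeData = array_fill(0, 20_000, 'row');

printf(
    "Serialized: %d bytes, suggested memorySize: %d bytes\n",
    serializedSize($largeData),
    suggestMemorySize($largeData)
);
```

The suggested value can then be passed as `memorySize:` to `Thread::run()`, with `wouldFit()` as the final check at write time.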
Development
# Run tests
composer test

# Run with coverage
composer test:coverage

# Static analysis
composer analyse

# Format code
composer format
License
MIT License. See LICENSE.md for details.
Credits
- Built on GPhpThread for process forking
- Uses Symfony UID for unique identifiers