modelslab/modelq

A lightweight PHP task queue library, alternative to traditional queue systems, optimized for ML inference workloads

Installs: 0

Dependents: 0

Suggesters: 0

Security: 0

Stars: 0

Watchers: 0

Forks: 0

Open Issues: 0

pkg:composer/modelslab/modelq

0.2 2025-12-25 09:22 UTC

This package is auto-updated.

Last update: 2025-12-25 09:22:52 UTC


README

A lightweight PHP task queue library, alternative to traditional queue systems, optimized for ML inference workloads.

PHP Version License

Features

  • Simple API - Easy to use task registration and enqueueing
  • Redis-backed - Fast and reliable task storage using Redis
  • Streaming Support - Real-time streaming results for long-running tasks
  • Remote Workers - Connect PHP frontend to Python GPU workers via Redis
  • Middleware Hooks - Lifecycle hooks for monitoring and customization
  • Delayed Tasks - Schedule tasks to run after a delay
  • CLI Tools - Built-in commands for queue management
  • ML Optimized - Designed for machine learning inference workloads

Use Case: PHP Frontend + Python GPU Worker

ModelQ PHP is designed to work seamlessly with Python ModelQ workers running on GPU servers. This enables you to:

  • Run your PHP web application on standard servers
  • Offload ML inference tasks to dedicated GPU servers running Python
  • Share a managed Redis instance (AWS ElastiCache, GCP Memorystore, etc.)
┌─────────────────┐      ┌─────────────────┐      ┌─────────────────┐
│   PHP Frontend  │─────▶│  Managed Redis  │◀─────│ Python Worker   │
│   (Web Server)  │      │  (AWS/GCP/etc)  │      │ (GPU Server)    │
└─────────────────┘      └─────────────────┘      └─────────────────┘
        │                                                  │
   Enqueue tasks                                    Process tasks
   Get results                                      (ML inference)

Python Worker (GPU Server)

from modelq import ModelQ

app = ModelQ(
    redis_host="your-redis.cache.amazonaws.com",
    redis_port=6379,
    redis_password="your-password"
)

@app.task("generate_image")
def generate_image(data):
    prompt = data.get("prompt")
    # Use GPU for image generation...
    return {"image_url": "https://cdn.example.com/generated.jpg"}

@app.task("llm_completion", stream=True)
def llm_completion(data):
    for token in generate_tokens(data["prompt"]):
        yield token

app.run_workers(num_workers=4)

PHP Frontend (Web Server)

<?php
use ModelsLab\ModelQ\ModelQ;

// Connect to the SAME Redis as Python worker
$modelq = new ModelQ(
    host: 'your-redis.cache.amazonaws.com',
    port: 6379,
    password: 'your-password'
);

// Register tasks (handlers empty - Python does the work)
$modelq->task('generate_image', fn($d) => null);
$modelq->task('llm_completion', fn($d) => null, ['stream' => true]);

// Enqueue task for Python worker
$task = $modelq->enqueue('generate_image', [
    'prompt' => 'A sunset over mountains'
]);

// Get result from Python worker
$result = $task->getResult($modelq->getRedisClient(), timeout: 120);
echo $result['image_url'];

// Or stream LLM responses
$task = $modelq->enqueue('llm_completion', ['prompt' => 'Hello']);
foreach ($task->getStream($modelq->getRedisClient()) as $token) {
    echo $token;
}

See examples/remote_worker.php for a complete example.

Requirements

  • PHP 8.1 or higher
  • Redis server
  • phpredis extension (ext-redis)

Installation

composer require modelslab/modelq

Make sure you have the phpredis extension installed:

# Ubuntu/Debian
sudo apt-get install php-redis

# macOS with Homebrew
pecl install redis

Quick Start

1. Create a Worker

<?php

require_once 'vendor/autoload.php';

use ModelsLab\ModelQ\ModelQ;

$modelq = new ModelQ(host: '127.0.0.1', port: 6379);

// Register a task handler
$modelq->task('process_image', function (array $data): array {
    $imageUrl = $data['url'];
    // Process the image...
    return ['status' => 'processed', 'url' => $imageUrl];
});

// Start the worker
$modelq->startWorkers(numWorkers: 2);

2. Enqueue Tasks

<?php

require_once 'vendor/autoload.php';

use ModelsLab\ModelQ\ModelQ;

$modelq = new ModelQ(host: '127.0.0.1', port: 6379);

// Register the task (handler can be empty on producer side)
$modelq->task('process_image', fn($data) => null);

// Enqueue a task
$task = $modelq->enqueue('process_image', ['url' => 'https://example.com/image.jpg']);

// Wait for the result
$result = $task->getResult($modelq->getRedisClient(), timeout: 30);
echo "Result: " . json_encode($result);

Usage

Basic Task Registration

use ModelsLab\ModelQ\ModelQ;

$modelq = new ModelQ(host: '127.0.0.1', port: 6379);

// Simple task
$modelq->task('add_numbers', function (array $data): array {
    return ['sum' => $data['a'] + $data['b']];
});

// Task with options
$modelq->task('long_running_task', function (array $data): mixed {
    // Long running operation...
    return $result;
}, [
    'timeout' => 300,  // 5 minute timeout
    'retries' => 3,    // Retry up to 3 times on failure
]);

Enqueueing Tasks

// Basic enqueue
$task = $modelq->enqueue('add_numbers', ['a' => 5, 'b' => 3]);

// Get the task ID
echo "Task ID: " . $task->taskId;

// Wait for result (blocking)
$result = $task->getResult($modelq->getRedisClient(), timeout: 10);

Custom Task IDs

By default, ModelQ generates a UUID for each task. You can provide your own task ID to correlate tasks with your database records:

// Use your database record ID as the task ID
$orderId = 'order-12345';
$task = $modelq->enqueue('process_order', ['order_id' => $orderId], taskId: $orderId);

echo $task->taskId; // 'order-12345'

// Later, retrieve the task using the same ID
$status = $modelq->getTaskStatus($orderId);
$details = $modelq->getTaskDetails($orderId);

This is useful when you want to:

  • Track tasks using your existing database primary keys
  • Easily correlate queue tasks with database records
  • Look up task status without storing the generated UUID

Streaming Tasks

For tasks that produce incremental output (like ML text generation):

// Register a streaming task
$modelq->task('generate_text', function (array $data): Generator {
    $prompt = $data['prompt'];
    $words = ['Hello', 'World', 'from', 'ModelQ'];

    foreach ($words as $word) {
        usleep(100000); // Simulate processing
        yield $word;
    }
}, ['stream' => true]);

// Consume the stream
$task = $modelq->enqueue('generate_text', ['prompt' => 'Hello']);

foreach ($task->getStream($modelq->getRedisClient()) as $chunk) {
    echo $chunk . " ";
}
// Output: Hello World from ModelQ

Queue Management

// Get all queued tasks
$tasks = $modelq->getAllQueuedTasks();
foreach ($tasks as $task) {
    echo "Task: {$task['task_id']} - {$task['task_name']}\n";
}

// Get task status
$status = $modelq->getTaskStatus($taskId);
echo "Status: $status"; // queued, processing, completed, failed

// Remove a task from queue
$modelq->removeTaskFromQueue($taskId);

// Clear the entire queue
$modelq->deleteQueue();

// Get currently processing tasks
$processing = $modelq->getProcessingTasks();

Task History & Monitoring

Track past tasks, view errors, and monitor remote workers:

// Get full task details (including error info for failed tasks)
$details = $modelq->getTaskDetails($taskId);
if ($details['status'] === 'failed') {
    echo "Error: " . $details['error']['message'];
    echo "Type: " . $details['error']['type'];
    echo "File: " . $details['error']['file'] . ":" . $details['error']['line'];
    echo "Trace: " . $details['error']['trace'];
}

// Get task history (most recent first)
$history = $modelq->getTaskHistory(limit: 50);

// Get only failed tasks
$failed = $modelq->getFailedTasks(limit: 20);
foreach ($failed as $task) {
    echo "Task {$task['task_name']} failed: {$task['error']['message']}\n";
}

// Get only completed tasks
$completed = $modelq->getCompletedTasks(limit: 20);

// Filter by task name
$imageTasks = $modelq->getTasksByName('process_image', limit: 50);

// Get task statistics
$stats = $modelq->getTaskStats();
echo "Total: {$stats['total']}\n";
echo "Completed: {$stats['by_status']['completed']}\n";
echo "Failed: {$stats['by_status']['failed']}\n";

// See per-task-name stats
foreach ($stats['by_task_name'] as $name => $counts) {
    echo "{$name}: {$counts['completed']}/{$counts['total']} succeeded\n";
}

// Get task count in history
$count = $modelq->getTaskHistoryCount();

// Clear old history (older than 7 days by default)
$removed = $modelq->clearTaskHistory(); // Default: 7 days
$removed = $modelq->clearTaskHistory(3600); // Older than 1 hour

Worker Info

Get detailed information about registered workers including system resources (CPU, RAM, GPU):

// Get all registered workers
$workers = $modelq->getWorkers();
foreach ($workers as $workerId => $worker) {
    echo "Worker: {$workerId}\n";
    echo "  Status: {$worker['status']}\n";
    echo "  Hostname: {$worker['hostname']}\n";
    echo "  OS: {$worker['os']}\n";
    
    if ($worker['system_info']) {
        $cpu = $worker['system_info']['cpu'];
        $ram = $worker['system_info']['ram'];
        
        echo "  CPU: {$cpu['cores_logical']} cores ({$cpu['usage_percent']}% used)\n";
        echo "  RAM: {$ram['total_gb']} GB ({$ram['used_percent']}% used)\n";
        
        // GPU info (if available)
        foreach ($worker['system_info']['gpu'] as $gpu) {
            echo "  GPU: {$gpu['name']} - {$gpu['memory_total_gb']} GB\n";
            echo "       Utilization: {$gpu['gpu_utilization_percent']}%\n";
        }
    }
    
    echo "  Tasks: " . implode(', ', $worker['allowed_tasks']) . "\n";
}

// Get a specific worker by ID
$worker = $modelq->getWorker('gpu-server-1');
if ($worker) {
    echo "Worker {$worker['worker_id']} is {$worker['status']}\n";
}

Worker info includes:

  • worker_id - Unique worker identifier
  • status - Current status (idle, busy)
  • allowed_tasks - List of tasks this worker handles
  • last_heartbeat - Unix timestamp of last heartbeat
  • hostname - Worker hostname
  • os - Operating system info
  • python_version - Python version (for Python workers)
  • php_version - PHP version (for PHP workers)
  • system_info - Detailed CPU, RAM, and GPU information

Delayed Tasks

// Enqueue a task to run after 60 seconds
$taskData = [
    'task_id' => 'delayed-' . uniqid(),
    'task_name' => 'send_reminder',
    'payload' => ['user_id' => 123],
    'status' => 'queued',
];

$modelq->enqueueDelayedTask($taskData, delaySeconds: 60);

Middleware

Create custom middleware for lifecycle hooks:

use ModelsLab\ModelQ\Middleware\Middleware;
use ModelsLab\ModelQ\Task\Task;

class LoggingMiddleware extends Middleware
{
    public function beforeEnqueue(?Task $task): void
    {
        echo "Enqueueing task: {$task->taskName}\n";
    }

    public function afterEnqueue(?Task $task): void
    {
        echo "Task enqueued: {$task->taskId}\n";
    }

    public function beforeWorkerBoot(): void
    {
        echo "Worker starting...\n";
    }

    public function afterWorkerBoot(): void
    {
        echo "Worker ready!\n";
    }

    public function onError(?Task $task, ?\Throwable $error): void
    {
        echo "Task {$task->taskId} failed: {$error->getMessage()}\n";
    }

    public function onTimeout(?Task $task): void
    {
        echo "Task {$task->taskId} timed out\n";
    }
}

// Apply middleware
$modelq->setMiddleware(new LoggingMiddleware());

Custom Redis Client

// Use your own Redis connection
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$redis->auth('your-password');
$redis->select(2);

$modelq = new ModelQ(redisClient: $redis);

Error Handling

use ModelsLab\ModelQ\Exception\TaskTimeoutException;
use ModelsLab\ModelQ\Exception\TaskProcessingException;
use ModelsLab\ModelQ\Exception\RetryTaskException;

try {
    $result = $task->getResult($modelq->getRedisClient(), timeout: 10);
} catch (TaskTimeoutException $e) {
    echo "Task {$e->taskId} timed out\n";
} catch (TaskProcessingException $e) {
    echo "Task {$e->taskName} failed: {$e->getMessage()}\n";
}

// Inside a task handler, trigger a retry
$modelq->task('flaky_task', function (array $data): mixed {
    if (someCondition()) {
        throw new RetryTaskException('Temporary failure, retrying...');
    }
    return $result;
});

CLI Commands

ModelQ includes CLI commands for queue management:

# Show queue status
./vendor/bin/modelq status app.php

# List all queued tasks
./vendor/bin/modelq list-queued app.php

# Remove a specific task
./vendor/bin/modelq remove-task app.php <task-id>

# Clear the entire queue
./vendor/bin/modelq clear-queue app.php

# Start workers
./vendor/bin/modelq run-workers app.php --workers=4

The app.php file should return a configured ModelQ instance:

<?php
// app.php
require_once 'vendor/autoload.php';

use ModelsLab\ModelQ\ModelQ;

$modelq = new ModelQ(host: '127.0.0.1', port: 6379);

$modelq->task('my_task', function (array $data): mixed {
    // Task handler
    return $data;
});

return $modelq;

Configuration Options

$modelq = new ModelQ(
    redisClient: null,           // Optional: Provide your own Redis client
    host: '127.0.0.1',           // Redis host
    port: 6379,                  // Redis port
    db: 0,                       // Redis database number
    password: null,              // Redis password
    serverId: null,              // Custom server ID (defaults to hostname)
    webhookUrl: null,            // Webhook URL for task completion notifications
    requeueThreshold: null,      // Time before requeuing stuck tasks
    delaySeconds: 30,            // Default delay for delayed tasks
    logger: null,                // PSR-3 logger instance
);

Task Options

$modelq->task('my_task', $handler, [
    'timeout' => 60,     // Task timeout in seconds
    'stream' => false,   // Enable streaming mode
    'retries' => 3,      // Number of retry attempts
]);

Architecture

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Producer  │────▶│    Redis    │◀────│   Worker    │
│  (Enqueue)  │     │   (Queue)   │     │  (Process)  │
└─────────────┘     └─────────────┘     └─────────────┘
                           │
                           ▼
                    ┌─────────────┐
                    │   Results   │
                    │   Storage   │
                    └─────────────┘

Redis Keys

ModelQ uses the following Redis keys:

Key Pattern Type Description
task_queue List Main task queue
task:{id} String Task data
task_result:{id} String Task result
task_stream:{id} Stream Streaming task output
servers Hash Registered worker servers
processing_tasks Set Currently processing tasks
delayed_tasks Sorted Set Delayed tasks (score = execution time)

Testing

# Run PHPUnit tests
./vendor/bin/phpunit

# Run manual tests
php test_manual.php

# Run worker tests
php test_worker.php

# Run streaming tests
php test_streaming.php

Examples

Check the examples/ directory for complete working examples:

  • examples/basic_usage.php - Basic task queue operations
  • examples/producer.php - Enqueueing tasks and getting results
  • examples/worker.php - Worker process setup
  • examples/streaming_example.php - Streaming task example
  • examples/middleware_example.php - Custom middleware usage
  • examples/queue_management.php - Queue management operations
  • examples/remote_worker.php - PHP + Remote Python GPU Worker

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Related Projects