Another support component for Hyperf.

Fund package maintenance!
huangdijia
hdj.me/sponsors

Installs: 48 091

Dependents: 8

Suggesters: 0

Security: 0

Stars: 1

Watchers: 1

Forks: 0

pkg:composer/friendsofhyperf/support

v3.2.0-beta.1 2025-12-06 10:42 UTC

README

Latest Version Total Downloads GitHub license

A comprehensive support component for Hyperf providing essential utilities, helpers, and base classes.

Features

  • 🎯 Fluent Dispatch API - Elegant job dispatch with support for async queue, AMQP, and Kafka
  • 🔄 Closure Jobs - Execute closures as background jobs with dependency injection
  • 🛠️ Helper Functions - Collection of useful helper functions
  • 📦 Bus System - Pending dispatch classes for various message systems
  • 🧩 Traits & Utilities - Reusable traits and utility classes
  • ⏱️ Backoff Strategies - Multiple retry backoff implementations for retry mechanisms

Installation

composer require friendsofhyperf/support

Usage

Dispatch Helper Function

The dispatch() helper function provides a fluent API for dispatching jobs to different systems:

Async Queue (Closure Jobs)

use function FriendsOfHyperf\Support\dispatch;

// Simple closure dispatch to async queue
dispatch(function () {
    // Your job logic here
    logger()->info('Job executed!');
});

// With configuration
dispatch(function () {
    // Your job logic here
})
    ->onConnection('high-priority')
    ->delay(60) // Execute after 60 seconds
    ->setMaxAttempts(5);

// With dependency injection
dispatch(function (UserService $userService, LoggerInterface $logger) {
    $users = $userService->getActiveUsers();
    $logger->info('Processing ' . count($users) . ' users');
});

AMQP Producer Messages

use Hyperf\Amqp\Message\ProducerMessageInterface;
use function FriendsOfHyperf\Support\dispatch;

// Dispatch AMQP message
dispatch($amqpMessage)
    ->setPool('default')
    ->setExchange('my.exchange')
    ->setRoutingKey('my.routing.key')
    ->setTimeout(10)
    ->setConfirm(true);

Kafka Producer Messages

use Hyperf\Kafka\Producer\ProduceMessage;
use function FriendsOfHyperf\Support\dispatch;

// Dispatch Kafka message
dispatch($kafkaMessage)
    ->setPool('default');

CallQueuedClosure

The CallQueuedClosure class allows you to execute closures as async queue jobs:

use FriendsOfHyperf\Support\CallQueuedClosure;

// Create a closure job
$job = CallQueuedClosure::create(function () {
    // Your job logic
    return 'Job completed!';
});

// Configure max attempts
$job->setMaxAttempts(3);

// The job can be pushed to queue manually or via dispatch()

Pending Dispatch Classes

PendingAsyncQueueDispatch

Fluent API for async queue job dispatch:

use FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch;

$pending = new PendingAsyncQueueDispatch($job);
$pending
    ->onConnection('default')
    ->delay(30)
    ->when($condition, function ($dispatch) {
        $dispatch->onConnection('special');
    })
    ->unless($otherCondition, function ($dispatch) {
        $dispatch->delay(60);
    });
// Job is dispatched when object is destroyed

PendingAmqpProducerMessageDispatch

Fluent API for AMQP message dispatch:

use FriendsOfHyperf\Support\Bus\PendingAmqpProducerMessageDispatch;

$pending = new PendingAmqpProducerMessageDispatch($message);
$pending
    ->setPool('default')
    ->setExchange('my.exchange')
    ->setRoutingKey('my.routing.key')
    ->setTimeout(5)
    ->setConfirm(true);
// Message is sent when object is destroyed

PendingKafkaProducerMessageDispatch

Fluent API for Kafka message dispatch:

use FriendsOfHyperf\Support\Bus\PendingKafkaProducerMessageDispatch;

$pending = new PendingKafkaProducerMessageDispatch($message);
$pending->setPool('default');
// Message is sent when object is destroyed

Conditional Execution

All pending dispatch classes support conditional execution:

use function FriendsOfHyperf\Support\dispatch;

dispatch($job)
    ->when($shouldUseHighPriority, function ($dispatch) {
        $dispatch->onConnection('high-priority');
    })
    ->unless($isTestMode, function ($dispatch) {
        $dispatch->delay(10);
    });

API Reference

dispatch($job)

Creates a pending dispatch instance based on the job type:

  • ClosurePendingAsyncQueueDispatch with CallQueuedClosure
  • ProducerMessageInterfacePendingAmqpProducerMessageDispatch
  • ProduceMessagePendingKafkaProducerMessageDispatch
  • Other objects → PendingAsyncQueueDispatch

PendingAsyncQueueDispatch Methods

  • onConnection(string $connection): static - Set queue connection
  • delay(int $delay): static - Delay job execution (seconds)
  • setMaxAttempts(int $attempts): static - Set max retry attempts
  • when(mixed $condition, callable $callback): static - Conditional execution
  • unless(mixed $condition, callable $callback): static - Inverse conditional execution

PendingAmqpProducerMessageDispatch Methods

  • setPool(string $pool): static - Set AMQP pool name
  • setExchange(string $exchange): static - Set exchange name
  • setRoutingKey(array|string $routingKey): static - Set routing key(s)
  • setTimeout(int $timeout): static - Set timeout (seconds)
  • setConfirm(bool $confirm): static - Enable/disable confirm mode
  • when(mixed $condition, callable $callback): static - Conditional execution
  • unless(mixed $condition, callable $callback): static - Inverse conditional execution

PendingKafkaProducerMessageDispatch Methods

  • setPool(string $pool): static - Set Kafka pool name
  • when(mixed $condition, callable $callback): static - Conditional execution
  • unless(mixed $condition, callable $callback): static - Inverse conditional execution

CallQueuedClosure

  • create(Closure $closure): static - Create a new closure job
  • setMaxAttempts(int $attempts): void - Set max retry attempts
  • handle(): mixed - Execute the closure (called by queue worker)

Backoff Strategies

The component provides various backoff strategies for retry mechanisms:

ArrayBackoff

Use custom delay intervals defined in an array:

use FriendsOfHyperf\Support\Backoff\ArrayBackoff;

// Custom delays
$backoff = new ArrayBackoff([100, 500, 1000, 2000, 5000]);

// Stop after array is exhausted (returns 0)
$backoff = new ArrayBackoff([100, 500, 1000], false);

// From comma-separated string
$backoff = ArrayBackoff::fromString('100, 500, 1000, 2000');

// From predefined patterns
$backoff = ArrayBackoff::fromPattern('short');      // [100, 200, 300, 500, 1000]
$backoff = ArrayBackoff::fromPattern('medium');     // [200, 500, 1000, 2000, 5000]
$backoff = ArrayBackoff::fromPattern('long');       // [500, 1000, 2000, 5000, 10000, 30000]
$backoff = ArrayBackoff::fromPattern('exponential'); // [100, 200, 400, 800, 1600, 3200, 6400]

// Usage in retry logic
$attempt = 0;
while (true) {
    try {
        return performOperation();
    } catch (Exception $e) {
        $delay = $backoff->next();
        if ($delay === 0) {
            throw $e; // No more retries
        }
        usleep($delay * 1000); // Convert to microseconds
    }
}

Available Backoff Implementations

  • ArrayBackoff - Custom intervals from an array
  • FixedBackoff - Constant delay between retries
  • LinearBackoff - Linear growth with configurable step
  • ExponentialBackoff - Exponential growth with optional jitter
  • FibonacciBackoff - Fibonacci sequence-based delays
  • PoissonBackoff - Statistical distribution-based delays
  • DecorrelatedJitterBackoff - Decorrelated jitter for better spreading

Common Backoff Interface

All backoff implementations implement BackoffInterface:

interface BackoffInterface
{
    public function next(): int;           // Get next delay in milliseconds
    public function reset(): void;         // Reset attempt counter
    public function getAttempt(): int;     // Get current attempt count
    public function sleep(): int;          // Sleep for calculated delay
}

Contact

License

MIT