lishelun / nsq
NSQ Client for PHP
0.9.1
2023-12-14 15:43 UTC
Requires
- php: ^8.1
- ext-json: *
- amphp/http-client: ^4.6
- amphp/socket: ^1.1
- composer/semver: ^3.2
- phpinnacle/buffer: ^1.2
- psr/log: ^3.0
Requires (Dev)
- amphp/log: ^1.1
- dg/bypass-finals: ^1.3
- ergebnis/composer-normalize: ^2.15
- friendsofphp/php-cs-fixer: ^3.4
- nyholm/nsa: ^1.2
- phpstan/phpstan: ^1.8
- phpstan/phpstan-phpunit: ^1.1
- phpstan/phpstan-strict-rules: ^1.3
- phpunit/phpunit: ^9.5
- symfony/filesystem: ^6.1
- symfony/process: ^6.1
- symfony/var-dumper: ^6.1
- vimeo/psalm: ^4.4
This package is auto-updated.
Last update: 2025-03-14 18:40:43 UTC
README
PHP Client for NSQ.
This library follow SemVer. Until version 1.0 will be released anything MAY change at any time, public API SHOULD NOT be considered stable. If you want use it before stable version was released install strict version without range.
Installation
This library is installable via Composer:
composer require nsq/nsq
Requirements
This library requires PHP 8.0 or later.
Although not required, it is recommended that you install the phpinnacle/ext-buffer to speed up phpinnacle/buffer .
Features
- PUB
- SUB
- Feature Negotiation
- Discovery
- Backoff
- TLS
- Deflate
- Snappy
- Sampling
- AUTH
Usage
Producer
use Nsq\Producer; $producer = Producer::create(address: 'tcp://nsqd:4150'); // Publish a message to a topic $producer->publish('topic', 'Simple message'); // Publish multiple messages to a topic (atomically) $producer->publish('topic', [ 'Message one', 'Message two', ]); // Publish a deferred message to a topic $producer->publish('topic', 'Deferred message', delay: 5000);
Consumer
use Nsq\Consumer; use Nsq\Message; $consumer = Consumer::create( address: 'tcp://nsqd:4150', topic: 'topic', channel: 'channel', onMessage: static function (Message $message): Generator { yield $message->touch(); // Reset the timeout for an in-flight message yield $message->requeue(timeout: 5000); // Re-queue a message (indicate failure to process) yield $message->finish(); // Finish a message (indicate successful processing) }, );
Lookup
use Nsq\Lookup; use Nsq\Message; $lookup = new Lookup('http://nsqlookupd0:4161'); $lookup = new Lookup(['http://nsqlookupd0:4161', 'http://nsqlookupd1:4161', 'http://nsqlookupd2:4161']); $callable = static function (Message $message): Generator { yield $message->touch(); // Reset the timeout for an in-flight message yield $message->requeue(timeout: 5000); // Re-queue a message (indicate failure to process) yield $message->finish(); // Finish a message (indicate successful processing) }; $lookup->subscribe(topic: 'topic', channel: 'channel', onMessage: $callable); $lookup->subscribe(topic: 'anotherTopic', channel: 'channel', onMessage: $callable); $lookup->unsubscribe(topic: 'local', channel: 'channel'); $lookup->stop(); // unsubscribe all
Integrations
License:
The MIT License (MIT). Please see LICENSE
for more information.