thesis / nats
Async (fiber-based) client for NATS.
pkg:composer/thesis/nats
Requires
- php: ^8.3
- ext-filter: *
- amphp/amp: ^3.1
- amphp/byte-stream: ^2.1
- amphp/parser: ^1.1
- amphp/pipeline: ^1.2
- amphp/socket: ^2.3
- cuyz/valinor: ^1.15
- revolt/event-loop: ^1.0.7
- thesis/time-span: ^0.2.3
Requires (Dev)
- bamarni/composer-bin-plugin: ^1.8.2
- phpunit/phpunit: ^10.5.58
- symfony/var-dumper: ^6.4.15 || ^7.3.5
Suggests
- ext-sodium: Provides Ed25519 for nkey authentication
Last update: 2025-11-20 18:32:17 UTC
A pure, non-blocking (fiber-based), strictly typed, full-featured PHP driver for NATS.
Features
- NATS Core
- NATS JetStream
- NATS KV
- NATS ObjectStore
- NATS CRDT
- NATS Message Scheduler
- NATS JetStream Batch Publishing
- NATS Service API
Installation
composer require thesis/nats
NATS Core
The library implements the full functionality of NATS Core, including pub-sub, queues and request–reply.
Pub-Sub
<?php

declare(strict_types=1);

require_once __DIR__ . '/vendor/autoload.php';

use Thesis\Nats;
use function Amp\trapSignal;

$nc = new Nats\Client(Nats\Config::default());

$nc->subscribe('foo.*', static function (Nats\Delivery $delivery): void {
    dump("Received message {$delivery->message->payload} for consumer#1");
});

$nc->subscribe('foo.>', static function (Nats\Delivery $delivery): void {
    dump("Received message {$delivery->message->payload} for consumer#2");
});

$subscription = $nc->subscribe('foo.bar', static function (Nats\Delivery $delivery): void {
    dump("Received message {$delivery->message->payload} for consumer#3");
});

$nc->publish('foo.bar', new Nats\Message('Hello World!')); // received by all three consumers
$nc->publish('foo.baz', new Nats\Message('Hello World!')); // received only by consumers #1 and #2
$nc->publish('foo.bar.baz', new Nats\Message('Hello World!')); // received only by consumer #2

$subscription->stop();

$nc->publish('foo.bar', new Nats\Message('Hello World!')); // consumer #3 is stopped, so only #1 and #2 receive it

trapSignal([\SIGTERM, \SIGINT]);

$nc->drain();
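Which consumer sees which message in the example above follows from the NATS subject wildcards: `*` matches exactly one token, and `>` matches one or more trailing tokens. The rule can be sketched in plain PHP as below. `subjectMatches` is a hypothetical helper written for illustration only; it is not part of thesis/nats, and the real matching is done by the NATS server.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of thesis/nats): checks whether a published
 * subject matches a subscription pattern using the NATS wildcard rules.
 */
function subjectMatches(string $pattern, string $subject): bool
{
    $patternTokens = explode('.', $pattern);
    $subjectTokens = explode('.', $subject);

    foreach ($patternTokens as $i => $token) {
        if ($token === '>') {
            // ">" must be the last pattern token and needs at least one subject token left.
            return $i === count($patternTokens) - 1 && count($subjectTokens) > $i;
        }

        if (!isset($subjectTokens[$i])) {
            return false; // the subject is shorter than the pattern
        }

        if ($token !== '*' && $token !== $subjectTokens[$i]) {
            return false; // literal token mismatch
        }
    }

    // Every pattern token matched; the subject must not have extra tokens.
    return count($patternTokens) === count($subjectTokens);
}

// Print the patterns from the Pub-Sub example that match each published subject.
foreach (['foo.bar', 'foo.baz', 'foo.bar.baz'] as $subject) {
    $matched = array_filter(
        ['foo.*', 'foo.>', 'foo.bar'],
        static fn (string $pattern): bool => subjectMatches($pattern, $subject),
    );

    echo $subject, ' => ', implode(', ', $matched), "\n";
}
```

Running the sketch lists `foo.*`, `foo.>` and `foo.bar` for `foo.bar`, the first two patterns for `foo.baz`, and only `foo.>` for `foo.bar.baz`, which mirrors the comments in the example.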
Queues
<?php

declare(strict_types=1);

require_once __DIR__ . '/vendor/autoload.php';

use Thesis\Nats;
use function Amp\trapSignal;

$nc = new Nats\Client(Nats\Config::default());

$nc->subscribe(
    subject: 'foo.>',
    handler: static function (Nats\Delivery $delivery): void {
        dump("Received message {$delivery->message->payload} for consumer#1");
    },
    queueGroup: 'test',
);

$nc->subscribe(
    subject: 'foo.>',
    handler: static function (Nats\Delivery $delivery): void {
        dump("Received message {$delivery->message->payload} for consumer#2");
    },
    queueGroup: 'test',
);

$nc->subscribe(
    subject: 'foo.>',
    handler: static function (Nats\Delivery $delivery): void {
        dump("Received message {$delivery->message->payload} for consumer#3");
    },
    queueGroup: 'test',
);

$nc->publish('foo.bar', new Nats\Message('x'));
$nc->publish('foo.baz', new Nats\Message('y'));
$nc->publish('foo.bar.baz', new Nats\Message('z'));

trapSignal([\SIGTERM, \SIGINT]);

$nc->drain();
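Unlike plain pub-sub, subscribers that share a queue group split the traffic: each message goes to only one member of the group, picked at random by the server. The following is a rough in-memory model of that behaviour in plain PHP. The `distribute` function is hypothetical and does not talk to NATS or use thesis/nats; it only illustrates the "exactly one member per message" property.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical in-memory model of a queue group (not the thesis/nats API):
 * every published message is handed to exactly one randomly chosen member.
 *
 * @param non-empty-list<string> $members
 * @param list<string> $messages
 * @return array<string, list<string>> messages received by each member
 */
function distribute(array $members, array $messages): array
{
    $received = array_fill_keys($members, []);

    foreach ($messages as $message) {
        // Pick one member at random; the other members never see this message.
        $member = $members[array_rand($members)];
        $received[$member][] = $message;
    }

    return $received;
}

$received = distribute(['consumer#1', 'consumer#2', 'consumer#3'], ['x', 'y', 'z']);

foreach ($received as $member => $messages) {
    echo $member, ': ', implode(', ', $messages) ?: '(nothing)', "\n";
}
```

The split differs from run to run, but the three messages are always delivered three times in total, never nine.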
Request-reply
<?php

declare(strict_types=1);

require_once __DIR__ . '/vendor/autoload.php';

use Thesis\Nats;

$nc = new Nats\Client(Nats\Config::default());

$nc->subscribe('foo.>', static function (Nats\Delivery $delivery): void {
    dump("Received request {$delivery->message->payload}");
    $delivery->reply(new Nats\Message(strrev($delivery->message->payload ?? '')));
});

$response = $nc->request('foo.bar', new Nats\Message('Hello World!'));
dump("Received response {$response->message->payload}");

$nc->drain();
NATS JetStream
JetStream is the built-in NATS persistence system. The library provides both JetStream entity management (streams, consumers) and message publishing/consumption capabilities.
Fetch Batch
You can request a specific number of messages at once using PullConsumer::fetch with FetchConfig::batch(number_of_messages). For example:
<?php

declare(strict_types=1);

use Thesis\Nats;
use Thesis\Nats\JetStream;

$consumer = $stream->createOrUpdateConsumer(...);

$batch = $consumer
    ->pulling()
    ->fetch(JetStream\FetchConfig::batch(10));

foreach ($batch as $delivery) {
    // handle message
    $delivery->ack();
}
You can also configure the batch retrieval timeout and heartbeat settings using FetchConfig::maxWait and FetchConfig::heartbeat parameters.
Fetch Bytes
If you want to retrieve a batch of a specific size in bytes, you can use FetchConfig::bytes(byte_size):
<?php

declare(strict_types=1);

use Thesis\Nats;
use Thesis\Nats\JetStream;

$consumer = $stream->createOrUpdateConsumer(...);

$batch = $consumer
    ->pulling()
    ->fetch(JetStream\FetchConfig::bytes(1024));

foreach ($batch as $delivery) {
    // handle message
    $delivery->ack();
}
As with batch fetches, you can also configure the timeout and heartbeat settings using FetchConfig::maxWait and FetchConfig::heartbeat.
Fetch Immediate
If you simply want to retrieve a batch of messages currently available in the stream, use FetchConfig::immediate():
<?php

declare(strict_types=1);

use Thesis\Nats;
use Thesis\Nats\JetStream;

$consumer = $stream->createOrUpdateConsumer(...);

$batch = $consumer
    ->pulling()
    ->fetch(JetStream\FetchConfig::immediate());

foreach ($batch as $delivery) {
    // handle message
    $delivery->ack();
}
Push Consumer
NATS offers two message delivery models: pull and push. Although the NATS documentation recommends the pull approach, push consumers are often preferable for PHP use cases.
Similar to RabbitMQ, push consumers can be configured to avoid negative impacts on both the consumers and the messages themselves.
Key configuration parameters include maxAckPending, which limits the number of unacknowledged messages NATS will deliver before waiting (analogous to prefetch count in RabbitMQ),
and the ackPolicy. This setup allows you to scale message processing by increasing the number of consumers and distribute messages among them more evenly.
For example, if you want to process messages one by one with explicit acknowledgments, you would set maxAckPending=1 and ackPolicy=explicit.
<?php

declare(strict_types=1);

require_once __DIR__ . '/vendor/autoload.php';

use Thesis\Nats;
use Thesis\Nats\JetStream\Api;
use Thesis\Time\TimeSpan;

$consumer = $stream->createOrUpdateConsumer(new Api\ConsumerConfig(
    durableName: 'EventPushConsumer',
    deliverSubject: 'push-consumer-delivery',
    ackPolicy: Api\AckPolicy::Explicit,
    idleHeartbeat: TimeSpan::fromSeconds(5),
    maxAckPending: 1,
));

$subscription = $consumer->push(
    static function (Nats\JetStream\Delivery $delivery): void {
        dump($delivery->message->payload);
        $delivery->ack();
    },
);
The deliverSubject parameter is mandatory for push consumers, because it is what distinguishes the push model from pull.
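To picture what maxAckPending does, here is a small in-memory model in plain PHP. It is a simulation only: the `AckWindow` class and its methods are hypothetical and are not part of thesis/nats or the NATS server. The model hands out a message only while the number of unacknowledged deliveries is below the limit.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical in-memory model of maxAckPending (not the thesis/nats API):
 * a message is handed out only while the number of unacknowledged
 * deliveries is below the configured limit.
 */
final class AckWindow
{
    /** @var list<string> messages waiting to be delivered */
    private array $backlog;

    /** @var array<int, string> delivered but not yet acknowledged, keyed by delivery id */
    private array $pending = [];

    private int $nextId = 1;

    /** @param list<string> $messages */
    public function __construct(array $messages, private readonly int $maxAckPending)
    {
        $this->backlog = $messages;
    }

    /** Returns [deliveryId, payload], or null when the window is full or the backlog is empty. */
    public function deliver(): ?array
    {
        if (count($this->pending) >= $this->maxAckPending || $this->backlog === []) {
            return null;
        }

        $id = $this->nextId++;
        $this->pending[$id] = array_shift($this->backlog);

        return [$id, $this->pending[$id]];
    }

    /** Acknowledging a delivery frees one slot in the window. */
    public function ack(int $id): void
    {
        unset($this->pending[$id]);
    }
}

$window = new AckWindow(['a', 'b', 'c'], maxAckPending: 1);

[$id, $payload] = $window->deliver(); // "a" is delivered
var_dump($window->deliver());         // NULL: "a" is still unacknowledged
$window->ack($id);
var_dump($window->deliver()[1]);      // string(1) "b"
```

With a limit of 1 the handler never has more than one message in flight; raising the limit lets a slow handler accumulate more unacknowledged messages before delivery pauses.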
If you want to distribute messages from a single consumer across different subscriptions, you should use the deliverGroup parameter.
This enables you to scale processing across multiple consumer instances. This parameter functions identically to queueGroup in NATS Core.
<?php

declare(strict_types=1);

require_once __DIR__ . '/vendor/autoload.php';

use Thesis\Nats;
use Thesis\Nats\JetStream\Api;
use Thesis\Time\TimeSpan;

$consumer = $stream->createOrUpdateConsumer(new Api\ConsumerConfig(
    durableName: 'EventPushConsumer',
    deliverSubject: 'push-consumer-delivery',
    deliverGroup: 'testing',
    ackPolicy: Api\AckPolicy::Explicit,
    idleHeartbeat: TimeSpan::fromSeconds(5),
    maxAckPending: 1,
));
Please refer to the push consumer configuration documentation to understand the purpose of each parameter.
Pull Consumer
Besides fetching batches manually, you can use PullConsumer::consume. This method buffers messages, periodically polls the server, and monitors timeouts so that the next message batch is requested automatically.
Just like with push consumers, you can call pull directly on the Consumer object:
<?php

declare(strict_types=1);

require_once __DIR__ . '/vendor/autoload.php';

use Thesis\Nats;
use Thesis\Nats\JetStream\Api;
use Thesis\Time\TimeSpan;

$nc = new Nats\Client(Nats\Config::default());
$js = $nc->jetStream();

$consumer = $stream->createOrUpdateConsumer(new Api\ConsumerConfig(
    durableName: 'EventPullConsumer',
    ackPolicy: Api\AckPolicy::Explicit,
));

$subscription = $consumer->pull(
    static function (Nats\JetStream\Delivery $delivery): void {
        dump($delivery->message->payload);
        $delivery->ack();
    },
);
However, if you want to read messages with multiple consumers within the same process, you need to create a PullConsumer using Consumer::pulling(), and then create multiple consumer instances:
<?php

declare(strict_types=1);

require_once __DIR__ . '/vendor/autoload.php';

use Thesis\Nats;
use Thesis\Nats\JetStream\Api;
use Thesis\Time\TimeSpan;
use function Amp\trapSignal;

$consumer = $stream->createOrUpdateConsumer(new Api\ConsumerConfig(
    durableName: 'EventPullConsumer',
    ackPolicy: Api\AckPolicy::Explicit,
));

$consumer = $consumer->pulling();

$consumer->consume(
    static function (Nats\JetStream\Delivery $delivery): void {
        dump("Consumer#1: {$delivery->message->payload}");
        $delivery->ack();
    },
);

$consumer->consume(
    static function (Nats\JetStream\Delivery $delivery): void {
        dump("Consumer#2: {$delivery->message->payload}");
        $delivery->ack();
    },
);

trapSignal([\SIGINT, \SIGTERM]);

$consumer->drain();
To stop all subscriptions, use Consumer::drain() or Consumer::stop().
You can limit each requested batch either by message count or by size in bytes, using the PullConsumeConfig::maxMessages and PullConsumeConfig::maxBytes parameters respectively.
However, only one of these parameters can be set at a time.
<?php

declare(strict_types=1);

use Thesis\Nats\JetStream;

$consumer->consume(
    static function (JetStream\Delivery $delivery): void {},
    new JetStream\PullConsumeConfig(
        maxMessages: 1_000,
    ),
);
Get message
<?php

declare(strict_types=1);

require_once __DIR__ . '/vendor/autoload.php';

use Thesis\Nats;
use Thesis\Nats\JetStream\Api\StreamConfig;

$nc = new Nats\Client(Nats\Config::default());
$js = $nc->jetStream();

$js->deleteStream('EventStream');

$stream = $js->createStream(new StreamConfig(
    name: 'EventStream',
    description: 'Application events',
    subjects: ['events.*'],
));

for ($i = 0; $i < 5; ++$i) {
    $js->publish(
        subject: 'events.payment_rejected',
        message: new Nats\Message(
            payload: "Message#{$i}",
            headers: (new Nats\Headers())
                ->with(Nats\Header\MsgId::header(), "id:{$i}"),
        ),
    );
}

dump($stream->getLastMessageForSubject('events.payment_rejected')?->payload);

$nc->drain();
NATS Key Value Store
JetStream, the persistence layer of NATS, not only provides the higher qualities of service and the features associated with streaming, but also enables functionality that messaging systems usually lack, such as a Key Value Store.
Store key values
<?php

declare(strict_types=1);

require __DIR__ . '/vendor/autoload.php';

use Thesis\Nats;
use Thesis\Nats\JetStream\KeyValue\BucketConfig;

$nc = new Nats\Client(Nats\Config::default());
$js = $nc->jetStream();

$kv = $js->createOrUpdateKeyValue(new BucketConfig(
    bucket: 'configs',
));

$kv->put('app.env', 'prod');
$kv->put('database.dsn', 'mysql:host=127.0.0.1;port=3306');

dump(
    $kv->get('app.env')?->value,
    $kv->get('database.dsn')?->value,
);

$nc->drain();
Watch KV
<?php

declare(strict_types=1);

require __DIR__ . '/vendor/autoload.php';

use Thesis\Nats;
use Thesis\Nats\JetStream\KeyValue\BucketConfig;
use function Amp\trapSignal;

$nc = new Nats\Client(Nats\Config::default());
$js = $nc->jetStream();

$js->deleteKeyValue('configs');

$kv = $js->createOrUpdateKeyValue(new BucketConfig(
    bucket: 'configs',
));

$subscription = $kv->watch(static function (Nats\JetStream\KeyValue\Entry $entry): void {
    dump("Config key {$entry->key} value changed to {$entry->value}");
});

$kv->put('app.env', 'prod');
$kv->put('database.dsn', 'mysql:host=127.0.0.1;port=3306');

trapSignal([\SIGTERM, \SIGINT]);

$subscription->stop();
$nc->drain();
NATS Object Store
JetStream, the persistence layer of NATS, not only provides the higher qualities of service and the features associated with streaming, but also enables functionality that messaging systems usually lack, such as an Object Store.
Store objects in the buckets
<?php

declare(strict_types=1);

require __DIR__ . '/vendor/autoload.php';

use Thesis\Nats;
use Thesis\Nats\JetStream\ObjectStore\ObjectMeta;
use Thesis\Nats\JetStream\ObjectStore\ResourceReader;
use Thesis\Nats\JetStream\ObjectStore\StoreConfig;

$nc = new Nats\Client(Nats\Config::default());
$js = $nc->jetStream();

$js->deleteObjectStore('code');

$store = $js->createOrUpdateObjectStore(new StoreConfig(
    store: 'code',
));

// fopen() returns false (not null) on failure, so use ?: rather than ??.
$handle = fopen(__DIR__.'/app.php', 'r') ?: throw new \RuntimeException('Failed to open file.');

$store->put(new ObjectMeta(name: 'app.php'), new ResourceReader($handle));
fclose($handle);

$store->put(new ObjectMeta('config.php'), '<?php return [];');

dump(
    (string) $store->get('app.php'),
    (string) $store->get('config.php'),
);

$nc->drain();
Watch Object Store
<?php

declare(strict_types=1);

require __DIR__ . '/vendor/autoload.php';

use Thesis\Nats;
use Thesis\Nats\JetStream\ObjectStore\ObjectInfo;
use Thesis\Nats\JetStream\ObjectStore\ObjectMeta;
use Thesis\Nats\JetStream\ObjectStore\StoreConfig;
use function Amp\delay;

$nc = new Nats\Client(Nats\Config::default());
$js = $nc->jetStream();

$js->deleteObjectStore('code');

$store = $js->createOrUpdateObjectStore(new StoreConfig(
    store: 'code',
));

$subscription = $store->watch(static function (ObjectInfo $info): void {
    dump("New object {$info->name} in the bucket {$info->bucket} at size {$info->size} bytes");
});

$store->put(new ObjectMeta('config.php'), '<?php return [];');
$store->put(new ObjectMeta('snippet.php'), '<?php echo 1 + 1;');

delay(0.5);

$subscription->stop();
$nc->drain();
NATS CRDT
Distributed Counter CRDT. A Stream can opt in to supporting Counters which will allow any subject to be a counter. All subjects in the stream must be counters. See ADR-49 for details.
Add Counter
<?php

declare(strict_types=1);

require __DIR__ . '/vendor/autoload.php';

use Thesis\Nats;
use Thesis\Nats\JetStream\Counter\CounterConfig;

$nc = new Nats\Client(Nats\Config::default());
$js = $nc->jetStream();

$counter = $js->createOrUpdateCounter(new CounterConfig(
    name: 'atomics',
));

dump($counter->add('x', 1)); // 1
dump($counter->add('x', 2)); // 3
Get Counter
<?php

declare(strict_types=1);

require __DIR__ . '/vendor/autoload.php';

use Thesis\Nats;
use Thesis\Nats\JetStream\Counter\CounterConfig;

$nc = new Nats\Client(Nats\Config::default());
$js = $nc->jetStream();

$counter = $js->createOrUpdateCounter(new CounterConfig(
    name: 'atomics',
));

dump($counter->add('x', 1)); // 1
dump($counter->get('x')?->value); // 1
Get Counters
<?php

declare(strict_types=1);

require_once __DIR__ . '/vendor/autoload.php';

use Thesis\Nats;
use Thesis\Nats\JetStream\Counter\CounterConfig;

$nc = new Nats\Client(Nats\Config::default());
$js = $nc->jetStream();

// Start from a clean counter stream.
$js->deleteCounter('atomics');

$counter = $js->createOrUpdateCounter(new CounterConfig(
    name: 'atomics',
));

$counter->add('x', 1);
$counter->add('y', 1);
$counter->add('z', 1);

foreach ($counter->getMultiple() as $entry) {
    echo "{$entry->subject}: {$entry->value}\n";
}
NATS Message Scheduler
Delayed Message Scheduling. The AllowMsgSchedules stream configuration option allows the scheduling of messages. Users can use this feature for delayed publishing/scheduling of messages.
See ADR-51 for details.
Single scheduled message
<?php

declare(strict_types=1);

require_once __DIR__ . '/vendor/autoload.php';

use Thesis\Nats;
use Thesis\Nats\Header;
use Thesis\Nats\JetStream\Api\AckPolicy;
use Thesis\Nats\JetStream\Api\ConsumerConfig;
use Thesis\Nats\JetStream\Api\StreamConfig;
use Thesis\Nats\JetStream\Api\DeliverPolicy;
use function Amp\trapSignal;

$nc = new Nats\Client(Nats\Config::default());
$js = $nc->jetStream();

$stream = $js->createStream(new StreamConfig(
    name: 'RecurrentsStream',
    subjects: [
        'recurrents',
        'scheduler.recurrents.*',
    ],
    allowMsgSchedules: true,
));

$js->publish('scheduler.recurrents.1', new Nats\Message(
    payload: '{"id":1}',
    headers: (new Nats\Headers())
        ->with(Header\Schedule::Header, new \DateTimeImmutable('+5 seconds'))
        ->with(Header\ScheduleTarget::header(), 'recurrents'),
));

$subscription = $stream
    ->createOrUpdateConsumer(new ConsumerConfig(
        durableName: 'RecurrentsConsumer',
        deliverPolicy: DeliverPolicy::New,
        ackPolicy: AckPolicy::None,
        filterSubjects: ['recurrents'],
    ))
    ->pull(static function (Nats\JetStream\Delivery $delivery): void {
        dump([
            $delivery->message->payload,
            $delivery->message->headers?->get(Header\Scheduler::header()),
            $delivery->message->headers?->get(Header\ScheduleNext::header()),
        ]);
    });

trapSignal([\SIGINT, \SIGTERM]);

$subscription->awaitCompletion();
NATS JetStream Batch Publishing
The AllowAtomicPublish stream configuration option makes it possible to publish N messages into a stream atomically. See ADR-50 for details.
Publish using PublishBatch
<?php

declare(strict_types=1);

require_once __DIR__ . '/vendor/autoload.php';

use Thesis\Nats;
use Thesis\Nats\JetStream\Api\StreamConfig;

$nc = new Nats\Client(Nats\Config::default());
$js = $nc->jetStream();

$stream = $js->createStream(new StreamConfig(
    name: 'Batches',
    description: 'Batch Stream',
    subjects: ['batch.*'],
    allowAtomicPublish: true,
));

$batch = $js->createPublishBatch();

for ($i = 0; $i < 999; ++$i) {
    $batch->publish('batch.orders', new Nats\Message("Order#{$i}"));
}

$batch->publish('batch.orders', new Nats\Message('Order#1000'), new Nats\PublishBatchOptions(commit: true));
Publish batch using JetStream
<?php

declare(strict_types=1);

require_once __DIR__ . '/vendor/autoload.php';

use Thesis\Nats;
use Thesis\Nats\JetStream\Api\StreamConfig;

$nc = new Nats\Client(Nats\Config::default());
$js = $nc->jetStream();

$stream = $js->createStream(new StreamConfig(
    name: 'Batches',
    description: 'Batch Stream',
    subjects: ['batch.*'],
    allowAtomicPublish: true,
));

$js->publishBatch('batch.orders', [
    new Nats\Message('Order#1'),
    new Nats\Message('Order#2'),
    new Nats\Message('Order#3'),
]);
NATS Service API
This is an implementation of ADR-32.
The core of the Micro component is the Service. A Service aggregates endpoints for handling application logic. Services are named and versioned. You create a Service by calling Client::createService() and passing in the Service configuration.
Micro Service
<?php

declare(strict_types=1);

use Thesis\Nats;
use Thesis\Nats\Micro;

require __DIR__ . '/vendor/autoload.php';

$nc = new Nats\Client(
    Nats\Config::default(),
);

$srv = $nc->createService(new Micro\ServiceConfig('EchoService', '1.0.0'));
Service endpoints
After a service is created, endpoints can be added. By default, an endpoint is available via its name.
<?php

declare(strict_types=1);

use Thesis\Nats;
use Thesis\Nats\Micro;

require __DIR__ . '/vendor/autoload.php';

$nc = new Nats\Client(
    Nats\Config::default(),
);

$srv = $nc->createService(new Micro\ServiceConfig('EchoService', '1.0.0'));

$srv
    ->addEndpoint(new Micro\EndpointConfig('srv.echo'), static function (Micro\Request $request): void {
        $request->respond(new Micro\Response($request->data));
    });

dump($nc->request('srv.echo', new Nats\Message('ping'))->message->payload);
If the subject for the endpoint is more complex (e.g., contains a * or >), the subject can be specified separately from the name.
<?php

declare(strict_types=1);

use Thesis\Nats;
use Thesis\Nats\Micro;

require __DIR__ . '/vendor/autoload.php';

$nc = new Nats\Client(
    Nats\Config::default(),
);

$srv = $nc->createService(new Micro\ServiceConfig('EchoService', '1.0.0'));

$srv
    ->addEndpoint(new Micro\EndpointConfig('srv.echo', subject: 'srv.echo.*'), static function (Micro\Request $request): void {
        $request->respond(new Micro\Response($request->subject));
    });

dump($nc->request('srv.echo.x', new Nats\Message('ping'))->message->payload);
Groups
Endpoints can also be aggregated using groups. A group represents a common subject prefix used by all endpoints associated with it.
<?php

declare(strict_types=1);

use Thesis\Nats;
use Thesis\Nats\Micro;

require __DIR__ . '/vendor/autoload.php';

$nc = new Nats\Client(
    Nats\Config::fromURI('tcp://user:Pswd1@nats-1:4222'),
);

$nc
    ->createService(new Micro\ServiceConfig('EchoService', '1.0.0'))
    ->addGroup(new Micro\GroupConfig('srv.api'))
    ->addGroup(new Micro\GroupConfig('v1'))
    ->addEndpoint(new Micro\EndpointConfig('echo'), static function (Micro\Request $request): void {
        $request->respond(new Micro\Response($request->data));
    });

dump($nc->request('srv.api.v1.echo', new Nats\Message('ping'))->message->payload);
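In the example above the nested groups `srv.api` and `v1` and the endpoint `echo` combine into the request subject `srv.api.v1.echo`. The composition can be sketched as a plain dot-join of the prefixes. The `endpointSubject` function below is a hypothetical illustration and not part of thesis/nats; the library builds the subject internally.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of thesis/nats): builds the subject an
 * endpoint is reachable on from its enclosing group prefixes and its own
 * name or subject.
 *
 * @param list<string> $groups group prefixes, outermost first
 */
function endpointSubject(array $groups, string $endpoint): string
{
    return implode('.', [...$groups, $endpoint]);
}

echo endpointSubject(['srv.api', 'v1'], 'echo'), "\n"; // srv.api.v1.echo
echo endpointSubject([], 'srv.echo'), "\n";            // srv.echo
```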
License
The MIT License (MIT). Please see License File for more information.