linxi / pulsar-client
There is no license information available for the latest version (dev-master) of this package.
dev-master
2024-12-17 11:37 UTC
Requires
- php: >=7.0
- ikilobyte/pulsar-client-php: ^1.3
Last update: 2025-05-17 12:36:16 UTC
README
Contents
- Contents
- About
- Requirements
- Installation
- Registering
- Publish Vendor Files
- config/pulsar.php
- Producer
- Consumer
- Options
- License
About
This is an Apache Pulsar client library implemented in PHP, written against PulsarApi.proto. It also supports Swoole coroutines.
Features
- Supported URL schemes: pulsar://, pulsar+ssl://, http://, https://
- Multi topic consumers
- TLS connection
- Automatic reconnection (Only Consumer)
- Message batching
- Message Properties
- Authentication with jwt or basic
Requirements
- PHP >= 7.0 (PHP 8 is supported)
- Swoole extension (only if you want to run under Swoole)
  - Running under Swoole only requires enabling SWOOLE_HOOK_SOCKETS and SWOOLE_HOOK_STREAM_FUNCTION, or SWOOLE_HOOK_ALL
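The hook flags named above can be turned on before any Pulsar connection is opened. The following is a minimal sketch, assuming the Swoole extension is installed and recent enough to define SWOOLE_HOOK_SOCKETS; the placement of the producer or consumer calls inside the coroutine is an assumption for illustration.

```php
<?php
// Sketch only: enable the Swoole runtime hooks, then run code in a coroutine.
if (extension_loaded('swoole')) {
    // Hook only what the requirement names ...
    $flags = SWOOLE_HOOK_SOCKETS | SWOOLE_HOOK_STREAM_FUNCTION;
    // ... or hook everything instead: $flags = SWOOLE_HOOK_ALL;
    Swoole\Coroutine::set(['hook_flags' => $flags]);

    Swoole\Coroutine\run(function () {
        // Producer or consumer calls would go here (assumed placement).
        echo "runtime hooks enabled\n";
    });
} else {
    echo "Swoole extension not loaded; running without hooks\n";
}
```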
Installation
composer require linxi/pulsar-client
Registering
Register the service providers and facades in config/app.php:
'providers' => [
    PulsarProducerProvider::class,
    PulsarConsumerProvider::class,
],
'aliases' => [
    'PulsarProducer' => PulsarProducer::class,
    'PulsarConsumer' => PulsarConsumer::class,
]
Publish Vendor Files
This command will generate a pulsar.php configuration file in the config directory. This file contains the configuration settings for the Pulsar client.
php artisan vendor:publish --provider="Linxi\PulsarClient\PulsarProducerProvider"
config/pulsar.php
Each section below ships with one entry named default, which is the default option.
<?php

return [
    // Configure your Pulsar connections
    'connections' => [
        'default' => [
            'url' => env('PULSAR_SERVICE_URL_DEFAULT', 'pulsar://localhost:6650'),
            'token' => env('PULSAR_SERVICE_TOKEN_DEFAULT', 'pulsar://localhost:6650'),
            'timeout' => env('PULSAR_SERVICE_TIMEOUT_DEFAULT', 3),
        ],
    ],

    // Configure your Pulsar topics
    'topics' => [
        'default' => [
            'name' => env('PULSAR_TOPIC_DEFAULT', 'persistent://public/test/demo_1216'),
            'subscriber' => env('PULSAR_SUBSCRIBER_DEFAULT', 'default_consumer'),
            // subscription_type: Exclusive = 0, Shared = 1, Failover = 2, Key_Shared = 3
            'subscription_type' => env('PULSAR_SUBSCRIPTION_TYPE_DEFAULT', 1),
            // initial_position: latest = 0, earliest = 1
            'initial_position' => env('PULSAR_SUBSCRIPTION_INITIAL_POSITION_DEFAULT', 1),
            'queue_size' => env('PULSAR_RECEIVE_QUEUE_SIZE', env('QUEUE_SIZE', 100)),
            // Redeliver a nacked message after this many seconds
            'nack_redelivery_delay' => env('PULSAR_SET_NACK_REDELIVERY_DELAY', 5),
            'dead_letter_policy' => [
                'max_redeliver_count' => env('PULSAR_MAX_REDELIVER_COUNT', 3),
                'dead_letter_topic' => env('PULSAR_DEAD_LETTER_TOPIC', 'persistent://public/test/test_dead'),
            ],
        ],
    ],

    // Pair a Pulsar connection with a topic
    'topic_servers' => [
        'default' => [
            'connection' => 'default',
            'topic' => 'default',
        ],
    ],

    // Used by the consumer command
    'tasks' => [
        // Task name
        'pulsar_test_task' => [
            'topic_server' => 'default',
            'task_process_class' => \App\Pulsar\PulsarTest::class,
        ],
    ],
];
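The topic_servers section pairs a connection name with a topic name. The following plain-PHP sketch illustrates that lookup on a trimmed-down copy of the config array. resolveTopicServer() is a hypothetical helper written for this illustration, not part of linxi/pulsar-client, and the package may resolve these entries differently internally.

```php
<?php
/**
 * Resolve a topic server name into its connection and topic settings.
 * Hypothetical helper for illustration only; not part of the package.
 */
function resolveTopicServer(array $config, string $name = 'default'): array
{
    if (!isset($config['topic_servers'][$name])) {
        throw new InvalidArgumentException("Unknown topic server: {$name}");
    }

    $server = $config['topic_servers'][$name];
    $connection = $config['connections'][$server['connection']] ?? null;
    $topic = $config['topics'][$server['topic']] ?? null;

    if ($connection === null || $topic === null) {
        throw new InvalidArgumentException("Topic server {$name} points at a missing connection or topic");
    }

    return ['connection' => $connection, 'topic' => $topic];
}

// Trimmed-down copy of the structure in config/pulsar.php.
$config = [
    'connections' => [
        'default' => ['url' => 'pulsar://localhost:6650', 'timeout' => 3],
    ],
    'topics' => [
        'default' => ['name' => 'persistent://public/test/demo_1216', 'subscriber' => 'default_consumer'],
    ],
    'topic_servers' => [
        'default' => ['connection' => 'default', 'topic' => 'default'],
    ],
];

$resolved = resolveTopicServer($config);
echo $resolved['connection']['url'], ' -> ', $resolved['topic']['name'], "\n";
```

Adding a second topic server is then a matter of adding one entry to each of the three sections and referring to it by name, for example through setTopicServer().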
PulsarTask
The demo Pulsar task class is App\PulsarTask\PulsarTaskDemo.
Register the Command Class
use Illuminate\Foundation\Console\Kernel as ConsoleKernel;
use Linxi\PulsarClient\PulsarTask\PulsarConsumerTask;

class Kernel extends ConsoleKernel
{
    /**
     * The Artisan commands provided by your application.
     *
     * @var array
     */
    protected $commands = [
        PulsarConsumerTask::class,
    ];
}
Producer
<?php

use Linxi\PulsarClient\Facades\PulsarProducerFacade;

require_once __DIR__ . '/vendor/autoload.php';

// $msgId = PulsarProducer::setTopicServer("default")->send("this message is " . time());
$msgId = PulsarProducerFacade::send("this message is " . time());
Consumer
<?php

use Linxi\PulsarClient\Facades\PulsarConsumerFacade;

require_once __DIR__ . '/vendor/autoload.php';

while (true) {
    $message = PulsarConsumerFacade::setTopicServer('default')->receive();

    echo sprintf(
        'Got message 【%s】messageID[%s] topic[%s] nowTime[%s] publishTime[%s] redeliveryCount[%d]',
        $message->getPayload(),
        $message->getMessageId(),
        $message->getTopic(),
        date('Y-m-d H:i:s'),
        // Publish time is in milliseconds; date() expects whole seconds
        date('Y-m-d H:i:s', (int) ($message->getPublishTime() / 1000)),
        $message->getRedeliveryCount()
    ) . "\n";

    // Acknowledge the message so it is not redelivered
    PulsarConsumerFacade::ack($message);
}
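The loop above acknowledges every message unconditionally. Where processing can fail, a common pattern is to ack on success and nack on failure, so that the nack_redelivery_delay and dead_letter_policy settings in config/pulsar.php come into play. The following is a minimal sketch of that pattern. consumeOne() and the stand-in consumer are hypothetical, and the sketch assumes nack() takes the message just as ack() does.

```php
<?php
/**
 * Receive one message, run the handler, then ack on success or nack on failure.
 * $consumer is any object exposing receive(), ack($message) and nack($message).
 * Hypothetical helper for illustration only; not part of the package.
 */
function consumeOne($consumer, callable $handler): bool
{
    $message = $consumer->receive();

    try {
        $handler($message);
        $consumer->ack($message);
        return true;
    } catch (\Throwable $e) {
        // A negative acknowledgement asks for the message to be redelivered later.
        $consumer->nack($message);
        return false;
    }
}

// Stand-in consumer so the sketch runs without a broker.
$fake = new class {
    public $log = [];

    public function receive()
    {
        return 'payload';
    }

    public function ack($message)
    {
        $this->log[] = "ack:{$message}";
    }

    public function nack($message)
    {
        $this->log[] = "nack:{$message}";
    }
};

// The first handler succeeds, the second one throws.
consumeOne($fake, function ($message) {
    // Processing succeeded; nothing to do.
});
consumeOne($fake, function ($message) {
    throw new RuntimeException('processing failed');
});

print_r($fake->log);
```

With the real package, the object returned by PulsarConsumerFacade::setTopicServer('default') could be passed as $consumer, assuming it exposes ack() and nack() alongside receive().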
Configure the Consumer Script
Configure the tasks in config/pulsar.php:
'tasks' => [
    // Task name
    'pulsar_task_demo' => [
        'topic_server' => 'default',
        'task_process_class' => App\PulsarTask\PulsarTaskDemo::class,
    ],
]
Launch the Task
php artisan pulsar:consumer pulsar_task_demo
Options
- Producer
  - setTopicServer()
  - send()
  - sendAsync()
- Consumer
  - receive()
  - batchReceive()
  - ack()
  - nack()
  - setTopicServer()
MessageNotFound ErrCode (v1.2.1)
MessageNotFound::Ignore
MessageNotFound::CommandParseFail
License
MIT LICENSE