brandembassy / queue-management
Installs: 164 931
Dependents: 0
Suggesters: 0
Security: 0
Stars: 2
Watchers: 12
Forks: 0
Open Issues: 0
Requires
- php: >=8.1
- ext-json: *
- aws/aws-sdk-php: ^3.209
- brandembassy/datetime: ^3.0
- doctrine/collections: ^1.8.0 || ^2.0
- nette/utils: ^3.0
- predis/predis: ^1.1 || ^2.1.2
- psr/log: ^1.1
- ramsey/uuid: ^4.2
- symfony/event-dispatcher-contracts: ^3.5
- tracy/tracy: ^2.9
Requires (Dev)
- brandembassy/coding-standard: ^13.0
- brandembassy/mockery-tools: ^4.1.1
- dealerdirect/phpcodesniffer-composer-installer: ^0.7.2
- mockery/mockery: ^1.5.1
- phpunit/phpunit: ^10.5
- roave/security-advisories: dev-latest
- dev-master
- 8.0
- 7.5
- 7.4
- 7.3
- 7.2
- 7.1
- 7.0
- 6.8
- 6.7
- 6.6
- 6.5
- 6.4
- 6.3
- 6.2
- 6.1
- 6.0
- 5.x-dev
- 5.7.5
- 5.7.4
- 5.7.3
- 5.7.2
- 5.7.1
- 5.7
- 5.6.1
- 5.6
- 5.5
- 5.4
- 5.3
- 5.2.x-dev
- 5.2.2
- 5.2.1
- 5.2
- 5.1
- 5.0
- 4.7
- 4.6
- 4.5
- 4.4.1
- 4.4
- 4.3.1
- 4.3
- 4.2
- 4.1
- 4.0
- 3.1.2
- 3.1.1
- 3.1
- 3.0
- 2.x-dev
- 2.2
- 2.1.1
- 2.1
- 2.0
- 2.0-beta
- 1.8
- 1.7.3
- 1.7.2
- 1.7.1
- 1.7
- 1.6.1
- 1.6
- 1.5
- 1.4.2
- 1.4.1
- 1.4
- 1.3.3
- 1.3.2
- 1.3.1
- 1.3
- 1.2
- 1.1
- 1.0
- 0.2.1
- 0.2
- 0.1
- dev-allow-doctrine-collections-2.0
- dev-DE-44637-job-get-parameters
- dev-DE-47403-update-predis
- dev-DE-40274-allow-php-8-1
- dev-db-reconnect
- dev-deduplication
- dev-sqs2-update-aws-sdk
- dev-sqs
- dev-sqs-support-poc
- dev-reconnect-when-publish-fail
This package is auto-updated.
Last update: 2024-12-20 09:47:32 UTC
README
Usage
1. Create Job class
<?php declare(strict_types = 1);

namespace YourApp\Jobs;

use BE\QueueManagement\Jobs\SimpleJob;

/**
 * Example job that carries a single parameter stored under the `foo` key.
 *
 * Declared in the `YourApp\Jobs` namespace so that it matches the fully
 * qualified name used by the processor, the job definition and the pusher
 * examples.
 */
class ExampleJob extends SimpleJob
{
    /** Name under which this job is looked up in the job definitions. */
    public const JOB_NAME = 'exampleJob';

    /** Key of the job parameter exposed by getFoo(). */
    public const PARAMETER_FOO = 'foo';

    /**
     * Returns whatever getParameter() yields for the PARAMETER_FOO key.
     *
     * With strict types enabled a non-string value triggers a TypeError.
     */
    public function getFoo(): string
    {
        return $this->getParameter(self::PARAMETER_FOO);
    }
}
2. Create job processor
<?php declare(strict_types = 1);

namespace YourApp\JobProcessors;

use BE\QueueManagement\Jobs\Execution\JobProcessorInterface;
use BE\QueueManagement\Jobs\JobInterface;
use YourApp\Jobs\ExampleJob;

/**
 * Processor for ExampleJob: prints the job's `foo` parameter.
 *
 * Declared in the `YourApp\JobProcessors` namespace so that it matches the
 * class name registered as a service in the DI configuration example.
 */
class ExampleJobProcessor implements JobProcessorInterface
{
    /**
     * Echoes the value returned by ExampleJob::getFoo().
     *
     * @param JobInterface $job expected to be an ExampleJob instance
     */
    public function process(JobInterface $job): void
    {
        // Narrows the type for static analysis; assertions may be disabled at runtime.
        assert($job instanceof ExampleJob);

        echo $job->getFoo();
    }
}
3. Create job definition
For example using neon DI:
parameters:
    queue:
        jobs:
            defaultJobLoader: BE\QueueManagement\Jobs\Loading\SimpleJobLoader()
            jobDefinitions:
                exampleJob:
                    class: YourApp\Jobs\ExampleJob
                    queueName: example_queue
                    maxAttempts: 20 # null means no limit
                    jobLoader: YourApp\JobLoaders\ExampleJobLoader() # if not set default job loader is used
                    jobDelayRule: BE\QueueManagement\Jobs\FailResolving\DelayRules\ConstantDelayRule()
                    jobProcessorService: queue.processors.exampleJobProcessor

services:
    queue.processors.exampleJobProcessor: YourApp\JobProcessors\ExampleJobProcessor

    # JobDefinitionsContainer
    queue.jobDefinitionsContainer: BE\QueueManagement\Jobs\JobDefinitions\JobDefinitionsContainer(%queue.jobs.jobDefinitions%)
4. Push job into queue
<?php declare(strict_types = 1);

use BE\QueueManagement\Jobs\JobDefinitions\JobDefinitionsContainer;
use BE\QueueManagement\Jobs\JobInterface;
use BE\QueueManagement\Queue\QueueManagerInterface;
use Doctrine\Common\Collections\ArrayCollection;
use YourApp\Jobs\ExampleJob;

/**
 * Builds an ExampleJob and hands it to the queue manager.
 */
class JobPusher
{
    /**
     * @var QueueManagerInterface
     */
    protected $queueManager;

    /**
     * @var JobDefinitionsContainer
     */
    private $jobDefinitionsContainer;

    /**
     * @param QueueManagerInterface   $queueManager            receives the created job via push()
     * @param JobDefinitionsContainer $jobDefinitionsContainer source of the job definition for ExampleJob
     */
    public function __construct(
        QueueManagerInterface $queueManager,
        JobDefinitionsContainer $jobDefinitionsContainer
    ) {
        $this->queueManager = $queueManager;
        $this->jobDefinitionsContainer = $jobDefinitionsContainer;
    }

    /**
     * Creates an ExampleJob with the given UUID and the parameter `foo => 'bar'`,
     * then pushes it through the queue manager.
     *
     * @param string $jobUuid identifier passed as the job's first constructor argument
     */
    protected function push(string $jobUuid): void
    {
        // The definition is looked up by the job's registered name.
        $jobDefinition = $this->jobDefinitionsContainer->get(ExampleJob::JOB_NAME);

        $exampleJob = new ExampleJob(
            $jobUuid,
            new DateTimeImmutable(),
            JobInterface::INIT_ATTEMPTS,
            $jobDefinition,
            new ArrayCollection([ExampleJob::PARAMETER_FOO => 'bar'])
        );

        $this->queueManager->push($exampleJob);
    }
}
5. Run worker
<?php declare(strict_types = 1);

namespace BE\AdapterSdk\Console\Commands\Queue;

use BE\QueueManagement\Queue\WorkerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

/**
 * Console command `queue:worker:start` that starts the injected worker on
 * the queue named by the command's required argument.
 */
class WorkerStartCommand extends Command
{
    /** Name of the console argument that carries the queue name. */
    public const QUEUE_NAME = 'queueName';

    /**
     * @var WorkerInterface
     */
    private $worker;

    /**
     * @param WorkerInterface $worker worker whose start() is invoked by execute()
     */
    public function __construct(WorkerInterface $worker)
    {
        parent::__construct();

        $this->worker = $worker;
    }

    /**
     * Registers the command name, its description and the queue-name argument.
     */
    protected function configure(): void
    {
        $this->setName('queue:worker:start');
        $this->setDescription('Start queue worker');

        // The argument must be declared here, otherwise getArgument() in execute() throws.
        $this->addArgument(self::QUEUE_NAME, InputArgument::REQUIRED, 'Name of the queue to consume');
    }

    /**
     * Reads the queue name from the input and starts the worker on it.
     *
     * @return int always 0 once start() has returned
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $queueName = $input->getArgument(self::QUEUE_NAME);
        assert(is_string($queueName));

        // Use the injected worker; the class has no `sqsWorker` property.
        $this->worker->start($queueName);

        // This line is reached only after start() returns.
        $output->writeln('Worker started');

        return 0;
    }
}