Skip to content

Commit 5897366

Browse files
authored
Merge pull request #120 from php-enqueue/exclisuve-commands
[client] Add ability to define a command as exclusive
2 parents 4e56d9d + f1eb0cd commit 5897366

13 files changed

+480
-3
lines changed

docs/bundle/message_processor.md

+78-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ Here we just show how to register a message processor service to enqueue. Let's
55

66
* [Container tag](#container-tag)
77
* [Topic subscriber](#topic-subscriber)
8+
* [Command subscriber](#command-subscriber)
89

910
# Container tag
1011

@@ -27,8 +28,8 @@ The tag has some additional options:
2728
2829
# Topic subscriber
2930
30-
There is a `TopicSubscriber` interface (like [EventSubscriberInterface](https://github.com/symfony/symfony/blob/master/src/Symfony/Component/EventDispatcher/EventSubscriberInterface.php)).
31-
It allows to keep subscription login and process logic closer to each other.
31+
There is a `TopicSubscriberInterface` interface (like [EventSubscriberInterface](https://github.com/symfony/symfony/blob/master/src/Symfony/Component/EventDispatcher/EventSubscriberInterface.php)).
32+
It is handy to subscribe on event messages. It allows to keep subscription login and process logic closer to each other.
3233

3334
```php
3435
<?php
@@ -67,6 +68,81 @@ class SayHelloProcessor implements PsrProcessor, TopicSubscriberInterface
6768

6869
In the container you can just add the tag `enqueue.client.message_processor` and omit any other options:
6970

71+
```yaml
72+
# src/AppBundle/Resources/services.yml
73+
74+
services:
75+
app.async.say_hello_processor:
76+
class: 'AppBundle\Async\SayHelloProcessor'
77+
tags:
78+
- { name: 'enqueue.client.processor'}
79+
80+
```
81+
82+
# Command subscriber
83+
84+
There is a `CommandSubscriberInterface` interface which allows to register a command handlers.
85+
If you send a message using ProducerV2::sendCommand('aCommandName') method it will come to this processor.
86+
87+
```php
88+
<?php
89+
namespace AppBundle\Async;
90+
91+
use Enqueue\Client\CommandSubscriberInterface;
92+
use Enqueue\Psr\PsrProcessor;
93+
94+
class SayHelloProcessor implements PsrProcessor, CommandSubscriberInterface
95+
{
96+
public static function getSubscribedCommand()
97+
{
98+
return 'aCommandName';
99+
}
100+
}
101+
```
102+
103+
On the command subscriber you can also define additional settings such as queue and processor name:
104+
105+
```php
106+
<?php
107+
use Enqueue\Client\CommandSubscriberInterface;
108+
use Enqueue\Psr\PsrProcessor;
109+
110+
class SayHelloProcessor implements PsrProcessor, CommandSubscriberInterface
111+
{
112+
public static function getSubscribedCommand()
113+
{
114+
return ['queueName' => 'fooQueue', 'processorName' => 'aCommandName'];
115+
}
116+
}
117+
```
118+
119+
There is a possibility to register a command processor which works exclusively on the queue (no other processors bound to it).
120+
In this case you can send messages without setting any message properties at all. Here's an example of such a processor:
121+
122+
In the container you can just add the tag `enqueue.client.message_processor` and omit any other options:
123+
124+
```php
125+
<?php
126+
use Enqueue\Client\CommandSubscriberInterface;
127+
use Enqueue\Psr\PsrProcessor;
128+
129+
class SayHelloProcessor implements PsrProcessor, CommandSubscriberInterface
130+
{
131+
public static function getSubscribedCommand()
132+
{
133+
return [
134+
'processorName' => 'the-exclusive-command-name',
135+
'queueName' => 'the-queue-name',
136+
'queueNameHardcoded' => true,
137+
'exclusive' => true,
138+
];
139+
}
140+
}
141+
```
142+
143+
The same as a topic subscriber you have to tag a processor service (no need to add any options there):
144+
145+
70146
```yaml
71147
# src/AppBundle/Resources/services.yml
72148
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
<?php
2+
3+
namespace Enqueue\Bundle\DependencyInjection\Compiler;
4+
5+
use Enqueue\Client\Config;
6+
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
7+
use Symfony\Component\DependencyInjection\ContainerBuilder;
8+
9+
class BuildExclusiveCommandsExtensionPass implements CompilerPassInterface
10+
{
11+
use ExtractProcessorTagSubscriptionsTrait;
12+
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
public function process(ContainerBuilder $container)
17+
{
18+
$processorTagName = 'enqueue.client.processor';
19+
$extensionId = 'enqueue.client.exclusive_command_extension';
20+
if (false == $container->hasDefinition($extensionId)) {
21+
return;
22+
}
23+
24+
$queueMetaRegistry = $container->getDefinition($extensionId);
25+
26+
$queueNameToProcessorNameMap = [];
27+
foreach ($container->findTaggedServiceIds($processorTagName) as $serviceId => $tagAttributes) {
28+
$subscriptions = $this->extractSubscriptions($container, $serviceId, $tagAttributes);
29+
30+
foreach ($subscriptions as $subscription) {
31+
if (Config::COMMAND_TOPIC != $subscription['topicName']) {
32+
continue;
33+
}
34+
35+
if (false == isset($subscription['exclusive'])) {
36+
continue;
37+
}
38+
39+
if (false == $subscription['queueNameHardcoded']) {
40+
throw new \LogicException('The exclusive command could be used only with queueNameHardcoded attribute set to true.');
41+
}
42+
43+
$queueNameToProcessorNameMap[$subscription['queueName']] = $subscription['processorName'];
44+
}
45+
}
46+
47+
$queueMetaRegistry->replaceArgument(0, $queueNameToProcessorNameMap);
48+
}
49+
}

pkg/enqueue-bundle/DependencyInjection/Compiler/ExtractProcessorTagSubscriptionsTrait.php

+4
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ protected function extractSubscriptions(ContainerBuilder $container, $processorS
4242
'queueName' => null,
4343
'queueNameHardcoded' => false,
4444
'processorName' => null,
45+
'exclusive' => false,
4546
];
4647

4748
$data = [];
@@ -70,6 +71,7 @@ protected function extractSubscriptions(ContainerBuilder $container, $processorS
7071
'queueName' => $resolve($params['queueName']) ?: $defaultQueueName,
7172
'queueNameHardcoded' => $resolve($params['queueNameHardcoded']),
7273
'processorName' => $processorName,
74+
'exclusive' => array_key_exists('exclusive', $params) ? $params['exclusive'] : false,
7375
];
7476
} else {
7577
throw new \LogicException(sprintf(
@@ -123,6 +125,8 @@ protected function extractSubscriptions(ContainerBuilder $container, $processorS
123125
'queueName' => $resolve($tagAttribute['queueName']) ?: $defaultQueueName,
124126
'queueNameHardcoded' => $resolve($tagAttribute['queueNameHardcoded']),
125127
'processorName' => $resolve($tagAttribute['processorName']) ?: $processorServiceId,
128+
'exclusive' => Config::COMMAND_TOPIC == $resolve($tagAttribute['topicName']) &&
129+
array_key_exists('exclusive', $tagAttribute) ? $tagAttribute['exclusive'] : false,
126130
];
127131
}
128132
}

pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php

+1
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public function load(array $configs, ContainerBuilder $container)
6565
if (isset($config['client'])) {
6666
$loader->load('client.yml');
6767
$loader->load('extensions/flush_spool_producer_extension.yml');
68+
$loader->load('extensions/exclusive_command_extension.yml');
6869

6970
foreach ($config['transport'] as $name => $transportConfig) {
7071
$this->factories[$name]->createDriver($container, $transportConfig);

pkg/enqueue-bundle/EnqueueBundle.php

+2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientExtensionsPass;
99
use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientRoutingPass;
1010
use Enqueue\Bundle\DependencyInjection\Compiler\BuildConsumptionExtensionsPass;
11+
use Enqueue\Bundle\DependencyInjection\Compiler\BuildExclusiveCommandsExtensionPass;
1112
use Enqueue\Bundle\DependencyInjection\Compiler\BuildProcessorRegistryPass;
1213
use Enqueue\Bundle\DependencyInjection\Compiler\BuildQueueMetaRegistryPass;
1314
use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass;
@@ -42,6 +43,7 @@ public function build(ContainerBuilder $container)
4243
$container->addCompilerPass(new BuildTopicMetaSubscribersPass());
4344
$container->addCompilerPass(new BuildQueueMetaRegistryPass());
4445
$container->addCompilerPass(new BuildClientExtensionsPass());
46+
$container->addCompilerPass(new BuildExclusiveCommandsExtensionPass());
4547

4648
/** @var EnqueueExtension $extension */
4749
$extension = $container->getExtension('enqueue');
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
services:
2+
enqueue.client.exclusive_command_extension:
3+
class: 'Enqueue\Client\ConsumptionExtension\ExclusiveCommandExtension'
4+
public: false
5+
arguments:
6+
- []
7+
tags:
8+
- { name: 'enqueue.consumption.extension', priority: 100 }
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
<?php
2+
3+
namespace Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler;
4+
5+
use Enqueue\Bundle\DependencyInjection\Compiler\BuildExclusiveCommandsExtensionPass;
6+
use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\ExclusiveButQueueNameHardCodedCommandSubscriber;
7+
use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\ExclusiveCommandSubscriber;
8+
use Enqueue\Client\Config;
9+
use Enqueue\Client\ConsumptionExtension\ExclusiveCommandExtension;
10+
use Enqueue\Psr\PsrProcessor;
11+
use Enqueue\Test\ClassExtensionTrait;
12+
use PHPUnit\Framework\TestCase;
13+
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
14+
use Symfony\Component\DependencyInjection\ContainerBuilder;
15+
use Symfony\Component\DependencyInjection\Definition;
16+
17+
class BuildExclusiveCommandsExtensionPassTest extends TestCase
18+
{
19+
use ClassExtensionTrait;
20+
21+
public function testShouldImplementCompilerPass()
22+
{
23+
$this->assertClassImplements(CompilerPassInterface::class, BuildExclusiveCommandsExtensionPass::class);
24+
}
25+
26+
public function testCouldBeConstructedWithoutAnyArguments()
27+
{
28+
new BuildExclusiveCommandsExtensionPass();
29+
}
30+
31+
public function testShouldDoNothingIfExclusiveCommandExtensionServiceNotRegistered()
32+
{
33+
$container = new ContainerBuilder();
34+
35+
$pass = new BuildExclusiveCommandsExtensionPass();
36+
$pass->process($container);
37+
}
38+
39+
public function testShouldReplaceFirstArgumentOfExclusiveCommandExtensionServiceConstructorWithExpectedMap()
40+
{
41+
$container = new ContainerBuilder();
42+
$container->setParameter('enqueue.client.default_queue_name', 'default');
43+
$container->register('enqueue.client.exclusive_command_extension', ExclusiveCommandExtension::class)
44+
->addArgument([])
45+
;
46+
47+
$processor = new Definition(ExclusiveCommandSubscriber::class);
48+
$processor->addTag('enqueue.client.processor');
49+
$container->setDefinition('processor-id', $processor);
50+
51+
$pass = new BuildExclusiveCommandsExtensionPass();
52+
53+
$pass->process($container);
54+
55+
$this->assertEquals([
56+
'the-queue-name' => 'the-exclusive-command-name',
57+
], $container->getDefinition('enqueue.client.exclusive_command_extension')->getArgument(0));
58+
}
59+
60+
public function testShouldReplaceFirstArgumentOfExclusiveCommandConfiguredAsTagAttribute()
61+
{
62+
$container = new ContainerBuilder();
63+
$container->setParameter('enqueue.client.default_queue_name', 'default');
64+
$container->register('enqueue.client.exclusive_command_extension', ExclusiveCommandExtension::class)
65+
->addArgument([])
66+
;
67+
68+
$processor = new Definition($this->getMockClass(PsrProcessor::class));
69+
$processor->addTag('enqueue.client.processor', [
70+
'topicName' => Config::COMMAND_TOPIC,
71+
'processorName' => 'the-exclusive-command-name',
72+
'queueName' => 'the-queue-name',
73+
'queueNameHardcoded' => true,
74+
'exclusive' => true,
75+
]);
76+
$container->setDefinition('processor-id', $processor);
77+
78+
$pass = new BuildExclusiveCommandsExtensionPass();
79+
80+
$pass->process($container);
81+
82+
$this->assertEquals([
83+
'the-queue-name' => 'the-exclusive-command-name',
84+
], $container->getDefinition('enqueue.client.exclusive_command_extension')->getArgument(0));
85+
}
86+
87+
public function testShouldThrowIfExclusiveSetTrueButQueueNameIsNotHardcoded()
88+
{
89+
$container = new ContainerBuilder();
90+
$container->setParameter('enqueue.client.default_queue_name', 'default');
91+
$container->register('enqueue.client.exclusive_command_extension', ExclusiveCommandExtension::class)
92+
->addArgument([])
93+
;
94+
95+
$processor = new Definition(ExclusiveButQueueNameHardCodedCommandSubscriber::class);
96+
$processor->addTag('enqueue.client.processor');
97+
$container->setDefinition('processor-id', $processor);
98+
99+
$pass = new BuildExclusiveCommandsExtensionPass();
100+
101+
$this->expectException(\LogicException::class);
102+
$this->expectExceptionMessage('The exclusive command could be used only with queueNameHardcoded attribute set to true.');
103+
$pass->process($container);
104+
}
105+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
3+
namespace Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock;
4+
5+
use Enqueue\Client\CommandSubscriberInterface;
6+
7+
class ExclusiveButQueueNameHardCodedCommandSubscriber implements CommandSubscriberInterface
8+
{
9+
public static function getSubscribedCommand()
10+
{
11+
return [
12+
'processorName' => 'the-exclusive-command-name',
13+
'queueName' => 'the-queue-name',
14+
'queueNameHardCoded' => false,
15+
'exclusive' => true,
16+
];
17+
}
18+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
3+
namespace Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock;
4+
5+
use Enqueue\Client\CommandSubscriberInterface;
6+
7+
class ExclusiveCommandSubscriber implements CommandSubscriberInterface
8+
{
9+
public static function getSubscribedCommand()
10+
{
11+
return [
12+
'processorName' => 'the-exclusive-command-name',
13+
'queueName' => 'the-queue-name',
14+
'queueNameHardcoded' => true,
15+
'exclusive' => true,
16+
];
17+
}
18+
}

pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php

+6
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientExtensionsPass;
88
use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientRoutingPass;
99
use Enqueue\Bundle\DependencyInjection\Compiler\BuildConsumptionExtensionsPass;
10+
use Enqueue\Bundle\DependencyInjection\Compiler\BuildExclusiveCommandsExtensionPass;
1011
use Enqueue\Bundle\DependencyInjection\Compiler\BuildProcessorRegistryPass;
1112
use Enqueue\Bundle\DependencyInjection\Compiler\BuildQueueMetaRegistryPass;
1213
use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass;
@@ -74,6 +75,11 @@ public function testShouldRegisterExpectedCompilerPasses()
7475
;
7576
$container
7677
->expects($this->at(6))
78+
->method('addCompilerPass')
79+
->with($this->isInstanceOf(BuildExclusiveCommandsExtensionPass::class))
80+
;
81+
$container
82+
->expects($this->at(7))
7783
->method('getExtension')
7884
->willReturn($extensionMock)
7985
;

pkg/enqueue/Client/CommandSubscriberInterface.php

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@ interface CommandSubscriberInterface
1515
* 'processorName' => 'aCommandName',
1616
* 'queueName' => 'a_client_queue_name',
1717
* 'queueNameHardcoded' => true,
18+
* 'exclusive' => true,
1819
* ]
1920
*
20-
* queueName and queueNameHardcoded are optional.
21+
* queueName, exclusive and queueNameHardcoded are optional.
2122
*
2223
* Note: If you set queueNameHardcoded to true then the queueName is used as is and therefor the driver is not used to create a transport queue name.
2324
*

0 commit comments

Comments
 (0)