Skip to content

Commit 1739ea5

Browse files
authored
Merge pull request #641 from php-enqueue/add-bc-topic-command-subsribers-compatibility
[bundle] Add BC for topic\command subscribers.
2 parents 815083c + 53962b9 commit 1739ea5

File tree

5 files changed

+171
-3
lines changed

5 files changed

+171
-3
lines changed

pkg/enqueue/Client/TopicSubscriberInterface.php

+15-3
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,22 @@ interface TopicSubscriberInterface
1515
*
1616
* or
1717
*
18-
* ['aTopicName' => [
19-
* 'processor' => 'processor',
18+
* [
19+
* [
20+
* 'topic' => 'aTopicName',
21+
* 'processor' => 'fooProcessor',
2022
* 'queue' => 'a_client_queue_name',
21-
* ]]
23+
*
24+
* 'aCustomOption' => 'aVal',
25+
* ],
26+
* [
27+
* 'topic' => 'anotherTopicName',
28+
* 'processor' => 'barProcessor',
29+
* 'queue' => 'a_client_queue_name',
30+
*
31+
* 'aCustomOption' => 'aVal',
32+
* ],
33+
* ]
2234
*
2335
* Note: If you set prefix_queue to true then the queue is used as is and therefor the driver is not used to prepare a transport queue name.
2436
* It is possible to pass other options, they could be accessible on a route instance through options.

pkg/enqueue/Symfony/Client/DependencyInjection/BuildCommandSubscriberRoutesPass.php

+35
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,41 @@ public function process(ContainerBuilder $container): void
6666
throw new \LogicException('Command subscriber configuration is invalid. Should be an array or string.');
6767
}
6868

69+
// 0.8 command subscriber
70+
if (isset($commands['processorName'])) {
71+
@trigger_error('The command subscriber 0.8 syntax is deprecated since Enqueue 0.9.', E_USER_DEPRECATED);
72+
73+
$source = $commands['processorName'];
74+
$processor = $params['processorName'] ?? $serviceId;
75+
76+
$options = $commands;
77+
unset(
78+
$options['processorName'],
79+
$options['queueName'],
80+
$options['queueNameHardcoded'],
81+
$options['exclusive'],
82+
$options['topic'],
83+
$options['source'],
84+
$options['source_type'],
85+
$options['processor'],
86+
$options['options']
87+
);
88+
89+
$options['processor_service_id'] = $serviceId;
90+
91+
if (isset($commands['queueName'])) {
92+
$options['queue'] = $commands['queueName'];
93+
}
94+
95+
if (isset($commands['queueNameHardcoded']) && $commands['queueNameHardcoded']) {
96+
$options['prefix_queue'] = false;
97+
}
98+
99+
$routeCollection->add(new Route($source, Route::COMMAND, $processor, $options));
100+
101+
continue;
102+
}
103+
69104
if (isset($commands['command'])) {
70105
$commands = [$commands];
71106
}

pkg/enqueue/Symfony/Client/DependencyInjection/BuildTopicSubscriberRoutesPass.php

+31
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,37 @@ public function process(ContainerBuilder $container): void
6969
foreach ($topics as $key => $params) {
7070
if (is_string($params)) {
7171
$routeCollection->add(new Route($params, Route::TOPIC, $serviceId, ['processor_service_id' => $serviceId]));
72+
73+
// 0.8 topic subscriber
74+
} elseif (is_array($params) && is_string($key)) {
75+
@trigger_error('The topic subscriber 0.8 syntax is deprecated since Enqueue 0.9.', E_USER_DEPRECATED);
76+
77+
$source = $key;
78+
$processor = $params['processorName'] ?? $serviceId;
79+
80+
$options = $params;
81+
unset(
82+
$options['processorName'],
83+
$options['queueName'],
84+
$options['queueNameHardcoded'],
85+
$options['topic'],
86+
$options['source'],
87+
$options['source_type'],
88+
$options['processor'],
89+
$options['options']
90+
);
91+
92+
$options['processor_service_id'] = $serviceId;
93+
94+
if (isset($params['queueName'])) {
95+
$options['queue'] = $params['queueName'];
96+
}
97+
98+
if (isset($params['queueNameHardcoded']) && $params['queueNameHardcoded']) {
99+
$options['prefix_queue'] = false;
100+
}
101+
102+
$routeCollection->add(new Route($source, Route::TOPIC, $processor, $options));
72103
} elseif (is_array($params)) {
73104
$source = $params['topic'] ?? null;
74105
$processor = $params['processor'] ?? $serviceId;

pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildCommandSubscriberRoutesPassTest.php

+43
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,49 @@ public function testShouldMergeExtractedRoutesWithAlreadySetInCollection()
398398
);
399399
}
400400

401+
public function testShouldRegister08CommandProcessor()
402+
{
403+
$routeCollection = new Definition(RouteCollection::class);
404+
$routeCollection->addArgument([]);
405+
406+
$processor = $this->createCommandSubscriberProcessor([
407+
'processorName' => 'fooCommand',
408+
'queueName' => 'a_client_queue_name',
409+
'queueNameHardcoded' => true,
410+
'exclusive' => true,
411+
'anOption' => 'aFooVal',
412+
]);
413+
414+
$container = new ContainerBuilder();
415+
$container->setParameter('enqueue.clients', ['default']);
416+
$container->setParameter('enqueue.default_client', 'default');
417+
$container->setDefinition('enqueue.client.default.route_collection', $routeCollection);
418+
$container->register('aFooProcessor', get_class($processor))
419+
->addTag('enqueue.command_subscriber')
420+
;
421+
422+
$pass = new BuildCommandSubscriberRoutesPass();
423+
$pass->process($container);
424+
425+
$this->assertInternalType('array', $routeCollection->getArgument(0));
426+
$this->assertCount(1, $routeCollection->getArgument(0));
427+
428+
$this->assertEquals(
429+
[
430+
[
431+
'source' => 'fooCommand',
432+
'source_type' => 'enqueue.client.command_route',
433+
'processor' => 'aFooProcessor',
434+
'processor_service_id' => 'aFooProcessor',
435+
'anOption' => 'aFooVal',
436+
'queue' => 'a_client_queue_name',
437+
'prefix_queue' => false,
438+
],
439+
],
440+
$routeCollection->getArgument(0)
441+
);
442+
}
443+
401444
private function createCommandSubscriberProcessor($commandSubscriberReturns = ['aCommand'])
402445
{
403446
$processor = new class() implements Processor, CommandSubscriberInterface {

pkg/enqueue/Tests/Symfony/Client/DependencyInjection/BuildTopicSubscriberRoutesPassTest.php

+47
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,53 @@ public function testShouldMergeExtractedRoutesWithAlreadySetInCollection()
358358
);
359359
}
360360

361+
public function testShouldRegister08TopicSubscriber()
362+
{
363+
$routeCollection = new Definition(RouteCollection::class);
364+
$routeCollection->addArgument([]);
365+
366+
$processor = $this->createTopicSubscriberProcessor([
367+
'fooTopic' => ['processorName' => 'aCustomFooProcessorName', 'queueName' => 'fooQueue', 'queueNameHardcoded' => true, 'anOption' => 'aFooVal'],
368+
'barTopic' => ['processorName' => 'aCustomBarProcessorName', 'anOption' => 'aBarVal'],
369+
]);
370+
371+
$container = new ContainerBuilder();
372+
$container->setParameter('enqueue.clients', ['default']);
373+
$container->setParameter('enqueue.default_client', 'default');
374+
$container->setDefinition('enqueue.client.default.route_collection', $routeCollection);
375+
$container->register('aFooProcessor', get_class($processor))
376+
->addTag('enqueue.topic_subscriber')
377+
;
378+
379+
$pass = new BuildTopicSubscriberRoutesPass();
380+
$pass->process($container);
381+
382+
$this->assertInternalType('array', $routeCollection->getArgument(0));
383+
$this->assertCount(2, $routeCollection->getArgument(0));
384+
385+
$this->assertEquals(
386+
[
387+
[
388+
'source' => 'fooTopic',
389+
'source_type' => 'enqueue.client.topic_route',
390+
'processor' => 'aCustomFooProcessorName',
391+
'processor_service_id' => 'aFooProcessor',
392+
'anOption' => 'aFooVal',
393+
'queue' => 'fooQueue',
394+
'prefix_queue' => false,
395+
],
396+
[
397+
'source' => 'barTopic',
398+
'source_type' => 'enqueue.client.topic_route',
399+
'processor' => 'aCustomBarProcessorName',
400+
'processor_service_id' => 'aFooProcessor',
401+
'anOption' => 'aBarVal',
402+
],
403+
],
404+
$routeCollection->getArgument(0)
405+
);
406+
}
407+
361408
private function createTopicSubscriberProcessor($topicSubscriberReturns = ['aTopic'])
362409
{
363410
$processor = new class() implements Processor, TopicSubscriberInterface {

0 commit comments

Comments
 (0)