Skip to content

Commit ee66ba2

Browse files
authored
Merge pull request #63 from php-enqueue/consumption-queue-subscriber-interface
[consumption] Add support of QueueSubscriberInterface to transport consume command.
2 parents 8edeb66 + ea100c3 commit ee66ba2

File tree

5 files changed

+127
-28
lines changed

5 files changed

+127
-28
lines changed

docs/bundle/cli_commands.md

+3-3
Original file line numberDiff line numberDiff line change
@@ -138,18 +138,18 @@ Help:
138138
## enqueue:transport:consume
139139

140140
```
141-
./bin/console enqueue:transport:consume --help
141+
./bin/console enqueue:transport:consume --help
142142
Usage:
143-
enqueue:transport:consume [options] [--] <queue> <processor-service>
143+
enqueue:transport:consume [options] [--] <processor-service>
144144
145145
Arguments:
146-
queue Queues to consume from
147146
processor-service A message processor service
148147
149148
Options:
150149
--message-limit=MESSAGE-LIMIT Consume n messages and exit
151150
--time-limit=TIME-LIMIT Consume messages during this time
152151
--memory-limit=MEMORY-LIMIT Consume messages until process reaches this memory limit in MB
152+
--queue[=QUEUE] Queues to consume from (multiple values allowed)
153153
-h, --help Display this help message
154154
-q, --quiet Do not output any message
155155
-V, --version Display this application version

pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ public function testTransportConsumeMessagesCommandShouldConsumeMessage(array $e
148148
$tester->execute([
149149
'--message-limit' => 1,
150150
'--time-limit' => '+10sec',
151-
'queue' => 'enqueue.test',
151+
'--queue' => ['enqueue.test'],
152152
'processor-service' => 'test.message.processor',
153153
]);
154154

pkg/enqueue/Symfony/Consumption/ContainerAwareConsumeMessagesCommand.php

+19-7
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55
use Enqueue\Consumption\ChainExtension;
66
use Enqueue\Consumption\Extension\LoggerExtension;
77
use Enqueue\Consumption\QueueConsumer;
8+
use Enqueue\Consumption\QueueSubscriberInterface;
89
use Enqueue\Psr\PsrProcessor;
910
use Symfony\Component\Console\Command\Command;
1011
use Symfony\Component\Console\Input\InputArgument;
1112
use Symfony\Component\Console\Input\InputInterface;
13+
use Symfony\Component\Console\Input\InputOption;
1214
use Symfony\Component\Console\Logger\ConsoleLogger;
1315
use Symfony\Component\Console\Output\OutputInterface;
1416
use Symfony\Component\DependencyInjection\ContainerAwareInterface;
@@ -48,8 +50,8 @@ protected function configure()
4850
->setDescription('A worker that consumes message from a broker. '.
4951
'To use this broker you have to explicitly set a queue to consume from '.
5052
'and a message processor service')
51-
->addArgument('queue', InputArgument::REQUIRED, 'Queues to consume from')
5253
->addArgument('processor-service', InputArgument::REQUIRED, 'A message processor service')
54+
->addOption('queue', null, InputOption::VALUE_IS_ARRAY | InputOption::VALUE_OPTIONAL, 'Queues to consume from', [])
5355
;
5456
}
5557

@@ -58,28 +60,38 @@ protected function configure()
5860
*/
5961
protected function execute(InputInterface $input, OutputInterface $output)
6062
{
61-
$queueName = $input->getArgument('queue');
62-
6363
/** @var PsrProcessor $processor */
6464
$processor = $this->container->get($input->getArgument('processor-service'));
65-
if (!$processor instanceof PsrProcessor) {
65+
if (false == $processor instanceof PsrProcessor) {
6666
throw new \LogicException(sprintf(
6767
'Invalid message processor service given. It must be an instance of %s but %s',
6868
PsrProcessor::class,
6969
get_class($processor)
7070
));
7171
}
7272

73+
$queues = $input->getOption('queue');
74+
if (empty($queues) && $processor instanceof QueueSubscriberInterface) {
75+
$queues = $processor::getSubscribedQueues();
76+
}
77+
78+
if (empty($queues)) {
79+
throw new \LogicException(sprintf(
80+
'The queues are not provided. The processor must implement "%s" interface and it must return not empty array of queues or queues set using --queue option.',
81+
QueueSubscriberInterface::class
82+
));
83+
}
84+
7385
$extensions = $this->getLimitsExtensions($input, $output);
7486
array_unshift($extensions, new LoggerExtension(new ConsoleLogger($output)));
7587

7688
$runtimeExtensions = new ChainExtension($extensions);
7789

7890
try {
79-
$queue = $this->consumer->getPsrContext()->createQueue($queueName);
80-
// @todo set additional queue options
91+
foreach($queues as $queue) {
92+
$this->consumer->bind($queue, $processor);
93+
}
8194

82-
$this->consumer->bind($queue, $processor);
8395
$this->consumer->consume($runtimeExtensions);
8496
} finally {
8597
$this->consumer->getPsrContext()->close();

pkg/enqueue/Tests/Symfony/Consumption/ContainerAwareConsumeMessagesCommandTest.php

+85-17
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
use Enqueue\Psr\PsrProcessor;
99
use Enqueue\Psr\PsrQueue;
1010
use Enqueue\Symfony\Consumption\ContainerAwareConsumeMessagesCommand;
11+
use Enqueue\Tests\Symfony\Consumption\Mock\QueueSubscriberProcessor;
1112
use Symfony\Component\Console\Tester\CommandTester;
1213
use Symfony\Component\DependencyInjection\Container;
1314
use PHPUnit\Framework\TestCase;
@@ -32,10 +33,11 @@ public function testShouldHaveExpectedOptions()
3233

3334
$options = $command->getDefinition()->getOptions();
3435

35-
$this->assertCount(3, $options);
36+
$this->assertCount(4, $options);
3637
$this->assertArrayHasKey('memory-limit', $options);
3738
$this->assertArrayHasKey('message-limit', $options);
3839
$this->assertArrayHasKey('time-limit', $options);
40+
$this->assertArrayHasKey('queue', $options);
3941
}
4042

4143
public function testShouldHaveExpectedAttributes()
@@ -44,41 +46,63 @@ public function testShouldHaveExpectedAttributes()
4446

4547
$arguments = $command->getDefinition()->getArguments();
4648

47-
$this->assertCount(2, $arguments);
49+
$this->assertCount(1, $arguments);
4850
$this->assertArrayHasKey('processor-service', $arguments);
49-
$this->assertArrayHasKey('queue', $arguments);
5051
}
5152

5253
public function testShouldThrowExceptionIfProcessorInstanceHasWrongClass()
5354
{
54-
$this->setExpectedException(\LogicException::class, 'Invalid message processor service given.'.
55-
' It must be an instance of Enqueue\Psr\PsrProcessor but stdClass');
56-
5755
$container = new Container();
5856
$container->set('processor-service', new \stdClass());
5957

6058
$command = new ContainerAwareConsumeMessagesCommand($this->createQueueConsumerMock());
6159
$command->setContainer($container);
6260

6361
$tester = new CommandTester($command);
62+
63+
$this->expectException(\LogicException::class);
64+
$this->expectExceptionMessage('Invalid message processor service given. It must be an instance of Enqueue\Psr\PsrProcessor but stdClass');
6465
$tester->execute([
65-
'queue' => 'queue-name',
6666
'processor-service' => 'processor-service',
67+
'--queue' => ['queue-name'],
6768
]);
6869
}
6970

70-
public function testShouldExecuteConsumption()
71+
public function testThrowIfNeitherQueueOptionNorProcessorImplementsQueueSubscriberInterface()
7172
{
7273
$processor = $this->createProcessor();
7374

74-
$queue = $this->createQueueMock();
75+
$consumer = $this->createQueueConsumerMock();
76+
$consumer
77+
->expects($this->never())
78+
->method('bind')
79+
;
80+
$consumer
81+
->expects($this->never())
82+
->method('consume')
83+
;
84+
85+
$container = new Container();
86+
$container->set('processor-service', $processor);
87+
88+
$command = new ContainerAwareConsumeMessagesCommand($consumer);
89+
$command->setContainer($container);
90+
91+
$tester = new CommandTester($command);
92+
93+
94+
$this->expectException(\LogicException::class);
95+
$this->expectExceptionMessage('The queues are not provided. The processor must implement "Enqueue\Consumption\QueueSubscriberInterface" interface and it must return not empty array of queues or queues set using --queue option.');
96+
$tester->execute([
97+
'processor-service' => 'processor-service',
98+
]);
99+
}
100+
101+
public function testShouldExecuteConsumptionWithExplisitlySetQueueViaQueueOption()
102+
{
103+
$processor = $this->createProcessor();
75104

76105
$context = $this->createContextMock();
77-
$context
78-
->expects($this->once())
79-
->method('createQueue')
80-
->willReturn($queue)
81-
;
82106
$context
83107
->expects($this->once())
84108
->method('close')
@@ -88,15 +112,15 @@ public function testShouldExecuteConsumption()
88112
$consumer
89113
->expects($this->once())
90114
->method('bind')
91-
->with($this->identicalTo($queue), $this->identicalTo($processor))
115+
->with('queue-name', $this->identicalTo($processor))
92116
;
93117
$consumer
94118
->expects($this->once())
95119
->method('consume')
96120
->with($this->isInstanceOf(ChainExtension::class))
97121
;
98122
$consumer
99-
->expects($this->exactly(2))
123+
->expects($this->exactly(1))
100124
->method('getPsrContext')
101125
->will($this->returnValue($context))
102126
;
@@ -109,8 +133,52 @@ public function testShouldExecuteConsumption()
109133

110134
$tester = new CommandTester($command);
111135
$tester->execute([
112-
'queue' => 'queue-name',
113136
'processor-service' => 'processor-service',
137+
'--queue' => ['queue-name'],
138+
]);
139+
}
140+
141+
public function testShouldExecuteConsumptionWhenProcessorImplementsQueueSubscriberInterface()
142+
{
143+
$processor = new QueueSubscriberProcessor();
144+
145+
$context = $this->createContextMock();
146+
$context
147+
->expects($this->once())
148+
->method('close')
149+
;
150+
151+
$consumer = $this->createQueueConsumerMock();
152+
$consumer
153+
->expects($this->at(0))
154+
->method('bind')
155+
->with('fooSubscribedQueues', $this->identicalTo($processor))
156+
;
157+
$consumer
158+
->expects($this->at(1))
159+
->method('bind')
160+
->with('barSubscribedQueues', $this->identicalTo($processor))
161+
;
162+
$consumer
163+
->expects($this->at(2))
164+
->method('consume')
165+
->with($this->isInstanceOf(ChainExtension::class))
166+
;
167+
$consumer
168+
->expects($this->at(3))
169+
->method('getPsrContext')
170+
->will($this->returnValue($context))
171+
;
172+
173+
$container = new Container();
174+
$container->set('processor-service', $processor);
175+
176+
$command = new ContainerAwareConsumeMessagesCommand($consumer);
177+
$command->setContainer($container);
178+
179+
$tester = new CommandTester($command);
180+
$tester->execute([
181+
'processor-service' => 'processor-service'
114182
]);
115183
}
116184

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?php
2+
namespace Enqueue\Tests\Symfony\Consumption\Mock;
3+
4+
use Enqueue\Consumption\QueueSubscriberInterface;
5+
use Enqueue\Psr\PsrContext;
6+
use Enqueue\Psr\PsrMessage;
7+
use Enqueue\Psr\PsrProcessor;
8+
9+
class QueueSubscriberProcessor implements PsrProcessor, QueueSubscriberInterface
10+
{
11+
public function process(PsrMessage $message, PsrContext $context)
12+
{
13+
}
14+
15+
public static function getSubscribedQueues()
16+
{
17+
return ['fooSubscribedQueues', 'barSubscribedQueues'];
18+
}
19+
}

0 commit comments

Comments
 (0)