Skip to content

Commit 98dd6b5

Browse files
authored
Merge pull request #554 from php-enqueue/queue-consumer-extensions
[consumption] Rework QueueConsumer extension points.
2 parents 389b5ce + 7fc8cca commit 98dd6b5

File tree

79 files changed

+2838
-1984
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

79 files changed

+2838
-1984
lines changed

docs/bundle/config_reference.md

-3
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,6 @@ enqueue:
3131
redelivered_delay_time: 0
3232
consumption:
3333

34-
# the time in milliseconds queue consumer waits if no message received
35-
idle_timeout: 0
36-
3734
# the time in milliseconds queue consumer waits for a message (100 ms by default)
3835
receive_timeout: 100
3936
job: false

docs/bundle/consumption_extension.md

+4-10
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,14 @@ Let's first create an extension itself:
88
// src/AppBundle/Enqueue;
99
namespace AppBundle\Enqueue;
1010

11-
use Enqueue\Consumption\ExtensionInterface;
12-
use Enqueue\Consumption\EmptyExtensionTrait;
13-
use Enqueue\Consumption\Context;
11+
use Enqueue\Consumption\PostMessageReceivedExtensionInterface;
12+
use Enqueue\Consumption\Context\PostMessageReceived;
1413

15-
class CountProcessedMessagesExtension implements ExtensionInterface
14+
class CountProcessedMessagesExtension implements PostMessageReceivedExtensionInterface
1615
{
17-
use EmptyExtensionTrait;
18-
1916
private $processedMessages = 0;
2017

21-
/**
22-
* {@inheritdoc}
23-
*/
24-
public function onPostReceived(Context $context)
18+
public function onPostMessageReceived(PostMessageReceived $context): void
2519
{
2620
$this->processedMessages += 1;
2721
}

pkg/enqueue-bundle/Consumption/Extension/DoctrineClearIdentityMapExtension.php

+4-10
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,12 @@
22

33
namespace Enqueue\Bundle\Consumption\Extension;
44

5-
use Enqueue\Consumption\Context;
6-
use Enqueue\Consumption\EmptyExtensionTrait;
7-
use Enqueue\Consumption\ExtensionInterface;
5+
use Enqueue\Consumption\Context\MessageReceived;
6+
use Enqueue\Consumption\MessageReceivedExtensionInterface;
87
use Symfony\Bridge\Doctrine\RegistryInterface;
98

10-
class DoctrineClearIdentityMapExtension implements ExtensionInterface
9+
class DoctrineClearIdentityMapExtension implements MessageReceivedExtensionInterface
1110
{
12-
use EmptyExtensionTrait;
13-
1411
/**
1512
* @var RegistryInterface
1613
*/
@@ -24,10 +21,7 @@ public function __construct(RegistryInterface $registry)
2421
$this->registry = $registry;
2522
}
2623

27-
/**
28-
* {@inheritdoc}
29-
*/
30-
public function onPreReceived(Context $context)
24+
public function onMessageReceived(MessageReceived $context): void
3125
{
3226
foreach ($this->registry->getManagers() as $name => $manager) {
3327
$context->getLogger()->debug(sprintf(

pkg/enqueue-bundle/Consumption/Extension/DoctrinePingConnectionExtension.php

+4-10
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,12 @@
33
namespace Enqueue\Bundle\Consumption\Extension;
44

55
use Doctrine\DBAL\Connection;
6-
use Enqueue\Consumption\Context;
7-
use Enqueue\Consumption\EmptyExtensionTrait;
8-
use Enqueue\Consumption\ExtensionInterface;
6+
use Enqueue\Consumption\Context\MessageReceived;
7+
use Enqueue\Consumption\MessageReceivedExtensionInterface;
98
use Symfony\Bridge\Doctrine\RegistryInterface;
109

11-
class DoctrinePingConnectionExtension implements ExtensionInterface
10+
class DoctrinePingConnectionExtension implements MessageReceivedExtensionInterface
1211
{
13-
use EmptyExtensionTrait;
14-
1512
/**
1613
* @var RegistryInterface
1714
*/
@@ -25,10 +22,7 @@ public function __construct(RegistryInterface $registry)
2522
$this->registry = $registry;
2623
}
2724

28-
/**
29-
* {@inheritdoc}
30-
*/
31-
public function onPreReceived(Context $context)
25+
public function onMessageReceived(MessageReceived $context): void
3226
{
3327
/** @var Connection $connection */
3428
foreach ($this->registry->getConnections() as $connection) {

pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php

+1-2
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,7 @@ public function load(array $configs, ContainerBuilder $container): void
9191
}
9292

9393
$container->getDefinition('enqueue.client.default.queue_consumer')
94-
->replaceArgument(2, $config['consumption']['idle_time'])
95-
->replaceArgument(3, $config['consumption']['receive_timeout'])
94+
->replaceArgument(4, $config['consumption']['receive_timeout'])
9695
;
9796
}
9897

pkg/enqueue-bundle/Resources/config/client.yml

+2
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ services:
6868
arguments:
6969
- '@enqueue.client.default.context'
7070
- '@enqueue.client.default.consumption_extensions'
71+
- []
72+
- null
7173
- ~
7274
- ~
7375

pkg/enqueue-bundle/Tests/Unit/Consumption/Extension/DoctrineClearIdentityMapExtensionTest.php

+12-9
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@
44

55
use Doctrine\Common\Persistence\ObjectManager;
66
use Enqueue\Bundle\Consumption\Extension\DoctrineClearIdentityMapExtension;
7-
use Enqueue\Consumption\Context;
7+
use Enqueue\Consumption\Context\MessageReceived;
88
use Interop\Queue\Consumer;
99
use Interop\Queue\Context as InteropContext;
10+
use Interop\Queue\Message;
1011
use Interop\Queue\Processor;
1112
use PHPUnit\Framework\TestCase;
1213
use Psr\Log\LoggerInterface;
@@ -42,17 +43,19 @@ public function testShouldClearIdentityMap()
4243
;
4344

4445
$extension = new DoctrineClearIdentityMapExtension($registry);
45-
$extension->onPreReceived($context);
46+
$extension->onMessageReceived($context);
4647
}
4748

48-
protected function createContext(): Context
49+
protected function createContext(): MessageReceived
4950
{
50-
$context = new Context($this->createMock(InteropContext::class));
51-
$context->setLogger($this->createMock(LoggerInterface::class));
52-
$context->setConsumer($this->createMock(Consumer::class));
53-
$context->setProcessor($this->createMock(Processor::class));
54-
55-
return $context;
51+
return new MessageReceived(
52+
$this->createMock(InteropContext::class),
53+
$this->createMock(Consumer::class),
54+
$this->createMock(Message::class),
55+
$this->createMock(Processor::class),
56+
1,
57+
$this->createMock(LoggerInterface::class)
58+
);
5659
}
5760

5861
/**

pkg/enqueue-bundle/Tests/Unit/Consumption/Extension/DoctrinePingConnectionExtensionTest.php

+14-11
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@
44

55
use Doctrine\DBAL\Connection;
66
use Enqueue\Bundle\Consumption\Extension\DoctrinePingConnectionExtension;
7-
use Enqueue\Consumption\Context;
7+
use Enqueue\Consumption\Context\MessageReceived;
88
use Interop\Queue\Consumer;
99
use Interop\Queue\Context as InteropContext;
10+
use Interop\Queue\Message;
1011
use Interop\Queue\Processor;
1112
use PHPUnit\Framework\TestCase;
1213
use Psr\Log\LoggerInterface;
@@ -55,7 +56,7 @@ public function testShouldNotReconnectIfConnectionIsOK()
5556
;
5657

5758
$extension = new DoctrinePingConnectionExtension($registry);
58-
$extension->onPreReceived($context);
59+
$extension->onMessageReceived($context);
5960
}
6061

6162
public function testShouldDoesReconnectIfConnectionFailed()
@@ -100,7 +101,7 @@ public function testShouldDoesReconnectIfConnectionFailed()
100101
;
101102

102103
$extension = new DoctrinePingConnectionExtension($registry);
103-
$extension->onPreReceived($context);
104+
$extension->onMessageReceived($context);
104105
}
105106

106107
public function testShouldSkipIfConnectionWasNotOpened()
@@ -143,17 +144,19 @@ public function testShouldSkipIfConnectionWasNotOpened()
143144
;
144145

145146
$extension = new DoctrinePingConnectionExtension($registry);
146-
$extension->onPreReceived($context);
147+
$extension->onMessageReceived($context);
147148
}
148149

149-
protected function createContext(): Context
150+
protected function createContext(): MessageReceived
150151
{
151-
$context = new Context($this->createMock(InteropContext::class));
152-
$context->setLogger($this->createMock(LoggerInterface::class));
153-
$context->setConsumer($this->createMock(Consumer::class));
154-
$context->setProcessor($this->createMock(Processor::class));
155-
156-
return $context;
152+
return new MessageReceived(
153+
$this->createMock(InteropContext::class),
154+
$this->createMock(Consumer::class),
155+
$this->createMock(Message::class),
156+
$this->createMock(Processor::class),
157+
1,
158+
$this->createMock(LoggerInterface::class)
159+
);
157160
}
158161

159162
/**

pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php

-5
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ public function testShouldUseDefaultConfigurationIfNothingIsConfiguredAtAll()
3939
$this->assertEquals([
4040
'transport' => ['dsn' => 'null:'],
4141
'consumption' => [
42-
'idle_time' => 0,
4342
'receive_timeout' => 10000,
4443
],
4544
'job' => false,
@@ -66,7 +65,6 @@ public function testShouldUseDefaultTransportIfIfTransportIsConfiguredAtAll()
6665
$this->assertEquals([
6766
'transport' => ['dsn' => 'null:'],
6867
'consumption' => [
69-
'idle_time' => 0,
7068
'receive_timeout' => 10000,
7169
],
7270
'job' => false,
@@ -383,7 +381,6 @@ public function testShouldSetDefaultConfigurationForConsumption()
383381

384382
$this->assertArraySubset([
385383
'consumption' => [
386-
'idle_time' => 0,
387384
'receive_timeout' => 10000,
388385
],
389386
], $config);
@@ -397,14 +394,12 @@ public function testShouldAllowConfigureConsumption()
397394
$config = $processor->processConfiguration($configuration, [[
398395
'transport' => [],
399396
'consumption' => [
400-
'idle_time' => 123,
401397
'receive_timeout' => 456,
402398
],
403399
]]);
404400

405401
$this->assertArraySubset([
406402
'consumption' => [
407-
'idle_time' => 123,
408403
'receive_timeout' => 456,
409404
],
410405
], $config);

pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php

+2-6
Original file line numberDiff line numberDiff line change
@@ -430,21 +430,17 @@ public function testShouldConfigureQueueConsumer()
430430
'transport' => [
431431
],
432432
'consumption' => [
433-
'idle_time' => 123,
434433
'receive_timeout' => 456,
435434
],
436435
]], $container);
437436

438437
$def = $container->getDefinition('enqueue.transport.default.queue_consumer');
439-
$this->assertSame('%enqueue.transport.default.idle_time%', $def->getArgument(2));
440-
$this->assertSame('%enqueue.transport.default.receive_timeout%', $def->getArgument(3));
438+
$this->assertSame('%enqueue.transport.default.receive_timeout%', $def->getArgument(4));
441439

442-
$this->assertSame(123, $container->getParameter('enqueue.transport.default.idle_time'));
443440
$this->assertSame(456, $container->getParameter('enqueue.transport.default.receive_timeout'));
444441

445442
$def = $container->getDefinition('enqueue.client.default.queue_consumer');
446-
$this->assertSame(123, $def->getArgument(2));
447-
$this->assertSame(456, $def->getArgument(3));
443+
$this->assertSame(456, $def->getArgument(4));
448444
}
449445

450446
public function testShouldLoadProcessAutoconfigureChildDefinition()

pkg/enqueue/Client/ConsumptionExtension/DelayRedeliveredMessageExtension.php

+5-11
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,12 @@
33
namespace Enqueue\Client\ConsumptionExtension;
44

55
use Enqueue\Client\DriverInterface;
6-
use Enqueue\Consumption\Context;
7-
use Enqueue\Consumption\EmptyExtensionTrait;
8-
use Enqueue\Consumption\ExtensionInterface;
6+
use Enqueue\Consumption\Context\MessageReceived;
7+
use Enqueue\Consumption\MessageReceivedExtensionInterface;
98
use Enqueue\Consumption\Result;
109

11-
class DelayRedeliveredMessageExtension implements ExtensionInterface
10+
class DelayRedeliveredMessageExtension implements MessageReceivedExtensionInterface
1211
{
13-
use EmptyExtensionTrait;
14-
1512
const PROPERTY_REDELIVER_COUNT = 'enqueue.redelivery_count';
1613

1714
/**
@@ -36,12 +33,9 @@ public function __construct(DriverInterface $driver, $delay)
3633
$this->delay = $delay;
3734
}
3835

39-
/**
40-
* {@inheritdoc}
41-
*/
42-
public function onPreReceived(Context $context)
36+
public function onMessageReceived(MessageReceived $context): void
4337
{
44-
$message = $context->getInteropMessage();
38+
$message = $context->getMessage();
4539
if (false == $message->isRedelivered()) {
4640
return;
4741
}

pkg/enqueue/Client/ConsumptionExtension/ExclusiveCommandExtension.php

+6-10
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,11 @@
55
use Enqueue\Client\Config;
66
use Enqueue\Client\DriverInterface;
77
use Enqueue\Client\Route;
8-
use Enqueue\Consumption\Context;
9-
use Enqueue\Consumption\EmptyExtensionTrait as ConsumptionEmptyExtensionTrait;
10-
use Enqueue\Consumption\ExtensionInterface as ConsumptionExtensionInterface;
8+
use Enqueue\Consumption\Context\MessageReceived;
9+
use Enqueue\Consumption\MessageReceivedExtensionInterface;
1110

12-
final class ExclusiveCommandExtension implements ConsumptionExtensionInterface
11+
final class ExclusiveCommandExtension implements MessageReceivedExtensionInterface
1312
{
14-
use ConsumptionEmptyExtensionTrait;
15-
1613
/**
1714
* @var DriverInterface
1815
*/
@@ -28,11 +25,9 @@ public function __construct(DriverInterface $driver)
2825
$this->driver = $driver;
2926
}
3027

31-
public function onPreReceived(Context $context)
28+
public function onMessageReceived(MessageReceived $context): void
3229
{
33-
$message = $context->getInteropMessage();
34-
$queue = $context->getInteropQueue();
35-
30+
$message = $context->getMessage();
3631
if ($message->getProperty(Config::TOPIC)) {
3732
return;
3833
}
@@ -47,6 +42,7 @@ public function onPreReceived(Context $context)
4742
$this->queueToRouteMap = $this->buildMap();
4843
}
4944

45+
$queue = $context->getConsumer()->getQueue();
5046
if (array_key_exists($queue->getQueueName(), $this->queueToRouteMap)) {
5147
$context->getLogger()->debug('[ExclusiveCommandExtension] This is a exclusive command queue and client\'s properties are not set. Setting them');
5248

pkg/enqueue/Client/ConsumptionExtension/FlushSpoolProducerExtension.php

+7-11
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,13 @@
33
namespace Enqueue\Client\ConsumptionExtension;
44

55
use Enqueue\Client\SpoolProducer;
6-
use Enqueue\Consumption\Context;
7-
use Enqueue\Consumption\EmptyExtensionTrait;
8-
use Enqueue\Consumption\ExtensionInterface;
6+
use Enqueue\Consumption\Context\End;
7+
use Enqueue\Consumption\Context\PostMessageReceived;
8+
use Enqueue\Consumption\EndExtensionInterface;
9+
use Enqueue\Consumption\PostMessageReceivedExtensionInterface;
910

10-
class FlushSpoolProducerExtension implements ExtensionInterface
11+
class FlushSpoolProducerExtension implements PostMessageReceivedExtensionInterface, EndExtensionInterface
1112
{
12-
use EmptyExtensionTrait;
13-
1413
/**
1514
* @var SpoolProducer
1615
*/
@@ -24,15 +23,12 @@ public function __construct(SpoolProducer $producer)
2423
$this->producer = $producer;
2524
}
2625

27-
/**
28-
* {@inheritdoc}
29-
*/
30-
public function onPostReceived(Context $context)
26+
public function onPostMessageReceived(PostMessageReceived $context): void
3127
{
3228
$this->producer->flush();
3329
}
3430

35-
public function onInterrupted(Context $context)
31+
public function onEnd(End $context): void
3632
{
3733
$this->producer->flush();
3834
}

0 commit comments

Comments
 (0)