Skip to content

Commit 389b5ce

Browse files
authored
Merge pull request #548 from php-enqueue/consumer-multiple-transport-support
[consumption] Add ability to consume from multiple transports.
2 parents d5984eb + fce6c46 commit 389b5ce

File tree

54 files changed

+2645
-987
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+2645
-987
lines changed

composer.json

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
"doctrine/dbal": "~2.5",
1919
"ramsey/uuid": "^2|^3.5",
2020
"psr/log": "^1",
21+
"psr/container": "^1",
2122
"symfony/event-dispatcher": "4.0.*",
2223
"makasim/temp-file": "^0.2",
2324
"google/cloud-pubsub": "^0.6.1|^1.0",

pkg/enqueue-bundle/DependencyInjection/Configuration.php

+6-13
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,13 @@ public function getConfigTreeBuilder(): TreeBuilder
2626
return ['transport' => ['dsn' => 'null:']];
2727
});
2828

29+
$transportFactory = new TransportFactory('default');
30+
2931
$transportNode = $rootNode->children()->arrayNode('transport');
30-
(new TransportFactory('default'))->addConfiguration($transportNode);
32+
$transportFactory->addTransportConfiguration($transportNode);
33+
34+
$consumptionNode = $rootNode->children()->arrayNode('consumption');
35+
$transportFactory->addQueueConsumerConfiguration($consumptionNode);
3136

3237
$rootNode->children()
3338
->arrayNode('client')->children()
@@ -40,18 +45,6 @@ public function getConfigTreeBuilder(): TreeBuilder
4045
->scalarNode('default_processor_queue')->defaultValue('default')->cannotBeEmpty()->end()
4146
->integerNode('redelivered_delay_time')->min(0)->defaultValue(0)->end()
4247
->end()->end()
43-
->arrayNode('consumption')->addDefaultsIfNotSet()->children()
44-
->integerNode('idle_timeout')
45-
->min(0)
46-
->defaultValue(0)
47-
->info('the time in milliseconds queue consumer waits if no message received')
48-
->end()
49-
->integerNode('receive_timeout')
50-
->min(0)
51-
->defaultValue(100)
52-
->info('the time in milliseconds queue consumer waits for a message (100 ms by default)')
53-
->end()
54-
->end()->end()
5548
->booleanNode('job')->defaultFalse()->end()
5649
->arrayNode('async_events')
5750
->addDefaultsIfNotSet()

pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php

+17-13
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
use Enqueue\Client\CommandSubscriberInterface;
88
use Enqueue\Client\TopicSubscriberInterface;
99
use Enqueue\Client\TraceableProducer;
10-
use Enqueue\Consumption\QueueConsumer;
1110
use Enqueue\JobQueue\Job;
1211
use Enqueue\Symfony\DependencyInjection\ClientFactory;
1312
use Enqueue\Symfony\DependencyInjection\TransportFactory;
@@ -29,13 +28,15 @@ public function load(array $configs, ContainerBuilder $container): void
2928
$loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config'));
3029
$loader->load('services.yml');
3130

32-
$this->setupAutowiringForProcessors($container);
33-
3431
$transportFactory = (new TransportFactory('default'));
35-
$transportFactory->createConnectionFactory($container, $config['transport']);
36-
$transportFactory->createContext($container, $config['transport']);
32+
$transportFactory->buildConnectionFactory($container, $config['transport']);
33+
$transportFactory->buildContext($container, []);
34+
$transportFactory->buildQueueConsumer($container, $config['consumption']);
35+
$transportFactory->buildRpcClient($container, []);
3736

3837
if (isset($config['client'])) {
38+
$this->setupAutowiringForProcessors($container);
39+
3940
$loader->load('client.yml');
4041
$loader->load('extensions/flush_spool_producer_extension.yml');
4142
$loader->load('extensions/exclusive_command_extension.yml');
@@ -77,17 +78,20 @@ public function load(array $configs, ContainerBuilder $container): void
7778
->replaceArgument(1, $config['client']['redelivered_delay_time'])
7879
;
7980
}
80-
}
8181

82-
// todo configure queue consumer
83-
$container->getDefinition(QueueConsumer::class)
84-
->replaceArgument(2, $config['consumption']['idle_timeout'])
85-
->replaceArgument(3, $config['consumption']['receive_timeout'])
86-
;
82+
$locatorId = 'enqueue.locator';
83+
if ($container->hasDefinition($locatorId)) {
84+
$locator = $container->getDefinition($locatorId);
85+
$locator->replaceArgument(0, array_replace($locator->getArgument(0), [
86+
'enqueue.client.default.queue_consumer' => new Reference('enqueue.client.default.queue_consumer'),
87+
'enqueue.client.default.driver' => new Reference('enqueue.client.default.driver'),
88+
'enqueue.client.default.delegate_processor' => new Reference('enqueue.client.default.delegate_processor'),
89+
'enqueue.client.default.producer' => new Reference('enqueue.client.default.producer'),
90+
]));
91+
}
8792

88-
if ($container->hasDefinition('enqueue.client.default.queue_consumer')) {
8993
$container->getDefinition('enqueue.client.default.queue_consumer')
90-
->replaceArgument(2, $config['consumption']['idle_timeout'])
94+
->replaceArgument(2, $config['consumption']['idle_time'])
9195
->replaceArgument(3, $config['consumption']['receive_timeout'])
9296
;
9397
}

pkg/enqueue-bundle/EnqueueBundle.php

+16-9
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@
77
use Enqueue\AsyncEventDispatcher\DependencyInjection\AsyncTransformersPass;
88
use Enqueue\Symfony\Client\DependencyInjection\AnalyzeRouteCollectionPass;
99
use Enqueue\Symfony\Client\DependencyInjection\BuildClientExtensionsPass;
10-
use Enqueue\Symfony\Client\DependencyInjection\BuildCommandSubscriberRoutesPass;
11-
use Enqueue\Symfony\Client\DependencyInjection\BuildConsumptionExtensionsPass;
12-
use Enqueue\Symfony\Client\DependencyInjection\BuildProcessorRegistryPass;
13-
use Enqueue\Symfony\Client\DependencyInjection\BuildProcessorRoutesPass;
14-
use Enqueue\Symfony\Client\DependencyInjection\BuildTopicSubscriberRoutesPass;
10+
use Enqueue\Symfony\Client\DependencyInjection\BuildCommandSubscriberRoutesPass as BuildClientCommandSubscriberRoutesPass;
11+
use Enqueue\Symfony\Client\DependencyInjection\BuildConsumptionExtensionsPass as BuildClientConsumptionExtensionsPass;
12+
use Enqueue\Symfony\Client\DependencyInjection\BuildProcessorRegistryPass as BuildClientProcessorRegistryPass;
13+
use Enqueue\Symfony\Client\DependencyInjection\BuildProcessorRoutesPass as BuildClientProcessorRoutesPass;
14+
use Enqueue\Symfony\Client\DependencyInjection\BuildTopicSubscriberRoutesPass as BuildClientTopicSubscriberRoutesPass;
15+
use Enqueue\Symfony\DependencyInjection\BuildConsumptionExtensionsPass;
16+
use Enqueue\Symfony\DependencyInjection\BuildProcessorRegistryPass;
1517
use Symfony\Component\DependencyInjection\Compiler\PassConfig;
1618
use Symfony\Component\DependencyInjection\ContainerBuilder;
1719
use Symfony\Component\HttpKernel\Bundle\Bundle;
@@ -20,13 +22,18 @@ class EnqueueBundle extends Bundle
2022
{
2123
public function build(ContainerBuilder $container): void
2224
{
25+
//transport passes
2326
$container->addCompilerPass(new BuildConsumptionExtensionsPass('default'));
27+
$container->addCompilerPass(new BuildProcessorRegistryPass('default'));
28+
29+
//client passes
30+
$container->addCompilerPass(new BuildClientConsumptionExtensionsPass('default'));
2431
$container->addCompilerPass(new BuildClientExtensionsPass('default'));
25-
$container->addCompilerPass(new BuildTopicSubscriberRoutesPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
26-
$container->addCompilerPass(new BuildCommandSubscriberRoutesPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
27-
$container->addCompilerPass(new BuildProcessorRoutesPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
32+
$container->addCompilerPass(new BuildClientTopicSubscriberRoutesPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
33+
$container->addCompilerPass(new BuildClientCommandSubscriberRoutesPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
34+
$container->addCompilerPass(new BuildClientProcessorRoutesPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
2835
$container->addCompilerPass(new AnalyzeRouteCollectionPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 30);
29-
$container->addCompilerPass(new BuildProcessorRegistryPass('default'));
36+
$container->addCompilerPass(new BuildClientProcessorRegistryPass('default'));
3037

3138
if (class_exists(AsyncEventDispatcherExtension::class)) {
3239
$container->addCompilerPass(new AsyncEventsPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);

pkg/enqueue-bundle/Resources/config/client.yml

+3-33
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
services:
2-
enqueue.client.default.context:
2+
services:
3+
enqueue.client.default.context:
34
class: 'Interop\Queue\Context'
45
factory: ['@enqueue.client.default.driver', 'getContext']
56

@@ -48,7 +49,7 @@ services:
4849
- '@enqueue.client.default.driver'
4950

5051
enqueue.client.default.processor_registry:
51-
class: 'Enqueue\Symfony\Client\ContainerProcessorRegistry'
52+
class: 'Enqueue\Symfony\ContainerProcessorRegistry'
5253

5354
enqueue.client.default.delegate_processor:
5455
class: 'Enqueue\Client\DelegateProcessor'
@@ -75,37 +76,6 @@ services:
7576
arguments:
7677
- []
7778

78-
enqueue.client.default.consume_messages_command:
79-
class: 'Enqueue\Symfony\Client\ConsumeMessagesCommand'
80-
arguments:
81-
- '@enqueue.client.default.queue_consumer'
82-
- '@enqueue.client.default.delegate_processor'
83-
- '@enqueue.client.default.driver'
84-
tags:
85-
- { name: 'console.command' }
86-
87-
enqueue.client.default.produce_message_command:
88-
class: 'Enqueue\Symfony\Client\ProduceMessageCommand'
89-
arguments:
90-
- '@enqueue.client.default.producer'
91-
tags:
92-
- { name: 'console.command' }
93-
94-
enqueue.client.default.setup_broker_command:
95-
class: 'Enqueue\Symfony\Client\SetupBrokerCommand'
96-
arguments:
97-
- '@enqueue.client.default.driver'
98-
tags:
99-
- { name: 'console.command' }
100-
101-
enqueue.client.default.routes_command:
102-
class: 'Enqueue\Symfony\Client\RoutesCommand'
103-
arguments:
104-
- '@enqueue.client.default.config'
105-
- '@enqueue.client.default.route_collection'
106-
tags:
107-
- { name: 'console.command' }
108-
10979
# todo
11080
enqueue.profiler.message_queue_collector:
11181
class: 'Enqueue\Bundle\Profiler\MessageQueueCollector'
Original file line numberDiff line numberDiff line change
@@ -1,55 +1,49 @@
1-
parameters:
2-
enqueue.queue_consumer.default_idle_time: 0
3-
enqueue.queue_consumer.default_receive_timeout: 10000
4-
51
services:
6-
enqueue.consumption.extensions:
7-
class: 'Enqueue\Consumption\ChainExtension'
8-
public: false
2+
enqueue.locator:
3+
class: 'Symfony\Component\DependencyInjection\ServiceLocator'
94
arguments:
105
- []
6+
tags: ['container.service_locator']
117

12-
Enqueue\Consumption\QueueConsumer:
13-
class: 'Enqueue\Consumption\QueueConsumer'
14-
public: true
8+
enqueue.transport.consume_command:
9+
class: 'Enqueue\Symfony\Consumption\ConfigurableConsumeCommand'
1510
arguments:
16-
- '@enqueue.transport.default.context'
17-
- '@enqueue.consumption.extensions'
18-
- '%enqueue.queue_consumer.default_idle_time%'
19-
- '%enqueue.queue_consumer.default_receive_timeout%'
20-
21-
# Deprecated. To be removed in 0.10.
22-
enqueue.consumption.queue_consumer:
23-
public: true
24-
alias: 'Enqueue\Consumption\QueueConsumer'
11+
- '@enqueue.locator'
12+
- 'enqueue.transport.%s.queue_consumer'
13+
- 'enqueue.transport.%s.processor_registry'
14+
tags:
15+
- { name: 'console.command' }
2516

26-
Enqueue\Symfony\Consumption\ContainerAwareConsumeMessagesCommand:
27-
class: 'Enqueue\Symfony\Consumption\ContainerAwareConsumeMessagesCommand'
28-
public: true
17+
enqueue.client.consume_command:
18+
class: 'Enqueue\Symfony\Client\ConsumeCommand'
2919
arguments:
30-
- '@Enqueue\Consumption\QueueConsumer'
20+
- '@enqueue.locator'
21+
- 'enqueue.client.%s.queue_consumer'
22+
- 'enqueue.client.%s.driver'
23+
- 'enqueue.client.%s.delegate_processor'
3124
tags:
3225
- { name: 'console.command' }
3326

34-
# Deprecated. To be removed in 0.10.
35-
enqueue.command.consume_messages:
36-
public: true
37-
alias: 'Enqueue\Symfony\Consumption\ContainerAwareConsumeMessagesCommand'
38-
39-
enqueue.transport.rpc_factory:
40-
class: 'Enqueue\Rpc\RpcFactory'
41-
public: false
27+
enqueue.client.produce_command:
28+
class: 'Enqueue\Symfony\Client\ProduceCommand'
4229
arguments:
43-
- '@enqueue.transport.default.context'
30+
- '@enqueue.locator'
31+
- 'enqueue.client.%s.producer'
32+
tags:
33+
- { name: 'console.command' }
4434

45-
Enqueue\Rpc\RpcClient:
46-
class: 'Enqueue\Rpc\RpcClient'
47-
public: true
35+
enqueue.client.setup_broker_command:
36+
class: 'Enqueue\Symfony\Client\SetupBrokerCommand'
4837
arguments:
49-
- '@enqueue.transport.default.context'
50-
- '@enqueue.transport.rpc_factory'
38+
- '@enqueue.locator'
39+
- 'enqueue.client.%s.driver'
40+
tags:
41+
- { name: 'console.command' }
5142

52-
# Deprecated. To be removed in 0.10.
53-
enqueue.transport.rpc_client:
54-
public: true
55-
alias: 'Enqueue\Rpc\RpcClient'
43+
enqueue.client.routes_command:
44+
class: 'Enqueue\Symfony\Client\RoutesCommand'
45+
arguments:
46+
- '@enqueue.locator'
47+
- 'enqueue.client.%s.driver'
48+
tags:
49+
- { name: 'console.command' }

pkg/enqueue-bundle/Tests/Functional/App/config/config.yml

+16-4
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,18 @@ services:
3434
alias: 'enqueue.client.default.traceable_producer'
3535
public: true
3636

37+
test_enqueue.transport.default.queue_consumer:
38+
alias: 'enqueue.transport.default.queue_consumer'
39+
public: true
40+
41+
test_enqueue.client.default.queue_consumer:
42+
alias: 'enqueue.client.default.queue_consumer'
43+
public: true
44+
45+
test_enqueue.transport.default.rpc_client:
46+
alias: 'enqueue.transport.default.rpc_client'
47+
public: true
48+
3749
test_enqueue.client.default.producer:
3850
alias: 'enqueue.client.default.producer'
3951
public: true
@@ -50,12 +62,12 @@ services:
5062
alias: 'enqueue.transport.default.context'
5163
public: true
5264

53-
test_enqueue.client.default.consume_messages_command:
54-
alias: 'enqueue.client.default.consume_messages_command'
65+
test_enqueue.client.consume_command:
66+
alias: 'enqueue.client.consume_command'
5567
public: true
5668

57-
test.enqueue.client.default.routes_command:
58-
alias: 'enqueue.client.default.routes_command'
69+
test.enqueue.client.routes_command:
70+
alias: 'enqueue.client.routes_command'
5971
public: true
6072

6173
test_async_listener:

pkg/enqueue-bundle/Tests/Functional/App/config/custom-config.yml

+15-2
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,28 @@ services:
2323
alias: 'enqueue.transport.default.context'
2424
public: true
2525

26-
test_enqueue.client.default.consume_messages_command:
27-
alias: 'enqueue.client.default.consume_messages_command'
26+
test_enqueue.transport.consume_command:
27+
alias: 'enqueue.transport.consume_command'
28+
public: true
29+
30+
test_enqueue.client.consume_command:
31+
alias: 'enqueue.client.consume_command'
32+
public: true
33+
34+
test_enqueue.client.produce_command:
35+
alias: 'enqueue.client.produce_command'
36+
public: true
37+
38+
test_enqueue.client.setup_broker_command:
39+
alias: 'enqueue.client.setup_broker_command'
2840
public: true
2941

3042
test.message.processor:
3143
class: 'Enqueue\Bundle\Tests\Functional\TestProcessor'
3244
public: true
3345
tags:
3446
- { name: 'enqueue.topic_subscriber', client: 'default' }
47+
- { name: 'enqueue.transport.processor', transport: 'default' }
3548

3649
test.message.command_processor:
3750
class: 'Enqueue\Bundle\Tests\Functional\TestCommandProcessor'

pkg/enqueue-bundle/Tests/Functional/QueueConsumerTest.php

+3-1
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@ class QueueConsumerTest extends WebTestCase
1111
{
1212
public function testCouldBeGetFromContainerAsService()
1313
{
14-
$queueConsumer = static::$container->get(QueueConsumer::class);
14+
$queueConsumer = static::$container->get('test_enqueue.client.default.queue_consumer');
15+
$this->assertInstanceOf(QueueConsumer::class, $queueConsumer);
1516

17+
$queueConsumer = static::$container->get('test_enqueue.transport.default.queue_consumer');
1618
$this->assertInstanceOf(QueueConsumer::class, $queueConsumer);
1719
}
1820
}

pkg/enqueue-bundle/Tests/Functional/RoutesCommandTest.php

+3-3
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,15 @@ class RoutesCommandTest extends WebTestCase
1212
{
1313
public function testCouldBeGetFromContainerAsService()
1414
{
15-
$command = static::$container->get('test.enqueue.client.default.routes_command');
15+
$command = static::$container->get('test.enqueue.client.routes_command');
1616

1717
$this->assertInstanceOf(RoutesCommand::class, $command);
1818
}
1919

2020
public function testShouldDisplayRegisteredTopics()
2121
{
2222
/** @var RoutesCommand $command */
23-
$command = static::$container->get('test.enqueue.client.default.routes_command');
23+
$command = static::$container->get('test.enqueue.client.routes_command');
2424

2525
$tester = new CommandTester($command);
2626
$tester->execute([]);
@@ -36,7 +36,7 @@ public function testShouldDisplayRegisteredTopics()
3636
public function testShouldDisplayCommands()
3737
{
3838
/** @var RoutesCommand $command */
39-
$command = static::$container->get('test.enqueue.client.default.routes_command');
39+
$command = static::$container->get('test.enqueue.client.routes_command');
4040

4141
$tester = new CommandTester($command);
4242
$tester->execute([]);

0 commit comments

Comments
 (0)