Skip to content

Commit fce6c46

Browse files
committed
Make cli commands multi transport\client.
1 parent 9fd20c2 commit fce6c46

32 files changed

+1218
-481
lines changed

composer.json

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
"doctrine/dbal": "~2.5",
1919
"ramsey/uuid": "^2|^3.5",
2020
"psr/log": "^1",
21+
"psr/container": "^1",
2122
"symfony/event-dispatcher": "4.0.*",
2223
"makasim/temp-file": "^0.2",
2324
"google/cloud-pubsub": "^0.6.1|^1.0",

pkg/enqueue-bundle/DependencyInjection/Configuration.php

+6-13
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,13 @@ public function getConfigTreeBuilder(): TreeBuilder
2626
return ['transport' => ['dsn' => 'null:']];
2727
});
2828

29+
$transportFactory = new TransportFactory('default');
30+
2931
$transportNode = $rootNode->children()->arrayNode('transport');
30-
(new TransportFactory('default'))->addConfiguration($transportNode);
32+
$transportFactory->addTransportConfiguration($transportNode);
33+
34+
$consumptionNode = $rootNode->children()->arrayNode('consumption');
35+
$transportFactory->addQueueConsumerConfiguration($consumptionNode);
3136

3237
$rootNode->children()
3338
->arrayNode('client')->children()
@@ -40,18 +45,6 @@ public function getConfigTreeBuilder(): TreeBuilder
4045
->scalarNode('default_processor_queue')->defaultValue('default')->cannotBeEmpty()->end()
4146
->integerNode('redelivered_delay_time')->min(0)->defaultValue(0)->end()
4247
->end()->end()
43-
->arrayNode('consumption')->addDefaultsIfNotSet()->children()
44-
->integerNode('idle_timeout')
45-
->min(0)
46-
->defaultValue(0)
47-
->info('the time in milliseconds queue consumer waits if no message received')
48-
->end()
49-
->integerNode('receive_timeout')
50-
->min(0)
51-
->defaultValue(100)
52-
->info('the time in milliseconds queue consumer waits for a message (100 ms by default)')
53-
->end()
54-
->end()->end()
5548
->booleanNode('job')->defaultFalse()->end()
5649
->arrayNode('async_events')
5750
->addDefaultsIfNotSet()

pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php

+17-11
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,15 @@ public function load(array $configs, ContainerBuilder $container): void
2828
$loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config'));
2929
$loader->load('services.yml');
3030

31-
$this->setupAutowiringForProcessors($container);
32-
3331
$transportFactory = (new TransportFactory('default'));
34-
$transportFactory->build($container, $config['transport']);
32+
$transportFactory->buildConnectionFactory($container, $config['transport']);
33+
$transportFactory->buildContext($container, []);
34+
$transportFactory->buildQueueConsumer($container, $config['consumption']);
35+
$transportFactory->buildRpcClient($container, []);
3536

3637
if (isset($config['client'])) {
38+
$this->setupAutowiringForProcessors($container);
39+
3740
$loader->load('client.yml');
3841
$loader->load('extensions/flush_spool_producer_extension.yml');
3942
$loader->load('extensions/exclusive_command_extension.yml');
@@ -75,17 +78,20 @@ public function load(array $configs, ContainerBuilder $container): void
7578
->replaceArgument(1, $config['client']['redelivered_delay_time'])
7679
;
7780
}
78-
}
7981

80-
// todo configure queue consumer
81-
$container->getDefinition('enqueue.transport.default.queue_consumer')
82-
->replaceArgument(2, $config['consumption']['idle_timeout'])
83-
->replaceArgument(3, $config['consumption']['receive_timeout'])
84-
;
82+
$locatorId = 'enqueue.locator';
83+
if ($container->hasDefinition($locatorId)) {
84+
$locator = $container->getDefinition($locatorId);
85+
$locator->replaceArgument(0, array_replace($locator->getArgument(0), [
86+
'enqueue.client.default.queue_consumer' => new Reference('enqueue.client.default.queue_consumer'),
87+
'enqueue.client.default.driver' => new Reference('enqueue.client.default.driver'),
88+
'enqueue.client.default.delegate_processor' => new Reference('enqueue.client.default.delegate_processor'),
89+
'enqueue.client.default.producer' => new Reference('enqueue.client.default.producer'),
90+
]));
91+
}
8592

86-
if ($container->hasDefinition('enqueue.client.default.queue_consumer')) {
8793
$container->getDefinition('enqueue.client.default.queue_consumer')
88-
->replaceArgument(2, $config['consumption']['idle_timeout'])
94+
->replaceArgument(2, $config['consumption']['idle_time'])
8995
->replaceArgument(3, $config['consumption']['receive_timeout'])
9096
;
9197
}

pkg/enqueue-bundle/Resources/config/client.yml

+3-33
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
services:
2-
enqueue.client.default.context:
2+
services:
3+
enqueue.client.default.context:
34
class: 'Interop\Queue\Context'
45
factory: ['@enqueue.client.default.driver', 'getContext']
56

@@ -48,7 +49,7 @@ services:
4849
- '@enqueue.client.default.driver'
4950

5051
enqueue.client.default.processor_registry:
51-
class: 'Enqueue\Symfony\Client\ContainerProcessorRegistry'
52+
class: 'Enqueue\Symfony\ContainerProcessorRegistry'
5253

5354
enqueue.client.default.delegate_processor:
5455
class: 'Enqueue\Client\DelegateProcessor'
@@ -75,37 +76,6 @@ services:
7576
arguments:
7677
- []
7778

78-
enqueue.client.default.consume_messages_command:
79-
class: 'Enqueue\Symfony\Client\ConsumeMessagesCommand'
80-
arguments:
81-
- '@enqueue.client.default.queue_consumer'
82-
- '@enqueue.client.default.delegate_processor'
83-
- '@enqueue.client.default.driver'
84-
tags:
85-
- { name: 'console.command' }
86-
87-
enqueue.client.default.produce_message_command:
88-
class: 'Enqueue\Symfony\Client\ProduceMessageCommand'
89-
arguments:
90-
- '@enqueue.client.default.producer'
91-
tags:
92-
- { name: 'console.command' }
93-
94-
enqueue.client.default.setup_broker_command:
95-
class: 'Enqueue\Symfony\Client\SetupBrokerCommand'
96-
arguments:
97-
- '@enqueue.client.default.driver'
98-
tags:
99-
- { name: 'console.command' }
100-
101-
enqueue.client.default.routes_command:
102-
class: 'Enqueue\Symfony\Client\RoutesCommand'
103-
arguments:
104-
- '@enqueue.client.default.config'
105-
- '@enqueue.client.default.route_collection'
106-
tags:
107-
- { name: 'console.command' }
108-
10979
# todo
11080
enqueue.profiler.message_queue_collector:
11181
class: 'Enqueue\Bundle\Profiler\MessageQueueCollector'

pkg/enqueue-bundle/Resources/config/services.yml

+35-2
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,45 @@ services:
55
- []
66
tags: ['container.service_locator']
77

8-
enqueue.consume_command:
8+
enqueue.transport.consume_command:
99
class: 'Enqueue\Symfony\Consumption\ConfigurableConsumeCommand'
10-
public: true
1110
arguments:
1211
- '@enqueue.locator'
1312
- 'enqueue.transport.%s.queue_consumer'
1413
- 'enqueue.transport.%s.processor_registry'
1514
tags:
1615
- { name: 'console.command' }
16+
17+
enqueue.client.consume_command:
18+
class: 'Enqueue\Symfony\Client\ConsumeCommand'
19+
arguments:
20+
- '@enqueue.locator'
21+
- 'enqueue.client.%s.queue_consumer'
22+
- 'enqueue.client.%s.driver'
23+
- 'enqueue.client.%s.delegate_processor'
24+
tags:
25+
- { name: 'console.command' }
26+
27+
enqueue.client.produce_command:
28+
class: 'Enqueue\Symfony\Client\ProduceCommand'
29+
arguments:
30+
- '@enqueue.locator'
31+
- 'enqueue.client.%s.producer'
32+
tags:
33+
- { name: 'console.command' }
34+
35+
enqueue.client.setup_broker_command:
36+
class: 'Enqueue\Symfony\Client\SetupBrokerCommand'
37+
arguments:
38+
- '@enqueue.locator'
39+
- 'enqueue.client.%s.driver'
40+
tags:
41+
- { name: 'console.command' }
42+
43+
enqueue.client.routes_command:
44+
class: 'Enqueue\Symfony\Client\RoutesCommand'
45+
arguments:
46+
- '@enqueue.locator'
47+
- 'enqueue.client.%s.driver'
48+
tags:
49+
- { name: 'console.command' }

pkg/enqueue-bundle/Tests/Functional/App/config/config.yml

+16-4
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,18 @@ services:
3434
alias: 'enqueue.client.default.traceable_producer'
3535
public: true
3636

37+
test_enqueue.transport.default.queue_consumer:
38+
alias: 'enqueue.transport.default.queue_consumer'
39+
public: true
40+
41+
test_enqueue.client.default.queue_consumer:
42+
alias: 'enqueue.client.default.queue_consumer'
43+
public: true
44+
45+
test_enqueue.transport.default.rpc_client:
46+
alias: 'enqueue.transport.default.rpc_client'
47+
public: true
48+
3749
test_enqueue.client.default.producer:
3850
alias: 'enqueue.client.default.producer'
3951
public: true
@@ -50,12 +62,12 @@ services:
5062
alias: 'enqueue.transport.default.context'
5163
public: true
5264

53-
test_enqueue.client.default.consume_messages_command:
54-
alias: 'enqueue.client.default.consume_messages_command'
65+
test_enqueue.client.consume_command:
66+
alias: 'enqueue.client.consume_command'
5567
public: true
5668

57-
test.enqueue.client.default.routes_command:
58-
alias: 'enqueue.client.default.routes_command'
69+
test.enqueue.client.routes_command:
70+
alias: 'enqueue.client.routes_command'
5971
public: true
6072

6173
test_async_listener:

pkg/enqueue-bundle/Tests/Functional/App/config/custom-config.yml

+14-2
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,20 @@ services:
2323
alias: 'enqueue.transport.default.context'
2424
public: true
2525

26-
test_enqueue.consume_command:
27-
alias: 'enqueue.consume_command'
26+
test_enqueue.transport.consume_command:
27+
alias: 'enqueue.transport.consume_command'
28+
public: true
29+
30+
test_enqueue.client.consume_command:
31+
alias: 'enqueue.client.consume_command'
32+
public: true
33+
34+
test_enqueue.client.produce_command:
35+
alias: 'enqueue.client.produce_command'
36+
public: true
37+
38+
test_enqueue.client.setup_broker_command:
39+
alias: 'enqueue.client.setup_broker_command'
2840
public: true
2941

3042
test.message.processor:

pkg/enqueue-bundle/Tests/Functional/QueueConsumerTest.php

+3-1
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@ class QueueConsumerTest extends WebTestCase
1111
{
1212
public function testCouldBeGetFromContainerAsService()
1313
{
14-
$queueConsumer = static::$container->get(QueueConsumer::class);
14+
$queueConsumer = static::$container->get('test_enqueue.client.default.queue_consumer');
15+
$this->assertInstanceOf(QueueConsumer::class, $queueConsumer);
1516

17+
$queueConsumer = static::$container->get('test_enqueue.transport.default.queue_consumer');
1618
$this->assertInstanceOf(QueueConsumer::class, $queueConsumer);
1719
}
1820
}

pkg/enqueue-bundle/Tests/Functional/RoutesCommandTest.php

+3-3
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,15 @@ class RoutesCommandTest extends WebTestCase
1212
{
1313
public function testCouldBeGetFromContainerAsService()
1414
{
15-
$command = static::$container->get('test.enqueue.client.default.routes_command');
15+
$command = static::$container->get('test.enqueue.client.routes_command');
1616

1717
$this->assertInstanceOf(RoutesCommand::class, $command);
1818
}
1919

2020
public function testShouldDisplayRegisteredTopics()
2121
{
2222
/** @var RoutesCommand $command */
23-
$command = static::$container->get('test.enqueue.client.default.routes_command');
23+
$command = static::$container->get('test.enqueue.client.routes_command');
2424

2525
$tester = new CommandTester($command);
2626
$tester->execute([]);
@@ -36,7 +36,7 @@ public function testShouldDisplayRegisteredTopics()
3636
public function testShouldDisplayCommands()
3737
{
3838
/** @var RoutesCommand $command */
39-
$command = static::$container->get('test.enqueue.client.default.routes_command');
39+
$command = static::$container->get('test.enqueue.client.routes_command');
4040

4141
$tester = new CommandTester($command);
4242
$tester->execute([]);

pkg/enqueue-bundle/Tests/Functional/RpcClientTest.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ class RpcClientTest extends WebTestCase
1111
{
1212
public function testTransportRpcClientCouldBeGetFromContainerAsService()
1313
{
14-
$connection = static::$container->get(RpcClient::class);
14+
$rpcClient = static::$container->get('test_enqueue.transport.default.rpc_client');
1515

16-
$this->assertInstanceOf(RpcClient::class, $connection);
16+
$this->assertInstanceOf(RpcClient::class, $rpcClient);
1717
}
1818
}

0 commit comments

Comments
 (0)