From d5c497ca67dd7ada52f51a2c1d82cc879ee29a69 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 15 Nov 2018 14:04:34 +0200 Subject: [PATCH 1/7] multi client configuration --- docs/bundle/config_reference.md | 1 + .../DependencyInjection/Configuration.php | 15 +++++++- .../DependencyInjection/EnqueueExtension.php | 21 +++++++----- pkg/enqueue-bundle/Resources/config/job.yml | 6 ++-- .../CalculateRootJobStatusProcessorTest.php | 2 -- .../Job/DependentJobServiceTest.php | 2 -- .../Tests/Functional/Job/JobRunnerTest.php | 2 -- .../Tests/Functional/Job/JobStorageTest.php | 2 -- .../DependencyInjection/ConfigurationTest.php | 22 +++++++----- .../EnqueueExtensionTest.php | 34 ++++++++++++++----- 10 files changed, 70 insertions(+), 37 deletions(-) diff --git a/docs/bundle/config_reference.md b/docs/bundle/config_reference.md index 9118a8f9c..8011a0621 100644 --- a/docs/bundle/config_reference.md +++ b/docs/bundle/config_reference.md @@ -43,6 +43,7 @@ enqueue: # the time in milliseconds queue consumer waits for a message (100 ms by default) receive_timeout: 100 + job: false async_commands: enabled: false extensions: diff --git a/pkg/enqueue-bundle/DependencyInjection/Configuration.php b/pkg/enqueue-bundle/DependencyInjection/Configuration.php index 260a50e9e..58475b12d 100644 --- a/pkg/enqueue-bundle/DependencyInjection/Configuration.php +++ b/pkg/enqueue-bundle/DependencyInjection/Configuration.php @@ -3,6 +3,7 @@ namespace Enqueue\Bundle\DependencyInjection; use Enqueue\AsyncCommand\RunCommandProcessor; +use Enqueue\JobQueue\Job; use Enqueue\Monitoring\Symfony\DependencyInjection\MonitoringFactory; use Enqueue\Symfony\Client\DependencyInjection\ClientFactory; use Enqueue\Symfony\DependencyInjection\TransportFactory; @@ -35,6 +36,7 @@ public function getConfigTreeBuilder(): TreeBuilder ->append(ClientFactory::getConfiguration($this->debug)) ->append($this->getMonitoringConfiguration()) ->append($this->getAsyncCommandsConfiguration()) + ->append($this->getJobConfiguration()) ->arrayNode('extensions')->addDefaultsIfNotSet()->children() ->booleanNode('doctrine_ping_connection_extension')->defaultFalse()->end() ->booleanNode('doctrine_clear_identity_map_extension')->defaultFalse()->end() @@ -46,7 +48,6 @@ public function getConfigTreeBuilder(): TreeBuilder ; // $rootNode->children() -// ->booleanNode('job')->defaultFalse()->end() // ->arrayNode('async_events') // ->addDefaultsIfNotSet() // ->canBeEnabled() @@ -76,4 +77,16 @@ private function getAsyncCommandsConfiguration(): ArrayNodeDefinition ->canBeEnabled() ; } + + private function getJobConfiguration(): ArrayNodeDefinition + { + if (false === class_exists(Job::class)) { + return MissingComponentFactory::getConfiguration('job', ['enqueue/job-queue']); + } + + return (new ArrayNodeDefinition('job')) + ->addDefaultsIfNotSet() + ->canBeEnabled() + ; + } } diff --git a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php index 81c356688..f3a45e128 100644 --- a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php +++ b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php @@ -84,6 +84,19 @@ public function load(array $configs, ContainerBuilder $container): void $monitoringFactory->buildClientExtension($container, $configs['monitoring']); } } + + // job-queue + if (false == empty($configs['job']['enabled'])) { + if (false === isset($configs['client'])) { + throw new \LogicException('Client is required for job-queue.'); + } + + if ($name !== $defaultName) { + throw new \LogicException('Job-queue supports only default configuration.'); + } + + $loader->load('job.yml'); + } } $defaultClient = null; @@ -113,14 +126,6 @@ public function load(array $configs, ContainerBuilder $container): void $this->loadSignalExtension($config, $container); $this->loadReplyExtension($config, $container); -// if ($config['job']) { -// if (!class_exists(Job::class)) { -// throw new \LogicException('Seems "enqueue/job-queue" is not installed. Please fix this issue.'); -// } -// -// $loader->load('job.yml'); -// } -// // if ($config['async_events']['enabled']) { // if (false == class_exists(AsyncEventDispatcherExtension::class)) { // throw new \LogicException('The "enqueue/async-event-dispatcher" package has to be installed.'); diff --git a/pkg/enqueue-bundle/Resources/config/job.yml b/pkg/enqueue-bundle/Resources/config/job.yml index 2f55f3f70..0a368aebc 100644 --- a/pkg/enqueue-bundle/Resources/config/job.yml +++ b/pkg/enqueue-bundle/Resources/config/job.yml @@ -20,7 +20,7 @@ services: public: true arguments: - '@Enqueue\JobQueue\Doctrine\JobStorage' - - '@enqueue.client.default.producer' + - '@Enqueue\Client\ProducerInterface' # Deprecated. To be removed in 0.10. enqueue.job.processor: @@ -55,7 +55,7 @@ services: arguments: - '@Enqueue\JobQueue\Doctrine\JobStorage' - '@Enqueue\JobQueue\CalculateRootJobStatusService' - - '@enqueue.client.default.producer' + - '@Enqueue\Client\ProducerInterface' - '@logger' tags: - { name: 'enqueue.command_subscriber', client: 'default' } @@ -70,7 +70,7 @@ services: public: true arguments: - '@Enqueue\JobQueue\Doctrine\JobStorage' - - '@enqueue.client.default.producer' + - '@Enqueue\Client\ProducerInterface' - '@logger' tags: - { name: 'enqueue.topic_subscriber', client: 'default' } diff --git a/pkg/enqueue-bundle/Tests/Functional/Job/CalculateRootJobStatusProcessorTest.php b/pkg/enqueue-bundle/Tests/Functional/Job/CalculateRootJobStatusProcessorTest.php index 0749b8354..01346ad8c 100644 --- a/pkg/enqueue-bundle/Tests/Functional/Job/CalculateRootJobStatusProcessorTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/Job/CalculateRootJobStatusProcessorTest.php @@ -12,8 +12,6 @@ class CalculateRootJobStatusProcessorTest extends WebTestCase { public function testCouldBeConstructedByContainer() { - $this->markTestSkipped('Configuration for jobs is not yet ready'); - $instance = static::$container->get(CalculateRootJobStatusProcessor::class); $this->assertInstanceOf(CalculateRootJobStatusProcessor::class, $instance); diff --git a/pkg/enqueue-bundle/Tests/Functional/Job/DependentJobServiceTest.php b/pkg/enqueue-bundle/Tests/Functional/Job/DependentJobServiceTest.php index adee418ec..1aec06410 100644 --- a/pkg/enqueue-bundle/Tests/Functional/Job/DependentJobServiceTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/Job/DependentJobServiceTest.php @@ -12,8 +12,6 @@ class DependentJobServiceTest extends WebTestCase { public function testCouldBeConstructedByContainer() { - $this->markTestSkipped('Configuration for jobs is not yet ready'); - $instance = static::$container->get(DependentJobService::class); $this->assertInstanceOf(DependentJobService::class, $instance); diff --git a/pkg/enqueue-bundle/Tests/Functional/Job/JobRunnerTest.php b/pkg/enqueue-bundle/Tests/Functional/Job/JobRunnerTest.php index c27b48218..4aa647f77 100644 --- a/pkg/enqueue-bundle/Tests/Functional/Job/JobRunnerTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/Job/JobRunnerTest.php @@ -12,8 +12,6 @@ class JobRunnerTest extends WebTestCase { public function testCouldBeConstructedByContainer() { - $this->markTestSkipped('Configuration for jobs is not yet ready'); - $instance = static::$container->get(JobRunner::class); $this->assertInstanceOf(JobRunner::class, $instance); diff --git a/pkg/enqueue-bundle/Tests/Functional/Job/JobStorageTest.php b/pkg/enqueue-bundle/Tests/Functional/Job/JobStorageTest.php index 4d4a83316..650326ad8 100644 --- a/pkg/enqueue-bundle/Tests/Functional/Job/JobStorageTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/Job/JobStorageTest.php @@ -12,8 +12,6 @@ class JobStorageTest extends WebTestCase { public function testCouldGetJobStorageAsServiceFromContainer() { - $this->markTestSkipped('Configuration for jobs is not yet ready'); - $instance = static::$container->get(JobStorage::class); $this->assertInstanceOf(JobStorage::class, $instance); diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php index b735bda14..1522b49e1 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php @@ -160,34 +160,38 @@ public function testShouldThrowExceptionIfDefaultProcessorQueueIsEmpty() public function testJobShouldBeDisabledByDefault() { - $this->markTestSkipped('Configuration for jobs is not yet ready'); - $configuration = new Configuration(true); $processor = new Processor(); $config = $processor->processConfiguration($configuration, [[ - 'transport' => [], + 'default' => [ + 'transport' => [], + ], ]]); $this->assertArraySubset([ - 'job' => false, + 'default' => [ + 'job' => false, + ], ], $config); } public function testCouldEnableJob() { - $this->markTestSkipped('Configuration for jobs is not yet ready'); - $configuration = new Configuration(true); $processor = new Processor(); $config = $processor->processConfiguration($configuration, [[ - 'transport' => [], - 'job' => true, + 'default' => [ + 'transport' => [], + 'job' => true, + ], ]]); $this->assertArraySubset([ - 'job' => true, + 'default' => [ + 'job' => true, + ], ], $config); } diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php index 9fd7c2e98..b17d6a949 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php @@ -255,31 +255,49 @@ public function testShouldNotLoadDelayRedeliveredMessageExtensionIfRedeliveredDe public function testShouldLoadJobServicesIfEnabled() { - $this->markTestSkipped('Configuration for jobs is not yet ready'); - $container = $this->getContainerBuilder(true); $extension = new EnqueueExtension(); $extension->load([[ - 'transport' => [], - 'job' => true, + 'default' => [ + 'transport' => [], + 'client' => null, + 'job' => true, + ], ]], $container); self::assertTrue($container->hasDefinition(JobRunner::class)); } - public function testShouldNotLoadJobServicesIfDisabled() + public function testShouldThrowExceptionIfClientIsNotEnabledOnJobLoad() { - $this->markTestSkipped('Configuration for jobs is not yet ready'); + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Client is required for job-queue.'); + + $container = $this->getContainerBuilder(true); + + $extension = new EnqueueExtension(); + + $extension->load([[ + 'default' => [ + 'transport' => [], + 'job' => true, + ], + ]], $container); + } + public function testShouldNotLoadJobServicesIfDisabled() + { $container = $this->getContainerBuilder(true); $extension = new EnqueueExtension(); $extension->load([[ - 'transport' => [], - 'job' => false, + 'default' => [ + 'transport' => [], + 'job' => false, + ], ]], $container); self::assertFalse($container->hasDefinition(JobRunner::class)); From 0afd9ca5e3a250ab7d613e033130e3c58b814387 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 15 Nov 2018 14:21:48 +0200 Subject: [PATCH 2/7] multi client configuration --- .../Tests/Unit/DependencyInjection/ConfigurationTest.php | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php index 1522b49e1..d1ce8bd87 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php @@ -171,7 +171,9 @@ public function testJobShouldBeDisabledByDefault() $this->assertArraySubset([ 'default' => [ - 'job' => false, + 'job' => [ + 'enabled' => false, + ], ], ], $config); } From 15e9260630598ffdd10990e0df5db653d3b68d42 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 15 Nov 2018 15:10:29 +0200 Subject: [PATCH 3/7] multi client configuration --- pkg/enqueue-bundle/Tests/Functional/App/config/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/enqueue-bundle/Tests/Functional/App/config/config.yml b/pkg/enqueue-bundle/Tests/Functional/App/config/config.yml index 8a99208cb..433fa289d 100644 --- a/pkg/enqueue-bundle/Tests/Functional/App/config/config.yml +++ b/pkg/enqueue-bundle/Tests/Functional/App/config/config.yml @@ -27,7 +27,7 @@ enqueue: client: traceable_producer: true async_commands: true -# job: true + job: true # async_commands: true services: From 0842340e53d74c8192e9c50a495dfb0b7b3fc09e Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 15 Nov 2018 15:54:49 +0200 Subject: [PATCH 4/7] multi client configuration --- .../DependencyInjection/Configuration.php | 21 +++++++++----- .../DependencyInjection/EnqueueExtension.php | 29 ++++++++++--------- .../Tests/Functional/App/config/config.yml | 2 +- .../Functional/Events/AsyncListenerTest.php | 2 -- .../Functional/Events/AsyncProcessorTest.php | 2 -- .../Functional/Events/AsyncSubscriberTest.php | 2 -- .../DependencyInjection/ConfigurationTest.php | 4 --- .../DependencyInjection/ClientFactory.php | 14 ++++++--- .../DependencyInjection/TransportFactory.php | 16 ++++++---- 9 files changed, 51 insertions(+), 41 deletions(-) diff --git a/pkg/enqueue-bundle/DependencyInjection/Configuration.php b/pkg/enqueue-bundle/DependencyInjection/Configuration.php index 58475b12d..526afb4b0 100644 --- a/pkg/enqueue-bundle/DependencyInjection/Configuration.php +++ b/pkg/enqueue-bundle/DependencyInjection/Configuration.php @@ -3,6 +3,7 @@ namespace Enqueue\Bundle\DependencyInjection; use Enqueue\AsyncCommand\RunCommandProcessor; +use Enqueue\AsyncEventDispatcher\DependencyInjection\AsyncEventDispatcherExtension; use Enqueue\JobQueue\Job; use Enqueue\Monitoring\Symfony\DependencyInjection\MonitoringFactory; use Enqueue\Symfony\Client\DependencyInjection\ClientFactory; @@ -37,6 +38,7 @@ public function getConfigTreeBuilder(): TreeBuilder ->append($this->getMonitoringConfiguration()) ->append($this->getAsyncCommandsConfiguration()) ->append($this->getJobConfiguration()) + ->append($this->getAsyncEventsConfiguration()) ->arrayNode('extensions')->addDefaultsIfNotSet()->children() ->booleanNode('doctrine_ping_connection_extension')->defaultFalse()->end() ->booleanNode('doctrine_clear_identity_map_extension')->defaultFalse()->end() @@ -47,13 +49,6 @@ public function getConfigTreeBuilder(): TreeBuilder ->end() ; -// $rootNode->children() -// ->arrayNode('async_events') -// ->addDefaultsIfNotSet() -// ->canBeEnabled() -// ->end() -// ; - return $tb; } @@ -89,4 +84,16 @@ private function getJobConfiguration(): ArrayNodeDefinition ->canBeEnabled() ; } + + private function getAsyncEventsConfiguration(): ArrayNodeDefinition + { + if (false == class_exists(AsyncEventDispatcherExtension::class)) { + return MissingComponentFactory::getConfiguration('async_events', ['enqueue/async-event-dispatcher']); + } + + return (new ArrayNodeDefinition('job')) + ->addDefaultsIfNotSet() + ->canBeEnabled() + ; + } } diff --git a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php index f3a45e128..b7c4e424f 100644 --- a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php +++ b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php @@ -53,7 +53,7 @@ public function load(array $configs, ContainerBuilder $container): void // transport & consumption $transportNames[] = $name; - $transportFactory = (new TransportFactory($name)); + $transportFactory = (new TransportFactory($name, $defaultName === $name)); $transportFactory->buildConnectionFactory($container, $configs['transport']); $transportFactory->buildContext($container, []); $transportFactory->buildQueueConsumer($container, $configs['consumption']); @@ -68,8 +68,8 @@ public function load(array $configs, ContainerBuilder $container): void $clientConfig['transport'] = $configs['transport']; $clientConfig['consumption'] = $configs['consumption']; - $clientFactory = new ClientFactory($name); - $clientFactory->build($container, $clientConfig, $defaultName === $name); + $clientFactory = new ClientFactory($name, $defaultName === $name); + $clientFactory->build($container, $clientConfig); $clientFactory->createDriver($container, $configs['transport']); $clientFactory->createFlushSpoolProducerListener($container); } @@ -97,6 +97,18 @@ public function load(array $configs, ContainerBuilder $container): void $loader->load('job.yml'); } + + // async events + if (false == empty($config['async_events']['enabled'])) { + if ($name !== $defaultName) { + throw new \LogicException('Async events supports only default configuration.'); + } + + $extension = new AsyncEventDispatcherExtension(); + $extension->load([[ + 'context_service' => 'enqueue.transport.default.context', + ]], $container); + } } $defaultClient = null; @@ -125,17 +137,6 @@ public function load(array $configs, ContainerBuilder $container): void $this->loadDoctrineClearIdentityMapExtension($config, $container); $this->loadSignalExtension($config, $container); $this->loadReplyExtension($config, $container); - -// if ($config['async_events']['enabled']) { -// if (false == class_exists(AsyncEventDispatcherExtension::class)) { -// throw new \LogicException('The "enqueue/async-event-dispatcher" package has to be installed.'); -// } -// -// $extension = new AsyncEventDispatcherExtension(); -// $extension->load([[ -// 'context_service' => 'enqueue.transport.default.context', -// ]], $container); -// } } public function getConfiguration(array $config, ContainerBuilder $container): Configuration diff --git a/pkg/enqueue-bundle/Tests/Functional/App/config/config.yml b/pkg/enqueue-bundle/Tests/Functional/App/config/config.yml index 433fa289d..72d279ac5 100644 --- a/pkg/enqueue-bundle/Tests/Functional/App/config/config.yml +++ b/pkg/enqueue-bundle/Tests/Functional/App/config/config.yml @@ -28,7 +28,7 @@ enqueue: traceable_producer: true async_commands: true job: true -# async_commands: true + async_commands: true services: test_enqueue.client.default.traceable_producer: diff --git a/pkg/enqueue-bundle/Tests/Functional/Events/AsyncListenerTest.php b/pkg/enqueue-bundle/Tests/Functional/Events/AsyncListenerTest.php index 8436181b1..6e952ab28 100644 --- a/pkg/enqueue-bundle/Tests/Functional/Events/AsyncListenerTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/Events/AsyncListenerTest.php @@ -18,8 +18,6 @@ class AsyncListenerTest extends WebTestCase { public function setUp() { - $this->markTestSkipped('Configuration for async_events is not yet ready'); - parent::setUp(); /** @var AsyncListener $asyncListener */ diff --git a/pkg/enqueue-bundle/Tests/Functional/Events/AsyncProcessorTest.php b/pkg/enqueue-bundle/Tests/Functional/Events/AsyncProcessorTest.php index 75ed3ad73..101e5ecec 100644 --- a/pkg/enqueue-bundle/Tests/Functional/Events/AsyncProcessorTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/Events/AsyncProcessorTest.php @@ -20,8 +20,6 @@ class AsyncProcessorTest extends WebTestCase { public function setUp() { - $this->markTestSkipped('Configuration for async_events is not yet ready'); - parent::setUp(); /** @var AsyncListener $asyncListener */ diff --git a/pkg/enqueue-bundle/Tests/Functional/Events/AsyncSubscriberTest.php b/pkg/enqueue-bundle/Tests/Functional/Events/AsyncSubscriberTest.php index 081da0734..6e00eafca 100644 --- a/pkg/enqueue-bundle/Tests/Functional/Events/AsyncSubscriberTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/Events/AsyncSubscriberTest.php @@ -18,8 +18,6 @@ class AsyncSubscriberTest extends WebTestCase { public function setUp() { - $this->markTestSkipped('Configuration for async_events is not yet ready'); - parent::setUp(); /** @var AsyncListener $asyncListener */ diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php index d1ce8bd87..0662813a7 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php @@ -373,8 +373,6 @@ public function testReplyExtensionCouldBeDisabled() public function testShouldDisableAsyncEventsByDefault() { - $this->markTestSkipped('Configuration for async_events is not yet ready'); - $configuration = new Configuration(true); $processor = new Processor(); @@ -391,8 +389,6 @@ public function testShouldDisableAsyncEventsByDefault() public function testShouldAllowEnableAsyncEvents() { - $this->markTestSkipped('Configuration for async_events is not yet ready'); - $configuration = new Configuration(true); $processor = new Processor(); diff --git a/pkg/enqueue/Symfony/Client/DependencyInjection/ClientFactory.php b/pkg/enqueue/Symfony/Client/DependencyInjection/ClientFactory.php index 9a2392d54..47d8e8ee5 100644 --- a/pkg/enqueue/Symfony/Client/DependencyInjection/ClientFactory.php +++ b/pkg/enqueue/Symfony/Client/DependencyInjection/ClientFactory.php @@ -41,13 +41,19 @@ final class ClientFactory */ private $name; - public function __construct(string $name) + /** + * @var bool + */ + private $default; + + public function __construct(string $name, bool $default = false) { if (empty($name)) { throw new \InvalidArgumentException('The name could not be empty.'); } $this->name = $name; + $this->default = $default; } public static function getConfiguration(bool $debug, string $name = 'client'): NodeDefinition @@ -69,7 +75,7 @@ public static function getConfiguration(bool $debug, string $name = 'client'): N return $builder; } - public function build(ContainerBuilder $container, array $config, bool $default = false): void + public function build(ContainerBuilder $container, array $config): void { $container->register($this->format('context'), Context::class) ->setFactory([$this->reference('driver'), 'getContext']) @@ -176,7 +182,7 @@ public function build(ContainerBuilder $container, array $config, bool $default ->addTag('enqueue.consumption_extension', ['priority' => 10, 'client' => $this->name]) ; - $container->getDefinition('enqueue.client.default.delay_redelivered_message_extension') + $container->getDefinition($this->format('delay_redelivered_message_extension')) ->replaceArgument(1, $config['redelivered_delay_time']) ; } @@ -192,7 +198,7 @@ public function build(ContainerBuilder $container, array $config, bool $default ])); } - if ($default) { + if ($this->default) { $container->setAlias(ProducerInterface::class, $this->format('producer')); $container->setAlias(SpoolProducer::class, $this->format('spool_producer')); } diff --git a/pkg/enqueue/Symfony/DependencyInjection/TransportFactory.php b/pkg/enqueue/Symfony/DependencyInjection/TransportFactory.php index 5d98d3f8d..eb9454b3a 100644 --- a/pkg/enqueue/Symfony/DependencyInjection/TransportFactory.php +++ b/pkg/enqueue/Symfony/DependencyInjection/TransportFactory.php @@ -32,13 +32,19 @@ final class TransportFactory */ private $name; - public function __construct(string $name) + /** + * @var bool + */ + private $default; + + public function __construct(string $name, bool $default = false) { if (empty($name)) { throw new \InvalidArgumentException('The name could not be empty.'); } $this->name = $name; + $this->default = $default; } public static function getConfiguration(string $name = 'transport'): NodeDefinition @@ -149,7 +155,7 @@ public function buildConnectionFactory(ContainerBuilder $container, array $confi ; } - if ('default' === $this->name) { + if ($this->default) { $container->setAlias(ConnectionFactory::class, $this->format('connection_factory')); } } @@ -167,7 +173,7 @@ public function buildContext(ContainerBuilder $container, array $config): void $this->addServiceToLocator($container, 'context'); - if ('default' === $this->name) { + if ($this->default) { $container->setAlias(Context::class, $this->format('context')); } } @@ -201,7 +207,7 @@ public function buildQueueConsumer(ContainerBuilder $container, array $config): $this->addServiceToLocator($container, 'queue_consumer'); $this->addServiceToLocator($container, 'processor_registry'); - if ('default' === $this->name) { + if ($this->default) { $container->setAlias(QueueConsumerInterface::class, $this->format('queue_consumer')); } } @@ -220,7 +226,7 @@ public function buildRpcClient(ContainerBuilder $container, array $config): void ->addArgument(new Reference($this->format('rpc_factory'))) ; - if ('default' === $this->name) { + if ($this->default) { $container->setAlias(RpcClient::class, $this->format('rpc_client')); } } From 134b603fa655ac376e06b115f836586ccd25f284 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 15 Nov 2018 17:25:22 +0200 Subject: [PATCH 5/7] multi client configuration --- .../DependencyInjection/Configuration.php | 2 +- .../DependencyInjection/ConfigurationTest.php | 36 ++++++++++++------- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/pkg/enqueue-bundle/DependencyInjection/Configuration.php b/pkg/enqueue-bundle/DependencyInjection/Configuration.php index 526afb4b0..d7b3ba3a1 100644 --- a/pkg/enqueue-bundle/DependencyInjection/Configuration.php +++ b/pkg/enqueue-bundle/DependencyInjection/Configuration.php @@ -91,7 +91,7 @@ private function getAsyncEventsConfiguration(): ArrayNodeDefinition return MissingComponentFactory::getConfiguration('async_events', ['enqueue/async-event-dispatcher']); } - return (new ArrayNodeDefinition('job')) + return (new ArrayNodeDefinition('async_events')) ->addDefaultsIfNotSet() ->canBeEnabled() ; diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php index 0662813a7..0c41fdcb3 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php @@ -377,12 +377,16 @@ public function testShouldDisableAsyncEventsByDefault() $processor = new Processor(); $config = $processor->processConfiguration($configuration, [[ - 'transport' => [], + 'default' => [ + 'transport' => [], + ], ]]); $this->assertArraySubset([ - 'async_events' => [ - 'enabled' => false, + 'default' => [ + 'async_events' => [ + 'enabled' => false, + ], ], ], $config); } @@ -394,26 +398,34 @@ public function testShouldAllowEnableAsyncEvents() $processor = new Processor(); $config = $processor->processConfiguration($configuration, [[ - 'transport' => [], - 'async_events' => true, + 'default' => [ + 'transport' => [], + 'async_events' => true, + ], ]]); $this->assertArraySubset([ - 'async_events' => [ - 'enabled' => true, + 'default' => [ + 'async_events' => [ + 'enabled' => true, + ], ], ], $config); $config = $processor->processConfiguration($configuration, [[ - 'transport' => [], - 'async_events' => [ - 'enabled' => true, + 'default' => [ + 'transport' => [], + 'async_events' => [ + 'enabled' => true, + ], ], ]]); $this->assertArraySubset([ - 'async_events' => [ - 'enabled' => true, + 'default' => [ + 'async_events' => [ + 'enabled' => true, + ], ], ], $config); } From 2a7d75a59bd6e0415001bb42a507b2f59f29b7d8 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 15 Nov 2018 18:09:10 +0200 Subject: [PATCH 6/7] multi client configuration --- pkg/enqueue-bundle/Tests/Functional/App/config/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/enqueue-bundle/Tests/Functional/App/config/config.yml b/pkg/enqueue-bundle/Tests/Functional/App/config/config.yml index 72d279ac5..55499e13b 100644 --- a/pkg/enqueue-bundle/Tests/Functional/App/config/config.yml +++ b/pkg/enqueue-bundle/Tests/Functional/App/config/config.yml @@ -26,8 +26,8 @@ enqueue: transport: 'null:' client: traceable_producer: true - async_commands: true job: true + async_events: true async_commands: true services: From 1db3a2e48810913ac593c581ae9222bc33f0c4c2 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Thu, 15 Nov 2018 19:21:00 +0200 Subject: [PATCH 7/7] multi client configuration --- .../DependencyInjection/EnqueueExtension.php | 37 ++++++++++--------- .../Tests/Functional/App/config/config.yml | 8 ++-- .../Tests/Functional/RoutesCommandTest.php | 7 +--- 3 files changed, 25 insertions(+), 27 deletions(-) diff --git a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php index b7c4e424f..043039fa9 100644 --- a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php +++ b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php @@ -16,6 +16,7 @@ use Enqueue\Symfony\Client\DependencyInjection\ClientFactory; use Enqueue\Symfony\DependencyInjection\TransportFactory; use Enqueue\Symfony\DiUtils; +use Interop\Queue\Context; use Symfony\Component\Config\FileLocator; use Symfony\Component\Config\Resource\FileResource; use Symfony\Component\DependencyInjection\ContainerBuilder; @@ -35,7 +36,7 @@ public function load(array $configs, ContainerBuilder $container): void // find default configuration $defaultName = null; - foreach ($config as $name => $configs) { + foreach ($config as $name => $modules) { // set first as default if (null === $defaultName) { $defaultName = $name; @@ -49,45 +50,45 @@ public function load(array $configs, ContainerBuilder $container): void $transportNames = []; $clientNames = []; - foreach ($config as $name => $configs) { + foreach ($config as $name => $modules) { // transport & consumption $transportNames[] = $name; $transportFactory = (new TransportFactory($name, $defaultName === $name)); - $transportFactory->buildConnectionFactory($container, $configs['transport']); + $transportFactory->buildConnectionFactory($container, $modules['transport']); $transportFactory->buildContext($container, []); - $transportFactory->buildQueueConsumer($container, $configs['consumption']); + $transportFactory->buildQueueConsumer($container, $modules['consumption']); $transportFactory->buildRpcClient($container, []); // client - if (isset($configs['client'])) { + if (isset($modules['client'])) { $clientNames[] = $name; - $clientConfig = $configs['client']; + $clientConfig = $modules['client']; // todo - $clientConfig['transport'] = $configs['transport']; - $clientConfig['consumption'] = $configs['consumption']; + $clientConfig['transport'] = $modules['transport']; + $clientConfig['consumption'] = $modules['consumption']; $clientFactory = new ClientFactory($name, $defaultName === $name); $clientFactory->build($container, $clientConfig); - $clientFactory->createDriver($container, $configs['transport']); + $clientFactory->createDriver($container, $modules['transport']); $clientFactory->createFlushSpoolProducerListener($container); } // monitoring - if (isset($configs['monitoring'])) { + if (isset($modules['monitoring'])) { $monitoringFactory = new MonitoringFactory($name); - $monitoringFactory->buildStorage($container, $configs['monitoring']); - $monitoringFactory->buildConsumerExtension($container, $configs['monitoring']); + $monitoringFactory->buildStorage($container, $modules['monitoring']); + $monitoringFactory->buildConsumerExtension($container, $modules['monitoring']); - if (isset($configs['client'])) { - $monitoringFactory->buildClientExtension($container, $configs['monitoring']); + if (isset($modules['client'])) { + $monitoringFactory->buildClientExtension($container, $modules['monitoring']); } } // job-queue - if (false == empty($configs['job']['enabled'])) { - if (false === isset($configs['client'])) { + if (false == empty($modules['job']['enabled'])) { + if (false === isset($modules['client'])) { throw new \LogicException('Client is required for job-queue.'); } @@ -99,14 +100,14 @@ public function load(array $configs, ContainerBuilder $container): void } // async events - if (false == empty($config['async_events']['enabled'])) { + if (false == empty($modules['async_events']['enabled'])) { if ($name !== $defaultName) { throw new \LogicException('Async events supports only default configuration.'); } $extension = new AsyncEventDispatcherExtension(); $extension->load([[ - 'context_service' => 'enqueue.transport.default.context', + 'context_service' => Context::class, ]], $container); } } diff --git a/pkg/enqueue-bundle/Tests/Functional/App/config/config.yml b/pkg/enqueue-bundle/Tests/Functional/App/config/config.yml index 55499e13b..718990333 100644 --- a/pkg/enqueue-bundle/Tests/Functional/App/config/config.yml +++ b/pkg/enqueue-bundle/Tests/Functional/App/config/config.yml @@ -115,7 +115,7 @@ services: - {name: 'enqueue.event_transformer', eventName: 'test_async_subscriber', transformerName: 'test_async' } # overwrite async listener with one based on client producer. so we can use traceable producer. -# enqueue.events.async_listener: -# class: 'Enqueue\Bundle\Tests\Functional\App\AsyncListener' -# public: true -# arguments: ['@enqueue.client.default.producer', '@enqueue.events.registry'] \ No newline at end of file + enqueue.events.async_listener: + class: 'Enqueue\Bundle\Tests\Functional\App\AsyncListener' + public: true + arguments: ['@enqueue.client.default.producer', '@enqueue.events.registry'] \ No newline at end of file diff --git a/pkg/enqueue-bundle/Tests/Functional/RoutesCommandTest.php b/pkg/enqueue-bundle/Tests/Functional/RoutesCommandTest.php index 914d771e7..afefe6482 100644 --- a/pkg/enqueue-bundle/Tests/Functional/RoutesCommandTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/RoutesCommandTest.php @@ -26,16 +26,13 @@ public function testShouldDisplayRegisteredTopics() $tester->execute([]); $expected = <<<'OUTPUT' -| topic | theTopic | default (prefixed) | test_topic_subscriber_processor | (hidden) | +| topic | theTopic | default (prefixed) | test_topic_subscriber_processor | (hidden) | OUTPUT; $this->assertSame(0, $tester->getStatusCode()); $this->assertContains($expected, $tester->getDisplay()); } - /** - * @group testit - */ public function testShouldDisplayCommands() { /** @var RoutesCommand $command */ @@ -45,7 +42,7 @@ public function testShouldDisplayCommands() $tester->execute([]); $expected = <<<'OUTPUT' -| command | theCommand | default (prefixed) | test_command_subscriber_processor | (hidden) | +| command | theCommand | default (prefixed) | test_command_subscriber_processor | (hidden) | OUTPUT; $this->assertSame(0, $tester->getStatusCode());