From 811d0d8eae10c3e3fc2a5bc25ab4c0478fb2df05 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Sun, 21 Oct 2018 13:52:25 +0300 Subject: [PATCH 1/3] [client] Rename resource key factoryClass -> driverClass --- pkg/enqueue/Client/DriverFactory.php | 2 +- pkg/enqueue/Client/Resources.php | 32 +++++++++++----------- pkg/enqueue/Tests/Client/ResourcesTest.php | 18 ++++++------ 3 files changed, 26 insertions(+), 26 deletions(-) diff --git a/pkg/enqueue/Client/DriverFactory.php b/pkg/enqueue/Client/DriverFactory.php index 071ec0c62..36534d58c 100644 --- a/pkg/enqueue/Client/DriverFactory.php +++ b/pkg/enqueue/Client/DriverFactory.php @@ -33,7 +33,7 @@ public function create(ConnectionFactory $factory, string $dsn, array $config): $dsn = new Dsn($dsn); if ($driverInfo = $this->findDriverInfo($dsn, Resources::getAvailableDrivers())) { - $driverClass = $driverInfo['factoryClass']; + $driverClass = $driverInfo['driverClass']; if (RabbitMqDriver::class === $driverClass) { if (false == $factory instanceof AmqpConnectionFactory) { diff --git a/pkg/enqueue/Client/Resources.php b/pkg/enqueue/Client/Resources.php index 56fc76472..cdc803970 100644 --- a/pkg/enqueue/Client/Resources.php +++ b/pkg/enqueue/Client/Resources.php @@ -37,7 +37,7 @@ public static function getAvailableDrivers(): array $availableMap = []; foreach ($map as $item) { - if (class_exists($item['factoryClass'])) { + if (class_exists($item['driverClass'])) { $availableMap[] = $item; } } @@ -52,67 +52,67 @@ public static function getKnownDrivers(): array $map[] = [ 'schemes' => ['amqp', 'amqps'], - 'factoryClass' => AmqpDriver::class, + 'driverClass' => AmqpDriver::class, 'requiredSchemeExtensions' => [], 'packages' => ['enqueue/enqueue', 'enqueue/amqp-bunny'], ]; $map[] = [ 'schemes' => ['amqp', 'amqps'], - 'factoryClass' => RabbitMqDriver::class, + 'driverClass' => RabbitMqDriver::class, 'requiredSchemeExtensions' => ['rabbitmq'], 'packages' => ['enqueue/enqueue', 'enqueue/amqp-bunny'], ]; $map[] = [ 'schemes' => ['file'], - 'factoryClass' => FsDriver::class, + 'driverClass' => FsDriver::class, 'requiredSchemeExtensions' => [], 'packages' => ['enqueue/enqueue', 'enqueue/fs'], ]; $map[] = [ 'schemes' => ['null'], - 'factoryClass' => GenericDriver::class, + 'driverClass' => GenericDriver::class, 'requiredSchemeExtensions' => [], 'packages' => ['enqueue/enqueue', 'enqueue/null'], ]; $map[] = [ 'schemes' => ['gps'], - 'factoryClass' => GpsDriver::class, + 'driverClass' => GpsDriver::class, 'requiredSchemeExtensions' => [], 'packages' => ['enqueue/enqueue', 'enqueue/gps'], ]; $map[] = [ 'schemes' => ['redis'], - 'factoryClass' => RedisDriver::class, + 'driverClass' => RedisDriver::class, 'requiredSchemeExtensions' => [], 'packages' => ['enqueue/enqueue', 'enqueue/redis'], ]; $map[] = [ 'schemes' => ['sqs'], - 'factoryClass' => SqsDriver::class, + 'driverClass' => SqsDriver::class, 'requiredSchemeExtensions' => [], 'packages' => ['enqueue/enqueue', 'enqueue/sqs'], ]; $map[] = [ 'schemes' => ['stomp'], - 'factoryClass' => StompDriver::class, + 'driverClass' => StompDriver::class, 'requiredSchemeExtensions' => [], 'packages' => ['enqueue/enqueue', 'enqueue/stomp'], ]; $map[] = [ 'schemes' => ['stomp'], - 'factoryClass' => RabbitMqStompDriver::class, + 'driverClass' => RabbitMqStompDriver::class, 'requiredSchemeExtensions' => ['rabbitmq'], 'packages' => ['enqueue/enqueue', 'enqueue/stomp'], ]; $map[] = [ 'schemes' => ['kafka', 'rdkafka'], - 'factoryClass' => RdKafkaDriver::class, + 'driverClass' => RdKafkaDriver::class, 'requiredSchemeExtensions' => [], 'packages' => ['enqueue/enqueue', 'enqueue/rdkafka'], ]; $map[] = [ 'schemes' => ['mongodb'], - 'factoryClass' => MongodbDriver::class, + 'driverClass' => MongodbDriver::class, 'requiredSchemeExtensions' => [], 'packages' => ['enqueue/enqueue', 'enqueue/mongodb'], ]; @@ -132,19 +132,19 @@ public static function getKnownDrivers(): array 'sqlite3', 'sqlite', ], - 'factoryClass' => DbalDriver::class, + 'driverClass' => DbalDriver::class, 'requiredSchemeExtensions' => [], 'package' => ['enqueue/enqueue', 'enqueue/dbal'], ]; $map[] = [ 'schemes' => ['gearman'], - 'factoryClass' => GenericDriver::class, + 'driverClass' => GenericDriver::class, 'requiredSchemeExtensions' => [], 'package' => ['enqueue/enqueue', 'enqueue/gearman'], ]; $map[] = [ 'schemes' => ['beanstalk'], - 'factoryClass' => GenericDriver::class, + 'driverClass' => GenericDriver::class, 'requiredSchemeExtensions' => [], 'package' => ['enqueue/enqueue', 'enqueue/pheanstalk'], ]; @@ -173,7 +173,7 @@ public static function addDriver(string $driverClass, array $schemes, array $req self::getKnownDrivers(); self::$knownDrivers[] = [ 'schemes' => $schemes, - 'factoryClass' => $driverClass, + 'driverClass' => $driverClass, 'requiredSchemeExtensions' => $requiredExtensions, 'packages' => $packages, ]; diff --git a/pkg/enqueue/Tests/Client/ResourcesTest.php b/pkg/enqueue/Tests/Client/ResourcesTest.php index 7311f31d0..c2d2b117d 100644 --- a/pkg/enqueue/Tests/Client/ResourcesTest.php +++ b/pkg/enqueue/Tests/Client/ResourcesTest.php @@ -33,8 +33,8 @@ public function testShouldGetAvailableDriverInExpectedFormat() $driverInfo = $availableDrivers[0]; - $this->assertArrayHasKey('factoryClass', $driverInfo); - $this->assertSame(AmqpDriver::class, $driverInfo['factoryClass']); + $this->assertArrayHasKey('driverClass', $driverInfo); + $this->assertSame(AmqpDriver::class, $driverInfo['driverClass']); $this->assertArrayHasKey('schemes', $driverInfo); $this->assertSame(['amqp', 'amqps'], $driverInfo['schemes']); @@ -55,8 +55,8 @@ public function testShouldGetAvailableDriverWithRequiredExtensionInExpectedForma $driverInfo = $availableDrivers[1]; - $this->assertArrayHasKey('factoryClass', $driverInfo); - $this->assertSame(RabbitMqDriver::class, $driverInfo['factoryClass']); + $this->assertArrayHasKey('driverClass', $driverInfo); + $this->assertSame(RabbitMqDriver::class, $driverInfo['driverClass']); $this->assertArrayHasKey('schemes', $driverInfo); $this->assertSame(['amqp', 'amqps'], $driverInfo['schemes']); @@ -77,8 +77,8 @@ public function testShouldGetKnownDriversInExpectedFormat() $driverInfo = $knownDrivers[0]; - $this->assertArrayHasKey('factoryClass', $driverInfo); - $this->assertSame(AmqpDriver::class, $driverInfo['factoryClass']); + $this->assertArrayHasKey('driverClass', $driverInfo); + $this->assertSame(AmqpDriver::class, $driverInfo['driverClass']); $this->assertArrayHasKey('schemes', $driverInfo); $this->assertSame(['amqp', 'amqps'], $driverInfo['schemes']); @@ -126,7 +126,7 @@ public function testShouldAllowRegisterDriverThatIsNotInstalled() $driverInfo = end($availableDrivers); - $this->assertSame('theDriverClass', $driverInfo['factoryClass']); + $this->assertSame('theDriverClass', $driverInfo['driverClass']); } public function testShouldAllowGetPreviouslyRegisteredDriver() @@ -144,8 +144,8 @@ public function testShouldAllowGetPreviouslyRegisteredDriver() $driverInfo = end($availableDrivers); - $this->assertArrayHasKey('factoryClass', $driverInfo); - $this->assertSame($driverClass, $driverInfo['factoryClass']); + $this->assertArrayHasKey('driverClass', $driverInfo); + $this->assertSame($driverClass, $driverInfo['driverClass']); $this->assertArrayHasKey('schemes', $driverInfo); $this->assertSame(['fooscheme', 'barscheme'], $driverInfo['schemes']); From c67c6b7ede66e68aab54046f5dd2d11de8bd0eac Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Sun, 21 Oct 2018 21:31:46 +0300 Subject: [PATCH 2/3] [bundle] configure multiple transports. --- .../DependencyInjection/Configuration.php | 53 ++++++- .../DependencyInjection/EnqueueExtension.php | 18 ++- .../DependencyInjection/ConfigurationTest.php | 134 +++++++++++++++--- .../EnqueueExtensionTest.php | 18 +++ 4 files changed, 193 insertions(+), 30 deletions(-) diff --git a/pkg/enqueue-bundle/DependencyInjection/Configuration.php b/pkg/enqueue-bundle/DependencyInjection/Configuration.php index 3aca17a90..42c8715ca 100644 --- a/pkg/enqueue-bundle/DependencyInjection/Configuration.php +++ b/pkg/enqueue-bundle/DependencyInjection/Configuration.php @@ -4,6 +4,7 @@ use Enqueue\Symfony\Client\DependencyInjection\ClientFactory; use Enqueue\Symfony\DependencyInjection\TransportFactory; +use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition; use Symfony\Component\Config\Definition\Builder\TreeBuilder; use Symfony\Component\Config\Definition\ConfigurationInterface; @@ -22,14 +23,58 @@ public function getConfigTreeBuilder(): TreeBuilder $rootNode = $tb->root('enqueue'); $rootNode ->beforeNormalization() - ->ifEmpty()->then(function () { - return ['transport' => ['dsn' => 'null:']]; - }); + ->always(function ($value) { + if (empty($value)) { + return [ + 'transport' => [ + 'default' => [ + 'dsn' => 'null:', + ], + ], + ]; + } + + if (is_string($value)) { + return [ + 'transport' => [ + 'default' => [ + 'dsn' => $value, + ], + ], + ]; + } + + return $value; + }) + ; $transportFactory = new TransportFactory('default'); + /** @var ArrayNodeDefinition $transportNode */ $transportNode = $rootNode->children()->arrayNode('transport'); - $transportFactory->addTransportConfiguration($transportNode); + $transportNode + ->beforeNormalization() + ->always(function ($value) { + if (empty($value)) { + return ['default' => ['dsn' => 'null:']]; + } + if (is_string($value)) { + return ['default' => ['dsn' => $value]]; + } + + if (is_array($value) && array_key_exists('dsn', $value)) { + return ['default' => $value]; + } + + return $value; + }); + $transportPrototypeNode = $transportNode + ->requiresAtLeastOneElement() + ->useAttributeAsKey('key') + ->prototype('array') + ; + + $transportFactory->addTransportConfiguration($transportPrototypeNode); $consumptionNode = $rootNode->children()->arrayNode('consumption'); $transportFactory->addQueueConsumerConfiguration($consumptionNode); diff --git a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php index 5d39a8e0f..8e0fdc414 100644 --- a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php +++ b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php @@ -25,11 +25,15 @@ public function load(array $configs, ContainerBuilder $container): void $loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config')); $loader->load('services.yml'); - $transportFactory = (new TransportFactory('default')); - $transportFactory->buildConnectionFactory($container, $config['transport']); - $transportFactory->buildContext($container, []); - $transportFactory->buildQueueConsumer($container, $config['consumption']); - $transportFactory->buildRpcClient($container, []); + foreach ($config['transport'] as $name => $transportConfig) { + $transportFactory = (new TransportFactory($name)); + $transportFactory->buildConnectionFactory($container, $transportConfig); + $transportFactory->buildContext($container, []); + $transportFactory->buildQueueConsumer($container, $config['consumption']); + $transportFactory->buildRpcClient($container, []); + } + + $container->setParameter('enqueue.transports', array_keys($config['transport'])); if (isset($config['client'])) { $this->setupAutowiringForProcessors($container); @@ -38,12 +42,12 @@ public function load(array $configs, ContainerBuilder $container): void $clientConfig = $config['client']; // todo - $clientConfig['transport'] = $config['transport']; + $clientConfig['transport'] = $config['transport']['default']; $clientConfig['consumption'] = $config['consumption']; $clientFactory = new ClientFactory('default'); $clientFactory->build($container, $clientConfig); - $clientFactory->createDriver($container, $config['transport']); + $clientFactory->createDriver($container, $config['transport']['default']); } if ($config['job']) { diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php index 4fdbd2939..76634af64 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php @@ -28,41 +28,133 @@ public function testCouldBeConstructedWithDebugAsArgument() new Configuration(true); } - public function testShouldUseDefaultConfigurationIfNothingIsConfiguredAtAll() + public function testShouldProcessNullAsDefaultNullTransport() { $configuration = new Configuration(true); $processor = new Processor(); - $config = $processor->processConfiguration($configuration, [[]]); + $config = $processor->processConfiguration($configuration, [null]); - $this->assertEquals([ - 'transport' => ['dsn' => 'null:'], - 'consumption' => [ - 'receive_timeout' => 10000, + $this->assertConfigEquals([ + 'transport' => [ + 'default' => ['dsn' => 'null:'], ], - 'job' => false, - 'async_events' => ['enabled' => false], - 'async_commands' => ['enabled' => false], - 'extensions' => [ - 'doctrine_ping_connection_extension' => false, - 'doctrine_clear_identity_map_extension' => false, - 'signal_extension' => function_exists('pcntl_signal_dispatch'), - 'reply_extension' => true, + ], $config); + } + + public function testShouldProcessStringAsDefaultDsnTransport() + { + $configuration = new Configuration(true); + + $processor = new Processor(); + $config = $processor->processConfiguration($configuration, ['foo://bar?option=val']); + + $this->assertConfigEquals([ + 'transport' => [ + 'default' => ['dsn' => 'foo://bar?option=val'], + ], + ], $config); + } + + public function testShouldProcessEmptyArrayAsDefaultNullTransport() + { + $configuration = new Configuration(true); + + $processor = new Processor(); + $config = $processor->processConfiguration($configuration, ['foo://bar?option=val']); + + $this->assertConfigEquals([ + 'transport' => [ + 'default' => ['dsn' => 'foo://bar?option=val'], + ], + ], $config); + } + + public function testShouldProcessSingleTransportAsDefault() + { + $configuration = new Configuration(true); + + $processor = new Processor(); + $config = $processor->processConfiguration($configuration, [[ + 'transport' => 'foo://bar?option=val', + ]]); + + $this->assertConfigEquals([ + 'transport' => [ + 'default' => ['dsn' => 'foo://bar?option=val'], + ], + ], $config); + } + + public function testShouldProcessTransportWithDsnKeyAsDefault() + { + $configuration = new Configuration(true); + + $processor = new Processor(); + $config = $processor->processConfiguration($configuration, [[ + 'transport' => [ + 'dsn' => 'foo://bar?option=val', + ], + ]]); + + $this->assertConfigEquals([ + 'transport' => [ + 'default' => ['dsn' => 'foo://bar?option=val'], ], ], $config); } - public function testShouldUseDefaultTransportIfIfTransportIsConfiguredAtAll() + public function testShouldProcessSeveralTransports() { $configuration = new Configuration(true); $processor = new Processor(); $config = $processor->processConfiguration($configuration, [[ - 'transport' => null, + 'transport' => [ + 'default' => ['dsn' => 'default:'], + 'foo' => ['dsn' => 'foo:'], + 'bar' => ['dsn' => 'bar:'], + ], ]]); + $this->assertConfigEquals([ + 'transport' => [ + 'default' => ['dsn' => 'default:'], + 'foo' => ['dsn' => 'foo:'], + 'bar' => ['dsn' => 'bar:'], + ], + ], $config); + } + + public function testTransportFactoryShouldValidateEachTransportAccordingToItsRules() + { + $configuration = new Configuration(true); + + $processor = new Processor(); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Both options factory_class and factory_service are set. Please choose one.'); + $processor->processConfiguration($configuration, [ + [ + 'transport' => [ + 'default' => [ + 'factory_class' => 'aClass', + 'factory_service' => 'aService', + ], + ], + ], + ]); + } + + public function testShouldUseDefaultConfigurationIfNothingIsConfiguredAtAll() + { + $configuration = new Configuration(true); + + $processor = new Processor(); + $config = $processor->processConfiguration($configuration, [[]]); + $this->assertEquals([ - 'transport' => ['dsn' => 'null:'], + 'transport' => ['default' => ['dsn' => 'null:']], 'consumption' => [ 'receive_timeout' => 10000, ], @@ -88,8 +180,7 @@ public function testShouldSetDefaultConfigurationForClient() 'client' => null, ]]); - $this->assertArraySubset([ - 'transport' => ['dsn' => 'null:'], + $this->assertConfigEquals([ 'client' => [ 'prefix' => 'enqueue', 'app_name' => 'app', @@ -403,4 +494,9 @@ public function testShouldAllowConfigureConsumption() ], ], $config); } + + private function assertConfigEquals(array $expected, array $actual): void + { + $this->assertArraySubset($expected, $actual, false, var_export($actual, true)); + } } diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php index 8b4ebf7b2..d1a483067 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php @@ -443,6 +443,24 @@ public function testShouldConfigureQueueConsumer() $this->assertSame(456, $def->getArgument(4)); } + public function testShouldSetPropertyWithAllConfiguredTransports() + { + $container = $this->getContainerBuilder(true); + + $extension = new EnqueueExtension(); + $extension->load([[ + 'client' => [], + 'transport' => [ + 'default' => ['dsn' => 'default:'], + 'foo' => ['dsn' => 'foo:'], + 'bar' => ['dsn' => 'foo:'], + ], + ]], $container); + + $this->assertTrue($container->hasParameter('enqueue.transports')); + $this->assertEquals(['default', 'foo', 'bar'], $container->getParameter('enqueue.transports')); + } + public function testShouldLoadProcessAutoconfigureChildDefinition() { $container = $this->getContainerBuilder(true); From 181740d3fdb003c9c90ca41a88ce591da93df793 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Sun, 21 Oct 2018 21:52:00 +0300 Subject: [PATCH 3/3] Make build processor registyry pass support multi transports. --- pkg/enqueue-bundle/EnqueueBundle.php | 4 +- .../DependencyInjection/ClientFactory.php | 1 - .../FormatClientNameTrait.php | 2 +- .../BuildConsumptionExtensionsPass.php | 75 ++++++++++--------- .../BuildProcessorRegistryPass.php | 59 ++++++++------- .../FormatTransportNameTrait.php | 15 ++++ .../BuildConsumptionExtensionsPassTest.php | 46 +++++++----- .../BuildProcessorRegistryPassTest.php | 74 ++++++++++++++---- 8 files changed, 176 insertions(+), 100 deletions(-) rename pkg/enqueue/Symfony/{ => Client}/DependencyInjection/FormatClientNameTrait.php (94%) diff --git a/pkg/enqueue-bundle/EnqueueBundle.php b/pkg/enqueue-bundle/EnqueueBundle.php index b1b2ad85d..a01ae5a1b 100644 --- a/pkg/enqueue-bundle/EnqueueBundle.php +++ b/pkg/enqueue-bundle/EnqueueBundle.php @@ -23,8 +23,8 @@ class EnqueueBundle extends Bundle public function build(ContainerBuilder $container): void { //transport passes - $container->addCompilerPass(new BuildConsumptionExtensionsPass('default')); - $container->addCompilerPass(new BuildProcessorRegistryPass('default')); + $container->addCompilerPass(new BuildConsumptionExtensionsPass()); + $container->addCompilerPass(new BuildProcessorRegistryPass()); //client passes $container->addCompilerPass(new BuildClientConsumptionExtensionsPass('default')); diff --git a/pkg/enqueue/Symfony/Client/DependencyInjection/ClientFactory.php b/pkg/enqueue/Symfony/Client/DependencyInjection/ClientFactory.php index e610ad87e..f78c1abbb 100644 --- a/pkg/enqueue/Symfony/Client/DependencyInjection/ClientFactory.php +++ b/pkg/enqueue/Symfony/Client/DependencyInjection/ClientFactory.php @@ -21,7 +21,6 @@ use Enqueue\Consumption\QueueConsumer; use Enqueue\Rpc\RpcFactory; use Enqueue\Symfony\ContainerProcessorRegistry; -use Enqueue\Symfony\DependencyInjection\FormatClientNameTrait; use Interop\Queue\Context; use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition; use Symfony\Component\DependencyInjection\ContainerBuilder; diff --git a/pkg/enqueue/Symfony/DependencyInjection/FormatClientNameTrait.php b/pkg/enqueue/Symfony/Client/DependencyInjection/FormatClientNameTrait.php similarity index 94% rename from pkg/enqueue/Symfony/DependencyInjection/FormatClientNameTrait.php rename to pkg/enqueue/Symfony/Client/DependencyInjection/FormatClientNameTrait.php index 7880d126c..6f76d2b5b 100644 --- a/pkg/enqueue/Symfony/DependencyInjection/FormatClientNameTrait.php +++ b/pkg/enqueue/Symfony/Client/DependencyInjection/FormatClientNameTrait.php @@ -1,6 +1,6 @@ name = $transportName; - } + protected $name; public function process(ContainerBuilder $container): void { - $extensionsId = sprintf('enqueue.transport.%s.consumption_extensions', $this->name); - if (false == $container->hasDefinition($extensionsId)) { - return; + if (false == $container->hasParameter('enqueue.transports')) { + throw new \LogicException('The "enqueue.transports" parameter must be set.'); } - $tags = $container->findTaggedServiceIds('enqueue.transport.consumption_extension'); + $names = $container->getParameter('enqueue.transports'); - $groupByPriority = []; - foreach ($tags as $serviceId => $tagAttributes) { - foreach ($tagAttributes as $tagAttribute) { - $transport = $tagAttribute['transport'] ?? 'default'; + foreach ($names as $name) { + $this->name = $name; - if ($transport !== $this->name && 'all' !== $transport) { - continue; - } + $extensionsId = $this->format('consumption_extensions'); + if (false == $container->hasDefinition($extensionsId)) { + throw new \LogicException(sprintf('Service "%s" not found', $extensionsId)); + } + + $tags = $container->findTaggedServiceIds('enqueue.transport.consumption_extension'); + + $groupByPriority = []; + foreach ($tags as $serviceId => $tagAttributes) { + foreach ($tagAttributes as $tagAttribute) { + $transport = $tagAttribute['transport'] ?? 'default'; - $priority = (int) ($tagAttribute['priority'] ?? 0); + if ($transport !== $this->name && 'all' !== $transport) { + continue; + } - $groupByPriority[$priority][] = new Reference($serviceId); + $priority = (int) ($tagAttribute['priority'] ?? 0); + + $groupByPriority[$priority][] = new Reference($serviceId); + } } - } - krsort($groupByPriority, SORT_NUMERIC); + krsort($groupByPriority, SORT_NUMERIC); - $flatExtensions = []; - foreach ($groupByPriority as $extension) { - $flatExtensions = array_merge($flatExtensions, $extension); + $flatExtensions = []; + foreach ($groupByPriority as $extension) { + $flatExtensions = array_merge($flatExtensions, $extension); + } + + $extensionsService = $container->getDefinition($extensionsId); + $extensionsService->replaceArgument(0, array_merge( + $extensionsService->getArgument(0), + $flatExtensions + )); } + } - $extensionsService = $container->getDefinition($extensionsId); - $extensionsService->replaceArgument(0, array_merge( - $extensionsService->getArgument(0), - $flatExtensions - )); + protected function getName(): string + { + return $this->name; } } diff --git a/pkg/enqueue/Symfony/DependencyInjection/BuildProcessorRegistryPass.php b/pkg/enqueue/Symfony/DependencyInjection/BuildProcessorRegistryPass.php index 8493dc7f9..c9b7f8b3a 100644 --- a/pkg/enqueue/Symfony/DependencyInjection/BuildProcessorRegistryPass.php +++ b/pkg/enqueue/Symfony/DependencyInjection/BuildProcessorRegistryPass.php @@ -9,44 +9,49 @@ final class BuildProcessorRegistryPass implements CompilerPassInterface { - /** - * @var string - */ - private $name; + use FormatTransportNameTrait; - public function __construct(string $transportName) - { - if (empty($transportName)) { - throw new \InvalidArgumentException('The name could not be empty.'); - } - - $this->name = $transportName; - } + protected $name; public function process(ContainerBuilder $container): void { - $processorRegistryId = sprintf('enqueue.transport.%s.processor_registry', $this->name); - if (false == $container->hasDefinition($processorRegistryId)) { - return; + if (false == $container->hasParameter('enqueue.transports')) { + throw new \LogicException('The "enqueue.transports" parameter must be set.'); } - $tag = 'enqueue.transport.processor'; - $map = []; - foreach ($container->findTaggedServiceIds($tag) as $serviceId => $tagAttributes) { - foreach ($tagAttributes as $tagAttribute) { - $transport = $tagAttribute['transport'] ?? 'default'; + $names = $container->getParameter('enqueue.transports'); - if ($transport !== $this->name && 'all' !== $transport) { - continue; - } + foreach ($names as $name) { + $this->name = $name; + + $processorRegistryId = $this->format('processor_registry'); + if (false == $container->hasDefinition($processorRegistryId)) { + throw new \LogicException(sprintf('Service "%s" not found', $processorRegistryId)); + } - $processor = $tagAttribute['processor'] ?? $serviceId; + $tag = 'enqueue.transport.processor'; + $map = []; + foreach ($container->findTaggedServiceIds($tag) as $serviceId => $tagAttributes) { + foreach ($tagAttributes as $tagAttribute) { + $transport = $tagAttribute['transport'] ?? 'default'; - $map[$processor] = new Reference($serviceId); + if ($transport !== $this->name && 'all' !== $transport) { + continue; + } + + $processor = $tagAttribute['processor'] ?? $serviceId; + + $map[$processor] = new Reference($serviceId); + } } + + $registry = $container->getDefinition($processorRegistryId); + $registry->setArgument(0, ServiceLocatorTagPass::register($container, $map, $processorRegistryId)); } + } - $registry = $container->getDefinition($processorRegistryId); - $registry->setArgument(0, ServiceLocatorTagPass::register($container, $map, $processorRegistryId)); + protected function getName(): string + { + return $this->name; } } diff --git a/pkg/enqueue/Symfony/DependencyInjection/FormatTransportNameTrait.php b/pkg/enqueue/Symfony/DependencyInjection/FormatTransportNameTrait.php index 17a41d0b9..e5531b579 100644 --- a/pkg/enqueue/Symfony/DependencyInjection/FormatTransportNameTrait.php +++ b/pkg/enqueue/Symfony/DependencyInjection/FormatTransportNameTrait.php @@ -2,10 +2,25 @@ namespace Enqueue\Symfony\DependencyInjection; +use Symfony\Component\DependencyInjection\ContainerInterface; +use Symfony\Component\DependencyInjection\Reference; + trait FormatTransportNameTrait { abstract protected function getName(): string; + private function reference(string $serviceName, $invalidBehavior = ContainerInterface::EXCEPTION_ON_INVALID_REFERENCE): Reference + { + return new Reference($this->format($serviceName), $invalidBehavior); + } + + private function parameter(string $serviceName): string + { + $fullName = $this->format($serviceName, false); + + return "%$fullName%"; + } + private function format(string $serviceName, $parameter = false): string { $pattern = 'enqueue.transport.%s.'.$serviceName; diff --git a/pkg/enqueue/Tests/Symfony/DependencyInjection/BuildConsumptionExtensionsPassTest.php b/pkg/enqueue/Tests/Symfony/DependencyInjection/BuildConsumptionExtensionsPassTest.php index 6a7e83fac..5401b7e18 100644 --- a/pkg/enqueue/Tests/Symfony/DependencyInjection/BuildConsumptionExtensionsPassTest.php +++ b/pkg/enqueue/Tests/Symfony/DependencyInjection/BuildConsumptionExtensionsPassTest.php @@ -25,28 +25,29 @@ public function testShouldBeFinal() $this->assertClassFinal(BuildConsumptionExtensionsPass::class); } - public function testCouldBeConstructedWithName() + public function testCouldBeConstructedWithoutArguments() { - $pass = new BuildConsumptionExtensionsPass('aName'); - - $this->assertAttributeSame('aName', 'name', $pass); + new BuildConsumptionExtensionsPass(); } - public function testThrowIfNameEmptyOnConstruct() + public function testThrowIfEnqueueTransportsParameterNotSet() { - $this->expectException(\InvalidArgumentException::class); - $this->expectExceptionMessage('The name could not be empty.'); - new BuildConsumptionExtensionsPass(''); + $pass = new BuildConsumptionExtensionsPass(); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The "enqueue.transports" parameter must be set.'); + $pass->process(new ContainerBuilder()); } - public function testShouldDoNothingIfExtensionsServiceIsNotRegistered() + public function testThrowsIfNoConsumptionExtensionsServiceFoundForConfiguredTransport() { $container = new ContainerBuilder(); + $container->setParameter('enqueue.transports', ['foo', 'bar']); - //guard - $this->assertFalse($container->hasDefinition('enqueue.transport.aName.consumption_extensions')); + $pass = new BuildConsumptionExtensionsPass(); - $pass = new BuildConsumptionExtensionsPass('aName'); + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Service "enqueue.transport.foo.consumption_extensions" not found'); $pass->process($container); } @@ -56,6 +57,7 @@ public function testShouldRegisterTransportExtension() $extensions->addArgument([]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.transports', ['aName']); $container->setDefinition('enqueue.transport.aName.consumption_extensions', $extensions); $container->register('aFooExtension', ExtensionInterface::class) @@ -65,7 +67,7 @@ public function testShouldRegisterTransportExtension() ->addTag('enqueue.transport.consumption_extension', ['transport' => 'aName']) ; - $pass = new BuildConsumptionExtensionsPass('aName'); + $pass = new BuildConsumptionExtensionsPass(); $pass->process($container); $this->assertInternalType('array', $extensions->getArgument(0)); @@ -81,6 +83,7 @@ public function testShouldIgnoreOtherTransportExtensions() $extensions->addArgument([]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.transports', ['aName']); $container->setDefinition('enqueue.transport.aName.consumption_extensions', $extensions); $container->register('aFooExtension', ExtensionInterface::class) @@ -90,7 +93,7 @@ public function testShouldIgnoreOtherTransportExtensions() ->addTag('enqueue.transport.consumption_extension', ['transport' => 'anotherName']) ; - $pass = new BuildConsumptionExtensionsPass('aName'); + $pass = new BuildConsumptionExtensionsPass(); $pass->process($container); $this->assertInternalType('array', $extensions->getArgument(0)); @@ -105,6 +108,7 @@ public function testShouldAddExtensionIfTransportAll() $extensions->addArgument([]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.transports', ['aName']); $container->setDefinition('enqueue.transport.aName.consumption_extensions', $extensions); $container->register('aFooExtension', ExtensionInterface::class) @@ -114,7 +118,7 @@ public function testShouldAddExtensionIfTransportAll() ->addTag('enqueue.transport.consumption_extension', ['transport' => 'anotherName']) ; - $pass = new BuildConsumptionExtensionsPass('aName'); + $pass = new BuildConsumptionExtensionsPass(); $pass->process($container); $this->assertInternalType('array', $extensions->getArgument(0)); @@ -129,6 +133,7 @@ public function testShouldTreatTagsWithoutTransportAsDefaultTransport() $extensions->addArgument([]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.transports', ['default']); $container->setDefinition('enqueue.transport.default.consumption_extensions', $extensions); $container->register('aFooExtension', ExtensionInterface::class) @@ -138,7 +143,7 @@ public function testShouldTreatTagsWithoutTransportAsDefaultTransport() ->addTag('enqueue.transport.consumption_extension') ; - $pass = new BuildConsumptionExtensionsPass('default'); + $pass = new BuildConsumptionExtensionsPass(); $pass->process($container); $this->assertInternalType('array', $extensions->getArgument(0)); @@ -151,6 +156,7 @@ public function testShouldTreatTagsWithoutTransportAsDefaultTransport() public function testShouldOrderExtensionsByPriority() { $container = new ContainerBuilder(); + $container->setParameter('enqueue.transports', ['default']); $extensions = new Definition(); $extensions->addArgument([]); @@ -168,7 +174,7 @@ public function testShouldOrderExtensionsByPriority() $extension->addTag('enqueue.transport.consumption_extension', ['priority' => 2]); $container->setDefinition('baz_extension', $extension); - $pass = new BuildConsumptionExtensionsPass('default'); + $pass = new BuildConsumptionExtensionsPass(); $pass->process($container); $orderedExtensions = $extensions->getArgument(0); @@ -182,6 +188,7 @@ public function testShouldOrderExtensionsByPriority() public function testShouldAssumePriorityZeroIfPriorityIsNotSet() { $container = new ContainerBuilder(); + $container->setParameter('enqueue.transports', ['default']); $extensions = new Definition(); $extensions->addArgument([]); @@ -199,7 +206,7 @@ public function testShouldAssumePriorityZeroIfPriorityIsNotSet() $extension->addTag('enqueue.transport.consumption_extension', ['priority' => -1]); $container->setDefinition('baz_extension', $extension); - $pass = new BuildConsumptionExtensionsPass('default'); + $pass = new BuildConsumptionExtensionsPass(); $pass->process($container); $orderedExtensions = $extensions->getArgument(0); @@ -219,6 +226,7 @@ public function testShouldMergeWithAddedPreviously() ]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.transports', ['aName']); $container->setDefinition('enqueue.transport.aName.consumption_extensions', $extensions); $container->register('aFooExtension', ExtensionInterface::class) @@ -228,7 +236,7 @@ public function testShouldMergeWithAddedPreviously() ->addTag('enqueue.transport.consumption_extension') ; - $pass = new BuildConsumptionExtensionsPass('aName'); + $pass = new BuildConsumptionExtensionsPass(); $pass->process($container); $this->assertInternalType('array', $extensions->getArgument(0)); diff --git a/pkg/enqueue/Tests/Symfony/DependencyInjection/BuildProcessorRegistryPassTest.php b/pkg/enqueue/Tests/Symfony/DependencyInjection/BuildProcessorRegistryPassTest.php index 039024f7a..e9b9c7d71 100644 --- a/pkg/enqueue/Tests/Symfony/DependencyInjection/BuildProcessorRegistryPassTest.php +++ b/pkg/enqueue/Tests/Symfony/DependencyInjection/BuildProcessorRegistryPassTest.php @@ -26,24 +26,30 @@ public function testShouldBeFinal() $this->assertClassFinal(BuildProcessorRegistryPass::class); } - public function testCouldBeConstructedWithName() + public function testCouldBeConstructedWithoutArguments() { - $pass = new BuildProcessorRegistryPass('aName'); - - $this->assertAttributeSame('aName', 'name', $pass); + new BuildProcessorRegistryPass(); } - public function testThrowIfNameEmptyOnConstruct() + public function testThrowIfEnqueueTransportsParameterNotSet() { - $this->expectException(\InvalidArgumentException::class); - $this->expectExceptionMessage('The name could not be empty.'); - new BuildProcessorRegistryPass(''); + $pass = new BuildProcessorRegistryPass(); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The "enqueue.transports" parameter must be set.'); + $pass->process(new ContainerBuilder()); } - public function testShouldDoNothingIfProcessorRegistryServiceIsNotRegistered() + public function testThrowsIfNoRegistryServiceFoundForConfiguredTransport() { - $pass = new BuildProcessorRegistryPass('aName'); - $pass->process(new ContainerBuilder()); + $container = new ContainerBuilder(); + $container->setParameter('enqueue.transports', ['foo', 'bar']); + + $pass = new BuildProcessorRegistryPass(); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Service "enqueue.transport.foo.processor_registry" not found'); + $pass->process($container); } public function testShouldRegisterProcessorWithMatchedName() @@ -52,6 +58,7 @@ public function testShouldRegisterProcessorWithMatchedName() $registry->addArgument([]); $container = new ContainerBuilder(); + $container->setParameter('enqueue.transports', ['foo']); $container->setDefinition('enqueue.transport.foo.processor_registry', $registry); $container->register('aFooProcessor', 'aProcessorClass') ->addTag('enqueue.transport.processor', ['transport' => 'foo']) @@ -60,7 +67,7 @@ public function testShouldRegisterProcessorWithMatchedName() ->addTag('enqueue.transport.processor', ['transport' => 'bar']) ; - $pass = new BuildProcessorRegistryPass('foo'); + $pass = new BuildProcessorRegistryPass(); $pass->process($container); @@ -71,12 +78,47 @@ public function testShouldRegisterProcessorWithMatchedName() ]); } + public function testShouldRegisterProcessorWithMatchedNameToCorrespondingRegistries() + { + $fooRegistry = new Definition(ProcessorRegistryInterface::class); + $fooRegistry->addArgument([]); + + $barRegistry = new Definition(ProcessorRegistryInterface::class); + $barRegistry->addArgument([]); + + $container = new ContainerBuilder(); + $container->setParameter('enqueue.transports', ['foo', 'bar']); + $container->setDefinition('enqueue.transport.foo.processor_registry', $fooRegistry); + $container->setDefinition('enqueue.transport.bar.processor_registry', $barRegistry); + $container->register('aFooProcessor', 'aProcessorClass') + ->addTag('enqueue.transport.processor', ['transport' => 'foo']) + ; + $container->register('aBarProcessor', 'aProcessorClass') + ->addTag('enqueue.transport.processor', ['transport' => 'bar']) + ; + + $pass = new BuildProcessorRegistryPass(); + + $pass->process($container); + + $this->assertInstanceOf(Reference::class, $fooRegistry->getArgument(0)); + $this->assertLocatorServices($container, $fooRegistry->getArgument(0), [ + 'aFooProcessor' => 'aFooProcessor', + ]); + + $this->assertInstanceOf(Reference::class, $barRegistry->getArgument(0)); + $this->assertLocatorServices($container, $barRegistry->getArgument(0), [ + 'aBarProcessor' => 'aBarProcessor', + ]); + } + public function testShouldRegisterProcessorWithoutNameToDefaultTransport() { $registry = new Definition(ProcessorRegistryInterface::class); $registry->addArgument(null); $container = new ContainerBuilder(); + $container->setParameter('enqueue.transports', ['default']); $container->setDefinition('enqueue.transport.default.processor_registry', $registry); $container->register('aFooProcessor', 'aProcessorClass') ->addTag('enqueue.transport.processor', []) @@ -85,7 +127,7 @@ public function testShouldRegisterProcessorWithoutNameToDefaultTransport() ->addTag('enqueue.transport.processor', ['transport' => 'bar']) ; - $pass = new BuildProcessorRegistryPass('default'); + $pass = new BuildProcessorRegistryPass(); $pass->process($container); @@ -102,6 +144,7 @@ public function testShouldRegisterProcessorIfTransportNameEqualsAll() $registry->addArgument(null); $container = new ContainerBuilder(); + $container->setParameter('enqueue.transports', ['default']); $container->setDefinition('enqueue.transport.default.processor_registry', $registry); $container->register('aFooProcessor', 'aProcessorClass') ->addTag('enqueue.transport.processor', ['transport' => 'all']) @@ -110,7 +153,7 @@ public function testShouldRegisterProcessorIfTransportNameEqualsAll() ->addTag('enqueue.transport.processor', ['transport' => 'bar']) ; - $pass = new BuildProcessorRegistryPass('default'); + $pass = new BuildProcessorRegistryPass(); $pass->process($container); @@ -127,12 +170,13 @@ public function testShouldRegisterWithCustomProcessorName() $registry->addArgument(null); $container = new ContainerBuilder(); + $container->setParameter('enqueue.transports', ['default']); $container->setDefinition('enqueue.transport.default.processor_registry', $registry); $container->register('aFooProcessor', 'aProcessorClass') ->addTag('enqueue.transport.processor', ['processor' => 'customProcessorName']) ; - $pass = new BuildProcessorRegistryPass('default'); + $pass = new BuildProcessorRegistryPass(); $pass->process($container);