Skip to content

Factories rework. #106

New issue

Have a question about this project? No Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “No Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? No Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 64 additions & 13 deletions pkg/enqueue-bundle/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,22 @@
namespace Enqueue\Bundle\DependencyInjection;

use Enqueue\Client\Config;
use Enqueue\Symfony\TransportFactoryInterface;
use Symfony\Component\Config\Definition\Builder\TreeBuilder;
use Symfony\Component\Config\Definition\ConfigurationInterface;

class Configuration implements ConfigurationInterface
{
/**
* @var TransportFactoryInterface[]
* @var string[]
*/
private $factories;
private $factoriesNames;

/**
* @param TransportFactoryInterface[] $factories
* @param string[] $factoriesNames
*/
public function __construct(array $factories)
public function __construct(array $factoriesNames)
{
$this->factories = $factories;
$this->factoriesNames = $factoriesNames;
}

/**
Expand All @@ -30,17 +29,69 @@ public function getConfigTreeBuilder()
$tb = new TreeBuilder();
$rootNode = $tb->root('enqueue');

$transportChildren = $rootNode->children()
->arrayNode('transport')->isRequired()->children();
$rootNode
->beforeNormalization()
->always(function ($v) {
if (empty($v['transport'])) {
$v['transport'] = [
'default' => ['dsn' => 'null://'],
];
}

foreach ($this->factories as $factory) {
$factory->addConfiguration(
$transportChildren->arrayNode($factory->getName())
);
}
if (is_string($v['transport'])) {
$v['transport'] = [
'default' => ['dsn' => $v['transport']],
];
}

if (is_array($v['transport'])) {
foreach ($v['transport'] as $name => $config) {
if (empty($config)) {
$config = ['dsn' => 'null://'];
}

if (is_string($config)) {
$config = ['dsn' => $config];
}

if (empty($config['dsn']) && empty($config['config'])) {
throw new \LogicException(sprintf('The transport "%s" is incorrectly configured. Either "dsn" or "config" must be set.', $name));
}

$v['transport'][$name] = $config;
}
}

return $v;
})
->end()
->children()
->arrayNode('transport')
->prototype('array')
->beforeNormalization()
->ifString()->then(function ($v) {
return ['dsn' => $v];
})
->ifEmpty()->then(function ($v) {
return ['dsn' => 'null://'];
})
->end()
->children()
->scalarNode('dsn')->end()
->enumNode('factory')->values($this->factoriesNames)->end()
->variableNode('config')
->treatNullLike([])
->info('The place for factory specific options')
->end()
->end()
->end()
->end()
->end()
;

$rootNode->children()
->arrayNode('client')->children()
->scalarNode('transport')->defaultValue('default')->end()
->booleanNode('traceable_producer')->defaultFalse()->end()
->scalarNode('prefix')->defaultValue('enqueue')->end()
->scalarNode('app_name')->defaultValue('app')->end()
Expand Down
48 changes: 44 additions & 4 deletions pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
use Enqueue\Client\TraceableProducer;
use Enqueue\JobQueue\Job;
use Enqueue\Null\Symfony\NullTransportFactory;
use Enqueue\Psr\PsrConnectionFactory;
use Enqueue\Psr\PsrContext;
use Enqueue\Symfony\DefaultTransportFactory;
use Enqueue\Symfony\TransportFactoryInterface;
use Symfony\Component\Config\FileLocator;
Expand Down Expand Up @@ -44,25 +46,63 @@ public function addTransportFactory(TransportFactoryInterface $transportFactory)
throw new \LogicException(sprintf('Transport factory with such name already added. Name %s', $name));
}

$this->factories[$name] = $transportFactory;
// $this->factories[$name] = $transportFactory;
}

/**
* @param string $name
* @param string $factoryClass
*/
public function addFactoryClass($name, $factoryClass)
{
if (array_key_exists($name, $this->factories)) {
throw new \LogicException(sprintf('The factory with such name has already been added. Name "%s"', $name));
}

$this->factories[$name] = $factoryClass;
}

/**
* {@inheritdoc}
*/
public function load(array $configs, ContainerBuilder $container)
{
$config = $this->processConfiguration(new Configuration($this->factories), $configs);
$config = $this->processConfiguration(new Configuration(array_keys($this->factories)), $configs);

$loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config'));
$loader->load('services.yml');

$container->getDefinition('enqueue.connection_factory_factory')
->replaceArgument(0, $this->factories);

foreach ($config['transport'] as $name => $transportConfig) {
$this->factories[$name]->createConnectionFactory($container, $transportConfig);
$this->factories[$name]->createContext($container, $transportConfig);
$factoryId = sprintf('enqueue.transport.%s.connection_factory', $name);
$contextId = sprintf('enqueue.transport.%s.context', $name);

if (isset($transportConfig['dsn'])) {
$transportConfig = $transportConfig['dsn'];
}

$container->register($factoryId, PsrConnectionFactory::class)
->addArgument($transportConfig)
->setFactory([new Reference('enqueue.connection_factory_factory'), 'createFactory'])
;

$container->register($contextId, PsrContext::class)
->setFactory([new Reference($factoryId), 'createContext'])
;
}

if (isset($config['client'])) {
$container->setAlias(
'enqueue.client.transport.connection_factory',
sprintf('enqueue.transport.%s.connection_factory', $config['client']['transport'])
);
$container->setAlias(
'enqueue.client.transport.context',
sprintf('enqueue.transport.%s.context', $config['client']['transport'])
);

$loader->load('client.yml');
$loader->load('extensions/flush_spool_producer_extension.yml');

Expand Down
19 changes: 8 additions & 11 deletions pkg/enqueue-bundle/EnqueueBundle.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

use Enqueue\AmqpExt\AmqpContext;
use Enqueue\AmqpExt\Symfony\AmqpTransportFactory;
use Enqueue\AmqpExt\Symfony\RabbitMqAmqpTransportFactory;
use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientExtensionsPass;
use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientRoutingPass;
use Enqueue\Bundle\DependencyInjection\Compiler\BuildConsumptionExtensionsPass;
Expand All @@ -15,15 +14,14 @@
use Enqueue\Bundle\Events\DependencyInjection\AsyncEventsPass;
use Enqueue\Bundle\Events\DependencyInjection\AsyncTransformersPass;
use Enqueue\Dbal\DbalContext;
use Enqueue\Dbal\ManagerRegistryConnectionFactory;
use Enqueue\Dbal\Symfony\DbalTransportFactory;
use Enqueue\Fs\FsContext;
use Enqueue\Fs\Symfony\FsTransportFactory;
use Enqueue\Redis\RedisContext;
use Enqueue\Redis\Symfony\RedisTransportFactory;
use Enqueue\Sqs\SqsContext;
use Enqueue\Sqs\Symfony\SqsTransportFactory;
use Enqueue\Stomp\StompContext;
use Enqueue\Stomp\Symfony\RabbitMqStompTransportFactory;
use Enqueue\Stomp\Symfony\StompTransportFactory;
use Symfony\Component\DependencyInjection\Compiler\PassConfig;
use Symfony\Component\DependencyInjection\ContainerBuilder;
Expand All @@ -47,29 +45,28 @@ public function build(ContainerBuilder $container)
$extension = $container->getExtension('enqueue');

if (class_exists(StompContext::class)) {
$extension->addTransportFactory(new StompTransportFactory());
$extension->addTransportFactory(new RabbitMqStompTransportFactory());
$extension->addFactoryClass('stomp', StompTransportFactory::class);
}

if (class_exists(AmqpContext::class)) {
$extension->addTransportFactory(new AmqpTransportFactory());
$extension->addTransportFactory(new RabbitMqAmqpTransportFactory());
$extension->addFactoryClass('amqp', AmqpTransportFactory::class);
}

if (class_exists(FsContext::class)) {
$extension->addTransportFactory(new FsTransportFactory());
$extension->addFactoryClass('file', FsTransportFactory::class);
}

if (class_exists(RedisContext::class)) {
$extension->addTransportFactory(new RedisTransportFactory());
$extension->addFactoryClass('redis', RedisTransportFactory::class);
}

if (class_exists(DbalContext::class)) {
$extension->addTransportFactory(new DbalTransportFactory());
$extension->addFactoryClass('dbal', DbalTransportFactory::class);
$extension->addFactoryClass('doctrine', ManagerRegistryConnectionFactory::class);
}

if (class_exists(SqsContext::class)) {
$extension->addTransportFactory(new SqsTransportFactory());
$extension->addFactoryClass('amazon_sqs', DbalTransportFactory::class);
}

$container->addCompilerPass(new AsyncEventsPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
Expand Down
4 changes: 2 additions & 2 deletions pkg/enqueue-bundle/Resources/config/client.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ services:
class: 'Enqueue\Client\RpcClient'
arguments:
- '@enqueue.client.producer'
- '@enqueue.transport.context'
- '@enqueue.client.transport.context'

enqueue.client.router_processor:
class: 'Enqueue\Client\RouterProcessor'
Expand Down Expand Up @@ -78,7 +78,7 @@ services:
class: 'Enqueue\Consumption\QueueConsumer'
public: false
arguments:
- '@enqueue.transport.context'
- '@enqueue.client.transport.context'
- '@enqueue.consumption.extensions'

enqueue.client.consume_messages_command:
Expand Down
6 changes: 6 additions & 0 deletions pkg/enqueue-bundle/Resources/config/services.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
services:
enqueue.connection_factory_factory:
class: 'Enqueue\ConnectionFactoryFactory'
public: false
arguments:
- []

enqueue.consumption.extensions:
class: 'Enqueue\Consumption\ChainExtension'
public: false
Expand Down
6 changes: 6 additions & 0 deletions pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ public function provideEnqueueConfigs()
],
]];

yield 'default_dsn_as_env' => [[
'transport' => [
'default' => '%env(AMQP_DSN)%',
],
]];

yield 'default_dbal_as_dsn' => [[
'transport' => [
'default' => getenv('DOCTINE_DSN'),
Expand Down
Loading