Skip to content

Enable job-queue for default configuration #636

New issue

Have a question about this project? No Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “No Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? No Sign in to your account

Merged
merged 8 commits into from
Nov 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/bundle/config_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ enqueue:

# The factory service should be a class that implements "Enqueue\Monitoring\StatsStorageFactory" interface
storage_factory_class: ~
job: false
async_commands:
enabled: false
extensions:
Expand Down
36 changes: 28 additions & 8 deletions pkg/enqueue-bundle/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
namespace Enqueue\Bundle\DependencyInjection;

use Enqueue\AsyncCommand\RunCommandProcessor;
use Enqueue\AsyncEventDispatcher\DependencyInjection\AsyncEventDispatcherExtension;
use Enqueue\JobQueue\Job;
use Enqueue\Monitoring\Symfony\DependencyInjection\MonitoringFactory;
use Enqueue\Symfony\Client\DependencyInjection\ClientFactory;
use Enqueue\Symfony\DependencyInjection\TransportFactory;
Expand Down Expand Up @@ -35,6 +37,8 @@ public function getConfigTreeBuilder(): TreeBuilder
->append(ClientFactory::getConfiguration($this->debug))
->append($this->getMonitoringConfiguration())
->append($this->getAsyncCommandsConfiguration())
->append($this->getJobConfiguration())
->append($this->getAsyncEventsConfiguration())
->arrayNode('extensions')->addDefaultsIfNotSet()->children()
->booleanNode('doctrine_ping_connection_extension')->defaultFalse()->end()
->booleanNode('doctrine_clear_identity_map_extension')->defaultFalse()->end()
Expand All @@ -45,14 +49,6 @@ public function getConfigTreeBuilder(): TreeBuilder
->end()
;

// $rootNode->children()
// ->booleanNode('job')->defaultFalse()->end()
// ->arrayNode('async_events')
// ->addDefaultsIfNotSet()
// ->canBeEnabled()
// ->end()
// ;

return $tb;
}

Expand All @@ -76,4 +72,28 @@ private function getAsyncCommandsConfiguration(): ArrayNodeDefinition
->canBeEnabled()
;
}

private function getJobConfiguration(): ArrayNodeDefinition
{
if (false === class_exists(Job::class)) {
return MissingComponentFactory::getConfiguration('job', ['enqueue/job-queue']);
}

return (new ArrayNodeDefinition('job'))
->addDefaultsIfNotSet()
->canBeEnabled()
;
}

private function getAsyncEventsConfiguration(): ArrayNodeDefinition
{
if (false == class_exists(AsyncEventDispatcherExtension::class)) {
return MissingComponentFactory::getConfiguration('async_events', ['enqueue/async-event-dispatcher']);
}

return (new ArrayNodeDefinition('async_events'))
->addDefaultsIfNotSet()
->canBeEnabled()
;
}
}
79 changes: 43 additions & 36 deletions pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Enqueue\Symfony\Client\DependencyInjection\ClientFactory;
use Enqueue\Symfony\DependencyInjection\TransportFactory;
use Enqueue\Symfony\DiUtils;
use Interop\Queue\Context;
use Symfony\Component\Config\FileLocator;
use Symfony\Component\Config\Resource\FileResource;
use Symfony\Component\DependencyInjection\ContainerBuilder;
Expand All @@ -35,7 +36,7 @@ public function load(array $configs, ContainerBuilder $container): void

// find default configuration
$defaultName = null;
foreach ($config as $name => $configs) {
foreach ($config as $name => $modules) {
// set first as default
if (null === $defaultName) {
$defaultName = $name;
Expand All @@ -49,40 +50,65 @@ public function load(array $configs, ContainerBuilder $container): void

$transportNames = [];
$clientNames = [];
foreach ($config as $name => $configs) {
foreach ($config as $name => $modules) {
// transport & consumption
$transportNames[] = $name;

$transportFactory = (new TransportFactory($name));
$transportFactory->buildConnectionFactory($container, $configs['transport']);
$transportFactory = (new TransportFactory($name, $defaultName === $name));
$transportFactory->buildConnectionFactory($container, $modules['transport']);
$transportFactory->buildContext($container, []);
$transportFactory->buildQueueConsumer($container, $configs['consumption']);
$transportFactory->buildQueueConsumer($container, $modules['consumption']);
$transportFactory->buildRpcClient($container, []);

// client
if (isset($configs['client'])) {
if (isset($modules['client'])) {
$clientNames[] = $name;

$clientConfig = $configs['client'];
$clientConfig = $modules['client'];
// todo
$clientConfig['transport'] = $configs['transport'];
$clientConfig['consumption'] = $configs['consumption'];
$clientConfig['transport'] = $modules['transport'];
$clientConfig['consumption'] = $modules['consumption'];

$clientFactory = new ClientFactory($name);
$clientFactory->build($container, $clientConfig, $defaultName === $name);
$clientFactory->createDriver($container, $configs['transport']);
$clientFactory = new ClientFactory($name, $defaultName === $name);
$clientFactory->build($container, $clientConfig);
$clientFactory->createDriver($container, $modules['transport']);
$clientFactory->createFlushSpoolProducerListener($container);
}

// monitoring
if (isset($configs['monitoring'])) {
if (isset($modules['monitoring'])) {
$monitoringFactory = new MonitoringFactory($name);
$monitoringFactory->buildStorage($container, $configs['monitoring']);
$monitoringFactory->buildConsumerExtension($container, $configs['monitoring']);
$monitoringFactory->buildStorage($container, $modules['monitoring']);
$monitoringFactory->buildConsumerExtension($container, $modules['monitoring']);

if (isset($modules['client'])) {
$monitoringFactory->buildClientExtension($container, $modules['monitoring']);
}
}

// job-queue
if (false == empty($modules['job']['enabled'])) {
if (false === isset($modules['client'])) {
throw new \LogicException('Client is required for job-queue.');
}

if (isset($configs['client'])) {
$monitoringFactory->buildClientExtension($container, $configs['monitoring']);
if ($name !== $defaultName) {
throw new \LogicException('Job-queue supports only default configuration.');
}

$loader->load('job.yml');
}

// async events
if (false == empty($modules['async_events']['enabled'])) {
if ($name !== $defaultName) {
throw new \LogicException('Async events supports only default configuration.');
}

$extension = new AsyncEventDispatcherExtension();
$extension->load([[
'context_service' => Context::class,
]], $container);
}
}

Expand Down Expand Up @@ -112,25 +138,6 @@ public function load(array $configs, ContainerBuilder $container): void
$this->loadDoctrineClearIdentityMapExtension($config, $container);
$this->loadSignalExtension($config, $container);
$this->loadReplyExtension($config, $container);

// if ($config['job']) {
// if (!class_exists(Job::class)) {
// throw new \LogicException('Seems "enqueue/job-queue" is not installed. Please fix this issue.');
// }
//
// $loader->load('job.yml');
// }
//
// if ($config['async_events']['enabled']) {
// if (false == class_exists(AsyncEventDispatcherExtension::class)) {
// throw new \LogicException('The "enqueue/async-event-dispatcher" package has to be installed.');
// }
//
// $extension = new AsyncEventDispatcherExtension();
// $extension->load([[
// 'context_service' => 'enqueue.transport.default.context',
// ]], $container);
// }
}

public function getConfiguration(array $config, ContainerBuilder $container): Configuration
Expand Down
6 changes: 3 additions & 3 deletions pkg/enqueue-bundle/Resources/config/job.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ services:
public: true
arguments:
- '@Enqueue\JobQueue\Doctrine\JobStorage'
- '@enqueue.client.default.producer'
- '@Enqueue\Client\ProducerInterface'
Copy link
Member

@makasim makasim Nov 15, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont think you should use autowire here. Could you please rollback?

Never mind


# Deprecated. To be removed in 0.10.
enqueue.job.processor:
Expand Down Expand Up @@ -55,7 +55,7 @@ services:
arguments:
- '@Enqueue\JobQueue\Doctrine\JobStorage'
- '@Enqueue\JobQueue\CalculateRootJobStatusService'
- '@enqueue.client.default.producer'
- '@Enqueue\Client\ProducerInterface'
- '@logger'
tags:
- { name: 'enqueue.command_subscriber', client: 'default' }
Expand All @@ -70,7 +70,7 @@ services:
public: true
arguments:
- '@Enqueue\JobQueue\Doctrine\JobStorage'
- '@enqueue.client.default.producer'
- '@Enqueue\Client\ProducerInterface'
- '@logger'
tags:
- { name: 'enqueue.topic_subscriber', client: 'default' }
Expand Down
12 changes: 6 additions & 6 deletions pkg/enqueue-bundle/Tests/Functional/App/config/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ enqueue:
transport: 'null:'
client:
traceable_producer: true
job: true
async_events: true
async_commands: true
# job: true
# async_commands: true

services:
test_enqueue.client.default.traceable_producer:
Expand Down Expand Up @@ -115,7 +115,7 @@ services:
- {name: 'enqueue.event_transformer', eventName: 'test_async_subscriber', transformerName: 'test_async' }

# overwrite async listener with one based on client producer. so we can use traceable producer.
# enqueue.events.async_listener:
# class: 'Enqueue\Bundle\Tests\Functional\App\AsyncListener'
# public: true
# arguments: ['@enqueue.client.default.producer', '@enqueue.events.registry']
enqueue.events.async_listener:
class: 'Enqueue\Bundle\Tests\Functional\App\AsyncListener'
public: true
arguments: ['@enqueue.client.default.producer', '@enqueue.events.registry']
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ class AsyncListenerTest extends WebTestCase
{
public function setUp()
{
$this->markTestSkipped('Configuration for async_events is not yet ready');

parent::setUp();

/** @var AsyncListener $asyncListener */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ class AsyncProcessorTest extends WebTestCase
{
public function setUp()
{
$this->markTestSkipped('Configuration for async_events is not yet ready');

parent::setUp();

/** @var AsyncListener $asyncListener */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ class AsyncSubscriberTest extends WebTestCase
{
public function setUp()
{
$this->markTestSkipped('Configuration for async_events is not yet ready');

parent::setUp();

/** @var AsyncListener $asyncListener */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ class CalculateRootJobStatusProcessorTest extends WebTestCase
{
public function testCouldBeConstructedByContainer()
{
$this->markTestSkipped('Configuration for jobs is not yet ready');

$instance = static::$container->get(CalculateRootJobStatusProcessor::class);

$this->assertInstanceOf(CalculateRootJobStatusProcessor::class, $instance);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ class DependentJobServiceTest extends WebTestCase
{
public function testCouldBeConstructedByContainer()
{
$this->markTestSkipped('Configuration for jobs is not yet ready');

$instance = static::$container->get(DependentJobService::class);

$this->assertInstanceOf(DependentJobService::class, $instance);
Expand Down
2 changes: 0 additions & 2 deletions pkg/enqueue-bundle/Tests/Functional/Job/JobRunnerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ class JobRunnerTest extends WebTestCase
{
public function testCouldBeConstructedByContainer()
{
$this->markTestSkipped('Configuration for jobs is not yet ready');

$instance = static::$container->get(JobRunner::class);

$this->assertInstanceOf(JobRunner::class, $instance);
Expand Down
2 changes: 0 additions & 2 deletions pkg/enqueue-bundle/Tests/Functional/Job/JobStorageTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ class JobStorageTest extends WebTestCase
{
public function testCouldGetJobStorageAsServiceFromContainer()
{
$this->markTestSkipped('Configuration for jobs is not yet ready');

$instance = static::$container->get(JobStorage::class);

$this->assertInstanceOf(JobStorage::class, $instance);
Expand Down
7 changes: 2 additions & 5 deletions pkg/enqueue-bundle/Tests/Functional/RoutesCommandTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,13 @@ public function testShouldDisplayRegisteredTopics()
$tester->execute([]);

$expected = <<<'OUTPUT'
| topic | theTopic | default (prefixed) | test_topic_subscriber_processor | (hidden) |
| topic | theTopic | default (prefixed) | test_topic_subscriber_processor | (hidden) |
OUTPUT;

$this->assertSame(0, $tester->getStatusCode());
$this->assertContains($expected, $tester->getDisplay());
}

/**
* @group testit
*/
public function testShouldDisplayCommands()
{
/** @var RoutesCommand $command */
Expand All @@ -45,7 +42,7 @@ public function testShouldDisplayCommands()
$tester->execute([]);

$expected = <<<'OUTPUT'
| command | theCommand | default (prefixed) | test_command_subscriber_processor | (hidden) |
| command | theCommand | default (prefixed) | test_command_subscriber_processor | (hidden) |
OUTPUT;

$this->assertSame(0, $tester->getStatusCode());
Expand Down
Loading