From 2bb31ae646c7b8f0b3a506c09695c2f3a6051f99 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 4 Jan 2017 17:30:55 +0200 Subject: [PATCH] [psr] Introduce MessageProcessor interface (moved from consumption). It would allow to decouple consumption code from consumption component. It must depends on psr queue only. --- docs/bundle/job_queue.md | 24 +- docs/bundle/quick_tour.md | 15 +- docs/job_queue/run_sub_job.md | 9 +- docs/job_queue/run_unique_job.md | 7 +- docs/quick_tour.md | 14 +- .../AmqpConsumptionUseCasesTest.php | 14 +- .../Compiler/BuildClientRoutingPass.php | 4 +- ...ass.php => BuildProcessorRegistryPass.php} | 8 +- .../Compiler/BuildQueueMetaRegistryPass.php | 4 +- .../BuildTopicMetaSubscribersPass.php | 4 +- ...ExtractProcessorTagSubscriptionsTrait.php} | 2 +- pkg/enqueue-bundle/EnqueueBundle.php | 8 +- .../Resources/config/client.yml | 14 +- pkg/enqueue-bundle/Resources/config/job.yml | 6 +- .../Functional/ConsumeMessagesCommandTest.php | 26 +- ...MessageProcessor.php => TestProcessor.php} | 9 +- .../Functional/app/config/amqp-config.yml | 4 +- .../DoctrineClearIdentityMapExtensionTest.php | 8 +- .../DoctrinePingConnectionExtensionTest.php | 8 +- .../Compiler/BuildClientRoutingPassTest.php | 18 +- ...php => BuildProcessorRegistryPassTest.php} | 48 ++-- .../BuildQueueMetaRegistryPassTest.php | 16 +- .../BuildTopicMetaSubscribersPassTest.php | 26 +- .../Tests/Unit/EnqueueBundleTest.php | 4 +- .../Client/ArrayMessageProcessorRegistry.php | 41 --- pkg/enqueue/Client/ArrayProcessorRegistry.php | 41 +++ ...ageProcessor.php => DelegateProcessor.php} | 10 +- ...ace.php => ProcessorRegistryInterface.php} | 6 +- pkg/enqueue/Client/RouterProcessor.php | 7 +- pkg/enqueue/Client/SimpleClient.php | 12 +- ...ageProcessor.php => CallbackProcessor.php} | 3 +- pkg/enqueue/Consumption/Context.php | 21 +- .../Consumption/Extension/LoggerExtension.php | 2 +- .../Consumption/Extension/ReplyExtension.php | 3 +- .../Consumption/MessageProcessorInterface.php | 16 -- pkg/enqueue/Consumption/QueueConsumer.php | 49 ++-- pkg/enqueue/Consumption/Result.php | 71 +++-- .../Router/RouteRecipientListProcessor.php | 7 +- .../Symfony/Client/ConsumeMessagesCommand.php | 14 +- ...hp => ContainerAwareProcessorRegistry.php} | 14 +- .../ContainerAwareConsumeMessagesCommand.php | 14 +- .../ArrayMessageProcessorRegistryTest.php | 58 ----- .../Client/ArrayProcessorRegistryTest.php | 58 +++++ pkg/enqueue/Tests/Client/ConfigTest.php | 2 +- .../DelayRedeliveredMessageExtensionTest.php | 2 +- ...ssorTest.php => DelegateProcessorTest.php} | 36 +-- ...ssorTest.php => CallbackProcessorTest.php} | 14 +- pkg/enqueue/Tests/Consumption/ContextTest.php | 28 +- .../LimitConsumedMessagesExtensionTest.php | 8 +- .../LimitConsumerMemoryExtensionTest.php | 8 +- .../LimitConsumptionTimeExtensionTest.php | 8 +- .../Extension/LoggerExtensionTest.php | 4 +- .../Extension/ReplyExtensionTest.php | 6 +- .../Tests/Consumption/QueueConsumerTest.php | 246 +++++++++--------- .../Functional/Client/SimpleClientTest.php | 2 +- .../RouteRecipientListProcessorTest.php | 8 +- .../Client/ConsumeMessagesCommandTest.php | 24 +- ...> ContainerAwareProcessorRegistryTest.php} | 36 +-- ...ntainerAwareConsumeMessagesCommandTest.php | 16 +- .../CalculateRootJobStatusProcessor.php | 8 +- ...rocessor.php => DependentJobProcessor.php} | 16 +- .../CalculateRootJobStatusProcessorTest.php | 6 +- ...Test.php => DependentJobProcessorTest.php} | 40 +-- pkg/psr-queue/DeliveryMode.php | 2 +- pkg/psr-queue/Processor.php | 35 +++ .../Tests/{Exception => }/ExceptionTest.php | 2 +- .../InvalidDeliveryModeExceptionTest.php | 2 +- .../InvalidDestinationExceptionTest.php | 6 +- .../InvalidMessageExceptionTest.php | 2 +- .../StompConsumptionUseCasesTest.php | 16 +- 70 files changed, 671 insertions(+), 659 deletions(-) rename pkg/enqueue-bundle/DependencyInjection/Compiler/{BuildMessageProcessorRegistryPass.php => BuildProcessorRegistryPass.php} (77%) rename pkg/enqueue-bundle/DependencyInjection/Compiler/{ExtractMessageProcessorTagSubscriptionsTrait.php => ExtractProcessorTagSubscriptionsTrait.php} (98%) rename pkg/enqueue-bundle/Tests/Functional/{TestMessageProcessor.php => TestProcessor.php} (68%) rename pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/{BuildMessageProcessorRegistryPassTest.php => BuildProcessorRegistryPassTest.php} (73%) delete mode 100644 pkg/enqueue/Client/ArrayMessageProcessorRegistry.php create mode 100644 pkg/enqueue/Client/ArrayProcessorRegistry.php rename pkg/enqueue/Client/{DelegateMessageProcessor.php => DelegateProcessor.php} (70%) rename pkg/enqueue/Client/{MessageProcessorRegistryInterface.php => ProcessorRegistryInterface.php} (50%) rename pkg/enqueue/Consumption/{CallbackMessageProcessor.php => CallbackProcessor.php} (87%) delete mode 100644 pkg/enqueue/Consumption/MessageProcessorInterface.php rename pkg/enqueue/Symfony/Client/{ContainerAwareMessageProcessorRegistry.php => ContainerAwareProcessorRegistry.php} (71%) delete mode 100644 pkg/enqueue/Tests/Client/ArrayMessageProcessorRegistryTest.php create mode 100644 pkg/enqueue/Tests/Client/ArrayProcessorRegistryTest.php rename pkg/enqueue/Tests/Client/{DelegateMessageProcessorTest.php => DelegateProcessorTest.php} (57%) rename pkg/enqueue/Tests/Consumption/{CallbackMessageProcessorTest.php => CallbackProcessorTest.php} (58%) rename pkg/enqueue/Tests/Symfony/Client/{ContainerAwareMessageProcessorRegistryTest.php => ContainerAwareProcessorRegistryTest.php} (52%) rename pkg/job-queue/{DependentJobMessageProcessor.php => DependentJobProcessor.php} (84%) rename pkg/job-queue/Tests/{DependentJobMessageProcessorTest.php => DependentJobProcessorTest.php} (86%) create mode 100644 pkg/psr-queue/Processor.php rename pkg/psr-queue/Tests/{Exception => }/ExceptionTest.php (93%) rename pkg/psr-queue/Tests/{Exception => }/InvalidDeliveryModeExceptionTest.php (96%) rename pkg/psr-queue/Tests/{Exception => }/InvalidDestinationExceptionTest.php (90%) rename pkg/psr-queue/Tests/{Exception => }/InvalidMessageExceptionTest.php (92%) diff --git a/docs/bundle/job_queue.md b/docs/bundle/job_queue.md index 7dc92e4aa..aded0212c 100644 --- a/docs/bundle/job_queue.md +++ b/docs/bundle/job_queue.md @@ -14,15 +14,14 @@ Guaranty that there is only single job running with such name. ```php getBody(); - return Result::ACK; - // return Result::REJECT; // when the message is broken - // return Result::REQUEUE; // the message is fine but you want to postpone processing + return self::ACK; + // return self::REJECT; // when the message is broken + // return self::REQUEUE; // the message is fine but you want to postpone processing } public static function getSubscribedTopics() @@ -72,9 +71,9 @@ Register it as a container service and subscribe to the topic: ```yaml foo_message_processor: - class: 'FooMessageProcessor' + class: 'FooProcessor' tags: - - { name: 'enqueue.client.message_processor' } + - { name: 'enqueue.client.processor' } ``` Now you can start consuming messages: diff --git a/docs/job_queue/run_sub_job.md b/docs/job_queue/run_sub_job.md index a1dacd817..e0322922e 100644 --- a/docs/job_queue/run_sub_job.md +++ b/docs/job_queue/run_sub_job.md @@ -6,16 +6,15 @@ They will be executed in parallel. ```php bind('foo_queue', function(Message $message) { // process messsage - return Result::ACK; + return Processor::ACK; }); $queueConsumer->bind('bar_queue', function(Message $message) { // process messsage - return Result::ACK; + return Processor::ACK; }); $queueConsumer->consume(); @@ -167,16 +167,16 @@ Here's an example of how you can send and consume messages. ```php bind('foo_topic', function (Message $message) { // process message - return Result::ACK; + return Processor::ACK; }); $client->send('foo_topic', 'Hello there!'); diff --git a/pkg/amqp-ext/Tests/Functional/AmqpConsumptionUseCasesTest.php b/pkg/amqp-ext/Tests/Functional/AmqpConsumptionUseCasesTest.php index 15ffb2a2f..66705ae7f 100644 --- a/pkg/amqp-ext/Tests/Functional/AmqpConsumptionUseCasesTest.php +++ b/pkg/amqp-ext/Tests/Functional/AmqpConsumptionUseCasesTest.php @@ -2,15 +2,15 @@ namespace Enqueue\AmqpExt\Tests\Functional; use Enqueue\AmqpExt\AmqpContext; -use Enqueue\Psr\Context; -use Enqueue\Psr\Message; use Enqueue\Consumption\ChainExtension; use Enqueue\Consumption\Extension\LimitConsumedMessagesExtension; use Enqueue\Consumption\Extension\LimitConsumptionTimeExtension; use Enqueue\Consumption\Extension\ReplyExtension; -use Enqueue\Consumption\MessageProcessorInterface; use Enqueue\Consumption\QueueConsumer; use Enqueue\Consumption\Result; +use Enqueue\Psr\Context; +use Enqueue\Psr\Message; +use Enqueue\Psr\Processor; use Enqueue\Test\RabbitmqAmqpExtension; use Enqueue\Test\RabbitmqManagmentExtensionTrait; @@ -52,7 +52,7 @@ public function testConsumeOneMessageAndExit() new LimitConsumptionTimeExtension(new \DateTime('+3sec')), ])); - $processor = new StubMessageProcessor(); + $processor = new StubProcessor(); $queueConsumer->bind($queue, $processor); $queueConsumer->consume(); @@ -81,10 +81,10 @@ public function testConsumeOneMessageAndSendReplyExit() $replyMessage = $this->amqpContext->createMessage(__METHOD__.'.reply'); - $processor = new StubMessageProcessor(); + $processor = new StubProcessor(); $processor->result = Result::reply($replyMessage); - $replyProcessor = new StubMessageProcessor(); + $replyProcessor = new StubProcessor(); $queueConsumer->bind($queue, $processor); $queueConsumer->bind($replyQueue, $replyProcessor); @@ -98,7 +98,7 @@ public function testConsumeOneMessageAndSendReplyExit() } } -class StubMessageProcessor implements MessageProcessorInterface +class StubProcessor implements Processor { public $result = Result::ACK; diff --git a/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildClientRoutingPass.php b/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildClientRoutingPass.php index ccce00370..a1e95c750 100644 --- a/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildClientRoutingPass.php +++ b/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildClientRoutingPass.php @@ -6,14 +6,14 @@ class BuildClientRoutingPass implements CompilerPassInterface { - use ExtractMessageProcessorTagSubscriptionsTrait; + use ExtractProcessorTagSubscriptionsTrait; /** * {@inheritdoc} */ public function process(ContainerBuilder $container) { - $processorTagName = 'enqueue.client.message_processor'; + $processorTagName = 'enqueue.client.processor'; $routerId = 'enqueue.client.router_processor'; if (false == $container->hasDefinition($routerId)) { diff --git a/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildMessageProcessorRegistryPass.php b/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildProcessorRegistryPass.php similarity index 77% rename from pkg/enqueue-bundle/DependencyInjection/Compiler/BuildMessageProcessorRegistryPass.php rename to pkg/enqueue-bundle/DependencyInjection/Compiler/BuildProcessorRegistryPass.php index df489c716..c6835b243 100644 --- a/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildMessageProcessorRegistryPass.php +++ b/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildProcessorRegistryPass.php @@ -4,17 +4,17 @@ use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface; use Symfony\Component\DependencyInjection\ContainerBuilder; -class BuildMessageProcessorRegistryPass implements CompilerPassInterface +class BuildProcessorRegistryPass implements CompilerPassInterface { - use ExtractMessageProcessorTagSubscriptionsTrait; + use ExtractProcessorTagSubscriptionsTrait; /** * {@inheritdoc} */ public function process(ContainerBuilder $container) { - $processorTagName = 'enqueue.client.message_processor'; - $processorRegistryId = 'enqueue.client.message_processor_registry'; + $processorTagName = 'enqueue.client.processor'; + $processorRegistryId = 'enqueue.client.processor_registry'; if (false == $container->hasDefinition($processorRegistryId)) { return; diff --git a/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildQueueMetaRegistryPass.php b/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildQueueMetaRegistryPass.php index 96d99d22d..aeaba4f47 100644 --- a/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildQueueMetaRegistryPass.php +++ b/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildQueueMetaRegistryPass.php @@ -6,14 +6,14 @@ class BuildQueueMetaRegistryPass implements CompilerPassInterface { - use ExtractMessageProcessorTagSubscriptionsTrait; + use ExtractProcessorTagSubscriptionsTrait; /** * {@inheritdoc} */ public function process(ContainerBuilder $container) { - $processorTagName = 'enqueue.client.message_processor'; + $processorTagName = 'enqueue.client.processor'; $queueMetaRegistryId = 'enqueue.client.meta.queue_meta_registry'; if (false == $container->hasDefinition($queueMetaRegistryId)) { return; diff --git a/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildTopicMetaSubscribersPass.php b/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildTopicMetaSubscribersPass.php index cf53cb388..5a9e29c6e 100644 --- a/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildTopicMetaSubscribersPass.php +++ b/pkg/enqueue-bundle/DependencyInjection/Compiler/BuildTopicMetaSubscribersPass.php @@ -6,14 +6,14 @@ class BuildTopicMetaSubscribersPass implements CompilerPassInterface { - use ExtractMessageProcessorTagSubscriptionsTrait; + use ExtractProcessorTagSubscriptionsTrait; /** * {@inheritdoc} */ public function process(ContainerBuilder $container) { - $processorTagName = 'enqueue.client.message_processor'; + $processorTagName = 'enqueue.client.processor'; $topicsSubscribers = []; foreach ($container->findTaggedServiceIds($processorTagName) as $serviceId => $tagAttributes) { diff --git a/pkg/enqueue-bundle/DependencyInjection/Compiler/ExtractMessageProcessorTagSubscriptionsTrait.php b/pkg/enqueue-bundle/DependencyInjection/Compiler/ExtractProcessorTagSubscriptionsTrait.php similarity index 98% rename from pkg/enqueue-bundle/DependencyInjection/Compiler/ExtractMessageProcessorTagSubscriptionsTrait.php rename to pkg/enqueue-bundle/DependencyInjection/Compiler/ExtractProcessorTagSubscriptionsTrait.php index d0f9e2ea0..20a0ebc4b 100644 --- a/pkg/enqueue-bundle/DependencyInjection/Compiler/ExtractMessageProcessorTagSubscriptionsTrait.php +++ b/pkg/enqueue-bundle/DependencyInjection/Compiler/ExtractProcessorTagSubscriptionsTrait.php @@ -5,7 +5,7 @@ use Symfony\Component\DependencyInjection\ContainerBuilder; use Symfony\Component\DependencyInjection\Exception\ParameterNotFoundException; -trait ExtractMessageProcessorTagSubscriptionsTrait +trait ExtractProcessorTagSubscriptionsTrait { /** * @param ContainerBuilder $container diff --git a/pkg/enqueue-bundle/EnqueueBundle.php b/pkg/enqueue-bundle/EnqueueBundle.php index d230d3a71..29b2f725b 100644 --- a/pkg/enqueue-bundle/EnqueueBundle.php +++ b/pkg/enqueue-bundle/EnqueueBundle.php @@ -4,17 +4,17 @@ use Enqueue\AmqpExt\AmqpContext; use Enqueue\AmqpExt\Symfony\AmqpTransportFactory; use Enqueue\AmqpExt\Symfony\RabbitMqTransportFactory; -use Enqueue\Symfony\DefaultTransportFactory; -use Enqueue\Symfony\NullTransportFactory; use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientRoutingPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildExtensionsPass; -use Enqueue\Bundle\DependencyInjection\Compiler\BuildMessageProcessorRegistryPass; +use Enqueue\Bundle\DependencyInjection\Compiler\BuildProcessorRegistryPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildQueueMetaRegistryPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass; use Enqueue\Bundle\DependencyInjection\EnqueueExtension; use Enqueue\Stomp\StompContext; use Enqueue\Stomp\Symfony\RabbitMqStompTransportFactory; use Enqueue\Stomp\Symfony\StompTransportFactory; +use Enqueue\Symfony\DefaultTransportFactory; +use Enqueue\Symfony\NullTransportFactory; use Symfony\Component\DependencyInjection\ContainerBuilder; use Symfony\Component\HttpKernel\Bundle\Bundle; @@ -27,7 +27,7 @@ public function build(ContainerBuilder $container) { $container->addCompilerPass(new BuildExtensionsPass()); $container->addCompilerPass(new BuildClientRoutingPass()); - $container->addCompilerPass(new BuildMessageProcessorRegistryPass()); + $container->addCompilerPass(new BuildProcessorRegistryPass()); $container->addCompilerPass(new BuildTopicMetaSubscribersPass()); $container->addCompilerPass(new BuildQueueMetaRegistryPass()); diff --git a/pkg/enqueue-bundle/Resources/config/client.yml b/pkg/enqueue-bundle/Resources/config/client.yml index 9ed2ce15e..ffcb806e0 100644 --- a/pkg/enqueue-bundle/Resources/config/client.yml +++ b/pkg/enqueue-bundle/Resources/config/client.yml @@ -18,12 +18,12 @@ services: - [] tags: - - name: 'enqueue.client.message_processor' + name: 'enqueue.client.processor' topicName: '__router__' queueName: '%enqueue.client.router_queue_name%' - enqueue.client.message_processor_registry: - class: 'Enqueue\Symfony\Client\ContainerAwareMessageProcessorRegistry' + enqueue.client.processor_registry: + class: 'Enqueue\Symfony\Client\ContainerAwareProcessorRegistry' public: false calls: - ['setContainer', ['@service_container']] @@ -38,11 +38,11 @@ services: public: true arguments: ['@enqueue.client.config', []] - enqueue.client.delegate_message_processor: - class: 'Enqueue\Client\DelegateMessageProcessor' + enqueue.client.delegate_processor: + class: 'Enqueue\Client\DelegateProcessor' public: false arguments: - - '@enqueue.client.message_processor_registry' + - '@enqueue.client.processor_registry' enqueue.client.extension.set_router_properties: class: 'Enqueue\Client\ConsumptionExtension\SetRouterPropertiesExtension' @@ -64,7 +64,7 @@ services: public: true arguments: - '@enqueue.client.queue_consumer' - - '@enqueue.client.delegate_message_processor' + - '@enqueue.client.delegate_processor' - '@enqueue.client.meta.queue_meta_registry' - '@enqueue.client.driver' tags: diff --git a/pkg/enqueue-bundle/Resources/config/job.yml b/pkg/enqueue-bundle/Resources/config/job.yml index 1dfb14638..4a05ce0c3 100644 --- a/pkg/enqueue-bundle/Resources/config/job.yml +++ b/pkg/enqueue-bundle/Resources/config/job.yml @@ -33,16 +33,16 @@ services: - '@enqueue.client.message_producer' - '@logger' tags: - - { name: 'enqueue.client.message_processor' } + - { name: 'enqueue.client.processor' } enqueue.job.dependent_job_processor: - class: 'Enqueue\JobQueue\DependentJobMessageProcessor' + class: 'Enqueue\JobQueue\DependentJobProcessor' arguments: - '@enqueue.job.storage' - '@enqueue.client.message_producer' - '@logger' tags: - - { name: 'enqueue.client.message_processor' } + - { name: 'enqueue.client.processor' } enqueue.job.dependent_job_service: class: 'Enqueue\JobQueue\DependentJobService' diff --git a/pkg/enqueue-bundle/Tests/Functional/ConsumeMessagesCommandTest.php b/pkg/enqueue-bundle/Tests/Functional/ConsumeMessagesCommandTest.php index 5dba0b292..47fcd0cbf 100644 --- a/pkg/enqueue-bundle/Tests/Functional/ConsumeMessagesCommandTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/ConsumeMessagesCommandTest.php @@ -2,9 +2,9 @@ namespace Enqueue\Bundle\Tests\Functional; use Enqueue\AmqpExt\AmqpMessage; +use Enqueue\Bundle\Tests\Functional\App\AmqpAppKernel; use Enqueue\Symfony\Client\ConsumeMessagesCommand; use Enqueue\Test\RabbitmqManagmentExtensionTrait; -use Enqueue\Bundle\Tests\Functional\App\AmqpAppKernel; use Symfony\Component\Console\Tester\CommandTester; /** @@ -35,9 +35,9 @@ public function testCouldBeGetFromContainerAsService() public function testClientConsumeMessagesCommandShouldConsumeMessage() { $command = $this->container->get('enqueue.client.consume_messages_command'); - $messageProcessor = $this->container->get('test.message.processor'); + $processor = $this->container->get('test.message.processor'); - $this->getMessageProducer()->send(TestMessageProcessor::TOPIC, 'test message body'); + $this->getMessageProducer()->send(TestProcessor::TOPIC, 'test message body'); $tester = new CommandTester($command); $tester->execute([ @@ -45,16 +45,16 @@ public function testClientConsumeMessagesCommandShouldConsumeMessage() '--time-limit' => 'now +10 seconds', ]); - $this->assertInstanceOf(AmqpMessage::class, $messageProcessor->message); - $this->assertEquals('test message body', $messageProcessor->message->getBody()); + $this->assertInstanceOf(AmqpMessage::class, $processor->message); + $this->assertEquals('test message body', $processor->message->getBody()); } public function testClientConsumeMessagesFromExplicitlySetQueue() { $command = $this->container->get('enqueue.client.consume_messages_command'); - $messageProcessor = $this->container->get('test.message.processor'); + $processor = $this->container->get('test.message.processor'); - $this->getMessageProducer()->send(TestMessageProcessor::TOPIC, 'test message body'); + $this->getMessageProducer()->send(TestProcessor::TOPIC, 'test message body'); $tester = new CommandTester($command); $tester->execute([ @@ -63,17 +63,17 @@ public function testClientConsumeMessagesFromExplicitlySetQueue() 'client-queue-names' => ['test'], ]); - $this->assertInstanceOf(AmqpMessage::class, $messageProcessor->message); - $this->assertEquals('test message body', $messageProcessor->message->getBody()); + $this->assertInstanceOf(AmqpMessage::class, $processor->message); + $this->assertEquals('test message body', $processor->message->getBody()); } public function testTransportConsumeMessagesCommandShouldConsumeMessage() { $command = $this->container->get('enqueue.command.consume_messages'); $command->setContainer($this->container); - $messageProcessor = $this->container->get('test.message.processor'); + $processor = $this->container->get('test.message.processor'); - $this->getMessageProducer()->send(TestMessageProcessor::TOPIC, 'test message body'); + $this->getMessageProducer()->send(TestProcessor::TOPIC, 'test message body'); $tester = new CommandTester($command); $tester->execute([ @@ -83,8 +83,8 @@ public function testTransportConsumeMessagesCommandShouldConsumeMessage() 'processor-service' => 'test.message.processor', ]); - $this->assertInstanceOf(AmqpMessage::class, $messageProcessor->message); - $this->assertEquals('test message body', $messageProcessor->message->getBody()); + $this->assertInstanceOf(AmqpMessage::class, $processor->message); + $this->assertEquals('test message body', $processor->message->getBody()); } private function getMessageProducer() diff --git a/pkg/enqueue-bundle/Tests/Functional/TestMessageProcessor.php b/pkg/enqueue-bundle/Tests/Functional/TestProcessor.php similarity index 68% rename from pkg/enqueue-bundle/Tests/Functional/TestMessageProcessor.php rename to pkg/enqueue-bundle/Tests/Functional/TestProcessor.php index e08000d54..cea3c4e25 100644 --- a/pkg/enqueue-bundle/Tests/Functional/TestMessageProcessor.php +++ b/pkg/enqueue-bundle/Tests/Functional/TestProcessor.php @@ -1,13 +1,12 @@ message = $message; - return Result::ACK; + return self::ACK; } public static function getSubscribedTopics() diff --git a/pkg/enqueue-bundle/Tests/Functional/app/config/amqp-config.yml b/pkg/enqueue-bundle/Tests/Functional/app/config/amqp-config.yml index ad130b295..312f81f52 100644 --- a/pkg/enqueue-bundle/Tests/Functional/app/config/amqp-config.yml +++ b/pkg/enqueue-bundle/Tests/Functional/app/config/amqp-config.yml @@ -38,6 +38,6 @@ enqueue: services: test.message.processor: - class: 'Enqueue\Bundle\Tests\Functional\TestMessageProcessor' + class: 'Enqueue\Bundle\Tests\Functional\TestProcessor' tags: - - { name: 'enqueue.client.message_processor' } + - { name: 'enqueue.client.processor' } diff --git a/pkg/enqueue-bundle/Tests/Unit/Consumption/Extension/DoctrineClearIdentityMapExtensionTest.php b/pkg/enqueue-bundle/Tests/Unit/Consumption/Extension/DoctrineClearIdentityMapExtensionTest.php index 93ac073ab..c9b0c8901 100644 --- a/pkg/enqueue-bundle/Tests/Unit/Consumption/Extension/DoctrineClearIdentityMapExtensionTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/Consumption/Extension/DoctrineClearIdentityMapExtensionTest.php @@ -2,11 +2,11 @@ namespace Enqueue\Bundle\Tests\Unit\Consumption\Extension; use Doctrine\Common\Persistence\ObjectManager; +use Enqueue\Bundle\Consumption\Extension\DoctrineClearIdentityMapExtension; +use Enqueue\Consumption\Context; use Enqueue\Psr\Consumer; use Enqueue\Psr\Context as PsrContext; -use Enqueue\Consumption\Context; -use Enqueue\Consumption\MessageProcessorInterface; -use Enqueue\Bundle\Consumption\Extension\DoctrineClearIdentityMapExtension; +use Enqueue\Psr\Processor; use Psr\Log\LoggerInterface; use Symfony\Bridge\Doctrine\RegistryInterface; @@ -51,7 +51,7 @@ protected function createPsrContext() $context = new Context($this->createMock(PsrContext::class)); $context->setLogger($this->createMock(LoggerInterface::class)); $context->setPsrConsumer($this->createMock(Consumer::class)); - $context->setMessageProcessor($this->createMock(MessageProcessorInterface::class)); + $context->setPsrProcessor($this->createMock(Processor::class)); return $context; } diff --git a/pkg/enqueue-bundle/Tests/Unit/Consumption/Extension/DoctrinePingConnectionExtensionTest.php b/pkg/enqueue-bundle/Tests/Unit/Consumption/Extension/DoctrinePingConnectionExtensionTest.php index 046442add..900ff2737 100644 --- a/pkg/enqueue-bundle/Tests/Unit/Consumption/Extension/DoctrinePingConnectionExtensionTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/Consumption/Extension/DoctrinePingConnectionExtensionTest.php @@ -2,11 +2,11 @@ namespace Enqueue\Bundle\Tests\Unit\Consumption\Extension; use Doctrine\DBAL\Connection; +use Enqueue\Bundle\Consumption\Extension\DoctrinePingConnectionExtension; +use Enqueue\Consumption\Context; use Enqueue\Psr\Consumer; use Enqueue\Psr\Context as PsrContext; -use Enqueue\Consumption\Context; -use Enqueue\Consumption\MessageProcessorInterface; -use Enqueue\Bundle\Consumption\Extension\DoctrinePingConnectionExtension; +use Enqueue\Psr\Processor; use Psr\Log\LoggerInterface; use Symfony\Bridge\Doctrine\RegistryInterface; @@ -99,7 +99,7 @@ protected function createPsrContext() $context = new Context($this->createMock(PsrContext::class)); $context->setLogger($this->createMock(LoggerInterface::class)); $context->setPsrConsumer($this->createMock(Consumer::class)); - $context->setMessageProcessor($this->createMock(MessageProcessorInterface::class)); + $context->setPsrProcessor($this->createMock(Processor::class)); return $context; } diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildClientRoutingPassTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildClientRoutingPassTest.php index e55e1d813..07b0d1978 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildClientRoutingPassTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildClientRoutingPassTest.php @@ -21,7 +21,7 @@ public function testShouldBuildRouteRegistry() $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'topicName' => 'topic', 'processorName' => 'processor', 'queueName' => 'queue', @@ -49,7 +49,7 @@ public function testThrowIfProcessorClassNameCouldNotBeFound() $container = $this->createContainerBuilder(); $processor = new Definition('notExistingClass'); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'processorName' => 'processor', ]); $container->setDefinition('processor', $processor); @@ -70,7 +70,7 @@ public function testShouldThrowExceptionIfTopicNameIsNotSet() $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor'); + $processor->addTag('enqueue.client.processor'); $container->setDefinition('processor', $processor); $router = new Definition(); @@ -89,7 +89,7 @@ public function testShouldSetServiceIdAdProcessorIdIfIsNotSetInTag() $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'topicName' => 'topic', 'queueName' => 'queue', ]); @@ -116,7 +116,7 @@ public function testShouldSetDefaultQueueIfNotSetInTag() $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'topicName' => 'topic', ]); $container->setDefinition('processor-service-id', $processor); @@ -142,7 +142,7 @@ public function testShouldBuildRouteFromSubscriberIfOnlyTopicNameSpecified() $container = $this->createContainerBuilder(); $processor = new Definition(OnlyTopicNameTopicSubscriber::class); - $processor->addTag('enqueue.client.message_processor'); + $processor->addTag('enqueue.client.processor'); $container->setDefinition('processor-service-id', $processor); $router = new Definition(); @@ -166,7 +166,7 @@ public function testShouldBuildRouteFromSubscriberIfProcessorNameSpecified() $container = $this->createContainerBuilder(); $processor = new Definition(ProcessorNameTopicSubscriber::class); - $processor->addTag('enqueue.client.message_processor'); + $processor->addTag('enqueue.client.processor'); $container->setDefinition('processor-service-id', $processor); $router = new Definition(); @@ -190,7 +190,7 @@ public function testShouldBuildRouteFromSubscriberIfQueueNameSpecified() $container = $this->createContainerBuilder(); $processor = new Definition(QueueNameTopicSubscriber::class); - $processor->addTag('enqueue.client.message_processor'); + $processor->addTag('enqueue.client.processor'); $container->setDefinition('processor-service-id', $processor); $router = new Definition(); @@ -216,7 +216,7 @@ public function testShouldThrowExceptionWhenTopicSubscriberConfigurationIsInvali $container = $this->createContainerBuilder(); $processor = new Definition(InvalidTopicSubscriber::class); - $processor->addTag('enqueue.client.message_processor'); + $processor->addTag('enqueue.client.processor'); $container->setDefinition('processor-service-id', $processor); $router = new Definition(); diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildMessageProcessorRegistryPassTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildProcessorRegistryPassTest.php similarity index 73% rename from pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildMessageProcessorRegistryPassTest.php rename to pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildProcessorRegistryPassTest.php index 44a1de5a7..0980f9a01 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildMessageProcessorRegistryPassTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildProcessorRegistryPassTest.php @@ -1,18 +1,18 @@ createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'topicName' => 'topic', 'processorName' => 'processor-name', ]); @@ -28,9 +28,9 @@ public function testShouldBuildRouteRegistry() $processorRegistry = new Definition(); $processorRegistry->setArguments([]); - $container->setDefinition('enqueue.client.message_processor_registry', $processorRegistry); + $container->setDefinition('enqueue.client.processor_registry', $processorRegistry); - $pass = new BuildMessageProcessorRegistryPass(); + $pass = new BuildProcessorRegistryPass(); $pass->process($container); $expectedValue = [ @@ -45,16 +45,16 @@ public function testThrowIfProcessorClassNameCouldNotBeFound() $container = $this->createContainerBuilder(); $processor = new Definition('notExistingClass'); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'processorName' => 'processor', ]); $container->setDefinition('processor', $processor); $processorRegistry = new Definition(); $processorRegistry->setArguments([]); - $container->setDefinition('enqueue.client.message_processor_registry', $processorRegistry); + $container->setDefinition('enqueue.client.processor_registry', $processorRegistry); - $pass = new BuildMessageProcessorRegistryPass(); + $pass = new BuildProcessorRegistryPass(); $this->expectException(\LogicException::class); $this->expectExceptionMessage('The class "notExistingClass" could not be found.'); @@ -66,14 +66,14 @@ public function testShouldThrowExceptionIfTopicNameIsNotSet() $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor'); + $processor->addTag('enqueue.client.processor'); $container->setDefinition('processor', $processor); $processorRegistry = new Definition(); $processorRegistry->setArguments([]); - $container->setDefinition('enqueue.client.message_processor_registry', $processorRegistry); + $container->setDefinition('enqueue.client.processor_registry', $processorRegistry); - $pass = new BuildMessageProcessorRegistryPass(); + $pass = new BuildProcessorRegistryPass(); $this->expectException(\LogicException::class); $this->expectExceptionMessage('Topic name is not set on message processor tag but it is required.'); @@ -85,16 +85,16 @@ public function testShouldSetServiceIdAdProcessorIdIfIsNotSetInTag() $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'topicName' => 'topic', ]); $container->setDefinition('processor-id', $processor); $processorRegistry = new Definition(); $processorRegistry->setArguments([]); - $container->setDefinition('enqueue.client.message_processor_registry', $processorRegistry); + $container->setDefinition('enqueue.client.processor_registry', $processorRegistry); - $pass = new BuildMessageProcessorRegistryPass(); + $pass = new BuildProcessorRegistryPass(); $pass->process($container); $expectedValue = [ @@ -109,14 +109,14 @@ public function testShouldBuildRouteFromSubscriberIfOnlyTopicNameSpecified() $container = $this->createContainerBuilder(); $processor = new Definition(OnlyTopicNameTopicSubscriber::class); - $processor->addTag('enqueue.client.message_processor'); + $processor->addTag('enqueue.client.processor'); $container->setDefinition('processor-id', $processor); $processorRegistry = new Definition(); $processorRegistry->setArguments([]); - $container->setDefinition('enqueue.client.message_processor_registry', $processorRegistry); + $container->setDefinition('enqueue.client.processor_registry', $processorRegistry); - $pass = new BuildMessageProcessorRegistryPass(); + $pass = new BuildProcessorRegistryPass(); $pass->process($container); $expectedValue = [ @@ -131,14 +131,14 @@ public function testShouldBuildRouteFromSubscriberIfProcessorNameSpecified() $container = $this->createContainerBuilder(); $processor = new Definition(ProcessorNameTopicSubscriber::class); - $processor->addTag('enqueue.client.message_processor'); + $processor->addTag('enqueue.client.processor'); $container->setDefinition('processor-id', $processor); $processorRegistry = new Definition(); $processorRegistry->setArguments([]); - $container->setDefinition('enqueue.client.message_processor_registry', $processorRegistry); + $container->setDefinition('enqueue.client.processor_registry', $processorRegistry); - $pass = new BuildMessageProcessorRegistryPass(); + $pass = new BuildProcessorRegistryPass(); $pass->process($container); $expectedValue = [ @@ -155,14 +155,14 @@ public function testShouldThrowExceptionWhenTopicSubscriberConfigurationIsInvali $container = $this->createContainerBuilder(); $processor = new Definition(InvalidTopicSubscriber::class); - $processor->addTag('enqueue.client.message_processor'); + $processor->addTag('enqueue.client.processor'); $container->setDefinition('processor-id', $processor); $processorRegistry = new Definition(); $processorRegistry->setArguments([]); - $container->setDefinition('enqueue.client.message_processor_registry', $processorRegistry); + $container->setDefinition('enqueue.client.processor_registry', $processorRegistry); - $pass = new BuildMessageProcessorRegistryPass(); + $pass = new BuildProcessorRegistryPass(); $pass->process($container); } diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildQueueMetaRegistryPassTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildQueueMetaRegistryPassTest.php index ad1a85bf7..332e7d195 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildQueueMetaRegistryPassTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildQueueMetaRegistryPassTest.php @@ -20,7 +20,7 @@ public function testShouldDoNothingIfRegistryServicesNotSetToContainer() $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'processorName' => 'processor', ]); $container->setDefinition('processor', $processor); @@ -34,7 +34,7 @@ public function testThrowIfProcessorClassNameCouldNotBeFound() $container = $this->createContainerBuilder(); $processor = new Definition('notExistingClass'); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'processorName' => 'processor', ]); $container->setDefinition('processor', $processor); @@ -55,7 +55,7 @@ public function testShouldBuildQueueMetaRegistry() $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'processorName' => 'theProcessorName', 'topicName' => 'aTopicName', ]); @@ -80,7 +80,7 @@ public function testShouldSetServiceIdAdProcessorIdIfIsNotSetInTag() $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'topicName' => 'aTopicName', ]); $container->setDefinition('processor-service-id', $processor); @@ -104,7 +104,7 @@ public function testShouldSetQueueIfSetInTag() $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'queueName' => 'theClientQueueName', 'topicName' => 'aTopicName', ]); @@ -129,7 +129,7 @@ public function testShouldBuildQueueFromSubscriberIfOnlyTopicNameSpecified() $container = $this->createContainerBuilder(); $processor = new Definition(OnlyTopicNameTopicSubscriber::class); - $processor->addTag('enqueue.client.message_processor'); + $processor->addTag('enqueue.client.processor'); $container->setDefinition('processor-service-id', $processor); $registry = new Definition(); @@ -151,7 +151,7 @@ public function testShouldBuildQueueFromSubscriberIfProcessorNameSpecified() $container = $this->createContainerBuilder(); $processor = new Definition(ProcessorNameTopicSubscriber::class); - $processor->addTag('enqueue.client.message_processor'); + $processor->addTag('enqueue.client.processor'); $container->setDefinition('processor-service-id', $processor); $registry = new Definition(); @@ -173,7 +173,7 @@ public function testShouldBuildQueueFromSubscriberIfQueueNameSpecified() $container = $this->createContainerBuilder(); $processor = new Definition(QueueNameTopicSubscriber::class); - $processor->addTag('enqueue.client.message_processor'); + $processor->addTag('enqueue.client.processor'); $container->setDefinition('processor-service-id', $processor); $registry = new Definition(); diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildTopicMetaSubscribersPassTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildTopicMetaSubscribersPassTest.php index 929f1fba4..1571fa25c 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildTopicMetaSubscribersPassTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/Compiler/BuildTopicMetaSubscribersPassTest.php @@ -20,7 +20,7 @@ public function testShouldBuildTopicMetaSubscribersForOneTagAndEmptyRegistry() $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'topicName' => 'topic', 'processorName' => 'processor-name', ]); @@ -45,7 +45,7 @@ public function testThrowIfProcessorClassNameCouldNotBeFound() $container = $this->createContainerBuilder(); $processor = new Definition('notExistingClass'); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'processorName' => 'processor', ]); $container->setDefinition('processor', $processor); @@ -66,7 +66,7 @@ public function testShouldBuildTopicMetaSubscribersForOneTagAndSameMetaInRegistr $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'topicName' => 'topic', 'processorName' => 'barProcessorName', ]); @@ -96,7 +96,7 @@ public function testShouldBuildTopicMetaSubscribersForOneTagAndSameMetaInPlusAno $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'topicName' => 'fooTopic', 'processorName' => 'barProcessorName', ]); @@ -128,14 +128,14 @@ public function testShouldBuildTopicMetaSubscribersForTwoTagAndEmptyRegistry() $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'topicName' => 'fooTopic', 'processorName' => 'fooProcessorName', ]); $container->setDefinition('processor-id', $processor); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'topicName' => 'fooTopic', 'processorName' => 'barProcessorName', ]); @@ -162,14 +162,14 @@ public function testShouldBuildTopicMetaSubscribersForTwoTagSameMetaRegistry() $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'topicName' => 'fooTopic', 'processorName' => 'fooProcessorName', ]); $container->setDefinition('processor-id', $processor); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'topicName' => 'fooTopic', 'processorName' => 'barProcessorName', ]); @@ -199,7 +199,7 @@ public function testThrowIfTopicNameNotSetOnTagAsAttribute() $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', []); + $processor->addTag('enqueue.client.processor', []); $container->setDefinition('processor', $processor); $topicMetaRegistry = new Definition(); @@ -218,7 +218,7 @@ public function testShouldSetServiceIdAdProcessorIdIfIsNotSetInTag() $container = $this->createContainerBuilder(); $processor = new Definition(\stdClass::class); - $processor->addTag('enqueue.client.message_processor', [ + $processor->addTag('enqueue.client.processor', [ 'topicName' => 'topic', ]); $container->setDefinition('processor-id', $processor); @@ -242,7 +242,7 @@ public function testShouldBuildMetaFromSubscriberIfOnlyTopicNameSpecified() $container = $this->createContainerBuilder(); $processor = new Definition(OnlyTopicNameTopicSubscriber::class); - $processor->addTag('enqueue.client.message_processor'); + $processor->addTag('enqueue.client.processor'); $container->setDefinition('processor-id', $processor); $topicMetaRegistry = new Definition(); @@ -264,7 +264,7 @@ public function testShouldBuildMetaFromSubscriberIfProcessorNameSpecified() $container = $this->createContainerBuilder(); $processor = new Definition(ProcessorNameTopicSubscriber::class); - $processor->addTag('enqueue.client.message_processor'); + $processor->addTag('enqueue.client.processor'); $container->setDefinition('processor-id', $processor); $topicMetaRegistry = new Definition(); @@ -288,7 +288,7 @@ public function testShouldThrowExceptionWhenTopicSubscriberConfigurationIsInvali $container = $this->createContainerBuilder(); $processor = new Definition(InvalidTopicSubscriber::class); - $processor->addTag('enqueue.client.message_processor'); + $processor->addTag('enqueue.client.processor'); $container->setDefinition('processor-id', $processor); $topicMetaRegistry = new Definition(); diff --git a/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php b/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php index 71b652842..14b1bfe9e 100644 --- a/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/EnqueueBundleTest.php @@ -5,7 +5,7 @@ use Enqueue\AmqpExt\Symfony\RabbitMqTransportFactory; use Enqueue\Bundle\DependencyInjection\Compiler\BuildClientRoutingPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildExtensionsPass; -use Enqueue\Bundle\DependencyInjection\Compiler\BuildMessageProcessorRegistryPass; +use Enqueue\Bundle\DependencyInjection\Compiler\BuildProcessorRegistryPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildQueueMetaRegistryPass; use Enqueue\Bundle\DependencyInjection\Compiler\BuildTopicMetaSubscribersPass; use Enqueue\Bundle\DependencyInjection\EnqueueExtension; @@ -50,7 +50,7 @@ public function testShouldRegisterExpectedCompilerPasses() $container ->expects($this->at(2)) ->method('addCompilerPass') - ->with($this->isInstanceOf(BuildMessageProcessorRegistryPass::class)) + ->with($this->isInstanceOf(BuildProcessorRegistryPass::class)) ; $container ->expects($this->at(3)) diff --git a/pkg/enqueue/Client/ArrayMessageProcessorRegistry.php b/pkg/enqueue/Client/ArrayMessageProcessorRegistry.php deleted file mode 100644 index e48693c55..000000000 --- a/pkg/enqueue/Client/ArrayMessageProcessorRegistry.php +++ /dev/null @@ -1,41 +0,0 @@ -processors = $processors; - } - - /** - * @param string $name - * @param MessageProcessorInterface $processor - */ - public function add($name, MessageProcessorInterface $processor) - { - $this->processors[$name] = $processor; - } - - /** - * {@inheritdoc} - */ - public function get($processorName) - { - if (false == isset($this->processors[$processorName])) { - throw new \LogicException(sprintf('MessageProcessor was not found. processorName: "%s"', $processorName)); - } - - return $this->processors[$processorName]; - } -} diff --git a/pkg/enqueue/Client/ArrayProcessorRegistry.php b/pkg/enqueue/Client/ArrayProcessorRegistry.php new file mode 100644 index 000000000..fc7ec5a76 --- /dev/null +++ b/pkg/enqueue/Client/ArrayProcessorRegistry.php @@ -0,0 +1,41 @@ +processors = $processors; + } + + /** + * @param string $name + * @param Processor $processor + */ + public function add($name, Processor $processor) + { + $this->processors[$name] = $processor; + } + + /** + * {@inheritdoc} + */ + public function get($processorName) + { + if (false == isset($this->processors[$processorName])) { + throw new \LogicException(sprintf('Processor was not found. processorName: "%s"', $processorName)); + } + + return $this->processors[$processorName]; + } +} diff --git a/pkg/enqueue/Client/DelegateMessageProcessor.php b/pkg/enqueue/Client/DelegateProcessor.php similarity index 70% rename from pkg/enqueue/Client/DelegateMessageProcessor.php rename to pkg/enqueue/Client/DelegateProcessor.php index e7a2206e2..e04e9507c 100644 --- a/pkg/enqueue/Client/DelegateMessageProcessor.php +++ b/pkg/enqueue/Client/DelegateProcessor.php @@ -3,19 +3,19 @@ use Enqueue\Psr\Context; use Enqueue\Psr\Message as PsrMessage; -use Enqueue\Consumption\MessageProcessorInterface; +use Enqueue\Psr\Processor; -class DelegateMessageProcessor implements MessageProcessorInterface +class DelegateProcessor implements Processor { /** - * @var MessageProcessorRegistryInterface + * @var ProcessorRegistryInterface */ private $registry; /** - * @param MessageProcessorRegistryInterface $registry + * @param ProcessorRegistryInterface $registry */ - public function __construct(MessageProcessorRegistryInterface $registry) + public function __construct(ProcessorRegistryInterface $registry) { $this->registry = $registry; } diff --git a/pkg/enqueue/Client/MessageProcessorRegistryInterface.php b/pkg/enqueue/Client/ProcessorRegistryInterface.php similarity index 50% rename from pkg/enqueue/Client/MessageProcessorRegistryInterface.php rename to pkg/enqueue/Client/ProcessorRegistryInterface.php index 10b707f92..42aa8a884 100644 --- a/pkg/enqueue/Client/MessageProcessorRegistryInterface.php +++ b/pkg/enqueue/Client/ProcessorRegistryInterface.php @@ -1,14 +1,14 @@ queueMetaRegistry->add($this->config->getRouterQueueName()); $this->topicsMetaRegistry = new TopicMetaRegistry([]); - $this->processorsRegistry = new ArrayMessageProcessorRegistry(); + $this->processorsRegistry = new ArrayProcessorRegistry(); $this->driver = new AmqpDriver($context, $this->config, $this->queueMetaRegistry); $this->routerProcessor = new RouterProcessor($this->driver, []); @@ -80,7 +80,7 @@ public function bind($topic, callable $processor) $this->topicsMetaRegistry->addProcessor($topic, $processorName); $this->queueMetaRegistry->addProcessor($queueName, $processorName); - $this->processorsRegistry->add($processorName, new CallbackMessageProcessor($processor)); + $this->processorsRegistry->add($processorName, new CallbackProcessor($processor)); $this->routerProcessor->add($topic, $queueName, $processorName); } @@ -124,10 +124,10 @@ private function getProducer() } /** - * @return DelegateMessageProcessor + * @return DelegateProcessor */ private function getProcessor() { - return new DelegateMessageProcessor($this->processorsRegistry); + return new DelegateProcessor($this->processorsRegistry); } } diff --git a/pkg/enqueue/Consumption/CallbackMessageProcessor.php b/pkg/enqueue/Consumption/CallbackProcessor.php similarity index 87% rename from pkg/enqueue/Consumption/CallbackMessageProcessor.php rename to pkg/enqueue/Consumption/CallbackProcessor.php index eb156444b..01da5a74f 100644 --- a/pkg/enqueue/Consumption/CallbackMessageProcessor.php +++ b/pkg/enqueue/Consumption/CallbackProcessor.php @@ -3,8 +3,9 @@ use Enqueue\Psr\Context as PsrContext; use Enqueue\Psr\Message; +use Enqueue\Psr\Processor; -class CallbackMessageProcessor implements MessageProcessorInterface +class CallbackProcessor implements Processor { /** * @var callable diff --git a/pkg/enqueue/Consumption/Context.php b/pkg/enqueue/Consumption/Context.php index 0f413c0fe..08e078dc4 100644 --- a/pkg/enqueue/Consumption/Context.php +++ b/pkg/enqueue/Consumption/Context.php @@ -1,11 +1,12 @@ messageProcessor; + return $this->psrProcessor; } /** - * @param MessageProcessorInterface $messageProcessor + * @param Processor $psrProcessor */ - public function setMessageProcessor(MessageProcessorInterface $messageProcessor) + public function setPsrProcessor(Processor $psrProcessor) { - if ($this->messageProcessor) { + if ($this->psrProcessor) { throw new IllegalContextModificationException('The message processor could be set once'); } - $this->messageProcessor = $messageProcessor; + $this->psrProcessor = $psrProcessor; } /** diff --git a/pkg/enqueue/Consumption/Extension/LoggerExtension.php b/pkg/enqueue/Consumption/Extension/LoggerExtension.php index 4ddb9e0d7..3f9b616b6 100644 --- a/pkg/enqueue/Consumption/Extension/LoggerExtension.php +++ b/pkg/enqueue/Consumption/Extension/LoggerExtension.php @@ -1,11 +1,11 @@ getResult(); if (false == $result instanceof Result) { - throw new \LogicException('To send a reply an instance of Result class has to returned from a MessageProcessor.'); + throw new \LogicException('To send a reply an instance of Result class has to returned from a Processor.'); } if (false == $result->getReply()) { diff --git a/pkg/enqueue/Consumption/MessageProcessorInterface.php b/pkg/enqueue/Consumption/MessageProcessorInterface.php deleted file mode 100644 index d4135e376..000000000 --- a/pkg/enqueue/Consumption/MessageProcessorInterface.php +++ /dev/null @@ -1,16 +0,0 @@ -extension = $extension; $this->idleMicroseconds = $idleMicroseconds; - $this->boundMessageProcessors = []; + $this->boundProcessors = []; } /** @@ -62,31 +63,31 @@ public function getPsrContext() } /** - * @param Queue|string $queue - * @param MessageProcessorInterface|callable $messageProcessor + * @param Queue|string $queue + * @param Processor|callable $processor * * @return QueueConsumer */ - public function bind($queue, $messageProcessor) + public function bind($queue, $processor) { if (is_string($queue)) { $queue = $this->psrContext->createQueue($queue); } - if (is_callable($messageProcessor)) { - $messageProcessor = new CallbackMessageProcessor($messageProcessor); + if (is_callable($processor)) { + $processor = new CallbackProcessor($processor); } InvalidArgumentException::assertInstanceOf($queue, Queue::class); - InvalidArgumentException::assertInstanceOf($messageProcessor, MessageProcessorInterface::class); + InvalidArgumentException::assertInstanceOf($processor, Processor::class); if (empty($queue->getQueueName())) { throw new LogicException('The queue name must be not empty.'); } - if (array_key_exists($queue->getQueueName(), $this->boundMessageProcessors)) { + if (array_key_exists($queue->getQueueName(), $this->boundProcessors)) { throw new LogicException(sprintf('The queue was already bound. Queue: %s', $queue->getQueueName())); } - $this->boundMessageProcessors[$queue->getQueueName()] = [$queue, $messageProcessor]; + $this->boundProcessors[$queue->getQueueName()] = [$queue, $processor]; return $this; } @@ -104,7 +105,7 @@ public function consume(ExtensionInterface $runtimeExtension = null) /** @var Consumer[] $messageConsumers */ $messageConsumers = []; /** @var \Enqueue\Psr\Queue $queue */ - foreach ($this->boundMessageProcessors as list($queue, $messageProcessor)) { + foreach ($this->boundProcessors as list($queue, $processor)) { $messageConsumers[$queue->getQueueName()] = $this->psrContext->createConsumer($queue); } @@ -122,7 +123,7 @@ public function consume(ExtensionInterface $runtimeExtension = null) while (true) { try { /** @var Queue $queue */ - foreach ($this->boundMessageProcessors as list($queue, $messageProcessor)) { + foreach ($this->boundProcessors as list($queue, $processor)) { $logger->debug(sprintf('Switch to a queue %s', $queue->getQueueName())); $messageConsumer = $messageConsumers[$queue->getQueueName()]; @@ -131,7 +132,7 @@ public function consume(ExtensionInterface $runtimeExtension = null) $context->setLogger($logger); $context->setPsrQueue($queue); $context->setPsrConsumer($messageConsumer); - $context->setMessageProcessor($messageProcessor); + $context->setPsrProcessor($processor); $this->doConsume($extension, $context); } @@ -171,8 +172,8 @@ public function consume(ExtensionInterface $runtimeExtension = null) */ protected function doConsume(ExtensionInterface $extension, Context $context) { - $messageProcessor = $context->getMessageProcessor(); - $messageConsumer = $context->getPsrConsumer(); + $processor = $context->getPsrProcessor(); + $consumer = $context->getPsrConsumer(); $logger = $context->getLogger(); $extension->onBeforeReceive($context); @@ -181,7 +182,7 @@ protected function doConsume(ExtensionInterface $extension, Context $context) throw new ConsumptionInterruptedException(); } - if ($message = $messageConsumer->receive($timeout = 1)) { + if ($message = $consumer->receive($timeout = 1)) { $logger->info('Message received'); $logger->debug('Headers: {headers}', ['headers' => new VarExport($message->getHeaders())]); $logger->debug('Properties: {properties}', ['properties' => new VarExport($message->getProperties())]); @@ -191,19 +192,19 @@ protected function doConsume(ExtensionInterface $extension, Context $context) $extension->onPreReceived($context); if (!$context->getResult()) { - $result = $messageProcessor->process($message, $this->psrContext); + $result = $processor->process($message, $this->psrContext); $context->setResult($result); } switch ($context->getResult()) { case Result::ACK: - $messageConsumer->acknowledge($message); + $consumer->acknowledge($message); break; case Result::REJECT: - $messageConsumer->reject($message, false); + $consumer->reject($message, false); break; case Result::REQUEUE: - $messageConsumer->reject($message, true); + $consumer->reject($message, true); break; default: throw new \LogicException(sprintf('Status is not supported: %s', $context->getResult())); diff --git a/pkg/enqueue/Consumption/Result.php b/pkg/enqueue/Consumption/Result.php index 3d248afce..9b4eebfbd 100644 --- a/pkg/enqueue/Consumption/Result.php +++ b/pkg/enqueue/Consumption/Result.php @@ -1,26 +1,25 @@ reply; - } - - /** - * @param Message|null $reply - */ - public function setReply(Message $reply = null) - { - $this->reply = $reply; - } - /** * @param string $status * @param string $reason @@ -79,6 +62,30 @@ public function getReason() return $this->reason; } + /** + * @return PsrMessage|null + */ + public function getReply() + { + return $this->reply; + } + + /** + * @param PsrMessage|null $reply + */ + public function setReply(PsrMessage $reply = null) + { + $this->reply = $reply; + } + + /** + * @return string + */ + public function __toString() + { + return $this->status; + } + /** * @param string $reason * @@ -110,24 +117,16 @@ public static function requeue($reason = '') } /** - * @param Message $replyMessage + * @param PsrMessage $replyMessage * @param string|null $reason * * @return Result */ - public static function reply(Message $replyMessage, $reason = '') + public static function reply(PsrMessage $replyMessage, $reason = '') { $result = static::ack($reason); $result->setReply($replyMessage); return $result; } - - /** - * @return string - */ - public function __toString() - { - return $this->status; - } } diff --git a/pkg/enqueue/Router/RouteRecipientListProcessor.php b/pkg/enqueue/Router/RouteRecipientListProcessor.php index f81d25bca..ded38e9fb 100644 --- a/pkg/enqueue/Router/RouteRecipientListProcessor.php +++ b/pkg/enqueue/Router/RouteRecipientListProcessor.php @@ -3,10 +3,9 @@ use Enqueue\Psr\Context; use Enqueue\Psr\Message; -use Enqueue\Consumption\MessageProcessorInterface; -use Enqueue\Consumption\Result; +use Enqueue\Psr\Processor; -class RouteRecipientListProcessor implements MessageProcessorInterface +class RouteRecipientListProcessor implements Processor { /** * @var RecipientListRouterInterface @@ -31,6 +30,6 @@ public function process(Message $message, Context $context) $producer->send($recipient->getDestination(), $recipient->getMessage()); } - return Result::ACK; + return self::ACK; } } diff --git a/pkg/enqueue/Symfony/Client/ConsumeMessagesCommand.php b/pkg/enqueue/Symfony/Client/ConsumeMessagesCommand.php index ba69433fb..fe5d14f0e 100644 --- a/pkg/enqueue/Symfony/Client/ConsumeMessagesCommand.php +++ b/pkg/enqueue/Symfony/Client/ConsumeMessagesCommand.php @@ -1,7 +1,7 @@ processors[$processorName])) { - throw new \LogicException(sprintf('MessageProcessor was not found. processorName: "%s"', $processorName)); + throw new \LogicException(sprintf('Processor was not found. processorName: "%s"', $processorName)); } if (null === $this->container) { @@ -47,10 +47,10 @@ public function get($processorName) $processor = $this->container->get($this->processors[$processorName]); - if (false == $processor instanceof MessageProcessorInterface) { + if (false == $processor instanceof Processor) { throw new \LogicException(sprintf( 'Invalid instance of message processor. expected: "%s", got: "%s"', - MessageProcessorInterface::class, + Processor::class, is_object($processor) ? get_class($processor) : gettype($processor) )); } diff --git a/pkg/enqueue/Symfony/Consumption/ContainerAwareConsumeMessagesCommand.php b/pkg/enqueue/Symfony/Consumption/ContainerAwareConsumeMessagesCommand.php index 56a3647f8..4d0ec9f3e 100644 --- a/pkg/enqueue/Symfony/Consumption/ContainerAwareConsumeMessagesCommand.php +++ b/pkg/enqueue/Symfony/Consumption/ContainerAwareConsumeMessagesCommand.php @@ -3,8 +3,8 @@ use Enqueue\Consumption\ChainExtension; use Enqueue\Consumption\Extension\LoggerExtension; -use Enqueue\Consumption\MessageProcessorInterface; use Enqueue\Consumption\QueueConsumer; +use Enqueue\Psr\Processor; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; @@ -59,13 +59,13 @@ protected function execute(InputInterface $input, OutputInterface $output) { $queueName = $input->getArgument('queue'); - /** @var MessageProcessorInterface $messageProcessor */ - $messageProcessor = $this->container->get($input->getArgument('processor-service')); - if (!$messageProcessor instanceof MessageProcessorInterface) { + /** @var Processor $processor */ + $processor = $this->container->get($input->getArgument('processor-service')); + if (!$processor instanceof Processor) { throw new \LogicException(sprintf( 'Invalid message processor service given. It must be an instance of %s but %s', - MessageProcessorInterface::class, - get_class($messageProcessor) + Processor::class, + get_class($processor) )); } @@ -78,7 +78,7 @@ protected function execute(InputInterface $input, OutputInterface $output) $queue = $this->consumer->getPsrContext()->createQueue($queueName); // @todo set additional queue options - $this->consumer->bind($queue, $messageProcessor); + $this->consumer->bind($queue, $processor); $this->consumer->consume($runtimeExtensions); } finally { $this->consumer->getPsrContext()->close(); diff --git a/pkg/enqueue/Tests/Client/ArrayMessageProcessorRegistryTest.php b/pkg/enqueue/Tests/Client/ArrayMessageProcessorRegistryTest.php deleted file mode 100644 index d67b6a588..000000000 --- a/pkg/enqueue/Tests/Client/ArrayMessageProcessorRegistryTest.php +++ /dev/null @@ -1,58 +0,0 @@ -assertClassImplements(MessageProcessorRegistryInterface::class, ArrayMessageProcessorRegistry::class); - } - - public function testCouldBeConstructedWithoutAnyArgument() - { - new ArrayMessageProcessorRegistry(); - } - - public function testShouldThrowExceptionIfProcessorIsNotSet() - { - $registry = new ArrayMessageProcessorRegistry(); - - $this->expectException(\LogicException::class); - $this->expectExceptionMessage('MessageProcessor was not found. processorName: "processor-name"'); - $registry->get('processor-name'); - } - - public function testShouldAllowGetProcessorAddedViaConstructor() - { - $processor = $this->createMessageProcessorMock(); - - $registry = new ArrayMessageProcessorRegistry(['aFooName' => $processor]); - - $this->assertSame($processor, $registry->get('aFooName')); - } - - public function testShouldAllowGetProcessorAddedViaAddMethod() - { - $processor = $this->createMessageProcessorMock(); - - $registry = new ArrayMessageProcessorRegistry(); - $registry->add('aFooName', $processor); - - $this->assertSame($processor, $registry->get('aFooName')); - } - - /** - * @return \PHPUnit_Framework_MockObject_MockObject|MessageProcessorInterface - */ - protected function createMessageProcessorMock() - { - return $this->createMock(MessageProcessorInterface::class); - } -} diff --git a/pkg/enqueue/Tests/Client/ArrayProcessorRegistryTest.php b/pkg/enqueue/Tests/Client/ArrayProcessorRegistryTest.php new file mode 100644 index 000000000..1c60aa44d --- /dev/null +++ b/pkg/enqueue/Tests/Client/ArrayProcessorRegistryTest.php @@ -0,0 +1,58 @@ +assertClassImplements(ProcessorRegistryInterface::class, ArrayProcessorRegistry::class); + } + + public function testCouldBeConstructedWithoutAnyArgument() + { + new ArrayProcessorRegistry(); + } + + public function testShouldThrowExceptionIfProcessorIsNotSet() + { + $registry = new ArrayProcessorRegistry(); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Processor was not found. processorName: "processor-name"'); + $registry->get('processor-name'); + } + + public function testShouldAllowGetProcessorAddedViaConstructor() + { + $processor = $this->createProcessorMock(); + + $registry = new ArrayProcessorRegistry(['aFooName' => $processor]); + + $this->assertSame($processor, $registry->get('aFooName')); + } + + public function testShouldAllowGetProcessorAddedViaAddMethod() + { + $processor = $this->createProcessorMock(); + + $registry = new ArrayProcessorRegistry(); + $registry->add('aFooName', $processor); + + $this->assertSame($processor, $registry->get('aFooName')); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|Processor + */ + protected function createProcessorMock() + { + return $this->createMock(Processor::class); + } +} diff --git a/pkg/enqueue/Tests/Client/ConfigTest.php b/pkg/enqueue/Tests/Client/ConfigTest.php index f9957c77d..3a140d226 100644 --- a/pkg/enqueue/Tests/Client/ConfigTest.php +++ b/pkg/enqueue/Tests/Client/ConfigTest.php @@ -5,7 +5,7 @@ class ConfigTest extends \PHPUnit_Framework_TestCase { - public function testShouldReturnRouterMessageProcessorNameSetInConstructor() + public function testShouldReturnRouterProcessorNameSetInConstructor() { $config = new Config( 'aPrefix', diff --git a/pkg/enqueue/Tests/Client/ConsumptionExtension/DelayRedeliveredMessageExtensionTest.php b/pkg/enqueue/Tests/Client/ConsumptionExtension/DelayRedeliveredMessageExtensionTest.php index 893d8e8e5..5c6d35a17 100644 --- a/pkg/enqueue/Tests/Client/ConsumptionExtension/DelayRedeliveredMessageExtensionTest.php +++ b/pkg/enqueue/Tests/Client/ConsumptionExtension/DelayRedeliveredMessageExtensionTest.php @@ -1,12 +1,12 @@ createMessageProcessorRegistryMock()); + new DelegateProcessor($this->createProcessorRegistryMock()); } public function testShouldThrowExceptionIfProcessorNameIsNotSet() @@ -22,7 +22,7 @@ public function testShouldThrowExceptionIfProcessorNameIsNotSet() 'Got message without required parameter: "enqueue.processor_name"' ); - $processor = new DelegateMessageProcessor($this->createMessageProcessorRegistryMock()); + $processor = new DelegateProcessor($this->createProcessorRegistryMock()); $processor->process(new NullMessage(), $this->createPsrContextMock()); } @@ -34,34 +34,34 @@ public function testShouldProcessMessage() Config::PARAMETER_PROCESSOR_NAME => 'processor-name', ]); - $messageProcessor = $this->createMessageProcessorMock(); - $messageProcessor + $processor = $this->createProcessorMock(); + $processor ->expects($this->once()) ->method('process') ->with($this->identicalTo($message), $this->identicalTo($session)) ->will($this->returnValue('return-value')) ; - $processorRegistry = $this->createMessageProcessorRegistryMock(); + $processorRegistry = $this->createProcessorRegistryMock(); $processorRegistry ->expects($this->once()) ->method('get') ->with('processor-name') - ->will($this->returnValue($messageProcessor)) + ->will($this->returnValue($processor)) ; - $processor = new DelegateMessageProcessor($processorRegistry); + $processor = new DelegateProcessor($processorRegistry); $return = $processor->process($message, $session); $this->assertEquals('return-value', $return); } /** - * @return \PHPUnit_Framework_MockObject_MockObject|MessageProcessorRegistryInterface + * @return \PHPUnit_Framework_MockObject_MockObject|ProcessorRegistryInterface */ - protected function createMessageProcessorRegistryMock() + protected function createProcessorRegistryMock() { - return $this->createMock(MessageProcessorRegistryInterface::class); + return $this->createMock(ProcessorRegistryInterface::class); } /** @@ -73,10 +73,10 @@ protected function createPsrContextMock() } /** - * @return \PHPUnit_Framework_MockObject_MockObject|MessageProcessorInterface + * @return \PHPUnit_Framework_MockObject_MockObject|Processor */ - protected function createMessageProcessorMock() + protected function createProcessorMock() { - return $this->createMock(MessageProcessorInterface::class); + return $this->createMock(Processor::class); } } diff --git a/pkg/enqueue/Tests/Consumption/CallbackMessageProcessorTest.php b/pkg/enqueue/Tests/Consumption/CallbackProcessorTest.php similarity index 58% rename from pkg/enqueue/Tests/Consumption/CallbackMessageProcessorTest.php rename to pkg/enqueue/Tests/Consumption/CallbackProcessorTest.php index 582992dcb..e37c6a048 100644 --- a/pkg/enqueue/Tests/Consumption/CallbackMessageProcessorTest.php +++ b/pkg/enqueue/Tests/Consumption/CallbackProcessorTest.php @@ -1,24 +1,24 @@ assertClassImplements(MessageProcessorInterface::class, CallbackMessageProcessor::class); + $this->assertClassImplements(Processor::class, CallbackProcessor::class); } public function testCouldBeConstructedWithCallableAsArgument() { - new CallbackMessageProcessor(function () { + new CallbackProcessor(function () { }); } @@ -27,7 +27,7 @@ public function testShouldCallCallbackAndProxyItsReturnedValue() $expectedMessage = new NullMessage(); $expectedContext = new NullContext(); - $processor = new CallbackMessageProcessor(function ($message, $context) use ($expectedMessage, $expectedContext) { + $processor = new CallbackProcessor(function ($message, $context) use ($expectedMessage, $expectedContext) { $this->assertSame($expectedMessage, $message); $this->assertSame($expectedContext, $context); diff --git a/pkg/enqueue/Tests/Consumption/ContextTest.php b/pkg/enqueue/Tests/Consumption/ContextTest.php index 5191e5976..89c2dde14 100644 --- a/pkg/enqueue/Tests/Consumption/ContextTest.php +++ b/pkg/enqueue/Tests/Consumption/ContextTest.php @@ -1,12 +1,12 @@ createMessageProcessor(); + $processorMock = $this->createProcessorMock(); $context = new Context($this->createPsrContext()); - $context->setMessageProcessor($messageProcessor); + $context->setPsrProcessor($processorMock); - $this->assertSame($messageProcessor, $context->getMessageProcessor()); + $this->assertSame($processorMock, $context->getPsrProcessor()); } - public function testThrowOnTryToChangeMessageProcessorIfAlreadySet() + public function testThrowOnTryToChangeProcessorIfAlreadySet() { - $messageProcessor = $this->createMessageProcessor(); - $anotherMessageProcessor = $this->createMessageProcessor(); + $processor = $this->createProcessorMock(); + $anotherProcessor = $this->createProcessorMock(); $context = new Context($this->createPsrContext()); - $context->setMessageProcessor($messageProcessor); + $context->setPsrProcessor($processor); $this->expectException(IllegalContextModificationException::class); - $context->setMessageProcessor($anotherMessageProcessor); + $context->setPsrProcessor($anotherProcessor); } public function testShouldAllowGetLoggerPreviouslySet() @@ -247,10 +247,10 @@ protected function createPsrConsumer() } /** - * @return \PHPUnit_Framework_MockObject_MockObject|MessageProcessorInterface + * @return \PHPUnit_Framework_MockObject_MockObject|Processor */ - protected function createMessageProcessor() + protected function createProcessorMock() { - return $this->createMock(MessageProcessorInterface::class); + return $this->createMock(Processor::class); } } diff --git a/pkg/enqueue/Tests/Consumption/Extension/LimitConsumedMessagesExtensionTest.php b/pkg/enqueue/Tests/Consumption/Extension/LimitConsumedMessagesExtensionTest.php index d8efd93dc..fcbd236ba 100644 --- a/pkg/enqueue/Tests/Consumption/Extension/LimitConsumedMessagesExtensionTest.php +++ b/pkg/enqueue/Tests/Consumption/Extension/LimitConsumedMessagesExtensionTest.php @@ -1,11 +1,11 @@ createMock(PsrContext::class)); $context->setLogger($this->createMock(LoggerInterface::class)); $context->setPsrConsumer($this->createMock(Consumer::class)); - $context->setMessageProcessor($this->createMock(MessageProcessorInterface::class)); + $context->setPsrProcessor($this->createMock(Processor::class)); return $context; } diff --git a/pkg/enqueue/Tests/Consumption/Extension/LimitConsumerMemoryExtensionTest.php b/pkg/enqueue/Tests/Consumption/Extension/LimitConsumerMemoryExtensionTest.php index 5a90eeddc..761c831a2 100644 --- a/pkg/enqueue/Tests/Consumption/Extension/LimitConsumerMemoryExtensionTest.php +++ b/pkg/enqueue/Tests/Consumption/Extension/LimitConsumerMemoryExtensionTest.php @@ -1,11 +1,11 @@ createMock(PsrContext::class)); $context->setLogger($this->createMock(LoggerInterface::class)); $context->setPsrConsumer($this->createMock(Consumer::class)); - $context->setMessageProcessor($this->createMock(MessageProcessorInterface::class)); + $context->setPsrProcessor($this->createMock(Processor::class)); return $context; } diff --git a/pkg/enqueue/Tests/Consumption/Extension/LimitConsumptionTimeExtensionTest.php b/pkg/enqueue/Tests/Consumption/Extension/LimitConsumptionTimeExtensionTest.php index 15c2b8980..5b2b12875 100644 --- a/pkg/enqueue/Tests/Consumption/Extension/LimitConsumptionTimeExtensionTest.php +++ b/pkg/enqueue/Tests/Consumption/Extension/LimitConsumptionTimeExtensionTest.php @@ -1,11 +1,11 @@ createMock(PsrContext::class)); $context->setLogger($this->createMock(LoggerInterface::class)); $context->setPsrConsumer($this->createMock(Consumer::class)); - $context->setMessageProcessor($this->createMock(MessageProcessorInterface::class)); + $context->setPsrProcessor($this->createMock(Processor::class)); return $context; } diff --git a/pkg/enqueue/Tests/Consumption/Extension/LoggerExtensionTest.php b/pkg/enqueue/Tests/Consumption/Extension/LoggerExtensionTest.php index 4bcdc09b4..e8a901325 100644 --- a/pkg/enqueue/Tests/Consumption/Extension/LoggerExtensionTest.php +++ b/pkg/enqueue/Tests/Consumption/Extension/LoggerExtensionTest.php @@ -1,12 +1,12 @@ setResult('notInstanceOfResult'); $this->expectException(\LogicException::class); - $this->expectExceptionMessage('To send a reply an instance of Result class has to returned from a MessageProcessor.'); + $this->expectExceptionMessage('To send a reply an instance of Result class has to returned from a Processor.'); $extension->onPostReceived($context); } diff --git a/pkg/enqueue/Tests/Consumption/QueueConsumerTest.php b/pkg/enqueue/Tests/Consumption/QueueConsumerTest.php index cc3154b73..e8a99a263 100644 --- a/pkg/enqueue/Tests/Consumption/QueueConsumerTest.php +++ b/pkg/enqueue/Tests/Consumption/QueueConsumerTest.php @@ -1,18 +1,18 @@ createPsrContextStub(), $this->createExtension()); } - public function testShouldSetEmptyArrayToBoundMessageProcessorsPropertyInConstructor() + public function testShouldSetEmptyArrayToBoundProcessorsPropertyInConstructor() { $consumer = new QueueConsumer($this->createPsrContextStub(), null, 0); - $this->assertAttributeSame([], 'boundMessageProcessors', $consumer); + $this->assertAttributeSame([], 'boundProcessors', $consumer); } public function testShouldAllowGetConnectionSetInConstructor() @@ -52,57 +52,57 @@ public function testShouldAllowGetConnectionSetInConstructor() public function testThrowIfQueueNameEmptyOnBind() { - $messageProcessorMock = $this->createMessageProcessorMock(); + $processorMock = $this->createProcessorMock(); $consumer = new QueueConsumer($this->createPsrContextStub(), null, 0); $this->expectException(\LogicException::class); $this->expectExceptionMessage('The queue name must be not empty.'); - $consumer->bind(new NullQueue(''), $messageProcessorMock); + $consumer->bind(new NullQueue(''), $processorMock); } - public function testThrowIfQueueAlreadyBoundToMessageProcessorOnBind() + public function testThrowIfQueueAlreadyBoundToProcessorOnBind() { - $messageProcessorMock = $this->createMessageProcessorMock(); + $processorMock = $this->createProcessorMock(); $consumer = new QueueConsumer($this->createPsrContextStub(), null, 0); - $consumer->bind(new NullQueue('theQueueName'), $messageProcessorMock); + $consumer->bind(new NullQueue('theQueueName'), $processorMock); $this->expectException(\LogicException::class); $this->expectExceptionMessage('The queue was already bound.'); - $consumer->bind(new NullQueue('theQueueName'), $messageProcessorMock); + $consumer->bind(new NullQueue('theQueueName'), $processorMock); } - public function testShouldAllowBindMessageProcessorToQueue() + public function testShouldAllowBindProcessorToQueue() { $queue = new NullQueue('theQueueName'); - $messageProcessorMock = $this->createMessageProcessorMock(); + $processorMock = $this->createProcessorMock(); $consumer = new QueueConsumer($this->createPsrContextStub(), null, 0); - $consumer->bind($queue, $messageProcessorMock); + $consumer->bind($queue, $processorMock); - $this->assertAttributeSame(['theQueueName' => [$queue, $messageProcessorMock]], 'boundMessageProcessors', $consumer); + $this->assertAttributeSame(['theQueueName' => [$queue, $processorMock]], 'boundProcessors', $consumer); } public function testThrowIfQueueNeitherInstanceOfQueueNorString() { - $messageProcessorMock = $this->createMessageProcessorMock(); + $processorMock = $this->createProcessorMock(); $consumer = new QueueConsumer($this->createPsrContextStub(), null, 0); $this->expectException(InvalidArgumentException::class); $this->expectExceptionMessage('The argument must be an instance of Enqueue\Psr\Queue but got stdClass.'); - $consumer->bind(new \stdClass(), $messageProcessorMock); + $consumer->bind(new \stdClass(), $processorMock); } - public function testThrowIfMessageProcessorNeitherInstanceOfMessageProcessorNorCallable() + public function testThrowIfProcessorNeitherInstanceOfProcessorNorCallable() { $consumer = new QueueConsumer($this->createPsrContextStub(), null, 0); $this->expectException(InvalidArgumentException::class); - $this->expectExceptionMessage('The argument must be an instance of Enqueue\Consumption\MessageProcessorInterface but got stdClass.'); + $this->expectExceptionMessage('The argument must be an instance of Enqueue\Psr\Processor but got stdClass.'); $consumer->bind(new NullQueue(''), new \stdClass()); } @@ -126,7 +126,7 @@ public function testShouldAllowBindCallbackToQueueName() $consumer->bind($queueName, $callback); - $boundProcessors = $this->readAttribute($consumer, 'boundMessageProcessors'); + $boundProcessors = $this->readAttribute($consumer, 'boundProcessors'); $this->assertInternalType('array', $boundProcessors); $this->assertCount(1, $boundProcessors); @@ -135,16 +135,16 @@ public function testShouldAllowBindCallbackToQueueName() $this->assertInternalType('array', $boundProcessors[$queueName]); $this->assertCount(2, $boundProcessors[$queueName]); $this->assertSame($queue, $boundProcessors[$queueName][0]); - $this->assertInstanceOf(CallbackMessageProcessor::class, $boundProcessors[$queueName][1]); + $this->assertInstanceOf(CallbackProcessor::class, $boundProcessors[$queueName][1]); } public function testShouldReturnSelfOnBind() { - $messageProcessorMock = $this->createMessageProcessorMock(); + $processorMock = $this->createProcessorMock(); $consumer = new QueueConsumer($this->createPsrContextStub(), null, 0); - $this->assertSame($consumer, $consumer->bind(new NullQueue('aQueueName'), $messageProcessorMock)); + $this->assertSame($consumer, $consumer->bind(new NullQueue('aQueueName'), $processorMock)); } public function testShouldSubscribeToGivenQueueAndQuitAfterFifthIdleCycle() @@ -166,14 +166,14 @@ public function testShouldSubscribeToGivenQueueAndQuitAfterFifthIdleCycle() ->willReturn($messageConsumerMock) ; - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->never()) ->method('process') ; $queueConsumer = new QueueConsumer($contextMock, new BreakCycleExtension(5), 0); - $queueConsumer->bind($expectedQueue, $messageProcessorMock); + $queueConsumer->bind($expectedQueue, $processorMock); $queueConsumer->consume(); } @@ -184,20 +184,20 @@ public function testShouldProcessFiveMessagesAndQuit() $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->exactly(5)) ->method('process') ->willReturn(Result::ACK) ; $queueConsumer = new QueueConsumer($contextStub, new BreakCycleExtension(5), 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $queueConsumer->consume(); } - public function testShouldAckMessageIfMessageProcessorReturnSuchStatus() + public function testShouldAckMessageIfProcessorReturnSuchStatus() { $messageMock = $this->createMessageMock(); $messageConsumerStub = $this->createMessageConsumerStub($messageMock); @@ -209,8 +209,8 @@ public function testShouldAckMessageIfMessageProcessorReturnSuchStatus() $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->once()) ->method('process') ->with($this->identicalTo($messageMock)) @@ -218,20 +218,20 @@ public function testShouldAckMessageIfMessageProcessorReturnSuchStatus() ; $queueConsumer = new QueueConsumer($contextStub, new BreakCycleExtension(1), 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $queueConsumer->consume(); } - public function testThrowIfMessageProcessorReturnNull() + public function testThrowIfProcessorReturnNull() { $messageMock = $this->createMessageMock(); $messageConsumerStub = $this->createMessageConsumerStub($messageMock); $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->once()) ->method('process') ->with($this->identicalTo($messageMock)) @@ -239,14 +239,14 @@ public function testThrowIfMessageProcessorReturnNull() ; $queueConsumer = new QueueConsumer($contextStub, new BreakCycleExtension(1), 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $this->expectException(\LogicException::class); $this->expectExceptionMessage('Status is not supported'); $queueConsumer->consume(); } - public function testShouldRejectMessageIfMessageProcessorReturnSuchStatus() + public function testShouldRejectMessageIfProcessorReturnSuchStatus() { $messageMock = $this->createMessageMock(); $messageConsumerStub = $this->createMessageConsumerStub($messageMock); @@ -258,8 +258,8 @@ public function testShouldRejectMessageIfMessageProcessorReturnSuchStatus() $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->once()) ->method('process') ->with($this->identicalTo($messageMock)) @@ -267,12 +267,12 @@ public function testShouldRejectMessageIfMessageProcessorReturnSuchStatus() ; $queueConsumer = new QueueConsumer($contextStub, new BreakCycleExtension(1), 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $queueConsumer->consume(); } - public function testShouldRequeueMessageIfMessageProcessorReturnSuchStatus() + public function testShouldRequeueMessageIfProcessorReturnSuchStatus() { $messageMock = $this->createMessageMock(); $messageConsumerStub = $this->createMessageConsumerStub($messageMock); @@ -284,8 +284,8 @@ public function testShouldRequeueMessageIfMessageProcessorReturnSuchStatus() $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->once()) ->method('process') ->with($this->identicalTo($messageMock)) @@ -293,20 +293,20 @@ public function testShouldRequeueMessageIfMessageProcessorReturnSuchStatus() ; $queueConsumer = new QueueConsumer($contextStub, new BreakCycleExtension(1), 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $queueConsumer->consume(); } - public function testThrowIfMessageProcessorReturnInvalidStatus() + public function testThrowIfProcessorReturnInvalidStatus() { $messageMock = $this->createMessageMock(); $messageConsumerStub = $this->createMessageConsumerStub($messageMock); $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->once()) ->method('process') ->with($this->identicalTo($messageMock)) @@ -314,14 +314,14 @@ public function testThrowIfMessageProcessorReturnInvalidStatus() ; $queueConsumer = new QueueConsumer($contextStub, new BreakCycleExtension(1), 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $this->expectException(\LogicException::class); $this->expectExceptionMessage('Status is not supported: invalidStatus'); $queueConsumer->consume(); } - public function testShouldNotPassMessageToMessageProcessorIfItWasProcessedByExtension() + public function testShouldNotPassMessageToProcessorIfItWasProcessedByExtension() { $extension = $this->createExtension(); $extension @@ -338,15 +338,15 @@ public function testShouldNotPassMessageToMessageProcessorIfItWasProcessedByExte $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->never()) ->method('process') ; $chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]); $queueConsumer = new QueueConsumer($contextStub, $chainExtensions, 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $queueConsumer->consume(); } @@ -357,7 +357,7 @@ public function testShouldCallOnStartExtensionMethod() $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); + $processorMock = $this->createProcessorMock(); $extension = $this->createExtension(); $extension @@ -367,11 +367,11 @@ public function testShouldCallOnStartExtensionMethod() ->willReturnCallback(function (Context $context) use ( $contextStub, $messageConsumerStub, - $messageProcessorMock + $processorMock ) { $this->assertSame($contextStub, $context->getPsrContext()); $this->assertNull($context->getPsrConsumer()); - $this->assertNull($context->getMessageProcessor()); + $this->assertNull($context->getPsrProcessor()); $this->assertNull($context->getLogger()); $this->assertNull($context->getPsrMessage()); $this->assertNull($context->getException()); @@ -383,7 +383,7 @@ public function testShouldCallOnStartExtensionMethod() $chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]); $queueConsumer = new QueueConsumer($contextStub, $chainExtensions, 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $queueConsumer->consume(); } @@ -394,7 +394,7 @@ public function testShouldCallOnIdleExtensionMethod() $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); + $processorMock = $this->createProcessorMock(); $extension = $this->createExtension(); $extension @@ -404,11 +404,11 @@ public function testShouldCallOnIdleExtensionMethod() ->willReturnCallback(function (Context $context) use ( $contextStub, $messageConsumerStub, - $messageProcessorMock + $processorMock ) { $this->assertSame($contextStub, $context->getPsrContext()); $this->assertSame($messageConsumerStub, $context->getPsrConsumer()); - $this->assertSame($messageProcessorMock, $context->getMessageProcessor()); + $this->assertSame($processorMock, $context->getPsrProcessor()); $this->assertInstanceOf(NullLogger::class, $context->getLogger()); $this->assertNull($context->getPsrMessage()); $this->assertNull($context->getException()); @@ -419,7 +419,7 @@ public function testShouldCallOnIdleExtensionMethod() $chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]); $queueConsumer = new QueueConsumer($contextStub, $chainExtensions, 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $queueConsumer->consume(); } @@ -431,7 +431,7 @@ public function testShouldCallOnBeforeReceiveExtensionMethod() $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorStub(); + $processorMock = $this->createProcessorStub(); $queue = new NullQueue('aQueueName'); @@ -443,13 +443,13 @@ public function testShouldCallOnBeforeReceiveExtensionMethod() ->willReturnCallback(function (Context $context) use ( $contextStub, $messageConsumerStub, - $messageProcessorMock, + $processorMock, $expectedMessage, $queue ) { $this->assertSame($contextStub, $context->getPsrContext()); $this->assertSame($messageConsumerStub, $context->getPsrConsumer()); - $this->assertSame($messageProcessorMock, $context->getMessageProcessor()); + $this->assertSame($processorMock, $context->getPsrProcessor()); $this->assertInstanceOf(NullLogger::class, $context->getLogger()); $this->assertNull($context->getPsrMessage()); $this->assertNull($context->getException()); @@ -461,7 +461,7 @@ public function testShouldCallOnBeforeReceiveExtensionMethod() $chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]); $queueConsumer = new QueueConsumer($contextStub, $chainExtensions, 0); - $queueConsumer->bind($queue, $messageProcessorMock); + $queueConsumer->bind($queue, $processorMock); $queueConsumer->consume(); } @@ -473,7 +473,7 @@ public function testShouldCallOnPreReceivedAndPostReceivedExtensionMethods() $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorStub(); + $processorMock = $this->createProcessorStub(); $extension = $this->createExtension(); $extension @@ -483,12 +483,12 @@ public function testShouldCallOnPreReceivedAndPostReceivedExtensionMethods() ->willReturnCallback(function (Context $context) use ( $contextStub, $messageConsumerStub, - $messageProcessorMock, + $processorMock, $expectedMessage ) { $this->assertSame($contextStub, $context->getPsrContext()); $this->assertSame($messageConsumerStub, $context->getPsrConsumer()); - $this->assertSame($messageProcessorMock, $context->getMessageProcessor()); + $this->assertSame($processorMock, $context->getPsrProcessor()); $this->assertSame($expectedMessage, $context->getPsrMessage()); $this->assertInstanceOf(NullLogger::class, $context->getLogger()); $this->assertNull($context->getException()); @@ -503,12 +503,12 @@ public function testShouldCallOnPreReceivedAndPostReceivedExtensionMethods() ->willReturnCallback(function (Context $context) use ( $contextStub, $messageConsumerStub, - $messageProcessorMock, + $processorMock, $expectedMessage ) { $this->assertSame($contextStub, $context->getPsrContext()); $this->assertSame($messageConsumerStub, $context->getPsrConsumer()); - $this->assertSame($messageProcessorMock, $context->getMessageProcessor()); + $this->assertSame($processorMock, $context->getPsrProcessor()); $this->assertSame($expectedMessage, $context->getPsrMessage()); $this->assertInstanceOf(NullLogger::class, $context->getLogger()); $this->assertNull($context->getException()); @@ -519,7 +519,7 @@ public function testShouldCallOnPreReceivedAndPostReceivedExtensionMethods() $chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]); $queueConsumer = new QueueConsumer($contextStub, $chainExtensions, 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $queueConsumer->consume(); } @@ -530,7 +530,7 @@ public function testShouldAllowInterruptConsumingOnIdle() $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); + $processorMock = $this->createProcessorMock(); $extension = $this->createExtension(); $extension @@ -548,11 +548,11 @@ public function testShouldAllowInterruptConsumingOnIdle() ->willReturnCallback(function (Context $context) use ( $contextStub, $messageConsumerStub, - $messageProcessorMock + $processorMock ) { $this->assertSame($contextStub, $context->getPsrContext()); $this->assertSame($messageConsumerStub, $context->getPsrConsumer()); - $this->assertSame($messageProcessorMock, $context->getMessageProcessor()); + $this->assertSame($processorMock, $context->getPsrProcessor()); $this->assertInstanceOf(NullLogger::class, $context->getLogger()); $this->assertNull($context->getPsrMessage()); $this->assertNull($context->getException()); @@ -563,7 +563,7 @@ public function testShouldAllowInterruptConsumingOnIdle() $chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]); $queueConsumer = new QueueConsumer($contextStub, $chainExtensions, 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $queueConsumer->consume(); } @@ -578,7 +578,7 @@ public function testShouldCloseSessionWhenConsumptionInterrupted() ->method('close') ; - $messageProcessorMock = $this->createMessageProcessorMock(); + $processorMock = $this->createProcessorMock(); $extension = $this->createExtension(); $extension @@ -592,7 +592,7 @@ public function testShouldCloseSessionWhenConsumptionInterrupted() $chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]); $queueConsumer = new QueueConsumer($contextStub, $chainExtensions, 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $queueConsumer->consume(); } @@ -609,15 +609,15 @@ public function testShouldCloseSessionWhenConsumptionInterruptedByException() ->method('close') ; - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->once()) ->method('process') ->willThrowException($expectedException) ; $queueConsumer = new QueueConsumer($contextStub, new BreakCycleExtension(1), 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); try { $queueConsumer->consume(); @@ -640,8 +640,8 @@ public function testShouldSetMainExceptionAsPreviousToExceptionThrownOnInterrupt $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->once()) ->method('process') ->willThrowException($mainException) @@ -656,7 +656,7 @@ public function testShouldSetMainExceptionAsPreviousToExceptionThrownOnInterrupt $chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]); $queueConsumer = new QueueConsumer($contextStub, $chainExtensions, 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); try { $queueConsumer->consume(); @@ -677,8 +677,8 @@ public function testShouldAllowInterruptConsumingOnPreReceiveButProcessCurrentMe $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->once()) ->method('process') ->willReturn(Result::ACK) @@ -700,12 +700,12 @@ public function testShouldAllowInterruptConsumingOnPreReceiveButProcessCurrentMe ->willReturnCallback(function (Context $context) use ( $contextStub, $messageConsumerStub, - $messageProcessorMock, + $processorMock, $expectedMessage ) { $this->assertSame($contextStub, $context->getPsrContext()); $this->assertSame($messageConsumerStub, $context->getPsrConsumer()); - $this->assertSame($messageProcessorMock, $context->getMessageProcessor()); + $this->assertSame($processorMock, $context->getPsrProcessor()); $this->assertSame($expectedMessage, $context->getPsrMessage()); $this->assertInstanceOf(NullLogger::class, $context->getLogger()); $this->assertNull($context->getException()); @@ -716,7 +716,7 @@ public function testShouldAllowInterruptConsumingOnPreReceiveButProcessCurrentMe $chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]); $queueConsumer = new QueueConsumer($contextStub, $chainExtensions, 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $queueConsumer->consume(); } @@ -728,8 +728,8 @@ public function testShouldAllowInterruptConsumingOnPostReceive() $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->once()) ->method('process') ->willReturn(Result::ACK) @@ -751,12 +751,12 @@ public function testShouldAllowInterruptConsumingOnPostReceive() ->willReturnCallback(function (Context $context) use ( $contextStub, $messageConsumerStub, - $messageProcessorMock, + $processorMock, $expectedMessage ) { $this->assertSame($contextStub, $context->getPsrContext()); $this->assertSame($messageConsumerStub, $context->getPsrConsumer()); - $this->assertSame($messageProcessorMock, $context->getMessageProcessor()); + $this->assertSame($processorMock, $context->getPsrProcessor()); $this->assertSame($expectedMessage, $context->getPsrMessage()); $this->assertInstanceOf(NullLogger::class, $context->getLogger()); $this->assertNull($context->getException()); @@ -767,7 +767,7 @@ public function testShouldAllowInterruptConsumingOnPostReceive() $chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]); $queueConsumer = new QueueConsumer($contextStub, $chainExtensions, 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $queueConsumer->consume(); } @@ -780,8 +780,8 @@ public function testShouldCallOnInterruptedIfExceptionThrow() $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->once()) ->method('process') ->willThrowException($expectedException) @@ -795,13 +795,13 @@ public function testShouldCallOnInterruptedIfExceptionThrow() ->willReturnCallback(function (Context $context) use ( $contextStub, $messageConsumerStub, - $messageProcessorMock, + $processorMock, $expectedMessage, $expectedException ) { $this->assertSame($contextStub, $context->getPsrContext()); $this->assertSame($messageConsumerStub, $context->getPsrConsumer()); - $this->assertSame($messageProcessorMock, $context->getMessageProcessor()); + $this->assertSame($processorMock, $context->getPsrProcessor()); $this->assertSame($expectedMessage, $context->getPsrMessage()); $this->assertSame($expectedException, $context->getException()); $this->assertInstanceOf(NullLogger::class, $context->getLogger()); @@ -812,7 +812,7 @@ public function testShouldCallOnInterruptedIfExceptionThrow() $chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]); $queueConsumer = new QueueConsumer($contextStub, $chainExtensions, 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $this->expectException(\Exception::class); $this->expectExceptionMessage('Process failed'); @@ -826,8 +826,8 @@ public function testShouldCallExtensionPassedOnRuntime() $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->once()) ->method('process') ->willReturn(Result::ACK) @@ -856,7 +856,7 @@ public function testShouldCallExtensionPassedOnRuntime() ; $queueConsumer = new QueueConsumer($contextStub, new BreakCycleExtension(1), 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $queueConsumer->consume(new ChainExtension([$runtimeExtension])); } @@ -868,8 +868,8 @@ public function testShouldChangeLoggerOnStart() $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->once()) ->method('process') ->willReturn(Result::ACK) @@ -906,7 +906,7 @@ public function testShouldChangeLoggerOnStart() $chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]); $queueConsumer = new QueueConsumer($contextStub, $chainExtensions, 0); - $queueConsumer->bind(new NullQueue('aQueueName'), $messageProcessorMock); + $queueConsumer->bind(new NullQueue('aQueueName'), $processorMock); $queueConsumer->consume(); } @@ -918,8 +918,8 @@ public function testShouldCallEachQueueOneByOne() $contextStub = $this->createPsrContextStub($messageConsumerStub); - $messageProcessorMock = $this->createMessageProcessorStub(); - $anotherMessageProcessorMock = $this->createMessageProcessorStub(); + $processorMock = $this->createProcessorStub(); + $anotherProcessorMock = $this->createProcessorStub(); $queue1 = new NullQueue('aQueueName'); $queue2 = new NullQueue('aAnotherQueueName'); @@ -929,8 +929,8 @@ public function testShouldCallEachQueueOneByOne() ->expects($this->at(1)) ->method('onBeforeReceive') ->with($this->isInstanceOf(Context::class)) - ->willReturnCallback(function (Context $context) use ($messageProcessorMock, $queue1) { - $this->assertSame($messageProcessorMock, $context->getMessageProcessor()); + ->willReturnCallback(function (Context $context) use ($processorMock, $queue1) { + $this->assertSame($processorMock, $context->getPsrProcessor()); $this->assertSame($queue1, $context->getPsrQueue()); }) ; @@ -938,16 +938,16 @@ public function testShouldCallEachQueueOneByOne() ->expects($this->at(4)) ->method('onBeforeReceive') ->with($this->isInstanceOf(Context::class)) - ->willReturnCallback(function (Context $context) use ($anotherMessageProcessorMock, $queue2) { - $this->assertSame($anotherMessageProcessorMock, $context->getMessageProcessor()); + ->willReturnCallback(function (Context $context) use ($anotherProcessorMock, $queue2) { + $this->assertSame($anotherProcessorMock, $context->getPsrProcessor()); $this->assertSame($queue2, $context->getPsrQueue()); }) ; $queueConsumer = new QueueConsumer($contextStub, new BreakCycleExtension(2), 0); $queueConsumer - ->bind($queue1, $messageProcessorMock) - ->bind($queue2, $anotherMessageProcessorMock) + ->bind($queue1, $processorMock) + ->bind($queue2, $anotherProcessorMock) ; $queueConsumer->consume(new ChainExtension([$extension])); @@ -993,26 +993,26 @@ protected function createPsrContextStub($messageConsumer = null) } /** - * @return \PHPUnit_Framework_MockObject_MockObject|MessageProcessorInterface + * @return \PHPUnit_Framework_MockObject_MockObject|Processor */ - protected function createMessageProcessorMock() + protected function createProcessorMock() { - return $this->createMock(MessageProcessorInterface::class); + return $this->createMock(Processor::class); } /** - * @return \PHPUnit_Framework_MockObject_MockObject|MessageProcessorInterface + * @return \PHPUnit_Framework_MockObject_MockObject|Processor */ - protected function createMessageProcessorStub() + protected function createProcessorStub() { - $messageProcessorMock = $this->createMessageProcessorMock(); - $messageProcessorMock + $processorMock = $this->createProcessorMock(); + $processorMock ->expects($this->any()) ->method('process') ->willReturn(Result::ACK) ; - return $messageProcessorMock; + return $processorMock; } /** diff --git a/pkg/enqueue/Tests/Functional/Client/SimpleClientTest.php b/pkg/enqueue/Tests/Functional/Client/SimpleClientTest.php index 1dd606b98..ef176d5a5 100644 --- a/pkg/enqueue/Tests/Functional/Client/SimpleClientTest.php +++ b/pkg/enqueue/Tests/Functional/Client/SimpleClientTest.php @@ -2,12 +2,12 @@ namespace Enqueue\Tests\Functional\Client; use Enqueue\AmqpExt\AmqpContext; -use Enqueue\Psr\Message; use Enqueue\Client\SimpleClient; use Enqueue\Consumption\ChainExtension; use Enqueue\Consumption\Extension\LimitConsumedMessagesExtension; use Enqueue\Consumption\Extension\LimitConsumptionTimeExtension; use Enqueue\Consumption\Result; +use Enqueue\Psr\Message; use Enqueue\Test\RabbitmqAmqpExtension; use Enqueue\Test\RabbitmqManagmentExtensionTrait; diff --git a/pkg/enqueue/Tests/Router/RouteRecipientListProcessorTest.php b/pkg/enqueue/Tests/Router/RouteRecipientListProcessorTest.php index d88a0f929..386440fb3 100644 --- a/pkg/enqueue/Tests/Router/RouteRecipientListProcessorTest.php +++ b/pkg/enqueue/Tests/Router/RouteRecipientListProcessorTest.php @@ -1,10 +1,10 @@ assertClassImplements(MessageProcessorInterface::class, RouteRecipientListProcessor::class); + $this->assertClassImplements(Processor::class, RouteRecipientListProcessor::class); } public function testCouldBeConstructedWithRouterAsFirstArgument() diff --git a/pkg/enqueue/Tests/Symfony/Client/ConsumeMessagesCommandTest.php b/pkg/enqueue/Tests/Symfony/Client/ConsumeMessagesCommandTest.php index 164f69ee2..cb16e5d9f 100644 --- a/pkg/enqueue/Tests/Symfony/Client/ConsumeMessagesCommandTest.php +++ b/pkg/enqueue/Tests/Symfony/Client/ConsumeMessagesCommandTest.php @@ -1,13 +1,13 @@ createQueueConsumerMock(), - $this->createDelegateMessageProcessorMock(), + $this->createDelegateProcessorMock(), $this->createQueueMetaRegistry([]), $this->createDriverMock() ); @@ -28,7 +28,7 @@ public function testShouldHaveCommandName() { $command = new ConsumeMessagesCommand( $this->createQueueConsumerMock(), - $this->createDelegateMessageProcessorMock(), + $this->createDelegateProcessorMock(), $this->createQueueMetaRegistry([]), $this->createDriverMock() ); @@ -40,7 +40,7 @@ public function testShouldHaveCommandAliases() { $command = new ConsumeMessagesCommand( $this->createQueueConsumerMock(), - $this->createDelegateMessageProcessorMock(), + $this->createDelegateProcessorMock(), $this->createQueueMetaRegistry([]), $this->createDriverMock() ); @@ -52,7 +52,7 @@ public function testShouldHaveExpectedOptions() { $command = new ConsumeMessagesCommand( $this->createQueueConsumerMock(), - $this->createDelegateMessageProcessorMock(), + $this->createDelegateProcessorMock(), $this->createQueueMetaRegistry([]), $this->createDriverMock() ); @@ -70,7 +70,7 @@ public function testShouldHaveExpectedAttributes() { $command = new ConsumeMessagesCommand( $this->createQueueConsumerMock(), - $this->createDelegateMessageProcessorMock(), + $this->createDelegateProcessorMock(), $this->createQueueMetaRegistry([]), $this->createDriverMock() ); @@ -85,7 +85,7 @@ public function testShouldExecuteConsumptionAndUseDefaultQueueName() { $queue = new NullQueue(''); - $processor = $this->createDelegateMessageProcessorMock(); + $processor = $this->createDelegateProcessorMock(); $context = $this->createPsrContextMock(); $context @@ -132,7 +132,7 @@ public function testShouldExecuteConsumptionAndUseCustomClientDestinationName() { $queue = new NullQueue(''); - $processor = $this->createDelegateMessageProcessorMock(); + $processor = $this->createDelegateProcessorMock(); $context = $this->createPsrContextMock(); $context @@ -205,11 +205,11 @@ private function createPsrContextMock() } /** - * @return \PHPUnit_Framework_MockObject_MockObject|DelegateMessageProcessor + * @return \PHPUnit_Framework_MockObject_MockObject|DelegateProcessor */ - private function createDelegateMessageProcessorMock() + private function createDelegateProcessorMock() { - return $this->createMock(DelegateMessageProcessor::class); + return $this->createMock(DelegateProcessor::class); } /** diff --git a/pkg/enqueue/Tests/Symfony/Client/ContainerAwareMessageProcessorRegistryTest.php b/pkg/enqueue/Tests/Symfony/Client/ContainerAwareProcessorRegistryTest.php similarity index 52% rename from pkg/enqueue/Tests/Symfony/Client/ContainerAwareMessageProcessorRegistryTest.php rename to pkg/enqueue/Tests/Symfony/Client/ContainerAwareProcessorRegistryTest.php index 80a3c503d..3604a89b3 100644 --- a/pkg/enqueue/Tests/Symfony/Client/ContainerAwareMessageProcessorRegistryTest.php +++ b/pkg/enqueue/Tests/Symfony/Client/ContainerAwareProcessorRegistryTest.php @@ -1,34 +1,34 @@ assertClassImplements(MessageProcessorRegistryInterface::class, ContainerAwareMessageProcessorRegistry::class); + $this->assertClassImplements(ProcessorRegistryInterface::class, ContainerAwareProcessorRegistry::class); } public function testCouldBeConstructedWithoutAnyArgument() { - new ContainerAwareMessageProcessorRegistry(); + new ContainerAwareProcessorRegistry(); } public function testShouldThrowExceptionIfProcessorIsNotSet() { $this->setExpectedException( \LogicException::class, - 'MessageProcessor was not found. processorName: "processor-name"' + 'Processor was not found. processorName: "processor-name"' ); - $registry = new ContainerAwareMessageProcessorRegistry(); + $registry = new ContainerAwareProcessorRegistry(); $registry->get('processor-name'); } @@ -36,13 +36,13 @@ public function testShouldThrowExceptionIfContainerIsNotSet() { $this->setExpectedException(\LogicException::class, 'Container was not set'); - $registry = new ContainerAwareMessageProcessorRegistry(); + $registry = new ContainerAwareProcessorRegistry(); $registry->set('processor-name', 'processor-id'); $registry->get('processor-name'); } - public function testShouldThrowExceptionIfInstanceOfMessageProcessorIsInvalid() + public function testShouldThrowExceptionIfInstanceOfProcessorIsInvalid() { $this->setExpectedException(\LogicException::class, 'Container was not set'); @@ -51,32 +51,32 @@ public function testShouldThrowExceptionIfInstanceOfMessageProcessorIsInvalid() $container = new Container(); $container->set('processor-id', $processor); - $registry = new ContainerAwareMessageProcessorRegistry(); + $registry = new ContainerAwareProcessorRegistry(); $registry->set('processor-name', 'processor-id'); $registry->get('processor-name'); } - public function testShouldReturnInstanceOfMessageProcessor() + public function testShouldReturnInstanceOfProcessor() { $this->setExpectedException(\LogicException::class, 'Container was not set'); - $processor = $this->createMessageProcessorMock(); + $processor = $this->createProcessorMock(); $container = new Container(); $container->set('processor-id', $processor); - $registry = new ContainerAwareMessageProcessorRegistry(); + $registry = new ContainerAwareProcessorRegistry(); $registry->set('processor-name', 'processor-id'); $this->assertSame($processor, $registry->get('processor-name')); } /** - * @return \PHPUnit_Framework_MockObject_MockObject|MessageProcessorInterface + * @return \PHPUnit_Framework_MockObject_MockObject|Processor */ - protected function createMessageProcessorMock() + protected function createProcessorMock() { - return $this->createMock(MessageProcessorInterface::class); + return $this->createMock(Processor::class); } } diff --git a/pkg/enqueue/Tests/Symfony/Consumption/ContainerAwareConsumeMessagesCommandTest.php b/pkg/enqueue/Tests/Symfony/Consumption/ContainerAwareConsumeMessagesCommandTest.php index 566e030e6..7ff98f81c 100644 --- a/pkg/enqueue/Tests/Symfony/Consumption/ContainerAwareConsumeMessagesCommandTest.php +++ b/pkg/enqueue/Tests/Symfony/Consumption/ContainerAwareConsumeMessagesCommandTest.php @@ -1,11 +1,11 @@ setExpectedException(\LogicException::class, 'Invalid message processor service given.'. - ' It must be an instance of Enqueue\Consumption\MessageProcessorInterface but stdClass'); + ' It must be an instance of Enqueue\Psr\Processor but stdClass'); $container = new Container(); $container->set('processor-service', new \stdClass()); @@ -67,7 +67,7 @@ public function testShouldThrowExceptionIfProcessorInstanceHasWrongClass() public function testShouldExecuteConsumption() { - $processor = $this->createMessageProcessor(); + $processor = $this->createProcessor(); $queue = $this->createQueueMock(); @@ -129,11 +129,11 @@ protected function createQueueMock() } /** - * @return \PHPUnit_Framework_MockObject_MockObject|MessageProcessorInterface + * @return \PHPUnit_Framework_MockObject_MockObject|Processor */ - protected function createMessageProcessor() + protected function createProcessor() { - return $this->createMock(MessageProcessorInterface::class); + return $this->createMock(Processor::class); } /** diff --git a/pkg/job-queue/CalculateRootJobStatusProcessor.php b/pkg/job-queue/CalculateRootJobStatusProcessor.php index a970a4399..19dd991d6 100644 --- a/pkg/job-queue/CalculateRootJobStatusProcessor.php +++ b/pkg/job-queue/CalculateRootJobStatusProcessor.php @@ -1,16 +1,16 @@ logger->critical(sprintf( - '[DependentJobMessageProcessor] Got invalid message. body: "%s"', + '[DependentJobProcessor] Got invalid message. body: "%s"', $message->getBody() )); @@ -59,7 +59,7 @@ public function process(PsrMessage $message, Context $context) $job = $this->jobStorage->findJobById($data['jobId']); if (!$job) { $this->logger->critical(sprintf( - '[DependentJobMessageProcessor] Job was not found. id: "%s"', + '[DependentJobProcessor] Job was not found. id: "%s"', $data['jobId'] )); @@ -68,7 +68,7 @@ public function process(PsrMessage $message, Context $context) if (!$job->isRoot()) { $this->logger->critical(sprintf( - '[DependentJobMessageProcessor] Expected root job but got child. id: "%s"', + '[DependentJobProcessor] Expected root job but got child. id: "%s"', $data['jobId'] )); @@ -86,7 +86,7 @@ public function process(PsrMessage $message, Context $context) foreach ($dependentJobs as $dependentJob) { if (!isset($dependentJob['topic']) || !isset($dependentJob['message'])) { $this->logger->critical(sprintf( - '[DependentJobMessageProcessor] Got invalid dependent job data. job: "%s", dependentJob: "%s"', + '[DependentJobProcessor] Got invalid dependent job data. job: "%s", dependentJob: "%s"', $job->getId(), JSON::encode($dependentJob) )); diff --git a/pkg/job-queue/Tests/CalculateRootJobStatusProcessorTest.php b/pkg/job-queue/Tests/CalculateRootJobStatusProcessorTest.php index ae7f7f163..4c38ed141 100644 --- a/pkg/job-queue/Tests/CalculateRootJobStatusProcessorTest.php +++ b/pkg/job-queue/Tests/CalculateRootJobStatusProcessorTest.php @@ -1,14 +1,14 @@ assertEquals( [Topics::ROOT_JOB_STOPPED], - DependentJobMessageProcessor::getSubscribedTopics() + DependentJobProcessor::getSubscribedTopics() ); } @@ -32,13 +32,13 @@ public function testShouldLogCriticalAndRejectMessageIfJobIdIsNotSet() $logger ->expects($this->once()) ->method('critical') - ->with('[DependentJobMessageProcessor] Got invalid message. body: "{"key":"value"}"') + ->with('[DependentJobProcessor] Got invalid message. body: "{"key":"value"}"') ; $message = new NullMessage(); $message->setBody(json_encode(['key' => 'value'])); - $processor = new DependentJobMessageProcessor($jobStorage, $producer, $logger); + $processor = new DependentJobProcessor($jobStorage, $producer, $logger); $result = $processor->process($message, $this->createContextMock()); @@ -60,13 +60,13 @@ public function testShouldLogCriticalAndRejectMessageIfJobEntityWasNotFound() $logger ->expects($this->once()) ->method('critical') - ->with('[DependentJobMessageProcessor] Job was not found. id: "12345"') + ->with('[DependentJobProcessor] Job was not found. id: "12345"') ; $message = new NullMessage(); $message->setBody(json_encode(['jobId' => 12345])); - $processor = new DependentJobMessageProcessor($jobStorage, $producer, $logger); + $processor = new DependentJobProcessor($jobStorage, $producer, $logger); $result = $processor->process($message, $this->createContextMock()); @@ -92,13 +92,13 @@ public function testShouldLogCriticalAndRejectMessageIfJobIsNotRoot() $logger ->expects($this->once()) ->method('critical') - ->with('[DependentJobMessageProcessor] Expected root job but got child. id: "12345"') + ->with('[DependentJobProcessor] Expected root job but got child. id: "12345"') ; $message = new NullMessage(); $message->setBody(json_encode(['jobId' => 12345])); - $processor = new DependentJobMessageProcessor($jobStorage, $producer, $logger); + $processor = new DependentJobProcessor($jobStorage, $producer, $logger); $result = $processor->process($message, $this->createContextMock()); @@ -128,7 +128,7 @@ public function testShouldDoNothingIfDependentJobsAreMissing() $message = new NullMessage(); $message->setBody(json_encode(['jobId' => 12345])); - $processor = new DependentJobMessageProcessor($jobStorage, $producer, $logger); + $processor = new DependentJobProcessor($jobStorage, $producer, $logger); $result = $processor->process($message, $this->createContextMock()); @@ -163,13 +163,13 @@ public function testShouldLogCriticalAndRejectMessageIfDependentJobTopicIsMissin $logger ->expects($this->once()) ->method('critical') - ->with('[DependentJobMessageProcessor] Got invalid dependent job data. job: "123", dependentJob: "[]"') + ->with('[DependentJobProcessor] Got invalid dependent job data. job: "123", dependentJob: "[]"') ; $message = new NullMessage(); $message->setBody(json_encode(['jobId' => 12345])); - $processor = new DependentJobMessageProcessor($jobStorage, $producer, $logger); + $processor = new DependentJobProcessor($jobStorage, $producer, $logger); $result = $processor->process($message, $this->createContextMock()); @@ -206,14 +206,14 @@ public function testShouldLogCriticalAndRejectMessageIfDependentJobMessageIsMiss $logger ->expects($this->once()) ->method('critical') - ->with('[DependentJobMessageProcessor] Got invalid dependent job data. '. + ->with('[DependentJobProcessor] Got invalid dependent job data. '. 'job: "123", dependentJob: "{"topic":"topic-name"}"') ; $message = new NullMessage(); $message->setBody(json_encode(['jobId' => 12345])); - $processor = new DependentJobMessageProcessor($jobStorage, $producer, $logger); + $processor = new DependentJobProcessor($jobStorage, $producer, $logger); $result = $processor->process($message, $this->createContextMock()); @@ -257,7 +257,7 @@ public function testShouldPublishDependentMessage() $message = new NullMessage(); $message->setBody(json_encode(['jobId' => 12345])); - $processor = new DependentJobMessageProcessor($jobStorage, $producer, $logger); + $processor = new DependentJobProcessor($jobStorage, $producer, $logger); $result = $processor->process($message, $this->createContextMock()); @@ -305,7 +305,7 @@ public function testShouldPublishDependentMessageWithPriority() $message = new NullMessage(); $message->setBody(json_encode(['jobId' => 12345])); - $processor = new DependentJobMessageProcessor($jobStorage, $producer, $logger); + $processor = new DependentJobProcessor($jobStorage, $producer, $logger); $result = $processor->process($message, $this->createContextMock()); diff --git a/pkg/psr-queue/DeliveryMode.php b/pkg/psr-queue/DeliveryMode.php index 99507b99d..492080a4a 100644 --- a/pkg/psr-queue/DeliveryMode.php +++ b/pkg/psr-queue/DeliveryMode.php @@ -1,7 +1,7 @@ expectException(InvalidDestinationException::class); $this->expectExceptionMessage( - 'The destination must be an instance of Enqueue\Psr\Tests\Exception\DestinationBar'. - ' but got Enqueue\Psr\Tests\Exception\DestinationFoo.' + 'The destination must be an instance of Enqueue\Psr\Tests\DestinationBar'. + ' but got Enqueue\Psr\Tests\DestinationFoo.' ); InvalidDestinationException::assertDestinationInstanceOf(new DestinationFoo(), DestinationBar::class); diff --git a/pkg/psr-queue/Tests/Exception/InvalidMessageExceptionTest.php b/pkg/psr-queue/Tests/InvalidMessageExceptionTest.php similarity index 92% rename from pkg/psr-queue/Tests/Exception/InvalidMessageExceptionTest.php rename to pkg/psr-queue/Tests/InvalidMessageExceptionTest.php index 882967ebb..9e2815638 100644 --- a/pkg/psr-queue/Tests/Exception/InvalidMessageExceptionTest.php +++ b/pkg/psr-queue/Tests/InvalidMessageExceptionTest.php @@ -1,5 +1,5 @@ bind($queue, $processor); $queueConsumer->consume(); @@ -78,10 +78,10 @@ public function testConsumeOneMessageAndSendReplyExit() $replyMessage = $this->stompContext->createMessage(__METHOD__.'.reply'); - $processor = new StubMessageProcessor(); + $processor = new StubProcessor(); $processor->result = Result::reply($replyMessage); - $replyProcessor = new StubMessageProcessor(); + $replyProcessor = new StubProcessor(); $queueConsumer->bind($queue, $processor); $queueConsumer->bind($replyQueue, $replyProcessor); @@ -95,7 +95,7 @@ public function testConsumeOneMessageAndSendReplyExit() } } -class StubMessageProcessor implements MessageProcessorInterface +class StubProcessor implements Processor { public $result = Result::ACK;