diff --git a/pkg/enqueue/Client/Message.php b/pkg/enqueue/Client/Message.php index b2117e16c..37b219c51 100644 --- a/pkg/enqueue/Client/Message.php +++ b/pkg/enqueue/Client/Message.php @@ -4,6 +4,16 @@ class Message { + /** + * @const string + */ + const SCOPE_MESSAGE_BUS = 'enqueue.scope.message_bus'; + + /** + * @const string + */ + const SCOPE_APP = 'enqueue.scope.app'; + /** * @var string|null */ @@ -57,6 +67,7 @@ public function __construct() { $this->headers = []; $this->properties = []; + $this->scope = static::SCOPE_MESSAGE_BUS; } /** @@ -177,6 +188,22 @@ public function setDelay($delay) $this->delay = $delay; } + /** + * @param string $scope + */ + public function setScope($scope) + { + $this->scope = $scope; + } + + /** + * @return string + */ + public function getScope() + { + return $this->scope; + } + /** * @return array */ diff --git a/pkg/enqueue/Client/MessageProducer.php b/pkg/enqueue/Client/MessageProducer.php index 8b0e0c11c..88c8b0052 100644 --- a/pkg/enqueue/Client/MessageProducer.php +++ b/pkg/enqueue/Client/MessageProducer.php @@ -47,7 +47,27 @@ public function send($topic, $message) $message->setPriority(MessagePriority::NORMAL); } - $this->driver->sendToRouter($message); + if (Message::SCOPE_MESSAGE_BUS == $message->getScope()) { + if ($message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) { + throw new \LogicException(sprintf('The %s property must not be set for messages that are sent to message bus.', Config::PARAMETER_PROCESSOR_QUEUE_NAME)); + } + if ($message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) { + throw new \LogicException(sprintf('The %s property must not be set for messages that are sent to message bus.', Config::PARAMETER_PROCESSOR_NAME)); + } + + $this->driver->sendToRouter($message); + } elseif (Message::SCOPE_APP == $message->getScope()) { + if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) { + $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $this->driver->getConfig()->getRouterProcessorName()); + } + if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) { + $message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, $this->driver->getConfig()->getRouterQueueName()); + } + + $this->driver->sendToProcessor($message); + } else { + throw new \LogicException(sprintf('The message scope "%s" is not supported.', $message->getScope())); + } } /** diff --git a/pkg/enqueue/Tests/Client/MessageProducerTest.php b/pkg/enqueue/Tests/Client/MessageProducerTest.php index 7915fb6b5..375f539e0 100644 --- a/pkg/enqueue/Tests/Client/MessageProducerTest.php +++ b/pkg/enqueue/Tests/Client/MessageProducerTest.php @@ -2,6 +2,7 @@ namespace Enqueue\Tests\Client; +use Enqueue\Client\Config; use Enqueue\Client\DriverInterface; use Enqueue\Client\Message; use Enqueue\Client\MessagePriority; @@ -294,6 +295,10 @@ public function testShouldThrowExceptionIfBodyIsObjectOnSend() ->expects($this->never()) ->method('sendToRouter') ; + $driver + ->expects($this->never()) + ->method('sendToProcessor') + ; $producer = new MessageProducer($driver); @@ -312,6 +317,10 @@ public function testShouldThrowExceptionIfBodyIsArrayWithObjectsInsideOnSend() ->expects($this->never()) ->method('sendToRouter') ; + $driver + ->expects($this->never()) + ->method('sendToProcessor') + ; $producer = new MessageProducer($driver); @@ -330,6 +339,10 @@ public function testShouldThrowExceptionIfBodyIsArrayWithObjectsInSubArraysInsid ->expects($this->never()) ->method('sendToRouter') ; + $driver + ->expects($this->never()) + ->method('sendToProcessor') + ; $producer = new MessageProducer($driver); @@ -339,7 +352,7 @@ public function testShouldThrowExceptionIfBodyIsArrayWithObjectsInSubArraysInsid $producer->send($queue, ['foo' => ['bar' => new \stdClass()]]); } - public function testShouldSendJsonSerializableObjectAsJsonString() + public function testShouldSendJsonSerializableObjectAsJsonStringToMessageBus() { $object = new JsonSerializableObject(); @@ -357,7 +370,7 @@ public function testShouldSendJsonSerializableObjectAsJsonString() $producer->send('topic', $object); } - public function testShouldSendMessageJsonSerializableBodyAsJsonString() + public function testShouldSendMessageJsonSerializableBodyAsJsonStringToMessageBus() { $object = new JsonSerializableObject(); @@ -378,6 +391,56 @@ public function testShouldSendMessageJsonSerializableBodyAsJsonString() $producer->send('topic', $message); } + public function testThrowIfTryToSendMessageToMessageBusWithProcessorNamePropertySet() + { + $object = new JsonSerializableObject(); + + $message = new Message(); + $message->setBody($object); + $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, 'aProcessor'); + + $driver = $this->createDriverStub(); + $driver + ->expects($this->never()) + ->method('sendToRouter') + ; + $driver + ->expects($this->never()) + ->method('sendToProcessor') + ; + + $producer = new MessageProducer($driver); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The enqueue.processor_name property must not be set for messages that are sent to message bus.'); + $producer->send('topic', $message); + } + + public function testThrowIfTryToSendMessageToMessageBusWithProcessorQueueNamePropertySet() + { + $object = new JsonSerializableObject(); + + $message = new Message(); + $message->setBody($object); + $message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, 'aProcessorQueue'); + + $driver = $this->createDriverStub(); + $driver + ->expects($this->never()) + ->method('sendToRouter') + ; + $driver + ->expects($this->never()) + ->method('sendToProcessor') + ; + + $producer = new MessageProducer($driver); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The enqueue.processor_queue_name property must not be set for messages that are sent to message bus.'); + $producer->send('topic', $message); + } + public function testThrowIfNotApplicationJsonContentTypeSetWithJsonSerializableBody() { $object = new JsonSerializableObject(); @@ -391,6 +454,10 @@ public function testThrowIfNotApplicationJsonContentTypeSetWithJsonSerializableB ->expects($this->never()) ->method('sendToRouter') ; + $driver + ->expects($this->never()) + ->method('sendToProcessor') + ; $this->expectException(\LogicException::class); $this->expectExceptionMessage('Content type "application/json" only allowed when body is array'); @@ -399,12 +466,102 @@ public function testThrowIfNotApplicationJsonContentTypeSetWithJsonSerializableB $producer->send('topic', $message); } + public function testShouldSendMessageToApplicationRouter() + { + $message = new Message(); + $message->setBody('aBody'); + $message->setScope(Message::SCOPE_APP); + + $driver = $this->createDriverStub(); + $driver + ->expects($this->never()) + ->method('sendToRouter') + ; + $driver + ->expects($this->once()) + ->method('sendToProcessor') + ->willReturnCallback(function (Message $message) { + self::assertSame('aBody', $message->getBody()); + self::assertSame('a_router_processor_name', $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)); + self::assertSame('a_router_queue', $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)); + }) + ; + + $producer = new MessageProducer($driver); + $producer->send('topic', $message); + } + + public function testShouldSendToCustomMessageToApplicationRouter() + { + $message = new Message(); + $message->setBody('aBody'); + $message->setScope(Message::SCOPE_APP); + $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, 'aCustomProcessor'); + $message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, 'aCustomProcessorQueue'); + + $driver = $this->createDriverStub(); + $driver + ->expects($this->never()) + ->method('sendToRouter') + ; + $driver + ->expects($this->once()) + ->method('sendToProcessor') + ->willReturnCallback(function (Message $message) { + self::assertSame('aBody', $message->getBody()); + self::assertSame('aCustomProcessor', $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)); + self::assertSame('aCustomProcessorQueue', $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)); + }) + ; + + $producer = new MessageProducer($driver); + $producer->send('topic', $message); + } + + public function testThrowIfUnSupportedScopeGivenOnSend() + { + $message = new Message(); + $message->setScope('iDontKnowScope'); + + $driver = $this->createDriverStub(); + $driver + ->expects($this->never()) + ->method('sendToRouter') + ; + $driver + ->expects($this->never()) + ->method('sendToProcessor') + ; + + $producer = new MessageProducer($driver); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The message scope "iDontKnowScope" is not supported.'); + $producer->send('topic', $message); + } + /** * @return \PHPUnit_Framework_MockObject_MockObject|DriverInterface */ protected function createDriverStub() { - return $this->createMock(DriverInterface::class); + $config = new Config( + 'a_prefix', + 'an_app', + 'a_router_topic', + 'a_router_queue', + 'a_default_processor_queue', + 'a_router_processor_name' + ); + + $driverMock = $this->createMock(DriverInterface::class); + $driverMock + ->expects($this->any()) + ->method('getConfig') + ->willReturn($config) + ; + + return $driverMock; } } diff --git a/pkg/enqueue/Tests/Client/MessageTest.php b/pkg/enqueue/Tests/Client/MessageTest.php index 73425c786..d35f00450 100644 --- a/pkg/enqueue/Tests/Client/MessageTest.php +++ b/pkg/enqueue/Tests/Client/MessageTest.php @@ -38,6 +38,15 @@ public function testShouldAllowGetPreviouslySetDelay() self::assertSame('theDelay', $message->getDelay()); } + public function testShouldAllowGetPreviouslySetScope() + { + $message = new Message(); + + $message->setScope('theScope'); + + self::assertSame('theScope', $message->getScope()); + } + public function testShouldAllowGetPreviouslySetExpire() { $message = new Message(); @@ -81,6 +90,13 @@ public function testShouldSetEmptyArrayAsDefaultHeadersInConstructor() self::assertSame([], $message->getHeaders()); } + public function testShouldSetMessageBusScopeInConstructor() + { + $message = new Message(); + + self::assertSame(Message::SCOPE_MESSAGE_BUS, $message->getScope()); + } + public function testShouldAllowGetPreviouslySetHeaders() { $message = new Message();