diff --git a/docs/transport/stomp.md b/docs/transport/stomp.md index c7b460dc5..d5cc79868 100644 --- a/docs/transport/stomp.md +++ b/docs/transport/stomp.md @@ -36,6 +36,12 @@ $factory = new StompConnectionFactory('stomp:'); // same as above $factory = new StompConnectionFactory([]); +// connect via stomp to RabbitMQ (default) - the topic names are prefixed with /exchange +$factory = new StompConnectionFactory('stomp+rabbitmq:'); + +// connect via stomp to ActiveMQ - the topic names are prefixed with /topic +$factory = new StompConnectionFactory('stomp+activemq:'); + // connect to stomp broker at example.com port 1000 using $factory = new StompConnectionFactory([ 'host' => 'example.com', diff --git a/pkg/stomp/StompConnectionFactory.php b/pkg/stomp/StompConnectionFactory.php index c88716308..764145ca2 100644 --- a/pkg/stomp/StompConnectionFactory.php +++ b/pkg/stomp/StompConnectionFactory.php @@ -11,6 +11,9 @@ class StompConnectionFactory implements ConnectionFactory { + const SCHEME_EXT_ACTIVEMQ = 'activemq'; + const SCHEME_EXT_RABBITMQ = 'rabbitmq'; + /** * @var array */ @@ -66,13 +69,15 @@ public function __construct($config = 'stomp:') */ public function createContext(): Context { + $useExchangePrefix = self::SCHEME_EXT_RABBITMQ === $this->config['target'] ? true : false; + if ($this->config['lazy']) { return new StompContext(function () { return $this->establishConnection(); - }); + }, $useExchangePrefix); } - return new StompContext($this->establishConnection()); + return new StompContext($this->establishConnection(), $useExchangePrefix); } private function establishConnection(): BufferedStompClient @@ -103,7 +108,16 @@ private function parseDsn(string $dsn): array throw new \LogicException(sprintf('The given DSN is not supported. Must start with "stomp:".')); } + $schemeExtension = current($dsn->getSchemeExtensions()); + if (false === $schemeExtension) { + $schemeExtension = self::SCHEME_EXT_RABBITMQ; + } + if (self::SCHEME_EXT_ACTIVEMQ !== $schemeExtension && self::SCHEME_EXT_RABBITMQ !== $schemeExtension) { + throw new \LogicException(sprintf('The given DSN is not supported. The scheme extension "%s" provided is invalid. It must be one of "%s" or "%s".', $schemeExtension, self::SCHEME_EXT_ACTIVEMQ, self::SCHEME_EXT_RABBITMQ)); + } + return array_filter(array_replace($dsn->getQuery(), [ + 'target' => $schemeExtension, 'host' => $dsn->getHost(), 'port' => $dsn->getPort(), 'login' => $dsn->getUser(), @@ -120,6 +134,7 @@ private function parseDsn(string $dsn): array private function defaultConfig(): array { return [ + 'target' => self::SCHEME_EXT_RABBITMQ, 'host' => 'localhost', 'port' => 61613, 'login' => 'guest', diff --git a/pkg/stomp/StompContext.php b/pkg/stomp/StompContext.php index e03b11cb6..2824e0b27 100644 --- a/pkg/stomp/StompContext.php +++ b/pkg/stomp/StompContext.php @@ -23,6 +23,11 @@ class StompContext implements Context */ private $stomp; + /** + * @var bool + */ + private $useExchangePrefix; + /** * @var callable */ @@ -30,8 +35,9 @@ class StompContext implements Context /** * @param BufferedStompClient|callable $stomp + * @param bool $useExchangePrefix */ - public function __construct($stomp) + public function __construct($stomp, $useExchangePrefix = true) { if ($stomp instanceof BufferedStompClient) { $this->stomp = $stomp; @@ -40,6 +46,8 @@ public function __construct($stomp) } else { throw new \InvalidArgumentException('The stomp argument must be either BufferedStompClient or callable that return BufferedStompClient.'); } + + $this->useExchangePrefix = $useExchangePrefix; } /** @@ -84,7 +92,7 @@ public function createTopic(string $name): Topic { if (0 !== strpos($name, '/')) { $destination = new StompDestination(); - $destination->setType(StompDestination::TYPE_EXCHANGE); + $destination->setType($this->useExchangePrefix ? StompDestination::TYPE_EXCHANGE : StompDestination::TYPE_TOPIC); $destination->setStompName($name); return $destination; diff --git a/pkg/stomp/Tests/StompConnectionFactoryConfigTest.php b/pkg/stomp/Tests/StompConnectionFactoryConfigTest.php index e8705a010..af8e71bd7 100644 --- a/pkg/stomp/Tests/StompConnectionFactoryConfigTest.php +++ b/pkg/stomp/Tests/StompConnectionFactoryConfigTest.php @@ -55,6 +55,7 @@ public static function provideConfigs() yield [ null, [ + 'target' => 'rabbitmq', 'host' => 'localhost', 'port' => 61613, 'login' => 'guest', @@ -71,6 +72,7 @@ public static function provideConfigs() yield [ 'stomp:', [ + 'target' => 'rabbitmq', 'host' => 'localhost', 'port' => 61613, 'login' => 'guest', @@ -87,6 +89,7 @@ public static function provideConfigs() yield [ [], [ + 'target' => 'rabbitmq', 'host' => 'localhost', 'port' => 61613, 'login' => 'guest', @@ -103,6 +106,43 @@ public static function provideConfigs() yield [ 'stomp://localhost:1234?foo=bar&lazy=0&sync=true', [ + 'target' => 'rabbitmq', + 'host' => 'localhost', + 'port' => 1234, + 'login' => 'guest', + 'password' => 'guest', + 'vhost' => '/', + 'buffer_size' => 1000, + 'connection_timeout' => 1, + 'sync' => true, + 'lazy' => false, + 'foo' => 'bar', + 'ssl_on' => false, + ], + ]; + + yield [ + 'stomp+activemq://localhost:1234?foo=bar&lazy=0&sync=true', + [ + 'target' => 'activemq', + 'host' => 'localhost', + 'port' => 1234, + 'login' => 'guest', + 'password' => 'guest', + 'vhost' => '/', + 'buffer_size' => 1000, + 'connection_timeout' => 1, + 'sync' => true, + 'lazy' => false, + 'foo' => 'bar', + 'ssl_on' => false, + ], + ]; + + yield [ + 'stomp+rabbitmq://localhost:1234?foo=bar&lazy=0&sync=true', + [ + 'target' => 'rabbitmq', 'host' => 'localhost', 'port' => 1234, 'login' => 'guest', @@ -120,6 +160,7 @@ public static function provideConfigs() yield [ ['dsn' => 'stomp://localhost:1234/theVhost?foo=bar&lazy=0&sync=true', 'baz' => 'bazVal', 'foo' => 'fooVal'], [ + 'target' => 'rabbitmq', 'host' => 'localhost', 'port' => 1234, 'login' => 'guest', @@ -138,6 +179,7 @@ public static function provideConfigs() yield [ ['dsn' => 'stomp:///%2f'], [ + 'target' => 'rabbitmq', 'host' => 'localhost', 'port' => 61613, 'login' => 'guest', @@ -154,6 +196,7 @@ public static function provideConfigs() yield [ ['host' => 'localhost', 'port' => 1234, 'foo' => 'bar'], [ + 'target' => 'rabbitmq', 'host' => 'localhost', 'port' => 1234, 'login' => 'guest', diff --git a/pkg/stomp/Tests/StompConnectionFactoryTest.php b/pkg/stomp/Tests/StompConnectionFactoryTest.php index 18a13463f..91986e6d0 100644 --- a/pkg/stomp/Tests/StompConnectionFactoryTest.php +++ b/pkg/stomp/Tests/StompConnectionFactoryTest.php @@ -25,6 +25,31 @@ public function testShouldCreateLazyContext() $this->assertInstanceOf(StompContext::class, $context); $this->assertAttributeEquals(null, 'stomp', $context); + $this->assertAttributeEquals(true, 'useExchangePrefix', $context); $this->assertInternalType('callable', $this->readAttribute($context, 'stompFactory')); } + + public function testShouldCreateRabbitMQContext() + { + $factory = new StompConnectionFactory('stomp+rabbitmq://'); + + $context = $factory->createContext(); + + $this->assertInstanceOf(StompContext::class, $context); + + $this->assertAttributeEquals(null, 'stomp', $context); + $this->assertAttributeEquals(true, 'useExchangePrefix', $context); + } + + public function testShouldCreateActiveMQContext() + { + $factory = new StompConnectionFactory('stomp+activemq://'); + + $context = $factory->createContext(); + + $this->assertInstanceOf(StompContext::class, $context); + + $this->assertAttributeEquals(null, 'stomp', $context); + $this->assertAttributeEquals(false, 'useExchangePrefix', $context); + } } diff --git a/pkg/stomp/Tests/StompContextTest.php b/pkg/stomp/Tests/StompContextTest.php index 49297d3e1..69b577e85 100644 --- a/pkg/stomp/Tests/StompContextTest.php +++ b/pkg/stomp/Tests/StompContextTest.php @@ -79,7 +79,7 @@ public function testCreateQueueShouldCreateDestinationIfNameIsFullDestinationStr $this->assertEquals('/amq/queue/name/routing-key', $destination->getQueueName()); } - public function testShouldCreateTopicInstance() + public function testShouldCreateTopicInstanceWithExchangePrefix() { $context = new StompContext($this->createStompClientMock()); @@ -91,6 +91,18 @@ public function testShouldCreateTopicInstance() $this->assertSame(StompDestination::TYPE_EXCHANGE, $topic->getType()); } + public function testShouldCreateTopicInstanceWithTopicPrefix() + { + $context = new StompContext($this->createStompClientMock(), false); + + $topic = $context->createTopic('the name'); + + $this->assertInstanceOf(StompDestination::class, $topic); + $this->assertSame('/topic/the name', $topic->getQueueName()); + $this->assertSame('/topic/the name', $topic->getTopicName()); + $this->assertSame(StompDestination::TYPE_TOPIC, $topic->getType()); + } + public function testCreateTopicShouldCreateDestinationIfNameIsFullDestinationString() { $context = new StompContext($this->createStompClientMock());