From 348eb1a3c88b3cbc11ba38ff0184dba6f3a9adf7 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Tue, 14 Aug 2018 15:36:24 +0300 Subject: [PATCH 1/3] [Consumption] Add QueueConsumerInterface, make QueueConsumer final. --- docs/monolog/send-messages-to-mq.md | 2 +- docs/quick_tour.md | 8 +-- pkg/enqueue/Consumption/QueueConsumer.php | 58 +++++++++---------- .../Symfony/Client/ConsumeMessagesCommand.php | 14 ++--- .../Consumption/ConsumeMessagesCommand.php | 8 +-- .../ContainerAwareConsumeMessagesCommand.php | 8 +-- .../QueueConsumerOptionsCommandTrait.php | 12 ++-- .../Tests/Consumption/QueueConsumerTest.php | 8 +-- .../Client/ConsumeMessagesCommandTest.php | 6 +- .../ConsumeMessagesCommandTest.php | 6 +- ...ntainerAwareConsumeMessagesCommandTest.php | 6 +- .../Mock/QueueConsumerOptionsCommand.php | 6 +- .../QueueConsumerOptionsCommandTraitTest.php | 14 ++--- pkg/enqueue/functions.php | 6 +- pkg/simple-client/SimpleClient.php | 7 +-- 15 files changed, 81 insertions(+), 88 deletions(-) diff --git a/docs/monolog/send-messages-to-mq.md b/docs/monolog/send-messages-to-mq.md index caead3723..591016711 100644 --- a/docs/monolog/send-messages-to-mq.md +++ b/docs/monolog/send-messages-to-mq.md @@ -47,7 +47,7 @@ require_once __DIR__.'/vendor/autoload.php'; $context = (new \Enqueue\Fs\FsConnectionFactory('file://'.__DIR__.'/queue'))->createContext(); $consumer = new QueueConsumer($context); -$consumer->bind('log', function(PsrMessage $message) { +$consumer->bindCallback('log', function(PsrMessage $message) { echo $message->getBody().PHP_EOL; return PsrProcessor::ACK; diff --git a/docs/quick_tour.md b/docs/quick_tour.md index 51fffaff6..1c293cefd 100644 --- a/docs/quick_tour.md +++ b/docs/quick_tour.md @@ -73,12 +73,12 @@ use Enqueue\Consumption\QueueConsumer; $queueConsumer = new QueueConsumer($psrContext); -$queueConsumer->bind('foo_queue', function(PsrMessage $message) { +$queueConsumer->bindCallback('foo_queue', function(PsrMessage $message) { // process message return PsrProcessor::ACK; }); -$queueConsumer->bind('bar_queue', function(PsrMessage $message) { +$queueConsumer->bindCallback('bar_queue', function(PsrMessage $message) { // process message return PsrProcessor::ACK; @@ -145,7 +145,7 @@ $queueConsumer = new QueueConsumer($psrContext, new ChainExtension([ new ReplyExtension() ])); -$queueConsumer->bind('foo', function(PsrMessage $message, PsrContext $context) { +$queueConsumer->bindCallback('foo', function(PsrMessage $message, PsrContext $context) { $replyMessage = $context->createMessage('Hello'); return Result::reply($replyMessage); @@ -255,7 +255,7 @@ use Enqueue\Symfony\Consumption\ConsumeMessagesCommand; /** @var QueueConsumer $queueConsumer */ -$queueConsumer->bind('a_queue', function(PsrMessage $message) { +$queueConsumer->bindCallback('a_queue', function(PsrMessage $message) { // process message }); diff --git a/pkg/enqueue/Consumption/QueueConsumer.php b/pkg/enqueue/Consumption/QueueConsumer.php index 0b82830b5..8c2162596 100644 --- a/pkg/enqueue/Consumption/QueueConsumer.php +++ b/pkg/enqueue/Consumption/QueueConsumer.php @@ -16,7 +16,7 @@ use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; -class QueueConsumer +final class QueueConsumer implements QueueConsumerInterface { /** * @var PsrContext @@ -71,8 +71,8 @@ class QueueConsumer public function __construct( PsrContext $psrContext, ExtensionInterface $extension = null, - $idleTimeout = 0, - $receiveTimeout = 10000 + float $idleTimeout = 0., + float $receiveTimeout = 10000. ) { $this->psrContext = $psrContext; $this->staticExtension = $extension ?: new ChainExtension([]); @@ -85,62 +85,55 @@ public function __construct( } /** - * @param int $timeout + * {@inheritdoc} */ - public function setIdleTimeout($timeout) + public function setIdleTimeout(float $timeout): void { - $this->idleTimeout = (int) $timeout; + $this->idleTimeout = $timeout; } /** - * @return int + * {@inheritdoc} */ - public function getIdleTimeout() + public function getIdleTimeout(): float { return $this->idleTimeout; } /** - * @param int $timeout + * {@inheritdoc} */ - public function setReceiveTimeout($timeout) + public function setReceiveTimeout(float $timeout): void { - $this->receiveTimeout = (int) $timeout; + $this->receiveTimeout = $timeout; } /** - * @return int + * {@inheritdoc} */ - public function getReceiveTimeout() + public function getReceiveTimeout(): float { return $this->receiveTimeout; } /** - * @return PsrContext + * {@inheritdoc} */ - public function getPsrContext() + public function getPsrContext(): PsrContext { return $this->psrContext; } /** - * @param PsrQueue|string $queue - * @param PsrProcessor|callable $processor - * - * @return QueueConsumer + * {@inheritdoc} */ - public function bind($queue, $processor) + public function bind($queue, PsrProcessor $processor): QueueConsumerInterface { if (is_string($queue)) { $queue = $this->psrContext->createQueue($queue); } - if (is_callable($processor)) { - $processor = new CallbackProcessor($processor); - } InvalidArgumentException::assertInstanceOf($queue, PsrQueue::class); - InvalidArgumentException::assertInstanceOf($processor, PsrProcessor::class); if (empty($queue->getQueueName())) { throw new LogicException('The queue name must be not empty.'); @@ -155,14 +148,17 @@ public function bind($queue, $processor) } /** - * Runtime extension - is an extension or a collection of extensions which could be set on runtime. - * Here's a good example: @see LimitsExtensionsCommandTrait. - * - * @param ExtensionInterface|ChainExtension|null $runtimeExtension - * - * @throws \Exception + * {@inheritdoc} + */ + public function bindCallback($queue, callable $processor): QueueConsumerInterface + { + return $this->bind($queue, new CallbackProcessor($processor)); + } + + /** + * {@inheritdoc} */ - public function consume(ExtensionInterface $runtimeExtension = null) + public function consume(ExtensionInterface $runtimeExtension = null): void { if (empty($this->boundProcessors)) { throw new \LogicException('There is nothing to consume. It is required to bind something before calling consume method.'); diff --git a/pkg/enqueue/Symfony/Client/ConsumeMessagesCommand.php b/pkg/enqueue/Symfony/Client/ConsumeMessagesCommand.php index a3817b3e5..b41bad6a9 100644 --- a/pkg/enqueue/Symfony/Client/ConsumeMessagesCommand.php +++ b/pkg/enqueue/Symfony/Client/ConsumeMessagesCommand.php @@ -8,7 +8,7 @@ use Enqueue\Client\Meta\QueueMetaRegistry; use Enqueue\Consumption\ChainExtension; use Enqueue\Consumption\Extension\LoggerExtension; -use Enqueue\Consumption\QueueConsumer; +use Enqueue\Consumption\QueueConsumerInterface; use Enqueue\Symfony\Consumption\LimitsExtensionsCommandTrait; use Enqueue\Symfony\Consumption\QueueConsumerOptionsCommandTrait; use Symfony\Component\Console\Command\Command; @@ -25,7 +25,7 @@ class ConsumeMessagesCommand extends Command use QueueConsumerOptionsCommandTrait; /** - * @var QueueConsumer + * @var QueueConsumerInterface */ private $consumer; @@ -45,13 +45,13 @@ class ConsumeMessagesCommand extends Command private $driver; /** - * @param QueueConsumer $consumer - * @param DelegateProcessor $processor - * @param QueueMetaRegistry $queueMetaRegistry - * @param DriverInterface $driver + * @param QueueConsumerInterface $consumer + * @param DelegateProcessor $processor + * @param QueueMetaRegistry $queueMetaRegistry + * @param DriverInterface $driver */ public function __construct( - QueueConsumer $consumer, + QueueConsumerInterface $consumer, DelegateProcessor $processor, QueueMetaRegistry $queueMetaRegistry, DriverInterface $driver diff --git a/pkg/enqueue/Symfony/Consumption/ConsumeMessagesCommand.php b/pkg/enqueue/Symfony/Consumption/ConsumeMessagesCommand.php index c2c7695f1..cbf39c8ee 100644 --- a/pkg/enqueue/Symfony/Consumption/ConsumeMessagesCommand.php +++ b/pkg/enqueue/Symfony/Consumption/ConsumeMessagesCommand.php @@ -4,7 +4,7 @@ use Enqueue\Consumption\ChainExtension; use Enqueue\Consumption\Extension\LoggerExtension; -use Enqueue\Consumption\QueueConsumer; +use Enqueue\Consumption\QueueConsumerInterface; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Logger\ConsoleLogger; @@ -19,14 +19,14 @@ class ConsumeMessagesCommand extends Command implements ContainerAwareInterface use QueueConsumerOptionsCommandTrait; /** - * @var QueueConsumer + * @var QueueConsumerInterface */ protected $consumer; /** - * @param QueueConsumer $consumer + * @param QueueConsumerInterface $consumer */ - public function __construct(QueueConsumer $consumer) + public function __construct(QueueConsumerInterface $consumer) { parent::__construct(null); diff --git a/pkg/enqueue/Symfony/Consumption/ContainerAwareConsumeMessagesCommand.php b/pkg/enqueue/Symfony/Consumption/ContainerAwareConsumeMessagesCommand.php index 297dfa58d..61cc41fc2 100644 --- a/pkg/enqueue/Symfony/Consumption/ContainerAwareConsumeMessagesCommand.php +++ b/pkg/enqueue/Symfony/Consumption/ContainerAwareConsumeMessagesCommand.php @@ -4,7 +4,7 @@ use Enqueue\Consumption\ChainExtension; use Enqueue\Consumption\Extension\LoggerExtension; -use Enqueue\Consumption\QueueConsumer; +use Enqueue\Consumption\QueueConsumerInterface; use Enqueue\Consumption\QueueSubscriberInterface; use Interop\Queue\PsrProcessor; use Symfony\Component\Console\Command\Command; @@ -23,16 +23,16 @@ class ContainerAwareConsumeMessagesCommand extends Command implements ContainerA use QueueConsumerOptionsCommandTrait; /** - * @var QueueConsumer + * @var QueueConsumerInterface */ protected $consumer; /** * ConsumeMessagesCommand constructor. * - * @param QueueConsumer $consumer + * @param QueueConsumerInterface $consumer */ - public function __construct(QueueConsumer $consumer) + public function __construct(QueueConsumerInterface $consumer) { parent::__construct(null); diff --git a/pkg/enqueue/Symfony/Consumption/QueueConsumerOptionsCommandTrait.php b/pkg/enqueue/Symfony/Consumption/QueueConsumerOptionsCommandTrait.php index c6ffd985f..00f75f0ea 100644 --- a/pkg/enqueue/Symfony/Consumption/QueueConsumerOptionsCommandTrait.php +++ b/pkg/enqueue/Symfony/Consumption/QueueConsumerOptionsCommandTrait.php @@ -2,7 +2,7 @@ namespace Enqueue\Symfony\Consumption; -use Enqueue\Consumption\QueueConsumer; +use Enqueue\Consumption\QueueConsumerInterface; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; @@ -20,17 +20,17 @@ protected function configureQueueConsumerOptions() } /** - * @param QueueConsumer $consumer - * @param InputInterface $input + * @param QueueConsumerInterface $consumer + * @param InputInterface $input */ - protected function setQueueConsumerOptions(QueueConsumer $consumer, InputInterface $input) + protected function setQueueConsumerOptions(QueueConsumerInterface $consumer, InputInterface $input) { if (null !== $idleTimeout = $input->getOption('idle-timeout')) { - $consumer->setIdleTimeout((int) $idleTimeout); + $consumer->setIdleTimeout((float) $idleTimeout); } if (null !== $receiveTimeout = $input->getOption('receive-timeout')) { - $consumer->setReceiveTimeout((int) $receiveTimeout); + $consumer->setReceiveTimeout((float) $receiveTimeout); } } } diff --git a/pkg/enqueue/Tests/Consumption/QueueConsumerTest.php b/pkg/enqueue/Tests/Consumption/QueueConsumerTest.php index 752da02ac..8c7b47f63 100644 --- a/pkg/enqueue/Tests/Consumption/QueueConsumerTest.php +++ b/pkg/enqueue/Tests/Consumption/QueueConsumerTest.php @@ -114,18 +114,18 @@ public function testCouldSetGetIdleTimeout() { $consumer = new QueueConsumer($this->createPsrContextStub(), null, 0); - $consumer->setIdleTimeout(123456); + $consumer->setIdleTimeout(123456.1); - $this->assertSame(123456, $consumer->getIdleTimeout()); + $this->assertSame(123456.1, $consumer->getIdleTimeout()); } public function testCouldSetGetReceiveTimeout() { $consumer = new QueueConsumer($this->createPsrContextStub(), null, 0); - $consumer->setReceiveTimeout(123456); + $consumer->setReceiveTimeout(123456.1); - $this->assertSame(123456, $consumer->getReceiveTimeout()); + $this->assertSame(123456.1, $consumer->getReceiveTimeout()); } public function testShouldAllowBindCallbackToQueueName() diff --git a/pkg/enqueue/Tests/Symfony/Client/ConsumeMessagesCommandTest.php b/pkg/enqueue/Tests/Symfony/Client/ConsumeMessagesCommandTest.php index 396417e2b..d2a08df69 100644 --- a/pkg/enqueue/Tests/Symfony/Client/ConsumeMessagesCommandTest.php +++ b/pkg/enqueue/Tests/Symfony/Client/ConsumeMessagesCommandTest.php @@ -7,7 +7,7 @@ use Enqueue\Client\DriverInterface; use Enqueue\Client\Meta\QueueMetaRegistry; use Enqueue\Consumption\ChainExtension; -use Enqueue\Consumption\QueueConsumer; +use Enqueue\Consumption\QueueConsumerInterface; use Enqueue\Null\NullQueue; use Enqueue\Symfony\Client\ConsumeMessagesCommand; use Interop\Queue\PsrContext; @@ -266,11 +266,11 @@ private function createDelegateProcessorMock() } /** - * @return \PHPUnit_Framework_MockObject_MockObject|QueueConsumer + * @return \PHPUnit_Framework_MockObject_MockObject|QueueConsumerInterface */ private function createQueueConsumerMock() { - return $this->createMock(QueueConsumer::class); + return $this->createMock(QueueConsumerInterface::class); } /** diff --git a/pkg/enqueue/Tests/Symfony/Consumption/ConsumeMessagesCommandTest.php b/pkg/enqueue/Tests/Symfony/Consumption/ConsumeMessagesCommandTest.php index aad0a293e..a01fd8fee 100644 --- a/pkg/enqueue/Tests/Symfony/Consumption/ConsumeMessagesCommandTest.php +++ b/pkg/enqueue/Tests/Symfony/Consumption/ConsumeMessagesCommandTest.php @@ -3,7 +3,7 @@ namespace Enqueue\Tests\Symfony\Consumption; use Enqueue\Consumption\ChainExtension; -use Enqueue\Consumption\QueueConsumer; +use Enqueue\Consumption\QueueConsumerInterface; use Enqueue\Symfony\Consumption\ConsumeMessagesCommand; use Interop\Queue\PsrContext; use PHPUnit\Framework\TestCase; @@ -77,10 +77,10 @@ private function createContextMock() } /** - * @return \PHPUnit_Framework_MockObject_MockObject|QueueConsumer + * @return \PHPUnit_Framework_MockObject_MockObject|QueueConsumerInterface */ private function createQueueConsumerMock() { - return $this->createMock(QueueConsumer::class); + return $this->createMock(QueueConsumerInterface::class); } } diff --git a/pkg/enqueue/Tests/Symfony/Consumption/ContainerAwareConsumeMessagesCommandTest.php b/pkg/enqueue/Tests/Symfony/Consumption/ContainerAwareConsumeMessagesCommandTest.php index 78929e04a..6a587ab40 100644 --- a/pkg/enqueue/Tests/Symfony/Consumption/ContainerAwareConsumeMessagesCommandTest.php +++ b/pkg/enqueue/Tests/Symfony/Consumption/ContainerAwareConsumeMessagesCommandTest.php @@ -3,7 +3,7 @@ namespace Enqueue\Tests\Symfony\Consumption; use Enqueue\Consumption\ChainExtension; -use Enqueue\Consumption\QueueConsumer; +use Enqueue\Consumption\QueueConsumerInterface; use Enqueue\Symfony\Consumption\ContainerAwareConsumeMessagesCommand; use Enqueue\Tests\Symfony\Consumption\Mock\QueueSubscriberProcessor; use Interop\Queue\PsrContext; @@ -199,10 +199,10 @@ protected function createProcessor() } /** - * @return \PHPUnit_Framework_MockObject_MockObject|QueueConsumer + * @return \PHPUnit_Framework_MockObject_MockObject|QueueConsumerInterface */ protected function createQueueConsumerMock() { - return $this->createMock(QueueConsumer::class); + return $this->createMock(QueueConsumerInterface::class); } } diff --git a/pkg/enqueue/Tests/Symfony/Consumption/Mock/QueueConsumerOptionsCommand.php b/pkg/enqueue/Tests/Symfony/Consumption/Mock/QueueConsumerOptionsCommand.php index 88a0d8cf2..289a5e711 100644 --- a/pkg/enqueue/Tests/Symfony/Consumption/Mock/QueueConsumerOptionsCommand.php +++ b/pkg/enqueue/Tests/Symfony/Consumption/Mock/QueueConsumerOptionsCommand.php @@ -2,7 +2,7 @@ namespace Enqueue\Tests\Symfony\Consumption\Mock; -use Enqueue\Consumption\QueueConsumer; +use Enqueue\Consumption\QueueConsumerInterface; use Enqueue\Symfony\Consumption\QueueConsumerOptionsCommandTrait; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; @@ -13,11 +13,11 @@ class QueueConsumerOptionsCommand extends Command use QueueConsumerOptionsCommandTrait; /** - * @var QueueConsumer + * @var QueueConsumerInterface */ private $consumer; - public function __construct(QueueConsumer $consumer) + public function __construct(QueueConsumerInterface $consumer) { parent::__construct('queue-consumer-options'); diff --git a/pkg/enqueue/Tests/Symfony/Consumption/QueueConsumerOptionsCommandTraitTest.php b/pkg/enqueue/Tests/Symfony/Consumption/QueueConsumerOptionsCommandTraitTest.php index 57106aa24..f26324c1d 100644 --- a/pkg/enqueue/Tests/Symfony/Consumption/QueueConsumerOptionsCommandTraitTest.php +++ b/pkg/enqueue/Tests/Symfony/Consumption/QueueConsumerOptionsCommandTraitTest.php @@ -2,7 +2,7 @@ namespace Enqueue\Tests\Symfony\Consumption; -use Enqueue\Consumption\QueueConsumer; +use Enqueue\Consumption\QueueConsumerInterface; use Enqueue\Tests\Symfony\Consumption\Mock\QueueConsumerOptionsCommand; use PHPUnit\Framework\TestCase; use Symfony\Component\Console\Tester\CommandTester; @@ -26,28 +26,28 @@ public function testShouldSetQueueConsumerOptions() $consumer ->expects($this->once()) ->method('setIdleTimeout') - ->with($this->identicalTo(123)) + ->with($this->identicalTo(123.1)) ; $consumer ->expects($this->once()) ->method('setReceiveTimeout') - ->with($this->identicalTo(456)) + ->with($this->identicalTo(456.1)) ; $trait = new QueueConsumerOptionsCommand($consumer); $tester = new CommandTester($trait); $tester->execute([ - '--idle-timeout' => '123', - '--receive-timeout' => '456', + '--idle-timeout' => '123.1', + '--receive-timeout' => '456.1', ]); } /** - * @return QueueConsumer|\PHPUnit_Framework_MockObject_MockObject|QueueConsumer + * @return \PHPUnit_Framework_MockObject_MockObject|QueueConsumerInterface */ private function createQueueConsumer() { - return $this->createMock(QueueConsumer::class); + return $this->createMock(QueueConsumerInterface::class); } } diff --git a/pkg/enqueue/functions.php b/pkg/enqueue/functions.php index 537f9964a..2fce13487 100644 --- a/pkg/enqueue/functions.php +++ b/pkg/enqueue/functions.php @@ -169,13 +169,13 @@ function send_queue(PsrContext $c, $queue, $body) /** * @param PsrContext $c - * @param string $queue + * @param string $queueName * @param callable $callback */ -function consume(PsrContext $c, $queue, callable $callback) +function consume(PsrContext $c, string $queueName, callable $callback) { $queueConsumer = new QueueConsumer($c); - $queueConsumer->bind($queue, $callback); + $queueConsumer->bindCallback($queueName, $callback); $queueConsumer->consume(); } diff --git a/pkg/simple-client/SimpleClient.php b/pkg/simple-client/SimpleClient.php index 8114de2e2..916f0dbdb 100644 --- a/pkg/simple-client/SimpleClient.php +++ b/pkg/simple-client/SimpleClient.php @@ -15,7 +15,7 @@ use Enqueue\Client\RouterProcessor; use Enqueue\Consumption\CallbackProcessor; use Enqueue\Consumption\ExtensionInterface; -use Enqueue\Consumption\QueueConsumer; +use Enqueue\Consumption\QueueConsumerInterface; use Enqueue\Dbal\DbalConnectionFactory; use Enqueue\Dbal\Symfony\DbalTransportFactory; use Enqueue\Fs\FsConnectionFactory; @@ -194,10 +194,7 @@ public function getContext() return $this->container->get('enqueue.transport.context'); } - /** - * @return QueueConsumer - */ - public function getQueueConsumer() + public function getQueueConsumer(): QueueConsumerInterface { return $this->container->get('enqueue.client.queue_consumer'); } From f3cf4080d485203b41e6447feb856a1c1b1ba108 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Tue, 14 Aug 2018 15:51:23 +0300 Subject: [PATCH 2/3] add queue consumer interface --- .../Consumption/QueueConsumerInterface.php | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 pkg/enqueue/Consumption/QueueConsumerInterface.php diff --git a/pkg/enqueue/Consumption/QueueConsumerInterface.php b/pkg/enqueue/Consumption/QueueConsumerInterface.php new file mode 100644 index 000000000..0add8ab60 --- /dev/null +++ b/pkg/enqueue/Consumption/QueueConsumerInterface.php @@ -0,0 +1,47 @@ + Date: Tue, 14 Aug 2018 16:00:52 +0300 Subject: [PATCH 3/3] fix tests. --- pkg/enqueue/Tests/Consumption/QueueConsumerTest.php | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/pkg/enqueue/Tests/Consumption/QueueConsumerTest.php b/pkg/enqueue/Tests/Consumption/QueueConsumerTest.php index 8c7b47f63..b5282d718 100644 --- a/pkg/enqueue/Tests/Consumption/QueueConsumerTest.php +++ b/pkg/enqueue/Tests/Consumption/QueueConsumerTest.php @@ -101,15 +101,6 @@ public function testThrowIfQueueNeitherInstanceOfQueueNorString() $consumer->bind(new \stdClass(), $processorMock); } - public function testThrowIfProcessorNeitherInstanceOfProcessorNorCallable() - { - $consumer = new QueueConsumer($this->createPsrContextStub(), null, 0); - - $this->expectException(InvalidArgumentException::class); - $this->expectExceptionMessage('The argument must be an instance of Interop\Queue\PsrProcessor but got stdClass.'); - $consumer->bind(new NullQueue(''), new \stdClass()); - } - public function testCouldSetGetIdleTimeout() { $consumer = new QueueConsumer($this->createPsrContextStub(), null, 0); @@ -146,7 +137,7 @@ public function testShouldAllowBindCallbackToQueueName() $consumer = new QueueConsumer($context, null, 0); - $consumer->bind($queueName, $callback); + $consumer->bindCallback($queueName, $callback); $boundProcessors = $this->readAttribute($consumer, 'boundProcessors');