Skip to content

[Consumption] Add QueueConsumerInterface, make QueueConsumer final. #504

New issue

Have a question about this project? No Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “No Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? No Sign in to your account

Merged
merged 3 commits into from
Aug 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/monolog/send-messages-to-mq.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ require_once __DIR__.'/vendor/autoload.php';
$context = (new \Enqueue\Fs\FsConnectionFactory('file://'.__DIR__.'/queue'))->createContext();

$consumer = new QueueConsumer($context);
$consumer->bind('log', function(PsrMessage $message) {
$consumer->bindCallback('log', function(PsrMessage $message) {
echo $message->getBody().PHP_EOL;

return PsrProcessor::ACK;
Expand Down
8 changes: 4 additions & 4 deletions docs/quick_tour.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ use Enqueue\Consumption\QueueConsumer;

$queueConsumer = new QueueConsumer($psrContext);

$queueConsumer->bind('foo_queue', function(PsrMessage $message) {
$queueConsumer->bindCallback('foo_queue', function(PsrMessage $message) {
// process message

return PsrProcessor::ACK;
});
$queueConsumer->bind('bar_queue', function(PsrMessage $message) {
$queueConsumer->bindCallback('bar_queue', function(PsrMessage $message) {
// process message

return PsrProcessor::ACK;
Expand Down Expand Up @@ -145,7 +145,7 @@ $queueConsumer = new QueueConsumer($psrContext, new ChainExtension([
new ReplyExtension()
]));

$queueConsumer->bind('foo', function(PsrMessage $message, PsrContext $context) {
$queueConsumer->bindCallback('foo', function(PsrMessage $message, PsrContext $context) {
$replyMessage = $context->createMessage('Hello');

return Result::reply($replyMessage);
Expand Down Expand Up @@ -255,7 +255,7 @@ use Enqueue\Symfony\Consumption\ConsumeMessagesCommand;

/** @var QueueConsumer $queueConsumer */

$queueConsumer->bind('a_queue', function(PsrMessage $message) {
$queueConsumer->bindCallback('a_queue', function(PsrMessage $message) {
// process message
});

Expand Down
58 changes: 27 additions & 31 deletions pkg/enqueue/Consumption/QueueConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;

class QueueConsumer
final class QueueConsumer implements QueueConsumerInterface
{
/**
* @var PsrContext
Expand Down Expand Up @@ -71,8 +71,8 @@ class QueueConsumer
public function __construct(
PsrContext $psrContext,
ExtensionInterface $extension = null,
$idleTimeout = 0,
$receiveTimeout = 10000
float $idleTimeout = 0.,
float $receiveTimeout = 10000.
) {
$this->psrContext = $psrContext;
$this->staticExtension = $extension ?: new ChainExtension([]);
Expand All @@ -85,62 +85,55 @@ public function __construct(
}

/**
* @param int $timeout
* {@inheritdoc}
*/
public function setIdleTimeout($timeout)
public function setIdleTimeout(float $timeout): void
{
$this->idleTimeout = (int) $timeout;
$this->idleTimeout = $timeout;
}

/**
* @return int
* {@inheritdoc}
*/
public function getIdleTimeout()
public function getIdleTimeout(): float
{
return $this->idleTimeout;
}

/**
* @param int $timeout
* {@inheritdoc}
*/
public function setReceiveTimeout($timeout)
public function setReceiveTimeout(float $timeout): void
{
$this->receiveTimeout = (int) $timeout;
$this->receiveTimeout = $timeout;
}

/**
* @return int
* {@inheritdoc}
*/
public function getReceiveTimeout()
public function getReceiveTimeout(): float
{
return $this->receiveTimeout;
}

/**
* @return PsrContext
* {@inheritdoc}
*/
public function getPsrContext()
public function getPsrContext(): PsrContext
{
return $this->psrContext;
}

/**
* @param PsrQueue|string $queue
* @param PsrProcessor|callable $processor
*
* @return QueueConsumer
* {@inheritdoc}
*/
public function bind($queue, $processor)
public function bind($queue, PsrProcessor $processor): QueueConsumerInterface
{
if (is_string($queue)) {
$queue = $this->psrContext->createQueue($queue);
}
if (is_callable($processor)) {
$processor = new CallbackProcessor($processor);
}

InvalidArgumentException::assertInstanceOf($queue, PsrQueue::class);
InvalidArgumentException::assertInstanceOf($processor, PsrProcessor::class);

if (empty($queue->getQueueName())) {
throw new LogicException('The queue name must be not empty.');
Expand All @@ -155,14 +148,17 @@ public function bind($queue, $processor)
}

/**
* Runtime extension - is an extension or a collection of extensions which could be set on runtime.
* Here's a good example: @see LimitsExtensionsCommandTrait.
*
* @param ExtensionInterface|ChainExtension|null $runtimeExtension
*
* @throws \Exception
* {@inheritdoc}
*/
public function bindCallback($queue, callable $processor): QueueConsumerInterface
{
return $this->bind($queue, new CallbackProcessor($processor));
}

/**
* {@inheritdoc}
*/
public function consume(ExtensionInterface $runtimeExtension = null)
public function consume(ExtensionInterface $runtimeExtension = null): void
{
if (empty($this->boundProcessors)) {
throw new \LogicException('There is nothing to consume. It is required to bind something before calling consume method.');
Expand Down
47 changes: 47 additions & 0 deletions pkg/enqueue/Consumption/QueueConsumerInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?php

namespace Enqueue\Consumption;

use Interop\Queue\PsrContext;
use Interop\Queue\PsrProcessor;
use Interop\Queue\PsrQueue;

interface QueueConsumerInterface
{
/**
* Milliseconds.
*/
public function setIdleTimeout(float $timeout): void;

public function getIdleTimeout(): float;

/**
* Milliseconds.
*/
public function setReceiveTimeout(float $timeout): void;

public function getReceiveTimeout(): float;

public function getPsrContext(): PsrContext;

/**
* @param string|PsrQueue $queueName
*/
public function bind($queueName, PsrProcessor $processor): self;

/**
* @param string|PsrQueue $queueName
* @param mixed $queue
*/
public function bindCallback($queue, callable $processor): self;

/**
* Runtime extension - is an extension or a collection of extensions which could be set on runtime.
* Here's a good example: @see LimitsExtensionsCommandTrait.
*
* @param ExtensionInterface|ChainExtension|null $runtimeExtension
*
* @throws \Exception
*/
public function consume(ExtensionInterface $runtimeExtension = null): void;
}
14 changes: 7 additions & 7 deletions pkg/enqueue/Symfony/Client/ConsumeMessagesCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use Enqueue\Client\Meta\QueueMetaRegistry;
use Enqueue\Consumption\ChainExtension;
use Enqueue\Consumption\Extension\LoggerExtension;
use Enqueue\Consumption\QueueConsumer;
use Enqueue\Consumption\QueueConsumerInterface;
use Enqueue\Symfony\Consumption\LimitsExtensionsCommandTrait;
use Enqueue\Symfony\Consumption\QueueConsumerOptionsCommandTrait;
use Symfony\Component\Console\Command\Command;
Expand All @@ -25,7 +25,7 @@ class ConsumeMessagesCommand extends Command
use QueueConsumerOptionsCommandTrait;

/**
* @var QueueConsumer
* @var QueueConsumerInterface
*/
private $consumer;

Expand All @@ -45,13 +45,13 @@ class ConsumeMessagesCommand extends Command
private $driver;

/**
* @param QueueConsumer $consumer
* @param DelegateProcessor $processor
* @param QueueMetaRegistry $queueMetaRegistry
* @param DriverInterface $driver
* @param QueueConsumerInterface $consumer
* @param DelegateProcessor $processor
* @param QueueMetaRegistry $queueMetaRegistry
* @param DriverInterface $driver
*/
public function __construct(
QueueConsumer $consumer,
QueueConsumerInterface $consumer,
DelegateProcessor $processor,
QueueMetaRegistry $queueMetaRegistry,
DriverInterface $driver
Expand Down
8 changes: 4 additions & 4 deletions pkg/enqueue/Symfony/Consumption/ConsumeMessagesCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use Enqueue\Consumption\ChainExtension;
use Enqueue\Consumption\Extension\LoggerExtension;
use Enqueue\Consumption\QueueConsumer;
use Enqueue\Consumption\QueueConsumerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Logger\ConsoleLogger;
Expand All @@ -19,14 +19,14 @@ class ConsumeMessagesCommand extends Command implements ContainerAwareInterface
use QueueConsumerOptionsCommandTrait;

/**
* @var QueueConsumer
* @var QueueConsumerInterface
*/
protected $consumer;

/**
* @param QueueConsumer $consumer
* @param QueueConsumerInterface $consumer
*/
public function __construct(QueueConsumer $consumer)
public function __construct(QueueConsumerInterface $consumer)
{
parent::__construct(null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use Enqueue\Consumption\ChainExtension;
use Enqueue\Consumption\Extension\LoggerExtension;
use Enqueue\Consumption\QueueConsumer;
use Enqueue\Consumption\QueueConsumerInterface;
use Enqueue\Consumption\QueueSubscriberInterface;
use Interop\Queue\PsrProcessor;
use Symfony\Component\Console\Command\Command;
Expand All @@ -23,16 +23,16 @@ class ContainerAwareConsumeMessagesCommand extends Command implements ContainerA
use QueueConsumerOptionsCommandTrait;

/**
* @var QueueConsumer
* @var QueueConsumerInterface
*/
protected $consumer;

/**
* ConsumeMessagesCommand constructor.
*
* @param QueueConsumer $consumer
* @param QueueConsumerInterface $consumer
*/
public function __construct(QueueConsumer $consumer)
public function __construct(QueueConsumerInterface $consumer)
{
parent::__construct(null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace Enqueue\Symfony\Consumption;

use Enqueue\Consumption\QueueConsumer;
use Enqueue\Consumption\QueueConsumerInterface;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;

Expand All @@ -20,17 +20,17 @@ protected function configureQueueConsumerOptions()
}

/**
* @param QueueConsumer $consumer
* @param InputInterface $input
* @param QueueConsumerInterface $consumer
* @param InputInterface $input
*/
protected function setQueueConsumerOptions(QueueConsumer $consumer, InputInterface $input)
protected function setQueueConsumerOptions(QueueConsumerInterface $consumer, InputInterface $input)
{
if (null !== $idleTimeout = $input->getOption('idle-timeout')) {
$consumer->setIdleTimeout((int) $idleTimeout);
$consumer->setIdleTimeout((float) $idleTimeout);
}

if (null !== $receiveTimeout = $input->getOption('receive-timeout')) {
$consumer->setReceiveTimeout((int) $receiveTimeout);
$consumer->setReceiveTimeout((float) $receiveTimeout);
}
}
}
19 changes: 5 additions & 14 deletions pkg/enqueue/Tests/Consumption/QueueConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -101,31 +101,22 @@ public function testThrowIfQueueNeitherInstanceOfQueueNorString()
$consumer->bind(new \stdClass(), $processorMock);
}

public function testThrowIfProcessorNeitherInstanceOfProcessorNorCallable()
{
$consumer = new QueueConsumer($this->createPsrContextStub(), null, 0);

$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage('The argument must be an instance of Interop\Queue\PsrProcessor but got stdClass.');
$consumer->bind(new NullQueue(''), new \stdClass());
}

public function testCouldSetGetIdleTimeout()
{
$consumer = new QueueConsumer($this->createPsrContextStub(), null, 0);

$consumer->setIdleTimeout(123456);
$consumer->setIdleTimeout(123456.1);

$this->assertSame(123456, $consumer->getIdleTimeout());
$this->assertSame(123456.1, $consumer->getIdleTimeout());
}

public function testCouldSetGetReceiveTimeout()
{
$consumer = new QueueConsumer($this->createPsrContextStub(), null, 0);

$consumer->setReceiveTimeout(123456);
$consumer->setReceiveTimeout(123456.1);

$this->assertSame(123456, $consumer->getReceiveTimeout());
$this->assertSame(123456.1, $consumer->getReceiveTimeout());
}

public function testShouldAllowBindCallbackToQueueName()
Expand All @@ -146,7 +137,7 @@ public function testShouldAllowBindCallbackToQueueName()

$consumer = new QueueConsumer($context, null, 0);

$consumer->bind($queueName, $callback);
$consumer->bindCallback($queueName, $callback);

$boundProcessors = $this->readAttribute($consumer, 'boundProcessors');

Expand Down
Loading