Skip to content

Commit a8211a3

Browse files
authored
Merge pull request #504 from php-enqueue/queue-consumer-interface
[Consumption] Add QueueConsumerInterface, make QueueConsumer final.
2 parents 6a99722 + f03ca6d commit a8211a3

16 files changed

+129
-98
lines changed

docs/monolog/send-messages-to-mq.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ require_once __DIR__.'/vendor/autoload.php';
4747
$context = (new \Enqueue\Fs\FsConnectionFactory('file://'.__DIR__.'/queue'))->createContext();
4848

4949
$consumer = new QueueConsumer($context);
50-
$consumer->bind('log', function(PsrMessage $message) {
50+
$consumer->bindCallback('log', function(PsrMessage $message) {
5151
echo $message->getBody().PHP_EOL;
5252

5353
return PsrProcessor::ACK;

docs/quick_tour.md

+4-4
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,12 @@ use Enqueue\Consumption\QueueConsumer;
7373

7474
$queueConsumer = new QueueConsumer($psrContext);
7575

76-
$queueConsumer->bind('foo_queue', function(PsrMessage $message) {
76+
$queueConsumer->bindCallback('foo_queue', function(PsrMessage $message) {
7777
// process message
7878

7979
return PsrProcessor::ACK;
8080
});
81-
$queueConsumer->bind('bar_queue', function(PsrMessage $message) {
81+
$queueConsumer->bindCallback('bar_queue', function(PsrMessage $message) {
8282
// process message
8383

8484
return PsrProcessor::ACK;
@@ -145,7 +145,7 @@ $queueConsumer = new QueueConsumer($psrContext, new ChainExtension([
145145
new ReplyExtension()
146146
]));
147147

148-
$queueConsumer->bind('foo', function(PsrMessage $message, PsrContext $context) {
148+
$queueConsumer->bindCallback('foo', function(PsrMessage $message, PsrContext $context) {
149149
$replyMessage = $context->createMessage('Hello');
150150

151151
return Result::reply($replyMessage);
@@ -255,7 +255,7 @@ use Enqueue\Symfony\Consumption\ConsumeMessagesCommand;
255255

256256
/** @var QueueConsumer $queueConsumer */
257257

258-
$queueConsumer->bind('a_queue', function(PsrMessage $message) {
258+
$queueConsumer->bindCallback('a_queue', function(PsrMessage $message) {
259259
// process message
260260
});
261261

pkg/enqueue/Consumption/QueueConsumer.php

+27-31
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
use Psr\Log\LoggerInterface;
1717
use Psr\Log\NullLogger;
1818

19-
class QueueConsumer
19+
final class QueueConsumer implements QueueConsumerInterface
2020
{
2121
/**
2222
* @var PsrContext
@@ -71,8 +71,8 @@ class QueueConsumer
7171
public function __construct(
7272
PsrContext $psrContext,
7373
ExtensionInterface $extension = null,
74-
$idleTimeout = 0,
75-
$receiveTimeout = 10000
74+
float $idleTimeout = 0.,
75+
float $receiveTimeout = 10000.
7676
) {
7777
$this->psrContext = $psrContext;
7878
$this->staticExtension = $extension ?: new ChainExtension([]);
@@ -85,62 +85,55 @@ public function __construct(
8585
}
8686

8787
/**
88-
* @param int $timeout
88+
* {@inheritdoc}
8989
*/
90-
public function setIdleTimeout($timeout)
90+
public function setIdleTimeout(float $timeout): void
9191
{
92-
$this->idleTimeout = (int) $timeout;
92+
$this->idleTimeout = $timeout;
9393
}
9494

9595
/**
96-
* @return int
96+
* {@inheritdoc}
9797
*/
98-
public function getIdleTimeout()
98+
public function getIdleTimeout(): float
9999
{
100100
return $this->idleTimeout;
101101
}
102102

103103
/**
104-
* @param int $timeout
104+
* {@inheritdoc}
105105
*/
106-
public function setReceiveTimeout($timeout)
106+
public function setReceiveTimeout(float $timeout): void
107107
{
108-
$this->receiveTimeout = (int) $timeout;
108+
$this->receiveTimeout = $timeout;
109109
}
110110

111111
/**
112-
* @return int
112+
* {@inheritdoc}
113113
*/
114-
public function getReceiveTimeout()
114+
public function getReceiveTimeout(): float
115115
{
116116
return $this->receiveTimeout;
117117
}
118118

119119
/**
120-
* @return PsrContext
120+
* {@inheritdoc}
121121
*/
122-
public function getPsrContext()
122+
public function getPsrContext(): PsrContext
123123
{
124124
return $this->psrContext;
125125
}
126126

127127
/**
128-
* @param PsrQueue|string $queue
129-
* @param PsrProcessor|callable $processor
130-
*
131-
* @return QueueConsumer
128+
* {@inheritdoc}
132129
*/
133-
public function bind($queue, $processor)
130+
public function bind($queue, PsrProcessor $processor): QueueConsumerInterface
134131
{
135132
if (is_string($queue)) {
136133
$queue = $this->psrContext->createQueue($queue);
137134
}
138-
if (is_callable($processor)) {
139-
$processor = new CallbackProcessor($processor);
140-
}
141135

142136
InvalidArgumentException::assertInstanceOf($queue, PsrQueue::class);
143-
InvalidArgumentException::assertInstanceOf($processor, PsrProcessor::class);
144137

145138
if (empty($queue->getQueueName())) {
146139
throw new LogicException('The queue name must be not empty.');
@@ -155,14 +148,17 @@ public function bind($queue, $processor)
155148
}
156149

157150
/**
158-
* Runtime extension - is an extension or a collection of extensions which could be set on runtime.
159-
* Here's a good example: @see LimitsExtensionsCommandTrait.
160-
*
161-
* @param ExtensionInterface|ChainExtension|null $runtimeExtension
162-
*
163-
* @throws \Exception
151+
* {@inheritdoc}
152+
*/
153+
public function bindCallback($queue, callable $processor): QueueConsumerInterface
154+
{
155+
return $this->bind($queue, new CallbackProcessor($processor));
156+
}
157+
158+
/**
159+
* {@inheritdoc}
164160
*/
165-
public function consume(ExtensionInterface $runtimeExtension = null)
161+
public function consume(ExtensionInterface $runtimeExtension = null): void
166162
{
167163
if (empty($this->boundProcessors)) {
168164
throw new \LogicException('There is nothing to consume. It is required to bind something before calling consume method.');
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
<?php
2+
3+
namespace Enqueue\Consumption;
4+
5+
use Interop\Queue\PsrContext;
6+
use Interop\Queue\PsrProcessor;
7+
use Interop\Queue\PsrQueue;
8+
9+
interface QueueConsumerInterface
10+
{
11+
/**
12+
* Milliseconds.
13+
*/
14+
public function setIdleTimeout(float $timeout): void;
15+
16+
public function getIdleTimeout(): float;
17+
18+
/**
19+
* Milliseconds.
20+
*/
21+
public function setReceiveTimeout(float $timeout): void;
22+
23+
public function getReceiveTimeout(): float;
24+
25+
public function getPsrContext(): PsrContext;
26+
27+
/**
28+
* @param string|PsrQueue $queueName
29+
*/
30+
public function bind($queueName, PsrProcessor $processor): self;
31+
32+
/**
33+
* @param string|PsrQueue $queueName
34+
* @param mixed $queue
35+
*/
36+
public function bindCallback($queue, callable $processor): self;
37+
38+
/**
39+
* Runtime extension - is an extension or a collection of extensions which could be set on runtime.
40+
* Here's a good example: @see LimitsExtensionsCommandTrait.
41+
*
42+
* @param ExtensionInterface|ChainExtension|null $runtimeExtension
43+
*
44+
* @throws \Exception
45+
*/
46+
public function consume(ExtensionInterface $runtimeExtension = null): void;
47+
}

pkg/enqueue/Symfony/Client/ConsumeMessagesCommand.php

+7-7
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
use Enqueue\Client\Meta\QueueMetaRegistry;
99
use Enqueue\Consumption\ChainExtension;
1010
use Enqueue\Consumption\Extension\LoggerExtension;
11-
use Enqueue\Consumption\QueueConsumer;
11+
use Enqueue\Consumption\QueueConsumerInterface;
1212
use Enqueue\Symfony\Consumption\LimitsExtensionsCommandTrait;
1313
use Enqueue\Symfony\Consumption\QueueConsumerOptionsCommandTrait;
1414
use Symfony\Component\Console\Command\Command;
@@ -25,7 +25,7 @@ class ConsumeMessagesCommand extends Command
2525
use QueueConsumerOptionsCommandTrait;
2626

2727
/**
28-
* @var QueueConsumer
28+
* @var QueueConsumerInterface
2929
*/
3030
private $consumer;
3131

@@ -45,13 +45,13 @@ class ConsumeMessagesCommand extends Command
4545
private $driver;
4646

4747
/**
48-
* @param QueueConsumer $consumer
49-
* @param DelegateProcessor $processor
50-
* @param QueueMetaRegistry $queueMetaRegistry
51-
* @param DriverInterface $driver
48+
* @param QueueConsumerInterface $consumer
49+
* @param DelegateProcessor $processor
50+
* @param QueueMetaRegistry $queueMetaRegistry
51+
* @param DriverInterface $driver
5252
*/
5353
public function __construct(
54-
QueueConsumer $consumer,
54+
QueueConsumerInterface $consumer,
5555
DelegateProcessor $processor,
5656
QueueMetaRegistry $queueMetaRegistry,
5757
DriverInterface $driver

pkg/enqueue/Symfony/Consumption/ConsumeMessagesCommand.php

+4-4
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
use Enqueue\Consumption\ChainExtension;
66
use Enqueue\Consumption\Extension\LoggerExtension;
7-
use Enqueue\Consumption\QueueConsumer;
7+
use Enqueue\Consumption\QueueConsumerInterface;
88
use Symfony\Component\Console\Command\Command;
99
use Symfony\Component\Console\Input\InputInterface;
1010
use Symfony\Component\Console\Logger\ConsoleLogger;
@@ -19,14 +19,14 @@ class ConsumeMessagesCommand extends Command implements ContainerAwareInterface
1919
use QueueConsumerOptionsCommandTrait;
2020

2121
/**
22-
* @var QueueConsumer
22+
* @var QueueConsumerInterface
2323
*/
2424
protected $consumer;
2525

2626
/**
27-
* @param QueueConsumer $consumer
27+
* @param QueueConsumerInterface $consumer
2828
*/
29-
public function __construct(QueueConsumer $consumer)
29+
public function __construct(QueueConsumerInterface $consumer)
3030
{
3131
parent::__construct(null);
3232

pkg/enqueue/Symfony/Consumption/ContainerAwareConsumeMessagesCommand.php

+4-4
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
use Enqueue\Consumption\ChainExtension;
66
use Enqueue\Consumption\Extension\LoggerExtension;
7-
use Enqueue\Consumption\QueueConsumer;
7+
use Enqueue\Consumption\QueueConsumerInterface;
88
use Enqueue\Consumption\QueueSubscriberInterface;
99
use Interop\Queue\PsrProcessor;
1010
use Symfony\Component\Console\Command\Command;
@@ -23,16 +23,16 @@ class ContainerAwareConsumeMessagesCommand extends Command implements ContainerA
2323
use QueueConsumerOptionsCommandTrait;
2424

2525
/**
26-
* @var QueueConsumer
26+
* @var QueueConsumerInterface
2727
*/
2828
protected $consumer;
2929

3030
/**
3131
* ConsumeMessagesCommand constructor.
3232
*
33-
* @param QueueConsumer $consumer
33+
* @param QueueConsumerInterface $consumer
3434
*/
35-
public function __construct(QueueConsumer $consumer)
35+
public function __construct(QueueConsumerInterface $consumer)
3636
{
3737
parent::__construct(null);
3838

pkg/enqueue/Symfony/Consumption/QueueConsumerOptionsCommandTrait.php

+6-6
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
namespace Enqueue\Symfony\Consumption;
44

5-
use Enqueue\Consumption\QueueConsumer;
5+
use Enqueue\Consumption\QueueConsumerInterface;
66
use Symfony\Component\Console\Input\InputInterface;
77
use Symfony\Component\Console\Input\InputOption;
88

@@ -20,17 +20,17 @@ protected function configureQueueConsumerOptions()
2020
}
2121

2222
/**
23-
* @param QueueConsumer $consumer
24-
* @param InputInterface $input
23+
* @param QueueConsumerInterface $consumer
24+
* @param InputInterface $input
2525
*/
26-
protected function setQueueConsumerOptions(QueueConsumer $consumer, InputInterface $input)
26+
protected function setQueueConsumerOptions(QueueConsumerInterface $consumer, InputInterface $input)
2727
{
2828
if (null !== $idleTimeout = $input->getOption('idle-timeout')) {
29-
$consumer->setIdleTimeout((int) $idleTimeout);
29+
$consumer->setIdleTimeout((float) $idleTimeout);
3030
}
3131

3232
if (null !== $receiveTimeout = $input->getOption('receive-timeout')) {
33-
$consumer->setReceiveTimeout((int) $receiveTimeout);
33+
$consumer->setReceiveTimeout((float) $receiveTimeout);
3434
}
3535
}
3636
}

pkg/enqueue/Tests/Consumption/QueueConsumerTest.php

+5-14
Original file line numberDiff line numberDiff line change
@@ -101,31 +101,22 @@ public function testThrowIfQueueNeitherInstanceOfQueueNorString()
101101
$consumer->bind(new \stdClass(), $processorMock);
102102
}
103103

104-
public function testThrowIfProcessorNeitherInstanceOfProcessorNorCallable()
105-
{
106-
$consumer = new QueueConsumer($this->createPsrContextStub(), null, 0);
107-
108-
$this->expectException(InvalidArgumentException::class);
109-
$this->expectExceptionMessage('The argument must be an instance of Interop\Queue\PsrProcessor but got stdClass.');
110-
$consumer->bind(new NullQueue(''), new \stdClass());
111-
}
112-
113104
public function testCouldSetGetIdleTimeout()
114105
{
115106
$consumer = new QueueConsumer($this->createPsrContextStub(), null, 0);
116107

117-
$consumer->setIdleTimeout(123456);
108+
$consumer->setIdleTimeout(123456.1);
118109

119-
$this->assertSame(123456, $consumer->getIdleTimeout());
110+
$this->assertSame(123456.1, $consumer->getIdleTimeout());
120111
}
121112

122113
public function testCouldSetGetReceiveTimeout()
123114
{
124115
$consumer = new QueueConsumer($this->createPsrContextStub(), null, 0);
125116

126-
$consumer->setReceiveTimeout(123456);
117+
$consumer->setReceiveTimeout(123456.1);
127118

128-
$this->assertSame(123456, $consumer->getReceiveTimeout());
119+
$this->assertSame(123456.1, $consumer->getReceiveTimeout());
129120
}
130121

131122
public function testShouldAllowBindCallbackToQueueName()
@@ -146,7 +137,7 @@ public function testShouldAllowBindCallbackToQueueName()
146137

147138
$consumer = new QueueConsumer($context, null, 0);
148139

149-
$consumer->bind($queueName, $callback);
140+
$consumer->bindCallback($queueName, $callback);
150141

151142
$boundProcessors = $this->readAttribute($consumer, 'boundProcessors');
152143

0 commit comments

Comments
 (0)