Skip to content

[amqp] Delay Strategy #152

New issue

Have a question about this project? No Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “No Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? No Sign in to your account

Merged
merged 14 commits into from
Aug 7, 2017
5 changes: 5 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"enqueue/amqp-ext": "*@dev",
"enqueue/amqp-lib": "*@dev",
"enqueue/amqp-bunny": "*@dev",
"enqueue/amqp-tools": "*@dev",
"php-amqplib/php-amqplib": "^2.7@dev",
"enqueue/redis": "*@dev",
"enqueue/fs": "*@dev",
Expand Down Expand Up @@ -84,6 +85,10 @@
"type": "path",
"url": "pkg/amqp-bunny"
},
{
"type": "path",
"url": "pkg/amqp-tools"
},
{
"type": "path",
"url": "pkg/redis"
Expand Down
16 changes: 13 additions & 3 deletions pkg/amqp-bunny/AmqpConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@
namespace Enqueue\AmqpBunny;

use Bunny\Client;
use Enqueue\AmqpTools\DelayStrategyAware;
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
use Interop\Amqp\AmqpConnectionFactory as InteropAmqpConnectionFactory;

class AmqpConnectionFactory implements InteropAmqpConnectionFactory
class AmqpConnectionFactory implements InteropAmqpConnectionFactory, DelayStrategyAware
{
use DelayStrategyAwareTrait;

/**
* @var array
*/
Expand Down Expand Up @@ -72,12 +76,18 @@ public function __construct($config = 'amqp://')
public function createContext()
{
if ($this->config['lazy']) {
return new AmqpContext(function () {
$context = new AmqpContext(function () {
return $this->establishConnection()->channel();
}, $this->config);
$context->setDelayStrategy($this->delayStrategy);

return $context;
}

return new AmqpContext($this->establishConnection()->channel(), $this->config);
$context = new AmqpContext($this->establishConnection()->channel(), $this->config);
$context->setDelayStrategy($this->delayStrategy);

return $context;
}

/**
Expand Down
11 changes: 9 additions & 2 deletions pkg/amqp-bunny/AmqpContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
namespace Enqueue\AmqpBunny;

use Bunny\Channel;
use Enqueue\AmqpTools\DelayStrategyAware;
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
use Interop\Amqp\AmqpBind as InteropAmqpBind;
use Interop\Amqp\AmqpContext as InteropAmqpContext;
use Interop\Amqp\AmqpMessage as InteropAmqpMessage;
Expand All @@ -17,8 +19,10 @@
use Interop\Queue\PsrDestination;
use Interop\Queue\PsrTopic;

class AmqpContext implements InteropAmqpContext
class AmqpContext implements InteropAmqpContext, DelayStrategyAware
{
use DelayStrategyAwareTrait;

/**
* @var Channel
*/
Expand Down Expand Up @@ -124,7 +128,10 @@ public function createConsumer(PsrDestination $destination)
*/
public function createProducer()
{
return new AmqpProducer($this->getBunnyChannel());
$producer = new AmqpProducer($this->getBunnyChannel(), $this);
$producer->setDelayStrategy($this->delayStrategy);

return $producer;
}

/**
Expand Down
37 changes: 27 additions & 10 deletions pkg/amqp-bunny/AmqpProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
namespace Enqueue\AmqpBunny;

use Bunny\Channel;
use Enqueue\AmqpTools\DelayStrategyAware;
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
use Interop\Amqp\AmqpMessage as InteropAmqpMessage;
use Interop\Amqp\AmqpProducer as InteropAmqpProducer;
use Interop\Amqp\AmqpQueue as InteropAmqpQueue;
Expand All @@ -14,8 +16,10 @@
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrTopic;

class AmqpProducer implements InteropAmqpProducer
class AmqpProducer implements InteropAmqpProducer, DelayStrategyAware
{
use DelayStrategyAwareTrait;

/**
* @var int|null
*/
Expand All @@ -32,11 +36,23 @@ class AmqpProducer implements InteropAmqpProducer
private $channel;

/**
* @param Channel $channel
* @var int
*/
private $deliveryDelay;

/**
* @var AmqpContext
*/
private $context;

/**
* @param Channel $channel
* @param AmqpContext $context
*/
public function __construct(Channel $channel)
public function __construct(Channel $channel, AmqpContext $context)
{
$this->channel = $channel;
$this->context = $context;
}

/**
Expand All @@ -47,8 +63,7 @@ public function send(PsrDestination $destination, PsrMessage $message)
{
$destination instanceof PsrTopic
? InvalidDestinationException::assertDestinationInstanceOf($destination, InteropAmqpTopic::class)
: InvalidDestinationException::assertDestinationInstanceOf($destination, InteropAmqpQueue::class)
;
: InvalidDestinationException::assertDestinationInstanceOf($destination, InteropAmqpQueue::class);

InvalidMessageException::assertMessageInstanceOf($message, InteropAmqpMessage::class);

Expand All @@ -66,7 +81,9 @@ public function send(PsrDestination $destination, PsrMessage $message)
$amqpProperties['application_headers'] = $appProperties;
}

if ($destination instanceof InteropAmqpTopic) {
if ($this->deliveryDelay) {
$this->delayStrategy->delayMessage($this->context, $destination, $message, $this->deliveryDelay);
} elseif ($destination instanceof InteropAmqpTopic) {
$this->channel->publish(
$message->getBody(),
$amqpProperties,
Expand All @@ -92,19 +109,19 @@ public function send(PsrDestination $destination, PsrMessage $message)
*/
public function setDeliveryDelay($deliveryDelay)
{
if (null === $deliveryDelay) {
return;
if (null === $this->delayStrategy) {
throw DeliveryDelayNotSupportedException::providerDoestNotSupportIt();
}

throw DeliveryDelayNotSupportedException::providerDoestNotSupportIt();
$this->deliveryDelay = $deliveryDelay;
}

/**
* {@inheritdoc}
*/
public function getDeliveryDelay()
{
return null;
return $this->deliveryDelay;
}

/**
Expand Down
77 changes: 69 additions & 8 deletions pkg/amqp-bunny/Tests/AmqpProducerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@

use Bunny\Channel;
use Bunny\Message;
use Enqueue\AmqpBunny\AmqpContext;
use Enqueue\AmqpBunny\AmqpProducer;
use Enqueue\AmqpTools\DelayStrategy;
use Enqueue\Test\ClassExtensionTrait;
use Interop\Amqp\AmqpMessage as InteropAmqpMessage;
use Interop\Amqp\Impl\AmqpMessage;
use Interop\Amqp\Impl\AmqpQueue;
use Interop\Amqp\Impl\AmqpTopic;
use Interop\Queue\DeliveryDelayNotSupportedException;
use Interop\Queue\InvalidDestinationException;
use Interop\Queue\InvalidMessageException;
use Interop\Queue\PsrDestination;
Expand All @@ -23,7 +26,7 @@ class AmqpProducerTest extends TestCase

public function testCouldBeConstructedWithRequiredArguments()
{
new AmqpProducer($this->createBunnyChannelMock());
new AmqpProducer($this->createBunnyChannelMock(), $this->createContextMock());
}

public function testShouldImplementPsrProducerInterface()
Expand All @@ -33,7 +36,7 @@ public function testShouldImplementPsrProducerInterface()

public function testShouldThrowExceptionWhenDestinationTypeIsInvalid()
{
$producer = new AmqpProducer($this->createBunnyChannelMock());
$producer = new AmqpProducer($this->createBunnyChannelMock(), $this->createContextMock());

$this->expectException(InvalidDestinationException::class);
$this->expectExceptionMessage('The destination must be an instance of Interop\Amqp\AmqpQueue but got');
Expand All @@ -43,7 +46,7 @@ public function testShouldThrowExceptionWhenDestinationTypeIsInvalid()

public function testShouldThrowExceptionWhenMessageTypeIsInvalid()
{
$producer = new AmqpProducer($this->createBunnyChannelMock());
$producer = new AmqpProducer($this->createBunnyChannelMock(), $this->createContextMock());

$this->expectException(InvalidMessageException::class);
$this->expectExceptionMessage('The message must be an instance of Interop\Amqp\AmqpMessage but it is');
Expand All @@ -65,7 +68,7 @@ public function testShouldPublishMessageToTopic()
$message = new AmqpMessage('body');
$message->setRoutingKey('routing-key');

$producer = new AmqpProducer($channel);
$producer = new AmqpProducer($channel, $this->createContextMock());
$producer->send($topic, $message);
}

Expand All @@ -80,10 +83,52 @@ public function testShouldPublishMessageToQueue()

$queue = new AmqpQueue('queue');

$producer = new AmqpProducer($channel);
$producer = new AmqpProducer($channel, $this->createContextMock());
$producer->send($queue, new AmqpMessage('body'));
}

public function testShouldDelayMessage()
{
$channel = $this->createBunnyChannelMock();
$channel
->expects($this->never())
->method('publish')
;

$message = new AmqpMessage('body');
$context = $this->createContextMock();
$queue = new AmqpQueue('queue');

$delayStrategy = $this->createDelayStrategyMock();
$delayStrategy
->expects($this->once())
->method('delayMessage')
->with($this->identicalTo($context), $this->identicalTo($queue), $this->identicalTo($message), 10000)
;

$producer = new AmqpProducer($channel, $context);
$producer->setDelayStrategy($delayStrategy);
$producer->setDeliveryDelay(10000);

$producer->send($queue, $message);
}

public function testShouldThrowExceptionOnSetDeliveryDelayWhenDeliveryStrategyIsNotSet()
{
$channel = $this->createBunnyChannelMock();
$channel
->expects($this->never())
->method('publish')
;

$producer = new AmqpProducer($channel, $this->createContextMock());

$this->expectException(DeliveryDelayNotSupportedException::class);
$this->expectExceptionMessage('The provider does not support delivery delay feature');

$producer->setDeliveryDelay(10000);
}

public function testShouldSetMessageHeaders()
{
$channel = $this->createBunnyChannelMock();
Expand All @@ -93,7 +138,7 @@ public function testShouldSetMessageHeaders()
->with($this->anything(), ['content_type' => 'text/plain'])
;

$producer = new AmqpProducer($channel);
$producer = new AmqpProducer($channel, $this->createContextMock());
$producer->send(new AmqpTopic('name'), new AmqpMessage('body', [], ['content_type' => 'text/plain']));
}

Expand All @@ -106,7 +151,7 @@ public function testShouldSetMessageProperties()
->with($this->anything(), ['application_headers' => ['key' => 'value']])
;

$producer = new AmqpProducer($channel);
$producer = new AmqpProducer($channel, $this->createContextMock());
$producer->send(new AmqpTopic('name'), new AmqpMessage('body', ['key' => 'value']));
}

Expand All @@ -123,7 +168,7 @@ public function testShouldPropagateFlags()
$message->addFlag(InteropAmqpMessage::FLAG_IMMEDIATE);
$message->addFlag(InteropAmqpMessage::FLAG_MANDATORY);

$producer = new AmqpProducer($channel);
$producer = new AmqpProducer($channel, $this->createContextMock());
$producer->send(new AmqpTopic('name'), $message);
}

Expand All @@ -150,4 +195,20 @@ private function createBunnyChannelMock()
{
return $this->createMock(Channel::class);
}

/**
* @return \PHPUnit_Framework_MockObject_MockObject|AmqpContext
*/
private function createContextMock()
{
return $this->createMock(AmqpContext::class);
}

/**
* @return \PHPUnit_Framework_MockObject_MockObject|DelayStrategy
*/
private function createDelayStrategyMock()
{
return $this->createMock(DelayStrategy::class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php

namespace Enqueue\AmqpBunny\Tests\Spec;

use Enqueue\AmqpLib\AmqpConnectionFactory;
use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy;
use Interop\Queue\PsrContext;
use Interop\Queue\Spec\SendAndReceiveDelayedMessageFromQueueSpec;

/**
* @group functional
*/
class AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest extends SendAndReceiveDelayedMessageFromQueueSpec
{
/**
* {@inheritdoc}
*/
protected function createContext()
{
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
$factory->setDelayStrategy(new RabbitMqDelayPluginDelayStrategy());

return $factory->createContext();
}

/**
* {@inheritdoc}
*/
protected function createQueue(PsrContext $context, $queueName)
{
$queue = parent::createQueue($context, $queueName);

$context->declareQueue($queue);

return $queue;
}
}
Loading