Skip to content

Commit 59d5d20

Browse files
authored
Merge pull request #149 from php-enqueue/delay-ttl-priority-in-producer
Delay, ttl, priority, in producer
2 parents 16ef505 + e96a670 commit 59d5d20

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1127
-19
lines changed

composer.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@
2323
"enqueue/simple-client": "*@dev",
2424
"enqueue/test": "*@dev",
2525
"enqueue/async-event-dispatcher": "*@dev",
26-
"queue-interop/queue-interop": "^0.5@dev",
26+
"queue-interop/queue-interop": "^0.6@dev",
27+
"queue-interop/amqp-interop": "^0.6@dev",
2728
"queue-interop/queue-spec": "^0.5@dev",
28-
"queue-interop/amqp-interop": "^0.5@dev",
2929

3030
"phpunit/phpunit": "^5",
3131
"doctrine/doctrine-bundle": "~1.2",

pkg/amqp-ext/AmqpProducer.php

+67
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Interop\Amqp\AmqpProducer as InteropAmqpProducer;
77
use Interop\Amqp\AmqpQueue;
88
use Interop\Amqp\AmqpTopic;
9+
use Interop\Queue\DeliveryDelayNotSupportedException;
910
use Interop\Queue\InvalidDestinationException;
1011
use Interop\Queue\InvalidMessageException;
1112
use Interop\Queue\PsrDestination;
@@ -14,6 +15,16 @@
1415

1516
class AmqpProducer implements InteropAmqpProducer
1617
{
18+
/**
19+
* @var int|null
20+
*/
21+
private $priority;
22+
23+
/**
24+
* @var int|float|null
25+
*/
26+
private $timeToLive;
27+
1728
/**
1829
* @var \AMQPChannel
1930
*/
@@ -42,6 +53,14 @@ public function send(PsrDestination $destination, PsrMessage $message)
4253

4354
InvalidMessageException::assertMessageInstanceOf($message, AmqpMessage::class);
4455

56+
if (null !== $this->priority && null === $message->getPriority()) {
57+
$message->setPriority($this->priority);
58+
}
59+
60+
if (null !== $this->timeToLive && null === $message->getExpiration()) {
61+
$message->setExpiration($this->timeToLive);
62+
}
63+
4564
$amqpAttributes = $message->getHeaders();
4665

4766
if ($message->getProperties()) {
@@ -74,4 +93,52 @@ public function send(PsrDestination $destination, PsrMessage $message)
7493
);
7594
}
7695
}
96+
97+
/**
98+
* {@inheritdoc}
99+
*/
100+
public function setDeliveryDelay($deliveryDelay)
101+
{
102+
throw DeliveryDelayNotSupportedException::providerDoestNotSupportIt();
103+
}
104+
105+
/**
106+
* {@inheritdoc}
107+
*/
108+
public function getDeliveryDelay()
109+
{
110+
return null;
111+
}
112+
113+
/**
114+
* {@inheritdoc}
115+
*/
116+
public function setPriority($priority)
117+
{
118+
$this->priority = $priority;
119+
}
120+
121+
/**
122+
* {@inheritdoc}
123+
*/
124+
public function getPriority()
125+
{
126+
return $this->priority;
127+
}
128+
129+
/**
130+
* {@inheritdoc}
131+
*/
132+
public function setTimeToLive($timeToLive)
133+
{
134+
$this->timeToLive = $timeToLive;
135+
}
136+
137+
/**
138+
* {@inheritdoc}
139+
*/
140+
public function getTimeToLive()
141+
{
142+
return $this->timeToLive;
143+
}
77144
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpExt\Tests\Spec;
4+
5+
use Enqueue\AmqpExt\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\PsrProducerSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpProducerTest extends PsrProducerSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createProducer()
17+
{
18+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
19+
20+
return $factory->createContext()->createProducer();
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpExt\Tests\Spec;
4+
5+
use Enqueue\AmqpExt\AmqpConnectionFactory;
6+
use Enqueue\AmqpExt\AmqpContext;
7+
use Interop\Queue\PsrContext;
8+
use Interop\Queue\Spec\SendAndReceivePriorityMessagesFromQueueSpec;
9+
10+
/**
11+
* @group functional
12+
*/
13+
class AmqpSendAndReceivePriorityMessagesFromQueueTest extends SendAndReceivePriorityMessagesFromQueueSpec
14+
{
15+
/**
16+
* {@inheritdoc}
17+
*/
18+
protected function createContext()
19+
{
20+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
21+
22+
return $factory->createContext();
23+
}
24+
25+
/**
26+
* {@inheritdoc}
27+
*
28+
* @param AmqpContext $context
29+
*/
30+
protected function createQueue(PsrContext $context, $queueName)
31+
{
32+
$queue = $context->createQueue($queueName);
33+
$queue->setArguments(['x-max-priority' => 10]);
34+
35+
$context->declareQueue($queue);
36+
$context->purgeQueue($queue);
37+
38+
return $queue;
39+
}
40+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpExt\Tests\Spec;
4+
5+
use Enqueue\AmqpExt\AmqpConnectionFactory;
6+
use Enqueue\AmqpExt\AmqpContext;
7+
use Interop\Queue\PsrContext;
8+
use Interop\Queue\Spec\SendAndReceiveTimeToLiveMessagesFromQueueSpec;
9+
10+
/**
11+
* @group functional
12+
*/
13+
class AmqpSendAndReceiveTimeToLiveMessagesFromQueueTest extends SendAndReceiveTimeToLiveMessagesFromQueueSpec
14+
{
15+
/**
16+
* {@inheritdoc}
17+
*/
18+
protected function createContext()
19+
{
20+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
21+
22+
return $factory->createContext();
23+
}
24+
25+
/**
26+
* {@inheritdoc}
27+
*
28+
* @param AmqpContext $context
29+
*/
30+
protected function createQueue(PsrContext $context, $queueName)
31+
{
32+
$queue = $context->createQueue($queueName);
33+
$context->declareQueue($queue);
34+
$context->purgeQueue($queue);
35+
36+
return $queue;
37+
}
38+
}

pkg/amqp-ext/composer.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
"require": {
1414
"php": ">=5.6",
1515
"ext-amqp": "^1.6",
16-
"queue-interop/queue-interop": "^0.5@dev",
17-
"queue-interop/amqp-interop": "^0.5@dev",
16+
17+
"queue-interop/amqp-interop": "^0.6@dev",
1818
"psr/log": "^1"
1919
},
2020
"require-dev": {

pkg/amqp-lib/AmqpContext.php

+2-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
use Interop\Queue\PsrTopic;
1818
use PhpAmqpLib\Channel\AMQPChannel;
1919
use PhpAmqpLib\Connection\AbstractConnection;
20+
use PhpAmqpLib\Wire\AMQPTable;
2021

2122
class AmqpContext implements InteropAmqpContext
2223
{
@@ -173,7 +174,7 @@ public function declareQueue(InteropAmqpQueue $queue)
173174
(bool) ($queue->getFlags() & InteropAmqpQueue::FLAG_EXCLUSIVE),
174175
(bool) ($queue->getFlags() & InteropAmqpQueue::FLAG_AUTODELETE),
175176
(bool) ($queue->getFlags() & InteropAmqpQueue::FLAG_NOWAIT),
176-
$queue->getArguments()
177+
$queue->getArguments() ? new AMQPTable($queue->getArguments()) : null
177178
);
178179

179180
return $messageCount;

pkg/amqp-lib/AmqpProducer.php

+67
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Interop\Amqp\AmqpProducer as InteropAmqpProducer;
77
use Interop\Amqp\AmqpQueue as InteropAmqpQueue;
88
use Interop\Amqp\AmqpTopic as InteropAmqpTopic;
9+
use Interop\Queue\DeliveryDelayNotSupportedException;
910
use Interop\Queue\InvalidDestinationException;
1011
use Interop\Queue\InvalidMessageException;
1112
use Interop\Queue\PsrDestination;
@@ -17,6 +18,16 @@
1718

1819
class AmqpProducer implements InteropAmqpProducer
1920
{
21+
/**
22+
* @var int|null
23+
*/
24+
private $priority;
25+
26+
/**
27+
* @var int|float|null
28+
*/
29+
private $timeToLive;
30+
2031
/**
2132
* @var AMQPChannel
2233
*/
@@ -43,6 +54,14 @@ public function send(PsrDestination $destination, PsrMessage $message)
4354

4455
InvalidMessageException::assertMessageInstanceOf($message, InteropAmqpMessage::class);
4556

57+
if (null !== $this->priority && null === $message->getPriority()) {
58+
$message->setPriority($this->priority);
59+
}
60+
61+
if (null !== $this->timeToLive && null === $message->getExpiration()) {
62+
$message->setExpiration($this->timeToLive);
63+
}
64+
4665
$amqpProperties = $message->getHeaders();
4766

4867
if ($appProperties = $message->getProperties()) {
@@ -69,4 +88,52 @@ public function send(PsrDestination $destination, PsrMessage $message)
6988
);
7089
}
7190
}
91+
92+
/**
93+
* {@inheritdoc}
94+
*/
95+
public function setDeliveryDelay($deliveryDelay)
96+
{
97+
throw DeliveryDelayNotSupportedException::providerDoestNotSupportIt();
98+
}
99+
100+
/**
101+
* {@inheritdoc}
102+
*/
103+
public function getDeliveryDelay()
104+
{
105+
return null;
106+
}
107+
108+
/**
109+
* {@inheritdoc}
110+
*/
111+
public function setPriority($priority)
112+
{
113+
$this->priority = $priority;
114+
}
115+
116+
/**
117+
* {@inheritdoc}
118+
*/
119+
public function getPriority()
120+
{
121+
return $this->priority;
122+
}
123+
124+
/**
125+
* {@inheritdoc}
126+
*/
127+
public function setTimeToLive($timeToLive)
128+
{
129+
$this->timeToLive = $timeToLive;
130+
}
131+
132+
/**
133+
* {@inheritdoc}
134+
*/
135+
public function getTimeToLive()
136+
{
137+
return $this->timeToLive;
138+
}
72139
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpLib\Tests\Spec;
4+
5+
use Enqueue\AmqpLib\AmqpConnectionFactory;
6+
use Enqueue\AmqpLib\AmqpContext;
7+
use Interop\Queue\PsrContext;
8+
use Interop\Queue\Spec\SendAndReceivePriorityMessagesFromQueueSpec;
9+
10+
/**
11+
* @group functional
12+
*/
13+
class AmqpSendAndReceivePriorityMessagesFromQueueTest extends SendAndReceivePriorityMessagesFromQueueSpec
14+
{
15+
/**
16+
* {@inheritdoc}
17+
*/
18+
protected function createContext()
19+
{
20+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
21+
22+
return $factory->createContext();
23+
}
24+
25+
/**
26+
* {@inheritdoc}
27+
*
28+
* @param AmqpContext $context
29+
*/
30+
protected function createQueue(PsrContext $context, $queueName)
31+
{
32+
$queue = $context->createQueue($queueName);
33+
$queue->setArguments(['x-max-priority' => 10]);
34+
35+
$context->declareQueue($queue);
36+
$context->purgeQueue($queue);
37+
38+
return $queue;
39+
}
40+
}

0 commit comments

Comments
 (0)