Skip to content

Commit 0c74914

Browse files
authored
Merge pull request #246 from php-enqueue/amqps
[amqp] Add AMQP secure (SSL) connections support
2 parents 868816e + 0c85c35 commit 0c74914

25 files changed

+599
-13
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ bin/jp.php
99
bin/php-parse
1010
bin/google-cloud-batch
1111
vendor
12+
var
1213
.php_cs
1314
.php_cs.cache
1415
composer.lock

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ Features:
1111
* [Feature rich](docs/quick_tour.md).
1212
* Implements [JMS](https://docs.oracle.com/javaee/7/api/javax/jms/package-summary.html) like transports based on [queue-interop](https://github.com/queue-interop/queue-interop) interfaces.
1313
* Supported transports
14-
* Amqp based on [the ext](docs/transport/amqp.md), [bunny](docs/transport/amqp_bunny.md), [the lib](docs/transport/amqp_lib.md)
14+
* AMQP(S) based on [the ext](docs/transport/amqp.md), [bunny](docs/transport/amqp_bunny.md), [the lib](docs/transport/amqp_lib.md)
1515
* [Beanstalk](docs/transport/pheanstalk.md)
1616
* [STOMP](docs/transport/stomp.md)
1717
* [Amazon SQS](docs/transport/sqs.md)

bin/build-rabbitmq-image.sh

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
#!/usr/bin/env bash
2+
3+
set -e
4+
set -x
5+
6+
(cd docker && docker build --rm --force-rm --no-cache --pull --squash --tag "enqueue/rabbitmq:latest" -f Dockerfile.rabbitmq .)
7+
(cd docker && docker login --username="$DOCKER_USER" --password="$DOCKER_PASSWORD")
8+
(cd docker && docker push "enqueue/rabbitmq:latest")

bin/build-rabbitmq-ssl-image.sh

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
#!/usr/bin/env bash
2+
3+
4+
set -e
5+
set -x
6+
7+
mkdir -p /tmp/roboconf
8+
rm -rf /tmp/roboconf/*
9+
10+
(cd /tmp/roboconf && git clone git@github.com:roboconf/rabbitmq-with-ssl-in-docker.git)
11+
12+
(cd /tmp/roboconf/rabbitmq-with-ssl-in-docker && docker build --rm --force-rm --no-cache --pull --squash --tag "enqueue/rabbitmq-ssl:latest" .)
13+
14+
(cd /tmp/roboconf/rabbitmq-with-ssl-in-docker && docker login --username="$DOCKER_USER" --password="$DOCKER_PASSWORD")
15+
(cd /tmp/roboconf/rabbitmq-with-ssl-in-docker && docker push "enqueue/rabbitmq-ssl:latest")
16+
17+
docker run --rm -v "`pwd`/var/rabbitmq_certificates:/enqueue" "enqueue/rabbitmq-ssl:latest" cp /home/testca/cacert.pem /enqueue/cacert.pem
18+
19+
20+

bin/test

+10
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
# $1 host
55
# $2 port
66
# $3 attempts
7+
8+
FORCE_EXIT=false
9+
710
function waitForService()
811
{
912
ATTEMPTS=0
@@ -14,13 +17,20 @@ function waitForService()
1417
printf "service is not running %s:%s\n" $1 $2
1518
exit 1
1619
fi
20+
if [ "$FORCE_EXIT" = true ]; then
21+
exit;
22+
fi
23+
1724
sleep 1
1825
done
1926

2027
printf "service is online %s:%s\n" $1 $2
2128
}
2229

30+
trap "FORCE_EXIT=true" SIGTERM SIGINT
31+
2332
waitForService rabbitmq 5672 50
33+
waitForService rabbitmq_ssl 5671 50
2434
waitForService mysql 3306 50
2535
waitForService redis 6379 50
2636
waitForService beanstalkd 11300 50

docker-compose.yml

+11-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ services:
66
# build: { context: docker, dockerfile: Dockerfile }
77
depends_on:
88
- rabbitmq
9+
- rabbitmq_ssl
910
- mysql
1011
- redis
1112
- beanstalkd
@@ -17,6 +18,7 @@ services:
1718
- './:/mqdev'
1819
environment:
1920
- AMQP_DSN=amqp://guest:guest@rabbitmq:5672/mqdev
21+
- AMQPS_DSN=amqps://guest:guest@rabbitmq_ssl:5671
2022
- DOCTINE_DSN=mysql://root:rootpass@mysql/mqdev
2123
- SYMFONY__RABBITMQ__HOST=rabbitmq
2224
- SYMFONY__RABBITMQ__USER=guest
@@ -54,8 +56,16 @@ services:
5456
ports:
5557
- "15677:15672"
5658

59+
rabbitmq_ssl:
60+
image: enqueue/rabbitmq-ssl:latest
61+
environment:
62+
- RABBITMQ_DEFAULT_USER=guest
63+
- RABBITMQ_DEFAULT_PASS=guest
64+
volumes:
65+
- './var/rabbitmq_certificates:/home/client'
66+
5767
beanstalkd:
58-
image: 'schickling/beanstalkd'
68+
image: 'jonbaldie/beanstalkd'
5969

6070
gearmand:
6171
image: 'artefactual/gearmand'

docs/bundle/config_reference.md

+49
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,21 @@ enqueue:
8787

8888
# The options that are specific to the amqp transport you chose. For example amqp+lib have insist, keepalive, stream options. amqp+bunny has tcp_nodelay extra option.
8989
driver_options: ~
90+
91+
# Should be true if you want to use secure connections. False by default
92+
ssl_on: ~
93+
94+
# This option determines whether ssl client verifies that the server cert is for the server it is known as. True by default.
95+
ssl_verify: ~
96+
97+
# Location of Certificate Authority file on local filesystem which should be used with the verify_peer context option to authenticate the identity of the remote peer. A string.
98+
ssl_cacert: ~
99+
100+
# Path to local certificate file on filesystem. It must be a PEM encoded file which contains your certificate and private key. A string
101+
ssl_cert: ~
102+
103+
# Path to local private key file on filesystem in case of separate files for certificate (local_cert) and private key. A string.
104+
ssl_key: ~
90105
rabbitmq_amqp:
91106
driver: ~ # One of "ext"; "lib"; "bunny"
92107

@@ -137,6 +152,21 @@ enqueue:
137152
# The options that are specific to the amqp transport you chose. For example amqp+lib have insist, keepalive, stream options. amqp+bunny has tcp_nodelay extra option.
138153
driver_options: ~
139154

155+
# Should be true if you want to use secure connections. False by default
156+
ssl_on: ~
157+
158+
# This option determines whether ssl client verifies that the server cert is for the server it is known as. True by default.
159+
ssl_verify: ~
160+
161+
# Location of Certificate Authority file on local filesystem which should be used with the verify_peer context option to authenticate the identity of the remote peer. A string.
162+
ssl_cacert: ~
163+
164+
# Path to local certificate file on filesystem. It must be a PEM encoded file which contains your certificate and private key. A string
165+
ssl_cert: ~
166+
167+
# Path to local private key file on filesystem in case of separate files for certificate (local_cert) and private key. A string.
168+
ssl_key: ~
169+
140170
# The delay strategy to be used. Possible values are "dlx", "delayed_message_plugin" or service id
141171
delay_strategy: dlx
142172
fs:
@@ -196,6 +226,25 @@ enqueue:
196226

197227
# the connection will be performed as later as possible, if the option set to true
198228
lazy: true
229+
gps:
230+
231+
# The connection to Google Pub/Sub broker set as a string. Other parameters are ignored if set
232+
dsn: ~
233+
234+
# The project ID from the Google Developer's Console.
235+
projectId: ~
236+
237+
# The full path to your service account credentials.json file retrieved from the Google Developers Console.
238+
keyFilePath: ~
239+
240+
# Number of retries for a failed request.
241+
retries: 3
242+
243+
# Scopes to be used for the request.
244+
scopes: []
245+
246+
# The connection will be performed as later as possible, if the option set to true
247+
lazy: true
199248
client:
200249
traceable_producer: false
201250
prefix: enqueue

docs/transport/amqp.md

+8
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,14 @@ $factory = new AmqpConnectionFactory([
5050
// same as above but given as DSN string
5151
$factory = new AmqpConnectionFactory('amqp://user:pass@example.com:10000/%2f');
5252

53+
// SSL or secure connection
54+
$factory = new AmqpConnectionFactory([
55+
'dsn' => 'amqps:',
56+
'ssl_cacert' => '/path/to/cacert.pem',
57+
'ssl_cert' => '/path/to/cert.pem',
58+
'ssl_key' => '/path/to/key.pem',
59+
]);
60+
5361
$psrContext = $factory->createContext();
5462

5563
// if you have enqueue/enqueue library installed you can use a function from there to create the context

docs/transport/amqp_lib.md

+8
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,14 @@ $factory = new AmqpConnectionFactory([
5050
// same as above but given as DSN string
5151
$factory = new AmqpConnectionFactory('amqp://user:pass@example.com:10000/%2f');
5252

53+
// SSL or secure connection
54+
$factory = new AmqpConnectionFactory([
55+
'dsn' => 'amqps:',
56+
'ssl_cacert' => '/path/to/cacert.pem',
57+
'ssl_cert' => '/path/to/cert.pem',
58+
'ssl_key' => '/path/to/key.pem',
59+
]);
60+
5361
$psrContext = $factory->createContext();
5462

5563
// if you have enqueue/enqueue library installed you can use a function from there to create the context

pkg/amqp-bunny/AmqpConnectionFactory.php

+4
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@ public function getConfig()
8585
*/
8686
private function establishConnection()
8787
{
88+
if ($this->config->isSslOn()) {
89+
throw new \LogicException('The bunny library does not support SSL connections');
90+
}
91+
8892
if (false == $this->client) {
8993
$bunnyConfig = [];
9094
$bunnyConfig['host'] = $this->config->getHost();

pkg/amqp-bunny/Tests/AmqpConnectionFactoryTest.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public function testShouldSupportAmqpLibScheme()
2222
new AmqpConnectionFactory('amqp+bunny:');
2323

2424
$this->expectException(\LogicException::class);
25-
$this->expectExceptionMessage('The given DSN scheme "amqp+foo" is not supported. Could be one of "amqp", "amqp+bunny" only.');
25+
$this->expectExceptionMessage('The given DSN scheme "amqp+foo" is not supported. Could be one of "amqp", "amqps", "amqp+bunny" only.');
2626
new AmqpConnectionFactory('amqp+foo:');
2727
}
2828
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpBunny\Tests\Spec;
4+
5+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
6+
use Enqueue\AmqpBunny\AmqpContext;
7+
use Interop\Queue\PsrContext;
8+
use Interop\Queue\Spec\SendToAndReceiveFromQueueSpec;
9+
10+
/**
11+
* @group functional
12+
*/
13+
class AmqpSslSendToAndReceiveFromQueueTest extends SendToAndReceiveFromQueueSpec
14+
{
15+
public function test()
16+
{
17+
$this->expectException(\LogicException::class);
18+
$this->expectExceptionMessage('The bunny library does not support SSL connections');
19+
parent::test();
20+
}
21+
22+
/**
23+
* {@inheritdoc}
24+
*/
25+
protected function createContext()
26+
{
27+
$baseDir = realpath(__DIR__.'/../../../../');
28+
29+
// guard
30+
$this->assertNotEmpty($baseDir);
31+
32+
$certDir = $baseDir.'/var/rabbitmq_certificates';
33+
$this->assertDirectoryExists($certDir);
34+
35+
$factory = new AmqpConnectionFactory([
36+
'dsn' => getenv('AMQPS_DSN'),
37+
'ssl_verify' => false,
38+
'ssl_cacert' => $certDir.'/cacert.pem',
39+
'ssl_cert' => $certDir.'/cert.pem',
40+
'ssl_key' => $certDir.'/key.pem',
41+
]);
42+
43+
return $factory->createContext();
44+
}
45+
46+
/**
47+
* {@inheritdoc}
48+
*
49+
* @param AmqpContext $context
50+
*/
51+
protected function createQueue(PsrContext $context, $queueName)
52+
{
53+
$queue = $context->createQueue($queueName);
54+
$context->declareQueue($queue);
55+
$context->purgeQueue($queue);
56+
57+
return $queue;
58+
}
59+
}

pkg/amqp-ext/AmqpConnectionFactory.php

+10
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public function __construct($config = 'amqp:')
3333
{
3434
$this->config = (new ConnectionConfig($config))
3535
->addSupportedScheme('amqp+ext')
36+
->addSupportedScheme('amqps+ext')
3637
->addDefaultOption('receive_method', 'basic_get')
3738
->parse()
3839
;
@@ -113,11 +114,20 @@ private function establishConnection()
113114
$extConfig['read_timeout'] = $this->config->getReadTimeout();
114115
$extConfig['write_timeout'] = $this->config->getWriteTimeout();
115116
$extConfig['connect_timeout'] = $this->config->getConnectionTimeout();
117+
$extConfig['heartbeat'] = $this->config->getHeartbeat();
118+
119+
if ($this->config->isSslOn()) {
120+
$extConfig['verify'] = $this->config->isSslVerify();
121+
$extConfig['cacert'] = $this->config->getSslCaCert();
122+
$extConfig['cert'] = $this->config->getSslCert();
123+
$extConfig['key'] = $this->config->getSslKey();
124+
}
116125

117126
$this->connection = new \AMQPConnection($extConfig);
118127

119128
$this->config->isPersisted() ? $this->connection->pconnect() : $this->connection->connect();
120129
}
130+
121131
if (false == $this->connection->isConnected()) {
122132
$this->config->isPersisted() ? $this->connection->preconnect() : $this->connection->reconnect();
123133
}

pkg/amqp-ext/Tests/AmqpConnectionFactoryTest.php

+2-1
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@ public function testShouldSupportAmqpExtScheme()
2121
{
2222
// no exception here
2323
new AmqpConnectionFactory('amqp+ext:');
24+
new AmqpConnectionFactory('amqps+ext:');
2425

2526
$this->expectException(\LogicException::class);
26-
$this->expectExceptionMessage('The given DSN scheme "amqp+foo" is not supported. Could be one of "amqp", "amqp+ext" only.');
27+
$this->expectExceptionMessage('The given DSN scheme "amqp+foo" is not supported. Could be one of "amqp", "amqps", "amqp+ext", "amqps+ext" only.');
2728
new AmqpConnectionFactory('amqp+foo:');
2829
}
2930

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpExt\Tests\Spec;
4+
5+
use Enqueue\AmqpExt\AmqpConnectionFactory;
6+
use Enqueue\AmqpExt\AmqpContext;
7+
use Interop\Queue\PsrContext;
8+
use Interop\Queue\Spec\SendToAndReceiveFromQueueSpec;
9+
10+
/**
11+
* @group functional
12+
*/
13+
class AmqpSslSendToAndReceiveFromQueueTest extends SendToAndReceiveFromQueueSpec
14+
{
15+
/**
16+
* {@inheritdoc}
17+
*/
18+
protected function createContext()
19+
{
20+
$baseDir = realpath(__DIR__.'/../../../../');
21+
22+
// guard
23+
$this->assertNotEmpty($baseDir);
24+
25+
$certDir = $baseDir.'/var/rabbitmq_certificates';
26+
$this->assertDirectoryExists($certDir);
27+
28+
$factory = new AmqpConnectionFactory([
29+
'dsn' => getenv('AMQPS_DSN'),
30+
'ssl_verify' => false,
31+
'ssl_cacert' => $certDir.'/cacert.pem',
32+
'ssl_cert' => $certDir.'/cert.pem',
33+
'ssl_key' => $certDir.'/key.pem',
34+
]);
35+
36+
return $factory->createContext();
37+
}
38+
39+
/**
40+
* {@inheritdoc}
41+
*
42+
* @param AmqpContext $context
43+
*/
44+
protected function createQueue(PsrContext $context, $queueName)
45+
{
46+
$queue = $context->createQueue($queueName);
47+
$context->declareQueue($queue);
48+
$context->purgeQueue($queue);
49+
50+
return $queue;
51+
}
52+
}

0 commit comments

Comments
 (0)