Skip to content

[amqp] Add AMQP secure (SSL) connections support #246

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Oct 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ bin/jp.php
bin/php-parse
bin/google-cloud-batch
vendor
var
.php_cs
.php_cs.cache
composer.lock
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Features:
* [Feature rich](docs/quick_tour.md).
* Implements [JMS](https://docs.oracle.com/javaee/7/api/javax/jms/package-summary.html) like transports based on [queue-interop](https://github.com/queue-interop/queue-interop) interfaces.
* Supported transports
* Amqp based on [the ext](docs/transport/amqp.md), [bunny](docs/transport/amqp_bunny.md), [the lib](docs/transport/amqp_lib.md)
* AMQP(S) based on [the ext](docs/transport/amqp.md), [bunny](docs/transport/amqp_bunny.md), [the lib](docs/transport/amqp_lib.md)
* [Beanstalk](docs/transport/pheanstalk.md)
* [STOMP](docs/transport/stomp.md)
* [Amazon SQS](docs/transport/sqs.md)
Expand Down
8 changes: 8 additions & 0 deletions bin/build-rabbitmq-image.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/usr/bin/env bash

set -e
set -x

(cd docker && docker build --rm --force-rm --no-cache --pull --squash --tag "enqueue/rabbitmq:latest" -f Dockerfile.rabbitmq .)
(cd docker && docker login --username="$DOCKER_USER" --password="$DOCKER_PASSWORD")
(cd docker && docker push "enqueue/rabbitmq:latest")
20 changes: 20 additions & 0 deletions bin/build-rabbitmq-ssl-image.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/usr/bin/env bash


set -e
set -x

mkdir -p /tmp/roboconf
rm -rf /tmp/roboconf/*

(cd /tmp/roboconf && git clone git@github.com:roboconf/rabbitmq-with-ssl-in-docker.git)

(cd /tmp/roboconf/rabbitmq-with-ssl-in-docker && docker build --rm --force-rm --no-cache --pull --squash --tag "enqueue/rabbitmq-ssl:latest" .)

(cd /tmp/roboconf/rabbitmq-with-ssl-in-docker && docker login --username="$DOCKER_USER" --password="$DOCKER_PASSWORD")
(cd /tmp/roboconf/rabbitmq-with-ssl-in-docker && docker push "enqueue/rabbitmq-ssl:latest")

docker run --rm -v "`pwd`/var/rabbitmq_certificates:/enqueue" "enqueue/rabbitmq-ssl:latest" cp /home/testca/cacert.pem /enqueue/cacert.pem



10 changes: 10 additions & 0 deletions bin/test
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
# $1 host
# $2 port
# $3 attempts

FORCE_EXIT=false

function waitForService()
{
ATTEMPTS=0
Expand All @@ -14,13 +17,20 @@ function waitForService()
printf "service is not running %s:%s\n" $1 $2
exit 1
fi
if [ "$FORCE_EXIT" = true ]; then
exit;
fi

sleep 1
done

printf "service is online %s:%s\n" $1 $2
}

trap "FORCE_EXIT=true" SIGTERM SIGINT

waitForService rabbitmq 5672 50
waitForService rabbitmq_ssl 5671 50
waitForService mysql 3306 50
waitForService redis 6379 50
waitForService beanstalkd 11300 50
Expand Down
12 changes: 11 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ services:
# build: { context: docker, dockerfile: Dockerfile }
depends_on:
- rabbitmq
- rabbitmq_ssl
- mysql
- redis
- beanstalkd
Expand All @@ -17,6 +18,7 @@ services:
- './:/mqdev'
environment:
- AMQP_DSN=amqp://guest:guest@rabbitmq:5672/mqdev
- AMQPS_DSN=amqps://guest:guest@rabbitmq_ssl:5671
- DOCTINE_DSN=mysql://root:rootpass@mysql/mqdev
- SYMFONY__RABBITMQ__HOST=rabbitmq
- SYMFONY__RABBITMQ__USER=guest
Expand Down Expand Up @@ -54,8 +56,16 @@ services:
ports:
- "15677:15672"

rabbitmq_ssl:
image: enqueue/rabbitmq-ssl:latest
environment:
- RABBITMQ_DEFAULT_USER=guest
- RABBITMQ_DEFAULT_PASS=guest
volumes:
- './var/rabbitmq_certificates:/home/client'

beanstalkd:
image: 'schickling/beanstalkd'
image: 'jonbaldie/beanstalkd'

gearmand:
image: 'artefactual/gearmand'
Expand Down
49 changes: 49 additions & 0 deletions docs/bundle/config_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,21 @@ enqueue:

# The options that are specific to the amqp transport you chose. For example amqp+lib have insist, keepalive, stream options. amqp+bunny has tcp_nodelay extra option.
driver_options: ~

# Should be true if you want to use secure connections. False by default
ssl_on: ~

# This option determines whether ssl client verifies that the server cert is for the server it is known as. True by default.
ssl_verify: ~

# Location of Certificate Authority file on local filesystem which should be used with the verify_peer context option to authenticate the identity of the remote peer. A string.
ssl_cacert: ~

# Path to local certificate file on filesystem. It must be a PEM encoded file which contains your certificate and private key. A string
ssl_cert: ~

# Path to local private key file on filesystem in case of separate files for certificate (local_cert) and private key. A string.
ssl_key: ~
rabbitmq_amqp:
driver: ~ # One of "ext"; "lib"; "bunny"

Expand Down Expand Up @@ -137,6 +152,21 @@ enqueue:
# The options that are specific to the amqp transport you chose. For example amqp+lib have insist, keepalive, stream options. amqp+bunny has tcp_nodelay extra option.
driver_options: ~

# Should be true if you want to use secure connections. False by default
ssl_on: ~

# This option determines whether ssl client verifies that the server cert is for the server it is known as. True by default.
ssl_verify: ~

# Location of Certificate Authority file on local filesystem which should be used with the verify_peer context option to authenticate the identity of the remote peer. A string.
ssl_cacert: ~

# Path to local certificate file on filesystem. It must be a PEM encoded file which contains your certificate and private key. A string
ssl_cert: ~

# Path to local private key file on filesystem in case of separate files for certificate (local_cert) and private key. A string.
ssl_key: ~

# The delay strategy to be used. Possible values are "dlx", "delayed_message_plugin" or service id
delay_strategy: dlx
fs:
Expand Down Expand Up @@ -196,6 +226,25 @@ enqueue:

# the connection will be performed as later as possible, if the option set to true
lazy: true
gps:

# The connection to Google Pub/Sub broker set as a string. Other parameters are ignored if set
dsn: ~

# The project ID from the Google Developer's Console.
projectId: ~

# The full path to your service account credentials.json file retrieved from the Google Developers Console.
keyFilePath: ~

# Number of retries for a failed request.
retries: 3

# Scopes to be used for the request.
scopes: []

# The connection will be performed as later as possible, if the option set to true
lazy: true
client:
traceable_producer: false
prefix: enqueue
Expand Down
8 changes: 8 additions & 0 deletions docs/transport/amqp.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ $factory = new AmqpConnectionFactory([
// same as above but given as DSN string
$factory = new AmqpConnectionFactory('amqp://user:pass@example.com:10000/%2f');

// SSL or secure connection
$factory = new AmqpConnectionFactory([
'dsn' => 'amqps:',
'ssl_cacert' => '/path/to/cacert.pem',
'ssl_cert' => '/path/to/cert.pem',
'ssl_key' => '/path/to/key.pem',
]);

$psrContext = $factory->createContext();

// if you have enqueue/enqueue library installed you can use a function from there to create the context
Expand Down
8 changes: 8 additions & 0 deletions docs/transport/amqp_lib.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ $factory = new AmqpConnectionFactory([
// same as above but given as DSN string
$factory = new AmqpConnectionFactory('amqp://user:pass@example.com:10000/%2f');

// SSL or secure connection
$factory = new AmqpConnectionFactory([
'dsn' => 'amqps:',
'ssl_cacert' => '/path/to/cacert.pem',
'ssl_cert' => '/path/to/cert.pem',
'ssl_key' => '/path/to/key.pem',
]);

$psrContext = $factory->createContext();

// if you have enqueue/enqueue library installed you can use a function from there to create the context
Expand Down
4 changes: 4 additions & 0 deletions pkg/amqp-bunny/AmqpConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ public function getConfig()
*/
private function establishConnection()
{
if ($this->config->isSslOn()) {
throw new \LogicException('The bunny library does not support SSL connections');
}

if (false == $this->client) {
$bunnyConfig = [];
$bunnyConfig['host'] = $this->config->getHost();
Expand Down
2 changes: 1 addition & 1 deletion pkg/amqp-bunny/Tests/AmqpConnectionFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public function testShouldSupportAmqpLibScheme()
new AmqpConnectionFactory('amqp+bunny:');

$this->expectException(\LogicException::class);
$this->expectExceptionMessage('The given DSN scheme "amqp+foo" is not supported. Could be one of "amqp", "amqp+bunny" only.');
$this->expectExceptionMessage('The given DSN scheme "amqp+foo" is not supported. Could be one of "amqp", "amqps", "amqp+bunny" only.');
new AmqpConnectionFactory('amqp+foo:');
}
}
59 changes: 59 additions & 0 deletions pkg/amqp-bunny/Tests/Spec/AmqpSslSendToAndReceiveFromQueueTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<?php

namespace Enqueue\AmqpBunny\Tests\Spec;

use Enqueue\AmqpBunny\AmqpConnectionFactory;
use Enqueue\AmqpBunny\AmqpContext;
use Interop\Queue\PsrContext;
use Interop\Queue\Spec\SendToAndReceiveFromQueueSpec;

/**
* @group functional
*/
class AmqpSslSendToAndReceiveFromQueueTest extends SendToAndReceiveFromQueueSpec
{
public function test()
{
$this->expectException(\LogicException::class);
$this->expectExceptionMessage('The bunny library does not support SSL connections');
parent::test();
}

/**
* {@inheritdoc}
*/
protected function createContext()
{
$baseDir = realpath(__DIR__.'/../../../../');

// guard
$this->assertNotEmpty($baseDir);

$certDir = $baseDir.'/var/rabbitmq_certificates';
$this->assertDirectoryExists($certDir);

$factory = new AmqpConnectionFactory([
'dsn' => getenv('AMQPS_DSN'),
'ssl_verify' => false,
'ssl_cacert' => $certDir.'/cacert.pem',
'ssl_cert' => $certDir.'/cert.pem',
'ssl_key' => $certDir.'/key.pem',
]);

return $factory->createContext();
}

/**
* {@inheritdoc}
*
* @param AmqpContext $context
*/
protected function createQueue(PsrContext $context, $queueName)
{
$queue = $context->createQueue($queueName);
$context->declareQueue($queue);
$context->purgeQueue($queue);

return $queue;
}
}
10 changes: 10 additions & 0 deletions pkg/amqp-ext/AmqpConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public function __construct($config = 'amqp:')
{
$this->config = (new ConnectionConfig($config))
->addSupportedScheme('amqp+ext')
->addSupportedScheme('amqps+ext')
->addDefaultOption('receive_method', 'basic_get')
->parse()
;
Expand Down Expand Up @@ -113,11 +114,20 @@ private function establishConnection()
$extConfig['read_timeout'] = $this->config->getReadTimeout();
$extConfig['write_timeout'] = $this->config->getWriteTimeout();
$extConfig['connect_timeout'] = $this->config->getConnectionTimeout();
$extConfig['heartbeat'] = $this->config->getHeartbeat();

if ($this->config->isSslOn()) {
$extConfig['verify'] = $this->config->isSslVerify();
$extConfig['cacert'] = $this->config->getSslCaCert();
$extConfig['cert'] = $this->config->getSslCert();
$extConfig['key'] = $this->config->getSslKey();
}

$this->connection = new \AMQPConnection($extConfig);

$this->config->isPersisted() ? $this->connection->pconnect() : $this->connection->connect();
}

if (false == $this->connection->isConnected()) {
$this->config->isPersisted() ? $this->connection->preconnect() : $this->connection->reconnect();
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/amqp-ext/Tests/AmqpConnectionFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ public function testShouldSupportAmqpExtScheme()
{
// no exception here
new AmqpConnectionFactory('amqp+ext:');
new AmqpConnectionFactory('amqps+ext:');

$this->expectException(\LogicException::class);
$this->expectExceptionMessage('The given DSN scheme "amqp+foo" is not supported. Could be one of "amqp", "amqp+ext" only.');
$this->expectExceptionMessage('The given DSN scheme "amqp+foo" is not supported. Could be one of "amqp", "amqps", "amqp+ext", "amqps+ext" only.');
new AmqpConnectionFactory('amqp+foo:');
}

Expand Down
52 changes: 52 additions & 0 deletions pkg/amqp-ext/Tests/Spec/AmqpSslSendToAndReceiveFromQueueTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
<?php

namespace Enqueue\AmqpExt\Tests\Spec;

use Enqueue\AmqpExt\AmqpConnectionFactory;
use Enqueue\AmqpExt\AmqpContext;
use Interop\Queue\PsrContext;
use Interop\Queue\Spec\SendToAndReceiveFromQueueSpec;

/**
* @group functional
*/
class AmqpSslSendToAndReceiveFromQueueTest extends SendToAndReceiveFromQueueSpec
{
/**
* {@inheritdoc}
*/
protected function createContext()
{
$baseDir = realpath(__DIR__.'/../../../../');

// guard
$this->assertNotEmpty($baseDir);

$certDir = $baseDir.'/var/rabbitmq_certificates';
$this->assertDirectoryExists($certDir);

$factory = new AmqpConnectionFactory([
'dsn' => getenv('AMQPS_DSN'),
'ssl_verify' => false,
'ssl_cacert' => $certDir.'/cacert.pem',
'ssl_cert' => $certDir.'/cert.pem',
'ssl_key' => $certDir.'/key.pem',
]);

return $factory->createContext();
}

/**
* {@inheritdoc}
*
* @param AmqpContext $context
*/
protected function createQueue(PsrContext $context, $queueName)
{
$queue = $context->createQueue($queueName);
$context->declareQueue($queue);
$context->purgeQueue($queue);

return $queue;
}
}
Loading