Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix authentication with x509 certificates #29

Merged
merged 1 commit into from
Aug 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions fedora_messaging/_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def _configure_tls_parameters(parameters):
if cert and key:
_log.info('Authenticating with server using x509 (certfile: %s, keyfile: %s)',
cert, key)
parameters.credentials = pika.credentials.ExternalCredentials()
else:
cert, key = None, None

Expand Down
51 changes: 48 additions & 3 deletions fedora_messaging/tests/unit/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import mock
import pkg_resources
from pika import exceptions as pika_errs, URLParameters
from pika import exceptions as pika_errs, URLParameters, credentials
from jsonschema.exceptions import ValidationError as JSONValidationError

from fedora_messaging import _session, config
Expand Down Expand Up @@ -73,6 +73,31 @@ def test_publish_init_custom_url(self):
self.assertIsNotNone(publisher._parameters.ssl_options)
self.assertEqual(publisher._exchange, "test_exchange")

def test_plain_auth(self):
"""Assert when there's no key or certfile, plain authentication is used"""
with mock.patch.dict(config.conf, {'tls': self.tls_conf}):
publisher = _session.PublisherSession(
"amqps://username:password@rabbit.example.com/vhost",
"test_exchange",
)
self.assertIsInstance(
publisher._parameters.credentials, credentials.PlainCredentials)

def test_external_auth(self):
"""Assert when there's both a key and certfile, external auth is used"""
tls_conf = {
'keyfile': os.path.join(FIXTURES_DIR, 'key.pem'),
'certfile': os.path.join(FIXTURES_DIR, 'cert.pem'),
'ca_cert': os.path.join(FIXTURES_DIR, 'ca_bundle.pem'),
}
with mock.patch.dict(config.conf, {'tls': tls_conf}):
publisher = _session.PublisherSession(
"amqps://username:password@rabbit.example.com/vhost",
"test_exchange",
)
self.assertIsInstance(
publisher._parameters.credentials, credentials.ExternalCredentials)

def test_publish(self):
# Check that the publication works properly.
self.publisher.publish(self.message)
Expand Down Expand Up @@ -176,8 +201,26 @@ def setUp(self):
def tearDown(self):
self.consumer._shutdown()

def test_tls_parameters(self):
"""Assert TLS settings translate to a TLS connection for consumers."""
def test_plain_auth(self):
"""Assert when there's no key or certfile, plain authentication is used"""
tls_conf = {
'amqp_url': 'amqps://',
'tls': {
'keyfile': None,
'certfile': None,
'ca_cert': os.path.join(FIXTURES_DIR, 'ca_bundle.pem'),
}
}

with mock.patch.dict(config.conf, tls_conf):
consumer = _session.ConsumerSession()

self.assertTrue(consumer._parameters.ssl_options is not None)
self.assertIsInstance(
consumer._parameters.credentials, credentials.PlainCredentials)

def test_external_auth(self):
"""Assert when there's both a key and certfile, external auth is used"""
tls_conf = {
'amqp_url': 'amqps://',
'tls': {
Expand All @@ -191,6 +234,8 @@ def test_tls_parameters(self):
consumer = _session.ConsumerSession()

self.assertTrue(consumer._parameters.ssl_options is not None)
self.assertIsInstance(
consumer._parameters.credentials, credentials.ExternalCredentials)

def test_consume(self):
# Test the consume function.
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ blinker
click
jsonschema
pytoml
pika
pika>=0.12
six