Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ssl: Support using client-side SSL certificates, closes #253 and #447 #4

Merged
merged 1 commit into from
Nov 17, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions kafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,22 @@ class KafkaClient(object):
# socket timeout.
def __init__(self, hosts, client_id=CLIENT_ID,
timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS,
correlation_id=0):
correlation_id=0,
ca="/etc/ssl/certs/ca-bundle.crt",
certfile=None,
keyfile=None,
ssl=False,
verify_hostname=False):
# We need one connection to bootstrap
self.client_id = kafka_bytestring(client_id)
self.timeout = timeout
self.hosts = collect_hosts(hosts)
self.correlation_id = correlation_id
self.ca = ca
self.certfile = certfile
self.keyfile = keyfile
self.ssl = ssl
self.verify_hostname = verify_hostname

# create connections only when we need them
self.conns = {}
Expand All @@ -55,7 +65,12 @@ def _get_conn(self, host, port):
self.conns[host_key] = KafkaConnection(
host,
port,
timeout=self.timeout
timeout=self.timeout,
ca=self.ca,
certfile=self.certfile,
keyfile=self.keyfile,
ssl=self.ssl,
verify_hostname=self.verify_hostname
)

return self.conns[host_key]
Expand Down
46 changes: 41 additions & 5 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
from random import shuffle
import socket
import ssl
import struct

import six
Expand Down Expand Up @@ -52,12 +53,23 @@ class KafkaConnection(object):
port: the port number the kafka broker is listening on
timeout: default 120. The socket timeout for sending and receiving data
in seconds. None means no timeout, so a request can block forever.
certfile: the SSL certificate file of the client (requires Kafka 0.9+)
keyfile: the SSL key file of the client (requires Kafka 0.9+)
ca: The SSL CA (requires Kafka 0.9+)
ssl: Use SSL when communicating with Kafka
verify_hostname: Whether or not to verify the server's hostname against its cert
"""
def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS,
ssl=False, ca=None, certfile=None, keyfile=None, verify_hostname=False):
super(KafkaConnection, self).__init__()
self.host = host
self.port = port
self.timeout = timeout
self.ca = ca
self.certfile = certfile
self.keyfile = keyfile
self.ssl = ssl
self.verify_hostname = verify_hostname
self._sock = None

self.reinit()
Expand Down Expand Up @@ -101,7 +113,7 @@ def _read_bytes(self, num_bytes):
if data == b'':
raise socket.error("Not enough data to read message -- did server kill socket?")

except socket.error:
except (socket.error, ssl.SSLError):
log.exception('Unable to receive data from Kafka')
self._raise_connection_error()

Expand Down Expand Up @@ -134,7 +146,7 @@ def send(self, request_id, payload):

try:
self._sock.sendall(payload)
except socket.error:
except (socket.error, ssl.SSLError):
log.exception('Unable to send payload to Kafka')
self._raise_connection_error()

Expand Down Expand Up @@ -171,6 +183,11 @@ def copy(self):
c.host = copy.copy(self.host)
c.port = copy.copy(self.port)
c.timeout = copy.copy(self.timeout)
c.ca = copy.copy(self.ca)
c.certfile = copy.copy(self.certfile)
c.keyfile = copy.copy(self.keyfile)
c.ssl = copy.copy(self.ssl)
c.verify_hostname = copy.copy(self.verify_hostname)
c._sock = None
return c

Expand All @@ -185,7 +202,7 @@ def close(self):
# closed by the server
try:
self._sock.shutdown(socket.SHUT_RDWR)
except socket.error:
except (socket.error, ssl.SSLError):
pass

# Closing the socket should always succeed
Expand All @@ -209,5 +226,24 @@ def reinit(self):
try:
self._sock = socket.create_connection((self.host, self.port), self.timeout)
except socket.error:
log.exception('Unable to connect to kafka broker at %s:%d' % (self.host, self.port))
log.exception('Unable to connect to Kafka broker at %s:%r', self.host, self.port)
self._raise_connection_error()

if self.ssl:
# Disallow use of SSLv2 and V3 (meaning we require TLSv1.0+)
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) # pylint: disable=no-member
context.options |= ssl.OP_NO_SSLv2 # pylint: disable=no-member
context.options |= ssl.OP_NO_SSLv3 # pylint: disable=no-member
context.verify_mode = ssl.CERT_OPTIONAL
if self.verify_hostname:
context.check_hostname = True
if self.ca:
context.load_verify_locations(self.ca)
context.verify_mode = ssl.CERT_REQUIRED
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setting verify_mode based on client certificate usage doesn't look right. you probably want to set it if a ca certificate was given instead.

if self.certfile and self.keyfile:
context.load_cert_chain(certfile=self.certfile, keyfile=self.keyfile)
try:
self._sock = context.wrap_socket(self._sock, server_hostname=self.host)
except ssl.SSLError:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should catch socket.error here, too. but I'd rather rearrange the code a bit to always use the common socket.create_connection call (below) and then wrap that socket with ssl if ssl was enabled. It should simplify things a bit.

log.exception('Unable to connect to Kafka broker at %s:%r over SSL', self.host, self.port)
self._raise_connection_error()