Skip to content

Commit

Permalink
ssl: Support using client-side SSL certificates, closes dpkp#253 and d…
Browse files Browse the repository at this point in the history
…pkp#447

Kafka 0.9.0+ supports using SSL encrypted connections authenticated
with client certificates. This adds support for these. After this
when using SSL/TLS for Kafka broker connections,
TLSv1.0+ is used for authentication, protocols earlier than that are
deprecated from the get go as insecure.

If keyfile/certificate are left empty, the client can still use
SSL/TLS for the transport, but in that case the server must be
configured to accept unauthenticated users.
  • Loading branch information
Ormod committed Nov 16, 2015
1 parent 35c089b commit fa2c0fc
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 7 deletions.
19 changes: 17 additions & 2 deletions kafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,22 @@ class KafkaClient(object):
# socket timeout.
def __init__(self, hosts, client_id=CLIENT_ID,
timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS,
correlation_id=0):
correlation_id=0,
ca="/etc/ssl/certs/ca-bundle.crt",
certfile=None,
keyfile=None,
ssl=False,
verify_hostname=False):
# We need one connection to bootstrap
self.client_id = kafka_bytestring(client_id)
self.timeout = timeout
self.hosts = collect_hosts(hosts)
self.correlation_id = correlation_id
self.ca = ca
self.certfile = certfile
self.keyfile = keyfile
self.ssl = ssl
self.verify_hostname = verify_hostname

# create connections only when we need them
self.conns = {}
Expand All @@ -55,7 +65,12 @@ def _get_conn(self, host, port):
self.conns[host_key] = KafkaConnection(
host,
port,
timeout=self.timeout
timeout=self.timeout,
ca=self.ca,
certfile=self.certfile,
keyfile=self.keyfile,
ssl=self.ssl,
verify_hostname=self.verify_hostname
)

return self.conns[host_key]
Expand Down
46 changes: 41 additions & 5 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
from random import shuffle
import socket
import ssl
import struct

import six
Expand Down Expand Up @@ -52,12 +53,23 @@ class KafkaConnection(object):
port: the port number the kafka broker is listening on
timeout: default 120. The socket timeout for sending and receiving data
in seconds. None means no timeout, so a request can block forever.
certfile: the SSL certificate file of the client (requires Kafka 0.9+)
keyfile: the SSL key file of the client (requires Kafka 0.9+)
ca: The SSL CA (requires Kafka 0.9+)
ssl: Use SSL when communicating with Kafka
verify_hostname: Whether or not to verify the server's hostname against its cert
"""
def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS,
ssl=False, ca=None, certfile=None, keyfile=None, verify_hostname=False):
super(KafkaConnection, self).__init__()
self.host = host
self.port = port
self.timeout = timeout
self.ca = ca
self.certfile = certfile
self.keyfile = keyfile
self.ssl = ssl
self.verify_hostname = verify_hostname
self._sock = None

self.reinit()
Expand Down Expand Up @@ -101,7 +113,7 @@ def _read_bytes(self, num_bytes):
if data == b'':
raise socket.error("Not enough data to read message -- did server kill socket?")

except socket.error:
except (socket.error, ssl.SSLError):
log.exception('Unable to receive data from Kafka')
self._raise_connection_error()

Expand Down Expand Up @@ -134,7 +146,7 @@ def send(self, request_id, payload):

try:
self._sock.sendall(payload)
except socket.error:
except (socket.error, ssl.SSLError):
log.exception('Unable to send payload to Kafka')
self._raise_connection_error()

Expand Down Expand Up @@ -171,6 +183,11 @@ def copy(self):
c.host = copy.copy(self.host)
c.port = copy.copy(self.port)
c.timeout = copy.copy(self.timeout)
c.ca = copy.copy(self.ca)
c.certfile = copy.copy(self.certfile)
c.keyfile = copy.copy(self.keyfile)
c.ssl = copy.copy(self.ssl)
c.verify_hostname = copy.copy(self.verify_hostname)
c._sock = None
return c

Expand All @@ -185,7 +202,7 @@ def close(self):
# closed by the server
try:
self._sock.shutdown(socket.SHUT_RDWR)
except socket.error:
except (socket.error, ssl.SSLError):
pass

# Closing the socket should always succeed
Expand All @@ -209,5 +226,24 @@ def reinit(self):
try:
self._sock = socket.create_connection((self.host, self.port), self.timeout)
except socket.error:
log.exception('Unable to connect to kafka broker at %s:%d' % (self.host, self.port))
log.exception('Unable to connect to Kafka broker at %s:%r', self.host, self.port)
self._raise_connection_error()

if self.ssl:
# Disallow use of SSLv2 and V3 (meaning we require TLSv1.0+)
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) # pylint: disable=no-member
context.options |= ssl.OP_NO_SSLv2 # pylint: disable=no-member
context.options |= ssl.OP_NO_SSLv3 # pylint: disable=no-member
context.verify_mode = ssl.CERT_OPTIONAL
if self.verify_hostname:
context.check_hostname = True
if self.ca:
context.load_verify_locations(self.ca)
context.verify_mode = ssl.CERT_REQUIRED
if self.certfile and self.keyfile:
context.load_cert_chain(certfile=self.certfile, keyfile=self.keyfile)
try:
self._sock = context.wrap_socket(self._sock, server_hostname=self.host)
except ssl.SSLError:
log.exception('Unable to connect to Kafka broker at %s:%r over SSL', self.host, self.port)
self._raise_connection_error()

0 comments on commit fa2c0fc

Please sign in to comment.