diff --git a/kafka/client.py b/kafka/client.py index 13777a449..e77a7445a 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -28,12 +28,22 @@ class KafkaClient(object): # socket timeout. def __init__(self, hosts, client_id=CLIENT_ID, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS, - correlation_id=0): + correlation_id=0, + ca="/etc/ssl/certs/ca-bundle.crt", + certfile=None, + keyfile=None, + ssl=False, + verify_hostname=False): # We need one connection to bootstrap self.client_id = kafka_bytestring(client_id) self.timeout = timeout self.hosts = collect_hosts(hosts) self.correlation_id = correlation_id + self.ca = ca + self.certfile = certfile + self.keyfile = keyfile + self.ssl = ssl + self.verify_hostname = verify_hostname # create connections only when we need them self.conns = {} @@ -55,7 +65,12 @@ def _get_conn(self, host, port): self.conns[host_key] = KafkaConnection( host, port, - timeout=self.timeout + timeout=self.timeout, + ca=self.ca, + certfile=self.certfile, + keyfile=self.keyfile, + ssl=self.ssl, + verify_hostname=self.verify_hostname ) return self.conns[host_key] diff --git a/kafka/conn.py b/kafka/conn.py index bacbf17f9..5f4fe80c4 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -2,6 +2,7 @@ import logging from random import shuffle import socket +import ssl import struct import six @@ -52,12 +53,23 @@ class KafkaConnection(object): port: the port number the kafka broker is listening on timeout: default 120. The socket timeout for sending and receiving data in seconds. None means no timeout, so a request can block forever. + certfile: the SSL certificate file of the client (requires Kafka 0.9+) + keyfile: the SSL key file of the client (requires Kafka 0.9+) + ca: The SSL CA (requires Kafka 0.9+) + ssl: Use SSL when communicating with Kafka + verify_hostname: Whether or not to verify the server's hostname against its cert """ - def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): + def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS, + ssl=False, ca=None, certfile=None, keyfile=None, verify_hostname=False): super(KafkaConnection, self).__init__() self.host = host self.port = port self.timeout = timeout + self.ca = ca + self.certfile = certfile + self.keyfile = keyfile + self.ssl = ssl + self.verify_hostname = verify_hostname self._sock = None self.reinit() @@ -101,7 +113,7 @@ def _read_bytes(self, num_bytes): if data == b'': raise socket.error("Not enough data to read message -- did server kill socket?") - except socket.error: + except (socket.error, ssl.SSLError): log.exception('Unable to receive data from Kafka') self._raise_connection_error() @@ -134,7 +146,7 @@ def send(self, request_id, payload): try: self._sock.sendall(payload) - except socket.error: + except (socket.error, ssl.SSLError): log.exception('Unable to send payload to Kafka') self._raise_connection_error() @@ -171,6 +183,11 @@ def copy(self): c.host = copy.copy(self.host) c.port = copy.copy(self.port) c.timeout = copy.copy(self.timeout) + c.ca = copy.copy(self.ca) + c.certfile = copy.copy(self.certfile) + c.keyfile = copy.copy(self.keyfile) + c.ssl = copy.copy(self.ssl) + c.verify_hostname = copy.copy(self.verify_hostname) c._sock = None return c @@ -185,7 +202,7 @@ def close(self): # closed by the server try: self._sock.shutdown(socket.SHUT_RDWR) - except socket.error: + except (socket.error, ssl.SSLError): pass # Closing the socket should always succeed @@ -209,5 +226,24 @@ def reinit(self): try: self._sock = socket.create_connection((self.host, self.port), self.timeout) except socket.error: - log.exception('Unable to connect to kafka broker at %s:%d' % (self.host, self.port)) + log.exception('Unable to connect to Kafka broker at %s:%r', self.host, self.port) self._raise_connection_error() + + if self.ssl: + # Disallow use of SSLv2 and V3 (meaning we require TLSv1.0+) + context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) # pylint: disable=no-member + context.options |= ssl.OP_NO_SSLv2 # pylint: disable=no-member + context.options |= ssl.OP_NO_SSLv3 # pylint: disable=no-member + context.verify_mode = ssl.CERT_OPTIONAL + if self.verify_hostname: + context.check_hostname = True + if self.ca: + context.load_verify_locations(self.ca) + context.verify_mode = ssl.CERT_REQUIRED + if self.certfile and self.keyfile: + context.load_cert_chain(certfile=self.certfile, keyfile=self.keyfile) + try: + self._sock = context.wrap_socket(self._sock, server_hostname=self.host) + except ssl.SSLError: + log.exception('Unable to connect to Kafka broker at %s:%r over SSL', self.host, self.port) + self._raise_connection_error()