From 8ffdd1eaee1299a35853089e01d9eb16dc509fb7 Mon Sep 17 00:00:00 2001 From: Andre Araujo Date: Wed, 25 Oct 2017 14:04:59 -0700 Subject: [PATCH] Add security layer negotiation to the GSSAPI authentication. When trying to establish a connection with Kafka using SASL with the GSSAPI authentication mechanism the connection was hanging an timing out after 60 secons. On the Kafka broker side I noticed that the SaslServerAuthenticator was going from the AUTHENTICATE to the FAILED state. The GSSAPI auth implementation was missing the second handshake defined in RFC 2222, which happens after the security context is established. This handshake is used by the client and server to negotiate the security layer (QoP) to be used for the connection. Kafka currently only support the "auth" QoP, so the implementation in this commit doesn't make it configurable, but this can be extended later. With this change I was able to successfully connect to a Kerberos-enabled Kafka broker using the SASL_PLAINTEXT protocol and the GSSAPI mechanism. --- kafka/conn.py | 64 +++++++++++++++++++++++++++-------------- kafka/protocol/types.py | 17 +++++++---- 2 files changed, 53 insertions(+), 28 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 51a007c97..dbedd8faf 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -27,7 +27,7 @@ from kafka.protocol.commit import OffsetFetchRequest from kafka.protocol.metadata import MetadataRequest from kafka.protocol.parser import KafkaProtocol -from kafka.protocol.types import Int32 +from kafka.protocol.types import Int32, Int8 from kafka.version import __version__ @@ -39,6 +39,10 @@ DEFAULT_KAFKA_PORT = 9092 +SASL_QOP_AUTH = 1 +SASL_QOP_AUTH_INT = 2 +SASL_QOP_AUTH_CONF = 4 + try: import ssl ssl_available = True @@ -547,43 +551,59 @@ def _try_authenticate_plain(self, future): return future.success(True) def _try_authenticate_gssapi(self, future): + auth_id = self.config['sasl_kerberos_service_name'] + '@' + self.hostname gssapi_name = gssapi.Name( - self.config['sasl_kerberos_service_name'] + '@' + self.hostname, + auth_id, name_type=gssapi.NameType.hostbased_service ).canonicalize(gssapi.MechType.kerberos) log.debug('%s: GSSAPI name: %s', self, gssapi_name) - # Exchange tokens until authentication either succeeds or fails - client_ctx = gssapi.SecurityContext(name=gssapi_name, usage='initiate') - received_token = None + # Establish security context and negotiate protection level + # For reference RFC 2222, section 7.2.1 try: + # Exchange tokens until authentication either succeeds or fails + client_ctx = gssapi.SecurityContext(name=gssapi_name, usage='initiate') + received_token = None while not client_ctx.complete: # calculate an output token from kafka token (or None if first iteration) output_token = client_ctx.step(received_token) + # pass output token to kafka, or send empty response if the security + # context is complete (output token is None in that case) if output_token is None: - continue - - # pass output token to kafka - try: + self._send_bytes_blocking(Int32.encode(0)) + else: msg = output_token size = Int32.encode(len(msg)) self._send_bytes_blocking(size + msg) - # The server will send a token back. Processing of this token either - # establishes a security context, or it needs further token exchange. - # The gssapi will be able to identify the needed next step. - # The connection is closed on failure. - header = self._recv_bytes_blocking(4) - (token_size,) = struct.unpack('>i', header) - received_token = self._recv_bytes_blocking(token_size) - - except ConnectionError as e: - log.exception("%s: Error receiving reply from server", self) - error = Errors.ConnectionError("%s: %s" % (self, e)) - self.close(error=error) - return future.failure(error) + # The server will send a token back. Processing of this token either + # establishes a security context, or it needs further token exchange. + # The gssapi will be able to identify the needed next step. + # The connection is closed on failure. + header = self._recv_bytes_blocking(4) + (token_size,) = struct.unpack('>i', header) + received_token = self._recv_bytes_blocking(token_size) + + # Process the security layer negotiation token, sent by the server + # once the security context is established. + + # unwraps message containing supported protection levels and msg size + msg = client_ctx.unwrap(received_token).message + # Kafka currently doesn't support integrity or confidentiality security layers, so we + # simply set QoP to 'auth' only (first octet). We reuse the max message size proposed + # by the server + msg = Int8.encode(SASL_QOP_AUTH & Int8.decode(msg[0])) + msg[1:] + # add authorization identity to the response, GSS-wrap and send it + msg = client_ctx.wrap(msg + auth_id, False).message + size = Int32.encode(len(msg)) + self._send_bytes_blocking(size + msg) + except ConnectionError as e: + log.exception("%s: Error receiving reply from server", self) + error = Errors.ConnectionError("%s: %s" % (self, e)) + self.close(error=error) + return future.failure(error) except Exception as e: return future.failure(e) diff --git a/kafka/protocol/types.py b/kafka/protocol/types.py index c95bd6d74..56aa602dd 100644 --- a/kafka/protocol/types.py +++ b/kafka/protocol/types.py @@ -12,8 +12,13 @@ def _pack(f, value): raise ValueError(error) -def _unpack(f, data): +def _unpack(f, data, length=None): try: + if type(data) is not str: + assert length is not None, 'length is required when unpacking from stream' + data = data.read(length) + assert length is None or length == len(data), \ + 'data for unpacking is not of the expected length' (value,) = unpack(f, data) return value except error: @@ -27,7 +32,7 @@ def encode(cls, value): @classmethod def decode(cls, data): - return _unpack('>b', data.read(1)) + return _unpack('>b', data, 1) class Int16(AbstractType): @@ -37,7 +42,7 @@ def encode(cls, value): @classmethod def decode(cls, data): - return _unpack('>h', data.read(2)) + return _unpack('>h', data, 2) class Int32(AbstractType): @@ -47,7 +52,7 @@ def encode(cls, value): @classmethod def decode(cls, data): - return _unpack('>i', data.read(4)) + return _unpack('>i', data, 4) class Int64(AbstractType): @@ -57,7 +62,7 @@ def encode(cls, value): @classmethod def decode(cls, data): - return _unpack('>q', data.read(8)) + return _unpack('>q', data, 8) class String(AbstractType): @@ -110,7 +115,7 @@ def encode(cls, value): @classmethod def decode(cls, data): - return _unpack('>?', data.read(1)) + return _unpack('>?', data, 1) class Schema(AbstractType):