Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add security layer negotiation to the GSSAPI authentication. #1283

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 42 additions & 22 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from kafka.protocol.commit import OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.parser import KafkaProtocol
from kafka.protocol.types import Int32
from kafka.protocol.types import Int32, Int8
from kafka.version import __version__


Expand All @@ -39,6 +39,10 @@

DEFAULT_KAFKA_PORT = 9092

SASL_QOP_AUTH = 1
SASL_QOP_AUTH_INT = 2
SASL_QOP_AUTH_CONF = 4

try:
import ssl
ssl_available = True
Expand Down Expand Up @@ -547,43 +551,59 @@ def _try_authenticate_plain(self, future):
return future.success(True)

def _try_authenticate_gssapi(self, future):
auth_id = self.config['sasl_kerberos_service_name'] + '@' + self.hostname
gssapi_name = gssapi.Name(
self.config['sasl_kerberos_service_name'] + '@' + self.hostname,
auth_id,
name_type=gssapi.NameType.hostbased_service
).canonicalize(gssapi.MechType.kerberos)
log.debug('%s: GSSAPI name: %s', self, gssapi_name)

# Exchange tokens until authentication either succeeds or fails
client_ctx = gssapi.SecurityContext(name=gssapi_name, usage='initiate')
received_token = None
# Establish security context and negotiate protection level
# For reference RFC 2222, section 7.2.1
try:
# Exchange tokens until authentication either succeeds or fails
client_ctx = gssapi.SecurityContext(name=gssapi_name, usage='initiate')
received_token = None
while not client_ctx.complete:
# calculate an output token from kafka token (or None if first iteration)
output_token = client_ctx.step(received_token)

# pass output token to kafka, or send empty response if the security
# context is complete (output token is None in that case)
if output_token is None:
continue

# pass output token to kafka
try:
self._send_bytes_blocking(Int32.encode(0))
else:
msg = output_token
size = Int32.encode(len(msg))
self._send_bytes_blocking(size + msg)

# The server will send a token back. Processing of this token either
# establishes a security context, or it needs further token exchange.
# The gssapi will be able to identify the needed next step.
# The connection is closed on failure.
header = self._recv_bytes_blocking(4)
(token_size,) = struct.unpack('>i', header)
received_token = self._recv_bytes_blocking(token_size)

except ConnectionError as e:
log.exception("%s: Error receiving reply from server", self)
error = Errors.ConnectionError("%s: %s" % (self, e))
self.close(error=error)
return future.failure(error)
# The server will send a token back. Processing of this token either
# establishes a security context, or it needs further token exchange.
# The gssapi will be able to identify the needed next step.
# The connection is closed on failure.
header = self._recv_bytes_blocking(4)
(token_size,) = struct.unpack('>i', header)
received_token = self._recv_bytes_blocking(token_size)

# Process the security layer negotiation token, sent by the server
# once the security context is established.

# unwraps message containing supported protection levels and msg size
msg = client_ctx.unwrap(received_token).message
# Kafka currently doesn't support integrity or confidentiality security layers, so we
# simply set QoP to 'auth' only (first octet). We reuse the max message size proposed
# by the server
msg = Int8.encode(SASL_QOP_AUTH & Int8.decode(msg[0])) + msg[1:]
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my only concern is that this single call to decode() is requiring a very big change to the low level protocol decoding that will touch everything. And while I agree that the protocol decoding probably should not require BytesIO (or some other stream), I am not excited about including that protocol change here. How about Int8.decode(BytesIO(msg[0])) ?

# add authorization identity to the response, GSS-wrap and send it
msg = client_ctx.wrap(msg + auth_id, False).message
size = Int32.encode(len(msg))
self._send_bytes_blocking(size + msg)

except ConnectionError as e:
log.exception("%s: Error receiving reply from server", self)
error = Errors.ConnectionError("%s: %s" % (self, e))
self.close(error=error)
return future.failure(error)
except Exception as e:
return future.failure(e)

Expand Down
17 changes: 11 additions & 6 deletions kafka/protocol/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,13 @@ def _pack(f, value):
raise ValueError(error)


def _unpack(f, data):
def _unpack(f, data, length=None):
try:
if type(data) is not str:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What aboutif not isinstance(data, str) rather than limiting yourself with type()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. I'll change that. Thanks!

assert length is not None, 'length is required when unpacking from stream'
data = data.read(length)
assert length is None or length == len(data), \
'data for unpacking is not of the expected length'
(value,) = unpack(f, data)
return value
except error:
Expand All @@ -27,7 +32,7 @@ def encode(cls, value):

@classmethod
def decode(cls, data):
return _unpack('>b', data.read(1))
return _unpack('>b', data, 1)


class Int16(AbstractType):
Expand All @@ -37,7 +42,7 @@ def encode(cls, value):

@classmethod
def decode(cls, data):
return _unpack('>h', data.read(2))
return _unpack('>h', data, 2)


class Int32(AbstractType):
Expand All @@ -47,7 +52,7 @@ def encode(cls, value):

@classmethod
def decode(cls, data):
return _unpack('>i', data.read(4))
return _unpack('>i', data, 4)


class Int64(AbstractType):
Expand All @@ -57,7 +62,7 @@ def encode(cls, value):

@classmethod
def decode(cls, data):
return _unpack('>q', data.read(8))
return _unpack('>q', data, 8)


class String(AbstractType):
Expand Down Expand Up @@ -110,7 +115,7 @@ def encode(cls, value):

@classmethod
def decode(cls, data):
return _unpack('>?', data.read(1))
return _unpack('>?', data, 1)


class Schema(AbstractType):
Expand Down