Enable SCRAM-SHA-256 and SCRAM-SHA-512 for sasl #1918

Merged: 46 commits, Dec 29, 2019

Commits
ce61dfa - add missing dev requirement 'mock' (Oct 1, 2019)
b565067 - extend KafkaFixture to support scram and plain sasl mechanisms (Oct 1, 2019)
fd8f11c - write test for PLAIN sasl mechanism (Oct 1, 2019)
67d5276 - normalize KafkaFixture.get_clients .get_consumers and .get_producers (Oct 2, 2019)
471029b - refactor KafkaFixture.get_clients .get_consumers and .get_producers (Oct 2, 2019)
f0b0389 - add get_topic_names to KafkaFixture (Oct 2, 2019)
d6761bd - fix bug in KafkaFixture.create_topics (Oct 2, 2019)
6077c6c - create function to turn special chars to underscore (Oct 2, 2019)
37d100d - improve KafkaFixture.get_consumers (Oct 2, 2019)
d297767 - improve sasl tests and enable tests for scram (Oct 2, 2019)
a97386d - implement scram (Oct 2, 2019)
3c3c5a0 - make sure to wait for scram credentials during kafka startup (Oct 2, 2019)
9785c05 - ensure compatibility with older python versions (Oct 3, 2019)
0570d91 - enable logging for tests (Oct 15, 2019)
d7fe59b - add more output to SpawnedService (Oct 15, 2019)
22b6190 - add more checks if SpawnedService is actually running (Oct 15, 2019)
84b57ed - switch to actual jaas config file for kafka (Oct 15, 2019)
01f48d7 - remove sasl mechanism settings from server.properties for 1.1.1 (Oct 15, 2019)
3f399d3 - change scram_pattern in KafkaFixture (Oct 15, 2019)
fecdc46 - minor refactoring of least_loaded_node (Oct 15, 2019)
9a99f95 - update docstrings with information about new sasl mechanisms (Oct 15, 2019)
60702c6 - minor refactoring of test_sasl_integration.py (Oct 15, 2019)
b61d68f - trying to make it run in travis (Oct 16, 2019)
e641521 - go back to raising runtime error in wait_for (Oct 16, 2019)
720daa3 - reduce kafka startup timeout and log config file content (Oct 16, 2019)
de6a360 - reduce tests to get quicker feedback (Oct 16, 2019)
1f98ee4 - output config file contents earlier (Oct 16, 2019)
fa1ca01 - further investigate settings (Oct 16, 2019)
e41c0be - try to get rid of empty log lines (Oct 16, 2019)
7af9f97 - move investigation to rendering function (Oct 16, 2019)
0ef668e - stop on first failed test (Oct 16, 2019)
65bb4fa - use strpath in render_template (Oct 16, 2019)
082bb4e - check kafka.properties files for the `sasl` keyword (Oct 16, 2019)
b1d1ee0 - add another commit to kafka.properties to make sure changes are not o… (Oct 16, 2019)
a832ae5 - Revert "use strpath in render_template" (Oct 16, 2019)
97778fc - limit cache to servers/dist (Oct 16, 2019)
77ec1b3 - set offsets.topic.replication.factor to 1 in all kafka.properties (Oct 16, 2019)
8aca0ee - dump logs on sasl_kafka tear down (Oct 16, 2019)
d5e9a0f - try shorter topic name for sasl test (Oct 17, 2019)
a3aadf3 - enable debug mode for kafka (Oct 17, 2019)
c0b0437 - refactor KafkaFixture._create_topics and include retry logic (Oct 17, 2019)
ca3c304 - fix bug in KafkaFixture._send_request (Oct 17, 2019)
e780de8 - another minor bugfix (Oct 17, 2019)
1fe4c17 - revert changes introduced for debugging (Oct 17, 2019)
bce9e41 - minor bugfix (Oct 17, 2019)
54922a0 - increase Kafka startup timeout again (Oct 17, 2019)
2 changes: 1 addition & 1 deletion .travis.yml
@@ -25,7 +25,7 @@ addons:
cache:
directories:
- $HOME/.cache/pip
- servers/
- servers/dist
Owner comment: good catch!


before_install:
- source travis_java_install.sh
10 changes: 5 additions & 5 deletions kafka/admin/client.py
@@ -130,11 +130,11 @@ class KafkaAdminClient(object):
metric_group_prefix (str): Prefix for metric names. Default: ''
sasl_mechanism (str): Authentication mechanism when security_protocol
is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
PLAIN, GSSAPI, OAUTHBEARER.
sasl_plain_username (str): username for sasl PLAIN authentication.
Required if sasl_mechanism is PLAIN.
sasl_plain_password (str): password for sasl PLAIN authentication.
Required if sasl_mechanism is PLAIN.
PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512.
sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication.
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
sasl_kerberos_service_name (str): Service name to include in GSSAPI
sasl mechanism handshake. Default: 'kafka'
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
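
For context, a minimal client-side sketch of the new options documented above; the broker address and credentials are placeholders, not values from this PR:

from kafka.admin import KafkaAdminClient

# Placeholder endpoint and credentials; any SASL_SSL (or SASL_PLAINTEXT) listener
# with matching SCRAM credentials configured on the broker works the same way.
admin = KafkaAdminClient(
    bootstrap_servers='broker:9094',
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-256',      # or 'SCRAM-SHA-512'
    sasl_plain_username='alice',
    sasl_plain_password='alice-secret',
)
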
15 changes: 6 additions & 9 deletions kafka/client_async.py
@@ -143,11 +143,11 @@ class KafkaClient(object):
metric_group_prefix (str): Prefix for metric names. Default: ''
sasl_mechanism (str): Authentication mechanism when security_protocol
is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
PLAIN, GSSAPI, OAUTHBEARER.
sasl_plain_username (str): username for sasl PLAIN authentication.
Required if sasl_mechanism is PLAIN.
sasl_plain_password (str): password for sasl PLAIN authentication.
Required if sasl_mechanism is PLAIN.
PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512.
sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication.
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
sasl_kerberos_service_name (str): Service name to include in GSSAPI
sasl mechanism handshake. Default: 'kafka'
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
@@ -767,10 +767,7 @@ def least_loaded_node(self):
inflight = curr_inflight
found = node_id

if found is not None:
return found

return None
return found
Owner comment: drive-by!


def set_topics(self, topics):
"""Set specific topics to track for metadata.
147 changes: 136 additions & 11 deletions kafka/conn.py
@@ -1,12 +1,16 @@
from __future__ import absolute_import, division

import collections
import base64
import copy
import errno
import hashlib
import hmac
import io
import logging
from random import shuffle, uniform

from uuid import uuid4

# selectors in stdlib as of py3.4
try:
import selectors # pylint: disable=import-error
@@ -16,7 +20,6 @@

import socket
import struct
import sys
import threading
import time

@@ -39,6 +42,12 @@
TimeoutError = socket.error
BlockingIOError = Exception

def xor_bytes(left, right):
return bytearray(ord(lb) ^ ord(rb) for lb, rb in zip(left, right))
else:
def xor_bytes(left, right):
return bytes(lb ^ rb for lb, rb in zip(left, right))

log = logging.getLogger(__name__)

DEFAULT_KAFKA_PORT = 9092
@@ -98,6 +107,69 @@ class ConnectionStates(object):
AUTHENTICATING = '<authenticating>'


class ScramClient:
Owner comment: Thoughts on moving this to its own file?

MECHANISMS = {
'SCRAM-SHA-256': hashlib.sha256,
'SCRAM-SHA-512': hashlib.sha512
}

def __init__(self, user, password, mechanism):
self.nonce = str(uuid4()).replace('-', '')
self.auth_message = ''
self.salted_password = None
self.user = user
self.password = password.encode()
self.hashfunc = self.MECHANISMS[mechanism]
self.hashname = ''.join(mechanism.lower().split('-')[1:3])
self.stored_key = None
self.client_key = None
self.client_signature = None
self.client_proof = None
self.server_key = None
self.server_signature = None

def first_message(self):
client_first_bare = 'n={},r={}'.format(self.user, self.nonce)
self.auth_message += client_first_bare
return 'n,,' + client_first_bare

def process_server_first_message(self, server_first_message):
self.auth_message += ',' + server_first_message
params = dict(pair.split('=', 1) for pair in server_first_message.split(','))
server_nonce = params['r']
if not server_nonce.startswith(self.nonce):
raise ValueError("Server nonce, did not start with client nonce!")
self.nonce = server_nonce
self.auth_message += ',c=biws,r=' + self.nonce

salt = base64.b64decode(params['s'].encode())
iterations = int(params['i'])
self.create_salted_password(salt, iterations)

self.client_key = self.hmac(self.salted_password, b'Client Key')
self.stored_key = self.hashfunc(self.client_key).digest()
self.client_signature = self.hmac(self.stored_key, self.auth_message.encode())
self.client_proof = xor_bytes(self.client_key, self.client_signature)
self.server_key = self.hmac(self.salted_password, b'Server Key')
self.server_signature = self.hmac(self.server_key, self.auth_message.encode())

def hmac(self, key, msg):
return hmac.new(key, msg, digestmod=self.hashfunc).digest()

def create_salted_password(self, salt, iterations):
self.salted_password = hashlib.pbkdf2_hmac(
self.hashname, self.password, salt, iterations
)

def final_message(self):
client_final_no_proof = 'c=biws,r=' + self.nonce
return 'c=biws,r={},p={}'.format(self.nonce, base64.b64encode(self.client_proof).decode())

def process_server_final_message(self, server_final_message):
params = dict(pair.split('=', 1) for pair in server_final_message.split(','))
if self.server_signature != base64.b64decode(params['v'].encode()):
raise ValueError("Server sent wrong signature!")

class BrokerConnection(object):
"""Initialize a Kafka broker connection

@@ -177,11 +249,11 @@ class BrokerConnection(object):
metric_group_prefix (str): Prefix for metric names. Default: ''
sasl_mechanism (str): Authentication mechanism when security_protocol
is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
PLAIN, GSSAPI, OAUTHBEARER.
sasl_plain_username (str): username for sasl PLAIN authentication.
Required if sasl_mechanism is PLAIN.
sasl_plain_password (str): password for sasl PLAIN authentication.
Required if sasl_mechanism is PLAIN.
PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512.
sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication.
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
sasl_kerberos_service_name (str): Service name to include in GSSAPI
sasl mechanism handshake. Default: 'kafka'
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
@@ -224,7 +296,7 @@ class BrokerConnection(object):
'sasl_oauth_token_provider': None
}
SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL')
SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER')
SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER', "SCRAM-SHA-256", "SCRAM-SHA-512")

def __init__(self, host, port, afi, **configs):
self.host = host
@@ -259,9 +331,13 @@ def __init__(self, host, port, afi, **configs):
if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'):
assert self.config['sasl_mechanism'] in self.SASL_MECHANISMS, (
'sasl_mechanism must be in ' + ', '.join(self.SASL_MECHANISMS))
if self.config['sasl_mechanism'] == 'PLAIN':
assert self.config['sasl_plain_username'] is not None, 'sasl_plain_username required for PLAIN sasl'
assert self.config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl'
if self.config['sasl_mechanism'] in ('PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'):
assert self.config['sasl_plain_username'] is not None, (
'sasl_plain_username required for PLAIN or SCRAM sasl'
)
assert self.config['sasl_plain_password'] is not None, (
'sasl_plain_password required for PLAIN or SCRAM sasl'
)
if self.config['sasl_mechanism'] == 'GSSAPI':
assert gssapi is not None, 'GSSAPI lib not available'
assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl'
@@ -552,6 +628,8 @@ def _handle_sasl_handshake_response(self, future, response):
return self._try_authenticate_gssapi(future)
elif self.config['sasl_mechanism'] == 'OAUTHBEARER':
return self._try_authenticate_oauth(future)
elif self.config['sasl_mechanism'].startswith("SCRAM-SHA-"):
return self._try_authenticate_scram(future)
else:
return future.failure(
Errors.UnsupportedSaslMechanismError(
@@ -652,6 +730,53 @@ def _try_authenticate_plain(self, future):
log.info('%s: Authenticated as %s via PLAIN', self, self.config['sasl_plain_username'])
return future.success(True)

def _try_authenticate_scram(self, future):
if self.config['security_protocol'] == 'SASL_PLAINTEXT':
log.warning('%s: Exchanging credentials in the clear', self)

scram_client = ScramClient(
self.config['sasl_plain_username'], self.config['sasl_plain_password'], self.config['sasl_mechanism']
)

err = None
close = False
with self._lock:
if not self._can_send_recv():
err = Errors.NodeNotReadyError(str(self))
close = False
else:
try:
client_first = scram_client.first_message().encode()
Owner comment: should we specify an encoding explicitly? Is 'utf-8' always the default? Does it matter?

size = Int32.encode(len(client_first))
self._send_bytes_blocking(size + client_first)

(data_len,) = struct.unpack('>i', self._recv_bytes_blocking(4))
server_first = self._recv_bytes_blocking(data_len).decode()
Owner comment: similarly re: codec. Would prefer to specify 'utf-8' explicitly.

scram_client.process_server_first_message(server_first)

client_final = scram_client.final_message().encode()
Owner comment: and here: utf-8.

size = Int32.encode(len(client_final))
self._send_bytes_blocking(size + client_final)

(data_len,) = struct.unpack('>i', self._recv_bytes_blocking(4))
server_final = self._recv_bytes_blocking(data_len).decode()
Owner comment: and finally, also here.

scram_client.process_server_final_message(server_final)

except (ConnectionError, TimeoutError) as e:
log.exception("%s: Error receiving reply from server", self)
err = Errors.KafkaConnectionError("%s: %s" % (self, e))
close = True

if err is not None:
if close:
self.close(error=err)
return future.failure(err)

log.info(
'%s: Authenticated as %s via %s', self, self.config['sasl_plain_username'], self.config['sasl_mechanism']
)
return future.success(True)

def _try_authenticate_gssapi(self, future):
kerberos_damin_name = self.config['sasl_kerberos_domain_name'] or self.host
auth_id = self.config['sasl_kerberos_service_name'] + '@' + kerberos_damin_name
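
The exchange implemented above is the standard SCRAM flow (RFC 5802): client-first, server-first, client-final, server-final, each sent as a length-prefixed frame. A condensed sketch of that sequence driven through ScramClient directly (using the explicit 'utf-8' codec the review comments ask about); send() and recv() are hypothetical stand-ins for BrokerConnection's blocking socket helpers:

from kafka.conn import ScramClient  # ScramClient lives in kafka/conn.py in this diff

def scram_exchange(send, recv, user, password, mechanism='SCRAM-SHA-256'):
    # send(bytes) / recv() are hypothetical wrappers over the length-prefixed
    # socket I/O that BrokerConnection performs in _try_authenticate_scram.
    client = ScramClient(user, password, mechanism)
    send(client.first_message().encode('utf-8'))                  # client-first: gs2 header, user, nonce
    client.process_server_first_message(recv().decode('utf-8'))   # server-first: nonce, salt, iteration count
    send(client.final_message().encode('utf-8'))                  # client-final: channel binding, nonce, proof
    client.process_server_final_message(recv().decode('utf-8'))   # server-final: verify server signature
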
10 changes: 5 additions & 5 deletions kafka/consumer/group.py
@@ -231,11 +231,11 @@ class KafkaConsumer(six.Iterator):
subscribing to it. Requires 0.10+ Default: True
sasl_mechanism (str): Authentication mechanism when security_protocol
is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
PLAIN, GSSAPI, OAUTHBEARER.
sasl_plain_username (str): Username for sasl PLAIN authentication.
Required if sasl_mechanism is PLAIN.
sasl_plain_password (str): Password for sasl PLAIN authentication.
Required if sasl_mechanism is PLAIN.
PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512.
sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication.
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
sasl_kerberos_service_name (str): Service name to include in GSSAPI
sasl mechanism handshake. Default: 'kafka'
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
10 changes: 5 additions & 5 deletions kafka/producer/kafka.py
@@ -268,11 +268,11 @@ class KafkaProducer(object):
Default: selectors.DefaultSelector
sasl_mechanism (str): Authentication mechanism when security_protocol
is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
PLAIN, GSSAPI, OAUTHBEARER.
sasl_plain_username (str): username for sasl PLAIN authentication.
Required if sasl_mechanism is PLAIN.
sasl_plain_password (str): password for sasl PLAIN authentication.
Required if sasl_mechanism is PLAIN.
PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512.
sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication.
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.
Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
sasl_kerberos_service_name (str): Service name to include in GSSAPI
sasl mechanism handshake. Default: 'kafka'
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
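
The producer and consumer take the same two settings; a short usage sketch with placeholder broker address and credentials:

from kafka import KafkaProducer, KafkaConsumer

common = dict(
    bootstrap_servers='broker:9094',      # placeholder
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='alice',          # placeholder credentials
    sasl_plain_password='alice-secret',
)

producer = KafkaProducer(**common)
producer.send('demo-topic', b'hello scram')
producer.flush()

consumer = KafkaConsumer('demo-topic', auto_offset_reset='earliest', **common)
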
1 change: 1 addition & 0 deletions requirements-dev.txt
@@ -8,6 +8,7 @@ lz4==2.1.2
xxhash==1.3.0
python-snappy==0.5.3
tox==3.5.3
mock==3.0.5
pylint==1.9.3
pytest-pylint==0.12.3
pytest-mock==1.10.0
4 changes: 3 additions & 1 deletion servers/0.10.0.0/resources/kafka.properties
@@ -24,6 +24,8 @@ broker.id={broker_id}
listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}

{sasl_config}

ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
@@ -121,7 +123,7 @@ log.cleaner.enable=false
# tune down offset topics to reduce setup time in tests
offsets.commit.timeout.ms=500
offsets.topic.num.partitions=2
offsets.topic.replication.factor=2
offsets.topic.replication.factor=1

# Allow shorter session timeouts for tests
group.min.session.timeout.ms=1000
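
The diff does not show what the fixture substitutes for {sasl_config}; for a SCRAM-enabled listener it would typically amount to broker properties along these lines (illustrative only, not the fixture's exact output):

# Illustrative only: typical broker-side SASL properties for a SCRAM listener.
sasl_config = (
    "sasl.enabled.mechanisms=SCRAM-SHA-256\n"
    "sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256\n"
)
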
4 changes: 4 additions & 0 deletions servers/0.10.0.0/resources/kafka_server_jaas.conf
@@ -0,0 +1,4 @@
KafkaServer {{
{jaas_config}
}};
Client {{}};
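
The doubled braces are literal braces escaped for Python's str.format; only {jaas_config} is substituted when the fixture renders the template. A hedged sketch of that rendering, where the helper, path, and credentials are illustrative rather than the fixture's actual code:

# Illustrative rendering of the JAAS template above via str.format (assumed mechanism).
with open('servers/0.10.0.0/resources/kafka_server_jaas.conf') as f:
    template = f.read()

jaas_config = (
    'org.apache.kafka.common.security.scram.ScramLoginModule required\n'
    '    username="admin"\n'
    '    password="admin-secret";'
)
print(template.format(jaas_config=jaas_config))
# ->
# KafkaServer {
# org.apache.kafka.common.security.scram.ScramLoginModule required
#     username="admin"
#     password="admin-secret";
# };
# Client {};
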
4 changes: 3 additions & 1 deletion servers/0.10.0.1/resources/kafka.properties
@@ -24,6 +24,8 @@ broker.id={broker_id}
listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}

{sasl_config}

ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
@@ -121,7 +123,7 @@ log.cleaner.enable=false
# tune down offset topics to reduce setup time in tests
offsets.commit.timeout.ms=500
offsets.topic.num.partitions=2
offsets.topic.replication.factor=2
offsets.topic.replication.factor=1

# Allow shorter session timeouts for tests
group.min.session.timeout.ms=1000
4 changes: 4 additions & 0 deletions servers/0.10.0.1/resources/kafka_server_jaas.conf
@@ -0,0 +1,4 @@
KafkaServer {{
{jaas_config}
}};
Client {{}};
4 changes: 3 additions & 1 deletion servers/0.10.1.1/resources/kafka.properties
@@ -24,6 +24,8 @@ broker.id={broker_id}
listeners={transport}://{host}:{port}
security.inter.broker.protocol={transport}

{sasl_config}

ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks
ssl.keystore.password=foobar
ssl.key.password=foobar
@@ -121,7 +123,7 @@ log.cleaner.enable=false
# tune down offset topics to reduce setup time in tests
offsets.commit.timeout.ms=500
offsets.topic.num.partitions=2
offsets.topic.replication.factor=2
offsets.topic.replication.factor=1

# Allow shorter session timeouts for tests
group.min.session.timeout.ms=1000
4 changes: 4 additions & 0 deletions servers/0.10.1.1/resources/kafka_server_jaas.conf
@@ -0,0 +1,4 @@
KafkaServer {{
{jaas_config}
}};
Client {{}};