From ce61dfad83af421a959bfe32f68f82a3703a2eda Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Tue, 1 Oct 2019 08:20:45 +0200 Subject: [PATCH 01/46] add missing dev requirement 'mock' --- requirements-dev.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements-dev.txt b/requirements-dev.txt index cb0bbe5a6..d2830905b 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -8,6 +8,7 @@ lz4==2.1.2 xxhash==1.3.0 python-snappy==0.5.3 tox==3.5.3 +mock==3.0.5 pylint==1.9.3 pytest-pylint==0.12.3 pytest-mock==1.10.0 From b5650677177d8a58a0ec9d74e054859b07012aec Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Tue, 1 Oct 2019 15:29:18 +0200 Subject: [PATCH 02/46] extend KafkaFixture to support scram and plain sasl mechanisms --- servers/0.10.0.0/resources/kafka.properties | 1 + servers/0.10.0.1/resources/kafka.properties | 1 + servers/0.10.1.1/resources/kafka.properties | 1 + servers/0.10.2.1/resources/kafka.properties | 1 + servers/0.10.2.2/resources/kafka.properties | 1 + servers/0.11.0.0/resources/kafka.properties | 1 + servers/0.11.0.1/resources/kafka.properties | 1 + servers/0.11.0.2/resources/kafka.properties | 1 + servers/0.11.0.3/resources/kafka.properties | 1 + servers/1.0.0/resources/kafka.properties | 1 + servers/1.0.1/resources/kafka.properties | 1 + servers/1.0.2/resources/kafka.properties | 1 + servers/1.1.0/resources/kafka.properties | 1 + servers/1.1.1/resources/kafka.properties | 1 + servers/2.0.0/resources/kafka.properties | 1 + servers/2.0.1/resources/kafka.properties | 1 + servers/2.1.0/resources/kafka.properties | 1 + servers/2.1.1/resources/kafka.properties | 1 + servers/2.2.0/resources/kafka.properties | 1 + servers/2.2.1/resources/kafka.properties | 1 + servers/2.3.0/resources/kafka.properties | 1 + test/fixtures.py | 58 +++++++++++++++++++++ test/test_sasl_integration.py | 56 ++++++++++++++++++++ 23 files changed, 135 insertions(+) create mode 100644 test/test_sasl_integration.py diff --git a/servers/0.10.0.0/resources/kafka.properties b/servers/0.10.0.0/resources/kafka.properties index 7d8e2b1f0..074797523 100644 --- a/servers/0.10.0.0/resources/kafka.properties +++ b/servers/0.10.0.0/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar diff --git a/servers/0.10.0.1/resources/kafka.properties b/servers/0.10.0.1/resources/kafka.properties index 7d8e2b1f0..074797523 100644 --- a/servers/0.10.0.1/resources/kafka.properties +++ b/servers/0.10.0.1/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar diff --git a/servers/0.10.1.1/resources/kafka.properties b/servers/0.10.1.1/resources/kafka.properties index 7d8e2b1f0..074797523 100644 --- a/servers/0.10.1.1/resources/kafka.properties +++ b/servers/0.10.1.1/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar diff --git a/servers/0.10.2.1/resources/kafka.properties b/servers/0.10.2.1/resources/kafka.properties index 7d8e2b1f0..074797523 100644 --- a/servers/0.10.2.1/resources/kafka.properties +++ 
b/servers/0.10.2.1/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar diff --git a/servers/0.10.2.2/resources/kafka.properties b/servers/0.10.2.2/resources/kafka.properties index 7d8e2b1f0..074797523 100644 --- a/servers/0.10.2.2/resources/kafka.properties +++ b/servers/0.10.2.2/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar diff --git a/servers/0.11.0.0/resources/kafka.properties b/servers/0.11.0.0/resources/kafka.properties index 630dbc5fa..38f751dd0 100644 --- a/servers/0.11.0.0/resources/kafka.properties +++ b/servers/0.11.0.0/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar diff --git a/servers/0.11.0.1/resources/kafka.properties b/servers/0.11.0.1/resources/kafka.properties index 630dbc5fa..38f751dd0 100644 --- a/servers/0.11.0.1/resources/kafka.properties +++ b/servers/0.11.0.1/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar diff --git a/servers/0.11.0.2/resources/kafka.properties b/servers/0.11.0.2/resources/kafka.properties index 630dbc5fa..38f751dd0 100644 --- a/servers/0.11.0.2/resources/kafka.properties +++ b/servers/0.11.0.2/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar diff --git a/servers/0.11.0.3/resources/kafka.properties b/servers/0.11.0.3/resources/kafka.properties index 630dbc5fa..38f751dd0 100644 --- a/servers/0.11.0.3/resources/kafka.properties +++ b/servers/0.11.0.3/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar diff --git a/servers/1.0.0/resources/kafka.properties b/servers/1.0.0/resources/kafka.properties index 630dbc5fa..38f751dd0 100644 --- a/servers/1.0.0/resources/kafka.properties +++ b/servers/1.0.0/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar diff --git a/servers/1.0.1/resources/kafka.properties b/servers/1.0.1/resources/kafka.properties index 630dbc5fa..38f751dd0 100644 --- a/servers/1.0.1/resources/kafka.properties +++ b/servers/1.0.1/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar diff --git a/servers/1.0.2/resources/kafka.properties 
b/servers/1.0.2/resources/kafka.properties index 630dbc5fa..38f751dd0 100644 --- a/servers/1.0.2/resources/kafka.properties +++ b/servers/1.0.2/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar diff --git a/servers/1.1.0/resources/kafka.properties b/servers/1.1.0/resources/kafka.properties index 630dbc5fa..38f751dd0 100644 --- a/servers/1.1.0/resources/kafka.properties +++ b/servers/1.1.0/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar diff --git a/servers/1.1.1/resources/kafka.properties b/servers/1.1.1/resources/kafka.properties index fe6a89f4a..7770909bb 100644 --- a/servers/1.1.1/resources/kafka.properties +++ b/servers/1.1.1/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar diff --git a/servers/2.0.0/resources/kafka.properties b/servers/2.0.0/resources/kafka.properties index 630dbc5fa..38f751dd0 100644 --- a/servers/2.0.0/resources/kafka.properties +++ b/servers/2.0.0/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar diff --git a/servers/2.0.1/resources/kafka.properties b/servers/2.0.1/resources/kafka.properties index 630dbc5fa..38f751dd0 100644 --- a/servers/2.0.1/resources/kafka.properties +++ b/servers/2.0.1/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar diff --git a/servers/2.1.0/resources/kafka.properties b/servers/2.1.0/resources/kafka.properties index 630dbc5fa..38f751dd0 100644 --- a/servers/2.1.0/resources/kafka.properties +++ b/servers/2.1.0/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar diff --git a/servers/2.1.1/resources/kafka.properties b/servers/2.1.1/resources/kafka.properties index 630dbc5fa..38f751dd0 100644 --- a/servers/2.1.1/resources/kafka.properties +++ b/servers/2.1.1/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar diff --git a/servers/2.2.0/resources/kafka.properties b/servers/2.2.0/resources/kafka.properties index 630dbc5fa..38f751dd0 100644 --- a/servers/2.2.0/resources/kafka.properties +++ b/servers/2.2.0/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar diff --git 
a/servers/2.2.1/resources/kafka.properties b/servers/2.2.1/resources/kafka.properties index 630dbc5fa..38f751dd0 100644 --- a/servers/2.2.1/resources/kafka.properties +++ b/servers/2.2.1/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar diff --git a/servers/2.3.0/resources/kafka.properties b/servers/2.3.0/resources/kafka.properties index 630dbc5fa..38f751dd0 100644 --- a/servers/2.3.0/resources/kafka.properties +++ b/servers/2.3.0/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} +{sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks ssl.keystore.password=foobar diff --git a/test/fixtures.py b/test/fixtures.py index 557fca699..a28530fac 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -233,6 +233,9 @@ def __del__(self): class KafkaFixture(Fixture): + broker_user = 'alice' + broker_password = 'alice-secret' + @classmethod def instance(cls, broker_id, zookeeper, zk_chroot=None, host=None, port=None, @@ -272,6 +275,7 @@ def __init__(self, host, port, broker_id, zookeeper, zk_chroot, self.auto_create_topic = auto_create_topic self.transport = transport.upper() self.sasl_mechanism = sasl_mechanism.upper() + self.sasl_config = self._sasl_config() self.ssl_dir = self.test_resource('ssl') # TODO: checking for port connection would be better than scanning logs @@ -293,6 +297,60 @@ def __init__(self, host, port, broker_id, zookeeper, zk_chroot, self._client = None + def _sasl_config(self): + if 'SASL' not in self.transport: + return '' + + sasl_config = "sasl.enabled.mechanisms={mechanism}\n" + sasl_config += "sasl.mechanism.inter.broker.protocol={mechanism}\n" + if self.sasl_mechanism == 'PLAIN': + sasl_config += ( + "listener.name.{transport_lower}.plain.sasl.jaas.config=" + + "org.apache.kafka.common.security.plain.PlainLoginModule " + + 'required username="{user}" password="{password}" user_alice="{password}";\n' + ) + elif self.sasl_mechanism in ("SCRAM-SHA-256", "SCRAM-SHA-512"): + sasl_config += ( + "listener.name.{transport_lower}.{mechanism_lower}.sasl.jaas.config=" + + "org.apache.kafka.common.security.scram.ScramLoginModule " + + 'required username="{user}" password="{password}";\n' + ) + # add user to zookeeper for the first server + if self.broker_id == 0: + self._add_scram_user() + else: + raise ValueError("SASL mechanism {} currently not supported".format(self.sasl_mechanism)) + return sasl_config.format( + transport=self.transport, transport_lower=self.transport.lower(), + mechanism=self.sasl_mechanism, mechanism_lower=self.sasl_mechanism.lower(), + user=self.broker_user, password=self.broker_password + ) + + def _add_scram_user(self): + self.out("Adding SCRAM credentials for user {} to zookeeper.".format(self.broker_user)) + args = self.kafka_run_class_args( + "kafka.admin.ConfigCommand", + "--zookeeper", + "%s:%d" % (self.zookeeper.host, + self.zookeeper.port), + "--alter", + "--entity-type", "users", + "--entity-name", self.broker_user, + "--add-config", + "{}=[password={}]".format(self.sasl_mechanism, self.broker_password), + ) + env = self.kafka_run_class_env() + proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + stdout, stderr = proc.communicate() + + if proc.returncode != 0: + self.out("Failed to save credentials to 
zookeeper!") + self.out(stdout) + self.out(stderr) + raise RuntimeError("Failed to save credentials to zookeeper!") + self.out("User created.") + def bootstrap_server(self): return '%s:%d' % (self.host, self.port) diff --git a/test/test_sasl_integration.py b/test/test_sasl_integration.py new file mode 100644 index 000000000..acdece711 --- /dev/null +++ b/test/test_sasl_integration.py @@ -0,0 +1,56 @@ +import pytest +from . import unittest +from .testutil import random_string + +from test.fixtures import ZookeeperFixture, KafkaFixture +from test.testutil import KafkaIntegrationTestCase, env_kafka_version + + +class SASLIntegrationTestCase(unittest.TestCase): + sasl_mechanism = "PLAIN" + sasl_transport = "SASL_PLAINTEXT" + server = None + zk = None + + @classmethod + def setUpClass(cls) -> None: + cls.zk = ZookeeperFixture.instance() + cls.server = KafkaFixture.instance( + 0, cls.zk, zk_chroot=random_string(10), transport=cls.sasl_transport, sasl_mechanism=cls.sasl_mechanism + ) + + @classmethod + def tearDownClass(cls) -> None: + pass + + +@pytest.mark.skipif( + not env_kafka_version() or env_kafka_version() < (0, 10), reason="No KAFKA_VERSION or version too low" +) +class TestSaslPlain(SASLIntegrationTestCase): + def test_sasl_plain(self): + pass + + def test_sasl_scram(self, strength): + pass + + +@pytest.mark.skipif( + not env_kafka_version() or env_kafka_version() < (0, 10, 2), reason="No KAFKA_VERSION or version too low" +) +class TestSaslScram256(SASLIntegrationTestCase): + sasl_mechanism = "SCRAM-SHA-256" + + def test_sasl_plain(self): + pass + + @pytest.mark.parametrize("strength", [256, 512]) + def test_sasl_scram(self, strength): + pass + + +@pytest.mark.skipif( + not env_kafka_version() or env_kafka_version() < (0, 10, 2), reason="No KAFKA_VERSION or version too low" +) +class TestSaslScram512(SASLIntegrationTestCase): + sasl_mechanism = "SCRAM-SHA-512" From fd8f11c6f44fb5078dfedc8aa447d2c97e059d50 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Tue, 1 Oct 2019 17:09:25 +0200 Subject: [PATCH 03/46] write test for PLAIN sasl mechanism --- test/test_sasl_integration.py | 64 +++++++++++++++++++++-------------- 1 file changed, 39 insertions(+), 25 deletions(-) diff --git a/test/test_sasl_integration.py b/test/test_sasl_integration.py index acdece711..9b9d87555 100644 --- a/test/test_sasl_integration.py +++ b/test/test_sasl_integration.py @@ -3,11 +3,15 @@ from .testutil import random_string from test.fixtures import ZookeeperFixture, KafkaFixture -from test.testutil import KafkaIntegrationTestCase, env_kafka_version +from test.testutil import env_kafka_version +# from kafka.client import SimpleClient, KafkaClient +# from kafka.producer import KafkaProducer, SimpleProducer +# from kafka.consumer import SimpleConsumer, KafkaConsumer +from kafka.admin import KafkaAdminClient, NewTopic class SASLIntegrationTestCase(unittest.TestCase): - sasl_mechanism = "PLAIN" + sasl_mechanism = None sasl_transport = "SASL_PLAINTEXT" server = None zk = None @@ -23,34 +27,44 @@ def setUpClass(cls) -> None: def tearDownClass(cls) -> None: pass + @classmethod + def bootstrap_servers(cls) -> str: + return "{}:{}".format(cls.server.host, cls.server.port) -@pytest.mark.skipif( - not env_kafka_version() or env_kafka_version() < (0, 10), reason="No KAFKA_VERSION or version too low" -) -class TestSaslPlain(SASLIntegrationTestCase): - def test_sasl_plain(self): - pass + def test_admin(self): + admin = self.create_admin() + admin.create_topics([NewTopic('mytopic', 1, 1)]) - def test_sasl_scram(self, strength): - 
pass + def create_admin(self) -> KafkaAdminClient: + raise NotImplementedError() @pytest.mark.skipif( - not env_kafka_version() or env_kafka_version() < (0, 10, 2), reason="No KAFKA_VERSION or version too low" + not env_kafka_version() or env_kafka_version() < (0, 10), reason="No KAFKA_VERSION or version too low" ) -class TestSaslScram256(SASLIntegrationTestCase): - sasl_mechanism = "SCRAM-SHA-256" - - def test_sasl_plain(self): - pass - - @pytest.mark.parametrize("strength", [256, 512]) - def test_sasl_scram(self, strength): - pass +class TestSaslPlain(SASLIntegrationTestCase): + sasl_mechanism = "PLAIN" + def create_admin(self) -> KafkaAdminClient: + return KafkaAdminClient( + bootstrap_servers=self.bootstrap_servers(), + security_protocol=self.sasl_transport, + sasl_mechanism=self.sasl_mechanism, + sasl_plain_username=self.server.broker_user, + sasl_plain_password=self.server.broker_password + ) -@pytest.mark.skipif( - not env_kafka_version() or env_kafka_version() < (0, 10, 2), reason="No KAFKA_VERSION or version too low" -) -class TestSaslScram512(SASLIntegrationTestCase): - sasl_mechanism = "SCRAM-SHA-512" +# +# @pytest.mark.skipif( +# not env_kafka_version() or env_kafka_version() < (0, 10, 2), reason="No KAFKA_VERSION or version too low" +# ) +# class TestSaslScram256(SASLIntegrationTestCase): +# sasl_mechanism = "SCRAM-SHA-256" +# +# +# +# @pytest.mark.skipif( +# not env_kafka_version() or env_kafka_version() < (0, 10, 2), reason="No KAFKA_VERSION or version too low" +# ) +# class TestSaslScram512(SASLIntegrationTestCase): +# sasl_mechanism = "SCRAM-SHA-512" From 67d5276724c1974b527309c63ffec8b8330d036a Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Wed, 2 Oct 2019 08:19:47 +0200 Subject: [PATCH 04/46] normalize KafkaFixture.get_clients .get_consumers and .get_producers --- test/fixtures.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/test/fixtures.py b/test/fixtures.py index a28530fac..e5a152ca0 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -551,11 +551,13 @@ def create_topics(self, topic_names, num_partitions=None, replication_factor=Non for topic_name in topic_names: self._create_topic(topic_name, num_partitions, replication_factor) - def get_clients(self, cnt=1, client_id=None): - if client_id is None: - client_id = 'client' - return tuple(KafkaClient(client_id='%s_%s' % (client_id, random_string(4)), - bootstrap_servers=self.bootstrap_server()) for x in range(cnt)) + def get_clients(self, cnt=1, **params): + params.setdefault('client_id', 'client') + params['bootstrap_servers'] = self.bootstrap_server() + client_id = params['client_id'] + for _ in range(cnt): + params['client_id'] = '%s_%s' % (client_id, random_string(4)) + yield KafkaClient(**params) def get_admin_clients(self, cnt=1, **params): params.setdefault('client_id', 'admin_client') @@ -570,7 +572,7 @@ def get_consumers(self, cnt, topics, **params): params.setdefault('heartbeat_interval_ms', 500) params['bootstrap_servers'] = self.bootstrap_server() client_id = params['client_id'] - for x in range(cnt): + for _ in range(cnt): params['client_id'] = '%s_%s' % (client_id, random_string(4)) yield KafkaConsumer(*topics, **params) @@ -578,6 +580,6 @@ def get_producers(self, cnt, **params): params.setdefault('client_id', 'producer') params['bootstrap_servers'] = self.bootstrap_server() client_id = params['client_id'] - for x in range(cnt): + for _ in range(cnt): params['client_id'] = '%s_%s' % (client_id, random_string(4)) yield KafkaProducer(**params) From 
471029b0249975d0a81ee0092cbb4c395b8a9d08 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Wed, 2 Oct 2019 08:30:45 +0200 Subject: [PATCH 05/46] refactor KafkaFixture.get_clients .get_consumers and .get_producers --- test/fixtures.py | 57 +++++++++++++++++++++++++++--------------------- 1 file changed, 32 insertions(+), 25 deletions(-) diff --git a/test/fixtures.py b/test/fixtures.py index e5a152ca0..0f3d6b0ef 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -418,7 +418,7 @@ def start(self): else: raise RuntimeError('Failed to start KafkaInstance before max_timeout') - (self._client,) = self.get_clients(1, '_internal_client') + (self._client,) = self.get_clients(1, client_id='_internal_client') self.out("Done!") self.running = True @@ -551,35 +551,42 @@ def create_topics(self, topic_names, num_partitions=None, replication_factor=Non for topic_name in topic_names: self._create_topic(topic_name, num_partitions, replication_factor) - def get_clients(self, cnt=1, **params): - params.setdefault('client_id', 'client') - params['bootstrap_servers'] = self.bootstrap_server() + def _enrich_client_params(self, params, **defaults): + params = params.copy() + for key, value in defaults.items(): + params.setdefault(key, value) + params.setdefault('bootstrap_servers', self.bootstrap_server()) + if 'SASL' in self.transport: + params.setdefault('sasl_mechanism', self.sasl_mechanism) + params.setdefault('security_protocol', self.transport) + if self.sasl_mechanism in ('PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'): + params.setdefault('sasl_plain_username', self.broker_user) + params.setdefault('sasl_plain_password', self.broker_password) + return params + + @staticmethod + def _create_many_clients(cnt, cls, *args, **params): client_id = params['client_id'] for _ in range(cnt): params['client_id'] = '%s_%s' % (client_id, random_string(4)) - yield KafkaClient(**params) + yield cls(*args, **params) - def get_admin_clients(self, cnt=1, **params): - params.setdefault('client_id', 'admin_client') - params['bootstrap_servers'] = self.bootstrap_server() - client_id = params['client_id'] - for x in range(cnt): - params['client_id'] = '%s_%s' % (client_id, random_string(4)) - yield KafkaAdminClient(**params) + def get_clients(self, cnt=1, **params): + params = self._enrich_client_params(params, client_id='client') + for client in self._create_many_clients(cnt, KafkaClient, **params): + yield client + + def get_admin_clients(self, cnt, **params): + params = self._enrich_client_params(params, client_id='admin_client') + for client in self._create_many_clients(cnt, KafkaAdminClient, **params): + yield client def get_consumers(self, cnt, topics, **params): - params.setdefault('client_id', 'consumer') - params.setdefault('heartbeat_interval_ms', 500) - params['bootstrap_servers'] = self.bootstrap_server() - client_id = params['client_id'] - for _ in range(cnt): - params['client_id'] = '%s_%s' % (client_id, random_string(4)) - yield KafkaConsumer(*topics, **params) + params = self._enrich_client_params(params, client_id='consumer', heartbeat_interval_ms=500) + for client in self._create_many_clients(cnt, KafkaConsumer, *topics, **params): + yield client def get_producers(self, cnt, **params): - params.setdefault('client_id', 'producer') - params['bootstrap_servers'] = self.bootstrap_server() - client_id = params['client_id'] - for _ in range(cnt): - params['client_id'] = '%s_%s' % (client_id, random_string(4)) - yield KafkaProducer(**params) + params = self._enrich_client_params(params, client_id='producer') + for 
client in self._create_many_clients(cnt, KafkaProducer, **params): + yield client From f0b038955160e5d7b6ad1bedee61b41a9902c1e9 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Wed, 2 Oct 2019 09:38:13 +0200 Subject: [PATCH 06/46] add get_topic_names to KafkaFixture --- test/fixtures.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/test/fixtures.py b/test/fixtures.py index 0f3d6b0ef..4e05d633f 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -547,6 +547,24 @@ def _create_topic(self, topic_name, num_partitions, replication_factor, timeout_ self.out(stderr) raise RuntimeError("Failed to create topic %s" % (topic_name,)) + def get_topic_names(self): + args = self.kafka_run_class_args('kafka.admin.TopicCommand', + '--zookeeper', '%s:%s/%s' % (self.zookeeper.host, + self.zookeeper.port, + self.zk_chroot), + '--list' + ) + env = self.kafka_run_class_env() + env.pop('KAFKA_LOG4J_OPTS') + proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = proc.communicate() + if proc.returncode != 0: + self.out("Failed to list topics!") + self.out(stdout) + self.out(stderr) + raise RuntimeError("Failed to list topics!") + return stdout.decode().splitlines(keepends=False) + def create_topics(self, topic_names, num_partitions=None, replication_factor=None): for topic_name in topic_names: self._create_topic(topic_name, num_partitions, replication_factor) From d6761bd630d1df45415f9b305ab1060d1e16bc9c Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Wed, 2 Oct 2019 10:57:20 +0200 Subject: [PATCH 07/46] fix bug in KafkaFixture.create_topics --- test/fixtures.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/fixtures.py b/test/fixtures.py index 4e05d633f..793f9c771 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -519,7 +519,7 @@ def _create_topic(self, topic_name, num_partitions, replication_factor, timeout_ request = CreateTopicsRequest[0]([(topic_name, num_partitions, replication_factor, [], [])], timeout_ms) result = self._send_request(request, timeout=timeout_ms) - for topic_result in result[0].topic_error_codes: + for topic_result in result[0].topic_errors: error_code = topic_result[1] if error_code != 0: raise errors.for_code(error_code) From 6077c6c16884d60b65aa25e1b196bce6551234bf Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Wed, 2 Oct 2019 11:11:12 +0200 Subject: [PATCH 08/46] create function to turn special chars to underscore --- test/testutil.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/test/testutil.py b/test/testutil.py index 77a6673fa..ec4d70bf6 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -2,10 +2,15 @@ import os import random +import re import string import time +def special_to_underscore(string, _matcher=re.compile(r'[^a-zA-Z0-9_]+')): + return _matcher.sub('_', string) + + def random_string(length): return "".join(random.choice(string.ascii_letters) for i in range(length)) From 37d100d734da948da5505cbc96c5846ec2af7dc8 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Wed, 2 Oct 2019 11:48:33 +0200 Subject: [PATCH 09/46] improve KafkaFixture.get_consumers --- test/fixtures.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/fixtures.py b/test/fixtures.py index 793f9c771..769248573 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -600,7 +600,9 @@ def get_admin_clients(self, cnt, **params): yield client def get_consumers(self, cnt, topics, **params): - params = self._enrich_client_params(params, client_id='consumer', 
heartbeat_interval_ms=500) + params = self._enrich_client_params( + params, client_id='consumer', heartbeat_interval_ms=500, auto_offset_reset='earliest' + ) for client in self._create_many_clients(cnt, KafkaConsumer, *topics, **params): yield client From d297767c7286e49d0b29de5680dd4d033debabb0 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Wed, 2 Oct 2019 11:48:55 +0200 Subject: [PATCH 10/46] improve sasl tests and enable tests for scram --- test/test_sasl_integration.py | 123 ++++++++++++++++++---------------- 1 file changed, 66 insertions(+), 57 deletions(-) diff --git a/test/test_sasl_integration.py b/test/test_sasl_integration.py index 9b9d87555..4d0e955c2 100644 --- a/test/test_sasl_integration.py +++ b/test/test_sasl_integration.py @@ -1,70 +1,79 @@ +import logging +import uuid + import pytest -from . import unittest -from .testutil import random_string -from test.fixtures import ZookeeperFixture, KafkaFixture -from test.testutil import env_kafka_version -# from kafka.client import SimpleClient, KafkaClient -# from kafka.producer import KafkaProducer, SimpleProducer -# from kafka.consumer import SimpleConsumer, KafkaConsumer -from kafka.admin import KafkaAdminClient, NewTopic +from kafka.admin import NewTopic +from kafka.client import KafkaClient +from kafka.protocol.metadata import MetadataRequest_v1 +from test.testutil import assert_message_count, env_kafka_version, random_string, special_to_underscore -class SASLIntegrationTestCase(unittest.TestCase): - sasl_mechanism = None - sasl_transport = "SASL_PLAINTEXT" - server = None - zk = None +@pytest.fixture( + params=[ + pytest.param( + "PLAIN", marks=pytest.mark.skipif(env_kafka_version() < (0, 10), reason="Requires KAFKA_VERSION >= 0.10") + ), + pytest.param( + "SCRAM-SHA-256", + marks=pytest.mark.skipif(env_kafka_version() < (0, 10, 2), reason="Requires KAFKA_VERSION >= 0.10.2"), + ), + pytest.param( + "SCRAM-SHA-512", + marks=pytest.mark.skipif(env_kafka_version() < (0, 10, 2), reason="Requires KAFKA_VERSION >= 0.10.2"), + ), + ] +) +def sasl_kafka(request, kafka_broker_factory): + return kafka_broker_factory(transport="SASL_PLAINTEXT", sasl_mechanism=request.param)[0] - @classmethod - def setUpClass(cls) -> None: - cls.zk = ZookeeperFixture.instance() - cls.server = KafkaFixture.instance( - 0, cls.zk, zk_chroot=random_string(10), transport=cls.sasl_transport, sasl_mechanism=cls.sasl_mechanism - ) - @classmethod - def tearDownClass(cls) -> None: - pass +def test_admin(request, sasl_kafka): + topic_name = special_to_underscore(request.node.name + random_string(4)) + admin, = sasl_kafka.get_admin_clients(1) + admin.create_topics([NewTopic(topic_name, 1, 1)]) + assert topic_name in sasl_kafka.get_topic_names() - @classmethod - def bootstrap_servers(cls) -> str: - return "{}:{}".format(cls.server.host, cls.server.port) - def test_admin(self): - admin = self.create_admin() - admin.create_topics([NewTopic('mytopic', 1, 1)]) +def test_produce_and_consume(request, sasl_kafka): + topic_name = special_to_underscore(request.node.name + random_string(4)) + sasl_kafka.create_topics([topic_name], num_partitions=2) + producer, = sasl_kafka.get_producers(1) - def create_admin(self) -> KafkaAdminClient: - raise NotImplementedError() + messages_and_futures = [] # [(message, produce_future),] + for i in range(100): + encoded_msg = "{}-{}-{}".format(i, request.node.name, uuid.uuid4()).encode("utf-8") + future = producer.send(topic_name, value=encoded_msg, partition=i % 2) + messages_and_futures.append((encoded_msg, future)) + producer.flush() + 
for (msg, f) in messages_and_futures: + assert f.succeeded() + + consumer, = sasl_kafka.get_consumers(1, [topic_name]) + messages = {0: [], 1: []} + for i, message in enumerate(consumer, 1): + logging.debug("Consumed message %s", repr(message)) + messages[message.partition].append(message) + if i >= 100: + break + + assert_message_count(messages[0], 50) + assert_message_count(messages[1], 50) -@pytest.mark.skipif( - not env_kafka_version() or env_kafka_version() < (0, 10), reason="No KAFKA_VERSION or version too low" -) -class TestSaslPlain(SASLIntegrationTestCase): - sasl_mechanism = "PLAIN" - def create_admin(self) -> KafkaAdminClient: - return KafkaAdminClient( - bootstrap_servers=self.bootstrap_servers(), - security_protocol=self.sasl_transport, - sasl_mechanism=self.sasl_mechanism, - sasl_plain_username=self.server.broker_user, - sasl_plain_password=self.server.broker_password - ) +def test_client(request, sasl_kafka): + topic_name = special_to_underscore(request.node.name + random_string(4)) + sasl_kafka.create_topics([topic_name], num_partitions=1) -# -# @pytest.mark.skipif( -# not env_kafka_version() or env_kafka_version() < (0, 10, 2), reason="No KAFKA_VERSION or version too low" -# ) -# class TestSaslScram256(SASLIntegrationTestCase): -# sasl_mechanism = "SCRAM-SHA-256" -# -# -# -# @pytest.mark.skipif( -# not env_kafka_version() or env_kafka_version() < (0, 10, 2), reason="No KAFKA_VERSION or version too low" -# ) -# class TestSaslScram512(SASLIntegrationTestCase): -# sasl_mechanism = "SCRAM-SHA-512" + client: KafkaClient = next(sasl_kafka.get_clients(1), None) + request = MetadataRequest_v1(None) + client.send(0, request) + for _ in range(10): + result = client.poll(timeout_ms=10_000) + if len(result) > 0: + break + else: + raise TimeoutError("Couldn't fetch topic response from Broker.") + result = result[0] + assert topic_name in [t[1] for t in result.topics] From a97386d2e606411a4126be5a89a69ec4217e73f0 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Wed, 2 Oct 2019 18:52:52 +0200 Subject: [PATCH 11/46] implement scram --- kafka/conn.py | 135 ++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 131 insertions(+), 4 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 815065b40..6385f27fa 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -1,13 +1,18 @@ from __future__ import absolute_import, division +import base64 import collections import copy import errno +import hashlib +import hmac import io import logging from random import shuffle, uniform # selectors in stdlib as of py3.4 +from uuid import uuid4 + try: import selectors # pylint: disable=import-error except ImportError: @@ -98,6 +103,73 @@ class ConnectionStates(object): AUTHENTICATING = '' +def xor_bytes(left, right): + return bytes(lb ^ rb for lb, rb in zip(left, right)) + + +class ScramClient: + MECHANISMS = { + 'SCRAM-SHA-256': hashlib.sha256, + 'SCRAM-SHA-512': hashlib.sha512 + } + + def __init__(self, user, password, mechanism): + self.nonce = str(uuid4()).replace('-', '') + self.auth_message = '' + self.salted_password = None + self.user = user + self.password = password.encode() + self.hashfunc = self.MECHANISMS[mechanism] + self.hashname = ''.join(mechanism.lower().split('-')[1:3]) + self.stored_key = None + self.client_key = None + self.client_signature = None + self.client_proof = None + self.server_key = None + self.server_signature = None + + def first_message(self): + client_first_bare = 'n={},r={}'.format(self.user, self.nonce) + self.auth_message += client_first_bare + return 'n,,' 
+ client_first_bare + + def process_server_first_message(self, server_first_message): + self.auth_message += ',' + server_first_message + params = dict(pair.split('=', 1) for pair in server_first_message.split(',')) + server_nonce = params['r'] + if not server_nonce.startswith(self.nonce): + raise ValueError("Server nonce did not start with client nonce!") + self.nonce = server_nonce + self.auth_message += ',c=biws,r=' + self.nonce + + salt = base64.b64decode(params['s'].encode()) + iterations = int(params['i']) + self.create_salted_password(salt, iterations) + + self.client_key = self.hmac(self.salted_password, b'Client Key') + self.stored_key = self.hashfunc(self.client_key).digest() + self.client_signature = self.hmac(self.stored_key, self.auth_message.encode()) + self.client_proof = xor_bytes(self.client_key, self.client_signature) + self.server_key = self.hmac(self.salted_password, b'Server Key') + self.server_signature = self.hmac(self.server_key, self.auth_message.encode()) + + def hmac(self, key, msg): + return hmac.new(key, msg, digestmod=self.hashfunc).digest() + + def create_salted_password(self, salt, iterations): + self.salted_password = hashlib.pbkdf2_hmac( + self.hashname, self.password, salt, iterations + ) + + def final_message(self): + client_final_no_proof = 'c=biws,r=' + self.nonce + return '{},p={}'.format(client_final_no_proof, base64.b64encode(self.client_proof).decode()) + + def process_server_final_message(self, server_final_message): + params = dict(pair.split('=', 1) for pair in server_final_message.split(',')) + if self.server_signature != base64.b64decode(params['v'].encode()): + raise ValueError("Server sent wrong signature!") + class BrokerConnection(object): """Initialize a Kafka broker connection @@ -224,7 +296,7 @@ class BrokerConnection(object): 'sasl_oauth_token_provider': None } SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL') - SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER') + SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER', "SCRAM-SHA-256", "SCRAM-SHA-512") def __init__(self, host, port, afi, **configs): self.host = host @@ -259,9 +331,13 @@ def __init__(self, host, port, afi, **configs): if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'): assert self.config['sasl_mechanism'] in self.SASL_MECHANISMS, ( 'sasl_mechanism must be in ' + ', '.join(self.SASL_MECHANISMS)) - if self.config['sasl_mechanism'] == 'PLAIN': - assert self.config['sasl_plain_username'] is not None, 'sasl_plain_username required for PLAIN sasl' - assert self.config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl' + if self.config['sasl_mechanism'] in ('PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'): + assert self.config['sasl_plain_username'] is not None, ( + 'sasl_plain_username required for PLAIN or SCRAM sasl' + ) + assert self.config['sasl_plain_password'] is not None, ( + 'sasl_plain_password required for PLAIN or SCRAM sasl' + ) if self.config['sasl_mechanism'] == 'GSSAPI': assert gssapi is not None, 'GSSAPI lib not available' assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl' @@ -552,6 +628,8 @@ def _handle_sasl_handshake_response(self, future, response): return self._try_authenticate_gssapi(future) elif self.config['sasl_mechanism'] == 'OAUTHBEARER': return self._try_authenticate_oauth(future) + elif self.config['sasl_mechanism'].startswith("SCRAM-SHA-"): + return self._try_authenticate_scram(future) else: return future.failure(
Errors.UnsupportedSaslMechanismError( @@ -652,6 +730,55 @@ def _try_authenticate_plain(self, future): log.info('%s: Authenticated as %s via PLAIN', self, self.config['sasl_plain_username']) return future.success(True) + def _try_authenticate_scram(self, future): + if self.config['security_protocol'] == 'SASL_PLAINTEXT': + log.warning('%s: Exchanging credentials in the clear', self) + + scram_client = ScramClient( + self.config['sasl_plain_username'], self.config['sasl_plain_password'], self.config['sasl_mechanism'] + ) + + err = None + close = False + with self._lock: + if not self._can_send_recv(): + err = Errors.NodeNotReadyError(str(self)) + close = False + else: + try: + client_first = scram_client.first_message().encode() + size = Int32.encode(len(client_first)) + self._send_bytes_blocking(size + client_first) + + data_len = self._recv_bytes_blocking(4) + data_len = int.from_bytes(data_len, 'big') + server_first = self._recv_bytes_blocking(data_len).decode() + scram_client.process_server_first_message(server_first) + + client_final = scram_client.final_message().encode() + size = Int32.encode(len(client_final)) + self._send_bytes_blocking(size + client_final) + + data_len = self._recv_bytes_blocking(4) + data_len = int.from_bytes(data_len, 'big') + server_final = self._recv_bytes_blocking(data_len).decode() + scram_client.process_server_final_message(server_final) + + except (ConnectionError, TimeoutError) as e: + log.exception("%s: Error receiving reply from server", self) + err = Errors.KafkaConnectionError("%s: %s" % (self, e)) + close = True + + if err is not None: + if close: + self.close(error=err) + return future.failure(err) + + log.info( + '%s: Authenticated as %s via %s', self, self.config['sasl_plain_username'], self.config['sasl_mechanism'] + ) + return future.success(True) + def _try_authenticate_gssapi(self, future): kerberos_damin_name = self.config['sasl_kerberos_domain_name'] or self.host auth_id = self.config['sasl_kerberos_service_name'] + '@' + kerberos_damin_name From 3c3c5a0fb877e04aac3847034689945493bcb2e2 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Wed, 2 Oct 2019 18:53:25 +0200 Subject: [PATCH 12/46] make sure to wait for scram credentials during kafka startup --- test/fixtures.py | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/test/fixtures.py b/test/fixtures.py index 769248573..c42a790a5 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -275,13 +275,14 @@ def __init__(self, host, port, broker_id, zookeeper, zk_chroot, self.auto_create_topic = auto_create_topic self.transport = transport.upper() self.sasl_mechanism = sasl_mechanism.upper() - self.sasl_config = self._sasl_config() self.ssl_dir = self.test_resource('ssl') # TODO: checking for port connection would be better than scanning logs # until then, we need the pattern to work across all supported broker versions # The logging format changed slightly in 1.0.0 self.start_pattern = r"\[Kafka ?Server (id=)?%d\],? 
started" % (broker_id,) + # Need to wait until the broker has fetched user configs from zookeeper in case we use scram as sasl mechanism + self.scram_pattern = r"Processing override for entityPath: users/%s" % (self.broker_user) self.zookeeper = zookeeper self.zk_chroot = zk_chroot @@ -296,6 +297,7 @@ def __init__(self, host, port, broker_id, zookeeper, zk_chroot, self.running = False self._client = None + self.sasl_config = None def _sasl_config(self): if 'SASL' not in self.transport: @@ -315,9 +317,6 @@ def _sasl_config(self): + "org.apache.kafka.common.security.scram.ScramLoginModule " + 'required username="{user}" password="{password}";\n' ) - # add user to zookeeper for the first server - if self.broker_id == 0: - self._add_scram_user() else: raise ValueError("SASL mechanism {} currently not supported".format(self.sasl_mechanism)) return sasl_config.format( @@ -331,8 +330,9 @@ def _add_scram_user(self): args = self.kafka_run_class_args( "kafka.admin.ConfigCommand", "--zookeeper", - "%s:%d" % (self.zookeeper.host, - self.zookeeper.port), + "%s:%d/%s" % (self.zookeeper.host, + self.zookeeper.port, + self.zk_chroot), "--alter", "--entity-type", "users", "--entity-name", self.broker_user, @@ -407,8 +407,12 @@ def start(self): self.child = SpawnedService(args, env) self.child.start() timeout = min(timeout, max(end_at - time.time(), 0)) - if self.child.wait_for(self.start_pattern, timeout=timeout): + if self.child.wait_for(self.start_pattern, timeout=timeout) and ( + not self.sasl_mechanism.startswith('SCRAM-SHA-') + or self.child.wait_for(self.scram_pattern, timeout=timeout) + ): break + self.child.dump_logs() self.child.stop() timeout *= 2 @@ -448,6 +452,10 @@ def open(self): log.info(" tmp_dir = %s", self.tmp_dir.strpath) self._create_zk_chroot() + self.sasl_config = self._sasl_config() + # add user to zookeeper for the first server + if self.sasl_mechanism.startswith("SCRAM-SHA") and self.broker_id == 0: + self._add_scram_user() self.start() atexit.register(self.close) From 9785c059dcaf51c13d6eb788c99bbb345d8d14c1 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Thu, 3 Oct 2019 09:58:02 +0200 Subject: [PATCH 13/46] ensure compatibility with older python versions --- kafka/conn.py | 20 +++++++++----------- test/fixtures.py | 2 +- test/test_sasl_integration.py | 7 +++---- 3 files changed, 13 insertions(+), 16 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 6385f27fa..a9634041b 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -1,7 +1,6 @@ from __future__ import absolute_import, division import base64 -import collections import copy import errno import hashlib @@ -10,9 +9,9 @@ import logging from random import shuffle, uniform -# selectors in stdlib as of py3.4 from uuid import uuid4 +# selectors in stdlib as of py3.4 try: import selectors # pylint: disable=import-error except ImportError: @@ -21,7 +20,6 @@ import socket import struct -import sys import threading import time @@ -44,6 +42,12 @@ TimeoutError = socket.error BlockingIOError = Exception + def xor_bytes(left, right): + return bytearray(ord(lb) ^ ord(rb) for lb, rb in zip(left, right)) +else: + def xor_bytes(left, right): + return bytes(lb ^ rb for lb, rb in zip(left, right)) + log = logging.getLogger(__name__) DEFAULT_KAFKA_PORT = 9092 @@ -103,10 +107,6 @@ class ConnectionStates(object): AUTHENTICATING = '' -def xor_bytes(left, right): - return bytes(lb ^ rb for lb, rb in zip(left, right)) - - class ScramClient: MECHANISMS = { 'SCRAM-SHA-256': hashlib.sha256, @@ -750,8 +750,7 @@ def _try_authenticate_scram(self, 
future): size = Int32.encode(len(client_first)) self._send_bytes_blocking(size + client_first) - data_len = self._recv_bytes_blocking(4) - data_len = int.from_bytes(data_len, 'big') + (data_len,) = struct.unpack('>i', self._recv_bytes_blocking(4)) server_first = self._recv_bytes_blocking(data_len).decode() scram_client.process_server_first_message(server_first) @@ -759,8 +758,7 @@ def _try_authenticate_scram(self, future): size = Int32.encode(len(client_final)) self._send_bytes_blocking(size + client_final) - data_len = self._recv_bytes_blocking(4) - data_len = int.from_bytes(data_len, 'big') + (data_len,) = struct.unpack('>i', self._recv_bytes_blocking(4)) server_final = self._recv_bytes_blocking(data_len).decode() scram_client.process_server_final_message(server_final) diff --git a/test/fixtures.py b/test/fixtures.py index c42a790a5..2dbbb7795 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -571,7 +571,7 @@ def get_topic_names(self): self.out(stdout) self.out(stderr) raise RuntimeError("Failed to list topics!") - return stdout.decode().splitlines(keepends=False) + return stdout.decode().splitlines(False) def create_topics(self, topic_names, num_partitions=None, replication_factor=None): for topic_name in topic_names: diff --git a/test/test_sasl_integration.py b/test/test_sasl_integration.py index 4d0e955c2..69c04996b 100644 --- a/test/test_sasl_integration.py +++ b/test/test_sasl_integration.py @@ -4,7 +4,6 @@ import pytest from kafka.admin import NewTopic -from kafka.client import KafkaClient from kafka.protocol.metadata import MetadataRequest_v1 from test.testutil import assert_message_count, env_kafka_version, random_string, special_to_underscore @@ -66,14 +65,14 @@ def test_client(request, sasl_kafka): topic_name = special_to_underscore(request.node.name + random_string(4)) sasl_kafka.create_topics([topic_name], num_partitions=1) - client: KafkaClient = next(sasl_kafka.get_clients(1), None) + client = next(sasl_kafka.get_clients(1), None) request = MetadataRequest_v1(None) client.send(0, request) for _ in range(10): - result = client.poll(timeout_ms=10_000) + result = client.poll(timeout_ms=10000) if len(result) > 0: break else: - raise TimeoutError("Couldn't fetch topic response from Broker.") + raise RuntimeError("Couldn't fetch topic response from Broker.") result = result[0] assert topic_name in [t[1] for t in result.topics] From 0570d91b9868baa0578fc6b7cd82abfd43eb8ca8 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Tue, 15 Oct 2019 20:42:31 +0200 Subject: [PATCH 14/46] enable logging for tests --- test/__init__.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/test/__init__.py b/test/__init__.py index 71f667da8..329277dc6 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -2,14 +2,7 @@ # Set default logging handler to avoid "No handler found" warnings. 
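# A minimal, self-contained sketch (Python 3) of the RFC 5802 derivation that
# the ScramClient / _try_authenticate_scram code above performs. The password,
# salt, iteration count, and auth message here are made-up placeholders: in the
# real exchange the salt and iteration count arrive in the server-first-message,
# and the auth message is the concatenation of the three SCRAM messages.
import base64
import hashlib
import hmac

password = b'alice-secret'                    # e.g. KafkaFixture.broker_password
salt = base64.b64decode('QSXCR+Q6sek8bf92')   # placeholder for the server's 's=' field
iterations = 4096                             # placeholder for the server's 'i=' field
auth_message = b'client-first-bare,server-first,client-final-without-proof'

salted_password = hashlib.pbkdf2_hmac('sha256', password, salt, iterations)
client_key = hmac.new(salted_password, b'Client Key', hashlib.sha256).digest()
stored_key = hashlib.sha256(client_key).digest()
client_signature = hmac.new(stored_key, auth_message, hashlib.sha256).digest()
client_proof = bytes(k ^ s for k, s in zip(client_key, client_signature))
print(base64.b64encode(client_proof).decode())  # becomes the 'p=' field of client-final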
import logging -try: # Python 2.7+ - from logging import NullHandler -except ImportError: - class NullHandler(logging.Handler): - def emit(self, record): - pass - -logging.getLogger(__name__).addHandler(NullHandler()) +logging.basicConfig(level=logging.INFO) from kafka.future import Future Future.error_on_callbacks = True # always fail during testing From d7fe59bc484c07e3a0ff670cffc9336c589210b8 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Tue, 15 Oct 2019 20:43:26 +0200 Subject: [PATCH 15/46] add more output to SpawnedService --- test/service.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/test/service.py b/test/service.py index 47fb84643..d2a7f4405 100644 --- a/test/service.py +++ b/test/service.py @@ -45,6 +45,11 @@ def __init__(self, args=None, env=None): self.child = None self.alive = False self.daemon = True + log.info("Created service for command:") + log.info(" "+' '.join(self.args)) + log.info("With environment:") + for key, value in self.env.items(): + log.info(" {key}={value}".format(key=key, value=value)) def _spawn(self): if self.alive: return From 22b61900785e4b978284573af32a47f88480a247 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Tue, 15 Oct 2019 20:44:34 +0200 Subject: [PATCH 16/46] add more checks if SpawnedService is actually running --- test/service.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/test/service.py b/test/service.py index d2a7f4405..483708a14 100644 --- a/test/service.py +++ b/test/service.py @@ -62,7 +62,7 @@ def _spawn(self): bufsize=1, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - self.alive = True + self.alive = self.child.poll() is None def _despawn(self): if self.child.poll() is None: @@ -110,6 +110,9 @@ def dump_logs(self): def wait_for(self, pattern, timeout=30): start = time.time() while True: + if not self.is_alive(): + raise RuntimeError("Child thread died already.") + elapsed = time.time() - start if elapsed >= timeout: log.error("Waiting for %r timed out after %d seconds", pattern, timeout) From 84b57ed1a8e5e07efb02e1923ba7feaf65b78c11 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Tue, 15 Oct 2019 21:07:50 +0200 Subject: [PATCH 17/46] switch to actual jaas config file for kafka --- .../0.10.0.0/resources/kafka_server_jaas.conf | 4 + .../0.10.0.1/resources/kafka_server_jaas.conf | 4 + .../0.10.1.1/resources/kafka_server_jaas.conf | 4 + .../0.10.2.1/resources/kafka_server_jaas.conf | 4 + .../0.10.2.2/resources/kafka_server_jaas.conf | 4 + .../0.11.0.0/resources/kafka_server_jaas.conf | 4 + .../0.11.0.1/resources/kafka_server_jaas.conf | 4 + .../0.11.0.2/resources/kafka_server_jaas.conf | 4 + .../0.11.0.3/resources/kafka_server_jaas.conf | 4 + .../1.0.0/resources/kafka_server_jaas.conf | 4 + .../1.0.1/resources/kafka_server_jaas.conf | 4 + .../1.0.2/resources/kafka_server_jaas.conf | 4 + .../1.1.0/resources/kafka_server_jaas.conf | 4 + .../1.1.1/resources/kafka_server_jaas.conf | 4 + .../2.0.0/resources/kafka_server_jaas.conf | 4 + .../2.0.1/resources/kafka_server_jaas.conf | 4 + .../2.1.0/resources/kafka_server_jaas.conf | 4 + .../2.1.1/resources/kafka_server_jaas.conf | 4 + .../2.2.0/resources/kafka_server_jaas.conf | 4 + .../2.2.1/resources/kafka_server_jaas.conf | 4 + .../2.3.0/resources/kafka_server_jaas.conf | 4 + .../trunk/resources/kafka_server_jaas.conf | 4 + test/fixtures.py | 93 ++++++++++++------- 23 files changed, 145 insertions(+), 36 deletions(-) create mode 100644 servers/0.10.0.0/resources/kafka_server_jaas.conf create mode 100644 servers/0.10.0.1/resources/kafka_server_jaas.conf 
create mode 100644 servers/0.10.1.1/resources/kafka_server_jaas.conf create mode 100644 servers/0.10.2.1/resources/kafka_server_jaas.conf create mode 100644 servers/0.10.2.2/resources/kafka_server_jaas.conf create mode 100644 servers/0.11.0.0/resources/kafka_server_jaas.conf create mode 100644 servers/0.11.0.1/resources/kafka_server_jaas.conf create mode 100644 servers/0.11.0.2/resources/kafka_server_jaas.conf create mode 100644 servers/0.11.0.3/resources/kafka_server_jaas.conf create mode 100644 servers/1.0.0/resources/kafka_server_jaas.conf create mode 100644 servers/1.0.1/resources/kafka_server_jaas.conf create mode 100644 servers/1.0.2/resources/kafka_server_jaas.conf create mode 100644 servers/1.1.0/resources/kafka_server_jaas.conf create mode 100644 servers/1.1.1/resources/kafka_server_jaas.conf create mode 100644 servers/2.0.0/resources/kafka_server_jaas.conf create mode 100644 servers/2.0.1/resources/kafka_server_jaas.conf create mode 100644 servers/2.1.0/resources/kafka_server_jaas.conf create mode 100644 servers/2.1.1/resources/kafka_server_jaas.conf create mode 100644 servers/2.2.0/resources/kafka_server_jaas.conf create mode 100644 servers/2.2.1/resources/kafka_server_jaas.conf create mode 100644 servers/2.3.0/resources/kafka_server_jaas.conf create mode 100644 servers/trunk/resources/kafka_server_jaas.conf diff --git a/servers/0.10.0.0/resources/kafka_server_jaas.conf b/servers/0.10.0.0/resources/kafka_server_jaas.conf new file mode 100644 index 000000000..18efe4369 --- /dev/null +++ b/servers/0.10.0.0/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}}; \ No newline at end of file diff --git a/servers/0.10.0.1/resources/kafka_server_jaas.conf b/servers/0.10.0.1/resources/kafka_server_jaas.conf new file mode 100644 index 000000000..18efe4369 --- /dev/null +++ b/servers/0.10.0.1/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}}; \ No newline at end of file diff --git a/servers/0.10.1.1/resources/kafka_server_jaas.conf b/servers/0.10.1.1/resources/kafka_server_jaas.conf new file mode 100644 index 000000000..18efe4369 --- /dev/null +++ b/servers/0.10.1.1/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}}; \ No newline at end of file diff --git a/servers/0.10.2.1/resources/kafka_server_jaas.conf b/servers/0.10.2.1/resources/kafka_server_jaas.conf new file mode 100644 index 000000000..18efe4369 --- /dev/null +++ b/servers/0.10.2.1/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}}; \ No newline at end of file diff --git a/servers/0.10.2.2/resources/kafka_server_jaas.conf b/servers/0.10.2.2/resources/kafka_server_jaas.conf new file mode 100644 index 000000000..18efe4369 --- /dev/null +++ b/servers/0.10.2.2/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}}; \ No newline at end of file diff --git a/servers/0.11.0.0/resources/kafka_server_jaas.conf b/servers/0.11.0.0/resources/kafka_server_jaas.conf new file mode 100644 index 000000000..18efe4369 --- /dev/null +++ b/servers/0.11.0.0/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}}; \ No newline at end of file diff --git a/servers/0.11.0.1/resources/kafka_server_jaas.conf b/servers/0.11.0.1/resources/kafka_server_jaas.conf new file mode 100644 index 000000000..18efe4369 --- /dev/null +++ b/servers/0.11.0.1/resources/kafka_server_jaas.conf @@ -0,0 +1,4 
@@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}}; \ No newline at end of file diff --git a/servers/0.11.0.2/resources/kafka_server_jaas.conf b/servers/0.11.0.2/resources/kafka_server_jaas.conf new file mode 100644 index 000000000..18efe4369 --- /dev/null +++ b/servers/0.11.0.2/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}}; \ No newline at end of file diff --git a/servers/0.11.0.3/resources/kafka_server_jaas.conf b/servers/0.11.0.3/resources/kafka_server_jaas.conf new file mode 100644 index 000000000..18efe4369 --- /dev/null +++ b/servers/0.11.0.3/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}}; \ No newline at end of file diff --git a/servers/1.0.0/resources/kafka_server_jaas.conf b/servers/1.0.0/resources/kafka_server_jaas.conf new file mode 100644 index 000000000..18efe4369 --- /dev/null +++ b/servers/1.0.0/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}}; \ No newline at end of file diff --git a/servers/1.0.1/resources/kafka_server_jaas.conf b/servers/1.0.1/resources/kafka_server_jaas.conf new file mode 100644 index 000000000..18efe4369 --- /dev/null +++ b/servers/1.0.1/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}}; \ No newline at end of file diff --git a/servers/1.0.2/resources/kafka_server_jaas.conf b/servers/1.0.2/resources/kafka_server_jaas.conf new file mode 100644 index 000000000..18efe4369 --- /dev/null +++ b/servers/1.0.2/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}}; \ No newline at end of file diff --git a/servers/1.1.0/resources/kafka_server_jaas.conf b/servers/1.1.0/resources/kafka_server_jaas.conf new file mode 100644 index 000000000..18efe4369 --- /dev/null +++ b/servers/1.1.0/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}}; \ No newline at end of file diff --git a/servers/1.1.1/resources/kafka_server_jaas.conf b/servers/1.1.1/resources/kafka_server_jaas.conf new file mode 100644 index 000000000..18efe4369 --- /dev/null +++ b/servers/1.1.1/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}}; \ No newline at end of file diff --git a/servers/2.0.0/resources/kafka_server_jaas.conf b/servers/2.0.0/resources/kafka_server_jaas.conf new file mode 100644 index 000000000..18efe4369 --- /dev/null +++ b/servers/2.0.0/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}}; \ No newline at end of file diff --git a/servers/2.0.1/resources/kafka_server_jaas.conf b/servers/2.0.1/resources/kafka_server_jaas.conf new file mode 100644 index 000000000..18efe4369 --- /dev/null +++ b/servers/2.0.1/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}}; \ No newline at end of file diff --git a/servers/2.1.0/resources/kafka_server_jaas.conf b/servers/2.1.0/resources/kafka_server_jaas.conf new file mode 100644 index 000000000..18efe4369 --- /dev/null +++ b/servers/2.1.0/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}}; \ No newline at end of file diff --git a/servers/2.1.1/resources/kafka_server_jaas.conf b/servers/2.1.1/resources/kafka_server_jaas.conf new file mode 100644 index 000000000..18efe4369 --- /dev/null +++ b/servers/2.1.1/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + 
{jaas_config} +}}; +Client {{}}; \ No newline at end of file diff --git a/servers/2.2.0/resources/kafka_server_jaas.conf b/servers/2.2.0/resources/kafka_server_jaas.conf new file mode 100644 index 000000000..18efe4369 --- /dev/null +++ b/servers/2.2.0/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}}; \ No newline at end of file diff --git a/servers/2.2.1/resources/kafka_server_jaas.conf b/servers/2.2.1/resources/kafka_server_jaas.conf new file mode 100644 index 000000000..18efe4369 --- /dev/null +++ b/servers/2.2.1/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}}; \ No newline at end of file diff --git a/servers/2.3.0/resources/kafka_server_jaas.conf b/servers/2.3.0/resources/kafka_server_jaas.conf new file mode 100644 index 000000000..18efe4369 --- /dev/null +++ b/servers/2.3.0/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}}; \ No newline at end of file diff --git a/servers/trunk/resources/kafka_server_jaas.conf b/servers/trunk/resources/kafka_server_jaas.conf new file mode 100644 index 000000000..18efe4369 --- /dev/null +++ b/servers/trunk/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}}; \ No newline at end of file diff --git a/test/fixtures.py b/test/fixtures.py index 2dbbb7795..014fb32aa 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -240,7 +240,7 @@ class KafkaFixture(Fixture): def instance(cls, broker_id, zookeeper, zk_chroot=None, host=None, port=None, transport='PLAINTEXT', replicas=1, partitions=2, - sasl_mechanism='PLAIN', auto_create_topic=True, tmp_dir=None): + sasl_mechanism=None, auto_create_topic=True, tmp_dir=None): if zk_chroot is None: zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_") @@ -264,7 +264,7 @@ def instance(cls, broker_id, zookeeper, zk_chroot=None, def __init__(self, host, port, broker_id, zookeeper, zk_chroot, replicas=1, partitions=2, transport='PLAINTEXT', - sasl_mechanism='PLAIN', auto_create_topic=True, + sasl_mechanism=None, auto_create_topic=True, tmp_dir=None): super(KafkaFixture, self).__init__() @@ -274,7 +274,10 @@ def __init__(self, host, port, broker_id, zookeeper, zk_chroot, self.broker_id = broker_id self.auto_create_topic = auto_create_topic self.transport = transport.upper() - self.sasl_mechanism = sasl_mechanism.upper() + if sasl_mechanism is not None: + self.sasl_mechanism = sasl_mechanism.upper() + else: + self.sasl_mechanism = None self.ssl_dir = self.test_resource('ssl') # TODO: checking for port connection would be better than scanning logs @@ -300,30 +303,30 @@ def __init__(self, host, port, broker_id, zookeeper, zk_chroot, self.sasl_config = None def _sasl_config(self): - if 'SASL' not in self.transport: + if not self.sasl_enabled: return '' sasl_config = "sasl.enabled.mechanisms={mechanism}\n" sasl_config += "sasl.mechanism.inter.broker.protocol={mechanism}\n" - if self.sasl_mechanism == 'PLAIN': - sasl_config += ( - "listener.name.{transport_lower}.plain.sasl.jaas.config=" - + "org.apache.kafka.common.security.plain.PlainLoginModule " - + 'required username="{user}" password="{password}" user_alice="{password}";\n' + return sasl_config.format(mechanism=self.sasl_mechanism) + + def _jaas_config(self): + if not self.sasl_enabled: + return '' + + elif self.sasl_mechanism == 'PLAIN': + jaas_config = ( + "org.apache.kafka.common.security.plain.PlainLoginModule required\n" + ' username="{user}" password="{password}" 
user_{user}="{password}";\n' ) elif self.sasl_mechanism in ("SCRAM-SHA-256", "SCRAM-SHA-512"): - sasl_config += ( - "listener.name.{transport_lower}.{mechanism_lower}.sasl.jaas.config=" - + "org.apache.kafka.common.security.scram.ScramLoginModule " - + 'required username="{user}" password="{password}";\n' + jaas_config = ( + "org.apache.kafka.common.security.scram.ScramLoginModule required\n" + ' username="{user}" password="{password}";\n' ) else: raise ValueError("SASL mechanism {} currently not supported".format(self.sasl_mechanism)) - return sasl_config.format( - transport=self.transport, transport_lower=self.transport.lower(), - mechanism=self.sasl_mechanism, mechanism_lower=self.sasl_mechanism.lower(), - user=self.broker_user, password=self.broker_password - ) + return jaas_config.format(user=self.broker_user, password=self.broker_password) def _add_scram_user(self): self.out("Adding SCRAM credentials for user {} to zookeeper.".format(self.broker_user)) @@ -351,6 +354,10 @@ def _add_scram_user(self): raise RuntimeError("Failed to save credentials to zookeeper!") self.out("User created.") + @property + def sasl_enabled(self): + return self.sasl_mechanism is not None + def bootstrap_server(self): return '%s:%d' % (self.host, self.port) @@ -386,9 +393,15 @@ def _create_zk_chroot(self): def start(self): # Configure Kafka child process properties = self.tmp_dir.join("kafka.properties") - template = self.test_resource("kafka.properties") + jaas_conf = self.tmp_dir.join("kafka_server_jaas.conf") + properties_template = self.test_resource("kafka.properties") + jaas_conf_template = self.test_resource("kafka_server_jaas.conf") + args = self.kafka_run_class_args("kafka.Kafka", properties.strpath) env = self.kafka_run_class_env() + if self.sasl_enabled: + env['KAFKA_OPTS'] = env.get('KAFKA_OPTS', '') + ' -Djava.security.auth.login.config={}'.format(jaas_conf) + self.render_template(jaas_conf_template, jaas_conf, vars(self)) timeout = 5 max_timeout = 120 @@ -403,14 +416,11 @@ def start(self): if auto_port: self.port = get_open_port() self.out('Attempting to start on port %d (try #%d)' % (self.port, tries)) - self.render_template(template, properties, vars(self)) + self.render_template(properties_template, properties, vars(self)) self.child = SpawnedService(args, env) self.child.start() timeout = min(timeout, max(end_at - time.time(), 0)) - if self.child.wait_for(self.start_pattern, timeout=timeout) and ( - not self.sasl_mechanism.startswith('SCRAM-SHA-') - or self.child.wait_for(self.scram_pattern, timeout=timeout) - ): + if self._broker_ready(timeout) and self._scram_user_present(timeout): break self.child.dump_logs() @@ -427,6 +437,15 @@ def start(self): self.out("Done!") self.running = True + def _broker_ready(self, timeout): + return self.child.wait_for(self.start_pattern, timeout=timeout) + + def _scram_user_present(self, timeout): + # no need to wait for scram user if scram is not used + if not self.sasl_enabled or not self.sasl_mechanism.startswith('SCRAM-SHA-'): + return True + return self.child.wait_for(self.scram_pattern, timeout=timeout) + def open(self): if self.running: self.out("Instance already running") @@ -440,21 +459,23 @@ def open(self): self.tmp_dir.ensure('data', dir=True) self.out("Running local instance...") - log.info(" host = %s", self.host) - log.info(" port = %s", self.port or '(auto)') - log.info(" transport = %s", self.transport) - log.info(" broker_id = %s", self.broker_id) - log.info(" zk_host = %s", self.zookeeper.host) - log.info(" zk_port = %s", 
self.zookeeper.port) - log.info(" zk_chroot = %s", self.zk_chroot) - log.info(" replicas = %s", self.replicas) - log.info(" partitions = %s", self.partitions) - log.info(" tmp_dir = %s", self.tmp_dir.strpath) + log.info(" host = %s", self.host) + log.info(" port = %s", self.port or '(auto)') + log.info(" transport = %s", self.transport) + log.info(" sasl_mechanism = %s", self.sasl_mechanism) + log.info(" broker_id = %s", self.broker_id) + log.info(" zk_host = %s", self.zookeeper.host) + log.info(" zk_port = %s", self.zookeeper.port) + log.info(" zk_chroot = %s", self.zk_chroot) + log.info(" replicas = %s", self.replicas) + log.info(" partitions = %s", self.partitions) + log.info(" tmp_dir = %s", self.tmp_dir.strpath) self._create_zk_chroot() self.sasl_config = self._sasl_config() + self.jaas_config = self._jaas_config() # add user to zookeeper for the first server - if self.sasl_mechanism.startswith("SCRAM-SHA") and self.broker_id == 0: + if self.sasl_enabled and self.sasl_mechanism.startswith("SCRAM-SHA") and self.broker_id == 0: self._add_scram_user() self.start() @@ -582,7 +603,7 @@ def _enrich_client_params(self, params, **defaults): for key, value in defaults.items(): params.setdefault(key, value) params.setdefault('bootstrap_servers', self.bootstrap_server()) - if 'SASL' in self.transport: + if self.sasl_enabled: params.setdefault('sasl_mechanism', self.sasl_mechanism) params.setdefault('security_protocol', self.transport) if self.sasl_mechanism in ('PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'): From 01f48d7a4d05d3eb9158476fea76728ff1bbb1f0 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Tue, 15 Oct 2019 22:09:47 +0200 Subject: [PATCH 18/46] remove sasl mechanism settings from server.properties for 1.1.1 --- servers/1.1.1/resources/kafka.properties | 4 ---- 1 file changed, 4 deletions(-) diff --git a/servers/1.1.1/resources/kafka.properties b/servers/1.1.1/resources/kafka.properties index 7770909bb..38f751dd0 100644 --- a/servers/1.1.1/resources/kafka.properties +++ b/servers/1.1.1/resources/kafka.properties @@ -34,10 +34,6 @@ ssl.truststore.password=foobar authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer allow.everyone.if.no.acl.found=true -# List of enabled mechanisms, can be more than one -sasl.enabled.mechanisms=PLAIN -sasl.mechanism.inter.broker.protocol=PLAIN - # The port the socket server listens on #port=9092 From 3f399d3196a9d2441ca9ea6b6b8167509c5a79bc Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Tue, 15 Oct 2019 22:10:39 +0200 Subject: [PATCH 19/46] change scram_pattern in KafkaFixture --- test/fixtures.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/fixtures.py b/test/fixtures.py index 014fb32aa..ee07a7612 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -285,7 +285,7 @@ def __init__(self, host, port, broker_id, zookeeper, zk_chroot, # The logging format changed slightly in 1.0.0 self.start_pattern = r"\[Kafka ?Server (id=)?%d\],? 
started" % (broker_id,) # Need to wait until the broker has fetched user configs from zookeeper in case we use scram as sasl mechanism - self.scram_pattern = r"Processing override for entityPath: users/%s" % (self.broker_user) + self.scram_pattern = r"Removing Produce quota for user %s" % (self.broker_user) self.zookeeper = zookeeper self.zk_chroot = zk_chroot From fecdc46333decb5bd17ec967a3538688e488400d Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Tue, 15 Oct 2019 22:11:10 +0200 Subject: [PATCH 20/46] minor refactoring of least_loaded_node --- kafka/client_async.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 14677d0b6..9c0b5e615 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -767,10 +767,7 @@ def least_loaded_node(self): inflight = curr_inflight found = node_id - if found is not None: - return found - - return None + return found def set_topics(self, topics): """Set specific topics to track for metadata. From 9a99f950a0a46b18b2c3207ba06f84cb393221c8 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Tue, 15 Oct 2019 22:16:49 +0200 Subject: [PATCH 21/46] update docstrings with information about new sasl mechanisms --- kafka/admin/client.py | 10 +++++----- kafka/client_async.py | 10 +++++----- kafka/conn.py | 10 +++++----- kafka/consumer/group.py | 10 +++++----- kafka/producer/kafka.py | 10 +++++----- 5 files changed, 25 insertions(+), 25 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index bb1e2b5cf..70845d8af 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -130,11 +130,11 @@ class KafkaAdminClient(object): metric_group_prefix (str): Prefix for metric names. Default: '' sasl_mechanism (str): Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: - PLAIN, GSSAPI, OAUTHBEARER. - sasl_plain_username (str): username for sasl PLAIN authentication. - Required if sasl_mechanism is PLAIN. - sasl_plain_password (str): password for sasl PLAIN authentication. - Required if sasl_mechanism is PLAIN. + PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512. + sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication. + Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. + sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication. + Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. sasl_kerberos_service_name (str): Service name to include in GSSAPI sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI diff --git a/kafka/client_async.py b/kafka/client_async.py index 9c0b5e615..299cbe02f 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -143,11 +143,11 @@ class KafkaClient(object): metric_group_prefix (str): Prefix for metric names. Default: '' sasl_mechanism (str): Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: - PLAIN, GSSAPI, OAUTHBEARER. - sasl_plain_username (str): username for sasl PLAIN authentication. - Required if sasl_mechanism is PLAIN. - sasl_plain_password (str): password for sasl PLAIN authentication. - Required if sasl_mechanism is PLAIN. + PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512. + sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication. + Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. 
+ sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication. + Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. sasl_kerberos_service_name (str): Service name to include in GSSAPI sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI diff --git a/kafka/conn.py b/kafka/conn.py index a9634041b..02f2dcc75 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -249,11 +249,11 @@ class BrokerConnection(object): metric_group_prefix (str): Prefix for metric names. Default: '' sasl_mechanism (str): Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: - PLAIN, GSSAPI, OAUTHBEARER. - sasl_plain_username (str): username for sasl PLAIN authentication. - Required if sasl_mechanism is PLAIN. - sasl_plain_password (str): password for sasl PLAIN authentication. - Required if sasl_mechanism is PLAIN. + PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512. + sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication. + Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. + sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication. + Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. sasl_kerberos_service_name (str): Service name to include in GSSAPI sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index e9fd44c97..20de215a8 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -231,11 +231,11 @@ class KafkaConsumer(six.Iterator): subscribing to it. Requires 0.10+ Default: True sasl_mechanism (str): Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: - PLAIN, GSSAPI, OAUTHBEARER. - sasl_plain_username (str): Username for sasl PLAIN authentication. - Required if sasl_mechanism is PLAIN. - sasl_plain_password (str): Password for sasl PLAIN authentication. - Required if sasl_mechanism is PLAIN. + PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512. + sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication. + Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. + sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication. + Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. sasl_kerberos_service_name (str): Service name to include in GSSAPI sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 3ff1a0913..274f7528f 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -268,11 +268,11 @@ class KafkaProducer(object): Default: selectors.DefaultSelector sasl_mechanism (str): Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: - PLAIN, GSSAPI, OAUTHBEARER. - sasl_plain_username (str): username for sasl PLAIN authentication. - Required if sasl_mechanism is PLAIN. - sasl_plain_password (str): password for sasl PLAIN authentication. - Required if sasl_mechanism is PLAIN. + PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512. + sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication. + Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. 
+ sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication. + Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. sasl_kerberos_service_name (str): Service name to include in GSSAPI sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI From 60702c624aab7a88b6a661fe5871461ff0b60f34 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Tue, 15 Oct 2019 22:46:34 +0200 Subject: [PATCH 22/46] minor refactoring of test_sasl_integration.py --- test/test_sasl_integration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_sasl_integration.py b/test/test_sasl_integration.py index 69c04996b..ca59a16ad 100644 --- a/test/test_sasl_integration.py +++ b/test/test_sasl_integration.py @@ -65,7 +65,7 @@ def test_client(request, sasl_kafka): topic_name = special_to_underscore(request.node.name + random_string(4)) sasl_kafka.create_topics([topic_name], num_partitions=1) - client = next(sasl_kafka.get_clients(1), None) + client, = sasl_kafka.get_clients(1) request = MetadataRequest_v1(None) client.send(0, request) for _ in range(10): From b61d68f0f17c440b59c1de8a3e17aa1de4a65502 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Wed, 16 Oct 2019 08:24:49 +0200 Subject: [PATCH 23/46] trying to make it run in travis --- test/fixtures.py | 6 +++++- test/service.py | 3 ++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/test/fixtures.py b/test/fixtures.py index ee07a7612..3a2e1c736 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -400,7 +400,11 @@ def start(self): args = self.kafka_run_class_args("kafka.Kafka", properties.strpath) env = self.kafka_run_class_env() if self.sasl_enabled: - env['KAFKA_OPTS'] = env.get('KAFKA_OPTS', '') + ' -Djava.security.auth.login.config={}'.format(jaas_conf) + opts = env.get('KAFKA_OPTS', '').strip() + if opts: + opts += ' ' + opts += '-Djava.security.auth.login.config={}'.format(jaas_conf.strpath) + env['KAFKA_OPTS'] = opts self.render_template(jaas_conf_template, jaas_conf, vars(self)) timeout = 5 diff --git a/test/service.py b/test/service.py index 483708a14..5a9133ea1 100644 --- a/test/service.py +++ b/test/service.py @@ -111,7 +111,8 @@ def wait_for(self, pattern, timeout=30): start = time.time() while True: if not self.is_alive(): - raise RuntimeError("Child thread died already.") + log.info("Child thread died already.") + return False elapsed = time.time() - start if elapsed >= timeout: From e641521f8c4b64d32edc391b24be31d8ce2c8821 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Wed, 16 Oct 2019 08:38:19 +0200 Subject: [PATCH 24/46] go back to raising runtime error in wait_for --- test/service.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/service.py b/test/service.py index 5a9133ea1..483708a14 100644 --- a/test/service.py +++ b/test/service.py @@ -111,8 +111,7 @@ def wait_for(self, pattern, timeout=30): start = time.time() while True: if not self.is_alive(): - log.info("Child thread died already.") - return False + raise RuntimeError("Child thread died already.") elapsed = time.time() - start if elapsed >= timeout: From 720daa3f751d8979faa8edd3f8d66273c4025a5e Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Wed, 16 Oct 2019 09:20:02 +0200 Subject: [PATCH 25/46] reduce kafka startup timeout and log config file content --- test/fixtures.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/test/fixtures.py b/test/fixtures.py index 3a2e1c736..af4989d41 100644 --- 
a/test/fixtures.py +++ b/test/fixtures.py @@ -401,14 +401,12 @@ def start(self): env = self.kafka_run_class_env() if self.sasl_enabled: opts = env.get('KAFKA_OPTS', '').strip() - if opts: - opts += ' ' - opts += '-Djava.security.auth.login.config={}'.format(jaas_conf.strpath) + opts += '-Djava.security.auth.login.config={} '.format(jaas_conf.strpath) env['KAFKA_OPTS'] = opts self.render_template(jaas_conf_template, jaas_conf, vars(self)) timeout = 5 - max_timeout = 120 + max_timeout = 30 backoff = 1 end_at = time.time() + max_timeout tries = 1 @@ -429,6 +427,15 @@ def start(self): self.child.dump_logs() self.child.stop() + log.info("server.properties:") + with open(properties, 'r') as o: + for line in o: + log.info(' '+line) + log.info("kafka_server_jaas.conf:") + with open(jaas_conf, 'r') as o: + for line in o: + log.info(' '+line) + timeout *= 2 time.sleep(backoff) tries += 1 From de6a360f6fbd10f0922fffc6d93e9e4c80af1a76 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Wed, 16 Oct 2019 09:41:45 +0200 Subject: [PATCH 26/46] reduce tests to get quicker feedback --- .travis.yml | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/.travis.yml b/.travis.yml index 4023972f6..7944c6bcb 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,19 +2,20 @@ language: python dist: xenial +# TODO revert changes python: - 2.7 - - 3.4 + # - 3.4 - 3.7 - - pypy2.7-6.0 + # - pypy2.7-6.0 env: - - KAFKA_VERSION=0.8.2.2 - - KAFKA_VERSION=0.9.0.1 + # - KAFKA_VERSION=0.8.2.2 + # - KAFKA_VERSION=0.9.0.1 - KAFKA_VERSION=0.10.2.2 - - KAFKA_VERSION=0.11.0.3 - - KAFKA_VERSION=1.1.1 - - KAFKA_VERSION=2.3.0 + # - KAFKA_VERSION=0.11.0.3 + # - KAFKA_VERSION=1.1.1 + # - KAFKA_VERSION=2.3.0 addons: apt: From 1f98ee47c4cc1988a47f3a3e963b49bdf3d4f72a Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Wed, 16 Oct 2019 09:42:26 +0200 Subject: [PATCH 27/46] output config file contents earlier --- test/fixtures.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/test/fixtures.py b/test/fixtures.py index af4989d41..f1345136e 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -419,6 +419,16 @@ def start(self): self.port = get_open_port() self.out('Attempting to start on port %d (try #%d)' % (self.port, tries)) self.render_template(properties_template, properties, vars(self)) + + log.info("server.properties:") + with open(properties.strpath, 'r') as o: + for line in o: + log.info(' '+line) + log.info("kafka_server_jaas.conf:") + with open(jaas_conf.strpath, 'r') as o: + for line in o: + log.info(' '+line) + self.child = SpawnedService(args, env) self.child.start() timeout = min(timeout, max(end_at - time.time(), 0)) @@ -427,14 +437,6 @@ def start(self): self.child.dump_logs() self.child.stop() - log.info("server.properties:") - with open(properties, 'r') as o: - for line in o: - log.info(' '+line) - log.info("kafka_server_jaas.conf:") - with open(jaas_conf, 'r') as o: - for line in o: - log.info(' '+line) timeout *= 2 time.sleep(backoff) tries += 1 From fa1ca01269a97286aaf61524843d9f407bdbe20d Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Wed, 16 Oct 2019 10:38:18 +0200 Subject: [PATCH 28/46] further investigate settings --- test/fixtures.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/test/fixtures.py b/test/fixtures.py index f1345136e..3f66804ae 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -404,6 +404,10 @@ def start(self): opts += '-Djava.security.auth.login.config={} '.format(jaas_conf.strpath) env['KAFKA_OPTS'] = opts
self.render_template(jaas_conf_template, jaas_conf, vars(self)) + log.info("kafka_server_jaas.conf:") + with open(jaas_conf.strpath, 'r') as o: + for line in o: + log.info(' '+line.strip()) timeout = 5 max_timeout = 30 @@ -419,15 +423,12 @@ def start(self): self.port = get_open_port() self.out('Attempting to start on port %d (try #%d)' % (self.port, tries)) self.render_template(properties_template, properties, vars(self)) + log.info("vars(self): {}".format(vars(self))) log.info("server.properties:") with open(properties.strpath, 'r') as o: for line in o: - log.info(' '+line) - log.info("kafka_server_jaas.conf:") - with open(jaas_conf.strpath, 'r') as o: - for line in o: - log.info(' '+line) + log.info(' '+line.strip()) self.child = SpawnedService(args, env) self.child.start() From e41c0be50e7e8387c394244204ca7ad377f17479 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Wed, 16 Oct 2019 10:42:22 +0200 Subject: [PATCH 29/46] try to get rid of empty log lines --- test/service.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/test/service.py b/test/service.py index 483708a14..3cf4a87be 100644 --- a/test/service.py +++ b/test/service.py @@ -88,12 +88,14 @@ def run(self): raise if self.child.stdout in rds: - line = self.child.stdout.readline() - self.captured_stdout.append(line.decode('utf-8').rstrip()) + line = self.child.stdout.readline().decode('utf-8').rstrip() + if line: + self.captured_stdout.append(line) if self.child.stderr in rds: - line = self.child.stderr.readline() - self.captured_stderr.append(line.decode('utf-8').rstrip()) + line = self.child.stderr.readline().decode('utf-8').rstrip() + if line: + self.captured_stderr.append(line) if self.child.poll() is not None: self.dump_logs() From 7af9f97db03a621453213e675b027a004a91e6ec Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Wed, 16 Oct 2019 11:54:32 +0200 Subject: [PATCH 30/46] move investigation to rendering function --- test/fixtures.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/test/fixtures.py b/test/fixtures.py index 3f66804ae..552fc830d 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -140,6 +140,14 @@ def render_template(cls, source_file, target_file, binding): dirfd = os.open(os.path.dirname(target_file.strpath), os.O_DIRECTORY) os.fsync(dirfd) os.close(dirfd) + log.info("Template string:") + for line in template.splitlines(): + log.info(' ' + line.strip()) + log.info("Rendered template:") + with open(target_file.strpath, 'r') as o: + for line in o: + log.info(' ' + line.strip()) + log.info("binding: {}".format(binding)) def dump_logs(self): self.child.dump_logs() @@ -300,7 +308,8 @@ def __init__(self, host, port, broker_id, zookeeper, zk_chroot, self.running = False self._client = None - self.sasl_config = None + self.sasl_config = '' + self.jaas_config = '' def _sasl_config(self): if not self.sasl_enabled: @@ -401,13 +410,9 @@ def start(self): env = self.kafka_run_class_env() if self.sasl_enabled: opts = env.get('KAFKA_OPTS', '').strip() - opts += '-Djava.security.auth.login.config={} '.format(jaas_conf.strpath) + opts += ' -Djava.security.auth.login.config={}'.format(jaas_conf.strpath) env['KAFKA_OPTS'] = opts self.render_template(jaas_conf_template, jaas_conf, vars(self)) - log.info("kafka_server_jaas.conf:") - with open(jaas_conf.strpath, 'r') as o: - for line in o: - log.info(' '+line.strip()) timeout = 5 max_timeout = 30 backoff = 1 @@ -423,12 +428,6 @@ def start(self): self.port = get_open_port() self.out('Attempting to start on port %d (try
#%d)' % (self.port, tries)) self.render_template(properties_template, properties, vars(self)) - log.info("vars(self): {}".format(vars(self))) - - log.info("server.properties:") - with open(properties.strpath, 'r') as o: - for line in o: - log.info(' '+line.strip()) self.child = SpawnedService(args, env) self.child.start() From 0ef668e0f1fa373d54117913dca7bf52270d00a4 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Wed, 16 Oct 2019 11:55:28 +0200 Subject: [PATCH 31/46] stop on first failed test --- tox.ini | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index 06403d6ed..e2e2d7802 100644 --- a/tox.ini +++ b/tox.ini @@ -19,7 +19,9 @@ deps = xxhash crc32c commands = - py.test {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka --cov-config=.covrc} +# TODO: revert change +# py.test {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka --cov-config=.covrc} + py.test {posargs:-x} setenv = CRC32C_SW_MODE = auto PROJECT_ROOT = {toxinidir} From 65bb4faada9b28cbb4ffe3d7fe03b1b13a6b9bc9 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Wed, 16 Oct 2019 14:31:01 +0200 Subject: [PATCH 32/46] use strpath in render_template --- test/fixtures.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/fixtures.py b/test/fixtures.py index 552fc830d..2ba86e4f7 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -126,10 +126,10 @@ def kafka_run_class_env(self): @classmethod def render_template(cls, source_file, target_file, binding): - log.info('Rendering %s from template %s', target_file.strpath, source_file) - with open(source_file, "r") as handle: + log.info('Rendering %s from template %s', target_file.strpath, source_file.strpath) + with open(source_file.strpath, "r") as handle: template = handle.read() - assert len(template) > 0, 'Empty template %s' % (source_file,) + assert len(template) > 0, 'Empty template %s' % (source_file.strpath,) with open(target_file.strpath, "w") as handle: handle.write(template.format(**binding)) handle.flush() From 082bb4ef449597cd6c875a6ded0aa73099fcf259 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Wed, 16 Oct 2019 14:35:46 +0200 Subject: [PATCH 33/46] check kafka.properties files for the `sasl` keyword --- .travis.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.travis.yml b/.travis.yml index 7944c6bcb..afa65de51 100644 --- a/.travis.yml +++ b/.travis.yml @@ -37,6 +37,11 @@ install: - pip install . script: + - | + for prop in $(find . 
-name kafka.properties | sort); do + printf "%s: %s\n" "$prop" "$(grep sasl $prop)" + done + - tox -e `if [ "$TRAVIS_PYTHON_VERSION" == "pypy2.7-6.0" ]; then echo pypy; else echo py${TRAVIS_PYTHON_VERSION/./}; fi` after_success: From b1d1ee01389d8a8ed628cefc6254ade3e400c10a Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Wed, 16 Oct 2019 14:55:15 +0200 Subject: [PATCH 34/46] add another commit to kafka.properties to make sure changes are not overridden during merge --- servers/0.10.0.0/resources/kafka.properties | 1 + servers/0.10.0.1/resources/kafka.properties | 1 + servers/0.10.1.1/resources/kafka.properties | 1 + servers/0.10.2.1/resources/kafka.properties | 1 + servers/0.10.2.2/resources/kafka.properties | 1 + servers/0.11.0.0/resources/kafka.properties | 1 + servers/0.11.0.1/resources/kafka.properties | 1 + servers/0.11.0.2/resources/kafka.properties | 1 + servers/0.11.0.3/resources/kafka.properties | 1 + servers/1.0.0/resources/kafka.properties | 1 + servers/1.0.1/resources/kafka.properties | 1 + servers/1.0.2/resources/kafka.properties | 1 + servers/1.1.0/resources/kafka.properties | 1 + servers/1.1.1/resources/kafka.properties | 1 + servers/2.0.0/resources/kafka.properties | 1 + servers/2.0.1/resources/kafka.properties | 1 + servers/2.1.0/resources/kafka.properties | 1 + servers/2.1.1/resources/kafka.properties | 1 + servers/2.2.0/resources/kafka.properties | 1 + servers/2.2.1/resources/kafka.properties | 1 + servers/2.3.0/resources/kafka.properties | 1 + 21 files changed, 21 insertions(+) diff --git a/servers/0.10.0.0/resources/kafka.properties b/servers/0.10.0.0/resources/kafka.properties index 074797523..b7626065f 100644 --- a/servers/0.10.0.0/resources/kafka.properties +++ b/servers/0.10.0.0/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} + {sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks diff --git a/servers/0.10.0.1/resources/kafka.properties b/servers/0.10.0.1/resources/kafka.properties index 074797523..b7626065f 100644 --- a/servers/0.10.0.1/resources/kafka.properties +++ b/servers/0.10.0.1/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} + {sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks diff --git a/servers/0.10.1.1/resources/kafka.properties b/servers/0.10.1.1/resources/kafka.properties index 074797523..b7626065f 100644 --- a/servers/0.10.1.1/resources/kafka.properties +++ b/servers/0.10.1.1/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} + {sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks diff --git a/servers/0.10.2.1/resources/kafka.properties b/servers/0.10.2.1/resources/kafka.properties index 074797523..b7626065f 100644 --- a/servers/0.10.2.1/resources/kafka.properties +++ b/servers/0.10.2.1/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} + {sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks diff --git a/servers/0.10.2.2/resources/kafka.properties b/servers/0.10.2.2/resources/kafka.properties index 074797523..b7626065f 100644 --- a/servers/0.10.2.2/resources/kafka.properties +++ b/servers/0.10.2.2/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} 
listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} + {sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks diff --git a/servers/0.11.0.0/resources/kafka.properties b/servers/0.11.0.0/resources/kafka.properties index 38f751dd0..5775cfdc4 100644 --- a/servers/0.11.0.0/resources/kafka.properties +++ b/servers/0.11.0.0/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} + {sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks diff --git a/servers/0.11.0.1/resources/kafka.properties b/servers/0.11.0.1/resources/kafka.properties index 38f751dd0..5775cfdc4 100644 --- a/servers/0.11.0.1/resources/kafka.properties +++ b/servers/0.11.0.1/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} + {sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks diff --git a/servers/0.11.0.2/resources/kafka.properties b/servers/0.11.0.2/resources/kafka.properties index 38f751dd0..5775cfdc4 100644 --- a/servers/0.11.0.2/resources/kafka.properties +++ b/servers/0.11.0.2/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} + {sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks diff --git a/servers/0.11.0.3/resources/kafka.properties b/servers/0.11.0.3/resources/kafka.properties index 38f751dd0..5775cfdc4 100644 --- a/servers/0.11.0.3/resources/kafka.properties +++ b/servers/0.11.0.3/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} + {sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks diff --git a/servers/1.0.0/resources/kafka.properties b/servers/1.0.0/resources/kafka.properties index 38f751dd0..5775cfdc4 100644 --- a/servers/1.0.0/resources/kafka.properties +++ b/servers/1.0.0/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} + {sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks diff --git a/servers/1.0.1/resources/kafka.properties b/servers/1.0.1/resources/kafka.properties index 38f751dd0..5775cfdc4 100644 --- a/servers/1.0.1/resources/kafka.properties +++ b/servers/1.0.1/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} + {sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks diff --git a/servers/1.0.2/resources/kafka.properties b/servers/1.0.2/resources/kafka.properties index 38f751dd0..5775cfdc4 100644 --- a/servers/1.0.2/resources/kafka.properties +++ b/servers/1.0.2/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} + {sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks diff --git a/servers/1.1.0/resources/kafka.properties b/servers/1.1.0/resources/kafka.properties index 38f751dd0..5775cfdc4 100644 --- a/servers/1.1.0/resources/kafka.properties +++ b/servers/1.1.0/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} + {sasl_config} 
ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks diff --git a/servers/1.1.1/resources/kafka.properties b/servers/1.1.1/resources/kafka.properties index 38f751dd0..5775cfdc4 100644 --- a/servers/1.1.1/resources/kafka.properties +++ b/servers/1.1.1/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} + {sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks diff --git a/servers/2.0.0/resources/kafka.properties b/servers/2.0.0/resources/kafka.properties index 38f751dd0..5775cfdc4 100644 --- a/servers/2.0.0/resources/kafka.properties +++ b/servers/2.0.0/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} + {sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks diff --git a/servers/2.0.1/resources/kafka.properties b/servers/2.0.1/resources/kafka.properties index 38f751dd0..5775cfdc4 100644 --- a/servers/2.0.1/resources/kafka.properties +++ b/servers/2.0.1/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} + {sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks diff --git a/servers/2.1.0/resources/kafka.properties b/servers/2.1.0/resources/kafka.properties index 38f751dd0..5775cfdc4 100644 --- a/servers/2.1.0/resources/kafka.properties +++ b/servers/2.1.0/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} + {sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks diff --git a/servers/2.1.1/resources/kafka.properties b/servers/2.1.1/resources/kafka.properties index 38f751dd0..5775cfdc4 100644 --- a/servers/2.1.1/resources/kafka.properties +++ b/servers/2.1.1/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} + {sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks diff --git a/servers/2.2.0/resources/kafka.properties b/servers/2.2.0/resources/kafka.properties index 38f751dd0..5775cfdc4 100644 --- a/servers/2.2.0/resources/kafka.properties +++ b/servers/2.2.0/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} + {sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks diff --git a/servers/2.2.1/resources/kafka.properties b/servers/2.2.1/resources/kafka.properties index 38f751dd0..5775cfdc4 100644 --- a/servers/2.2.1/resources/kafka.properties +++ b/servers/2.2.1/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} + {sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks diff --git a/servers/2.3.0/resources/kafka.properties b/servers/2.3.0/resources/kafka.properties index 38f751dd0..5775cfdc4 100644 --- a/servers/2.3.0/resources/kafka.properties +++ b/servers/2.3.0/resources/kafka.properties @@ -23,6 +23,7 @@ broker.id={broker_id} listeners={transport}://{host}:{port} security.inter.broker.protocol={transport} + {sasl_config} ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks From a832ae59f396ace21422aa9737a57d25ffb1760a Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Wed, 16 Oct 2019 15:24:53 +0200 Subject: 
[PATCH 35/46] Revert "use strpath in render_template" This reverts commit 65bb4faa --- test/fixtures.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/fixtures.py b/test/fixtures.py index 2ba86e4f7..552fc830d 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -126,10 +126,10 @@ def kafka_run_class_env(self): @classmethod def render_template(cls, source_file, target_file, binding): - log.info('Rendering %s from template %s', target_file.strpath, source_file.strpath) - with open(source_file.strpath, "r") as handle: + log.info('Rendering %s from template %s', target_file.strpath, source_file) + with open(source_file, "r") as handle: template = handle.read() - assert len(template) > 0, 'Empty template %s' % (source_file.strpath,) + assert len(template) > 0, 'Empty template %s' % (source_file,) with open(target_file.strpath, "w") as handle: handle.write(template.format(**binding)) handle.flush() From 97778fc3d3e14e70708798c4dbec1a40c0805808 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Wed, 16 Oct 2019 16:48:08 +0200 Subject: [PATCH 36/46] limit cache to servers/dist --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index afa65de51..ac3c3f4ae 100644 --- a/.travis.yml +++ b/.travis.yml @@ -26,7 +26,7 @@ addons: cache: directories: - $HOME/.cache/pip - - servers/ + - servers/dist before_install: - source travis_java_install.sh From 77ec1b36698d1c07a47ec9e91f8be41242be2235 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Wed, 16 Oct 2019 17:06:58 +0200 Subject: [PATCH 37/46] set offsets.topic.replication.factor to 1 in all kafka.properties --- servers/0.10.0.0/resources/kafka.properties | 2 +- servers/0.10.0.1/resources/kafka.properties | 2 +- servers/0.10.1.1/resources/kafka.properties | 2 +- servers/0.10.2.1/resources/kafka.properties | 2 +- servers/0.10.2.2/resources/kafka.properties | 2 +- servers/0.9.0.0/resources/kafka.properties | 2 +- servers/0.9.0.1/resources/kafka.properties | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) diff --git a/servers/0.10.0.0/resources/kafka.properties b/servers/0.10.0.0/resources/kafka.properties index b7626065f..daab312b0 100644 --- a/servers/0.10.0.0/resources/kafka.properties +++ b/servers/0.10.0.0/resources/kafka.properties @@ -123,7 +123,7 @@ log.cleaner.enable=false # tune down offset topics to reduce setup time in tests offsets.commit.timeout.ms=500 offsets.topic.num.partitions=2 -offsets.topic.replication.factor=2 +offsets.topic.replication.factor=1 # Allow shorter session timeouts for tests group.min.session.timeout.ms=1000 diff --git a/servers/0.10.0.1/resources/kafka.properties b/servers/0.10.0.1/resources/kafka.properties index b7626065f..daab312b0 100644 --- a/servers/0.10.0.1/resources/kafka.properties +++ b/servers/0.10.0.1/resources/kafka.properties @@ -123,7 +123,7 @@ log.cleaner.enable=false # tune down offset topics to reduce setup time in tests offsets.commit.timeout.ms=500 offsets.topic.num.partitions=2 -offsets.topic.replication.factor=2 +offsets.topic.replication.factor=1 # Allow shorter session timeouts for tests group.min.session.timeout.ms=1000 diff --git a/servers/0.10.1.1/resources/kafka.properties b/servers/0.10.1.1/resources/kafka.properties index b7626065f..daab312b0 100644 --- a/servers/0.10.1.1/resources/kafka.properties +++ b/servers/0.10.1.1/resources/kafka.properties @@ -123,7 +123,7 @@ log.cleaner.enable=false # tune down offset topics to reduce setup time in tests offsets.commit.timeout.ms=500 
offsets.topic.num.partitions=2 -offsets.topic.replication.factor=2 +offsets.topic.replication.factor=1 # Allow shorter session timeouts for tests group.min.session.timeout.ms=1000 diff --git a/servers/0.10.2.1/resources/kafka.properties b/servers/0.10.2.1/resources/kafka.properties index b7626065f..daab312b0 100644 --- a/servers/0.10.2.1/resources/kafka.properties +++ b/servers/0.10.2.1/resources/kafka.properties @@ -123,7 +123,7 @@ log.cleaner.enable=false # tune down offset topics to reduce setup time in tests offsets.commit.timeout.ms=500 offsets.topic.num.partitions=2 -offsets.topic.replication.factor=2 +offsets.topic.replication.factor=1 # Allow shorter session timeouts for tests group.min.session.timeout.ms=1000 diff --git a/servers/0.10.2.2/resources/kafka.properties b/servers/0.10.2.2/resources/kafka.properties index b7626065f..daab312b0 100644 --- a/servers/0.10.2.2/resources/kafka.properties +++ b/servers/0.10.2.2/resources/kafka.properties @@ -123,7 +123,7 @@ log.cleaner.enable=false # tune down offset topics to reduce setup time in tests offsets.commit.timeout.ms=500 offsets.topic.num.partitions=2 -offsets.topic.replication.factor=2 +offsets.topic.replication.factor=1 # Allow shorter session timeouts for tests group.min.session.timeout.ms=1000 diff --git a/servers/0.9.0.0/resources/kafka.properties b/servers/0.9.0.0/resources/kafka.properties index b4c4088db..fb859dd44 100644 --- a/servers/0.9.0.0/resources/kafka.properties +++ b/servers/0.9.0.0/resources/kafka.properties @@ -121,7 +121,7 @@ log.cleaner.enable=false # tune down offset topics to reduce setup time in tests offsets.commit.timeout.ms=500 offsets.topic.num.partitions=2 -offsets.topic.replication.factor=2 +offsets.topic.replication.factor=1 # Allow shorter session timeouts for tests group.min.session.timeout.ms=1000 diff --git a/servers/0.9.0.1/resources/kafka.properties b/servers/0.9.0.1/resources/kafka.properties index 7d8e2b1f0..28668db95 100644 --- a/servers/0.9.0.1/resources/kafka.properties +++ b/servers/0.9.0.1/resources/kafka.properties @@ -121,7 +121,7 @@ log.cleaner.enable=false # tune down offset topics to reduce setup time in tests offsets.commit.timeout.ms=500 offsets.topic.num.partitions=2 -offsets.topic.replication.factor=2 +offsets.topic.replication.factor=1 # Allow shorter session timeouts for tests group.min.session.timeout.ms=1000 From 8aca0ee3ed3cef63f483a431bf5e8974e431dd1f Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Wed, 16 Oct 2019 18:01:08 +0200 Subject: [PATCH 38/46] dump logs on sasl_kafka tear down --- test/test_sasl_integration.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/test_sasl_integration.py b/test/test_sasl_integration.py index ca59a16ad..e3a4813ae 100644 --- a/test/test_sasl_integration.py +++ b/test/test_sasl_integration.py @@ -24,7 +24,9 @@ ] ) def sasl_kafka(request, kafka_broker_factory): - return kafka_broker_factory(transport="SASL_PLAINTEXT", sasl_mechanism=request.param)[0] + sasl_kafka = kafka_broker_factory(transport="SASL_PLAINTEXT", sasl_mechanism=request.param)[0] + yield sasl_kafka + sasl_kafka.child.dump_logs() def test_admin(request, sasl_kafka): From d5e9a0fe1a0e9baeb1fff3bd6846a5f12bcf9148 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Thu, 17 Oct 2019 08:14:28 +0200 Subject: [PATCH 39/46] try shorter topic name for sasl test --- test/test_sasl_integration.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/test_sasl_integration.py b/test/test_sasl_integration.py index e3a4813ae..819b96ec6 100644 --- 
a/test/test_sasl_integration.py +++ b/test/test_sasl_integration.py @@ -37,7 +37,8 @@ def test_admin(request, sasl_kafka): def test_produce_and_consume(request, sasl_kafka): - topic_name = special_to_underscore(request.node.name + random_string(4)) + topic_name = "shortname_" + random_string(4) + # topic_name = special_to_underscore(request.node.name + random_string(4)) sasl_kafka.create_topics([topic_name], num_partitions=2) producer, = sasl_kafka.get_producers(1) From a3aadf3de55854743ea59b706808ec3a7afb677b Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Thu, 17 Oct 2019 08:22:34 +0200 Subject: [PATCH 40/46] enable debug mode for kafka --- servers/0.10.2.2/resources/log4j.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servers/0.10.2.2/resources/log4j.properties b/servers/0.10.2.2/resources/log4j.properties index b0b76aa79..0cdcc24e8 100644 --- a/servers/0.10.2.2/resources/log4j.properties +++ b/servers/0.10.2.2/resources/log4j.properties @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -log4j.rootLogger=INFO, stdout, logfile +log4j.rootLogger=DEBUG, stdout, logfile log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout From c0b043755fec3f06eae659b5d3a9b90fa5cdee6d Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Thu, 17 Oct 2019 08:44:15 +0200 Subject: [PATCH 41/46] refactor KafkaFixture._create_topics and include retry logic --- test/fixtures.py | 81 ++++++++++++++++++++++++++++++------------------ 1 file changed, 50 insertions(+), 31 deletions(-) diff --git a/test/fixtures.py b/test/fixtures.py index 552fc830d..57b9526eb 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -14,6 +14,7 @@ from kafka.vendor.six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401 from kafka import errors, KafkaAdminClient, KafkaClient, KafkaConsumer, KafkaProducer +from kafka.errors import InvalidReplicationFactorError from kafka.protocol.admin import CreateTopicsRequest from kafka.protocol.metadata import MetadataRequest from test.testutil import env_kafka_version, random_string @@ -546,48 +547,66 @@ def _failure(error): else: pass # retry - def _create_topic(self, topic_name, num_partitions, replication_factor, timeout_ms=10000): + def _create_topic(self, topic_name, num_partitions=None, replication_factor=None, timeout_ms=10000): if num_partitions is None: num_partitions = self.partitions if replication_factor is None: replication_factor = self.replicas + if replication_factor > self.replicas: + msg = "Cannot have more replicas ({}) than brokers ({})".format(replication_factor, self.replicas) + raise ValueError(msg) # Try different methods to create a topic, from the fastest to the slowest if self.auto_create_topic and \ num_partitions == self.partitions and \ replication_factor == self.replicas: - self._send_request(MetadataRequest[0]([topic_name])) + self._create_topic_with_metadata(topic_name) elif env_kafka_version() >= (0, 10, 1, 0): - request = CreateTopicsRequest[0]([(topic_name, num_partitions, - replication_factor, [], [])], timeout_ms) - result = self._send_request(request, timeout=timeout_ms) - for topic_result in result[0].topic_errors: - error_code = topic_result[1] - if error_code != 0: - raise errors.for_code(error_code) + try: + self._create_topic_with_admin_api(topic_name, num_partitions, replication_factor, timeout_ms) + except InvalidReplicationFactorError: + # wait and try again + # on travis the brokers 
sometimes take a while to find themselves + time.sleep(0.5) + self._create_topic_with_admin_api(topic_name, num_partitions, replication_factor, timeout_ms) else: - args = self.kafka_run_class_args('kafka.admin.TopicCommand', - '--zookeeper', '%s:%s/%s' % (self.zookeeper.host, - self.zookeeper.port, - self.zk_chroot), - '--create', - '--topic', topic_name, - '--partitions', self.partitions \ - if num_partitions is None else num_partitions, - '--replication-factor', self.replicas \ - if replication_factor is None \ - else replication_factor) - if env_kafka_version() >= (0, 10): - args.append('--if-not-exists') - env = self.kafka_run_class_env() - proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = proc.communicate() - if proc.returncode != 0: - if 'kafka.common.TopicExistsException' not in stdout: - self.out("Failed to create topic %s" % (topic_name,)) - self.out(stdout) - self.out(stderr) - raise RuntimeError("Failed to create topic %s" % (topic_name,)) + self._create_topic_with_cli(topic_name, num_partitions, replication_factor) + + def _create_topic_with_metadata(self, topic_name): + self._send_request(MetadataRequest[0]([topic_name])) + + def _create_topic_with_admin_api(self, topic_name, num_partitions, replication_factor, timeout_ms=10000): + request = CreateTopicsRequest[0]([(topic_name, num_partitions, + replication_factor, [], [])], timeout_ms) + result = self._send_request(request, timeout=timeout_ms) + for topic_result in result[0].topic_errors: + error_code = topic_result[1] + if error_code != 0: + raise errors.for_code(error_code) + + def _create_topic_with_cli(self, topic_name, num_partitions, replication_factor): + args = self.kafka_run_class_args('kafka.admin.TopicCommand', + '--zookeeper', '%s:%s/%s' % (self.zookeeper.host, + self.zookeeper.port, + self.zk_chroot), + '--create', + '--topic', topic_name, + '--partitions', self.partitions \ + if num_partitions is None else num_partitions, + '--replication-factor', self.replicas \ + if replication_factor is None \ + else replication_factor) + if env_kafka_version() >= (0, 10): + args.append('--if-not-exists') + env = self.kafka_run_class_env() + proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = proc.communicate() + if proc.returncode != 0: + if 'kafka.common.TopicExistsException' not in stdout: + self.out("Failed to create topic %s" % (topic_name,)) + self.out(stdout) + self.out(stderr) + raise RuntimeError("Failed to create topic %s" % (topic_name,)) def get_topic_names(self): args = self.kafka_run_class_args('kafka.admin.TopicCommand', From ca3c3044c1945f38a5c32efe897c8165af3f08ca Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Thu, 17 Oct 2019 09:22:06 +0200 Subject: [PATCH 42/46] fix bug in KafkaFixture._send_request --- test/fixtures.py | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/test/fixtures.py b/test/fixtures.py index 57b9526eb..edf937a77 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -538,7 +538,8 @@ def _failure(error): future = self._client.send(node_id, request) future.error_on_callbacks = True future.add_errback(_failure) - return self._client.poll(future=future, timeout_ms=timeout) + self._client.poll(future=future, timeout_ms=timeout) + return future.value except Exception as exc: time.sleep(1) retries -= 1 @@ -552,30 +553,25 @@ def _create_topic(self, topic_name, num_partitions=None, replication_factor=None num_partitions = self.partitions if 
replication_factor is None: replication_factor = self.replicas - if replication_factor > self.replicas: - msg = "Cannot have more replicas ({}) than brokers ({})".format(replication_factor, self.replicas) - raise ValueError(msg) # Try different methods to create a topic, from the fastest to the slowest - if self.auto_create_topic and \ - num_partitions == self.partitions and \ - replication_factor == self.replicas: - self._create_topic_with_metadata(topic_name) + if self.auto_create_topic and num_partitions == self.partitions and replication_factor == self.replicas: + self._create_topic_via_metadata(topic_name, timeout_ms) elif env_kafka_version() >= (0, 10, 1, 0): try: - self._create_topic_with_admin_api(topic_name, num_partitions, replication_factor, timeout_ms) + self._create_topic_via_admin_api(topic_name, num_partitions, replication_factor, timeout_ms) except InvalidReplicationFactorError: # wait and try again # on travis the brokers sometimes take a while to find themselves time.sleep(0.5) - self._create_topic_with_admin_api(topic_name, num_partitions, replication_factor, timeout_ms) + self._create_topic_via_admin_api(topic_name, num_partitions, replication_factor, timeout_ms) else: - self._create_topic_with_cli(topic_name, num_partitions, replication_factor) + self._create_topic_via_cli(topic_name, num_partitions, replication_factor) - def _create_topic_with_metadata(self, topic_name): - self._send_request(MetadataRequest[0]([topic_name])) + def _create_topic_via_metadata(self, topic_name, timeout_ms=10000): + self._send_request(MetadataRequest[0]([topic_name]), timeout_ms) - def _create_topic_with_admin_api(self, topic_name, num_partitions, replication_factor, timeout_ms=10000): + def _create_topic_via_admin_api(self, topic_name, num_partitions, replication_factor, timeout_ms=10000): request = CreateTopicsRequest[0]([(topic_name, num_partitions, replication_factor, [], [])], timeout_ms) result = self._send_request(request, timeout=timeout_ms) @@ -584,7 +580,7 @@ def _create_topic_with_admin_api(self, topic_name, num_partitions, replication_f if error_code != 0: raise errors.for_code(error_code) - def _create_topic_with_cli(self, topic_name, num_partitions, replication_factor): + def _create_topic_via_cli(self, topic_name, num_partitions, replication_factor): args = self.kafka_run_class_args('kafka.admin.TopicCommand', '--zookeeper', '%s:%s/%s' % (self.zookeeper.host, self.zookeeper.port, From e780de82fe191d268b0676138157508fb205fc8e Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Thu, 17 Oct 2019 09:28:01 +0200 Subject: [PATCH 43/46] another minor bugfix --- test/fixtures.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/fixtures.py b/test/fixtures.py index edf937a77..59bfe5fdc 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -574,8 +574,8 @@ def _create_topic_via_metadata(self, topic_name, timeout_ms=10000): def _create_topic_via_admin_api(self, topic_name, num_partitions, replication_factor, timeout_ms=10000): request = CreateTopicsRequest[0]([(topic_name, num_partitions, replication_factor, [], [])], timeout_ms) - result = self._send_request(request, timeout=timeout_ms) - for topic_result in result[0].topic_errors: + response = self._send_request(request, timeout=timeout_ms) + for topic_result in response.topic_errors: error_code = topic_result[1] if error_code != 0: raise errors.for_code(error_code) From 1fe4c1779b1fb3be3f0c3aab92b76a681328ed51 Mon Sep 17 00:00:00 2001 From: Swen Wenzel Date: Thu, 17 Oct 2019 09:52:07 +0200 Subject: [PATCH 
From 1fe4c1779b1fb3be3f0c3aab92b76a681328ed51 Mon Sep 17 00:00:00 2001
From: Swen Wenzel
Date: Thu, 17 Oct 2019 09:52:07 +0200
Subject: [PATCH 44/46] revert changes introduced for debugging

---
 .travis.yml                                 | 20 +++++++-------------
 servers/0.10.2.2/resources/log4j.properties |  2 +-
 test/fixtures.py                            | 12 +++++++-----
 test/service.py                             |  4 ++--
 test/test_sasl_integration.py               |  3 +--
 tox.ini                                     |  4 +---
 6 files changed, 19 insertions(+), 26 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index ac3c3f4ae..a245650ab 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -2,20 +2,19 @@ language: python
 
 dist: xenial
 
-# TODO revert changes
 python:
   - 2.7
-  # - 3.4
+  - 3.4
   - 3.7
-  # - pypy2.7-6.0
+  - pypy2.7-6.0
 
 env:
-  # - KAFKA_VERSION=0.8.2.2
-  # - KAFKA_VERSION=0.9.0.1
+  - KAFKA_VERSION=0.8.2.2
+  - KAFKA_VERSION=0.9.0.1
   - KAFKA_VERSION=0.10.2.2
-  # - KAFKA_VERSION=0.11.0.3
-  # - KAFKA_VERSION=1.1.1
-  # - KAFKA_VERSION=2.3.0
+  - KAFKA_VERSION=0.11.0.3
+  - KAFKA_VERSION=1.1.1
+  - KAFKA_VERSION=2.3.0
 
 addons:
   apt:
@@ -37,11 +36,6 @@ install:
   - pip install .
 
 script:
-  - |
-    for prop in $(find . -name kafka.properties | sort); do
-      printf "%s: %s\n" "$prop" "$(grep sasl $prop)"
-    done
-
   - tox -e `if [ "$TRAVIS_PYTHON_VERSION" == "pypy2.7-6.0" ]; then echo pypy; else echo py${TRAVIS_PYTHON_VERSION/./}; fi`
 
 after_success:
diff --git a/servers/0.10.2.2/resources/log4j.properties b/servers/0.10.2.2/resources/log4j.properties
index 0cdcc24e8..b0b76aa79 100644
--- a/servers/0.10.2.2/resources/log4j.properties
+++ b/servers/0.10.2.2/resources/log4j.properties
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-log4j.rootLogger=DEBUG, stdout, logfile
+log4j.rootLogger=INFO, stdout, logfile
 
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
diff --git a/test/fixtures.py b/test/fixtures.py
index 59bfe5fdc..b9a6f7411 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -141,14 +141,16 @@ def render_template(cls, source_file, target_file, binding):
         dirfd = os.open(os.path.dirname(target_file.strpath), os.O_DIRECTORY)
         os.fsync(dirfd)
         os.close(dirfd)
-        log.info("Template string:")
+        log.debug("Template string:")
         for line in template.splitlines():
-            log.info(' ' + line.strip())
-        log.info("Rendered template:")
+            log.debug(' ' + line.strip())
+        log.debug("Rendered template:")
         with open(target_file.strpath, 'r') as o:
             for line in o:
-                log.info(' ' + line.strip())
-        log.info("binding: {}".format(binding))
+                log.debug(' ' + line.strip())
+        log.debug("binding:".format(binding))
+        for key, value in binding.items():
+            log.debug(" {key}={value}".format(key=key, value=value))
 
     def dump_logs(self):
         self.child.dump_logs()
diff --git a/test/service.py b/test/service.py
index 3cf4a87be..045d780e7 100644
--- a/test/service.py
+++ b/test/service.py
@@ -47,9 +47,9 @@ def __init__(self, args=None, env=None):
         self.daemon = True
         log.info("Created service for command:")
         log.info(" "+' '.join(self.args))
-        log.info("With environment:")
+        log.debug("With environment:")
         for key, value in self.env.items():
-            log.info(" {key}={value}".format(key=key, value=value))
+            log.debug(" {key}={value}".format(key=key, value=value))
 
     def _spawn(self):
         if self.alive: return
diff --git a/test/test_sasl_integration.py b/test/test_sasl_integration.py
index 819b96ec6..e3a4813ae 100644
--- a/test/test_sasl_integration.py
+++ b/test/test_sasl_integration.py
@@ -37,8 +37,7 @@ def test_admin(request, sasl_kafka):
 
 
 def test_produce_and_consume(request, sasl_kafka):
-    topic_name = "shortname_" + random_string(4)
-    # topic_name = special_to_underscore(request.node.name + random_string(4))
+    topic_name = special_to_underscore(request.node.name + random_string(4))
     sasl_kafka.create_topics([topic_name], num_partitions=2)
     producer, = sasl_kafka.get_producers(1)
diff --git a/tox.ini b/tox.ini
index e2e2d7802..06403d6ed 100644
--- a/tox.ini
+++ b/tox.ini
@@ -19,9 +19,7 @@ deps =
     xxhash
     crc32c
 commands =
-# TODO: revert change
-#    py.test {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka --cov-config=.covrc}
-    py.test {posargs:-x}
+    py.test {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka --cov-config=.covrc}
 setenv =
     CRC32C_SW_MODE = auto
     PROJECT_ROOT = {toxinidir}
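
The restored special_to_underscore call keeps generated topic names legal: Kafka topic names may
only contain alphanumerics, '.', '_' and '-', while pytest node names include characters such as
'[' and ']'. The helper lives in the test utilities; a plausible implementation, offered here
only as an assumption about its behavior, is:

    import re

    def special_to_underscore(string):
        # Replace every character outside Kafka's legal topic-name
        # alphabet with an underscore.
        return re.sub(r'[^a-zA-Z0-9._-]', '_', string)
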
From bce9e417379f6ab1679956dd5f1ca79e5f907c4a Mon Sep 17 00:00:00 2001
From: Swen Wenzel
Date: Thu, 17 Oct 2019 09:59:25 +0200
Subject: [PATCH 45/46] minor bugfix

---
 test/fixtures.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/test/fixtures.py b/test/fixtures.py
index b9a6f7411..677537cd1 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -148,7 +148,7 @@ def render_template(cls, source_file, target_file, binding):
         with open(target_file.strpath, 'r') as o:
             for line in o:
                 log.debug(' ' + line.strip())
-        log.debug("binding:".format(binding))
+        log.debug("binding:")
         for key, value in binding.items():
             log.debug(" {key}={value}".format(key=key, value=value))
 
From 54922a0a760ff8dd021f4036b97c6d008e028e09 Mon Sep 17 00:00:00 2001
From: Swen Wenzel
Date: Thu, 17 Oct 2019 11:54:58 +0200
Subject: [PATCH 46/46] increase Kafka startup timeout again

---
 test/fixtures.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/test/fixtures.py b/test/fixtures.py
index 677537cd1..78cdc5c24 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -418,7 +418,7 @@ def start(self):
             self.render_template(jaas_conf_template, jaas_conf, vars(self))
 
         timeout = 5
-        max_timeout = 30
+        max_timeout = 120
         backoff = 1
         end_at = time.time() + max_timeout
         tries = 1
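
For context on the final change: max_timeout bounds the whole of start()'s retry loop, so
raising it from 30s to 120s gives slow CI brokers far more room to come up before the fixture
gives up. The surrounding pattern is roughly the following condensed sketch, where attempt_start
stands in for the fixture's spawn-and-wait logic and the doubling policy is an assumption:

    import time

    def start_with_retries(attempt_start, timeout=5, max_timeout=120, backoff=1):
        end_at = time.time() + max_timeout
        tries = 1
        while time.time() < end_at:
            if attempt_start(timeout):  # one startup attempt, bounded by `timeout`
                return tries
            timeout = min(timeout * 2, max_timeout)  # escalate the per-attempt bound
            time.sleep(backoff)  # brief pause before retrying
            tries += 1
        raise RuntimeError("service did not start after %d tries" % tries)
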