diff --git a/tests/_testutil.py b/tests/_testutil.py index 21ab8c42..84d9165f 100644 --- a/tests/_testutil.py +++ b/tests/_testutil.py @@ -392,12 +392,21 @@ def assert_message_count(self, messages, num_messages): # Make sure there are no duplicates self.assertEqual(len(set(messages)), num_messages) - def create_ssl_context(self): + def create_ssl_context(self, ca_cert_only: bool = False): + certfile = None + keyfile = None + + if not ca_cert_only: + certfile = str(self.ssl_folder / "cl_client.pem") + keyfile = str(self.ssl_folder / "cl_client.key") + context = create_ssl_context( cafile=str(self.ssl_folder / "ca-cert"), - certfile=str(self.ssl_folder / "cl_client.pem"), - keyfile=str(self.ssl_folder / "cl_client.key"), - password="abcdefgh") + certfile=certfile, + keyfile=keyfile, + password="abcdefgh", + ) + context.check_hostname = False return context diff --git a/tests/test_sasl.py b/tests/test_sasl.py index 1c1c38bf..e12eaafc 100644 --- a/tests/test_sasl.py +++ b/tests/test_sasl.py @@ -38,6 +38,11 @@ def sasl_hosts(self): # Produce/consume by SASL_PLAINTEXT return "{}:{}".format(self.kafka_host, self.kafka_sasl_plain_port) + @property + def sasl_ssl_hosts(self): + # Produce/consume by SASL_SSL + return "{}:{}".format(self.kafka_host, self.kafka_sasl_ssl_port) + @property def group_id(self): return self.topic + "_group" @@ -142,6 +147,42 @@ async def scram_consumer_factory(self, user="test", **kw): await consumer.start() return consumer + async def ssl_producer_factory(self, user="test", **kw): + ssl_context = self.create_ssl_context(ca_cert_only=True) + producer = AIOKafkaProducer( + loop=self.loop, + bootstrap_servers=[self.sasl_ssl_hosts], + security_protocol="SASL_SSL", + sasl_mechanism="PLAIN", + sasl_plain_username=user, + sasl_plain_password=user, + ssl_context=ssl_context, + **kw) + self.add_cleanup(producer.stop) + await producer.start() + return producer + + async def ssl_consumer_factory(self, user="test", **kw): + kwargs = dict( + enable_auto_commit=True, + auto_offset_reset="earliest", + group_id=self.group_id, + ssl_context=self.create_ssl_context(ca_cert_only=True), + ) + kwargs.update(kw) + + consumer = AIOKafkaConsumer( + self.topic, loop=self.loop, + bootstrap_servers=[self.sasl_ssl_hosts], + security_protocol="SASL_SSL", + sasl_mechanism="PLAIN", + sasl_plain_username=user, + sasl_plain_password=user, + **kwargs) + self.add_cleanup(consumer.stop) + await consumer.start() + return consumer + @kafka_versions('>=0.10.0') @run_until_complete async def test_sasl_plaintext_basic(self): @@ -181,6 +222,18 @@ async def test_sasl_plaintext_scram(self): msg = await consumer.getone() self.assertEqual(msg.value, b"Super scram msg") + @kafka_versions('>=0.10.2') + @pytest.mark.ssl + @run_until_complete + async def test_sasl_ssl(self): + producer = await self.ssl_producer_factory() + await producer.send_and_wait(topic=self.topic, + value=b"Super sasl ssl msg") + + consumer = await self.ssl_consumer_factory() + msg = await consumer.getone() + self.assertEqual(msg.value, b"Super sasl ssl msg") + ########################################################################## # Topic Resource ##########################################################################