From 5ed8754e5d4e3b9e02e69d5c76c9e928510dcbbf Mon Sep 17 00:00:00 2001 From: Joseph Areeda Date: Mon, 14 Mar 2022 14:37:46 -0700 Subject: [PATCH 1/6] Move consumer to try block --- adc/consumer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/adc/consumer.py b/adc/consumer.py index 97140d1..14668d8 100644 --- a/adc/consumer.py +++ b/adc/consumer.py @@ -118,8 +118,9 @@ def _stream_forever(self, batch_timeout: timedelta = timedelta(seconds=1.0), ) -> Iterator[confluent_kafka.Message]: while True: - messages = self._consumer.consume(batch_size, batch_timeout.total_seconds()) try: + messages = self._consumer.consume(batch_size, + batch_timeout.total_seconds()) for m in messages: err = m.error() if err is None: From 144644c2925f5f17bc1e4a687d871add168b9a66 Mon Sep 17 00:00:00 2001 From: Leo Singer Date: Wed, 2 Feb 2022 18:37:23 -0500 Subject: [PATCH 2/6] Add OAUTHBEARER/OIDC support Add OAUTHBEARER / OpenID Connect (OIDC) support as described in [KIP-768]. This is the authentication mechanism used by the GCN/TACH Kafka broker. * The `SASLMethod` enum gets a new member, `OAUTHBEARER`. * The `SASLAuth` constructor gets a new optional keyword argument, `token_endpoint`. * The presence of the `token_endpoint` keyword argument causes the default SASL method to change to `OAUTHBEARER` and the oauthbearer method to change to `oidc`, because this option is required for OpenID Connect and ignored for all other auth methods. * In OIDC mode, the `username` and `password` positional arguments are interpreted as the client ID and client secret. [KIP-768]: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575 --- adc/auth.py | 24 ++++++++++++++++++++---- tests/test_auth.py | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 4 deletions(-) create mode 100644 tests/test_auth.py diff --git a/adc/auth.py b/adc/auth.py index 6b4cc09..21a75ab 100644 --- a/adc/auth.py +++ b/adc/auth.py @@ -12,6 +12,7 @@ class SASLMethod(Enum): PLAIN = 1 SCRAM_SHA_256 = 2 SCRAM_SHA_512 = 3 + OAUTHBEARER = 4 def __str__(self): return self.name.replace("_", "-") @@ -31,15 +32,24 @@ class SASLAuth(object): ssl : `bool`, optional Whether to enable SSL (enabled by default). method : `SASLMethod`, optional - The SASL method to authenticate, default = SASLMethod.PLAIN. + The SASL method to authenticate. The default is SASLMethod.OAUTHBEARER + if token_endpoint is provided, or SASLMethod.PLAIN otherwise. See valid SASL methods in SASLMethod. ssl_ca_location : `str`, optional If using SSL via a self-signed cert, a path/location to the certificate. + token_endpoint : `str`, optional + The OpenID Connect token endpoint URL. + Required for OAUTHBEARER / OpenID Connect, otherwise ignored. """ - def __init__(self, user, password, ssl=True, method=SASLMethod.PLAIN, **kwargs): + def __init__(self, user, password, ssl=True, method=None, token_endpoint=None, **kwargs): + if method is None: + if token_endpoint is None: + method = SASLMethod.PLAIN + else: + method = SASLMethod.OAUTHBEARER self._method = method # set up SSL options @@ -58,8 +68,14 @@ def __init__(self, user, password, ssl=True, method=SASLMethod.PLAIN, **kwargs): # set up SASL options self._config["sasl.mechanism"] = str(self._method) - self._config["sasl.username"] = user - self._config["sasl.password"] = password + if token_endpoint: + self._config["sasl.oauthbearer.client.id"] = user + self._config["sasl.oauthbearer.client.secret"] = password + self._config["sasl.oauthbearer.method"] = "oidc" + self._config["sasl.oauthbearer.token.endpoint.url"] = token_endpoint + else: + self._config["sasl.username"] = user + self._config["sasl.password"] = password def __call__(self): return self._config diff --git a/tests/test_auth.py b/tests/test_auth.py new file mode 100644 index 0000000..606f0c3 --- /dev/null +++ b/tests/test_auth.py @@ -0,0 +1,36 @@ +import pytest + +from adc.auth import SASLAuth, SASLMethod + + +@pytest.mark.parametrize('auth,expected_config', [ + [ + SASLAuth( + 'test', 'test-pass', + token_endpoint='https://example.com/oauth2/token' + ), + { + 'sasl.mechanism': 'OAUTHBEARER', + 'sasl.oauthbearer.client.id': 'test', + 'sasl.oauthbearer.client.secret': 'test-pass', + 'sasl.oauthbearer.method': 'oidc', + 'sasl.oauthbearer.token.endpoint.url': 'https://example.com/oauth2/token', + 'security.protocol': 'SASL_SSL' + } + ], + [ + SASLAuth( + 'test', 'test-pass', + ), + { + 'sasl.mechanism': 'PLAIN', + 'sasl.username': 'test', + 'sasl.password': 'test-pass', + 'security.protocol': 'SASL_SSL' + } + ] +]) +def test_auth(auth, expected_config): + # Check that the key/value pairs in expected_config are a subset + # of the key/value pairs in auth._config. + assert expected_config.items() <= auth._config.items() From 58a5a17aef88f77b5e35066631319c370c58a59a Mon Sep 17 00:00:00 2001 From: Leo Singer Date: Fri, 11 Feb 2022 11:19:07 -0500 Subject: [PATCH 3/6] Raise user-friendly error if librdkafka is too old for OIDC --- adc/consumer.py | 6 ++++-- adc/errors.py | 22 ++++++++++++++++++++++ adc/producer.py | 5 +++-- setup.py | 1 + 4 files changed, 30 insertions(+), 4 deletions(-) diff --git a/adc/consumer.py b/adc/consumer.py index 14668d8..3d79c4b 100644 --- a/adc/consumer.py +++ b/adc/consumer.py @@ -2,6 +2,7 @@ import enum import logging from datetime import timedelta +from multiprocessing import context from typing import Dict, Iterable, Iterator, List, Optional, Set, Union from collections import defaultdict @@ -9,7 +10,7 @@ import confluent_kafka.admin # type: ignore from .auth import SASLAuth -from .errors import ErrorCallback, log_client_errors +from .errors import ErrorCallback, log_client_errors, catch_kafka_version_support_errors class Consumer: @@ -20,7 +21,8 @@ class Consumer: def __init__(self, conf: 'ConsumerConfig') -> None: self.logger = logging.getLogger("adc-streaming.consumer") self.conf = conf - self._consumer = confluent_kafka.Consumer(conf._to_confluent_kafka()) + with catch_kafka_version_support_errors(): + self._consumer = confluent_kafka.Consumer(conf._to_confluent_kafka()) def subscribe(self, topics: Union[str, Iterable], diff --git a/adc/errors.py b/adc/errors.py index adb34c9..11864ca 100644 --- a/adc/errors.py +++ b/adc/errors.py @@ -1,7 +1,9 @@ +from contextlib import contextmanager import logging from typing import Callable import confluent_kafka # type: ignore +from packaging.version import Version logger = logging.getLogger("adc-streaming") @@ -52,3 +54,23 @@ def __init__(self, error): self.fatal = error.fatal() msg = f"Error communicating with Kafka: code={self.name} {self.reason}" super(KafkaException, self).__init__(msg) + + +@contextmanager +def catch_kafka_version_support_errors(): + try: + yield + except confluent_kafka.KafkaException as e: + err, *_ = e.args + librdkafka_version, *_ = confluent_kafka.libversion() + if ( + err.code() == confluent_kafka.KafkaError._INVALID_ARG + and any(err.str() == f'No such configuration property: "{key}"' + for key in ['sasl.oauthbearer.client.id', + 'sasl.oauthbearer.client.secret', + 'sasl.oauthbearer.method', + 'sasl.oauthbearer.token.endpoint.url']) + and Version(librdkafka_version) < Version('1.9.0') + ): + raise RuntimeError(f'OpenID Connect support requires librdkafka >= 1.9.0, but you have {librdkafka_version}. Please install a newer version of confluent-kafka.') from e + raise diff --git a/adc/producer.py b/adc/producer.py index c4dc892..c83eadf 100644 --- a/adc/producer.py +++ b/adc/producer.py @@ -8,7 +8,7 @@ from .auth import SASLAuth from .errors import (DeliveryCallback, ErrorCallback, log_client_errors, - log_delivery_errors) + log_delivery_errors, catch_kafka_version_support_errors) class Producer: @@ -20,7 +20,8 @@ def __init__(self, conf: 'ProducerConfig') -> None: self.logger = logging.getLogger("adc-streaming.producer") self.conf = conf self.logger.debug(f"connecting to producer with config {conf._to_confluent_kafka()}") - self._producer = confluent_kafka.Producer(conf._to_confluent_kafka()) + with catch_kafka_version_support_errors(): + self._producer = confluent_kafka.Producer(conf._to_confluent_kafka()) def write(self, msg: Union[bytes, 'Serializable'], diff --git a/setup.py b/setup.py index 6f47fb5..3d7fd3c 100644 --- a/setup.py +++ b/setup.py @@ -15,6 +15,7 @@ "docker", "flake8", "isort", + "packaging", "pytest", "pytest-timeout", "pytest-integration", From 1e62af911a0e36dfdf4e640c28007923ff9518e4 Mon Sep 17 00:00:00 2001 From: Leo Singer Date: Wed, 9 Mar 2022 15:21:02 -0500 Subject: [PATCH 4/6] Revert "Raise user-friendly error if librdkafka is too old for OIDC" This reverts commit 2dbb9cec2e08d78ae784cb51d052d189751b5720. --- adc/consumer.py | 6 ++---- adc/errors.py | 22 ---------------------- adc/producer.py | 5 ++--- setup.py | 1 - 4 files changed, 4 insertions(+), 30 deletions(-) diff --git a/adc/consumer.py b/adc/consumer.py index 3d79c4b..14668d8 100644 --- a/adc/consumer.py +++ b/adc/consumer.py @@ -2,7 +2,6 @@ import enum import logging from datetime import timedelta -from multiprocessing import context from typing import Dict, Iterable, Iterator, List, Optional, Set, Union from collections import defaultdict @@ -10,7 +9,7 @@ import confluent_kafka.admin # type: ignore from .auth import SASLAuth -from .errors import ErrorCallback, log_client_errors, catch_kafka_version_support_errors +from .errors import ErrorCallback, log_client_errors class Consumer: @@ -21,8 +20,7 @@ class Consumer: def __init__(self, conf: 'ConsumerConfig') -> None: self.logger = logging.getLogger("adc-streaming.consumer") self.conf = conf - with catch_kafka_version_support_errors(): - self._consumer = confluent_kafka.Consumer(conf._to_confluent_kafka()) + self._consumer = confluent_kafka.Consumer(conf._to_confluent_kafka()) def subscribe(self, topics: Union[str, Iterable], diff --git a/adc/errors.py b/adc/errors.py index 11864ca..adb34c9 100644 --- a/adc/errors.py +++ b/adc/errors.py @@ -1,9 +1,7 @@ -from contextlib import contextmanager import logging from typing import Callable import confluent_kafka # type: ignore -from packaging.version import Version logger = logging.getLogger("adc-streaming") @@ -54,23 +52,3 @@ def __init__(self, error): self.fatal = error.fatal() msg = f"Error communicating with Kafka: code={self.name} {self.reason}" super(KafkaException, self).__init__(msg) - - -@contextmanager -def catch_kafka_version_support_errors(): - try: - yield - except confluent_kafka.KafkaException as e: - err, *_ = e.args - librdkafka_version, *_ = confluent_kafka.libversion() - if ( - err.code() == confluent_kafka.KafkaError._INVALID_ARG - and any(err.str() == f'No such configuration property: "{key}"' - for key in ['sasl.oauthbearer.client.id', - 'sasl.oauthbearer.client.secret', - 'sasl.oauthbearer.method', - 'sasl.oauthbearer.token.endpoint.url']) - and Version(librdkafka_version) < Version('1.9.0') - ): - raise RuntimeError(f'OpenID Connect support requires librdkafka >= 1.9.0, but you have {librdkafka_version}. Please install a newer version of confluent-kafka.') from e - raise diff --git a/adc/producer.py b/adc/producer.py index c83eadf..c4dc892 100644 --- a/adc/producer.py +++ b/adc/producer.py @@ -8,7 +8,7 @@ from .auth import SASLAuth from .errors import (DeliveryCallback, ErrorCallback, log_client_errors, - log_delivery_errors, catch_kafka_version_support_errors) + log_delivery_errors) class Producer: @@ -20,8 +20,7 @@ def __init__(self, conf: 'ProducerConfig') -> None: self.logger = logging.getLogger("adc-streaming.producer") self.conf = conf self.logger.debug(f"connecting to producer with config {conf._to_confluent_kafka()}") - with catch_kafka_version_support_errors(): - self._producer = confluent_kafka.Producer(conf._to_confluent_kafka()) + self._producer = confluent_kafka.Producer(conf._to_confluent_kafka()) def write(self, msg: Union[bytes, 'Serializable'], diff --git a/setup.py b/setup.py index 3d7fd3c..6f47fb5 100644 --- a/setup.py +++ b/setup.py @@ -15,7 +15,6 @@ "docker", "flake8", "isort", - "packaging", "pytest", "pytest-timeout", "pytest-integration", From b995a25c0cf7fb69e2ae08da6cfe8dbed2e9c709 Mon Sep 17 00:00:00 2001 From: Leo Singer Date: Wed, 9 Mar 2022 15:30:01 -0500 Subject: [PATCH 5/6] Add pure Python implementation of OIDC auth --- adc/consumer.py | 2 ++ adc/oidc.py | 25 +++++++++++++++++++++++++ adc/producer.py | 2 ++ setup.py | 2 ++ 4 files changed, 31 insertions(+) create mode 100644 adc/oidc.py diff --git a/adc/consumer.py b/adc/consumer.py index 14668d8..b10b78e 100644 --- a/adc/consumer.py +++ b/adc/consumer.py @@ -10,6 +10,7 @@ from .auth import SASLAuth from .errors import ErrorCallback, log_client_errors +from .oidc import set_oauth_cb class Consumer: @@ -276,4 +277,5 @@ def as_ms(td: timedelta): if self.auth is not None: config.update(self.auth()) + set_oauth_cb(config) return config diff --git a/adc/oidc.py b/adc/oidc.py new file mode 100644 index 0000000..1353da5 --- /dev/null +++ b/adc/oidc.py @@ -0,0 +1,25 @@ +def set_oauth_cb(config): + """Implement client support for KIP-768 OpenID Connect. + + Apache Kafka 3.1.0 supports authentication using OpenID Client Credentials. + Native support for Python is coming in the next release of librdkafka + (version 1.9.0). Meanwhile, this is a pure Python implementation of the + refresh token callback. + """ + if config.pop('sasl.oauthbearer.method') != 'oidc': + return + + client_id = config.pop('sasl.oauthbearer.client.id') + client_secret = config.pop('sasl.oauthbearer.client.secret') + scope = config.pop('sasl.oauthbearer.scope', None) + token_endpoint = config.pop('sasl.oauthbearer.token.endpoint.url') + + from authlib.integrations.requests_client import OAuth2Session + session = OAuth2Session(client_id, client_secret, scope=scope) + + def oauth_cb(*_, **__): + token = session.fetch_token( + token_endpoint, grant_type='client_credentials') + return token['access_token'], token['expires_at'] + + config['oauth_cb'] = oauth_cb diff --git a/adc/producer.py b/adc/producer.py index c4dc892..57c134b 100644 --- a/adc/producer.py +++ b/adc/producer.py @@ -9,6 +9,7 @@ from .auth import SASLAuth from .errors import (DeliveryCallback, ErrorCallback, log_client_errors, log_delivery_errors) +from .oidc import set_oauth_cb class Producer: @@ -114,6 +115,7 @@ def as_ms(td: timedelta): config["error_cb"] = self.error_callback if self.auth is not None: config.update(self.auth()) + set_oauth_cb(config) return config diff --git a/setup.py b/setup.py index 6f47fb5..0d9af21 100644 --- a/setup.py +++ b/setup.py @@ -3,9 +3,11 @@ # requirements install_requires = [ + "authlib", # FIXME: drop after next release of confluent-kafka with OIDC support "confluent-kafka", "dataclasses ; python_version < '3.7'", "importlib-metadata ; python_version < '3.8'", + "requests", # FIXME: drop after next release of confluent-kafka with OIDC support "tqdm", "certifi>=2020.04.05.1" ] From c4c26bb80be5dd5c51e9d00f70d038579a8c6be5 Mon Sep 17 00:00:00 2001 From: "C. Weaver" Date: Tue, 15 Mar 2022 19:49:35 -0400 Subject: [PATCH 6/6] Add a default when checking for sasl.oauthbearer.method --- adc/oidc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adc/oidc.py b/adc/oidc.py index 1353da5..ffc2752 100644 --- a/adc/oidc.py +++ b/adc/oidc.py @@ -6,7 +6,7 @@ def set_oauth_cb(config): (version 1.9.0). Meanwhile, this is a pure Python implementation of the refresh token callback. """ - if config.pop('sasl.oauthbearer.method') != 'oidc': + if config.pop('sasl.oauthbearer.method', None) != 'oidc': return client_id = config.pop('sasl.oauthbearer.client.id')