Skip to content

Commit

Permalink
Raise user-friendly error if librdkafka is too old for OIDC
Browse files Browse the repository at this point in the history
  • Loading branch information
lpsinger committed Feb 11, 2022
1 parent 162357d commit 2dbb9ce
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 4 deletions.
6 changes: 4 additions & 2 deletions adc/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
import enum
import logging
from datetime import timedelta
from multiprocessing import context
from typing import Dict, Iterable, Iterator, List, Optional, Set, Union

import confluent_kafka # type: ignore
import confluent_kafka.admin # type: ignore

from .auth import SASLAuth
from .errors import ErrorCallback, log_client_errors
from .errors import ErrorCallback, log_client_errors, catch_kafka_version_support_errors


class Consumer:
Expand All @@ -19,7 +20,8 @@ class Consumer:
def __init__(self, conf: 'ConsumerConfig') -> None:
self.logger = logging.getLogger("adc-streaming.consumer")
self.conf = conf
self._consumer = confluent_kafka.Consumer(conf._to_confluent_kafka())
with catch_kafka_version_support_errors():
self._consumer = confluent_kafka.Consumer(conf._to_confluent_kafka())

def subscribe(self,
topics: Union[str, Iterable],
Expand Down
22 changes: 22 additions & 0 deletions adc/errors.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from contextlib import contextmanager
import logging
from typing import Callable

import confluent_kafka # type: ignore
from packaging.version import Version

logger = logging.getLogger("adc-streaming")

Expand Down Expand Up @@ -52,3 +54,23 @@ def __init__(self, error):
self.fatal = error.fatal()
msg = f"Error communicating with Kafka: code={self.name} {self.reason}"
super(KafkaException, self).__init__(msg)


@contextmanager
def catch_kafka_version_support_errors():
try:
yield
except confluent_kafka.KafkaException as e:
err, *_ = e.args
librdkafka_version, *_ = confluent_kafka.libversion()
if (
err.code() == confluent_kafka.KafkaError._INVALID_ARG
and any(err.str() == f'No such configuration property: "{key}"'
for key in ['sasl.oauthbearer.client.id',
'sasl.oauthbearer.client.secret',
'sasl.oauthbearer.method',
'sasl.oauthbearer.token.endpoint.url'])
and Version(librdkafka_version) < Version('1.9.0')
):
raise RuntimeError(f'OpenID Connect support requires librdkafka >= 1.9.0, but you have {librdkafka_version}. Please install a newer version of confluent-kafka.') from e
raise
5 changes: 3 additions & 2 deletions adc/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from .auth import SASLAuth
from .errors import (DeliveryCallback, ErrorCallback, log_client_errors,
log_delivery_errors)
log_delivery_errors, catch_kafka_version_support_errors)


class Producer:
Expand All @@ -20,7 +20,8 @@ def __init__(self, conf: 'ProducerConfig') -> None:
self.logger = logging.getLogger("adc-streaming.producer")
self.conf = conf
self.logger.debug(f"connecting to producer with config {conf._to_confluent_kafka()}")
self._producer = confluent_kafka.Producer(conf._to_confluent_kafka())
with catch_kafka_version_support_errors():
self._producer = confluent_kafka.Producer(conf._to_confluent_kafka())

def write(self,
msg: Union[bytes, 'Serializable'],
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"docker",
"flake8",
"isort",
"packaging",
"pytest",
"pytest-timeout",
"pytest-integration",
Expand Down

0 comments on commit 2dbb9ce

Please sign in to comment.