From f9eac19f1d9e4fcb5ab0b8e9e1120e5a5cf891fc Mon Sep 17 00:00:00 2001 From: bodja Date: Fri, 4 Oct 2024 17:18:18 +0200 Subject: [PATCH] feat: change `Connector.name` is using (optional prefix + class name). feat: add `kafka` module to autodiscover connectors and consumers. feat: add optional context kwarg to `DjangoKafkaError`. fix: retry on 404 when checking for connector status. fix: do not allow adding to the registry the same key. fix: retry connector status check when status is UNASSIGNED. refs #16 --- README.md | 89 ++++++++----- django_kafka/__init__.py | 11 +- django_kafka/conf.py | 31 +++-- django_kafka/connect/connector.py | 27 ++-- django_kafka/exceptions.py | 4 +- .../management/commands/kafka_connect.py | 44 ++++-- django_kafka/registry.py | 8 ++ django_kafka/tests/connect/test_connector.py | 15 +-- .../connect/test_kafka_connect_command.py | 4 +- django_kafka/tests/test_registry.py | 125 ++++++++++++------ django_kafka/tests/test_settings.py | 27 ++-- django_kafka/utils.py | 28 ++++ 12 files changed, 278 insertions(+), 135 deletions(-) create mode 100644 django_kafka/utils.py diff --git a/README.md b/README.md index 576952a..4cec39f 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ class Topic1(Topic): Consumers define which topics they take care of. Usually you want one consumer per project. If 2 consumers are defined, then they will be started in parallel. -Consumers are auto-discovered and are expected to be located under the `consumers.py`. +Consumers are auto-discovered and are expected to be located under the `some_django_app/kafka/consumers.py` or `some_django_app/consumers.py`. ```python # ./consumers.py @@ -205,6 +205,8 @@ When consumers are started using [start commands](#start-the-Consumers), an addi ## Connectors +Connectors are auto-discovered and are expected to be located under the `some_django_app/kafka/connectors.py` or `some_django_app/connectors.py`. 
+ Connectors are defined as python classes decorated with `@kafka.connectors()` which adds the class to the global registry. `django_kafka.connect.connector.Connector` implements submission, validation and deletion of the connector configuration. @@ -271,28 +273,34 @@ See `--help`. ```python DJANGO_KAFKA = { "CLIENT_ID": f"{socket.gethostname()}-python", + "ERROR_HANDLER": "django_kafka.error_handlers.ClientErrorHandler", "GLOBAL_CONFIG": {}, "PRODUCER_CONFIG": {}, "CONSUMER_CONFIG": {}, + "RETRY_CONSUMER_CONFIG": { + "auto.offset.reset": "earliest", + "enable.auto.offset.store": False, + "topic.metadata.refresh.interval.ms": 10000, + }, + "RETRY_TOPIC_SUFFIX": "retry", + "DEAD_LETTER_TOPIC_SUFFIX": "dlt", "POLLING_FREQUENCY": 1, # seconds "SCHEMA_REGISTRY": {}, - "ERROR_HANDLER": "django_kafka.error_handlers.ClientErrorHandler", - "CONNECT": { - # Rest API of the kafka-connect instance - "HOST": "http://kafka-connect", - # `requests.auth.AuthBase` instance or tuple of (username, password) for Basic Auth - "AUTH": ("name", "password"), - # kwargs for `urllib3.util.retry.Retry` initialization - "RETRY": dict( - connect=5, - read=5, - status=5, - backoff_factor=0.5, - status_forcelist=[502, 503, 504], - ), - # `django_kafka.connect.client.KafkaConnectSession` would pass this value to every request method call - "REQUESTS_TIMEOUT": 30, - }, + # Rest API of the kafka-connect instance + "CONNECT_HOST": None, + # `requests.auth.AuthBase` instance or tuple of (username, password) for Basic Auth + "CONNECT_AUTH": None, + # kwargs for `urllib3.util.retry.Retry` initialization + "CONNECT_RETRY": dict( + connect=5, + read=5, + status=5, + backoff_factor=0.5, + status_forcelist=[502, 503, 504], + ), + # `django_kafka.connect.client.KafkaConnectSession` would pass this value to every request method call + "CONNECT_REQUESTS_TIMEOUT": 30, + "CONNECTOR_NAME_PREFIX": "", } ``` @@ -334,21 +342,38 @@ Default: `django_kafka.error_handlers.ClientErrorHandler` This is an `error_cb` hook 
(see [Kafka Client Configuration](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#kafka-client-configuration) for reference).
It is triggered for client global errors and in case of fatal error it raises `DjangoKafkaError`.

-#### `CONNECT`
-Default: `{
-    "HOST": "",
-    "AUTH": None,
-    "RETRY": dict(
-        connect=5,
-        read=5,
-        status=5,
-        backoff_factor=0.5,
-        status_forcelist=[502, 503, 504],
-    ),
-    "REQUESTS_TIMEOUT": 30,
-}`
+#### `CONNECT_HOST`
+Default: `None`
+
+Rest API of the kafka-connect instance.
+
+#### `CONNECT_AUTH`
+Default: `None`
+
+`requests.auth.AuthBase` instance or `("username", "password")` for Basic Auth.
+
+#### `CONNECT_RETRY`
+Default: `dict(
+    connect=5,
+    read=5,
+    status=5,
+    backoff_factor=0.5,
+    status_forcelist=[502, 503, 504],
+)`
+
+kwargs for `urllib3.util.retry.Retry` initialization.
+
+#### `CONNECT_REQUESTS_TIMEOUT`
+Default: `30`
+
+`django_kafka.connect.client.KafkaConnectSession` would pass this value to every request method call.
+
+#### `CONNECTOR_NAME_PREFIX`
+Default: `""`
+
+Prefix which will be added to the connector name when publishing the connector.

-Required for `./manage.py kafka_conncect` command.
+`CONNECT_` settings are required for `./manage.py kafka_connect` command which talks to the Rest API of the kafka-connect instance.
 Used by `django_kafka.connect.connector.Connector` to initialize `django_kafka.connect.client.KafkaConnectClient`.
diff --git a/django_kafka/__init__.py b/django_kafka/__init__.py index 75935bf..f465940 100644 --- a/django_kafka/__init__.py +++ b/django_kafka/__init__.py @@ -9,7 +9,7 @@ from django_kafka.conf import settings from django_kafka.exceptions import DjangoKafkaError from django_kafka.producer import Producer -from django_kafka.registry import ConsumersRegistry, Registry +from django_kafka.registry import ConnectorsRegistry, ConsumersRegistry from django_kafka.retry.settings import RetrySettings if TYPE_CHECKING: @@ -28,11 +28,16 @@ def autodiscover(): - autodiscover_modules("consumers", "connectors") + autodiscover_modules( + "consumers", + "connectors", + "kafka.consumers", + "kafka.connectors", + ) class DjangoKafka: - connectors = Registry["Connector"]() + connectors = ConnectorsRegistry() consumers = ConsumersRegistry() retry = RetrySettings diff --git a/django_kafka/conf.py b/django_kafka/conf.py index 103421e..0b7845b 100644 --- a/django_kafka/conf.py +++ b/django_kafka/conf.py @@ -18,22 +18,21 @@ "DEAD_LETTER_TOPIC_SUFFIX": "dlt", "POLLING_FREQUENCY": 1, # seconds "SCHEMA_REGISTRY": {}, - "CONNECT": { - # Rest API of the kafka-connect instance - "HOST": "", - # `requests.auth.AuthBase` instance or tuple of (username, password) for Basic Auth - "AUTH": None, - # kwargs for `urllib3.util.retry.Retry` initialization - "RETRY": dict( - connect=5, - read=5, - status=5, - backoff_factor=0.5, - status_forcelist=[502, 503, 504], - ), - # `django_kafka.connect.client.KafkaConnectSession` would pass this value to every request method call - "REQUESTS_TIMEOUT": 30, - }, + # Rest API of the kafka-connect instance + "CONNECT_HOST": None, # e.g. 
http://kafka-connect + # `requests.auth.AuthBase` instance or tuple of (username, password) for Basic Auth + "CONNECT_AUTH": None, + # kwargs for `urllib3.util.retry.Retry` initialization + "CONNECT_RETRY": dict( + connect=5, + read=5, + status=5, + backoff_factor=0.5, + status_forcelist=[502, 503, 504], + ), + # `django_kafka.connect.client.KafkaConnectSession` would pass this value to every request method call + "CONNECT_REQUESTS_TIMEOUT": 30, + "CONNECTOR_NAME_PREFIX": "", } diff --git a/django_kafka/connect/connector.py b/django_kafka/connect/connector.py index 6ba2569..3276684 100644 --- a/django_kafka/connect/connector.py +++ b/django_kafka/connect/connector.py @@ -27,10 +27,13 @@ class ConnectorStatus(StrEnum): class Connector(ABC): mark_for_removal = False + @classmethod @property - def name(self) -> str: + def name(cls) -> str: """Name of the connector.""" - return f"{settings.CLIENT_ID}.{self.__class__.__module__}.{self.__class__.__name__}" + if settings.CONNECTOR_NAME_PREFIX: + return f"{settings.CONNECTOR_NAME_PREFIX}.{cls.__name__}" + return cls.__name__ @property @abstractmethod @@ -38,11 +41,14 @@ def config(self) -> dict: """Configurations for the connector.""" def __init__(self): + if not settings.CONNECT_HOST: + raise DjangoKafkaError("Kafka `CONNECT_HOST` is not configured.") + self.client = KafkaConnectClient( - host=settings.CONNECT["HOST"], - auth=settings.CONNECT["AUTH"], - retry=settings.CONNECT["RETRY"], - timeout=settings.CONNECT["REQUESTS_TIMEOUT"], + host=settings.CONNECT_HOST, + auth=settings.CONNECT_AUTH, + retry=settings.CONNECT_RETRY, + timeout=settings.CONNECT_REQUESTS_TIMEOUT, ) def delete(self) -> bool: @@ -52,7 +58,7 @@ def delete(self) -> bool: return False if not response.ok: - raise DjangoKafkaError(response.text) + raise DjangoKafkaError(response.text, context=response) return True @@ -60,7 +66,7 @@ def submit(self) -> dict: response = self.client.update_or_create(self.name, self.config) if not response.ok: - raise 
DjangoKafkaError(response.text) + raise DjangoKafkaError(response.text, context=response) return response.json() @@ -68,7 +74,7 @@ def is_valid(self, raise_exception=False) -> bool: response = self.client.validate(self.config) if raise_exception and not response.ok: - raise DjangoKafkaError(response.text) + raise DjangoKafkaError(response.text, context=response) return response.ok @@ -76,6 +82,5 @@ def status(self) -> ConnectorStatus: response = self.client.connector_status(self.name) if not response.ok: - raise DjangoKafkaError(response.text) - + raise DjangoKafkaError(response.text, context=response) return response.json()["connector"]["state"] diff --git a/django_kafka/exceptions.py b/django_kafka/exceptions.py index d18f69a..d3cbe49 100644 --- a/django_kafka/exceptions.py +++ b/django_kafka/exceptions.py @@ -1,2 +1,4 @@ class DjangoKafkaError(Exception): - pass + def __init__(self, *args, context: any = None, **kwargs): + super().__init__(*args, **kwargs) + self.context = context diff --git a/django_kafka/management/commands/kafka_connect.py b/django_kafka/management/commands/kafka_connect.py index 084ed33..2325e8d 100644 --- a/django_kafka/management/commands/kafka_connect.py +++ b/django_kafka/management/commands/kafka_connect.py @@ -2,11 +2,13 @@ from django.core.management import CommandError from django.core.management.base import BaseCommand +from requests import Response from requests.exceptions import RetryError from django_kafka import kafka from django_kafka.exceptions import DjangoKafkaError from django_kafka.connect.connector import Connector, ConnectorStatus +from django_kafka.utils import retry logger = logging.getLogger(__name__) @@ -134,22 +136,40 @@ def handle_status(self): self.stdout.write(f"{connector_path}: ", ending="") connector = kafka.connectors[connector_path]() - if connector.mark_for_removal: - self.stdout.write(self.style.WARNING("skip (REASON: marked for removal)")) - continue - try: - status = connector.status() - except 
(DjangoKafkaError, RetryError) as error: + self._connector_is_running(connector) + except DjangoKafkaError as error: self.has_failures = True - self.stdout.write(self.style.ERROR("failed to retrieve")) self.stdout.write(self.style.ERROR(str(error))) + + @retry((DjangoKafkaError,), tries=3, delay=1, backoff=2) + def _connector_is_running(self, connector: Connector): + if connector.mark_for_removal: + self.stdout.write(self.style.WARNING("skip (REASON: marked for removal)")) + return + try: + status = connector.status() + except DjangoKafkaError as error: + if isinstance(error.context, Response) and error.context.status_code == 404: + # retry: on 404 as some delays are expected + raise + + self.has_failures = True + self.stdout.write(self.style.ERROR("failed to retrieve")) + self.stdout.write(self.style.ERROR(str(error))) + except RetryError as error: + self.has_failures = True + self.stdout.write(self.style.ERROR("failed to retrieve")) + self.stdout.write(self.style.ERROR(str(error))) + else: + if status == ConnectorStatus.RUNNING: + self.stdout.write(self.style.SUCCESS(status)) + elif status == ConnectorStatus.UNASSIGNED: + # retry: on UNASSIGNED as some delays are expected + raise DjangoKafkaError(f"Connector status is {status}.") else: - if status == ConnectorStatus.RUNNING: - self.stdout.write(self.style.SUCCESS(status)) - else: - self.has_failures = True - self.stdout.write(self.style.ERROR(status)) + self.has_failures = True + self.stdout.write(self.style.ERROR(status)) def handle_delete(self, connector: Connector): try: diff --git a/django_kafka/registry.py b/django_kafka/registry.py index e1bd328..fa55bde 100644 --- a/django_kafka/registry.py +++ b/django_kafka/registry.py @@ -4,6 +4,7 @@ if TYPE_CHECKING: from django_kafka.consumer import Consumer + from django_kafka.connect.connector import Connector T = TypeVar('T') @@ -34,9 +35,16 @@ def get_key(self, cls: Type[T]) -> str: def register(self, cls: Type[T]): key = self.get_key(cls) + if key in 
self._classes: + raise DjangoKafkaError(f"`{key}` is already registered.") self._classes[key] = cls +class ConnectorsRegistry(Registry["Connector"]): + def get_key(self, cls) -> str: + return cls.name + + class ConsumersRegistry(Registry["Consumer"]): def register(self, cls): diff --git a/django_kafka/tests/connect/test_connector.py b/django_kafka/tests/connect/test_connector.py index 15805e4..5684368 100644 --- a/django_kafka/tests/connect/test_connector.py +++ b/django_kafka/tests/connect/test_connector.py @@ -12,22 +12,17 @@ class MyConnector(Connector): config = {} -@override_settings(CONNECT={ - "HOST": "http://localhost", - "AUTH": ("user", "pass"), - "RETRY": {}, - "REQUESTS_TIMEOUT": 5, -}) +@patch.multiple("django_kafka.conf.settings", CONNECT_HOST="http://kafka-connect") class ConnectorTestCase(SimpleTestCase): @patch("django_kafka.connect.connector.KafkaConnectClient", spec=True) def test_init_request_session(self, mock_client): connector = MyConnector() mock_client.assert_called_with( - host=settings.CONNECT["HOST"], - auth=settings.CONNECT["AUTH"], - retry=settings.CONNECT["RETRY"], - timeout=settings.CONNECT["REQUESTS_TIMEOUT"], + host=settings.CONNECT_HOST, + auth=settings.CONNECT_AUTH, + retry=settings.CONNECT_RETRY, + timeout=settings.CONNECT_REQUESTS_TIMEOUT, ) self.assertIsInstance(connector.client, KafkaConnectClient) diff --git a/django_kafka/tests/connect/test_kafka_connect_command.py b/django_kafka/tests/connect/test_kafka_connect_command.py index 39cb601..2cbb75d 100644 --- a/django_kafka/tests/connect/test_kafka_connect_command.py +++ b/django_kafka/tests/connect/test_kafka_connect_command.py @@ -160,7 +160,9 @@ def test_handle_status__unassigned(self): with patch_kafka_connectors(**{"status.return_value": ConnectorStatus.UNASSIGNED}) as connector: self.command.handle_status() - connector.status.assert_called_once_with() + # status UNASSIGNED is retried 3 times + self.assertEqual(connector.status.call_count, 3) + 
self.assertTrue(self.command.has_failures) def test_handle_status__django_kafka_error(self): diff --git a/django_kafka/tests/test_registry.py b/django_kafka/tests/test_registry.py index cd1d831..1fb4934 100644 --- a/django_kafka/tests/test_registry.py +++ b/django_kafka/tests/test_registry.py @@ -1,48 +1,85 @@ from typing import Type from unittest import mock +from unittest.mock import patch -from django.test import TestCase +from django.test import SimpleTestCase +from django_kafka.connect.connector import Connector from django_kafka.consumer import Consumer, Topics -from django_kafka.registry import ConsumersRegistry +from django_kafka.exceptions import DjangoKafkaError +from django_kafka.registry import ConsumersRegistry, Registry, ConnectorsRegistry -class ConsumersRegistryTestCase(TestCase): - def _get_consumer_cls(self, name, group_id) -> Type[Consumer]: - return type[Consumer]( - name, - (Consumer,), - {"config": {"group.id": group_id}, "topics": Topics()}, +class RegistryTestCase(SimpleTestCase): + def _gen_cls(self, name): + return type(name, (object, ), {}) + + def test_registering_same_key_not_allowed(self): + registry = Registry() + + cls = self._gen_cls("SomeClass") + key = registry.get_key(cls) + error_msg = f"`{key}` is already registered." 
+ + registry()(cls) + + with self.assertRaisesMessage(DjangoKafkaError, error_msg): + registry()(cls) + + def test_get_key(self): + cls_a = self._gen_cls("ClassA") + cls_b = self._gen_cls("ClassB") + registry = Registry() + + key_a = registry.get_key(cls_a) + self.assertEqual( + key_a, + f"{cls_a.__module__}.{cls_a.__name__}", + ) + + key_b = registry.get_key(cls_b) + self.assertEqual( + key_b, + f"{cls_b.__module__}.{cls_b.__name__}", ) def test_decorator_adds_to_registry(self): - consumer_cls_a = self._get_consumer_cls("ConsumerA", "group_a") - consumer_cls_b = self._get_consumer_cls("ConsumerB", "group_b") + cls_a = self._gen_cls("ClassA") + cls_b = self._gen_cls("classB") - registry = ConsumersRegistry() + registry = Registry() - self.assertIs(registry()(consumer_cls_a), consumer_cls_a) - self.assertIs(registry()(consumer_cls_b), consumer_cls_b) + self.assertIs(registry()(cls_a), cls_a) + self.assertIs(registry()(cls_b), cls_b) - key_a = registry.get_key(consumer_cls_a) - self.assertIs(registry[key_a], consumer_cls_a) + key_a = registry.get_key(cls_a) + self.assertIs(registry[key_a], cls_a) - key_b = registry.get_key(consumer_cls_b) - self.assertIs(registry[key_b], consumer_cls_b) + key_b = registry.get_key(cls_b) + self.assertIs(registry[key_b], cls_b) def test_iter_returns_expected_keys(self): - consumer_cls_a = self._get_consumer_cls("ConsumerA", "group_a") - consumer_cls_b = self._get_consumer_cls("ConsumerB", "group_b") - registry = ConsumersRegistry() + cls_a = self._gen_cls("ClassA") + cls_b = self._gen_cls("ClassB") + registry = Registry() - registry()(consumer_cls_a) - registry()(consumer_cls_b) + registry()(cls_a) + registry()(cls_b) - key_a = registry.get_key(consumer_cls_a) - key_b = registry.get_key(consumer_cls_b) + key_a = registry.get_key(cls_a) + key_b = registry.get_key(cls_b) self.assertCountEqual(list(registry), [key_a, key_b]) + +class ConsumersRegistryTestCase(SimpleTestCase): + def _get_consumer_cls(self, name, group_id) -> 
Type[Consumer]: + return type[Consumer]( + name, + (Consumer,), + {"config": {"group.id": group_id}, "topics": Topics()}, + ) + @mock.patch("django_kafka.retry.consumer.RetryConsumer.build") def test_retry_consumer_registered(self, mock_retry_consumer_build): consumer_cls_a = self._get_consumer_cls("ConsumerA", "group_a") @@ -61,19 +98,33 @@ def test_retry_consumer_registered(self, mock_retry_consumer_build): self.assertCountEqual(list(registry), [key_a, retry_key_a, key_b]) self.assertIs(registry[retry_key_a], retry_consumer_mock) - def test_get_key(self): - consumer_cls_a = self._get_consumer_cls("ConsumerA", "group_a") - consumer_cls_b = self._get_consumer_cls("ConsumerB", "group_b") - registry = ConsumersRegistry() - key_a = registry.get_key(consumer_cls_a) - self.assertEqual( - key_a, - f"{consumer_cls_a.__module__}.{consumer_cls_a.__name__}", +class ConnectorsRegistryTestCase(SimpleTestCase): + def _gen_cls(self, name) -> Type[Connector]: + return type[Connector]( + name, + (Connector,), + {"config": {}}, ) - key_b = registry.get_key(consumer_cls_b) - self.assertEqual( - key_b, - f"{consumer_cls_b.__module__}.{consumer_cls_b.__name__}", - ) + @patch.multiple("django_kafka.conf.settings", CONNECT_HOST="http://kafka-connect") + def test_get_key(self): + connector_cls = self._gen_cls("ConnectorCls") + registry = ConnectorsRegistry() + + key = registry.get_key(connector_cls) + self.assertEqual(key, connector_cls().name) + + def test_registering_same_key_not_allowed(self): + registry = ConnectorsRegistry() + connector_a = self._gen_cls("ConnectorA") + # give the same class name to the connector to generate same key + connector_b = self._gen_cls("ConnectorA") + + key = registry.get_key(connector_a) + error_msg = f"`{key}` is already registered." 
+ + registry()(connector_a) + + with self.assertRaisesMessage(DjangoKafkaError, error_msg): + registry()(connector_b) diff --git a/django_kafka/tests/test_settings.py b/django_kafka/tests/test_settings.py index a20a590..fdacadf 100644 --- a/django_kafka/tests/test_settings.py +++ b/django_kafka/tests/test_settings.py @@ -17,7 +17,11 @@ class SettingsTestCase(TestCase): "DEAD_LETTER_TOPIC_SUFFIX", "POLLING_FREQUENCY", "SCHEMA_REGISTRY", - "CONNECT", + "CONNECT_HOST", + "CONNECT_AUTH", + "CONNECT_RETRY", + "CONNECT_REQUESTS_TIMEOUT", + "CONNECTOR_NAME_PREFIX", ) @patch("django_kafka.consumer.ConfluentConsumer") @@ -48,18 +52,17 @@ def test_user_settings(self, mock_consumer_client): "SCHEMA_REGISTRY": { "url": "https://schema-registry", }, - "CONNECT": { - "HOST": "http://kafka-connect", - "AUTH": ("user", "pass"), - "RETRY": { - "connect": 10, - "read": 10, - "status": 10, - "backoff_factor": 0.1, - "status_forcelist": [502, 503, 504], - }, - "REQUESTS_TIMEOUT": 60, + "CONNECT_HOST": "http://kafka-connect", + "CONNECT_AUTH": ("user", "pass"), + "CONNECT_RETRY": { + "connect": 10, + "read": 10, + "status": 10, + "backoff_factor": 0.1, + "status_forcelist": [502, 503, 504], }, + "CONNECT_REQUESTS_TIMEOUT": 60, + "CONNECTOR_NAME_PREFIX": "project_name" } with override_settings(**{SETTINGS_KEY: user_settings}): for key in self.settings_keys: diff --git a/django_kafka/utils.py b/django_kafka/utils.py new file mode 100644 index 0000000..422f374 --- /dev/null +++ b/django_kafka/utils.py @@ -0,0 +1,28 @@ +import time +from functools import wraps +from typing import Type + + +def retry( + exceptions: tuple[Type[Exception]] = (Exception,), + tries: int = -1, + delay: int = 0, + backoff: int = 1, +): + def decorator(f): + @wraps(f) + def wrapper(*args, **kwargs): + _tries = tries + _delay = delay + + while _tries: + try: + return f(*args, **kwargs) + except exceptions: + _tries -= 1 + if not _tries: + raise + time.sleep(_delay) + _delay *= backoff + return wrapper + return 
decorator