Skip to content

Commit

Permalink
feat: change Connector.name to use (optional prefix + class name).
Browse files Browse the repository at this point in the history
feat: add `kafka` module to autodiscover connectors and consumers.
fix: do not allow registering the same key in the registry twice.
fix: retry connector status check when status is UNASSIGNED.

refs #16
  • Loading branch information
bodja committed Oct 7, 2024
1 parent 4fa00be commit 16f7767
Show file tree
Hide file tree
Showing 11 changed files with 199 additions and 97 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class Topic1(Topic):

Consumers define which topics they take care of. Usually you want one consumer per project. If 2 consumers are defined, then they will be started in parallel.

Consumers are auto-discovered and are expected to be located under the `consumers.py`.
Consumers are auto-discovered and are expected to be located in `some_django_app/kafka/consumers.py` or `some_django_app/consumers.py`.

```python
# ./consumers.py
Expand Down Expand Up @@ -205,6 +205,8 @@ When consumers are started using [start commands](#start-the-Consumers), an addi

## Connectors

Connectors are auto-discovered and are expected to be located in `some_django_app/kafka/connectors.py` or `some_django_app/connectors.py`.

Connectors are defined as Python classes decorated with `@kafka.connectors()`, which adds the class to the global registry.

`django_kafka.connect.connector.Connector` implements submission, validation and deletion of the connector configuration.
Expand Down
11 changes: 8 additions & 3 deletions django_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from django_kafka.conf import settings
from django_kafka.exceptions import DjangoKafkaError
from django_kafka.producer import Producer
from django_kafka.registry import ConsumersRegistry, Registry
from django_kafka.registry import ConnectorsRegistry, ConsumersRegistry
from django_kafka.retry.settings import RetrySettings

if TYPE_CHECKING:
Expand All @@ -28,11 +28,16 @@


def autodiscover():
autodiscover_modules("consumers", "connectors")
autodiscover_modules(
"consumers",
"connectors",
"kafka.consumers",
"kafka.connectors",
)


class DjangoKafka:
connectors = Registry["Connector"]()
connectors = ConnectorsRegistry()
consumers = ConsumersRegistry()
retry = RetrySettings

Expand Down
31 changes: 15 additions & 16 deletions django_kafka/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,21 @@
"DEAD_LETTER_TOPIC_SUFFIX": "dlt",
"POLLING_FREQUENCY": 1, # seconds
"SCHEMA_REGISTRY": {},
"CONNECT": {
# Rest API of the kafka-connect instance
"HOST": "",
# `requests.auth.AuthBase` instance or tuple of (username, password) for Basic Auth
"AUTH": None,
# kwargs for `urllib3.util.retry.Retry` initialization
"RETRY": dict(
connect=5,
read=5,
status=5,
backoff_factor=0.5,
status_forcelist=[502, 503, 504],
),
# `django_kafka.connect.client.KafkaConnectSession` would pass this value to every request method call
"REQUESTS_TIMEOUT": 30,
},
# REST API of the kafka-connect instance
"CONNECT_HOST": "http://kafka-connect",
# `requests.auth.AuthBase` instance or tuple of (username, password) for Basic Auth
"CONNECT_AUTH": None,
# kwargs for `urllib3.util.retry.Retry` initialization
"CONNECT_RETRY": dict(
connect=5,
read=5,
status=5,
backoff_factor=0.5,
status_forcelist=[502, 503, 504],
),
# `django_kafka.connect.client.KafkaConnectSession` passes this value to every request method call
"CONNECT_REQUESTS_TIMEOUT": 30,
"CONNECTOR_NAME_PREFIX": "",
}


Expand Down
12 changes: 7 additions & 5 deletions django_kafka/connect/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ class Connector(ABC):
@property
def name(self) -> str:
"""Name of the connector."""
return f"{settings.CLIENT_ID}.{self.__class__.__module__}.{self.__class__.__name__}"
if settings.CONNECTOR_NAME_PREFIX:
return f"{settings.CONNECTOR_NAME_PREFIX}.{self.__class__.__name__}"
return self.__class__.__name__

@property
@abstractmethod
Expand All @@ -39,10 +41,10 @@ def config(self) -> dict:

def __init__(self):
self.client = KafkaConnectClient(
host=settings.CONNECT["HOST"],
auth=settings.CONNECT["AUTH"],
retry=settings.CONNECT["RETRY"],
timeout=settings.CONNECT["REQUESTS_TIMEOUT"],
host=settings.CONNECT_HOST,
auth=settings.CONNECT_AUTH,
retry=settings.CONNECT_RETRY,
timeout=settings.CONNECT_REQUESTS_TIMEOUT,
)

def delete(self) -> bool:
Expand Down
34 changes: 22 additions & 12 deletions django_kafka/management/commands/kafka_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from django_kafka import kafka
from django_kafka.exceptions import DjangoKafkaError
from django_kafka.connect.connector import Connector, ConnectorStatus
from django_kafka.utils import retry

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -134,22 +135,31 @@ def handle_status(self):
self.stdout.write(f"{connector_path}: ", ending="")

connector = kafka.connectors[connector_path]()
if connector.mark_for_removal:
self.stdout.write(self.style.WARNING("skip (REASON: marked for removal)"))
continue

try:
status = connector.status()
except (DjangoKafkaError, RetryError) as error:
self._connector_is_running(connector)
except DjangoKafkaError as error:
self.has_failures = True
self.stdout.write(self.style.ERROR("failed to retrieve"))
self.stdout.write(self.style.ERROR(str(error)))

@retry((DjangoKafkaError,), tries=3, delay=1, backoff=2)
def _connector_is_running(self, connector: Connector):
if connector.mark_for_removal:
self.stdout.write(self.style.WARNING("skip (REASON: marked for removal)"))
return
try:
status = connector.status()
except (DjangoKafkaError, RetryError) as error:
self.has_failures = True
self.stdout.write(self.style.ERROR("failed to retrieve"))
self.stdout.write(self.style.ERROR(str(error)))
else:
if status == ConnectorStatus.RUNNING:
self.stdout.write(self.style.SUCCESS(status))
elif status == ConnectorStatus.UNASSIGNED:
raise DjangoKafkaError(f"Connector status is {status}.")
else:
if status == ConnectorStatus.RUNNING:
self.stdout.write(self.style.SUCCESS(status))
else:
self.has_failures = True
self.stdout.write(self.style.ERROR(status))
self.has_failures = True
self.stdout.write(self.style.ERROR(status))

def handle_delete(self, connector: Connector):
try:
Expand Down
8 changes: 8 additions & 0 deletions django_kafka/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

if TYPE_CHECKING:
from django_kafka.consumer import Consumer
from django_kafka.connect.connector import Connector


T = TypeVar('T')
Expand Down Expand Up @@ -34,9 +35,16 @@ def get_key(self, cls: Type[T]) -> str:

def register(self, cls: Type[T]):
key = self.get_key(cls)
if key in self._classes:
raise DjangoKafkaError(f"`{key}` is already registered.")
self._classes[key] = cls


class ConnectorsRegistry(Registry["Connector"]):
def get_key(self, cls) -> str:
return cls().name


class ConsumersRegistry(Registry["Consumer"]):

def register(self, cls):
Expand Down
14 changes: 4 additions & 10 deletions django_kafka/tests/connect/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,16 @@ class MyConnector(Connector):
config = {}


@override_settings(CONNECT={
"HOST": "http://localhost",
"AUTH": ("user", "pass"),
"RETRY": {},
"REQUESTS_TIMEOUT": 5,
})
class ConnectorTestCase(SimpleTestCase):
@patch("django_kafka.connect.connector.KafkaConnectClient", spec=True)
def test_init_request_session(self, mock_client):
connector = MyConnector()

mock_client.assert_called_with(
host=settings.CONNECT["HOST"],
auth=settings.CONNECT["AUTH"],
retry=settings.CONNECT["RETRY"],
timeout=settings.CONNECT["REQUESTS_TIMEOUT"],
host=settings.CONNECT_HOST,
auth=settings.CONNECT_AUTH,
retry=settings.CONNECT_RETRY,
timeout=settings.CONNECT_REQUESTS_TIMEOUT,
)
self.assertIsInstance(connector.client, KafkaConnectClient)

Expand Down
4 changes: 3 additions & 1 deletion django_kafka/tests/connect/test_kafka_connect_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,9 @@ def test_handle_status__unassigned(self):
with patch_kafka_connectors(**{"status.return_value": ConnectorStatus.UNASSIGNED}) as connector:
self.command.handle_status()

connector.status.assert_called_once_with()
# status UNASSIGNED is retried 3 times
self.assertEqual(connector.status.call_count, 3)

self.assertTrue(self.command.has_failures)

def test_handle_status__django_kafka_error(self):
Expand Down
123 changes: 86 additions & 37 deletions django_kafka/tests/test_registry.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,84 @@
from typing import Type
from unittest import mock

from django.test import TestCase
from django.test import SimpleTestCase

from django_kafka.connect.connector import Connector
from django_kafka.consumer import Consumer, Topics
from django_kafka.registry import ConsumersRegistry
from django_kafka.exceptions import DjangoKafkaError
from django_kafka.registry import ConsumersRegistry, Registry, ConnectorsRegistry


class ConsumersRegistryTestCase(TestCase):
def _get_consumer_cls(self, name, group_id) -> Type[Consumer]:
return type[Consumer](
name,
(Consumer,),
{"config": {"group.id": group_id}, "topics": Topics()},
class RegistryTestCase(SimpleTestCase):
def _gen_cls(self, name):
return type(name, (object, ), {})

def test_registering_same_key_not_allowed(self):
registry = Registry()

cls = self._gen_cls("SomeClass")
key = registry.get_key(cls)
error_msg = f"`{key}` is already registered."

registry()(cls)

with self.assertRaisesMessage(DjangoKafkaError, error_msg):
registry()(cls)

def test_get_key(self):
cls_a = self._gen_cls("ClassA")
cls_b = self._gen_cls("ClassB")
registry = Registry()

key_a = registry.get_key(cls_a)
self.assertEqual(
key_a,
f"{cls_a.__module__}.{cls_a.__name__}",
)

key_b = registry.get_key(cls_b)
self.assertEqual(
key_b,
f"{cls_b.__module__}.{cls_b.__name__}",
)

def test_decorator_adds_to_registry(self):
consumer_cls_a = self._get_consumer_cls("ConsumerA", "group_a")
consumer_cls_b = self._get_consumer_cls("ConsumerB", "group_b")
cls_a = self._gen_cls("ClassA")
cls_b = self._gen_cls("classB")

registry = ConsumersRegistry()
registry = Registry()

self.assertIs(registry()(consumer_cls_a), consumer_cls_a)
self.assertIs(registry()(consumer_cls_b), consumer_cls_b)
self.assertIs(registry()(cls_a), cls_a)
self.assertIs(registry()(cls_b), cls_b)

key_a = registry.get_key(consumer_cls_a)
self.assertIs(registry[key_a], consumer_cls_a)
key_a = registry.get_key(cls_a)
self.assertIs(registry[key_a], cls_a)

key_b = registry.get_key(consumer_cls_b)
self.assertIs(registry[key_b], consumer_cls_b)
key_b = registry.get_key(cls_b)
self.assertIs(registry[key_b], cls_b)

def test_iter_returns_expected_keys(self):
consumer_cls_a = self._get_consumer_cls("ConsumerA", "group_a")
consumer_cls_b = self._get_consumer_cls("ConsumerB", "group_b")
registry = ConsumersRegistry()
cls_a = self._gen_cls("ClassA")
cls_b = self._gen_cls("ClassB")
registry = Registry()

registry()(consumer_cls_a)
registry()(consumer_cls_b)
registry()(cls_a)
registry()(cls_b)

key_a = registry.get_key(consumer_cls_a)
key_b = registry.get_key(consumer_cls_b)
key_a = registry.get_key(cls_a)
key_b = registry.get_key(cls_b)

self.assertCountEqual(list(registry), [key_a, key_b])


class ConsumersRegistryTestCase(SimpleTestCase):
def _get_consumer_cls(self, name, group_id) -> Type[Consumer]:
return type[Consumer](
name,
(Consumer,),
{"config": {"group.id": group_id}, "topics": Topics()},
)

@mock.patch("django_kafka.retry.consumer.RetryConsumer.build")
def test_retry_consumer_registered(self, mock_retry_consumer_build):
consumer_cls_a = self._get_consumer_cls("ConsumerA", "group_a")
Expand All @@ -61,19 +97,32 @@ def test_retry_consumer_registered(self, mock_retry_consumer_build):
self.assertCountEqual(list(registry), [key_a, retry_key_a, key_b])
self.assertIs(registry[retry_key_a], retry_consumer_mock)

def test_get_key(self):
consumer_cls_a = self._get_consumer_cls("ConsumerA", "group_a")
consumer_cls_b = self._get_consumer_cls("ConsumerB", "group_b")
registry = ConsumersRegistry()

key_a = registry.get_key(consumer_cls_a)
self.assertEqual(
key_a,
f"{consumer_cls_a.__module__}.{consumer_cls_a.__name__}",
class ConnectorsRegistryTestCase(SimpleTestCase):
def _gen_cls(self, name) -> Type[Connector]:
return type[Connector](
name,
(Connector,),
{"config": {}},
)

key_b = registry.get_key(consumer_cls_b)
self.assertEqual(
key_b,
f"{consumer_cls_b.__module__}.{consumer_cls_b.__name__}",
)
def test_get_key(self):
connector_cls = self._gen_cls("ConnectorCls")
registry = ConnectorsRegistry()

key = registry.get_key(connector_cls)
self.assertEqual(key, connector_cls().name)

def test_registering_same_key_not_allowed(self):
registry = ConnectorsRegistry()
connector_a = self._gen_cls("ConnectorA")
# give the same class name to the connector to generate the same key
connector_b = self._gen_cls("ConnectorA")

key = registry.get_key(connector_a)
error_msg = f"`{key}` is already registered."

registry()(connector_a)

with self.assertRaisesMessage(DjangoKafkaError, error_msg):
registry()(connector_b)
Loading

0 comments on commit 16f7767

Please sign in to comment.