Skip to content

Commit

Permalink
Change Connector.name generation logic.
Browse files Browse the repository at this point in the history
Do not allow adding to the registry the same key.

- Use class name only instead of the module and class name together, to avoid issues when the connector is moved to a different module
- Registry will now fail when attempting to register the same key.

refs #16
  • Loading branch information
bodja committed Oct 4, 2024
1 parent 4fa00be commit eee48ac
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 40 deletions.
4 changes: 2 additions & 2 deletions django_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from django_kafka.conf import settings
from django_kafka.exceptions import DjangoKafkaError
from django_kafka.producer import Producer
from django_kafka.registry import ConsumersRegistry, Registry
from django_kafka.registry import ConsumersRegistry, Registry, ConnectorsRegistry
from django_kafka.retry.settings import RetrySettings

if TYPE_CHECKING:
Expand All @@ -32,7 +32,7 @@ def autodiscover():


class DjangoKafka:
connectors = Registry["Connector"]()
connectors = ConnectorsRegistry()
consumers = ConsumersRegistry()
retry = RetrySettings

Expand Down
2 changes: 1 addition & 1 deletion django_kafka/connect/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class Connector(ABC):
@property
def name(self) -> str:
"""Name of the connector."""
return f"{settings.CLIENT_ID}.{self.__class__.__module__}.{self.__class__.__name__}"
return f"{settings.CLIENT_ID}.{self.__class__.__name__}"

@property
@abstractmethod
Expand Down
8 changes: 8 additions & 0 deletions django_kafka/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

if TYPE_CHECKING:
from django_kafka.consumer import Consumer
from django_kafka.connect.connector import Connector


T = TypeVar('T')
Expand Down Expand Up @@ -34,9 +35,16 @@ def get_key(self, cls: Type[T]) -> str:

def register(self, cls: Type[T]):
key = self.get_key(cls)
if key in self._classes:
raise DjangoKafkaError(f"`{key}` is already registered.")
self._classes[key] = cls


class ConnectorsRegistry(Registry["Connector"]):
def get_key(self, cls) -> str:
return cls().name


class ConsumersRegistry(Registry["Consumer"]):

def register(self, cls):
Expand Down
123 changes: 86 additions & 37 deletions django_kafka/tests/test_registry.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,84 @@
from typing import Type
from unittest import mock

from django.test import TestCase
from django.test import SimpleTestCase

from django_kafka.connect.connector import Connector
from django_kafka.consumer import Consumer, Topics
from django_kafka.registry import ConsumersRegistry
from django_kafka.exceptions import DjangoKafkaError
from django_kafka.registry import ConsumersRegistry, Registry, ConnectorsRegistry


class ConsumersRegistryTestCase(TestCase):
def _get_consumer_cls(self, name, group_id) -> Type[Consumer]:
return type[Consumer](
name,
(Consumer,),
{"config": {"group.id": group_id}, "topics": Topics()},
class RegistryTestCase(SimpleTestCase):
def _gen_cls(self, name):
return type(name, (object, ), {})

def test_registering_same_key_not_allowed(self):
registry = Registry()

cls = self._gen_cls("SomeClass")
key = registry.get_key(cls)
error_msg = f"`{key}` is already registered."

registry()(cls)

with self.assertRaisesMessage(DjangoKafkaError, error_msg):
registry()(cls)

def test_get_key(self):
cls_a = self._gen_cls("ClassA")
cls_b = self._gen_cls("ClassB")
registry = Registry()

key_a = registry.get_key(cls_a)
self.assertEqual(
key_a,
f"{cls_a.__module__}.{cls_a.__name__}",
)

key_b = registry.get_key(cls_b)
self.assertEqual(
key_b,
f"{cls_b.__module__}.{cls_b.__name__}",
)

def test_decorator_adds_to_registry(self):
consumer_cls_a = self._get_consumer_cls("ConsumerA", "group_a")
consumer_cls_b = self._get_consumer_cls("ConsumerB", "group_b")
cls_a = self._gen_cls("ClassA")
cls_b = self._gen_cls("classB")

registry = ConsumersRegistry()
registry = Registry()

self.assertIs(registry()(consumer_cls_a), consumer_cls_a)
self.assertIs(registry()(consumer_cls_b), consumer_cls_b)
self.assertIs(registry()(cls_a), cls_a)
self.assertIs(registry()(cls_b), cls_b)

key_a = registry.get_key(consumer_cls_a)
self.assertIs(registry[key_a], consumer_cls_a)
key_a = registry.get_key(cls_a)
self.assertIs(registry[key_a], cls_a)

key_b = registry.get_key(consumer_cls_b)
self.assertIs(registry[key_b], consumer_cls_b)
key_b = registry.get_key(cls_b)
self.assertIs(registry[key_b], cls_b)

def test_iter_returns_expected_keys(self):
consumer_cls_a = self._get_consumer_cls("ConsumerA", "group_a")
consumer_cls_b = self._get_consumer_cls("ConsumerB", "group_b")
registry = ConsumersRegistry()
cls_a = self._gen_cls("ClassA")
cls_b = self._gen_cls("ClassB")
registry = Registry()

registry()(consumer_cls_a)
registry()(consumer_cls_b)
registry()(cls_a)
registry()(cls_b)

key_a = registry.get_key(consumer_cls_a)
key_b = registry.get_key(consumer_cls_b)
key_a = registry.get_key(cls_a)
key_b = registry.get_key(cls_b)

self.assertCountEqual(list(registry), [key_a, key_b])


class ConsumersRegistryTestCase(SimpleTestCase):
def _get_consumer_cls(self, name, group_id) -> Type[Consumer]:
return type[Consumer](
name,
(Consumer,),
{"config": {"group.id": group_id}, "topics": Topics()},
)

@mock.patch("django_kafka.retry.consumer.RetryConsumer.build")
def test_retry_consumer_registered(self, mock_retry_consumer_build):
consumer_cls_a = self._get_consumer_cls("ConsumerA", "group_a")
Expand All @@ -61,19 +97,32 @@ def test_retry_consumer_registered(self, mock_retry_consumer_build):
self.assertCountEqual(list(registry), [key_a, retry_key_a, key_b])
self.assertIs(registry[retry_key_a], retry_consumer_mock)

def test_get_key(self):
consumer_cls_a = self._get_consumer_cls("ConsumerA", "group_a")
consumer_cls_b = self._get_consumer_cls("ConsumerB", "group_b")
registry = ConsumersRegistry()

key_a = registry.get_key(consumer_cls_a)
self.assertEqual(
key_a,
f"{consumer_cls_a.__module__}.{consumer_cls_a.__name__}",
class ConnectorsRegistryTestCase(SimpleTestCase):
def _gen_cls(self, name) -> Type[Connector]:
return type[Connector](
name,
(Connector,),
{"config": {}},
)

key_b = registry.get_key(consumer_cls_b)
self.assertEqual(
key_b,
f"{consumer_cls_b.__module__}.{consumer_cls_b.__name__}",
)
def test_get_key(self):
connector_cls = self._gen_cls("ConnectorCls")
registry = ConnectorsRegistry()

key = registry.get_key(connector_cls)
self.assertEqual(key, connector_cls().name)

def test_registering_same_key_not_allowed(self):
registry = ConnectorsRegistry()
connector_a = self._gen_cls("ConnectorA")
# give the same class name to the connector to generate same key
connector_b = self._gen_cls("ConnectorA")

key = registry.get_key(connector_a)
error_msg = f"`{key}` is already registered."

registry()(connector_a)

with self.assertRaisesMessage(DjangoKafkaError, error_msg):
registry()(connector_b)

0 comments on commit eee48ac

Please sign in to comment.