Skip to content

Commit

Permalink
feat: change Connector.name is using (optional prefix + class name).
Browse files Browse the repository at this point in the history
feat: add `kafka` module to autodiscover connectors and consumers.
feat: add optional context kwarg to `DjangoKafkaError`.
fix: retry on 404 when checking for connector status.
fix: do not allow adding to the registry the same key.
fix: retry connector status check when status is UNASSIGNED.

refs #16
  • Loading branch information
bodja committed Oct 7, 2024
1 parent 4fa00be commit f9eac19
Show file tree
Hide file tree
Showing 12 changed files with 278 additions and 135 deletions.
89 changes: 57 additions & 32 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class Topic1(Topic):

Consumers define which topics they take care of. Usually you want one consumer per project. If 2 consumers are defined, then they will be started in parallel.

Consumers are auto-discovered and are expected to be located under the `consumers.py`.
Consumers are auto-discovered and are expected to be located under the `some_django_app/kafka/consumers.py` or `some_django_app/consumers.py`.

```python
# ./consumers.py
Expand Down Expand Up @@ -205,6 +205,8 @@ When consumers are started using [start commands](#start-the-Consumers), an addi

## Connectors

Connectors are auto-discovered and are expected to be located under the `some_django_app/kafka/connectors.py` or `some_django_app/connectors.py`.

Connectors are defined as python classes decorated with `@kafka.connectors()` which adds the class to the global registry.

`django_kafka.connect.connector.Connector` implements submission, validation and deletion of the connector configuration.
Expand Down Expand Up @@ -271,28 +273,34 @@ See `--help`.
```python
DJANGO_KAFKA = {
"CLIENT_ID": f"{socket.gethostname()}-python",
"ERROR_HANDLER": "django_kafka.error_handlers.ClientErrorHandler",
"GLOBAL_CONFIG": {},
"PRODUCER_CONFIG": {},
"CONSUMER_CONFIG": {},
"RETRY_CONSUMER_CONFIG": {
"auto.offset.reset": "earliest",
"enable.auto.offset.store": False,
"topic.metadata.refresh.interval.ms": 10000,
},
"RETRY_TOPIC_SUFFIX": "retry",
"DEAD_LETTER_TOPIC_SUFFIX": "dlt",
"POLLING_FREQUENCY": 1, # seconds
"SCHEMA_REGISTRY": {},
"ERROR_HANDLER": "django_kafka.error_handlers.ClientErrorHandler",
"CONNECT": {
# Rest API of the kafka-connect instance
"HOST": "http://kafka-connect",
# `requests.auth.AuthBase` instance or tuple of (username, password) for Basic Auth
"AUTH": ("name", "password"),
# kwargs for `urllib3.util.retry.Retry` initialization
"RETRY": dict(
connect=5,
read=5,
status=5,
backoff_factor=0.5,
status_forcelist=[502, 503, 504],
),
# `django_kafka.connect.client.KafkaConnectSession` would pass this value to every request method call
"REQUESTS_TIMEOUT": 30,
},
# Rest API of the kafka-connect instance
"CONNECT_HOST": None,
# `requests.auth.AuthBase` instance or tuple of (username, password) for Basic Auth
"CONNECT_AUTH": None,
# kwargs for `urllib3.util.retry.Retry` initialization
"CONNECT_RETRY": dict(
connect=5,
read=5,
status=5,
backoff_factor=0.5,
status_forcelist=[502, 503, 504],
),
# `django_kafka.connect.client.KafkaConnectSession` would pass this value to every request method call
"CONNECT_REQUESTS_TIMEOUT": 30,
"CONNECTOR_NAME_PREFIX": "",
}
```
Expand Down Expand Up @@ -334,21 +342,38 @@ Default: `django_kafka.error_handlers.ClientErrorHandler`
This is an `error_cb` hook (see [Kafka Client Configuration](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#kafka-client-configuration) for reference).
It is triggered for client global errors and in case of fatal error it raises `DjangoKafkaError`.
#### `CONNECT`
Default: `{
"HOST": "",
"AUTH": None,
"RETRY": dict(
connect=5,
read=5,
status=5,
backoff_factor=0.5,
status_forcelist=[502, 503, 504],
),
"REQUESTS_TIMEOUT": 30,
}`
#### `CONNECT_HOST`
Default: `None`
Rest API of the kafka-connect instance.
#### `CONNECT_AUTH`
Default: `None`
`requests.auth.AuthBase` instance or `("username", "password")` for Basic Auth.
#### `CONNECT_AUTH`
Default: `dict(
connect=5,
read=5,
status=5,
backoff_factor=0.5,
status_forcelist=[502, 503, 504],
)`
kwargs for `urllib3.util.retry.Retry` initialization.
#### `CONNECT_REQUESTS_TIMEOUT`
Default: `30`
`django_kafka.connect.client.KafkaConnectSession` would pass this value to every request method call.
#### `CONNECTOR_NAME_PREFIX`
Default: `""`
Prefix which will be added to the connector name when publishing the connector.
Required for `./manage.py kafka_conncect` command.
`CONNECT_` settings are required for `./manage.py kafka_connect` command which talks to the Rest API of the kafka-connect instance.
Used by `django_kafka.connect.connector.Connector` to initialize `django_kafka.connect.client.KafkaConnectClient`.
Expand Down
11 changes: 8 additions & 3 deletions django_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from django_kafka.conf import settings
from django_kafka.exceptions import DjangoKafkaError
from django_kafka.producer import Producer
from django_kafka.registry import ConsumersRegistry, Registry
from django_kafka.registry import ConnectorsRegistry, ConsumersRegistry
from django_kafka.retry.settings import RetrySettings

if TYPE_CHECKING:
Expand All @@ -28,11 +28,16 @@


def autodiscover():
autodiscover_modules("consumers", "connectors")
autodiscover_modules(
"consumers",
"connectors",
"kafka.consumers",
"kafka.connectors",
)


class DjangoKafka:
connectors = Registry["Connector"]()
connectors = ConnectorsRegistry()
consumers = ConsumersRegistry()
retry = RetrySettings

Expand Down
31 changes: 15 additions & 16 deletions django_kafka/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,21 @@
"DEAD_LETTER_TOPIC_SUFFIX": "dlt",
"POLLING_FREQUENCY": 1, # seconds
"SCHEMA_REGISTRY": {},
"CONNECT": {
# Rest API of the kafka-connect instance
"HOST": "",
# `requests.auth.AuthBase` instance or tuple of (username, password) for Basic Auth
"AUTH": None,
# kwargs for `urllib3.util.retry.Retry` initialization
"RETRY": dict(
connect=5,
read=5,
status=5,
backoff_factor=0.5,
status_forcelist=[502, 503, 504],
),
# `django_kafka.connect.client.KafkaConnectSession` would pass this value to every request method call
"REQUESTS_TIMEOUT": 30,
},
# Rest API of the kafka-connect instance
"CONNECT_HOST": None, # e.g. http://kafka-connect
# `requests.auth.AuthBase` instance or tuple of (username, password) for Basic Auth
"CONNECT_AUTH": None,
# kwargs for `urllib3.util.retry.Retry` initialization
"CONNECT_RETRY": dict(
connect=5,
read=5,
status=5,
backoff_factor=0.5,
status_forcelist=[502, 503, 504],
),
# `django_kafka.connect.client.KafkaConnectSession` would pass this value to every request method call
"CONNECT_REQUESTS_TIMEOUT": 30,
"CONNECTOR_NAME_PREFIX": "",
}


Expand Down
27 changes: 16 additions & 11 deletions django_kafka/connect/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,28 @@ class ConnectorStatus(StrEnum):
class Connector(ABC):
mark_for_removal = False

@classmethod
@property
def name(self) -> str:
def name(cls) -> str:
"""Name of the connector."""
return f"{settings.CLIENT_ID}.{self.__class__.__module__}.{self.__class__.__name__}"
if settings.CONNECTOR_NAME_PREFIX:
return f"{settings.CONNECTOR_NAME_PREFIX}.{cls.__name__}"
return cls.__name__

@property
@abstractmethod
def config(self) -> dict:
"""Configurations for the connector."""

def __init__(self):
if not settings.CONNECT_HOST:
raise DjangoKafkaError("Kafka `CONNECT_HOST` is not configured.")

self.client = KafkaConnectClient(
host=settings.CONNECT["HOST"],
auth=settings.CONNECT["AUTH"],
retry=settings.CONNECT["RETRY"],
timeout=settings.CONNECT["REQUESTS_TIMEOUT"],
host=settings.CONNECT_HOST,
auth=settings.CONNECT_AUTH,
retry=settings.CONNECT_RETRY,
timeout=settings.CONNECT_REQUESTS_TIMEOUT,
)

def delete(self) -> bool:
Expand All @@ -52,30 +58,29 @@ def delete(self) -> bool:
return False

if not response.ok:
raise DjangoKafkaError(response.text)
raise DjangoKafkaError(response.text, context=response)

return True

def submit(self) -> dict:
response = self.client.update_or_create(self.name, self.config)

if not response.ok:
raise DjangoKafkaError(response.text)
raise DjangoKafkaError(response.text, context=response)

return response.json()

def is_valid(self, raise_exception=False) -> bool:
response = self.client.validate(self.config)

if raise_exception and not response.ok:
raise DjangoKafkaError(response.text)
raise DjangoKafkaError(response.text, context=response)

return response.ok

def status(self) -> ConnectorStatus:
response = self.client.connector_status(self.name)

if not response.ok:
raise DjangoKafkaError(response.text)

raise DjangoKafkaError(response.text, context=response)
return response.json()["connector"]["state"]
4 changes: 3 additions & 1 deletion django_kafka/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
class DjangoKafkaError(Exception):
pass
def __init__(self, *args, context: any = None, **kwargs):
super().__init__(*args, **kwargs)
self.context = context
44 changes: 32 additions & 12 deletions django_kafka/management/commands/kafka_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

from django.core.management import CommandError
from django.core.management.base import BaseCommand
from requests import Response
from requests.exceptions import RetryError

from django_kafka import kafka
from django_kafka.exceptions import DjangoKafkaError
from django_kafka.connect.connector import Connector, ConnectorStatus
from django_kafka.utils import retry

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -134,22 +136,40 @@ def handle_status(self):
self.stdout.write(f"{connector_path}: ", ending="")

connector = kafka.connectors[connector_path]()
if connector.mark_for_removal:
self.stdout.write(self.style.WARNING("skip (REASON: marked for removal)"))
continue

try:
status = connector.status()
except (DjangoKafkaError, RetryError) as error:
self._connector_is_running(connector)
except DjangoKafkaError as error:
self.has_failures = True
self.stdout.write(self.style.ERROR("failed to retrieve"))
self.stdout.write(self.style.ERROR(str(error)))

@retry((DjangoKafkaError,), tries=3, delay=1, backoff=2)
def _connector_is_running(self, connector: Connector):
if connector.mark_for_removal:
self.stdout.write(self.style.WARNING("skip (REASON: marked for removal)"))
return
try:
status = connector.status()
except DjangoKafkaError as error:
if isinstance(error.context, Response) and error.context.status_code == 404:
# retry: on 404 as some delays are expected
raise

self.has_failures = True
self.stdout.write(self.style.ERROR("failed to retrieve"))
self.stdout.write(self.style.ERROR(str(error)))
except RetryError as error:
self.has_failures = True
self.stdout.write(self.style.ERROR("failed to retrieve"))
self.stdout.write(self.style.ERROR(str(error)))
else:
if status == ConnectorStatus.RUNNING:
self.stdout.write(self.style.SUCCESS(status))
elif status == ConnectorStatus.UNASSIGNED:
# retry: on UNASSIGNED as some delays are expected
raise DjangoKafkaError(f"Connector status is {status}.")
else:
if status == ConnectorStatus.RUNNING:
self.stdout.write(self.style.SUCCESS(status))
else:
self.has_failures = True
self.stdout.write(self.style.ERROR(status))
self.has_failures = True
self.stdout.write(self.style.ERROR(status))

def handle_delete(self, connector: Connector):
try:
Expand Down
8 changes: 8 additions & 0 deletions django_kafka/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

if TYPE_CHECKING:
from django_kafka.consumer import Consumer
from django_kafka.connect.connector import Connector


T = TypeVar('T')
Expand Down Expand Up @@ -34,9 +35,16 @@ def get_key(self, cls: Type[T]) -> str:

def register(self, cls: Type[T]):
key = self.get_key(cls)
if key in self._classes:
raise DjangoKafkaError(f"`{key}` is already registered.")
self._classes[key] = cls


class ConnectorsRegistry(Registry["Connector"]):
def get_key(self, cls) -> str:
return cls.name


class ConsumersRegistry(Registry["Consumer"]):

def register(self, cls):
Expand Down
15 changes: 5 additions & 10 deletions django_kafka/tests/connect/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,17 @@ class MyConnector(Connector):
config = {}


@override_settings(CONNECT={
"HOST": "http://localhost",
"AUTH": ("user", "pass"),
"RETRY": {},
"REQUESTS_TIMEOUT": 5,
})
@patch.multiple("django_kafka.conf.settings", CONNECT_HOST="http://kafka-connect")
class ConnectorTestCase(SimpleTestCase):
@patch("django_kafka.connect.connector.KafkaConnectClient", spec=True)
def test_init_request_session(self, mock_client):
connector = MyConnector()

mock_client.assert_called_with(
host=settings.CONNECT["HOST"],
auth=settings.CONNECT["AUTH"],
retry=settings.CONNECT["RETRY"],
timeout=settings.CONNECT["REQUESTS_TIMEOUT"],
host=settings.CONNECT_HOST,
auth=settings.CONNECT_AUTH,
retry=settings.CONNECT_RETRY,
timeout=settings.CONNECT_REQUESTS_TIMEOUT,
)
self.assertIsInstance(connector.client, KafkaConnectClient)

Expand Down
4 changes: 3 additions & 1 deletion django_kafka/tests/connect/test_kafka_connect_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,9 @@ def test_handle_status__unassigned(self):
with patch_kafka_connectors(**{"status.return_value": ConnectorStatus.UNASSIGNED}) as connector:
self.command.handle_status()

connector.status.assert_called_once_with()
# status UNASSIGNED is retried 3 times
self.assertEqual(connector.status.call_count, 3)

self.assertTrue(self.command.has_failures)

def test_handle_status__django_kafka_error(self):
Expand Down
Loading

0 comments on commit f9eac19

Please sign in to comment.