Skip to content

Commit

Permalink
Implement functionality to manage configs of the Kafka-Connect connec…
Browse files Browse the repository at this point in the history
…tors.

- implement `kafka_connect` management command to manage connectors configurations.
- implement `Connector` interface to define and publish configs.
- implement `KafkaConnectClient` with several methods to talk to Kafka-Connect REST API.

refs #16
  • Loading branch information
bodja committed Sep 30, 2024
1 parent e666e8f commit a1287d8
Show file tree
Hide file tree
Showing 14 changed files with 921 additions and 20 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ build/
.bash_history
db.sqlite3
.ruff_cache
docker-compose.override.yaml
96 changes: 96 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,68 @@ When the consumption of a message in a retryable topic fails, the message is re-

When consumers are started using [start commands](#start-the-Consumers), an additional retry consumer will be started in parallel for any consumer containing a retryable topic. This retry consumer will be assigned to a consumer group whose id is a combination of the original group id and a `.retry` suffix. This consumer is subscribed to the retry topics, and manages the message retry and delay behaviour. Please note that messages are retried directly by the retry consumer and are not sent back to the original topic.

## Connectors

Connectors are defined as python classes decorated with `@kafka.connectors()` which adds the class to the global registry.

`django_kafka.connect.connector.Connector` implements submission, validation and deletion of the connector configuration.

### Define connector:
```python
# Connectors are discovered automatically when placed under the connectors module
# e.g. ./connectors.py

from django_kafka import kafka
from django_kafka.connect.connector import Connector


@kafka.connectors()
class MyConnector(Connector):
config = {
# configuration for the connector
}
```

### Mark a connector for removal:

```python
from django_kafka import kafka
from django_kafka.connect.connector import Connector


@kafka.connectors()
class MyConnector(Connector):
mark_for_removal = True
config = {
# configuration for the connector
}
```

### Manage connectors:

django-kafka provides `./manage.py kafka_connect` management command to manage your connectors.


#### Manage a single connector
```bash
./manage.py kafka_connect path.to.my.SpecialConnector --validate --publish --check-status --ignore-failures
````

#### Manage all connectors
```bash
./manage.py kafka_connect --validate --publish --check-status --ignore-failures
````
`--validate` - validates the config over the connect REST API
`--publish` - create or update the connector or delete when `mark_for_removal = True`
`--check-status` - check the status of the connector is `RUNNING`.
`--ignore-failures` - command wont fail if any of the connectors fail to validate or publish.
See `--help`.
## Settings:
**Defaults:**
Expand All @@ -215,6 +277,22 @@ DJANGO_KAFKA = {
"POLLING_FREQUENCY": 1, # seconds
"SCHEMA_REGISTRY": {},
"ERROR_HANDLER": "django_kafka.error_handlers.ClientErrorHandler",
"CONNECT": {
# Rest API of the kafka-connect instance
"HOST": "http://kafka-connect",
# `requests.auth.AuthBase` instance or tuple of (username, password) for Basic Auth
"AUTH": ("name", "password"),
# kwargs for `urllib3.util.retry.Retry` initialization
"RETRY": dict(
connect=5,
read=5,
status=5,
backoff_factor=0.5,
status_forcelist=[502, 503, 504],
),
# `django_kafka.connect.client.KafkaConnectSession` would pass this value to every request method call
"REQUESTS_TIMEOUT": 30,
},
}
```
Expand Down Expand Up @@ -256,6 +334,24 @@ Default: `django_kafka.error_handlers.ClientErrorHandler`
This is an `error_cb` hook (see [Kafka Client Configuration](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#kafka-client-configuration) for reference).
It is triggered for client global errors and in case of fatal error it raises `DjangoKafkaError`.
#### `CONNECT`
Default: `{
"HOST": "",
"AUTH": None,
"RETRY": dict(
connect=5,
read=5,
status=5,
backoff_factor=0.5,
status_forcelist=[502, 503, 504],
),
"REQUESTS_TIMEOUT": 30,
}`
Required for `./manage.py kafka_conncect` command.
Used by `django_kafka.connect.connector.Connector` to initialize `django_kafka.connect.client.KafkaConnectClient`.
## Bidirectional data sync with no infinite event loop.
Expand Down
11 changes: 8 additions & 3 deletions django_kafka/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from multiprocessing.pool import Pool
from typing import Optional
from typing import Optional, TYPE_CHECKING

from confluent_kafka.schema_registry import SchemaRegistryClient
from django.utils.functional import cached_property
Expand All @@ -9,9 +9,13 @@
from django_kafka.conf import settings
from django_kafka.exceptions import DjangoKafkaError
from django_kafka.producer import Producer
from django_kafka.registry import ConsumersRegistry
from django_kafka.registry import ConsumersRegistry, Registry
from django_kafka.retry.settings import RetrySettings

if TYPE_CHECKING:
from django_kafka.connect.connector import Connector
from django_kafka.consumer import Consumer

logger = logging.getLogger(__name__)

__version__ = "0.4.2"
Expand All @@ -24,10 +28,11 @@


def autodiscover():
autodiscover_modules("consumers")
autodiscover_modules("consumers", "connectors")


class DjangoKafka:
connectors = Registry["Connector"]()
consumers = ConsumersRegistry()
retry = RetrySettings

Expand Down
16 changes: 16 additions & 0 deletions django_kafka/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,22 @@
"DEAD_LETTER_TOPIC_SUFFIX": "dlt",
"POLLING_FREQUENCY": 1, # seconds
"SCHEMA_REGISTRY": {},
"CONNECT": {
# Rest API of the kafka-connect instance
"HOST": "",
# `requests.auth.AuthBase` instance or tuple of (username, password) for Basic Auth
"AUTH": None,
# kwargs for `urllib3.util.retry.Retry` initialization
"RETRY": dict(
connect=5,
read=5,
status=5,
backoff_factor=0.5,
status_forcelist=[502, 503, 504],
),
# `django_kafka.connect.client.KafkaConnectSession` would pass this value to every request method call
"REQUESTS_TIMEOUT": 30,
},
}


Expand Down
Empty file.
67 changes: 67 additions & 0 deletions django_kafka/connect/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from typing import Optional

import requests
from requests.auth import AuthBase
from urllib3.util import Retry
from requests.adapters import HTTPAdapter

from django_kafka.exceptions import DjangoKafkaError


class KafkaConnectSession(requests.Session):
def __init__(
self,
host: str,
auth: Optional[tuple | AuthBase] = None,
retry: dict = None,
timeout: int = None,
):
super().__init__()
self.auth = auth
self.host = host
self.timeout = timeout
if retry:
self.mount_retry_adapter(retry)

def mount_retry_adapter(self, retry: dict):
self.mount(self.host, HTTPAdapter(max_retries=Retry(**retry)))

def request(self, method, url, *args, **kwargs) -> requests.Response:
kwargs.setdefault("timeout", self.timeout)
return super().request(method, f"{self.host}{url}", *args, **kwargs)


class KafkaConnectClient:
"""
https://kafka.apache.org/documentation/#connect_rest
https://docs.confluent.io/platform/current/connect/references/restapi.html
"""

def __init__(
self,
host: str,
auth: Optional[tuple | AuthBase] = None,
retry: dict = None,
timeout: int = None,
):
self._requests = KafkaConnectSession(host, auth, retry, timeout)

def update_or_create(self, connector_name: str, config: dict):
return self._requests.put(f"/connectors/{connector_name}/config", json=config)

def delete(self, connector_name: str):
return self._requests.delete(f"/connectors/{connector_name}")

def validate(self, config: dict):
if not config.get('connector.class'):
raise DjangoKafkaError("'connector.class' config is required for validation.")

connector_class_name = config.get("connector.class").rsplit(".", 1)[-1]
response = self._requests.put(f"/connector-plugins/{connector_class_name}/config/validate", json=config)
return response

def connector_status(self, connector_name: str):
"""
https://docs.confluent.io/platform/current/connect/references/restapi.html#get--connectors-(string-name)-status
"""
return self._requests.get(f"/connectors/{connector_name}/status")
81 changes: 81 additions & 0 deletions django_kafka/connect/connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from abc import ABC, abstractmethod
from enum import StrEnum

from django_kafka.conf import settings
from django_kafka.exceptions import DjangoKafkaError
from django_kafka.connect.client import KafkaConnectClient

__all__ = [
"Connector",
"ConnectorStatus",
]


class ConnectorStatus(StrEnum):
"""
https://docs.confluent.io/platform/current/connect/monitoring.html#connector-and-task-status
UNASSIGNED: The connector/task has not yet been assigned to a worker.
RUNNING: The connector/task is running.
PAUSED: The connector/task has been administratively paused.
FAILED: The connector/task has failed (usually by raising an exception, which is reported in the status output).
"""
UNASSIGNED = "UNASSIGNED"
RUNNING = "RUNNING"
PAUSED = "PAUSED"


class Connector(ABC):
mark_for_removal = False

@property
def name(self) -> str:
"""Name of the connector."""
return f"{self.__class__.__module__}.{self.__class__.__name__}"

@property
@abstractmethod
def config(self) -> dict:
"""Configurations for the connector."""

def __init__(self):
self.client = KafkaConnectClient(
host=settings.CONNECT["HOST"],
auth=settings.CONNECT["AUTH"],
retry=settings.CONNECT["RETRY"],
timeout=settings.CONNECT["REQUESTS_TIMEOUT"],
)

def delete(self) -> bool:
response = self.client.delete(self.name)

if response.status_code == 404:
return False

if not response.ok:
raise DjangoKafkaError(response.text)

return True

def submit(self) -> dict:
response = self.client.update_or_create(self.name, self.config)

if not response.ok:
raise DjangoKafkaError(response.text)

return response.json()

def is_valid(self, raise_exception=False) -> bool:
response = self.client.validate(self.config)

if raise_exception and not response.ok:
raise DjangoKafkaError(response.text)

return response.ok

def status(self) -> ConnectorStatus:
response = self.client.connector_status(self.name)

if not response.ok:
raise DjangoKafkaError(response.text)

return response.json()["connector"]["state"]
Loading

0 comments on commit a1287d8

Please sign in to comment.