-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #25 from lsst-sqre/tickets/DM-39646
DM-39646: Add FastAPI app integration through dependencies
- Loading branch information
Showing
14 changed files
with
271 additions
and
21 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
name: Dependency Update | ||
|
||
"on": | ||
schedule: | ||
- cron: "0 12 * * 1" | ||
workflow_dispatch: {} | ||
|
||
jobs: | ||
update: | ||
runs-on: ubuntu-latest | ||
timeout-minutes: 10 | ||
|
||
steps: | ||
- uses: actions/checkout@v3 | ||
|
||
- name: Run neophile | ||
uses: lsst-sqre/run-neophile@v1 | ||
with: | ||
python-version: "3.11" | ||
mode: pr | ||
types: pre-commit | ||
app-id: ${{ secrets.NEOPHILE_APP_ID }} | ||
app-secret: ${{ secrets.NEOPHILE_PRIVATE_KEY }} | ||
|
||
- name: Report status | ||
if: always() | ||
uses: ravsamhq/notify-slack-action@v2 | ||
with: | ||
status: ${{ job.status }} | ||
notify_when: "failure" | ||
notification_title: "Periodic dependency update for {repo} failed" | ||
env: | ||
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_ALERT_WEBHOOK }} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
.vscode | ||
|
||
# Byte-compiled / optimized / DLL files | ||
__pycache__/ | ||
*.py[cod] | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
### Backwards-incompatible changes | ||
|
||
- Only Python 3.10 or later is supported. | ||
|
||
### New features | ||
|
||
- Integration into FastAPI apps through dependencies in `kafkit.fastapi.dependencies`: | ||
|
||
- `AioKafkaProducerDependency` provides a Kafka producer based on aiokafka's `AIOKafkaProducer` (requires the `aiokafka` extra). | ||
- `PydanticSchemaManager` provides a Pydantic-based schema manager for Avro schemas, `kafkit.schema.manager.PydanticSchemaManager`. | ||
- `RegistryApiDependency` provides an HTTPX-based Schema Registry client, `kafkit.registry.httpx.RegistryApi`. | ||
|
||
### Other changes | ||
|
||
- Adopt PyPI's trusted publishers mechanism for releases. | ||
- Adopt the new [Neophile](https://github.com/lsst-sqre/neophile) workflow for keeping pre-commit hooks up-to-date. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
"""Kafkit integration with FastApi applications.""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
"""FastAPI dependencies for Kafkit applications.""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
"""A FastAPI dependency that provides an aiokafka Producer.""" | ||
|
||
import aiokafka # patched for testing | ||
|
||
from kafkit.settings import KafkaConnectionSettings | ||
|
||
__all__ = ["kafka_producer_dependency", "AioKafkaProducerDependency"] | ||
|
||
|
||
class AioKafkaProducerDependency: | ||
"""A FastAPI dependency that provides an aiokafka Producer.""" | ||
|
||
def __init__(self) -> None: | ||
self._producer: aiokafka.AIOKafkaProducer | None = None | ||
|
||
async def initialize(self, settings: KafkaConnectionSettings) -> None: | ||
"""Initialize the dependency (call during FastAPI startup). | ||
Parameters | ||
---------- | ||
settings | ||
The Kafka connection settings. | ||
""" | ||
security_protocol = settings.security_protocol.value | ||
sasl_mechanism = ( | ||
settings.sasl_mechanism.value if settings.sasl_mechanism else None | ||
) | ||
self._producer = aiokafka.AIOKafkaProducer( | ||
bootstrap_servers=settings.bootstrap_servers, | ||
security_protocol=security_protocol, | ||
ssl_context=settings.ssl_context, | ||
sasl_mechanism=sasl_mechanism, | ||
sasl_plain_password=( | ||
settings.sasl_password.get_secret_value() | ||
if settings.sasl_password | ||
else None | ||
), | ||
sasl_plain_username=settings.sasl_username, | ||
) | ||
await self._producer.start() | ||
|
||
async def __call__(self) -> aiokafka.AIOKafkaProducer: | ||
"""Get the dependency (call during FastAPI request handling).""" | ||
if self._producer is None: | ||
raise RuntimeError("Dependency not initialized") | ||
return self._producer | ||
|
||
async def stop(self) -> None: | ||
"""Stop the dependency (call during FastAPI shutdown).""" | ||
if self._producer is None: | ||
raise RuntimeError("Dependency not initialized") | ||
await self._producer.stop() | ||
|
||
|
||
kafka_producer_dependency = AioKafkaProducerDependency() | ||
"""The FastAPI dependency callable that provides an AIOKafkaProducer.""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
"""A FastAPI dependency that provides a Kafkit PydanticSchemaManager | ||
for serializing Pydantic models into Avro. | ||
""" | ||
|
||
from collections.abc import Iterable | ||
from typing import Type | ||
|
||
from dataclasses_avroschema.avrodantic import AvroBaseModel | ||
from httpx import AsyncClient | ||
|
||
from kafkit.registry import manager # this is patched in tests | ||
from kafkit.registry.httpx import RegistryApi | ||
|
||
__all__ = [ | ||
"pydantic_schema_manager_dependency", | ||
"PydanticSchemaManagerDependency", | ||
] | ||
|
||
|
||
class PydanticSchemaManagerDependency: | ||
"""A FastAPI dependency that provides a Kafkit PydanticSchemaManager | ||
for serializing Pydantic models into Avro. | ||
""" | ||
|
||
def __init__(self) -> None: | ||
self._schema_manager: manager.PydanticSchemaManager | None = None | ||
|
||
async def initialize( | ||
self, | ||
*, | ||
http_client: AsyncClient, | ||
registry_url: str, | ||
models: Iterable[Type[AvroBaseModel]], | ||
suffix: str = "", | ||
compatibility: str = "FORWARD", | ||
) -> None: | ||
"""Initialize the dependency (call during FastAPI startup). | ||
Parameters | ||
---------- | ||
http_client | ||
The httpx AsyncClient instance to use for HTTP requests. | ||
registry_url | ||
The URL of the Schema Registry. | ||
models | ||
The Pydantic models to register. | ||
suffix | ||
A suffix that is added to the schema name (and thus subject name), | ||
for example ``_dev1``. | ||
compatibility | ||
The compatibility level to use when registering the schemas. | ||
""" | ||
registry_api = RegistryApi(http_client=http_client, url=registry_url) | ||
self._schema_manager = manager.PydanticSchemaManager( | ||
registry=registry_api, suffix=suffix | ||
) | ||
|
||
await self._schema_manager.register_models( | ||
models, compatibility=compatibility | ||
) | ||
|
||
async def __call__(self) -> manager.PydanticSchemaManager: | ||
"""Get the dependency (call during FastAPI request handling).""" | ||
if self._schema_manager is None: | ||
raise RuntimeError("Dependency not initialized") | ||
return self._schema_manager | ||
|
||
|
||
pydantic_schema_manager_dependency = PydanticSchemaManagerDependency() | ||
"""The FastAPI dependency callable that provides a Kafkit PydanticSchemaManager | ||
instance for serializing Pydantic models into Avro. | ||
""" |
Oops, something went wrong.