Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: configure ruff and remove legacy tools #100

Merged
merged 1 commit into from
Mar 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
repos:
- repo: local
hooks:
- id: isort
name: isort
language: system
entry: isort
types: [python]
- id: black
name: black
language: system
entry: black
types: [python]
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: 'v0.3.2'
hooks:
- id: ruff
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
rev: v4.5.0
hooks:
- id: trailing-whitespace
exclude: ^src/api/client.js$
Expand Down
6 changes: 4 additions & 2 deletions docs/gen_ref_pages.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
with mkdocs_gen_files.open("reference/SUMMARY.md", "w") as nav_file:
nav_file.writelines(nav.build_literate_nav())

readme = Path("README.md").open("r")
with mkdocs_gen_files.open("index.md", "w") as index_file:
with Path("README.md").open("r") as readme, mkdocs_gen_files.open(
"index.md",
"w",
) as index_file:
index_file.writelines(readme.read())
108 changes: 66 additions & 42 deletions pathfinderevents.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,31 @@
"""Create CloudEvents from REST-API Requests."""

from __future__ import annotations

import json
import logging
import signal
import sys
from typing import TYPE_CHECKING, Any, Iterable, NoReturn, Self
from urllib.parse import parse_qs

import cherrypy
if TYPE_CHECKING:
from wsgiref.types import StartResponse, WSGIEnvironment

import cherrypy # type: ignore[import-untyped]
from cloudevents.http import CloudEvent
from cloudevents.kafka import to_structured
from configargparse import ArgumentParser, YAMLConfigFileParser
from kafka import KafkaProducer
from configargparse import ( # type: ignore[import-untyped]
ArgumentParser,
YAMLConfigFileParser,
)
from kafka import KafkaProducer # type: ignore[import-untyped]
from werkzeug.exceptions import HTTPException
from werkzeug.routing import Map, Rule
from werkzeug.wrappers import Request, Response

_RUNTIME_ERROR_MISSING_PRODUCER = "run_server called before set_producer"

logger = logging.getLogger(__name__)


Expand All @@ -33,16 +46,18 @@ def from_pathfinder_request(request: Request) -> CloudEvent:
class ApiServer:
"""The API server."""

def __init__(
self,
def __init__( # noqa: PLR0913
self: Self,
*,
bind_addr: str,
bind_port: int,
realm: str,
topic: str,
username: str,
password: str,
debug: bool = False,
):
) -> None:
"""Create ApiServer."""
self.producer: KafkaProducer
self.bind_addr: str = bind_addr
self.bind_port: int = bind_port
Expand All @@ -54,17 +69,18 @@ def __init__(

self.url_map = Map([Rule("/webhook", endpoint="webhook")])

def set_producer(self, producer: KafkaProducer):
def set_producer(self: Self, producer: KafkaProducer) -> None:
"""Set a producer."""
self.producer = producer

def run_server(self):
def run_server(self: Self) -> None:
"""Run the API server."""
if not self.producer:
raise RuntimeError("run_server called before set_producer")
raise RuntimeError(_RUNTIME_ERROR_MISSING_PRODUCER)
if self.debug:
from werkzeug.serving import run_simple

self._server = run_simple(
run_simple(
self.bind_addr,
self.bind_port,
self,
Expand All @@ -75,7 +91,7 @@ def run_server(self):
cherrypy.tree.graft(self, "/")
cherrypy.server.unsubscribe()

self._server = cherrypy._cpserver.Server()
self._server = cherrypy._cpserver.Server() # noqa: SLF001

self._server.socket_host = self.bind_addr
self._server.socket_port = self.bind_port
Expand All @@ -85,24 +101,34 @@ def run_server(self):
cherrypy.engine.start()
cherrypy.engine.block()

def stop_server(self):
def stop_server(self: Self) -> None:
"""Stop the server."""
self._server.stop()
cherrypy.engine.exit()

def __call__(self, environ, start_response):
def __call__(
self: Self,
environ: WSGIEnvironment,
start_response: StartResponse,
) -> Iterable[bytes]:
"""Forward calls to wsgi_app."""
return self.wsgi_app(environ, start_response)

def wsgi_app(self, environ, start_response):
def wsgi_app(
self: Self,
environ: WSGIEnvironment,
start_response: StartResponse,
) -> Iterable[bytes]:
"""Return a wsgi app."""
request = Request(environ)
auth = request.authorization
if auth and self.check_auth(auth.username, auth.password):
if auth and self.check_auth(str(auth.username), str(auth.password)):
response = self.dispatch_request(request)
else:
response = self.auth_required(request)
return response(environ, start_response)

def check_auth(self, username, password):
def check_auth(self: Self, username: str, password: str) -> bool:
"""Check plaintext auth.

Pathfinder doesn't support sending any advanced API credentials like JWT or
Expand All @@ -111,7 +137,7 @@ def check_auth(self, username, password):
"""
return self.username == username and self.password == password

def auth_required(self, request):
def auth_required(self: Self, _: Request) -> Response:
"""Return a 401 unauthorized reponse."""
return Response(
"Could not verify your access level for that URL.\n"
Expand All @@ -120,7 +146,7 @@ def auth_required(self, request):
headers={"WWW-Authenticate": f'Basic realm="{self.realm}"'},
)

def dispatch_request(self, request):
def dispatch_request(self: Self, request: Request) -> Response:
"""Dispatch request and return any errors in response."""
adapter = self.url_map.bind_to_environ(request.environ)
try:
Expand All @@ -133,21 +159,24 @@ def dispatch_request(self, request):
{"Content-Type": "application/json"},
)

def on_webhook(self, request):
def on_webhook(self: Self, request: Request) -> Response:
"""Receive a Pathfinder RestApi call and produce a CloudEvent."""

def on_send_error(ex): # pragma: no cover
def on_send_error(ex: Exception) -> None: # pragma: no cover
logger.error("Failed to send CloudEvent", exc_info=ex)

def _key_mapper(ce: CloudEvent) -> Any | None: # noqa: ANN401
return ".".join(
[
ce.get("type"), # type: ignore[list-item]
ce.get("subject"), # type: ignore[list-item]
],
)

ce = from_pathfinder_request(request)
kafka_msg = to_structured(
ce,
key_mapper=lambda event: ".".join(
[
ce.get("type"),
ce.get("subject"),
]
),
key_mapper=_key_mapper,
)
self.producer.send(
self.topic,
Expand All @@ -157,26 +186,24 @@ def on_send_error(ex): # pragma: no cover
).add_errback(on_send_error)
self.producer.flush()
logger.info(
f"Forwarded event {ce.get('type')} with channel {ce.get('subject')}"
"Forwarded event %s with channel %s",
ce.get("type"),
ce.get("subject"),
)
return Response(
status="200 Event Received",
)


def app(
def app( # noqa: PLR0913
api: ApiServer,
bootstrap_servers: list[str],
security_protocol: str,
tls_cafile: str,
tls_certfile: str,
tls_keyfile: str,
topic: str,
max_messages: int = 0,
):
"""
Set up pathfinder subscription and kafka producer, blocks while processing messages.
"""
) -> None:
"""Set up pathfinder sub, kafka producer & block while processing messages."""
producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
security_protocol=security_protocol,
Expand All @@ -189,7 +216,7 @@ def app(
)
api.set_producer(producer)

def on_sigint(*_): # pragma: no cover
def on_sigint(*_: Any) -> NoReturn: # noqa: ANN401 # pragma: no cover
api.stop_server()
producer.flush()
producer.close()
Expand All @@ -202,18 +229,16 @@ def on_sigint(*_): # pragma: no cover
producer.close()


def main(): # pragma: no cover
"""
CLI entrypoint parses args, sets up logging, and calls `app()`.
"""
def main() -> None: # pragma: no cover
"""CLI entrypoint parses args, sets up logging, and calls `app()`."""
parser = ArgumentParser(
__name__,
config_file_parser_class=YAMLConfigFileParser,
default_config_files=[f"{__name__}.yaml"],
)
parser.add(
"--bind-addr",
default="0.0.0.0",
default="127.0.0.1",
env_var="APP_BIND_ADDR",
)
parser.add(
Expand Down Expand Up @@ -286,7 +311,7 @@ def main(): # pragma: no cover
logging.basicConfig(level=logging.INFO)
if options.debug:
logging.basicConfig(level=logging.DEBUG)
logger.info(f"Starting {__name__}...")
logger.info("Starting %s", __name__)

app(
api=ApiServer(
Expand All @@ -303,7 +328,6 @@ def main(): # pragma: no cover
tls_cafile=options.kafka_tls_cafile,
tls_certfile=options.kafka_tls_certfile,
tls_keyfile=options.kafka_tls_keyfile,
topic=options.kafka_topic,
)


Expand Down
Loading
Loading