diff --git a/README.rst b/README.rst index 0a66eb1c7..56c0ae89a 100644 --- a/README.rst +++ b/README.rst @@ -461,7 +461,15 @@ Keys to take special care are the ones needed to configure Kafka and advertised_ * - ``master_election_strategy`` - ``lowest`` - Decides on what basis the Karapace cluster master is chosen (only relevant in a multi node setup) - + * - ``metrics_extended`` + - ``true`` + - Enable extended metrics. Extended metrics: connections-active, request-size, response-size, master-slave-role, latency_ms, error_total + * - ``statsd_host`` + - ``127.0.0.1`` + - Host of statsd server + * - ``statsd_port`` + - ``8125`` + - Port of statsd server Authentication and authorization of Karapace Schema Registry REST API ===================================================================== diff --git a/karapace.config.json b/karapace.config.json index 55303ff4d..3a4fe0a43 100644 --- a/karapace.config.json +++ b/karapace.config.json @@ -27,5 +27,8 @@ "registry_authfile": null, "topic_name": "_schemas", "protobuf_runtime_directory": "runtime", - "session_timeout_ms": 10000 + "session_timeout_ms": 10000, + "metrics_extended": true, + "statsd_host": "127.0.0.1", + "statsd_port": 8125 } diff --git a/karapace/config.py b/karapace/config.py index f426dfcc5..4c93a2b0c 100644 --- a/karapace/config.py +++ b/karapace/config.py @@ -75,6 +75,9 @@ class Config(TypedDict): karapace_registry: bool master_election_strategy: str protobuf_runtime_directory: str + metrics_extended: bool + statsd_host: str + statsd_port: int sentry: NotRequired[Mapping[str, object]] tags: NotRequired[Mapping[str, object]] @@ -144,6 +147,9 @@ class ConfigDefaults(Config, total=False): "karapace_registry": False, "master_election_strategy": "lowest", "protobuf_runtime_directory": "runtime", + "metrics_extended": True, + "statsd_host": "127.0.0.1", + "statsd_port": 8125, } SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD] diff --git a/karapace/metrics.py b/karapace/metrics.py new file mode 100644 index 000000000..bf45bec3a --- /dev/null +++ 
b/karapace/metrics.py
@@ -0,0 +1,161 @@
+"""
+karapace - metrics
+Supports collection of system metrics
+list of supported metrics:
+connections-active - The number of established TCP connections of the Karapace process.
+                     Sampled with psutil by a background worker every 10 seconds.
+request-size - Gauge set to the size passed to Metrics.request().
+response-size - Gauge set to the size passed to Metrics.response().
+master-slave-role - Gauge, 1 when the node reports itself as master and 0 otherwise.
+latency_ms - Timing of the latency passed to Metrics.latency(), in milliseconds.
+error_total - Counter increased by one on every Metrics.error() call.
+
+Copyright (c) 2023 Aiven Ltd
+See LICENSE for details
+"""
+from __future__ import annotations
+
+from karapace.config import Config
+from karapace.statsd import StatsClient
+
+import os
+import psutil
+import schedule
+import threading
+
+
+class Singleton(type):
+    """Metaclass that makes a class hand out a single shared instance."""
+
+    _instance: Singleton | None = None
+
+    def __call__(cls, *args: str, **kwargs: int) -> Singleton:
+        # Only the first call constructs the instance; later calls return it
+        # unchanged and their arguments are ignored.
+        if cls._instance is None:
+            instance = super().__call__(*args, **kwargs)
+            cls._instance = instance
+        return cls._instance
+
+
+class Metrics(metaclass=Singleton):
+    """Process-wide collector that forwards extended metrics to statsd.
+
+    ``Metrics()`` always returns the same instance. Every reporting method is
+    a no-op until ``setup()`` has enabled the collector, and again after
+    ``cleanup()``.
+    """
+
+    def __init__(self) -> None:
+        self.active = False
+        self.stats_client: StatsClient | None = None
+        self.is_ready = False
+        self.stop_event = threading.Event()
+        self.worker_thread = self._new_worker_thread()
+        self.lock = threading.Lock()
+        # Handle of the periodic connections() job so cleanup() can cancel it.
+        self._connections_job: schedule.Job | None = None
+
+    def _new_worker_thread(self) -> threading.Thread:
+        """Create the (not yet started) thread that runs ``worker()``."""
+        # Daemon thread: a missed cleanup() must not keep the process alive.
+        return threading.Thread(target=self.worker, daemon=True)
+
+    def _client(self) -> StatsClient | None:
+        """Return the statsd client, or ``None`` while metrics are inactive.
+
+        Raises:
+            RuntimeError: metrics are active but no StatsClient is available.
+        """
+        if not self.active:
+            return None
+        if not isinstance(self.stats_client, StatsClient):
+            raise RuntimeError("no StatsClient available")
+        return self.stats_client
+
+    def setup(self, stats_client: StatsClient, config: Config) -> None:
+        """Enable metrics and start the background worker.
+
+        Does nothing when ``metrics_extended`` is falsy in ``config`` or when
+        the collector is already running, so the first enabling call wins and
+        a later call cannot switch a running collector off.
+        """
+        if not config.get("metrics_extended"):
+            return
+        with self.lock:
+            if self.is_ready:
+                return
+            self.stats_client = stats_client
+            self.stop_event.clear()
+            self._connections_job = schedule.every(10).seconds.do(self.connections)
+            self.worker_thread.start()
+            self.is_ready = True
+            self.active = True
+
+    def request(self, size: int) -> None:
+        """Report ``size`` as the ``request-size`` gauge."""
+        client = self._client()
+        if client is not None:
+            client.gauge("request-size", size)
+
+    def response(self, size: int) -> None:
+        """Report ``size`` as the ``response-size`` gauge."""
+        client = self._client()
+        if client is not None:
+            client.gauge("response-size", size)
+
+    def are_we_master(self, is_master: bool) -> None:
+        """Report the ``master-slave-role`` gauge: 1 for master, 0 otherwise."""
+        client = self._client()
+        if client is not None:
+            client.gauge("master-slave-role", int(is_master))
+
+    def latency(self, latency_ms: float) -> None:
+        """Report ``latency_ms`` as the ``latency_ms`` timing."""
+        client = self._client()
+        if client is not None:
+            client.timing("latency_ms", latency_ms)
+
+    def error(self) -> None:
+        """Increase the ``error_total`` counter by one."""
+        client = self._client()
+        if client is not None:
+            client.increase("error_total", 1)
+
+    def connections(self) -> None:
+        """Report this process's established TCP connections as ``connections-active``."""
+        client = self._client()
+        if client is None:
+            return
+        karapace_proc = psutil.Process(os.getpid())
+        established = sum(
+            1
+            for conn in karapace_proc.connections(kind="tcp")
+            if conn.laddr and conn.status == psutil.CONN_ESTABLISHED
+        )
+        client.gauge("connections-active", established)
+
+    def worker(self) -> None:
+        """Run pending ``schedule`` jobs about once a second until stopped."""
+        while not self.stop_event.is_set():
+            schedule.run_pending()
+            # Waiting on the event instead of sleeping lets cleanup() wake the
+            # thread immediately rather than after the rest of the interval.
+            self.stop_event.wait(1)
+
+    def cleanup(self) -> None:
+        """Stop the worker and deactivate metrics; ``setup()`` may be called again."""
+        with self.lock:
+            if not self.is_ready:
+                return
+            # Deactivate first so nothing is reported while shutting down.
+            self.active = False
+            self.stop_event.set()
+            if self._connections_job is not None:
+                schedule.cancel_job(self._connections_job)
+                self._connections_job = None
+            if self.worker_thread.is_alive():
+                self.worker_thread.join()
+            # A Thread object cannot be started twice, so prepare a fresh one.
+            self.worker_thread = self._new_worker_thread()
+            self.is_ready = False
diff --git a/karapace/rapu.py b/karapace/rapu.py
index 2b9decf12..5978d3c30 100644
--- a/karapace/rapu.py
+++ b/karapace/rapu.py
@@ -9,6 +9,7 @@
 from accept_types import get_best_match
 from http import HTTPStatus
 from karapace.config import Config, create_server_ssl_context
+from karapace.metrics import Metrics
 from karapace.statsd import StatsClient
 from karapace.utils import json_decode, json_encode
 from karapace.version import __version__
@@ -134,6 +135,8 @@ def __init__(
         if content_type:
             self.headers["Content-Type"] = content_type
         super().__init__(f"HTTPResponse {status.value}")
+        if not is_success(status):
+            Metrics().error()
 
     def ok(self) -> bool:
         """True if resposne has a 2xx
status_code""" @@ -169,6 +172,7 @@ def __init__( self.stats = StatsClient(config=config) self.app.on_cleanup.append(self.close_by_app) self.not_ready_handler = not_ready_handler + Metrics().setup(self.stats, config) def _create_aiohttp_application(self, *, config: Config) -> aiohttp.web.Application: return aiohttp.web.Application(client_max_size=config["http_request_max_size"]) @@ -183,6 +187,7 @@ async def close(self) -> None: set as hook because the awaitables have to run inside the event loop created by the aiohttp library. """ + Metrics().cleanup() self.stats.close() @staticmethod @@ -269,15 +274,21 @@ async def _handle_request( url=request.url, path_for_stats=path_for_stats, ) + try: if request.method == "OPTIONS": origin = request.headers.get("Origin") if not origin: raise HTTPResponse(body="OPTIONS missing Origin", status=HTTPStatus.BAD_REQUEST) headers = self.cors_and_server_headers_for_request(request=rapu_request, origin=origin) + raise HTTPResponse(body=b"", status=HTTPStatus.OK, headers=headers) body = await request.read() + if body: + Metrics().request(len(body)) + else: + Metrics().request(0) if json_request: if not body: raise HTTPResponse(body="Missing request JSON body", status=HTTPStatus.BAD_REQUEST) @@ -385,6 +396,7 @@ async def _handle_request( ) headers = {"Content-Type": "application/json"} resp = aiohttp.web.Response(body=body, status=status.value, headers=headers) + except asyncio.CancelledError: self.log.debug("Client closed connection") raise @@ -393,6 +405,8 @@ async def _handle_request( self.log.exception("Unexpected error handling user request: %s %s", request.method, request.url) resp = aiohttp.web.Response(text="Internal Server Error", status=HTTPStatus.INTERNAL_SERVER_ERROR.value) finally: + Metrics().response(resp.content_length) + Metrics().latency((time.monotonic() - start_time) * 1000) self.stats.timing( self.app_request_metric, time.monotonic() - start_time, diff --git a/karapace/schema_registry.py b/karapace/schema_registry.py 
index 867eeb633..e29e167c7 100644 --- a/karapace/schema_registry.py +++ b/karapace/schema_registry.py @@ -25,6 +25,7 @@ from karapace.key_format import KeyFormatter from karapace.master_coordinator import MasterCoordinator from karapace.messaging import KarapaceProducer +from karapace.metrics import Metrics from karapace.offset_watcher import OffsetWatcher from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema from karapace.schema_reader import KafkaSchemaReader @@ -123,6 +124,7 @@ async def get_master(self, ignore_readiness: bool = False) -> tuple[bool, str | elif not ignore_readiness and self.schema_reader.ready is False: LOG.info("Schema reader isn't ready yet: %r", self.schema_reader.ready) else: + Metrics().are_we_master(are_we_master) return are_we_master, master_url await asyncio.sleep(1.0) diff --git a/karapace/statsd.py b/karapace/statsd.py index be37d92ef..8115f76f9 100644 --- a/karapace/statsd.py +++ b/karapace/statsd.py @@ -32,7 +32,9 @@ def __init__( host: str = STATSD_HOST, port: int = STATSD_PORT, ) -> None: - self._dest_addr: Final = (host, port) + _host = config.get("statsd_host") if "statsd_host" in config else host + _port = config.get("statsd_port") if "statsd_port" in config else port + self._dest_addr: Final = (_host, _port) self._socket: Final = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self._tags: Final = config.get("tags", {}) self.sentry_client: Final = get_sentry_client(sentry_config=config.get("sentry", None)) diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt index 3b2822d6b..dffa1bd6e 100644 --- a/requirements/requirements-dev.txt +++ b/requirements/requirements-dev.txt @@ -123,7 +123,7 @@ multidict==6.0.4 # -r requirements.txt # aiohttp # yarl -networkx==2.8.8 +networkx==3.1 # via -r requirements.txt packaging==23.1 # via @@ -223,6 +223,7 @@ zope-event==5.0 # via gevent zope-interface==6.0 # via gevent - +schedule==1.2.0 + # via -r 
requirements.txt # The following packages are considered to be unsafe in a requirements file: # setuptools diff --git a/requirements/requirements-typing.in b/requirements/requirements-typing.in index c55f35548..1d11d3198 100644 --- a/requirements/requirements-typing.in +++ b/requirements/requirements-typing.in @@ -5,3 +5,4 @@ mypy types-jsonschema sentry-sdk types-cachetools +types-psutil diff --git a/requirements/requirements-typing.txt b/requirements/requirements-typing.txt index 95e3defdb..d707f02b2 100644 --- a/requirements/requirements-typing.txt +++ b/requirements/requirements-typing.txt @@ -29,3 +29,5 @@ urllib3==2.0.3 # via # -c requirements-dev.txt # sentry-sdk +types-psutil==5.9.5.16 + # via -r requirements-typing.in diff --git a/requirements/requirements.in b/requirements/requirements.in index 65b90c0ba..2650120e9 100644 --- a/requirements/requirements.in +++ b/requirements/requirements.in @@ -4,7 +4,7 @@ aiohttp<4 aiokafka<1 isodate<1 jsonschema<5 -networkx<3 +networkx<4 protobuf<4 python-dateutil<3 tenacity<9 @@ -14,6 +14,8 @@ watchfiles<1 xxhash~=3.3 rich~=12.6.0 cachetools==5.3.1 +schedule +psutil # Patched dependencies # diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 7112a7d12..9b948d8c8 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -53,7 +53,7 @@ multidict==6.0.4 # via # aiohttp # yarl -networkx==2.8.8 +networkx==3.1 # via -r requirements.in packaging==23.1 # via aiokafka @@ -91,3 +91,7 @@ xxhash==3.3.0 # via -r requirements.in yarl==1.9.2 # via aiohttp +schedule==1.2.0 + # via -r requirements.in +psutil==5.9.5 + # via -r requirements.in