From e3cb524df7487e54593d35e39bf77b75d03d1291 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Fri, 9 Jun 2023 10:08:14 +0300 Subject: [PATCH 01/19] Karapace metrics --- README.rst | 9 ++ karapace.config.json | 5 +- karapace/config.py | 3 + karapace/karapacemetrics.py | 169 +++++++++++++++++++++++++++ karapace/rapu.py | 16 +++ karapace/schema_registry.py | 2 + karapace/statsd.py | 15 ++- requirements/requirements-dev.txt | 16 +-- requirements/requirements-typing.txt | 6 +- requirements/requirements.in | 2 + requirements/requirements.txt | 6 +- 11 files changed, 235 insertions(+), 14 deletions(-) create mode 100644 karapace/karapacemetrics.py diff --git a/README.rst b/README.rst index 20e9fb786..23a50f92c 100644 --- a/README.rst +++ b/README.rst @@ -458,6 +458,15 @@ Keys to take special care are the ones needed to configure Kafka and advertised_ * - ``master_election_strategy`` - ``lowest`` - Decides on what basis the Karapace cluster master is chosen (only relevant in a multi node setup) + * - ``metrics_mode`` + - ``statsd`` + - Statistics server mode. For karapace supports ststsd server + * - ``statsd_uri`` + - ``127.0.0.1:8125`` + - Host:Port of statsd server + * - ``metrics_extended`` + - ``true`` + - Enable extended metrics. Extended metrics: connections_active, request_size_avg, request_size_max, response_size_avg, response_size_max Authentication and authorization of Karapace Schema Registry REST API diff --git a/karapace.config.json b/karapace.config.json index 55303ff4d..bded35aa6 100644 --- a/karapace.config.json +++ b/karapace.config.json @@ -27,5 +27,8 @@ "registry_authfile": null, "topic_name": "_schemas", "protobuf_runtime_directory": "runtime", - "session_timeout_ms": 10000 + "session_timeout_ms": 10000, + "metrics_mode": "statsd", + "statsd_uri": "127.0.0.1:8125", + "metrics_extended": true } diff --git a/karapace/config.py b/karapace/config.py index ac094f5dd..7dcebe2b1 100644 --- a/karapace/config.py +++ b/karapace/config.py @@ -142,6 +142,9 @@ class ConfigDefaults(Config, total=False): "karapace_registry": False, "master_election_strategy": "lowest", "protobuf_runtime_directory": "runtime", + "metrics_mode": "statsd", + "statsd_uri": "127.0.0.1:8125", + "metrics_extended": True, } SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD] diff --git a/karapace/karapacemetrics.py b/karapace/karapacemetrics.py new file mode 100644 index 000000000..2f8acb850 --- /dev/null +++ b/karapace/karapacemetrics.py @@ -0,0 +1,169 @@ +""" +karapace - metrics +Supports collection of system metrics +list of supported metrics: +connections-active - The number of active HTTP(S) connections to server. + Data collected inside aiohttp request handler. + +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" +from datetime import datetime +from kafka.metrics import MetricName, Metrics +from kafka.metrics.measurable_stat import AbstractMeasurableStat +from kafka.metrics.stats import Avg, Max, Rate, Total +from karapace.config import Config +from karapace.statsd import StatsClient +from typing import Dict, Optional + +import schedule +import threading +import time + + +class Value(AbstractMeasurableStat): + """ + An AbstractSampledStat that maintains a simple average over its samples. + """ + + def __init__(self) -> None: + super().__init__() + self.value = 0.0 + + # pylint: disable=unused-argument + def measure(self, config: object, now: int) -> float: + return self.value + + def record(self, config: object, value: float, time_ms: int) -> None: + self.value = value + + +class Singleton(type): + _instances: Dict["Singleton", "Singleton"] = {} + + def __call__(cls, *args: str, **kwargs: int) -> "Singleton": + if cls not in cls._instances: + instance = super().__call__(*args, **kwargs) + cls._instances[cls] = instance + return cls._instances[cls] + + +class KarapaceMetrics(metaclass=Singleton): + def __init__(self) -> None: + self.active: Optional[object] = None + self.stats_client: Optional[StatsClient] = None + self.is_ready = False + self.metrics = Metrics() + self.event = threading.Event() + self.worker_thread = threading.Thread(target=self.worker) + self.lock = threading.Lock() + + def setup(self, stats_client: StatsClient, config: Config) -> None: + self.active = config.get("metrics_extended") + if not self.active: + return + with self.lock: + if self.is_ready: + return + self.is_ready = True + + sensor = self.metrics.sensor("connections-active") + sensor.add(MetricName("connections-active", "kafka-metrics"), Total()) + + sensor = self.metrics.sensor("request-size") + sensor.add(MetricName("request-size-max", "kafka-metrics"), Max()) + sensor.add(MetricName("request-size-avg", "kafka-metrics"), Avg()) + + sensor = self.metrics.sensor("response-size") + sensor.add(MetricName("response-size-max", "kafka-metrics"), Max()) + sensor.add(MetricName("response-size-avg", "kafka-metrics"), Avg()) + + sensor = self.metrics.sensor("master-slave-role") + sensor.add(MetricName("master-slave-role", "kafka-metrics"), Value()) + + sensor = self.metrics.sensor("request-error-rate") + sensor.add(MetricName("request-error-rate", "kafka-metrics"), Rate()) + + sensor = self.metrics.sensor("request-rate") + sensor.add(MetricName("request-rate", "kafka-metrics"), Rate()) + + sensor = self.metrics.sensor("response-rate") + sensor.add(MetricName("response-rate", "kafka-metrics"), Rate()) + + sensor = self.metrics.sensor("response-byte-rate") + sensor.add(MetricName("response-byte-rate", "kafka-metrics"), Rate()) + + sensor = self.metrics.sensor("latency") + sensor.add(MetricName("latency-max", "kafka-metrics"), Max()) + sensor.add(MetricName("latency-avg", "kafka-metrics"), Avg()) + + self.stats_client = stats_client + + schedule.every(10).seconds.do(self.schedule) + + self.worker_thread.start() + + def connection(self) -> None: + if not self.active: + return + timestamp = int(datetime.utcnow().timestamp() * 1e3) + self.metrics.get_sensor("connections-active").record(1.0, timestamp) + + def request(self, size: int) -> None: + if not self.active: + return + timestamp = int(datetime.utcnow().timestamp() * 1e3) + self.metrics.get_sensor("request-size").record(size, timestamp) + self.metrics.get_sensor("request-rate").record(1, timestamp) + + def response(self, size: int) -> None: + if not self.active: + return + timestamp = int(datetime.utcnow().timestamp() * 1e3) + self.metrics.get_sensor("connections-active").record(-1.0, timestamp) + self.metrics.get_sensor("response-size").record(size, timestamp) + self.metrics.get_sensor("response-byte-rate").record(size, timestamp) + self.metrics.get_sensor("response-rate").record(1, timestamp) + + def are_we_master(self, is_master: bool) -> None: + if not self.active: + return + timestamp = int(datetime.utcnow().timestamp() * 1e3) + self.metrics.get_sensor("master-slave-role").record(int(is_master), timestamp) + + def latency(self, latency_ms: float) -> None: + if not self.active: + return + timestamp = int(datetime.utcnow().timestamp() * 1e3) + self.metrics.get_sensor("latency").record(latency_ms, timestamp) + + def error(self) -> None: + if not self.active: + return + timestamp = int(datetime.utcnow().timestamp() * 1e3) + self.metrics.get_sensor("request-error-rate").record(1, timestamp) + + def report(self) -> None: + if not self.active: + return + if isinstance(self.stats_client, StatsClient): + for metric_name in self.metrics.metrics: + value = self.metrics.metrics[metric_name].value() + self.stats_client.gauge(metric_name.name, value) + + def schedule(self) -> None: + self.report() + + def worker(self) -> None: + while True: + if self.event.is_set(): + break + schedule.run_pending() + time.sleep(1) + + def cleanup(self) -> None: + if not self.active: + return + self.report() + self.event.set() + self.worker_thread.join() diff --git a/karapace/rapu.py b/karapace/rapu.py index c5408ea7d..4a66e9d02 100644 --- a/karapace/rapu.py +++ b/karapace/rapu.py @@ -9,6 +9,7 @@ from accept_types import get_best_match from http import HTTPStatus from karapace.config import Config, create_server_ssl_context +from karapace.karapacemetrics import KarapaceMetrics from karapace.statsd import StatsClient from karapace.utils import json_decode, json_encode from karapace.version import __version__ @@ -134,6 +135,8 @@ def __init__( if content_type: self.headers["Content-Type"] = content_type super().__init__(f"HTTPResponse {status.value}") + if not is_success(status): + KarapaceMetrics().error() def ok(self) -> bool: """True if resposne has a 2xx status_code""" @@ -169,6 +172,7 @@ def __init__( self.stats = StatsClient(config=config) self.app.on_cleanup.append(self.close_by_app) self.not_ready_handler = not_ready_handler + KarapaceMetrics().setup(self.stats, config) async def close_by_app(self, app: aiohttp.web.Application) -> None: # pylint: disable=unused-argument await self.close() @@ -180,6 +184,7 @@ async def close(self) -> None: set as hook because the awaitables have to run inside the event loop created by the aiohttp library. """ + KarapaceMetrics().cleanup() self.stats.close() @staticmethod @@ -266,15 +271,23 @@ async def _handle_request( url=request.url, path_for_stats=path_for_stats, ) + + KarapaceMetrics().connection() try: if request.method == "OPTIONS": + # self.metrics.request(0) origin = request.headers.get("Origin") if not origin: raise HTTPResponse(body="OPTIONS missing Origin", status=HTTPStatus.BAD_REQUEST) headers = self.cors_and_server_headers_for_request(request=rapu_request, origin=origin) + raise HTTPResponse(body=b"", status=HTTPStatus.OK, headers=headers) body = await request.read() + if body: + KarapaceMetrics().request(len(body)) + else: + KarapaceMetrics().request(0) if json_request: if not body: raise HTTPResponse(body="Missing request JSON body", status=HTTPStatus.BAD_REQUEST) @@ -382,6 +395,7 @@ async def _handle_request( ) headers = {"Content-Type": "application/json"} resp = aiohttp.web.Response(body=body, status=status.value, headers=headers) + except asyncio.CancelledError: self.log.debug("Client closed connection") raise @@ -390,6 +404,8 @@ async def _handle_request( self.log.exception("Unexpected error handling user request: %s %s", request.method, request.url) resp = aiohttp.web.Response(text="Internal Server Error", status=HTTPStatus.INTERNAL_SERVER_ERROR.value) finally: + KarapaceMetrics().response(resp.content_length) + KarapaceMetrics().latency((time.monotonic() - start_time) * 1000) self.stats.timing( self.app_request_metric, time.monotonic() - start_time, diff --git a/karapace/schema_registry.py b/karapace/schema_registry.py index a745b7600..475faf9e0 100644 --- a/karapace/schema_registry.py +++ b/karapace/schema_registry.py @@ -22,6 +22,7 @@ VersionNotFoundException, ) from karapace.in_memory_database import InMemoryDatabase +from karapace.karapacemetrics import KarapaceMetrics from karapace.key_format import KeyFormatter from karapace.master_coordinator import MasterCoordinator from karapace.messaging import KarapaceProducer @@ -123,6 +124,7 @@ async def get_master(self, ignore_readiness: bool = False) -> tuple[bool, str | elif not ignore_readiness and self.schema_reader.ready is False: LOG.info("Schema reader isn't ready yet: %r", self.schema_reader.ready) else: + KarapaceMetrics().are_we_master(are_we_master) return are_we_master, master_url await asyncio.sleep(1.0) diff --git a/karapace/statsd.py b/karapace/statsd.py index be37d92ef..1b4ab9bcd 100644 --- a/karapace/statsd.py +++ b/karapace/statsd.py @@ -19,6 +19,7 @@ import logging import socket import time +import urllib STATSD_HOST: Final = "127.0.0.1" STATSD_PORT: Final = 8125 @@ -32,7 +33,19 @@ def __init__( host: str = STATSD_HOST, port: int = STATSD_PORT, ) -> None: - self._dest_addr: Final = (host, port) + _host = host + _port = port + + if config.get("metrics_mode") == "statsd": + statsd_uri = config.get("statsd_uri") + if statsd_uri: + srv = urllib.parse.urlsplit("//" + str(statsd_uri)) + if srv.hostname: + _host = str(srv.hostname) + if srv.port: + _port = int(srv.port) + + self._dest_addr: Final = (_host, _port) self._socket: Final = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self._tags: Final = config.get("tags", {}) self.sentry_client: Final = get_sentry_client(sentry_config=config.get("sentry", None)) diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt index c0bdf48a9..54c85f3e8 100644 --- a/requirements/requirements-dev.txt +++ b/requirements/requirements-dev.txt @@ -8,7 +8,7 @@ accept-types==0.4.1 # via -r requirements.txt aiohttp==3.8.4 # via -r requirements.txt -aiokafka==0.8.0 +aiokafka==0.8.1 # via -r requirements.txt aiosignal==1.3.1 # via @@ -87,7 +87,7 @@ geventhttpclient==2.0.9 # via locust greenlet==2.0.2 # via gevent -hypothesis==6.75.7 +hypothesis==6.76.0 # via -r requirements-dev.in idna==3.4 # via @@ -113,7 +113,7 @@ kafka-python @ https://github.com/aiven/kafka-python/archive/1b95333c9628152066f # aiokafka locust==2.15.1 # via -r requirements-dev.in -markupsafe==2.1.2 +markupsafe==2.1.3 # via # jinja2 # werkzeug @@ -174,7 +174,9 @@ rich==12.5.1 # via -r requirements.txt roundrobin==0.0.4 # via locust -sentry-sdk==1.24.0 +schedule==1.2.0 + # via -r requirements.txt +sentry-sdk==1.25.1 # via -r requirements-dev.in six==1.16.0 # via @@ -194,20 +196,20 @@ tenacity==8.2.2 # via -r requirements.txt tomli==2.0.1 # via pytest -typing-extensions==4.6.2 +typing-extensions==4.6.3 # via # -r requirements.txt # locust # rich ujson==5.7.0 # via -r requirements.txt -urllib3==1.26.16 +urllib3==2.0.3 # via # requests # sentry-sdk watchfiles==0.19.0 # via -r requirements.txt -werkzeug==2.3.4 +werkzeug==2.3.5 # via # flask # locust diff --git a/requirements/requirements-typing.txt b/requirements/requirements-typing.txt index ea4657ecb..32bc10e69 100644 --- a/requirements/requirements-typing.txt +++ b/requirements/requirements-typing.txt @@ -12,7 +12,7 @@ mypy==1.3.0 # via -r requirements-typing.in mypy-extensions==1.0.0 # via mypy -sentry-sdk==1.24.0 +sentry-sdk==1.25.1 # via -r requirements-typing.in tomli==2.0.1 # via @@ -20,11 +20,11 @@ tomli==2.0.1 # mypy types-jsonschema==4.17.0.8 # via -r requirements-typing.in -typing-extensions==4.6.2 +typing-extensions==4.6.3 # via # -c requirements-dev.txt # mypy -urllib3==1.26.16 +urllib3==2.0.3 # via # -c requirements-dev.txt # sentry-sdk diff --git a/requirements/requirements.in b/requirements/requirements.in index 59ae8b39a..13de9a0a0 100644 --- a/requirements/requirements.in +++ b/requirements/requirements.in @@ -11,9 +11,11 @@ tenacity<9 typing-extensions ujson<6 watchfiles<1 +schedule xxhash~=3.0 rich~=12.5.0 + # Patched dependencies # # Note: It is important to use commits to reference patched dependencies. This diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 9bc49e18d..1537c3051 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -8,7 +8,7 @@ accept-types==0.4.1 # via -r requirements.in aiohttp==3.8.4 # via -r requirements.in -aiokafka==0.8.0 +aiokafka==0.8.1 # via -r requirements.in aiosignal==1.3.1 # via aiohttp @@ -64,6 +64,8 @@ python-dateutil==2.8.2 # via -r requirements.in rich==12.5.1 # via -r requirements.in +schedule==1.2.0 + # via -r requirements.in six==1.16.0 # via # isodate @@ -73,7 +75,7 @@ sniffio==1.3.0 # via anyio tenacity==8.2.2 # via -r requirements.in -typing-extensions==4.6.2 +typing-extensions==4.6.3 # via # -r requirements.in # rich From 8dab84dd341a0766c54f863a5ff1e3d491ced56d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Sat, 10 Jun 2023 11:14:36 +0300 Subject: [PATCH 02/19] fixup issues --- karapace/karapacemetrics.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/karapace/karapacemetrics.py b/karapace/karapacemetrics.py index 2f8acb850..8eca080ab 100644 --- a/karapace/karapacemetrics.py +++ b/karapace/karapacemetrics.py @@ -39,7 +39,7 @@ def record(self, config: object, value: float, time_ms: int) -> None: class Singleton(type): - _instances: Dict["Singleton", "Singleton"] = {} + _instances: Dict["type[Singleton]", "Singleton"] = {} def __call__(cls, *args: str, **kwargs: int) -> "Singleton": if cls not in cls._instances: @@ -54,7 +54,7 @@ def __init__(self) -> None: self.stats_client: Optional[StatsClient] = None self.is_ready = False self.metrics = Metrics() - self.event = threading.Event() + self.stop_event = threading.Event() self.worker_thread = threading.Thread(target=self.worker) self.lock = threading.Lock() @@ -99,7 +99,7 @@ def setup(self, stats_client: StatsClient, config: Config) -> None: self.stats_client = stats_client - schedule.every(10).seconds.do(self.schedule) + schedule.every(10).seconds.do(self.report) self.worker_thread.start() @@ -150,13 +150,12 @@ def report(self) -> None: for metric_name in self.metrics.metrics: value = self.metrics.metrics[metric_name].value() self.stats_client.gauge(metric_name.name, value) - - def schedule(self) -> None: - self.report() + else: + raise RuntimeError def worker(self) -> None: while True: - if self.event.is_set(): + if self.stop_event.is_set(): break schedule.run_pending() time.sleep(1) @@ -165,5 +164,5 @@ def cleanup(self) -> None: if not self.active: return self.report() - self.event.set() + self.stop_event.set() self.worker_thread.join() From 2898e318ca9bb29ebdec03a9e5ffb2b9cdea5b6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Sat, 10 Jun 2023 11:23:45 +0300 Subject: [PATCH 03/19] fixup issues --- karapace/karapacemetrics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/karapace/karapacemetrics.py b/karapace/karapacemetrics.py index 8eca080ab..86ec9b893 100644 --- a/karapace/karapacemetrics.py +++ b/karapace/karapacemetrics.py @@ -39,7 +39,7 @@ def record(self, config: object, value: float, time_ms: int) -> None: class Singleton(type): - _instances: Dict["type[Singleton]", "Singleton"] = {} + _instances: Dict["Singleton", "Singleton"] = {} def __call__(cls, *args: str, **kwargs: int) -> "Singleton": if cls not in cls._instances: From c974579cb3f40f78fee02012c3d443e0cd62584f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Mon, 12 Jun 2023 12:59:59 +0300 Subject: [PATCH 04/19] fixup annotations issue --- karapace/karapacemetrics.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/karapace/karapacemetrics.py b/karapace/karapacemetrics.py index 86ec9b893..1c4042d35 100644 --- a/karapace/karapacemetrics.py +++ b/karapace/karapacemetrics.py @@ -8,13 +8,14 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ +from __future__ import annotations + from datetime import datetime from kafka.metrics import MetricName, Metrics from kafka.metrics.measurable_stat import AbstractMeasurableStat from kafka.metrics.stats import Avg, Max, Rate, Total from karapace.config import Config from karapace.statsd import StatsClient -from typing import Dict, Optional import schedule import threading @@ -39,9 +40,9 @@ def record(self, config: object, value: float, time_ms: int) -> None: class Singleton(type): - _instances: Dict["Singleton", "Singleton"] = {} + _instances: dict[type[object], Singleton] = {} - def __call__(cls, *args: str, **kwargs: int) -> "Singleton": + def __call__(cls, *args: str, **kwargs: int) -> Singleton: if cls not in cls._instances: instance = super().__call__(*args, **kwargs) cls._instances[cls] = instance @@ -50,8 +51,8 @@ def __call__(cls, *args: str, **kwargs: int) -> "Singleton": class KarapaceMetrics(metaclass=Singleton): def __init__(self) -> None: - self.active: Optional[object] = None - self.stats_client: Optional[StatsClient] = None + self.active: object | None = None + self.stats_client: StatsClient | None = None self.is_ready = False self.metrics = Metrics() self.stop_event = threading.Event() @@ -144,15 +145,13 @@ def error(self) -> None: self.metrics.get_sensor("request-error-rate").record(1, timestamp) def report(self) -> None: - if not self.active: - return - if isinstance(self.stats_client, StatsClient): - for metric_name in self.metrics.metrics: - value = self.metrics.metrics[metric_name].value() - self.stats_client.gauge(metric_name.name, value) - else: + if not self.active or not isinstance(self.stats_client, StatsClient): raise RuntimeError + for metric_name in self.metrics.metrics: + value = self.metrics.metrics[metric_name].value() + self.stats_client.gauge(metric_name.name, value) + def worker(self) -> None: while True: if self.stop_event.is_set(): From 7256f5d5d30404fcb71b83510c290b0ad3e475a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Mon, 12 Jun 2023 13:05:28 +0300 Subject: [PATCH 05/19] fixup exception message --- karapace/karapacemetrics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/karapace/karapacemetrics.py b/karapace/karapacemetrics.py index 1c4042d35..73c4ea402 100644 --- a/karapace/karapacemetrics.py +++ b/karapace/karapacemetrics.py @@ -146,7 +146,7 @@ def error(self) -> None: def report(self) -> None: if not self.active or not isinstance(self.stats_client, StatsClient): - raise RuntimeError + raise RuntimeError("no StatsClient available") for metric_name in self.metrics.metrics: value = self.metrics.metrics[metric_name].value() From ab6ae9682c4c9ad37e723937a84ca40aff732413 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Mon, 12 Jun 2023 14:47:59 +0300 Subject: [PATCH 06/19] get rid of multiple instances of class --- karapace/karapacemetrics.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/karapace/karapacemetrics.py b/karapace/karapacemetrics.py index 73c4ea402..09a8779ef 100644 --- a/karapace/karapacemetrics.py +++ b/karapace/karapacemetrics.py @@ -40,13 +40,13 @@ def record(self, config: object, value: float, time_ms: int) -> None: class Singleton(type): - _instances: dict[type[object], Singleton] = {} + _instance: Singleton def __call__(cls, *args: str, **kwargs: int) -> Singleton: - if cls not in cls._instances: + if cls != cls._instance: instance = super().__call__(*args, **kwargs) - cls._instances[cls] = instance - return cls._instances[cls] + cls._instance = instance + return cls._instance class KarapaceMetrics(metaclass=Singleton): From 733d1f2583ec59a10bb93b68b53c62dcfa6acab9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Mon, 12 Jun 2023 15:23:13 +0300 Subject: [PATCH 07/19] fixup issue --- karapace/karapacemetrics.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/karapace/karapacemetrics.py b/karapace/karapacemetrics.py index 09a8779ef..3284d7b50 100644 --- a/karapace/karapacemetrics.py +++ b/karapace/karapacemetrics.py @@ -40,10 +40,10 @@ def record(self, config: object, value: float, time_ms: int) -> None: class Singleton(type): - _instance: Singleton + _instance: Singleton | None = None def __call__(cls, *args: str, **kwargs: int) -> Singleton: - if cls != cls._instance: + if cls._instance is None: instance = super().__call__(*args, **kwargs) cls._instance = instance return cls._instance From 8751eea67625a0ee4b5415844c6ca4a504247f44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Fri, 16 Jun 2023 12:25:29 +0300 Subject: [PATCH 08/19] change code to send raw data only --- karapace/karapacemetrics.py | 123 +++++++++++++----------------------- 1 file changed, 43 insertions(+), 80 deletions(-) diff --git a/karapace/karapacemetrics.py b/karapace/karapacemetrics.py index 3284d7b50..74c72bb05 100644 --- a/karapace/karapacemetrics.py +++ b/karapace/karapacemetrics.py @@ -11,34 +11,17 @@ from __future__ import annotations from datetime import datetime -from kafka.metrics import MetricName, Metrics -from kafka.metrics.measurable_stat import AbstractMeasurableStat -from kafka.metrics.stats import Avg, Max, Rate, Total +from kafka.metrics import Metrics from karapace.config import Config from karapace.statsd import StatsClient +import os +import psutil import schedule import threading import time -class Value(AbstractMeasurableStat): - """ - An AbstractSampledStat that maintains a simple average over its samples. - """ - - def __init__(self) -> None: - super().__init__() - self.value = 0.0 - - # pylint: disable=unused-argument - def measure(self, config: object, now: int) -> float: - return self.value - - def record(self, config: object, value: float, time_ms: int) -> None: - self.value = value - - class Singleton(type): _instance: Singleton | None = None @@ -58,6 +41,9 @@ def __init__(self) -> None: self.stop_event = threading.Event() self.worker_thread = threading.Thread(target=self.worker) self.lock = threading.Lock() + self.error_count = 0 + self.app_host = "" + self.app_port = 8081 def setup(self, stats_client: StatsClient, config: Config) -> None: self.active = config.get("metrics_extended") @@ -67,90 +53,68 @@ def setup(self, stats_client: StatsClient, config: Config) -> None: if self.is_ready: return self.is_ready = True + if self.stats_client: + self.stats_client = stats_client + else: + self.active = False + return + app_host = config.get("host") + app_port = config.get("port") + if app_host and app_port: + self.app_host = app_host + self.app_port = app_port + else: + raise RuntimeError("No application host or port defined in application") - sensor = self.metrics.sensor("connections-active") - sensor.add(MetricName("connections-active", "kafka-metrics"), Total()) - - sensor = self.metrics.sensor("request-size") - sensor.add(MetricName("request-size-max", "kafka-metrics"), Max()) - sensor.add(MetricName("request-size-avg", "kafka-metrics"), Avg()) - - sensor = self.metrics.sensor("response-size") - sensor.add(MetricName("response-size-max", "kafka-metrics"), Max()) - sensor.add(MetricName("response-size-avg", "kafka-metrics"), Avg()) - - sensor = self.metrics.sensor("master-slave-role") - sensor.add(MetricName("master-slave-role", "kafka-metrics"), Value()) - - sensor = self.metrics.sensor("request-error-rate") - sensor.add(MetricName("request-error-rate", "kafka-metrics"), Rate()) - - sensor = self.metrics.sensor("request-rate") - sensor.add(MetricName("request-rate", "kafka-metrics"), Rate()) - - sensor = self.metrics.sensor("response-rate") - sensor.add(MetricName("response-rate", "kafka-metrics"), Rate()) - - sensor = self.metrics.sensor("response-byte-rate") - sensor.add(MetricName("response-byte-rate", "kafka-metrics"), Rate()) - - sensor = self.metrics.sensor("latency") - sensor.add(MetricName("latency-max", "kafka-metrics"), Max()) - sensor.add(MetricName("latency-avg", "kafka-metrics"), Avg()) - - self.stats_client = stats_client - - schedule.every(10).seconds.do(self.report) + schedule.every(10).seconds.do(self.connections) self.worker_thread.start() - def connection(self) -> None: - if not self.active: - return - timestamp = int(datetime.utcnow().timestamp() * 1e3) - self.metrics.get_sensor("connections-active").record(1.0, timestamp) - def request(self, size: int) -> None: if not self.active: return - timestamp = int(datetime.utcnow().timestamp() * 1e3) - self.metrics.get_sensor("request-size").record(size, timestamp) - self.metrics.get_sensor("request-rate").record(1, timestamp) + if not isinstance(self.stats_client, StatsClient): + raise RuntimeError("no StatsClient available") + self.stats_client.gauge("request-size", size) def response(self, size: int) -> None: if not self.active: return - timestamp = int(datetime.utcnow().timestamp() * 1e3) - self.metrics.get_sensor("connections-active").record(-1.0, timestamp) - self.metrics.get_sensor("response-size").record(size, timestamp) - self.metrics.get_sensor("response-byte-rate").record(size, timestamp) - self.metrics.get_sensor("response-rate").record(1, timestamp) + if not isinstance(self.stats_client, StatsClient): + raise RuntimeError("no StatsClient available") + self.stats_client.gauge("response-size", size) def are_we_master(self, is_master: bool) -> None: if not self.active: return - timestamp = int(datetime.utcnow().timestamp() * 1e3) - self.metrics.get_sensor("master-slave-role").record(int(is_master), timestamp) + self.stats_client.gauge("master-slave-role", int(is_master)) def latency(self, latency_ms: float) -> None: if not self.active: return - timestamp = int(datetime.utcnow().timestamp() * 1e3) - self.metrics.get_sensor("latency").record(latency_ms, timestamp) + if not isinstance(self.stats_client, StatsClient): + raise RuntimeError("no StatsClient available") + self.stats_client.gauge("master-slave-role", latency_ms) def error(self) -> None: if not self.active: return - timestamp = int(datetime.utcnow().timestamp() * 1e3) - self.metrics.get_sensor("request-error-rate").record(1, timestamp) - - def report(self) -> None: - if not self.active or not isinstance(self.stats_client, StatsClient): + if not isinstance(self.stats_client, StatsClient): raise RuntimeError("no StatsClient available") + self.error_count += 1 + self.stats_client.gauge("error", self.error_count) - for metric_name in self.metrics.metrics: - value = self.metrics.metrics[metric_name].value() - self.stats_client.gauge(metric_name.name, value) + def connections(self) -> None: + if not self.active: + return + if not isinstance(self.stats_client, StatsClient): + raise RuntimeError("no StatsClient available") + psutil.Process(os.getpid()).connections() + connections = 0 + for conn in psutil.net_connections(kind="tcp"): + if conn.laddr[0] == self.app_host and conn.laddr[1] == self.app_port and conn.status == "ESTABLISHED": + connections += 1 + self.stats_client.gauge("connections-active", connections) def worker(self) -> None: while True: @@ -162,6 +126,5 @@ def worker(self) -> None: def cleanup(self) -> None: if not self.active: return - self.report() self.stop_event.set() self.worker_thread.join() From fedff8f3d9b2b0f2845df28ef25b5b9567aa0d80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Thu, 22 Jun 2023 10:22:52 +0300 Subject: [PATCH 09/19] fixup --- karapace/karapacemetrics.py | 1 - 1 file changed, 1 deletion(-) diff --git a/karapace/karapacemetrics.py b/karapace/karapacemetrics.py index 74c72bb05..c769355ba 100644 --- a/karapace/karapacemetrics.py +++ b/karapace/karapacemetrics.py @@ -10,7 +10,6 @@ """ from __future__ import annotations -from datetime import datetime from kafka.metrics import Metrics from karapace.config import Config from karapace.statsd import StatsClient From b70ae03a5efde731294e6eed746b61eac35eb548 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Thu, 22 Jun 2023 11:21:06 +0300 Subject: [PATCH 10/19] fixup code --- karapace/karapacemetrics.py | 4 ++++ requirements/requirements-dev.txt | 7 ++++--- requirements/requirements-typing.txt | 2 +- requirements/requirements.in | 1 + requirements/requirements.txt | 2 ++ 5 files changed, 12 insertions(+), 4 deletions(-) diff --git a/karapace/karapacemetrics.py b/karapace/karapacemetrics.py index c769355ba..19ab05c09 100644 --- a/karapace/karapacemetrics.py +++ b/karapace/karapacemetrics.py @@ -86,6 +86,8 @@ def response(self, size: int) -> None: def are_we_master(self, is_master: bool) -> None: if not self.active: return + if not isinstance(self.stats_client, StatsClient): + raise RuntimeError("no StatsClient available") self.stats_client.gauge("master-slave-role", int(is_master)) def latency(self, latency_ms: float) -> None: @@ -111,6 +113,8 @@ def connections(self) -> None: psutil.Process(os.getpid()).connections() connections = 0 for conn in psutil.net_connections(kind="tcp"): + if not conn.laddr: + continue if conn.laddr[0] == self.app_host and conn.laddr[1] == self.app_port and conn.status == "ESTABLISHED": connections += 1 self.stats_client.gauge("connections-active", connections) diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt index 7919157c2..eb8948203 100644 --- a/requirements/requirements-dev.txt +++ b/requirements/requirements-dev.txt @@ -87,7 +87,7 @@ geventhttpclient==2.0.9 # via locust greenlet==2.0.2 # via gevent -hypothesis==6.78.3 +hypothesis==6.79.1 # via -r requirements-dev.in idna==3.4 # via @@ -95,7 +95,7 @@ idna==3.4 # anyio # requests # yarl -importlib-metadata==6.6.0 +importlib-metadata==6.7.0 # via flask importlib-resources==5.12.0 # via @@ -141,13 +141,14 @@ pkgutil-resolve-name==1.3.10 # via # -r requirements.txt # jsonschema -pluggy==1.0.0 +pluggy==1.2.0 # via pytest protobuf==3.20.3 # via -r requirements.txt psutil==5.9.5 # via # -r requirements-dev.in + # -r requirements.txt # locust # pytest-xdist pygments==2.15.1 diff --git a/requirements/requirements-typing.txt b/requirements/requirements-typing.txt index 32bc10e69..3659aec84 100644 --- a/requirements/requirements-typing.txt +++ b/requirements/requirements-typing.txt @@ -8,7 +8,7 @@ certifi==2023.5.7 # via # -c requirements-dev.txt # sentry-sdk -mypy==1.3.0 +mypy==1.4.0 # via -r requirements-typing.in mypy-extensions==1.0.0 # via mypy diff --git a/requirements/requirements.in b/requirements/requirements.in index 806e20b6f..fbee224b5 100644 --- a/requirements/requirements.in +++ b/requirements/requirements.in @@ -14,6 +14,7 @@ watchfiles<1 schedule xxhash~=3.0 rich~=12.5.0 +psutil # Patched dependencies diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 671753e9a..ffbeb28a0 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -60,6 +60,8 @@ pkgutil-resolve-name==1.3.10 # via jsonschema protobuf==3.20.3 # via -r requirements.in +psutil==5.9.5 + # via -r requirements.in pygments==2.15.1 # via rich pyrsistent==0.19.3 From a0387a35759b750b11c2476061c553f0bd55bf39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Thu, 22 Jun 2023 23:28:19 +0300 Subject: [PATCH 11/19] fixup --- karapace/karapacemetrics.py | 9 +++++---- karapace/rapu.py | 1 - 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/karapace/karapacemetrics.py b/karapace/karapacemetrics.py index 19ab05c09..47af8971c 100644 --- a/karapace/karapacemetrics.py +++ b/karapace/karapacemetrics.py @@ -33,7 +33,7 @@ def __call__(cls, *args: str, **kwargs: int) -> Singleton: class KarapaceMetrics(metaclass=Singleton): def __init__(self) -> None: - self.active: object | None = None + self.active = False self.stats_client: StatsClient | None = None self.is_ready = False self.metrics = Metrics() @@ -52,7 +52,7 @@ def setup(self, stats_client: StatsClient, config: Config) -> None: if self.is_ready: return self.is_ready = True - if self.stats_client: + if not self.stats_client: self.stats_client = stats_client else: self.active = False @@ -66,7 +66,6 @@ def setup(self, stats_client: StatsClient, config: Config) -> None: raise RuntimeError("No application host or port defined in application") schedule.every(10).seconds.do(self.connections) - self.worker_thread.start() def request(self, size: int) -> None: @@ -130,4 +129,6 @@ def cleanup(self) -> None: if not self.active: return self.stop_event.set() - self.worker_thread.join() + if self.worker_thread.is_alive(): + self.worker_thread.join() + diff --git a/karapace/rapu.py b/karapace/rapu.py index fb9d1fe31..677918b97 100644 --- a/karapace/rapu.py +++ b/karapace/rapu.py @@ -275,7 +275,6 @@ async def _handle_request( path_for_stats=path_for_stats, ) - KarapaceMetrics().connection() try: if request.method == "OPTIONS": # self.metrics.request(0) From 358facc614ef67fa017676a9b515c281d7d03f81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Thu, 22 Jun 2023 23:48:53 +0300 Subject: [PATCH 12/19] fixup --- karapace/karapacemetrics.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/karapace/karapacemetrics.py b/karapace/karapacemetrics.py index 47af8971c..d73a136e2 100644 --- a/karapace/karapacemetrics.py +++ b/karapace/karapacemetrics.py @@ -33,7 +33,7 @@ def __call__(cls, *args: str, **kwargs: int) -> Singleton: class KarapaceMetrics(metaclass=Singleton): def __init__(self) -> None: - self.active = False + self.active: object | None = None self.stats_client: StatsClient | None = None self.is_ready = False self.metrics = Metrics() @@ -131,4 +131,3 @@ def cleanup(self) -> None: self.stop_event.set() if self.worker_thread.is_alive(): self.worker_thread.join() - From 8533959346beed87644af522b53caf66a1fd7290 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Tue, 8 Aug 2023 16:27:16 +0300 Subject: [PATCH 13/19] improve code by request --- README.rst | 15 +++++++-------- karapace.config.json | 6 +++--- karapace/config.py | 7 +++++-- karapace/karapacemetrics.py | 30 ++++++++---------------------- karapace/rapu.py | 3 +-- karapace/statsd.py | 15 ++------------- 6 files changed, 26 insertions(+), 50 deletions(-) diff --git a/README.rst b/README.rst index ecfcd28a9..56c0ae89a 100644 --- a/README.rst +++ b/README.rst @@ -461,16 +461,15 @@ Keys to take special care are the ones needed to configure Kafka and advertised_ * - ``master_election_strategy`` - ``lowest`` - Decides on what basis the Karapace cluster master is chosen (only relevant in a multi node setup) - * - ``metrics_mode`` - - ``statsd`` - - Statistics server mode. For karapace supports ststsd server - * - ``statsd_uri`` - - ``127.0.0.1:8125`` - - Host:Port of statsd server * - ``metrics_extended`` - ``true`` - - Enable extended metrics. Extended metrics: connections_active, request_size_avg, request_size_max, response_size_avg, response_size_max - + - Enable extended metrics. Extended metrics: connections_active, [request|response]_size + * - ``statsd_host`` + - ``127.0.0.1`` + - Host of statsd server + * - ``statsd_port`` + - ``8125`` + - Port of statsd server Authentication and authorization of Karapace Schema Registry REST API ===================================================================== diff --git a/karapace.config.json b/karapace.config.json index bded35aa6..3a4fe0a43 100644 --- a/karapace.config.json +++ b/karapace.config.json @@ -28,7 +28,7 @@ "topic_name": "_schemas", "protobuf_runtime_directory": "runtime", "session_timeout_ms": 10000, - "metrics_mode": "statsd", - "statsd_uri": "127.0.0.1:8125", - "metrics_extended": true + "metrics_extended": true, + "statsd_host": "127.0.0.1", + "statsd_port": 8125 } diff --git a/karapace/config.py b/karapace/config.py index e56217f1e..4725c3a76 100644 --- a/karapace/config.py +++ b/karapace/config.py @@ -75,6 +75,9 @@ class Config(TypedDict): karapace_registry: bool master_election_strategy: str protobuf_runtime_directory: str + metrics_extended: bool + statsd_host: str + statsd_port: int sentry: NotRequired[Mapping[str, object]] tags: NotRequired[Mapping[str, object]] @@ -143,9 +146,9 @@ class ConfigDefaults(Config, total=False): "karapace_registry": False, "master_election_strategy": "lowest", "protobuf_runtime_directory": "runtime", - "metrics_mode": "statsd", - "statsd_uri": "127.0.0.1:8125", "metrics_extended": True, + "statsd_host": "127.0.0.1", + "statsd_port": 8125, } SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD] diff --git a/karapace/karapacemetrics.py b/karapace/karapacemetrics.py index d73a136e2..8f90436f9 100644 --- a/karapace/karapacemetrics.py +++ b/karapace/karapacemetrics.py @@ -10,7 +10,6 @@ """ from __future__ import annotations -from kafka.metrics import Metrics from karapace.config import Config from karapace.statsd import StatsClient @@ -33,19 +32,15 @@ def __call__(cls, *args: str, **kwargs: int) -> Singleton: class KarapaceMetrics(metaclass=Singleton): def __init__(self) -> None: - self.active: object | None = None + self.active = False self.stats_client: StatsClient | None = None self.is_ready = False - self.metrics = Metrics() self.stop_event = threading.Event() self.worker_thread = threading.Thread(target=self.worker) self.lock = threading.Lock() - self.error_count = 0 - self.app_host = "" - self.app_port = 8081 def setup(self, stats_client: StatsClient, config: Config) -> None: - self.active = config.get("metrics_extended") + self.active = config.get("metrics_extended") or False if not self.active: return with self.lock: @@ -57,13 +52,6 @@ def setup(self, stats_client: StatsClient, config: Config) -> None: else: self.active = False return - app_host = config.get("host") - app_port = config.get("port") - if app_host and app_port: - self.app_host = app_host - self.app_port = app_port - else: - raise RuntimeError("No application host or port defined in application") schedule.every(10).seconds.do(self.connections) self.worker_thread.start() @@ -94,27 +82,25 @@ def latency(self, latency_ms: float) -> None: return if not isinstance(self.stats_client, StatsClient): raise RuntimeError("no StatsClient available") - self.stats_client.gauge("master-slave-role", latency_ms) + self.stats_client.timing("latency_ms", latency_ms) def error(self) -> None: if not self.active: return if not isinstance(self.stats_client, StatsClient): raise RuntimeError("no StatsClient available") - self.error_count += 1 - self.stats_client.gauge("error", self.error_count) + self.stats_client.increase("error_total", 1) def connections(self) -> None: if not self.active: return if not isinstance(self.stats_client, StatsClient): raise RuntimeError("no StatsClient available") - psutil.Process(os.getpid()).connections() connections = 0 - for conn in psutil.net_connections(kind="tcp"): - if not conn.laddr: - continue - if conn.laddr[0] == self.app_host and conn.laddr[1] == self.app_port and conn.status == "ESTABLISHED": + karapace_proc = psutil.Process(os.getpid()) + + for conn in karapace_proc.connections(kind="tcp"): + if conn.laddr and conn.status == "ESTABLISHED": connections += 1 self.stats_client.gauge("connections-active", connections) diff --git a/karapace/rapu.py b/karapace/rapu.py index 677918b97..ecf53963d 100644 --- a/karapace/rapu.py +++ b/karapace/rapu.py @@ -19,7 +19,7 @@ import aiohttp.web import aiohttp.web_exceptions import asyncio -import cgi # pylint: disable=deprecated-module +import cgi import hashlib import logging import re @@ -277,7 +277,6 @@ async def _handle_request( try: if request.method == "OPTIONS": - # self.metrics.request(0) origin = request.headers.get("Origin") if not origin: raise HTTPResponse(body="OPTIONS missing Origin", status=HTTPStatus.BAD_REQUEST) diff --git a/karapace/statsd.py b/karapace/statsd.py index 1b4ab9bcd..8115f76f9 100644 --- a/karapace/statsd.py +++ b/karapace/statsd.py @@ -19,7 +19,6 @@ import logging import socket import time -import urllib STATSD_HOST: Final = "127.0.0.1" STATSD_PORT: Final = 8125 @@ -33,18 +32,8 @@ def __init__( host: str = STATSD_HOST, port: int = STATSD_PORT, ) -> None: - _host = host - _port = port - - if config.get("metrics_mode") == "statsd": - statsd_uri = config.get("statsd_uri") - if statsd_uri: - srv = urllib.parse.urlsplit("//" + str(statsd_uri)) - if srv.hostname: - _host = str(srv.hostname) - if srv.port: - _port = int(srv.port) - + _host = config.get("statsd_host") if "statsd_host" in config else host + _port = config.get("statsd_port") if "statsd_port" in config else port self._dest_addr: Final = (_host, _port) self._socket: Final = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self._tags: Final = config.get("tags", {}) From 90e221cde5f49ec4edd51e6308afd09fbc9668f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Tue, 8 Aug 2023 16:43:22 +0300 Subject: [PATCH 14/19] add psutil typing support --- requirements/requirements-typing.in | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements/requirements-typing.in b/requirements/requirements-typing.in index c55f35548..1d11d3198 100644 --- a/requirements/requirements-typing.in +++ b/requirements/requirements-typing.in @@ -5,3 +5,4 @@ mypy types-jsonschema sentry-sdk types-cachetools +types-psutil From 4c485766b978ffce6f3f99d0ec01f1fa93721d92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Tue, 8 Aug 2023 20:33:27 +0300 Subject: [PATCH 15/19] fixup --- karapace/rapu.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/karapace/rapu.py b/karapace/rapu.py index ecf53963d..d5c827b3a 100644 --- a/karapace/rapu.py +++ b/karapace/rapu.py @@ -19,7 +19,7 @@ import aiohttp.web import aiohttp.web_exceptions import asyncio -import cgi +import cgi # pylint: disable=deprecated-module import hashlib import logging import re From f9cb6d8d47f02179678c6e5638293fdfc079f86c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CSergiy?= Date: Tue, 8 Aug 2023 20:39:26 +0300 Subject: [PATCH 16/19] fixup --- karapace/rapu.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/karapace/rapu.py b/karapace/rapu.py index d5c827b3a..c506445a4 100644 --- a/karapace/rapu.py +++ b/karapace/rapu.py @@ -19,7 +19,7 @@ import aiohttp.web import aiohttp.web_exceptions import asyncio -import cgi # pylint: disable=deprecated-module +import cgi # pylint: disable=deprecated-module import hashlib import logging import re From 0c73a1a5e3c15fda0d4895d01f2fbfca84e302e9 Mon Sep 17 00:00:00 2001 From: libretto Date: Sat, 2 Sep 2023 20:44:23 +0300 Subject: [PATCH 17/19] refactor --- karapace/{karapacemetrics.py => metrics.py} | 2 +- karapace/rapu.py | 16 ++++++++-------- karapace/schema_registry.py | 4 ++-- 3 files changed, 11 insertions(+), 11 deletions(-) rename karapace/{karapacemetrics.py => metrics.py} (98%) diff --git a/karapace/karapacemetrics.py b/karapace/metrics.py similarity index 98% rename from karapace/karapacemetrics.py rename to karapace/metrics.py index 8f90436f9..bf45bec3a 100644 --- a/karapace/karapacemetrics.py +++ b/karapace/metrics.py @@ -30,7 +30,7 @@ def __call__(cls, *args: str, **kwargs: int) -> Singleton: return cls._instance -class KarapaceMetrics(metaclass=Singleton): +class Metrics(metaclass=Singleton): def __init__(self) -> None: self.active = False self.stats_client: StatsClient | None = None diff --git a/karapace/rapu.py b/karapace/rapu.py index c506445a4..5978d3c30 100644 --- a/karapace/rapu.py +++ b/karapace/rapu.py @@ -9,7 +9,7 @@ from accept_types import get_best_match from http import HTTPStatus from karapace.config import Config, create_server_ssl_context -from karapace.karapacemetrics import KarapaceMetrics +from karapace.metrics import Metrics from karapace.statsd import StatsClient from karapace.utils import json_decode, json_encode from karapace.version import __version__ @@ -136,7 +136,7 @@ def __init__( self.headers["Content-Type"] = content_type super().__init__(f"HTTPResponse {status.value}") if not is_success(status): - KarapaceMetrics().error() + Metrics().error() def ok(self) -> bool: """True if resposne has a 2xx status_code""" @@ -172,7 +172,7 @@ def __init__( self.stats = StatsClient(config=config) self.app.on_cleanup.append(self.close_by_app) self.not_ready_handler = not_ready_handler - KarapaceMetrics().setup(self.stats, config) + Metrics().setup(self.stats, config) def _create_aiohttp_application(self, *, config: Config) -> aiohttp.web.Application: return aiohttp.web.Application(client_max_size=config["http_request_max_size"]) @@ -187,7 +187,7 @@ async def close(self) -> None: set as hook because the awaitables have to run inside the event loop created by the aiohttp library. """ - KarapaceMetrics().cleanup() + Metrics().cleanup() self.stats.close() @staticmethod @@ -286,9 +286,9 @@ async def _handle_request( body = await request.read() if body: - KarapaceMetrics().request(len(body)) + Metrics().request(len(body)) else: - KarapaceMetrics().request(0) + Metrics().request(0) if json_request: if not body: raise HTTPResponse(body="Missing request JSON body", status=HTTPStatus.BAD_REQUEST) @@ -405,8 +405,8 @@ async def _handle_request( self.log.exception("Unexpected error handling user request: %s %s", request.method, request.url) resp = aiohttp.web.Response(text="Internal Server Error", status=HTTPStatus.INTERNAL_SERVER_ERROR.value) finally: - KarapaceMetrics().response(resp.content_length) - KarapaceMetrics().latency((time.monotonic() - start_time) * 1000) + Metrics().response(resp.content_length) + Metrics().latency((time.monotonic() - start_time) * 1000) self.stats.timing( self.app_request_metric, time.monotonic() - start_time, diff --git a/karapace/schema_registry.py b/karapace/schema_registry.py index 8f251d3d8..804d7495e 100644 --- a/karapace/schema_registry.py +++ b/karapace/schema_registry.py @@ -22,7 +22,7 @@ VersionNotFoundException, ) from karapace.in_memory_database import InMemoryDatabase -from karapace.karapacemetrics import KarapaceMetrics +from karapace.metrics import Metrics from karapace.key_format import KeyFormatter from karapace.master_coordinator import MasterCoordinator from karapace.messaging import KarapaceProducer @@ -124,7 +124,7 @@ async def get_master(self, ignore_readiness: bool = False) -> tuple[bool, str | elif not ignore_readiness and self.schema_reader.ready is False: LOG.info("Schema reader isn't ready yet: %r", self.schema_reader.ready) else: - KarapaceMetrics().are_we_master(are_we_master) + Metrics().are_we_master(are_we_master) return are_we_master, master_url await asyncio.sleep(1.0) From c495c506dcdec57e8b3e4371ec4bd86b926450da Mon Sep 17 00:00:00 2001 From: libretto Date: Sat, 2 Sep 2023 20:46:19 +0300 Subject: [PATCH 18/19] fixup --- karapace/schema_registry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/karapace/schema_registry.py b/karapace/schema_registry.py index 804d7495e..e29e167c7 100644 --- a/karapace/schema_registry.py +++ b/karapace/schema_registry.py @@ -22,10 +22,10 @@ VersionNotFoundException, ) from karapace.in_memory_database import InMemoryDatabase -from karapace.metrics import Metrics from karapace.key_format import KeyFormatter from karapace.master_coordinator import MasterCoordinator from karapace.messaging import KarapaceProducer +from karapace.metrics import Metrics from karapace.offset_watcher import OffsetWatcher from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema from karapace.schema_reader import KafkaSchemaReader From e3a89a15fad05394ab4ab8893309a2cf25f67bf9 Mon Sep 17 00:00:00 2001 From: libretto Date: Sat, 16 Sep 2023 22:35:25 +0300 Subject: [PATCH 19/19] fixup requirements --- requirements/requirements-dev.txt | 57 ++++++++-------------------- requirements/requirements-typing.txt | 10 ++--- requirements/requirements.in | 7 ++-- requirements/requirements.txt | 38 +++++++------------ 4 files changed, 35 insertions(+), 77 deletions(-) diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt index fedc61ebf..dffa1bd6e 100644 --- a/requirements/requirements-dev.txt +++ b/requirements/requirements-dev.txt @@ -14,7 +14,7 @@ aiosignal==1.3.1 # via # -r requirements.txt # aiohttp -anyio==3.7.1 +anyio==3.7.0 # via # -r requirements.txt # watchfiles @@ -43,26 +43,20 @@ certifi==2023.7.22 # geventhttpclient # requests # sentry-sdk -charset-normalizer==3.2.0 +charset-normalizer==3.1.0 # via # -r requirements.txt # aiohttp # requests -click==8.1.6 +click==8.1.3 # via flask commonmark==0.9.1 # via # -r requirements.txt # rich -configargparse==1.7 +configargparse==1.5.3 # via locust -exceptiongroup==1.1.2 - # via - # -r requirements.txt - # anyio - # hypothesis - # pytest -execnet==2.0.2 +execnet==1.9.0 # via pytest-xdist fancycompleter==0.9.1 # via pdbpp @@ -77,12 +71,12 @@ flask-basicauth==0.2.0 # via locust flask-cors==4.0.0 # via locust -frozenlist==1.4.0 +frozenlist==1.3.3 # via # -r requirements.txt # aiohttp # aiosignal -gevent==23.7.0 +gevent==22.10.2 # via # geventhttpclient # locust @@ -98,13 +92,6 @@ idna==3.4 # anyio # requests # yarl -importlib-metadata==6.8.0 - # via flask -importlib-resources==6.0.1 - # via - # -r requirements.txt - # jsonschema - # jsonschema-specifications iniconfig==2.0.0 # via pytest isodate==0.6.1 @@ -113,7 +100,7 @@ itsdangerous==2.1.2 # via flask jinja2==3.1.2 # via flask -jsonschema==4.19.0 +jsonschema==4.18.4 # via -r requirements.txt jsonschema-specifications==2023.7.1 # via @@ -136,7 +123,7 @@ multidict==6.0.4 # -r requirements.txt # aiohttp # yarl -networkx==2.8.8 +networkx==3.1 # via -r requirements.txt packaging==23.1 # via @@ -145,10 +132,6 @@ packaging==23.1 # pytest pdbpp==0.10.3 # via -r requirements-dev.in -pkgutil-resolve-name==1.3.10 - # via - # -r requirements.txt - # jsonschema pluggy==1.2.0 # via pytest protobuf==3.20.3 @@ -156,10 +139,9 @@ protobuf==3.20.3 psutil==5.9.5 # via # -r requirements-dev.in - # -r requirements.txt # locust # pytest-xdist -pygments==2.16.1 +pygments==2.15.1 # via # -r requirements.txt # pdbpp @@ -179,7 +161,7 @@ python-dateutil==2.8.2 # via -r requirements.txt pyzmq==25.1.0 # via locust -referencing==0.30.2 +referencing==0.30.0 # via # -r requirements.txt # jsonschema @@ -197,8 +179,6 @@ rpds-py==0.9.2 # -r requirements.txt # jsonschema # referencing -schedule==1.2.0 - # via -r requirements.txt sentry-sdk==1.30.0 # via -r requirements-dev.in six==1.16.0 @@ -215,16 +195,13 @@ sortedcontainers==2.4.0 # via hypothesis tenacity==8.2.2 # via -r requirements.txt -tomli==2.0.1 - # via pytest -typing-extensions==4.7.1 +typing-extensions==4.6.3 # via # -r requirements.txt # locust - # rich ujson==5.8.0 # via -r requirements.txt -urllib3==2.0.4 +urllib3==2.0.3 # via # requests # sentry-sdk @@ -242,15 +219,11 @@ yarl==1.9.2 # via # -r requirements.txt # aiohttp -zipp==3.16.2 - # via - # -r requirements.txt - # importlib-metadata - # importlib-resources zope-event==5.0 # via gevent zope-interface==6.0 # via gevent - +schedule==1.2.0 + # via -r requirements.txt # The following packages are considered to be unsafe in a requirements file: # setuptools diff --git a/requirements/requirements-typing.txt b/requirements/requirements-typing.txt index 8e58e4e53..d707f02b2 100644 --- a/requirements/requirements-typing.txt +++ b/requirements/requirements-typing.txt @@ -16,20 +16,18 @@ sentry-sdk==1.30.0 # via # -c requirements-dev.txt # -r requirements-typing.in -tomli==2.0.1 - # via - # -c requirements-dev.txt - # mypy types-cachetools==5.3.0.6 # via -r requirements-typing.in types-jsonschema==4.17.0.10 # via -r requirements-typing.in -typing-extensions==4.7.1 +typing-extensions==4.6.3 # via # -c requirements-dev.txt # -c requirements.txt # mypy -urllib3==2.0.4 +urllib3==2.0.3 # via # -c requirements-dev.txt # sentry-sdk +types-psutil==5.9.5.16 + # via -r requirements-typing.in diff --git a/requirements/requirements.in b/requirements/requirements.in index 8e6893e2b..2650120e9 100644 --- a/requirements/requirements.in +++ b/requirements/requirements.in @@ -4,19 +4,18 @@ aiohttp<4 aiokafka<1 isodate<1 jsonschema<5 -networkx<3 +networkx<4 protobuf<4 python-dateutil<3 tenacity<9 typing-extensions ujson<6 watchfiles<1 -schedule -psutil xxhash~=3.3 rich~=12.6.0 cachetools==5.3.1 - +schedule +psutil # Patched dependencies # diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 7fe54d377..9b948d8c8 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -12,7 +12,7 @@ aiokafka==0.8.1 # via -r requirements.in aiosignal==1.3.1 # via aiohttp -anyio==3.7.1 +anyio==3.7.0 # via watchfiles async-timeout==4.0.2 # via @@ -27,13 +27,11 @@ avro @ https://github.com/aiven/avro/archive/5a82d57f2a650fd87c819a30e433f1abb2c # via -r requirements.in cachetools==5.3.1 # via -r requirements.in -charset-normalizer==3.2.0 +charset-normalizer==3.1.0 # via aiohttp commonmark==0.9.1 # via rich -exceptiongroup==1.1.2 - # via anyio -frozenlist==1.4.0 +frozenlist==1.3.3 # via # aiohttp # aiosignal @@ -41,13 +39,9 @@ idna==3.4 # via # anyio # yarl -importlib-resources==6.0.1 - # via - # jsonschema - # jsonschema-specifications isodate==0.6.1 # via -r requirements.in -jsonschema==4.19.0 +jsonschema==4.18.4 # via -r requirements.in jsonschema-specifications==2023.7.1 # via jsonschema @@ -59,21 +53,17 @@ multidict==6.0.4 # via # aiohttp # yarl -networkx==2.8.8 +networkx==3.1 # via -r requirements.in packaging==23.1 # via aiokafka -pkgutil-resolve-name==1.3.10 - # via jsonschema protobuf==3.20.3 # via -r requirements.in -psutil==5.9.5 - # via -r requirements.in -pygments==2.16.1 +pygments==2.15.1 # via rich python-dateutil==2.8.2 # via -r requirements.in -referencing==0.30.2 +referencing==0.30.0 # via # jsonschema # jsonschema-specifications @@ -83,8 +73,6 @@ rpds-py==0.9.2 # via # jsonschema # referencing -schedule==1.2.0 - # via -r requirements.in six==1.16.0 # via # isodate @@ -93,10 +81,8 @@ sniffio==1.3.0 # via anyio tenacity==8.2.2 # via -r requirements.in -typing-extensions==4.7.1 - # via - # -r requirements.in - # rich +typing-extensions==4.6.3 + # via -r requirements.in ujson==5.8.0 # via -r requirements.in watchfiles==0.20.0 @@ -105,5 +91,7 @@ xxhash==3.3.0 # via -r requirements.in yarl==1.9.2 # via aiohttp -zipp==3.16.2 - # via importlib-resources +schedule==1.2.0 + # via -r requirements.in +psutil==5.9.5 + # via -r requirements.in