From 48ce427deefb1b5f5be217d900504b3a56d7a75e Mon Sep 17 00:00:00 2001
From: Adrian
Date: Wed, 16 Oct 2024 13:29:01 -0600
Subject: [PATCH] Created prometheus handler class (#790)

Co-authored-by: Rohit Vinnakota <148245014+rohitvinnakota-codecov@users.noreply.github.com>
---
 helpers/checkpoint_logger/__init__.py   | 83 ++++------------------
 helpers/checkpoint_logger/prometheus.py | 94 +++++++++++++++++++++++++
 2 files changed, 107 insertions(+), 70 deletions(-)
 create mode 100644 helpers/checkpoint_logger/prometheus.py

diff --git a/helpers/checkpoint_logger/__init__.py b/helpers/checkpoint_logger/__init__.py
index 8f2b7c5ce..5ce72080c 100644
--- a/helpers/checkpoint_logger/__init__.py
+++ b/helpers/checkpoint_logger/__init__.py
@@ -17,79 +17,22 @@
 )
 
 import sentry_sdk
-from shared.metrics import Counter, Histogram, metrics
+from shared.metrics import metrics
 
-logger = logging.getLogger(__name__)
+from helpers.checkpoint_logger.prometheus import PROMETHEUS_HANDLER
 
+logger = logging.getLogger(__name__)
 
 T = TypeVar("T", bound="BaseFlow")
 TSubflows: TypeAlias = Mapping[T, Iterable[tuple[str, T]]]
 
-CHECKPOINTS_TOTAL_BEGUN = Counter(
-    "worker_checkpoints_begun",
-    "Total number of times a flow's first checkpoint was logged.",
-    ["flow"],
-)
-CHECKPOINTS_TOTAL_SUCCEEDED = Counter(
-    "worker_checkpoints_succeeded",
-    "Total number of times one of a flow's success checkpoints was logged.",
-    ["flow"],
-)
-CHECKPOINTS_TOTAL_FAILED = Counter(
-    "worker_checkpoints_failed",
-    "Total number of times one of a flow's failure checkpoints was logged.",
-    ["flow"],
-)
-CHECKPOINTS_TOTAL_ENDED = Counter(
-    "worker_checkpoints_ended",
-    "Total number of times one of a flow's terminal checkpoints (success or failure) was logged.",
-    ["flow"],
-)
-CHECKPOINTS_ERRORS = Counter(
-    "worker_checkpoints_errors",
-    "Total number of errors while trying to log checkpoints",
-    ["flow"],
-)
-CHECKPOINTS_EVENTS = Counter(
-    "worker_checkpoints_events",
-    "Total number of checkpoints logged.",
-    ["flow", "checkpoint"],
-)
-
-CHECKPOINTS_SUBFLOW_DURATION = Histogram(
-    "worker_checkpoints_subflow_duration_seconds",
-    "Duration of subflows in seconds.",
-    ["flow", "subflow"],
-    buckets=[
-        0.05,
-        0.1,
-        0.5,
-        1,
-        2,
-        5,
-        10,
-        30,
-        60,
-        120,
-        180,
-        300,
-        600,
-        900,
-        1200,
-        1800,
-        2400,
-        3600,
-    ],
-)
-
-
 def _error(msg, flow, strict=False):
     # When a new version of worker rolls out, it will pick up tasks that
     # may have been enqueued by the old worker and be missing checkpoints
     # data. At least for that reason, we want to allow failing softly.
metrics.incr("worker.checkpoint_logger.error") - CHECKPOINTS_ERRORS.labels(flow=flow.__name__).inc() + PROMETHEUS_HANDLER.log_errors(flow=flow.__name__) if strict: raise ValueError(msg) else: @@ -331,12 +274,12 @@ class MyEnum(str, Enum): def log_counters(obj: T) -> None: metrics.incr(f"{klass.__name__}.events.{obj.name}") - CHECKPOINTS_EVENTS.labels(flow=klass.__name__, checkpoint=obj.name).inc() + PROMETHEUS_HANDLER.log_checkpoints(flow=klass.__name__, checkpoint=obj.name) # If this is the first checkpoint, increment the number of flows we've begun if obj == next(iter(klass.__members__.values())): metrics.incr(f"{klass.__name__}.total.begun") - CHECKPOINTS_TOTAL_BEGUN.labels(flow=klass.__name__).inc() + PROMETHEUS_HANDLER.log_begun(flow=klass.__name__) return is_failure = hasattr(obj, "is_failure") and obj.is_failure() @@ -345,14 +288,14 @@ def log_counters(obj: T) -> None: if is_failure: metrics.incr(f"{klass.__name__}.total.failed") - CHECKPOINTS_TOTAL_FAILED.labels(flow=klass.__name__).inc() + PROMETHEUS_HANDLER.log_failure(flow=klass.__name__) elif is_success: metrics.incr(f"{klass.__name__}.total.succeeded") - CHECKPOINTS_TOTAL_SUCCEEDED.labels(flow=klass.__name__).inc() + PROMETHEUS_HANDLER.log_success(flow=klass.__name__) if is_terminal: metrics.incr(f"{klass.__name__}.total.ended") - CHECKPOINTS_TOTAL_ENDED.labels(flow=klass.__name__).inc() + PROMETHEUS_HANDLER.log_total_ended(flow=klass.__name__) klass.log_counters = log_counters return klass @@ -373,7 +316,7 @@ class CheckpointLogger(Generic[T]): reconstructed from its serialized data allowing you to begin a flow on one host and log its completion on another (as long as clock drift is marginal). - See `UploadFlow` for an example of defining a flow. It's recomended that you + See `UploadFlow` for an example of defining a flow. It's recommended that you define your flow with the decorators in this file: - `@success_events()`, `@failure_events()`: designate some events as terminal success/fail states of your flow. 
@@ -489,9 +432,9 @@ def submit_subflow(self: _Self, metric: str, start: T, end: T) -> _Self:
         if duration:
             sentry_sdk.set_measurement(metric, duration, "milliseconds")
             duration_in_seconds = duration / 1000
-            CHECKPOINTS_SUBFLOW_DURATION.labels(
-                flow=self.cls.__name__, subflow=metric
-            ).observe(duration_in_seconds)
+            PROMETHEUS_HANDLER.log_subflow(
+                flow=self.cls.__name__, subflow=metric, duration=duration_in_seconds
+            )
 
         return self
 
diff --git a/helpers/checkpoint_logger/prometheus.py b/helpers/checkpoint_logger/prometheus.py
new file mode 100644
index 000000000..de901855c
--- /dev/null
+++ b/helpers/checkpoint_logger/prometheus.py
@@ -0,0 +1,94 @@
+from shared.metrics import Counter, Histogram
+
+# Checkpoint counters and the subflow duration histogram
+CHECKPOINTS_TOTAL_BEGUN = Counter(
+    "worker_checkpoints_begun",
+    "Total number of times a flow's first checkpoint was logged.",
+    ["flow"],
+)
+CHECKPOINTS_TOTAL_SUCCEEDED = Counter(
+    "worker_checkpoints_succeeded",
+    "Total number of times one of a flow's success checkpoints was logged.",
+    ["flow"],
+)
+CHECKPOINTS_TOTAL_FAILED = Counter(
+    "worker_checkpoints_failed",
+    "Total number of times one of a flow's failure checkpoints was logged.",
+    ["flow"],
+)
+CHECKPOINTS_TOTAL_ENDED = Counter(
+    "worker_checkpoints_ended",
+    "Total number of times one of a flow's terminal checkpoints (success or failure) was logged.",
+    ["flow"],
+)
+CHECKPOINTS_ERRORS = Counter(
+    "worker_checkpoints_errors",
+    "Total number of errors while trying to log checkpoints.",
+    ["flow"],
+)
+CHECKPOINTS_EVENTS = Counter(
+    "worker_checkpoints_events",
+    "Total number of checkpoints logged.",
+    ["flow", "checkpoint"],
+)
+CHECKPOINTS_SUBFLOW_DURATION = Histogram(
+    "worker_checkpoints_subflow_duration_seconds",
+    "Duration of subflows in seconds.",
+    ["flow", "subflow"],
+    buckets=[
+        0.05,
+        0.1,
+        0.5,
+        1,
+        2,
+        5,
+        10,
+        30,
+        60,
+        120,
+        180,
+        300,
+        600,
+        900,
+        1200,
+        1800,
+        2400,
+        3600,
+    ],
+)
+
+
+class PrometheusCheckpointLoggerHandler:
+    """
+    PrometheusCheckpointLoggerHandler is responsible for all Prometheus-related
+    checkpoint metrics. It records a metric for each checkpoint a flow defines,
+    and it is written so that new checkpoint metrics can be added as different
+    needs arise. The methods in this class are used primarily by the
+    CheckpointLogger class.
+    """
+
+    def log_begun(self, flow: str):
+        CHECKPOINTS_TOTAL_BEGUN.labels(flow=flow).inc()
+
+    def log_failure(self, flow: str):
+        CHECKPOINTS_TOTAL_FAILED.labels(flow=flow).inc()
+
+    def log_success(self, flow: str):
+        CHECKPOINTS_TOTAL_SUCCEEDED.labels(flow=flow).inc()
+
+    def log_total_ended(self, flow: str):
+        CHECKPOINTS_TOTAL_ENDED.labels(flow=flow).inc()
+
+    def log_checkpoints(self, flow: str, checkpoint: str):
+        CHECKPOINTS_EVENTS.labels(flow=flow, checkpoint=checkpoint).inc()
+
+    def log_errors(self, flow: str):
+        CHECKPOINTS_ERRORS.labels(flow=flow).inc()
+
+    def log_subflow(self, flow: str, subflow: str, duration: float):
+        CHECKPOINTS_SUBFLOW_DURATION.labels(flow=flow, subflow=subflow).observe(
+            duration
+        )
+
+
+PROMETHEUS_HANDLER = PrometheusCheckpointLoggerHandler()
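
Reviewer note (not part of the patch): the sketch below exercises the handler's API in isolation, mirroring the call sites above. It assumes `shared.metrics` re-exports prometheus_client-style `Counter` and `Histogram` collectors, as the import in `prometheus.py` suggests. `UploadFlow` comes from the `CheckpointLogger` docstring; the checkpoint and subflow labels (`UPLOAD_TASK_BEGIN`, `time_before_processing`) are hypothetical.

    from helpers.checkpoint_logger.prometheus import PROMETHEUS_HANDLER

    # Every checkpoint increments worker_checkpoints_events{flow, checkpoint};
    # a flow's first checkpoint also increments worker_checkpoints_begun{flow}.
    PROMETHEUS_HANDLER.log_checkpoints(flow="UploadFlow", checkpoint="UPLOAD_TASK_BEGIN")
    PROMETHEUS_HANDLER.log_begun(flow="UploadFlow")

    # A terminal success checkpoint increments both the succeeded and ended counters.
    PROMETHEUS_HANDLER.log_success(flow="UploadFlow")
    PROMETHEUS_HANDLER.log_total_ended(flow="UploadFlow")

    # Subflow durations are observed in seconds; submit_subflow converts from the
    # milliseconds it reports to Sentry before calling this.
    PROMETHEUS_HANDLER.log_subflow(
        flow="UploadFlow", subflow="time_before_processing", duration=2.5
    )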
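
If `shared.metrics` registers these collectors with prometheus_client's default REGISTRY (an assumption about shared's internals, not something this patch establishes), the new counters can be read back in a test. prometheus_client exposes a Counter named `worker_checkpoints_begun` as the sample `worker_checkpoints_begun_total`. A minimal sketch, using a hypothetical `TestFlow` label:

    from prometheus_client import REGISTRY

    from helpers.checkpoint_logger.prometheus import PROMETHEUS_HANDLER


    def test_log_begun_increments_counter():
        # get_sample_value returns None for a series that has never been observed.
        before = REGISTRY.get_sample_value(
            "worker_checkpoints_begun_total", labels={"flow": "TestFlow"}
        ) or 0.0
        PROMETHEUS_HANDLER.log_begun(flow="TestFlow")
        after = REGISTRY.get_sample_value(
            "worker_checkpoints_begun_total", labels={"flow": "TestFlow"}
        )
        assert after == before + 1.0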