Skip to content

Commit

Permalink
Created prometheus handler class (#790)
Browse files Browse the repository at this point in the history
Co-authored-by: Rohit Vinnakota <148245014+rohitvinnakota-codecov@users.noreply.github.com>
  • Loading branch information
adrian-codecov and rohitvinnakota-codecov authored Oct 16, 2024
1 parent 112592f commit 48ce427
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 70 deletions.
83 changes: 13 additions & 70 deletions helpers/checkpoint_logger/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,79 +17,22 @@
)

import sentry_sdk
from shared.metrics import Counter, Histogram, metrics
from shared.metrics import metrics

logger = logging.getLogger(__name__)
from helpers.checkpoint_logger.prometheus import PROMETHEUS_HANDLER

logger = logging.getLogger(__name__)

T = TypeVar("T", bound="BaseFlow")
TSubflows: TypeAlias = Mapping[T, Iterable[tuple[str, T]]]


CHECKPOINTS_TOTAL_BEGUN = Counter(
"worker_checkpoints_begun",
"Total number of times a flow's first checkpoint was logged.",
["flow"],
)
CHECKPOINTS_TOTAL_SUCCEEDED = Counter(
"worker_checkpoints_succeeded",
"Total number of times one of a flow's success checkpoints was logged.",
["flow"],
)
CHECKPOINTS_TOTAL_FAILED = Counter(
"worker_checkpoints_failed",
"Total number of times one of a flow's failure checkpoints was logged.",
["flow"],
)
CHECKPOINTS_TOTAL_ENDED = Counter(
"worker_checkpoints_ended",
"Total number of times one of a flow's terminal checkpoints (success or failure) was logged.",
["flow"],
)
CHECKPOINTS_ERRORS = Counter(
"worker_checkpoints_errors",
"Total number of errors while trying to log checkpoints",
["flow"],
)
CHECKPOINTS_EVENTS = Counter(
"worker_checkpoints_events",
"Total number of checkpoints logged.",
["flow", "checkpoint"],
)

CHECKPOINTS_SUBFLOW_DURATION = Histogram(
"worker_checkpoints_subflow_duration_seconds",
"Duration of subflows in seconds.",
["flow", "subflow"],
buckets=[
0.05,
0.1,
0.5,
1,
2,
5,
10,
30,
60,
120,
180,
300,
600,
900,
1200,
1800,
2400,
3600,
],
)


def _error(msg, flow, strict=False):
# When a new version of worker rolls out, it will pick up tasks that
# may have been enqueued by the old worker and be missing checkpoints
# data. At least for that reason, we want to allow failing softly.
metrics.incr("worker.checkpoint_logger.error")
CHECKPOINTS_ERRORS.labels(flow=flow.__name__).inc()
PROMETHEUS_HANDLER.log_errors(flow=flow.__name__)
if strict:
raise ValueError(msg)
else:
Expand Down Expand Up @@ -331,12 +274,12 @@ class MyEnum(str, Enum):

def log_counters(obj: T) -> None:
metrics.incr(f"{klass.__name__}.events.{obj.name}")
CHECKPOINTS_EVENTS.labels(flow=klass.__name__, checkpoint=obj.name).inc()
PROMETHEUS_HANDLER.log_checkpoints(flow=klass.__name__, checkpoint=obj.name)

# If this is the first checkpoint, increment the number of flows we've begun
if obj == next(iter(klass.__members__.values())):
metrics.incr(f"{klass.__name__}.total.begun")
CHECKPOINTS_TOTAL_BEGUN.labels(flow=klass.__name__).inc()
PROMETHEUS_HANDLER.log_begun(flow=klass.__name__)
return

is_failure = hasattr(obj, "is_failure") and obj.is_failure()
Expand All @@ -345,14 +288,14 @@ def log_counters(obj: T) -> None:

if is_failure:
metrics.incr(f"{klass.__name__}.total.failed")
CHECKPOINTS_TOTAL_FAILED.labels(flow=klass.__name__).inc()
PROMETHEUS_HANDLER.log_failure(flow=klass.__name__)
elif is_success:
metrics.incr(f"{klass.__name__}.total.succeeded")
CHECKPOINTS_TOTAL_SUCCEEDED.labels(flow=klass.__name__).inc()
PROMETHEUS_HANDLER.log_success(flow=klass.__name__)

if is_terminal:
metrics.incr(f"{klass.__name__}.total.ended")
CHECKPOINTS_TOTAL_ENDED.labels(flow=klass.__name__).inc()
PROMETHEUS_HANDLER.log_total_ended(flow=klass.__name__)

klass.log_counters = log_counters
return klass
Expand All @@ -373,7 +316,7 @@ class CheckpointLogger(Generic[T]):
reconstructed from its serialized data allowing you to begin a flow on one host
and log its completion on another (as long as clock drift is marginal).
See `UploadFlow` for an example of defining a flow. It's recomended that you
See `UploadFlow` for an example of defining a flow. It's recommended that you
define your flow with the decorators in this file:
- `@success_events()`, `@failure_events()`: designate some events as terminal
success/fail states of your flow.
Expand Down Expand Up @@ -489,9 +432,9 @@ def submit_subflow(self: _Self, metric: str, start: T, end: T) -> _Self:
if duration:
sentry_sdk.set_measurement(metric, duration, "milliseconds")
duration_in_seconds = duration / 1000
CHECKPOINTS_SUBFLOW_DURATION.labels(
flow=self.cls.__name__, subflow=metric
).observe(duration_in_seconds)
PROMETHEUS_HANDLER.log_subflow(
flow=self.cls.__name__, subflow=metric, duration=duration_in_seconds
)

return self

Expand Down
94 changes: 94 additions & 0 deletions helpers/checkpoint_logger/prometheus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
from shared.metrics import Counter, Histogram

# Prometheus metric definitions for checkpoint-flow instrumentation.
# Metric names and label sets are an external contract (dashboards/alerts
# scrape them by name) — do not rename without coordinating downstream.

# Incremented once when a flow's first checkpoint is logged.
CHECKPOINTS_TOTAL_BEGUN = Counter(
    "worker_checkpoints_begun",
    "Total number of times a flow's first checkpoint was logged.",
    ["flow"],
)
# Incremented when a checkpoint marked as a success terminal state is logged.
CHECKPOINTS_TOTAL_SUCCEEDED = Counter(
    "worker_checkpoints_succeeded",
    "Total number of times one of a flow's success checkpoints was logged.",
    ["flow"],
)
# Incremented when a checkpoint marked as a failure terminal state is logged.
CHECKPOINTS_TOTAL_FAILED = Counter(
    "worker_checkpoints_failed",
    "Total number of times one of a flow's failure checkpoints was logged.",
    ["flow"],
)
# Incremented for any terminal checkpoint (success or failure); superset of
# the succeeded/failed counters above.
CHECKPOINTS_TOTAL_ENDED = Counter(
    "worker_checkpoints_ended",
    "Total number of times one of a flow's terminal checkpoints (success or failure) was logged.",
    ["flow"],
)
# Incremented when checkpoint logging itself fails (e.g. missing/invalid
# checkpoint data from an older worker version).
CHECKPOINTS_ERRORS = Counter(
    "worker_checkpoints_errors",
    "Total number of errors while trying to log checkpoints",
    ["flow"],
)
# Incremented for every checkpoint event, labeled by the specific checkpoint.
CHECKPOINTS_EVENTS = Counter(
    "worker_checkpoints_events",
    "Total number of checkpoints logged.",
    ["flow", "checkpoint"],
)
# Observes elapsed time between two checkpoints of a flow, in seconds.
# Buckets span 50ms to 1h to cover both fast subflows and long-running ones.
CHECKPOINTS_SUBFLOW_DURATION = Histogram(
    "worker_checkpoints_subflow_duration_seconds",
    "Duration of subflows in seconds.",
    ["flow", "subflow"],
    buckets=[
        0.05,
        0.1,
        0.5,
        1,
        2,
        5,
        10,
        30,
        60,
        120,
        180,
        300,
        600,
        900,
        1200,
        1800,
        2400,
        3600,
    ],
)


class PrometheusCheckpointLoggerHandler:
    """Central handler for all Prometheus checkpoint metrics.

    Thin wrapper around the module-level ``Counter``/``Histogram`` objects so
    that callers (primarily ``CheckpointLogger``) emit metrics through one
    interface instead of touching the metric objects directly. New checkpoint
    metrics should be exposed as methods here.
    """

    def log_begun(self, flow: str) -> None:
        """Record that *flow* logged its first checkpoint."""
        CHECKPOINTS_TOTAL_BEGUN.labels(flow=flow).inc()

    def log_failure(self, flow: str) -> None:
        """Record that *flow* reached a failure terminal checkpoint."""
        CHECKPOINTS_TOTAL_FAILED.labels(flow=flow).inc()

    def log_success(self, flow: str) -> None:
        """Record that *flow* reached a success terminal checkpoint."""
        CHECKPOINTS_TOTAL_SUCCEEDED.labels(flow=flow).inc()

    def log_total_ended(self, flow: str) -> None:
        """Record that *flow* reached any terminal checkpoint (success or failure)."""
        CHECKPOINTS_TOTAL_ENDED.labels(flow=flow).inc()

    def log_checkpoints(self, flow: str, checkpoint: str) -> None:
        """Record a single checkpoint event for *flow*."""
        CHECKPOINTS_EVENTS.labels(flow=flow, checkpoint=checkpoint).inc()

    def log_errors(self, flow: str) -> None:
        """Record an error that occurred while logging checkpoints for *flow*."""
        CHECKPOINTS_ERRORS.labels(flow=flow).inc()

    def log_subflow(self, flow: str, subflow: str, duration: float) -> None:
        """Observe a subflow duration for *flow*.

        ``duration`` is in seconds and is typically fractional — the caller
        divides a millisecond delta by 1000 — so it is annotated ``float``
        (the previous ``int`` annotation was inaccurate).
        """
        CHECKPOINTS_SUBFLOW_DURATION.labels(flow=flow, subflow=subflow).observe(
            duration
        )


# Module-level singleton: the handler is stateless, so one shared instance
# is used by all callers.
PROMETHEUS_HANDLER = PrometheusCheckpointLoggerHandler()

0 comments on commit 48ce427

Please sign in to comment.