
Created prometheus handler class #790

Merged: 2 commits, Oct 16, 2024
83 changes: 13 additions & 70 deletions helpers/checkpoint_logger/__init__.py
@@ -17,79 +17,22 @@
)

import sentry_sdk
from shared.metrics import Counter, Histogram, metrics
from shared.metrics import metrics
Contributor:

I guess you can also use the opportunity to remove all these statsd metrics

Contributor Author:

I have a followup PR incoming, I'll remove them in that one 👌
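For illustration, a hypothetical sketch of what that follow-up could look like for the _error helper shown further down in this diff, with the statsd call dropped and only the Prometheus handler kept (this PR itself leaves the statsd metrics in place):

def _error(msg, flow, strict=False):
    # When a new version of worker rolls out, it will pick up tasks that
    # may have been enqueued by the old worker and be missing checkpoints
    # data. At least for that reason, we want to allow failing softly.
    PROMETHEUS_HANDLER.log_errors(flow=flow.__name__)  # statsd metrics.incr(...) removed
    if strict:
        raise ValueError(msg)
    # (soft-fail branch unchanged; elided in this diff view)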


logger = logging.getLogger(__name__)
from helpers.checkpoint_logger.prometheus import PROMETHEUS_HANDLER

logger = logging.getLogger(__name__)

T = TypeVar("T", bound="BaseFlow")
TSubflows: TypeAlias = Mapping[T, Iterable[tuple[str, T]]]


CHECKPOINTS_TOTAL_BEGUN = Counter(
"worker_checkpoints_begun",
"Total number of times a flow's first checkpoint was logged.",
["flow"],
)
CHECKPOINTS_TOTAL_SUCCEEDED = Counter(
"worker_checkpoints_succeeded",
"Total number of times one of a flow's success checkpoints was logged.",
["flow"],
)
CHECKPOINTS_TOTAL_FAILED = Counter(
"worker_checkpoints_failed",
"Total number of times one of a flow's failure checkpoints was logged.",
["flow"],
)
CHECKPOINTS_TOTAL_ENDED = Counter(
"worker_checkpoints_ended",
"Total number of times one of a flow's terminal checkpoints (success or failure) was logged.",
["flow"],
)
CHECKPOINTS_ERRORS = Counter(
"worker_checkpoints_errors",
"Total number of errors while trying to log checkpoints",
["flow"],
)
CHECKPOINTS_EVENTS = Counter(
"worker_checkpoints_events",
"Total number of checkpoints logged.",
["flow", "checkpoint"],
)

CHECKPOINTS_SUBFLOW_DURATION = Histogram(
"worker_checkpoints_subflow_duration_seconds",
"Duration of subflows in seconds.",
["flow", "subflow"],
buckets=[
0.05,
0.1,
0.5,
1,
2,
5,
10,
30,
60,
120,
180,
300,
600,
900,
1200,
1800,
2400,
3600,
],
)


def _error(msg, flow, strict=False):
# When a new version of worker rolls out, it will pick up tasks that
# may have been enqueued by the old worker and be missing checkpoints
# data. At least for that reason, we want to allow failing softly.
metrics.incr("worker.checkpoint_logger.error")
CHECKPOINTS_ERRORS.labels(flow=flow.__name__).inc()
PROMETHEUS_HANDLER.log_errors(flow=flow.__name__)
if strict:
raise ValueError(msg)
else:
@@ -331,12 +274,12 @@ class MyEnum(str, Enum):

def log_counters(obj: T) -> None:
metrics.incr(f"{klass.__name__}.events.{obj.name}")
CHECKPOINTS_EVENTS.labels(flow=klass.__name__, checkpoint=obj.name).inc()
PROMETHEUS_HANDLER.log_checkpoints(flow=klass.__name__, checkpoint=obj.name)

# If this is the first checkpoint, increment the number of flows we've begun
if obj == next(iter(klass.__members__.values())):
metrics.incr(f"{klass.__name__}.total.begun")
CHECKPOINTS_TOTAL_BEGUN.labels(flow=klass.__name__).inc()
PROMETHEUS_HANDLER.log_begun(flow=klass.__name__)
return

is_failure = hasattr(obj, "is_failure") and obj.is_failure()
@@ -345,14 +288,14 @@ def log_counters(obj: T) -> None:

if is_failure:
metrics.incr(f"{klass.__name__}.total.failed")
CHECKPOINTS_TOTAL_FAILED.labels(flow=klass.__name__).inc()
PROMETHEUS_HANDLER.log_failure(flow=klass.__name__)
elif is_success:
metrics.incr(f"{klass.__name__}.total.succeeded")
CHECKPOINTS_TOTAL_SUCCEEDED.labels(flow=klass.__name__).inc()
PROMETHEUS_HANDLER.log_success(flow=klass.__name__)

if is_terminal:
metrics.incr(f"{klass.__name__}.total.ended")
CHECKPOINTS_TOTAL_ENDED.labels(flow=klass.__name__).inc()
PROMETHEUS_HANDLER.log_total_ended(flow=klass.__name__)

klass.log_counters = log_counters
return klass
@@ -373,7 +316,7 @@ class CheckpointLogger(Generic[T]):
reconstructed from its serialized data allowing you to begin a flow on one host
and log its completion on another (as long as clock drift is marginal).

See `UploadFlow` for an example of defining a flow. It's recomended that you
See `UploadFlow` for an example of defining a flow. It's recommended that you
define your flow with the decorators in this file:
- `@success_events()`, `@failure_events()`: designate some events as terminal
success/fail states of your flow.
@@ -489,9 +432,9 @@ def submit_subflow(self: _Self, metric: str, start: T, end: T) -> _Self:
if duration:
sentry_sdk.set_measurement(metric, duration, "milliseconds")
duration_in_seconds = duration / 1000
CHECKPOINTS_SUBFLOW_DURATION.labels(
flow=self.cls.__name__, subflow=metric
).observe(duration_in_seconds)
PROMETHEUS_HANDLER.log_subflow(
flow=self.cls.__name__, subflow=metric, duration=duration_in_seconds
)

return self

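To ground the CheckpointLogger docstring above, here is a minimal sketch of defining a flow and submitting a subflow duration. It is illustrative only: the decorator argument style, the BaseFlow import, and the CheckpointLogger constructor and log() method are assumptions not confirmed by this diff; only submit_subflow appears in the hunk above.

from helpers.checkpoint_logger import (
    BaseFlow,            # assumed export; referenced above via TypeVar("T", bound="BaseFlow")
    CheckpointLogger,
    failure_events,
    success_events,
)

# Hypothetical flow; the decorators are assumed to take the names of the
# terminal success/failure events they designate.
@success_events("NOTIFIED")
@failure_events("UPLOAD_ERRORED")
class MyFlow(BaseFlow):
    UPLOAD_STARTED = "upload_started"
    UPLOAD_ERRORED = "upload_errored"
    NOTIFIED = "notified"

checkpoints = CheckpointLogger(MyFlow)   # constructor shape assumed
checkpoints.log(MyFlow.UPLOAD_STARTED)   # log() assumed; increments the begun/events counters
checkpoints.log(MyFlow.NOTIFIED)         # terminal success: succeeded/ended counters

# submit_subflow is shown in the hunk above: it reports the duration to Sentry
# and, with this PR, to PROMETHEUS_HANDLER.log_subflow.
checkpoints.submit_subflow("notification_latency", MyFlow.UPLOAD_STARTED, MyFlow.NOTIFIED)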
94 changes: 94 additions & 0 deletions helpers/checkpoint_logger/prometheus.py
@@ -0,0 +1,94 @@
from shared.metrics import Counter, Histogram

# Prometheus metrics for checkpoint flows
CHECKPOINTS_TOTAL_BEGUN = Counter(
    "worker_checkpoints_begun",
    "Total number of times a flow's first checkpoint was logged.",
    ["flow"],
)
CHECKPOINTS_TOTAL_SUCCEEDED = Counter(
    "worker_checkpoints_succeeded",
    "Total number of times one of a flow's success checkpoints was logged.",
    ["flow"],
)
CHECKPOINTS_TOTAL_FAILED = Counter(
    "worker_checkpoints_failed",
    "Total number of times one of a flow's failure checkpoints was logged.",
    ["flow"],
)
CHECKPOINTS_TOTAL_ENDED = Counter(
    "worker_checkpoints_ended",
    "Total number of times one of a flow's terminal checkpoints (success or failure) was logged.",
    ["flow"],
)
CHECKPOINTS_ERRORS = Counter(
    "worker_checkpoints_errors",
    "Total number of errors while trying to log checkpoints",
    ["flow"],
)
CHECKPOINTS_EVENTS = Counter(
    "worker_checkpoints_events",
    "Total number of checkpoints logged.",
    ["flow", "checkpoint"],
)
CHECKPOINTS_SUBFLOW_DURATION = Histogram(
    "worker_checkpoints_subflow_duration_seconds",
    "Duration of subflows in seconds.",
    ["flow", "subflow"],
    buckets=[
        0.05,
        0.1,
        0.5,
        1,
        2,
        5,
        10,
        30,
        60,
        120,
        180,
        300,
        600,
        900,
        1200,
        1800,
        2400,
        3600,
    ],
)


class PrometheusCheckpointLoggerHandler:
    """
    PrometheusCheckpointLoggerHandler is responsible for all Prometheus-related
    checkpoint metrics: it increments the counters and the histogram defined
    above for the checkpoints we define, and is intended to be extended with
    new checkpoint metrics as different needs arise. Its methods are used
    mainly by the CheckpointLogger class.
    """

    def log_begun(self, flow: str):
        CHECKPOINTS_TOTAL_BEGUN.labels(flow=flow).inc()

    def log_failure(self, flow: str):
        CHECKPOINTS_TOTAL_FAILED.labels(flow=flow).inc()

    def log_success(self, flow: str):
        CHECKPOINTS_TOTAL_SUCCEEDED.labels(flow=flow).inc()

    def log_total_ended(self, flow: str):
        CHECKPOINTS_TOTAL_ENDED.labels(flow=flow).inc()

    def log_checkpoints(self, flow: str, checkpoint: str):
        CHECKPOINTS_EVENTS.labels(flow=flow, checkpoint=checkpoint).inc()

    def log_errors(self, flow: str):
        CHECKPOINTS_ERRORS.labels(flow=flow).inc()

    def log_subflow(self, flow: str, subflow: str, duration: float):
        CHECKPOINTS_SUBFLOW_DURATION.labels(flow=flow, subflow=subflow).observe(
            duration
        )


PROMETHEUS_HANDLER = PrometheusCheckpointLoggerHandler()
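For context, a short sketch of how the new handler is exercised by the call sites in __init__.py above. The names "UploadFlow", "UPLOAD_TASK_BEGIN", and "notification_latency" are illustrative, and the registry check at the end assumes shared.metrics re-exports prometheus_client's Counter registered on the default registry, which this diff does not confirm.

from helpers.checkpoint_logger.prometheus import PROMETHEUS_HANDLER

# The call sites pass the flow class name (and checkpoint/subflow names) as label values.
PROMETHEUS_HANDLER.log_begun(flow="UploadFlow")
PROMETHEUS_HANDLER.log_checkpoints(flow="UploadFlow", checkpoint="UPLOAD_TASK_BEGIN")
PROMETHEUS_HANDLER.log_subflow(
    flow="UploadFlow", subflow="notification_latency", duration=1.5
)

# Illustrative verification, assuming the default prometheus_client registry.
from prometheus_client import REGISTRY
assert (
    REGISTRY.get_sample_value("worker_checkpoints_begun_total", {"flow": "UploadFlow"})
    == 1.0
)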