Skip to content

Commit

Permalink
2686 add checkpoint with repoid (#800)
Browse files Browse the repository at this point in the history
  • Loading branch information
adrian-codecov authored Oct 23, 2024
1 parent 6b1f38e commit ecfe031
Show file tree
Hide file tree
Showing 4 changed files with 375 additions and 22 deletions.
122 changes: 101 additions & 21 deletions helpers/checkpoint_logger/prometheus.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,29 @@
from shared.metrics import Counter, Histogram

from helpers.log_context import get_log_context
from rollouts import CHECKPOINT_ENABLED_REPOSITORIES

_subflow_buckets = [
0.05,
0.1,
0.5,
1,
2,
5,
10,
30,
60,
120,
180,
300,
600,
900,
1200,
1800,
2400,
3600,
]

# Main Counter
CHECKPOINTS_TOTAL_BEGUN = Counter(
"worker_checkpoints_begun",
Expand Down Expand Up @@ -35,26 +59,45 @@
"worker_checkpoints_subflow_duration_seconds",
"Duration of subflows in seconds.",
["flow", "subflow"],
buckets=[
0.05,
0.1,
0.5,
1,
2,
5,
10,
30,
60,
120,
180,
300,
600,
900,
1200,
1800,
2400,
3600,
],
buckets=_subflow_buckets,
)

# Repo Counters
REPO_CHECKPOINTS_TOTAL_BEGUN = Counter(
"worker_repo_checkpoints_begun",
"Total number of times a flow's first checkpoint was logged. Labeled with a repo id, but only used for select repos.",
["flow", "repoid"],
)
REPO_CHECKPOINTS_TOTAL_SUCCEEDED = Counter(
"worker_repo_checkpoints_succeeded",
"Total number of times one of a flow's success checkpoints was logged. Labeled with a repo id, but only used for select repos.",
["flow", "repoid"],
)
REPO_CHECKPOINTS_TOTAL_FAILED = Counter(
"worker_repo_checkpoints_failed",
"Total number of times one of a flow's failure checkpoints was logged. Labeled with a repo id, but only used for select repos.",
["flow", "repoid"],
)
REPO_CHECKPOINTS_TOTAL_ENDED = Counter(
"worker_repo_checkpoints_ended",
"Total number of times one of a flow's terminal checkpoints (success or failure) was logged. Labeled with a repo id, but only used for select repos.",
["flow", "repoid"],
)
REPO_CHECKPOINTS_ERRORS = Counter(
"worker_repo_checkpoints_errors",
"Total number of errors while trying to log checkpoints. Labeled with a repo id, but only used for select repos.",
["flow", "repoid"],
)
REPO_CHECKPOINTS_EVENTS = Counter(
"worker_repo_checkpoints_events",
"Total number of checkpoints logged. Labeled with a repo id, but only used for select repos.",
["flow", "checkpoint", "repoid"],
)
REPO_CHECKPOINTS_SUBFLOW_DURATION = Histogram(
"worker_repo_checkpoints_subflow_duration_seconds",
"Duration of subflows in seconds. Labeled with a repo id, but only used for select repos.",
["flow", "subflow", "repoid"],
buckets=_subflow_buckets,
)


Expand All @@ -69,26 +112,63 @@ class PrometheusCheckpointLoggerHandler:

def log_begun(self, flow: str):
CHECKPOINTS_TOTAL_BEGUN.labels(flow=flow).inc()
context = get_log_context()
repoid = context and context.repo_id
if repoid and CHECKPOINT_ENABLED_REPOSITORIES.check_value(identifier=repoid):
REPO_CHECKPOINTS_TOTAL_BEGUN.labels(flow=flow, repoid=repoid).inc()

def log_failure(self, flow: str):
CHECKPOINTS_TOTAL_FAILED.labels(flow=flow).inc()
context = get_log_context()
repoid = context and context.repo_id
if repoid and CHECKPOINT_ENABLED_REPOSITORIES.check_value(identifier=repoid):
REPO_CHECKPOINTS_TOTAL_FAILED.labels(flow=flow, repoid=repoid).inc()

def log_success(self, flow: str):
CHECKPOINTS_TOTAL_SUCCEEDED.labels(flow=flow).inc()
context = get_log_context()
repoid = context and context.repo_id
if repoid and CHECKPOINT_ENABLED_REPOSITORIES.check_value(identifier=repoid):
REPO_CHECKPOINTS_TOTAL_SUCCEEDED.labels(flow=flow, repoid=repoid).inc()

def log_total_ended(self, flow: str):
CHECKPOINTS_TOTAL_ENDED.labels(flow=flow).inc()
context = get_log_context()
repoid = context and context.repo_id
if repoid and CHECKPOINT_ENABLED_REPOSITORIES.check_value(identifier=repoid):
REPO_CHECKPOINTS_TOTAL_ENDED.labels(flow=flow, repoid=repoid).inc()

def log_checkpoints(self, flow: str, checkpoint: str):
CHECKPOINTS_EVENTS.labels(flow=flow, checkpoint=checkpoint).inc()
context = get_log_context()
repoid = context and context.repo_id
if repoid and CHECKPOINT_ENABLED_REPOSITORIES.check_value(identifier=repoid):
REPO_CHECKPOINTS_EVENTS.labels(
flow=flow, checkpoint=checkpoint, repoid=repoid
).inc()

def log_errors(self, flow: str):
CHECKPOINTS_ERRORS.labels(flow=flow).inc()
context = get_log_context()
repoid = context and context.repo_id
if repoid and CHECKPOINT_ENABLED_REPOSITORIES.check_value(identifier=repoid):
REPO_CHECKPOINTS_ERRORS.labels(flow=flow, repoid=repoid).inc()

def log_subflow(self, flow: str, subflow: str, duration: int):
def log_subflow(
self,
flow: str,
subflow: str,
duration: int,
):
CHECKPOINTS_SUBFLOW_DURATION.labels(flow=flow, subflow=subflow).observe(
duration
)
context = get_log_context()
repoid = context and context.repo_id
if repoid and CHECKPOINT_ENABLED_REPOSITORIES.check_value(identifier=repoid):
REPO_CHECKPOINTS_SUBFLOW_DURATION.labels(
flow=flow, subflow=subflow, repoid=repoid
).observe(duration)


PROMETHEUS_HANDLER = PrometheusCheckpointLoggerHandler()
Loading

0 comments on commit ecfe031

Please sign in to comment.