From ecfe031f718c4fd646ce263af13002a6387bac2b Mon Sep 17 00:00:00 2001 From: Adrian Date: Wed, 23 Oct 2024 15:28:52 -0600 Subject: [PATCH] 2686 add checkpoint with repoid (#800) --- helpers/checkpoint_logger/prometheus.py | 122 +++++++-- helpers/tests/unit/test_checkpoint_logger.py | 272 +++++++++++++++++++ rollouts/__init__.py | 2 + tasks/notify_error.py | 1 - 4 files changed, 375 insertions(+), 22 deletions(-) diff --git a/helpers/checkpoint_logger/prometheus.py b/helpers/checkpoint_logger/prometheus.py index de901855c..15bf5b8ca 100644 --- a/helpers/checkpoint_logger/prometheus.py +++ b/helpers/checkpoint_logger/prometheus.py @@ -1,5 +1,29 @@ from shared.metrics import Counter, Histogram +from helpers.log_context import get_log_context +from rollouts import CHECKPOINT_ENABLED_REPOSITORIES + +_subflow_buckets = [ + 0.05, + 0.1, + 0.5, + 1, + 2, + 5, + 10, + 30, + 60, + 120, + 180, + 300, + 600, + 900, + 1200, + 1800, + 2400, + 3600, +] + # Main Counter CHECKPOINTS_TOTAL_BEGUN = Counter( "worker_checkpoints_begun", @@ -35,26 +59,45 @@ "worker_checkpoints_subflow_duration_seconds", "Duration of subflows in seconds.", ["flow", "subflow"], - buckets=[ - 0.05, - 0.1, - 0.5, - 1, - 2, - 5, - 10, - 30, - 60, - 120, - 180, - 300, - 600, - 900, - 1200, - 1800, - 2400, - 3600, - ], + buckets=_subflow_buckets, +) + +# Repo Counters +REPO_CHECKPOINTS_TOTAL_BEGUN = Counter( + "worker_repo_checkpoints_begun", + "Total number of times a flow's first checkpoint was logged. Labeled with a repo id, but only used for select repos.", + ["flow", "repoid"], +) +REPO_CHECKPOINTS_TOTAL_SUCCEEDED = Counter( + "worker_repo_checkpoints_succeeded", + "Total number of times one of a flow's success checkpoints was logged. Labeled with a repo id, but only used for select repos.", + ["flow", "repoid"], +) +REPO_CHECKPOINTS_TOTAL_FAILED = Counter( + "worker_repo_checkpoints_failed", + "Total number of times one of a flow's failure checkpoints was logged. Labeled with a repo id, but only used for select repos.", + ["flow", "repoid"], +) +REPO_CHECKPOINTS_TOTAL_ENDED = Counter( + "worker_repo_checkpoints_ended", + "Total number of times one of a flow's terminal checkpoints (success or failure) was logged. Labeled with a repo id, but only used for select repos.", + ["flow", "repoid"], +) +REPO_CHECKPOINTS_ERRORS = Counter( + "worker_repo_checkpoints_errors", + "Total number of errors while trying to log checkpoints. Labeled with a repo id, but only used for select repos.", + ["flow", "repoid"], +) +REPO_CHECKPOINTS_EVENTS = Counter( + "worker_repo_checkpoints_events", + "Total number of checkpoints logged. Labeled with a repo id, but only used for select repos.", + ["flow", "checkpoint", "repoid"], +) +REPO_CHECKPOINTS_SUBFLOW_DURATION = Histogram( + "worker_repo_checkpoints_subflow_duration_seconds", + "Duration of subflows in seconds. Labeled with a repo id, but only used for select repos.", + ["flow", "subflow", "repoid"], + buckets=_subflow_buckets, ) @@ -69,26 +112,63 @@ class PrometheusCheckpointLoggerHandler: def log_begun(self, flow: str): CHECKPOINTS_TOTAL_BEGUN.labels(flow=flow).inc() + context = get_log_context() + repoid = context and context.repo_id + if repoid and CHECKPOINT_ENABLED_REPOSITORIES.check_value(identifier=repoid): + REPO_CHECKPOINTS_TOTAL_BEGUN.labels(flow=flow, repoid=repoid).inc() def log_failure(self, flow: str): CHECKPOINTS_TOTAL_FAILED.labels(flow=flow).inc() + context = get_log_context() + repoid = context and context.repo_id + if repoid and CHECKPOINT_ENABLED_REPOSITORIES.check_value(identifier=repoid): + REPO_CHECKPOINTS_TOTAL_FAILED.labels(flow=flow, repoid=repoid).inc() def log_success(self, flow: str): CHECKPOINTS_TOTAL_SUCCEEDED.labels(flow=flow).inc() + context = get_log_context() + repoid = context and context.repo_id + if repoid and CHECKPOINT_ENABLED_REPOSITORIES.check_value(identifier=repoid): + REPO_CHECKPOINTS_TOTAL_SUCCEEDED.labels(flow=flow, repoid=repoid).inc() def log_total_ended(self, flow: str): CHECKPOINTS_TOTAL_ENDED.labels(flow=flow).inc() + context = get_log_context() + repoid = context and context.repo_id + if repoid and CHECKPOINT_ENABLED_REPOSITORIES.check_value(identifier=repoid): + REPO_CHECKPOINTS_TOTAL_ENDED.labels(flow=flow, repoid=repoid).inc() def log_checkpoints(self, flow: str, checkpoint: str): CHECKPOINTS_EVENTS.labels(flow=flow, checkpoint=checkpoint).inc() + context = get_log_context() + repoid = context and context.repo_id + if repoid and CHECKPOINT_ENABLED_REPOSITORIES.check_value(identifier=repoid): + REPO_CHECKPOINTS_EVENTS.labels( + flow=flow, checkpoint=checkpoint, repoid=repoid + ).inc() def log_errors(self, flow: str): CHECKPOINTS_ERRORS.labels(flow=flow).inc() + context = get_log_context() + repoid = context and context.repo_id + if repoid and CHECKPOINT_ENABLED_REPOSITORIES.check_value(identifier=repoid): + REPO_CHECKPOINTS_ERRORS.labels(flow=flow, repoid=repoid).inc() - def log_subflow(self, flow: str, subflow: str, duration: int): + def log_subflow( + self, + flow: str, + subflow: str, + duration: int, + ): CHECKPOINTS_SUBFLOW_DURATION.labels(flow=flow, subflow=subflow).observe( duration ) + context = get_log_context() + repoid = context and context.repo_id + if repoid and CHECKPOINT_ENABLED_REPOSITORIES.check_value(identifier=repoid): + REPO_CHECKPOINTS_SUBFLOW_DURATION.labels( + flow=flow, subflow=subflow, repoid=repoid + ).observe(duration) PROMETHEUS_HANDLER = PrometheusCheckpointLoggerHandler() diff --git a/helpers/tests/unit/test_checkpoint_logger.py b/helpers/tests/unit/test_checkpoint_logger.py index 6a60c41a7..6b9709402 100644 --- a/helpers/tests/unit/test_checkpoint_logger.py +++ b/helpers/tests/unit/test_checkpoint_logger.py @@ -16,6 +16,8 @@ subflows, success_events, ) +from helpers.log_context import LogContext, set_log_context +from rollouts import CHECKPOINT_ENABLED_REPOSITORIES class CounterAssertion: @@ -474,6 +476,276 @@ def test_reliability_counters(self): with CounterAssertionSet(counter_assertions): checkpoints.log(DecoratedEnum.BRANCH_2_SUCCESS) + @patch.object(CHECKPOINT_ENABLED_REPOSITORIES, "check_value", return_value=123) + def test_reliability_counters_with_context(self, mock_object): + repoid = 123 + mock_object.return_value = repoid + set_log_context(LogContext(repo_id=repoid)) + checkpoints = CheckpointLogger(DecoratedEnum, dict(context={"repoid": repoid})) + + counter_assertions = [ + CounterAssertion( + "worker_checkpoints_begun_total", {"flow": "DecoratedEnum"}, 1 + ), + CounterAssertion( + "worker_repo_checkpoints_begun_total", + {"flow": "DecoratedEnum", "repoid": f"{repoid}"}, + 1, + ), + CounterAssertion( + "worker_checkpoints_events_total", + {"flow": "DecoratedEnum", "checkpoint": "BEGIN"}, + 1, + ), + CounterAssertion( + "worker_repo_checkpoints_events_total", + {"flow": "DecoratedEnum", "checkpoint": "BEGIN", "repoid": f"{repoid}"}, + 1, + ), + CounterAssertion( + "worker_checkpoints_succeeded_total", {"flow": "DecoratedEnum"}, 0 + ), + CounterAssertion( + "worker_repo_checkpoints_succeeded_total", + {"flow": "DecoratedEnum", "repoid": f"{repoid}"}, + 0, + ), + CounterAssertion( + "worker_checkpoints_failed_total", {"flow": "DecoratedEnum"}, 0 + ), + CounterAssertion( + "worker_repo_checkpoints_failed_total", + {"flow": "DecoratedEnum", "repoid": f"{repoid}"}, + 0, + ), + CounterAssertion( + "worker_checkpoints_ended_total", {"flow": "DecoratedEnum"}, 0 + ), + CounterAssertion( + "worker_repo_checkpoints_ended_total", + {"flow": "DecoratedEnum", "repoid": f"{repoid}"}, + 0, + ), + ] + with CounterAssertionSet(counter_assertions) as a: + checkpoints.log(DecoratedEnum.BEGIN) + + # Nothing special about `CHECKPOINT` - no counters should change + counter_assertions = [ + CounterAssertion( + "worker_checkpoints_begun_total", {"flow": "DecoratedEnum"}, 0 + ), + CounterAssertion( + "worker_repo_checkpoints_begun_total", + {"flow": "DecoratedEnum", "repoid": f"{repoid}"}, + 0, + ), + CounterAssertion( + "worker_checkpoints_events_total", + {"flow": "DecoratedEnum", "checkpoint": "BEGIN"}, + 0, + ), + CounterAssertion( + "worker_repo_checkpoints_events_total", + {"flow": "DecoratedEnum", "checkpoint": "BEGIN", "repoid": f"{repoid}"}, + 0, + ), + CounterAssertion( + "worker_checkpoints_events_total", + {"flow": "DecoratedEnum", "checkpoint": "CHECKPOINT"}, + 1, + ), + CounterAssertion( + "worker_repo_checkpoints_events_total", + { + "flow": "DecoratedEnum", + "checkpoint": "CHECKPOINT", + "repoid": f"{repoid}", + }, + 1, + ), + CounterAssertion( + "worker_checkpoints_succeeded_total", {"flow": "DecoratedEnum"}, 0 + ), + CounterAssertion( + "worker_repo_checkpoints_succeeded_total", + {"flow": "DecoratedEnum", "repoid": f"{repoid}"}, + 0, + ), + CounterAssertion( + "worker_checkpoints_failed_total", {"flow": "DecoratedEnum"}, 0 + ), + CounterAssertion( + "worker_repo_checkpoints_failed_total", + {"flow": "DecoratedEnum", "repoid": f"{repoid}"}, + 0, + ), + CounterAssertion( + "worker_checkpoints_ended_total", {"flow": "DecoratedEnum"}, 0 + ), + CounterAssertion( + "worker_reopo_checkpoints_ended_total", + {"flow": "DecoratedEnum", "repoid": f"{repoid}"}, + 0, + ), + ] + with CounterAssertionSet(counter_assertions): + checkpoints.log(DecoratedEnum.CHECKPOINT) + + # Failures should increment both `failed` and `ended` + counter_assertions = [ + CounterAssertion( + "worker_checkpoints_begun_total", {"flow": "DecoratedEnum"}, 0 + ), + CounterAssertion( + "worker_repo_checkpoints_begun_total", + {"flow": "DecoratedEnum", "repoid": f"{repoid}"}, + 0, + ), + CounterAssertion( + "worker_checkpoints_succeeded_total", {"flow": "DecoratedEnum"}, 0 + ), + CounterAssertion( + "worker_repo_checkpoints_succeeded_total", + {"flow": "DecoratedEnum", "repoid": f"{repoid}"}, + 0, + ), + CounterAssertion( + "worker_checkpoints_failed_total", {"flow": "DecoratedEnum"}, 1 + ), + CounterAssertion( + "worker_repo_checkpoints_failed_total", + {"flow": "DecoratedEnum", "repoid": f"{repoid}"}, + 1, + ), + CounterAssertion( + "worker_checkpoints_ended_total", {"flow": "DecoratedEnum"}, 1 + ), + CounterAssertion( + "worker_repo_checkpoints_ended_total", + {"flow": "DecoratedEnum", "repoid": f"{repoid}"}, + 1, + ), + CounterAssertion( + "worker_checkpoints_events_total", + {"flow": "DecoratedEnum", "checkpoint": "BRANCH_1_FAIL"}, + 1, + ), + CounterAssertion( + "worker_repo_checkpoints_events_total", + { + "flow": "DecoratedEnum", + "checkpoint": "BRANCH_1_FAIL", + "repoid": f"{repoid}", + }, + 1, + ), + ] + with CounterAssertionSet(counter_assertions): + checkpoints.log(DecoratedEnum.BRANCH_1_FAIL) + + # Successes should increment both `succeeded` and `ended` + counter_assertions = [ + CounterAssertion( + "worker_checkpoints_begun_total", {"flow": "DecoratedEnum"}, 0 + ), + CounterAssertion( + "worker_repo_checkpoints_begun_total", {"flow": "DecoratedEnum"}, 0 + ), + CounterAssertion( + "worker_checkpoints_succeeded_total", {"flow": "DecoratedEnum"}, 1 + ), + CounterAssertion( + "worker_repo_checkpoints_succeeded_total", + {"flow": "DecoratedEnum", "repoid": f"{repoid}"}, + 1, + ), + CounterAssertion( + "worker_checkpoints_failed_total", {"flow": "DecoratedEnum"}, 0 + ), + CounterAssertion( + "worker_repo_checkpoints_failed_total", + {"flow": "DecoratedEnum", "repoid": f"{repoid}"}, + 0, + ), + CounterAssertion( + "worker_checkpoints_ended_total", {"flow": "DecoratedEnum"}, 1 + ), + CounterAssertion( + "worker_repo_checkpoints_ended_total", + {"flow": "DecoratedEnum", "repoid": f"{repoid}"}, + 1, + ), + CounterAssertion( + "worker_checkpoints_events_total", + {"flow": "DecoratedEnum", "checkpoint": "BRANCH_1_SUCCESS"}, + 1, + ), + CounterAssertion( + "worker_repo_checkpoints_events_total", + { + "flow": "DecoratedEnum", + "checkpoint": "BRANCH_1_SUCCESS", + "repoid": f"{repoid}", + }, + 1, + ), + ] + with CounterAssertionSet(counter_assertions): + checkpoints.log(DecoratedEnum.BRANCH_1_SUCCESS) + + # A different success path should also increment `succeeded` and `ended` + counter_assertions = [ + CounterAssertion( + "worker_checkpoints_begun_total", {"flow": "DecoratedEnum"}, 0 + ), + CounterAssertion( + "worker_repo_checkpoints_begun_total", + {"flow": "DecoratedEnum", "repoid": f"{repoid}"}, + 0, + ), + CounterAssertion( + "worker_checkpoints_succeeded_total", {"flow": "DecoratedEnum"}, 1 + ), + CounterAssertion( + "worker_repo_checkpoints_succeeded_total", + {"flow": "DecoratedEnum", "repoid": f"{repoid}"}, + 1, + ), + CounterAssertion( + "worker_checkpoints_failed_total", {"flow": "DecoratedEnum"}, 0 + ), + CounterAssertion( + "worker_repo_checkpoints_failed_total", + {"flow": "DecoratedEnum", "repoid": f"{repoid}"}, + 0, + ), + CounterAssertion( + "worker_checkpoints_ended_total", {"flow": "DecoratedEnum"}, 1 + ), + CounterAssertion( + "worker_repo_checkpoints_ended_total", + {"flow": "DecoratedEnum", "repoid": f"{repoid}"}, + 1, + ), + CounterAssertion( + "worker_checkpoints_events_total", + {"flow": "DecoratedEnum", "checkpoint": "BRANCH_2_SUCCESS"}, + 1, + ), + CounterAssertion( + "worker_repo_checkpoints_events_total", + { + "flow": "DecoratedEnum", + "checkpoint": "BRANCH_2_SUCCESS", + "repoid": f"{repoid}", + }, + 1, + ), + ] + with CounterAssertionSet(counter_assertions): + checkpoints.log(DecoratedEnum.BRANCH_2_SUCCESS) + def test_serialize_between_tasks(self): """ BaseFlow must inherit from str in order for our checkpoints dict to be diff --git a/rollouts/__init__.py b/rollouts/__init__.py index 67003e32f..b19aa71e9 100644 --- a/rollouts/__init__.py +++ b/rollouts/__init__.py @@ -15,3 +15,5 @@ CARRYFORWARD_BASE_SEARCH_RANGE_BY_OWNER = Feature("carryforward_base_search_range") SYNC_PULL_USE_MERGE_COMMIT_SHA = Feature("sync_pull_use_merge_commit_sha") + +CHECKPOINT_ENABLED_REPOSITORIES = Feature("checkpoint_enabled_repositories") diff --git a/tasks/notify_error.py b/tasks/notify_error.py index c66b01f9e..20710d7b2 100644 --- a/tasks/notify_error.py +++ b/tasks/notify_error.py @@ -43,7 +43,6 @@ def run_impl( ) # get all upload errors for this commit - # commit_yaml = UserYaml.from_dict(current_yaml) checkpoints = checkpoints_from_kwargs(UploadFlow, kwargs)