From 3e649e6e990d63ab14237e84cc757f16a15ff2ab Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Mon, 30 May 2022 16:26:44 -0400 Subject: [PATCH] record stats if task manager times out --- awx/main/analytics/subsystem_metrics.py | 1 + awx/main/scheduler/task_manager.py | 30 ++++++++++++++++--------- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/awx/main/analytics/subsystem_metrics.py b/awx/main/analytics/subsystem_metrics.py index db509e5c72b0..8b6e69be7499 100644 --- a/awx/main/analytics/subsystem_metrics.py +++ b/awx/main/analytics/subsystem_metrics.py @@ -177,6 +177,7 @@ def __init__(self, auto_pipe_execute=True, instance_name=None): FloatM('task_manager_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'), FloatM('task_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow jobs'), FloatM('task_manager__schedule_seconds', 'Time spent in running the entire _schedule'), + IntM('task_manager_total_runs', 'Number of task manager runs'), FloatM('task_manager_tasks_started', 'Number of tasks started'), FloatM('task_manager_running_processed', 'Number of running tasks processed'), FloatM('task_manager_pending_processed', 'Number of pending tasks processed'), diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 43c078f20117..d381d5f48be0 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -8,6 +8,7 @@ import json import time import sys +import signal # Django from django.db import transaction, connection @@ -685,6 +686,22 @@ def _schedule(self): self.process_tasks(all_sorted_tasks) return finished_wfjs + def record_aggregate_metrics(self, *args): + if not settings.IS_TESTING(sys.argv): + subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False) + subsystem_metrics.inc("task_manager_total_runs", 1) + if ( + "_schedule_seconds" not in self.aggregate_metrics + or self.aggregate_metrics["_schedule_seconds"] > settings.SUBSYSTEM_METRICS_TASK_MANAGER_EXECUTION_TIME + ): + # clear all task manager related values to wipe out data + # from previous task manager run + task_manager_metrics = [m for m in subsystem_metrics.METRICS.keys() if m.startswith("task_manager") and not m.endswith("total_runs")] + subsystem_metrics.clear_values(fields=task_manager_metrics) + for a, v in self.aggregate_metrics.items(): + subsystem_metrics.set("task_manager_" + a, v) + subsystem_metrics.pipe_execute() + def schedule(self): # Lock with advisory_lock('task_manager_lock', wait=False) as acquired: @@ -694,15 +711,8 @@ def schedule(self): return logger.debug("Starting Scheduler") with task_manager_bulk_reschedule(): + # if sigterm due to timeout, still record metrics + signal.signal(signal.SIGTERM, self.record_aggregate_metrics) self._schedule() - if not settings.IS_TESTING(sys.argv): - if self.aggregate_metrics["_schedule_seconds"] > settings.SUBSYSTEM_METRICS_TASK_MANAGER_EXECUTION_TIME: - subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False) - # clear all task manager related values to wipe out data - # from previous task manager run - task_manager_metrics = [m for m in subsystem_metrics.METRICS.keys() if m.startswith("task_manager")] - subsystem_metrics.clear_values(fields=task_manager_metrics) - for a, v in self.aggregate_metrics.items(): - subsystem_metrics.set("task_manager_" + a, v) - subsystem_metrics.pipe_execute() + self.record_aggregate_metrics() logger.debug("Finishing Scheduler")