Skip to content

Commit

Permalink
record stats if task manager times out
Browse files Browse the repository at this point in the history
  • Loading branch information
fosterseth committed May 30, 2022
1 parent ce0b28e commit 3e649e6
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 10 deletions.
1 change: 1 addition & 0 deletions awx/main/analytics/subsystem_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ def __init__(self, auto_pipe_execute=True, instance_name=None):
FloatM('task_manager_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'),
FloatM('task_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow jobs'),
FloatM('task_manager__schedule_seconds', 'Time spent in running the entire _schedule'),
IntM('task_manager_total_runs', 'Number of task manager runs'),
FloatM('task_manager_tasks_started', 'Number of tasks started'),
FloatM('task_manager_running_processed', 'Number of running tasks processed'),
FloatM('task_manager_pending_processed', 'Number of pending tasks processed'),
Expand Down
30 changes: 20 additions & 10 deletions awx/main/scheduler/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import json
import time
import sys
import signal

# Django
from django.db import transaction, connection
Expand Down Expand Up @@ -685,6 +686,22 @@ def _schedule(self):
self.process_tasks(all_sorted_tasks)
return finished_wfjs

def record_aggregate_metrics(self, *args):
    """Publish the aggregate metrics gathered during this task manager run.

    Extra positional arguments are accepted and ignored, which lets this
    method be called directly or registered as a signal handler (handlers
    are invoked with a signal number and a stack frame).

    Nothing is recorded when ``settings.IS_TESTING(sys.argv)`` is true.
    Otherwise ``task_manager_total_runs`` is always incremented, and the
    remaining ``task_manager_*`` values are replaced with this run's
    numbers only when the run's ``_schedule_seconds`` is missing or above
    ``settings.SUBSYSTEM_METRICS_TASK_MANAGER_EXECUTION_TIME``.
    """
    if settings.IS_TESTING(sys.argv):
        return

    metrics = s_metrics.Metrics(auto_pipe_execute=False)
    metrics.inc("task_manager_total_runs", 1)

    collected = self.aggregate_metrics
    # A missing "_schedule_seconds" presumably means _schedule() was cut
    # short before it could store its duration -- TODO confirm.
    duration_missing = "_schedule_seconds" not in collected
    if duration_missing or collected["_schedule_seconds"] > settings.SUBSYSTEM_METRICS_TASK_MANAGER_EXECUTION_TIME:
        # Wipe the values left by the previous task manager run, keeping
        # the running total_runs counter intact.
        stale_fields = []
        for metric_name in metrics.METRICS.keys():
            if metric_name.startswith("task_manager") and not metric_name.endswith("total_runs"):
                stale_fields.append(metric_name)
        metrics.clear_values(fields=stale_fields)

        for suffix, value in collected.items():
            metrics.set("task_manager_" + suffix, value)

    # auto_pipe_execute is off, so the queued operations are sent explicitly.
    metrics.pipe_execute()

def schedule(self):
# Lock
with advisory_lock('task_manager_lock', wait=False) as acquired:
Expand All @@ -694,15 +711,8 @@ def schedule(self):
return
logger.debug("Starting Scheduler")
with task_manager_bulk_reschedule():
# if sigterm due to timeout, still record metrics
signal.signal(signal.SIGTERM, self.record_aggregate_metrics)
self._schedule()
if not settings.IS_TESTING(sys.argv):
if self.aggregate_metrics["_schedule_seconds"] > settings.SUBSYSTEM_METRICS_TASK_MANAGER_EXECUTION_TIME:
subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False)
# clear all task manager related values to wipe out data
# from previous task manager run
task_manager_metrics = [m for m in subsystem_metrics.METRICS.keys() if m.startswith("task_manager")]
subsystem_metrics.clear_values(fields=task_manager_metrics)
for a, v in self.aggregate_metrics.items():
subsystem_metrics.set("task_manager_" + a, v)
subsystem_metrics.pipe_execute()
self.record_aggregate_metrics()
logger.debug("Finishing Scheduler")

0 comments on commit 3e649e6

Please sign in to comment.