From ce0b28ee1cb7dae70ab3bceb2957134d16865099 Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Fri, 13 May 2022 17:08:46 -0400 Subject: [PATCH] Add subsystem metrics for task manager --- awx/main/analytics/subsystem_metrics.py | 19 ++++++++-- awx/main/scheduler/task_manager.py | 49 ++++++++++++++++++++++++- awx/settings/defaults.py | 3 ++ 3 files changed, 67 insertions(+), 4 deletions(-) diff --git a/awx/main/analytics/subsystem_metrics.py b/awx/main/analytics/subsystem_metrics.py index a08211b31168..db509e5c72b0 100644 --- a/awx/main/analytics/subsystem_metrics.py +++ b/awx/main/analytics/subsystem_metrics.py @@ -170,6 +170,17 @@ def __init__(self, auto_pipe_execute=True, instance_name=None): FloatM('subsystem_metrics_pipe_execute_seconds', 'Time spent saving metrics to redis'), IntM('subsystem_metrics_pipe_execute_calls', 'Number of calls to pipe_execute'), FloatM('subsystem_metrics_send_metrics_seconds', 'Time spent sending metrics to other nodes'), + FloatM('task_manager_get_tasks_seconds', 'Time spent in loading all tasks from db'), + FloatM('task_manager_start_task_seconds', 'Time spent starting task'), + FloatM('task_manager_process_running_tasks_seconds', 'Time spent processing running tasks'), + FloatM('task_manager_process_pending_tasks_seconds', 'Time spent processing pending tasks'), + FloatM('task_manager_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'), + FloatM('task_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow jobs'), + FloatM('task_manager__schedule_seconds', 'Time spent in running the entire _schedule'), + FloatM('task_manager_tasks_started', 'Number of tasks started'), + FloatM('task_manager_running_processed', 'Number of running tasks processed'), + FloatM('task_manager_pending_processed', 'Number of pending tasks processed'), + FloatM('task_manager_tasks_blocked', 'Number of tasks blocked from running'), ] # turn metric list into dictionary with the metric name as a key 
self.METRICS = {} @@ -179,9 +190,11 @@ def __init__(self, auto_pipe_execute=True, instance_name=None): # track last time metrics were sent to other nodes self.previous_send_metrics = SetFloatM('send_metrics_time', 'Timestamp of previous send_metrics call') - def clear_values(self): - for m in self.METRICS.values(): - m.clear_value(self.conn) + def clear_values(self, fields=None): + if not fields: + fields = self.METRICS.keys() + for m in fields: + self.METRICS[m].clear_value(self.conn) self.metrics_have_changed = True self.conn.delete(root_key + "_lock") diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 6fa200fcd68a..43c078f20117 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -6,6 +6,8 @@ import logging import uuid import json +import time +import sys # Django from django.db import transaction, connection @@ -38,12 +40,27 @@ from awx.main.scheduler.dependency_graph import DependencyGraph from awx.main.scheduler.task_manager_models import TaskManagerInstances from awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups +import awx.main.analytics.subsystem_metrics as s_metrics from awx.main.utils import decrypt_field logger = logging.getLogger('awx.main.scheduler') +def timeit(func): + def inner(*args, **kwargs): + t_now = time.perf_counter() + result = func(*args, **kwargs) + dur = time.perf_counter() - t_now + if func.__name__ + "_seconds" not in args[0].aggregate_metrics: + args[0].aggregate_metrics[func.__name__ + "_seconds"] = dur + else: + args[0].aggregate_metrics[func.__name__ + "_seconds"] += dur + return result + + return inner + + class TaskManager: def __init__(self): """ @@ -62,6 +79,13 @@ def __init__(self): # will no longer be started and will be started on the next task manager cycle.
self.start_task_limit = settings.START_TASK_LIMIT self.time_delta_job_explanation = timedelta(seconds=30) + self.aggregate_metrics = {} # for recording subsystem metrics + + def inc_metric(self, metric, inc_by=1): + if metric not in self.aggregate_metrics: + self.aggregate_metrics[metric] = inc_by + else: + self.aggregate_metrics[metric] += inc_by def after_lock_init(self, all_sorted_tasks): """ @@ -100,6 +124,7 @@ def job_blocked_by(self, task): return None + @timeit def get_tasks(self, status_list=('pending', 'waiting', 'running')): jobs = [j for j in Job.objects.filter(status__in=status_list).prefetch_related('instance_group')] inventory_updates_qs = ( @@ -125,6 +150,7 @@ def get_inventory_source_tasks(self, all_sorted_tasks): inventory_ids.add(task.inventory_id) return [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)] + @timeit def spawn_workflow_graph_jobs(self, workflow_jobs): for workflow_job in workflow_jobs: if workflow_job.cancel_flag: @@ -231,7 +257,9 @@ def process_finished_workflow_jobs(self, workflow_jobs): schedule_task_manager() return result + @timeit def start_task(self, task, instance_group, dependent_tasks=None, instance=None): + self.inc_metric("tasks_started") self.start_task_limit -= 1 if self.start_task_limit == 0: # schedule another run immediately after this task manager @@ -291,6 +319,7 @@ def post_commit(): task.websocket_emit_status(task.status) # adds to on_commit connection.on_commit(post_commit) + @timeit def process_running_tasks(self, running_tasks): for task in running_tasks: self.dependency_graph.add_job(task) @@ -439,6 +468,7 @@ def gen_dep_for_inventory_update(self, inventory_task): latest_src_project_update.scm_inventory_updates.add(inventory_task) return created_dependencies + @timeit def generate_dependencies(self, undeped_tasks): created_dependencies = [] for task in undeped_tasks: @@ -453,6 +483,7 @@ def generate_dependencies(self, undeped_tasks): return 
created_dependencies + @timeit def process_pending_tasks(self, pending_tasks): running_workflow_templates = {wf.unified_job_template_id for wf in self.get_running_workflow_jobs()} tasks_to_update_job_explanation = [] @@ -461,6 +492,7 @@ def process_pending_tasks(self, pending_tasks): break blocked_by = self.job_blocked_by(task) if blocked_by: + self.inc_metric("tasks_blocked") task.log_lifecycle("blocked", blocked_by=blocked_by) job_explanation = gettext_noop(f"waiting for {blocked_by._meta.model_name}-{blocked_by.id} to finish") if task.job_explanation != job_explanation: @@ -602,17 +634,22 @@ def reap_jobs_from_orphaned_instances(self): def process_tasks(self, all_sorted_tasks): running_tasks = [t for t in all_sorted_tasks if t.status in ['waiting', 'running']] - self.process_running_tasks(running_tasks) + self.inc_metric("running_processed", inc_by=len(running_tasks)) pending_tasks = [t for t in all_sorted_tasks if t.status == 'pending'] + undeped_tasks = [t for t in pending_tasks if not t.dependencies_processed] dependencies = self.generate_dependencies(undeped_tasks) deps_of_deps = self.generate_dependencies(dependencies) dependencies += deps_of_deps self.process_pending_tasks(dependencies) + self.inc_metric("pending_processed", inc_by=len(dependencies)) + self.process_pending_tasks(pending_tasks) + self.inc_metric("pending_processed", inc_by=len(pending_tasks)) + @timeit def _schedule(self): finished_wfjs = [] all_sorted_tasks = self.get_tasks() @@ -658,4 +695,14 @@ def schedule(self): logger.debug("Starting Scheduler") with task_manager_bulk_reschedule(): self._schedule() + if not settings.IS_TESTING(sys.argv): + if self.aggregate_metrics["_schedule_seconds"] > settings.SUBSYSTEM_METRICS_TASK_MANAGER_EXECUTION_TIME: + subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False) + # clear all task manager related values to wipe out data + # from previous task manager run + task_manager_metrics = [m for m in subsystem_metrics.METRICS.keys() if 
m.startswith("task_manager")] + subsystem_metrics.clear_values(fields=task_manager_metrics) + for a, v in self.aggregate_metrics.items(): + subsystem_metrics.set("task_manager_" + a, v) + subsystem_metrics.pipe_execute() logger.debug("Finishing Scheduler") diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 24b4ca79ffcf..e3af5b71f2de 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -241,6 +241,9 @@ def IS_TESTING(argv=None): # Interval in seconds for saving local metrics to redis SUBSYSTEM_METRICS_INTERVAL_SAVE_TO_REDIS = 2 +# Only record stats from task manager cycles whose execution time exceeds this threshold (seconds) +SUBSYSTEM_METRICS_TASK_MANAGER_EXECUTION_TIME = 15 + # The maximum allowed jobs to start on a given task manager cycle START_TASK_LIMIT = 100