Add subsystem metrics for task manager
fosterseth committed May 30, 2022
1 parent d36befd commit ce0b28e
Showing 3 changed files with 67 additions and 4 deletions.
19 changes: 16 additions & 3 deletions awx/main/analytics/subsystem_metrics.py
@@ -170,6 +170,17 @@ def __init__(self, auto_pipe_execute=True, instance_name=None):
            FloatM('subsystem_metrics_pipe_execute_seconds', 'Time spent saving metrics to redis'),
            IntM('subsystem_metrics_pipe_execute_calls', 'Number of calls to pipe_execute'),
            FloatM('subsystem_metrics_send_metrics_seconds', 'Time spent sending metrics to other nodes'),
            FloatM('task_manager_get_tasks_seconds', 'Time spent loading all tasks from the db'),
            FloatM('task_manager_start_task_seconds', 'Time spent starting tasks'),
            FloatM('task_manager_process_running_tasks_seconds', 'Time spent processing running tasks'),
            FloatM('task_manager_process_pending_tasks_seconds', 'Time spent processing pending tasks'),
            FloatM('task_manager_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'),
            FloatM('task_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow jobs'),
            FloatM('task_manager__schedule_seconds', 'Time spent running the entire _schedule'),
            FloatM('task_manager_tasks_started', 'Number of tasks started'),
            FloatM('task_manager_running_processed', 'Number of running tasks processed'),
            FloatM('task_manager_pending_processed', 'Number of pending tasks processed'),
            FloatM('task_manager_tasks_blocked', 'Number of tasks blocked from running'),
        ]
        # turn the metric list into a dictionary with the metric name as the key
        self.METRICS = {}
@@ -179,9 +190,11 @@ def __init__(self, auto_pipe_execute=True, instance_name=None):
        # track last time metrics were sent to other nodes
        self.previous_send_metrics = SetFloatM('send_metrics_time', 'Timestamp of previous send_metrics call')

    def clear_values(self):
        for m in self.METRICS.values():
            m.clear_value(self.conn)

    def clear_values(self, fields=None):
        if not fields:
            fields = self.METRICS.keys()
        for m in fields:
            self.METRICS[m].clear_value(self.conn)
        self.metrics_have_changed = True
        self.conn.delete(root_key + "_lock")

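For illustration, a minimal sketch of how the new fields parameter might be called (the metric names are examples from the list above; the Metrics usage mirrors the schedule() change further down):

    # sketch: clear only a chosen subset of metrics, leaving the rest intact in redis
    m = Metrics(auto_pipe_execute=False)
    m.clear_values(fields=['task_manager_tasks_started', 'task_manager_tasks_blocked'])
    # calling with no argument still clears every registered metric, as before
    m.clear_values()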
49 changes: 48 additions & 1 deletion awx/main/scheduler/task_manager.py
@@ -6,6 +6,8 @@
import logging
import uuid
import json
import time
import sys

# Django
from django.db import transaction, connection
@@ -38,12 +40,27 @@
from awx.main.scheduler.dependency_graph import DependencyGraph
from awx.main.scheduler.task_manager_models import TaskManagerInstances
from awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups
import awx.main.analytics.subsystem_metrics as s_metrics
from awx.main.utils import decrypt_field


logger = logging.getLogger('awx.main.scheduler')


def timeit(func):
    def inner(*args, **kwargs):
        t_now = time.perf_counter()
        result = func(*args, **kwargs)
        dur = time.perf_counter() - t_now
        if func.__name__ + "_seconds" not in args[0].aggregate_metrics:
            args[0].aggregate_metrics[func.__name__ + "_seconds"] = dur
        else:
            args[0].aggregate_metrics[func.__name__ + "_seconds"] += dur
        return result

    return inner
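# The decorated functions below are instance methods, so args[0] is the
# TaskManager instance and each duration lands in its aggregate_metrics dict;
# a method that runs more than once per cycle (generate_dependencies is called
# twice from process_tasks) accumulates into the same "<name>_seconds" entry.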


class TaskManager:
    def __init__(self):
        """
@@ -62,6 +79,13 @@ def __init__(self):
        # will no longer be started and will be started on the next task manager cycle.
        self.start_task_limit = settings.START_TASK_LIMIT
        self.time_delta_job_explanation = timedelta(seconds=30)
        self.aggregate_metrics = {}  # for recording subsystem metrics

    def inc_metric(self, metric, inc_by=1):
        if metric not in self.aggregate_metrics:
            self.aggregate_metrics[metric] = inc_by
        else:
            self.aggregate_metrics[metric] += inc_by

    def after_lock_init(self, all_sorted_tasks):
        """
@@ -100,6 +124,7 @@ def job_blocked_by(self, task):

        return None

    @timeit
    def get_tasks(self, status_list=('pending', 'waiting', 'running')):
        jobs = [j for j in Job.objects.filter(status__in=status_list).prefetch_related('instance_group')]
        inventory_updates_qs = (
@@ -125,6 +150,7 @@ def get_inventory_source_tasks(self, all_sorted_tasks):
                inventory_ids.add(task.inventory_id)
        return [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)]

    @timeit
    def spawn_workflow_graph_jobs(self, workflow_jobs):
        for workflow_job in workflow_jobs:
            if workflow_job.cancel_flag:
@@ -231,7 +257,9 @@ def process_finished_workflow_jobs(self, workflow_jobs):
                schedule_task_manager()
        return result

    @timeit
    def start_task(self, task, instance_group, dependent_tasks=None, instance=None):
        self.inc_metric("tasks_started")
        self.start_task_limit -= 1
        if self.start_task_limit == 0:
            # schedule another run immediately after this task manager
@@ -291,6 +319,7 @@ def post_commit():
        task.websocket_emit_status(task.status)  # adds to on_commit
        connection.on_commit(post_commit)

    @timeit
    def process_running_tasks(self, running_tasks):
        for task in running_tasks:
            self.dependency_graph.add_job(task)
@@ -439,6 +468,7 @@ def gen_dep_for_inventory_update(self, inventory_task):
            latest_src_project_update.scm_inventory_updates.add(inventory_task)
        return created_dependencies

    @timeit
    def generate_dependencies(self, undeped_tasks):
        created_dependencies = []
        for task in undeped_tasks:
@@ -453,6 +483,7 @@ def generate_dependencies(self, undeped_tasks):

        return created_dependencies

    @timeit
    def process_pending_tasks(self, pending_tasks):
        running_workflow_templates = {wf.unified_job_template_id for wf in self.get_running_workflow_jobs()}
        tasks_to_update_job_explanation = []
@@ -461,6 +492,7 @@
                break
            blocked_by = self.job_blocked_by(task)
            if blocked_by:
                self.inc_metric("tasks_blocked")
                task.log_lifecycle("blocked", blocked_by=blocked_by)
                job_explanation = gettext_noop(f"waiting for {blocked_by._meta.model_name}-{blocked_by.id} to finish")
                if task.job_explanation != job_explanation:
@@ -602,17 +634,22 @@ def reap_jobs_from_orphaned_instances(self):

    def process_tasks(self, all_sorted_tasks):
        running_tasks = [t for t in all_sorted_tasks if t.status in ['waiting', 'running']]

        self.process_running_tasks(running_tasks)
        self.inc_metric("running_processed", inc_by=len(running_tasks))

        pending_tasks = [t for t in all_sorted_tasks if t.status == 'pending']

        undeped_tasks = [t for t in pending_tasks if not t.dependencies_processed]
        dependencies = self.generate_dependencies(undeped_tasks)
        deps_of_deps = self.generate_dependencies(dependencies)
        dependencies += deps_of_deps
        self.process_pending_tasks(dependencies)
        self.inc_metric("pending_processed", inc_by=len(dependencies))

        self.process_pending_tasks(pending_tasks)
        self.inc_metric("pending_processed", inc_by=len(pending_tasks))

    @timeit
    def _schedule(self):
        finished_wfjs = []
        all_sorted_tasks = self.get_tasks()
@@ -658,4 +695,14 @@ def schedule(self):
        logger.debug("Starting Scheduler")
        with task_manager_bulk_reschedule():
            self._schedule()
        if not settings.IS_TESTING(sys.argv):
            if self.aggregate_metrics["_schedule_seconds"] > settings.SUBSYSTEM_METRICS_TASK_MANAGER_EXECUTION_TIME:
                subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False)
                # clear all task manager related values to wipe out data
                # from previous task manager run
                task_manager_metrics = [m for m in subsystem_metrics.METRICS.keys() if m.startswith("task_manager")]
                subsystem_metrics.clear_values(fields=task_manager_metrics)
                for a, v in self.aggregate_metrics.items():
                    subsystem_metrics.set("task_manager_" + a, v)
                subsystem_metrics.pipe_execute()
        logger.debug("Finishing Scheduler")
3 changes: 3 additions & 0 deletions awx/settings/defaults.py
@@ -241,6 +241,9 @@ def IS_TESTING(argv=None):
# Interval in seconds for saving local metrics to redis
SUBSYSTEM_METRICS_INTERVAL_SAVE_TO_REDIS = 2

# Only record stats from task manager cycles that are >= this execution time (seconds)
SUBSYSTEM_METRICS_TASK_MANAGER_EXECUTION_TIME = 15

# The maximum allowed jobs to start on a given task manager cycle
START_TASK_LIMIT = 100

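As a usage note (an assumption based on the `>` comparison in schedule() above, not something this commit ships), the threshold could be lowered in a local settings override to record stats for every cycle:

    # hypothetical local_settings.py override: record every task manager cycle
    SUBSYSTEM_METRICS_TASK_MANAGER_EXECUTION_TIME = 0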
