Skip to content

Commit

Permalink
Merge pull request ansible#12235 from fosterseth/subsystem_metrics_ta…
Browse files Browse the repository at this point in the history
…sk_manager

Subsystem metrics for task manager
  • Loading branch information
fosterseth authored Jun 14, 2022
2 parents 9b0a2b0 + 2f82b75 commit 30c060c
Show file tree
Hide file tree
Showing 6 changed files with 377 additions and 33 deletions.
84 changes: 57 additions & 27 deletions awx/main/analytics/subsystem_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,30 @@
from awx.main.consumers import emit_channel_notification

root_key = 'awx_metrics'
logger = logging.getLogger('awx.main.wsbroadcast')
logger = logging.getLogger('awx.main.analytics')


class BaseM:
def __init__(self, field, help_text):
self.field = field
self.help_text = help_text
self.current_value = 0
self.metric_has_changed = False

def clear_value(self, conn):
def reset_value(self, conn):
conn.hset(root_key, self.field, 0)
self.current_value = 0

def inc(self, value):
self.current_value += value
self.metric_has_changed = True

def set(self, value):
self.current_value = value
self.metric_has_changed = True

def get(self):
return self.current_value

def decode(self, conn):
value = conn.hget(root_key, self.field)
Expand All @@ -34,7 +40,7 @@ def decode(self, conn):
def to_prometheus(self, instance_data):
output_text = f"# HELP {self.field} {self.help_text}\n# TYPE {self.field} gauge\n"
for instance in instance_data:
output_text += f'{self.field}{{node="{instance}"}} {instance_data[instance].get(self.field, -1)}\n' # TODO: fix because this -1 is neccessary when dealing with old instances (ex. you didn't clean up your database)
output_text += f'{self.field}{{node="{instance}"}} {instance_data[instance][self.field]}\n'
return output_text


Expand All @@ -46,8 +52,10 @@ def decode_value(self, value):
return 0.0

def store_value(self, conn):
conn.hincrbyfloat(root_key, self.field, self.current_value)
self.current_value = 0
if self.metric_has_changed:
conn.hincrbyfloat(root_key, self.field, self.current_value)
self.current_value = 0
self.metric_has_changed = False


class IntM(BaseM):
Expand All @@ -58,8 +66,10 @@ def decode_value(self, value):
return 0

def store_value(self, conn):
conn.hincrby(root_key, self.field, self.current_value)
self.current_value = 0
if self.metric_has_changed:
conn.hincrby(root_key, self.field, self.current_value)
self.current_value = 0
self.metric_has_changed = False


class SetIntM(BaseM):
Expand All @@ -70,10 +80,9 @@ def decode_value(self, value):
return 0

def store_value(self, conn):
# do not set value if it has not changed since last time this was called
if self.current_value is not None:
if self.metric_has_changed:
conn.hset(root_key, self.field, self.current_value)
self.current_value = None
self.metric_has_changed = False


class SetFloatM(SetIntM):
Expand All @@ -94,13 +103,13 @@ def __init__(self, field, help_text, buckets):
self.sum = IntM(field + '_sum', '')
super(HistogramM, self).__init__(field, help_text)

def clear_value(self, conn):
def reset_value(self, conn):
conn.hset(root_key, self.field, 0)
self.inf.clear_value(conn)
self.sum.clear_value(conn)
self.inf.reset_value(conn)
self.sum.reset_value(conn)
for b in self.buckets_to_keys.values():
b.clear_value(conn)
super(HistogramM, self).clear_value(conn)
b.reset_value(conn)
super(HistogramM, self).reset_value(conn)

def observe(self, value):
for b in self.buckets:
Expand Down Expand Up @@ -136,7 +145,7 @@ def to_prometheus(self, instance_data):


class Metrics:
def __init__(self, auto_pipe_execute=True, instance_name=None):
def __init__(self, auto_pipe_execute=False, instance_name=None):
self.pipe = redis.Redis.from_url(settings.BROKER_URL).pipeline()
self.conn = redis.Redis.from_url(settings.BROKER_URL)
self.last_pipe_execute = time.time()
Expand All @@ -152,6 +161,8 @@ def __init__(self, auto_pipe_execute=True, instance_name=None):
Instance = apps.get_model('main', 'Instance')
if instance_name:
self.instance_name = instance_name
elif settings.IS_TESTING():
self.instance_name = "awx_testing"
else:
self.instance_name = Instance.objects.me().hostname

Expand All @@ -167,10 +178,23 @@ def __init__(self, auto_pipe_execute=True, instance_name=None):
HistogramM(
'callback_receiver_batch_events_insert_db', 'Number of events batch inserted into database', settings.SUBSYSTEM_METRICS_BATCH_INSERT_BUCKETS
),
SetFloatM('callback_receiver_event_processing_avg_seconds', 'Average processing time per event per callback receiver batch'),
FloatM('subsystem_metrics_pipe_execute_seconds', 'Time spent saving metrics to redis'),
IntM('subsystem_metrics_pipe_execute_calls', 'Number of calls to pipe_execute'),
FloatM('subsystem_metrics_send_metrics_seconds', 'Time spent sending metrics to other nodes'),
SetFloatM('callback_receiver_event_processing_avg_seconds', 'Average processing time per event per callback receiver batch'),
SetFloatM('task_manager_get_tasks_seconds', 'Time spent in loading all tasks from db'),
SetFloatM('task_manager_start_task_seconds', 'Time spent starting task'),
SetFloatM('task_manager_process_running_tasks_seconds', 'Time spent processing running tasks'),
SetFloatM('task_manager_process_pending_tasks_seconds', 'Time spent processing pending tasks'),
SetFloatM('task_manager_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'),
SetFloatM('task_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow jobs'),
SetFloatM('task_manager__schedule_seconds', 'Time spent in running the entire _schedule'),
IntM('task_manager_schedule_calls', 'Number of calls to task manager schedule'),
SetFloatM('task_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'),
SetIntM('task_manager_tasks_started', 'Number of tasks started'),
SetIntM('task_manager_running_processed', 'Number of running tasks processed'),
SetIntM('task_manager_pending_processed', 'Number of pending tasks processed'),
SetIntM('task_manager_tasks_blocked', 'Number of tasks blocked from running'),
]
# turn metric list into dictionary with the metric name as a key
self.METRICS = {}
Expand All @@ -180,31 +204,37 @@ def __init__(self, auto_pipe_execute=True, instance_name=None):
# track last time metrics were sent to other nodes
self.previous_send_metrics = SetFloatM('send_metrics_time', 'Timestamp of previous send_metrics call')

def clear_values(self, fields=None):
if not fields:
fields = self.METRICS.keys()
for m in fields:
self.METRICS[m].clear_value(self.conn)
def reset_values(self):
# intended to be called once on app startup to reset all metric
# values to 0
for m in self.METRICS.values():
m.reset_value(self.conn)
self.metrics_have_changed = True
self.conn.delete(root_key + "_lock")

def inc(self, field, value):
if value != 0:
self.METRICS[field].inc(value)
self.metrics_have_changed = True
if self.auto_pipe_execute is True and self.should_pipe_execute() is True:
if self.auto_pipe_execute is True:
self.pipe_execute()

def set(self, field, value):
self.METRICS[field].set(value)
self.metrics_have_changed = True
if self.auto_pipe_execute is True and self.should_pipe_execute() is True:
if self.auto_pipe_execute is True:
self.pipe_execute()

def get(self, field):
return self.METRICS[field].get()

def decode(self, field):
return self.METRICS[field].decode(self.conn)

def observe(self, field, value):
self.METRICS[field].observe(value)
self.metrics_have_changed = True
if self.auto_pipe_execute is True and self.should_pipe_execute() is True:
if self.auto_pipe_execute is True:
self.pipe_execute()

def serialize_local_metrics(self):
Expand Down Expand Up @@ -252,8 +282,8 @@ def pipe_execute(self):

def send_metrics(self):
# more than one thread could be calling this at the same time, so should
# get acquire redis lock before sending metrics
lock = self.conn.lock(root_key + '_lock', thread_local=False)
# acquire redis lock before sending metrics
lock = self.conn.lock(root_key + '_lock')
if not lock.acquire(blocking=False):
return
try:
Expand Down
2 changes: 0 additions & 2 deletions awx/main/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

# Django
from django.conf import settings
import awx.main.analytics.subsystem_metrics as s_metrics

__all__ = ['CallbackQueueDispatcher']

Expand All @@ -28,7 +27,6 @@ def __init__(self):
self.queue = getattr(settings, 'CALLBACK_QUEUE', '')
self.logger = logging.getLogger('awx.main.queue.CallbackQueueDispatcher')
self.connection = redis.Redis.from_url(settings.BROKER_URL)
self.subsystem_metrics = s_metrics.Metrics()

def dispatch(self, obj):
self.connection.rpush(self.queue, json.dumps(obj, cls=AnsibleJSONEncoder))
62 changes: 61 additions & 1 deletion awx/main/scheduler/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import logging
import uuid
import json
import time
import sys
import signal

# Django
from django.db import transaction, connection
Expand Down Expand Up @@ -38,12 +41,24 @@
from awx.main.scheduler.dependency_graph import DependencyGraph
from awx.main.scheduler.task_manager_models import TaskManagerInstances
from awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups
import awx.main.analytics.subsystem_metrics as s_metrics
from awx.main.utils import decrypt_field


logger = logging.getLogger('awx.main.scheduler')


def timeit(func):
def inner(*args, **kwargs):
t_now = time.perf_counter()
result = func(*args, **kwargs)
dur = time.perf_counter() - t_now
args[0].subsystem_metrics.inc("task_manager_" + func.__name__ + "_seconds", dur)
return result

return inner


class TaskManager:
def __init__(self):
"""
Expand All @@ -62,6 +77,13 @@ def __init__(self):
# will no longer be started and will be started on the next task manager cycle.
self.start_task_limit = settings.START_TASK_LIMIT
self.time_delta_job_explanation = timedelta(seconds=30)
self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False)
# initialize each metric to 0 and force metric_has_changed to true. This
# ensures each task manager metric will be overridden when pipe_execute
# is called later.
for m in self.subsystem_metrics.METRICS:
if m.startswith("task_manager"):
self.subsystem_metrics.set(m, 0)

def after_lock_init(self, all_sorted_tasks):
"""
Expand Down Expand Up @@ -100,6 +122,7 @@ def job_blocked_by(self, task):

return None

@timeit
def get_tasks(self, status_list=('pending', 'waiting', 'running')):
jobs = [j for j in Job.objects.filter(status__in=status_list).prefetch_related('instance_group')]
inventory_updates_qs = (
Expand All @@ -125,6 +148,7 @@ def get_inventory_source_tasks(self, all_sorted_tasks):
inventory_ids.add(task.inventory_id)
return [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)]

@timeit
def spawn_workflow_graph_jobs(self, workflow_jobs):
for workflow_job in workflow_jobs:
if workflow_job.cancel_flag:
Expand Down Expand Up @@ -231,7 +255,9 @@ def process_finished_workflow_jobs(self, workflow_jobs):
schedule_task_manager()
return result

@timeit
def start_task(self, task, instance_group, dependent_tasks=None, instance=None):
self.subsystem_metrics.inc("task_manager_tasks_started", 1)
self.start_task_limit -= 1
if self.start_task_limit == 0:
# schedule another run immediately after this task manager
Expand Down Expand Up @@ -291,6 +317,7 @@ def post_commit():
task.websocket_emit_status(task.status) # adds to on_commit
connection.on_commit(post_commit)

@timeit
def process_running_tasks(self, running_tasks):
for task in running_tasks:
self.dependency_graph.add_job(task)
Expand Down Expand Up @@ -439,6 +466,7 @@ def gen_dep_for_inventory_update(self, inventory_task):
latest_src_project_update.scm_inventory_updates.add(inventory_task)
return created_dependencies

@timeit
def generate_dependencies(self, undeped_tasks):
created_dependencies = []
for task in undeped_tasks:
Expand All @@ -453,6 +481,7 @@ def generate_dependencies(self, undeped_tasks):

return created_dependencies

@timeit
def process_pending_tasks(self, pending_tasks):
running_workflow_templates = {wf.unified_job_template_id for wf in self.get_running_workflow_jobs()}
tasks_to_update_job_explanation = []
Expand All @@ -461,6 +490,7 @@ def process_pending_tasks(self, pending_tasks):
break
blocked_by = self.job_blocked_by(task)
if blocked_by:
self.subsystem_metrics.inc("task_manager_tasks_blocked", 1)
task.log_lifecycle("blocked", blocked_by=blocked_by)
job_explanation = gettext_noop(f"waiting for {blocked_by._meta.model_name}-{blocked_by.id} to finish")
if task.job_explanation != job_explanation:
Expand Down Expand Up @@ -602,17 +632,22 @@ def reap_jobs_from_orphaned_instances(self):

def process_tasks(self, all_sorted_tasks):
running_tasks = [t for t in all_sorted_tasks if t.status in ['waiting', 'running']]

self.process_running_tasks(running_tasks)
self.subsystem_metrics.inc("task_manager_running_processed", len(running_tasks))

pending_tasks = [t for t in all_sorted_tasks if t.status == 'pending']

undeped_tasks = [t for t in pending_tasks if not t.dependencies_processed]
dependencies = self.generate_dependencies(undeped_tasks)
deps_of_deps = self.generate_dependencies(dependencies)
dependencies += deps_of_deps
self.process_pending_tasks(dependencies)
self.subsystem_metrics.inc("task_manager_pending_processed", len(dependencies))

self.process_pending_tasks(pending_tasks)
self.subsystem_metrics.inc("task_manager_pending_processed", len(pending_tasks))

@timeit
def _schedule(self):
finished_wfjs = []
all_sorted_tasks = self.get_tasks()
Expand Down Expand Up @@ -648,6 +683,28 @@ def _schedule(self):
self.process_tasks(all_sorted_tasks)
return finished_wfjs

def record_aggregate_metrics(self, *args):
if not settings.IS_TESTING():
# increment task_manager_schedule_calls regardless if the other
# metrics are recorded
s_metrics.Metrics(auto_pipe_execute=True).inc("task_manager_schedule_calls", 1)
# Only record metrics if the last time recording was more
# than SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL ago.
# Prevents a short-duration task manager that runs directly after a
# long task manager to override useful metrics.
current_time = time.time()
time_last_recorded = current_time - self.subsystem_metrics.decode("task_manager_recorded_timestamp")
if time_last_recorded > settings.SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL:
logger.debug(f"recording metrics, last recorded {time_last_recorded} seconds ago")
self.subsystem_metrics.set("task_manager_recorded_timestamp", current_time)
self.subsystem_metrics.pipe_execute()
else:
logger.debug(f"skipping recording metrics, last recorded {time_last_recorded} seconds ago")

def record_aggregate_metrics_and_exit(self, *args):
self.record_aggregate_metrics()
sys.exit(1)

def schedule(self):
# Lock
with advisory_lock('task_manager_lock', wait=False) as acquired:
Expand All @@ -657,5 +714,8 @@ def schedule(self):
return
logger.debug("Starting Scheduler")
with task_manager_bulk_reschedule():
# if sigterm due to timeout, still record metrics
signal.signal(signal.SIGTERM, self.record_aggregate_metrics_and_exit)
self._schedule()
self.record_aggregate_metrics()
logger.debug("Finishing Scheduler")
3 changes: 2 additions & 1 deletion awx/main/tasks/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ def dispatch_startup():
#
apply_cluster_membership_policies()
cluster_node_heartbeat()
Metrics().clear_values()
m = Metrics()
m.reset_values()

# Update Tower's rsyslog.conf file based on loggins settings in the db
reconfigure_rsyslog()
Expand Down
Loading

0 comments on commit 30c060c

Please sign in to comment.