
Subsystem metrics for task manager #12235

Merged (1 commit) on Jun 14, 2022
84 changes: 57 additions & 27 deletions awx/main/analytics/subsystem_metrics.py
@@ -8,24 +8,30 @@
from awx.main.consumers import emit_channel_notification

root_key = 'awx_metrics'
logger = logging.getLogger('awx.main.wsbroadcast')
logger = logging.getLogger('awx.main.analytics')


class BaseM:
def __init__(self, field, help_text):
self.field = field
self.help_text = help_text
self.current_value = 0
self.metric_has_changed = False

def clear_value(self, conn):
def reset_value(self, conn):
conn.hset(root_key, self.field, 0)
self.current_value = 0

def inc(self, value):
self.current_value += value
self.metric_has_changed = True

def set(self, value):
self.current_value = value
self.metric_has_changed = True

def get(self):
return self.current_value

def decode(self, conn):
value = conn.hget(root_key, self.field)
@@ -34,7 +40,7 @@ def decode(self, conn):
def to_prometheus(self, instance_data):
output_text = f"# HELP {self.field} {self.help_text}\n# TYPE {self.field} gauge\n"
for instance in instance_data:
output_text += f'{self.field}{{node="{instance}"}} {instance_data[instance].get(self.field, -1)}\n' # TODO: fix because this -1 is neccessary when dealing with old instances (ex. you didn't clean up your database)
output_text += f'{self.field}{{node="{instance}"}} {instance_data[instance][self.field]}\n'
return output_text


@@ -46,8 +52,10 @@ def decode_value(self, value):
return 0.0

def store_value(self, conn):
conn.hincrbyfloat(root_key, self.field, self.current_value)
self.current_value = 0
if self.metric_has_changed:
conn.hincrbyfloat(root_key, self.field, self.current_value)
self.current_value = 0
self.metric_has_changed = False


class IntM(BaseM):
@@ -58,8 +66,10 @@ def decode_value(self, value):
return 0

def store_value(self, conn):
conn.hincrby(root_key, self.field, self.current_value)
self.current_value = 0
if self.metric_has_changed:
conn.hincrby(root_key, self.field, self.current_value)
self.current_value = 0
self.metric_has_changed = False


class SetIntM(BaseM):
@@ -70,10 +80,9 @@ def decode_value(self, value):
return 0

def store_value(self, conn):
# do not set value if it has not changed since last time this was called
if self.current_value is not None:
if self.metric_has_changed:
conn.hset(root_key, self.field, self.current_value)
self.current_value = None
self.metric_has_changed = False


class SetFloatM(SetIntM):
@@ -94,13 +103,13 @@ def __init__(self, field, help_text, buckets):
self.sum = IntM(field + '_sum', '')
super(HistogramM, self).__init__(field, help_text)

def clear_value(self, conn):
def reset_value(self, conn):
conn.hset(root_key, self.field, 0)
self.inf.clear_value(conn)
self.sum.clear_value(conn)
self.inf.reset_value(conn)
self.sum.reset_value(conn)
for b in self.buckets_to_keys.values():
b.clear_value(conn)
super(HistogramM, self).clear_value(conn)
b.reset_value(conn)
super(HistogramM, self).reset_value(conn)

def observe(self, value):
for b in self.buckets:
@@ -136,7 +145,7 @@ def to_prometheus(self, instance_data):


class Metrics:
def __init__(self, auto_pipe_execute=True, instance_name=None):
def __init__(self, auto_pipe_execute=False, instance_name=None):
self.pipe = redis.Redis.from_url(settings.BROKER_URL).pipeline()
self.conn = redis.Redis.from_url(settings.BROKER_URL)
self.last_pipe_execute = time.time()
@@ -152,6 +161,8 @@ def __init__(self, auto_pipe_execute=True, instance_name=None):
Instance = apps.get_model('main', 'Instance')
if instance_name:
self.instance_name = instance_name
elif settings.IS_TESTING():
self.instance_name = "awx_testing"
else:
self.instance_name = Instance.objects.me().hostname

@@ -167,10 +178,23 @@ def __init__(self, auto_pipe_execute=True, instance_name=None):
HistogramM(
'callback_receiver_batch_events_insert_db', 'Number of events batch inserted into database', settings.SUBSYSTEM_METRICS_BATCH_INSERT_BUCKETS
),
SetFloatM('callback_receiver_event_processing_avg_seconds', 'Average processing time per event per callback receiver batch'),
FloatM('subsystem_metrics_pipe_execute_seconds', 'Time spent saving metrics to redis'),
IntM('subsystem_metrics_pipe_execute_calls', 'Number of calls to pipe_execute'),
FloatM('subsystem_metrics_send_metrics_seconds', 'Time spent sending metrics to other nodes'),
SetFloatM('callback_receiver_event_processing_avg_seconds', 'Average processing time per event per callback receiver batch'),
SetFloatM('task_manager_get_tasks_seconds', 'Time spent in loading all tasks from db'),
SetFloatM('task_manager_start_task_seconds', 'Time spent starting task'),
SetFloatM('task_manager_process_running_tasks_seconds', 'Time spent processing running tasks'),
SetFloatM('task_manager_process_pending_tasks_seconds', 'Time spent processing pending tasks'),
SetFloatM('task_manager_generate_dependencies_seconds', 'Time spent generating dependencies for pending tasks'),
SetFloatM('task_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow jobs'),
SetFloatM('task_manager__schedule_seconds', 'Time spent in running the entire _schedule'),
IntM('task_manager_schedule_calls', 'Number of calls to task manager schedule'),
SetFloatM('task_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'),
SetIntM('task_manager_tasks_started', 'Number of tasks started'),
SetIntM('task_manager_running_processed', 'Number of running tasks processed'),
SetIntM('task_manager_pending_processed', 'Number of pending tasks processed'),
SetIntM('task_manager_tasks_blocked', 'Number of tasks blocked from running'),
]
# turn metric list into dictionary with the metric name as a key
self.METRICS = {}
@@ -180,31 +204,37 @@ def __init__(self, auto_pipe_execute=True, instance_name=None):
# track last time metrics were sent to other nodes
self.previous_send_metrics = SetFloatM('send_metrics_time', 'Timestamp of previous send_metrics call')

def clear_values(self, fields=None):
if not fields:
fields = self.METRICS.keys()
for m in fields:
self.METRICS[m].clear_value(self.conn)
def reset_values(self):
# intended to be called once on app startup to reset all metric
# values to 0
for m in self.METRICS.values():
m.reset_value(self.conn)
self.metrics_have_changed = True
self.conn.delete(root_key + "_lock")

def inc(self, field, value):
if value != 0:
self.METRICS[field].inc(value)
self.metrics_have_changed = True
if self.auto_pipe_execute is True and self.should_pipe_execute() is True:
if self.auto_pipe_execute is True:
self.pipe_execute()

def set(self, field, value):
self.METRICS[field].set(value)
self.metrics_have_changed = True
if self.auto_pipe_execute is True and self.should_pipe_execute() is True:
if self.auto_pipe_execute is True:
self.pipe_execute()

def get(self, field):
return self.METRICS[field].get()

def decode(self, field):
return self.METRICS[field].decode(self.conn)

def observe(self, field, value):
self.METRICS[field].observe(value)
self.metrics_have_changed = True
if self.auto_pipe_execute is True and self.should_pipe_execute() is True:
if self.auto_pipe_execute is True:
self.pipe_execute()

def serialize_local_metrics(self):
@@ -252,8 +282,8 @@ def pipe_execute(self):

def send_metrics(self):
# more than one thread could be calling this at the same time, so should
# get acquire redis lock before sending metrics
lock = self.conn.lock(root_key + '_lock', thread_local=False)
# acquire redis lock before sending metrics
lock = self.conn.lock(root_key + '_lock')
if not lock.acquire(blocking=False):
return
try:
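Side note on the auto_pipe_execute default flipping to False above: callers now accumulate metric updates in memory and flush them explicitly. A minimal sketch of that usage, assuming a reachable Redis at settings.BROKER_URL and loaded Django settings; the instance name is invented, and the field names come from the metric list in this diff:

import awx.main.analytics.subsystem_metrics as s_metrics

# explicit batching; metric updates are not written to Redis until pipe_execute()
metrics = s_metrics.Metrics(auto_pipe_execute=False, instance_name="awx-1")
metrics.inc("task_manager_schedule_calls", 1)  # IntM: accumulated locally
metrics.set("task_manager_tasks_started", 5)   # SetIntM: marks metric_has_changed
metrics.pipe_execute()                         # one batch of Redis writes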
2 changes: 0 additions & 2 deletions awx/main/queue.py
@@ -8,7 +8,6 @@

# Django
from django.conf import settings
import awx.main.analytics.subsystem_metrics as s_metrics

__all__ = ['CallbackQueueDispatcher']

@@ -28,7 +27,6 @@ def __init__(self):
self.queue = getattr(settings, 'CALLBACK_QUEUE', '')
self.logger = logging.getLogger('awx.main.queue.CallbackQueueDispatcher')
self.connection = redis.Redis.from_url(settings.BROKER_URL)
self.subsystem_metrics = s_metrics.Metrics()

def dispatch(self, obj):
self.connection.rpush(self.queue, json.dumps(obj, cls=AnsibleJSONEncoder))
62 changes: 61 additions & 1 deletion awx/main/scheduler/task_manager.py
@@ -6,6 +6,9 @@
import logging
import uuid
import json
import time
import sys
import signal

# Django
from django.db import transaction, connection
@@ -38,12 +41,24 @@
from awx.main.scheduler.dependency_graph import DependencyGraph
from awx.main.scheduler.task_manager_models import TaskManagerInstances
from awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups
import awx.main.analytics.subsystem_metrics as s_metrics
from awx.main.utils import decrypt_field


logger = logging.getLogger('awx.main.scheduler')


def timeit(func):
def inner(*args, **kwargs):
t_now = time.perf_counter()
result = func(*args, **kwargs)
dur = time.perf_counter() - t_now
args[0].subsystem_metrics.inc("task_manager_" + func.__name__ + "_seconds", dur)
return result

return inner


class TaskManager:
def __init__(self):
"""
@@ -62,6 +77,13 @@ def __init__(self):
# will no longer be started and will be started on the next task manager cycle.
self.start_task_limit = settings.START_TASK_LIMIT
self.time_delta_job_explanation = timedelta(seconds=30)
self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False)
# initialize each metric to 0 and force metric_has_changed to true. This
# ensures each task manager metric will be overridden when pipe_execute
# is called later.
for m in self.subsystem_metrics.METRICS:
if m.startswith("task_manager"):
self.subsystem_metrics.set(m, 0)

def after_lock_init(self, all_sorted_tasks):
"""
@@ -100,6 +122,7 @@ def job_blocked_by(self, task):

return None

@timeit
def get_tasks(self, status_list=('pending', 'waiting', 'running')):
jobs = [j for j in Job.objects.filter(status__in=status_list).prefetch_related('instance_group')]
inventory_updates_qs = (
@@ -125,6 +148,7 @@ def get_inventory_source_tasks(self, all_sorted_tasks):
inventory_ids.add(task.inventory_id)
return [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)]

@timeit
def spawn_workflow_graph_jobs(self, workflow_jobs):
for workflow_job in workflow_jobs:
if workflow_job.cancel_flag:
@@ -231,7 +255,9 @@ def process_finished_workflow_jobs(self, workflow_jobs):
schedule_task_manager()
return result

@timeit
def start_task(self, task, instance_group, dependent_tasks=None, instance=None):
self.subsystem_metrics.inc("task_manager_tasks_started", 1)
self.start_task_limit -= 1
if self.start_task_limit == 0:
# schedule another run immediately after this task manager
@@ -291,6 +317,7 @@ def post_commit():
task.websocket_emit_status(task.status) # adds to on_commit
connection.on_commit(post_commit)

@timeit
def process_running_tasks(self, running_tasks):
for task in running_tasks:
self.dependency_graph.add_job(task)
@@ -439,6 +466,7 @@ def gen_dep_for_inventory_update(self, inventory_task):
latest_src_project_update.scm_inventory_updates.add(inventory_task)
return created_dependencies

@timeit
def generate_dependencies(self, undeped_tasks):
created_dependencies = []
for task in undeped_tasks:
@@ -453,6 +481,7 @@ def generate_dependencies(self, undeped_tasks):

return created_dependencies

@timeit
def process_pending_tasks(self, pending_tasks):
running_workflow_templates = {wf.unified_job_template_id for wf in self.get_running_workflow_jobs()}
tasks_to_update_job_explanation = []
@@ -461,6 +490,7 @@ def process_pending_tasks(self, pending_tasks):
break
blocked_by = self.job_blocked_by(task)
if blocked_by:
self.subsystem_metrics.inc("task_manager_tasks_blocked", 1)
task.log_lifecycle("blocked", blocked_by=blocked_by)
job_explanation = gettext_noop(f"waiting for {blocked_by._meta.model_name}-{blocked_by.id} to finish")
if task.job_explanation != job_explanation:
@@ -602,17 +632,22 @@ def reap_jobs_from_orphaned_instances(self):

def process_tasks(self, all_sorted_tasks):
running_tasks = [t for t in all_sorted_tasks if t.status in ['waiting', 'running']]

self.process_running_tasks(running_tasks)
self.subsystem_metrics.inc("task_manager_running_processed", len(running_tasks))

pending_tasks = [t for t in all_sorted_tasks if t.status == 'pending']

undeped_tasks = [t for t in pending_tasks if not t.dependencies_processed]
dependencies = self.generate_dependencies(undeped_tasks)
deps_of_deps = self.generate_dependencies(dependencies)
dependencies += deps_of_deps
self.process_pending_tasks(dependencies)
self.subsystem_metrics.inc("task_manager_pending_processed", len(dependencies))

self.process_pending_tasks(pending_tasks)
self.subsystem_metrics.inc("task_manager_pending_processed", len(pending_tasks))

@timeit
def _schedule(self):
finished_wfjs = []
all_sorted_tasks = self.get_tasks()
@@ -648,6 +683,28 @@ def _schedule(self):
self.process_tasks(all_sorted_tasks)
return finished_wfjs

def record_aggregate_metrics(self, *args):
if not settings.IS_TESTING():
# increment task_manager_schedule_calls regardless if the other
# metrics are recorded
s_metrics.Metrics(auto_pipe_execute=True).inc("task_manager_schedule_calls", 1)
# Only record metrics if the last time recording was more
# than SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL ago.
# Prevents a short-duration task manager that runs directly after a
# long task manager to override useful metrics.
current_time = time.time()
time_last_recorded = current_time - self.subsystem_metrics.decode("task_manager_recorded_timestamp")
if time_last_recorded > settings.SUBSYSTEM_METRICS_TASK_MANAGER_RECORD_INTERVAL:
logger.debug(f"recording metrics, last recorded {time_last_recorded} seconds ago")
self.subsystem_metrics.set("task_manager_recorded_timestamp", current_time)
self.subsystem_metrics.pipe_execute()
else:
logger.debug(f"skipping recording metrics, last recorded {time_last_recorded} seconds ago")

def record_aggregate_metrics_and_exit(self, *args):
self.record_aggregate_metrics()
sys.exit(1)
A reviewer (project member) left an inline comment on this sys.exit(1):
what does this sys.exit(1) do to the transaction? I think it rolls it back right? I assume that would be the same as the current behavior. Right? Probably.


def schedule(self):
# Lock
with advisory_lock('task_manager_lock', wait=False) as acquired:
@@ -657,5 +714,8 @@ def schedule(self):
return
logger.debug("Starting Scheduler")
with task_manager_bulk_reschedule():
# if sigterm due to timeout, still record metrics
signal.signal(signal.SIGTERM, self.record_aggregate_metrics_and_exit)
self._schedule()
self.record_aggregate_metrics()
logger.debug("Finishing Scheduler")
3 changes: 2 additions & 1 deletion awx/main/tasks/system.py
@@ -103,7 +103,8 @@ def dispatch_startup():
#
apply_cluster_membership_policies()
cluster_node_heartbeat()
Metrics().clear_values()
m = Metrics()
m.reset_values()

# Update Tower's rsyslog.conf file based on loggins settings in the db
reconfigure_rsyslog()
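Taken together: once the task manager records these values, BaseM.to_prometheus (first file above) renders each metric as a per-node gauge. A hypothetical excerpt of that output, with the node name and values invented for illustration:

# HELP task_manager_schedule_calls Number of calls to task manager schedule
# TYPE task_manager_schedule_calls gauge
task_manager_schedule_calls{node="awx-1.example.org"} 42
# HELP task_manager__schedule_seconds Time spent in running the entire _schedule
# TYPE task_manager__schedule_seconds gauge
task_manager__schedule_seconds{node="awx-1.example.org"} 0.7312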