From e982e7d6715d4490c33be89085940cd1603781aa Mon Sep 17 00:00:00 2001 From: Erik Overdahl Date: Thu, 10 Nov 2022 08:54:19 -0600 Subject: [PATCH 1/5] callback: Run callback operations in own thread Move all callback operations into an auxiliary `AraWorker` class that runs in a separate thread started by the `CallbackModule` class, which is now just a shell that puts messages into a queue for the worker thread to process. The callback plugin now does not block the main Ansible thread. --- ara/plugins/callback/ara_default.py | 231 ++++++++++++++++++++-------- 1 file changed, 166 insertions(+), 65 deletions(-) diff --git a/ara/plugins/callback/ara_default.py b/ara/plugins/callback/ara_default.py index 04017e83..1cb432c0 100644 --- a/ara/plugins/callback/ara_default.py +++ b/ara/plugins/callback/ara_default.py @@ -10,7 +10,9 @@ import os import socket import sys +import threading from concurrent.futures import ThreadPoolExecutor +from queue import Queue from ansible import __version__ as ANSIBLE_VERSION, constants as C from ansible.parsing.ajson import AnsibleJSONEncoder @@ -253,18 +255,27 @@ ) -class CallbackModule(CallbackBase): +class CallbackSend: """ - Saves data from an Ansible run into a database + Same as the one defined in ansible.executor.task_queue_manager; + should we just import it? 
""" - CALLBACK_VERSION = 2.0 - CALLBACK_TYPE = "awesome" - CALLBACK_NAME = "ara_default" + def __init__(self, method_name, *args, **kwargs): + self.method_name = method_name + self.args = args + self.kwargs = kwargs - def __init__(self): - super().__init__() - self.log = logging.getLogger("ara.plugins.callback.default") + +class AraQueueDone: + pass + + +class AraWorker: + def __init__(self, queue: Queue): + self.log = logging.getLogger("ara.plugins.callback.worker") + self.log.info("NEW ARA") + self.queue = queue self.localhost_hostname = None # These are configured in self.set_options self.client = None @@ -296,60 +307,14 @@ def __init__(self): self.delegation_cache = {} self.warned_about_host_length = [] - def set_options(self, task_keys=None, var_options=None, direct=None): - super().set_options(task_keys=task_keys, var_options=var_options, direct=direct) - - self.argument_labels = self.get_option("argument_labels") - self.default_labels = self.get_option("default_labels") - self.ignored_facts = self.get_option("ignored_facts") - self.ignored_arguments = self.get_option("ignored_arguments") - self.ignored_files = self.get_option("ignored_files") - self.localhost_as_hostname = self.get_option("localhost_as_hostname") - self.localhost_as_hostname_format = self.get_option("localhost_as_hostname_format") - self.record_controller = self.get_option("record_controller") - self.record_user = self.get_option("record_user") - - # The intent for the ignored_files default value is to ignore the ansible local tmpdir but the path - # can be changed by the user's configuration so retrieve that and use it instead. 
- # https://github.com/ansible-community/ara/issues/385 - for pattern in self.ignored_files: - if pattern == ".ansible/tmp": - tmpdir_config = os.path.dirname(C.config.get_config_value("DEFAULT_LOCAL_TMP")) - index = self.ignored_files.index(pattern) - self.ignored_files[index] = tmpdir_config - break - - client = self.get_option("api_client") - endpoint = self.get_option("api_server") - timeout = self.get_option("api_timeout") - username = self.get_option("api_username") - password = self.get_option("api_password") - cert = self.get_option("api_cert") - key = self.get_option("api_key") - ca = self.get_option("api_ca") - insecure = self.get_option("api_insecure") - - verify = False if insecure else True - if ca: - verify = ca - - self.client = client_utils.get_client( - client=client, - endpoint=endpoint, - timeout=timeout, - username=username, - password=password, - cert=cert, - key=key, - verify=verify, - ) - - # TODO: Consider un-hardcoding this and plumbing pool_maxsize to requests.adapters.HTTPAdapter. - # In the meantime default to 4 so we don't go above requests.adapters.DEFAULT_POOLSIZE. 
- # Otherwise we can hit "urllib3.connectionpool: Connection pool is full" - self.callback_threads = self.get_option("callback_threads") - if self.callback_threads > 4: - self.callback_threads = 4 + def run(self): + while True: + message = self.queue.get() + if isinstance(message, AraQueueDone): + self.queue.task_done() + return + method = getattr(self, message.method_name) + method(*message.args, **message.kwargs) def _submit_thread(self, threadpool, func, *args, **kwargs): # Manages whether or not the function should be threaded to keep things DRY @@ -425,7 +390,8 @@ def v2_playbook_on_start(self, playbook): # Record the playbook file self._submit_thread("global", self._get_or_create_file, path, content) - return self.playbook + # return self.playbook + self.queue.task_done() def v2_playbook_on_play_start(self, play): self.log.debug("v2_playbook_on_play_start") @@ -492,7 +458,8 @@ def v2_playbook_on_play_start(self, play): started=datetime.datetime.now(datetime.timezone.utc).isoformat(), ) - return self.play + # return self.play + self.queue.task_done() def v2_playbook_on_handler_task_start(self, task): self.log.debug("v2_playbook_on_handler_task_start") @@ -522,16 +489,19 @@ def v2_playbook_on_task_start(self, task, is_conditional, handler=False): # Get task self.task = self._get_or_create_task(task, task_file["id"], lineno, handler) - return self.task + # return self.task + self.queue.task_done() def v2_runner_on_start(self, host, task): self.log.debug("v2_runner_on_start") # v2_runner_on_start was added in 2.8 so this doesn't get run for Ansible 2.7 and below. 
self.result_started[host.get_name()] = datetime.datetime.now(datetime.timezone.utc).isoformat() + self.queue.task_done() def v2_runner_on_ok(self, result, **kwargs): self.log.debug("v2_runner_on_ok") self._submit_thread("task", self._load_result, result, "ok", **kwargs) + self.queue.task_done() def v2_runner_on_unreachable(self, result, **kwargs): self.log.debug("v2_runner_on_unreachable") @@ -544,6 +514,7 @@ def v2_runner_on_unreachable(self, result, **kwargs): ) self.task = self.task_cache[task_uuid] self._submit_thread("task", self._load_result, result, "unreachable", **kwargs) + self.queue.task_done() def v2_runner_on_failed(self, result, **kwargs): self.log.debug("v2_runner_on_failed") @@ -556,21 +527,26 @@ def v2_runner_on_failed(self, result, **kwargs): ) self.task = self.task_cache[task_uuid] self._submit_thread("task", self._load_result, result, "failed", **kwargs) + self.queue.task_done() def v2_runner_on_skipped(self, result, **kwargs): self.log.debug("v2_runner_on_skipped") self._submit_thread("task", self._load_result, result, "skipped", **kwargs) + self.queue.task_done() def v2_runner_item_on_ok(self, result): self.log.debug("v2_runner_item_on_ok") self._update_delegation_cache(result) + self.queue.task_done() def v2_runner_item_on_failed(self, result): self.log.debug("v2_runner_item_on_failed") self._update_delegation_cache(result) + self.queue.task_done() def v2_runner_item_on_skipped(self, result): self.log.debug("v2_runner_item_on_skipped") + self.queue.task_done() pass # result._task.delegate_to can end up being a variable from this hook, don't save it. # https://github.com/ansible/ansible/issues/75339 @@ -579,6 +555,7 @@ def v2_runner_item_on_skipped(self, result): def v2_playbook_on_include(self, included_file): self.log.debug("v2_playbook_on_include") # ara has not used this hook before, maybe we can do something with it in the future. 
+ self.queue.task_done() pass def v2_playbook_on_stats(self, stats): @@ -587,6 +564,7 @@ def v2_playbook_on_stats(self, stats): self._end_play() self._load_stats(stats) self._end_playbook(stats) + self.queue.task_done() def _end_task(self): if self.callback_threads: @@ -855,3 +833,126 @@ def _get_user(self): pass return user + + +class CallbackModule(CallbackBase): + """ + Saves data from an Ansible run into a database + """ + + CALLBACK_VERSION = 2.0 + CALLBACK_TYPE = "awesome" + CALLBACK_NAME = "ara_default" + + def __init__(self): + super().__init__() + self.log = logging.getLogger("ara.plugins.callback.default") + self.queue = Queue() + # started at the end of set_options() + self.worker = AraWorker(queue=self.queue) + self.worker_thread = threading.Thread(target=self.worker.run) + + def __del__(self): + """ + Give the thread as long as possible to complete its work. + """ + self.queue.join() + if self.worker_thread.is_alive(): + self.worker_thread.join() + + def set_options(self, task_keys=None, var_options=None, direct=None): + super().set_options(task_keys=task_keys, var_options=var_options, direct=direct) + + self.worker.argument_labels = self.get_option("argument_labels") + self.worker.default_labels = self.get_option("default_labels") + self.worker.ignored_facts = self.get_option("ignored_facts") + self.worker.ignored_arguments = self.get_option("ignored_arguments") + self.worker.ignored_files = self.get_option("ignored_files") + self.worker.localhost_as_hostname = self.get_option("localhost_as_hostname") + self.worker.localhost_as_hostname_format = self.get_option("localhost_as_hostname_format") + self.worker.record_controller = self.get_option("record_controller") + self.worker.record_user = self.get_option("record_user") + + # The intent for the ignored_files default value is to ignore the ansible local tmpdir but the path + # can be changed by the user's configuration so retrieve that and use it instead. 
+ # https://github.com/ansible-community/ara/issues/385 + for pattern in self.worker.ignored_files: + if pattern == ".ansible/tmp": + tmpdir_config = os.path.dirname(C.config.get_config_value("DEFAULT_LOCAL_TMP")) + index = self.worker.ignored_files.index(pattern) + self.worker.ignored_files[index] = tmpdir_config + break + + client = self.get_option("api_client") + endpoint = self.get_option("api_server") + timeout = self.get_option("api_timeout") + username = self.get_option("api_username") + password = self.get_option("api_password") + cert = self.get_option("api_cert") + key = self.get_option("api_key") + ca = self.get_option("api_ca") + insecure = self.get_option("api_insecure") + + verify = False if insecure else True + if ca: + verify = ca + + self.worker.client = client_utils.get_client( + client=client, + endpoint=endpoint, + timeout=timeout, + username=username, + password=password, + cert=cert, + key=key, + verify=verify, + ) + + # TODO: Consider un-hardcoding this and plumbing pool_maxsize to requests.adapters.HTTPAdapter. + # In the meantime default to 4 so we don't go above requests.adapters.DEFAULT_POOLSIZE. 
+ # Otherwise we can hit "urllib3.connectionpool: Connection pool is full" + self.worker.callback_threads = min(4, self.get_option("callback_threads")) + self.worker_thread.start() + + def v2_playbook_on_start(self, playbook): + self.queue.put(CallbackSend("v2_playbook_on_start", playbook)) + + def v2_playbook_on_play_start(self, play): + self.queue.put(CallbackSend("v2_playbook_on_play_start", play)) + + def v2_playbook_on_handler_task_start(self, task): + self.queue.put(CallbackSend("v2_playbook_on_handler_task_start", task)) + + def v2_playbook_on_task_start(self, task, is_conditional, handler=False): + self.queue.put(CallbackSend("v2_playbook_on_task_start", task, is_conditional, handler)) + + def v2_runner_on_start(self, host, task): + self.queue.put(CallbackSend("v2_runner_on_start", host, task)) + + def v2_runner_on_ok(self, result, **kwargs): + self.queue.put(CallbackSend("v2_runner_on_ok", result, **kwargs)) + + def v2_runner_on_unreachable(self, result, **kwargs): + self.queue.put(CallbackSend("v2_runner_on_unreachable", result, **kwargs)) + + def v2_runner_on_failed(self, result, **kwargs): + self.queue.put(CallbackSend("v2_runner_on_failed", result, **kwargs)) + + def v2_runner_on_skipped(self, result, **kwargs): + self.queue.put(CallbackSend("v2_runner_on_skipped", result, **kwargs)) + + def v2_runner_item_on_ok(self, result): + self.queue.put(CallbackSend("v2_runner_item_on_ok", result)) + + def v2_runner_item_on_failed(self, result): + self.queue.put(CallbackSend("v2_runner_item_on_failed", result)) + + def v2_runner_item_on_skipped(self, result): + self.queue.put(CallbackSend("v2_runner_item_on_skipped", result)) + + def v2_playbook_on_include(self, included_file): + self.queue.put(CallbackSend("v2_playbook_on_include", included_file)) + + def v2_playbook_on_stats(self, stats): + self.queue.put(CallbackSend("v2_playbook_on_stats", stats)) + self.queue.put(AraQueueDone()) From 1bb67b4732bd6e69bc28379cade1f7f605454608 Mon Sep 17 00:00:00 2001 From: 
Erik Overdahl Date: Fri, 11 Nov 2022 19:38:39 -0600 Subject: [PATCH 2/5] callback: Avoid race condition on play files --- ara/plugins/callback/ara_default.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ara/plugins/callback/ara_default.py b/ara/plugins/callback/ara_default.py index 1cb432c0..2d48d641 100644 --- a/ara/plugins/callback/ara_default.py +++ b/ara/plugins/callback/ara_default.py @@ -432,7 +432,10 @@ def v2_playbook_on_play_start(self, play): self._submit_thread("global", self._set_playbook_labels, labels) # Record all the files involved in the play - for path in play._loader._FILE_CACHE.keys(): + # make a list of the keys and iterate that for thread safety + # avoiding `RuntimeError: dictionary changed size during iteration` + play_files = list(play._loader._FILE_CACHE.keys()) + for path in play_files: # The cache can be pre-populated with files that aren't relevant to the playbook report # If there are matches that should be ignored here, don't record them at all ignored = False From 6397d24cdfb1c2abe28b0040cb6c18e6709479c8 Mon Sep 17 00:00:00 2001 From: Erik Overdahl Date: Mon, 14 Nov 2022 18:23:10 -0600 Subject: [PATCH 3/5] callback: Remove unnecessary cleanup Running the thread as "daemon" makes the previous cleanup unnecessary. --- ara/plugins/callback/ara_default.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/ara/plugins/callback/ara_default.py b/ara/plugins/callback/ara_default.py index 2d48d641..dabda9e2 100644 --- a/ara/plugins/callback/ara_default.py +++ b/ara/plugins/callback/ara_default.py @@ -855,14 +855,6 @@ def __init__(self): self.worker = AraWorker(queue=self.queue) self.worker_thread = threading.Thread(target=self.worker.run) - def __del__(self): - """ - Give the thread as long as possible to complete its work. 
- """ - self.queue.join() - if self.worker_thread.is_alive(): - self.worker_thread.join() - def set_options(self, task_keys=None, var_options=None, direct=None): super().set_options(task_keys=task_keys, var_options=var_options, direct=direct) From 18107922bd4aaf00cff32f30038232c9a436bc5b Mon Sep 17 00:00:00 2001 From: Erik Overdahl Date: Tue, 15 Nov 2022 12:07:07 -0600 Subject: [PATCH 4/5] callback: Make worker thread a daemon Prevents program from exiting before all ARA work is finished. (meant to commit this earlier - whoops!) --- ara/plugins/callback/ara_default.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ara/plugins/callback/ara_default.py b/ara/plugins/callback/ara_default.py index dabda9e2..22fcb024 100644 --- a/ara/plugins/callback/ara_default.py +++ b/ara/plugins/callback/ara_default.py @@ -853,7 +853,7 @@ def __init__(self): self.queue = Queue() # started at the end of set_options() self.worker = AraWorker(queue=self.queue) - self.worker_thread = threading.Thread(target=self.worker.run) + self.worker_thread = threading.Thread(target=self.worker.run, daemon=True) def set_options(self, task_keys=None, var_options=None, direct=None): super().set_options(task_keys=task_keys, var_options=var_options, direct=direct) From 5c66638260ce9f219b8761a00f9a373717ceac60 Mon Sep 17 00:00:00 2001 From: Erik Overdahl Date: Fri, 18 Nov 2022 14:16:01 -0600 Subject: [PATCH 5/5] callback: Ensure all requests are recorded Even if the user cancels the run. 
Brings back the `__del__` method on CallbackModule removed in 6397d24 --- ara/plugins/callback/ara_default.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/ara/plugins/callback/ara_default.py b/ara/plugins/callback/ara_default.py index 22fcb024..306d74e5 100644 --- a/ara/plugins/callback/ara_default.py +++ b/ara/plugins/callback/ara_default.py @@ -855,6 +855,18 @@ def __init__(self): self.worker = AraWorker(queue=self.queue) self.worker_thread = threading.Thread(target=self.worker.run, daemon=True) + def __del__(self): + outstanding = self.queue.qsize() + if outstanding > 0: + self._display.display("ARA: Waiting for all requests to finish (about %d)..." % outstanding) + self.queue.join() + if self.worker_thread.is_alive(): + if self.worker.global_threads: + self.worker.global_threads.shutdown() + if self.worker.task_threads: + self.worker.task_threads.shutdown() + self._display.display("ARA: Done!") + def set_options(self, task_keys=None, var_options=None, direct=None): super().set_options(task_keys=task_keys, var_options=var_options, direct=direct)