diff --git a/.github/workflows/push-pr_workflow.yml b/.github/workflows/push-pr_workflow.yml index e2d9e164e..bef6f8608 100644 --- a/.github/workflows/push-pr_workflow.yml +++ b/.github/workflows/push-pr_workflow.yml @@ -129,7 +129,7 @@ jobs: - name: Run pytest over unit test suite run: | - python3 -m pytest tests/unit/ + python3 -m pytest -v --order-scope=module tests/unit/ - name: Run integration test suite for local tests run: | diff --git a/CHANGELOG.md b/CHANGELOG.md index 9db50369a..bf0074e9d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Pytest fixtures in the `conftest.py` file of the integration test suite - NOTE: an export command `export LC_ALL='C'` had to be added to fix a bug in the WEAVE CI. This can be removed when we resolve this issue for the `merlin server` command - Tests for the `celeryadapter.py` module +- New CeleryTestWorkersManager context to help with starting/stopping workers for tests + +### Fixed +- The `merlin status` command so that it's consistent in its output whether using redis or rabbitmq as the broker +- The `merlin monitor` command will now keep an allocation up if the queues are empty and workers are still processing tasks ## [1.11.1] ### Fixed diff --git a/Makefile b/Makefile index 2f9db031b..b669d51b1 100644 --- a/Makefile +++ b/Makefile @@ -87,7 +87,7 @@ install-dev: virtualenv install-merlin install-workflow-deps # tests require a valid dev install of merlin unit-tests: . $(VENV)/bin/activate; \ - $(PYTHON) -m pytest $(UNIT); \ + $(PYTHON) -m pytest -v --order-scope=module $(UNIT); \ # run CLI tests - these require an active install of merlin in a venv @@ -135,9 +135,9 @@ check-flake8: check-black: . $(VENV)/bin/activate; \ - $(PYTHON) -m black --check --line-length $(MAX_LINE_LENGTH) --target-version py36 $(MRLN); \ - $(PYTHON) -m black --check --line-length $(MAX_LINE_LENGTH) --target-version py36 $(TEST); \ - $(PYTHON) -m black --check --line-length $(MAX_LINE_LENGTH) --target-version py36 *.py; \ + $(PYTHON) -m black --check --line-length $(MAX_LINE_LENGTH) --target-version py38 $(MRLN); \ + $(PYTHON) -m black --check --line-length $(MAX_LINE_LENGTH) --target-version py38 $(TEST); \ + $(PYTHON) -m black --check --line-length $(MAX_LINE_LENGTH) --target-version py38 *.py; \ check-isort: @@ -179,9 +179,9 @@ fix-style: $(PYTHON) -m isort -w $(MAX_LINE_LENGTH) $(MRLN); \ $(PYTHON) -m isort -w $(MAX_LINE_LENGTH) $(TEST); \ $(PYTHON) -m isort -w $(MAX_LINE_LENGTH) *.py; \ - $(PYTHON) -m black --target-version py36 -l $(MAX_LINE_LENGTH) $(MRLN); \ - $(PYTHON) -m black --target-version py36 -l $(MAX_LINE_LENGTH) $(TEST); \ - $(PYTHON) -m black --target-version py36 -l $(MAX_LINE_LENGTH) *.py; \ + $(PYTHON) -m black --target-version py38 -l $(MAX_LINE_LENGTH) $(MRLN); \ + $(PYTHON) -m black --target-version py38 -l $(MAX_LINE_LENGTH) $(TEST); \ + $(PYTHON) -m black --target-version py38 -l $(MAX_LINE_LENGTH) *.py; \ # Increment the Merlin version. USE ONLY ON DEVELOP BEFORE MERGING TO MASTER. diff --git a/docs/source/merlin_commands.rst b/docs/source/merlin_commands.rst index cb9b8eefb..1baa0e7a5 100644 --- a/docs/source/merlin_commands.rst +++ b/docs/source/merlin_commands.rst @@ -110,8 +110,15 @@ Monitor (``merlin monitor``) Batch submission scripts may not keep the batch allocation alive if there is not a blocking process in the submission script. 
The ``merlin monitor`` command addresses this by providing a blocking process that -checks for tasks in the queues every (sleep) seconds. When the queues are empty, the -blocking process will exit and allow the allocation to end. +checks for tasks in the queues every (sleep) seconds. When the queues are empty, the +monitor will query celery to see if any workers are still processing tasks from the +queues. If no workers are processing any tasks from the queues and the queues are empty, +the blocking process will exit and allow the allocation to end. + +The ``monitor`` function will check for celery workers for up to +10*(sleep) seconds before monitoring begins. The loop happens when the +queue(s) in the spec contain tasks, but no running workers are detected. +This is to protect against a failed worker launch. .. code:: bash @@ -129,11 +136,6 @@ for workers. The default is 60 seconds. The only currently available option for ``--task_server`` is celery, which is the default when this flag is excluded. -The ``monitor`` function will check for celery workers for up to -10*(sleep) seconds before monitoring begins. The loop happens when the -queue(s) in the spec contain tasks, but no running workers are detected. -This is to protect against a failed worker launch. - Purging Tasks (``merlin purge``) -------------------------------- diff --git a/merlin/exceptions/__init__.py b/merlin/exceptions/__init__.py index 72d5a1521..9bc7803a9 100644 --- a/merlin/exceptions/__init__.py +++ b/merlin/exceptions/__init__.py @@ -42,6 +42,7 @@ "HardFailException", "InvalidChainException", "RestartException", + "NoWorkersException", ) @@ -92,3 +93,13 @@ class RestartException(Exception): def __init__(self): super().__init__() + + +class NoWorkersException(Exception): + """ + Exception to signal that no workers were started + to process a non-empty queue(s). + """ + + def __init__(self, message): + super().__init__(message) diff --git a/merlin/main.py b/merlin/main.py index 55496a72c..0ee2e36ce 100644 --- a/merlin/main.py +++ b/merlin/main.py @@ -256,8 +256,9 @@ def query_status(args): print(banner_small) spec, _ = get_merlin_spec_with_override(args) ret = router.query_status(args.task_server, spec, args.steps) - for name, jobs, consumers in ret: - print(f"{name:30} - Workers: {consumers:10} - Queued Tasks: {jobs:10}") + + for name, queue_info in ret.items(): + print(f"{name:30} - Workers: {queue_info['consumers']:10} - Queued Tasks: {queue_info['jobs']:10}") if args.csv is not None: router.dump_status(ret, args.csv) @@ -353,8 +354,13 @@ def process_monitor(args): """ LOG.info("Monitor: checking queues ...") spec, _ = get_merlin_spec_with_override(args) + + # Give the user time to queue up jobs in case they haven't already + time.sleep(args.sleep) + + # Check if we still need our allocation while router.check_merlin_status(args, spec): - LOG.info("Monitor: found tasks in queues") + LOG.info("Monitor: found tasks in queues and/or tasks being processed") time.sleep(args.sleep) LOG.info("Monitor: ... 
stop condition met") diff --git a/merlin/router.py b/merlin/router.py index 01a10aae7..6c90c1d80 100644 --- a/merlin/router.py +++ b/merlin/router.py @@ -39,9 +39,13 @@ import os import time from datetime import datetime +from typing import Dict, List +from merlin.exceptions import NoWorkersException from merlin.study.celeryadapter import ( + check_celery_workers_processing, create_celery_config, + get_active_celery_queues, get_workers_from_app, purge_celery_tasks, query_celery_queues, @@ -151,12 +155,12 @@ def dump_status(query_return, csv_file): with open(csv_file, mode=fmode) as f: # pylint: disable=W1514,C0103 if f.mode == "w": # add the header f.write("# time") - for name, job, consumer in query_return: + for name in query_return: f.write(f",{name}:tasks,{name}:consumers") f.write("\n") f.write(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") - for name, job, consumer in query_return: - f.write(f",{job},{consumer}") + for queue_info in query_return.values(): + f.write(f",{queue_info['jobs']},{queue_info['consumers']}") f.write("\n") @@ -236,43 +240,130 @@ def create_config(task_server: str, config_dir: str, broker: str, test: str) -> LOG.error("Only celery can be configured currently.") -def check_merlin_status(args, spec): +def get_active_queues(task_server: str) -> Dict[str, List[str]]: """ - Function to check merlin workers and queues to keep - the allocation alive + Get a dictionary of active queues and the workers attached to these queues. + + :param `task_server`: The task server to query for active queues + :returns: A dict where keys are queue names and values are a list of workers watching them + """ + active_queues = {} + + if task_server == "celery": + from merlin.celery import app # pylint: disable=C0415 + + active_queues, _ = get_active_celery_queues(app) + else: + LOG.error("Only celery can be configured currently.") + + return active_queues + + +def wait_for_workers(sleep: int, task_server: str, spec: "MerlinSpec"): # noqa + """ + Wait on workers to start up. Check on worker start 10 times with `sleep` seconds between + each check. If no workers are started in time, raise an error to kill the monitor (there + was likely an issue with the task server that caused worker launch to fail). + + :param `sleep`: An integer representing the amount of seconds to sleep between each check + :param `task_server`: The task server from which to look for workers + :param `spec`: A MerlinSpec object representing the spec we're monitoring + """ + # Get the names of the workers that we're looking for + worker_names = spec.get_worker_names() + LOG.info(f"Checking for the following workers: {worker_names}") + + # Loop until workers are detected + count = 0 + max_count = 10 + while count < max_count: + # This list will include strings comprised of the worker name with the hostname e.g. worker_name@host. 
+ worker_status = get_workers(task_server) + LOG.info(f"Monitor: checking for workers, running workers = {worker_status} ...") + + # Check to see if any of the workers we're looking for in 'worker_names' have started + check = any(any(iwn in iws for iws in worker_status) for iwn in worker_names) + if check: + break + + # Increment count and sleep until the next check + count += 1 + time.sleep(sleep) + + # If no workers were started in time, raise an exception to stop the monitor + if count == max_count: + raise NoWorkersException("Monitor: no workers available to process the non-empty queue") + + +def check_workers_processing(queues_in_spec: List[str], task_server: str) -> bool: + """ + Check if any workers are still processing tasks by querying the task server. + + :param `queues_in_spec`: A list of queues to check if tasks are still active in + :param `task_server`: The task server from which to query + :returns: True if workers are still processing tasks, False otherwise + """ + result = False + + if task_server == "celery": + from merlin.celery import app + + result = check_celery_workers_processing(queues_in_spec, app) + else: + LOG.error("Celery is not specified as the task server!") + + return result + + +def check_merlin_status(args: "Namespace", spec: "MerlinSpec") -> bool: # noqa + """ + Function to check merlin workers and queues to keep the allocation alive :param `args`: parsed CLI arguments - :param `spec`: the parsed spec.yaml + :param `spec`: the parsed spec.yaml as a MerlinSpec object + :returns: True if there are still tasks being processed, False otherwise """ + # Initialize the variable to track if there are still active tasks + active_tasks = False + + # Get info about jobs and workers in our spec from celery queue_status = query_status(args.task_server, spec, args.steps, verbose=False) + LOG.debug(f"Monitor: queue_status: {queue_status}") + # Count the number of jobs that are active + # (Adding up the number of consumers in the same way is inaccurate so we won't do that) total_jobs = 0 - total_consumers = 0 - for _, jobs, consumers in queue_status: - total_jobs += jobs - total_consumers += consumers - - if total_jobs > 0 and total_consumers == 0: - # Determine if any of the workers are on this allocation - worker_names = spec.get_worker_names() - - # Loop until workers are detected. - count = 0 - max_count = 10 - while count < max_count: - # This list will include strings comprised of the worker name with the hostname e.g. worker_name@host. 
- worker_status = get_workers(args.task_server) - LOG.info(f"Monitor: checking for workers, running workers = {worker_status} ...") - - check = any(any(iwn in iws for iws in worker_status) for iwn in worker_names) - if check: - break - - count += 1 - time.sleep(args.sleep) - - if count == max_count: - LOG.error("Monitor: no workers available to process the non-empty queue") - total_jobs = 0 - - return total_jobs + for queue_info in queue_status.values(): + total_jobs += queue_info["jobs"] + + # Get the queues defined in the spec + queues_in_spec = spec.get_queue_list(["all"]) + LOG.debug(f"Monitor: queues_in_spec: {queues_in_spec}") + + # Get the active queues and the workers that are watching them + active_queues = get_active_queues(args.task_server) + LOG.debug(f"Monitor: active_queues: {active_queues}") + + # Count the number of workers that are active + consumers = set() + for active_queue, workers_on_queue in active_queues.items(): + if active_queue in queues_in_spec: + consumers |= set(workers_on_queue) + LOG.debug(f"Monitor: consumers found: {consumers}") + total_consumers = len(consumers) + + LOG.info(f"Monitor: found {total_jobs} jobs in queues and {total_consumers} workers alive") + + # If there are no workers, wait for the workers to start + if total_consumers == 0: + wait_for_workers(args.sleep, args.task_server, spec) + + # If we're here, workers have started and jobs should be queued + if total_jobs > 0: + active_tasks = True + # If there are no jobs left, see if any workers are still processing them + elif total_jobs == 0: + active_tasks = check_workers_processing(queues_in_spec, args.task_server) + + LOG.debug(f"Monitor: active_tasks: {active_tasks}") + return active_tasks diff --git a/merlin/study/celeryadapter.py b/merlin/study/celeryadapter.py index cd9714dff..84fb96ff1 100644 --- a/merlin/study/celeryadapter.py +++ b/merlin/study/celeryadapter.py @@ -39,12 +39,18 @@ from contextlib import suppress from typing import Dict, List, Optional +from amqp.exceptions import ChannelError +from celery import Celery + +from merlin.config import Config from merlin.study.batch import batch_check_parallel, batch_worker_launch from merlin.utils import apply_list_of_regex, check_machines, get_procs, get_yaml_var, is_running LOG = logging.getLogger(__name__) +# TODO figure out a better way to handle the import of celery app and CONFIG + def run_celery(study, run_mode=None): """ @@ -102,7 +108,7 @@ def get_running_queues(celery_app_name: str, test_mode: bool = False) -> List[st return running_queues -def get_queues(app): +def get_active_celery_queues(app): """Get all active queues and workers for a celery application. Unlike get_running_queues, this goes through the application's server. @@ -117,7 +123,7 @@ def get_queues(app): :example: >>> from merlin.celery import app - >>> queues, workers = get_queues(app) + >>> queues, workers = get_active_celery_queues(app) >>> queue_names = [*queues] >>> workers_on_q0 = queues[queue_names[0]] >>> workers_not_on_q0 = [worker for worker in workers @@ -139,7 +145,7 @@ def get_queues(app): def get_active_workers(app): """ - This is the inverse of get_queues() defined above. This function + This is the inverse of get_active_celery_queues() defined above. This function builds a dict where the keys are worker names and the values are lists of queues attached to the worker. 
@@ -228,7 +234,7 @@ def query_celery_workers(spec_worker_names, queues, workers_regex):
     # --queues flag
     if queues:
         # Get a mapping between queues and the workers watching them
-        queue_worker_map, _ = get_queues(app)
+        queue_worker_map, _ = get_active_celery_queues(app)
         # Remove duplicates and prepend the celery queue tag to all queues
         queues = list(set(queues))
         celerize_queues(queues)
@@ -270,26 +276,54 @@ def query_celery_workers(spec_worker_names, queues, workers_regex):
     print()
 
 
-def query_celery_queues(queues):
-    """Return stats for queues specified.
+def query_celery_queues(queues: List[str], app: Celery = None, config: Config = None) -> Dict[str, Dict[str, int]]:
+    """
+    Build a dict of information about the number of jobs and consumers attached
+    to specific queues that we want information on.
 
-    Send results to the log.
+    :param queues: A list of the queues we want to know about
+    :param app: The celery application (this will be None unless testing)
+    :param config: The configuration object that has the broker name (this will be None unless testing)
+    :returns: A dict of info on the number of jobs and consumers for each queue in `queues`
     """
-    from merlin.celery import app  # pylint: disable=C0415
+    if app is None:
+        from merlin.celery import app  # pylint: disable=C0415
+    if config is None:
+        from merlin.config.configfile import CONFIG as config  # pylint: disable=C0415
 
-    connection = app.connection()
-    found_queues = []
-    try:
-        channel = connection.channel()
-        for queue in queues:
-            try:
-                name, jobs, consumers = channel.queue_declare(queue=queue, passive=True)
-                found_queues.append((name, jobs, consumers))
-            except Exception as e:  # pylint: disable=C0103,W0718
-                LOG.warning(f"Cannot find queue {queue} on server.{e}")
-    finally:
-        connection.close()
-    return found_queues
+    # Initialize the dictionary with the info we want about our queues
+    queue_info = {queue: {"consumers": 0, "jobs": 0} for queue in queues}
+
+    # Open a connection via our Celery app
+    with app.connection() as conn:
+        # Open a channel inside our connection
+        with conn.channel() as channel:
+            # Loop through all the queues we're searching for
+            for queue in queues:
+                try:
+                    # Count the number of jobs and consumers for each queue
+                    _, queue_info[queue]["jobs"], queue_info[queue]["consumers"] = channel.queue_declare(
+                        queue=queue, passive=True
+                    )
+                # Redis likes to throw this error when a queue we're looking for has no jobs
+                except ChannelError:
+                    pass
+
+    # Redis doesn't keep track of consumers attached to queues like rabbit does
+    # so we have to count this ourselves here
+    if config.broker.name in ("rediss", "redis"):
+        # Get a dict of active queues by querying the celery app
+        active_queues = app.control.inspect().active_queues()
+        if active_queues is not None:
+            # Loop through each active queue that was found
+            for active_queue_list in active_queues.values():
+                # Loop through each queue that each worker is watching
+                for active_queue in active_queue_list:
+                    # If this is a queue we're looking for, increment the consumer count
+                    if active_queue["name"] in queues:
+                        queue_info[active_queue["name"]]["consumers"] += 1
+
+    return queue_info
 
 
 def get_workers_from_app():
@@ -308,6 +342,27 @@ def get_workers_from_app():
     return [*workers]
 
 
+def check_celery_workers_processing(queues_in_spec: List[str], app: Celery) -> bool:
+    """
+    Query celery to see if any workers are still processing tasks.
+ + :param queues_in_spec: A list of queues to check if tasks are still active in + :param app: The celery app that we're querying + :returns: True if workers are still processing tasks, False otherwise + """ + # Query celery for active tasks + active_tasks = app.control.inspect().active() + + # Search for the queues we provided if necessary + if active_tasks is not None: + for tasks in active_tasks.values(): + for task in tasks: + if task["delivery_info"]["routing_key"] in queues_in_spec: + return True + + return False + + def _get_workers_to_start(spec, steps): """ Helper function to return a set of workers to start based on @@ -620,7 +675,7 @@ def stop_celery_workers(queues=None, spec_worker_names=None, worker_regex=None): from merlin.celery import app # pylint: disable=C0415 LOG.debug(f"Sending stop to queues: {queues}, worker_regex: {worker_regex}, spec_worker_names: {spec_worker_names}") - active_queues, _ = get_queues(app) + active_queues, _ = get_active_celery_queues(app) # If not specified, get all the queues if queues is None: diff --git a/requirements/dev.txt b/requirements/dev.txt index 895a89249..6e8722b4b 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -11,3 +11,4 @@ sphinx>=2.0.0 alabaster johnnydep deepdiff +pytest-order diff --git a/tests/celery_test_workers.py b/tests/celery_test_workers.py new file mode 100644 index 000000000..39eb2a39b --- /dev/null +++ b/tests/celery_test_workers.py @@ -0,0 +1,231 @@ +############################################################################### +# Copyright (c) 2023, Lawrence Livermore National Security, LLC. +# Produced at the Lawrence Livermore National Laboratory +# Written by the Merlin dev team, listed in the CONTRIBUTORS file. +# +# +# LLNL-CODE-797170 +# All rights reserved. +# This file is part of Merlin, Version: 1.11.1. +# +# For details, see https://github.com/LLNL/merlin. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +############################################################################### +""" +Module to define functionality for test workers and how to start/stop +them in their own processes. +""" +import multiprocessing +import os +import signal +import subprocess +from time import sleep +from types import TracebackType +from typing import Dict, List, Type + +from celery import Celery + + +class CeleryTestWorkersManager: + """ + A class to handle the setup and teardown of celery workers. + This should be treated as a context and used with python's + built-in 'with' statement. 
If you use it without this statement,
+    beware that the processes spun up here may never be stopped.
+    """
+
+    def __init__(self, app: Celery):
+        self.app = app
+        self.running_workers = []
+        self.worker_processes = {}
+        self.echo_processes = {}
+
+    def __enter__(self):
+        """This magic method is necessary for allowing this class to be used as a context manager."""
+        return self
+
+    def __exit__(self, exc_type: Type[Exception], exc_value: Exception, traceback: TracebackType):
+        """
+        This will always run at the end of a context with statement, even if an error is raised.
+        It's a safe way to ensure all of our subprocesses are stopped no matter what.
+        """
+
+        # Try to stop everything gracefully first
+        self.stop_all_workers()
+
+        # Check that all the worker processes were stopped, otherwise forcefully terminate them
+        for worker_process in self.worker_processes.values():
+            if worker_process.is_alive():
+                worker_process.kill()
+
+        # Check that all the echo processes were stopped, otherwise forcefully terminate them
+        ps_proc = subprocess.run("ps ux", shell=True, capture_output=True, text=True)
+        for pid in self.echo_processes.values():
+            if str(pid) in ps_proc.stdout:
+                os.kill(pid, signal.SIGKILL)
+
+    def _is_worker_ready(self, worker_name: str, verbose: bool = False) -> bool:
+        """
+        Check to see if the worker is up and running yet.
+
+        :param worker_name: The name of the worker we're checking on
+        :param verbose: If true, enable print statements to show where we're at in execution
+        :returns: True if the worker is running. False otherwise.
+        """
+        ping = self.app.control.inspect().ping(destination=[f"celery@{worker_name}"])
+        if verbose:
+            print(f"ping: {ping}")
+        return ping is not None and f"celery@{worker_name}" in ping
+
+    def _wait_for_worker_launch(self, worker_name: str, verbose: bool = False):
+        """
+        Poll the worker over a fixed interval of time. If the worker doesn't show up
+        within the time limit then we'll raise a timeout error. Otherwise, the worker
+        is up and running and we can continue with our tests.
+
+        :param worker_name: The name of the worker we're checking on
+        :param verbose: If true, enable print statements to show where we're at in execution
+        """
+        max_wait_time = 2  # Maximum wait time in seconds
+        wait_interval = 0.5  # Interval between checks in seconds
+        waited_time = 0
+        worker_ready = False
+
+        if verbose:
+            print(f"waiting for {worker_name} to launch...")
+
+        # Wait until the worker is ready
+        while waited_time < max_wait_time:
+            if self._is_worker_ready(worker_name, verbose=verbose):
+                worker_ready = True
+                break
+
+            sleep(wait_interval)
+            waited_time += wait_interval
+
+        if not worker_ready:
+            raise TimeoutError("Celery workers did not start within the expected time.")
+
+        if verbose:
+            print(f"{worker_name} launched")
+
+    def start_worker(self, worker_launch_cmd: List[str]):
+        """
+        This is where a worker is actually started. Each worker maintains control of a process until
+        we tell it to stop, that's why we have to use the multiprocessing library for this. We have to use
+        app.worker_main instead of the normal "celery -A <app name> worker" command to launch the workers
+        since our celery app is created in a pytest fixture and is unrecognizable by the celery command.
+        For each worker, the output of its logs is sent to
+        /tmp/`whoami`/pytest-of-`whoami`/pytest-current/integration_outfiles_current/ under a file with a name
+        similar to: test_worker_*.log.
+        NOTE: pytest-current/ will have the results of the most recent test run.
If you want to see a previous run
+        check under pytest-<num>/. HOWEVER, only the 3 most recent test runs will be saved.
+
+        :param worker_launch_cmd: The command to launch a worker
+        """
+        self.app.worker_main(worker_launch_cmd)
+
+    def launch_worker(self, worker_name: str, queues: List[str], concurrency: int = 1):
+        """
+        Launch a single worker. We'll add the process that the worker is running in to the list of worker processes.
+        We'll also create an echo process to simulate a celery worker command that will show up with 'ps ux'.
+
+        :param worker_name: The name to give to the worker
+        :param queues: A list of queues that the worker will be watching
+        :param concurrency: The concurrency value of the worker (how many child processes to have the worker spin up)
+        """
+        # Check to make sure we have a unique worker name so we can track all processes
+        if worker_name in self.worker_processes:
+            self.stop_all_workers()
+            raise ValueError(f"The worker {worker_name} is already running. Choose a different name.")
+
+        # Create the launch command for this worker
+        worker_launch_cmd = [
+            "worker",
+            "-n",
+            worker_name,
+            "-Q",
+            ",".join(queues),
+            "--concurrency",
+            str(concurrency),
+            f"--logfile={worker_name}.log",
+            "--loglevel=DEBUG",
+        ]
+
+        # Create an echo command to simulate a running celery worker since our celery worker will be spun up in
+        # a different process and we won't be able to see it with 'ps ux' like we normally would
+        echo_process = subprocess.Popen(  # pylint: disable=consider-using-with
+            f"echo 'celery merlin_test_app {' '.join(worker_launch_cmd)}'; sleep inf",
+            shell=True,
+            preexec_fn=os.setpgrp,  # Make this the parent of the group so we can kill the 'sleep inf' that's spun up
+        )
+        self.echo_processes[worker_name] = echo_process.pid
+
+        # Start the worker in a separate process since it'll take control of the entire process until we kill it
+        worker_process = multiprocessing.Process(target=self.start_worker, args=(worker_launch_cmd,))
+        worker_process.start()
+        self.worker_processes[worker_name] = worker_process
+        self.running_workers.append(worker_name)
+
+        # Wait for the worker to launch properly
+        try:
+            self._wait_for_worker_launch(worker_name, verbose=False)
+        except TimeoutError as exc:
+            self.stop_all_workers()
+            raise exc
+
+    def launch_workers(self, worker_info: Dict[str, Dict]):
+        """
+        Launch multiple workers. This will call `launch_worker` to launch each worker
+        individually.
+
+        :param worker_info: A dict of worker info with the form
+                            {"worker_name": {"concurrency": <concurrency value>, "queues": <list of queue names>}}
+        """
+        for worker_name, worker_settings in worker_info.items():
+            self.launch_worker(worker_name, worker_settings["queues"], worker_settings["concurrency"])
+
+    def stop_worker(self, worker_name: str):
+        """
+        Stop a single running worker and its associated processes.
+
+        :param worker_name: The name of the worker to shutdown
+        """
+        # Send a shutdown signal to the worker
+        self.app.control.broadcast("shutdown", destination=[f"celery@{worker_name}"])
+
+        # Try to terminate the process gracefully
+        if self.worker_processes[worker_name] is not None:
+            self.worker_processes[worker_name].terminate()
+            process_exit_code = self.worker_processes[worker_name].join(timeout=3)
+
+            # If it won't terminate then force kill it
+            if process_exit_code is None:
+                self.worker_processes[worker_name].kill()
+
+        # Terminate the echo process and its sleep inf subprocess
+        os.killpg(os.getpgid(self.echo_processes[worker_name]), signal.SIGTERM)
+        sleep(2)
+
+    def stop_all_workers(self):
+        """
+        Stop all of the running workers and the processes associated with them.
+        """
+        for worker_name in self.running_workers:
+            self.stop_worker(worker_name)
diff --git a/tests/conftest.py b/tests/conftest.py
index 88932c5db..38c6b0334 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -31,16 +31,18 @@
 This module contains pytest fixtures to be used throughout the entire
 integration test suite.
 """
-import multiprocessing
 import os
 import subprocess
 from time import sleep
-from typing import Dict, List
+from typing import Dict
 
 import pytest
 import redis
 from _pytest.tmpdir import TempPathFactory
 from celery import Celery
+from celery.canvas import Signature
+
+from tests.celery_test_workers import CeleryTestWorkersManager
 
 
 class RedisServerError(Exception):
@@ -124,7 +126,7 @@ def redis_server(merlin_server_dir: str, redis_pass: str) -> str:  # pylint: dis
     # Start the local redis server
     try:
         # Need to set LC_ALL='C' before starting the server or else redis causes a failure
-        subprocess.run("export LC_ALL='C'; merlin server start", shell=True, capture_output=True, text=True, timeout=5)
+        subprocess.run("export LC_ALL='C'; merlin server start", shell=True, timeout=5)
     except subprocess.TimeoutExpired:
         pass
 
@@ -154,108 +156,39 @@ def celery_app(redis_server: str) -> Celery:  # pylint: disable=redefined-outer-
     :param redis_server: The redis server uri we'll use to connect to redis
     :returns: The celery app object we'll use for testing
     """
-    return Celery("test_app", broker=redis_server, backend=redis_server)
+    return Celery("merlin_test_app", broker=redis_server, backend=redis_server)
 
 
 @pytest.fixture(scope="session")
-def worker_queue_map() -> Dict[str, str]:
-    """
-    Worker and queue names to be used throughout tests
-
-    :returns: A dict of dummy worker/queue associations
-    """
-    return {f"test_worker_{i}": f"test_queue_{i}" for i in range(3)}
-
-
-def are_workers_ready(app: Celery, num_workers: int, verbose: bool = False) -> bool:
-    """
-    Check to see if the workers are up and running yet.
-
-    :param app: The celery app fixture that's connected to our redis server
-    :param num_workers: An int representing the number of workers we're looking to have started
-    :param verbose: If true, enable print statements to show where we're at in execution
-    :returns: True if all workers are running. False otherwise.
+def sleep_sig(celery_app: Celery) -> Signature:  # pylint: disable=redefined-outer-name
     """
-    app_stats = app.control.inspect().stats()
-    if verbose:
-        print(f"app_stats: {app_stats}")
-    return app_stats is not None and len(app_stats) == num_workers
+    Create a task registered to our celery app and return a signature for it.
+    Once requested by a test, you can set the queue you'd like to send this to
+    with `sleep_sig.set(queue=<queue name>)`. Here, <queue name> will likely be
+    one of the queues defined in the `worker_queue_map` fixture.
 
-
-def wait_for_worker_launch(app: Celery, num_workers: int, verbose: bool = False):
-    """
-    Poll the workers over a fixed interval of time. If the workers don't show up
-    within the time limit then we'll raise a timeout error. Otherwise, the workers
-    are up and running and we can continue with our tests.
-
-    :param app: The celery app fixture that's connected to our redis server
-    :param num_workers: An int representing the number of workers we're looking to have started
-    :param verbose: If true, enable print statements to show where we're at in execution
+    :param celery_app: The celery app object we'll use for testing
+    :returns: A celery signature for a task that will sleep for 3 seconds
     """
-    max_wait_time = 2  # Maximum wait time in seconds
-    wait_interval = 0.5  # Interval between checks in seconds
-    waited_time = 0
-
-    if verbose:
-        print("waiting for workers to launch...")
-
-    # Wait until all workers are ready
-    while not are_workers_ready(app, num_workers, verbose=verbose) and waited_time < max_wait_time:
-        sleep(wait_interval)
-        waited_time += wait_interval
+    # Create a celery task that sleeps for 3 sec
+    @celery_app.task
+    def sleep_task():
+        print("running sleep task")
+        sleep(3)
 
-    # If all workers are not ready after the maximum wait time, raise an error
-    if not are_workers_ready(app, num_workers, verbose=verbose):
-        raise TimeoutError("Celery workers did not start within the expected time.")
+    # Create a signature for this task
+    return sleep_task.s()
 
-    if verbose:
-        print("workers launched")
-
-def shutdown_processes(worker_processes: List[multiprocessing.Process], echo_processes: List[subprocess.Popen]):
-    """
-    Given lists of processes, shut them all down. Worker processes were created with the
-    multiprocessing library and echo processes were created with the subprocess library,
-    so we have to shut them down slightly differently.
-
-    :param worker_processes: A list of worker processes to terminate
-    :param echo_processes: A list of echo processes to terminate
+@pytest.fixture(scope="session")
+def worker_queue_map() -> Dict[str, str]:
     """
-    # Worker processes were created with the multiprocessing library
-    for worker_process in worker_processes:
-        # Try to terminate the process gracefully
-        worker_process.terminate()
-        process_exit_code = worker_process.join(timeout=3)
-
-        # If it won't terminate then force kill it
-        if process_exit_code is None:
-            worker_process.kill()
-
-    # Gracefully terminate the echo processes
-    for echo_process in echo_processes:
-        echo_process.terminate()
-        echo_process.wait()
-
-    # The echo processes will spawn 3 sleep inf processes that we also need to kill
-    subprocess.run("ps ux | grep 'sleep inf' | grep -v grep | awk '{print $2}' | xargs kill", shell=True)
-
+    Worker and queue names to be used throughout tests
 
-def start_worker(app: Celery, worker_launch_cmd: List[str]):
-    """
-    This is where a worker is actually started. Each worker maintains control of a process until
-    we tell it to stop, that's why we have to use the multiprocessing library for this. We have to use
-    app.worker_main instead of the normal "celery -A <app name> worker" command to launch the workers
-    since our celery app is created in a pytest fixture and is unrecognizable by the celery command.
-    For each worker, the output of it's logs are sent to
-    /tmp/`whoami`/pytest-of-`whoami`/pytest-current/integration_outfiles_current/ under a file with a name
-    similar to: test_worker_*.log.
-    NOTE: pytest-current/ will have the results of the most recent test run. If you want to see a previous run
-    check under pytest-<num>/. HOWEVER, only the 3 most recent test runs will be saved.
-
-    :param app: The celery app fixture that's connected to our redis server
-    :param worker_launch_cmd: The command to launch a worker
+    :returns: A dict of dummy worker/queue associations
     """
-    app.worker_main(worker_launch_cmd)
+    return {f"test_worker_{i}": f"test_queue_{i}" for i in range(3)}
 
 
 @pytest.fixture(scope="class")
@@ -267,36 +200,10 @@ def launch_workers(celery_app: Celery, worker_queue_map: Dict[str, str]):  # pyl
     :param celery_app: The celery app fixture that's connected to our redis server
     :param worker_queue_map: A dict where the keys are worker names and the values are queue names
     """
-    # Create the processes that will start the workers and store them in a list
-    worker_processes = []
-    echo_processes = []
-    for worker, queue in worker_queue_map.items():
-        worker_launch_cmd = ["worker", "-n", worker, "-Q", queue, "--concurrency", "1", f"--logfile={worker}.log"]
-
-        # We have to use this dummy echo command to simulate a celery worker command that will show up with 'ps ux'
-        # We'll sleep for infinity here and then kill this process during shutdown
-        echo_process = subprocess.Popen(  # pylint: disable=consider-using-with
-            f"echo 'celery test_app {' '.join(worker_launch_cmd)}'; sleep inf", shell=True
-        )
-        echo_processes.append(echo_process)
-
-        # We launch workers in their own process since they maintain control of a process until we stop them
-        worker_process = multiprocessing.Process(target=start_worker, args=(celery_app, worker_launch_cmd))
-        worker_process.start()
-        worker_processes.append(worker_process)
-
-    # Ensure that the workers start properly before letting tests use them
-    try:
-        num_workers = len(worker_queue_map)
-        wait_for_worker_launch(celery_app, num_workers, verbose=False)
-    except TimeoutError as exc:
-        # If workers don't launch in time, we need to make sure these processes stop
-        shutdown_processes(worker_processes, echo_processes)
-        raise exc
-
-    # Give control to the tests that need to use workers
-    yield
-
-    # Shut down the workers and terminate the processes
-    celery_app.control.broadcast("shutdown", destination=list(worker_queue_map.keys()))
-    shutdown_processes(worker_processes, echo_processes)
+    # Format worker info in a format that our workers manager will be able to read
+    # (basically just add in the concurrency value to worker_queue_map)
+    worker_info = {worker_name: {"concurrency": 1, "queues": [queue]} for worker_name, queue in worker_queue_map.items()}
+
+    with CeleryTestWorkersManager(celery_app) as workers_manager:
+        workers_manager.launch_workers(worker_info)
+        yield
diff --git a/tests/unit/study/test_celeryadapter.py b/tests/unit/study/test_celeryadapter.py
index 82e8401e6..67728881e 100644
--- a/tests/unit/study/test_celeryadapter.py
+++ b/tests/unit/study/test_celeryadapter.py
@@ -30,28 +30,55 @@
 """
 Tests for the celeryadapter module.
 """
+from time import sleep
 from typing import Dict
 
-import celery
+import pytest
+from celery import Celery
+from celery.canvas import Signature
 
 from merlin.config import Config
 from merlin.study import celeryadapter
 
 
-class TestActiveQueues:
+@pytest.mark.order(before="TestInactive")
+class TestActive:
     """
-    This class will test queue related functions in the celeryadapter.py module.
-    It will run tests where we need active queues to interact with.
+    This class will test functions in the celeryadapter.py module.
+    It will run tests where we need active queues/workers to interact with.
+
+    NOTE: The tests in this class must be run before the TestInactive class or else the
+    Celery workers needed for this class don't start
+
+    TODO: fix the bug noted above and then check if we still need pytest-order
     """
 
-    def test_query_celery_queues(self, launch_workers: "Fixture"):  # noqa: F821
+    def test_query_celery_queues(
+        self, celery_app: Celery, launch_workers: "Fixture", worker_queue_map: Dict[str, str]  # noqa: F821
+    ):
         """
         Test the query_celery_queues function by providing it with a list of active queues.
-        This should return a list of tuples. Each tuple will contain information
-        (name, num jobs, num consumers) for each queue that we provided.
+        This should return a dict where keys are queue names and values are more dicts containing
+        the number of jobs and consumers in that queue.
+
+        :param `celery_app`: A pytest fixture for the test Celery app
+        :param launch_workers: A pytest fixture that launches celery workers for us to interact with
+        :param worker_queue_map: A pytest fixture that returns a dict of workers and queues
         """
-        # TODO Modify query_celery_queues so the output for a redis broker is the same
-        # as the output for rabbit broker
+        # Set up a dummy configuration to use in the test
+        dummy_config = Config({"broker": {"name": "redis"}})
+
+        # Get the actual output
+        queues_to_query = list(worker_queue_map.values())
+        actual_queue_info = celeryadapter.query_celery_queues(queues_to_query, app=celery_app, config=dummy_config)
+
+        # Ensure all 3 queues in worker_queue_map were queried before looping
+        assert len(actual_queue_info) == 3
+
+        # Ensure each queue has a worker attached
+        for queue_name, queue_info in actual_queue_info.items():
+            assert queue_name in worker_queue_map.values()
+            assert queue_info == {"consumers": 1, "jobs": 0}
 
     def test_get_running_queues(self, launch_workers: "Fixture", worker_queue_map: Dict[str, str]):  # noqa: F821
         """
@@ -61,14 +88,14 @@ def test_get_running_queues(self, launch_workers: "Fixture", worker_queue_map: D
         :param `launch_workers`: A pytest fixture that launches celery workers for us to interact with
         :param `worker_queue_map`: A pytest fixture that returns a dict of workers and queues
         """
-        result = celeryadapter.get_running_queues("test_app", test_mode=True)
+        result = celeryadapter.get_running_queues("merlin_test_app", test_mode=True)
         assert sorted(result) == sorted(list(worker_queue_map.values()))
 
-    def test_get_queues_active(
-        self, celery_app: celery.Celery, launch_workers: "Fixture", worker_queue_map: Dict[str, str]  # noqa: F821
+    def test_get_active_celery_queues(
+        self, celery_app: Celery, launch_workers: "Fixture", worker_queue_map: Dict[str, str]  # noqa: F821
     ):
         """
-        Test the get_queues function with queues active.
+        Test the get_active_celery_queues function with queues active.
         This should return a tuple where the first entry is a dict of queue info
         and the second entry is a list of worker names.
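For orientation, a small illustrative sketch (values invented, not part of the PR) of the dict shape these assertions pin down for `query_celery_queues`, and the formatting `merlin status` now applies to it in `main.py`:

```python
# Illustrative only: the dict shape query_celery_queues now returns,
# with hypothetical queue names and counts
queue_info = {
    "test_queue_0": {"consumers": 1, "jobs": 4},
    "test_queue_1": {"consumers": 0, "jobs": 0},
}

# The same formatting that the updated query_status handler in main.py uses
for name, info in queue_info.items():
    print(f"{name:30} - Workers: {info['consumers']:10} - Queued Tasks: {info['jobs']:10}")
```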
@@ -77,7 +104,7 @@ def test_get_queues_active(
         :param `worker_queue_map`: A pytest fixture that returns a dict of workers and queues
         """
         # Start the queues and run the test
-        queue_result, worker_result = celeryadapter.get_queues(celery_app)
+        queue_result, worker_result = celeryadapter.get_active_celery_queues(celery_app)
 
         # Ensure we got output before looping
         assert len(queue_result) == len(worker_result) == 3
@@ -104,21 +131,78 @@ def test_get_queues_active(
         assert queue_result == {}
         assert worker_result == []
 
+    @pytest.mark.order(index=1)
+    def test_check_celery_workers_processing_tasks(
+        self,
+        celery_app: Celery,
+        sleep_sig: Signature,
+        launch_workers: "Fixture",  # noqa: F821
+    ):
+        """
+        Test the check_celery_workers_processing function with workers active and a task in a queue.
+        This function will query workers for any tasks they're still processing. We'll send
+        a task that sleeps for 3 seconds to our workers before we run this test so that there should be
+        a task for this function to find.
+
+        NOTE: the celery app fixture shows strange behavior when using app.control.inspect() calls (which
+        check_celery_workers_processing uses) so we have to run this test first in this class in order to
+        have it run properly.
+
+        :param celery_app: A pytest fixture for the test Celery app
+        :param sleep_sig: A pytest fixture for a celery signature of a task that sleeps for 3 sec
+        :param launch_workers: A pytest fixture that launches celery workers for us to interact with
+        """
+        # Our active workers/queues are test_worker_[0-2]/test_queue_[0-2] so we're
+        # sending this to test_queue_0 for test_worker_0 to process
+        queue_for_signature = "test_queue_0"
+        sleep_sig.set(queue=queue_for_signature)
+        result = sleep_sig.delay()
+
+        # We need to give the task we just sent to the server a second to get picked up by the worker
+        sleep(1)
 
-class TestInactiveQueues:
+        # Run the test now that the task should be getting processed
+        active_queue_test = celeryadapter.check_celery_workers_processing([queue_for_signature], celery_app)
+        assert active_queue_test is True
+
+        # Now test that a queue without any tasks returns false
+        # We sent the signature to test_queue_0 so test_queue_1 shouldn't have any tasks to find
+        non_active_queue_test = celeryadapter.check_celery_workers_processing(["test_queue_1"], celery_app)
+        assert non_active_queue_test is False
+
+        # Wait for the worker to finish running the task
+        result.get()
+
+
+class TestInactive:
     """
-    This class will test queue related functions in the celeryadapter.py module.
-    It will run tests where we don't need any active queues to interact with.
+    This class will test functions in the celeryadapter.py module.
+    It will run tests where we don't need any active queues/workers to interact with.
     """
 
-    def test_query_celery_queues(self):
+    def test_query_celery_queues(self, celery_app: Celery, worker_queue_map: Dict[str, str]):  # noqa: F821
         """
         Test the query_celery_queues function by providing it with a list of inactive queues.
-        This should return a list of strings. Each string will give a message saying that a
-        particular queue was inactive
+        This should return a dict where keys are queue names and values are more dicts containing
+        the number of jobs and consumers in that queue (which should be 0 for both here).
+
+        :param `celery_app`: A pytest fixture for the test Celery app
+        :param worker_queue_map: A pytest fixture that returns a dict of workers and queues
         """
-        # TODO Modify query_celery_queues so the output for a redis broker is the same
-        # as the output for rabbit broker
+        # Set up a dummy configuration to use in the test
+        dummy_config = Config({"broker": {"name": "redis"}})
+
+        # Get the actual output
+        queues_to_query = list(worker_queue_map.values())
+        actual_queue_info = celeryadapter.query_celery_queues(queues_to_query, app=celery_app, config=dummy_config)
+
+        # Ensure all 3 queues in worker_queue_map were queried before looping
+        assert len(actual_queue_info) == 3
+
+        # Ensure each queue has no worker attached (since the queues should be inactive here)
+        for queue_name, queue_info in actual_queue_info.items():
+            assert queue_name in worker_queue_map.values()
+            assert queue_info == {"consumers": 0, "jobs": 0}
 
     def test_celerize_queues(self, worker_queue_map: Dict[str, str]):
         """
@@ -144,17 +228,29 @@ def test_get_running_queues(self):
         Test the get_running_queues function with no queues active.
         This should return an empty list.
         """
-        result = celeryadapter.get_running_queues("test_app", test_mode=True)
+        result = celeryadapter.get_running_queues("merlin_test_app", test_mode=True)
         assert result == []
 
-    def test_get_queues(self, celery_app: celery.Celery):
+    def test_get_active_celery_queues(self, celery_app: Celery):
         """
-        Test the get_queues function with no queues active.
+        Test the get_active_celery_queues function with no queues active.
         This should return a tuple where the first entry is an empty dict
        and the second entry is an empty list.
 
         :param `celery_app`: A pytest fixture for the test Celery app
         """
-        queue_result, worker_result = celeryadapter.get_queues(celery_app)
+        queue_result, worker_result = celeryadapter.get_active_celery_queues(celery_app)
         assert queue_result == {}
         assert worker_result == []
+
+    def test_check_celery_workers_processing_tasks(self, celery_app: Celery, worker_queue_map: Dict[str, str]):
+        """
+        Test the check_celery_workers_processing function with no workers active.
+        This function will query workers for any tasks they're still processing. Since no workers are active
+        this should return False.
+
+        :param celery_app: A pytest fixture for the test Celery app
+        :param worker_queue_map: A pytest fixture that returns a dict of workers and queues
+        """
+        # Run the test with no workers active; nothing should be processing
+        result = celeryadapter.check_celery_workers_processing(list(worker_queue_map.values()), celery_app)
+        assert result is False
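To close, a brief usage sketch of the new `CeleryTestWorkersManager` context added in `tests/celery_test_workers.py`. The broker URL below is a placeholder; in the real suite the app comes from the `celery_app` fixture and points at the fixture-managed redis server:

```python
from celery import Celery

from tests.celery_test_workers import CeleryTestWorkersManager

# Placeholder broker/backend; assumes a local redis server for illustration
app = Celery("merlin_test_app", broker="redis://localhost:6379", backend="redis://localhost:6379")

worker_info = {
    "test_worker_0": {"concurrency": 1, "queues": ["test_queue_0"]},
    "test_worker_1": {"concurrency": 1, "queues": ["test_queue_1"]},
}

with CeleryTestWorkersManager(app) as workers_manager:
    workers_manager.launch_workers(worker_info)
    # ... run code that needs live workers here ...
# On exit, worker and echo processes are stopped gracefully, forcefully if needed
```

Using the manager as a context guarantees teardown even when a test raises, which is why the `launch_workers` fixture in `tests/conftest.py` now simply yields from inside the `with` block.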