Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix/monitor-shutdown #452

Merged
merged 16 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/push-pr_workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ jobs:
- name: Run pytest over unit test suite
run: |
python3 -m pytest tests/unit/
python3 -m pytest -v --order-scope=module tests/unit/
- name: Run integration test suite for local tests
run: |
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- NOTE: an export command `export LC_ALL='C'` had to be added to fix a bug in the WEAVE CI. This can be removed when we resolve this issue for the `merlin server` command
- Tests for the `celeryadapter.py` module

### Fixed
- The `merlin status` command so that it's consistent in its output whether using redis or rabbitmq as the broker
- The `merlin monitor` command will now keep an allocation up if the queues are empty and workers are still processing tasks

## [1.11.1]
### Fixed
- Typo in `batch.py` that caused lsf launches to fail (`ALL_SGPUS` changed to `ALL_GPUS`)
Expand Down
14 changes: 7 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ install-dev: virtualenv install-merlin install-workflow-deps
# tests require a valid dev install of merlin
unit-tests:
. $(VENV)/bin/activate; \
$(PYTHON) -m pytest $(UNIT); \
$(PYTHON) -m pytest -v --order-scope=module $(UNIT); \


# run CLI tests - these require an active install of merlin in a venv
Expand Down Expand Up @@ -135,9 +135,9 @@ check-flake8:

check-black:
. $(VENV)/bin/activate; \
$(PYTHON) -m black --check --line-length $(MAX_LINE_LENGTH) --target-version py36 $(MRLN); \
$(PYTHON) -m black --check --line-length $(MAX_LINE_LENGTH) --target-version py36 $(TEST); \
$(PYTHON) -m black --check --line-length $(MAX_LINE_LENGTH) --target-version py36 *.py; \
$(PYTHON) -m black --check --line-length $(MAX_LINE_LENGTH) --target-version py38 $(MRLN); \
$(PYTHON) -m black --check --line-length $(MAX_LINE_LENGTH) --target-version py38 $(TEST); \
$(PYTHON) -m black --check --line-length $(MAX_LINE_LENGTH) --target-version py38 *.py; \


check-isort:
Expand Down Expand Up @@ -179,9 +179,9 @@ fix-style:
$(PYTHON) -m isort -w $(MAX_LINE_LENGTH) $(MRLN); \
$(PYTHON) -m isort -w $(MAX_LINE_LENGTH) $(TEST); \
$(PYTHON) -m isort -w $(MAX_LINE_LENGTH) *.py; \
$(PYTHON) -m black --target-version py36 -l $(MAX_LINE_LENGTH) $(MRLN); \
$(PYTHON) -m black --target-version py36 -l $(MAX_LINE_LENGTH) $(TEST); \
$(PYTHON) -m black --target-version py36 -l $(MAX_LINE_LENGTH) *.py; \
$(PYTHON) -m black --target-version py38 -l $(MAX_LINE_LENGTH) $(MRLN); \
$(PYTHON) -m black --target-version py38 -l $(MAX_LINE_LENGTH) $(TEST); \
$(PYTHON) -m black --target-version py38 -l $(MAX_LINE_LENGTH) *.py; \


# Increment the Merlin version. USE ONLY ON DEVELOP BEFORE MERGING TO MASTER.
Expand Down
16 changes: 9 additions & 7 deletions docs/source/merlin_commands.rst
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,15 @@ Monitor (``merlin monitor``)
Batch submission scripts may not keep the batch allocation alive
if there is not a blocking process in the submission script. The
``merlin monitor`` command addresses this by providing a blocking process that
checks for tasks in the queues every (sleep) seconds. When the queues are empty, the
blocking process will exit and allow the allocation to end.
checks for tasks in the queues every (sleep) seconds. When the queues are empty, the
monitor will query celery to see if any workers are still processing tasks from the
queues. If no workers are processing any tasks from the queues and the queues are empty,
the blocking process will exit and allow the allocation to end.

The ``monitor`` function will check for celery workers for up to
10*(sleep) seconds before monitoring begins. The loop happens when the
queue(s) in the spec contain tasks, but no running workers are detected.
This is to protect against a failed worker launch.

.. code:: bash

Expand All @@ -129,11 +136,6 @@ for workers. The default is 60 seconds.

The only currently available option for ``--task_server`` is celery, which is the default when this flag is excluded.

The ``monitor`` function will check for celery workers for up to
10*(sleep) seconds before monitoring begins. The loop happens when the
queue(s) in the spec contain tasks, but no running workers are detected.
This is to protect against a failed worker launch.


Purging Tasks (``merlin purge``)
--------------------------------
Expand Down
11 changes: 11 additions & 0 deletions merlin/exceptions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"HardFailException",
"InvalidChainException",
"RestartException",
"NoWorkersException",
)


Expand Down Expand Up @@ -92,3 +93,13 @@ class RestartException(Exception):

def __init__(self):
super().__init__()


class NoWorkersException(Exception):
"""
Exception to signal that no workers were started
to process a non-empty queue(s).
"""

def __init__(self, message):
super().__init__(message)
12 changes: 9 additions & 3 deletions merlin/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,9 @@ def query_status(args):
print(banner_small)
spec, _ = get_merlin_spec_with_override(args)
ret = router.query_status(args.task_server, spec, args.steps)
for name, jobs, consumers in ret:
print(f"{name:30} - Workers: {consumers:10} - Queued Tasks: {jobs:10}")

for name, queue_info in ret.items():
print(f"{name:30} - Workers: {queue_info['consumers']:10} - Queued Tasks: {queue_info['jobs']:10}")
if args.csv is not None:
router.dump_status(ret, args.csv)

Expand Down Expand Up @@ -353,8 +354,13 @@ def process_monitor(args):
"""
LOG.info("Monitor: checking queues ...")
spec, _ = get_merlin_spec_with_override(args)

# Give the user time to queue up jobs in case they haven't already
time.sleep(args.sleep)

# Check if we still need our allocation
while router.check_merlin_status(args, spec):
LOG.info("Monitor: found tasks in queues")
LOG.info("Monitor: found tasks in queues and/or tasks being processed")
time.sleep(args.sleep)
LOG.info("Monitor: ... stop condition met")

Expand Down
163 changes: 127 additions & 36 deletions merlin/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,13 @@
import os
import time
from datetime import datetime
from typing import Dict, List

from merlin.exceptions import NoWorkersException
from merlin.study.celeryadapter import (
check_celery_workers_processing,
create_celery_config,
get_active_celery_queues,
get_workers_from_app,
purge_celery_tasks,
query_celery_queues,
Expand Down Expand Up @@ -151,12 +155,12 @@ def dump_status(query_return, csv_file):
with open(csv_file, mode=fmode) as f: # pylint: disable=W1514,C0103
if f.mode == "w": # add the header
f.write("# time")
for name, job, consumer in query_return:
for name in query_return:
f.write(f",{name}:tasks,{name}:consumers")
f.write("\n")
f.write(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
for name, job, consumer in query_return:
f.write(f",{job},{consumer}")
for queue_info in query_return.values():
f.write(f",{queue_info['jobs']},{queue_info['consumers']}")
f.write("\n")


Expand Down Expand Up @@ -236,43 +240,130 @@ def create_config(task_server: str, config_dir: str, broker: str, test: str) ->
LOG.error("Only celery can be configured currently.")


def check_merlin_status(args, spec):
def get_active_queues(task_server: str) -> Dict[str, List[str]]:
"""
Function to check merlin workers and queues to keep
the allocation alive
Get a dictionary of active queues and the workers attached to these queues.

:param `task_server`: The task server to query for active queues
:returns: A dict where keys are queue names and values are a list of workers watching them
"""
active_queues = {}

if task_server == "celery":
from merlin.celery import app # pylint: disable=C0415

active_queues, _ = get_active_celery_queues(app)
else:
LOG.error("Only celery can be configured currently.")

return active_queues


def wait_for_workers(sleep: int, task_server: str, spec: "MerlinSpec"): # noqa
"""
Wait on workers to start up. Check on worker start 10 times with `sleep` seconds between
each check. If no workers are started in time, raise an error to kill the monitor (there
was likely an issue with the task server that caused worker launch to fail).

:param `sleep`: An integer representing the amount of seconds to sleep between each check
:param `task_server`: The task server from which to look for workers
:param `spec`: A MerlinSpec object representing the spec we're monitoring
"""
# Get the names of the workers that we're looking for
worker_names = spec.get_worker_names()
LOG.info(f"Checking for the following workers: {worker_names}")

# Loop until workers are detected
count = 0
max_count = 10
while count < max_count:
# This list will include strings comprised of the worker name with the hostname e.g. worker_name@host.
worker_status = get_workers(task_server)
LOG.info(f"Monitor: checking for workers, running workers = {worker_status} ...")

# Check to see if any of the workers we're looking for in 'worker_names' have started
check = any(any(iwn in iws for iws in worker_status) for iwn in worker_names)
if check:
break

# Increment count and sleep until the next check
count += 1
time.sleep(sleep)

# If no workers were started in time, raise an exception to stop the monitor
if count == max_count:
raise NoWorkersException("Monitor: no workers available to process the non-empty queue")


def check_workers_processing(queues_in_spec: List[str], task_server: str) -> bool:
"""
Check if any workers are still processing tasks by querying the task server.

:param `queues_in_spec`: A list of queues to check if tasks are still active in
:param `task_server`: The task server from which to query
:returns: True if workers are still processing tasks, False otherwise
"""
result = False

if task_server == "celery":
from merlin.celery import app

result = check_celery_workers_processing(queues_in_spec, app)
else:
LOG.error("Celery is not specified as the task server!")

return result


def check_merlin_status(args: "Namespace", spec: "MerlinSpec") -> bool: # noqa
"""
Function to check merlin workers and queues to keep the allocation alive

:param `args`: parsed CLI arguments
:param `spec`: the parsed spec.yaml
:param `spec`: the parsed spec.yaml as a MerlinSpec object
:returns: True if there are still tasks being processed, False otherwise
"""
# Initialize the variable to track if there are still active tasks
active_tasks = False

# Get info about jobs and workers in our spec from celery
queue_status = query_status(args.task_server, spec, args.steps, verbose=False)
LOG.debug(f"Monitor: queue_status: {queue_status}")

# Count the number of jobs that are active
# (Adding up the number of consumers in the same way is inaccurate so we won't do that)
total_jobs = 0
total_consumers = 0
for _, jobs, consumers in queue_status:
total_jobs += jobs
total_consumers += consumers

if total_jobs > 0 and total_consumers == 0:
# Determine if any of the workers are on this allocation
worker_names = spec.get_worker_names()

# Loop until workers are detected.
count = 0
max_count = 10
while count < max_count:
# This list will include strings comprised of the worker name with the hostname e.g. worker_name@host.
worker_status = get_workers(args.task_server)
LOG.info(f"Monitor: checking for workers, running workers = {worker_status} ...")

check = any(any(iwn in iws for iws in worker_status) for iwn in worker_names)
if check:
break

count += 1
time.sleep(args.sleep)

if count == max_count:
LOG.error("Monitor: no workers available to process the non-empty queue")
total_jobs = 0

return total_jobs
for queue_info in queue_status.values():
total_jobs += queue_info["jobs"]

# Get the queues defined in the spec
queues_in_spec = spec.get_queue_list(["all"])
LOG.debug(f"Monitor: queues_in_spec: {queues_in_spec}")

# Get the active queues and the workers that are watching them
active_queues = get_active_queues(args.task_server)
LOG.debug(f"Monitor: active_queues: {active_queues}")

# Count the number of workers that are active
consumers = set()
for active_queue, workers_on_queue in active_queues.items():
if active_queue in queues_in_spec:
consumers |= set(workers_on_queue)
LOG.debug(f"Monitor: consumers found: {consumers}")
total_consumers = len(consumers)

LOG.info(f"Monitor: found {total_jobs} jobs in queues and {total_consumers} workers alive")

# If there are no workers, wait for the workers to start
if total_consumers == 0:
wait_for_workers(args.sleep, args.task_server, spec)

# If we're here, workers have started and jobs should be queued
if total_jobs > 0:
active_tasks = True
# If there are no jobs left, see if any workers are still processing them
elif total_jobs == 0:
active_tasks = check_workers_processing(queues_in_spec, args.task_server)

LOG.debug(f"Monitor: active_tasks: {active_tasks}")
return active_tasks
Loading
Loading