Pause running workflows when stopping beeflow with 'beeflow core stop' (#830)

* Handles active workflows for 'beeflow core stop'
   - Pause running workflows when stopping beeflow with 'beeflow core stop'
   - Add info about stopping beeflow to documentation
   - Set workflow state to Running when 'beeflow resume <wf_id>' command is issued
   - Display beeflow version when starting core
   - Add task job updates to the log
   - Add final job status check with sacct command when state cannot be found otherwise
   - Handle bad job IDs
 ---------

Co-authored-by: Jake Tronge <jtronge@lanl.gov>
pagrubel and jtronge authored Jul 23, 2024
1 parent 9c8bdee commit b4bf36a
Showing 9 changed files with 88 additions and 46 deletions.
26 changes: 20 additions & 6 deletions beeflow/client/core.py
@@ -16,6 +16,7 @@
import shutil
import datetime
import time
import importlib.metadata

import daemon
import typer
@@ -246,9 +247,10 @@ def start_slurm_restd():
slurmrestd_log = '/'.join([bee_workdir, 'logs', 'restd.log'])
openapi_version = bc.get('slurm', 'openapi_version')
slurm_args = f'-s openapi/{openapi_version}'
# The following adds the db plugin we opted not to use for now
# slurm_args = f'-s openapi/{openapi_version},openapi/db{openapi_version}'
slurm_socket = paths.slurm_socket()
subprocess.run(['rm', '-f', slurm_socket], check=True)
# log.info("Attempting to open socket: {}".format(slurm_socket))
fp = open(slurmrestd_log, 'w', encoding='utf-8') # noqa
cmd = ['slurmrestd']
cmd.extend(slurm_args.split())
@@ -407,7 +409,9 @@ def start(foreground: bool = typer.Option(False, '--foreground', '-F',
# It's already running, so print an error and exit
warn(f'Beeflow appears to be running. Check the beeflow log: "{beeflow_log}"')
sys.exit(1)
print('Starting beeflow...')

version = importlib.metadata.version("hpc-beeflow")
print(f'Starting beeflow {version}...')
if not foreground:
print('Run `beeflow core status` for more information.')
# Create the log path if it doesn't exist yet
@@ -440,16 +444,26 @@ def status():
@app.command()
def stop(query='yes'):
"""Stop the current running beeflow daemon."""
if query == 'yes':
# Check workflow states; warn if there are active states, pause running workflows
workflow_list = bee_client.get_wf_list()
concern_states = {'Running', 'Initializing', 'Waiting'}
concern = {item for row in workflow_list for item in row}.intersection(concern_states)
# For the interactive case
if query == 'yes' and concern:
ans = input("""
** Please ensure all workflows are complete before stopping beeflow. **
** Check the status of workflows by running 'beeflow list'. **
** There are running workflows. **
** Running workflows will be paused. **
Are you sure you want to kill beeflow components? [y/n] """)
else:
ans = 'y'
if ans.lower() != 'y':
return
# Pause running or waiting workflows
workflow_list = bee_client.get_wf_list()
for _name, wf_id, state in workflow_list:
if state in {'Running', 'Waiting'}:
bee_client.pause(wf_id)
resp = cli_connection.send(paths.beeflow_socket(), {'type': 'quit'})
if resp is None:
beeflow_log = paths.log_fname('beeflow')
@@ -459,7 +473,7 @@ def stop(query='yes'):
sys.exit(1)
# As long as it returned something, we should be good
beeflow_log = paths.log_fname('beeflow')
if query == "no":
if query == "yes":
print(f'Beeflow has stopped. Check the log at "{beeflow_log}".')


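
The new guard in stop() flattens every row returned by bee_client.get_wf_list() and intersects it with the states of concern before prompting. A minimal sketch of that check, assuming hypothetical (name, wf_id, state) rows:

# Hypothetical rows from bee_client.get_wf_list(): (name, wf_id, state).
workflow_list = [
    ('wf-a', '42ab', 'Running'),
    ('wf-b', '97cd', 'Archived'),
]
concern_states = {'Running', 'Initializing', 'Waiting'}
# Flatten all rows into one set and keep only the states of concern.
concern = {item for row in workflow_list for item in row}.intersection(concern_states)
print(concern)  # {'Running'} -> stop() warns, then pauses Running/Waiting workflows
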
2 changes: 1 addition & 1 deletion beeflow/common/config_driver.py
@@ -324,7 +324,7 @@ def validate_chrun_opts(opts):
default=(shutil.which('slurmrestd') is None),
info='if set, use slurm cli commands instead of slurmrestd')
DEFAULT_SLURMRESTD_SOCK = join_path('/tmp', f'slurm_{USER}_{random.randint(1, 10000)}.sock')
VALIDATOR.option('slurm', 'openapi_version', default='v0.0.38',
VALIDATOR.option('slurm', 'openapi_version', default='v0.0.39',
info='openapi version to use for slurmrestd')
# Scheduler
VALIDATOR.section('scheduler', info='Scheduler configuration section.')
4 changes: 2 additions & 2 deletions beeflow/common/gdb/neo4j_driver.py
@@ -92,10 +92,10 @@ def pause_workflow(self):
def resume_workflow(self):
"""Resume execution of a paused workflow in Neo4j.
Sets workflow state to 'PAUSED'
Sets workflow state to 'RUNNING'
"""
with self._driver.session() as session:
session.write_transaction(tx.set_workflow_state, state='RESUME')
session.write_transaction(tx.set_workflow_state, state='RUNNING')

def reset_workflow(self, new_id):
"""Reset the execution state of an entire workflow.
49 changes: 24 additions & 25 deletions beeflow/common/worker/slurm_worker.py
@@ -14,6 +14,9 @@
from beeflow.common import log as bee_logging
from beeflow.common.worker.worker import (Worker, WorkerError)
from beeflow.common import validation
from beeflow.common.worker.utils import get_state_sacct
from beeflow.common.worker.utils import parse_key_val


log = bee_logging.setup(__name__)

@@ -167,17 +170,18 @@ def query_task(self, job_id):
"""Worker queries job; returns job_state."""
try:
resp = self.session.get(f'{self.slurm_url}/job/{job_id}')

if resp.status_code != 200:
raise WorkerError(f'Failed to query job {job_id}')
data = json.loads(resp.text)
# Check for errors in the response
check_slurm_error(data, f'Failed to query job {job_id}')
# For some versions of slurm, the job_state isn't included on failure
try:
job_state = data['jobs'][0]['job_state']
except (KeyError, IndexError) as exc:
raise WorkerError(f'Failed to query job {job_id}') from exc
if resp.status_code == 200:
data = json.loads(resp.text)
# Check for errors in the response
check_slurm_error(data, f'Failed to query job {job_id}, slurm error.')
# For some versions of slurm, the job_state isn't included on failure
try:
job_state = data['jobs'][0]['job_state']
except (KeyError, IndexError) as exc:
raise WorkerError(f'Failed to query job {job_id}') from exc
else:
# If slurmrestd does not find job make last attempt with sacct command
job_state = get_state_sacct(job_id)
except requests.exceptions.ConnectionError:
job_state = "NOT_RESPONDING"
return job_state
@@ -207,24 +211,19 @@ class SlurmCLIWorker(BaseSlurmWorker):

def query_task(self, job_id):
"""Query job state for the task."""
# Use scontrol since it gives a lot of useful info
# Use scontrol since it gives a lot of useful info; may want to save info
try:
res = subprocess.run(['scontrol', 'show', 'job', str(job_id)],
text=True, check=True, stdout=subprocess.PIPE)
except subprocess.CalledProcessError:
raise WorkerError(
f'Failed to query job {job_id}'
) from None

def parse_key_val(pair):
"""Parse the key-value pair."""
i = pair.find('=')
return (pair[:i], pair[i + 1:])

# Output is in a space-separated list of 'key=value' pairs
pairs = res.stdout.split()
key_vals = dict(parse_key_val(pair) for pair in pairs)
return key_vals['JobState']
# If show job cannot find job get state using sacct (not same info)
job_state = get_state_sacct(job_id)
else:
# Output is in a space-separated list of 'key=value' pairs
pairs = res.stdout.split()
key_vals = dict(parse_key_val(pair) for pair in pairs)
job_state = key_vals['JobState']
return job_state

def cancel_task(self, job_id):
"""Cancel task with job_id; returns job_state."""
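
A small hedged sketch of the scontrol parsing path above, using a made-up fragment of the space-separated 'key=value' output that `scontrol show job` prints (only the JobState value is used here):

# Hypothetical fragment of `scontrol show job <id>` output.
sample = 'JobId=1234 JobName=task0 JobState=RUNNING Reason=None'

def parse_key_val(pair):
    """Split on the first '=' only (mirrors beeflow.common.worker.utils.parse_key_val)."""
    i = pair.find('=')
    return (pair[:i], pair[i + 1:])

key_vals = dict(parse_key_val(pair) for pair in sample.split())
print(key_vals['JobState'])  # RUNNING
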
25 changes: 25 additions & 0 deletions beeflow/common/worker/utils.py
@@ -0,0 +1,25 @@
"""Worker utility functions."""

import subprocess
from beeflow.common.worker.worker import WorkerError


def get_state_sacct(job_id):
"""Get state from slurm using sacct command, used when other means fail."""
try:
resp = subprocess.run(['sacct', '--parsable', '-j', str(job_id)], text=True, check=True,
stdout=subprocess.PIPE)
data = resp.stdout.splitlines()
header, info = data
header = header.split('|')
info = info.split('|')
state_idx = header.index('State')
return info[state_idx]
except (subprocess.CalledProcessError, ValueError, KeyError) as exc:
raise WorkerError(f'sacct query failed for job {job_id}') from exc


def parse_key_val(pair):
"""Parse the key-value pair separated by '='."""
i = pair.find('=')
return (pair[:i], pair[i + 1:])
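
A hedged illustration of the sacct parsing in get_state_sacct(), using a made-up two-line record in the pipe-delimited format produced by `sacct --parsable` (real output comes only from the sacct command itself):

# Hypothetical captured output of `sacct --parsable -j 1234`.
sample = 'JobID|JobName|State|ExitCode\n1234|task0|COMPLETED|0:0'
header, info = sample.splitlines()
state_idx = header.split('|').index('State')
print(info.split('|')[state_idx])  # COMPLETED
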
7 changes: 3 additions & 4 deletions beeflow/task_manager/background.py
@@ -30,7 +30,7 @@ def submit_task(db, worker, task):
resolve_environment(task)
log.info(f'Environment preparation complete for task {task.name}')
job_id, job_state = worker.submit_task(task)
log.info(f'Job Submitted {task.name}: job_id: {job_id} job_state: {job_state}')
log.info(f"Job Submitted '{task.name}' job_id: {job_id} job_state: {job_state}")
# place job in queue to monitor
db.job_queue.push(task=task, job_id=job_id, job_state=job_state)
# update_task_metadata(task.id, task_metadata)
@@ -75,10 +75,10 @@ def update_jobs(db):
# If state changes update the WFM
if job_state != new_job_state:
db.job_queue.update_job_state(id_, new_job_state)
log.info(f"Job Updated '{task.name}' job_id: {job_id} job_state: {new_job_state}")
if new_job_state in ('FAILED', 'TIMELIMIT', 'TIMEOUT'):
# Harvest lastest checkpoint file.
task_checkpoint = task.get_full_requirement('beeflow:CheckpointRequirement')
log.info(f'state: {new_job_state}')
log.info(f'TIMELIMIT/TIMEOUT task_checkpoint: {task_checkpoint}')
if task_checkpoint:
try:
@@ -94,15 +94,14 @@
# States are based on https://slurm.schedmd.com/squeue.html#SECTION_JOB-STATE-CODES
elif new_job_state in ('BOOT_FAIL', 'NODE_FAIL', 'OUT_OF_MEMORY', 'PREEMPTED'):
# Don't update wfm, just resubmit
log.info(f'Task {task.name} in state {new_job_state}')
log.info(f'Resubmitting task {task.name}')
db.job_queue.remove_by_id(id_)
job_state = submit_task(db, worker, task)
db.update_queue.push(task.workflow_id, task.id, job_state)
else:
db.update_queue.push(task.workflow_id, task.id, new_job_state)

if job_state in ('ZOMBIE', 'COMPLETED', 'CANCELLED', 'FAILED', 'TIMEOUT', 'TIMELIMIT'):
if job_state in ('UNKNOWN', 'COMPLETED', 'CANCELLED', 'FAILED', 'TIMEOUT', 'TIMELIMIT'):
# Remove from the job queue. Our job is finished
db.job_queue.remove_by_id(id_)

8 changes: 4 additions & 4 deletions beeflow/wf_manager/resources/wf_actions.py
@@ -65,7 +65,7 @@ def delete(self, wf_id):
wfi.finalize_workflow()
wf_utils.update_wf_status(wf_id, 'Cancelled')
db.workflows.update_workflow_state(wf_id, 'Cancelled')
log.info("Workflow cancelled")
log.info(f"Workflow {wf_id} cancelled")
log.info("Shutting down gdb")
pid = db.workflows.get_gdb_pid(wf_id)
dep_manager.kill_gdb(pid)
@@ -91,19 +91,19 @@ def patch(self, wf_id):
wfi = wf_utils.get_workflow_interface(wf_id)
log.info('Pausing/resuming workflow')
wf_state = wfi.get_workflow_state()
if option == 'pause' and wf_state == 'RUNNING':
if option == 'pause' and wf_state in ('RUNNING', 'INITIALIZING'):
wfi.pause_workflow()
wf_utils.update_wf_status(wf_id, 'Paused')
db.workflows.update_workflow_state(wf_id, 'Paused')
log.info("Workflow Paused")
log.info(f"Workflow {wf_id} Paused")
resp = make_response(jsonify(status='Workflow Paused'), 200)
elif option == 'resume' and wf_state == 'PAUSED':
wfi.resume_workflow()
tasks = wfi.get_ready_tasks()
wf_utils.schedule_submit_tasks(wf_id, tasks)
wf_utils.update_wf_status(wf_id, 'Running')
db.workflows.update_workflow_state(wf_id, 'Running')
log.info("Workflow Resumed")
log.info(f"Workflow {wf_id} Resumed")
resp = make_response(jsonify(status='Workflow Resumed'), 200)
else:
resp_msg = f'Cannot {option} workflow. It is currently {wf_state.lower()}.'
2 changes: 1 addition & 1 deletion beeflow/wf_manager/resources/wf_update.py
@@ -134,13 +134,13 @@ def handle_state_change(self, state_update, task, wfi, db):
else:
wfi.set_task_output(task, output.id, "temp")
tasks = wfi.finalize_task(task)
log.info(f'next tasks to run: {tasks}')
wf_state = wfi.get_workflow_state()
if tasks and wf_state != 'PAUSED':
wf_utils.schedule_submit_tasks(state_update.wf_id, tasks)

if wfi.workflow_completed():
wf_id = wfi.workflow_id
log.info(f"Workflow {wf_id} Completed")
archive_workflow(db, state_update.wf_id)
log.info('Workflow Completed')

11 changes: 8 additions & 3 deletions docs/sphinx/commands.rst
@@ -1,4 +1,4 @@
.. _command-line-interface:

Command Line Interface
**********************
@@ -18,11 +18,11 @@ Options:

``beeflow core status``: Check the status of beeflow and the components.

``beeflow core stop``: Stop the current running beeflow daemon.
``beeflow core stop``: Stop the running beeflow components. Active workflows will be paused; you may continue a paused workflow with the ``beeflow resume <wf_id>`` command. After restarting the beeflow components, check the status of your workflows and query any that are running. If a workflow was initializing when ``beeflow core stop`` was issued, it may be running with tasks stuck in the waiting state. If this happens, either pause and resume the workflow (``beeflow pause <wf_id>``, then ``beeflow resume <wf_id>``) to let it continue, or cancel it (``beeflow cancel <wf_id>``) and resubmit it to start over.

``beeflow core --version``: Display the version number of BEE.

``beeflow core reset``: Stop the beeflow daemon and cleanup the bee_workdir directory to start from a fresh install.
``beeflow core reset``: Stop the beeflow daemon and cleanup the bee_workdir directory to start from a fresh install.

Options:
``--archive``, ``-a``, Backup logs, workflows, and containers in bee_workdir directory before removal. [optional]
@@ -57,6 +57,11 @@ Arguments:

``beeflow package``: Package a workflow into a tarball.

Arguments:
- WF_PATH Path to the workflow package directory [required]
- PACKAGE_DEST Path for where the packaged workflow should be saved [required]


Arguments:
- WF_PATH, Path to the workflow package directory [required]
- PACKAGE_DEST, Path for where the packaged workflow should be saved [required]
