From 7b2bbdebe4d997f48e29e0a35a626413c119d578 Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Fri, 23 Feb 2024 12:21:42 -0700 Subject: [PATCH 1/3] Resubmit tasks in TM for specific failures The Task Manager will now resubmit jobs that failed due to error statuses indicating failures not caused by the job itself, such as NODE_FAIL. These are all Slurm state codes right now and may not be applicable to other schedulers like Flux. --- beeflow/task_manager/background.py | 53 +++++++++++++++++++----------- 1 file changed, 33 insertions(+), 20 deletions(-) diff --git a/beeflow/task_manager/background.py b/beeflow/task_manager/background.py index c07a4ca35..107972c1d 100644 --- a/beeflow/task_manager/background.py +++ b/beeflow/task_manager/background.py @@ -38,6 +38,29 @@ def resolve_environment(task): build_main(task) +def submit_task(db, worker, task): + """Submit (or resubmit) a task.""" + try: + log.info(f'Resolving environment for task {task.name}') + resolve_environment(task) + log.info(f'Environment preparation complete for task {task.name}') + job_id, job_state = worker.submit_task(task) + log.info(f'Job Submitted {task.name}: job_id: {job_id} job_state: {job_state}') + # place job in queue to monitor + db.job_queue.push(task=task, job_id=job_id, job_state=job_state) + # update_task_metadata(task.id, task_metadata) + except Exception as err: # noqa (we have to catch everything here) + # Set job state to failed + job_state = 'SUBMIT_FAIL' + log.error(f'Task Manager submit task {task.name} failed! \n {err}') + log.error(f'{task.name} state: {job_state}') + # Log the traceback information as well + log.error(traceback.format_exc()) + # Send the initial state to WFM + # update_task_state(task.id, job_state, metadata=task_metadata) + return job_state + + def submit_jobs(): """Submit all jobs currently in submit queue to the workload scheduler.""" db = utils.connect_db() @@ -45,26 +68,8 @@ def submit_jobs(): while db.submit_queue.count() >= 1: # Single value dictionary task = db.submit_queue.pop() - try: - log.info(f'Resolving environment for task {task.name}') - resolve_environment(task) - log.info(f'Environment preparation complete for task {task.name}') - job_id, job_state = worker.submit_task(task) - log.info(f'Job Submitted {task.name}: job_id: {job_id} job_state: {job_state}') - # place job in queue to monitor - db.job_queue.push(task=task, job_id=job_id, job_state=job_state) - # update_task_metadata(task.id, task_metadata) - except Exception as err: # noqa (we have to catch everything here) - # Set job state to failed - job_state = 'SUBMIT_FAIL' - log.error(f'Task Manager submit task {task.name} failed! 
\n {err}') - log.error(f'{task.name} state: {job_state}') - # Log the traceback information as well - log.error(traceback.format_exc()) - finally: - # Send the initial state to WFM - # update_task_state(task.id, job_state, metadata=task_metadata) - update_task_state(task.workflow_id, task.id, job_state) + job_state = submit_task(db, worker, task) + update_task_state(task.workflow_id, task.id, job_state) def update_jobs(): @@ -99,6 +104,14 @@ def update_jobs(): update_task_state(task.workflow_id, task.id, 'FAILED') else: update_task_state(task.workflow_id, task.id, new_job_state) + # States are based on https://slurm.schedmd.com/squeue.html#SECTION_JOB-STATE-CODES + elif new_job_state in ('BOOT_FAIL', 'NODE_FAIL', 'OUT_OF_MEMORY', 'PREEMPTED'): + # Don't update wfm, just resubmit + log.info(f'Task {task.name} in state {new_job_state}') + log.info(f'Resubmitting task {task.name}') + db.job_queue.remove_by_id(id_) + job_state = submit_task(db, worker, task) + update_task_state(task.workflow_id, task.id, job_state) else: update_task_state(task.workflow_id, task.id, new_job_state) From a46fab83c9ed8597dbb12205e51d3746b2be46e5 Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Fri, 23 Feb 2024 13:13:10 -0700 Subject: [PATCH 2/3] Add integration test for builder failures --- beeflow/client/bee_client.py | 8 ++--- beeflow/common/integration/utils.py | 11 +++++++ beeflow/common/integration_test.py | 17 ++++++++-- beeflow/wf_manager/resources/wf_actions.py | 4 +-- .../build-failure/Dockerfile.build-failure | 3 ++ ci/test_workflows/build-failure/input.yml | 1 + ci/test_workflows/build-failure/workflow.cwl | 32 +++++++++++++++++++ 7 files changed, 66 insertions(+), 10 deletions(-) create mode 100644 ci/test_workflows/build-failure/Dockerfile.build-failure create mode 100644 ci/test_workflows/build-failure/input.yml create mode 100644 ci/test_workflows/build-failure/workflow.cwl diff --git a/beeflow/client/bee_client.py b/beeflow/client/bee_client.py index b17bc1f2b..e868a8bde 100644 --- a/beeflow/client/bee_client.py +++ b/beeflow/client/bee_client.py @@ -459,11 +459,9 @@ def query(wf_id: str = typer.Argument(..., callback=match_short_id)): tasks_status = resp.json()['tasks_status'] wf_status = resp.json()['wf_status'] - if tasks_status == 'Unavailable': - typer.echo(wf_status) - else: - typer.echo(wf_status) - typer.echo(tasks_status) + typer.echo(wf_status) + for _task_id, task_name, task_state in tasks_status: + typer.echo(f'{task_name}--{task_state}') logging.info('Query workflow: {resp.text}') return wf_status, tasks_status diff --git a/beeflow/common/integration/utils.py b/beeflow/common/integration/utils.py index 2bf4435e1..e83174f41 100644 --- a/beeflow/common/integration/utils.py +++ b/beeflow/common/integration/utils.py @@ -91,6 +91,11 @@ def status(self): """Get the status of the workflow.""" return bee_client.query(self.wf_id)[0] + @property + def task_states(self): + """Get the task states of the workflow.""" + return bee_client.query(self.wf_id)[1] + def cleanup(self): """Clean up any leftover workflow data.""" # Remove the generated tarball @@ -234,3 +239,9 @@ def check_path_exists(path): def check_completed(workflow): """Ensure the workflow has a completed status.""" ci_assert(workflow.status == 'Archived', f'Bad workflow status {workflow.status}') + + +def check_workflow_failed(workflow): + """Ensure that the workflow completed in a Failed state.""" + ci_assert(workflow.status == 'Failed', + f'Workflow did not fail as expected (final status: {workflow.status})') diff --git 
a/beeflow/common/integration_test.py b/beeflow/common/integration_test.py index 704616c66..822e8c1b0 100644 --- a/beeflow/common/integration_test.py +++ b/beeflow/common/integration_test.py @@ -194,6 +194,20 @@ def multiple_workflows(outer_workdir): utils.check_path_exists(path) +@TEST_RUNNER.add() +def build_failure(outer_workdir): + """Test running a workflow with a bad container.""" + workdir = os.path.join(outer_workdir, uuid.uuid4().hex) + os.makedirs(workdir) + workflow = utils.Workflow('build-failure', 'ci/test_workflows/build-failure', + main_cwl='workflow.cwl', job_file='input.yml', + workdir=workdir, containers=[]) + yield [workflow] + utils.check_workflow_failed(workflow) + # Only one task + util.ci_assert(workflow.task_states[0][2] == 'BUILD_FAILED') + + @TEST_RUNNER.add(ignore=True) def checkpoint_restart(outer_workdir): """Test the clamr-ffmpeg checkpoint restart workflow.""" @@ -220,8 +234,7 @@ def checkpoint_restart_failure(outer_workdir): main_cwl='workflow.cwl', job_file='input.yml', workdir=workdir, containers=[]) yield [workflow] - utils.ci_assert(workflow.status == 'Failed', - f'Workflow did not fail as expected (final status: {workflow.status})') + utils.check_workflow_failed(workflow) def test_input_callback(arg): diff --git a/beeflow/wf_manager/resources/wf_actions.py b/beeflow/wf_manager/resources/wf_actions.py index 49b555483..d19e54ea7 100644 --- a/beeflow/wf_manager/resources/wf_actions.py +++ b/beeflow/wf_manager/resources/wf_actions.py @@ -42,13 +42,11 @@ def get(wf_id): if not tasks: log.info(f"Bad query for wf {wf_id}.") wf_status = 'No workflow with that ID is currently loaded' - tasks_status.append('Unavailable') resp = make_response(jsonify(tasks_status=tasks_status, wf_status=wf_status, status='not found'), 404) for task in tasks: - tasks_status.append(f"{task.name}--{task.state}") - tasks_status = '\n'.join(tasks_status) + tasks_status.append((task.id, task.name, task.state)) wf_status = db.workflows.get_workflow_state(wf_id) resp = make_response(jsonify(tasks_status=tasks_status, diff --git a/ci/test_workflows/build-failure/Dockerfile.build-failure b/ci/test_workflows/build-failure/Dockerfile.build-failure new file mode 100644 index 000000000..745fdb930 --- /dev/null +++ b/ci/test_workflows/build-failure/Dockerfile.build-failure @@ -0,0 +1,3 @@ +FROM some_nonexistent_container + +RUN touch /file diff --git a/ci/test_workflows/build-failure/input.yml b/ci/test_workflows/build-failure/input.yml new file mode 100644 index 000000000..633cf8836 --- /dev/null +++ b/ci/test_workflows/build-failure/input.yml @@ -0,0 +1 @@ +fname: /file diff --git a/ci/test_workflows/build-failure/workflow.cwl b/ci/test_workflows/build-failure/workflow.cwl new file mode 100644 index 000000000..b71987602 --- /dev/null +++ b/ci/test_workflows/build-failure/workflow.cwl @@ -0,0 +1,32 @@ +# Dummy workflow designed to fail at the container build stage +class: Workflow +cwlVersion: v1.2 + +inputs: + fname: string + +outputs: + step0_stdout: + type: File + outputSource: step0/step0_stdout + +steps: + step0: + run: + class: CommandLineTool + baseCommand: ls + stdout: step0_stdout.txt + inputs: + fname: + type: string + inputBinding: {} + outputs: + step0_stdout: + type: stdout + in: + fname: fname + out: [step0_stdout] + hints: + DockerRequirement: + dockerFile: "Dockerfile.build-failure" + beeflow:containerName: "build-failure" From d70f247a861396becc863c08fd7aeccd0d8d7d97 Mon Sep 17 00:00:00 2001 From: Jake Tronge Date: Fri, 23 Feb 2024 14:43:41 -0700 Subject: [PATCH 3/3] Partial 
builder refactor for better error handling Adds a ContainerBuildError exception to be thrown on builder failures. The Task Manager catches this and sets task states to BUILD_FAIL. --- beeflow/common/build/build_driver.py | 21 ------------------ beeflow/common/build/container_drivers.py | 27 ++++++++++++----------- beeflow/common/build/utils.py | 26 ++++++++++++++++++++++ beeflow/common/build_interfaces.py | 8 ++++--- beeflow/common/crt/charliecloud_driver.py | 2 +- beeflow/common/crt/singularity_driver.py | 2 +- beeflow/common/integration/utils.py | 4 ++-- beeflow/common/integration_test.py | 4 +++- beeflow/task_manager/background.py | 5 +++++ beeflow/tests/test_wf_manager.py | 3 ++- beeflow/wf_manager/resources/wf_update.py | 7 ++++++ 11 files changed, 66 insertions(+), 43 deletions(-) create mode 100644 beeflow/common/build/utils.py diff --git a/beeflow/common/build/build_driver.py b/beeflow/common/build/build_driver.py index b76a64136..84ae5abb7 100644 --- a/beeflow/common/build/build_driver.py +++ b/beeflow/common/build/build_driver.py @@ -1,27 +1,6 @@ """Abstract base class for the handling build systems.""" from abc import ABC, abstractmethod -import jsonpickle - - -def arg2task(task_arg): - """Convert JSON encoded task to Task object. - - The build driver will expect a Task object, and the build - interface starts with a JSON representation of the Task object. - """ - return jsonpickle.decode(task_arg) - - -def task2arg(task): - """Convert Task object to JSON encoded string. - - The build interface needs to pass Task data on the command line, - because each compute node needs to understand the Task description. - JSON format is a convenient way to describe the Task object at the - command line. - """ - return jsonpickle.encode(task) class BuildDriver(ABC): diff --git a/beeflow/common/build/container_drivers.py b/beeflow/common/build/container_drivers.py index 06f53423d..555356dac 100644 --- a/beeflow/common/build/container_drivers.py +++ b/beeflow/common/build/container_drivers.py @@ -9,6 +9,7 @@ import tempfile from beeflow.common.config_driver import BeeConfig as bc from beeflow.common import log as bee_logging +from beeflow.common.build.utils import ContainerBuildError from beeflow.common.build.build_driver import BuildDriver from beeflow.common.crt.charliecloud_driver import CharliecloudDriver as crt_driver @@ -125,8 +126,7 @@ def process_docker_pull(self, addr=None, force=False): # If Requirement is set but not specified, and param empty, do nothing and error. if self.task.requirements == {} and not addr: - log.error("dockerPull set but no image path specified.") - return 1 + raise ContainerBuildError("dockerPull set but no image path specified.") # If no image specified and no image required, nothing to do. if not task_addr and not addr: log.info('No image specified and no image required, nothing to do.') @@ -170,8 +170,7 @@ def process_docker_load(self): log.warning('Charliecloud does not have the concept of a layered image tarball.') log.warning('Did you mean to use dockerImport?') if req_dockerload: - log.warning('dockerLoad specified as requirement.') - return 1 + raise ContainerBuildError('dockerLoad is not supported') return 0 def process_docker_file(self, task_dockerfile=None, force=False): @@ -185,14 +184,14 @@ def process_docker_file(self, task_dockerfile=None, force=False): # beeflow:containerName is always processed before dockerFile, so safe to assume it exists # otherwise, raise an error. 
if self.container_name is None: - log.error("dockerFile may not be specified without beeflow:containerName") - return 1 + raise ContainerBuildError( + "dockerFile may not be specified without beeflow:containerName" + ) # Need dockerfile in order to build, else fail task_dockerfile = self.get_docker_req('dockerFile') if not task_dockerfile: - log.error("dockerFile not specified as task attribute or parameter.") - return 1 + raise ContainerBuildError("dockerFile not specified as task attribute or parameter.") # Create context directory to use as Dockerfile context, use container name so user # can prep the directory with COPY sources as needed. @@ -281,7 +280,7 @@ def process_docker_image_id(self, param_imageid=None): # If task and parameter still doesn't specify ImageId, consider this an error. if not self.docker_image_id: return 0 - return 1 + raise ContainerBuildError('dockerImageId is required but not found') def process_docker_output_directory(self, param_output_directory=None): """Get and process the CWL compliant dockerOutputDirectory dockerRequirement. @@ -308,8 +307,9 @@ def process_copy_container(self, force=False): # Need container_path to know how dockerfile should be named, else fail task_container_path = self.get_docker_req('beeflow:copyContainer') if not task_container_path: - log.error("beeflow:copyContainer: You must specify the path to an existing container.") - return 1 + raise ContainerBuildError( + "beeflow:copyContainer: You must specify the path to an existing container." + ) if self.container_name: copy_target = '/'.join([self.container_archive, self.container_name + '.tar.gz']) @@ -345,8 +345,9 @@ def process_container_name(self): """ task_container_name = self.get_docker_req('beeflow:containerName') if not task_container_name and self.docker_image_id is None: - log.error("beeflow:containerName: You must specify the containerName or dockerImageId") - return 1 + raise ContainerBuildError( + "beeflow:containerName: You must specify the containerName or dockerImageId" + ) self.container_name = task_container_name log.info(f'Setting container_name to: {self.container_name}') return 0 diff --git a/beeflow/common/build/utils.py b/beeflow/common/build/utils.py new file mode 100644 index 000000000..1cd32eba4 --- /dev/null +++ b/beeflow/common/build/utils.py @@ -0,0 +1,26 @@ +"""Container build utility code.""" +import jsonpickle + + +def arg2task(task_arg): + """Convert JSON encoded task to Task object. + + The build driver will expect a Task object, and the build + interface starts with a JSON representation of the Task object. + """ + return jsonpickle.decode(task_arg) + + +def task2arg(task): + """Convert Task object to JSON encoded string. + + The build interface needs to pass Task data on the command line, + because each compute node needs to understand the Task description. + JSON format is a convenient way to describe the Task object at the + command line. 
+ """ + return jsonpickle.encode(task) + + +class ContainerBuildError(Exception): + """Cotnainer build error class.""" diff --git a/beeflow/common/build_interfaces.py b/beeflow/common/build_interfaces.py index 95288009f..1aa1944a7 100644 --- a/beeflow/common/build_interfaces.py +++ b/beeflow/common/build_interfaces.py @@ -14,7 +14,7 @@ from beeflow.common.build.container_drivers import CharliecloudBuildDriver from beeflow.common.config_driver import BeeConfig as bc from beeflow.common import log as bee_logging -from beeflow.common.build.build_driver import arg2task +from beeflow.common.build.utils import arg2task, ContainerBuildError log = bee_logging.setup(__name__) @@ -47,9 +47,11 @@ def build_main(task): return_code = return_obj.returncode except AttributeError: return_code = int(return_obj) - except CalledProcessError: + except CalledProcessError as error: return_code = 1 - log.warning(f'There was a problem executing {op_dict["op_name"]}!') + raise ContainerBuildError( + f'There was a problem executing {op_dict["op_name"]}!' + ) from error # Case 1: Not the last operation spec'd, but is a terminal operation. if op_dict["op_terminal"] and return_code == 0: op_values = [None, None, None, True] diff --git a/beeflow/common/crt/charliecloud_driver.py b/beeflow/common/crt/charliecloud_driver.py index 19d0fafe9..c9218d230 100644 --- a/beeflow/common/crt/charliecloud_driver.py +++ b/beeflow/common/crt/charliecloud_driver.py @@ -8,7 +8,7 @@ from beeflow.common.crt.crt_driver import (ContainerRuntimeDriver, ContainerRuntimeResult, Command, CommandType) from beeflow.common.config_driver import BeeConfig as bc -from beeflow.common.build.build_driver import task2arg +from beeflow.common.build.utils import task2arg from beeflow.common.container_path import convert_path from beeflow.common import log as bee_logging diff --git a/beeflow/common/crt/singularity_driver.py b/beeflow/common/crt/singularity_driver.py index 829a66d0c..7461f44a0 100644 --- a/beeflow/common/crt/singularity_driver.py +++ b/beeflow/common/crt/singularity_driver.py @@ -4,7 +4,7 @@ """ from beeflow.common.crt.crt_driver import (ContainerRuntimeDriver, ContainerRuntimeResult, Command) -from beeflow.common.build.build_driver import task2arg +from beeflow.common.build.utils import task2arg class SingularityDriver(ContainerRuntimeDriver): diff --git a/beeflow/common/integration/utils.py b/beeflow/common/integration/utils.py index e83174f41..981e0b97a 100644 --- a/beeflow/common/integration/utils.py +++ b/beeflow/common/integration/utils.py @@ -238,10 +238,10 @@ def check_path_exists(path): def check_completed(workflow): """Ensure the workflow has a completed status.""" - ci_assert(workflow.status == 'Archived', f'Bad workflow status {workflow.status}') + ci_assert(workflow.status == 'Archived', f'bad workflow status {workflow.status}') def check_workflow_failed(workflow): """Ensure that the workflow completed in a Failed state.""" ci_assert(workflow.status == 'Failed', - f'Workflow did not fail as expected (final status: {workflow.status})') + f'workflow did not fail as expected (final status: {workflow.status})') diff --git a/beeflow/common/integration_test.py b/beeflow/common/integration_test.py index 822e8c1b0..1dbc5b65b 100644 --- a/beeflow/common/integration_test.py +++ b/beeflow/common/integration_test.py @@ -205,7 +205,9 @@ def build_failure(outer_workdir): yield [workflow] utils.check_workflow_failed(workflow) # Only one task - util.ci_assert(workflow.task_states[0][2] == 'BUILD_FAILED') + task_state = 
workflow.task_states[0][2] + utils.ci_assert(task_state == 'BUILD_FAIL', + f'task was not in state BUILD_FAIL as expected: {task_state}') @TEST_RUNNER.add(ignore=True) diff --git a/beeflow/task_manager/background.py b/beeflow/task_manager/background.py index 107972c1d..2c1360fbd 100644 --- a/beeflow/task_manager/background.py +++ b/beeflow/task_manager/background.py @@ -7,6 +7,7 @@ import jsonpickle from beeflow.task_manager import utils from beeflow.common import log as bee_logging +from beeflow.common.build.utils import ContainerBuildError from beeflow.common.build_interfaces import build_main @@ -49,6 +50,10 @@ def submit_task(db, worker, task): # place job in queue to monitor db.job_queue.push(task=task, job_id=job_id, job_state=job_state) # update_task_metadata(task.id, task_metadata) + except ContainerBuildError as err: + job_state = 'BUILD_FAIL' + log.error(f'Failed to build container for {task.name}: {err}') + log.error(f'{task.name} state: {job_state}') except Exception as err: # noqa (we have to catch everything here) # Set job state to failed job_state = 'SUBMIT_FAIL' diff --git a/beeflow/tests/test_wf_manager.py b/beeflow/tests/test_wf_manager.py index 7d238e874..0abc0443f 100644 --- a/beeflow/tests/test_wf_manager.py +++ b/beeflow/tests/test_wf_manager.py @@ -190,7 +190,8 @@ def test_workflow_status(client, mocker, setup_teardown_workflow, temp_db): temp_db.workflows.add_task(124, WF_ID, 'task', "RUNNING") resp = client().get(f'/bee_wfm/v1/jobs/{WF_ID}') - assert 'RUNNING' in resp.json['tasks_status'] + tasks_status = resp.json['tasks_status'] + assert tasks_status[0][2] == 'RUNNING' or tasks_status[1][2] == 'RUNNING' def test_cancel_workflow(client, mocker, setup_teardown_workflow, temp_db): diff --git a/beeflow/wf_manager/resources/wf_update.py b/beeflow/wf_manager/resources/wf_update.py index 5fe70ed32..8e557a163 100644 --- a/beeflow/wf_manager/resources/wf_update.py +++ b/beeflow/wf_manager/resources/wf_update.py @@ -99,6 +99,7 @@ def put(self): if new_task is None: log.info('No more restarts') wf_state = wfi.get_workflow_state() + wfi.set_workflow_state('Failed') wf_utils.update_wf_status(wf_id, 'Failed') db.workflows.update_workflow_state(wf_id, 'Failed') return make_response(jsonify(status=f'Task {task_id} set to {job_state}')) @@ -133,6 +134,12 @@ def put(self): pid = db.workflows.get_gdb_pid(wf_id) dep_manager.kill_gdb(pid) + if job_state == 'BUILD_FAIL': + log.error(f'Workflow failed due to failed container build for task {task.name}') + wfi.set_workflow_state('Failed') + wf_utils.update_wf_status(wf_id, 'Failed') + db.workflows.update_workflow_state(wf_id, 'Failed') + resp = make_response(jsonify(status=(f'Task {task_id} belonging to WF {wf_id} set to' f'{job_state}')), 200) return resp
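
For readers skimming the series, the decision logic added across these patches amounts to the standalone sketch below. It is illustrative only and not part of any patch: `submit_task` and `handle_update` are simplified stand-ins for the Task Manager helpers in `beeflow/task_manager/background.py`, the `build`/`submit`/`resubmit` callables are hypothetical placeholders for `resolve_environment()` and `worker.submit_task()`, and the retryable states mirror the Slurm job state codes referenced in the first commit (other schedulers such as Flux may report different codes).

```python
# Illustrative sketch (not part of the patches): how the Task Manager decides
# between resubmitting a job, marking a build failure, and reporting a state.
# Slurm codes per https://slurm.schedmd.com/squeue.html#SECTION_JOB-STATE-CODES
RESUBMIT_STATES = {'BOOT_FAIL', 'NODE_FAIL', 'OUT_OF_MEMORY', 'PREEMPTED'}


class ContainerBuildError(Exception):
    """Raised when building a task's container fails (patch 3)."""


def submit_task(build, submit):
    """Build the environment and submit the job, mapping failures to states.

    `build` and `submit` are hypothetical callables standing in for
    resolve_environment() and worker.submit_task() in background.py.
    """
    try:
        build()
        _job_id, job_state = submit()
    except ContainerBuildError:
        job_state = 'BUILD_FAIL'   # reported to the WFM, which fails the workflow
    except Exception:              # noqa: anything else is a submission failure
        job_state = 'SUBMIT_FAIL'
    return job_state


def handle_update(new_job_state, resubmit):
    """Return the state to report for a job whose scheduler state changed."""
    if new_job_state in RESUBMIT_STATES:
        # The failure was not the job's fault: resubmit instead of failing.
        return resubmit()
    return new_job_state


if __name__ == '__main__':
    # Example: a NODE_FAIL triggers a resubmission that then succeeds.
    state = handle_update(
        'NODE_FAIL',
        resubmit=lambda: submit_task(lambda: None, lambda: (42, 'PENDING')))
    print(state)  # PENDING
```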