diff --git a/beeflow/client/bee_client.py b/beeflow/client/bee_client.py
index ac024c705..7bc1b53e3 100644
--- a/beeflow/client/bee_client.py
+++ b/beeflow/client/bee_client.py
@@ -464,7 +464,10 @@ def query(wf_id: str = typer.Argument(..., callback=match_short_id)):
     wf_status = resp.json()['wf_status']
     typer.echo(wf_status)
     for _task_id, task_name, task_state in tasks_status:
-        typer.echo(f'{task_name}--{task_state}')
+        if wf_status == 'No Start':
+            typer.echo(f'{task_name}')
+        else:
+            typer.echo(f'{task_name}--{task_state}')
     logging.info('Query workflow: {resp.text}')
     return wf_status, tasks_status
 
@@ -525,7 +528,7 @@ def cancel(wf_id: str = typer.Argument(..., callback=match_short_id)):
     """Cancel a paused or running workflow."""
     long_wf_id = wf_id
     wf_status = get_wf_status(wf_id)
-    if wf_status in ('Running', 'Paused'):
+    if wf_status in ('Running', 'Paused', 'No Start'):
         try:
             conn = _wfm_conn()
             resp = conn.delete(_resource(long_wf_id), json={'option': 'cancel'}, timeout=60)
diff --git a/beeflow/common/gdb/gdb_driver.py b/beeflow/common/gdb/gdb_driver.py
index 085ea387d..9218dc06e 100644
--- a/beeflow/common/gdb/gdb_driver.py
+++ b/beeflow/common/gdb/gdb_driver.py
@@ -57,7 +57,7 @@ def reset_workflow(self, new_id):
         """
 
     @abstractmethod
-    def load_task(self, task):
+    def load_task(self, task, task_state):
         """Load a task into a stored workflow.
 
         Dependencies should be automatically deduced and generated by the graph database
diff --git a/beeflow/common/gdb/neo4j_cypher.py b/beeflow/common/gdb/neo4j_cypher.py
index 14346460a..6a6cb1f1c 100644
--- a/beeflow/common/gdb/neo4j_cypher.py
+++ b/beeflow/common/gdb/neo4j_cypher.py
@@ -183,7 +183,7 @@ def create_task_output_nodes(tx, task):
                value=output.value, glob=output.glob)
 
 
-def create_task_metadata_node(tx, task):
+def create_task_metadata_node(tx, task, task_state):
     """Create a task metadata node in the Neo4j database.
 
     The node holds metadata about a task's execution state.
@@ -192,9 +192,9 @@ def create_task_metadata_node(tx, task):
     :param task: the task whose metadata node to create
     :type task: Task
     """
     metadata_query = ("MATCH (t:Task {id: $task_id}) "
-                      "CREATE (m:Metadata {state: 'WAITING'})-[:DESCRIBES]->(t)")
+                      "CREATE (m:Metadata {state: $task_state})-[:DESCRIBES]->(t)")
 
-    tx.run(metadata_query, task_id=task.id)
+    tx.run(metadata_query, task_id=task.id, task_state=task_state)
 
 def add_dependencies(tx, task, old_task=None, restarted_task=False):
@@ -329,7 +329,7 @@ def get_workflow_tasks(tx, wf_id):
     :type wf_id: str
     :rtype: neo4j.Result
     """
-    workflow_query = "MATCH t:Task WHERE t.workflow_id = $wf_id RETURN t"
+    workflow_query = "MATCH (t:Task) WHERE t.workflow_id = $wf_id RETURN t"
 
     return [rec['t'] for rec in tx.run(workflow_query, wf_id=wf_id)]
diff --git a/beeflow/common/gdb/neo4j_driver.py b/beeflow/common/gdb/neo4j_driver.py
index 06b51ea40..48131a722 100644
--- a/beeflow/common/gdb/neo4j_driver.py
+++ b/beeflow/common/gdb/neo4j_driver.py
@@ -131,7 +131,7 @@ def reset_workflow(self, old_id, new_id):
             session.write_transaction(tx.reset_tasks_metadata, wf_id=old_id)
             session.write_transaction(tx.reset_workflow_id, old_id=old_id, new_id=new_id)
 
-    def load_task(self, task):
+    def load_task(self, task, task_state):
         """Load a task into a workflow stored in the Neo4j database.
 
         Dependencies are automatically deduced and generated by Neo4j upon loading
@@ -148,7 +148,8 @@ def load_task(self, task):
             session.write_transaction(tx.create_task_requirement_nodes, task=task)
             session.write_transaction(tx.create_task_input_nodes, task=task)
             session.write_transaction(tx.create_task_output_nodes, task=task)
-            session.write_transaction(tx.create_task_metadata_node, task=task)
+            session.write_transaction(tx.create_task_metadata_node, task=task,
+                                      task_state=task_state)
             session.write_transaction(tx.add_dependencies, task=task)
 
     def initialize_ready_tasks(self, workflow_id):
diff --git a/beeflow/common/wf_interface.py b/beeflow/common/wf_interface.py
index 38395714e..8fe1054cd 100644
--- a/beeflow/common/wf_interface.py
+++ b/beeflow/common/wf_interface.py
@@ -73,7 +73,7 @@ def reset_workflow(self, workflow_id):
         self._workflow_id = workflow_id
         self._gdb_driver.set_workflow_state(self._workflow_id, 'SUBMITTED')
 
-    def add_task(self, task):
+    def add_task(self, task, task_state):
         """Add a new task to a BEE workflow.
 
         :param task: the name of the file to which to redirect stderr
@@ -90,7 +90,7 @@ def add_task(self, task):
             task.hints = []
 
         # Load the new task into the graph database
-        self._gdb_driver.load_task(task)
+        self._gdb_driver.load_task(task, task_state)
 
     def restart_task(self, task, checkpoint_file):
         """Restart a failed BEE workflow task.
diff --git a/beeflow/tests/mocks.py b/beeflow/tests/mocks.py
index b5d11c992..1d7cada86 100644
--- a/beeflow/tests/mocks.py
+++ b/beeflow/tests/mocks.py
@@ -166,10 +166,10 @@ def reset_workflow(self, old_id, new_id): #noqa not using parameter in mock
             self.task_metadata[task_id] = {}
             self.task_states[task_id] = 'WAITING'
 
-    def load_task(self, task):
+    def load_task(self, task, task_state):
         """Load a task into a workflow in the graph database."""
         self.tasks[task.id] = task
-        self.task_states[task.id] = 'WAITING'
+        self.task_states[task.id] = task_state
         self.task_metadata[task.id] = {}
         self.inputs[task.id] = {}
         self.outputs[task.id] = {}
@@ -186,7 +186,8 @@ def initialize_ready_tasks(self, workflow_id): #noqa not using parameter in mock
 
     def restart_task(self, _old_task, new_task):
         """Create a new task from a failed task checkpoint restart enabled."""
-        self.load_task(new_task)
+        task_state = "WAITING"
+        self.load_task(new_task, task_state)
 
     def finalize_task(self, task):
         """Set a task's state to completed."""
diff --git a/beeflow/tests/test_wf_interface.py b/beeflow/tests/test_wf_interface.py
index c3a72403e..84d9d53e5 100644
--- a/beeflow/tests/test_wf_interface.py
+++ b/beeflow/tests/test_wf_interface.py
@@ -168,7 +168,9 @@ def test_add_task(self):
                     stderr=stderr,
                     workflow_id=workflow_id)
 
-        self.wfi.add_task(task)
+        task_state = "WAITING"
+
+        self.wfi.add_task(task, task_state)
 
         # Task object assertions
         self.assertEqual(task_name, task.name)
@@ -225,7 +227,9 @@ def test_restart_task(self):
                     stderr=stderr,
                     workflow_id=workflow_id)
 
-        self.wfi.add_task(task)
+        task_state = "WAITING"
+
+        self.wfi.add_task(task, task_state)
 
         # Restart the task, should create a new Task
         new_task = self.wfi.restart_task(task, test_checkpoint_file)
@@ -322,7 +326,9 @@ def test_get_task_by_id(self):
                     stderr=stderr,
                     workflow_id=workflow_id)
 
-        self.wfi.add_task(task)
+        task_state = "WAITING"
+
+        self.wfi.add_task(task, task_state)
 
         self.assertEqual(task, self.wfi.get_task_by_id(task.id))
 
@@ -435,9 +441,11 @@ def test_get_task_state(self):
                                None)],
                     [StepOutput("test_task/output", "File", "output.txt", "output.txt")],
                     None, None, workflow_id)
-        self.wfi.add_task(task)
+        task_state = "WAITING"
+
+        self.wfi.add_task(task, task_state)
 
-        # Should be WAITING because workflow not yet executed
+        # Should be WAITING
         self.assertEqual("WAITING", self.wfi.get_task_state(task))
 
@@ -454,7 +462,8 @@ def test_set_task_state(self):
                                None)],
                     [StepOutput("test_task/output", "File", "output.txt", "output.txt")],
                     None, None, workflow_id)
-        self.wfi.add_task(task)
+        task_state = "WAITING"
+        self.wfi.add_task(task, task_state)
 
         self.wfi.set_task_state(task, "RUNNING")
 
@@ -475,7 +484,8 @@ def test_get_task_metadata(self):
                                None)],
                     [StepOutput("test_task/output", "File", "output.txt", "output.txt")],
                     None, None, workflow_id)
-        self.wfi.add_task(task)
+        task_state = "WAITING"
+        self.wfi.add_task(task, task_state)
 
         metadata = {"cluster": "fog", "crt": "charliecloud",
                     "container_md5": "67df538c1b6893f4276d10b2af34ccfe", "job_id": 1337}
@@ -496,7 +506,8 @@ def test_set_task_metadata(self):
                                None)],
                     [StepOutput("test_task/output", "File", "output.txt", "output.txt")],
                     None, None, workflow_id)
-        self.wfi.add_task(task)
+        task_state = "WAITING"
+        self.wfi.add_task(task, task_state)
 
         metadata = {"cluster": "fog", "crt": "charliecloud",
                     "container_md5": "67df538c1b6893f4276d10b2af34ccfe", "job_id": 1337}
@@ -522,7 +533,8 @@ def test_get_task_input(self):
                                None)],
                     [StepOutput("test_task/output", "File", "output.txt", "output.txt")],
                     None, None, workflow_id)
-        self.wfi.add_task(task)
+        task_state = "WAITING"
+        self.wfi.add_task(task, task_state)
 
         self.assertEqual(task.inputs[0], self.wfi.get_task_input(task, "test_input"))
 
@@ -539,7 +551,8 @@ def test_set_task_input(self):
                     [StepInput("test_input", "File", None, "default.txt", "test_input", None, None, None)],
                     [StepOutput("test_task/output", "File", "output.txt", "output.txt")],
                     None, None, workflow_id)
-        self.wfi.add_task(task)
+        task_state = "WAITING"
+        self.wfi.add_task(task, task_state)
 
         test_input = StepInput("test_input", "File", "input.txt", "default.txt", "test_input", None, None, None)
@@ -560,7 +573,8 @@ def test_get_task_output(self):
                                None)],
                     [StepOutput("test_task/output", "File", "output.txt", "output.txt")],
                     None, None, workflow_id)
-        self.wfi.add_task(task)
+        task_state = "WAITING"
+        self.wfi.add_task(task, task_state)
 
         self.assertEqual(task.outputs[0], self.wfi.get_task_output(task, "test_task/output"))
 
@@ -578,7 +592,8 @@ def test_set_task_output(self):
                                None, None)],
                     [StepOutput("test_task/output", "File", None, "output.txt")],
                     None, None, workflow_id)
-        self.wfi.add_task(task)
+        task_state = "WAITING"
+        self.wfi.add_task(task, task_state)
 
         test_output = StepOutput("test_task/output", "File", "output.txt", "output.txt")
         self.wfi.set_task_output(task, "test_task/output", "output.txt")
@@ -599,7 +614,8 @@ def test_workflow_completed(self):
                     [StepOutput("test_task/output", "File", "output.txt", "output.txt")],
                     None, None, workflow_id)
-        self.wfi.add_task(task)
+        task_state = "WAITING"
+        self.wfi.add_task(task, task_state)
 
         # Workflow not completed
         self.assertFalse(self.wfi.workflow_completed())
@@ -662,8 +678,9 @@ def _create_test_tasks(self, workflow_id):
                  workflow_id=workflow_id)
         ]
 
+        task_state = "WAITING"
         for task in tasks:
-            self.wfi.add_task(task)
+            self.wfi.add_task(task, task_state)
 
         return tasks
 
diff --git a/beeflow/wf_manager/resources/wf_utils.py b/beeflow/wf_manager/resources/wf_utils.py
index ea073d49f..d7b46fb37 100644
--- a/beeflow/wf_manager/resources/wf_utils.py
+++ b/beeflow/wf_manager/resources/wf_utils.py
@@ -273,17 +273,20 @@ def setup_workflow(wf_id, wf_name, wf_dir, wf_workdir, no_start, workflow=None,
     create_wf_metadata(wf_id, wf_name)
     db = connect_db(wfm_db, get_db_path())
     for task in tasks:
-        wfi.add_task(task)
+        task_state = "" if no_start else "WAITING"
+        wfi.add_task(task, task_state)
         metadata = wfi.get_task_metadata(task)
         metadata['workdir'] = wf_workdir
         wfi.set_task_metadata(task, metadata)
-        db.workflows.add_task(task.id, wf_id, task.name, "WAITING")
+        db.workflows.add_task(task.id, wf_id, task.name, task_state)
 
-    update_wf_status(wf_id, 'Waiting')
-    db.workflows.update_workflow_state(wf_id, 'Waiting')
     if no_start:
+        update_wf_status(wf_id, 'No Start')
+        db.workflows.update_workflow_state(wf_id, 'No Start')
         log.info('Not starting workflow, as requested')
     else:
+        update_wf_status(wf_id, 'Waiting')
+        db.workflows.update_workflow_state(wf_id, 'Waiting')
         log.info('Starting workflow')
         db.workflows.update_workflow_state(wf_id, 'Running')
         start_workflow(wf_id)
@@ -296,6 +299,13 @@ def start_workflow(wf_id):
     state = wfi.get_workflow_state()
     if state in ('RUNNING', 'PAUSED', 'COMPLETED'):
         return False
+    _, tasks = wfi.get_workflow()
+    tasks.reverse()
+    for task in tasks:
+        task_state = wfi.get_task_state(task)
+        if task_state == '':
+            wfi.set_task_state(task, 'WAITING')
+            db.workflows.update_task_state(task.id, wf_id, 'WAITING')
     wfi.execute_workflow()
     tasks = wfi.get_ready_tasks()
     schedule_submit_tasks(wf_id, tasks)
diff --git a/coverage.svg b/coverage.svg
index a4262d340..012a8497e 100644
--- a/coverage.svg
+++ b/coverage.svg
@@ -15,7 +15,7 @@
     coverage
     coverage
-    69%
-    69%
+    70%
+    70%
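
Taken together, the patch threads a caller-supplied initial task state through wf_interface.add_task and gdb_driver.load_task: an empty string for a workflow submitted with no_start set, 'WAITING' otherwise, with start_workflow later promoting the empty placeholder to 'WAITING' once execution actually begins. A minimal, self-contained sketch of that state flow follows; TinyWorkflow is a hypothetical stand-in used only for illustration, not a class from beeflow.

# Toy illustration of the state flow above; TinyWorkflow is hypothetical
# and only mirrors the add_task / start_workflow contract in the patch.

class TinyWorkflow:
    def __init__(self):
        self.task_states = {}

    def add_task(self, task_id, task_state):
        # The caller now supplies the initial state: "" for a no_start
        # submission, "WAITING" otherwise.
        self.task_states[task_id] = task_state

    def start(self):
        # Mirrors start_workflow(): empty placeholder states are promoted
        # to WAITING right before execution begins.
        for task_id, state in self.task_states.items():
            if state == '':
                self.task_states[task_id] = 'WAITING'


no_start = True
wf = TinyWorkflow()
for task_id in ('prep', 'run', 'post'):
    wf.add_task(task_id, "" if no_start else "WAITING")
print(wf.task_states)  # {'prep': '', 'run': '', 'post': ''}; the client's query command prints only task names for a 'No Start' workflow
wf.start()
print(wf.task_states)  # every task is now 'WAITING'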