diff --git a/beeflow/common/gdb/gdb_driver.py b/beeflow/common/gdb/gdb_driver.py index 085ea387d..3bf29e737 100644 --- a/beeflow/common/gdb/gdb_driver.py +++ b/beeflow/common/gdb/gdb_driver.py @@ -30,20 +30,6 @@ def execute_workflow(self): Set the initial tasks' states to 'READY'. """ - @abstractmethod - def pause_workflow(self): - """Pause execution of a running workflow. - - Set workflow from state 'RUNNING' to 'PAUSED'. - """ - - @abstractmethod - def resume_workflow(self): - """Resume execution of a paused workflow. - - Set workflow state from 'PAUSED' to 'RUNNING'. - """ - @abstractmethod def reset_workflow(self, new_id): """Reset the execution state of a stored workflow. diff --git a/beeflow/common/gdb/neo4j_driver.py b/beeflow/common/gdb/neo4j_driver.py index 06b51ea40..02bfd167e 100644 --- a/beeflow/common/gdb/neo4j_driver.py +++ b/beeflow/common/gdb/neo4j_driver.py @@ -93,30 +93,6 @@ def execute_workflow(self, workflow_id): """ self._write_transaction(tx.set_init_task_inputs, wf_id=workflow_id) self._write_transaction(tx.set_runnable_tasks_to_ready, wf_id=workflow_id) - self._write_transaction(tx.set_workflow_state, state='RUNNING', wf_id=workflow_id) - - def pause_workflow(self, workflow_id): - """Pause execution of a running workflow in Neo4j. - - Sets tasks with state 'RUNNING' to 'PAUSED'. - - :param workflow_id: the workflow id - :type workflow_id: str - - """ - with self._driver.session() as session: - session.write_transaction(tx.set_workflow_state, state='PAUSED', wf_id=workflow_id) - - def resume_workflow(self, workflow_id): - """Resume execution of a paused workflow in Neo4j. - - Sets workflow state to 'RUNNING' - - :param workflow_id: the workflow id - :type workflow_id: str - """ - with self._driver.session() as session: - session.write_transaction(tx.set_workflow_state, state='RUNNING', wf_id=workflow_id) def reset_workflow(self, old_id, new_id): """Reset the execution state of an entire workflow. diff --git a/beeflow/common/wf_interface.py b/beeflow/common/wf_interface.py index 38395714e..ef0cdf53d 100644 --- a/beeflow/common/wf_interface.py +++ b/beeflow/common/wf_interface.py @@ -59,14 +59,6 @@ def execute_workflow(self): """Begin execution of a BEE workflow.""" self._gdb_driver.execute_workflow(self._workflow_id) - def pause_workflow(self): - """Pause the execution of a BEE workflow.""" - self._gdb_driver.pause_workflow(self._workflow_id) - - def resume_workflow(self): - """Resume the execution of a paused BEE workflow.""" - self._gdb_driver.resume_workflow(self._workflow_id) - def reset_workflow(self, workflow_id): """Reset the execution state and ID of a BEE workflow.""" self._gdb_driver.reset_workflow(self._workflow_id, workflow_id) diff --git a/beeflow/tests/mocks.py b/beeflow/tests/mocks.py index b5d11c992..292d5f4af 100644 --- a/beeflow/tests/mocks.py +++ b/beeflow/tests/mocks.py @@ -24,14 +24,6 @@ def __init__(self): self._workflow_id = '42' self._loaded = False - def pause_workflow(self): - """Pause a workflow.""" - return - - def resume_workflow(self): - """Resume a workflow.""" - return - def reset_workflow(self, wf_id): #noqa """Reset a workflow.""" wf_id = 0 # noqa @@ -149,14 +141,6 @@ def execute_workflow(self, workflow_id): #noqa not using parameter in mock if self._is_ready(task_id): self.task_states[task_id] = 'READY' - def pause_workflow(self, workflow_id): #noqa not using parameter in mock - """Pause execution of a running workflow.""" - self.workflow_state = 'PAUSED' - - def resume_workflow(self, workflow_id): #noqa not using parameter in mock - """Resume execution of a running workflow.""" - self.workflow_state = 'RESUME' - def reset_workflow(self, old_id, new_id): #noqa not using parameter in mock """Reset the execution state and ID of a workflow.""" self.workflow = deepcopy(self.workflow) diff --git a/beeflow/tests/test_wf_interface.py b/beeflow/tests/test_wf_interface.py index c3a72403e..2296387e9 100644 --- a/beeflow/tests/test_wf_interface.py +++ b/beeflow/tests/test_wf_interface.py @@ -67,40 +67,6 @@ def test_execute_workflow(self): self.assertEqual("READY", self.wfi.get_task_state(tasks[0])) self.assertEqual("RUNNING", self.wfi.get_workflow_state()) - def test_pause_workflow(self): - """Test workflow execution pausing (set running tasks' states to 'PAUSED').""" - workflow_id = generate_workflow_id() - self.wfi.initialize_workflow(Workflow( - "test_workflow", None, None, - [InputParameter("test_input", "File", "input.txt")], - [OutputParameter("test_output", "File", "output.txt", "viz/output")], - workflow_id)) - self._create_test_tasks(workflow_id) - - self.wfi.execute_workflow() - - self.wfi.pause_workflow() - - # Workflow state should now be 'PAUSED' - self.assertEqual("PAUSED", self.wfi.get_workflow_state()) - - def test_resume_workflow(self): - """Test workflow execution resuming (set paused tasks' states to 'RUNNING').""" - workflow_id = generate_workflow_id() - self.wfi.initialize_workflow(Workflow( - "test_workflow", None, None, - [InputParameter("test_input", "File", "input.txt")], - [OutputParameter("test_output", "File", "output.txt", "viz/output")], - workflow_id)) - self._create_test_tasks(workflow_id) - - self.wfi.execute_workflow() - self.wfi.pause_workflow() - self.wfi.resume_workflow() - - # Workflow state should now be 'RESUME' - self.assertEqual("RESUME", self.wfi.get_workflow_state()) - def test_reset_workflow(self): """Test workflow execution resetting (set all tasks to 'WAITING', delete metadata).""" workflow_id = generate_workflow_id() diff --git a/beeflow/tests/test_wf_manager.py b/beeflow/tests/test_wf_manager.py index f46998183..e4e5d7470 100644 --- a/beeflow/tests/test_wf_manager.py +++ b/beeflow/tests/test_wf_manager.py @@ -50,8 +50,12 @@ def teardown_workflow(): @pytest.fixture() -def setup_teardown_workflow(teardown_workflow): +def setup_teardown_workflow(teardown_workflow, mocker, temp_db): """Set up and tear down for tests that use the workflow directory.""" + mocker.patch('beeflow.wf_manager.resources.wf_utils.get_workflow_interface', + return_value=MockWFI()) + mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', new=lambda: temp_db.db_file) + mocker.patch('beeflow.wf_manager.resources.wf_utils.connect_db', new=mock_connect_db) wf_utils.create_workflow_dir(WF_ID) wf_utils.create_wf_status(WF_ID) yield @@ -162,6 +166,7 @@ def test_start_workflow(client, mocker, temp_db): mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', new=lambda: temp_db.db_file) mocker.patch('beeflow.wf_manager.resources.wf_actions.db_path', temp_db.db_file) mocker.patch('beeflow.common.wf_interface.WorkflowInterface.get_workflow_state', 'Waiting') + mocker.patch('beeflow.wf_manager.resources.wf_utils.read_wf_status', return_value='Waiting') resp = client().post(f'/bee_wfm/v1/jobs/{WF_ID}') assert resp.status_code == 200 @@ -170,7 +175,7 @@ def test_workflow_status(client, mocker, setup_teardown_workflow, temp_db): """Test getting workflow status.""" mocker.patch('beeflow.wf_manager.resources.wf_utils.get_workflow_interface', return_value=MockWFI()) - mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', temp_db.db_file) + mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', new=lambda: temp_db.db_file) mocker.patch('beeflow.wf_manager.resources.wf_actions.db_path', temp_db.db_file) wf_name = 'wf' workdir = 'dir' @@ -188,7 +193,7 @@ def test_cancel_workflow(client, mocker, setup_teardown_workflow, temp_db): """Test cancelling a workflow.""" mocker.patch('beeflow.wf_manager.resources.wf_utils.get_workflow_interface', return_value=MockWFI()) - mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', temp_db.db_file) + mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', new=lambda: temp_db.db_file) mocker.patch('beeflow.wf_manager.resources.wf_actions.db_path', temp_db.db_file) wf_name = 'wf' @@ -208,7 +213,7 @@ def test_remove_workflow(client, mocker, setup_teardown_workflow, temp_db): """Test removing a workflow.""" mocker.patch('beeflow.wf_manager.resources.wf_utils.get_workflow_interface', return_value=MockWFI()) - mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', temp_db.db_file) + mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', new=lambda: temp_db.db_file) mocker.patch('beeflow.wf_manager.resources.wf_actions.db_path', temp_db.db_file) wf_name = 'wf' @@ -230,10 +235,12 @@ def test_pause_workflow(client, mocker, setup_teardown_workflow, temp_db): return_value=MockWFI()) mocker.patch('beeflow.tests.mocks.MockWFI.get_workflow_state', return_value='RUNNING') - mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', temp_db.db_file) + mocker.patch('beeflow.wf_manager.resources.wf_utils.connect_db', new=mock_connect_db) + mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', new=lambda: temp_db.db_file) mocker.patch('beeflow.wf_manager.resources.wf_actions.db_path', temp_db.db_file) + mocker.patch('beeflow.wf_manager.resources.wf_utils.read_wf_status', return_value='RUNNING') - wf_utils.update_wf_status(WF_ID, 'Running') + wf_utils.update_wf_status(WF_ID, 'RUNNING') request = {'option': 'pause'} resp = client().patch(f'/bee_wfm/v1/jobs/{WF_ID}', json=request) assert resp.json['status'] == 'Workflow Paused' @@ -246,13 +253,15 @@ def test_resume_workflow(client, mocker, setup_teardown_workflow, temp_db): return_value=MockWFI()) mocker.patch('beeflow.tests.mocks.MockWFI.get_workflow_state', return_value='PAUSED') + mocker.patch('beeflow.wf_manager.resources.wf_utils.connect_db', new=mock_connect_db) mocker.patch('beeflow.wf_manager.resources.wf_utils.submit_tasks_tm', return_value=None) mocker.patch('beeflow.wf_manager.resources.wf_utils.submit_tasks_scheduler', return_value=None) mocker.patch('beeflow.wf_manager.resources.wf_utils.update_wf_status', return_value=None) - mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', temp_db.db_file) + mocker.patch('beeflow.wf_manager.resources.wf_utils.get_db_path', new=lambda: temp_db.db_file) mocker.patch('beeflow.wf_manager.resources.wf_actions.db_path', temp_db.db_file) + mocker.patch('beeflow.wf_manager.resources.wf_utils.read_wf_status', return_value='PAUSED') - wf_utils.update_wf_status(WF_ID, 'Paused') + wf_utils.update_wf_status(WF_ID, 'PAUSED') request = {'option': 'resume'} resp = client().patch(f'/bee_wfm/v1/jobs/{WF_ID}', json=request) assert resp.json['status'] == 'Workflow Resumed' diff --git a/beeflow/wf_manager/resources/wf_actions.py b/beeflow/wf_manager/resources/wf_actions.py index 30b0d6285..f377f10f9 100644 --- a/beeflow/wf_manager/resources/wf_actions.py +++ b/beeflow/wf_manager/resources/wf_actions.py @@ -23,9 +23,7 @@ def __init__(self): def post(self, wf_id): """Start workflow. Send ready tasks to the task manager.""" - db = connect_db(wfm_db, db_path) if wf_utils.start_workflow(wf_id): - db.workflows.update_workflow_state(wf_id, 'Running') resp = make_response(jsonify(msg='Started workflow!', status='ok'), 200) else: resp_body = jsonify(msg='Cannot start workflow it is {state.lower()}.', status='ok') @@ -46,7 +44,7 @@ def get(wf_id): for task in tasks: tasks_status.append((task.id, task.name, task.state)) - wf_status = db.workflows.get_workflow_state(wf_id) + wf_status = wf_utils.read_wf_status(wf_id) resp = make_response(jsonify(tasks_status=tasks_status, wf_status=wf_status, status='ok'), 200) @@ -58,11 +56,8 @@ def delete(self, wf_id): option = self.reqparse.parse_args()['option'] db = connect_db(wfm_db, db_path) if option == "cancel": - wfi = wf_utils.get_workflow_interface(wf_id) # Remove all tasks currently in the database - wfi.set_workflow_state('Cancelled') wf_utils.update_wf_status(wf_id, 'Cancelled') - db.workflows.update_workflow_state(wf_id, 'Cancelled') log.info(f"Workflow {wf_id} cancelled") resp = make_response(jsonify(status='Cancelled'), 202) elif option == "remove": @@ -79,25 +74,20 @@ def delete(self, wf_id): def patch(self, wf_id): """Pause or resume workflow.""" - db = connect_db(wfm_db, db_path) self.reqparse.add_argument('option', type=str, location='json') option = self.reqparse.parse_args()['option'] wfi = wf_utils.get_workflow_interface(wf_id) log.info('Pausing/resuming workflow') - wf_state = wfi.get_workflow_state() + wf_state = wf_utils.read_wf_status(wf_id) if option == 'pause' and wf_state in ('RUNNING', 'INITIALIZING'): - wfi.pause_workflow() wf_utils.update_wf_status(wf_id, 'Paused') - db.workflows.update_workflow_state(wf_id, 'Paused') log.info(f"Workflow {wf_id} Paused") resp = make_response(jsonify(status='Workflow Paused'), 200) elif option == 'resume' and wf_state == 'PAUSED': - wfi.resume_workflow() + wf_utils.update_wf_status(wf_id, 'Running') tasks = wfi.get_ready_tasks() wf_utils.schedule_submit_tasks(wf_id, tasks) - wf_utils.update_wf_status(wf_id, 'Running') - db.workflows.update_workflow_state(wf_id, 'Running') log.info(f"Workflow {wf_id} Resumed") resp = make_response(jsonify(status='Workflow Resumed'), 200) else: diff --git a/beeflow/wf_manager/resources/wf_utils.py b/beeflow/wf_manager/resources/wf_utils.py index a9818e0fd..451dfa5d6 100644 --- a/beeflow/wf_manager/resources/wf_utils.py +++ b/beeflow/wf_manager/resources/wf_utils.py @@ -100,6 +100,12 @@ def update_wf_status(wf_id, status_msg): with open(status_path, 'w', encoding="utf8") as status: status.write(status_msg) + db = connect_db(wfm_db, get_db_path()) + db.workflows.update_workflow_state(wf_id, status_msg) + + wfi = get_workflow_interface(wf_id) + wfi.set_workflow_state(status_msg) + def read_wf_status(wf_id): """Read workflow status metadata file.""" @@ -288,20 +294,17 @@ def setup_workflow(wf_id, wf_name, wf_dir, wf_workdir, no_start, workflow=None, db.workflows.add_task(task.id, wf_id, task.name, "WAITING") update_wf_status(wf_id, 'Waiting') - db.workflows.update_workflow_state(wf_id, 'Waiting') if no_start: log.info('Not starting workflow, as requested') else: log.info('Starting workflow') - db.workflows.update_workflow_state(wf_id, 'Running') start_workflow(wf_id) def start_workflow(wf_id): """Attempt to start the workflow, returning True if successful.""" - db = connect_db(wfm_db, get_db_path()) wfi = get_workflow_interface(wf_id) - state = wfi.get_workflow_state() + state = read_wf_status(wf_id) if state in ('RUNNING', 'PAUSED', 'COMPLETED'): return False wfi.execute_workflow() @@ -309,5 +312,4 @@ def start_workflow(wf_id): schedule_submit_tasks(wf_id, tasks) wf_id = wfi.workflow_id update_wf_status(wf_id, 'Running') - db.workflows.update_workflow_state(wf_id, 'Running') return True