Skip to content

Commit

Permalink
initial commit of correct gdb states
Browse files Browse the repository at this point in the history
  • Loading branch information
leahh committed Sep 3, 2024
1 parent 69d8281 commit d6e2c13
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 5 deletions.
7 changes: 7 additions & 0 deletions beeflow/common/gdb/gdb_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,13 @@ def get_ready_tasks(self):
:rtype: list of Task
"""

@abstractmethod
def get_no_start_tasks(self):
"""Return tasks with state 'No Start' from the graph database.
:rtype: list of Task
"""

@abstractmethod
def get_dependent_tasks(self, task):
"""Return the dependent tasks of a workflow task in the graph database.
Expand Down
13 changes: 13 additions & 0 deletions beeflow/common/gdb/neo4j_cypher.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,19 @@ def get_ready_tasks(tx, wf_id):
return [rec['t'] for rec in tx.run(get_ready_query, wf_id=wf_id)]


def get_no_start_tasks(tx, wf_id):
"""Get all tasks that have the no start state.
:param workflow_id: the workflow id
:type workflow_id: str
:rtype: neo4j.Result
"""
get_ready_query = ("MATCH (:Metadata {state: 'No Start'})-[:DESCRIBES]->"
"(t:Task {workflow_id: $wf_id}) RETURN t")

return [rec['t'] for rec in tx.run(get_ready_query, wf_id=wf_id)]


def get_dependent_tasks(tx, task):
"""Get the tasks that depend on a specified task.
Expand Down
11 changes: 11 additions & 0 deletions beeflow/common/gdb/neo4j_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,17 @@ def get_ready_tasks(self, workflow_id):
tuples = self._get_task_data_tuples(task_records)
return [_reconstruct_task(tup[0], tup[1], tup[2], tup[3], tup[4]) for tup in tuples]

def get_no_start_tasks(self, workflow_id):
"""Return tasks with state 'No Start' from the graph database from a particular workflow.
:param workflow_id: the workflow id
:type workflow_id: str
:rtype: list of Task
"""
task_records = self._read_transaction(tx.get_no_start_tasks, wf_id=workflow_id)
tuples = self._get_task_data_tuples(task_records)
return [_reconstruct_task(tup[0], tup[1], tup[2], tup[3], tup[4]) for tup in tuples]

def get_dependent_tasks(self, task):
"""Return the dependent tasks of a specified workflow task.
Expand Down
6 changes: 6 additions & 0 deletions beeflow/common/wf_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,12 @@ def get_ready_tasks(self):
"""
return self._gdb_driver.get_ready_tasks(self._workflow_id)

def set_no_start_tasks(self):
"""Set no start tasks to waiting."""
tasks = self._gdb_driver.get_no_start_tasks(self._workflow_id)
for task in tasks:
self.set_task_state(task, "WAITING")

def get_dependent_tasks(self, task):
"""Get the dependents of a task in a BEE workflow.
Expand Down
6 changes: 1 addition & 5 deletions beeflow/wf_manager/resources/wf_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,11 +305,7 @@ def start_workflow(wf_id):
db = connect_db(wfm_db, get_db_path())
wfi = get_workflow_interface(wf_id)
state = wfi.get_workflow_state()
log.info(f"Leah state test: {state}")
if state in ('RUNNING', 'PAUSED', 'COMPLETED'):
return False
if state == 'No Start':

wfi.set_no_start_tasks()
wfi.execute_workflow()
tasks = wfi.get_ready_tasks()
schedule_submit_tasks(wf_id, tasks)
Expand Down

0 comments on commit d6e2c13

Please sign in to comment.