Skip to content

Commit

Permalink
first commit to change gdb task states
Browse files Browse the repository at this point in the history
  • Loading branch information
leahh committed Sep 3, 2024
1 parent 0401b19 commit 69d8281
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 10 deletions.
2 changes: 1 addition & 1 deletion beeflow/common/gdb/gdb_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def reset_workflow(self, new_id):
"""

@abstractmethod
def load_task(self, task):
def load_task(self, task, task_state):
"""Load a task into a stored workflow.
Dependencies should be automatically deduced and generated by the graph database
Expand Down
6 changes: 3 additions & 3 deletions beeflow/common/gdb/neo4j_cypher.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def create_task_output_nodes(tx, task):
value=output.value, glob=output.glob)


def create_task_metadata_node(tx, task):
def create_task_metadata_node(tx, task, task_state):
"""Create a task metadata node in the Neo4j database.
The node holds metadata about a task's execution state.
Expand All @@ -192,9 +192,9 @@ def create_task_metadata_node(tx, task):
:type task: Task
"""
metadata_query = ("MATCH (t:Task {id: $task_id}) "
"CREATE (m:Metadata {state: 'WAITING'})-[:DESCRIBES]->(t)")
"CREATE (m:Metadata {state: $task_state})-[:DESCRIBES]->(t)")

tx.run(metadata_query, task_id=task.id)
tx.run(metadata_query, task_id=task.id, task_state=task_state)


def add_dependencies(tx, task, old_task=None, restarted_task=False):
Expand Down
4 changes: 2 additions & 2 deletions beeflow/common/gdb/neo4j_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def reset_workflow(self, old_id, new_id):
session.write_transaction(tx.reset_tasks_metadata, wf_id=old_id)
session.write_transaction(tx.reset_workflow_id, old_id=old_id, new_id=new_id)

def load_task(self, task):
def load_task(self, task, task_state):
"""Load a task into a workflow stored in the Neo4j database.
Dependencies are automatically deduced and generated by Neo4j upon loading
Expand All @@ -148,7 +148,7 @@ def load_task(self, task):
session.write_transaction(tx.create_task_requirement_nodes, task=task)
session.write_transaction(tx.create_task_input_nodes, task=task)
session.write_transaction(tx.create_task_output_nodes, task=task)
session.write_transaction(tx.create_task_metadata_node, task=task)
session.write_transaction(tx.create_task_metadata_node, task=task, task_state=task_state)
session.write_transaction(tx.add_dependencies, task=task)

def initialize_ready_tasks(self, workflow_id):
Expand Down
4 changes: 2 additions & 2 deletions beeflow/common/wf_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def reset_workflow(self, workflow_id):
self._workflow_id = workflow_id
self._gdb_driver.set_workflow_state(self._workflow_id, 'SUBMITTED')

def add_task(self, task):
def add_task(self, task, task_state):
"""Add a new task to a BEE workflow.
:param task: the name of the file to which to redirect stderr
Expand All @@ -90,7 +90,7 @@ def add_task(self, task):
task.hints = []

# Load the new task into the graph database
self._gdb_driver.load_task(task)
self._gdb_driver.load_task(task, task_state)

def restart_task(self, task, checkpoint_file):
"""Restart a failed BEE workflow task.
Expand Down
7 changes: 5 additions & 2 deletions beeflow/wf_manager/resources/wf_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,12 +280,12 @@ def setup_workflow(wf_id, wf_name, wf_dir, wf_workdir, no_start, workflow=None,
# Tasks come in backwards
tasks.reverse()
for task in tasks:
task_state = "No Start" if no_start else "WAITING"
if not reexecute:
wfi.add_task(task)
wfi.add_task(task, task_state)
metadata = wfi.get_task_metadata(task)
metadata['workdir'] = wf_workdir
wfi.set_task_metadata(task, metadata)
task_state = "No Start" if no_start else "WAITING"
db.workflows.add_task(task.id, wf_id, task.name, task_state)

if no_start:
Expand All @@ -305,8 +305,11 @@ def start_workflow(wf_id):
db = connect_db(wfm_db, get_db_path())
wfi = get_workflow_interface(wf_id)
state = wfi.get_workflow_state()
log.info(f"Leah state test: {state}")
if state in ('RUNNING', 'PAUSED', 'COMPLETED'):
return False
if state == 'No Start':

wfi.execute_workflow()
tasks = wfi.get_ready_tasks()
schedule_submit_tasks(wf_id, tasks)
Expand Down

0 comments on commit 69d8281

Please sign in to comment.