diff --git a/beeflow/common/gdb/neo4j_cypher.py b/beeflow/common/gdb/neo4j_cypher.py index e7701b551..f8c8913a8 100644 --- a/beeflow/common/gdb/neo4j_cypher.py +++ b/beeflow/common/gdb/neo4j_cypher.py @@ -218,17 +218,17 @@ def add_dependencies(tx, task, old_task=None, restarted_task=False): "MATCH (t:Task)<-[:OUTPUT_OF]-(o:Output) " "WITH s, t, sources, collect(o.id) as outputs " "WHERE any(input IN sources WHERE input IN outputs) " - "MERGE (s)-[:DEPENDS_ON]->(t) " - "WITH s " - "MATCH (s)<-[:OUTPUT_OF]-(o:Output) " - "WITH s, collect(o.id) AS outputs " - "MATCH (t:Task)<-[:INPUT_OF]-(i:Input) " - "WITH s, t, outputs, collect(i.source) as sources " - "WHERE any(output IN outputs WHERE output IN sources) " - "MERGE (t)-[:DEPENDS_ON]->(s)") + "MERGE (s)-[:DEPENDS_ON]->(t)") + dependent_query = ("MATCH (s:Task {id: $task_id})<-[:OUTPUT_OF]-(o:Output) " + "WITH s, collect(o.id) AS outputs " + "MATCH (t:Task)<-[:INPUT_OF]-(i:Input) " + "WITH s, t, outputs, collect(i.source) as sources " + "WHERE any(output IN outputs WHERE output IN sources) " + "MERGE (t)-[:DEPENDS_ON]->(s)") tx.run(begins_query, task_id=task.id) tx.run(dependency_query, task_id=task.id) + tx.run(dependent_query, task_id=task.id) def get_task_by_id(tx, task_id):