Skip to content

Commit

Permalink
new prune/gen scheme
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Oct 7, 2020
1 parent 401a382 commit a7bab88
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 77 deletions.
2 changes: 1 addition & 1 deletion cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1745,7 +1745,7 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
# (name is left name)
self.taskdefs[name].add_graph_child(task_trigger, right, seq)
# graph_parents not currently used but might be needed soon:
# self.taskdefs[right].add_graph_parent(task_trigger, name, seq)
self.taskdefs[right].add_graph_parent(task_trigger, name, seq)

# Walk down "expr_list" depth first, and replace any items matching a
# key in "triggers" ("left" values) with the trigger.
Expand Down
211 changes: 151 additions & 60 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
"""

from collections import Counter
from copy import deepcopy
from copy import copy, deepcopy
import json
from time import time
import zlib
Expand All @@ -65,7 +65,9 @@
from cylc.flow.suite_status import get_suite_status
from cylc.flow.task_id import TaskID
from cylc.flow.task_job_logs import JOB_LOG_OPTS
from cylc.flow.task_proxy import generate_graph_children
from cylc.flow.task_proxy import (
generate_graph_children, generate_graph_parents
)
from cylc.flow.task_state import TASK_STATUS_WAITING
from cylc.flow.task_state_prop import extract_group_state
from cylc.flow.wallclock import (
Expand Down Expand Up @@ -288,7 +290,7 @@ class DataStoreMgr:
Message containing the global information of the workflow.
.descendants (dict):
Local store of config.get_first_parent_descendants()
.n_max (int):
.n_edge_distance (int):
Maximum distance of the data-store graph from the active pool.
.parents (dict):
Local store of config.get_parent_lists()
Expand All @@ -312,7 +314,7 @@ def __init__(self, schd):
self.parents = {}
self.state_update_families = set()
self.updated_state_families = set()
self.n_max = 1
self.n_edge_distance = 1
# Managed data types
self.data = {
self.workflow_id: deepcopy(DATA_TEMPLATE)
Expand All @@ -331,10 +333,13 @@ def __init__(self, schd):
self.updates_pending = False
self.delta_queues = {self.workflow_id: {}}
self.publish_deltas = []
self.in_window_edges = {}
self.out_window_edges = {}
self.in_window_nodes = {}
self.out_window_nodes = {}
self.active_nodes = set()
self.isolate_path_nodes = set()
self.n_window_nodes = {}
self.n_window_edges = {}
self.n_window_boundary_nodes = {}
self.prune_trigger_nodes = {}
self.prune_flagged_nodes = set()
self.prune_pending = False

def initiate_data_model(self, reloaded=False):
Expand Down Expand Up @@ -530,13 +535,13 @@ def generate_definition_elements(self):
self.parents = parents

def increment_graph_window(
self, name, point, flow_label, n_distance=0, active_id=None):
"""Generate graph window about given task proxy to n_max
self, name, point, flow_label, edge_distance=0, active_id=None):
"""Generate graph window about given task proxy to n_edge_distance
Args:
source_itask (cylc.flow.TaskProxy):
Edge source task proxy object.
n_distance (int): Graph distance from active parent.
edge_distance (int): Graph distance from active parent.
Returns:
Expand All @@ -550,34 +555,44 @@ def increment_graph_window(
active_id = s_id

# Generate task node
if edge_distance == 0:
self.active_nodes.add(active_id)
self.n_window_edges[active_id] = set()
self.n_window_boundary_nodes[active_id] = set()
self.n_window_nodes[active_id] = set()
if active_id in self.prune_trigger_nodes:
self.prune_flagged_nodes.update(
self.prune_trigger_nodes[active_id])
self.prune_pending = True

if s_id in self.n_window_nodes[active_id]:
return
if edge_distance > self.n_edge_distance:
self.n_window_boundary_nodes[active_id].add(s_id)
return

self.n_window_nodes[active_id].add(s_id)
self.generate_ghost_task(s_id, name, point, flow_label)
if n_distance == 0:
self.in_window_nodes[active_id] = set()
self.in_window_edges[active_id] = set()
self.in_window_nodes[active_id].add(s_id)

n_distance += 1
if n_distance > self.n_max:
return
edge_distance += 1

# TODO: xtrigger is suite_state edges too
# Reference set for workflow relations
new_edges = set()
for output, items in generate_graph_children(
self.schd.config.get_taskdef(name), point).items():
for t_name, t_point, is_abs in items:
t_node = TaskID.get(t_name, t_point)
t_id = (
f'{self.workflow_id}{ID_DELIM}{t_point}{ID_DELIM}{t_name}')
self.increment_graph_window(
t_name, t_point, flow_label, n_distance, active_id)

# Initiate edge element.
e_id = (
f'{self.workflow_id}{ID_DELIM}{s_node}{ID_DELIM}{t_node}')
if e_id in self.n_window_edges[active_id]:
continue
if (
e_id not in self.data[self.workflow_id][EDGES] and
e_id not in self.added[EDGES]
e_id not in self.added[EDGES] and
edge_distance <= self.n_edge_distance
):
self.added[EDGES][e_id] = PbEdge(
id=e_id,
Expand All @@ -591,23 +606,73 @@ def increment_graph_window(
self.updated[TASK_PROXIES].setdefault(
s_id,
PbTaskProxy(id=s_id)).edges.append(e_id)
new_edges.add(e_id)
if new_edges:
self.updates_pending = True
self.in_window_edges[active_id] |= new_edges
getattr(self.updated[WORKFLOW], EDGES).edges.extend(new_edges)
self.n_window_edges[active_id].add(e_id)
self.increment_graph_window(
t_name, t_point, flow_label,
copy(edge_distance), active_id)

def decrement_graph_window(self, name, point):
for _, items in generate_graph_parents(
self.schd.config.get_taskdef(name), point).items():
for p_name, p_point, is_abs in items:
p_node = TaskID.get(p_name, p_point)
p_id = (
f'{self.workflow_id}{ID_DELIM}{p_point}{ID_DELIM}{p_name}')
e_id = (
f'{self.workflow_id}{ID_DELIM}{p_node}{ID_DELIM}{s_node}')
if e_id in self.n_window_edges[active_id]:
continue
# Create edge element.
if (
e_id not in self.data[self.workflow_id][EDGES] and
e_id not in self.added[EDGES] and
edge_distance <= self.n_edge_distance
):
self.added[EDGES][e_id] = PbEdge(
id=e_id,
source=p_id,
target=s_id
)
# Add edge id to node field for resolver reference
self.updated[TASK_PROXIES].setdefault(
p_id,
PbTaskProxy(id=p_id)).edges.append(e_id)
self.updated[TASK_PROXIES].setdefault(
s_id,
PbTaskProxy(id=s_id)).edges.append(e_id)
self.n_window_edges[active_id].add(e_id)
# create node element.
self.increment_graph_window(
p_name, p_point, flow_label,
copy(edge_distance), active_id)

if edge_distance == 1:
if not self.n_window_boundary_nodes[active_id]:
self.isolate_path_nodes.add(active_id)
# print('BOUNDARY')
# print(self.n_window_nodes[active_id])
# print(self.n_window_boundary_nodes[active_id])
self.n_window_boundary_nodes[active_id].difference_update(
self.n_window_nodes[active_id]
)
for tp_id in self.n_window_boundary_nodes[active_id]:
# print(tp_id)
self.prune_trigger_nodes.setdefault(
tp_id, set()).add(active_id)
if self.n_window_edges[active_id]:
getattr(self.updated[WORKFLOW], EDGES).edges.extend(
self.n_window_edges[active_id])
self.updates_pending = True

def remove_active_node(self, name, point):
"""Flag removal of this active graph window."""

tp_id = f'{self.workflow_id}{ID_DELIM}{point}{ID_DELIM}{name}'
if tp_id in self.in_window_nodes:
self.out_window_nodes[tp_id] = self.in_window_nodes[tp_id]
del self.in_window_nodes[tp_id]
if tp_id in self.in_window_edges:
self.out_window_edges[tp_id] = self.in_window_edges[tp_id]
del self.in_window_edges[tp_id]
self.prune_pending = True
self.active_nodes.remove(tp_id)
if tp_id in self.isolate_path_nodes:
# print('ISOLATE')
# print(tp_id)
self.prune_flagged_nodes.update(self.n_window_nodes[tp_id])
self.isolate_path_nodes.remove(tp_id)
self.prune_pending = True

def generate_ghost_task(self, tp_id, name, point, flow_label):
"""Create task-point element populated with static data.
Expand Down Expand Up @@ -765,41 +830,67 @@ def prune_data_store(self):
"""
self.prune_pending = False
if not self.out_window_nodes and not self.out_window_edges:
flagged_nodes = self.prune_flagged_nodes.difference(self.active_nodes)
if not flagged_nodes:
return

data = self.data[self.workflow_id]

# Gather nodes and edges to prune
node_ids = set().union(*self.out_window_nodes.values()).difference(
*self.in_window_nodes.values())
self.out_window_nodes.clear()
edge_ids = set().union(*self.out_window_edges.values()).difference(
*self.in_window_edges.values())
self.out_window_edges.clear()

for e_id in edge_ids:
self.deltas[EDGES].pruned.append(e_id)
# Gather nodes to prune via a diff of nodes in the dependency paths
# of flagged boundary nodes against all other paths.
in_paths = [
v
for k, v in self.n_window_nodes.items()
if k not in flagged_nodes
]
# print('IN')
# print(in_paths)
out_paths = [
v
for k, v in self.n_window_nodes.items()
if k in flagged_nodes
]
# print('OUT')
# print(out_paths)
node_ids = set().union(*out_paths).difference(*in_paths)
# print('DIFF')
# print(node_ids)
self.prune_flagged_nodes.difference_update(node_ids)

flagged_ids = set()
tp_data = self.data[self.workflow_id][TASK_PROXIES]
tp_added = self.added[TASK_PROXIES]
parent_ids = set()
for tp_id in node_ids:
if tp_id in self.n_window_nodes:
del self.n_window_nodes[tp_id]
if tp_id in self.n_window_edges:
del self.n_window_edges[tp_id]
self.deltas[TASK_PROXIES].pruned.append(tp_id)
self.schd.job_pool.remove_task_jobs(tp_id)
flagged_ids.add(data[TASK_PROXIES][tp_id].first_parent)
if tp_id in tp_data:
node = tp_data[tp_id]
elif tp_id in tp_added:
node = tp_added[tp_id]
else:
# print('NOT HERE!')
# print(tp_id)
continue
self.deltas[EDGES].pruned.extend(node.edges)
parent_ids.add(node.first_parent)

prune_ids = set()
checked_ids = set()
while flagged_ids:
for fp_id in flagged_ids:
# print('PARENTS')
# print(parent_ids)
while parent_ids:
for fp_id in parent_ids:
self._family_ascent_point_prune(
fp_id, node_ids, flagged_ids, checked_ids, prune_ids)
fp_id, node_ids, parent_ids, checked_ids, prune_ids)
break
if prune_ids:
self.deltas[FAMILY_PROXIES].pruned.extend(prune_ids)
self.updates_pending = True

def _family_ascent_point_prune(
self, fp_id, node_ids, flagged_ids, checked_ids, prune_ids):
self, fp_id, node_ids, parent_ids, checked_ids, prune_ids):
fp_data = self.data[self.workflow_id][FAMILY_PROXIES]
if fp_id in fp_data and fp_id not in self.updated[FAMILY_PROXIES]:
fam_node = fp_data[fp_id]
Expand All @@ -811,7 +902,7 @@ def _family_ascent_point_prune(
]
for child_id in child_fam_nodes:
self._family_ascent_point_prune(
child_id, node_ids, flagged_ids, checked_ids, prune_ids)
child_id, node_ids, parent_ids, checked_ids, prune_ids)
if [
child_id
for child_id in fam_node.child_tasks
Expand All @@ -824,11 +915,11 @@ def _family_ascent_point_prune(
self.state_update_families.add(fp_id)
else:
if fam_node.first_parent:
flagged_ids.add(fam_node.first_parent)
parent_ids.add(fam_node.first_parent)
prune_ids.add(fp_id)
checked_ids.add(fp_id)
if fp_id in flagged_ids:
flagged_ids.remove(fp_id)
if fp_id in parent_ids:
parent_ids.remove(fp_id)

def update_dynamic_elements(self, updated_nodes=None):
"""Update data elements containing dynamic/live fields."""
Expand Down
14 changes: 9 additions & 5 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,9 @@ def add_to_runahead_pool(self, itask, is_new=True):
self.rhpool_changed = True

# Create new data-store n-distance graph window about this task
self.data_store_mgr.increment_graph_window(
itask.tdef.name, itask.point, itask.flow_label)
# LOG.info("[%s] -added to the runahead pool", itask)
# self.data_store_mgr.increment_graph_window(
# itask.tdef.name, itask.point, itask.flow_label)

# add row to "task_states" table & data-store
if is_new:
Expand Down Expand Up @@ -520,6 +521,8 @@ def release_runahead_task(self, itask):
self.pool_changed = True
self.pool_changes.append(itask)
LOG.debug("[%s] -released to the task pool", itask)
self.data_store_mgr.increment_graph_window(
itask.tdef.name, itask.point, itask.flow_label)
del self.runahead_pool[itask.point][itask.identity]
if not self.runahead_pool[itask.point]:
del self.runahead_pool[itask.point]
Expand Down Expand Up @@ -580,10 +583,11 @@ def remove(self, itask, reason=""):
del self.runahead_pool[itask.point]
self.rhpool_changed = True

# Notify the data-store manager of
# their removal (the manager uses entry of children otherwise).
self.data_store_mgr.remove_active_node(itask.tdef.name, itask.point)
# Event-driven final update of task_states table.
# TODO: same for datastore (still updated by iterating the task pool)
self.data_store_mgr.decrement_graph_window(
itask.tdef.name, itask.point)
# TODO: same for datastore (still updated by scheduler loop)
self.suite_db_mgr.put_update_task_state(itask)
LOG.debug("[%s] -%s", itask, msg)
del itask
Expand Down
Loading

0 comments on commit a7bab88

Please sign in to comment.