Commit: graph prune functional
dwsutherland committed Nov 10, 2020
1 parent 01c31b8 commit 462d04d
Showing 2 changed files with 152 additions and 73 deletions.
223 changes: 150 additions & 73 deletions cylc/flow/data_store_mgr.py
@@ -192,11 +192,28 @@ def apply_delta(key, delta, data):
continue
if key == TASK_PROXIES:
data[TASKS][data[key][del_id].task].proxies.remove(del_id)
try:
data[FAMILY_PROXIES][
data[key][del_id].first_parent
].child_tasks.remove(del_id)
except KeyError:
pass
getattr(data[WORKFLOW], key).remove(del_id)
elif key == FAMILY_PROXIES:
data[FAMILIES][data[key][del_id].family].proxies.remove(del_id)
try:
data[FAMILY_PROXIES][
data[key][del_id].first_parent
].child_families.remove(del_id)
except KeyError:
pass
getattr(data[WORKFLOW], key).remove(del_id)
elif key == EDGES:
edge = data[key][del_id]
if edge.source in data[TASK_PROXIES]:
data[TASK_PROXIES][edge.source].edges.remove(del_id)
if edge.target in data[TASK_PROXIES]:
data[TASK_PROXIES][edge.target].edges.remove(del_id)
getattr(data[WORKFLOW], key).edges.remove(del_id)
del data[key][del_id]
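
The new branches above detach a deleted proxy from its first parent's child_tasks/child_families before dropping it, with try/except KeyError tolerating a parent that has already been pruned. A minimal sketch of that detach-then-delete pattern, using plain dicts as a hypothetical stand-in for the protobuf store (all names here are illustrative):

    # Detach-then-delete, sketched with plain dicts; only the control
    # flow mirrors the diff above.
    store = {
        'families': {'fam1': {'child_tasks': ['tp1']}},
        'task_proxies': {'tp1': {'first_parent': 'fam1'}},
    }

    def prune_task_proxy(store, del_id):
        try:
            # The parent family may already have been pruned, so a
            # missing key is tolerated rather than failing the delta.
            parent = store['task_proxies'][del_id]['first_parent']
            store['families'][parent]['child_tasks'].remove(del_id)
        except KeyError:
            pass
        del store['task_proxies'][del_id]

    prune_task_proxy(store, 'tp1')
    # store: {'families': {'fam1': {'child_tasks': []}}, 'task_proxies': {}}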

@@ -287,25 +304,6 @@ class DataStoreMgr:
Workflow scheduler instance.
"""

# Memory optimization - constrain possible attributes to this list.
__slots__ = [
'added',
'ancestors',
'data',
'deltas',
'delta_queues',
'descendants',
'n_max',
'parents',
'publish_deltas',
'schd',
'state_update_families',
'updated_state_families',
'updated',
'updates_pending',
'workflow_id',
]

def __init__(self, schd):
self.schd = schd
self.workflow_id = f'{self.schd.owner}{ID_DELIM}{self.schd.suite}'
@@ -333,6 +331,11 @@ def __init__(self, schd):
self.updates_pending = False
self.delta_queues = {self.workflow_id: {}}
self.publish_deltas = []
self.in_window_edges = {}
self.out_window_edges = {}
self.in_window_nodes = {}
self.out_window_nodes = {}
self.prune_pending = False
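
These five new attributes carry the graph-window bookkeeping: each active task owns the set of node and edge IDs that its n-window pulled into the store, and decrement_graph_window later moves that set to the out_* side. The shapes below are assumed for illustration (cylc IDs join owner, suite, point and name with ID_DELIM):

    # A node claimed by two active windows -- 'me|suite|1|b' here --
    # must survive until both owning tasks have left the pool.
    in_window_nodes = {
        'me|suite|1|a': {'me|suite|1|a', 'me|suite|1|b'},
        'me|suite|1|b': {'me|suite|1|b', 'me|suite|1|c'},
    }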

def initiate_data_model(self, reloaded=False):
"""Initiate or Update data model on start/restart/reload.
@@ -540,8 +543,6 @@ def increment_graph_window(
None
"""
if n_distance is None:
n_distance = 0
# Create this source node
s_node = TaskID.get(name, point)
s_id = f'{self.workflow_id}{ID_DELIM}{point}{ID_DELIM}{name}'
@@ -550,6 +551,10 @@

# Generate task node
self.generate_ghost_task(s_id, name, point, flow_label)
if n_distance == 0:
self.in_window_nodes[active_id] = set()
self.in_window_edges[active_id] = set()
self.in_window_nodes[active_id].add(s_id)

n_distance += 1
if n_distance > self.n_max:
@@ -564,8 +569,6 @@
t_node = TaskID.get(t_name, t_point)
t_id = (
f'{self.workflow_id}{ID_DELIM}{t_point}{ID_DELIM}{t_name}')
# Even if the node is in the data store, the itask is needed for its
# graph_children. Somehow make this more efficient?
self.increment_graph_window(
t_name, t_point, flow_label, n_distance, active_id)

@@ -591,8 +594,21 @@
new_edges.add(e_id)
if new_edges:
self.updates_pending = True
self.in_window_edges[active_id] |= new_edges
getattr(self.updated[WORKFLOW], EDGES).edges.extend(new_edges)

def decrement_graph_window(self, name, point):
"""Flag removal of this active graph window."""

tp_id = f'{self.workflow_id}{ID_DELIM}{point}{ID_DELIM}{name}'
if tp_id in self.in_window_nodes:
self.out_window_nodes[tp_id] = self.in_window_nodes[tp_id]
del self.in_window_nodes[tp_id]
if tp_id in self.in_window_edges:
self.out_window_edges[tp_id] = self.in_window_edges[tp_id]
del self.in_window_edges[tp_id]
self.prune_pending = True
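
Nothing is deleted at this point: the departing task's window sets are parked on the out_* side and prune_pending is raised, deferring the real work to the next batch call of update_data_structure. A sketch of the park-and-flag step, with assumed dict-of-set shapes:

    # Park-and-flag: O(1) per removed task, no store mutation yet.
    in_window_nodes = {'me|suite|1|a': {'me|suite|1|a'}}
    out_window_nodes = {}

    tp_id = 'me|suite|1|a'
    if tp_id in in_window_nodes:
        out_window_nodes[tp_id] = in_window_nodes.pop(tp_id)
    prune_pending = True  # picked up by the next update_data_structure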

def generate_ghost_task(self, tp_id, name, point, flow_label):
"""Create task-point element populated with static data.
@@ -714,38 +730,16 @@ def generate_ghost_family(self, fp_id, child_fam=None, child_task=None):
elif child_fam not in fp_parent.child_families:
fp_parent.child_families.append(child_fam)

def prune_points(self, point_strings):
"""Remove old nodes and edges by cycle point.
Args:
point_strings (iterable):
Iterable of valid cycle point strings.
"""
flow_data = self.data[self.workflow_id]
if not point_strings:
return
node_ids = set()
for tp_id, tproxy in list(flow_data[TASK_PROXIES].items()):
if tproxy.cycle_point in point_strings:
node_ids.add(tp_id)
self.deltas[TASK_PROXIES].pruned.append(tp_id)
self.schd.job_pool.remove_task_jobs(tp_id)

for fp_id, fproxy in list(flow_data[FAMILY_PROXIES].items()):
if fproxy.cycle_point in point_strings:
self.deltas[FAMILY_PROXIES].pruned.append(fp_id)

for e_id, edge in list(flow_data[EDGES].items()):
if edge.source in node_ids or edge.target in node_ids:
self.deltas[EDGES].pruned.append(e_id)

def update_data_structure(self, updated_nodes=None):
"""Reflect workflow changes in the data structure."""
"""Workflow batch updates in the data structure."""
# update states and other dynamic fields
# TODO: Event driven task proxy updates (non-Batch)
self.update_dynamic_elements(updated_nodes)
self.update_family_proxies()

# Update workflow statuses and totals if needed
if self.prune_pending:
self.prune_data_store()
if self.updates_pending:
self.update_workflow()

@@ -759,6 +753,93 @@ def update_data_structure(self, updated_nodes=None):
# Clear deltas
self.clear_deltas()

if self.state_update_families:
self.updates_pending = True

def prune_data_store(self):
"""Remove old nodes and edges by cycle point.
Args:
point_strings (iterable):
Iterable of valid cycle point strings.
"""
self.prune_pending = False
if not self.out_window_nodes and not self.out_window_edges:
return

data = self.data[self.workflow_id]

# Gather nodes and edges to prune
node_ids = set().union(*self.out_window_nodes.values()).difference(
*self.in_window_nodes.values())
self.out_window_nodes.clear()
edge_ids = set().union(*self.out_window_edges.values()).difference(
*self.in_window_edges.values())
self.out_window_edges.clear()

for e_id in edge_ids:
self.deltas[EDGES].pruned.append(e_id)

flagged_ids = set()
for tp_id in node_ids:
self.deltas[TASK_PROXIES].pruned.append(tp_id)
self.schd.job_pool.remove_task_jobs(tp_id)
flagged_ids.add(data[TASK_PROXIES][tp_id].first_parent)

prune_ids = set()
checked_ids = set()
while flagged_ids:
for fp_id in flagged_ids:
self._family_ascent_point_prune(
fp_id, node_ids, flagged_ids, checked_ids, prune_ids)
break
if prune_ids:
self.deltas[FAMILY_PROXIES].pruned.extend(prune_ids)
self.updates_pending = True
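
The prune set is pure set algebra: everything claimed by departed windows, minus anything a still-active window also claims. With toy IDs (illustrative only):

    out_window_nodes = {'1|a': {'1|a', '1|b'}}   # task 'a' has left the pool
    in_window_nodes = {'1|b': {'1|b', '1|c'}}    # task 'b' is still active

    node_ids = set().union(*out_window_nodes.values()).difference(
        *in_window_nodes.values())
    # node_ids == {'1|a'}: '1|b' is kept alive by b's window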

def _family_ascent_point_prune(
self, fp_id, node_ids, flagged_ids, checked_ids, prune_ids):
fp_data = self.data[self.workflow_id][FAMILY_PROXIES]
if fp_id in fp_data and fp_id not in self.updated[FAMILY_PROXIES]:
fam_node = fp_data[fp_id]
# Gather child families, then check/update recursively
child_fam_nodes = [
n_id
for n_id in fam_node.child_families
if n_id not in checked_ids
]
for child_id in child_fam_nodes:
self._family_ascent_point_prune(
child_id, node_ids, flagged_ids, checked_ids, prune_ids)
if [
child_id
for child_id in fam_node.child_tasks
if child_id not in node_ids
] or [
child_id
for child_id in fam_node.child_families
if child_id not in prune_ids
]:
self.state_update_families.add(fp_id)
else:
if fam_node.first_parent:
flagged_ids.add(fam_node.first_parent)
prune_ids.add(fp_id)
checked_ids.add(fp_id)
if fp_id in flagged_ids:
flagged_ids.remove(fp_id)
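
_family_ascent_point_prune works bottom-up: a family proxy is pruned only once every child task is in the prune set and every child family has itself been pruned, and its own first parent is then flagged for the same check. A self-contained sketch of that ascent over a toy two-level tree (structure assumed; the real code reads the store's protobuf nodes):

    families = {
        'root': {'first_parent': None, 'child_tasks': [],
                 'child_families': ['FAM']},
        'FAM': {'first_parent': 'root', 'child_tasks': ['tp1'],
                'child_families': []},
    }

    def ascent_prune(fp_id, pruned_tasks, prune_ids, checked):
        fam = families[fp_id]
        # Recurse into unchecked child families first (bottom-up).
        for child in fam['child_families']:
            if child not in checked:
                ascent_prune(child, pruned_tasks, prune_ids, checked)
        live_tasks = [t for t in fam['child_tasks'] if t not in pruned_tasks]
        live_fams = [f for f in fam['child_families'] if f not in prune_ids]
        if not live_tasks and not live_fams:
            prune_ids.add(fp_id)  # empty family: prune it too
        checked.add(fp_id)

    prune_ids, checked = set(), set()
    ascent_prune('root', {'tp1'}, prune_ids, checked)
    # prune_ids == {'FAM', 'root'}: with 'tp1' gone both families are empty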

def update_dynamic_elements(self, updated_nodes=None):
"""Update data elements containing dynamic/live fields."""
# If no tasks are given update all
if updated_nodes is None:
updated_nodes = self.schd.pool.get_all_tasks()
elif not updated_nodes:
return
self.update_task_proxies(updated_nodes)
self.updates_pending = True

def update_task_proxies(self, updated_tasks=None):
"""Update dynamic fields of task nodes/proxies.
@@ -884,8 +965,15 @@ def _family_ascent_point_update(self, fp_id):
fp_data = self.data[self.workflow_id][FAMILY_PROXIES]
if fp_id in fp_data:
fam_node = fp_data[fp_id]
else:
elif fp_id in fp_added:
fam_node = fp_added[fp_id]
else:
# TODO: Shouldn't need with event driven updates
# as nodes will be updated before removal.
if fp_id in self.state_update_families:
self.updated_state_families.add(fp_id)
self.state_update_families.remove(fp_id)
return
# Gather child families, then check/update recursively
child_fam_nodes = [
n_id
@@ -999,35 +1087,13 @@ def update_workflow(self):
max(set(self.schd.pool.runahead_pool))
)

def update_dynamic_elements(self, updated_nodes=None):
"""Update data elements containing dynamic/live fields."""
# If no tasks are given update all
if updated_nodes is None:
updated_nodes = self.schd.pool.get_all_tasks()
elif not updated_nodes:
return
self.update_task_proxies(updated_nodes)
self.update_family_proxies()
self.updates_pending = True

# TODO: Make the other deltas/updates event driven like this one.
def delta_broadcast(self):
"""Collects broadcasts on change event."""
workflow = self.updated[WORKFLOW]
workflow.broadcasts = json.dumps(self.schd.broadcast_mgr.broadcasts)
self.updates_pending = True

def clear_deltas(self):
"""Clear current deltas."""
for key in self.deltas:
self.deltas[key].Clear()
if key == WORKFLOW:
self.added[key].Clear()
self.updated[key].Clear()
continue
self.added[key].clear()
self.updated[key].clear()

def apply_deltas(self, reloaded=False):
"""Gather and apply deltas."""
# Copy in job deltas
@@ -1081,6 +1147,17 @@ def apply_deltas(self, reloaded=False):
self.schd.job_pool.added.clear()
self.schd.job_pool.updated.clear()

def clear_deltas(self):
"""Clear current deltas."""
for key in self.deltas:
self.deltas[key].Clear()
if key == WORKFLOW:
self.added[key].Clear()
self.updated[key].Clear()
continue
self.added[key].clear()
self.updated[key].clear()

# Message collation and dissemination methods:
def get_entire_workflow(self):
"""Gather data elements into single Protobuf message.
2 changes: 2 additions & 0 deletions cylc/flow/task_pool.py
@@ -611,6 +611,8 @@ def remove(self, itask, reason=""):

# Event-driven final update of task_states table.
# TODO: same for datastore (still updated by iterating the task pool)
self.data_store_mgr.decrement_graph_window(
itask.tdef.name, itask.point)
self.suite_db_mgr.put_update_task_state(itask)
LOG.debug("[%s] -%s", itask, msg)
del itask
