graph prune functional
dwsutherland committed Sep 27, 2020
1 parent 1802719 commit 1a18a3e
Showing 2 changed files with 159 additions and 75 deletions.
227 changes: 154 additions & 73 deletions cylc/flow/data_store_mgr.py
@@ -190,11 +190,28 @@ def apply_delta(key, delta, data):
continue
if key == TASK_PROXIES:
data[TASKS][data[key][del_id].task].proxies.remove(del_id)
try:
data[FAMILY_PROXIES][
data[key][del_id].first_parent
].child_tasks.remove(del_id)
except KeyError:
pass
getattr(data[WORKFLOW], key).remove(del_id)
elif key == FAMILY_PROXIES:
data[FAMILIES][data[key][del_id].family].proxies.remove(del_id)
try:
data[FAMILY_PROXIES][
data[key][del_id].first_parent
].child_families.remove(del_id)
except KeyError:
pass
getattr(data[WORKFLOW], key).remove(del_id)
elif key == EDGES:
edge = data[key][del_id]
if edge.source in data[TASK_PROXIES]:
data[TASK_PROXIES][edge.source].edges.remove(del_id)
if edge.target in data[TASK_PROXIES]:
data[TASK_PROXIES][edge.target].edges.remove(del_id)
getattr(data[WORKFLOW], key).edges.remove(del_id)
del data[key][del_id]
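
The clean-up added above has to tolerate parents that are already gone: when a
task or family proxy is pruned, its first-parent family may have been pruned in
the same batch, so removal of the back-reference is wrapped in try/except
KeyError. A minimal, self-contained sketch of that pattern (plain dicts and a
dataclass stand in for the real protobuf messages; all names are illustrative):

    from dataclasses import dataclass, field

    @dataclass
    class FamilyProxy:
        child_tasks: list = field(default_factory=list)

    def prune_task_proxy(del_id, task_proxies, family_proxies):
        """Drop a task proxy and scrub dangling references to it."""
        first_parent = task_proxies[del_id]["first_parent"]
        try:
            # The parent family may already have been pruned in this batch.
            family_proxies[first_parent].child_tasks.remove(del_id)
        except KeyError:
            pass
        del task_proxies[del_id]

    family_proxies = {"fam.1": FamilyProxy(child_tasks=["foo.1"])}
    task_proxies = {"foo.1": {"first_parent": "fam.1"}}
    prune_task_proxy("foo.1", task_proxies, family_proxies)
    assert family_proxies["fam.1"].child_tasks == []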

@@ -285,25 +302,6 @@ class DataStoreMgr:
Workflow scheduler instance.
"""

# Memory optimization - constrain possible attributes to this list.
__slots__ = [
'added',
'ancestors',
'data',
'deltas',
'delta_queues',
'descendants',
'n_max',
'parents',
'publish_deltas',
'schd',
'state_update_families',
'updated_state_families',
'updated',
'updates_pending',
'workflow_id',
]

def __init__(self, schd):
self.schd = schd
self.workflow_id = f'{self.schd.owner}{ID_DELIM}{self.schd.suite}'
@@ -331,6 +329,11 @@ def __init__(self, schd):
self.updates_pending = False
self.delta_queues = {self.workflow_id: {}}
self.publish_deltas = []
self.in_window_edges = {}
self.out_window_edges = {}
self.in_window_nodes = {}
self.out_window_nodes = {}
self.next_prune_flagged_fams = set()

def initiate_data_model(self, reloaded=False):
"""Initiate or Update data model on start/restart/reload.
@@ -524,7 +527,8 @@ def generate_definition_elements(self):
self.descendants = descendants
self.parents = parents

def increment_graph_window(self, source_itask, n_distance=None):
def increment_graph_window(
self, source_itask, n_distance=0, active_id=None):
"""Generate graph window about given task proxy to n_max
Args:
@@ -537,16 +541,20 @@ def increment_graph_window(self, source_itask, n_distance=None):
None
"""
if n_distance is None:
n_distance = 0
# Create this source node
s_node = source_itask.identity
s_id = (
f'{self.workflow_id}{ID_DELIM}{source_itask.point}'
f'{ID_DELIM}{source_itask.tdef.name}')
if active_id is None:
active_id = s_id

# Generate task node
self.generate_ghost_task(s_id, source_itask)
if n_distance == 0:
self.in_window_nodes[active_id] = set()
self.in_window_edges[active_id] = set()
self.in_window_nodes[active_id].add(s_id)

n_distance += 1
if n_distance > self.n_max:
@@ -570,7 +578,8 @@ def increment_graph_window(self, source_itask, n_distance=None):
reflow=source_itask.reflow,
log_info=False,
)
self.increment_graph_window(target_itask, n_distance)
self.increment_graph_window(
target_itask, n_distance, active_id)

# Initiate edge element.
e_id = (
@@ -594,8 +603,22 @@ def increment_graph_window(self, source_itask, n_distance=None):
new_edges.add(e_id)
if new_edges:
self.updates_pending = True
self.in_window_edges[active_id] |= new_edges
getattr(self.updated[WORKFLOW], EDGES).edges.extend(new_edges)

def decrement_graph_window(self, name, point):
"""Flag removal of this active graph window."""

tp_id = f'{self.workflow_id}{ID_DELIM}{point}{ID_DELIM}{name}'
if tp_id in self.in_window_nodes:
self.out_window_nodes[tp_id] = self.in_window_nodes[tp_id]
del self.in_window_nodes[tp_id]
self.updates_pending = True
if tp_id in self.in_window_edges:
self.out_window_edges[tp_id] = self.in_window_edges[tp_id]
del self.in_window_edges[tp_id]
self.updates_pending = True
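
Together these two methods give each active (n=0) task its own record of the
window it generated: increment_graph_window recurses out to n_max, recording
node and edge IDs against the active task's ID, and decrement_graph_window
simply moves that record from in_window_* to out_window_* once the task leaves
the pool, leaving the actual deletion to the prune pass. An illustrative,
simplified sketch of the bookkeeping (nodes only, children only, toy IDs; not
the real cylc graph types):

    N_MAX = 1

    def walk_window(graph, node, in_window, active_id=None, distance=0):
        """Collect node IDs within N_MAX edges of an active node."""
        if active_id is None:
            active_id = node
            in_window[active_id] = set()
        in_window[active_id].add(node)
        distance += 1
        if distance > N_MAX:
            return
        for child in graph.get(node, ()):
            walk_window(graph, child, in_window, active_id, distance)

    graph = {"a.1": ["b.1"], "b.1": ["c.1"]}
    in_window, out_window = {}, {}
    walk_window(graph, "a.1", in_window)       # open the window about a.1
    assert in_window["a.1"] == {"a.1", "b.1"}  # c.1 is beyond n_max

    # "decrement": a.1 left the pool, so flag its window for pruning
    out_window["a.1"] = in_window.pop("a.1")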

def generate_ghost_task(self, tp_id, itask):
"""Create task-point element populated with static data.
@@ -717,39 +740,16 @@ def generate_ghost_family(self, fp_id, child_fam=None, child_task=None):
elif child_fam not in fp_parent.child_families:
fp_parent.child_families.append(child_fam)

def prune_points(self, point_strings):
"""Remove old nodes and edges by cycle point.
Args:
point_strings (iterable):
Iterable of valid cycle point strings.
"""
flow_data = self.data[self.workflow_id]
if not point_strings:
return
node_ids = set()
for tp_id, tproxy in list(flow_data[TASK_PROXIES].items()):
if tproxy.cycle_point in point_strings:
node_ids.add(tp_id)
self.deltas[TASK_PROXIES].pruned.append(tp_id)
self.schd.job_pool.remove_task_jobs(tp_id)

for fp_id, fproxy in list(flow_data[FAMILY_PROXIES].items()):
if fproxy.cycle_point in point_strings:
self.deltas[FAMILY_PROXIES].pruned.append(fp_id)

for e_id, edge in list(flow_data[EDGES].items()):
if edge.source in node_ids or edge.target in node_ids:
self.deltas[EDGES].pruned.append(e_id)

def update_data_structure(self, updated_nodes=None):
"""Reflect workflow changes in the data structure."""
"""Workflow batch updates in the data structure."""
# update states and other dynamic fields
# TODO: Event driven task proxy updates (non-Batch)
self.update_dynamic_elements(updated_nodes)
self.update_family_proxies()

# Update workflow statuses and totals if needed
if self.updates_pending:
self.prune_data_store()
self.update_workflow()

if self.updates_pending or self.schd.job_pool.updates_pending:
@@ -762,6 +762,91 @@ def update_data_structure(self, updated_nodes=None):
# Clear deltas
self.clear_deltas()

if self.state_update_families:
self.updates_pending = True
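
The batch update now prunes before workflow totals are recomputed, and any
families whose state update had to be deferred (state_update_families)
re-raise updates_pending so the next batch finishes the job. A toy stand-in
for that carry-over (the reset of updates_pending after publishing is an
assumption about the surrounding collapsed code, not shown in this hunk):

    class MiniStore:
        def __init__(self):
            self.updates_pending = True
            self.state_update_families = set()

        def prune_data_store(self):
            # Suppose a flagged family could not be updated in this pass
            # (e.g. its data-store entry does not exist yet).
            self.state_update_families.add("fam.1")

        def update_data_structure(self):
            if self.updates_pending:
                self.prune_data_store()
            self.updates_pending = False       # batch published and cleared
            if self.state_update_families:
                self.updates_pending = True    # finish the job next cycle

    store = MiniStore()
    store.update_data_structure()
    assert store.updates_pending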

def prune_data_store(self):
"""Remove old nodes and edges by cycle point.
Args:
point_strings (iterable):
Iterable of valid cycle point strings.
"""
if not self.out_window_nodes and not self.out_window_edges:
return

data = self.data[self.workflow_id]

# Gather nodes and edges to prune
node_ids = set().union(*self.out_window_nodes.values()).difference(
*self.in_window_nodes.values())
self.out_window_nodes.clear()
edge_ids = set().union(*self.out_window_edges.values()).difference(
*self.in_window_edges.values())
self.out_window_edges.clear()

for e_id in edge_ids:
self.deltas[EDGES].pruned.append(e_id)

flagged_ids = set()
for tp_id in node_ids:
self.deltas[TASK_PROXIES].pruned.append(tp_id)
self.schd.job_pool.remove_task_jobs(tp_id)
flagged_ids.add(data[TASK_PROXIES][tp_id].first_parent)

prune_ids = set()
checked_ids = set()
while flagged_ids:
for fp_id in flagged_ids:
self._family_ascent_point_prune(
fp_id, node_ids, flagged_ids, checked_ids, prune_ids)
break
if prune_ids:
self.deltas[FAMILY_PROXIES].pruned.extend(prune_ids)
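
The set arithmetic above is the heart of the prune: a node is dropped only if
it was flagged by at least one departed window and is not still claimed by any
active window. With hypothetical IDs:

    # Hypothetical window contents, purely to illustrate the set arithmetic.
    in_window_nodes = {
        "b.2": {"b.2", "c.2"},          # window of a still-active task
    }
    out_window_nodes = {
        "a.1": {"a.1", "b.1", "c.2"},   # window of a task that left the pool
    }

    to_prune = set().union(*out_window_nodes.values()).difference(
        *in_window_nodes.values())

    assert to_prune == {"a.1", "b.1"}   # c.2 survives: b.2 still reaches it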

def _family_ascent_point_prune(
self, fp_id, node_ids, flagged_ids, checked_ids, prune_ids):
fp_data = self.data[self.workflow_id][FAMILY_PROXIES]
if fp_id in fp_data and fp_id not in self.updated[FAMILY_PROXIES]:
fam_node = fp_data[fp_id]
# Gather child families, then check/update recursively
child_fam_nodes = [
n_id
for n_id in fam_node.child_families
if n_id not in checked_ids
]
for child_id in child_fam_nodes:
self._family_ascent_point_prune(
child_id, node_ids, flagged_ids, checked_ids, prune_ids)
if [
child_id
for child_id in fam_node.child_tasks
if child_id not in node_ids
] or [
child_id
for child_id in fam_node.child_families
if child_id not in prune_ids
]:
self.state_update_families.add(fp_id)
else:
if fam_node.first_parent:
flagged_ids.add(fam_node.first_parent)
prune_ids.add(fp_id)
checked_ids.add(fp_id)
if fp_id in flagged_ids:
flagged_ids.remove(fp_id)
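
_family_ascent_point_prune walks the family tree bottom-up: a family proxy is
pruned only once every child task is itself being pruned and every child
family has already been pruned; otherwise it is flagged for a state recount,
and a pruned family's own first parent is flagged for the same check. A
simplified recursive sketch of the child check (the ascent to the first parent
is omitted; structures are illustrative, not the real protobuf messages):

    def family_prune(fam_id, families, pruned_tasks, prune_ids, update_ids):
        fam = families[fam_id]
        # Check child families first so pruning propagates bottom-up.
        for child in fam["child_families"]:
            if child not in prune_ids:
                family_prune(child, families, pruned_tasks,
                             prune_ids, update_ids)
        live_tasks = [t for t in fam["child_tasks"] if t not in pruned_tasks]
        live_fams = [f for f in fam["child_families"] if f not in prune_ids]
        if live_tasks or live_fams:
            update_ids.add(fam_id)      # still has live content: recount state
        else:
            prune_ids.add(fam_id)       # empty: prune it

    families = {
        "root.1": {"child_tasks": [], "child_families": ["FAM.1"]},
        "FAM.1": {"child_tasks": ["a.1"], "child_families": []},
    }
    prune_ids, update_ids = set(), set()
    family_prune("root.1", families, pruned_tasks={"a.1"},
                 prune_ids=prune_ids, update_ids=update_ids)
    assert prune_ids == {"FAM.1", "root.1"} and not update_ids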

def update_dynamic_elements(self, updated_nodes=None):
"""Update data elements containing dynamic/live fields."""
# If no tasks are given update all
if updated_nodes is None:
updated_nodes = self.schd.pool.get_all_tasks()
elif not updated_nodes:
return
self.update_task_proxies(updated_nodes)
self.updates_pending = True

def update_task_proxies(self, updated_tasks=None):
"""Update dynamic fields of task nodes/proxies.
@@ -887,8 +972,15 @@ def _family_ascent_point_update(self, fp_id):
fp_data = self.data[self.workflow_id][FAMILY_PROXIES]
if fp_id in fp_data:
fam_node = fp_data[fp_id]
else:
elif fp_id in fp_added:
fam_node = fp_added[fp_id]
else:
# TODO: Shouldn't need with event driven updates
# as nodes will be updated before removal.
if fp_id in self.state_update_families:
self.updated_state_families.add(fp_id)
self.state_update_families.remove(fp_id)
return
# Gather child families, then check/update recursively
child_fam_nodes = [
n_id
@@ -1002,35 +1094,13 @@ def update_workflow(self):
max(set(self.schd.pool.runahead_pool))
)

def update_dynamic_elements(self, updated_nodes=None):
"""Update data elements containing dynamic/live fields."""
# If no tasks are given update all
if updated_nodes is None:
updated_nodes = self.schd.pool.get_all_tasks()
elif not updated_nodes:
return
self.update_task_proxies(updated_nodes)
self.update_family_proxies()
self.updates_pending = True

# TODO: Make the other deltas/updates event driven like this one.
def delta_broadcast(self):
"""Collects broadcasts on change event."""
workflow = self.updated[WORKFLOW]
workflow.broadcasts = json.dumps(self.schd.broadcast_mgr.broadcasts)
self.updates_pending = True

def clear_deltas(self):
"""Clear current deltas."""
for key in self.deltas:
self.deltas[key].Clear()
if key == WORKFLOW:
self.added[key].Clear()
self.updated[key].Clear()
continue
self.added[key].clear()
self.updated[key].clear()

def apply_deltas(self, reloaded=False):
"""Gather and apply deltas."""
# Copy in job deltas
@@ -1084,6 +1154,17 @@ def apply_deltas(self, reloaded=False):
self.schd.job_pool.added.clear()
self.schd.job_pool.updated.clear()

def clear_deltas(self):
"""Clear current deltas."""
for key in self.deltas:
self.deltas[key].Clear()
if key == WORKFLOW:
self.added[key].Clear()
self.updated[key].Clear()
continue
self.added[key].clear()
self.updated[key].clear()

# Message collation and dissemination methods:
def get_entire_workflow(self):
"""Gather data elements into single Protobuf message.
7 changes: 5 additions & 2 deletions cylc/flow/task_pool.py
@@ -233,6 +233,9 @@ def add_to_runahead_pool(self, itask, is_new=True):
self.runahead_pool[itask.point][itask.identity] = itask
self.rhpool_changed = True

# Create new data-store n-distance graph window about this task
self.data_store_mgr.increment_graph_window(itask)

# add row to "task_states" table & data-store
if is_new:
self.suite_db_mgr.put_insert_task_states(itask, {
@@ -516,8 +519,6 @@ def release_runahead_task(self, itask):
self.pool_changed = True
self.pool_changes.append(itask)
LOG.debug("[%s] -released to the task pool", itask)
# Create new data-store n-distance graph window about this task
self.data_store_mgr.increment_graph_window(itask)
del self.runahead_pool[itask.point][itask.identity]
if not self.runahead_pool[itask.point]:
del self.runahead_pool[itask.point]
@@ -580,6 +581,8 @@ def remove(self, itask, reason=""):

# Event-driven final update of task_states table.
# TODO: same for datastore (still updated by iterating the task pool)
self.data_store_mgr.decrement_graph_window(
itask.tdef.name, itask.point)
self.suite_db_mgr.put_update_task_state(itask)
LOG.debug("[%s] -%s", itask, msg)
del itask
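
With the two task_pool.py changes the window lifecycle is paired with pool
membership: the n-window is opened as soon as a task enters the runahead pool
(rather than on release to the main pool) and is flagged for pruning when the
task is removed. A toy pairing with stub objects (stand-ins only, not the real
classes; the call signatures match those used in the diff):

    from types import SimpleNamespace

    class StubDataStore:
        def __init__(self):
            self.active_windows = set()

        def increment_graph_window(self, itask):
            self.active_windows.add(itask.identity)

        def decrement_graph_window(self, name, point):
            self.active_windows.discard(f'{name}.{point}')

    store = StubDataStore()
    itask = SimpleNamespace(identity='foo.1',
                            tdef=SimpleNamespace(name='foo'), point='1')

    # add_to_runahead_pool(...) now does this:
    store.increment_graph_window(itask)
    # ...and remove(...) now does this:
    store.decrement_graph_window(itask.tdef.name, itask.point)
    assert not store.active_windows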
