diff --git a/CHANGES.md b/CHANGES.md index 7508c4fd502..7de16fc71d0 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -56,6 +56,11 @@ compatibility, the `cylc run` command will automatically symlink an existing ### Enhancements +[#3811](https://github.com/cylc/cylc-flow/pull/3811) - Move from cycle based +to `n` distance dependency graph window node generation and pruning of the +data-store (API/visual backing data). Ability to modify distance of live +workflow via API, with default of `n=1`. + [#3899](https://github.com/cylc/cylc-flow/pull/3899) - CLI changes * Commands no longer re-invoke (so you get `cylc run` not `cylc-run`). * Improve CLI descriptions and help. diff --git a/cylc/flow/config.py b/cylc/flow/config.py index fff0a2f9052..9e12875a740 100644 --- a/cylc/flow/config.py +++ b/cylc/flow/config.py @@ -1742,7 +1742,7 @@ def generate_triggers(self, lexpression, left_nodes, right, seq, # (name is left name) self.taskdefs[name].add_graph_child(task_trigger, right, seq) # graph_parents not currently used but might be needed soon: - # self.taskdefs[right].add_graph_parent(task_trigger, name, seq) + self.taskdefs[right].add_graph_parent(task_trigger, name, seq) # Walk down "expr_list" depth first, and replace any items matching a # key in "triggers" ("left" values) with the trigger. @@ -1994,103 +1994,6 @@ def get_graph_raw(self, start_point_string, stop_point_string, self._last_graph_raw_edges = graph_raw_edges return graph_raw_edges - def get_graph_edges(self, start_point, stop_point): - """Convert the abstract graph edges (self.edges, etc) to actual edges - - This method differs from the get_graph_raw; class attributes are not - used to hold information from previous method calls, and only ungrouped - edges are returned. - - Args: - start_point (cylc.flow.cycling.*Point): - Start Integer or ISO8601 Point. - stop_point (cylc.flow.cycling.*Point): - Stop Integer or ISO8601 Point. - """ - - if start_point is None: - raise TypeError( - "get_graph_edges() start_point argument must be a" - " valid cycle point, not 'NoneType'") - # Avoid infinite edge generation - if stop_point is None: - raise TypeError( - "get_graph_edges() stop_point argument must be a" - " valid cycle point, not 'NoneType'") - suite_final_point = get_point( - self.cfg['scheduling']['final cycle point']) - - # Get ICP on-sequence point - actual_first_point = self.get_actual_first_point(self.start_point) - - gr_edges = {} - start_point_offset_cache = {} - point_offset_cache = None - for sequence, edges in self.edges.items(): - # Get first cycle point for this sequence - point = sequence.get_first_point(start_point) - while point is not None: - if point > stop_point: - # Beyond requested final cycle point. - break - if suite_final_point is not None and point > suite_final_point: - # Beyond suite final cycle point. - break - point_offset_cache = {} - for left, right, suicide, cond in edges: - if right: - r_id = (right, point) - else: - r_id = None - if left.startswith('@'): - # @trigger node. 
- name = left - offset_is_from_icp = False - offset = None - else: - name, offset, _, offset_is_from_icp, _, _ = ( - GraphNodeParser.get_inst().parse(left)) - if offset: - if offset_is_from_icp: - cache = start_point_offset_cache - # use actual ICP first point - rel_point = actual_first_point - else: - cache = point_offset_cache - rel_point = point - try: - l_point = cache[offset] - except KeyError: - l_point = get_point_relative(offset, rel_point) - cache[offset] = l_point - else: - l_point = point - l_id = (name, l_point) - - if l_id is None and r_id is None: - continue - if l_id is not None and actual_first_point > l_id[1]: - # Check that l_id is not earlier than start time. - if r_id is None or r_id[1] < actual_first_point: - continue - # Pre-initial dependency; - # keep right hand node. - l_id = r_id - r_id = None - if point not in gr_edges: - gr_edges[point] = [] - # only used to get task ID here - lstr, rstr = self._close_families(l_id, r_id, {}) - gr_edges[point].append((lstr, rstr, None, suicide, cond)) - # Increment the cycle point. - point = sequence.get_next_point_on_sequence(point) - - del start_point_offset_cache - del point_offset_cache - GraphNodeParser.get_inst().clear() - # Flatten nested list. - return [i for sublist in gr_edges.values() for i in sublist] - def get_node_labels(self, start_point_string, stop_point_string=None): """Return dependency graph node labels.""" stop_point = None diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py index 2fc9078d8d0..bb0b877f71a 100644 --- a/cylc/flow/data_store_mgr.py +++ b/cylc/flow/data_store_mgr.py @@ -32,16 +32,24 @@ Static data elements are generated on workflow start/restart/reload, which includes workflow, task, and family definition objects. +The cycle point nodes/edges (i.e. task/family proxies) generation is triggered +individually on transition from staging to active task pool. Each active task +is generated along with any children and parents recursively out to a +specified maximum graph distance (n_edge_distance), that can be externally +altered (via API). Collectively this forms the N-Distance-Window on the +workflow graph. + +Pruning of data-store elements is done using both the collection/set of nodes +generated through the associated graph paths of the active nodes and the +tracking of the boundary nodes (n_edge_distance+1) of those active nodes. +Once active, these boundary nodes act as the prune trigger for their +original/generator node(s). Set operations are used to do a diff between the +nodes of active paths (paths whose node is in the active task pool) and the +nodes of flagged paths (whose boundary node(s) have become active). + Updates are triggered by changes in the task pool; migrations of task instances from runahead to live pool, and changes in task state (itask.state.is_updated). -- Graph edges are generated for new cycle points in the pool -- Ghost nodes, task and family cycle point instances containing static data, - are generated from the source & target of edges and pointwise respectively. -- Cycle points are removed/pruned if they are not in the pool and not - the source or target cycle point of the current edge set. The removed include - edge, task, and family cycle point items and an update of the family, - workflow, and manager aggregate attributes. Data elements include a "stamp" field, which is a timestamped ID for use in assessing changes in the data store, for comparisons of a store sync. 
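Illustrative sketch (not part of this patch): the pruning described in the docstring above reduces to set operations over per-active-node path sets. The snippet below mimics that with plain dicts and sets; the attribute names echo those introduced further down (n_window_nodes, all_task_pool, prune_flagged_nodes) and the task IDs are invented.

# Toy version of the set-operation pruning described above.
# Window node sets, keyed by the active node that generated each window.
n_window_nodes = {
    'a.1': {'a.1', 'b.1', 'c.1'},   # a.1 and its n=1 neighbours
    'c.1': {'b.1', 'c.1', 'd.1'},
}
all_task_pool = {'c.1'}             # only c.1 is still in the active pool
prune_flagged_nodes = {'a.1'}       # a.1's boundary node has become active

# Nodes on paths whose generating node is still active.
in_paths = set().union(
    *(v for k, v in n_window_nodes.items() if k in all_task_pool))
# Nodes on paths whose generating node has been flagged for pruning.
out_paths = prune_flagged_nodes.union(
    *(v for k, v in n_window_nodes.items() if k in prune_flagged_nodes))

# Prune whatever is no longer reachable from any active window.
prune_ids = out_paths - in_paths
print(prune_ids)  # {'a.1'} -- b.1 and c.1 survive via the active c.1 window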
@@ -51,13 +59,12 @@ """ from collections import Counter -from copy import deepcopy +from copy import copy, deepcopy import json from time import time import zlib from cylc.flow import __version__ as CYLC_VERSION, LOG, ID_DELIM -from cylc.flow.cycling.loader import get_point from cylc.flow.data_messages_pb2 import ( PbEdge, PbEntireWorkflow, PbFamily, PbFamilyProxy, PbJob, PbTask, PbTaskProxy, PbWorkflow, AllDeltas, EDeltas, FDeltas, FPDeltas, @@ -66,7 +73,9 @@ from cylc.flow.suite_status import get_suite_status from cylc.flow.task_id import TaskID from cylc.flow.task_job_logs import JOB_LOG_OPTS +from cylc.flow.task_state import TASK_STATUS_WAITING, TASK_STATUS_EXPIRED from cylc.flow.task_state_prop import extract_group_state +from cylc.flow.taskdef import generate_graph_children, generate_graph_parents from cylc.flow.wallclock import ( TIME_ZONE_LOCAL_INFO, TIME_ZONE_UTC_INFO, @@ -120,7 +129,6 @@ DELTA_FIELDS = {DELTA_ADDED, DELTA_UPDATED, DELTA_PRUNED} - # Protobuf message merging appends repeated field results on merge, # unlike singular fields which are overwritten. This behaviour is # desirable in many cases, but there are exceptions. @@ -192,11 +200,28 @@ def apply_delta(key, delta, data): continue if key == TASK_PROXIES: data[TASKS][data[key][del_id].task].proxies.remove(del_id) + try: + data[FAMILY_PROXIES][ + data[key][del_id].first_parent + ].child_tasks.remove(del_id) + except KeyError: + pass getattr(data[WORKFLOW], key).remove(del_id) elif key == FAMILY_PROXIES: data[FAMILIES][data[key][del_id].family].proxies.remove(del_id) + try: + data[FAMILY_PROXIES][ + data[key][del_id].first_parent + ].child_families.remove(del_id) + except KeyError: + pass getattr(data[WORKFLOW], key).remove(del_id) elif key == EDGES: + edge = data[key][del_id] + if edge.source in data[TASK_PROXIES]: + data[TASK_PROXIES][edge.source].edges.remove(del_id) + if edge.target in data[TASK_PROXIES]: + data[TASK_PROXIES][edge.target].edges.remove(del_id) getattr(data[WORKFLOW], key).edges.remove(del_id) del data[key][del_id] @@ -271,16 +296,10 @@ class DataStoreMgr: Message containing the global information of the workflow. .descendants (dict): Local store of config.get_first_parent_descendants() - .edge_points (dict): - Source point keys of target points lists. - .max_point (cylc.flow.cycling.PointBase): - Maximum cycle point in the pool. - .min_point (cylc.flow.cycling.PointBase): - Minimum cycle point in the pool. + .n_edge_distance (int): + Maximum distance of the data-store graph from the active pool. .parents (dict): Local store of config.get_parent_lists() - .pool_points (set): - Cycle point objects in the task pool. .publish_deltas (list): Collection of the latest applied deltas for publishing. .schd (cylc.flow.scheduler.Scheduler): @@ -293,40 +312,16 @@ class DataStoreMgr: Workflow scheduler instance. """ - # Memory optimization - constrain possible attributes to this list. 
- __slots__ = [ - 'added', - 'ancestors', - 'data', - 'deltas', - 'delta_queues', - 'descendants', - 'edge_points', - 'max_point', - 'min_point', - 'parents', - 'pool_points', - 'publish_deltas', - 'schd', - 'state_update_families', - 'updated_state_families', - 'updated', - 'updates_pending', - 'workflow_id', - ] - def __init__(self, schd): self.schd = schd self.workflow_id = f'{self.schd.owner}{ID_DELIM}{self.schd.suite}' self.ancestors = {} self.descendants = {} self.parents = {} - self.pool_points = set() - self.max_point = None - self.min_point = None - self.edge_points = {} self.state_update_families = set() self.updated_state_families = set() + self.n_edge_distance = 1 + self.next_n_edge_distance = None # Managed data types self.data = { self.workflow_id: deepcopy(DATA_TEMPLATE) @@ -345,6 +340,13 @@ def __init__(self, schd): self.updates_pending = False self.delta_queues = {self.workflow_id: {}} self.publish_deltas = [] + self.all_task_pool = set() + self.n_window_nodes = {} + self.n_window_edges = {} + self.n_window_boundary_nodes = {} + self.prune_trigger_nodes = {} + self.prune_flagged_nodes = set() + self.prune_pending = False def initiate_data_model(self, reloaded=False): """Initiate or Update data model on start/restart/reload. @@ -360,7 +362,6 @@ def initiate_data_model(self, reloaded=False): # Static elements self.generate_definition_elements() - self.increment_graph_elements() # Tidy and reassign task jobs after reload if reloaded: @@ -374,17 +375,17 @@ def initiate_data_model(self, reloaded=False): for j_id in self.schd.job_pool.task_jobs.get(tp_id, []) ] self.schd.job_pool.reload_deltas() - # Set jobs ref self.data[self.workflow_id][JOBS] = self.schd.job_pool.pool - # Update workflow statuses and totals (assume needed) self.update_workflow() + # Apply current deltas self.apply_deltas(reloaded) self.updates_pending = False self.schd.job_pool.updates_pending = False + # Gather this batch of deltas for publish self.publish_deltas = self.get_publish_deltas() # Clear deltas after application and publishing @@ -539,19 +540,182 @@ def generate_definition_elements(self): self.descendants = descendants self.parents = parents - def generate_ghost_task(self, t_id, tp_id, point_string): + def increment_graph_window( + self, name, point, flow_label, + edge_distance=0, active_id=None, + descendant=False, is_parent=False): + """Generate graph window about given origin to n-edge-distance. + + Args: + name (str): + Task name. + point (cylc.flow.cycling.PointBase): + PointBase derived object. + flow_label (str): + Flow label used to distinguish multiple runs. + edge_distance (int): + Graph distance from active/origin node. + active_id (str): + Active/origin node id. + descendant (bool): + Is the current node a direct descendent of the active/origin. + + Returns: + + None + + """ + # Create this source node + s_node = TaskID.get(name, point) + s_id = f'{self.workflow_id}{ID_DELIM}{point}{ID_DELIM}{name}' + if active_id is None: + active_id = s_id + + # Setup and check if active node is another's boundary node + # to flag its paths for pruning. + if edge_distance == 0: + self.n_window_edges[active_id] = set() + self.n_window_boundary_nodes[active_id] = {} + self.n_window_nodes[active_id] = set() + if active_id in self.prune_trigger_nodes: + self.prune_flagged_nodes.update( + self.prune_trigger_nodes[active_id]) + del self.prune_trigger_nodes[active_id] + self.prune_pending = True + + # This part is vital to constructing a set of boundary nodes + # associated with the current Active node. 
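Before the boundary-handling code that follows, a toy sketch of the bookkeeping it performs (illustrative only; the IDs and distances are invented). With n_edge_distance=1 over a chain a.1 -> b.1 -> c.1, the node reached one step beyond the window edge (c.1) is recorded as a boundary node of the active node a.1 and then registered as the trigger that will later flag a.1's window for pruning.

n_edge_distance = 1
n_window_boundary_nodes = {'a.1': {}}

# During the graph walk the boundary node is recorded by its distance.
n_window_boundary_nodes['a.1'].setdefault(2, set()).add('c.1')

# After the walk, only the furthest boundary level becomes a prune trigger:
# when c.1 itself turns active, a.1's window is flagged for pruning.
prune_trigger_nodes = {}
levels = n_window_boundary_nodes['a.1'].keys()
for tp_id in n_window_boundary_nodes['a.1'][max(levels)]:
    prune_trigger_nodes.setdefault(tp_id, set()).add('a.1')
print(prune_trigger_nodes)  # {'c.1': {'a.1'}}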
+ if edge_distance > self.n_edge_distance: + if descendant and self.n_edge_distance > 0: + self.n_window_boundary_nodes[ + active_id].setdefault(edge_distance, set()).add(s_id) + return + graph_children = generate_graph_children( + self.schd.config.get_taskdef(name), point) + if ( + (not any(graph_children.values()) and descendant) + or self.n_edge_distance == 0 + ): + self.n_window_boundary_nodes[ + active_id].setdefault(edge_distance, set()).add(s_id) + + self.n_window_nodes[active_id].add(s_id) + # Generate task node + self.generate_ghost_task(s_id, name, point, flow_label, is_parent) + + edge_distance += 1 + + # TODO: xtrigger is suite_state edges too + # Reference set for workflow relations + for items in graph_children.values(): + if edge_distance == 1: + descendant = True + self._expand_graph_window( + s_id, s_node, items, active_id, flow_label, edge_distance, + descendant, False) + + for items in generate_graph_parents( + self.schd.config.get_taskdef(name), point).values(): + self._expand_graph_window( + s_id, s_node, items, active_id, flow_label, edge_distance, + False, True) + + if edge_distance == 1: + levels = self.n_window_boundary_nodes[active_id].keys() + # Could be self-reference node foo:failed => foo + if not levels: + self.n_window_boundary_nodes[active_id][0] = {active_id} + levels = (0,) + # Only trigger pruning for futhest set of boundary nodes + for tp_id in self.n_window_boundary_nodes[active_id][max(levels)]: + self.prune_trigger_nodes.setdefault( + tp_id, set()).add(active_id) + del self.n_window_boundary_nodes[active_id] + if self.n_window_edges[active_id]: + getattr(self.updated[WORKFLOW], EDGES).edges.extend( + self.n_window_edges[active_id]) + + def _expand_graph_window( + self, s_id, s_node, items, active_id, flow_label, edge_distance, + descendant=False, is_parent=False): + """Construct nodes/edges for children/parents of source node.""" + for t_name, t_point, is_abs in items: + t_node = TaskID.get(t_name, t_point) + t_id = ( + f'{self.workflow_id}{ID_DELIM}{t_point}{ID_DELIM}{t_name}') + # Initiate edge element. 
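A stand-alone sketch (not patch content) of the recursive walk performed by increment_graph_window and _expand_graph_window above: a plain adjacency map stands in for generate_graph_children/generate_graph_parents, and the data-store side effects (ghost tasks, edges, boundary bookkeeping) are omitted.

# Toy dependency graph: each task's children, with parents derived from it.
CHILDREN = {'a': ['b'], 'b': ['c'], 'c': ['d'], 'd': []}
PARENTS = {name: [p for p, kids in CHILDREN.items() if name in kids]
           for name in CHILDREN}


def window_nodes(name, n, distance=0, nodes=None):
    """Collect all nodes within n edges of the given node."""
    if nodes is None:
        nodes = set()
    if distance > n:
        return nodes              # past the window boundary, stop recursing
    nodes.add(name)
    for neighbour in CHILDREN[name] + PARENTS[name]:
        if neighbour not in nodes:
            window_nodes(neighbour, n, distance + 1, nodes)
    return nodes


print(sorted(window_nodes('b', 1)))  # ['a', 'b', 'c']
print(sorted(window_nodes('b', 2)))  # ['a', 'b', 'c', 'd']

The real method also creates task-proxy and edge deltas and records boundary nodes as it goes, but the traversal it performs is of this shape.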
+ if is_parent: + e_id = ( + f'{self.workflow_id}{ID_DELIM}{t_node}{ID_DELIM}{s_node}') + else: + e_id = ( + f'{self.workflow_id}{ID_DELIM}{s_node}{ID_DELIM}{t_node}') + if e_id in self.n_window_edges[active_id]: + continue + if ( + e_id not in self.data[self.workflow_id][EDGES] + and e_id not in self.added[EDGES] + and edge_distance <= self.n_edge_distance + ): + self.added[EDGES][e_id] = PbEdge( + id=e_id, + source=s_id, + target=t_id + ) + # Add edge id to node field for resolver reference + self.updated[TASK_PROXIES].setdefault( + t_id, + PbTaskProxy(id=t_id)).edges.append(e_id) + self.updated[TASK_PROXIES].setdefault( + s_id, + PbTaskProxy(id=s_id)).edges.append(e_id) + self.n_window_edges[active_id].add(e_id) + if t_id in self.n_window_nodes[active_id]: + continue + self.increment_graph_window( + t_name, t_point, flow_label, + copy(edge_distance), active_id, descendant, is_parent) + + def remove_pool_node(self, name, point): + """Remove ID reference and flag isolate node/branch for pruning.""" + tp_id = f'{self.workflow_id}{ID_DELIM}{point}{ID_DELIM}{name}' + if tp_id in self.all_task_pool: + self.all_task_pool.remove(tp_id) + # flagged isolates/end-of-branch nodes for pruning on removal + if ( + tp_id in self.prune_trigger_nodes and + tp_id in self.prune_trigger_nodes[tp_id] + ): + self.prune_flagged_nodes.update(self.prune_trigger_nodes[tp_id]) + del self.prune_trigger_nodes[tp_id] + self.prune_pending = True + + def add_pool_node(self, name, point): + """Add external ID reference for internal task pool node.""" + tp_id = f'{self.workflow_id}{ID_DELIM}{point}{ID_DELIM}{name}' + self.all_task_pool.add(tp_id) + + def generate_ghost_task( + self, tp_id, name, point, flow_label, is_parent=False): """Create task-point element populated with static data. Args: - t_id (str): data-store task ID. - tp_id (str): data-store task proxy ID. - point_string (str): Valid cycle point string. + tp_id (str): + data-store task proxy ID. + name (str): + Task name. + point (cylc.flow.cycling.PointBase): + PointBase derived object. + flow_label (str): + Flow label used to distinguish multiple runs. Returns: None """ + t_id = f'{self.workflow_id}{ID_DELIM}{name}' + point_string = f'{point}' task_proxies = self.data[self.workflow_id][TASK_PROXIES] if tp_id in task_proxies or tp_id in self.added[TASK_PROXIES]: return @@ -568,7 +732,15 @@ def generate_ghost_task(self, t_id, tp_id, point_string): cycle_point=point_string, depth=taskdef.depth, name=taskdef.name, + state=TASK_STATUS_WAITING, + flow_label=flow_label ) + if is_parent and tp_id not in self.n_window_nodes: + # TODO: Load task info from DB + tproxy.state = TASK_STATUS_EXPIRED + else: + tproxy.state = TASK_STATUS_WAITING + tproxy.namespace[:] = taskdef.namespace tproxy.ancestors[:] = [ f'{self.workflow_id}{ID_DELIM}{point_string}{ID_DELIM}{a_name}' @@ -586,6 +758,8 @@ def generate_ghost_task(self, t_id, tp_id, point_string): ) ).proxies.append(tp_id) self.generate_ghost_family(tproxy.first_parent, child_task=tp_id) + self.state_update_families.add(tproxy.first_parent) + self.updates_pending = True def generate_ghost_family(self, fp_id, child_fam=None, child_task=None): """Generate the family-point elements from given ID if non-existent. 
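For reference, an illustrative sketch of the ID formats composed above (not part of the patch). It assumes ID_DELIM is the pipe character used by cylc.flow and visible in the test expectations below; the owner and suite names are invented.

ID_DELIM = '|'  # assumed to match cylc.flow.ID_DELIM

owner, suite = 'alice', 'demo'                                # invented
workflow_id = f'{owner}{ID_DELIM}{suite}'

name, point = 'foo', '20190101T00'
t_id = f'{workflow_id}{ID_DELIM}{name}'                       # task definition
tp_id = f'{workflow_id}{ID_DELIM}{point}{ID_DELIM}{name}'     # task proxy
s_node = f'{name}.{point}'                                    # TaskID form
e_id = f'{workflow_id}{ID_DELIM}{s_node}{ID_DELIM}bar.{point}'  # edge

print(tp_id)  # alice|demo|20190101T00|foo
print(e_id)   # alice|demo|foo.20190101T00|bar.20190101T00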
@@ -655,109 +829,21 @@ def generate_ghost_family(self, fp_id, child_fam=None, child_task=None): elif child_fam not in fp_parent.child_families: fp_parent.child_families.append(child_fam) - def generate_graph_elements(self, start_point=None, stop_point=None): - """Generate edges and [ghost] nodes (family and task proxy elements). - - Args: - start_point (cylc.flow.cycling.PointBase): - Edge generation start point. - stop_point (cylc.flow.cycling.PointBase): - Edge generation stop point. - - """ - if not self.pool_points: - return - config = self.schd.config - if start_point is None: - start_point = min(self.pool_points) - if stop_point is None: - stop_point = max(self.pool_points) - - # Reference set for workflow relations - new_edges = set() - - # Generate ungrouped edges - for edge in config.get_graph_edges(start_point, stop_point): - # Reference or create edge source & target nodes/proxies - s_node = edge[0] - t_node = edge[1] - if s_node is None: - continue - # Is the source cycle point in the task pool? - s_name, s_point = TaskID.split(s_node) - s_point_cls = get_point(s_point) - s_pool_point = False - s_valid = TaskID.is_valid_id(s_node) - if s_valid: - s_pool_point = s_point_cls in self.pool_points - # Is the target cycle point in the task pool? - t_pool_point = False - t_valid = t_node and TaskID.is_valid_id(t_node) - if t_valid: - t_name, t_point = TaskID.split(t_node) - t_point_cls = get_point(t_point) - t_pool_point = get_point(t_point) in self.pool_points - - # Proceed if either source or target cycle points - # are in the task pool. - if not s_pool_point and not t_pool_point: - continue - - # If source/target is valid add/create the corresponding items. - # TODO: if xtrigger is suite_state create remote ID - source_id = ( - f'{self.workflow_id}{ID_DELIM}{s_point}{ID_DELIM}{s_name}') - - # Add valid source before checking for no target, - # as source may be an isolate (hence no edges). - if s_valid: - s_task_id = f'{self.workflow_id}{ID_DELIM}{s_name}' - # Add source points for pruning. - self.edge_points.setdefault(s_point_cls, set()) - self.generate_ghost_task(s_task_id, source_id, s_point) - # If target is valid then created it. - # Edges are only created for valid targets. - # At present targets can't be xtriggers. - if t_valid: - target_id = ( - f'{self.workflow_id}{ID_DELIM}{t_point}{ID_DELIM}{t_name}') - t_task_id = f'{self.workflow_id}{ID_DELIM}{t_name}' - # Add target points to associated source points for pruning. - self.edge_points.setdefault(s_point_cls, set()) - self.edge_points[s_point_cls].add(t_point_cls) - self.generate_ghost_task(t_task_id, target_id, t_point) - - # Initiate edge element. 
- e_id = ( - f'{self.workflow_id}{ID_DELIM}{s_node}{ID_DELIM}{t_node}') - self.added[EDGES][e_id] = PbEdge( - id=e_id, - suicide=edge[3], - cond=edge[4], - source=source_id, - target=target_id, - ) - new_edges.add(e_id) - - # Add edge id to node field for resolver reference - self.updated[TASK_PROXIES].setdefault( - target_id, - PbTaskProxy(id=target_id)).edges.append(e_id) - if s_valid: - self.updated[TASK_PROXIES].setdefault( - source_id, - PbTaskProxy(id=source_id)).edges.append(e_id) - if new_edges: - getattr(self.updated[WORKFLOW], EDGES).edges.extend(new_edges) - def update_data_structure(self, updated_nodes=None): - """Reflect workflow changes in the data structure.""" - # Update edges & node set - self.increment_graph_elements() + """Workflow batch updates in the data structure.""" # update states and other dynamic fields + # TODO: Event driven task proxy updates (non-Batch) self.update_dynamic_elements(updated_nodes) + self.update_family_proxies() + + # Avoids changing window edge distance during edge/node creation + if self.next_n_edge_distance is not None: + self.n_edge_distance = self.next_n_edge_distance + self.next_n_edge_distance = None # Update workflow statuses and totals if needed + if self.prune_pending: + self.prune_data_store() if self.updates_pending: self.update_workflow() @@ -766,100 +852,125 @@ def update_data_structure(self, updated_nodes=None): self.apply_deltas() self.updates_pending = False self.schd.job_pool.updates_pending = False + # Gather this batch of deltas for publish + self.publish_deltas = self.get_publish_deltas() + # Clear deltas + self.clear_deltas() - self.publish_deltas = self.get_publish_deltas() - - # Clear deltas after application and publishing - self.clear_deltas() + if self.state_update_families: + self.updates_pending = True - def increment_graph_elements(self): - """Generate and/or prune graph elements if needed. + def prune_data_store(self): + """Remove flagged nodes and edges not in the set of active paths.""" - Use the task pool and edge source/target cycle points to find - new points to generate edges and/or old points to prune data-store. + self.prune_pending = False - """ - # Gather task pool cycle points. - old_pool_points = self.pool_points.copy() - self.pool_points = set(self.schd.pool.pool) - # No action if pool is not yet initiated. - if not self.pool_points: + if not self.prune_flagged_nodes: return - # Increment edges: - # - Initially for each cycle point in the pool. - # - For each new cycle point thereafter. - # Using difference and pointwise allows for historical - # task insertion (in gaps). - new_points = self.pool_points.difference(old_pool_points) - if new_points: - for point in new_points: - # All family & task cycle instances are generated and - # populated with static data as 'ghost nodes'. - self.generate_graph_elements(point, point) - self.min_point = min(self.pool_points) - self.max_point = max(self.pool_points) - # Prune data store by cycle point where said point is: - # - Not in the set of pool points. - # - Not a source or target cycle point in the set of edges. - # This ensures a buffer of sources and targets in front and behind the - # task pool, while accommodating exceptions such as ICP dependencies. - # TODO: Turn nodes back to ghost if not in pool? 
(for suicide) - prune_points = set() - for s_point, t_points in list(self.edge_points.items()): - if (s_point not in self.pool_points and - t_points.isdisjoint(self.pool_points)): - prune_points.add(str(s_point)) - prune_points.update((str(t_p) for t_p in t_points)) - del self.edge_points[s_point] + + in_paths_nodes = set().union(*[ + v + for k, v in self.n_window_nodes.items() + if k in self.all_task_pool + ]) + out_paths_nodes = self.prune_flagged_nodes.union(*[ + v + for k, v in self.n_window_nodes.items() + if k in self.prune_flagged_nodes + ]) + # Trim out any nodes in the runahead pool + out_paths_nodes.difference(self.all_task_pool) + # Prune only nodes not in the paths of active nodes + node_ids = out_paths_nodes.difference(in_paths_nodes) + # Absolute triggers may be present in task pool, so recheck. + # Clear the rest. + self.prune_flagged_nodes.intersection_update(self.all_task_pool) + + tp_data = self.data[self.workflow_id][TASK_PROXIES] + tp_added = self.added[TASK_PROXIES] + parent_ids = set() + for tp_id in list(node_ids): + if tp_id in self.n_window_nodes: + del self.n_window_nodes[tp_id] + if tp_id in self.n_window_edges: + del self.n_window_edges[tp_id] + if tp_id in tp_data: + node = tp_data[tp_id] + elif tp_id in tp_added: + node = tp_added[tp_id] + else: + node_ids.remove(tp_id) continue - t_diffs = t_points.difference(self.pool_points) - if t_diffs: - prune_points.update((str(t_p) for t_p in t_diffs)) - self.edge_points[s_point].difference_update(t_diffs) - # Action pruning if any eligible cycle points are found. - if prune_points: - self.prune_points(prune_points) - if new_points or prune_points: - # Pruned and/or additional elements require - # state/status recalculation, and ID ref updates. + self.deltas[TASK_PROXIES].pruned.append(tp_id) + self.schd.job_pool.remove_task_jobs(tp_id) + self.deltas[EDGES].pruned.extend(node.edges) + parent_ids.add(node.first_parent) + + prune_ids = set() + checked_ids = set() + while parent_ids: + self._family_ascent_point_prune( + next(iter(parent_ids)), + node_ids, parent_ids, checked_ids, prune_ids) + if prune_ids: + self.deltas[FAMILY_PROXIES].pruned.extend(prune_ids) + if node_ids: self.updates_pending = True - def prune_points(self, point_strings): - """Remove old nodes and edges by cycle point. + def _family_ascent_point_prune( + self, fp_id, node_ids, parent_ids, checked_ids, prune_ids): + """Find and prune family recursively checking child families. - Args: - point_strings (iterable): - Iterable of valid cycle point strings. + Recursively map out child families to the bottom from the origin + family. The work back up to origin checking these families are active. 
""" - flow_data = self.data[self.workflow_id] - if not point_strings: + fp_data = self.data[self.workflow_id][FAMILY_PROXIES] + fp_updated = self.updated[FAMILY_PROXIES] + if fp_id in fp_data: + fam_node = fp_data[fp_id] + # Gather child families, then check/update recursively + child_fam_nodes = [ + n_id + for n_id in fam_node.child_families + if n_id not in checked_ids + ] + for child_id in child_fam_nodes: + self._family_ascent_point_prune( + child_id, node_ids, parent_ids, checked_ids, prune_ids) + child_tasks = set(fam_node.child_tasks) + child_families = set(fam_node.child_families) + # Add in any new children + if fp_id in fp_updated: + if fp_updated[fp_id].child_tasks: + child_tasks.update(fp_updated[fp_id].child_tasks) + if fp_updated[fp_id].child_families: + child_families.update(fp_updated[fp_id].child_families) + # if any child tasks or families are active, don't prune. + if ( + child_tasks.difference(node_ids) + or child_families.difference(prune_ids) + ): + if fp_id in prune_ids: + self.state_update_families.add(fp_id) + else: + if fam_node.first_parent: + parent_ids.add(fam_node.first_parent) + prune_ids.add(fp_id) + checked_ids.add(fp_id) + if fp_id in parent_ids: + parent_ids.remove(fp_id) + + def update_dynamic_elements(self, updated_nodes=None): + """Update data elements containing dynamic/live fields.""" + # If no tasks are given update all + if updated_nodes is None: + updated_nodes = self.schd.pool.get_all_tasks() + if not updated_nodes: return - node_ids = set() - for tp_id, tproxy in list(flow_data[TASK_PROXIES].items()): - if tproxy.cycle_point in point_strings: - node_ids.add(tp_id) - self.deltas[TASK_PROXIES].pruned.append(tp_id) - self.schd.job_pool.remove_task_jobs(tp_id) + self.update_task_proxies(updated_nodes) + self.updates_pending = True - for fp_id, fproxy in list(flow_data[FAMILY_PROXIES].items()): - if fproxy.cycle_point in point_strings: - self.deltas[FAMILY_PROXIES].pruned.append(fp_id) - - for e_id, edge in list(flow_data[EDGES].items()): - if edge.source in node_ids or edge.target in node_ids: - self.deltas[EDGES].pruned.append(e_id) - - # TODO: Do we need prerequisites of non-live tasks? - # If so, How to integrate into GraphQL? n-window might help? - # Old scheduler method use to create non-live task info like this: - # for task_id in bad_items: - # name, point = TaskID.split(task_id) - # for tname in self.schd.config.get_task_name_list(): - # if tname == name: - # itask = TaskProxy( - # self.schd.config.get_taskdef(name), - # get_point(point), flow_label="_") def update_task_proxies(self, updated_tasks=None): """Update dynamic fields of task nodes/proxies. @@ -968,9 +1079,8 @@ def update_family_proxies(self): """ self.updated_state_families.clear() while self.state_update_families: - for fam_id in self.state_update_families: - self._family_ascent_point_update(fam_id) - break + self._family_ascent_point_update( + next(iter(self.state_update_families))) def _family_ascent_point_update(self, fp_id): """Updates the given family and children recursively. @@ -985,8 +1095,15 @@ def _family_ascent_point_update(self, fp_id): fp_data = self.data[self.workflow_id][FAMILY_PROXIES] if fp_id in fp_data: fam_node = fp_data[fp_id] - else: + elif fp_id in fp_added: fam_node = fp_added[fp_id] + else: + # TODO: Shouldn't need with event driven updates + # as nodes will be updated before removal. 
+ if fp_id in self.state_update_families: + self.updated_state_families.add(fp_id) + self.state_update_families.remove(fp_id) + return # Gather child families, then check/update recursively child_fam_nodes = [ n_id @@ -999,6 +1116,7 @@ def _family_ascent_point_update(self, fp_id): fp_updated = self.updated[FAMILY_PROXIES] tp_data = self.data[self.workflow_id][TASK_PROXIES] tp_updated = self.updated[TASK_PROXIES] + tp_added = self.added[TASK_PROXIES] # gather child states for count and set is_held state_counter = Counter({}) is_held_total = 0 @@ -1009,7 +1127,9 @@ def _family_ascent_point_update(self, fp_id): state_counter += Counter(dict(child_node.state_totals)) task_states = [] for tp_id in fam_node.child_tasks: - tp_node = tp_updated.get(tp_id, tp_data.get(tp_id)) + tp_node = tp_updated.get(tp_id) + if tp_node is None or not tp_node.state: + tp_node = tp_added.get(tp_id, tp_data.get(tp_id)) if tp_node is not None: if tp_node.state: task_states.append(tp_node.state) @@ -1035,6 +1155,45 @@ def _family_ascent_point_update(self, fp_id): self.state_update_families.add(fam_node.first_parent) self.state_update_families.remove(fp_id) + def hold_release_tasks(self, hold=True): + """Hold or release all task nodes in the graph window.""" + # Needed, as not all data-store tasks are in the task pool + tp_data = self.data[self.workflow_id][TASK_PROXIES] + tp_added = self.added[TASK_PROXIES] + update_time = time() + for tp_node in list(tp_data.values()) + list(tp_added.values()): + if tp_node.is_held is hold: + continue + tp_delta = self.updated[TASK_PROXIES].setdefault( + tp_node.id, PbTaskProxy(id=tp_node.id)) + tp_delta.stamp = f'{tp_node.id}@{update_time}' + tp_delta.is_held = hold + tp_delta.state = tp_node.state + self.state_update_families.add(tp_node.first_parent) + + def set_graph_window_extent(self, n_edge_distance): + """Set what the max edge distance will change to. + + Args: + n_edge_distance (int): + Maximum edge distance from active node. 
+ + """ + self.next_n_edge_distance = n_edge_distance + self.updates_pending = True + + def set_workflow_ports(self): + # Create new message and copy existing message content + workflow = self.updated[WORKFLOW] + workflow.id = self.workflow_id + workflow.last_updated = time() + workflow.stamp = f'{workflow.id}@{workflow.last_updated}' + + workflow.port = self.schd.port + workflow.pub_port = self.schd.pub_port + + self.updates_pending = True + def update_workflow(self): """Update workflow element status and state totals.""" # Create new message and copy existing message content @@ -1072,24 +1231,14 @@ def update_workflow(self): workflow.status, workflow.status_msg = map( str, get_suite_status(self.schd)) - for key, value in ( - ('oldest_cycle_point', self.min_point), - ('newest_cycle_point', self.max_point), - ('newest_runahead_cycle_point', - self.schd.pool.get_max_point_runahead())): - if value: - setattr(workflow, key, str(value)) - - def update_dynamic_elements(self, updated_nodes=None): - """Update data elements containing dynamic/live fields.""" - # If no tasks are given update all - if updated_nodes is None: - updated_nodes = self.schd.pool.get_all_tasks() - elif not updated_nodes: - return - self.update_task_proxies(updated_nodes) - self.update_family_proxies() - self.updates_pending = True + if self.schd.pool.pool: + pool_points = set(self.schd.pool.pool) + workflow.oldest_cycle_point = str(min(pool_points)) + workflow.newest_cycle_point = str(max(pool_points)) + if self.schd.pool.runahead_pool: + workflow.newest_runahead_cycle_point = str( + max(set(self.schd.pool.runahead_pool)) + ) # TODO: Make the other deltas/updates event driven like this one. def delta_broadcast(self): @@ -1098,17 +1247,6 @@ def delta_broadcast(self): workflow.broadcasts = json.dumps(self.schd.broadcast_mgr.broadcasts) self.updates_pending = True - def clear_deltas(self): - """Clear current deltas.""" - for key in self.deltas: - self.deltas[key].Clear() - if key == WORKFLOW: - self.added[key].Clear() - self.updated[key].Clear() - continue - self.added[key].clear() - self.updated[key].clear() - def apply_deltas(self, reloaded=False): """Gather and apply deltas.""" # Copy in job deltas @@ -1162,6 +1300,17 @@ def apply_deltas(self, reloaded=False): self.schd.job_pool.added.clear() self.schd.job_pool.updated.clear() + def clear_deltas(self): + """Clear current deltas.""" + for key in self.deltas: + self.deltas[key].Clear() + if key == WORKFLOW: + self.added[key].Clear() + self.updated[key].Clear() + continue + self.added[key].clear() + self.updated[key].clear() + # Message collation and dissemination methods: def get_entire_workflow(self): """Gather data elements into single Protobuf message. diff --git a/cylc/flow/network/resolvers.py b/cylc/flow/network/resolvers.py index 15d398acaf1..3ac73fb0093 100644 --- a/cylc/flow/network/resolvers.py +++ b/cylc/flow/network/resolvers.py @@ -696,6 +696,28 @@ def set_verbosity(self, level): self.schd.command_queue.put(("set_verbosity", (level,), {})) return (True, 'Command queued') + def set_graph_window_extent(self, n_edge_distance): + """Set data-store graph window to new max edge distance. + + Args: + n_edge_distance (int): + Max edge distance 0..n from active node. + + Returns: + tuple: (outcome, message) + + outcome (bool) + True if command successfully queued. + message (str) + Information about outcome. 
+ + """ + if n_edge_distance >= 0: + self.schd.data_store_mgr.set_graph_window_extent(n_edge_distance) + return (True, f'Maximum edge distance set to {n_edge_distance}') + else: + return (False, 'Edge distance cannot be negative') + def force_spawn_children(self, tasks, outputs): """Spawn children of given task outputs. diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index a1a94b89db3..77a32135036 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -558,7 +558,8 @@ async def resolve_broadcasts(root, info, **args): def resolve_json_dump(root, info, **args): - return json.loads(getattr(root, to_snake_case(info.field_name), '{}')) + field = getattr(root, to_snake_case(info.field_name), '{}') or '{}' + return json.loads(field) # Types: @@ -1518,6 +1519,22 @@ class Arguments: result = GenericScalar() +class SetGraphWindowExtent(Mutation): + class Meta: + description = sstrip(''' + Set the maximum graph distance (n) from an active node + of the data-store graph window. + + ''') + resolver = partial(mutator, command='set_graph_window_extent') + + class Arguments: + workflows = List(WorkflowID, required=True) + n_edge_distance = Int(required=True) + + result = GenericScalar() + + class Stop(Mutation): class Meta: description = sstrip(''' @@ -1687,6 +1704,8 @@ class Mutations(ObjectType): reload = Reload.Field(description=Reload._meta.description) set_verbosity = SetVerbosity.Field( description=SetVerbosity._meta.description) + set_graph_window_extent = SetGraphWindowExtent.Field( + description=SetGraphWindowExtent._meta.description) stop = Stop.Field(description=Stop._meta.description) # task actions diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 28253ecfee0..5e3d003b5cb 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -467,7 +467,10 @@ async def configure(self): self.config, self.suite_db_mgr, self.task_events_mgr, - self.job_pool) + self.job_pool, + self.data_store_mgr) + + self.data_store_mgr.initiate_data_model() self.profiler.log_memory("scheduler.py: before load_tasks") if self.is_restart: @@ -542,6 +545,7 @@ async def start_servers(self): self.barrier.wait() self.port = self.server.port self.pub_port = self.publisher.port + self.data_store_mgr.set_workflow_ports() async def log_start(self): if self.is_restart: @@ -584,7 +588,6 @@ async def log_start(self): async def start_scheduler(self): """Start the scheduler main loop.""" try: - self.data_store_mgr.initiate_data_model() self._configure_contact() if self.is_restart: self.restart_remote_init() @@ -1429,12 +1432,14 @@ async def main_loop(self): """The scheduler main loop.""" while True: # MAIN LOOP tinit = time() - has_reloaded = False if self.pool.do_reload: + # Re-initialise data model on reload + self.data_store_mgr.initiate_data_model(reloaded=True) self.pool.reload_taskdefs() self.is_updated = True - has_reloaded = True + await self.publisher.publish( + self.data_store_mgr.publish_deltas) self.process_command_queue() self.release_tasks() @@ -1448,12 +1453,6 @@ async def main_loop(self): self.process_command_queue() self.task_events_mgr.process_events(self) - # Re-initialise data model on reload - if has_reloaded: - self.data_store_mgr.initiate_data_model(reloaded=True) - await self.publisher.publish( - self.data_store_mgr.publish_deltas - ) # Update state summary, database, and uifeed self.suite_db_mgr.put_task_event_timers(self.task_events_mgr) has_updated = await self.update_data_structure() @@ -1754,6 +1753,7 @@ def hold_suite(self, point=None): 
"""Hold all tasks in suite.""" if point is None: self.pool.hold_all_tasks() + self.data_store_mgr.hold_release_tasks() self.task_events_mgr.pflag = True self.suite_db_mgr.put_suite_hold() LOG.info('Suite held.') @@ -1772,6 +1772,7 @@ def release_suite(self): LOG.info("RELEASE: new tasks will be queued when ready") self.pool.set_hold_point(None) self.pool.release_all_tasks() + self.data_store_mgr.hold_release_tasks(hold=False) self.suite_db_mgr.delete_suite_hold() def paused(self): diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index cb1d5638d17..43e18ef6e32 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -144,7 +144,10 @@ class TaskPool: ERR_PREFIX_TASKID_MATCH = "No matching tasks found: " - def __init__(self, config, suite_db_mgr, task_events_mgr, job_pool): + def __init__( + self, config, suite_db_mgr, task_events_mgr, + job_pool, data_store_mgr + ): self.config = config self.stop_point = config.final_point self.suite_db_mgr = suite_db_mgr @@ -152,6 +155,7 @@ def __init__(self, config, suite_db_mgr, task_events_mgr, job_pool): # TODO this is ugly: self.task_events_mgr.spawn_func = self.spawn_on_output self.job_pool = job_pool + self.data_store_mgr = data_store_mgr self.flow_label_mgr = FlowLabelMgr() self.do_reload = False @@ -228,6 +232,7 @@ def add_to_runahead_pool(self, itask, is_new=True): self.runahead_pool[itask.point][itask.identity] = itask self.rhpool_changed = True + # add row to "task_states" table if is_new: # add row to "task_states" table: self.suite_db_mgr.put_insert_task_states(itask, { @@ -540,6 +545,15 @@ def release_runahead_task(self, itask): self.pool_changed = True self.pool_changes.append(itask) LOG.debug("[%s] -released to the task pool", itask) + + # The following two could be called in separate places, + # so haven't merged/removed-one. + # Register pool node reference data-store with ID_DELIM format + self.data_store_mgr.add_pool_node(itask.tdef.name, itask.point) + # Create new data-store n-distance graph window about this task + self.data_store_mgr.increment_graph_window( + itask.tdef.name, itask.point, itask.flow_label) + del self.runahead_pool[itask.point][itask.identity] if not self.runahead_pool[itask.point]: del self.runahead_pool[itask.point] @@ -599,8 +613,11 @@ def remove(self, itask, reason=""): del self.runahead_pool[itask.point] self.rhpool_changed = True + # Notify the data-store manager of their removal + # (the manager uses window boundary tracking for pruning). + self.data_store_mgr.remove_pool_node(itask.tdef.name, itask.point) # Event-driven final update of task_states table. 
- # TODO: same for datastore (still updated by iterating the task pool) + # TODO: same for datastore (still updated by scheduler loop) self.suite_db_mgr.put_update_task_state(itask) LOG.debug("[%s] -%s", itask, msg) del itask @@ -1205,19 +1222,10 @@ def spawn_task(self, name, point, flow_label=None, reflow=True, taskdef, point, flow_label, submit_num=submit_num, reflow=reflow) - if parent_id is not None: - msg = "(" + parent_id + ") spawned %s.%s flow(%s)" - else: - msg = "(no parent) spawned %s.%s %s" - if flow_label is None: - # Manual trigger: new flow - msg += " (new flow)" - if self.hold_point and itask.point > self.hold_point: # Hold if beyond the suite hold point - LOG.info( - "[%s] -holding (beyond suite hold point) %s", - itask, self.hold_point) + LOG.info("[%s] -holding (beyond suite hold point) %s", + itask, self.hold_point) itask.state.reset(is_held=True) if self.stop_point and itask.point <= self.stop_point: future_trigger_overrun = False @@ -1239,6 +1247,14 @@ def spawn_task(self, name, point, flow_label=None, reflow=True, if itask.state.prerequisites_are_not_all_satisfied(): itask.state.satisfy_me(self.abs_outputs_done) + if parent_id is not None: + msg = "(" + parent_id + ") spawned %s.%s flow(%s)" + else: + msg = "(no parent) spawned %s.%s %s" + if flow_label is None: + # Manual trigger: new flow + msg += " (new flow)" + self.add_to_runahead_pool(itask) LOG.info(msg, name, point, flow_label) return itask diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index 785bee43915..3b592803cfa 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -29,6 +29,7 @@ TASK_STATUS_WAITING, TASK_OUTPUT_FAILED, TASK_OUTPUT_SUCCEEDED) +from cylc.flow.taskdef import generate_graph_children from cylc.flow.wallclock import get_unix_time_from_time_string as str2time @@ -232,40 +233,9 @@ def __init__(self, tdef, start_point, flow_label, self.state = TaskState(tdef, self.point, status, is_held) # Determine graph children of this task (for spawning). - self.graph_children = {} - for seq, dout in tdef.graph_children.items(): - for output, downs in dout.items(): - if output not in self.graph_children: - self.graph_children[output] = [] - for name, trigger in downs: - child_point = trigger.get_child_point(self.point, seq) - is_abs = (trigger.offset_is_absolute or - trigger.offset_is_from_icp) - if is_abs: - if trigger.get_parent_point(self.point) != self.point: - # If 'foo[^] => bar' only spawn off of '^'. - continue - if seq.is_on_sequence(child_point): - # E.g.: foo should trigger only on T06: - # PT6H = "waz" - # T06 = "waz[-PT6H] => foo" - self.graph_children[output].append( - (name, child_point, is_abs)) - - if tdef.sequential: - # Add next-instance child. - nexts = [] - for seq in tdef.sequences: - nxt = seq.get_next_point(self.point) - if nxt is not None: - # Within sequence bounds. 
- nexts.append(nxt) - if nexts: - if TASK_OUTPUT_SUCCEEDED not in self.graph_children: - self.graph_children[TASK_OUTPUT_SUCCEEDED] = [] - self.state.outputs.add(TASK_OUTPUT_SUCCEEDED) - self.graph_children[TASK_OUTPUT_SUCCEEDED].append( - (tdef.name, min(nexts), False)) + self.graph_children = generate_graph_children(tdef, self.point) + if TASK_OUTPUT_SUCCEEDED in self.graph_children: + self.state.outputs.add(TASK_OUTPUT_SUCCEEDED) if TASK_OUTPUT_FAILED in self.graph_children: self.failure_handled = True diff --git a/cylc/flow/taskdef.py b/cylc/flow/taskdef.py index 1b7840e0484..4aa4cc685ef 100644 --- a/cylc/flow/taskdef.py +++ b/cylc/flow/taskdef.py @@ -20,9 +20,83 @@ from cylc.flow.exceptions import TaskDefError from cylc.flow.task_id import TaskID +from cylc.flow.task_state import TASK_OUTPUT_SUCCEEDED from cylc.flow import LOG +def generate_graph_children(tdef, point): + """Determine graph children of this task (for spawning).""" + graph_children = {} + for seq, dout in tdef.graph_children.items(): + for output, downs in dout.items(): + if output not in graph_children: + graph_children[output] = [] + for name, trigger in downs: + child_point = trigger.get_child_point(point, seq) + is_abs = (trigger.offset_is_absolute or + trigger.offset_is_from_icp) + if is_abs: + if trigger.get_parent_point(point) != point: + # If 'foo[^] => bar' only spawn off of '^'. + continue + if seq.is_valid(child_point): + # E.g.: foo should trigger only on T06: + # PT6H = "waz" + # T06 = "waz[-PT6H] => foo" + graph_children[output].append((name, child_point, is_abs)) + + if tdef.sequential: + # Add next-instance child. + nexts = [] + for seq in tdef.sequences: + nxt = seq.get_next_point(point) + if nxt is not None: + # Within sequence bounds. + nexts.append(nxt) + if nexts: + if TASK_OUTPUT_SUCCEEDED not in graph_children: + graph_children[TASK_OUTPUT_SUCCEEDED] = [] + graph_children[TASK_OUTPUT_SUCCEEDED].append( + (tdef.name, min(nexts), False)) + + return graph_children + + +def generate_graph_parents(tdef, point): + """Determine graph parents of this task.""" + graph_parents = {} + for seq, ups in tdef.graph_parents.items(): + graph_parents[seq] = [] + for name, trigger in ups: + parent_point = trigger.get_parent_point(point) + is_abs = (trigger.offset_is_absolute or + trigger.offset_is_from_icp) + if is_abs: + if parent_point != point: + # If 'foo[^] => bar' only spawn off of '^'. + continue + if seq.is_valid(parent_point): + # E.g.: foo should trigger only on T06: + # PT6H = "waz" + # T06 = "waz[-PT6H] => foo" + graph_parents[seq].append((name, parent_point, is_abs)) + + if tdef.sequential: + # Add prev-instance parent. + prevs = [] + for seq in tdef.sequences: + prev = seq.get_prev_point(point) + if prev is not None: + # Within sequence bounds. 
+ prevs.append(prev) + if prevs: + if seq not in graph_parents: + graph_parents[seq] = [] + graph_parents[seq].append((tdef.name, min(prevs), False)) + + return graph_parents + + class TaskDef: """Task definition.""" @@ -33,7 +107,7 @@ class TaskDef: "sequential", "is_coldstart", "suite_polling_cfg", "clocktrigger_offset", "expiration_offset", "namespace_hierarchy", "dependencies", "outputs", "param_var", - "graph_children", + "graph_children", "graph_parents", "external_triggers", "xtrig_labels", "name", "elapsed_times"] # Store the elapsed times for a maximum of 10 cycles @@ -62,8 +136,7 @@ def __init__(self, name, rtcfg, run_mode, start_point): self.dependencies = {} self.outputs = set() self.graph_children = {} - # graph_parents not currently used, but might be soon: - # self.graph_parents = {} + self.graph_parents = {} self.param_var = {} self.external_triggers = [] self.xtrig_labels = {} # {sequence: [labels]} @@ -84,15 +157,15 @@ def add_graph_child(self, trigger, taskname, sequence): trigger.output, []).append((taskname, trigger)) # graph_parents not currently used, but might be soon: - # def add_graph_parent(self, trigger, parent, sequence): - # """Record task instances that I depend on. - # { - # sequence: set([(a,t1), (b,t2), ...]) # (task-name, trigger) - # } - # """ - # if sequence not in self.graph_parents: - # self.graph_parents[sequence] = set() - # self.graph_parents[sequence].add((parent, trigger)) + def add_graph_parent(self, trigger, parent, sequence): + """Record task instances that I depend on. + { + sequence: set([(a,t1), (b,t2), ...]) # (task-name, trigger) + } + """ + if sequence not in self.graph_parents: + self.graph_parents[sequence] = set() + self.graph_parents[sequence].add((parent, trigger)) def add_dependency(self, dependency, sequence): """Add a dependency to a named sequence. diff --git a/cylc/flow/tui/util.py b/cylc/flow/tui/util.py index 52bf359ebc8..528809133ee 100644 --- a/cylc/flow/tui/util.py +++ b/cylc/flow/tui/util.py @@ -127,6 +127,10 @@ def compute_tree(flow): # add leaves for task in flow['taskProxies']: + # If there's no first parent, the child will have been deleted + # during/after API query resolution. So ignore. 
+ if not task['firstParent']: + continue task_node = add_node( 'task', task['id'], nodes, data=task) if task['firstParent']['name'] == 'root': @@ -293,7 +297,7 @@ def render_node(node, data, type_): first_child = None # progress information - if data['state'] == TASK_STATUS_RUNNING: + if data['state'] == TASK_STATUS_RUNNING and first_child: start_time = first_child.get_value()['data']['startedTime'] mean_time = data['task']['meanElapsedTime'] diff --git a/tests/functional/graph-equivalence/multiline_and_refs/c-ref b/tests/functional/graph-equivalence/multiline_and_refs/c-ref index 31af0ec6ba5..35ea7325fc4 100644 --- a/tests/functional/graph-equivalence/multiline_and_refs/c-ref +++ b/tests/functional/graph-equivalence/multiline_and_refs/c-ref @@ -3,3 +3,4 @@ prerequisites (- => not satisfied): - b.1 succeeded outputs (- => not completed): + (None) diff --git a/tests/functional/graph-equivalence/multiline_and_refs/c-ref-2 b/tests/functional/graph-equivalence/multiline_and_refs/c-ref-2 index 31af0ec6ba5..35ea7325fc4 100644 --- a/tests/functional/graph-equivalence/multiline_and_refs/c-ref-2 +++ b/tests/functional/graph-equivalence/multiline_and_refs/c-ref-2 @@ -3,3 +3,4 @@ prerequisites (- => not satisfied): - b.1 succeeded outputs (- => not completed): + (None) diff --git a/tests/functional/graphql/01-workflow.t b/tests/functional/graphql/01-workflow.t index cef2ae2e6ef..7c17ff33e5a 100755 --- a/tests/functional/graphql/01-workflow.t +++ b/tests/functional/graphql/01-workflow.t @@ -75,6 +75,8 @@ SUITE_LOG_DIR="$( cylc cat-log -m p "${SUITE_NAME}" \ cylc stop --max-polls=10 --interval=2 --kill "${SUITE_NAME}" # compare to expectation +# Note: Runahead pool has no members on start-up, which means, +# newestRunaheadCyclePoint is expected to be blank. cat > expected << __HERE__ { "workflows": [ @@ -90,7 +92,7 @@ cat > expected << __HERE__ "title": "foo", "description": "bar" }, - "newestRunaheadCyclePoint": "1", + "newestRunaheadCyclePoint": "", "newestCyclePoint": "1", "oldestCyclePoint": "1", "reloaded": false, diff --git a/tests/functional/graphql/02-root-queries.t b/tests/functional/graphql/02-root-queries.t index 969dbec1df5..e1d1c951766 100755 --- a/tests/functional/graphql/02-root-queries.t +++ b/tests/functional/graphql/02-root-queries.t @@ -144,14 +144,32 @@ cat > expected << __HERE__ { "id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}20190101T00${ID_DELIM}baa" }, + { + "id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}20190101T00${ID_DELIM}bar" + }, { "id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}20190101T00${ID_DELIM}foo" }, { "id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}20190101T00${ID_DELIM}qar" }, + { + "id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}20190101T00${ID_DELIM}qaz" + }, { "id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}20190101T00${ID_DELIM}qux" + }, + { + "id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}20190201T00${ID_DELIM}baa" + }, + { + "id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}20190201T00${ID_DELIM}foo" + }, + { + "id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}20190201T00${ID_DELIM}qar" + }, + { + "id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}20190201T00${ID_DELIM}qux" } ], "family": { @@ -184,10 +202,7 @@ cat > expected << __HERE__ ], "edges": [ { - "id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}@wall_clock.20190101T00${ID_DELIM}foo.20190101T00" - }, - { - "id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}@wall_clock.20190101T00${ID_DELIM}qux.20190101T00" + "id": 
"${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}baa.20190101T00${ID_DELIM}baa.20190201T00" }, { "id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}baa.20190101T00${ID_DELIM}qaz.20190101T00" @@ -195,11 +210,20 @@ cat > expected << __HERE__ { "id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}foo.20190101T00${ID_DELIM}bar.20190101T00" }, + { + "id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}foo.20190101T00${ID_DELIM}foo.20190201T00" + }, + { + "id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}qar.20190101T00${ID_DELIM}qar.20190201T00" + }, { "id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}qux.20190101T00${ID_DELIM}bar.20190101T00" }, { "id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}qux.20190101T00${ID_DELIM}qaz.20190101T00" + }, + { + "id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}qux.20190101T00${ID_DELIM}qux.20190201T00" } ], "nodesEdges": { @@ -209,14 +233,17 @@ cat > expected << __HERE__ }, { "id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}20190101T00${ID_DELIM}foo" + }, + { + "id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}20190201T00${ID_DELIM}foo" } ], "edges": [ { - "id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}@wall_clock.20190101T00${ID_DELIM}foo.20190101T00" + "id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}foo.20190101T00${ID_DELIM}bar.20190101T00" }, { - "id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}foo.20190101T00${ID_DELIM}bar.20190101T00" + "id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}foo.20190101T00${ID_DELIM}foo.20190201T00" } ] } diff --git a/tests/functional/runahead/06-release-update.t b/tests/functional/runahead/06-release-update.t index 8889dd533d0..7ad49d85e00 100644 --- a/tests/functional/runahead/06-release-update.t +++ b/tests/functional/runahead/06-release-update.t @@ -34,6 +34,9 @@ poll_grep_suite_log -F "spawned bar.${NEXT1}" sleep 10 cylc dump -t "${SUITE_NAME}" | awk '{print $1 $2 $3}' >'log' cmp_ok 'log' - <<__END__ +bar,$YYYY,running, +bar,$NEXT1,waiting, +foo,$YYYY,running, foo,$NEXT1,waiting, __END__ diff --git a/tests/integration/test_data_store_mgr.py b/tests/integration/test_data_store_mgr.py index d48dc412a9d..309286c347a 100644 --- a/tests/integration/test_data_store_mgr.py +++ b/tests/integration/test_data_store_mgr.py @@ -75,10 +75,10 @@ def test_get_entire_workflow(harness): assert len(flow_msg.task_proxies) == len(data[TASK_PROXIES]) -def test_increment_graph_elements(harness): - """Test method that adds and removes elements by cycle point.""" +def test_increment_graph_window(harness): + """Test method that adds and removes elements window boundary.""" schd, data = harness - assert schd.data_store_mgr.pool_points + assert schd.data_store_mgr.prune_trigger_nodes assert len(data[TASK_PROXIES]) == 1 @@ -90,23 +90,24 @@ def test_initiate_data_model(harness): assert len(data[WORKFLOW].task_proxies) == 1 -def test_prune_points(harness): +def test_prune_data_store(harness): """Test method that removes data elements by cycle point.""" schd, data = harness - points = [ - standardise_point_string(p.cycle_point) - for p in data[TASK_PROXIES].values() - ] - point = next(iter(points)) - assert point in points + for itask in schd.pool.get_all_tasks(): + schd.data_store_mgr.increment_graph_window( + itask.tdef.name, itask.point, itask.flow_label + ) + schd.data_store_mgr.apply_deltas() schd.data_store_mgr.clear_deltas() - schd.data_store_mgr.prune_points([point]) + before_count = len(schd.data_store_mgr.data[ + schd.data_store_mgr.workflow_id][TASK_PROXIES]) + assert before_count > 0 + 
schd.data_store_mgr.prune_flagged_nodes.update(set(data[TASK_PROXIES])) + schd.data_store_mgr.prune_data_store() schd.data_store_mgr.apply_deltas() - assert point not in [ - standardise_point_string(p.cycle_point) - for p in schd.data_store_mgr.data[ - schd.data_store_mgr.workflow_id][TASK_PROXIES].values() - ] + after_count = len(schd.data_store_mgr.data[ + schd.data_store_mgr.workflow_id][TASK_PROXIES]) + assert after_count < before_count def test_update_data_structure(harness):