From c7556765fe0421e4051512d79a8bce10afe9939e Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Wed, 4 Nov 2020 15:59:02 +1300 Subject: [PATCH] post #3823 fixes --- cylc/flow/data_store_mgr.py | 26 ++++++++++++------- tests/functional/runahead/06-release-update.t | 3 +-- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py index 2411b22e209..fff4ec1b2b2 100644 --- a/cylc/flow/data_store_mgr.py +++ b/cylc/flow/data_store_mgr.py @@ -73,7 +73,7 @@ from cylc.flow.suite_status import get_suite_status from cylc.flow.task_id import TaskID from cylc.flow.task_job_logs import JOB_LOG_OPTS -from cylc.flow.task_state import TASK_STATUS_WAITING +from cylc.flow.task_state import TASK_STATUS_WAITING, TASK_STATUS_EXPIRED from cylc.flow.task_state_prop import extract_group_state from cylc.flow.taskdef import generate_graph_children, generate_graph_parents from cylc.flow.wallclock import ( @@ -542,7 +542,8 @@ def generate_definition_elements(self): def increment_graph_window( self, name, point, flow_label, - edge_distance=0, active_id=None, descendant=False): + edge_distance=0, active_id=None, + descendant=False, is_parent=False): """Generate graph window about given origin to n-edge-distance. Args: @@ -598,7 +599,7 @@ def increment_graph_window( self.n_window_nodes[active_id].add(s_id) # Generate task node - self.generate_ghost_task(s_id, name, point, flow_label) + self.generate_ghost_task(s_id, name, point, flow_label, is_parent) edge_distance += 1 @@ -650,9 +651,9 @@ def _expand_graph_window( if e_id in self.n_window_edges[active_id]: continue if ( - e_id not in self.data[self.workflow_id][EDGES] and - e_id not in self.added[EDGES] and - edge_distance <= self.n_edge_distance + e_id not in self.data[self.workflow_id][EDGES] + and e_id not in self.added[EDGES] + and edge_distance <= self.n_edge_distance ): self.added[EDGES][e_id] = PbEdge( id=e_id, @@ -671,7 +672,7 @@ def _expand_graph_window( continue self.increment_graph_window( t_name, t_point, flow_label, - copy(edge_distance), active_id, descendant) + copy(edge_distance), active_id, descendant, is_parent) def remove_pool_node(self, name, point): """Remove ID reference and flag isolate node/branch for pruning.""" @@ -692,7 +693,8 @@ def add_pool_node(self, name, point): tp_id = f'{self.workflow_id}{ID_DELIM}{point}{ID_DELIM}{name}' self.all_task_pool.add(tp_id) - def generate_ghost_task(self, tp_id, name, point, flow_label): + def generate_ghost_task( + self, tp_id, name, point, flow_label, is_parent=False): """Create task-point element populated with static data. Args: @@ -731,6 +733,12 @@ def generate_ghost_task(self, tp_id, name, point, flow_label): state=TASK_STATUS_WAITING, flow_label=flow_label ) + if is_parent and tp_id not in self.n_window_nodes: + # TODO: Load task info from DB + tproxy.state = TASK_STATUS_EXPIRED + else: + tproxy.state = TASK_STATUS_WAITING + tproxy.namespace[:] = taskdef.namespace tproxy.ancestors[:] = [ f'{self.workflow_id}{ID_DELIM}{point_string}{ID_DELIM}{a_name}' @@ -863,7 +871,7 @@ def prune_data_store(self): for k, v in self.n_window_nodes.items() if k in self.all_task_pool ]) - out_paths_nodes = set().union(*[ + out_paths_nodes = self.prune_flagged_nodes.union(*[ v for k, v in self.n_window_nodes.items() if k in self.prune_flagged_nodes diff --git a/tests/functional/runahead/06-release-update.t b/tests/functional/runahead/06-release-update.t index 1f0e4ed5a52..7ad49d85e00 100644 --- a/tests/functional/runahead/06-release-update.t +++ b/tests/functional/runahead/06-release-update.t @@ -27,7 +27,6 @@ CYLC_RUN_PID="$!" poll_suite_running YYYY="$(date +%Y)" NEXT1=$(( YYYY + 1 )) -NEXT2=$(( YYYY + 2 )) poll_grep_suite_log -F "spawned bar.${NEXT1}" # sleep a little to allow the datastore to update (`cylc dump` sees the @@ -37,7 +36,7 @@ cylc dump -t "${SUITE_NAME}" | awk '{print $1 $2 $3}' >'log' cmp_ok 'log' - <<__END__ bar,$YYYY,running, bar,$NEXT1,waiting, -bar,$NEXT2,waiting, +foo,$YYYY,running, foo,$NEXT1,waiting, __END__