Skip to content

Commit

Permalink
post cylc#3823 fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Nov 10, 2020
1 parent 00ae838 commit e2a411d
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 11 deletions.
26 changes: 17 additions & 9 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
from cylc.flow.suite_status import get_suite_status
from cylc.flow.task_id import TaskID
from cylc.flow.task_job_logs import JOB_LOG_OPTS
from cylc.flow.task_state import TASK_STATUS_WAITING
from cylc.flow.task_state import TASK_STATUS_WAITING, TASK_STATUS_EXPIRED
from cylc.flow.task_state_prop import extract_group_state
from cylc.flow.taskdef import generate_graph_children, generate_graph_parents
from cylc.flow.wallclock import (
Expand Down Expand Up @@ -542,7 +542,8 @@ def generate_definition_elements(self):

def increment_graph_window(
self, name, point, flow_label,
edge_distance=0, active_id=None, descendant=False):
edge_distance=0, active_id=None,
descendant=False, is_parent=False):
"""Generate graph window about given origin to n-edge-distance.
Args:
Expand Down Expand Up @@ -598,7 +599,7 @@ def increment_graph_window(

self.n_window_nodes[active_id].add(s_id)
# Generate task node
self.generate_ghost_task(s_id, name, point, flow_label)
self.generate_ghost_task(s_id, name, point, flow_label, is_parent)

edge_distance += 1

Expand Down Expand Up @@ -650,9 +651,9 @@ def _expand_graph_window(
if e_id in self.n_window_edges[active_id]:
continue
if (
e_id not in self.data[self.workflow_id][EDGES] and
e_id not in self.added[EDGES] and
edge_distance <= self.n_edge_distance
e_id not in self.data[self.workflow_id][EDGES]
and e_id not in self.added[EDGES]
and edge_distance <= self.n_edge_distance
):
self.added[EDGES][e_id] = PbEdge(
id=e_id,
Expand All @@ -671,7 +672,7 @@ def _expand_graph_window(
continue
self.increment_graph_window(
t_name, t_point, flow_label,
copy(edge_distance), active_id, descendant)
copy(edge_distance), active_id, descendant, is_parent)

def remove_pool_node(self, name, point):
"""Remove ID reference and flag isolate node/branch for pruning."""
Expand All @@ -692,7 +693,8 @@ def add_pool_node(self, name, point):
tp_id = f'{self.workflow_id}{ID_DELIM}{point}{ID_DELIM}{name}'
self.all_task_pool.add(tp_id)

def generate_ghost_task(self, tp_id, name, point, flow_label):
def generate_ghost_task(
self, tp_id, name, point, flow_label, is_parent=False):
"""Create task-point element populated with static data.
Args:
Expand Down Expand Up @@ -731,6 +733,12 @@ def generate_ghost_task(self, tp_id, name, point, flow_label):
state=TASK_STATUS_WAITING,
flow_label=flow_label
)
if is_parent and tp_id not in self.n_window_nodes:
# TODO: Load task info from DB
tproxy.state = TASK_STATUS_EXPIRED
else:
tproxy.state = TASK_STATUS_WAITING

tproxy.namespace[:] = taskdef.namespace
tproxy.ancestors[:] = [
f'{self.workflow_id}{ID_DELIM}{point_string}{ID_DELIM}{a_name}'
Expand Down Expand Up @@ -863,7 +871,7 @@ def prune_data_store(self):
for k, v in self.n_window_nodes.items()
if k in self.all_task_pool
])
out_paths_nodes = set().union(*[
out_paths_nodes = self.prune_flagged_nodes.union(*[
v
for k, v in self.n_window_nodes.items()
if k in self.prune_flagged_nodes
Expand Down
3 changes: 1 addition & 2 deletions tests/functional/runahead/06-release-update.t
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ CYLC_RUN_PID="$!"
poll_suite_running
YYYY="$(date +%Y)"
NEXT1=$(( YYYY + 1 ))
NEXT2=$(( YYYY + 2 ))
poll_grep_suite_log -F "spawned bar.${NEXT1}"

# sleep a little to allow the datastore to update (`cylc dump` sees the
Expand All @@ -37,7 +36,7 @@ cylc dump -t "${SUITE_NAME}" | awk '{print $1 $2 $3}' >'log'
cmp_ok 'log' - <<__END__
bar,$YYYY,running,
bar,$NEXT1,waiting,
bar,$NEXT2,waiting,
foo,$YYYY,running,
foo,$NEXT1,waiting,
__END__

Expand Down

0 comments on commit e2a411d

Please sign in to comment.