Skip to content

Commit

Permalink
review fix-ups
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Oct 27, 2023
1 parent 15791c3 commit f68a63b
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 32 deletions.
41 changes: 19 additions & 22 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,6 @@ def increment_graph_window(
# There may be short cuts for parent locs, however children will more
# likely be incomplete walks with no 'done_locs' and using parent's
# children will required sifting out cousin branches.
new_locs: List[str]
working_locs: List[str] = []
if self.n_edge_distance > 1:
if c_tag in active_locs:
Expand All @@ -815,20 +814,20 @@ def increment_graph_window(
# Most will be incomplete walks, however, we can check.
# i.e. parents of children may all exist.
if w_loc[:-1] in active_locs:
for b_id in active_locs[w_loc[:-1]]:
if b_id not in all_walks:
for loc_id in active_locs[w_loc[:-1]]:
if loc_id not in all_walks:
loc_done = False
break
else:
continue

Check warning on line 822 in cylc/flow/data_store_mgr.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/data_store_mgr.py#L822

Added line #L822 was not covered by tests
# find child nodes of parent location,
# i.e. 'cpcc' = 'cpc' + 'c'
w_set = set().union(*(
all_walks[b_id]['locations'][w_loc[-1]]
for b_id in active_locs[w_loc[:-1]]
all_walks[loc_id]['locations'][w_loc[-1]]
for loc_id in active_locs[w_loc[:-1]]
if (
b_id in all_walks
and w_loc[-1] in all_walks[b_id]['locations']
loc_id in all_walks
and w_loc[-1] in all_walks[loc_id]['locations']
)
))
w_set.difference_update(active_walk['walk_ids'])
Expand All @@ -846,11 +845,12 @@ def increment_graph_window(
active_walk['done_ids'].update(
active_locs[w_loc[:-1]]
)
new_locs = []
for loc in working_locs:
if loc in active_locs and len(loc) < self.n_edge_distance:
new_locs.extend((loc + c_tag, loc + p_tag))
working_locs = new_locs
working_locs = [
new_loc
for loc in working_locs
if loc in active_locs and len(loc) < self.n_edge_distance
for new_loc in (loc + c_tag, loc + p_tag)
]
n_depth += 1

# Graph walk
Expand Down Expand Up @@ -1167,9 +1167,8 @@ def generate_ghost_task(
n_depth (int): n-window graph edge distance.
Returns:
(is_orphan, graph_children)
Orphan tasks with no children return (True, None) respectively.
None
"""
tp_id = tokens.id
Expand All @@ -1183,10 +1182,6 @@ def generate_ghost_task(
point_string = tokens['cycle']
t_id = self.definition_id(name)

is_orphan = False
if name not in self.schd.config.taskdefs:
is_orphan = True

if itask is None:
itask = self.schd.pool.get_task(point_string, name)

Expand All @@ -1200,7 +1195,9 @@ def generate_ghost_task(
data_mode=True
)

if is_orphan:
is_orphan = False
if name not in self.schd.config.taskdefs:
is_orphan = True
self.generate_orphan_task(itask)

# Most of the time the definition node will be in the store.
Expand Down Expand Up @@ -1796,13 +1793,13 @@ def window_depth_finder(self):
"""Recalculate window depths, creating depth deltas."""
# Setup new window depths
n_window_depths: Dict(int, Set(str)) = {
0: set().union(self.all_task_pool)
0: self.all_task_pool.copy()
}

depth = 1
# Since starting from smaller depth, exclude those whose depth has
# already been found.
depth_found_tasks: Set(str) = set().union(self.all_task_pool)
depth_found_tasks: Set[str] = self.all_task_pool.copy()
while depth <= self.n_edge_distance:
n_window_depths[depth] = set().union(*(
self.n_window_node_walks[n_id]['depths'][depth]
Expand Down Expand Up @@ -1878,7 +1875,7 @@ def prune_data_store(self):
else:
node_ids.remove(tp_id)
continue
self.n_window_edges.difference_update(set(node.edges))
self.n_window_edges.difference_update(node.edges)
if tp_id in self.n_window_node_walks:
del self.n_window_node_walks[tp_id]
if tp_id in self.n_window_completed_walks:
Expand Down
1 change: 0 additions & 1 deletion tests/functional/n-window/01-past-present-future.t
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"
TEST_NAME="${TEST_NAME_BASE}-validate"
run_ok "${TEST_NAME}" cylc validate "${WORKFLOW_NAME}"

# First run: task c shuts the scheduler down then fails.
TEST_NAME="${TEST_NAME_BASE}-run"
# 'a => b => c => d => e', 'a' sets window size to 2, 'c' uses cylc show on all.
workflow_run_ok "${TEST_NAME}" cylc play --no-detach --debug "${WORKFLOW_NAME}"
Expand Down
1 change: 0 additions & 1 deletion tests/functional/n-window/02-big-window.t
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"
TEST_NAME="${TEST_NAME_BASE}-validate"
run_ok "${TEST_NAME}" cylc validate "${WORKFLOW_NAME}"

# First run: task c shuts the scheduler down then fails.
TEST_NAME="${TEST_NAME_BASE}-run"
# 'a => b => . . . f => g => h', 'a' sets window size to 5,
# 'b => i => j => f', 'c' finds 'a', 'j', 'h'
Expand Down
16 changes: 8 additions & 8 deletions tests/integration/test_increment_graph_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ def advance():
# the tasks we will run in order
letters = 'abcdefghijklmnopqrs'
# the graph-depth of the "blink" task at each stage of the workflow
blink_distances = [1] + (list((*range(2, 5), *range(3, 0, -1))) * 3)
blink_distances = [1] + [*range(2, 5), *range(3, 0, -1)] * 3

for ind, blink_distance in zip(range(len(letters)), blink_distances):
previous_task = letters[ind - 1] if ind > 0 else None
Expand Down Expand Up @@ -251,7 +251,7 @@ def advance():
await schd.update_data_structure()

previous_n_window = {}
for previous_task, active_task, n_window in advance():
for previous_task, active_task, expected_n_window in advance():
# mark the previous task as completed
await complete_task(schd, previous_task)
# add the next task to the pool
Expand All @@ -262,28 +262,28 @@ def advance():
added, updated, pruned = get_deltas(schd)

# compare the n-window in the store to what we were expecting
_n_window = await get_n_window(schd)
assert _n_window == n_window
n_window = await get_n_window(schd)
assert n_window == expected_n_window

# compate the deltas to what we were expecting
# compare the deltas to what we were expecting
if active_task != 'a':
# skip the first task as this is complicated by startup logic
assert added == {
key: value
for key, value in n_window.items()
for key, value in expected_n_window.items()
if key not in previous_n_window
}
# Skip added as depth isn't updated
# (the manager only updates those that need it)
assert updated == {
key: value
for key, value in n_window.items()
for key, value in expected_n_window.items()
if key not in added
}
assert pruned == {
key
for key in previous_n_window
if key not in n_window
if key not in expected_n_window
}

previous_n_window = n_window
Expand Down

0 comments on commit f68a63b

Please sign in to comment.