Skip to content

Commit

Permalink
Preserve n-window depth on reload
Browse files Browse the repository at this point in the history
  • Loading branch information
MetRonnie committed Mar 19, 2024
1 parent f97aa86 commit 2434761
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 11 deletions.
10 changes: 5 additions & 5 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ class DataStoreMgr:
ERR_PREFIX_JOBID_MATCH = 'No matching jobs found: '
ERR_PREFIX_JOB_NOT_ON_SEQUENCE = 'Invalid cycle point for job: '

def __init__(self, schd):
def __init__(self, schd, n_edge_distance=1):
self.schd = schd
self.id_ = Tokens(
user=self.schd.owner,
Expand All @@ -477,7 +477,7 @@ def __init__(self, schd):
self.updated_state_families = set()
# Update workflow state totals once more post delta application.
self.state_update_follow_on = False
self.n_edge_distance = 1
self.n_edge_distance = n_edge_distance
self.next_n_edge_distance = None
self.latest_state_tasks = {
state: deque(maxlen=LATEST_STATE_TASKS_QUEUE_SIZE)
Expand Down Expand Up @@ -530,7 +530,7 @@ def initiate_data_model(self, reloaded=False):
"""
# Reset attributes/data-store on reload:
if reloaded:
self.__init__(self.schd)
self.__init__(self.schd, self.n_edge_distance)

# Static elements
self.generate_definition_elements()
Expand Down Expand Up @@ -2110,11 +2110,11 @@ def _family_ascent_point_update(self, fp_id):
self.state_update_families.add(fam_node.first_parent)
self.state_update_families.remove(fp_id)

def set_graph_window_extent(self, n_edge_distance):
def set_graph_window_extent(self, n_edge_distance: int) -> None:
"""Set what the max edge distance will change to.
Args:
n_edge_distance (int):
n_edge_distance:
Maximum edge distance from active node.
"""
Expand Down
13 changes: 7 additions & 6 deletions cylc/flow/network/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -839,24 +839,25 @@ def put_messages(
)
return (True, f'Messages queued: {len(messages)}')

def set_graph_window_extent(self, n_edge_distance):
def set_graph_window_extent(
self, n_edge_distance: int
) -> Tuple[bool, str]:
"""Set data-store graph window to new max edge distance.
Args:
n_edge_distance (int):
n_edge_distance:
Max edge distance 0..n from active node.
Returns:
tuple: (outcome, message)
outcome (bool)
outcome
True if command successfully queued.
message (str)
message
Information about outcome.
"""
if n_edge_distance >= 0:
self.schd.data_store_mgr.set_graph_window_extent(n_edge_distance)
return (True, f'Maximum edge distance set to {n_edge_distance}')
else:
return (False, 'Edge distance cannot be negative')
return (False, 'Edge distance cannot be negative')
7 changes: 7 additions & 0 deletions tests/integration/test_data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,17 @@ def test_increment_graph_window(harness):

def test_initiate_data_model(harness):
"""Test method that generates all data elements in order."""
schd: Scheduler
schd, data = harness
assert len(data[WORKFLOW].task_proxies) == 2
schd.data_store_mgr.initiate_data_model(reloaded=True)
assert len(data[WORKFLOW].task_proxies) == 2
# Check n-window preserved on reload:
schd.data_store_mgr.set_graph_window_extent(2)
schd.data_store_mgr.update_data_structure()
assert schd.data_store_mgr.n_edge_distance == 2
schd.data_store_mgr.initiate_data_model(reloaded=True)
assert schd.data_store_mgr.n_edge_distance == 2


async def test_delta_task_state(harness):
Expand Down

0 comments on commit 2434761

Please sign in to comment.