From 2434761ed95abf39700f13be5bcbc11cef811f10 Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Tue, 19 Mar 2024 17:36:15 +0000 Subject: [PATCH] Preserve n-window depth on reload --- cylc/flow/data_store_mgr.py | 10 +++++----- cylc/flow/network/resolvers.py | 13 +++++++------ tests/integration/test_data_store_mgr.py | 7 +++++++ 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py index 7a59e76d7a4..0d79fc22b57 100644 --- a/cylc/flow/data_store_mgr.py +++ b/cylc/flow/data_store_mgr.py @@ -463,7 +463,7 @@ class DataStoreMgr: ERR_PREFIX_JOBID_MATCH = 'No matching jobs found: ' ERR_PREFIX_JOB_NOT_ON_SEQUENCE = 'Invalid cycle point for job: ' - def __init__(self, schd): + def __init__(self, schd, n_edge_distance=1): self.schd = schd self.id_ = Tokens( user=self.schd.owner, @@ -477,7 +477,7 @@ def __init__(self, schd): self.updated_state_families = set() # Update workflow state totals once more post delta application. self.state_update_follow_on = False - self.n_edge_distance = 1 + self.n_edge_distance = n_edge_distance self.next_n_edge_distance = None self.latest_state_tasks = { state: deque(maxlen=LATEST_STATE_TASKS_QUEUE_SIZE) @@ -530,7 +530,7 @@ def initiate_data_model(self, reloaded=False): """ # Reset attributes/data-store on reload: if reloaded: - self.__init__(self.schd) + self.__init__(self.schd, self.n_edge_distance) # Static elements self.generate_definition_elements() @@ -2110,11 +2110,11 @@ def _family_ascent_point_update(self, fp_id): self.state_update_families.add(fam_node.first_parent) self.state_update_families.remove(fp_id) - def set_graph_window_extent(self, n_edge_distance): + def set_graph_window_extent(self, n_edge_distance: int) -> None: """Set what the max edge distance will change to. Args: - n_edge_distance (int): + n_edge_distance: Maximum edge distance from active node. """ diff --git a/cylc/flow/network/resolvers.py b/cylc/flow/network/resolvers.py index ea9bde97be2..5451ef501aa 100644 --- a/cylc/flow/network/resolvers.py +++ b/cylc/flow/network/resolvers.py @@ -839,24 +839,25 @@ def put_messages( ) return (True, f'Messages queued: {len(messages)}') - def set_graph_window_extent(self, n_edge_distance): + def set_graph_window_extent( + self, n_edge_distance: int + ) -> Tuple[bool, str]: """Set data-store graph window to new max edge distance. Args: - n_edge_distance (int): + n_edge_distance: Max edge distance 0..n from active node. Returns: tuple: (outcome, message) - outcome (bool) + outcome True if command successfully queued. - message (str) + message Information about outcome. """ if n_edge_distance >= 0: self.schd.data_store_mgr.set_graph_window_extent(n_edge_distance) return (True, f'Maximum edge distance set to {n_edge_distance}') - else: - return (False, 'Edge distance cannot be negative') + return (False, 'Edge distance cannot be negative') diff --git a/tests/integration/test_data_store_mgr.py b/tests/integration/test_data_store_mgr.py index 78b24200634..65105d4b8a3 100644 --- a/tests/integration/test_data_store_mgr.py +++ b/tests/integration/test_data_store_mgr.py @@ -157,10 +157,17 @@ def test_increment_graph_window(harness): def test_initiate_data_model(harness): """Test method that generates all data elements in order.""" + schd: Scheduler schd, data = harness assert len(data[WORKFLOW].task_proxies) == 2 schd.data_store_mgr.initiate_data_model(reloaded=True) assert len(data[WORKFLOW].task_proxies) == 2 + # Check n-window preserved on reload: + schd.data_store_mgr.set_graph_window_extent(2) + schd.data_store_mgr.update_data_structure() + assert schd.data_store_mgr.n_edge_distance == 2 + schd.data_store_mgr.initiate_data_model(reloaded=True) + assert schd.data_store_mgr.n_edge_distance == 2 async def test_delta_task_state(harness):