diff --git a/cylc/uiserver/data_store_mgr.py b/cylc/uiserver/data_store_mgr.py index 347dcf0c..94059fa1 100644 --- a/cylc/uiserver/data_store_mgr.py +++ b/cylc/uiserver/data_store_mgr.py @@ -39,6 +39,7 @@ import logging import time +from cylc.flow import ID_DELIM from cylc.flow.network.server import PB_METHOD_MAP from cylc.flow.network import MSG_TIMEOUT from cylc.flow.network.subscriber import WorkflowSubscriber, process_delta_msg @@ -80,11 +81,14 @@ def update_contact(self, w_id, contact_data=None): flow.owner = contact_data['owner'] flow.host = contact_data[CFF.HOST] flow.port = int(contact_data[CFF.PORT]) + # flow.pub_port = int(contact_data[CFF.PUBLISH_PORT]) flow.api_version = int(contact_data[CFF.API]) else: # wipe pre-existing contact-file data + flow.owner, flow.name = w_id.split(ID_DELIM) flow.host = '' flow.port = 0 + # flow.pub_port = 0 flow.api_version = 0 flow.status = 'stopped' @@ -104,16 +108,16 @@ async def sync_workflow(self, w_id, contact_data): blocking the main loop. """ - print(f'$ sync_workflow({w_id})') - - self.update_contact(w_id, contact_data) - + logger.debug(f'sync_workflow({w_id})') if self.loop is None: self.loop = asyncio.get_running_loop() + + # don't sync if subscription exists if w_id in self.w_subs: return self.delta_queues[w_id] = {} + self.update_contact(w_id, contact_data) # Might be options other than threads to achieve # non-blocking subscriptions, but this works. 
@@ -124,13 +128,13 @@ async def sync_workflow(self, w_id, contact_data): w_id, contact_data['name'], contact_data[CFF.HOST], - contact_data[CFF.PORT] + contact_data[CFF.PUBLISH_PORT] ) ) await self.entire_workflow_update(ids=[w_id]) async def register_workflow(self, w_id, name, owner): - print(f'$ register_workflow({w_id})') + logger.debug(f'register_workflow({w_id})') self.delta_queues[w_id] = {} # create new entry in the data store @@ -141,21 +145,23 @@ async def register_workflow(self, w_id, name, owner): self.update_contact(w_id) def stop_workflow(self, w_id): - print(f'$ stop_workflow({w_id})') + logger.debug(f'stop_workflow({w_id})') self.update_contact(w_id) - def purge_workflow(self, w_id): + def purge_workflow(self, w_id, data=True): """Purge the manager of a workflow's subscription and data.""" - print(f'$ purge_workflow({w_id})') + logger.debug(f'purge_workflow({w_id})') if w_id in self.w_subs: self.w_subs[w_id].stop() del self.w_subs[w_id] - if w_id in self.data: - del self.data[w_id] - if w_id in self.delta_queues: - del self.delta_queues[w_id] - self.executors[w_id].shutdown(wait=True) - del self.executors[w_id] + if data: + if w_id in self.data: + del self.data[w_id] + if w_id in self.delta_queues: + del self.delta_queues[w_id] + if w_id in self.executors: + self.executors[w_id].shutdown(wait=True) + del self.executors[w_id] def start_subscription(self, w_id, reg, host, port): """Instantiate and run subscriber data-store sync. diff --git a/cylc/uiserver/tests/test_workflows_mgr.py b/cylc/uiserver/tests/test_workflows_mgr.py index 0868287d..e4a31d38 100644 --- a/cylc/uiserver/tests/test_workflows_mgr.py +++ b/cylc/uiserver/tests/test_workflows_mgr.py @@ -13,12 +13,22 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . 
+from itertools import product +from pathlib import Path import pytest +from random import random from pytest_mock import MockFixture -from cylc.uiserver.workflows_mgr import * +from cylc.flow import ID_DELIM from cylc.flow.exceptions import ClientTimeout +from cylc.flow.network import API +from cylc.flow.suite_files import ( + ContactFileFields as CFF, + SuiteFiles +) + +from cylc.uiserver.workflows_mgr import * from .conftest import AsyncClientFixture @@ -152,3 +162,104 @@ def test_workflows_manager_spawn_workflow(): assert not mgr.workflows # TODO: add tests for remaining methods in WorkflowsManager + + +def mk_flow(path, reg, active=True): + """Make a workflow appear on the filesystem for scan purposes. + + Args: + path (pathlib.Path): + The directory to create the mocked workflow in. + reg (str): + The registered name for this workflow. + active (bool): + If True then a contact file will be written. + + """ + run_dir = path / reg + srv_dir = run_dir / SuiteFiles.Service.DIRNAME + contact = srv_dir / SuiteFiles.Service.CONTACT + fconfig = run_dir / SuiteFiles.FLOW_FILE + run_dir.mkdir() + fconfig.touch() # cylc uses this to identify a dir as a workflow + if active: + srv_dir.mkdir() + with open(contact, 'w+') as contact_file: + contact_file.write( + '\n'.join([ + f'{CFF.API}={API}', + f'{CFF.HOST}=42', + f'{CFF.PORT}=42', + f'{CFF.UUID}=42' + ]) + ) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + # generate all possible state changes + 'active_before,active_after', + product(['active', 'inactive', None], repeat=2) +) +async def test_workflow_state_changes(tmp_path, active_before, active_after): + """It correctly identifies workflow state changes from the filesystem.""" + tmp_path /= str(random()) + tmp_path.mkdir() + + # mock the results of the previous scan + wfm = WorkflowsManager(None, context=None, run_dir=tmp_path) + wid = f'{wfm.owner}{ID_DELIM}a' + if active_before == 'active': + wfm.active[wid] = { + CFF.API: API, + CFF.UUID: '42' + } + elif
active_before == 'inactive': + wfm.inactive.add(wid) + + # mock the filesystem in the new state + if active_after == 'active': + mk_flow(tmp_path, 'a', active=True) + if active_after == 'inactive': + mk_flow(tmp_path, 'a', active=False) + + # see what state changes the workflow manager detects + changes = [] + async for change in wfm._workflow_state_changes(): + changes.append(change) + + # compare those changes to expectations + if active_before == active_after: + assert changes == [] + else: + assert len(changes) == 1 + assert (wid, active_before, active_after) == changes[0][:3] + + +@pytest.mark.asyncio +async def test_workflow_state_change_restart(tmp_path): + """It identifies workflows which have restarted between scans.""" + # mock the result of the previous scan + wfm = WorkflowsManager(None, context=None, run_dir=tmp_path) + wid = f'{wfm.owner}{ID_DELIM}a' + wfm.active[wid] = { + CFF.API: API, + CFF.UUID: '41' + } + + # create a new workflow with the same name but a different UUID + mk_flow(tmp_path, 'a', active=True) + + # see what state changes the workflow manager detects + changes = [] + async for change in wfm._workflow_state_changes(): + changes.append(change) + + # the flow should be marked as becoming inactive then active again + assert [change[:3] for change in changes] == [ + (wid, 'active', 'inactive'), + (wid, 'inactive', 'active') + ] + + # it should have picked up the new uuid too + assert changes[1][3][CFF.UUID] == '42' diff --git a/cylc/uiserver/workflows_mgr.py b/cylc/uiserver/workflows_mgr.py index a173dec5..e2fb14af 100644 --- a/cylc/uiserver/workflows_mgr.py +++ b/cylc/uiserver/workflows_mgr.py @@ -98,9 +98,8 @@ async def est_workflow(reg, host, port, pub_port, context=None, timeout=None): class WorkflowsManager: - """Discover and Manage workflows.""" - def __init__(self, uiserver, context=None): + def __init__(self, uiserver, context=None, run_dir=None): self.uiserver = uiserver if context is None: self.context = zmq.asyncio.Context()
@@ -112,7 +111,7 @@ def __init__(self, uiserver, context=None): self.inactive = set() self._scan_pipe = ( # all flows on the filesystem - scan + scan(run_dir) # only flows which have a contact file # | is_active(True) # stop here is the flow is stopped, else... @@ -171,7 +170,7 @@ async def _workflow_state_changes(self): ): # this flow is running but it's a different run active.add(wid) - yield (wid, 'active', 'inactive', flow) + yield (wid, 'active', 'inactive', self.active[wid]) yield (wid, 'inactive', 'active', flow) else: @@ -190,18 +189,14 @@ async def _workflow_state_changes(self): for wid in inactive_before - (active | inactive): yield (wid, 'inactive', None, None) - # return active, inactive - async def _register(self, wid, flow): """Register a new workflow with the data store.""" - print(f'_register({wid})') await self.uiserver.data_store_mgr.register_workflow( wid, flow['name'], flow['owner'] ) async def _connect(self, wid, flow): """Open a connection to a running workflow.""" - print(f'_connect({wid})') self.active[wid] = flow flow['req_client'] = SuiteRuntimeClient(flow['name']) await self.uiserver.data_store_mgr.sync_workflow( @@ -211,14 +206,12 @@ async def _connect(self, wid, flow): async def _disconnect(self, wid): """Disconnect from a running workflow.""" - print(f'_disconnect({wid})') client = self.active[wid]['req_client'] with suppress(IOError): client.stop(stop_loop=False) async def _unregister(self, wid): """Unregister a workflow from the data store.""" - print(f'_unregister({wid})') self.uiserver.data_store_mgr.purge_workflow(wid) async def _stop(self, wid): @@ -226,6 +219,7 @@ async def _stop(self, wid): The workflow can't do this itself, because it's not running. 
""" + self.uiserver.data_store_mgr.purge_workflow(wid, data=False) self.uiserver.data_store_mgr.stop_workflow(wid) async def update(self): @@ -245,8 +239,6 @@ async def update(self): self.stopping.clear() async for wid, before, after, flow in self._workflow_state_changes(): - print(f'# {wid} {before}->{after}') - # handle state changes if before == 'active' and after == 'inactive': await self._disconnect(wid)