diff --git a/cylc/uiserver/data_store_mgr.py b/cylc/uiserver/data_store_mgr.py index e8cd7bfa..347dcf0c 100644 --- a/cylc/uiserver/data_store_mgr.py +++ b/cylc/uiserver/data_store_mgr.py @@ -37,7 +37,7 @@ from copy import deepcopy from functools import partial import logging -from time import sleep +import time from cylc.flow.network.server import PB_METHOD_MAP from cylc.flow.network import MSG_TIMEOUT @@ -46,6 +46,8 @@ EDGES, DATA_TEMPLATE, ALL_DELTAS, DELTAS_MAP, WORKFLOW, apply_delta, generate_checksum, create_delta_store ) +from cylc.flow.suite_files import ContactFileFields as CFF + from .workflows_mgr import workflow_request logger = logging.getLogger(__name__) @@ -67,7 +69,34 @@ def __init__(self, workflows_mgr): self.executors = {} self.delta_queues = {} - async def sync_workflow(self, w_id, *args, **kwargs): + def update_contact(self, w_id, contact_data=None): + delta = DELTAS_MAP[WORKFLOW]() + delta.time = time.time() + flow = delta.updated + flow.id = w_id + if contact_data: + # update with contact file data + flow.name = contact_data['name'] + flow.owner = contact_data['owner'] + flow.host = contact_data[CFF.HOST] + flow.port = int(contact_data[CFF.PORT]) + flow.api_version = int(contact_data[CFF.API]) + else: + # wipe pre-existing contact-file data + flow.host = '' + flow.port = 0 + flow.api_version = 0 + flow.status = 'stopped' + + # Apply to existing workflow data + apply_delta(WORKFLOW, delta, self.data[w_id]) + if 'delta_times' not in self.data[w_id]: + self.data[w_id]['delta_times'] = {} + self.data[w_id]['delta_times'][WORKFLOW] = delta.time + # Queue delta for subscription push + self.delta_store_to_queues(w_id, ALL_DELTAS, delta) + + async def sync_workflow(self, w_id, contact_data): """Run data store sync with workflow services. Subscriptions and sync management is instantiated and run in @@ -76,6 +105,9 @@ async def sync_workflow(self, w_id, *args, **kwargs): """ print(f'$ sync_workflow({w_id})') + + self.update_contact(w_id, contact_data) + if self.loop is None: self.loop = asyncio.get_running_loop() if w_id in self.w_subs: @@ -87,25 +119,30 @@ async def sync_workflow(self, w_id, *args, **kwargs): # non-blocking subscriptions, but this works. self.executors[w_id] = ThreadPoolExecutor() self.executors[w_id].submit( - partial(self.start_subscription, w_id, *args, **kwargs) + partial( + self.start_subscription, + w_id, + contact_data['name'], + contact_data[CFF.HOST], + contact_data[CFF.PORT] + ) ) await self.entire_workflow_update(ids=[w_id]) async def register_workflow(self, w_id, name, owner): print(f'$ register_workflow({w_id})') + self.delta_queues[w_id] = {} + + # create new entry in the data store data = deepcopy(DATA_TEMPLATE) - flow = data[WORKFLOW] - flow.id = w_id - flow.name = name - flow.owner = owner - flow.status = 'stopped' self.data[w_id] = data + # create new entry in the delta store + self.update_contact(w_id) + def stop_workflow(self, w_id): print(f'$ stop_workflow({w_id})') - self.data[w_id]['status'] = 'stopped' - self.data[w_id]['host'] = None - self.data[w_id]['port'] = None + self.update_contact(w_id) def purge_workflow(self, w_id): """Purge the manager of a workflow's subscription and data.""" @@ -158,7 +195,7 @@ def update_workflow_data(self, topic, delta, w_id): while loop_cnt < self.INIT_DATA_WAIT_TIME: if w_id in self.data: break - sleep(self.INIT_DATA_RETRY_DELAY) + time.sleep(self.INIT_DATA_RETRY_DELAY) loop_cnt += 1 continue if topic == 'shutdown': @@ -250,9 +287,9 @@ async def entire_workflow_update(self, ids=None): ids = [] # Prune old data - for w_id in list(self.data): - if w_id not in self.workflows_mgr.active: - del self.data[w_id] + # for w_id in list(self.data): + # if w_id not in self.workflows_mgr.active: + # del self.data[w_id] # Request new data req_method = 'pb_entire_workflow' diff --git a/cylc/uiserver/workflows_mgr.py b/cylc/uiserver/workflows_mgr.py index e685ab9f..a173dec5 100644 --- a/cylc/uiserver/workflows_mgr.py +++ b/cylc/uiserver/workflows_mgr.py @@ -144,8 +144,6 @@ async def _workflow_state_changes(self): The scan data (i.e. the contents of the contact file). """ - owner = getuser() - active_before = set(self.active) inactive_before = set(self.inactive) @@ -153,7 +151,8 @@ async def _workflow_state_changes(self): inactive = set() async for flow in self._scan_pipe: - wid = f'{owner}{ID_DELIM}{flow["name"]}' + flow['owner'] = self.owner + wid = f'{flow["owner"]}{ID_DELIM}{flow["name"]}' flow['id'] = wid if not flow.get('contact'): @@ -197,7 +196,7 @@ async def _register(self, wid, flow): """Register a new workflow with the data store.""" print(f'_register({wid})') await self.uiserver.data_store_mgr.register_workflow( - wid, flow['name'], self.owner + wid, flow['name'], flow['owner'] ) async def _connect(self, wid, flow): @@ -207,9 +206,7 @@ async def _connect(self, wid, flow): flow['req_client'] = SuiteRuntimeClient(flow['name']) await self.uiserver.data_store_mgr.sync_workflow( wid, - flow['name'], - flow[CFF.HOST], - flow[CFF.PUBLISH_PORT] + flow ) async def _disconnect(self, wid):