From 60b2da5b5630785ec6739c176c2fff47d74e2c97 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Mon, 16 May 2022 15:37:23 +0100 Subject: [PATCH] data-store: log workflow events to info level * To assist with debugging the UIS under normal operation bump the logging of workflow events (registered, connected, disconnected, etc events) from debug to info levels. * Centralised workflow event logging, included more argument info & gave all events a prefix to help us to `grep` them out when needed. --- cylc/uiserver/data_store_mgr.py | 29 ++++++++++++++++------ cylc/uiserver/tests/test_data_store_mgr.py | 4 +-- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/cylc/uiserver/data_store_mgr.py b/cylc/uiserver/data_store_mgr.py index 428ff9de..78d1eee8 100644 --- a/cylc/uiserver/data_store_mgr.py +++ b/cylc/uiserver/data_store_mgr.py @@ -54,9 +54,25 @@ ) from cylc.flow.workflow_status import WorkflowStatus +from .utils import fmt_call from .workflows_mgr import workflow_request +def log_call(fcn): + """Decorator for data store methods we want to log.""" + fcn_name = f'[data-store] {fcn.__name__}' + + def _inner(*args, **kwargs): # works for serial & async calls + nonlocal fcn + self = args[0] + # log this method call + self.log.info(fmt_call(fcn_name, args[1:], kwargs)) + # then do it + return fcn(*args, **kwargs) + + return _inner + + class DataStoreMgr: """Manage the local data-store acquisition/updates for all workflows.""" @@ -75,13 +91,13 @@ def __init__(self, workflows_mgr, log): self.executors = {} self.delta_queues = {} + @log_call async def register_workflow(self, w_id: str, is_active: bool) -> None: """Register a new workflow with the data store. Call this when a new workflow is discovered on the file system (e.g. installed). """ - self.log.debug(f'register_workflow({w_id})') self.delta_queues[w_id] = {} # create new entry in the data store @@ -95,12 +111,12 @@ async def register_workflow(self, w_id: str, is_active: bool) -> None: status_msg=self._get_status_msg(w_id, is_active), ) + @log_call async def unregister_workflow(self, w_id): """Remove a workflow from the data store entirely. Call this when a workflow is deleted. """ - self.log.debug(f'unregister_workflow({w_id})') if w_id in self.data: self._update_contact(w_id, pruned=True) while any( @@ -110,6 +126,7 @@ async def unregister_workflow(self, w_id): await asyncio.sleep(self.PENDING_DELTA_CHECK_INTERVAL) self._purge_workflow(w_id) + @log_call async def connect_workflow(self, w_id, contact_data): """Initiate workflow subscriptions. @@ -120,7 +137,6 @@ async def connect_workflow(self, w_id, contact_data): blocking the main loop. """ - self.log.debug(f'connect_workflow({w_id})') if self.loop is None: self.loop = asyncio.get_running_loop() @@ -147,19 +163,19 @@ async def connect_workflow(self, w_id, contact_data): if w_id not in successful_updates: # something went wrong, undo any changes to allow for subsequent # connection attempts - self.log.debug(f'failed to connect to {w_id}') + self.log.info(f'failed to connect to {w_id}') self.disconnect_workflow(w_id) return False else: # don't update the contact data until we have successfully updated self._update_contact(w_id, contact_data) + @log_call def disconnect_workflow(self, w_id): """Terminate workflow subscriptions. Call this when a workflow has stopped. """ - self.log.debug(f'disconnect_workflow({w_id})') self._update_contact( w_id, status=WorkflowStatus.STOPPED.value, @@ -190,9 +206,9 @@ def get_workflows(self): active.add(w_id) return active, inactive + @log_call def _purge_workflow(self, w_id): """Purge the manager of a workflow's subscription and data.""" - self.log.debug(f'delete_workflow({w_id})') self.disconnect_workflow(w_id) if w_id in self.data: del self.data[w_id] @@ -241,7 +257,6 @@ def _update_workflow_data(self, topic, delta, w_id): loop_cnt += 1 continue if topic == 'shutdown': - self.log.debug(f'shutdown({w_id})') self._delta_store_to_queues(w_id, topic, delta) # update the status to stopped and set the status message self._update_contact( diff --git a/cylc/uiserver/tests/test_data_store_mgr.py b/cylc/uiserver/tests/test_data_store_mgr.py index bb52ae3d..ff02ac4a 100644 --- a/cylc/uiserver/tests/test_data_store_mgr.py +++ b/cylc/uiserver/tests/test_data_store_mgr.py @@ -252,9 +252,9 @@ async def test_workflow_connect_fail( # the connection should fail because our ZMQ socket is not a # WorkflowRuntimeServer with the correct endpoints and auth assert [record.message for record in caplog.records] == [ - 'connect_workflow(~user/workflow_id)', + "[data-store] connect_workflow('~user/workflow_id', )", 'failed to connect to ~user/workflow_id', - 'disconnect_workflow(~user/workflow_id)', + "[data-store] disconnect_workflow('~user/workflow_id')", ] finally: # tidy up