Skip to content

Commit

Permalink
data-store: log workflow events to info level
Browse files Browse the repository at this point in the history
* To assist with debugging the UIS under normal operation, bump the
  logging of workflow events (registered, connected, disconnected, etc.)
  from debug to info level.
* Centralised workflow event logging, included more argument info & gave
  all events a prefix to help us to `grep` them out when needed.
  • Loading branch information
oliver-sanders committed May 16, 2022
1 parent 0692266 commit 60b2da5
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 9 deletions.
29 changes: 22 additions & 7 deletions cylc/uiserver/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,25 @@
)
from cylc.flow.workflow_status import WorkflowStatus

from .utils import fmt_call
from .workflows_mgr import workflow_request


def log_call(fcn):
    """Decorator for data store methods we want to log.

    Each call to the decorated method is logged at info level through
    ``self.log``, prefixed with ``[data-store]``, and then the wrapped
    method is invoked.

    Args:
        fcn: The method to wrap. Its first positional argument must be the
            instance (``self``), which must provide a ``log`` attribute.

    Returns:
        A wrapper that returns whatever ``fcn`` returns. For an
        ``async def`` method this is the coroutine object, so the wrapper
        can be awaited exactly like the original.
    """
    import functools  # local import keeps this block self-contained

    fcn_name = f'[data-store] {fcn.__name__}'

    # preserve the wrapped method's __name__/__doc__ for introspection
    @functools.wraps(fcn)
    def _inner(*args, **kwargs):  # works for serial & async calls
        self = args[0]
        # log this method call ("self" is left out of the logged arguments)
        self.log.info(fmt_call(fcn_name, args[1:], kwargs))
        # then do it
        return fcn(*args, **kwargs)

    return _inner


class DataStoreMgr:
"""Manage the local data-store acquisition/updates for all workflows."""

Expand All @@ -75,13 +91,13 @@ def __init__(self, workflows_mgr, log):
self.executors = {}
self.delta_queues = {}

@log_call
async def register_workflow(self, w_id: str, is_active: bool) -> None:
"""Register a new workflow with the data store.
Call this when a new workflow is discovered on the file system
(e.g. installed).
"""
self.log.debug(f'register_workflow({w_id})')
self.delta_queues[w_id] = {}

# create new entry in the data store
Expand All @@ -95,12 +111,12 @@ async def register_workflow(self, w_id: str, is_active: bool) -> None:
status_msg=self._get_status_msg(w_id, is_active),
)

@log_call
async def unregister_workflow(self, w_id):
"""Remove a workflow from the data store entirely.
Call this when a workflow is deleted.
"""
self.log.debug(f'unregister_workflow({w_id})')
if w_id in self.data:
self._update_contact(w_id, pruned=True)
while any(
Expand All @@ -110,6 +126,7 @@ async def unregister_workflow(self, w_id):
await asyncio.sleep(self.PENDING_DELTA_CHECK_INTERVAL)
self._purge_workflow(w_id)

@log_call
async def connect_workflow(self, w_id, contact_data):
"""Initiate workflow subscriptions.
Expand All @@ -120,7 +137,6 @@ async def connect_workflow(self, w_id, contact_data):
blocking the main loop.
"""
self.log.debug(f'connect_workflow({w_id})')
if self.loop is None:
self.loop = asyncio.get_running_loop()

Expand All @@ -147,19 +163,19 @@ async def connect_workflow(self, w_id, contact_data):
if w_id not in successful_updates:
# something went wrong, undo any changes to allow for subsequent
# connection attempts
self.log.debug(f'failed to connect to {w_id}')
self.log.info(f'failed to connect to {w_id}')
self.disconnect_workflow(w_id)
return False
else:
# don't update the contact data until we have successfully updated
self._update_contact(w_id, contact_data)

@log_call
def disconnect_workflow(self, w_id):
"""Terminate workflow subscriptions.
Call this when a workflow has stopped.
"""
self.log.debug(f'disconnect_workflow({w_id})')
self._update_contact(
w_id,
status=WorkflowStatus.STOPPED.value,
Expand Down Expand Up @@ -190,9 +206,9 @@ def get_workflows(self):
active.add(w_id)
return active, inactive

@log_call
def _purge_workflow(self, w_id):
"""Purge the manager of a workflow's subscription and data."""
self.log.debug(f'delete_workflow({w_id})')
self.disconnect_workflow(w_id)
if w_id in self.data:
del self.data[w_id]
Expand Down Expand Up @@ -241,7 +257,6 @@ def _update_workflow_data(self, topic, delta, w_id):
loop_cnt += 1
continue
if topic == 'shutdown':
self.log.debug(f'shutdown({w_id})')
self._delta_store_to_queues(w_id, topic, delta)
# update the status to stopped and set the status message
self._update_contact(
Expand Down
4 changes: 2 additions & 2 deletions cylc/uiserver/tests/test_data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,9 @@ async def test_workflow_connect_fail(
# the connection should fail because our ZMQ socket is not a
# WorkflowRuntimeServer with the correct endpoints and auth
assert [record.message for record in caplog.records] == [
'connect_workflow(~user/workflow_id)',
"[data-store] connect_workflow('~user/workflow_id', <dict>)",
'failed to connect to ~user/workflow_id',
'disconnect_workflow(~user/workflow_id)',
"[data-store] disconnect_workflow('~user/workflow_id')",
]
finally:
# tidy up
Expand Down

0 comments on commit 60b2da5

Please sign in to comment.