Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

data-store: log workflow events to info level #348

Merged
merged 1 commit into from
May 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions cylc/uiserver/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,25 @@
)
from cylc.flow.workflow_status import WorkflowStatus

from .utils import fmt_call
from .workflows_mgr import workflow_request


def log_call(fcn):
"""Decorator for data store methods we want to log."""
fcn_name = f'[data-store] {fcn.__name__}'

def _inner(*args, **kwargs): # works for serial & async calls
nonlocal fcn
self = args[0]
# log this method call
self.log.info(fmt_call(fcn_name, args[1:], kwargs))
# then do it
return fcn(*args, **kwargs)

return _inner


class DataStoreMgr:
"""Manage the local data-store acquisition/updates for all workflows."""

Expand All @@ -75,13 +91,13 @@ def __init__(self, workflows_mgr, log):
self.executors = {}
self.delta_queues = {}

@log_call
async def register_workflow(self, w_id: str, is_active: bool) -> None:
"""Register a new workflow with the data store.

Call this when a new workflow is discovered on the file system
(e.g. installed).
"""
self.log.debug(f'register_workflow({w_id})')
self.delta_queues[w_id] = {}

# create new entry in the data store
Expand All @@ -95,12 +111,12 @@ async def register_workflow(self, w_id: str, is_active: bool) -> None:
status_msg=self._get_status_msg(w_id, is_active),
)

@log_call
async def unregister_workflow(self, w_id):
"""Remove a workflow from the data store entirely.

Call this when a workflow is deleted.
"""
self.log.debug(f'unregister_workflow({w_id})')
if w_id in self.data:
self._update_contact(w_id, pruned=True)
while any(
Expand All @@ -110,6 +126,7 @@ async def unregister_workflow(self, w_id):
await asyncio.sleep(self.PENDING_DELTA_CHECK_INTERVAL)
self._purge_workflow(w_id)

@log_call
async def connect_workflow(self, w_id, contact_data):
"""Initiate workflow subscriptions.

Expand All @@ -120,7 +137,6 @@ async def connect_workflow(self, w_id, contact_data):
blocking the main loop.

"""
self.log.debug(f'connect_workflow({w_id})')
if self.loop is None:
self.loop = asyncio.get_running_loop()

Expand All @@ -147,19 +163,19 @@ async def connect_workflow(self, w_id, contact_data):
if w_id not in successful_updates:
# something went wrong, undo any changes to allow for subsequent
# connection attempts
self.log.debug(f'failed to connect to {w_id}')
self.log.info(f'failed to connect to {w_id}')
self.disconnect_workflow(w_id)
return False
else:
# don't update the contact data until we have successfully updated
self._update_contact(w_id, contact_data)

@log_call
def disconnect_workflow(self, w_id):
"""Terminate workflow subscriptions.

Call this when a workflow has stopped.
"""
self.log.debug(f'disconnect_workflow({w_id})')
self._update_contact(
w_id,
status=WorkflowStatus.STOPPED.value,
Expand Down Expand Up @@ -190,9 +206,9 @@ def get_workflows(self):
active.add(w_id)
return active, inactive

@log_call
def _purge_workflow(self, w_id):
"""Purge the manager of a workflow's subscription and data."""
self.log.debug(f'delete_workflow({w_id})')
self.disconnect_workflow(w_id)
if w_id in self.data:
del self.data[w_id]
Expand Down Expand Up @@ -241,7 +257,6 @@ def _update_workflow_data(self, topic, delta, w_id):
loop_cnt += 1
continue
if topic == 'shutdown':
self.log.debug(f'shutdown({w_id})')
self._delta_store_to_queues(w_id, topic, delta)
# update the status to stopped and set the status message
self._update_contact(
Expand Down
4 changes: 2 additions & 2 deletions cylc/uiserver/tests/test_data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,9 @@ async def test_workflow_connect_fail(
# the connection should fail because our ZMQ socket is not a
# WorkflowRuntimeServer with the correct endpoints and auth
assert [record.message for record in caplog.records] == [
'connect_workflow(~user/workflow_id)',
"[data-store] connect_workflow('~user/workflow_id', <dict>)",
'failed to connect to ~user/workflow_id',
'disconnect_workflow(~user/workflow_id)',
"[data-store] disconnect_workflow('~user/workflow_id')",
]
finally:
# tidy up
Expand Down
49 changes: 49 additions & 0 deletions cylc/uiserver/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.


def _repr(value):
if isinstance(value, dict):
return '<dict>'
if isinstance(value, set):
return '<set>'
return repr(value)


def fmt_call(name, args, kwargs):
"""Format a Python function call.

Examples:
It formats calls at they would appear in Python code:
>>> fmt_call('foo', (1,), {'x': True})
'foo(1, x=True)'

It handles different data types:
>>> fmt_call('foo', ('str', 42, True, None), {})
"foo('str', 42, True, None)"

It puts in placeholders for dicts and sets (too long for log output):
>>> fmt_call('foo', tuple(), {'a': {'x': 1}, 'b': {'y',}})
'foo(a=<dict>, b=<set>)'

"""
return f'{name}(' + ', '.join(
[
_repr(arg) for arg in args
] + [
f'{key}={_repr(value)}'
for key, value in kwargs.items()
]
) + ')'