Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log Authenticated User from UI-Server in Workflow Log #4522

Merged
merged 11 commits into from
Jan 25, 2022
14 changes: 1 addition & 13 deletions cylc/flow/network/authorisation.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

from functools import wraps

from cylc.flow import LOG


def authorise():
"""Add authorisation to an endpoint.
Expand All @@ -40,17 +38,7 @@ def wrapper(fcn):
def _call(self, *args, user='?', meta=None, **kwargs):
if not meta:
meta = {}
host = meta.get('host', '?')
prog = meta.get('prog', '?')
comms_method = meta.get('comms_method', '?')

# Hardcoded, for new - but much of this functionality can be
# removed more swingingly.
LOG.debug(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@oliver-sanders, just thinking about removing this line. It does remove the comms_method which may be useful for debugging in future? Want me to put any logging information about the comms method to replace this, or are you happy with the info loss from the removal?

Copy link
Member

@oliver-sanders oliver-sanders Jan 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can pass the comms_method down with the metadata and log it with the command.

'[client-command] %s %s://%s@%s:%s',
fcn.__name__, comms_method, user, host, prog
)
return fcn(self, *args, **kwargs)
return fcn(self, *args, meta=meta, **kwargs)

return _call
return wrapper
21 changes: 18 additions & 3 deletions cylc/flow/network/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,13 @@ def _socket_options(self):
self.poller = zmq.Poller()
self.poller.register(self.socket, zmq.POLLIN)

async def async_request(self, command, args=None, timeout=None):
async def async_request(
self,
command,
args=None,
timeout=None,
req_meta=None
):
"""Send an asynchronous request using asyncio.

Has the same arguments and return values as ``serial_request``.
Expand All @@ -177,6 +183,9 @@ async def async_request(self, command, args=None, timeout=None):
# send message
msg = {'command': command, 'args': args}
msg.update(self.header)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can delay this a couple of lines

Here:

        msg.update(self.header)
        msg['meta']['auth_user'] = auth_user

# add the request metadata
if req_meta:
msg['meta'].update(req_meta)
LOG.debug('zmq:send %s', msg)
message = encode_(msg)
self.socket.send_string(message)
Expand Down Expand Up @@ -205,7 +214,13 @@ async def async_request(self, command, args=None, timeout=None):
error = response['error']
raise ClientError(error['message'], error.get('traceback'))

def serial_request(self, command, args=None, timeout=None):
def serial_request(
self,
command,
args=None,
timeout=None,
req_meta=None
):
"""Send a request.

For convenience use ``__call__`` to call this method.
Expand All @@ -225,7 +240,7 @@ def serial_request(self, command, args=None, timeout=None):

"""
task = self.loop.create_task(
self.async_request(command, args, timeout))
self.async_request(command, args, timeout, req_meta))
self.loop.run_until_complete(task)
oliver-sanders marked this conversation as resolved.
Show resolved Hide resolved
return task.result()

Expand Down
91 changes: 39 additions & 52 deletions cylc/flow/network/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from uuid import uuid4

from graphene.utils.str_converters import to_snake_case
from cylc.flow import LOG

from cylc.flow.data_store_mgr import (
EDGES, FAMILY_PROXIES, TASK_PROXIES, WORKFLOW,
Expand Down Expand Up @@ -592,60 +593,21 @@ def __init__(self, data: 'DataStoreMgr', schd: 'Scheduler') -> None:
# Mutations
async def mutator(self, *m_args):
"""Mutate workflow."""
_, command, w_args, args = m_args
_, command, w_args, args, meta = m_args
w_ids = [flow[WORKFLOW].id
for flow in await self.get_workflows_data(w_args)]
if not w_ids:
workflows = list(self.data_store_mgr.data.keys())
return [{
'response': (False, f'No matching workflow in {workflows}')}]
w_id = w_ids[0]
result = await self._mutation_mapper(command, args)
if result is None:
result = (True, 'Command queued')
return [{'id': w_id, 'response': result}]

async def nodes_mutator(self, *m_args):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is nodes_mutator being removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was picked up when adding the unpacking of the meta and was discussed with @oliver-sanders, who looked at codecov for verification. It is seemingly dead code and was asked to remove it. Pinged @dwsutherland on the sibling to verify it is indeed unused code.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was in use, until:
#3469
specifically:
https://github.com/cylc/cylc-flow/pull/3469/files#diff-97f4d62be41218e2d70a203edfca6d44d7c0046b6908cefa8e3af7c02cd31920

And the idea behind it was that mutations being multi-workflow could be given node ids from different workflows, and then these ids would be parsed and mutations sent to the appropriate workflow..

The fact that we moved away from this, means we are happy with not using this functionality... i.e. a node mutation will always be associated with only one workflow and/or the workflow parsing happens somewhere else.

"""Mutate node items of associated workflows."""
_, command, tokens_list, w_args, args = m_args
w_ids = [
workflow[WORKFLOW].id
for workflow in await self.get_workflows_data(w_args)
]
if not w_ids:
workflows = list(self.data_store_mgr.data.keys())
return [{
'response': (False, f'No matching workflow in {workflows}')}]
w_id = w_ids[0]
# match proxy ID args with workflows
items = []
for tokens in tokens_list:
owner = tokens.get('user')
workflow = tokens.get('workflow')
if workflow and owner is None:
owner = "*"
if (
not (owner and workflow)
or fnmatchcase(
w_id,
tokens.workflow_id,
)
):
items.append(
tokens.relative_id
)
if items:
if command == 'put_messages':
args['task_job'] = items[0]
else:
args['tasks'] = items
result = await self._mutation_mapper(command, args)
result = await self._mutation_mapper(command, args, meta)
if result is None:
result = (True, 'Command queued')
return [{'id': w_id, 'response': result}]

async def _mutation_mapper(
self, command: str, kwargs: Dict[str, Any]
self, command: str, kwargs: Dict[str, Any], meta: Dict[str, Any]
) -> Optional[Tuple[bool, str]]:
"""Map between GraphQL resolvers and internal command interface."""
method = getattr(self, command, None)
oliver-sanders marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -655,7 +617,16 @@ async def _mutation_mapper(
self.schd.get_command_method(command)
except AttributeError:
raise ValueError(f"Command '{command}' not found")
self.schd.command_queue.put((command, tuple(kwargs.values()), {}))
if command != "put_messages":
log_msg = f"[command] {command} "
user = meta.get('auth_user', self.schd.owner)
if user != self.schd.owner:
log_msg += (f"(issued by {user})")
datamel marked this conversation as resolved.
Show resolved Hide resolved
LOG.info(log_msg)
self.schd.queue_command(
command,
kwargs
)
return None

def broadcast(
Expand All @@ -678,7 +649,11 @@ def broadcast(
cutoff)
raise ValueError('Unsupported broadcast mode')

def put_ext_trigger(self, message, id): # noqa: A002 (graphql interface)
def put_ext_trigger(
self,
message,
id # noqa: A002 (graphql interface)
):
"""Server-side external event trigger interface.

Args:
Expand All @@ -697,7 +672,12 @@ def put_ext_trigger(self, message, id): # noqa: A002 (graphql interface)
self.schd.ext_trigger_queue.put((message, id))
return (True, 'Event queued')

def put_messages(self, task_job=None, event_time=None, messages=None):
def put_messages(
self,
task_job=None,
event_time=None,
messages=None
):
"""Put task messages in queue for processing later by the main loop.

Arguments:
Expand Down Expand Up @@ -768,7 +748,7 @@ def force_spawn_children(
{
"outputs": outputs,
"flow_num": flow_num
}
},
)
)
return (True, 'Command queued')
Expand All @@ -779,7 +759,7 @@ def stop(
cycle_point: Optional[str] = None,
clock_time: Optional[str] = None,
task: Optional[str] = None,
flow_num: Optional[int] = None
flow_num: Optional[int] = None,
) -> Tuple[bool, str]:
"""Stop the workflow or specific flow from spawning any further.

Expand All @@ -805,11 +785,17 @@ def stop(
'clock_time': clock_time,
'task': task,
'flow_num': flow_num,
})
))
}),
)
)
return (True, 'Command queued')

def force_trigger_tasks(self, tasks=None, reflow=False, flow_descr=None):
def force_trigger_tasks(
self,
tasks=None,
reflow=False,
flow_descr=None,
):
"""Trigger submission of task jobs where possible.

Args:
Expand All @@ -831,11 +817,12 @@ def force_trigger_tasks(self, tasks=None, reflow=False, flow_descr=None):
"""
self.schd.command_queue.put(
(
"force_trigger_tasks", (tasks or [],),
"force_trigger_tasks",
(tasks or [],),
{
"reflow": reflow,
"flow_descr": flow_descr
}
)
),
)
return (True, 'Command queued')
34 changes: 2 additions & 32 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -1218,38 +1218,8 @@ async def mutator(root, info, command=None, workflows=None,
args.update(args.get('args', {}))
args.pop('args')
resolvers = info.context.get('resolvers')
res = await resolvers.mutator(info, command, w_args, args)
return GenericResponse(result=res)


async def nodes_mutator(root, info, command, ids, workflows=None,
exworkflows=None, **args):
"""Call the resolver method, dealing with multiple node id arguments,
which acts on the workflow service via the internal command queue."""
tokens_list = [Tokens(n_id, relative=True) for n_id in ids]
# if the workflows arg is empty extract from proxy args
if workflows is None:
workflows = set()
for tokens in tokens_list:
workflows.add(tokens.workflow_id)
if not workflows:
return GenericResponse(result="Error: No given Workflow(s)")
if exworkflows is None:
exworkflows = []
w_args = {}
w_args['workflows'] = [Tokens(w_id) for w_id in workflows]
w_args['exworkflows'] = [Tokens(w_id) for w_id in exworkflows]
if args.get('args', False):
args.update(args.get('args', {}))
args.pop('args')
resolvers = info.context.get('resolvers')
res = await resolvers.nodes_mutator(
info,
command,
tokens_list,
w_args,
args
)
meta = info.context.get('meta')
res = await resolvers.mutator(info, command, w_args, args, meta)
return GenericResponse(result=res)

# Input types:
Expand Down
11 changes: 6 additions & 5 deletions cylc/flow/network/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def register_endpoints(self):

@authorise()
@expose
def api(self, endpoint=None):
def api(self, endpoint=None, meta=None):
"""Return information about this API.

Returns a list of callable endpoints.
Expand Down Expand Up @@ -297,7 +297,7 @@ def api(self, endpoint=None):

@authorise()
@expose
def graphql(self, request_string=None, variables=None):
def graphql(self, request_string=None, variables=None, meta=None):
"""Return the GraphQL scheme execution result.

Args:
Expand All @@ -315,6 +315,7 @@ def graphql(self, request_string=None, variables=None):
variable_values=variables,
context={
'resolvers': self.resolvers,
'meta': meta or {},
},
backend=CylcGraphQLBackend(),
middleware=list(instantiate_middleware(self.middleware)),
Expand All @@ -341,7 +342,7 @@ def graphql(self, request_string=None, variables=None):
@authorise()
@expose
def get_graph_raw(
self, start_point_str, stop_point_str, grouping=None
self, start_point_str, stop_point_str, grouping=None, meta=None
):
"""Return a textual representation of the workflow graph.

Expand Down Expand Up @@ -386,7 +387,7 @@ def get_graph_raw(
# UIServer Data Commands
@authorise()
@expose
def pb_entire_workflow(self):
def pb_entire_workflow(self, meta=None):
"""Send the entire data-store in a single Protobuf message.

Returns:
Expand All @@ -399,7 +400,7 @@ def pb_entire_workflow(self):

@authorise()
@expose
def pb_data_elements(self, element_type):
def pb_data_elements(self, element_type, meta=None):
"""Send the specified data elements in delta form.

Args:
Expand Down
9 changes: 8 additions & 1 deletion cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,12 @@ def get_command_method(self, command_name):
"""Return a command processing method or raise AttributeError."""
return getattr(self, f'command_{command_name}')

def queue_command(self, command, kwargs):
self.command_queue.put((
command,
tuple(kwargs.values()), {}
))

def process_command_queue(self) -> None:
"""Process queued commands."""
qsize = self.command_queue.qsize()
Expand All @@ -831,7 +837,8 @@ def process_command_queue(self) -> None:
LOG.info(f"Processing {qsize} queued command(s)")
while True:
try:
name, args, kwargs = self.command_queue.get(False)
command = (self.command_queue.get(False))
name, args, kwargs = command
except Empty:
break
args_string = ', '.join(str(a) for a in args)
Expand Down
5 changes: 1 addition & 4 deletions tests/functional/cylc-message/00-ssh.t
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,13 @@
export REQUIRE_PLATFORM='loc:remote comms:ssh'
. "$(dirname "$0")/test_header"
#-------------------------------------------------------------------------------
set_test_number 3
set_test_number 2
install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"

run_ok "${TEST_NAME_BASE}-validate" \
cylc validate "${WORKFLOW_NAME}"
workflow_run_ok "${TEST_NAME_BASE}-run" \
cylc play --debug --no-detach --reference-test "${WORKFLOW_NAME}"

grep_ok "\[client-command\] graphql ssh" \
"$RUN_DIR/${WORKFLOW_NAME}/log/workflow/log"

Comment on lines -32 to -34
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a safety test to ensure that the comms method is logged.

Perhaps we could add a Cylc client command into the workflow e.g. cylc hold $CYLC_WORKFLOW_ID//1/elephant which should give us a [command] entry that we can check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, are you happy if I do this in the ssh async request PR as this test is broken anyway?

purge
exit
Loading