diff --git a/cylc/flow/network/authorisation.py b/cylc/flow/network/authorisation.py index 3d4b4b918b5..8440be88d1a 100644 --- a/cylc/flow/network/authorisation.py +++ b/cylc/flow/network/authorisation.py @@ -17,8 +17,6 @@ from functools import wraps -from cylc.flow import LOG - def authorise(): """Add authorisation to an endpoint. @@ -40,17 +38,7 @@ def wrapper(fcn): def _call(self, *args, user='?', meta=None, **kwargs): if not meta: meta = {} - host = meta.get('host', '?') - prog = meta.get('prog', '?') - comms_method = meta.get('comms_method', '?') - - # Hardcoded, for new - but much of this functionality can be - # removed more swingingly. - LOG.debug( - '[client-command] %s %s://%s@%s:%s', - fcn.__name__, comms_method, user, host, prog - ) - return fcn(self, *args, **kwargs) + return fcn(self, *args, meta=meta, **kwargs) return _call return wrapper diff --git a/cylc/flow/network/client.py b/cylc/flow/network/client.py index 5ac2f309e8f..fde6c8bd620 100644 --- a/cylc/flow/network/client.py +++ b/cylc/flow/network/client.py @@ -159,7 +159,13 @@ def _socket_options(self): self.poller = zmq.Poller() self.poller.register(self.socket, zmq.POLLIN) - async def async_request(self, command, args=None, timeout=None): + async def async_request( + self, + command, + args=None, + timeout=None, + req_meta=None + ): """Send an asynchronous request using asyncio. Has the same arguments and return values as ``serial_request``. @@ -177,6 +183,9 @@ async def async_request(self, command, args=None, timeout=None): # send message msg = {'command': command, 'args': args} msg.update(self.header) + # add the request metadata + if req_meta: + msg['meta'].update(req_meta) LOG.debug('zmq:send %s', msg) message = encode_(msg) self.socket.send_string(message) @@ -205,7 +214,13 @@ async def async_request(self, command, args=None, timeout=None): error = response['error'] raise ClientError(error['message'], error.get('traceback')) - def serial_request(self, command, args=None, timeout=None): + def serial_request( + self, + command, + args=None, + timeout=None, + req_meta=None + ): """Send a request. For convenience use ``__call__`` to call this method. @@ -225,7 +240,7 @@ def serial_request(self, command, args=None, timeout=None): """ task = self.loop.create_task( - self.async_request(command, args, timeout)) + self.async_request(command, args, timeout, req_meta)) self.loop.run_until_complete(task) return task.result() diff --git a/cylc/flow/network/resolvers.py b/cylc/flow/network/resolvers.py index 69d08225a14..08423595250 100644 --- a/cylc/flow/network/resolvers.py +++ b/cylc/flow/network/resolvers.py @@ -34,6 +34,7 @@ from uuid import uuid4 from graphene.utils.str_converters import to_snake_case +from cylc.flow import LOG from cylc.flow.data_store_mgr import ( EDGES, FAMILY_PROXIES, TASK_PROXIES, WORKFLOW, @@ -592,7 +593,7 @@ def __init__(self, data: 'DataStoreMgr', schd: 'Scheduler') -> None: # Mutations async def mutator(self, *m_args): """Mutate workflow.""" - _, command, w_args, args = m_args + _, command, w_args, args, meta = m_args w_ids = [flow[WORKFLOW].id for flow in await self.get_workflows_data(w_args)] if not w_ids: @@ -600,52 +601,13 @@ async def mutator(self, *m_args): return [{ 'response': (False, f'No matching workflow in {workflows}')}] w_id = w_ids[0] - result = await self._mutation_mapper(command, args) - if result is None: - result = (True, 'Command queued') - return [{'id': w_id, 'response': result}] - - async def nodes_mutator(self, *m_args): - """Mutate node items of associated workflows.""" - _, command, tokens_list, w_args, args = m_args - w_ids = [ - workflow[WORKFLOW].id - for workflow in await self.get_workflows_data(w_args) - ] - if not w_ids: - workflows = list(self.data_store_mgr.data.keys()) - return [{ - 'response': (False, f'No matching workflow in {workflows}')}] - w_id = w_ids[0] - # match proxy ID args with workflows - items = [] - for tokens in tokens_list: - owner = tokens.get('user') - workflow = tokens.get('workflow') - if workflow and owner is None: - owner = "*" - if ( - not (owner and workflow) - or fnmatchcase( - w_id, - tokens.workflow_id, - ) - ): - items.append( - tokens.relative_id - ) - if items: - if command == 'put_messages': - args['task_job'] = items[0] - else: - args['tasks'] = items - result = await self._mutation_mapper(command, args) + result = await self._mutation_mapper(command, args, meta) if result is None: result = (True, 'Command queued') return [{'id': w_id, 'response': result}] async def _mutation_mapper( - self, command: str, kwargs: Dict[str, Any] + self, command: str, kwargs: Dict[str, Any], meta: Dict[str, Any] ) -> Optional[Tuple[bool, str]]: """Map between GraphQL resolvers and internal command interface.""" method = getattr(self, command, None) @@ -655,7 +617,16 @@ async def _mutation_mapper( self.schd.get_command_method(command) except AttributeError: raise ValueError(f"Command '{command}' not found") - self.schd.command_queue.put((command, tuple(kwargs.values()), {})) + if command != "put_messages": + log_msg = f"[command] {command}" + user = meta.get('auth_user', self.schd.owner) + if user != self.schd.owner: + log_msg += (f" (issued by {user})") + LOG.info(log_msg) + self.schd.queue_command( + command, + kwargs + ) return None def broadcast( @@ -678,7 +649,11 @@ def broadcast( cutoff) raise ValueError('Unsupported broadcast mode') - def put_ext_trigger(self, message, id): # noqa: A002 (graphql interface) + def put_ext_trigger( + self, + message, + id # noqa: A002 (graphql interface) + ): """Server-side external event trigger interface. Args: @@ -697,7 +672,12 @@ def put_ext_trigger(self, message, id): # noqa: A002 (graphql interface) self.schd.ext_trigger_queue.put((message, id)) return (True, 'Event queued') - def put_messages(self, task_job=None, event_time=None, messages=None): + def put_messages( + self, + task_job=None, + event_time=None, + messages=None + ): """Put task messages in queue for processing later by the main loop. Arguments: @@ -768,7 +748,7 @@ def force_spawn_children( { "outputs": outputs, "flow_num": flow_num - } + }, ) ) return (True, 'Command queued') @@ -779,7 +759,7 @@ def stop( cycle_point: Optional[str] = None, clock_time: Optional[str] = None, task: Optional[str] = None, - flow_num: Optional[int] = None + flow_num: Optional[int] = None, ) -> Tuple[bool, str]: """Stop the workflow or specific flow from spawning any further. @@ -805,11 +785,17 @@ def stop( 'clock_time': clock_time, 'task': task, 'flow_num': flow_num, - }) - )) + }), + ) + ) return (True, 'Command queued') - def force_trigger_tasks(self, tasks=None, reflow=False, flow_descr=None): + def force_trigger_tasks( + self, + tasks=None, + reflow=False, + flow_descr=None, + ): """Trigger submission of task jobs where possible. Args: @@ -831,11 +817,12 @@ def force_trigger_tasks(self, tasks=None, reflow=False, flow_descr=None): """ self.schd.command_queue.put( ( - "force_trigger_tasks", (tasks or [],), + "force_trigger_tasks", + (tasks or [],), { "reflow": reflow, "flow_descr": flow_descr } - ) + ), ) return (True, 'Command queued') diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index 9ea498e759f..32e90f4e012 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -1218,38 +1218,8 @@ async def mutator(root, info, command=None, workflows=None, args.update(args.get('args', {})) args.pop('args') resolvers = info.context.get('resolvers') - res = await resolvers.mutator(info, command, w_args, args) - return GenericResponse(result=res) - - -async def nodes_mutator(root, info, command, ids, workflows=None, - exworkflows=None, **args): - """Call the resolver method, dealing with multiple node id arguments, - which acts on the workflow service via the internal command queue.""" - tokens_list = [Tokens(n_id, relative=True) for n_id in ids] - # if the workflows arg is empty extract from proxy args - if workflows is None: - workflows = set() - for tokens in tokens_list: - workflows.add(tokens.workflow_id) - if not workflows: - return GenericResponse(result="Error: No given Workflow(s)") - if exworkflows is None: - exworkflows = [] - w_args = {} - w_args['workflows'] = [Tokens(w_id) for w_id in workflows] - w_args['exworkflows'] = [Tokens(w_id) for w_id in exworkflows] - if args.get('args', False): - args.update(args.get('args', {})) - args.pop('args') - resolvers = info.context.get('resolvers') - res = await resolvers.nodes_mutator( - info, - command, - tokens_list, - w_args, - args - ) + meta = info.context.get('meta') + res = await resolvers.mutator(info, command, w_args, args, meta) return GenericResponse(result=res) # Input types: diff --git a/cylc/flow/network/server.py b/cylc/flow/network/server.py index a5502fdaeb9..2347b2548bb 100644 --- a/cylc/flow/network/server.py +++ b/cylc/flow/network/server.py @@ -264,7 +264,7 @@ def register_endpoints(self): @authorise() @expose - def api(self, endpoint=None): + def api(self, endpoint=None, meta=None): """Return information about this API. Returns a list of callable endpoints. @@ -297,7 +297,7 @@ def api(self, endpoint=None): @authorise() @expose - def graphql(self, request_string=None, variables=None): + def graphql(self, request_string=None, variables=None, meta=None): """Return the GraphQL scheme execution result. Args: @@ -315,6 +315,7 @@ def graphql(self, request_string=None, variables=None): variable_values=variables, context={ 'resolvers': self.resolvers, + 'meta': meta or {}, }, backend=CylcGraphQLBackend(), middleware=list(instantiate_middleware(self.middleware)), @@ -341,7 +342,7 @@ def graphql(self, request_string=None, variables=None): @authorise() @expose def get_graph_raw( - self, start_point_str, stop_point_str, grouping=None + self, start_point_str, stop_point_str, grouping=None, meta=None ): """Return a textual representation of the workflow graph. @@ -386,7 +387,7 @@ def get_graph_raw( # UIServer Data Commands @authorise() @expose - def pb_entire_workflow(self): + def pb_entire_workflow(self, meta=None): """Send the entire data-store in a single Protobuf message. Returns: @@ -399,7 +400,7 @@ def pb_entire_workflow(self): @authorise() @expose - def pb_data_elements(self, element_type): + def pb_data_elements(self, element_type, meta=None): """Send the specified data elements in delta form. Args: diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 7ae2b935aca..40955064771 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -823,6 +823,12 @@ def get_command_method(self, command_name): """Return a command processing method or raise AttributeError.""" return getattr(self, f'command_{command_name}') + def queue_command(self, command, kwargs): + self.command_queue.put(( + command, + tuple(kwargs.values()), {} + )) + def process_command_queue(self) -> None: """Process queued commands.""" qsize = self.command_queue.qsize() @@ -831,7 +837,8 @@ def process_command_queue(self) -> None: LOG.info(f"Processing {qsize} queued command(s)") while True: try: - name, args, kwargs = self.command_queue.get(False) + command = (self.command_queue.get(False)) + name, args, kwargs = command except Empty: break args_string = ', '.join(str(a) for a in args) diff --git a/tests/functional/cylc-message/00-ssh.t b/tests/functional/cylc-message/00-ssh.t index 9980d349a30..935c35837ae 100755 --- a/tests/functional/cylc-message/00-ssh.t +++ b/tests/functional/cylc-message/00-ssh.t @@ -21,7 +21,7 @@ export REQUIRE_PLATFORM='loc:remote comms:ssh' . "$(dirname "$0")/test_header" #------------------------------------------------------------------------------- -set_test_number 3 +set_test_number 2 install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" run_ok "${TEST_NAME_BASE}-validate" \ @@ -29,8 +29,5 @@ run_ok "${TEST_NAME_BASE}-validate" \ workflow_run_ok "${TEST_NAME_BASE}-run" \ cylc play --debug --no-detach --reference-test "${WORKFLOW_NAME}" -grep_ok "\[client-command\] graphql ssh" \ - "$RUN_DIR/${WORKFLOW_NAME}/log/workflow/log" - purge exit diff --git a/tests/integration/test_resolvers.py b/tests/integration/test_resolvers.py index e66924c04a9..2589bd129b1 100644 --- a/tests/integration/test_resolvers.py +++ b/tests/integration/test_resolvers.py @@ -202,33 +202,21 @@ async def test_mutator(mock_flow, flow_args): 'workflow_sel': None }) args = {} + meta = {} response = await mock_flow.resolvers.mutator( None, 'pause', flow_args, - args - ) - assert response[0]['id'] == mock_flow.id - - -async def test_nodes_mutator(mock_flow, flow_args): - """Test the nodes mutation method.""" - flow_args['workflows'].append({ - 'user': mock_flow.owner, - 'workflow': mock_flow.name, - 'workflow_sel': None, - }) - ids = [Tokens(n) for n in mock_flow.node_ids] - response = await mock_flow.resolvers.nodes_mutator( - None, 'force_trigger_tasks', ids, flow_args, - {"reflow": False, "flow_descr": ""} + args, + meta ) assert response[0]['id'] == mock_flow.id async def test_mutation_mapper(mock_flow): """Test the mapping of mutations to internal command methods.""" - response = await mock_flow.resolvers._mutation_mapper('pause', {}) + meta = {} + response = await mock_flow.resolvers._mutation_mapper('pause', {}, meta) assert response is None with pytest.raises(ValueError): - await mock_flow.resolvers._mutation_mapper('non_exist', {}) + await mock_flow.resolvers._mutation_mapper('non_exist', {}, meta)