From fffb91194b05df68da71cb50781bba0085f84a89 Mon Sep 17 00:00:00 2001 From: Mel Hall <37735232+datamel@users.noreply.github.com> Date: Wed, 13 Oct 2021 10:30:28 +0100 Subject: [PATCH 01/10] Auth user to cylc-flow Log the authenticated user executing the command --- cylc/flow/network/__init__.py | 14 ++++++++++++++ cylc/flow/network/client.py | 35 ++++++++++++++++++++++++++++------- cylc/flow/network/server.py | 22 +++++++++++++++++++++- 3 files changed, 63 insertions(+), 8 deletions(-) diff --git a/cylc/flow/network/__init__.py b/cylc/flow/network/__init__.py index 57e72e11330..ece99a0f837 100644 --- a/cylc/flow/network/__init__.py +++ b/cylc/flow/network/__init__.py @@ -61,6 +61,20 @@ def decode_(message): return msg +def get_authenticated_user(message): + """Returns the authenticated user associated with the message. + + auth_user is sent from ui server in meta data. + """ + return message.get('meta', {}).get('auth_user') + + +def set_authenticated_user(header_message, username): + "Sets the authenticated user associated with the message header" + header_message['meta']['auth_user'] = username + return header_message + + def get_location(workflow: str): """Extract host and port from a workflow's contact file. diff --git a/cylc/flow/network/client.py b/cylc/flow/network/client.py index 5ac2f309e8f..a26ca70f516 100644 --- a/cylc/flow/network/client.py +++ b/cylc/flow/network/client.py @@ -37,6 +37,8 @@ encode_, decode_, get_location, + get_authenticated_user, + set_authenticated_user, ZMQSocketBase ) from cylc.flow.network.client_factory import CommsMeth @@ -78,6 +80,8 @@ class WorkflowRuntimeClient(ZMQSocketBase): If both host and port are provided it is not necessary to load the contact file. + auth_user(str): + The authenticated user who has executed the command. Attributes: host (str): @@ -123,7 +127,8 @@ def __init__( port: int = None, context: object = None, timeout: Union[float, str] = None, - srv_public_key_loc: str = None + srv_public_key_loc: str = None, + auth_user: str = None ): super().__init__(zmq.REQ, context=context) self.workflow = workflow @@ -143,8 +148,7 @@ def __init__( self.poller = None # Connect the ZMQ socket on instantiation self.start(self.host, self.port, srv_public_key_loc) - # gather header info post start - self.header = self.get_header() + self.header = self.get_header(auth_user) def _socket_options(self): """Set socket options after socket instantiation before connect. @@ -159,7 +163,13 @@ def _socket_options(self): self.poller = zmq.Poller() self.poller.register(self.socket, zmq.POLLIN) - async def async_request(self, command, args=None, timeout=None): + async def async_request( + self, + command, + args=None, + timeout=None, + auth_user=None + ): """Send an asynchronous request using asyncio. Has the same arguments and return values as ``serial_request``. @@ -175,6 +185,8 @@ async def async_request(self, command, args=None, timeout=None): # there is no need to encrypt messages ourselves before sending. # send message + if auth_user: + set_authenticated_user(self.header, auth_user) msg = {'command': command, 'args': args} msg.update(self.header) LOG.debug('zmq:send %s', msg) @@ -205,7 +217,13 @@ async def async_request(self, command, args=None, timeout=None): error = response['error'] raise ClientError(error['message'], error.get('traceback')) - def serial_request(self, command, args=None, timeout=None): + def serial_request( + self, + command, + args=None, + timeout=None, + auth_user=None + ): """Send a request. For convenience use ``__call__`` to call this method. @@ -229,8 +247,10 @@ def serial_request(self, command, args=None, timeout=None): self.loop.run_until_complete(task) return task.result() - def get_header(self) -> dict: + def get_header(self, auth_user) -> dict: """Return "header" data to attach to each request for traceability. + Args: + auth_user: Authenticated user executing the command. Returns: dict: dictionary with the header information, such as @@ -260,7 +280,8 @@ def get_header(self) -> dict: os.getenv( "CLIENT_COMMS_METH", default=CommsMeth.ZMQ.value - ) + ), + 'auth_user': auth_user, } } diff --git a/cylc/flow/network/server.py b/cylc/flow/network/server.py index a5502fdaeb9..21980f776b3 100644 --- a/cylc/flow/network/server.py +++ b/cylc/flow/network/server.py @@ -24,7 +24,8 @@ import zmq from cylc.flow import LOG -from cylc.flow.network import encode_, decode_, ZMQSocketBase +from cylc.flow.network import ( + encode_, decode_, ZMQSocketBase, get_authenticated_user) from cylc.flow.network.authorisation import authorise from cylc.flow.network.graphql import ( CylcGraphQLBackend, IgnoreFieldMiddleware, instantiate_middleware @@ -176,6 +177,17 @@ def _bespoke_stop(self): if self.queue is not None: self.queue.put('STOP') + @staticmethod + def parse_request_string(message): + """Returns mutation name from request string for logging""" + try: + req_str = message['args']['request_string'] + + before, _after = req_str.split('(', 1) + return before + except Exception: + return None + def _listener(self): """The server main loop, listen for and serve requests.""" while True: @@ -201,6 +213,14 @@ def _listener(self): # process try: message = decode_(msg) + auth_user = get_authenticated_user(message) + mutation_message = WorkflowRuntimeServer.parse_request_string(message) + if auth_user and mutation_message: + LOG.info( + f"Authenticated user: {auth_user} has submitted " + f"request: {mutation_message}" + ) + except Exception as exc: # purposefully catch generic exception # failed to decode message, possibly resulting from failed # authentication From 304de78a629e6b830f33a2348c8fc331b2f29872 Mon Sep 17 00:00:00 2001 From: Mel Hall <37735232+datamel@users.noreply.github.com> Date: Wed, 17 Nov 2021 10:53:37 +0000 Subject: [PATCH 02/10] flake8 --- cylc/flow/network/__init__.py | 14 ------ cylc/flow/network/authorisation.py | 2 +- cylc/flow/network/client.py | 28 +++++------- cylc/flow/network/resolvers.py | 68 +++++++++++++++++++++-------- cylc/flow/network/schema.py | 7 ++- cylc/flow/network/server.py | 33 +++----------- cylc/flow/scheduler.py | 25 ++++++++++- tests/integration/test_resolvers.py | 12 +++-- 8 files changed, 105 insertions(+), 84 deletions(-) diff --git a/cylc/flow/network/__init__.py b/cylc/flow/network/__init__.py index ece99a0f837..57e72e11330 100644 --- a/cylc/flow/network/__init__.py +++ b/cylc/flow/network/__init__.py @@ -61,20 +61,6 @@ def decode_(message): return msg -def get_authenticated_user(message): - """Returns the authenticated user associated with the message. - - auth_user is sent from ui server in meta data. - """ - return message.get('meta', {}).get('auth_user') - - -def set_authenticated_user(header_message, username): - "Sets the authenticated user associated with the message header" - header_message['meta']['auth_user'] = username - return header_message - - def get_location(workflow: str): """Extract host and port from a workflow's contact file. diff --git a/cylc/flow/network/authorisation.py b/cylc/flow/network/authorisation.py index 3d4b4b918b5..5e6ecb2f8ca 100644 --- a/cylc/flow/network/authorisation.py +++ b/cylc/flow/network/authorisation.py @@ -50,7 +50,7 @@ def _call(self, *args, user='?', meta=None, **kwargs): '[client-command] %s %s://%s@%s:%s', fcn.__name__, comms_method, user, host, prog ) - return fcn(self, *args, **kwargs) + return fcn(self, *args, meta=meta, **kwargs) return _call return wrapper diff --git a/cylc/flow/network/client.py b/cylc/flow/network/client.py index a26ca70f516..fde6c8bd620 100644 --- a/cylc/flow/network/client.py +++ b/cylc/flow/network/client.py @@ -37,8 +37,6 @@ encode_, decode_, get_location, - get_authenticated_user, - set_authenticated_user, ZMQSocketBase ) from cylc.flow.network.client_factory import CommsMeth @@ -80,8 +78,6 @@ class WorkflowRuntimeClient(ZMQSocketBase): If both host and port are provided it is not necessary to load the contact file. - auth_user(str): - The authenticated user who has executed the command. Attributes: host (str): @@ -127,8 +123,7 @@ def __init__( port: int = None, context: object = None, timeout: Union[float, str] = None, - srv_public_key_loc: str = None, - auth_user: str = None + srv_public_key_loc: str = None ): super().__init__(zmq.REQ, context=context) self.workflow = workflow @@ -148,7 +143,8 @@ def __init__( self.poller = None # Connect the ZMQ socket on instantiation self.start(self.host, self.port, srv_public_key_loc) - self.header = self.get_header(auth_user) + # gather header info post start + self.header = self.get_header() def _socket_options(self): """Set socket options after socket instantiation before connect. @@ -168,7 +164,7 @@ async def async_request( command, args=None, timeout=None, - auth_user=None + req_meta=None ): """Send an asynchronous request using asyncio. @@ -185,10 +181,11 @@ async def async_request( # there is no need to encrypt messages ourselves before sending. # send message - if auth_user: - set_authenticated_user(self.header, auth_user) msg = {'command': command, 'args': args} msg.update(self.header) + # add the request metadata + if req_meta: + msg['meta'].update(req_meta) LOG.debug('zmq:send %s', msg) message = encode_(msg) self.socket.send_string(message) @@ -222,7 +219,7 @@ def serial_request( command, args=None, timeout=None, - auth_user=None + req_meta=None ): """Send a request. @@ -243,14 +240,12 @@ def serial_request( """ task = self.loop.create_task( - self.async_request(command, args, timeout)) + self.async_request(command, args, timeout, req_meta)) self.loop.run_until_complete(task) return task.result() - def get_header(self, auth_user) -> dict: + def get_header(self) -> dict: """Return "header" data to attach to each request for traceability. - Args: - auth_user: Authenticated user executing the command. Returns: dict: dictionary with the header information, such as @@ -280,8 +275,7 @@ def get_header(self, auth_user) -> dict: os.getenv( "CLIENT_COMMS_METH", default=CommsMeth.ZMQ.value - ), - 'auth_user': auth_user, + ) } } diff --git a/cylc/flow/network/resolvers.py b/cylc/flow/network/resolvers.py index 69d08225a14..47015cdbe35 100644 --- a/cylc/flow/network/resolvers.py +++ b/cylc/flow/network/resolvers.py @@ -592,7 +592,7 @@ def __init__(self, data: 'DataStoreMgr', schd: 'Scheduler') -> None: # Mutations async def mutator(self, *m_args): """Mutate workflow.""" - _, command, w_args, args = m_args + _, command, w_args, args, meta = m_args w_ids = [flow[WORKFLOW].id for flow in await self.get_workflows_data(w_args)] if not w_ids: @@ -600,14 +600,14 @@ async def mutator(self, *m_args): return [{ 'response': (False, f'No matching workflow in {workflows}')}] w_id = w_ids[0] - result = await self._mutation_mapper(command, args) + result = await self._mutation_mapper(command, args, meta) if result is None: result = (True, 'Command queued') return [{'id': w_id, 'response': result}] async def nodes_mutator(self, *m_args): """Mutate node items of associated workflows.""" - _, command, tokens_list, w_args, args = m_args + _, command, tokens_list, w_args, args, meta = m_args w_ids = [ workflow[WORKFLOW].id for workflow in await self.get_workflows_data(w_args) @@ -639,23 +639,29 @@ async def nodes_mutator(self, *m_args): args['task_job'] = items[0] else: args['tasks'] = items - result = await self._mutation_mapper(command, args) + result = await self._mutation_mapper(command, args, meta) if result is None: result = (True, 'Command queued') return [{'id': w_id, 'response': result}] async def _mutation_mapper( - self, command: str, kwargs: Dict[str, Any] + self, command: str, kwargs: Dict[str, Any], meta: Dict[str, Any] ) -> Optional[Tuple[bool, str]]: """Map between GraphQL resolvers and internal command interface.""" method = getattr(self, command, None) if method is not None: - return method(**kwargs) + return method(**kwargs, meta=meta) try: self.schd.get_command_method(command) except AttributeError: raise ValueError(f"Command '{command}' not found") - self.schd.command_queue.put((command, tuple(kwargs.values()), {})) + if not meta: + meta = {"auth_user": "unknown user"} + self.schd.queue_command( + command, + kwargs, + meta, + ) return None def broadcast( @@ -664,7 +670,8 @@ def broadcast( cycle_points=None, namespaces=None, settings=None, - cutoff=None + cutoff=None, + meta=None ): """Put or clear broadcasts.""" if mode == 'put_broadcast': @@ -678,7 +685,12 @@ def broadcast( cutoff) raise ValueError('Unsupported broadcast mode') - def put_ext_trigger(self, message, id): # noqa: A002 (graphql interface) + def put_ext_trigger( + self, + message, + id, # noqa: A002 (graphql interface) + meta=None + ): """Server-side external event trigger interface. Args: @@ -697,7 +709,13 @@ def put_ext_trigger(self, message, id): # noqa: A002 (graphql interface) self.schd.ext_trigger_queue.put((message, id)) return (True, 'Event queued') - def put_messages(self, task_job=None, event_time=None, messages=None): + def put_messages( + self, + task_job=None, + event_time=None, + messages=None, + meta=None + ): """Put task messages in queue for processing later by the main loop. Arguments: @@ -750,7 +768,8 @@ def force_spawn_children( self, tasks: Iterable[str], outputs: Optional[Iterable[str]] = None, - flow_num: Optional[int] = None + flow_num: Optional[int] = None, + meta: Optional[Dict[str, Any]] = None ) -> Tuple[bool, str]: """Spawn children of given task outputs. @@ -768,7 +787,8 @@ def force_spawn_children( { "outputs": outputs, "flow_num": flow_num - } + }, + meta ) ) return (True, 'Command queued') @@ -779,7 +799,8 @@ def stop( cycle_point: Optional[str] = None, clock_time: Optional[str] = None, task: Optional[str] = None, - flow_num: Optional[int] = None + flow_num: Optional[int] = None, + meta: Optional[Dict[str, Any]] = None ) -> Tuple[bool, str]: """Stop the workflow or specific flow from spawning any further. @@ -805,11 +826,18 @@ def stop( 'clock_time': clock_time, 'task': task, 'flow_num': flow_num, - }) - )) + }), + meta) + ) return (True, 'Command queued') - def force_trigger_tasks(self, tasks=None, reflow=False, flow_descr=None): + def force_trigger_tasks( + self, + tasks=None, + reflow=False, + flow_descr=None, + meta=None + ): """Trigger submission of task jobs where possible. Args: @@ -831,11 +859,13 @@ def force_trigger_tasks(self, tasks=None, reflow=False, flow_descr=None): """ self.schd.command_queue.put( ( - "force_trigger_tasks", (tasks or [],), + "force_trigger_tasks", + (tasks or [],), { "reflow": reflow, "flow_descr": flow_descr - } - ) + }, + meta + ), ) return (True, 'Command queued') diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index 9ea498e759f..290d6d5ba3f 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -1218,7 +1218,8 @@ async def mutator(root, info, command=None, workflows=None, args.update(args.get('args', {})) args.pop('args') resolvers = info.context.get('resolvers') - res = await resolvers.mutator(info, command, w_args, args) + meta = info.context.get('meta') + res = await resolvers.mutator(info, command, w_args, args, meta) return GenericResponse(result=res) @@ -1243,12 +1244,14 @@ async def nodes_mutator(root, info, command, ids, workflows=None, args.update(args.get('args', {})) args.pop('args') resolvers = info.context.get('resolvers') + meta = info.context.get('meta') res = await resolvers.nodes_mutator( info, command, tokens_list, w_args, - args + args, + meta ) return GenericResponse(result=res) diff --git a/cylc/flow/network/server.py b/cylc/flow/network/server.py index 21980f776b3..2347b2548bb 100644 --- a/cylc/flow/network/server.py +++ b/cylc/flow/network/server.py @@ -24,8 +24,7 @@ import zmq from cylc.flow import LOG -from cylc.flow.network import ( - encode_, decode_, ZMQSocketBase, get_authenticated_user) +from cylc.flow.network import encode_, decode_, ZMQSocketBase from cylc.flow.network.authorisation import authorise from cylc.flow.network.graphql import ( CylcGraphQLBackend, IgnoreFieldMiddleware, instantiate_middleware @@ -177,17 +176,6 @@ def _bespoke_stop(self): if self.queue is not None: self.queue.put('STOP') - @staticmethod - def parse_request_string(message): - """Returns mutation name from request string for logging""" - try: - req_str = message['args']['request_string'] - - before, _after = req_str.split('(', 1) - return before - except Exception: - return None - def _listener(self): """The server main loop, listen for and serve requests.""" while True: @@ -213,14 +201,6 @@ def _listener(self): # process try: message = decode_(msg) - auth_user = get_authenticated_user(message) - mutation_message = WorkflowRuntimeServer.parse_request_string(message) - if auth_user and mutation_message: - LOG.info( - f"Authenticated user: {auth_user} has submitted " - f"request: {mutation_message}" - ) - except Exception as exc: # purposefully catch generic exception # failed to decode message, possibly resulting from failed # authentication @@ -284,7 +264,7 @@ def register_endpoints(self): @authorise() @expose - def api(self, endpoint=None): + def api(self, endpoint=None, meta=None): """Return information about this API. Returns a list of callable endpoints. @@ -317,7 +297,7 @@ def api(self, endpoint=None): @authorise() @expose - def graphql(self, request_string=None, variables=None): + def graphql(self, request_string=None, variables=None, meta=None): """Return the GraphQL scheme execution result. Args: @@ -335,6 +315,7 @@ def graphql(self, request_string=None, variables=None): variable_values=variables, context={ 'resolvers': self.resolvers, + 'meta': meta or {}, }, backend=CylcGraphQLBackend(), middleware=list(instantiate_middleware(self.middleware)), @@ -361,7 +342,7 @@ def graphql(self, request_string=None, variables=None): @authorise() @expose def get_graph_raw( - self, start_point_str, stop_point_str, grouping=None + self, start_point_str, stop_point_str, grouping=None, meta=None ): """Return a textual representation of the workflow graph. @@ -406,7 +387,7 @@ def get_graph_raw( # UIServer Data Commands @authorise() @expose - def pb_entire_workflow(self): + def pb_entire_workflow(self, meta=None): """Send the entire data-store in a single Protobuf message. Returns: @@ -419,7 +400,7 @@ def pb_entire_workflow(self): @authorise() @expose - def pb_data_elements(self, element_type): + def pb_data_elements(self, element_type, meta=None): """Send the specified data elements in delta form. Args: diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 788f81355aa..0a8edc0e9eb 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -823,6 +823,15 @@ def get_command_method(self, command_name): """Return a command processing method or raise AttributeError.""" return getattr(self, f'command_{command_name}') + def queue_command(self, command, kwargs, meta): + if not meta: + meta = {"auth_user": "unknown"} + self.command_queue.put(( + command, + tuple(kwargs.values()), {}, + meta + )) + def process_command_queue(self) -> None: """Process queued commands.""" qsize = self.command_queue.qsize() @@ -831,7 +840,16 @@ def process_command_queue(self) -> None: LOG.info(f"Processing {qsize} queued command(s)") while True: try: - name, args, kwargs = self.command_queue.get(False) + command = (self.command_queue.get(False)) + if len(command) == 4: + name, args, kwargs, meta = command + else: + name, args, kwargs = command + meta = {} + LOG.info( + f"Authenticated user {meta.get('auth_user', '?')} " + f"has submitted request {name}" + ) except Empty: break args_string = ', '.join(str(a) for a in args) @@ -840,6 +858,11 @@ def process_command_queue(self) -> None: ) sep = ', ' if kwargs_string and args_string else '' cmdstr = f"{name}({args_string}{sep}{kwargs_string})" + # print("moooooo") + # if 'tasks' in kwargs: + # import mdb + # print("atttttttatchchhc") + # mdb.debug(ui_server=True) try: n_warnings: Optional[int] = self.get_command_method(name)( *args, **kwargs) diff --git a/tests/integration/test_resolvers.py b/tests/integration/test_resolvers.py index 82eb4ecd8d5..30d6ff6db7b 100644 --- a/tests/integration/test_resolvers.py +++ b/tests/integration/test_resolvers.py @@ -209,11 +209,13 @@ async def test_mutator(mock_flow, flow_args): 'workflow_sel': None }) args = {} + meta = {} response = await mock_flow.resolvers.mutator( None, 'pause', flow_args, - args + args, + meta ) assert response[0]['id'] == mock_flow.id @@ -227,9 +229,10 @@ async def test_nodes_mutator(mock_flow, flow_args): 'workflow_sel': None, }) ids = [Tokens(n) for n in mock_flow.node_ids] + meta = {} response = await mock_flow.resolvers.nodes_mutator( None, 'force_trigger_tasks', ids, flow_args, - {"reflow": False, "flow_descr": ""} + {"reflow": False, "flow_descr": ""}, meta ) assert response[0]['id'] == mock_flow.id @@ -237,7 +240,8 @@ async def test_nodes_mutator(mock_flow, flow_args): @pytest.mark.asyncio async def test_mutation_mapper(mock_flow): """Test the mapping of mutations to internal command methods.""" - response = await mock_flow.resolvers._mutation_mapper('pause', {}) + meta = {} + response = await mock_flow.resolvers._mutation_mapper('pause', {}, meta) assert response is None with pytest.raises(ValueError): - await mock_flow.resolvers._mutation_mapper('non_exist', {}) + await mock_flow.resolvers._mutation_mapper('non_exist', {}, meta) From 70115dc48e3c161b08127a255f84720bb339e879 Mon Sep 17 00:00:00 2001 From: Mel Hall <37735232+datamel@users.noreply.github.com> Date: Wed, 15 Dec 2021 15:13:21 +0000 Subject: [PATCH 03/10] Test fix --- cylc/flow/network/resolvers.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cylc/flow/network/resolvers.py b/cylc/flow/network/resolvers.py index 47015cdbe35..786bbe80fc6 100644 --- a/cylc/flow/network/resolvers.py +++ b/cylc/flow/network/resolvers.py @@ -742,7 +742,7 @@ def put_messages( (task_job, event_time, severity, message)) return (True, 'Messages queued: %d' % len(messages)) - def set_graph_window_extent(self, n_edge_distance): + def set_graph_window_extent(self, n_edge_distance, meta=None): """Set data-store graph window to new max edge distance. Args: @@ -810,6 +810,7 @@ def stop( clock_time: Wallclock time after which to stop. task: Stop after this task succeeds. flow_num: The flow to stop. + meta: Meta data from ui-server ): Returns: From 6bf4444f20af919948556bfc9f814a72563bfdeb Mon Sep 17 00:00:00 2001 From: Mel Hall <37735232+datamel@users.noreply.github.com> Date: Thu, 16 Dec 2021 11:53:01 +0000 Subject: [PATCH 04/10] facepalm --- cylc/flow/scheduler.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 0a8edc0e9eb..b43fd8b7e20 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -858,11 +858,6 @@ def process_command_queue(self) -> None: ) sep = ', ' if kwargs_string and args_string else '' cmdstr = f"{name}({args_string}{sep}{kwargs_string})" - # print("moooooo") - # if 'tasks' in kwargs: - # import mdb - # print("atttttttatchchhc") - # mdb.debug(ui_server=True) try: n_warnings: Optional[int] = self.get_command_method(name)( *args, **kwargs) From 91d86d6f578779daee538ac9257cb2896ec97745 Mon Sep 17 00:00:00 2001 From: Mel Hall <37735232+datamel@users.noreply.github.com> Date: Thu, 16 Dec 2021 12:32:26 +0000 Subject: [PATCH 05/10] Change ? for Owner for logging user requesting commands --- cylc/flow/scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index b43fd8b7e20..67db5f436d4 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -847,8 +847,8 @@ def process_command_queue(self) -> None: name, args, kwargs = command meta = {} LOG.info( - f"Authenticated user {meta.get('auth_user', '?')} " - f"has submitted request {name}" + f"{meta.get('auth_user', 'Owner')} has submitted request " + f"{name}" ) except Empty: break From 1568cd08299669e32142c8f93a9893c5af36ade5 Mon Sep 17 00:00:00 2001 From: Mel Hall <37735232+datamel@users.noreply.github.com> Date: Mon, 17 Jan 2022 12:50:13 +0000 Subject: [PATCH 06/10] back to wip --- cylc/flow/network/resolvers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cylc/flow/network/resolvers.py b/cylc/flow/network/resolvers.py index 786bbe80fc6..feb761e5abb 100644 --- a/cylc/flow/network/resolvers.py +++ b/cylc/flow/network/resolvers.py @@ -649,14 +649,14 @@ async def _mutation_mapper( ) -> Optional[Tuple[bool, str]]: """Map between GraphQL resolvers and internal command interface.""" method = getattr(self, command, None) + if not meta: + meta = {"auth_user": "unknown user"} if method is not None: return method(**kwargs, meta=meta) try: self.schd.get_command_method(command) except AttributeError: raise ValueError(f"Command '{command}' not found") - if not meta: - meta = {"auth_user": "unknown user"} self.schd.queue_command( command, kwargs, From a59bcf8603c58e5b019812e3626f339e73dceb69 Mon Sep 17 00:00:00 2001 From: Mel Hall <37735232+datamel@users.noreply.github.com> Date: Wed, 19 Jan 2022 09:46:49 +0000 Subject: [PATCH 07/10] remove unused code and log auth user to command --- cylc/flow/network/authorisation.py | 12 ---- cylc/flow/network/resolvers.py | 76 ++++++-------------------- cylc/flow/network/schema.py | 33 ----------- cylc/flow/scheduler.py | 17 +----- tests/functional/cylc-message/00-ssh.t | 7 +-- tests/integration/test_resolvers.py | 16 ------ 6 files changed, 21 insertions(+), 140 deletions(-) diff --git a/cylc/flow/network/authorisation.py b/cylc/flow/network/authorisation.py index 5e6ecb2f8ca..8440be88d1a 100644 --- a/cylc/flow/network/authorisation.py +++ b/cylc/flow/network/authorisation.py @@ -17,8 +17,6 @@ from functools import wraps -from cylc.flow import LOG - def authorise(): """Add authorisation to an endpoint. @@ -40,16 +38,6 @@ def wrapper(fcn): def _call(self, *args, user='?', meta=None, **kwargs): if not meta: meta = {} - host = meta.get('host', '?') - prog = meta.get('prog', '?') - comms_method = meta.get('comms_method', '?') - - # Hardcoded, for new - but much of this functionality can be - # removed more swingingly. - LOG.debug( - '[client-command] %s %s://%s@%s:%s', - fcn.__name__, comms_method, user, host, prog - ) return fcn(self, *args, meta=meta, **kwargs) return _call diff --git a/cylc/flow/network/resolvers.py b/cylc/flow/network/resolvers.py index feb761e5abb..4b537476547 100644 --- a/cylc/flow/network/resolvers.py +++ b/cylc/flow/network/resolvers.py @@ -34,6 +34,7 @@ from uuid import uuid4 from graphene.utils.str_converters import to_snake_case +from cylc.flow import LOG from cylc.flow.data_store_mgr import ( EDGES, FAMILY_PROXIES, TASK_PROXIES, WORKFLOW, @@ -605,62 +606,26 @@ async def mutator(self, *m_args): result = (True, 'Command queued') return [{'id': w_id, 'response': result}] - async def nodes_mutator(self, *m_args): - """Mutate node items of associated workflows.""" - _, command, tokens_list, w_args, args, meta = m_args - w_ids = [ - workflow[WORKFLOW].id - for workflow in await self.get_workflows_data(w_args) - ] - if not w_ids: - workflows = list(self.data_store_mgr.data.keys()) - return [{ - 'response': (False, f'No matching workflow in {workflows}')}] - w_id = w_ids[0] - # match proxy ID args with workflows - items = [] - for tokens in tokens_list: - owner = tokens.get('user') - workflow = tokens.get('workflow') - if workflow and owner is None: - owner = "*" - if ( - not (owner and workflow) - or fnmatchcase( - w_id, - tokens.workflow_id, - ) - ): - items.append( - tokens.relative_id - ) - if items: - if command == 'put_messages': - args['task_job'] = items[0] - else: - args['tasks'] = items - result = await self._mutation_mapper(command, args, meta) - if result is None: - result = (True, 'Command queued') - return [{'id': w_id, 'response': result}] - async def _mutation_mapper( self, command: str, kwargs: Dict[str, Any], meta: Dict[str, Any] ) -> Optional[Tuple[bool, str]]: """Map between GraphQL resolvers and internal command interface.""" method = getattr(self, command, None) - if not meta: - meta = {"auth_user": "unknown user"} if method is not None: - return method(**kwargs, meta=meta) + return method(**kwargs) try: self.schd.get_command_method(command) except AttributeError: raise ValueError(f"Command '{command}' not found") + if command != "put_messages": + log_msg = f"[command] {command} " + user = meta.get('auth_user', self.schd.owner) + if user != self.schd.owner: + log_msg += (f"issued by {user}") + LOG.info(log_msg) self.schd.queue_command( command, - kwargs, - meta, + kwargs ) return None @@ -670,8 +635,7 @@ def broadcast( cycle_points=None, namespaces=None, settings=None, - cutoff=None, - meta=None + cutoff=None ): """Put or clear broadcasts.""" if mode == 'put_broadcast': @@ -688,8 +652,7 @@ def broadcast( def put_ext_trigger( self, message, - id, # noqa: A002 (graphql interface) - meta=None + id # noqa: A002 (graphql interface) ): """Server-side external event trigger interface. @@ -713,8 +676,7 @@ def put_messages( self, task_job=None, event_time=None, - messages=None, - meta=None + messages=None ): """Put task messages in queue for processing later by the main loop. @@ -742,7 +704,7 @@ def put_messages( (task_job, event_time, severity, message)) return (True, 'Messages queued: %d' % len(messages)) - def set_graph_window_extent(self, n_edge_distance, meta=None): + def set_graph_window_extent(self, n_edge_distance): """Set data-store graph window to new max edge distance. Args: @@ -768,8 +730,7 @@ def force_spawn_children( self, tasks: Iterable[str], outputs: Optional[Iterable[str]] = None, - flow_num: Optional[int] = None, - meta: Optional[Dict[str, Any]] = None + flow_num: Optional[int] = None ) -> Tuple[bool, str]: """Spawn children of given task outputs. @@ -788,7 +749,6 @@ def force_spawn_children( "outputs": outputs, "flow_num": flow_num }, - meta ) ) return (True, 'Command queued') @@ -800,7 +760,6 @@ def stop( clock_time: Optional[str] = None, task: Optional[str] = None, flow_num: Optional[int] = None, - meta: Optional[Dict[str, Any]] = None ) -> Tuple[bool, str]: """Stop the workflow or specific flow from spawning any further. @@ -810,7 +769,6 @@ def stop( clock_time: Wallclock time after which to stop. task: Stop after this task succeeds. flow_num: The flow to stop. - meta: Meta data from ui-server ): Returns: @@ -828,7 +786,7 @@ def stop( 'task': task, 'flow_num': flow_num, }), - meta) + ) ) return (True, 'Command queued') @@ -837,7 +795,6 @@ def force_trigger_tasks( tasks=None, reflow=False, flow_descr=None, - meta=None ): """Trigger submission of task jobs where possible. @@ -865,8 +822,7 @@ def force_trigger_tasks( { "reflow": reflow, "flow_descr": flow_descr - }, - meta + } ), ) return (True, 'Command queued') diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index 290d6d5ba3f..32e90f4e012 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -1222,39 +1222,6 @@ async def mutator(root, info, command=None, workflows=None, res = await resolvers.mutator(info, command, w_args, args, meta) return GenericResponse(result=res) - -async def nodes_mutator(root, info, command, ids, workflows=None, - exworkflows=None, **args): - """Call the resolver method, dealing with multiple node id arguments, - which acts on the workflow service via the internal command queue.""" - tokens_list = [Tokens(n_id, relative=True) for n_id in ids] - # if the workflows arg is empty extract from proxy args - if workflows is None: - workflows = set() - for tokens in tokens_list: - workflows.add(tokens.workflow_id) - if not workflows: - return GenericResponse(result="Error: No given Workflow(s)") - if exworkflows is None: - exworkflows = [] - w_args = {} - w_args['workflows'] = [Tokens(w_id) for w_id in workflows] - w_args['exworkflows'] = [Tokens(w_id) for w_id in exworkflows] - if args.get('args', False): - args.update(args.get('args', {})) - args.pop('args') - resolvers = info.context.get('resolvers') - meta = info.context.get('meta') - res = await resolvers.nodes_mutator( - info, - command, - tokens_list, - w_args, - args, - meta - ) - return GenericResponse(result=res) - # Input types: diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 67db5f436d4..3a287b3e09d 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -823,13 +823,10 @@ def get_command_method(self, command_name): """Return a command processing method or raise AttributeError.""" return getattr(self, f'command_{command_name}') - def queue_command(self, command, kwargs, meta): - if not meta: - meta = {"auth_user": "unknown"} + def queue_command(self, command, kwargs): self.command_queue.put(( command, - tuple(kwargs.values()), {}, - meta + tuple(kwargs.values()), {} )) def process_command_queue(self) -> None: @@ -841,15 +838,7 @@ def process_command_queue(self) -> None: while True: try: command = (self.command_queue.get(False)) - if len(command) == 4: - name, args, kwargs, meta = command - else: - name, args, kwargs = command - meta = {} - LOG.info( - f"{meta.get('auth_user', 'Owner')} has submitted request " - f"{name}" - ) + name, args, kwargs = command except Empty: break args_string = ', '.join(str(a) for a in args) diff --git a/tests/functional/cylc-message/00-ssh.t b/tests/functional/cylc-message/00-ssh.t index 9980d349a30..cede61fb0ea 100755 --- a/tests/functional/cylc-message/00-ssh.t +++ b/tests/functional/cylc-message/00-ssh.t @@ -21,7 +21,7 @@ export REQUIRE_PLATFORM='loc:remote comms:ssh' . "$(dirname "$0")/test_header" #------------------------------------------------------------------------------- -set_test_number 3 +set_test_number 2 install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" run_ok "${TEST_NAME_BASE}-validate" \ @@ -29,8 +29,5 @@ run_ok "${TEST_NAME_BASE}-validate" \ workflow_run_ok "${TEST_NAME_BASE}-run" \ cylc play --debug --no-detach --reference-test "${WORKFLOW_NAME}" -grep_ok "\[client-command\] graphql ssh" \ - "$RUN_DIR/${WORKFLOW_NAME}/log/workflow/log" - -purge +#purge exit diff --git a/tests/integration/test_resolvers.py b/tests/integration/test_resolvers.py index 30d6ff6db7b..dcf1270e80c 100644 --- a/tests/integration/test_resolvers.py +++ b/tests/integration/test_resolvers.py @@ -220,22 +220,6 @@ async def test_mutator(mock_flow, flow_args): assert response[0]['id'] == mock_flow.id -@pytest.mark.asyncio -async def test_nodes_mutator(mock_flow, flow_args): - """Test the nodes mutation method.""" - flow_args['workflows'].append({ - 'user': mock_flow.owner, - 'workflow': mock_flow.name, - 'workflow_sel': None, - }) - ids = [Tokens(n) for n in mock_flow.node_ids] - meta = {} - response = await mock_flow.resolvers.nodes_mutator( - None, 'force_trigger_tasks', ids, flow_args, - {"reflow": False, "flow_descr": ""}, meta - ) - assert response[0]['id'] == mock_flow.id - @pytest.mark.asyncio async def test_mutation_mapper(mock_flow): From cb419410a0d2a7e886405964302b457097ff1c13 Mon Sep 17 00:00:00 2001 From: Mel Hall <37735232+datamel@users.noreply.github.com> Date: Wed, 19 Jan 2022 11:08:22 +0000 Subject: [PATCH 08/10] Remove redundant test --- tests/functional/cylc-message/00-ssh.t | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/functional/cylc-message/00-ssh.t b/tests/functional/cylc-message/00-ssh.t index cede61fb0ea..935c35837ae 100755 --- a/tests/functional/cylc-message/00-ssh.t +++ b/tests/functional/cylc-message/00-ssh.t @@ -29,5 +29,5 @@ run_ok "${TEST_NAME_BASE}-validate" \ workflow_run_ok "${TEST_NAME_BASE}-run" \ cylc play --debug --no-detach --reference-test "${WORKFLOW_NAME}" -#purge +purge exit From baab059d66402fa7ea57c043e03b56d525f98840 Mon Sep 17 00:00:00 2001 From: Mel Hall <37735232+datamel@users.noreply.github.com> Date: Wed, 19 Jan 2022 12:03:49 +0000 Subject: [PATCH 09/10] Tweak log message --- cylc/flow/network/resolvers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cylc/flow/network/resolvers.py b/cylc/flow/network/resolvers.py index 4b537476547..b5f9d90ab48 100644 --- a/cylc/flow/network/resolvers.py +++ b/cylc/flow/network/resolvers.py @@ -621,7 +621,7 @@ async def _mutation_mapper( log_msg = f"[command] {command} " user = meta.get('auth_user', self.schd.owner) if user != self.schd.owner: - log_msg += (f"issued by {user}") + log_msg += (f"(issued by {user})") LOG.info(log_msg) self.schd.queue_command( command, From fbe1768d11f3547d37d39b5ae05d79dc9cbab2b5 Mon Sep 17 00:00:00 2001 From: Melanie Hall <37735232+datamel@users.noreply.github.com> Date: Wed, 19 Jan 2022 13:26:52 +0000 Subject: [PATCH 10/10] Update cylc/flow/network/resolvers.py Co-authored-by: Oliver Sanders --- cylc/flow/network/resolvers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cylc/flow/network/resolvers.py b/cylc/flow/network/resolvers.py index b5f9d90ab48..08423595250 100644 --- a/cylc/flow/network/resolvers.py +++ b/cylc/flow/network/resolvers.py @@ -618,10 +618,10 @@ async def _mutation_mapper( except AttributeError: raise ValueError(f"Command '{command}' not found") if command != "put_messages": - log_msg = f"[command] {command} " + log_msg = f"[command] {command}" user = meta.get('auth_user', self.schd.owner) if user != self.schd.owner: - log_msg += (f"(issued by {user})") + log_msg += (f" (issued by {user})") LOG.info(log_msg) self.schd.queue_command( command,