From 71da6cb92006c9b5e9c6e3f6868ad29b971280f5 Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Wed, 13 Apr 2022 15:03:53 +1200 Subject: [PATCH] Server configure into init, and adjust doc strings. --- cylc/flow/network/publisher.py | 3 ++- cylc/flow/network/replier.py | 17 ++++++++++------- cylc/flow/network/server.py | 13 ++++++------- cylc/flow/scheduler.py | 6 +++--- 4 files changed, 21 insertions(+), 18 deletions(-) diff --git a/cylc/flow/network/publisher.py b/cylc/flow/network/publisher.py index 96d6fd4c4c0..7383e7675df 100644 --- a/cylc/flow/network/publisher.py +++ b/cylc/flow/network/publisher.py @@ -46,7 +46,7 @@ class WorkflowPublisher(ZMQSocketBase): This class contains the logic for the ZMQ message Publisher. Usage: - * Define ... + * Call publish to send items to subscribers. """ @@ -69,6 +69,7 @@ def _socket_options(self): def _bespoke_stop(self): """Bespoke stop items.""" LOG.debug('stopping zmq publisher...') + self.stopping = True async def send_multi(self, topic, data, serializer=None): """Send multi part message. diff --git a/cylc/flow/network/replier.py b/cylc/flow/network/replier.py index 62b8e1fcfec..b03e74d34ef 100644 --- a/cylc/flow/network/replier.py +++ b/cylc/flow/network/replier.py @@ -27,12 +27,11 @@ class WorkflowReplier(ZMQSocketBase): """Initiate the REP part of a ZMQ REQ-REP pattern. - This class contains the logic for the ZMQ message replier. + This class contains the logic for the ZMQ message replier. The REQ/REP + pattern is serial, in that it cannot REQ or REP twice in a row. After + receiving you must send a response. Usage: -<<<<<<< HEAD - * Define ... -======= * Start the replier. * Call the listener to process incomming REQ and send the REP. @@ -44,7 +43,6 @@ class WorkflowReplier(ZMQSocketBase): * Expects requests of the format: {"command": CMD, "args": {...}} * Sends responses of the format: {"data": {...}} * Sends errors in the format: {"error": {"message": MSG}} ->>>>>>> 1f9dcdf40... expand listner test """ @@ -65,7 +63,12 @@ def _bespoke_stop(self): self.queue.put('STOP') def listener(self): - """The server main loop, listen for and serve requests.""" + """The server main loop, listen for and serve requests. + + When called, this method will receive and respond until there are no + more messages then break to the caller. + + """ # Note: we are using CurveZMQ to secure the messages (see # self.curve_auth, self.socket.curve_...key etc.). We have set up # public-key cryptography on the ZMQ messaging and sockets, so @@ -82,7 +85,7 @@ def listener(self): # Check for messages msg = self.socket.recv_string(zmq.NOBLOCK) except zmq.error.Again: - # No messages, break to parent loop. + # No messages, break to parent loop/caller. break except zmq.error.ZMQError as exc: LOG.exception('unexpected error: %s', exc) diff --git a/cylc/flow/network/server.py b/cylc/flow/network/server.py index 5afd94caf36..3dc4f4a7990 100644 --- a/cylc/flow/network/server.py +++ b/cylc/flow/network/server.py @@ -76,19 +76,17 @@ def filter_none(dictionary): class WorkflowRuntimeServer: """Workflow runtime service API facade exposed via zmq. - This class contains the Cylc endpoints. + This class starts and coordinates the publisher and replier, and + contains the Cylc endpoints invoked by the receiver to provide a response + to incomming messages. Args: schd (object): The parent object instantiating the server. In this case, the workflow scheduler. - context (object): The instantiated ZeroMQ context (i.e. zmq.Context()) - passed in from the application. - barrier (object): Threading Barrier object used to sync threads, for - the main thread to ensure socket setup has finished. Usage: * Define endpoints using the ``expose`` decorator. - * Call endpoints using the function name. + * Endpoints are called via the receiver using the function name. Message interface: * Accepts messages of the format: {"command": CMD, "args": {...}} @@ -154,7 +152,6 @@ def __init__(self, schd): self.stopping = False self.stopped = True - def configure(self): self.register_endpoints() # TODO: this in zmq asyncio context? @@ -214,6 +211,8 @@ async def stop(self, reason): self.queue.put('STOP') if self.thread and self.thread.is_alive(): while not self.stopping: + # Non-async sleep - yield to other threads rather + # than event loop. sleep(self.STOP_SLEEP_INTERVAL) if self.replier: diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 0974cc8df12..63cbfc00591 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -327,7 +327,6 @@ async def initialise(self): self.flow_mgr = FlowMgr(self.workflow_db_mgr) self.server = WorkflowRuntimeServer(self) - self.server.configure() self.proc_pool = SubProcPool() self.command_queue = Queue() @@ -572,7 +571,7 @@ async def run_scheduler(self): ) self.server.publish_queue.put( self.data_store_mgr.publish_deltas) - # Yield control to other threads + # Non-async sleep - yield to other threads rather than event loop sleep(0) self.profiler.start() await self.main_loop() @@ -1603,7 +1602,8 @@ async def update_data_structure(self): self.data_store_mgr.publish_pending = False self.server.publish_queue.put( self.data_store_mgr.publish_deltas) - # Yield control to other threads + # Non-async sleep - yield to other threads rather + # than event loop sleep(0) if has_updated: # Database update