Skip to content

Commit

Permalink
Server configure into init, and adjust doc strings.
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Jul 4, 2022
1 parent 7daad49 commit 80bcdd8
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 21 deletions.
3 changes: 2 additions & 1 deletion cylc/flow/network/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class WorkflowPublisher(ZMQSocketBase):
This class contains the logic for the ZMQ message Publisher.
Usage:
* Define ...
* Call publish to send items to subscribers.
"""

Expand All @@ -69,6 +69,7 @@ def _socket_options(self):
def _bespoke_stop(self):
"""Bespoke stop items."""
LOG.debug('stopping zmq publisher...')
self.stopping = True

async def send_multi(self, topic, data, serializer=None):
"""Send multi part message.
Expand Down
17 changes: 10 additions & 7 deletions cylc/flow/network/replier.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@
class WorkflowReplier(ZMQSocketBase):
"""Initiate the REP part of a ZMQ REQ-REP pattern.
This class contains the logic for the ZMQ message replier.
This class contains the logic for the ZMQ message replier. The REQ/REP
pattern is serial, in that it cannot REQ or REP twice in a row. After
receiving you must send a response.
Usage:
<<<<<<< HEAD
* Define ...
=======
* Start the replier.
* Call the listener to process incomming REQ and send the REP.
Expand All @@ -44,7 +43,6 @@ class WorkflowReplier(ZMQSocketBase):
* Expects requests of the format: {"command": CMD, "args": {...}}
* Sends responses of the format: {"data": {...}}
* Sends errors in the format: {"error": {"message": MSG}}
>>>>>>> 1f9dcdf40... expand listner test
"""

Expand All @@ -65,7 +63,12 @@ def _bespoke_stop(self):
self.queue.put('STOP')

def listener(self):
"""The server main loop, listen for and serve requests."""
"""The server main loop, listen for and serve requests.
When called, this method will receive and respond until there are no
more messages then break to the caller.
"""
# Note: we are using CurveZMQ to secure the messages (see
# self.curve_auth, self.socket.curve_...key etc.). We have set up
# public-key cryptography on the ZMQ messaging and sockets, so
Expand All @@ -82,7 +85,7 @@ def listener(self):
# Check for messages
msg = self.socket.recv_string(zmq.NOBLOCK)
except zmq.error.Again:
# No messages, break to parent loop.
# No messages, break to parent loop/caller.
break
except zmq.error.ZMQError as exc:
LOG.exception('unexpected error: %s', exc)
Expand Down
13 changes: 6 additions & 7 deletions cylc/flow/network/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,17 @@ def filter_none(dictionary):
class WorkflowRuntimeServer:
"""Workflow runtime service API facade exposed via zmq.
This class contains the Cylc endpoints.
This class starts and coordinates the publisher and replier, and
contains the Cylc endpoints invoked by the receiver to provide a response
to incomming messages.
Args:
schd (object): The parent object instantiating the server. In
this case, the workflow scheduler.
context (object): The instantiated ZeroMQ context (i.e. zmq.Context())
passed in from the application.
barrier (object): Threading Barrier object used to sync threads, for
the main thread to ensure socket setup has finished.
Usage:
* Define endpoints using the ``expose`` decorator.
* Call endpoints using the function name.
* Endpoints are called via the receiver using the function name.
Message interface:
* Accepts messages of the format: {"command": CMD, "args": {...}}
Expand Down Expand Up @@ -154,7 +152,6 @@ def __init__(self, schd):
self.stopping = False
self.stopped = True

def configure(self):
self.register_endpoints()

def start(self, barrier):
Expand Down Expand Up @@ -213,6 +210,8 @@ async def stop(self, reason):
self.queue.put('STOP')
if self.thread and self.thread.is_alive():
while not self.stopping:
# Non-async sleep - yield to other threads rather
# than event loop.
sleep(self.STOP_SLEEP_INTERVAL)

if self.replier:
Expand Down
12 changes: 6 additions & 6 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ class Scheduler:
options: Values
cylc_config: DictTree # [scheduler] config

# tcp / zmq
server: WorkflowRuntimeServer

# Note: attributes without a default must come before those with defaults

# flow information
Expand Down Expand Up @@ -230,9 +233,6 @@ class Scheduler:
auto_restart_mode: Optional[AutoRestartMode] = None
auto_restart_time: Optional[float] = None

# tcp / zmq
server: Optional[WorkflowRuntimeServer] = None

# queue-released tasks awaiting job preparation
pre_prep_tasks: Optional[List[TaskProxy]] = None

Expand Down Expand Up @@ -339,7 +339,6 @@ async def initialise(self):
self.flow_mgr = FlowMgr(self.workflow_db_mgr)

self.server = WorkflowRuntimeServer(self)
self.server.configure()

self.proc_pool = SubProcPool()
self.command_queue = Queue()
Expand Down Expand Up @@ -597,7 +596,7 @@ async def run_scheduler(self):
)
self.server.publish_queue.put(
self.data_store_mgr.publish_deltas)
# Yield control to other threads
# Non-async sleep - yield to other threads rather than event loop
sleep(0)
self.profiler.start()
await self.main_loop()
Expand Down Expand Up @@ -1649,7 +1648,8 @@ async def update_data_structure(self):
self.data_store_mgr.publish_pending = False
self.server.publish_queue.put(
self.data_store_mgr.publish_deltas)
# Yield control to other threads
# Non-async sleep - yield to other threads rather
# than event loop
sleep(0)
if has_updated:
# Database update
Expand Down

0 comments on commit 80bcdd8

Please sign in to comment.