From bc69449f16cde0e26f6b014010e870192c2a7805 Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Thu, 25 Aug 2022 15:26:26 +0100 Subject: [PATCH] Simplify `WorkflowRuntimeServer` stop logic --- cylc/flow/network/server.py | 20 +++++++++---------- tests/integration/test_server.py | 33 +++++++------------------------- 2 files changed, 16 insertions(+), 37 deletions(-) diff --git a/cylc/flow/network/server.py b/cylc/flow/network/server.py index 0661f223782..f6847dcef59 100644 --- a/cylc/flow/network/server.py +++ b/cylc/flow/network/server.py @@ -148,9 +148,8 @@ def __init__(self, schd): IgnoreFieldMiddleware, ] - self.queue: 'Queue[str]' = Queue() self.publish_queue: 'Queue[Iterable[tuple]]' = Queue() - self.stopping = False + self.waiting_to_stop = False self.stopped = True self.register_endpoints() @@ -215,9 +214,10 @@ async def stop(self, reason: Union[BaseException, str]) -> None: server's self.thread in order to interrupt the self.operate() loop and wait for self.thread to terminate. """ - self.queue.put('STOP') + self.waiting_to_stop = True if self.thread and self.thread.is_alive(): - while not self.stopping: + # Wait for self.operate() loop to finish: + while self.waiting_to_stop: # Non-async sleep - yield to other threads rather than # event loop (allows self.operate() running in different # thread to return) @@ -247,13 +247,11 @@ def operate(self) -> None: # of the listener runs the event loop synchronously # (in graphql AsyncioExecutor) while True: - # process messages from the scheduler. - if self.queue.qsize(): - message = self.queue.get() - if message == 'STOP': - self.stopping = True - break - raise ValueError('Unknown message "%s"' % message) + if self.waiting_to_stop: + # The self.stop() method is waiting for us to signal that we + # have finished here + self.waiting_to_stop = False + return # Gather and respond to any requests. self.replier.listener() diff --git a/tests/integration/test_server.py b/tests/integration/test_server.py index 8cec055d763..63a6bd0d67c 100644 --- a/tests/integration/test_server.py +++ b/tests/integration/test_server.py @@ -16,7 +16,6 @@ from typing import Callable from async_timeout import timeout -import asyncio from getpass import getuser import pytest @@ -80,35 +79,17 @@ def test_pb_entire_workflow(myflow): assert data.workflow.id == myflow.id -async def test_stop(one, start): - """Test listener.""" +async def test_stop(one: Scheduler, start): + """Test stop.""" async with start(one): - await one.server.stop('TESTING') async with timeout(2): - # wait for the server to consume the STOP item from the queue - while True: - if one.server.queue.empty(): - break - await asyncio.sleep(0.01) + # Wait for the server to consume the STOP signal. + # If it doesn't, the test will fail with a timeout error. + await one.server.stop('TESTING') + assert one.server.stopped -async def test_operate(one, start): - """Test operate.""" - async with start(one): - one.server.queue.put('STOP') - async with timeout(2): - # wait for the server to consume the STOP item from the queue - while True: - if one.server.queue.empty(): - break - await asyncio.sleep(0.01) - # ensure the server is "closed" - with pytest.raises(ValueError): - one.server.queue.put('foobar') - one.server.operate() - - -async def test_receiver(one, start): +async def test_receiver(one: Scheduler, start): """Test the receiver with different message objects.""" async with timeout(5): async with start(one):