Skip to content

Commit

Permalink
Simplify WorkflowRuntimeServer stop logic
Browse files Browse the repository at this point in the history
  • Loading branch information
MetRonnie committed Aug 25, 2022
1 parent 9ebf949 commit bc69449
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 37 deletions.
20 changes: 9 additions & 11 deletions cylc/flow/network/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,8 @@ def __init__(self, schd):
IgnoreFieldMiddleware,
]

self.queue: 'Queue[str]' = Queue()
self.publish_queue: 'Queue[Iterable[tuple]]' = Queue()
self.stopping = False
self.waiting_to_stop = False
self.stopped = True

self.register_endpoints()
Expand Down Expand Up @@ -215,9 +214,10 @@ async def stop(self, reason: Union[BaseException, str]) -> None:
server's self.thread in order to interrupt the self.operate() loop
and wait for self.thread to terminate.
"""
self.queue.put('STOP')
self.waiting_to_stop = True
if self.thread and self.thread.is_alive():
while not self.stopping:
# Wait for self.operate() loop to finish:
while self.waiting_to_stop:
# Non-async sleep - yield to other threads rather than
# event loop (allows self.operate() running in different
# thread to return)
Expand Down Expand Up @@ -247,13 +247,11 @@ def operate(self) -> None:
# of the listener runs the event loop synchronously
# (in graphql AsyncioExecutor)
while True:
# process messages from the scheduler.
if self.queue.qsize():
message = self.queue.get()
if message == 'STOP':
self.stopping = True
break
raise ValueError('Unknown message "%s"' % message)
if self.waiting_to_stop:
# The self.stop() method is waiting for us to signal that we
# have finished here
self.waiting_to_stop = False
return

# Gather and respond to any requests.
self.replier.listener()
Expand Down
33 changes: 7 additions & 26 deletions tests/integration/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

from typing import Callable
from async_timeout import timeout
import asyncio
from getpass import getuser

import pytest
Expand Down Expand Up @@ -80,35 +79,17 @@ def test_pb_entire_workflow(myflow):
assert data.workflow.id == myflow.id


async def test_stop(one, start):
"""Test listener."""
async def test_stop(one: Scheduler, start):
"""Test stop."""
async with start(one):
await one.server.stop('TESTING')
async with timeout(2):
# wait for the server to consume the STOP item from the queue
while True:
if one.server.queue.empty():
break
await asyncio.sleep(0.01)
# Wait for the server to consume the STOP signal.
# If it doesn't, the test will fail with a timeout error.
await one.server.stop('TESTING')
assert one.server.stopped


async def test_operate(one, start):
"""Test operate."""
async with start(one):
one.server.queue.put('STOP')
async with timeout(2):
# wait for the server to consume the STOP item from the queue
while True:
if one.server.queue.empty():
break
await asyncio.sleep(0.01)
# ensure the server is "closed"
with pytest.raises(ValueError):
one.server.queue.put('foobar')
one.server.operate()


async def test_receiver(one, start):
async def test_receiver(one: Scheduler, start):
"""Test the receiver with different message objects."""
async with timeout(5):
async with start(one):
Expand Down

0 comments on commit bc69449

Please sign in to comment.