From 9aab73884fe12a302202c27d4a8abcb5b624e11b Mon Sep 17 00:00:00 2001 From: fjetter Date: Tue, 11 May 2021 16:18:07 +0200 Subject: [PATCH] Let servers close faster if there are no active handlers --- distributed/core.py | 13 +++++-- distributed/tests/test_core.py | 68 ++++++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+), 4 deletions(-) diff --git a/distributed/core.py b/distributed/core.py index cfd84bce95a..4dd4fbfb7de 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -528,6 +528,8 @@ async def handle_comm(self, comm): e, ) break + + self._comms[comm] = None msg = result = None if close_desired: await comm.close() @@ -598,15 +600,18 @@ async def handle_stream(self, comm, extra=None, every_cycle=[]): def close(self): for pc in self.periodic_callbacks.values(): pc.stop() + self.__stopped = True for listener in self.listeners: future = listener.stop() if inspect.isawaitable(future): yield future - for i in range(20): # let comms close naturally for a second - if not self._comms: - break - else: + for i in range(20): + # If there are still handlers running at this point, give them a + # second to finish gracefully themselves, otherwise... + if any(self._comms.values()): yield asyncio.sleep(0.05) + else: + break yield [comm.close() for comm in list(self._comms)] # then forcefully close for cb in self._ongoing_coroutines: cb.cancel() diff --git a/distributed/tests/test_core.py b/distributed/tests/test_core.py index 6dd76f901ec..b5b5cb3aa9b 100644 --- a/distributed/tests/test_core.py +++ b/distributed/tests/test_core.py @@ -880,3 +880,71 @@ async def sleep(comm=None): # weakref set/dict should be cleaned up assert not len(server._ongoing_coroutines) + + +@pytest.mark.asyncio +async def test_server_comms_mark_active_handlers(): + """Whether handlers are active can be read off of the self._comms values. + ensure this is properly reflected and released. The sentinel for + "open comm but no active handler" is `None` + """ + + async def long_handler(comm): + await asyncio.sleep(0.2) + return "done" + + server = await Server({"wait": long_handler}) + await server.listen(0) + assert server._comms == {} + + comm = await connect(server.address) + await comm.write({"op": "wait"}) + while not server._comms: + await asyncio.sleep(0.05) + assert set(server._comms.values()) == {"wait"} + assert await comm.read() == "done" + assert set(server._comms.values()) == {None} + await comm.close() + while server._comms: + await asyncio.sleep(0.01) + + +@pytest.mark.asyncio +async def test_close_fast_without_active_handlers(): + async def very_fast(comm): + return "done" + + server = await Server({"do_stuff": very_fast}) + await server.listen(0) + assert server._comms == {} + + comm = await connect(server.address) + await comm.write({"op": "do_stuff"}) + while not server._comms: + await asyncio.sleep(0.05) + fut = server.close() + + await asyncio.wait_for(fut, 0.1) + + +@pytest.mark.asyncio +async def test_close_grace_period_for_handlers(): + async def long_handler(comm, delay=10): + await asyncio.sleep(delay) + return "done" + + server = await Server({"wait": long_handler}) + await server.listen(0) + assert server._comms == {} + + comm = await connect(server.address) + await comm.write({"op": "wait"}) + while not server._comms: + await asyncio.sleep(0.05) + fut = server.close() + # since the handler is running for a while, the close will not immediately + # go through. We'll give the comm about a second to close itself + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(fut, 0.5) + await comm.close() + await server.close()