Skip to content

Commit

Permalink
Let servers close faster if there are no active handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter committed May 25, 2021
1 parent d41169b commit 9aab738
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 4 deletions.
13 changes: 9 additions & 4 deletions distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,8 @@ async def handle_comm(self, comm):
e,
)
break

self._comms[comm] = None
msg = result = None
if close_desired:
await comm.close()
Expand Down Expand Up @@ -598,15 +600,18 @@ async def handle_stream(self, comm, extra=None, every_cycle=[]):
def close(self):
for pc in self.periodic_callbacks.values():
pc.stop()
self.__stopped = True
for listener in self.listeners:
future = listener.stop()
if inspect.isawaitable(future):
yield future
for i in range(20): # let comms close naturally for a second
if not self._comms:
break
else:
for i in range(20):
# If there are still handlers running at this point, give them a
# second to finish gracefully themselves, otherwise...
if any(self._comms.values()):
yield asyncio.sleep(0.05)
else:
break
yield [comm.close() for comm in list(self._comms)] # then forcefully close
for cb in self._ongoing_coroutines:
cb.cancel()
Expand Down
68 changes: 68 additions & 0 deletions distributed/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -880,3 +880,71 @@ async def sleep(comm=None):

# weakref set/dict should be cleaned up
assert not len(server._ongoing_coroutines)


@pytest.mark.asyncio
async def test_server_comms_mark_active_handlers():
"""Whether handlers are active can be read off of the self._comms values.
ensure this is properly reflected and released. The sentinel for
"open comm but no active handler" is `None`
"""

async def long_handler(comm):
await asyncio.sleep(0.2)
return "done"

server = await Server({"wait": long_handler})
await server.listen(0)
assert server._comms == {}

comm = await connect(server.address)
await comm.write({"op": "wait"})
while not server._comms:
await asyncio.sleep(0.05)
assert set(server._comms.values()) == {"wait"}
assert await comm.read() == "done"
assert set(server._comms.values()) == {None}
await comm.close()
while server._comms:
await asyncio.sleep(0.01)


@pytest.mark.asyncio
async def test_close_fast_without_active_handlers():
async def very_fast(comm):
return "done"

server = await Server({"do_stuff": very_fast})
await server.listen(0)
assert server._comms == {}

comm = await connect(server.address)
await comm.write({"op": "do_stuff"})
while not server._comms:
await asyncio.sleep(0.05)
fut = server.close()

await asyncio.wait_for(fut, 0.1)


@pytest.mark.asyncio
async def test_close_grace_period_for_handlers():
async def long_handler(comm, delay=10):
await asyncio.sleep(delay)
return "done"

server = await Server({"wait": long_handler})
await server.listen(0)
assert server._comms == {}

comm = await connect(server.address)
await comm.write({"op": "wait"})
while not server._comms:
await asyncio.sleep(0.05)
fut = server.close()
# since the handler is running for a while, the close will not immediately
# go through. We'll give the comm about a second to close itself
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(fut, 0.5)
await comm.close()
await server.close()

0 comments on commit 9aab738

Please sign in to comment.