Skip to content

Let servers close faster if there are no active handlers #4805

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,8 @@ async def handle_comm(self, comm):
e,
)
break

self._comms[comm] = None
msg = result = None
if close_desired:
await comm.close()
Expand Down Expand Up @@ -595,15 +597,18 @@ async def handle_stream(self, comm, extra=None, every_cycle=[]):
def close(self):
for pc in self.periodic_callbacks.values():
pc.stop()
self.__stopped = True
for listener in self.listeners:
future = listener.stop()
if inspect.isawaitable(future):
yield future
for i in range(20): # let comms close naturally for a second
if not self._comms:
break
else:
for i in range(20):
# If there are still handlers running at this point, give them a
# second to finish gracefully themselves, otherwise...
if any(self._comms.values()):
yield asyncio.sleep(0.05)
else:
break
yield [comm.close() for comm in list(self._comms)] # then forcefully close
for cb in self._ongoing_coroutines:
cb.cancel()
Expand Down
67 changes: 67 additions & 0 deletions distributed/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -886,3 +886,70 @@ async def sleep(comm=None):
async def test_server_redundant_kwarg():
with pytest.raises(TypeError, match="unexpected keyword argument"):
await Server({}, typo_kwarg="foo")


async def test_server_comms_mark_active_handlers():
"""Whether handlers are active can be read off of the self._comms values.
ensure this is properly reflected and released. The sentinel for
"open comm but no active handler" is `None`
"""

async def long_handler(comm):
await asyncio.sleep(0.2)
return "done"

server = await Server({"wait": long_handler})
await server.listen(0)
assert server._comms == {}

comm = await connect(server.address)
await comm.write({"op": "wait"})
while not server._comms:
await asyncio.sleep(0.05)
assert set(server._comms.values()) == {"wait"}
assert await comm.read() == "done"
assert set(server._comms.values()) == {None}
await comm.close()
while server._comms:
await asyncio.sleep(0.01)


@pytest.mark.asyncio
async def test_close_fast_without_active_handlers():
async def very_fast(comm):
return "done"

server = await Server({"do_stuff": very_fast})
await server.listen(0)
assert server._comms == {}

comm = await connect(server.address)
await comm.write({"op": "do_stuff"})
while not server._comms:
await asyncio.sleep(0.05)
fut = server.close()

await asyncio.wait_for(fut, 0.1)


@pytest.mark.asyncio
async def test_close_grace_period_for_handlers():
async def long_handler(comm, delay=10):
await asyncio.sleep(delay)
return "done"

server = await Server({"wait": long_handler})
await server.listen(0)
assert server._comms == {}

comm = await connect(server.address)
await comm.write({"op": "wait"})
while not server._comms:
await asyncio.sleep(0.05)
fut = server.close()
# since the handler is running for a while, the close will not immediately
# go through. We'll give the comm about a second to close itself
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(fut, 0.5)
await comm.close()
await server.close()