diff --git a/distributed/core.py b/distributed/core.py index d54592ef5a1..fa3463de3e1 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -666,8 +666,7 @@ async def handle_stream(self, comm, extra=None): await comm.close() assert comm.closed() - @gen.coroutine - def close(self): + async def close(self): for pc in self.periodic_callbacks.values(): pc.stop() @@ -676,23 +675,25 @@ def close(self): for listener in self.listeners: future = listener.stop() if inspect.isawaitable(future): - yield future + await future for i in range(20): # If there are still handlers running at this point, give them a # second to finish gracefully themselves, otherwise... if any(self._comms.values()): - yield asyncio.sleep(0.05) + await asyncio.sleep(0.05) else: break - yield self.rpc.close() - yield [comm.close() for comm in list(self._comms)] # then forcefully close + await self.rpc.close() + await asyncio.gather( + *(comm.close() for comm in self._comms) + ) # then forcefully close for cb in self._ongoing_coroutines: cb.cancel() for i in range(10): if all(c.cancelled() for c in self._ongoing_coroutines): break else: - yield asyncio.sleep(0.01) + await asyncio.sleep(0.01) self._event_finished.set()