From d51943fa189a518603df92f71d14d2237265029e Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 18 May 2022 14:40:21 +0200 Subject: [PATCH 1/3] Replace @gen.coroutine with async --- distributed/core.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/distributed/core.py b/distributed/core.py index d54592ef5a1..3baad89f730 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -666,8 +666,7 @@ async def handle_stream(self, comm, extra=None): await comm.close() assert comm.closed() - @gen.coroutine - def close(self): + async def close(self): for pc in self.periodic_callbacks.values(): pc.stop() @@ -676,23 +675,25 @@ def close(self): for listener in self.listeners: future = listener.stop() if inspect.isawaitable(future): - yield future + await future for i in range(20): # If there are still handlers running at this point, give them a # second to finish gracefully themselves, otherwise... if any(self._comms.values()): - yield asyncio.sleep(0.05) + await asyncio.sleep(0.05) else: break - yield self.rpc.close() - yield [comm.close() for comm in list(self._comms)] # then forcefully close + await self.rpc.close() + await asyncio.gather( + [comm.close() for comm in self._comms.values()] + ) # then forcefully close for cb in self._ongoing_coroutines: cb.cancel() for i in range(10): if all(c.cancelled() for c in self._ongoing_coroutines): break else: - yield asyncio.sleep(0.01) + await asyncio.sleep(0.01) self._event_finished.set() From b20105dd6bd40cb4cc53bf2584c19bc0e8eff520 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 18 May 2022 16:30:39 +0200 Subject: [PATCH 2/3] Fix gather --- distributed/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/core.py b/distributed/core.py index 3baad89f730..fa93c761c22 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -685,7 +685,7 @@ async def close(self): break await self.rpc.close() await asyncio.gather( - [comm.close() for comm in self._comms.values()] + *(comm.close() for comm in self._comms.values()) ) # then forcefully close for cb in self._ongoing_coroutines: cb.cancel() From 256492446c719fa8078809c323348108a5dda84f Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 18 May 2022 16:36:00 +0200 Subject: [PATCH 3/3] Minor --- distributed/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/core.py b/distributed/core.py index fa93c761c22..fa3463de3e1 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -685,7 +685,7 @@ async def close(self): break await self.rpc.close() await asyncio.gather( - *(comm.close() for comm in self._comms.values()) + *(comm.close() for comm in self._comms) ) # then forcefully close for cb in self._ongoing_coroutines: cb.cancel()