Skip to content

Commit

Permalink
Maybe fix leaking task from client
Browse files Browse the repository at this point in the history
  • Loading branch information
gjoseph92 committed May 18, 2022
1 parent d2898fc commit d49d5ca
Showing 1 changed file with 6 additions and 8 deletions.
14 changes: 6 additions & 8 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1212,6 +1212,8 @@ async def _reconnect(self):
assert self.scheduler_comm.comm.closed()

self.status = "connecting"
if self.scheduler_comm:
await self.scheduler_comm.close()
self.scheduler_comm = None

for st in self.futures.values():
Expand Down Expand Up @@ -1287,6 +1289,7 @@ async def _ensure_connected(self, timeout=None):
if msg[0].get("warning"):
warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))

assert not self.scheduler_comm, self.scheduler_comm
bcomm = BatchedSend(interval="10ms", name="Client")
bcomm.start(comm)
self.scheduler_comm = bcomm
Expand Down Expand Up @@ -1514,13 +1517,11 @@ async def _close(self, fast=False):
if self.get == dask.config.get("get", None):
del dask.config.config["get"]

if (
self.scheduler_comm
and self.scheduler_comm.comm
and not self.scheduler_comm.comm.closed()
):
if self.scheduler_comm:
self._send_to_scheduler({"op": "close-client"})
self._send_to_scheduler({"op": "close-stream"})
await self.scheduler_comm.close()
self.scheduler_comm = None

current_task = asyncio.current_task()
handle_report_task = self._handle_report_task
Expand All @@ -1533,9 +1534,6 @@ async def _close(self, fast=False):
with suppress(asyncio.CancelledError, TimeoutError):
await asyncio.wait_for(asyncio.shield(handle_report_task), 0.1)

if self.scheduler_comm:
await self.scheduler_comm.close()

for key in list(self.futures):
self._release_key(key=key)

Expand Down

0 comments on commit d49d5ca

Please sign in to comment.