Make BatchedSend restartable #6329

Closed · wants to merge 27 commits
Commits (27):

- 86a36d1  Test temporary network blips (gjoseph92, May 4, 2022)
- c93894f  Add names (fjetter, May 5, 2022)
- 37067e2  Fix `test_worker_reconnects_mid_compute` (gjoseph92, May 11, 2022)
- bd1e64f  use name (gjoseph92, May 11, 2022)
- c1543d7  fix misspelling (gjoseph92, May 11, 2022)
- 1a330ba  improve some logs (gjoseph92, May 11, 2022)
- e319193  `__str__` is redundant (gjoseph92, May 11, 2022)
- 1f7c56e  not closed until task is done (gjoseph92, May 11, 2022)
- 8a76d43  document contracts, at least a little bit (gjoseph92, May 11, 2022)
- 67cea68  send can take anything (gjoseph92, May 11, 2022)
- d9967b8  unnecessary comm closing before `remove_worker` (gjoseph92, May 11, 2022)
- 2597e3a  fix race condition removing from `stream_comms` (gjoseph92, May 11, 2022)
- bbd8820  driveby: fix race condition with clients too (gjoseph92, May 11, 2022)
- fb8f1c9  Improve reasonableness of add_client disconnection (gjoseph92, May 11, 2022)
- b7824d3  note client closing is slow, but don't fix (gjoseph92, May 11, 2022)
- 3d2374a  Merge branch 'main' into batched-send-restartable (gjoseph92, May 11, 2022)
- ef9dfcc  fixup! improve some logs (gjoseph92, May 12, 2022)
- bbfff7c  driveby: Catch multiple unhandled exceptions (gjoseph92, May 12, 2022)
- 40e37fa  fixup! driveby: Catch multiple unhandled (gjoseph92, May 13, 2022)
- 641f975  REVERTME: add AMM debug logs (gjoseph92, May 13, 2022)
- 4fc02bc  driveby: fix Nanny shutdown assertion (gjoseph92, May 13, 2022)
- f8648cf  REVERTME: only run failing tests (gjoseph92, May 13, 2022)
- d77a4d0  fixup! REVERTME: only run failing tests (gjoseph92, May 13, 2022)
- 6d2ff20  fixup! REVERTME: only run failing tests (gjoseph92, May 13, 2022)
- c14fee9  Revert "REVERTME: only run failing tests" (gjoseph92, May 13, 2022)
- d2898fc  Revert "REVERTME: add AMM debug logs" (gjoseph92, May 13, 2022)
- d49d5ca  Maybe fix leaking task from client (gjoseph92, May 18, 2022)
121 changes: 74 additions & 47 deletions distributed/batched.py
@@ -1,9 +1,7 @@
import asyncio
import logging
from collections import deque

from tornado import gen, locks
from tornado.ioloop import IOLoop

import dask
from dask.utils import parse_timedelta

@@ -36,17 +34,13 @@ class BatchedSend:
['Hello,', 'world!']
"""

# XXX why doesn't BatchedSend follow either the IOStream or Comm API?

def __init__(self, interval, loop=None, serializers=None):
# XXX is the loop arg useful?
self.loop = loop or IOLoop.current()
def __init__(self, interval, serializers=None, name=None):
self.interval = parse_timedelta(interval, default="ms")
self.waker = locks.Event()
self.stopped = locks.Event()
self.waker = asyncio.Event()
self.please_stop = False
self.buffer = []
self.comm = None
self.name = name
self.message_count = 0
self.batch_count = 0
self.byte_count = 0
@@ -55,30 +49,43 @@ def __init__(self, interval, loop=None, serializers=None):
maxlen=dask.config.get("distributed.comm.recent-messages-log-length")
)
self.serializers = serializers
self._consecutive_failures = 0
self._background_task = None

def start(self, comm):
# A `BatchedSend` instance can be closed and restarted multiple times with new `comm` objects.
# However, calling `start` on an already-running `BatchedSend` is an error.
if self._background_task and not self._background_task.done():
raise RuntimeError(f"Background task still running for {self!r}")
self.please_stop = False
self.waker.set()
self.next_deadline = None
self.comm = comm
self.loop.add_callback(self._background_send)

self._background_task = asyncio.create_task(
self._background_send(),
name=f"background-send-{self.name}",
)

def closed(self):
return self.comm and self.comm.closed()
return (self.comm is None or self.comm.closed()) and (
self._background_task is None or self._background_task.done()
)

def __repr__(self):
if self.closed():
return "<BatchedSend: closed>"
return f"<BatchedSend {self.name!r}: closed>"
else:
return "<BatchedSend: %d in buffer>" % len(self.buffer)

__str__ = __repr__
return f"<BatchedSend {self.name!r}: {len(self.buffer)} in buffer>"

@gen.coroutine
def _background_send(self):
async def _background_send(self):
while not self.please_stop:
try:
yield self.waker.wait(self.next_deadline)
timeout = None
if self.next_deadline:
timeout = self.next_deadline - time()
await asyncio.wait_for(self.waker.wait(), timeout=timeout)
self.waker.clear()
except gen.TimeoutError:
except asyncio.TimeoutError:
pass
if not self.buffer:
# Nothing to send
@@ -90,8 +97,9 @@ def _background_send(self):
payload, self.buffer = self.buffer, []
self.batch_count += 1
self.next_deadline = time() + self.interval

try:
nbytes = yield self.comm.write(
nbytes = await self.comm.write(
payload, serializers=self.serializers, on_error="raise"
)
if nbytes < 1e6:
@@ -100,7 +108,13 @@ def _background_send(self):
self.recent_message_log.append("large-message")
self.byte_count += nbytes
except CommClosedError:
logger.info("Batched Comm Closed %r", self.comm, exc_info=True)
logger.info(
(
f"Batched Comm Closed {self.comm!r} in {self!r}. Lost {len(payload)} messages, "
f"plus {len(self.buffer)} in buffer." # <-- due to upcoming `abort()`
),
exc_info=True,
)
break
except Exception:
# We cannot safely retry self.comm.write, as we have no idea
@@ -109,66 +123,79 @@ def _background_send(self):
# header has been written, but not the frame payload), therefore
# the only safe thing to do here is to abort the stream without
# any attempt to re-try `write`.
logger.exception("Error in batched write")
logger.exception(
f"Error in batched write in {self!r}. Lost {len(payload)} messages, "
f"plus {len(self.buffer)} in buffer."
)
break
finally:
payload = None # lose ref
else:
# nobreak. We've been gracefully closed.
self.stopped.set()
return

# If we've reached here, it means `break` was hit above and
# there was an exception when using `comm`.
# We can't close gracefully via `.close()` since we can't send messages.
# So we just abort.
# This means that any messages in our buffer are lost.
# To propagate exceptions, we rely on subsequent `BatchedSend.send`
# calls to raise CommClosedErrors.
self.stopped.set()
# The exception will not be propagated.
self.abort()

def send(self, *msgs) -> None:
"""Schedule a message for sending to the other side

This completes quickly and synchronously
This completes quickly and synchronously. (However, note that like all
`BatchedSend` methods, `send` is not threadsafe.)

Message delivery is *not* guaranteed. There is no way for callers to know when,
or whether, a particular message was received by the other side. When the
underlying comm closes, any currently-buffered messages (as well as data in the
socket's underlying buffer) will be lost.

`send` will never raise an error, even if the `BatchedSend` or underlying comm
is in a closed state.

While `closed` is True, all calls to `send` will be buffered until the next call
to `start`. However, calls to `send` made after the underlying comm has closed,
but before ``await close()`` has returned, may or may not be dropped.

Because `BatchedSend` will drop messages when the comm closes, users of
`BatchedSend` are expected to be implementing their own reconnection logic,
triggered when the comm closes. Reconnection often involves application logic
reconciling state, then calling `start` again with a new comm object.
"""
if self.comm is not None and self.comm.closed():
raise CommClosedError(f"Comm {self.comm!r} already closed.")

self.message_count += len(msgs)
self.buffer.extend(msgs)
# Avoid spurious wakeups if possible
if self.next_deadline is None:
self.waker.set()

@gen.coroutine
def close(self, timeout=None):
"""Flush existing messages and then close comm
if self.comm and not self.comm.closed() and self.next_deadline is None:
self.waker.set()

If set, raises `tornado.util.TimeoutError` after a timeout.
"""
if self.comm is None:
return
async def close(self):
"""Flush existing messages and then close comm"""
self.please_stop = True
self.waker.set()
yield self.stopped.wait(timeout=timeout)
if not self.comm.closed():

if self._background_task:
await self._background_task
self._background_task = None

if self.comm and not self.comm.closed():
try:
if self.buffer:
self.buffer, payload = [], self.buffer
yield self.comm.write(
await self.comm.write(
payload, serializers=self.serializers, on_error="raise"
)
except CommClosedError:
pass
yield self.comm.close()
await self.comm.close()

def abort(self):
if self.comm is None:
return
self.please_stop = True
self.buffer = []
self.waker.set()
if not self.comm.closed():
if self.comm and not self.comm.closed():
self.comm.abort()
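
The diff above turns `BatchedSend` from a one-shot wrapper into an object that can be stopped and started again with a fresh comm. The sketch below illustrates that contract from a caller's point of view. It is not code from this PR: the address, the message payloads, and the naive reconnect-and-resend policy are invented for illustration, and `connect` merely stands in for however a real caller obtains a new comm.

```python
from distributed.batched import BatchedSend
from distributed.comm import connect
from distributed.comm.core import CommClosedError


async def send_with_restart(address: str, messages: list[dict]) -> None:
    # `name` is new in this PR; it labels the background-send task in logs.
    bcomm = BatchedSend(interval="10ms", name="example")
    bcomm.start(await connect(address))

    for msg in messages:
        try:
            # Buffers the message and returns immediately. Delivery is not
            # guaranteed: if the comm dies, buffered messages are dropped.
            bcomm.send(msg)
        except CommClosedError:
            # The underlying comm closed. Per the new contract we must
            # `close()` first, which awaits the old background task; calling
            # `start()` while that task is still running raises RuntimeError.
            await bcomm.close()
            bcomm.start(await connect(address))
            bcomm.send(msg)  # earlier in-flight messages may still be lost

    await bcomm.close()  # flush whatever remains, then close the comm
```

This mirrors the intent documented in `send`: the `BatchedSend` never reconnects by itself; the caller reconciles state and hands it a new comm via `start`.
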
20 changes: 7 additions & 13 deletions distributed/client.py
@@ -1212,6 +1212,8 @@ async def _reconnect(self):
assert self.scheduler_comm.comm.closed()

self.status = "connecting"
if self.scheduler_comm:
await self.scheduler_comm.close()
self.scheduler_comm = None

for st in self.futures.values():
@@ -1287,7 +1289,8 @@ async def _ensure_connected(self, timeout=None):
if msg[0].get("warning"):
warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))

bcomm = BatchedSend(interval="10ms", loop=self.loop)
assert not self.scheduler_comm, self.scheduler_comm
bcomm = BatchedSend(interval="10ms", name="Client")
bcomm.start(comm)
self.scheduler_comm = bcomm
if self._set_as_default:
@@ -1514,13 +1517,11 @@ async def _close(self, fast=False):
if self.get == dask.config.get("get", None):
del dask.config.config["get"]

if (
self.scheduler_comm
and self.scheduler_comm.comm
and not self.scheduler_comm.comm.closed()
):
if self.scheduler_comm:
self._send_to_scheduler({"op": "close-client"})
self._send_to_scheduler({"op": "close-stream"})
await self.scheduler_comm.close()
self.scheduler_comm = None

current_task = asyncio.current_task()
handle_report_task = self._handle_report_task
@@ -1533,13 +1534,6 @@
with suppress(asyncio.CancelledError, TimeoutError):
await asyncio.wait_for(asyncio.shield(handle_report_task), 0.1)

if (
self.scheduler_comm
and self.scheduler_comm.comm
and not self.scheduler_comm.comm.closed()
):
await self.scheduler_comm.close()

for key in list(self.futures):
self._release_key(key=key)

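On the client side, the changes make comm teardown unconditional: `_reconnect` and `_close` now always close the old `BatchedSend` before dropping the reference, so `_ensure_connected` can assert that no stale one exists when it creates a replacement. The helper below is a condensed sketch of that swap ordering, not code from the PR; the function name is invented for illustration.

```python
from __future__ import annotations

from distributed.batched import BatchedSend


async def swap_scheduler_comm(old: BatchedSend | None, new_comm) -> BatchedSend:
    if old is not None:
        # Close unconditionally, even if the underlying comm already died:
        # this also awaits the old background task, so two BatchedSends are
        # never running against the scheduler at once.
        await old.close()
        assert old.closed()  # new semantics: comm closed AND task finished
    bcomm = BatchedSend(interval="10ms", name="Client")
    bcomm.start(new_comm)
    return bcomm
```
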
4 changes: 2 additions & 2 deletions distributed/nanny.py
@@ -926,8 +926,8 @@ async def run():
loop.run_sync(do_stop)
finally:
with suppress(ValueError):
child_stop_q.put({"op": "close"}) # probably redundant
child_stop_q.put({"op": "stop"}) # usually redundant
with suppress(ValueError):
child_stop_q.close() # probably redundant
child_stop_q.close() # usually redundant
child_stop_q.join_thread()
thread.join(timeout=2)
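The `nanny.py` change only reworks the comments, but the pattern it annotates is worth spelling out: telling the child process to stop over a multiprocessing queue can race with a child that already exited, so each step is guarded. A minimal sketch of that handshake, with the function name invented for illustration:

```python
import multiprocessing as mp
from contextlib import suppress


def stop_child(child_stop_q: mp.Queue) -> None:
    with suppress(ValueError):
        # put() raises ValueError if the queue was already closed, e.g.
        # because the child is already shutting down ("usually redundant").
        child_stop_q.put({"op": "stop"})
    with suppress(ValueError):
        child_stop_q.close()
    # Flush the queue's feeder thread before the process is joined.
    child_stop_q.join_thread()
```
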