Skip to content

Commit

Permalink
Fix a race condition in trio quic shutdown.
Browse files Browse the repository at this point in the history
It was possible to have a "lost wakeup" situation where we had stuff to
send but the trio worker was blocked indefinitely in the receive.

There is no test for this as the race is highly timing-dependent and I can't
reproduce it reliably in the test suite, though I was able to reproduce it
reliably a different way when debugging.

I also reordered event processing to happen after timer handling but before sending
in the trio and sync quic code.  The async code already worked this way due to its
different structure and needed no changes.
  • Loading branch information
rthalley committed Oct 27, 2023
1 parent cb83902 commit 277ee25
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 2 deletions.
3 changes: 2 additions & 1 deletion dns/quic/_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,15 @@ def _worker(self):
key.data()
with self._lock:
self._handle_timer(expiration)
self._handle_events()
with self._lock:
datagrams = self._connection.datagrams_to_send(time.time())
for datagram, _ in datagrams:
try:
self._socket.send(datagram)
except BlockingIOError:
# we let QUIC handle any lossage
pass
self._handle_events()
finally:
with self._lock:
self._done = True
Expand Down
17 changes: 16 additions & 1 deletion dns/quic/_trio.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,23 +81,36 @@ def __init__(self, connection, address, port, source, source_port, manager=None)
self._handshake_complete = trio.Event()
self._run_done = trio.Event()
self._worker_scope = None
self._send_pending = False

async def _worker(self):
try:
await self._socket.connect(self._peer)
while not self._done:
(expiration, interval) = self._get_timer_values(False)
if self._send_pending:
# Do not block forever if sends are pending. Even though we
# have a wake-up mechanism if we've already started the blocking
# read, the possibility of context switching in send means that
# more writes can happen while we have no wake up context, so
# we need self._send_pending to avoid (effectively) a "lost wakeup"
# race.
interval = 0.0
with trio.CancelScope(
deadline=trio.current_time() + interval
) as self._worker_scope:
datagram = await self._socket.recv(QUIC_MAX_DATAGRAM)
self._connection.receive_datagram(datagram, self._peer, time.time())
self._worker_scope = None
self._handle_timer(expiration)
await self._handle_events()
# We clear this now, before sending anything, as sending can cause
# context switches that do more sends. We want to know if that
# happens so we don't block a long time on the recv() above.
self._send_pending = False
datagrams = self._connection.datagrams_to_send(time.time())
for datagram, _ in datagrams:
await self._socket.send(datagram)
await self._handle_events()
finally:
self._done = True
self._handshake_complete.set()
Expand Down Expand Up @@ -129,6 +142,7 @@ async def _handle_events(self):

async def write(self, stream, data, is_end=False):
    """Queue *data* for *stream* on the QUIC connection and wake the worker.

    :param stream: the QUIC stream id to write on.
    :param data: the bytes to send.
    :param is_end: if True, mark this as the final data for the stream.
    """
    self._connection.send_stream_data(stream, data, is_end)
    # Set the pending flag BEFORE cancelling the worker's scope, so that if
    # the worker is not currently blocked in recv() it will still see that a
    # send is needed on its next loop iteration (avoids a lost wakeup).
    self._send_pending = True
    if self._worker_scope is not None:
        # The worker is blocked in recv(); cancel its scope so it proceeds
        # to the send phase promptly.
        self._worker_scope.cancel()

Expand Down Expand Up @@ -159,6 +173,7 @@ async def close(self):
self._manager.closed(self._peer[0], self._peer[1])
self._closed = True
self._connection.close()
self._send_pending = True
if self._worker_scope is not None:
self._worker_scope.cancel()
await self._run_done.wait()
Expand Down

0 comments on commit 277ee25

Please sign in to comment.