Skip to content

Commit

Permalink
bpo-32528: Make asyncio.CancelledError a BaseException. (GH-13528)
Browse files Browse the repository at this point in the history
This will address the common mistake many asyncio users make:
an "except Exception" clause breaking Tasks cancellation.

In addition to this change, we stop inheriting asyncio.TimeoutError
and asyncio.InvalidStateError from their concurrent.futures.*
counterparts.  There's no point for these exceptions to share the
inheritance chain.

In 3.9 we'll focus on implementing supervisors and cancel scopes,
which should allow better handling of all exceptions, including
SystemExit and KeyboardInterrupt
  • Loading branch information
1st1 authored May 27, 2019
1 parent 16cefb0 commit 431b540
Show file tree
Hide file tree
Showing 16 changed files with 146 additions and 66 deletions.
16 changes: 11 additions & 5 deletions Lib/asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def _interleave_addrinfos(addrinfos, first_address_family_count=1):
def _run_until_complete_cb(fut):
if not fut.cancelled():
exc = fut.exception()
if isinstance(exc, BaseException) and not isinstance(exc, Exception):
if isinstance(exc, (SystemExit, KeyboardInterrupt)):
# Issue #22429: run_forever() already finished, no need to
# stop it.
return
Expand Down Expand Up @@ -1196,7 +1196,7 @@ async def start_tls(self, transport, protocol, sslcontext, *,

try:
await waiter
except Exception:
except BaseException:
transport.close()
conmade_cb.cancel()
resume_cb.cancel()
Expand Down Expand Up @@ -1710,7 +1710,9 @@ def call_exception_handler(self, context):
if self._exception_handler is None:
try:
self.default_exception_handler(context)
except Exception:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException:
# Second protection layer for unexpected errors
# in the default implementation, as well as for subclassed
# event loops with overloaded "default_exception_handler".
Expand All @@ -1719,7 +1721,9 @@ def call_exception_handler(self, context):
else:
try:
self._exception_handler(self, context)
except Exception as exc:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
# Exception in the user set custom exception handler.
try:
# Let's try default handler.
Expand All @@ -1728,7 +1732,9 @@ def call_exception_handler(self, context):
'exception': exc,
'context': context,
})
except Exception:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException:
# Guard 'default_exception_handler' in case it is
# overloaded.
logger.error('Exception in default exception handler '
Expand Down
4 changes: 3 additions & 1 deletion Lib/asyncio/base_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,9 @@ async def _connect_pipes(self, waiter):
for callback, data in self._pending_calls:
loop.call_soon(callback, *data)
self._pending_calls = None
except Exception as exc:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
if waiter is not None and not waiter.cancelled():
waiter.set_exception(exc)
else:
Expand Down
4 changes: 3 additions & 1 deletion Lib/asyncio/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ def cancelled(self):
def _run(self):
try:
self._context.run(self._callback, *self._args)
except Exception as exc:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
cb = format_helpers._format_callback_source(
self._callback, self._args)
msg = f'Exception in callback {cb}'
Expand Down
9 changes: 3 additions & 6 deletions Lib/asyncio/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,16 @@
'IncompleteReadError', 'LimitOverrunError',
'SendfileNotAvailableError')

import concurrent.futures
from . import base_futures


class CancelledError(concurrent.futures.CancelledError):
class CancelledError(BaseException):
"""The Future or Task was cancelled."""


class TimeoutError(concurrent.futures.TimeoutError):
class TimeoutError(Exception):
"""The operation exceeded the given deadline."""


class InvalidStateError(concurrent.futures.InvalidStateError):
class InvalidStateError(Exception):
"""The operation is not allowed in this state."""


Expand Down
12 changes: 9 additions & 3 deletions Lib/asyncio/proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,9 @@ def _eof_received(self):

try:
keep_open = self._protocol.eof_received()
except Exception as exc:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
self._fatal_error(
exc, 'Fatal error: protocol.eof_received() call failed.')
return
Expand All @@ -235,7 +237,9 @@ def _data_received(self, data):
if isinstance(self._protocol, protocols.BufferedProtocol):
try:
protocols._feed_data_to_buffered_proto(self._protocol, data)
except Exception as exc:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
self._fatal_error(exc,
'Fatal error: protocol.buffer_updated() '
'call failed.')
Expand Down Expand Up @@ -625,7 +629,9 @@ def _loop_self_reading(self, f=None):
except exceptions.CancelledError:
# _close_self_pipe() has been called, stop waiting for data
return
except Exception as exc:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
self.call_exception_handler({
'message': 'Error on reading from the event loop self pipe',
'exception': exc,
Expand Down
76 changes: 56 additions & 20 deletions Lib/asyncio/selector_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,14 @@ async def _accept_connection2(

try:
await waiter
except:
except BaseException:
transport.close()
raise
# It's now up to the protocol to handle the connection.

# It's now up to the protocol to handle the connection.
except Exception as exc:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
if self._debug:
context = {
'message':
Expand Down Expand Up @@ -370,7 +372,9 @@ def _sock_recv(self, fut, sock, n):
data = sock.recv(n)
except (BlockingIOError, InterruptedError):
return # try again next time
except Exception as exc:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
fut.set_exception(exc)
else:
fut.set_result(data)
Expand Down Expand Up @@ -404,7 +408,9 @@ def _sock_recv_into(self, fut, sock, buf):
nbytes = sock.recv_into(buf)
except (BlockingIOError, InterruptedError):
return # try again next time
except Exception as exc:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
fut.set_exception(exc)
else:
fut.set_result(nbytes)
Expand Down Expand Up @@ -447,7 +453,9 @@ def _sock_sendall(self, fut, sock, view, pos):
n = sock.send(view[start:])
except (BlockingIOError, InterruptedError):
return
except Exception as exc:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
fut.set_exception(exc)
return

Expand Down Expand Up @@ -487,7 +495,9 @@ def _sock_connect(self, fut, sock, address):
fut.add_done_callback(
functools.partial(self._sock_write_done, fd))
self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
except Exception as exc:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
fut.set_exception(exc)
else:
fut.set_result(None)
Expand All @@ -507,7 +517,9 @@ def _sock_connect_cb(self, fut, sock, address):
except (BlockingIOError, InterruptedError):
# socket is still registered, the callback will be retried later
pass
except Exception as exc:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
fut.set_exception(exc)
else:
fut.set_result(None)
Expand Down Expand Up @@ -537,7 +549,9 @@ def _sock_accept(self, fut, registered, sock):
conn.setblocking(False)
except (BlockingIOError, InterruptedError):
self.add_reader(fd, self._sock_accept, fut, True, sock)
except Exception as exc:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
fut.set_exception(exc)
else:
fut.set_result((conn, address))
Expand Down Expand Up @@ -785,7 +799,9 @@ def _read_ready__get_buffer(self):
buf = self._protocol.get_buffer(-1)
if not len(buf):
raise RuntimeError('get_buffer() returned an empty buffer')
except Exception as exc:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
self._fatal_error(
exc, 'Fatal error: protocol.get_buffer() call failed.')
return
Expand All @@ -794,7 +810,9 @@ def _read_ready__get_buffer(self):
nbytes = self._sock.recv_into(buf)
except (BlockingIOError, InterruptedError):
return
except Exception as exc:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
self._fatal_error(exc, 'Fatal read error on socket transport')
return

Expand All @@ -804,7 +822,9 @@ def _read_ready__get_buffer(self):

try:
self._protocol.buffer_updated(nbytes)
except Exception as exc:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
self._fatal_error(
exc, 'Fatal error: protocol.buffer_updated() call failed.')

Expand All @@ -815,7 +835,9 @@ def _read_ready__data_received(self):
data = self._sock.recv(self.max_size)
except (BlockingIOError, InterruptedError):
return
except Exception as exc:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
self._fatal_error(exc, 'Fatal read error on socket transport')
return

Expand All @@ -825,7 +847,9 @@ def _read_ready__data_received(self):

try:
self._protocol.data_received(data)
except Exception as exc:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
self._fatal_error(
exc, 'Fatal error: protocol.data_received() call failed.')

Expand All @@ -835,7 +859,9 @@ def _read_ready__on_eof(self):

try:
keep_open = self._protocol.eof_received()
except Exception as exc:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
self._fatal_error(
exc, 'Fatal error: protocol.eof_received() call failed.')
return
Expand Down Expand Up @@ -871,7 +897,9 @@ def write(self, data):
n = self._sock.send(data)
except (BlockingIOError, InterruptedError):
pass
except Exception as exc:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
self._fatal_error(exc, 'Fatal write error on socket transport')
return
else:
Expand All @@ -894,7 +922,9 @@ def _write_ready(self):
n = self._sock.send(self._buffer)
except (BlockingIOError, InterruptedError):
pass
except Exception as exc:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
self._loop._remove_writer(self._sock_fd)
self._buffer.clear()
self._fatal_error(exc, 'Fatal write error on socket transport')
Expand Down Expand Up @@ -970,7 +1000,9 @@ def _read_ready(self):
pass
except OSError as exc:
self._protocol.error_received(exc)
except Exception as exc:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
self._fatal_error(exc, 'Fatal read error on datagram transport')
else:
self._protocol.datagram_received(data, addr)
Expand Down Expand Up @@ -1007,7 +1039,9 @@ def sendto(self, data, addr=None):
except OSError as exc:
self._protocol.error_received(exc)
return
except Exception as exc:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
self._fatal_error(
exc, 'Fatal write error on datagram transport')
return
Expand All @@ -1030,7 +1064,9 @@ def _sendto_ready(self):
except OSError as exc:
self._protocol.error_received(exc)
return
except Exception as exc:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
self._fatal_error(
exc, 'Fatal write error on datagram transport')
return
Expand Down
16 changes: 12 additions & 4 deletions Lib/asyncio/sslproto.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,9 @@ def data_received(self, data):

try:
ssldata, appdata = self._sslpipe.feed_ssldata(data)
except Exception as e:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as e:
self._fatal_error(e, 'SSL error in data received')
return

Expand All @@ -542,7 +544,9 @@ def data_received(self, data):
self._app_protocol, chunk)
else:
self._app_protocol.data_received(chunk)
except Exception as ex:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as ex:
self._fatal_error(
ex, 'application protocol failed to receive SSL data')
return
Expand Down Expand Up @@ -628,7 +632,9 @@ def _on_handshake_complete(self, handshake_exc):
raise handshake_exc

peercert = sslobj.getpeercert()
except Exception as exc:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
if isinstance(exc, ssl.CertificateError):
msg = 'SSL handshake failed on verifying the certificate'
else:
Expand Down Expand Up @@ -691,7 +697,9 @@ def _process_write_backlog(self):
# delete it and reduce the outstanding buffer size.
del self._write_backlog[0]
self._write_buffer_size -= len(data)
except Exception as exc:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as exc:
if self._in_handshake:
# Exceptions will be re-raised in _on_handshake_complete.
self._on_handshake_complete(exc)
Expand Down
4 changes: 3 additions & 1 deletion Lib/asyncio/staggered.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ async def run_one_coro(

try:
result = await coro_fn()
except Exception as e:
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as e:
exceptions[this_index] = e
this_failed.set() # Kickstart the next coroutine
else:
Expand Down
Loading

0 comments on commit 431b540

Please sign in to comment.