Skip to content
This repository was archived by the owner on Nov 23, 2017. It is now read-only.

Handle BaseException in event loop. #305

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 1 addition & 15 deletions asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,6 @@ def _check_resolved_address(sock, address):


def _run_until_complete_cb(fut):
exc = fut._exception
if (isinstance(exc, BaseException)
and not isinstance(exc, Exception)):
# Issue #22429: run_forever() already finished, no need to
# stop it.
return
fut._loop.stop()


Expand Down Expand Up @@ -321,15 +315,7 @@ def run_until_complete(self, future):
future._log_destroy_pending = False

future.add_done_callback(_run_until_complete_cb)
try:
self.run_forever()
except:
if new_task and future.done() and not future.cancelled():
# The coroutine raised a BaseException. Consume the exception
# to not log a warning, the caller doesn't have access to the
# local task.
future.exception()
raise
self.run_forever()
future.remove_done_callback(_run_until_complete_cb)
if not future.done():
raise RuntimeError('Event loop stopped before Future completed.')
Expand Down
2 changes: 1 addition & 1 deletion asyncio/base_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def _connect_pipes(self, waiter):
for callback, data in self._pending_calls:
loop.call_soon(callback, *data)
self._pending_calls = None
except Exception as exc:
except BaseException as exc:
if waiter is not None and not waiter.cancelled():
waiter.set_exception(exc)
else:
Expand Down
2 changes: 1 addition & 1 deletion asyncio/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def cancel(self):
def _run(self):
try:
self._callback(*self._args)
except Exception as exc:
except BaseException as exc:
cb = _format_callback_source(self._callback, self._args)
msg = 'Exception in callback {}'.format(cb)
context = {
Expand Down
2 changes: 1 addition & 1 deletion asyncio/proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ def _loop_self_reading(self, f=None):
except futures.CancelledError:
# _close_self_pipe() has been called, stop waiting for data
return
except Exception as exc:
except BaseException as exc:
self.call_exception_handler({
'message': 'Error on reading from the event loop self pipe',
'exception': exc,
Expand Down
34 changes: 15 additions & 19 deletions asyncio/selector_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ def _accept_connection2(self, protocol_factory, conn, extra,
raise

# It's now up to the protocol to handle the connection.
except Exception as exc:
except BaseException as exc:
if self._debug:
context = {
'message': ('Error on transport creation '
Expand Down Expand Up @@ -334,7 +334,7 @@ def _sock_recv(self, fut, registered, sock, n):
data = sock.recv(n)
except (BlockingIOError, InterruptedError):
self.add_reader(fd, self._sock_recv, fut, True, sock, n)
except Exception as exc:
except BaseException as exc:
fut.set_exception(exc)
else:
fut.set_result(data)
Expand Down Expand Up @@ -371,7 +371,7 @@ def _sock_sendall(self, fut, registered, sock, data):
n = sock.send(data)
except (BlockingIOError, InterruptedError):
n = 0
except Exception as exc:
except BaseException as exc:
fut.set_exception(exc)
return

Expand Down Expand Up @@ -417,7 +417,7 @@ def _sock_connect(self, fut, sock, address):
fut.add_done_callback(functools.partial(self._sock_connect_done,
fd))
self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
except Exception as exc:
except BaseException as exc:
fut.set_exception(exc)
else:
fut.set_result(None)
Expand All @@ -437,7 +437,7 @@ def _sock_connect_cb(self, fut, sock, address):
except (BlockingIOError, InterruptedError):
# socket is still registered, the callback will be retried later
pass
except Exception as exc:
except BaseException as exc:
fut.set_exception(exc)
else:
fut.set_result(None)
Expand Down Expand Up @@ -469,7 +469,7 @@ def _sock_accept(self, fut, registered, sock):
conn.setblocking(False)
except (BlockingIOError, InterruptedError):
self.add_reader(fd, self._sock_accept, fut, True, sock)
except Exception as exc:
except BaseException as exc:
fut.set_exception(exc)
else:
fut.set_result((conn, address))
Expand Down Expand Up @@ -664,7 +664,7 @@ def _read_ready(self):
data = self._sock.recv(self.max_size)
except (BlockingIOError, InterruptedError):
pass
except Exception as exc:
except BaseException as exc:
self._fatal_error(exc, 'Fatal read error on socket transport')
else:
if data:
Expand Down Expand Up @@ -702,7 +702,7 @@ def write(self, data):
n = self._sock.send(data)
except (BlockingIOError, InterruptedError):
pass
except Exception as exc:
except BaseException as exc:
self._fatal_error(exc, 'Fatal write error on socket transport')
return
else:
Expand All @@ -723,7 +723,7 @@ def _write_ready(self):
n = self._sock.send(self._buffer)
except (BlockingIOError, InterruptedError):
pass
except Exception as exc:
except BaseException as exc:
self._loop.remove_writer(self._sock_fd)
self._buffer.clear()
self._fatal_error(exc, 'Fatal write error on socket transport')
Expand Down Expand Up @@ -818,10 +818,6 @@ def _on_handshake(self, start_time):
self._loop.remove_writer(self._sock_fd)
self._sock.close()
self._wakeup_waiter(exc)
if isinstance(exc, Exception):
return
else:
raise

self._loop.remove_reader(self._sock_fd)
self._loop.remove_writer(self._sock_fd)
Expand All @@ -834,7 +830,7 @@ def _on_handshake(self, start_time):
self._sslcontext.verify_mode != ssl.CERT_NONE):
try:
ssl.match_hostname(peercert, self._server_hostname)
except Exception as exc:
except BaseException as exc:
if self._loop.get_debug():
logger.warning("%r: SSL handshake failed "
"on matching the hostname",
Expand Down Expand Up @@ -904,7 +900,7 @@ def _read_ready(self):
self._read_wants_write = True
self._loop.remove_reader(self._sock_fd)
self._loop.add_writer(self._sock_fd, self._write_ready)
except Exception as exc:
except BaseException as exc:
self._fatal_error(exc, 'Fatal read error on SSL transport')
else:
if data:
Expand Down Expand Up @@ -937,7 +933,7 @@ def _write_ready(self):
n = 0
self._loop.remove_writer(self._sock_fd)
self._write_wants_read = True
except Exception as exc:
except BaseException as exc:
self._loop.remove_writer(self._sock_fd)
self._buffer.clear()
self._fatal_error(exc, 'Fatal write error on SSL transport')
Expand Down Expand Up @@ -1004,7 +1000,7 @@ def _read_ready(self):
pass
except OSError as exc:
self._protocol.error_received(exc)
except Exception as exc:
except BaseException as exc:
self._fatal_error(exc, 'Fatal read error on datagram transport')
else:
self._protocol.datagram_received(data, addr)
Expand Down Expand Up @@ -1039,7 +1035,7 @@ def sendto(self, data, addr=None):
except OSError as exc:
self._protocol.error_received(exc)
return
except Exception as exc:
except BaseException as exc:
self._fatal_error(exc,
'Fatal write error on datagram transport')
return
Expand All @@ -1062,7 +1058,7 @@ def _sendto_ready(self):
except OSError as exc:
self._protocol.error_received(exc)
return
except Exception as exc:
except BaseException as exc:
self._fatal_error(exc,
'Fatal write error on datagram transport')
return
Expand Down
11 changes: 2 additions & 9 deletions asyncio/sslproto.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,11 +583,8 @@ def _on_handshake_complete(self, handshake_exc):
logger.warning("%r: SSL handshake failed",
self, exc_info=True)
self._transport.close()
if isinstance(exc, Exception):
self._wakeup_waiter(exc)
return
else:
raise
self._wakeup_waiter(exc)
return

if self._loop.get_debug():
dt = self._loop.time() - self._handshake_start_time
Expand Down Expand Up @@ -645,13 +642,9 @@ def _process_write_backlog(self):
self._write_buffer_size -= len(data)
except BaseException as exc:
if self._in_handshake:
# BaseExceptions will be re-raised in _on_handshake_complete.
self._on_handshake_complete(exc)
else:
self._fatal_error(exc, 'Fatal error on SSL transport')
if not isinstance(exc, Exception):
# BaseException
raise

def _fatal_error(self, exc, message='Fatal error on transport'):
# Should be called from exception handler only.
Expand Down
13 changes: 3 additions & 10 deletions asyncio/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
class Task(futures.Future):
"""A coroutine wrapped in a Future."""

# An important invariant maintained while a Task not done:
# An important invariant maintained while a Task is not done:
#
# - Either _fut_waiter is None, and _step() is scheduled;
# - or _fut_waiter is some Future, and _step() is *not* scheduled.
Expand Down Expand Up @@ -243,11 +243,8 @@ def _step(self, exc=None):
self.set_result(exc.value)
except futures.CancelledError as exc:
super().cancel() # I.e., Future.cancel(self).
except Exception as exc:
self.set_exception(exc)
except BaseException as exc:
self.set_exception(exc)
raise
else:
if isinstance(result, futures.Future):
# Yielded Future must come from Future.__iter__().
Expand Down Expand Up @@ -288,16 +285,12 @@ def _step(self, exc=None):
def _wakeup(self, future):
try:
future.result()
except Exception as exc:
except BaseException as exc:
# This may also be a cancellation.
self._step(exc)
else:
# Don't pass the value of `future.result()` explicitly,
# as `Future.__iter__` and `Future.__await__` don't need it.
# If we call `_step(value, None)` instead of `_step()`,
# Python eval loop would use `.send(value)` method call,
# instead of `__next__()`, which is slower for futures
# that return non-generator iterators from their `__iter__`.
self._step()
self = None # Needed to break cycles when an exception occurs.

Expand Down Expand Up @@ -719,7 +712,7 @@ def run_coroutine_threadsafe(coro, loop):
def callback():
try:
futures._chain_future(ensure_future(coro, loop=loop), future)
except Exception as exc:
except BaseException as exc:
if future.set_running_or_notify_cancel():
future.set_exception(exc)
raise
Expand Down
4 changes: 2 additions & 2 deletions asyncio/transports.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def _maybe_pause_protocol(self):
self._protocol_paused = True
try:
self._protocol.pause_writing()
except Exception as exc:
except BaseException as exc:
self._loop.call_exception_handler({
'message': 'protocol.pause_writing() failed',
'exception': exc,
Expand All @@ -265,7 +265,7 @@ def _maybe_resume_protocol(self):
self._protocol_paused = False
try:
self._protocol.resume_writing()
except Exception as exc:
except BaseException as exc:
self._loop.call_exception_handler({
'message': 'protocol.resume_writing() failed',
'exception': exc,
Expand Down
8 changes: 4 additions & 4 deletions asyncio/unix_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def _make_subprocess_transport(self, protocol, args, shell,
self._child_watcher_callback, transp)
try:
yield from waiter
except Exception as exc:
except BaseException as exc:
# Workaround CPython bug #23353: using yield/yield-from in an
# except block of a generator doesn't clear properly
# sys.exc_info()
Expand Down Expand Up @@ -500,7 +500,7 @@ def write(self, data):
n = os.write(self._fileno, data)
except (BlockingIOError, InterruptedError):
n = 0
except Exception as exc:
except BaseException as exc:
self._conn_lost += 1
self._fatal_error(exc, 'Fatal write error on pipe transport')
return
Expand All @@ -522,7 +522,7 @@ def _write_ready(self):
n = os.write(self._fileno, data)
except (BlockingIOError, InterruptedError):
self._buffer.append(data)
except Exception as exc:
except BaseException as exc:
self._conn_lost += 1
# Remove writer here, _fatal_error() doesn't it
# because _buffer is empty.
Expand Down Expand Up @@ -747,7 +747,7 @@ def attach_loop(self, loop):
def _sig_chld(self):
try:
self._do_waitpid_all()
except Exception as exc:
except BaseException as exc:
# self._loop should always be available here
# as '_sig_chld' is added as a signal handler
# in 'attach_loop'
Expand Down
2 changes: 1 addition & 1 deletion asyncio/windows_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ def _make_subprocess_transport(self, protocol, args, shell,
**kwargs)
try:
yield from waiter
except Exception as exc:
except BaseException as exc:
# Workaround CPython bug #23353: using yield/yield-from in an
# except block of a generator doesn't clear properly sys.exc_info()
err = exc
Expand Down
11 changes: 6 additions & 5 deletions tests/test_base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,8 @@ def raise_keyboard_interrupt():
self.loop.run_until_complete(raise_keyboard_interrupt())
except KeyboardInterrupt:
pass
else:
raise Exception("Expected KeyboardInterrupt")
self.loop.close()
support.gc_collect()

Expand All @@ -745,16 +747,15 @@ def raise_keyboard_interrupt():
self.loop.run_until_complete(raise_keyboard_interrupt())
except KeyboardInterrupt:
pass
else:
raise Exception("Expected KeyboardInterrupt")

def func():
self.loop.stop()
func.called = True
func.called = False
try:
self.loop.call_soon(func)
self.loop.run_forever()
except KeyboardInterrupt:
pass
self.loop.call_soon(func)
self.loop.run_forever()
self.assertTrue(func.called)

def test_single_selecter_event_callback_after_stopping(self):
Expand Down
4 changes: 2 additions & 2 deletions tests/test_selector_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1165,7 +1165,7 @@ def test_on_handshake_exc(self):
self.sslsock.do_handshake.side_effect = exc
with test_utils.disable_logger():
waiter = asyncio.Future(loop=self.loop)
transport = self.ssl_transport(waiter=waiter)
self.ssl_transport(waiter=waiter)
self.assertTrue(waiter.done())
self.assertIs(exc, waiter.exception())
self.assertTrue(self.sslsock.close.called)
Expand All @@ -1176,7 +1176,7 @@ def test_on_handshake_base_exc(self):
exc = BaseException()
self.sslsock.do_handshake.side_effect = exc
with test_utils.disable_logger():
self.assertRaises(BaseException, transport._on_handshake, 0)
transport._on_handshake(0)
self.assertTrue(self.sslsock.close.called)
self.assertTrue(waiter.done())
self.assertIs(exc, waiter.exception())
Expand Down
Loading