Skip to content

Commit

Permalink
Fix KeyboardInterrupt handling logic.
Browse files Browse the repository at this point in the history
When uvloop is run in the main thread we *always* want to set up a
self-pipe and a signal wakeup FD.  That's the only way how libuv
can be notified that a ^C happened and break away from selecting
on sockets.

asyncio does not need to do that, as the 'selectors' module it uses
is already aware of the way Python implements ^C handling.

This translates to a slightly different behavior between asyncio &
uvloop:

1. uvloop needs to always call signal.set_wakeup_fd() when run in the
  main thread;

2. asyncio only needs to call signal.set_wakeup_fd() when a user
  registers a signal handler.

(2) means that if the user had not set up any signals, the signal
wakeup FD stays the same between different asyncio runs.  This commit
fixes uvloop signal implementation to make sure that uvloop behaves
the same way as asyncio in regards to signal wakeup FD between the
loop runs.  It also ensures that uvloop always have a proper
self-pipe set up so that ^C is always supported when it is run in
the main thread.

Issue #295.
  • Loading branch information
1st1 committed Oct 29, 2019
1 parent 6476aad commit c32c703
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 95 deletions.
2 changes: 1 addition & 1 deletion tests/test_dealloc.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async def foo():
return 42
def main():
asyncio.set_event_loop(uvloop.new_event_loop())
uvloop.install()
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(foo())
Expand Down
57 changes: 27 additions & 30 deletions tests/test_regr1.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,36 +74,33 @@ def on_alarm(self, sig, fr):
raise FailedTestError

def run_test(self):
try:
for i in range(10):
for threaded in [True, False]:
if threaded:
qin, qout = queue.Queue(), queue.Queue()
threading.Thread(
target=run_server,
args=(qin, qout),
daemon=True).start()
else:
qin = multiprocessing.Queue()
qout = multiprocessing.Queue()
multiprocessing.Process(
target=run_server,
args=(qin, qout),
daemon=True).start()

addr = qout.get()
loop = self.new_loop()
asyncio.set_event_loop(loop)
loop.create_task(
loop.create_connection(
lambda: EchoClientProtocol(loop),
host=addr[0], port=addr[1]))
loop.run_forever()
loop.close()
qin.put('stop')
qout.get()
finally:
loop.close()
for i in range(10):
for threaded in [True, False]:
if threaded:
qin, qout = queue.Queue(), queue.Queue()
threading.Thread(
target=run_server,
args=(qin, qout),
daemon=True).start()
else:
qin = multiprocessing.Queue()
qout = multiprocessing.Queue()
multiprocessing.Process(
target=run_server,
args=(qin, qout),
daemon=True).start()

addr = qout.get()
loop = self.new_loop()
asyncio.set_event_loop(loop)
loop.create_task(
loop.create_connection(
lambda: EchoClientProtocol(loop),
host=addr[0], port=addr[1]))
loop.run_forever()
loop.close()
qin.put('stop')
qout.get()

@unittest.skipIf(
multiprocessing.get_start_method(False) == 'spawn',
Expand Down
39 changes: 39 additions & 0 deletions tests/test_signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,45 @@ async def worker():
loop = """ + self.NEW_LOOP + """
asyncio.set_event_loop(loop)
loop.create_task(worker())
try:
loop.run_forever()
finally:
srv.close()
loop.run_until_complete(srv.wait_closed())
loop.close()
"""

proc = await asyncio.create_subprocess_exec(
sys.executable, b'-W', b'ignore', b'-c', PROG,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)

await proc.stdout.readline()
time.sleep(DELAY)
proc.send_signal(signal.SIGINT)
out, err = await proc.communicate()
self.assertIn(b'KeyboardInterrupt', err)

self.loop.run_until_complete(runner())

@tb.silence_long_exec_warning()
def test_signals_sigint_uvcode_two_loop_runs(self):
async def runner():
PROG = R"""\
import asyncio
import uvloop
srv = None
async def worker():
global srv
cb = lambda *args: None
srv = await asyncio.start_server(cb, '127.0.0.1', 0)
loop = """ + self.NEW_LOOP + """
asyncio.set_event_loop(loop)
loop.run_until_complete(worker())
print('READY', flush=True)
try:
loop.run_forever()
finally:
Expand Down
8 changes: 5 additions & 3 deletions uvloop/loop.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ cdef class Loop:
object _ssock
object _csock
bint _listening_signals
int _old_signal_wakeup_id

set _timers
dict _polls
Expand Down Expand Up @@ -149,6 +150,8 @@ cdef class Loop:

cdef void _handle_exception(self, object ex)

cdef inline _is_main_thread(self)

cdef inline _new_future(self)
cdef inline _check_signal(self, sig)
cdef inline _check_closed(self)
Expand Down Expand Up @@ -186,10 +189,9 @@ cdef class Loop:

cdef _sock_set_reuseport(self, int fd)

cdef _setup_signals(self)
cdef _setup_or_resume_signals(self)
cdef _shutdown_signals(self)
cdef _recv_signals_start(self)
cdef _recv_signals_stop(self)
cdef _pause_signals(self)

cdef _handle_signal(self, sig)
cdef _read_from_self(self)
Expand Down
135 changes: 74 additions & 61 deletions uvloop/loop.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ cdef class Loop:
self._ssock = self._csock = None
self._signal_handlers = {}
self._listening_signals = False
self._old_signal_wakeup_id = -1

self._coroutine_debug_set = False

Expand All @@ -183,6 +184,9 @@ cdef class Loop:

self._servers = set()

cdef inline _is_main_thread(self):
return MAIN_THREAD_ID == PyThread_get_thread_ident()

def __init__(self):
self.set_debug((not sys_ignore_environment
and bool(os_environ.get('PYTHONASYNCIODEBUG'))))
Expand Down Expand Up @@ -241,34 +245,40 @@ cdef class Loop:

self._debug_exception_handler_cnt = 0

cdef _setup_signals(self):
cdef int old_wakeup_fd
cdef _setup_or_resume_signals(self):
if not self._is_main_thread():
return

if self._listening_signals:
return
raise RuntimeError('signals handling has been already setup')

if self._ssock is not None:
raise RuntimeError('self-pipe exists before loop run')

# Create a self-pipe and call set_signal_wakeup_fd() with one
# of its ends. This is needed so that libuv knows that it needs
# to wakeup on ^C (no matter if the SIGINT handler is still the
# standard Python's one or or user set their own.)

self._ssock, self._csock = socket_socketpair()
self._ssock.setblocking(False)
self._csock.setblocking(False)
try:
old_wakeup_fd = _set_signal_wakeup_fd(self._csock.fileno())
except (OSError, ValueError):
# Not the main thread
self._ssock.setblocking(False)
self._csock.setblocking(False)

fileno = self._csock.fileno()

self._old_signal_wakeup_id = _set_signal_wakeup_fd(fileno)
except Exception:
# Out of all statements in the try block, only the
# "_set_signal_wakeup_fd()" call can fail, but it shouldn't,
# as we ensure that the current thread is the main thread.
# Still, if something goes horribly wrong we want to clean up
# the socket pair.
self._ssock.close()
self._csock.close()
self._ssock = self._csock = None
return

self._listening_signals = True
return old_wakeup_fd

cdef _recv_signals_start(self):
cdef object old_wakeup_fd = None
if self._ssock is None:
old_wakeup_fd = self._setup_signals()
if self._ssock is None:
# Not the main thread.
return
self._ssock = None
self._csock = None
raise

self._add_reader(
self._ssock,
Expand All @@ -277,38 +287,49 @@ cdef class Loop:
"Loop._read_from_self",
<method_t>self._read_from_self,
self))
return old_wakeup_fd

cdef _recv_signals_stop(self):
if self._ssock is None:
return
self._listening_signals = True

self._remove_reader(self._ssock)
cdef _pause_signals(self):
if not self._is_main_thread():
if self._listening_signals:
raise RuntimeError(
'cannot pause signals handling; no longer running in '
'the main thread')
else:
return

cdef _shutdown_signals(self):
if not self._listening_signals:
return
raise RuntimeError('signals handling has not been setup')

for sig in list(self._signal_handlers):
self.remove_signal_handler(sig)

if not self._listening_signals:
# `remove_signal_handler` will call `_shutdown_signals` when
# removing last signal handler.
return
self._listening_signals = False

try:
signal_set_wakeup_fd(-1)
except (ValueError, OSError) as exc:
aio_logger.info('set_wakeup_fd(-1) failed: %s', exc)
_set_signal_wakeup_fd(self._old_signal_wakeup_id)

self._remove_reader(self._ssock)
self._ssock.close()
self._csock.close()
self._ssock = None
self._csock = None

self._listening_signals = False
cdef _shutdown_signals(self):
if not self._is_main_thread():
if self._signal_handlers:
aio_logger.warning(
'cannot cleanup signal handlers: closing the event loop '
'in a non-main OS thread')
return

if self._listening_signals:
raise RuntimeError(
'cannot shutdown signals handling as it has not been paused')

if self._ssock:
raise RuntimeError(
'self-pipe was not cleaned up after loop was run')

for sig in list(self._signal_handlers):
self.remove_signal_handler(sig)

def __sighandler(self, signum, frame):
self._signals.add(signum)
Expand Down Expand Up @@ -451,7 +472,6 @@ cdef class Loop:

cdef _run(self, uv.uv_run_mode mode):
cdef int err
cdef object old_wakeup_fd

if self._closed == 1:
raise RuntimeError('unable to start the loop; it was closed')
Expand All @@ -474,7 +494,7 @@ cdef class Loop:
self.handler_check__exec_writes.start()
self.handler_idle.start()

old_wakeup_fd = self._recv_signals_start()
self._setup_or_resume_signals()

if aio_set_running_loop is not None:
aio_set_running_loop(self)
Expand All @@ -484,13 +504,11 @@ cdef class Loop:
if aio_set_running_loop is not None:
aio_set_running_loop(None)

self._recv_signals_stop()
if old_wakeup_fd is not None:
signal_set_wakeup_fd(old_wakeup_fd)

self.handler_check__exec_writes.stop()
self.handler_idle.stop()

self._pause_signals()

self._thread_is_main = 0
self._thread_id = 0
self._running = 0
Expand Down Expand Up @@ -2794,10 +2812,10 @@ cdef class Loop:
cdef:
Handle h

if not self._listening_signals:
self._setup_signals()
if not self._listening_signals:
raise ValueError('set_wakeup_fd only works in main thread')
if not self._is_main_thread():
raise ValueError(
'add_signal_handler() can only be called from '
'the main thread')

if (aio_iscoroutine(callback)
or aio_iscoroutinefunction(callback)):
Expand Down Expand Up @@ -2829,14 +2847,6 @@ cdef class Loop:

self._check_signal(sig)
self._check_closed()
try:
# set_wakeup_fd() raises ValueError if this is not the
# main thread. By calling it early we ensure that an
# event loop running in another thread cannot add a signal
# handler.
_set_signal_wakeup_fd(self._csock.fileno())
except (ValueError, OSError) as exc:
raise RuntimeError(str(exc))

h = new_Handle(self, callback, args or None, None)
self._signal_handlers[sig] = h
Expand Down Expand Up @@ -2866,6 +2876,12 @@ cdef class Loop:
Return True if a signal handler was removed, False if not.
"""

if not self._is_main_thread():
raise ValueError(
'remove_signal_handler() can only be called from '
'the main thread')

self._check_signal(sig)

if not self._listening_signals:
Expand All @@ -2889,9 +2905,6 @@ cdef class Loop:
else:
raise

if not self._signal_handlers:
self._shutdown_signals()

return True

@cython.iterable_coroutine
Expand Down

0 comments on commit c32c703

Please sign in to comment.