Skip to content

Commit

Permalink
pythongh-126434: Continue process signals on dedicated thread, don't …
Browse files Browse the repository at this point in the history
…bubble exception on main thread
  • Loading branch information
ivarref committed Dec 3, 2024
1 parent c308344 commit 5a43697
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 110 deletions.
85 changes: 54 additions & 31 deletions Lib/signal.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import _signal
import os
from _signal import *
from enum import IntEnum as _IntEnum
import threading
import queue
import traceback

_globals = globals()

Expand Down Expand Up @@ -45,43 +45,65 @@ def _enum_to_int(value):
except (ValueError, TypeError):
return value

_init_lock = threading.Lock()
_signal_queue = queue.SimpleQueue() # SimpleQueue has reentrant put, so it can safely be called from signal handlers. https://github.com/python/cpython/issues/59181
_bubble_queue = queue.SimpleQueue()
_signal_thread = None
_signo_to_handler = {}

def _signal_queue_handler():
assert threading.current_thread() is not threading.main_thread()
global _signal_queue, _signo_to_handler, _bubble_queue
while True:
(signo, stack_frame) = _signal_queue.get()
try:
handler = _signo_to_handler.get(signo, None)
handler(signo, stack_frame)
except Exception as e:
_bubble_queue.put(e)
# _signal.raise_signal(SIGTERM) # does not work when using event.wait()
# _thread.interrupt_main(SIGTERM) # does not work when using event.wait()
os.kill(os.getpid(), signo)

def _init_signal_thread():
assert threading.current_thread() is threading.main_thread()
global _signal_thread, _init_lock
with _init_lock:
if _signal_thread is None:
_signal_thread = threading.Thread(target=_signal_queue_handler, daemon=True)
_signal_thread.start()
global _signal_thread
if _signal_thread is None:
_signal_thread = threading.Thread(target=_signal_queue_handler, daemon=True)
_signal_thread.name = 'SignalHandlerThread'
_signal_thread.start()

def _push_signal_to_queue_handler(signo, stack_frame):
def _push_signal_to_queue_handler(signo, _stack_frame):
assert threading.current_thread() is threading.main_thread()
global _signal_queue
_signal_queue.put(signo)

def _sigint_to_str(signo):
for x in valid_signals():
if x == signo:
return x.name
raise RuntimeError('Could not find signal name')

def _log_missing_signal_handler(signo):
import logging
logger = logging.getLogger(__name__)
str_name = ''
for x in valid_signals():
if x == signo:
str_name = x.name
logger.warning('Handler for signal.%s (%d) was not found.', str_name, signo)

def _stop_signal_thread():
global _signal_thread, _signal_queue
if _signal_thread is not None:
_signal_queue.put('STOP_SIGNAL_HANDLER')

def _signal_queue_handler():
try:
global _bubble_queue
bubble_exception = _bubble_queue.get(block=False)
raise bubble_exception
except queue.Empty:
global _signal_queue
_signal_queue.put((signo, stack_frame))
assert threading.current_thread() is not threading.main_thread()
global _signal_queue, _signo_to_handler
while True:
signo = _signal_queue.get()
if signo == 'STOP_SIGNAL_HANDLER':
break
try:
handler = _signo_to_handler.get(signo, None)
if handler is not None:
handler(signo, None)
else:
_log_missing_signal_handler(signo)
except Exception:
traceback.print_exc()
pass
except:
pass
finally:
global _signal_thread
_signal_thread = None

# Similar to functools.wraps(), but only assign __doc__.
# __module__ should be preserved,
Expand All @@ -93,8 +115,9 @@ def decorator(wrapper):
return wrapper
return decorator

def signal(signalnum, handler, use_dedicated_thread=True):
assert threading.current_thread() is threading.main_thread()
def signal(signalnum, handler, use_dedicated_thread=False):
if use_dedicated_thread:
assert threading.current_thread() is threading.main_thread()
global _signo_to_handler
signal_int = _enum_to_int(signalnum)
old_handler = _signo_to_handler.get(signal_int, None)
Expand Down
5 changes: 0 additions & 5 deletions Lib/test/test_multiprocessing_event/__init__.py

This file was deleted.

73 changes: 0 additions & 73 deletions Lib/test/test_multiprocessing_event/test_event.py

This file was deleted.

1 change: 0 additions & 1 deletion Makefile.pre.in
Original file line number Diff line number Diff line change
Expand Up @@ -2520,7 +2520,6 @@ TESTSUBDIRS= idlelib/idle_test \
test/test_interpreters \
test/test_json \
test/test_module \
test/test_multiprocessing_event \
test/test_multiprocessing_fork \
test/test_multiprocessing_forkserver \
test/test_multiprocessing_spawn \
Expand Down
45 changes: 45 additions & 0 deletions bug.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import os
import signal
import threading
import time
import multiprocessing

def sigint_self():
time.sleep(1)
print(f'{threading.current_thread().name}: Stopping PID {os.getpid()} with SIGINT')
os.kill(os.getpid(), signal.SIGINT)

def sigkill_self():
time.sleep(5)
print(f'{threading.current_thread().name}: Killing PID {os.getpid()} with SIGKILL')
os.kill(os.getpid(), signal.SIGKILL)

def run_signal_handler_dedicated_thread():
event = multiprocessing.Event()
def sigint_handler(_signo, _stack_frame):
try:
x = 1 / 0
print(f'{threading.current_thread().name}: sigint_handler is setting event')
event.set()
finally:
print(f'{threading.current_thread().name}: sigint_handler is done')

def sigterm_handler(_signo, _stack_frame):
print(f'{threading.current_thread().name}: sigterm_handler is running')
pass

signal.signal(signal.SIGTERM, sigterm_handler)
signal.signal(signal.SIGINT, sigint_handler)

threading.Thread(target=sigint_self, daemon=True).start()
threading.Thread(target=sigkill_self, daemon=True).start() # Used for debugging only.

print(f'{threading.current_thread().name}: Waiting on event. PID = {os.getpid()}')
event.wait()
print(f'{threading.current_thread().name}: Waiting is done')

if __name__ == '__main__':
try:
run_signal_handler_dedicated_thread()
finally:
print(f'{threading.current_thread().name}: Exiting')

0 comments on commit 5a43697

Please sign in to comment.