Skip to content

Commit

Permalink
pythongh-126434: Process signals on dedicated thread
Browse files Browse the repository at this point in the history
  • Loading branch information
ivarref committed Nov 28, 2024
1 parent c15a45a commit c308344
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 62 deletions.
81 changes: 25 additions & 56 deletions Lib/multiprocessing/synchronize.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

__all__ = [
'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
]
]

import threading
import sys
Expand Down Expand Up @@ -157,7 +157,7 @@ def __repr__(self):
except Exception:
value = 'unknown'
return '<%s(value=%s, maxvalue=%s)>' % \
(self.__class__.__name__, value, self._semlock.maxvalue)
(self.__class__.__name__, value, self._semlock.maxvalue)

#
# Non-recursive lock
Expand Down Expand Up @@ -253,7 +253,7 @@ def __repr__(self):

def wait(self, timeout=None):
assert self._lock._semlock._is_mine(), \
'must acquire() condition before using wait()'
'must acquire() condition before using wait()'

# indicate that this thread is going to sleep
self._sleeping_count.release()
Expand Down Expand Up @@ -328,72 +328,41 @@ def wait_for(self, predicate, timeout=None):
class Event(object):

def __init__(self, *, ctx):
self._flag = ctx.Value('i', 0)
# Allocate a ctypes.c_ulonglong to hold the set_id:
# Represents the C unsigned long long datatype.
# The constructor accepts an optional integer initializer; no overflow checking is done.
# From https://docs.python.org/3/library/ctypes.html#ctypes.c_ulonglong
# See multiprocessing/sharedctypes.py for typecode to ctypes definitions
self._set_id = ctx.Value('Q', 0)
self._cond = ctx.Condition(ctx.Lock())
self._flag = ctx.Semaphore(0)

def is_set(self):
# From https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Value :
# If lock is True (the default) then a new recursive lock object is created to synchronize access to the value.
with self._flag:
return self._flag.value == 1
with self._cond:
if self._flag.acquire(False):
self._flag.release()
return True
return False

def set(self):
# If set() is called from a signal handler, this is fine as the lock is recursive (i.e. it won't deadlock).
# If the thread interrupted by the signal handler is wait()-ing and the signal handler calls set(),
# this is fine as wait() spins on the value. Thus, after the signal handler is done, the thread will
# return from wait()
# Fixes https://github.com/python/cpython/issues/85772
with self._flag:
with self._set_id:
if self._flag.value == 0:
# There is a theoretical chance of a race here. It requires the following conditions:
# The interrupted thread must be wait()ing.
# Then set() must be called reentrant for the maximum value of c_ulonglong + 1 times, i.e.
# ./python.exe -c 'from ctypes import *; print(256**sizeof(c_ulonglong()))', which
# evaluates to 18446744073709551616 on a 64 bit machine.
# All interruptions must happen exactly after `if self._flag.value == 0:`.
# The _set_id value will then wrap around. Then clear() must be called
# before the original wait() code continues. The wait() code will then continue
# to (incorrectly) wait. This case should be safe to ignore. The stack
# will probably grow too large before there is any chance of this actually happening.

self._flag.value = 1
self._set_id.value += 1
# There is no race here by reentrant set() when reaching the maximum value for `self._set_id.value`.
# ctypes.c_ulonglong will overflow without any exception:
# https://docs.python.org/3/library/ctypes.html#ctypes.c_ulonglong
# > no overflow checking is done.
# This means that we do not need to check if some maximum value is reached:
# C will wrap around the value for us.
with self._cond:
self._flag.acquire(False)
self._flag.release()
self._cond.notify_all()

def clear(self):
with self._flag:
self._flag.value = 0
with self._cond:
self._flag.acquire(False)

def wait(self, timeout=None):
start_time = time.monotonic()
set_id = self._set_id.value
while True:
if self._flag.value == 1:
return True
elif set_id != self._set_id.value:
return True # flag is unset, but set_id changed, so there must have been a `set` followed by a `clear`
# during `time.sleep()`. Fixes https://github.com/python/cpython/issues/95826
elif timeout is not None and (time.monotonic() - start_time) > timeout:
return False
with self._cond:
if self._flag.acquire(False):
self._flag.release()
else:
# Fixes https://github.com/python/cpython/issues/85772 by spinning and sleeping.
time.sleep(0.010) # sleep 10 milliseconds
self._cond.wait(timeout)

if self._flag.acquire(False):
self._flag.release()
return True
return False

def __repr__(self) -> str:
set_status = 'set' if self.is_set() else 'unset'
return f"<{type(self).__qualname__} at {id(self):#x} {set_status}>"

#
# Barrier
#
Expand Down
70 changes: 64 additions & 6 deletions Lib/signal.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import _signal
import os
from _signal import *
from enum import IntEnum as _IntEnum
import threading
import queue

_globals = globals()

Expand Down Expand Up @@ -42,6 +45,43 @@ def _enum_to_int(value):
except (ValueError, TypeError):
return value

_init_lock = threading.Lock()
_signal_queue = queue.SimpleQueue() # SimpleQueue has reentrant put, so it can safely be called from signal handlers. https://github.com/python/cpython/issues/59181
_bubble_queue = queue.SimpleQueue()
_signal_thread = None
_signo_to_handler = {}

def _signal_queue_handler():
assert threading.current_thread() is not threading.main_thread()
global _signal_queue, _signo_to_handler, _bubble_queue
while True:
(signo, stack_frame) = _signal_queue.get()
try:
handler = _signo_to_handler.get(signo, None)
handler(signo, stack_frame)
except Exception as e:
_bubble_queue.put(e)
# _signal.raise_signal(SIGTERM) # does not work when using event.wait()
# _thread.interrupt_main(SIGTERM) # does not work when using event.wait()
os.kill(os.getpid(), signo)

def _init_signal_thread():
assert threading.current_thread() is threading.main_thread()
global _signal_thread, _init_lock
with _init_lock:
if _signal_thread is None:
_signal_thread = threading.Thread(target=_signal_queue_handler, daemon=True)
_signal_thread.start()

def _push_signal_to_queue_handler(signo, stack_frame):
assert threading.current_thread() is threading.main_thread()
try:
global _bubble_queue
bubble_exception = _bubble_queue.get(block=False)
raise bubble_exception
except queue.Empty:
global _signal_queue
_signal_queue.put((signo, stack_frame))

# Similar to functools.wraps(), but only assign __doc__.
# __module__ should be preserved,
Expand All @@ -53,16 +93,34 @@ def decorator(wrapper):
return wrapper
return decorator

@_wraps(_signal.signal)
def signal(signalnum, handler):
handler = _signal.signal(_enum_to_int(signalnum), _enum_to_int(handler))
return _int_to_enum(handler, Handlers)
def signal(signalnum, handler, use_dedicated_thread=True):
assert threading.current_thread() is threading.main_thread()
global _signo_to_handler
signal_int = _enum_to_int(signalnum)
old_handler = _signo_to_handler.get(signal_int, None)
if use_dedicated_thread and callable(handler):
assert callable(handler)
global _signal_thread
if _signal_thread is None:
_init_signal_thread()
_signo_to_handler[signal_int] = handler
handler = _signal.signal(signal_int, _enum_to_int(_push_signal_to_queue_handler))
return old_handler or _int_to_enum(handler, Handlers)
else:
if signal_int in _signo_to_handler:
del _signo_to_handler[signal_int]
handler = _signal.signal(signal_int, _enum_to_int(handler))
return old_handler or _int_to_enum(handler, Handlers)


@_wraps(_signal.getsignal)
def getsignal(signalnum):
handler = _signal.getsignal(signalnum)
return _int_to_enum(handler, Handlers)
global _signo_to_handler
if signalnum in _signo_to_handler:
return _signo_to_handler[signalnum]
else:
handler = _signal.getsignal(signalnum)
return _int_to_enum(handler, Handlers)


if 'pthread_sigmask' in _globals:
Expand Down

0 comments on commit c308344

Please sign in to comment.