Skip to content

Commit

Permalink
pythongh-126434: multiprocessing.Event is fully reentrant, i.e. can b…
Browse files Browse the repository at this point in the history
…e called from a signal handler in any combination.
  • Loading branch information
ivarref committed Nov 14, 2024
1 parent f9307f6 commit 25236fe
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 36 deletions.
70 changes: 47 additions & 23 deletions Lib/multiprocessing/synchronize.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,46 +328,70 @@ def wait_for(self, predicate, timeout=None):
class Event(object):

def __init__(self, *, ctx):
self._cond = ctx.Condition(ctx.Lock())
self._threadlocal_wait_lock = None
self._flag = ctx.Value('i', 0)
# Allocate a ctypes.c_ulonglong to hold the set_id:
# Represents the C unsigned long long datatype.
# The constructor accepts an optional integer initializer; no overflow checking is done.
# From https://docs.python.org/3/library/ctypes.html#ctypes.c_ulonglong
# See multiprocessing/sharedctypes.py for typecode to ctypes definitions
self._set_id = ctx.Value('Q', 0)

def is_set(self):
return self._flag.value == 1
# From https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Value :
# If lock is True (the default) then a new recursive lock object is created to synchronize access to the value.
with self._flag:
return self._flag.value == 1

def set(self):
assert not self._cond._lock._semlock._is_mine(), \
'multiprocessing.Event is not reentrant for clear(), set() and wait()'
with self._cond:
if self._threadlocal_wait_lock is not None:
assert not self._threadlocal_wait_lock.v.locked(), \
'multiprocessing.Event.set() cannot be called from a thread that is already wait()-ing'
self._flag.value = 1
self._cond.notify_all()
# If `set` is called from a signal handler, this is fine as the lock is recursive (i.e. it won't deadlock).
# If the thread interrupted by the signal handler is wait()-ing and the signal handler calls set(),
# this is fine as wait() spins on the value. Thus, after the signal handler is done, the thread will
# return from wait()
# Fixes https://github.com/python/cpython/issues/85772
with self._flag:
with self._set_id:
if self._flag.value == 0:
# There is a theoretical chance of race here. It requires the following conditions:
# The interrupted thread must be wait()ing.
# Then set must be called reentrant for the maximum value of c_ulonglong times,
# and all interruptions must happen exactly after `if self._flag.value == 0:`.
# The _set_id value will then wrap around. Then clear() must be called
# before the original wait() code continues. The wait() code will then continue
# to (incorrectly) wait. I think this case is safe to ignore. The stack
# will grow too large before there is any chance of this actually happening.

self._flag.value = 1
self._set_id.value += 1
# There is no race here by reentrant set when reaching the maximum value for `self._set_id.value`.
# ctypes.c_ulonglong will overflow without any exception:
# https://docs.python.org/3/library/ctypes.html#ctypes.c_ulonglong
# > no overflow checking is done.
# This means that we do not need to check if some maximum value is reached:
# C will wrap around the value for us.

def clear(self):
assert not self._cond._lock._semlock._is_mine(), \
'multiprocessing.Event is not reentrant for clear(), set() and wait()'
with self._cond:
with self._flag:
self._flag.value = 0

def wait(self, timeout=None):
assert not self._cond._lock._semlock._is_mine(), \
'multiprocessing.Event is not reentrant for clear(), set() and wait()'
with self._cond:
if self._threadlocal_wait_lock is None:
self._threadlocal_wait_lock = threading.local()
self._threadlocal_wait_lock.v = threading.Lock()

start_time = time.monotonic()
set_id = self._set_id.value
while True:
if self._flag.value == 1:
return True
elif set_id != self._set_id.value:
return True # flag is unset, but set_id changed, so there must have been a `set` followed by a `clear`
# during `time.sleep()`. Fixes https://github.com/python/cpython/issues/95826
elif timeout is not None and (time.monotonic() - start_time) > timeout:
return False
else:
with self._threadlocal_wait_lock.v:
return self._cond.wait(timeout)
# Fixes https://github.com/python/cpython/issues/85772 by spinning and sleeping.
time.sleep(0.000001) # sleep one microsecond

def __repr__(self) -> str:
set_status = 'set' if self.is_set() else 'unset'
return f"<{type(self).__qualname__} at {id(self):#x} {set_status}>"

#
# Barrier
#
Expand Down
File renamed without changes.
2 changes: 2 additions & 0 deletions Lib/test/multiprocessingdata/set_clear_race.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@


# Reproduction code copied and modified from https://github.com/python/cpython/issues/95826
# Fixes the issue above

class SimpleRepro:
def __init__(self):
self.heartbeat_event = multiprocessing.Event()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
import signal
import concurrent.futures
import time
import sys
import os


# Shows that https://github.com/python/cpython/issues/85772 is fixed

def send_sigint(pid):
time.sleep(1)
time.sleep(1) # Make sure shutdown_event.wait() is called
os.kill(pid, signal.SIGINT)


Expand All @@ -26,9 +27,4 @@ def sigterm_handler(_signo, _stack_frame):


if __name__ == '__main__':
try:
run_signal_handler_wait_set_test()
sys.exit(1)
except AssertionError as e:
assert 'multiprocessing.Event.set() cannot be called from a thread that is already wait()-ing' in str(e)
sys.exit(0)
run_signal_handler_wait_set_test()
33 changes: 28 additions & 5 deletions Lib/test/test_multiprocessing_event/test_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,48 @@
"Requires os.getppid")
@support.requires_subprocess()
class TestEventSignalHandling(unittest.TestCase):
def test_no_race_for_set_is_set(self):
def test_no_race_for_is_set_set(self):
import subprocess
script = support.findfile("set_is_set.py", subdir="multiprocessingdata")
script = support.findfile("is_set_set.py", subdir="multiprocessingdata")
for x in range(10):
try:
assert 0 == subprocess.call([sys.executable, script], timeout=60)
except subprocess.TimeoutExpired:
assert False, 'subprocess.Timeoutexpired for set_is_set.py'
assert False, 'subprocess.Timeoutexpired for is_set_set.py'

def test_no_race_set_clear(self):
import subprocess
script = support.findfile("set_clear_race.py", subdir="multiprocessingdata")
assert 0 == subprocess.call([sys.executable, script])

def test_wait_set_throws(self):
def test_wait_set_no_deadlock(self):
import subprocess
script = support.findfile("wait_set_throws.py", subdir="multiprocessingdata")
script = support.findfile("wait_set_no_deadlock.py", subdir="multiprocessingdata")
assert 0 == subprocess.call([sys.executable, script])

def test_wait_timeout(self):
event = multiprocessing.Event()
# https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Event
# multiprocessing.Event: A clone of threading.Event.

# threading.Event: https://docs.python.org/3/library/threading.html#threading.Event

# threading.Event.wait(): https://docs.python.org/3/library/threading.html#threading.Event.wait
# Block as long as the internal flag is false and the timeout, if given, has not expired.
# The return value represents the reason that this blocking method returned;
# True if returning because the internal flag is set to true, or
# False if a timeout is given and the internal flag did not become true within the given wait time.

# When the timeout argument is present and not None, it should be a floating-point number
# specifying a timeout for the operation in seconds, or fractions thereof.

# wait() supports both integer and float:
flag_set = event.wait(1)
assert flag_set == False

flag_set = event.wait(0.1)
assert flag_set == False


if __name__ == '__main__':
unittest.main()
Expand Down

0 comments on commit 25236fe

Please sign in to comment.