From 3664a793317dcd96d09978cfb542a43e91f3110e Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Sat, 7 Oct 2023 19:42:52 +0200 Subject: [PATCH] GH-109978: Allow multiprocessing finalizers to run on a separate thread --- Lib/concurrent/futures/process.py | 15 +- Lib/multiprocessing/connection.py | 4 + Lib/multiprocessing/heap.py | 43 +--- Lib/multiprocessing/managers.py | 8 +- Lib/multiprocessing/pool.py | 7 +- Lib/multiprocessing/queues.py | 4 +- Lib/multiprocessing/synchronize.py | 2 +- Lib/multiprocessing/util.py | 214 ++++++++++++++++-- Lib/test/_test_multiprocessing.py | 15 +- .../test_process_pool.py | 9 +- Lib/test/test_concurrent_futures/util.py | 3 + Lib/test/test_fcntl.py | 2 + Lib/test/test_logging.py | 7 + Lib/test/test_re.py | 2 + Modules/posixmodule.c | 3 + 15 files changed, 265 insertions(+), 73 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index ffaffdb8b3d0aa..b881fd8962ffed 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -561,9 +561,9 @@ def shutdown_workers(self): except queue.Full: break - def join_executor_internals(self): + def join_executor_internals(self, broken=False): with self.shutdown_lock: - self._join_executor_internals() + self._join_executor_internals(broken) def _join_executor_internals(self, broken=False): # If broken, call_queue was closed and so can no longer be used. @@ -759,7 +759,11 @@ def _start_executor_manager_thread(self): if not self._safe_to_dynamically_spawn_children: # ie, using fork. 
self._launch_processes() self._executor_manager_thread = _ExecutorManagerThread(self) - self._executor_manager_thread.start() + try: + self._executor_manager_thread.start() + except RuntimeError: + self._broken = "Executor manager thread could not be started" + raise BrokenProcessPool(self._broken) _threads_wakeups[self._executor_manager_thread] = \ self._executor_manager_thread_wakeup @@ -860,7 +864,10 @@ def shutdown(self, wait=True, *, cancel_futures=False): self._executor_manager_thread_wakeup.wakeup() if self._executor_manager_thread is not None and wait: - self._executor_manager_thread.join() + try: + self._executor_manager_thread.join() + except RuntimeError: + self._executor_manager_thread.join_executor_internals(broken=True) # To reduce the risk of opening too many files, remove references to # objects that use file descriptors. self._executor_manager_thread = None diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index dbbf106f680964..4793c94422b924 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -491,6 +491,10 @@ def close(self): self._listener = None listener.close() + @property + def closed(self): + return self._listener is None + @property def address(self): return self._listener._address diff --git a/Lib/multiprocessing/heap.py b/Lib/multiprocessing/heap.py index 6217dfe12689b3..d50b094d4773c7 100644 --- a/Lib/multiprocessing/heap.py +++ b/Lib/multiprocessing/heap.py @@ -142,9 +142,6 @@ def __init__(self, size=mmap.PAGESIZE): self._allocated_blocks = defaultdict(set) self._arenas = [] - # List of pending blocks to free - see comment in free() below - self._pending_free_blocks = [] - # Statistics self._n_mallocs = 0 self._n_frees = 0 @@ -255,43 +252,16 @@ def _remove_allocated_block(self, block): # Arena is entirely free, discard it from this process self._discard_arena(arena) - def _free_pending_blocks(self): - # Free all the blocks in the pending list - called with the lock 
held. - while True: - try: - block = self._pending_free_blocks.pop() - except IndexError: - break - self._add_free_block(block) - self._remove_allocated_block(block) - def free(self, block): # free a block returned by malloc() - # Since free() can be called asynchronously by the GC, it could happen - # that it's called while self._lock is held: in that case, - # self._lock.acquire() would deadlock (issue #12352). To avoid that, a - # trylock is used instead, and if the lock can't be acquired - # immediately, the block is added to a list of blocks to be freed - # synchronously sometimes later from malloc() or free(), by calling - # _free_pending_blocks() (appending and retrieving from a list is not - # strictly thread-safe but under CPython it's atomic thanks to the GIL). if os.getpid() != self._lastpid: raise ValueError( "My pid ({0:n}) is not last pid {1:n}".format( os.getpid(),self._lastpid)) - if not self._lock.acquire(False): - # can't acquire the lock right now, add the block to the list of - # pending blocks to free - self._pending_free_blocks.append(block) - else: - # we hold the lock - try: - self._n_frees += 1 - self._free_pending_blocks() - self._add_free_block(block) - self._remove_allocated_block(block) - finally: - self._lock.release() + with self._lock: + self._n_frees += 1 + self._add_free_block(block) + self._remove_allocated_block(block) def malloc(self, size): # return a block of right size (possibly rounded up) @@ -303,8 +273,6 @@ def malloc(self, size): self.__init__() # reinitialize after fork with self._lock: self._n_mallocs += 1 - # allow pending blocks to be marked available - self._free_pending_blocks() size = self._roundup(max(size, 1), self._alignment) (arena, start, stop) = self._malloc(size) real_stop = start + size @@ -330,7 +298,8 @@ def __init__(self, size): raise OverflowError("Size {0:n} too large".format(size)) block = BufferWrapper._heap.malloc(size) self._state = (block, size) - util.Finalize(self, BufferWrapper._heap.free, 
args=(block,)) + util.Finalize(self, BufferWrapper._heap.free, args=(block,), + reentrant=False) def create_memoryview(self): (arena, start, stop), size = self._state diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index 273c22a7654f05..9c8d344142624b 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -180,6 +180,7 @@ def serve_forever(self): except (KeyboardInterrupt, SystemExit): pass finally: + self.listener.close() if sys.stdout != sys.__stdout__: # what about stderr? util.debug('resetting stdout, stderr') sys.stdout = sys.__stdout__ @@ -187,7 +188,7 @@ def serve_forever(self): sys.exit(0) def accepter(self): - while True: + while not self.listener.closed: try: c = self.listener.accept() except OSError: @@ -575,7 +576,7 @@ def start(self, initializer=None, initargs=()): self, type(self)._finalize_manager, args=(self._process, self._address, self._authkey, self._state, self._Client, self._shutdown_timeout), - exitpriority=0 + exitpriority=0, reentrant=False ) @classmethod @@ -859,12 +860,11 @@ def _incref(self): self._idset.add(self._id) state = self._manager and self._manager._state - self._close = util.Finalize( self, BaseProxy._decref, args=(self._token, self._authkey, state, self._tls, self._idset, self._Client), - exitpriority=10 + exitpriority=10, reentrant=False ) @staticmethod diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index f979890170b1a1..63c87f7429a989 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -219,6 +219,7 @@ def __init__(self, processes=None, initializer=None, initargs=(), p.terminate() for p in self._pool: p.join() + self._pool.clear() raise sentinels = self._get_sentinels() @@ -257,7 +258,7 @@ def __init__(self, processes=None, initializer=None, initargs=(), args=(self._taskqueue, self._inqueue, self._outqueue, self._pool, self._change_notifier, self._worker_handler, self._task_handler, self._result_handler, 
self._cache), - exitpriority=15 + exitpriority=15, reentrant=False ) self._state = RUN @@ -665,8 +666,8 @@ def join(self): self._worker_handler.join() self._task_handler.join() self._result_handler.join() - for p in self._pool: - p.join() + while self._pool: + self._pool.pop().join() @staticmethod def _help_stuff_finish(inqueue, task_handler, size): diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 852ae87b276861..4f193d04186ebc 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -201,14 +201,14 @@ def _start_thread(self): self._jointhread = Finalize( self._thread, Queue._finalize_join, [weakref.ref(self._thread)], - exitpriority=-5 + exitpriority=-5, reentrant=False ) # Send sentinel to the thread queue object when garbage collected self._close = Finalize( self, Queue._finalize_close, [self._buffer, self._notempty], - exitpriority=10 + exitpriority=10, reentrant=False ) @staticmethod diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py index 3ccbfe311c71f3..0b3fb5099f9699 100644 --- a/Lib/multiprocessing/synchronize.py +++ b/Lib/multiprocessing/synchronize.py @@ -79,7 +79,7 @@ def _after_fork(obj): from .resource_tracker import register register(self._semlock.name, "semaphore") util.Finalize(self, SemLock._cleanup, (self._semlock.name,), - exitpriority=0) + exitpriority=0, reentrant=False) @staticmethod def _cleanup(name): diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index 28c77df1c32ea8..4303688aba0c21 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -7,13 +7,17 @@ # Licensed to PSF under a Contributor Agreement. 
# -import os + +import atexit +import functools import itertools +import os import sys -import weakref -import atexit +import queue import threading # we want threading to install it's # cleanup function before multiprocessing does +import warnings +import weakref from subprocess import _args_from_interpreter_flags from . import process @@ -167,6 +171,165 @@ def _run_after_forkers(): def register_after_fork(obj, func): _afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj + +class _WorkQueue: + _sentinel = object() + + def __init__(self): + self._queue = queue.SimpleQueue() + self._lock = threading.RLock() + # Note the Condition uses a different lock to avoid a deadlock + # when stopping the thread. + self._queue_drained_cond = threading.Condition() + self._stopped = False + self._thread = None + + def __del__(self): + self._atexit() + + def ensure_running(self): + with self._lock: + if self._stopped: + return + if self._thread is None: + self._start_thread() + + def _start_thread(self): + thread = threading.Thread( + target=self._work_loop, + args=(self._queue, self._queue_drained_cond), + name='multiprocessing.util._WorkQueue', + daemon=True) + thread.start() + # Avoid setting self._thread before the thread starts successfully. + # This should not happen in real life but can happen in + # test_concurrent_futures. + self._thread = thread + + def _atexit(self): + self.stop() + + def _atfork_before(self): + import time + self._lock.acquire() + self._atfork_was_stopped = self._stopped + thread_was_running = self._thread is not None + # Make sure we don't have a running thread + while self._thread is not None: + thread = self._stop_unlocked() + self._lock.release() + try: + thread.join() + finally: + self._lock.acquire() + if thread_was_running: + # HACK: os.fork() queries the system for the number of running threads. + # However, Thread.join() only ensures that the Python thread state + # was destroyed, while the system thread could still be running. 
+ # Give it time to exit. + time.sleep(0.001) + + def _atfork_after_in_parent(self): + self._stopped = self._atfork_was_stopped + self._lock.release() + + def _atfork_after_in_child(self): + self._lock = threading.RLock() + self._queue = queue.SimpleQueue() + + @property + def stopped(self): + with self._lock: + return self._stopped + + def _stop_unlocked(self): + thread = self._thread + self._stopped = True + if self._thread is not None: + self._queue.put(self._sentinel) + self._thread = None + return thread + + def stop(self): + with self._lock: + thread = self._stop_unlocked() + # To avoid deadlocks between self._lock and the thread-state lock, + # call `thread.join` unlocked. + if thread is not None: + thread.join() + + def reset(self): + # For tests + self.stop() + assert self._queue.qsize() == 0 + self._stopped = False + + def wait_until_idle(self): + with self._queue_drained_cond: + self._queue_drained_cond.wait_for(lambda: self._queue.empty()) + + def enqueue_task(self, cb): + # Can be run concurrently or reentrantly along stop(), or even + # from the work loop. + assert callable(cb) + with self._lock: + if not self._stopped and self._thread is not None: + # Even if called reentrantly from stop(), the fact that + # self._stopped is False ensures that our callback will + # be enqueued before the sentinel. + self._queue.put(cb) + return + # The work loop is not running (perhaps we're shutting down?), + # execute callback directly without taking the lock. + # This might cause issues if this is a reentrant call, but we cannot + # do any better. + try: + cb() + finally: + cb = None + + @classmethod + def _work_loop(cls, queue, cond): + while True: + if queue.empty(): + with cond: + cond.notify_all() + cb = queue.get() + if cb is cls._sentinel: + with cond: + cond.notify_all() + return + try: + cb() + except BaseException as e: + # XXX Ideally would call sys.unraisablehook, but it expects + # a specific type not accessible from Python. 
+ sys.excepthook(type(e), e, e.__traceback__) + finally: + # Clear potential refcycle with exception + cb = None + + +_work_queue = _WorkQueue() +atexit.register(_work_queue._atexit) +if os.name == 'posix': + os.register_at_fork(before=_work_queue._atfork_before, + after_in_parent=_work_queue._atfork_after_in_parent, + after_in_child=_work_queue._atfork_after_in_child) + +def ensure_work_queue(): + _work_queue.ensure_running() + +def reset_work_queue(): + _work_queue.reset() + +def enqueue_task(cb): + _work_queue.enqueue_task(cb) + +def ensure_finalizers_run(): + _work_queue.wait_until_idle() + + # # Finalization using weakrefs # @@ -179,7 +342,8 @@ class Finalize(object): ''' Class which supports object finalization using weakrefs ''' - def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None): + def __init__(self, obj, callback, args=(), kwargs=None, + exitpriority=None, reentrant=True): if (exitpriority is not None) and not isinstance(exitpriority,int): raise TypeError( "Exitpriority ({0!r}) must be None or int, not {1!s}".format( @@ -190,9 +354,15 @@ def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None): elif exitpriority is None: raise ValueError("Without object, exitpriority cannot be None") + try: + ensure_work_queue() + except RuntimeError: + # Could be at interpreter shutdown, we can only ignore the error + pass self._callback = callback self._args = args self._kwargs = kwargs or {} + self._reentrant = reentrant self._key = (exitpriority, next(_finalizer_counter)) self._pid = os.getpid() @@ -211,16 +381,26 @@ def __call__(self, wr=None, except KeyError: sub_debug('finalizer no longer registered') else: - if self._pid != getpid(): - sub_debug('finalizer ignored because different process') - res = None - else: + try: + if self._pid != getpid(): + sub_debug('finalizer ignored because different process') + return sub_debug('finalizer calling %s with args %s and kwargs %s', self._callback, self._args, self._kwargs) - res = 
self._callback(*self._args, **self._kwargs) - self._weakref = self._callback = self._args = \ - self._kwargs = self._key = None - return res + # If `wr` is None, the Finalize object was called explicitly + # to shutdown the object, presumably in a non-reentrant + # context. + if self._reentrant or wr is None: + return self._callback(*self._args, **self._kwargs) + else: + # No need to return a value since we are in a weakref + # callback. + enqueue_task(functools.partial(self._callback, + *self._args, + **self._kwargs)) + finally: + self._weakref = self._callback = self._args = \ + self._kwargs = self._key = None def cancel(self): ''' @@ -475,13 +655,17 @@ def _cleanup_tests(): from multiprocessing import forkserver forkserver._forkserver._stop() - # Stop the ResourceTracker process if it's running - from multiprocessing import resource_tracker - resource_tracker._resource_tracker._stop() + # Stop the work queue thread + reset_work_queue() # bpo-37421: Explicitly call _run_finalizers() to remove immediately # temporary directories created by multiprocessing.util.get_temp_dir(). _run_finalizers() support.gc_collect() + # Stop the ResourceTracker process if it's running. We do this last + # as finalizers may restart it otherwise. 
+ from multiprocessing import resource_tracker + resource_tracker._resource_tracker._stop() + support.reap_children() diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index d3e713594b0f4b..15bd8c411ec4a0 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -3139,7 +3139,6 @@ def test_rapid_restart(self): shutdown_timeout=SHUTDOWN_TIMEOUT) try: manager.start() - self.addCleanup(manager.shutdown) except OSError as e: if e.errno != errno.EADDRINUSE: raise @@ -3149,8 +3148,9 @@ def test_rapid_restart(self): manager = QueueManager( address=addr, authkey=authkey, serializer=SERIALIZER, shutdown_timeout=SHUTDOWN_TIMEOUT) + finally: if hasattr(manager, "shutdown"): - self.addCleanup(manager.shutdown) + manager.shutdown() class FakeConnection: @@ -3833,8 +3833,8 @@ def test_heap(self): while blocks: blocks.pop() + util.ensure_finalizers_run() self.assertEqual(heap._n_frees, heap._n_mallocs) - self.assertEqual(len(heap._pending_free_blocks), 0) self.assertEqual(len(heap._arenas), 0) self.assertEqual(len(heap._allocated_blocks), 0, heap._allocated_blocks) self.assertEqual(len(heap._len_to_seq), 0) @@ -6037,6 +6037,7 @@ def tearDownClass(cls): # bpo-26762: Some multiprocessing objects like Pool create reference # cycles. Trigger a garbage collection to break these cycles. 
test.support.gc_collect() + util.ensure_finalizers_run() processes = set(multiprocessing.process._dangling) - set(cls.dangling[0]) if processes: @@ -6044,6 +6045,7 @@ def tearDownClass(cls): support.print_warning(f'Dangling processes: {processes}') processes = None + util.reset_work_queue() # Avoid dangling work queue thread threads = set(threading._dangling) - set(cls.dangling[1]) if threads: test.support.environment_altered = True @@ -6120,7 +6122,11 @@ def tearDownClass(cls): f"{multiprocessing.active_children()} " f"active children after {dt:.1f} seconds") - gc.collect() # do garbage collection + i = 0 + while i < 3 and cls.manager._number_of_objects() != 0: + gc.collect() + util.ensure_finalizers_run() + i += 1 if cls.manager._number_of_objects() != 0: # This is not really an error since some tests do not # ensure that all processes which hold a reference to a @@ -6238,6 +6244,7 @@ def tearDownModule(): support.print_warning(f'Dangling processes: {processes}') processes = None + util.reset_work_queue() threads = set(threading._dangling) - set(dangling[1]) if threads: need_sleep = True diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index c73c2da1a01088..f2cadf836855ab 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -1,3 +1,4 @@ +import multiprocessing.util import os import sys import threading @@ -193,6 +194,7 @@ def test_python_finalization_error(self): # during Python finalization. context = self.get_context() + multiprocessing.util.reset_work_queue() # gh-109047: Mock the threading.start_new_thread() function to inject # RuntimeError: simulate the error raised during Python finalization. 
@@ -202,7 +204,7 @@ def test_python_finalization_error(self): nthread = 0 def mock_start_new_thread(func, *args): nonlocal nthread - if nthread >= 1: + if nthread == 2: raise RuntimeError("can't create new thread at " "interpreter shutdown") nthread += 1 @@ -210,10 +212,11 @@ def mock_start_new_thread(func, *args): with support.swap_attr(threading, '_start_new_thread', mock_start_new_thread): - executor = self.executor_type(max_workers=2, mp_context=context) + executor = self.executor_type(max_workers=4, mp_context=context) with executor: + N = 10 with self.assertRaises(BrokenProcessPool): - list(executor.map(mul, [(2, 3)] * 10)) + list(executor.map(time.sleep, (0.5,) * N)) executor.shutdown() diff --git a/Lib/test/test_concurrent_futures/util.py b/Lib/test/test_concurrent_futures/util.py index dc48bec796b87f..22a8db5e6c077f 100644 --- a/Lib/test/test_concurrent_futures/util.py +++ b/Lib/test/test_concurrent_futures/util.py @@ -1,4 +1,5 @@ import multiprocessing +import multiprocessing.util import sys import time import unittest @@ -30,10 +31,12 @@ def create_future(state=PENDING, exception=None, result=None): class BaseTestCase(unittest.TestCase): def setUp(self): + multiprocessing.util.reset_work_queue() self._thread_key = threading_helper.threading_setup() def tearDown(self): support.reap_children() + multiprocessing.util._cleanup_tests() threading_helper.threading_cleanup(*self._thread_key) diff --git a/Lib/test/test_fcntl.py b/Lib/test/test_fcntl.py index 203dd6fe57dcd9..4a76e8dec91fb6 100644 --- a/Lib/test/test_fcntl.py +++ b/Lib/test/test_fcntl.py @@ -1,6 +1,7 @@ """Test program for the fcntl C module. 
""" import multiprocessing +import multiprocessing.util import platform import os import struct @@ -46,6 +47,7 @@ def tearDown(self): if self.f and not self.f.closed: self.f.close() unlink(TESTFN) + multiprocessing.util._cleanup_tests() @staticmethod def get_lockdata(): diff --git a/Lib/test/test_logging.py b/Lib/test/test_logging.py index cca02a010b80f4..9dc40ec13fa586 100644 --- a/Lib/test/test_logging.py +++ b/Lib/test/test_logging.py @@ -4080,6 +4080,7 @@ def test_queue_listener_with_multiple_handlers(self): if hasattr(logging.handlers, 'QueueListener'): import multiprocessing + import multiprocessing.util from unittest.mock import patch @threading_helper.requires_working_threading() @@ -4091,6 +4092,10 @@ class QueueListenerTest(BaseTest): repeat = 20 + def tearDown(self): + multiprocessing.util._cleanup_tests() + super().tearDown() + @staticmethod def setup_and_log(log_queue, ident): """ @@ -4932,6 +4937,8 @@ def test_multiprocessing(self): # In other processes, processName is correct when multiprocessing in imported, # but it is (incorrectly) defaulted to 'MainProcess' otherwise (bpo-38762). 
import multiprocessing + import multiprocessing.util + self.addCleanup(multiprocessing.util._cleanup_tests) parent_conn, child_conn = multiprocessing.Pipe() p = multiprocessing.Process( target=self._extract_logrecord_process_name, diff --git a/Lib/test/test_re.py b/Lib/test/test_re.py index 45bce1925f9e89..c878411f1f056b 100644 --- a/Lib/test/test_re.py +++ b/Lib/test/test_re.py @@ -19,6 +19,7 @@ multiprocessing = None else: import multiprocessing + import multiprocessing.util # Misc tests from Tim Peters' re.doc @@ -2433,6 +2434,7 @@ def test_regression_gh94675(self): input_js = '''a(function() { /////////////////////////////////////////////////////////////////// });''' + self.addCleanup(multiprocessing.util._cleanup_tests) p = multiprocessing.Process(target=pattern.sub, args=('', input_js)) p.start() p.join(SHORT_TIMEOUT) diff --git a/Modules/posixmodule.c b/Modules/posixmodule.c index 0975ef71d44be5..b998dcd50bcc00 100644 --- a/Modules/posixmodule.c +++ b/Modules/posixmodule.c @@ -7625,6 +7625,9 @@ static void warn_about_fork_with_threads(const char* name) { num_python_threads = atoi(field); // 0 on error } } + // XXX This counts the number of system threads, but Python code can only + // call threading.Thread.join(), which can return before the system thread ended. + // This function could therefore print a spurious warning in unlucky cases. #endif if (num_python_threads <= 0) { // Fall back to just the number our threading module knows about.