From 49ea36d624909256407a0470fffcbc53f957dc1c Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Fri, 29 Sep 2023 21:31:19 +0200 Subject: [PATCH] gh-109047: concurrent.futures catches PythonFinalizationError (#109810) concurrent.futures: The *executor manager thread* now catches exceptions when adding an item to the *call queue*. During Python finalization, creating a new thread can now raise RuntimeError. Catch the exception and call terminate_broken() in this case. Add test_python_finalization_error() to test_concurrent_futures. concurrent.futures._ExecutorManagerThread changes: * terminate_broken() no longer calls shutdown_workers() since the call queue is no longer working anymore (read and write ends of the queue pipe are closed). * terminate_broken() now terminates child processes, not only wait until they complete. * _ExecutorManagerThread.terminate_broken() now holds shutdown_lock to prevent race conditons with ProcessPoolExecutor.submit(). multiprocessing.Queue changes: * Add _terminate_broken() method. * _start_thread() sets _thread to None on exception to prevent leaking "dangling threads" even if the thread was not started yet. (cherry picked from commit 635184212179b0511768ea1cd57256e134ba2d75) --- Lib/concurrent/futures/process.py | 49 ++++++++++++++----- Lib/multiprocessing/queues.py | 25 ++++++++-- .../test_process_pool.py | 29 +++++++++++ ...-09-25-02-11-14.gh-issue-109047.b1TrqG.rst | 4 ++ 4 files changed, 90 insertions(+), 17 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2023-09-25-02-11-14.gh-issue-109047.b1TrqG.rst diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 301207f59de37a..1306e3c623b210 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -336,7 +336,14 @@ def run(self): # Main loop for the executor manager thread. while True: - self.add_call_item_to_queue() + # gh-109047: During Python finalization, self.call_queue.put() + # creation of a thread can fail with RuntimeError. + try: + self.add_call_item_to_queue() + except BaseException as exc: + cause = format_exception(exc) + self.terminate_broken(cause) + return result_item, is_broken, cause = self.wait_result_broken_or_wakeup() @@ -420,8 +427,8 @@ def wait_result_broken_or_wakeup(self): try: result_item = result_reader.recv() is_broken = False - except BaseException as e: - cause = format_exception(type(e), e, e.__traceback__) + except BaseException as exc: + cause = format_exception(exc) elif wakeup_reader in ready: is_broken = False @@ -464,7 +471,7 @@ def is_shutting_down(self): return (_global_shutdown or executor is None or executor._shutdown_thread) - def terminate_broken(self, cause): + def _terminate_broken(self, cause): # Terminate the executor because it is in a broken state. The cause # argument can be used to display more information on the error that # lead the executor into becoming broken. @@ -489,7 +496,14 @@ def terminate_broken(self, cause): # Mark pending tasks as failed. for work_id, work_item in self.pending_work_items.items(): - work_item.future.set_exception(bpe) + try: + work_item.future.set_exception(bpe) + except _base.InvalidStateError: + # set_exception() fails if the future is cancelled: ignore it. + # Trying to check if the future is cancelled before calling + # set_exception() would leave a race condition if the future is + # cancelled between the check and set_exception(). + pass # Delete references to object. See issue16284 del work_item self.pending_work_items.clear() @@ -499,12 +513,14 @@ def terminate_broken(self, cause): for p in self.processes.values(): p.terminate() - # Prevent queue writing to a pipe which is no longer read. - # https://github.com/python/cpython/issues/94777 - self.call_queue._reader.close() + self.call_queue._terminate_broken() # clean up resources - self.join_executor_internals() + self._join_executor_internals(broken=True) + + def terminate_broken(self, cause): + with self.shutdown_lock: + self._terminate_broken(cause) def flag_executor_shutting_down(self): # Flag the executor as shutting down and cancel remaining tasks if @@ -547,15 +563,24 @@ def shutdown_workers(self): break def join_executor_internals(self): - self.shutdown_workers() + with self.shutdown_lock: + self._join_executor_internals() + + def _join_executor_internals(self, broken=False): + # If broken, call_queue was closed and so can no longer be used. + if not broken: + self.shutdown_workers() + # Release the queue's resources as soon as possible. self.call_queue.close() self.call_queue.join_thread() - with self.shutdown_lock: - self.thread_wakeup.close() + self.thread_wakeup.close() + # If .join() is not called on the created processes then # some ctx.Queue methods may deadlock on Mac OS X. for p in self.processes.values(): + if broken: + p.terminate() p.join() def get_n_children_alive(self): diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index daf9ee94a19431..d36de75749f4ed 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -158,6 +158,15 @@ def cancel_join_thread(self): except AttributeError: pass + def _terminate_broken(self): + # Close a Queue on error. + + # gh-94777: Prevent queue writing to a pipe which is no longer read. + self._reader.close() + + self.close() + self.join_thread() + def _start_thread(self): debug('Queue._start_thread()') @@ -169,13 +178,19 @@ def _start_thread(self): self._wlock, self._reader.close, self._writer.close, self._ignore_epipe, self._on_queue_feeder_error, self._sem), - name='QueueFeederThread' + name='QueueFeederThread', + daemon=True, ) - self._thread.daemon = True - debug('doing self._thread.start()') - self._thread.start() - debug('... done self._thread.start()') + try: + debug('doing self._thread.start()') + self._thread.start() + debug('... done self._thread.start()') + except: + # gh-109047: During Python finalization, creating a thread + # can fail with RuntimeError. + self._thread = None + raise if not self._joincancelled: self._jointhread = Finalize( diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index 7763a4946f110c..c73c2da1a01088 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -1,5 +1,6 @@ import os import sys +import threading import time import unittest from concurrent import futures @@ -187,6 +188,34 @@ def test_max_tasks_early_shutdown(self): for i, future in enumerate(futures): self.assertEqual(future.result(), mul(i, i)) + def test_python_finalization_error(self): + # gh-109047: Catch RuntimeError on thread creation + # during Python finalization. + + context = self.get_context() + + # gh-109047: Mock the threading.start_new_thread() function to inject + # RuntimeError: simulate the error raised during Python finalization. + # Block the second creation: create _ExecutorManagerThread, but block + # QueueFeederThread. + orig_start_new_thread = threading._start_new_thread + nthread = 0 + def mock_start_new_thread(func, *args): + nonlocal nthread + if nthread >= 1: + raise RuntimeError("can't create new thread at " + "interpreter shutdown") + nthread += 1 + return orig_start_new_thread(func, *args) + + with support.swap_attr(threading, '_start_new_thread', + mock_start_new_thread): + executor = self.executor_type(max_workers=2, mp_context=context) + with executor: + with self.assertRaises(BrokenProcessPool): + list(executor.map(mul, [(2, 3)] * 10)) + executor.shutdown() + create_executor_tests(globals(), ProcessPoolExecutorTest, executor_mixins=(ProcessPoolForkMixin, diff --git a/Misc/NEWS.d/next/Library/2023-09-25-02-11-14.gh-issue-109047.b1TrqG.rst b/Misc/NEWS.d/next/Library/2023-09-25-02-11-14.gh-issue-109047.b1TrqG.rst new file mode 100644 index 00000000000000..71cb5a80847d0a --- /dev/null +++ b/Misc/NEWS.d/next/Library/2023-09-25-02-11-14.gh-issue-109047.b1TrqG.rst @@ -0,0 +1,4 @@ +:mod:`concurrent.futures`: The *executor manager thread* now catches exceptions +when adding an item to the *call queue*. During Python finalization, creating a +new thread can now raise :exc:`RuntimeError`. Catch the exception and call +``terminate_broken()`` in this case. Patch by Victor Stinner.