Skip to content

Commit bc555c5

Browse files
committed
pythongh-109047: concurrent.futures catches PythonFinalizationError
_ExecutorManagerThread of concurrent.futures now catches PythonFinalizationError: call terminate_broken(). The exception occurs when adding an item to the "call_queue" creates a new thread while Python is being finalized. Add test_python_finalization_error() to test_concurrent_futures. concurrent.futures._ExecutorManagerThread changes: * terminate_broken() no longer calls shutdown_workers() since the queue is no longer working anymore (read and write ends of the queue pipe are closed). * wait_result_broken_or_wakeup() now uses the short form of traceback.format_exception(). multiprocessing.Queue changes: * Add _terminate_broken() method. * _start_thread() starts _thread to None on exception to prevent leaking "dangling threads" even if the thread was not started yet.
1 parent 6d969f3 commit bc555c5

File tree

4 files changed

+64
-17
lines changed

4 files changed

+64
-17
lines changed

Diff for: Lib/concurrent/futures/process.py

+15-12
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,12 @@ def run(self):
341341
# Main loop for the executor manager thread.
342342

343343
while True:
344-
self.add_call_item_to_queue()
344+
try:
345+
self.add_call_item_to_queue()
346+
except PythonFinalizationError as exc:
347+
cause = format_exception(exc)
348+
self.terminate_broken(cause)
349+
return
345350

346351
result_item, is_broken, cause = self.wait_result_broken_or_wakeup()
347352

@@ -425,8 +430,8 @@ def wait_result_broken_or_wakeup(self):
425430
try:
426431
result_item = result_reader.recv()
427432
is_broken = False
428-
except BaseException as e:
429-
cause = format_exception(type(e), e, e.__traceback__)
433+
except BaseException as exc:
434+
cause = format_exception(exc)
430435

431436
elif wakeup_reader in ready:
432437
is_broken = False
@@ -515,16 +520,10 @@ def terminate_broken(self, cause):
515520
for p in self.processes.values():
516521
p.terminate()
517522

518-
# Prevent queue writing to a pipe which is no longer read.
519-
# https://github.com/python/cpython/issues/94777
520-
self.call_queue._reader.close()
521-
522-
# gh-107219: Close the connection writer which can unblock
523-
# Queue._feed() if it was stuck in send_bytes().
524-
self.call_queue._writer.close()
523+
self.call_queue._terminate_broken()
525524

526525
# clean up resources
527-
self.join_executor_internals()
526+
self._join_executor_internals(broken=True)
528527

529528
def flag_executor_shutting_down(self):
530529
# Flag the executor as shutting down and cancel remaining tasks if
@@ -567,7 +566,11 @@ def shutdown_workers(self):
567566
break
568567

569568
def join_executor_internals(self):
570-
self.shutdown_workers()
569+
self._join_executor_internals()
570+
571+
def _join_executor_internals(self, broken=False):
572+
if not broken:
573+
self.shutdown_workers()
571574
# Release the queue's resources as soon as possible.
572575
self.call_queue.close()
573576
self.call_queue.join_thread()

Diff for: Lib/multiprocessing/queues.py

+22-5
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,19 @@ def cancel_join_thread(self):
158158
except AttributeError:
159159
pass
160160

161+
def _terminate_broken(self):
162+
# Close a Queue on error.
163+
164+
# gh-94777: Prevent queue writing to a pipe which is no longer read.
165+
self._reader.close()
166+
167+
# gh-107219: Close the connection writer which can unblock
168+
# Queue._feed() if it was stuck in send_bytes().
169+
self._writer.close()
170+
171+
self.close()
172+
self.join_thread()
173+
161174
def _start_thread(self):
162175
debug('Queue._start_thread()')
163176

@@ -169,13 +182,17 @@ def _start_thread(self):
169182
self._wlock, self._reader.close, self._writer.close,
170183
self._ignore_epipe, self._on_queue_feeder_error,
171184
self._sem),
172-
name='QueueFeederThread'
185+
name='QueueFeederThread',
186+
daemon=True,
173187
)
174-
self._thread.daemon = True
175188

176-
debug('doing self._thread.start()')
177-
self._thread.start()
178-
debug('... done self._thread.start()')
189+
try:
190+
debug('doing self._thread.start()')
191+
self._thread.start()
192+
debug('... done self._thread.start()')
193+
except:
194+
self._thread = None
195+
raise
179196

180197
if not self._joincancelled:
181198
self._jointhread = Finalize(

Diff for: Lib/test/test_concurrent_futures/test_process_pool.py

+22
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import os
22
import sys
3+
import threading
34
import time
45
import unittest
56
from concurrent import futures
@@ -187,6 +188,27 @@ def test_max_tasks_early_shutdown(self):
187188
for i, future in enumerate(futures):
188189
self.assertEqual(future.result(), mul(i, i))
189190

191+
def test_python_finalization_error(self):
192+
context = self.get_context()
193+
194+
# Create _ExecutorManagerThread, but block QueueFeederThread
195+
orig_start_new_thread = threading._start_new_thread
196+
nthread = 0
197+
def mock_start_new_thread(func, *args):
198+
nonlocal nthread
199+
if nthread >= 1:
200+
raise PythonFinalizationError()
201+
nthread += 1
202+
return orig_start_new_thread(func, *args)
203+
204+
with support.swap_attr(threading, '_start_new_thread',
205+
mock_start_new_thread):
206+
executor = self.executor_type(max_workers=2, mp_context=context)
207+
with executor:
208+
with self.assertRaises(BrokenProcessPool):
209+
list(executor.map(mul, [(2, 3)] * 10))
210+
executor.shutdown()
211+
190212

191213
create_executor_tests(globals(), ProcessPoolExecutorTest,
192214
executor_mixins=(ProcessPoolForkMixin,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
concurrent.futures: ``_ExecutorManagerThread`` now catches
2+
:exc:`PythonFinalizationError`, it calls ``terminate_broken()``. The
3+
exception occurs while Python is being finalized when adding an item to the
4+
*call queue* tries to create a new *queue feeder* thread. Patch by Victor
5+
Stinner.

0 commit comments

Comments
 (0)