Skip to content

Commit 2f8c80f

Browse files
[3.11] gh-107219: Fix concurrent.futures terminate_broken() (GH-109244) (#109255)
gh-107219: Fix concurrent.futures terminate_broken() (GH-109244) Fix a race condition in concurrent.futures. When a process in the process pool was terminated abruptly (while the future was running or pending), close the connection write end. If the call queue is blocked on sending bytes to a worker process, closing the connection write end interrupts the send, so the queue can be closed. Changes: * _ExecutorManagerThread.terminate_broken() now closes call_queue._writer. * multiprocessing PipeConnection.close() now interrupts WaitForMultipleObjects() in _send_bytes() by cancelling the overlapped operation. (cherry picked from commit a9b1f84) Co-authored-by: Victor Stinner <vstinner@python.org>
1 parent a7e80fb commit 2f8c80f

File tree

3 files changed

+27
-0
lines changed

3 files changed

+27
-0
lines changed

Lib/concurrent/futures/process.py

+4
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,10 @@ def terminate_broken(self, cause):
501501
# https://github.com/python/cpython/issues/94777
502502
self.call_queue._reader.close()
503503

504+
# gh-107219: Close the connection writer which can unblock
505+
# Queue._feed() if it was stuck in send_bytes().
506+
self.call_queue._writer.close()
507+
504508
# clean up resources
505509
self.join_executor_internals()
506510

Lib/multiprocessing/connection.py

+18
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
__all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ]
1111

12+
import errno
1213
import io
1314
import os
1415
import sys
@@ -41,6 +42,7 @@
4142
BUFSIZE = 8192
4243
# A very generous timeout when it comes to local connections...
4344
CONNECTION_TIMEOUT = 20.
45+
WSA_OPERATION_ABORTED = 995
4446

4547
_mmap_counter = itertools.count()
4648

@@ -271,12 +273,22 @@ class PipeConnection(_ConnectionBase):
271273
with FILE_FLAG_OVERLAPPED.
272274
"""
273275
_got_empty_message = False
276+
_send_ov = None
274277

275278
def _close(self, _CloseHandle=_winapi.CloseHandle):
279+
ov = self._send_ov
280+
if ov is not None:
281+
# Interrupt WaitForMultipleObjects() in _send_bytes()
282+
ov.cancel()
276283
_CloseHandle(self._handle)
277284

278285
def _send_bytes(self, buf):
286+
if self._send_ov is not None:
287+
# A connection should only be used by a single thread
288+
raise ValueError("concurrent send_bytes() calls "
289+
"are not supported")
279290
ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
291+
self._send_ov = ov
280292
try:
281293
if err == _winapi.ERROR_IO_PENDING:
282294
waitres = _winapi.WaitForMultipleObjects(
@@ -286,7 +298,13 @@ def _send_bytes(self, buf):
286298
ov.cancel()
287299
raise
288300
finally:
301+
self._send_ov = None
289302
nwritten, err = ov.GetOverlappedResult(True)
303+
if err == WSA_OPERATION_ABORTED:
304+
# close() was called by another thread while
305+
# WaitForMultipleObjects() was waiting for the overlapped
306+
# operation.
307+
raise OSError(errno.EPIPE, "handle is closed")
290308
assert err == 0
291309
assert nwritten == len(buf)
292310

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
Fix a race condition in ``concurrent.futures``. When a process in the
2+
process pool was terminated abruptly (while the future was running or
3+
pending), close the connection write end. If the call queue is blocked on
4+
sending bytes to a worker process, closing the connection write end interrupts
5+
the send, so the queue can be closed. Patch by Victor Stinner.

0 commit comments

Comments
 (0)