From 4c8246451a5bdbd93519f3dfcfbd427d31e43da8 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Mon, 11 Sep 2023 00:27:50 +0200 Subject: [PATCH 1/2] gh-107219: Fix concurrent.futures terminate_broken() Fix a race condition in concurrent.futures. When a process in the process pool was terminated abruptly (while the future was running or pending), close the connection write end. If the call queue is blocked on sending bytes to a worker process, closing the connection write end interrupts the send, so the queue can be closed. Changes: * _ExecutorManagerThread.terminate_broken() now closes call_queue._writer. * multiprocessing PipeConnection.close() now interrupts WaitForMultipleObjects() in _send_bytes() by cancelling the overlapped operation. --- Lib/concurrent/futures/process.py | 4 ++++ Lib/multiprocessing/connection.py | 21 +++++++++++++++++++ ...-09-11-00-32-18.gh-issue-107219.3zqyFT.rst | 5 +++++ 3 files changed, 30 insertions(+) create mode 100644 Misc/NEWS.d/next/Library/2023-09-11-00-32-18.gh-issue-107219.3zqyFT.rst diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 9933d3d0e040ea..f4b5cd1d869067 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -510,6 +510,10 @@ def terminate_broken(self, cause): # https://github.com/python/cpython/issues/94777 self.call_queue._reader.close() + # gh-107219: Close the connection writer which can unblock + # Queue._feed() if it was stuck in send_bytes(). + self.call_queue._writer.close() + # clean up resources self.join_executor_internals() diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 04eaea811cfbbe..084d5bb66612fb 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -9,6 +9,7 @@ __all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ] +import errno import io import os import sys @@ -41,6 +42,7 @@ BUFSIZE = 8192 # A very generous timeout when it comes to local connections... CONNECTION_TIMEOUT = 20. +WSA_OPERATION_ABORTED = 995 _mmap_counter = itertools.count() @@ -272,11 +274,24 @@ class PipeConnection(_ConnectionBase): """ _got_empty_message = False + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._send_ov = None + def _close(self, _CloseHandle=_winapi.CloseHandle): + ov = self._send_ov + if ov is not None: + # Interrupt WaitForMultipleObjects() in _send_bytes() + ov.cancel() _CloseHandle(self._handle) def _send_bytes(self, buf): + if self._send_ov is not None: + # A connection should only be used by a single thread + raise ValueError("concurrent send_bytes() calls " + "are not supported") ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True) + self._send_ov = ov try: if err == _winapi.ERROR_IO_PENDING: waitres = _winapi.WaitForMultipleObjects( @@ -286,7 +301,13 @@ def _send_bytes(self, buf): ov.cancel() raise finally: + self._send_ov = None nwritten, err = ov.GetOverlappedResult(True) + if err == WSA_OPERATION_ABORTED: + # close() was called by another thread while + # WaitForMultipleObjects() was waiting for the overlapped + # operation. + raise OSError(errno.EPIPE, "handle is closed") assert err == 0 assert nwritten == len(buf) diff --git a/Misc/NEWS.d/next/Library/2023-09-11-00-32-18.gh-issue-107219.3zqyFT.rst b/Misc/NEWS.d/next/Library/2023-09-11-00-32-18.gh-issue-107219.3zqyFT.rst new file mode 100644 index 00000000000000..10afbcf823386a --- /dev/null +++ b/Misc/NEWS.d/next/Library/2023-09-11-00-32-18.gh-issue-107219.3zqyFT.rst @@ -0,0 +1,5 @@ +Fix a race condition in ``concurrent.futures``. When a process in the +process pool was terminated abruptly (while the future was running or +pending), close the connection write end. If the call queue is blocked on +sending bytes to a worker process, closing the connection write end interrupts +the send, so the queue can be closed. Patch by Victor Stinner. From 069fbfa35488828d4e23eb4a39c17b7b38bd6a39 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Mon, 11 Sep 2023 09:37:37 +0200 Subject: [PATCH 2/2] Remove PipeConnection.__init__() Address Serhiy's review. --- Lib/multiprocessing/connection.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py index 084d5bb66612fb..7c425a2d8e7034 100644 --- a/Lib/multiprocessing/connection.py +++ b/Lib/multiprocessing/connection.py @@ -273,10 +273,7 @@ class PipeConnection(_ConnectionBase): with FILE_FLAG_OVERLAPPED. """ _got_empty_message = False - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._send_ov = None + _send_ov = None def _close(self, _CloseHandle=_winapi.CloseHandle): ov = self._send_ov