Skip to content

gh-109370: Support closing Connection and PipeConnection from other thread #109397

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 81 additions & 26 deletions Lib/multiprocessing/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
BUFSIZE = 8192
# A very generous timeout when it comes to local connections...
CONNECTION_TIMEOUT = 20.
WSA_OPERATION_ABORTED = 995

_mmap_counter = itertools.count()

Expand Down Expand Up @@ -130,12 +129,13 @@ def __init__(self, handle, readable=True, writable=True):
# XXX should we use util.Finalize instead of a __del__?

def __del__(self):
if self._handle is not None:
self._close()
handle = self._handle
if handle is not None:
self._close(handle)

def _check_closed(self):
if self._handle is None:
raise OSError("handle is closed")
raise OSError(errno.EPIPE, "handle is closed")

def _check_readable(self):
if not self._readable:
Expand Down Expand Up @@ -174,11 +174,10 @@ def fileno(self):

def close(self):
"""Close the connection"""
if self._handle is not None:
try:
self._close()
finally:
self._handle = None
handle = self._handle
if handle is not None:
self._handle = None
self._close(handle)

def send_bytes(self, buf, offset=0, size=None):
"""Send the bytes data from a bytes-like object"""
Expand Down Expand Up @@ -275,19 +274,27 @@ class PipeConnection(_ConnectionBase):
_got_empty_message = False
_send_ov = None

def _close(self, _CloseHandle=_winapi.CloseHandle):
def _close(self, handle, *, _CloseHandle=_winapi.CloseHandle):
ov = self._send_ov
if ov is not None:
# Interrupt WaitForMultipleObjects() in _send_bytes()
ov.cancel()
_CloseHandle(self._handle)
_CloseHandle(handle)

def _send_bytes(self, buf):
if self._send_ov is not None:
# A connection should only be used by a single thread
raise ValueError("concurrent send_bytes() calls "
"are not supported")
ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
try:
ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
except OSError as err:
if err.errno == errno.EBADF:
raise OSError(errno.EPIPE, "handle is closed")
raise
except TypeError:
self._check_closed()
raise
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to split this change in two parts? Keep this change for _check_closed() change which now raises BrokenPipeError, and you changes around close(): these ones look straightforward are correct. But write a separated PR to add these try/except?

I dislike these try/except. If there is a risk to land into EBADF case with the current code, maybe some kind of locking is needed to not call the Windows API with a handle, while another thread call close() which invalidates the handle.

self._send_ov = ov
try:
if err == _winapi.ERROR_IO_PENDING:
Expand All @@ -300,7 +307,7 @@ def _send_bytes(self, buf):
finally:
self._send_ov = None
nwritten, err = ov.GetOverlappedResult(True)
if err == WSA_OPERATION_ABORTED:
if err == _winapi.ERROR_OPERATION_ABORTED:
# close() was called by another thread while
# WaitForMultipleObjects() was waiting for the overlapped
# operation.
Expand All @@ -315,8 +322,16 @@ def _recv_bytes(self, maxsize=None):
else:
bsize = 128 if maxsize is None else min(maxsize, 128)
try:
ov, err = _winapi.ReadFile(self._handle, bsize,
overlapped=True)
try:
ov, err = _winapi.ReadFile(self._handle, bsize,
overlapped=True)
except OSError as err:
if err.errno == errno.EBADF:
raise OSError(errno.EPIPE, "handle is closed")
raise
except TypeError:
self._check_closed()
raise
try:
if err == _winapi.ERROR_IO_PENDING:
waitres = _winapi.WaitForMultipleObjects(
Expand All @@ -341,20 +356,44 @@ def _recv_bytes(self, maxsize=None):
raise RuntimeError("shouldn't get here; expected KeyboardInterrupt")

def _poll(self, timeout):
if (self._got_empty_message or
_winapi.PeekNamedPipe(self._handle)[0] != 0):
return True
try:
if (self._got_empty_message or
_winapi.PeekNamedPipe(self._handle)[0] != 0):
return True
except OSError as err:
if err.errno == errno.EBADF:
Copy link
Member

@vstinner vstinner Sep 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not get EBADF. We should just not call functions on closed handle. If we land in this case, we failed badly before, no?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can we know that it was closed in other thread without using locks?

You can get EBADF in the following scenario:

  • Thread A reads self._handle and puts in on the stack.
  • Thread B closes it.
  • Thread A calls function with an already closed file descriptor on the stack.
  • That function fails with EBADF errno.

It is the bast case. The worst case is:

  • Thread A reads self._handle and puts in on the stack.
  • Thread B closes it.
  • Thread B or C opens a new file descriptor which reuses the same integer value.
  • Thread A calls function with a file descriptor which now refers to unrelated file or socket.
  • Unpredictable consequences.

raise OSError(errno.EPIPE, "handle is closed")
raise
except TypeError:
self._check_closed()
raise
return bool(wait([self], timeout))

def _get_more_data(self, ov, maxsize):
buf = ov.getbuffer()
f = io.BytesIO()
f.write(buf)
left = _winapi.PeekNamedPipe(self._handle)[1]
try:
left = _winapi.PeekNamedPipe(self._handle)[1]
except OSError as err:
if err.errno == errno.EBADF:
raise OSError(errno.EPIPE, "handle is closed")
raise
except TypeError:
self._check_closed()
raise
assert left > 0
if maxsize is not None and len(buf) + left > maxsize:
self._bad_message_length()
ov, err = _winapi.ReadFile(self._handle, left, overlapped=True)
try:
ov, err = _winapi.ReadFile(self._handle, left, overlapped=True)
except OSError as err:
if err.errno == errno.EBADF:
raise OSError(errno.EPIPE, "handle is closed")
raise
except TypeError:
self._check_closed()
raise
rbytes, err = ov.GetOverlappedResult(True)
assert err == 0
assert rbytes == left
Expand All @@ -369,20 +408,28 @@ class Connection(_ConnectionBase):
"""

if _winapi:
def _close(self, _close=_multiprocessing.closesocket):
_close(self._handle)
def _close(self, handle, *, _close=_multiprocessing.closesocket):
_close(handle)
_write = _multiprocessing.send
_read = _multiprocessing.recv
else:
def _close(self, _close=os.close):
_close(self._handle)
def _close(self, handle, *, _close=os.close):
_close(handle)
_write = os.write
_read = os.read

def _send(self, buf, write=_write):
remaining = len(buf)
while True:
n = write(self._handle, buf)
try:
n = write(self._handle, buf)
except OSError as err:
if err.errno == errno.EBADF:
raise OSError(errno.EPIPE, "handle is closed")
raise
except TypeError:
self._check_closed()
raise
remaining -= n
if remaining == 0:
break
Expand All @@ -393,7 +440,15 @@ def _recv(self, size, read=_read):
handle = self._handle
remaining = size
while remaining > 0:
chunk = read(handle, remaining)
try:
chunk = read(handle, remaining)
except OSError as err:
if err.errno == errno.EBADF:
raise OSError(errno.EPIPE, "handle is closed")
raise
except TypeError:
self._check_closed()
raise
n = len(chunk)
if n == 0:
if remaining == size:
Expand Down