Skip to content

Commit

Permalink
gh-91227: Ignore ERROR_PORT_UNREACHABLE in proactor recvfrom() (#32011)
Browse files Browse the repository at this point in the history
  • Loading branch information
esoma authored Mar 23, 2024
1 parent 9967b56 commit f11d0d8
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 12 deletions.
29 changes: 17 additions & 12 deletions Lib/asyncio/windows_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import _overlapped
import _winapi
import errno
from functools import partial
import math
import msvcrt
import socket
Expand Down Expand Up @@ -467,6 +468,18 @@ def finish_socket_func(trans, key, ov):
else:
raise

@classmethod
def _finish_recvfrom(cls, trans, key, ov, *, empty_result):
try:
return cls.finish_socket_func(trans, key, ov)
except OSError as exc:
# WSARecvFrom will report ERROR_PORT_UNREACHABLE when the same
# socket is used to send to an address that is not listening.
if exc.winerror == _overlapped.ERROR_PORT_UNREACHABLE:
return empty_result, None
else:
raise

def recv(self, conn, nbytes, flags=0):
self._register_with_iocp(conn)
ov = _overlapped.Overlapped(NULL)
Expand Down Expand Up @@ -501,7 +514,8 @@ def recvfrom(self, conn, nbytes, flags=0):
except BrokenPipeError:
return self._result((b'', None))

return self._register(ov, conn, self.finish_socket_func)
return self._register(ov, conn, partial(self._finish_recvfrom,
empty_result=b''))

def recvfrom_into(self, conn, buf, flags=0):
self._register_with_iocp(conn)
Expand All @@ -511,17 +525,8 @@ def recvfrom_into(self, conn, buf, flags=0):
except BrokenPipeError:
return self._result((0, None))

def finish_recv(trans, key, ov):
try:
return ov.getresult()
except OSError as exc:
if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
_overlapped.ERROR_OPERATION_ABORTED):
raise ConnectionResetError(*exc.args)
else:
raise

return self._register(ov, conn, finish_recv)
return self._register(ov, conn, partial(self._finish_recvfrom,
empty_result=0))

def sendto(self, conn, buf, flags=0, addr=None):
self._register_with_iocp(conn)
Expand Down
74 changes: 74 additions & 0 deletions Lib/test/test_asyncio/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1378,6 +1378,80 @@ def test_create_datagram_endpoint_sock(self):
tr.close()
self.loop.run_until_complete(pr.done)

def test_datagram_send_to_non_listening_address(self):
# see:
# https://github.com/python/cpython/issues/91227
# https://github.com/python/cpython/issues/88906
# https://bugs.python.org/issue47071
# https://bugs.python.org/issue44743
# The Proactor event loop would fail to receive datagram messages after
# sending a message to an address that wasn't listening.
loop = self.loop

class Protocol(asyncio.DatagramProtocol):

_received_datagram = None

def datagram_received(self, data, addr):
self._received_datagram.set_result(data)

async def wait_for_datagram_received(self):
self._received_datagram = loop.create_future()
result = await asyncio.wait_for(self._received_datagram, 10)
self._received_datagram = None
return result

def create_socket():
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setblocking(False)
sock.bind(('127.0.0.1', 0))
return sock

socket_1 = create_socket()
transport_1, protocol_1 = loop.run_until_complete(
loop.create_datagram_endpoint(Protocol, sock=socket_1)
)
addr_1 = socket_1.getsockname()

socket_2 = create_socket()
transport_2, protocol_2 = loop.run_until_complete(
loop.create_datagram_endpoint(Protocol, sock=socket_2)
)
addr_2 = socket_2.getsockname()

# creating and immediately closing this to try to get an address that
# is not listening
socket_3 = create_socket()
transport_3, protocol_3 = loop.run_until_complete(
loop.create_datagram_endpoint(Protocol, sock=socket_3)
)
addr_3 = socket_3.getsockname()
transport_3.abort()

transport_1.sendto(b'a', addr=addr_2)
self.assertEqual(loop.run_until_complete(
protocol_2.wait_for_datagram_received()
), b'a')

transport_2.sendto(b'b', addr=addr_1)
self.assertEqual(loop.run_until_complete(
protocol_1.wait_for_datagram_received()
), b'b')

# this should send to an address that isn't listening
transport_1.sendto(b'c', addr=addr_3)
loop.run_until_complete(asyncio.sleep(0))

# transport 1 should still be able to receive messages after sending to
# an address that wasn't listening
transport_2.sendto(b'd', addr=addr_1)
self.assertEqual(loop.run_until_complete(
protocol_1.wait_for_datagram_received()
), b'd')

transport_1.close()
transport_2.close()

def test_internal_fds(self):
loop = self.create_event_loop()
if not isinstance(loop, selector_events.BaseSelectorEventLoop):
Expand Down
81 changes: 81 additions & 0 deletions Lib/test/test_asyncio/test_sock_lowlevel.py
Original file line number Diff line number Diff line change
Expand Up @@ -555,12 +555,93 @@ class SelectEventLoopTests(BaseSockTestsMixin,
def create_event_loop(self):
return asyncio.SelectorEventLoop()


class ProactorEventLoopTests(BaseSockTestsMixin,
test_utils.TestCase):

def create_event_loop(self):
return asyncio.ProactorEventLoop()


async def _basetest_datagram_send_to_non_listening_address(self,
recvfrom):
# see:
# https://github.com/python/cpython/issues/91227
# https://github.com/python/cpython/issues/88906
# https://bugs.python.org/issue47071
# https://bugs.python.org/issue44743
# The Proactor event loop would fail to receive datagram messages
# after sending a message to an address that wasn't listening.

def create_socket():
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setblocking(False)
sock.bind(('127.0.0.1', 0))
return sock

socket_1 = create_socket()
addr_1 = socket_1.getsockname()

socket_2 = create_socket()
addr_2 = socket_2.getsockname()

# creating and immediately closing this to try to get an address
# that is not listening
socket_3 = create_socket()
addr_3 = socket_3.getsockname()
socket_3.shutdown(socket.SHUT_RDWR)
socket_3.close()

socket_1_recv_task = self.loop.create_task(recvfrom(socket_1))
socket_2_recv_task = self.loop.create_task(recvfrom(socket_2))
await asyncio.sleep(0)

await self.loop.sock_sendto(socket_1, b'a', addr_2)
self.assertEqual(await socket_2_recv_task, b'a')

await self.loop.sock_sendto(socket_2, b'b', addr_1)
self.assertEqual(await socket_1_recv_task, b'b')
socket_1_recv_task = self.loop.create_task(recvfrom(socket_1))
await asyncio.sleep(0)

# this should send to an address that isn't listening
await self.loop.sock_sendto(socket_1, b'c', addr_3)
self.assertEqual(await socket_1_recv_task, b'')
socket_1_recv_task = self.loop.create_task(recvfrom(socket_1))
await asyncio.sleep(0)

# socket 1 should still be able to receive messages after sending
# to an address that wasn't listening
socket_2.sendto(b'd', addr_1)
self.assertEqual(await socket_1_recv_task, b'd')

socket_1.shutdown(socket.SHUT_RDWR)
socket_1.close()
socket_2.shutdown(socket.SHUT_RDWR)
socket_2.close()


def test_datagram_send_to_non_listening_address_recvfrom(self):
async def recvfrom(socket):
data, _ = await self.loop.sock_recvfrom(socket, 4096)
return data

self.loop.run_until_complete(
self._basetest_datagram_send_to_non_listening_address(
recvfrom))


def test_datagram_send_to_non_listening_address_recvfrom_into(self):
async def recvfrom_into(socket):
buf = bytearray(4096)
length, _ = await self.loop.sock_recvfrom_into(socket, buf,
4096)
return buf[:length]

self.loop.run_until_complete(
self._basetest_datagram_send_to_non_listening_address(
recvfrom_into))

else:
import selectors

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix the asyncio ProactorEventLoop implementation so that sending a datagram to an address that is not listening does not prevent receiving any more datagrams.
1 change: 1 addition & 0 deletions Modules/overlapped.c
Original file line number Diff line number Diff line change
Expand Up @@ -2056,6 +2056,7 @@ overlapped_exec(PyObject *module)
WINAPI_CONSTANT(F_DWORD, ERROR_OPERATION_ABORTED);
WINAPI_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT);
WINAPI_CONSTANT(F_DWORD, ERROR_PIPE_BUSY);
WINAPI_CONSTANT(F_DWORD, ERROR_PORT_UNREACHABLE);
WINAPI_CONSTANT(F_DWORD, INFINITE);
WINAPI_CONSTANT(F_HANDLE, INVALID_HANDLE_VALUE);
WINAPI_CONSTANT(F_HANDLE, NULL);
Expand Down

0 comments on commit f11d0d8

Please sign in to comment.