From 5f1cadb1d8a4d1a58b758da519e28e97d361edc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sun, 20 Feb 2022 19:40:45 +0200 Subject: [PATCH 01/11] Added three new raw socket methods to asyncio * sock_sendto() * sock_recvfrom() * sock_recvfrom_into() --- Doc/library/asyncio-eventloop.rst | 35 ++++++ Doc/library/asyncio-llapi-index.rst | 9 ++ Lib/asyncio/events.py | 9 ++ Lib/asyncio/proactor_events.py | 9 ++ Lib/asyncio/selector_events.py | 121 ++++++++++++++++++++ Lib/asyncio/windows_events.py | 20 ++++ Lib/test/test_asyncio/test_sock_lowlevel.py | 71 +++++++++++- Lib/test/test_asyncio/utils.py | 25 ++++ Modules/clinic/overlapped.c.h | 35 +++++- Modules/overlapped.c | 114 +++++++++++++----- 10 files changed, 417 insertions(+), 31 deletions(-) diff --git a/Doc/library/asyncio-eventloop.rst b/Doc/library/asyncio-eventloop.rst index 0c65d75dbf8e01..5a5dabbd3f9728 100644 --- a/Doc/library/asyncio-eventloop.rst +++ b/Doc/library/asyncio-eventloop.rst @@ -872,6 +872,29 @@ convenient. .. versionadded:: 3.7 +.. coroutinemethod:: loop.sock_recvfrom(sock, bufsize) + + Receive a datagram of up to *bufsize* from *sock*. Asynchronous version of + :meth:`socket.recvfrom() `. + + Return a tuple of (received data, remote address). + + *sock* must be a non-blocking socket. + + .. versionadded:: 3.11 + +.. coroutinemethod:: loop.sock_recvfrom_into(sock, buf, nbytes=0) + + Receive a datagram of up to *nbytes* from *sock* into *buf*. + Asynchronous version of + :meth:`socket.recvfrom_into() `. + + Return a tuple of (received data, remote address). + + *sock* must be a non-blocking socket. + + .. versionadded:: 3.11 + .. coroutinemethod:: loop.sock_sendall(sock, data) Send *data* to the *sock* socket. Asynchronous version of @@ -890,6 +913,18 @@ convenient. method, before Python 3.7 it returned a :class:`Future`. Since Python 3.7, this is an ``async def`` method. +.. coroutinemethod:: loop.sock_sendto(sock, data, address) + + Send a datagram from *sock* to *address*. + Asynchronous version of + :meth:`socket.sendto() `. + + Return the number of bytes sent. + + *sock* must be a non-blocking socket. + + .. versionadded:: 3.11 + .. coroutinemethod:: loop.sock_connect(sock, address) Connect *sock* to a remote socket at *address*. diff --git a/Doc/library/asyncio-llapi-index.rst b/Doc/library/asyncio-llapi-index.rst index 0ab322af6dc72d..69b550e43f5aa9 100644 --- a/Doc/library/asyncio-llapi-index.rst +++ b/Doc/library/asyncio-llapi-index.rst @@ -189,9 +189,18 @@ See also the main documentation section about the * - ``await`` :meth:`loop.sock_recv_into` - Receive data from the :class:`~socket.socket` into a buffer. + * - ``await`` :meth:`loop.sock_recvfrom` + - Receive a datagram from the :class:`~socket.socket`. + + * - ``await`` :meth:`loop.sock_recvfrom_into` + - Receive a datagram from the :class:`~socket.socket` into a buffer. + * - ``await`` :meth:`loop.sock_sendall` - Send data to the :class:`~socket.socket`. + * - ``await`` :meth:`loop.sock_sendto` + - Send a datagram via the :class:`~socket.socket` to the given address. + * - ``await`` :meth:`loop.sock_connect` - Connect the :class:`~socket.socket`. diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py index 1d305e3ddff1cd..e682a192a887f2 100644 --- a/Lib/asyncio/events.py +++ b/Lib/asyncio/events.py @@ -546,9 +546,18 @@ async def sock_recv(self, sock, nbytes): async def sock_recv_into(self, sock, buf): raise NotImplementedError + async def sock_recvfrom(self, sock, bufsize): + raise NotImplementedError + + async def sock_recvfrom_into(self, sock, buf, nbytes=0): + raise NotImplementedError + async def sock_sendall(self, sock, data): raise NotImplementedError + async def sock_sendto(self, sock, data, address): + raise NotImplementedError + async def sock_connect(self, sock, address): raise NotImplementedError diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index ae59f30db1c3c9..8f3012d17be3c2 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -699,9 +699,18 @@ async def sock_recv(self, sock, n): async def sock_recv_into(self, sock, buf): return await self._proactor.recv_into(sock, buf) + async def sock_recvfrom(self, sock, bufsize): + return await self._proactor.recvfrom(sock, bufsize) + + async def sock_recvfrom_into(self, sock, buf, nbytes=0): + return await self._proactor.recvfrom_into(sock, buf, nbytes) + async def sock_sendall(self, sock, data): return await self._proactor.send(sock, data) + async def sock_sendto(self, sock, data, address): + return await self._proactor.send(sock, data, 0, address) + async def sock_connect(self, sock, address): return await self._proactor.connect(sock, address) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 63ab15f30fb5d7..a0757f0c7e3334 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -439,6 +439,85 @@ def _sock_recv_into(self, fut, sock, buf): else: fut.set_result(nbytes) + async def sock_recvfrom(self, sock, bufsize): + """Receive a datagram from a datagram socket. + + The return value is a tuple of (bytes, address) representing the + datagram received and the address it came from. + The maximum amount of data to be received at once is specified by + nbytes. + """ + _check_ssl_socket(sock) + if self._debug and sock.gettimeout() != 0: + raise ValueError("the socket must be non-blocking") + try: + return sock.recvfrom(bufsize) + except (BlockingIOError, InterruptedError): + pass + fut = self.create_future() + fd = sock.fileno() + self._ensure_fd_no_transport(fd) + handle = self._add_reader(fd, self._sock_recvfrom, fut, sock, bufsize) + fut.add_done_callback( + functools.partial(self._sock_read_done, fd, handle=handle)) + return await fut + + def _sock_recvfrom(self, fut, sock, bufsize): + # _sock_recvfrom() can add itself as an I/O callback if the operation + # can't be done immediately. Don't use it directly, call + # sock_recvfrom(). + if fut.done(): + return + try: + result = sock.recvfrom(bufsize) + except (BlockingIOError, InterruptedError): + return # try again next time + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: + fut.set_exception(exc) + else: + fut.set_result(result) + + async def sock_recvfrom_into(self, sock, buf, nbytes=0): + """Receive data from the socket. + + The received data is written into *buf* (a writable buffer). + The return value is a tuple of (number of bytes written, address). + """ + _check_ssl_socket(sock) + if self._debug and sock.gettimeout() != 0: + raise ValueError("the socket must be non-blocking") + try: + return sock.recvfrom_into(buf, nbytes) + except (BlockingIOError, InterruptedError): + pass + fut = self.create_future() + fd = sock.fileno() + self._ensure_fd_no_transport(fd) + handle = self._add_reader(fd, self._sock_recvfrom_into, fut, sock, buf, + nbytes) + fut.add_done_callback( + functools.partial(self._sock_read_done, fd, handle=handle)) + return await fut + + def _sock_recvfrom_into(self, fut, sock, buf, bufsize): + # _sock_recv_into() can add itself as an I/O callback if the operation + # can't be done immediately. Don't use it directly, call + # sock_recv_into(). + if fut.done(): + return + try: + result = sock.recvfrom_into(buf, bufsize) + except (BlockingIOError, InterruptedError): + return # try again next time + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: + fut.set_exception(exc) + else: + fut.set_result(result) + async def sock_sendall(self, sock, data): """Send data to the socket. @@ -492,6 +571,48 @@ def _sock_sendall(self, fut, sock, view, pos): else: pos[0] = start + async def sock_sendto(self, sock, data, address): + """Send data to the socket. + + The socket must be connected to a remote socket. This method continues + to send data from data until either all data has been sent or an + error occurs. None is returned on success. On error, an exception is + raised, and there is no way to determine how much data, if any, was + successfully processed by the receiving end of the connection. + """ + _check_ssl_socket(sock) + if self._debug and sock.gettimeout() != 0: + raise ValueError("the socket must be non-blocking") + try: + return sock.sendto(data, address) + except (BlockingIOError, InterruptedError): + pass + + fut = self.create_future() + fd = sock.fileno() + self._ensure_fd_no_transport(fd) + # use a trick with a list in closure to store a mutable state + handle = self._add_writer(fd, self._sock_sendto, fut, sock, data, + address) + fut.add_done_callback( + functools.partial(self._sock_write_done, fd, handle=handle)) + return await fut + + def _sock_sendto(self, fut, sock, data, address): + if fut.done(): + # Future cancellation can be scheduled on previous loop iteration + return + try: + n = sock.sendto(data, 0, address) + except (BlockingIOError, InterruptedError): + return + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: + fut.set_exception(exc) + else: + fut.set_result(n) + async def sock_connect(self, sock, address): """Connect to a remote socket at address. diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 0d9a07ef4772e5..3a27e570a98357 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -512,6 +512,26 @@ def finish_recv(trans, key, ov): return self._register(ov, conn, finish_recv) + def recvfrom_into(self, conn, buf, flags=0): + self._register_with_iocp(conn) + ov = _overlapped.Overlapped(NULL) + try: + ov.WSARecvFromInto(conn.fileno(), buf, flags) + except BrokenPipeError: + return self._result((b'', None)) + + def finish_recv(trans, key, ov): + try: + return ov.getresult() + except OSError as exc: + if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, + _overlapped.ERROR_OPERATION_ABORTED): + raise ConnectionResetError(*exc.args) + else: + raise + + return self._register(ov, conn, finish_recv) + def sendto(self, conn, buf, flags=0, addr=None): self._register_with_iocp(conn) ov = _overlapped.Overlapped(NULL) diff --git a/Lib/test/test_asyncio/test_sock_lowlevel.py b/Lib/test/test_asyncio/test_sock_lowlevel.py index 14001a4a5001f8..5b088924ab022c 100644 --- a/Lib/test/test_asyncio/test_sock_lowlevel.py +++ b/Lib/test/test_asyncio/test_sock_lowlevel.py @@ -5,11 +5,11 @@ from asyncio import proactor_events from itertools import cycle, islice +from unittest.mock import patch, Mock from test.test_asyncio import utils as test_utils from test import support from test.support import socket_helper - def tearDownModule(): asyncio.set_event_loop_policy(None) @@ -380,6 +380,75 @@ def test_huge_content_recvinto(self): self.loop.run_until_complete( self._basetest_huge_content_recvinto(httpd.address)) + async def _basetest_datagram_recvfrom(self, server_address): + # Happy path, sock.sendto() returns immediately + data = b'\x01' * 4096 + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: + sock.setblocking(False) + await self.loop.sock_sendto(sock, data, server_address) + received_data, from_addr = await self.loop.sock_recvfrom( + sock, 4096) + self.assertEqual(received_data, data) + self.assertEqual(from_addr, server_address) + + def test_recvfrom(self): + with test_utils.run_udp_echo_server() as server_address: + self.loop.run_until_complete( + self._basetest_datagram_recvfrom(server_address)) + + async def _basetest_datagram_recvfrom_into(self, server_address): + # Happy path, sock.sendto() returns immediately + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: + sock.setblocking(False) + + buf = bytearray(4096) + data = b'\x01' * 4096 + await self.loop.sock_sendto(sock, data, server_address) + num_bytes, from_addr = await self.loop.sock_recvfrom_into( + sock, buf) + self.assertEqual(num_bytes, 4096) + self.assertEqual(buf, data) + self.assertEqual(from_addr, server_address) + + buf = bytearray(4096) + await self.loop.sock_sendto(sock, data, server_address) + num_bytes, from_addr = await self.loop.sock_recvfrom_into( + sock, buf, 2048) + self.assertEqual(num_bytes, 2048) + self.assertEqual(buf[:2048], data[:2048]) + self.assertEqual(from_addr, server_address) + + def test_recvfrom_into(self): + with test_utils.run_udp_echo_server() as server_address: + self.loop.run_until_complete( + self._basetest_datagram_recvfrom_into(server_address)) + + async def _basetest_datagram_sendto_blocking(self, server_address): + # Sad path, sock.sendto() raises BlockingIOError + # This involves patching sock.sendto() to raise BlockingIOError but + # sendto() is not used by the proactor event loop + data = b'\x01' * 4096 + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: + sock.setblocking(False) + mock_sock = Mock(socket.socket) + mock_sock.gettimeout = sock.gettimeout + mock_sock.sendto.configure_mock(side_effect=BlockingIOError) + mock_sock.fileno = sock.fileno + self.loop.call_soon( + lambda: setattr(mock_sock, 'sendto', sock.sendto) + ) + await self.loop.sock_sendto(mock_sock, data, server_address) + + received_data, from_addr = await self.loop.sock_recvfrom( + sock, 4096) + self.assertEqual(received_data, data) + self.assertEqual(from_addr, server_address) + + def test_sendto_blocking(self): + with test_utils.run_udp_echo_server() as server_address: + self.loop.run_until_complete( + self._basetest_datagram_sendto_blocking(server_address)) + @socket_helper.skip_unless_bind_unix_socket def test_unix_sock_client_ops(self): with test_utils.run_test_unix_server() as httpd: diff --git a/Lib/test/test_asyncio/utils.py b/Lib/test/test_asyncio/utils.py index 0b9cde6878f37a..c32494d40ccea8 100644 --- a/Lib/test/test_asyncio/utils.py +++ b/Lib/test/test_asyncio/utils.py @@ -281,6 +281,31 @@ def run_test_server(*, host='127.0.0.1', port=0, use_ssl=False): server_ssl_cls=SSLWSGIServer) +def echo_datagrams(sock): + while True: + data, addr = sock.recvfrom(4096) + if data == b'STOP': + sock.close() + break + else: + sock.sendto(data, addr) + + +@contextlib.contextmanager +def run_udp_echo_server(*, host='127.0.0.1', port=0): + addr_info = socket.getaddrinfo(host, port, type=socket.SOCK_DGRAM) + family, type, proto, _, sockaddr = addr_info[0] + sock = socket.socket(family, type, proto) + sock.bind((host, port)) + thread = threading.Thread(target=lambda: echo_datagrams(sock)) + thread.start() + try: + yield sock.getsockname() + finally: + sock.sendto(b'STOP', sock.getsockname()) + thread.join() + + def make_test_protocol(base): dct = {} for name in dir(base): diff --git a/Modules/clinic/overlapped.c.h b/Modules/clinic/overlapped.c.h index efecd9028b7760..4e96ee2bb65772 100644 --- a/Modules/clinic/overlapped.c.h +++ b/Modules/clinic/overlapped.c.h @@ -905,4 +905,37 @@ _overlapped_Overlapped_WSARecvFrom(OverlappedObject *self, PyObject *const *args exit: return return_value; } -/*[clinic end generated code: output=ee2ec2f93c8d334b input=a9049054013a1b77]*/ + +PyDoc_STRVAR(_overlapped_Overlapped_WSARecvFromInto__doc__, +"WSARecvFromInto($self, handle, buf, size, flags=0, /)\n" +"--\n" +"\n" +"Start overlapped receive."); + +#define _OVERLAPPED_OVERLAPPED_WSARECVFROMINTO_METHODDEF \ + {"WSARecvFromInto", (PyCFunction)(void(*)(void))_overlapped_Overlapped_WSARecvFromInto, METH_FASTCALL, _overlapped_Overlapped_WSARecvFromInto__doc__}, + +static PyObject * +_overlapped_Overlapped_WSARecvFromInto_impl(OverlappedObject *self, + HANDLE handle, PyObject *bufobj, + DWORD size, DWORD flags); + +static PyObject * +_overlapped_Overlapped_WSARecvFromInto(OverlappedObject *self, PyObject *const *args, Py_ssize_t nargs) +{ + PyObject *return_value = NULL; + HANDLE handle; + PyObject *bufobj; + DWORD size; + DWORD flags = 0; + + if (!_PyArg_ParseStack(args, nargs, ""F_HANDLE"Ok|k:WSARecvFromInto", + &handle, &bufobj, &size, &flags)) { + goto exit; + } + return_value = _overlapped_Overlapped_WSARecvFromInto_impl(self, handle, bufobj, size, flags); + +exit: + return return_value; +} +/*[clinic end generated code: output=9bc096b4db308930 input=a9049054013a1b77]*/ diff --git a/Modules/overlapped.c b/Modules/overlapped.c index 2ba48c8b845f53..d9f5a3d22034bf 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -1053,6 +1053,46 @@ do_WSARecv(OverlappedObject *self, HANDLE handle, } } +static PyObject * +do_WSARecvFrom(OverlappedObject *self, HANDLE handle, + PyObject *bufobj, DWORD buflen, DWORD flags) +{ + DWORD nread; + WSABUF wsabuf; + int ret; + DWORD err; + + wsabuf.buf = PyBytes_AS_STRING(buf); + wsabuf.len = buflen; + + self->type = TYPE_READFROM; + self->handle = handle; + self->read_from.allocated_buffer = bufobj; + memset(&self->read_from.address, 0, sizeof(self->read_from.address)); + self->read_from.address_length = sizeof(self->read_from.address); + + Py_BEGIN_ALLOW_THREADS + ret = WSARecvFrom((SOCKET)handle, &wsabuf, 1, &nread, &flags, + (SOCKADDR*)&self->read_from.address, + &self->read_from.address_length, + &self->overlapped, NULL); + Py_END_ALLOW_THREADS + + self->error = err = (ret < 0 ? WSAGetLastError() : ERROR_SUCCESS); + switch(err) { + case ERROR_BROKEN_PIPE: + mark_as_completed(&self->overlapped); + return SetFromWindowsErr(err); + case ERROR_SUCCESS: + case ERROR_MORE_DATA: + case ERROR_IO_PENDING: + Py_RETURN_NONE; + default: + self->type = TYPE_NOT_STARTED; + return SetFromWindowsErr(err); + } +} + /*[clinic input] _overlapped.Overlapped.WSARecv @@ -1766,11 +1806,7 @@ _overlapped_Overlapped_WSARecvFrom_impl(OverlappedObject *self, DWORD flags) /*[clinic end generated code: output=13832a2025b86860 input=1b2663fa130e0286]*/ { - DWORD nread; PyObject *buf; - WSABUF wsabuf; - int ret; - DWORD err; if (self->type != TYPE_NONE) { PyErr_SetString(PyExc_ValueError, "operation already attempted"); @@ -1785,38 +1821,56 @@ _overlapped_Overlapped_WSARecvFrom_impl(OverlappedObject *self, return NULL; } - wsabuf.len = size; - wsabuf.buf = PyBytes_AS_STRING(buf); + return do_WSARecvFrom(self, handle, buf, size, flags); +} - self->type = TYPE_READ_FROM; - self->handle = handle; - self->read_from.allocated_buffer = buf; - memset(&self->read_from.address, 0, sizeof(self->read_from.address)); - self->read_from.address_length = sizeof(self->read_from.address); - Py_BEGIN_ALLOW_THREADS - ret = WSARecvFrom((SOCKET)handle, &wsabuf, 1, &nread, &flags, - (SOCKADDR*)&self->read_from.address, - &self->read_from.address_length, - &self->overlapped, NULL); - Py_END_ALLOW_THREADS +/*[clinic input] +_overlapped.Overlapped.WSARecvFromInto - self->error = err = (ret < 0 ? WSAGetLastError() : ERROR_SUCCESS); + handle: HANDLE + buf as bufobj: object + size: DWORD + flags: DWORD = 0 + / - switch(err) { - case ERROR_BROKEN_PIPE: - mark_as_completed(&self->overlapped); - return SetFromWindowsErr(err); - case ERROR_SUCCESS: - case ERROR_MORE_DATA: - case ERROR_IO_PENDING: - Py_RETURN_NONE; - default: - self->type = TYPE_NOT_STARTED; - return SetFromWindowsErr(err); +Start overlapped receive. +[clinic start generated code]*/ + +static PyObject * +_overlapped_Overlapped_WSARecvFromInto_impl(OverlappedObject *self, + HANDLE handle, PyObject *bufobj, + DWORD size, DWORD flags) +/*[clinic end generated code: output=45fc5d883a11c4e5 input=8eacd80d50157434]*/ +{ + if (self->type != TYPE_NONE) { + PyErr_SetString(PyExc_ValueError, "operation already attempted"); + return NULL; + } + + Py_buffer buffer; + if (!PyArg_Parse(bufobj, "y*", &buffer)) + return NULL; + +#if SIZEOF_SIZE_T > SIZEOF_LONG + if (buffer.len > (Py_ssize_t)ULONG_MAX) { + PyBuffer_Release(&buffer); + PyErr_SetString(PyExc_ValueError, "buffer too large"); + return NULL; } +#endif + if (buffer.len < size) { + PyBuffer_Release(&buffer); + PyErr_SetString(PyExc_ValueError, + "nbytes is greater than the length of the buffer"); + return NULL; + } + + PyBuffer_Release(&buffer); + return do_WSARecvFrom(self, handle, bufobj, size, flags); } + #include "clinic/overlapped.c.h" static PyMethodDef Overlapped_methods[] = { @@ -1826,6 +1880,8 @@ static PyMethodDef Overlapped_methods[] = { _OVERLAPPED_OVERLAPPED_READFILEINTO_METHODDEF _OVERLAPPED_OVERLAPPED_WSARECV_METHODDEF _OVERLAPPED_OVERLAPPED_WSARECVINTO_METHODDEF + _OVERLAPPED_OVERLAPPED_WSARECVFROM_METHODDEF + _OVERLAPPED_OVERLAPPED_WSARECVFROMINTO_METHODDEF _OVERLAPPED_OVERLAPPED_WRITEFILE_METHODDEF _OVERLAPPED_OVERLAPPED_WSASEND_METHODDEF _OVERLAPPED_OVERLAPPED_ACCEPTEX_METHODDEF From ef60dcf6ae0751f80cc670a5ce08faf4fc959ed8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sun, 20 Feb 2022 23:03:36 +0200 Subject: [PATCH 02/11] Added news blurb --- .../next/Library/2022-02-20-23-03-32.bpo-46805.HZ8xWG.rst | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 Misc/NEWS.d/next/Library/2022-02-20-23-03-32.bpo-46805.HZ8xWG.rst diff --git a/Misc/NEWS.d/next/Library/2022-02-20-23-03-32.bpo-46805.HZ8xWG.rst b/Misc/NEWS.d/next/Library/2022-02-20-23-03-32.bpo-46805.HZ8xWG.rst new file mode 100644 index 00000000000000..3c877d5498cd6a --- /dev/null +++ b/Misc/NEWS.d/next/Library/2022-02-20-23-03-32.bpo-46805.HZ8xWG.rst @@ -0,0 +1,4 @@ +Added raw datagram socket functions for asyncio: +:meth:`~asyncio.AbstractEventLoop.sock_sendto`, +:meth:`~asyncio.AbstractEventLoop.sock_recvfrom` and +:meth:`~asyncio.AbstractEventLoop.sock_recvfrom_into`. From 928742ca590b10966dca58a52bf0ee7408712423 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sun, 20 Feb 2022 23:57:31 +0200 Subject: [PATCH 03/11] Fixed variable/constant references --- Modules/overlapped.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Modules/overlapped.c b/Modules/overlapped.c index d9f5a3d22034bf..b22d06146be449 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -1062,10 +1062,10 @@ do_WSARecvFrom(OverlappedObject *self, HANDLE handle, int ret; DWORD err; - wsabuf.buf = PyBytes_AS_STRING(buf); + wsabuf.buf = PyBytes_AS_STRING(bufobj); wsabuf.len = buflen; - self->type = TYPE_READFROM; + self->type = TYPE_READ_FROM; self->handle = handle; self->read_from.allocated_buffer = bufobj; memset(&self->read_from.address, 0, sizeof(self->read_from.address)); From ebd23534213961b9ff0a417ad233b6cc6c0c265b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sun, 27 Feb 2022 20:22:51 +0200 Subject: [PATCH 04/11] Fixed recvfrom_into() --- Lib/asyncio/proactor_events.py | 5 +- Lib/asyncio/selector_events.py | 3 + Lib/test/test_asyncio/test_sock_lowlevel.py | 14 +- Modules/overlapped.c | 169 ++++++++++++++------ 4 files changed, 132 insertions(+), 59 deletions(-) diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 8f3012d17be3c2..3c8ba7b4b0090a 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -703,13 +703,16 @@ async def sock_recvfrom(self, sock, bufsize): return await self._proactor.recvfrom(sock, bufsize) async def sock_recvfrom_into(self, sock, buf, nbytes=0): + if not nbytes: + nbytes = len(buf) + return await self._proactor.recvfrom_into(sock, buf, nbytes) async def sock_sendall(self, sock, data): return await self._proactor.send(sock, data) async def sock_sendto(self, sock, data, address): - return await self._proactor.send(sock, data, 0, address) + return await self._proactor.sendto(sock, data, 0, address) async def sock_connect(self, sock, address): return await self._proactor.connect(sock, address) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index a0757f0c7e3334..817705f6cc5381 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -488,6 +488,9 @@ async def sock_recvfrom_into(self, sock, buf, nbytes=0): _check_ssl_socket(sock) if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") + if not nbytes: + nbytes = len(buf) + try: return sock.recvfrom_into(buf, nbytes) except (BlockingIOError, InterruptedError): diff --git a/Lib/test/test_asyncio/test_sock_lowlevel.py b/Lib/test/test_asyncio/test_sock_lowlevel.py index 5b088924ab022c..db47616d18343e 100644 --- a/Lib/test/test_asyncio/test_sock_lowlevel.py +++ b/Lib/test/test_asyncio/test_sock_lowlevel.py @@ -410,12 +410,12 @@ async def _basetest_datagram_recvfrom_into(self, server_address): self.assertEqual(buf, data) self.assertEqual(from_addr, server_address) - buf = bytearray(4096) + buf = bytearray(8192) await self.loop.sock_sendto(sock, data, server_address) num_bytes, from_addr = await self.loop.sock_recvfrom_into( - sock, buf, 2048) - self.assertEqual(num_bytes, 2048) - self.assertEqual(buf[:2048], data[:2048]) + sock, buf, 4096) + self.assertEqual(num_bytes, 4096) + self.assertEqual(buf[:4096], data[:4096]) self.assertEqual(from_addr, server_address) def test_recvfrom_into(self): @@ -430,7 +430,7 @@ async def _basetest_datagram_sendto_blocking(self, server_address): data = b'\x01' * 4096 with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: sock.setblocking(False) - mock_sock = Mock(socket.socket) + mock_sock = Mock(sock) mock_sock.gettimeout = sock.gettimeout mock_sock.sendto.configure_mock(side_effect=BlockingIOError) mock_sock.fileno = sock.fileno @@ -445,6 +445,10 @@ async def _basetest_datagram_sendto_blocking(self, server_address): self.assertEqual(from_addr, server_address) def test_sendto_blocking(self): + if sys.platform == 'win32': + if isinstance(self.loop, asyncio.ProactorEventLoop): + raise unittest.SkipTest('Not relevant to ProactorEventLoop') + with test_utils.run_udp_echo_server() as server_address: self.loop.run_until_complete( self._basetest_datagram_sendto_blocking(server_address)) diff --git a/Modules/overlapped.c b/Modules/overlapped.c index b22d06146be449..bf97a810b1bd78 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -64,7 +64,7 @@ class _overlapped.Overlapped "OverlappedObject *" "&OverlappedType" enum {TYPE_NONE, TYPE_NOT_STARTED, TYPE_READ, TYPE_READINTO, TYPE_WRITE, TYPE_ACCEPT, TYPE_CONNECT, TYPE_DISCONNECT, TYPE_CONNECT_NAMED_PIPE, TYPE_WAIT_NAMED_PIPE_AND_CONNECT, TYPE_TRANSMIT_FILE, TYPE_READ_FROM, - TYPE_WRITE_TO}; + TYPE_WRITE_TO, TYPE_READ_FROM_INTO}; typedef struct { PyObject_HEAD @@ -91,6 +91,17 @@ typedef struct { struct sockaddr_in6 address; int address_length; } read_from; + + /* Data used for reading from a connectionless socket: + TYPE_READ_FROM_INTO */ + struct { + // A (number of bytes read, (host, port)) tuple + PyObject* result; + /* Buffer passed by the user */ + Py_buffer user_buffer; + struct sockaddr_in6 address; + int address_length; + } read_from_into; }; } OverlappedObject; @@ -662,6 +673,16 @@ Overlapped_clear(OverlappedObject *self) } break; } + case TYPE_READ_FROM_INTO: { + if (self->read_from.result) { + // We've received a message, free the result tuple. + Py_CLEAR(self->read_from.result); + } + if (self->read_from_into.user_buffer.obj) { + PyBuffer_Release(&self->read_from_into.user_buffer); + } + break; + } case TYPE_WRITE: case TYPE_WRITE_TO: case TYPE_READINTO: { @@ -914,6 +935,30 @@ _overlapped_Overlapped_getresult_impl(OverlappedObject *self, BOOL wait) Py_INCREF(self->read_from.result); return self->read_from.result; + case TYPE_READ_FROM_INTO: + // unparse the address + addr = unparse_address((SOCKADDR*)&self->read_from_into.address, + self->read_from_into.address_length); + + if (addr == NULL) { + return NULL; + } + + // The result is a two item tuple: (number of bytes read, address) + self->read_from_into.result = PyTuple_New(2); + if (self->read_from_into.result == NULL) { + Py_CLEAR(addr); + return NULL; + } + + // first item: number of bytes read + PyTuple_SET_ITEM(self->read_from_into.result, 0, + PyLong_FromUnsignedLong((unsigned long)transferred)); + // second item: address + PyTuple_SET_ITEM(self->read_from_into.result, 1, addr); + + Py_INCREF(self->read_from_into.result); + return self->read_from_into.result; default: return PyLong_FromUnsignedLong((unsigned long) transferred); } @@ -1053,45 +1098,6 @@ do_WSARecv(OverlappedObject *self, HANDLE handle, } } -static PyObject * -do_WSARecvFrom(OverlappedObject *self, HANDLE handle, - PyObject *bufobj, DWORD buflen, DWORD flags) -{ - DWORD nread; - WSABUF wsabuf; - int ret; - DWORD err; - - wsabuf.buf = PyBytes_AS_STRING(bufobj); - wsabuf.len = buflen; - - self->type = TYPE_READ_FROM; - self->handle = handle; - self->read_from.allocated_buffer = bufobj; - memset(&self->read_from.address, 0, sizeof(self->read_from.address)); - self->read_from.address_length = sizeof(self->read_from.address); - - Py_BEGIN_ALLOW_THREADS - ret = WSARecvFrom((SOCKET)handle, &wsabuf, 1, &nread, &flags, - (SOCKADDR*)&self->read_from.address, - &self->read_from.address_length, - &self->overlapped, NULL); - Py_END_ALLOW_THREADS - - self->error = err = (ret < 0 ? WSAGetLastError() : ERROR_SUCCESS); - switch(err) { - case ERROR_BROKEN_PIPE: - mark_as_completed(&self->overlapped); - return SetFromWindowsErr(err); - case ERROR_SUCCESS: - case ERROR_MORE_DATA: - case ERROR_IO_PENDING: - Py_RETURN_NONE; - default: - self->type = TYPE_NOT_STARTED; - return SetFromWindowsErr(err); - } -} /*[clinic input] _overlapped.Overlapped.WSARecv @@ -1821,7 +1827,40 @@ _overlapped_Overlapped_WSARecvFrom_impl(OverlappedObject *self, return NULL; } - return do_WSARecvFrom(self, handle, buf, size, flags); + DWORD nread; + WSABUF wsabuf; + int ret; + DWORD err; + + wsabuf.buf = PyBytes_AS_STRING(buf); + wsabuf.len = size; + + self->type = TYPE_READ_FROM; + self->handle = handle; + self->read_from.allocated_buffer = buf; + memset(&self->read_from.address, 0, sizeof(self->read_from.address)); + self->read_from.address_length = sizeof(self->read_from.address); + + Py_BEGIN_ALLOW_THREADS + ret = WSARecvFrom((SOCKET)handle, &wsabuf, 1, &nread, &flags, + (SOCKADDR*)&self->read_from.address, + &self->read_from.address_length, + &self->overlapped, NULL); + Py_END_ALLOW_THREADS + + self->error = err = (ret < 0 ? WSAGetLastError() : ERROR_SUCCESS); + switch (err) { + case ERROR_BROKEN_PIPE: + mark_as_completed(&self->overlapped); + return SetFromWindowsErr(err); + case ERROR_SUCCESS: + case ERROR_MORE_DATA: + case ERROR_IO_PENDING: + Py_RETURN_NONE; + default: + self->type = TYPE_NOT_STARTED; + return SetFromWindowsErr(err); + } } @@ -1848,26 +1887,50 @@ _overlapped_Overlapped_WSARecvFromInto_impl(OverlappedObject *self, return NULL; } - Py_buffer buffer; - if (!PyArg_Parse(bufobj, "y*", &buffer)) + if (!PyArg_Parse(bufobj, "y*", &self->read_from_into.user_buffer)) return NULL; #if SIZEOF_SIZE_T > SIZEOF_LONG - if (buffer.len > (Py_ssize_t)ULONG_MAX) { - PyBuffer_Release(&buffer); + if (self->read_from_into.user_buffer.len > (Py_ssize_t)ULONG_MAX) { + PyBuffer_Release(&self->read_from_into.user_buffer); PyErr_SetString(PyExc_ValueError, "buffer too large"); return NULL; } #endif - if (buffer.len < size) { - PyBuffer_Release(&buffer); - PyErr_SetString(PyExc_ValueError, - "nbytes is greater than the length of the buffer"); - return NULL; - } - PyBuffer_Release(&buffer); - return do_WSARecvFrom(self, handle, bufobj, size, flags); + DWORD nread; + WSABUF wsabuf; + int ret; + DWORD err; + + wsabuf.buf = self->read_from_into.user_buffer.buf; + wsabuf.len = size; + + self->type = TYPE_READ_FROM_INTO; + self->handle = handle; + memset(&self->read_from_into.address, 0, sizeof(self->read_from_into.address)); + self->read_from_into.address_length = sizeof(self->read_from_into.address); + + Py_BEGIN_ALLOW_THREADS + ret = WSARecvFrom((SOCKET)handle, &wsabuf, 1, &nread, &flags, + (SOCKADDR*)&self->read_from_into.address, + &self->read_from_into.address_length, + &self->overlapped, NULL); + Py_END_ALLOW_THREADS + + self->error = err = (ret < 0 ? WSAGetLastError() : ERROR_SUCCESS); + switch (err) { + case ERROR_BROKEN_PIPE: + mark_as_completed(&self->overlapped); + return SetFromWindowsErr(err); + case ERROR_SUCCESS: + case ERROR_MORE_DATA: + case ERROR_IO_PENDING: + Py_RETURN_NONE; + default: + self->type = TYPE_NOT_STARTED; + return SetFromWindowsErr(err); + } } From af18ee1ae5c854b76f2d002f97a2f2ecca908ccc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sun, 27 Feb 2022 22:27:18 +0200 Subject: [PATCH 05/11] Fixed NameError --- Lib/asyncio/selector_events.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 066ca61aad477e..0d0001d1ee6837 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -442,7 +442,7 @@ async def sock_recvfrom(self, sock, bufsize): The maximum amount of data to be received at once is specified by nbytes. """ - _check_ssl_socket(sock) + base_events._check_ssl_socket(sock) if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") try: @@ -480,7 +480,7 @@ async def sock_recvfrom_into(self, sock, buf, nbytes=0): The received data is written into *buf* (a writable buffer). The return value is a tuple of (number of bytes written, address). """ - _check_ssl_socket(sock) + base_events._check_ssl_socket(sock) if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") try: @@ -575,7 +575,7 @@ async def sock_sendto(self, sock, data, address): raised, and there is no way to determine how much data, if any, was successfully processed by the receiving end of the connection. """ - _check_ssl_socket(sock) + base_events._check_ssl_socket(sock) if self._debug and sock.gettimeout() != 0: raise ValueError("the socket must be non-blocking") try: From 239559886be0541820c4206c6a87c6535183b640 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Mon, 28 Feb 2022 01:00:41 +0200 Subject: [PATCH 06/11] Cleanup additions/fixes --- Modules/overlapped.c | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/Modules/overlapped.c b/Modules/overlapped.c index bf97a810b1bd78..3ec765257e3f23 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -674,9 +674,9 @@ Overlapped_clear(OverlappedObject *self) break; } case TYPE_READ_FROM_INTO: { - if (self->read_from.result) { + if (self->read_from_into.result) { // We've received a message, free the result tuple. - Py_CLEAR(self->read_from.result); + Py_CLEAR(self->read_from_into.result); } if (self->read_from_into.user_buffer.obj) { PyBuffer_Release(&self->read_from_into.user_buffer); @@ -887,6 +887,11 @@ _overlapped_Overlapped_getresult_impl(OverlappedObject *self, BOOL wait) { break; } + else if (self->type == TYPE_READ_FROM_INTO && + self->read_from_into.result != NULL) + { + break; + } /* fall through */ default: return SetFromWindowsErr(err); @@ -1663,6 +1668,13 @@ Overlapped_traverse(OverlappedObject *self, visitproc visit, void *arg) case TYPE_READ_FROM: Py_VISIT(self->read_from.result); Py_VISIT(self->read_from.allocated_buffer); + break; + case TYPE_READ_FROM_INTO: + Py_VISIT(self->read_from_into.result); + if (self->read_from_into.user_buffer.obj) { + Py_VISIT(&self->read_from_into.user_buffer.obj); + } + break; } return 0; } From 7b3c12bced677cb1d3c333746425f5be7b260188 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Fri, 11 Mar 2022 02:21:25 +0200 Subject: [PATCH 07/11] Fixed wrong return value on BrokenPipeError --- Lib/asyncio/windows_events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 3a27e570a98357..90b259cbafead2 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -518,7 +518,7 @@ def recvfrom_into(self, conn, buf, flags=0): try: ov.WSARecvFromInto(conn.fileno(), buf, flags) except BrokenPipeError: - return self._result((b'', None)) + return self._result((0, None)) def finish_recv(trans, key, ov): try: From 16177df0aa6bc56f557ee2e9022b2525c419eabe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Fri, 11 Mar 2022 15:02:29 +0200 Subject: [PATCH 08/11] Added item to the "improved modules" section --- Doc/whatsnew/3.11.rst | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/Doc/whatsnew/3.11.rst b/Doc/whatsnew/3.11.rst index ce15fb72f3c49b..27d5b435b4c3a0 100644 --- a/Doc/whatsnew/3.11.rst +++ b/Doc/whatsnew/3.11.rst @@ -226,6 +226,15 @@ New Modules Improved Modules ================ +asyncio +------- + +* Add raw datagram socket functions to the event loop: + :meth:`~asyncio.AbstractEventLoop.sock_sendto`, + :meth:`~asyncio.AbstractEventLoop.sock_recvfrom` and + :meth:`~asyncio.AbstractEventLoop.sock_recvfrom_into`. + (Contributed by Alex Grönholm in :issue:`46805`.) + fractions --------- From 059f645e2dc4618280dba97d17f8acfa7b88cd61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sat, 12 Mar 2022 14:27:58 +0200 Subject: [PATCH 09/11] Fixed the return type description for sock_recvfrom_into() --- Doc/library/asyncio-eventloop.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Doc/library/asyncio-eventloop.rst b/Doc/library/asyncio-eventloop.rst index d4b09dabd04111..4776853b5a56d8 100644 --- a/Doc/library/asyncio-eventloop.rst +++ b/Doc/library/asyncio-eventloop.rst @@ -939,7 +939,7 @@ convenient. Asynchronous version of :meth:`socket.recvfrom_into() `. - Return a tuple of (received data, remote address). + Return a tuple of (number of bytes received, remote address). *sock* must be a non-blocking socket. From 7908ea687c55f5b7e54bee8a1319fb5fcfbeb058 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sat, 12 Mar 2022 14:39:02 +0200 Subject: [PATCH 10/11] Fixed code style --- Modules/overlapped.c | 35 +++++++++++++++++------------------ 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/Modules/overlapped.c b/Modules/overlapped.c index 3ec765257e3f23..b1c7081647093c 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -1825,6 +1825,10 @@ _overlapped_Overlapped_WSARecvFrom_impl(OverlappedObject *self, /*[clinic end generated code: output=13832a2025b86860 input=1b2663fa130e0286]*/ { PyObject *buf; + DWORD nread; + WSABUF wsabuf; + int ret; + DWORD err; if (self->type != TYPE_NONE) { PyErr_SetString(PyExc_ValueError, "operation already attempted"); @@ -1839,11 +1843,6 @@ _overlapped_Overlapped_WSARecvFrom_impl(OverlappedObject *self, return NULL; } - DWORD nread; - WSABUF wsabuf; - int ret; - DWORD err; - wsabuf.buf = PyBytes_AS_STRING(buf); wsabuf.len = size; @@ -1854,10 +1853,10 @@ _overlapped_Overlapped_WSARecvFrom_impl(OverlappedObject *self, self->read_from.address_length = sizeof(self->read_from.address); Py_BEGIN_ALLOW_THREADS - ret = WSARecvFrom((SOCKET)handle, &wsabuf, 1, &nread, &flags, - (SOCKADDR*)&self->read_from.address, - &self->read_from.address_length, - &self->overlapped, NULL); + ret = WSARecvFrom((SOCKET)handle, &wsabuf, 1, &nread, &flags, + (SOCKADDR*)&self->read_from.address, + &self->read_from.address_length, + &self->overlapped, NULL); Py_END_ALLOW_THREADS self->error = err = (ret < 0 ? WSAGetLastError() : ERROR_SUCCESS); @@ -1894,6 +1893,11 @@ _overlapped_Overlapped_WSARecvFromInto_impl(OverlappedObject *self, DWORD size, DWORD flags) /*[clinic end generated code: output=45fc5d883a11c4e5 input=8eacd80d50157434]*/ { + DWORD nread; + WSABUF wsabuf; + int ret; + DWORD err; + if (self->type != TYPE_NONE) { PyErr_SetString(PyExc_ValueError, "operation already attempted"); return NULL; @@ -1910,11 +1914,6 @@ _overlapped_Overlapped_WSARecvFromInto_impl(OverlappedObject *self, } #endif - DWORD nread; - WSABUF wsabuf; - int ret; - DWORD err; - wsabuf.buf = self->read_from_into.user_buffer.buf; wsabuf.len = size; @@ -1924,10 +1923,10 @@ _overlapped_Overlapped_WSARecvFromInto_impl(OverlappedObject *self, self->read_from_into.address_length = sizeof(self->read_from_into.address); Py_BEGIN_ALLOW_THREADS - ret = WSARecvFrom((SOCKET)handle, &wsabuf, 1, &nread, &flags, - (SOCKADDR*)&self->read_from_into.address, - &self->read_from_into.address_length, - &self->overlapped, NULL); + ret = WSARecvFrom((SOCKET)handle, &wsabuf, 1, &nread, &flags, + (SOCKADDR*)&self->read_from_into.address, + &self->read_from_into.address_length, + &self->overlapped, NULL); Py_END_ALLOW_THREADS self->error = err = (ret < 0 ? WSAGetLastError() : ERROR_SUCCESS); From f907a77dd9847c468a744ba08d489ba1b8137f54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sat, 12 Mar 2022 18:33:57 +0200 Subject: [PATCH 11/11] Switched to automated parsing of Py_buffer --- Modules/clinic/overlapped.c.h | 15 ++++++++++----- Modules/overlapped.c | 24 +++++++++--------------- 2 files changed, 19 insertions(+), 20 deletions(-) diff --git a/Modules/clinic/overlapped.c.h b/Modules/clinic/overlapped.c.h index 4e96ee2bb65772..16d6013ef2f2e5 100644 --- a/Modules/clinic/overlapped.c.h +++ b/Modules/clinic/overlapped.c.h @@ -917,7 +917,7 @@ PyDoc_STRVAR(_overlapped_Overlapped_WSARecvFromInto__doc__, static PyObject * _overlapped_Overlapped_WSARecvFromInto_impl(OverlappedObject *self, - HANDLE handle, PyObject *bufobj, + HANDLE handle, Py_buffer *bufobj, DWORD size, DWORD flags); static PyObject * @@ -925,17 +925,22 @@ _overlapped_Overlapped_WSARecvFromInto(OverlappedObject *self, PyObject *const * { PyObject *return_value = NULL; HANDLE handle; - PyObject *bufobj; + Py_buffer bufobj = {NULL, NULL}; DWORD size; DWORD flags = 0; - if (!_PyArg_ParseStack(args, nargs, ""F_HANDLE"Ok|k:WSARecvFromInto", + if (!_PyArg_ParseStack(args, nargs, ""F_HANDLE"y*k|k:WSARecvFromInto", &handle, &bufobj, &size, &flags)) { goto exit; } - return_value = _overlapped_Overlapped_WSARecvFromInto_impl(self, handle, bufobj, size, flags); + return_value = _overlapped_Overlapped_WSARecvFromInto_impl(self, handle, &bufobj, size, flags); exit: + /* Cleanup for bufobj */ + if (bufobj.obj) { + PyBuffer_Release(&bufobj); + } + return return_value; } -/*[clinic end generated code: output=9bc096b4db308930 input=a9049054013a1b77]*/ +/*[clinic end generated code: output=5c9b17890ef29d52 input=a9049054013a1b77]*/ diff --git a/Modules/overlapped.c b/Modules/overlapped.c index b1c7081647093c..ab9a2f0ce26f62 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -98,7 +98,7 @@ typedef struct { // A (number of bytes read, (host, port)) tuple PyObject* result; /* Buffer passed by the user */ - Py_buffer user_buffer; + Py_buffer *user_buffer; struct sockaddr_in6 address; int address_length; } read_from_into; @@ -678,9 +678,6 @@ Overlapped_clear(OverlappedObject *self) // We've received a message, free the result tuple. Py_CLEAR(self->read_from_into.result); } - if (self->read_from_into.user_buffer.obj) { - PyBuffer_Release(&self->read_from_into.user_buffer); - } break; } case TYPE_WRITE: @@ -1671,8 +1668,8 @@ Overlapped_traverse(OverlappedObject *self, visitproc visit, void *arg) break; case TYPE_READ_FROM_INTO: Py_VISIT(self->read_from_into.result); - if (self->read_from_into.user_buffer.obj) { - Py_VISIT(&self->read_from_into.user_buffer.obj); + if (self->read_from_into.user_buffer->obj) { + Py_VISIT(&self->read_from_into.user_buffer->obj); } break; } @@ -1879,7 +1876,7 @@ _overlapped_Overlapped_WSARecvFrom_impl(OverlappedObject *self, _overlapped.Overlapped.WSARecvFromInto handle: HANDLE - buf as bufobj: object + buf as bufobj: Py_buffer size: DWORD flags: DWORD = 0 / @@ -1889,9 +1886,9 @@ Start overlapped receive. static PyObject * _overlapped_Overlapped_WSARecvFromInto_impl(OverlappedObject *self, - HANDLE handle, PyObject *bufobj, + HANDLE handle, Py_buffer *bufobj, DWORD size, DWORD flags) -/*[clinic end generated code: output=45fc5d883a11c4e5 input=8eacd80d50157434]*/ +/*[clinic end generated code: output=30c7ea171a691757 input=4be4b08d03531e76]*/ { DWORD nread; WSABUF wsabuf; @@ -1903,22 +1900,19 @@ _overlapped_Overlapped_WSARecvFromInto_impl(OverlappedObject *self, return NULL; } - if (!PyArg_Parse(bufobj, "y*", &self->read_from_into.user_buffer)) - return NULL; - #if SIZEOF_SIZE_T > SIZEOF_LONG - if (self->read_from_into.user_buffer.len > (Py_ssize_t)ULONG_MAX) { - PyBuffer_Release(&self->read_from_into.user_buffer); + if (bufobj->len > (Py_ssize_t)ULONG_MAX) { PyErr_SetString(PyExc_ValueError, "buffer too large"); return NULL; } #endif - wsabuf.buf = self->read_from_into.user_buffer.buf; + wsabuf.buf = bufobj->buf; wsabuf.len = size; self->type = TYPE_READ_FROM_INTO; self->handle = handle; + self->read_from_into.user_buffer = bufobj; memset(&self->read_from_into.address, 0, sizeof(self->read_from_into.address)); self->read_from_into.address_length = sizeof(self->read_from_into.address);