From 849055b606d7746630451e4bb3c4ae5770b34503 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Thu, 24 Oct 2019 00:10:55 -0700 Subject: [PATCH 01/29] On Windows, run tests with LSPs installed Also: - Increase test timeout, because for some reason the "PCTools" LSP is sometimes incredibly slow to install. - Factor out common curl options in ci.sh, for more consistency/reliability - Run cleanup/codecov upload even if tests failed --- azure-pipelines.yml | 10 +++++++++- ci.sh | 47 ++++++++++++++++++++++++++++++++++++++------- 2 files changed, 49 insertions(+), 8 deletions(-) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index d598ecc8e..a65ed920a 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -39,7 +39,7 @@ jobs: - job: 'Windows' pool: vmImage: 'vs2017-win2016' - timeoutInMinutes: 10 + timeoutInMinutes: 20 strategy: # Python version list: # 64-bit: https://www.nuget.org/packages/python/ @@ -63,6 +63,14 @@ jobs: "Python 3.7, 64 bit": python.version: '3.7.2' python.pkg: 'python' + "with IFS LSP, Python 3.7, 64 bit": + python.version: '3.7.2' + python.pkg: 'python' + lsp: 'http://www.proxifier.com/download/ProxifierSetup.exe' + "with non-IFS LSP, Python 3.7, 64 bit": + python.version: '3.7.2' + python.pkg: 'python' + lsp: 'http://download.pctools.com/mirror/updates/9.0.0.2308-SDavfree-lite_en.exe' steps: - task: NuGetToolInstaller@0 diff --git a/ci.sh b/ci.sh index fa054812a..1daf728af 100755 --- a/ci.sh +++ b/ci.sh @@ -12,6 +12,11 @@ else CODECOV_NAME="${TRAVIS_OS_NAME}-${TRAVIS_PYTHON_VERSION:-unknown}" fi +# We always want to retry on failure, and we have to set --connect-timeout to +# work around a curl bug: +# https://github.com/curl/curl/issues/4461 +CURL="curl --connect-timeout 5 --retry 5" + ################################################################ # Bootstrap python environment, if necessary ################################################################ @@ -47,12 +52,12 @@ fi if [ "$TRAVIS_OS_NAME" = "osx" ]; then CODECOV_NAME="osx_${MACPYTHON}" - curl -Lo macpython.pkg https://www.python.org/ftp/python/${MACPYTHON}/python-${MACPYTHON}-macosx10.6.pkg + $CURL -Lo macpython.pkg https://www.python.org/ftp/python/${MACPYTHON}/python-${MACPYTHON}-macosx10.6.pkg sudo installer -pkg macpython.pkg -target / ls /Library/Frameworks/Python.framework/Versions/*/bin/ PYTHON_EXE=/Library/Frameworks/Python.framework/Versions/*/bin/python3 # The pip in older MacPython releases doesn't support a new enough TLS - curl https://bootstrap.pypa.io/get-pip.py | sudo $PYTHON_EXE + $CURL https://bootstrap.pypa.io/get-pip.py | sudo $PYTHON_EXE sudo $PYTHON_EXE -m pip install virtualenv $PYTHON_EXE -m virtualenv testenv source testenv/bin/activate @@ -62,7 +67,7 @@ fi if [ "$PYPY_NIGHTLY_BRANCH" != "" ]; then CODECOV_NAME="pypy_nightly_${PYPY_NIGHTLY_BRANCH}" - curl -fLo pypy.tar.bz2 http://buildbot.pypy.org/nightly/${PYPY_NIGHTLY_BRANCH}/pypy-c-jit-latest-linux64.tar.bz2 + $CURL -fLo pypy.tar.bz2 http://buildbot.pypy.org/nightly/${PYPY_NIGHTLY_BRANCH}/pypy-c-jit-latest-linux64.tar.bz2 if [ ! -s pypy.tar.bz2 ]; then # We know: # - curl succeeded (200 response code; -f means "exit with error if @@ -115,12 +120,40 @@ else # Actual tests python -m pip install -r test-requirements.txt + # If we're testing with a LSP installed, then it might break network + # stuff, so wait until after we've finished setting everything else + # up. 
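+    # (The "LSP"s installed here are Winsock Layered Service Providers --
+    # DLLs that splice themselves into every socket operation, which is
+    # exactly why they can break network stuff -- not the unrelated
+    # Language Server Protocol.)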
+ if [ "$LSP" != "" ]; then + echo "Installing LSP from ${LSP}" + $CURL -o lsp-installer.exe "$LSP" + # Double-slashes are how you tell windows-bash that you want a single + # slash, and don't treat this as a unix-style filename that needs to + # be replaced by a windows-style filename. + # http://www.mingw.org/wiki/Posix_path_conversion + ./lsp-installer.exe //silent //norestart + echo "Waiting for LSP to appear in Winsock catalog" + while ! netsh winsock show catalog | grep "Layered Chain Entry"; do + sleep 1 + done + netsh winsock show catalog + fi + mkdir empty cd empty INSTALLDIR=$(python -c "import os, trio; print(os.path.dirname(trio.__file__))") cp ../setup.cfg $INSTALLDIR - pytest -W error -r a --junitxml=../test-results.xml --run-slow ${INSTALLDIR} --cov="$INSTALLDIR" --cov-config=../.coveragerc --verbose + if pytest -W error -r a --junitxml=../test-results.xml --run-slow ${INSTALLDIR} --cov="$INSTALLDIR" --cov-config=../.coveragerc --verbose; then + PASSED=true + else + PASSED=false + fi + + # Remove the LSP again; again we want to do this ASAP to avoid + # accidentally breaking other stuff. + if [ "$LSP" != "" ]; then + netsh winsock reset + fi # Disable coverage on 3.8 until we run 3.8 on Windows CI too # https://github.com/python-trio/trio/pull/784#issuecomment-446438407 @@ -143,9 +176,9 @@ else # bash <(curl ...) # but azure is broken: # https://developercommunity.visualstudio.com/content/problem/743824/bash-task-on-windows-suddenly-fails-with-bash-devf.html - # Also we have to set --connect-timeout to work around: - # https://github.com/curl/curl/issues/4461 - curl --connect-timeout 5 --retry 5 -o codecov.sh https://codecov.io/bash + $CURL -o codecov.sh https://codecov.io/bash bash codecov.sh -n "${CODECOV_NAME}" -F "$FLAG" fi + + $PASSED fi From 22b07e5a01c088f9f50bdb107d23a915c5b0f4bb Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Thu, 24 Oct 2019 02:31:44 -0700 Subject: [PATCH 02/29] Improve notify_closing docs --- docs/source/reference-hazmat.rst | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/docs/source/reference-hazmat.rst b/docs/source/reference-hazmat.rst index 75e7d88d3..f16cb0016 100644 --- a/docs/source/reference-hazmat.rst +++ b/docs/source/reference-hazmat.rst @@ -171,7 +171,21 @@ All environments provide the following functions: `~trio.ClosedResourceError`. This doesn't actually close the object – you still have to do that - yourself afterwards. + yourself afterwards. Also, you want to be careful to make sure no + new tasks start waiting on the object in between when you call this + and when it's actually closed. So to close something properly, you + usually want to do these steps in order: + + 1. Explicitly mark the object as closed, so that any new attempts + to use it will abort before they start. + 2. Call `notify_closing` to wake up any already-existing users. + 3. Actually close the object. + + It's also possible to do them in a different order if that's more + convenient, *but only if* you make sure not to have any checkpoints in + between the steps. This way they all happen in a single atomic + step, so other tasks won't be able to tell what order they happened + in anyway. Unix-specific API From c8a63de16ac3e98855196b56632d349878b47bda Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Thu, 24 Oct 2019 02:31:59 -0700 Subject: [PATCH 03/29] Checkpoint: basic AFD support working As of this commit, you can call trio._core._generated_io_windows.afd_poll and it seems to basically work. 
--- trio/_core/_generated_io_windows.py | 7 ++ trio/_core/_io_windows.py | 94 ++++++++++++++++++- trio/_core/_windows_cffi.py | 136 +++++++++++++++++++++++----- 3 files changed, 213 insertions(+), 24 deletions(-) diff --git a/trio/_core/_generated_io_windows.py b/trio/_core/_generated_io_windows.py index 91f26da9a..640f8b9ac 100644 --- a/trio/_core/_generated_io_windows.py +++ b/trio/_core/_generated_io_windows.py @@ -55,6 +55,13 @@ def notify_closing(sock): except AttributeError: raise RuntimeError('must be called from async context') +async def afd_poll(sock, events): + locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True + try: + return await GLOBAL_RUN_CONTEXT.runner.io_manager.afd_poll(sock, events) + except AttributeError: + raise RuntimeError('must be called from async context') + async def write_overlapped(handle, data, file_offset=0): locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True try: diff --git a/trio/_core/_io_windows.py b/trio/_core/_io_windows.py index ef8726f32..aa3e0edb7 100644 --- a/trio/_core/_io_windows.py +++ b/trio/_core/_io_windows.py @@ -19,10 +19,16 @@ ffi, kernel32, ntdll, + ws2_32, INVALID_HANDLE_VALUE, raise_winerror, - ErrorCodes, _handle, + ErrorCodes, + FileFlags, + AFDPollFlags, + WSAIoctls, + CompletionModes, + IoControlCodes, ) # There's a lot to be said about the overall design of a Windows event @@ -79,6 +85,60 @@ def _check(success): return success +def _get_base_socket(sock): + if hasattr(sock, "fileno"): + sock = sock.fileno() + base_ptr = ffi.new("HANDLE *") + out_size = ffi.new("DWORD *") + failed = ws2_32.WSAIoctl( + ffi.cast("SOCKET", sock), + WSAIoctls.SIO_BASE_HANDLE, + ffi.NULL, + 0, + base_ptr, + ffi.sizeof("HANDLE"), + out_size, + ffi.NULL, + ffi.NULL, + ) + if failed: + code = ws2_32.WSAGetLastError() + raise_winerror(code) + return base_ptr[0] + + +# We'll use CreateFile and DeviceIoControl instead of the Nt* versions +def _afd_helper_handle(): + # The "AFD" driver is exposed at the NT path "\Device\Afd". We're using + # the Win32 CreateFile, though, so we have to pass a Win32 path. \\.\ is + # how Win32 refers to the NT \GLOBAL??\ directory, and GLOBALROOT is a + # symlink inside that directory that points to the root of the NT path + # system. So by sticking that in front of the NT path, we get a Win32 + # path. Alternatively, we could use NtCreateFile directly, since it takes + # an NT path. But we already wrap CreateFileW so this was easier. 
+ # References: + # https://blogs.msdn.microsoft.com/jeremykuhne/2016/05/02/dos-to-nt-a-paths-journey/ + # https://stackoverflow.com/a/21704022 + rawname = r"\\.\GLOBALROOT\Device\Afd\Trio".encode("utf-16le") + b"\0\0" + rawname_buf = ffi.from_buffer(rawname) + + handle = kernel32.CreateFileW( + ffi.cast("LPCWSTR", rawname_buf), + FileFlags.SYNCHRONIZE, + FileFlags.FILE_SHARE_READ | FileFlags.FILE_SHARE_WRITE, + ffi.NULL, # no security attributes + FileFlags.OPEN_EXISTING, + FileFlags.FILE_FLAG_OVERLAPPED, + ffi.NULL, # no template file + ) + if handle == INVALID_HANDLE_VALUE: # pragma: no cover + raise_winerror() + return handle + +# "readable" is AFD_POLL_RECEIVE | AFD_POLL_ACCEPT | AFD_POLL_ABORT | AFD_POLL_DISCONNECT | AFD_POLL_LOCAL_CLOSE +# "writable" is AFD_POLL_SEND | AFD_POLL_CONNECT_FAIL | AFD_POLL_ABORT | AFD_POLL_LOCAL_CLOSE + + @attr.s(slots=True, eq=False, frozen=True) class _WindowsStatistics: tasks_waiting_overlapped = attr.ib() @@ -104,6 +164,8 @@ def __init__(self): INVALID_HANDLE_VALUE, ffi.NULL, 0, 0 ) ) + self._afd = _afd_helper_handle() + self.register_with_iocp(self._afd) self._closed = False self._iocp_queue = deque() self._iocp_thread = None @@ -318,6 +380,11 @@ def register_with_iocp(self, handle): # INVALID_PARAMETER seems to be used for both "can't register # because not opened in OVERLAPPED mode" and "already registered" _check(kernel32.CreateIoCompletionPort(handle, self._iocp, 0, 0)) + # Supposedly this makes things slightly faster, by disabling the + # ability to do WaitForSingleObject(handle). We would never want to do + # that anyway, so might as well get the extra speed (if any). + # Ref: http://www.lenholgate.com/blog/2009/09/interesting-blog-posts-on-high-performance-servers.html + _check(kernel32.SetFileCompletionNotificationModes(handle, CompletionModes.FILE_SKIP_SET_EVENT_ON_HANDLE)) @_public async def wait_overlapped(self, handle, lpOverlapped): @@ -457,6 +524,31 @@ async def _perform_overlapped(self, handle, submit_fn): await self.wait_overlapped(handle, lpOverlapped) return lpOverlapped + @_public + async def afd_poll(self, sock, events): + poll_info = ffi.new("AFD_POLL_INFO *") + poll_info.Timeout = 2 ** 63 - 1 # INT64_MAX + poll_info.NumberOfHandles = 1 + poll_info.Exclusive = 0 + poll_info.Handles[0].Handle = _get_base_socket(sock) + poll_info.Handles[0].Status = 0 + poll_info.Handles[0].Events = events + + def submit_afd_poll(lpOverlapped): + _check(kernel32.DeviceIoControl( + self._afd, + IoControlCodes.IOCTL_AFD_POLL, + poll_info, + ffi.sizeof("AFD_POLL_INFO"), + poll_info, + ffi.sizeof("AFD_POLL_INFO"), + ffi.NULL, + lpOverlapped, + )) + + await self._perform_overlapped(self._afd, submit_afd_poll) + return AFDPollFlags(poll_info.Handles[0].Events) + @_public async def write_overlapped(self, handle, data, file_offset=0): with ffi.from_buffer(data) as cbuf: diff --git a/trio/_core/_windows_cffi.py b/trio/_core/_windows_cffi.py index f05e871ce..d12f77d5f 100644 --- a/trio/_core/_windows_cffi.py +++ b/trio/_core/_windows_cffi.py @@ -2,6 +2,10 @@ import re import enum +################################################################ +# Functions and types +################################################################ + LIB = """ // https://msdn.microsoft.com/en-us/library/windows/desktop/aa383751(v=vs.85).aspx typedef int BOOL; @@ -57,6 +61,11 @@ _In_ DWORD NumberOfConcurrentThreads ); +BOOL SetFileCompletionNotificationModes( + HANDLE FileHandle, + UCHAR Flags +); + HANDLE CreateFileW( LPCWSTR lpFileName, DWORD dwDesiredAccess, @@ -144,6 
+153,51 @@ NTSTATUS Status ); +int WSAIoctl( + SOCKET s, + DWORD dwIoControlCode, + LPVOID lpvInBuffer, + DWORD cbInBuffer, + LPVOID lpvOutBuffer, + DWORD cbOutBuffer, + LPDWORD lpcbBytesReturned, + LPWSAOVERLAPPED lpOverlapped, + // actually LPWSAOVERLAPPED_COMPLETION_ROUTINE + void* lpCompletionRoutine +); + +int WSAGetLastError(); + +BOOL DeviceIoControl( + HANDLE hDevice, + DWORD dwIoControlCode, + LPVOID lpInBuffer, + DWORD nInBufferSize, + LPVOID lpOutBuffer, + DWORD nOutBufferSize, + LPDWORD lpBytesReturned, + LPOVERLAPPED lpOverlapped +); + +// From https://github.com/piscisaureus/wepoll/blob/master/src/afd.h +typedef struct _AFD_POLL_HANDLE_INFO { + HANDLE Handle; + ULONG Events; + NTSTATUS Status; +} AFD_POLL_HANDLE_INFO, *PAFD_POLL_HANDLE_INFO; + +// This is really defined as a messy union to allow stuff like +// i.DUMMYSTRUCTNAME.LowPart, but we don't need those complications. +// Under all that it's just an int64. +typedef int64_t LARGE_INTEGER; + +typedef struct _AFD_POLL_INFO { + LARGE_INTEGER Timeout; + ULONG NumberOfHandles; + ULONG Exclusive; + AFD_POLL_HANDLE_INFO Handles[1]; +} AFD_POLL_INFO, *PAFD_POLL_INFO; + """ # cribbed from pywincffi @@ -165,32 +219,17 @@ kernel32 = ffi.dlopen("kernel32.dll") ntdll = ffi.dlopen("ntdll.dll") +ws2_32 = ffi.dlopen("ws2_32.dll") -INVALID_HANDLE_VALUE = ffi.cast("HANDLE", -1) - +################################################################ +# Magic numbers +################################################################ -def _handle(obj): - # For now, represent handles as either cffi HANDLEs or as ints. If you - # try to pass in a file descriptor instead, it's not going to work - # out. (For that msvcrt.get_osfhandle does the trick, but I don't know if - # we'll actually need that for anything...) For sockets this doesn't - # matter, Python never allocates an fd. So let's wait until we actually - # encounter the problem before worrying about it. 
- if type(obj) is int: - return ffi.cast("HANDLE", obj) - else: - return obj - - -def raise_winerror(winerror=None, *, filename=None, filename2=None): - if winerror is None: - winerror, msg = ffi.getwinerror() - else: - _, msg = ffi.getwinerror(winerror) - # See: - # https://docs.python.org/3/library/exceptions.html#exceptions.WindowsError - raise OSError(0, msg, filename, winerror, filename2) +# Here's a great resource for looking these up: +# https://www.magnumdb.com +# (Tip: check the box to see "Hex value") +INVALID_HANDLE_VALUE = ffi.cast("HANDLE", -1) class ErrorCodes(enum.IntEnum): STATUS_TIMEOUT = 0x102 @@ -208,6 +247,7 @@ class ErrorCodes(enum.IntEnum): class FileFlags(enum.IntEnum): GENERIC_READ = 0x80000000 + SYNCHRONIZE = 0x00100000 FILE_FLAG_OVERLAPPED = 0x40000000 FILE_SHARE_READ = 1 FILE_SHARE_WRITE = 2 @@ -217,3 +257,53 @@ class FileFlags(enum.IntEnum): OPEN_EXISTING = 3 OPEN_ALWAYS = 4 TRUNCATE_EXISTING = 5 + + +# https://github.com/piscisaureus/wepoll/blob/master/src/afd.h +class AFDPollFlags(enum.IntFlag): + AFD_POLL_RECEIVE = 0x0001 + AFD_POLL_RECEIVE_EXPEDITED = 0x0002 + AFD_POLL_SEND = 0x0004 + AFD_POLL_DISCONNECT = 0x0008 + AFD_POLL_ABORT = 0x0010 + AFD_POLL_LOCAL_CLOSE = 0x0020 + AFD_POLL_ACCEPT = 0x0080 + AFD_POLL_CONNECT_FAIL = 0x0100 + + +class WSAIoctls(enum.IntEnum): + SIO_BASE_HANDLE = 0x48000022 + +class CompletionModes(enum.IntFlag): + FILE_SKIP_COMPLETION_PORT_ON_SUCCESS = 0x1 + FILE_SKIP_SET_EVENT_ON_HANDLE = 0x2 + + +class IoControlCodes(enum.IntEnum): + IOCTL_AFD_POLL = 0x00012024 + + +################################################################ +# Generic helpers +################################################################ + +def _handle(obj): + # For now, represent handles as either cffi HANDLEs or as ints. If you + # try to pass in a file descriptor instead, it's not going to work + # out. (For that msvcrt.get_osfhandle does the trick, but I don't know if + # we'll actually need that for anything...) For sockets this doesn't + # matter, Python never allocates an fd. So let's wait until we actually + # encounter the problem before worrying about it. + if type(obj) is int: + return ffi.cast("HANDLE", obj) + else: + return obj + + +def raise_winerror(winerror=None, *, filename=None, filename2=None): + if winerror is None: + winerror, msg = ffi.getwinerror() + else: + _, msg = ffi.getwinerror(winerror) + # https://docs.python.org/3/library/exceptions.html#OSError + raise OSError(0, msg, filename, winerror, filename2) From f7ac5aadf229d5d771acf79ebdaf7a5eb047709b Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Thu, 24 Oct 2019 16:29:37 -0700 Subject: [PATCH 04/29] Checkpoint: a beautiful but doomed approach This is a nice solid implementation *except* that it turns out I had a fundamental misunderstanding of how AFD_POLL works, so it all falls apart as soon as you have multiple tasks waiting on the same socket simultaneously. --- docs/source/reference-hazmat.rst | 8 +- trio/_core/_io_windows.py | 561 +++++++++++++++---------------- trio/_core/_windows_cffi.py | 13 +- trio/_core/tests/test_io.py | 1 + 4 files changed, 288 insertions(+), 295 deletions(-) diff --git a/docs/source/reference-hazmat.rst b/docs/source/reference-hazmat.rst index f16cb0016..98519392b 100644 --- a/docs/source/reference-hazmat.rst +++ b/docs/source/reference-hazmat.rst @@ -165,10 +165,10 @@ All environments provide the following functions: .. 
function:: notify_closing(obj) - Call this before closing a file descriptor or ``SOCKET`` handle - that another task might be waiting on. This will cause any - `wait_readable` or `wait_writable` calls to immediately raise - `~trio.ClosedResourceError`. + Call this before closing a file descriptor (on Unix) or socket (on + Windows) that another task might be waiting on. This will cause any + `wait_readable` or `wait_writable` calls on the given object to + immediately wake up and raise `~trio.ClosedResourceError`. This doesn't actually close the object – you still have to do that yourself afterwards. Also, you want to be careful to make sure no diff --git a/trio/_core/_io_windows.py b/trio/_core/_io_windows.py index aa3e0edb7..6d2f7929e 100644 --- a/trio/_core/_io_windows.py +++ b/trio/_core/_io_windows.py @@ -1,20 +1,37 @@ -import itertools +# you can't have multiple AFD_POLL's outstanding simultaneously on the same +# socket. They get mixed up and weird stuff happens -- like you get +# notifications for the first call's flag, but with the second call's +# lpOverlapped. +# +# so we need a muxing system like _io_epoll has +# I think we can set Exclusive=1 to cause the old one to disappear, maybe? +# though really... who knows what that even does. maybe CancelIoEx is safer. +# +# for this case, we can ignore CancelIoEx errors, or the possibility of +# forgetting register_with_iocp +# can even use a dedicated completion key +# +# track {handle: Waiters object (reader task, writer task, lpOverlapped)} +# track {lpOverlapped: Waiters object} +# when need to update wait, CancelIoEx(afd_handle, lpOverlapped) +# and drop from the second dict +# then submit new lpOverlapped and store it in both places +# when event comes in, look it up in lpOverlapped table and figure out who to +# wake up +# when notify_closing, wake up both tasks and then refresh the wait +# +# consider making notify_closing work for more operations -import outcome +import itertools from contextlib import contextmanager -from select import select -import threading -from collections import deque -import signal +import errno +import outcome import attr from .. import _core from ._run import _public -from ._wakeup_socketpair import WakeupSocketpair -from .._util import is_main_thread - from ._windows_cffi import ( ffi, kernel32, @@ -62,6 +79,16 @@ # is set when associating the handle with the IOCP. We don't use it, so should # always set it to zero. # +# Socket state notifications: the public APIs that windows provides for this +# are all really awkward and don't integrate with IOCP. So we drop down to a +# lower level, and talk directly to the socket device driver in the kernel, +# which is called "AFD". The magic IOCTL_AFD_POLL operation lets us request a +# regular IOCP notification when a given socket enters a given state, which is +# exactly what we want. Unfortunately, this is a totally undocumented internal +# API. Fortunately libuv also does this, so we can be pretty confident that MS +# won't break it on us, and there is a *little* bit of information out there +# if you go digging. +# # Job notifications: effectively uses PostQueuedCompletionStatus, the # "completion key" is used to identify which job we're talking about, and the # other two scalars are overloaded to contain arbitrary data. 
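A quick hedged sketch of the user-facing half of the completion-key scheme
described above (not part of this patch): trio's Windows-only hazmat layer
exposes this machinery as monitor_completion_key and current_iocp, and the
cffi bindings used here are trio's own from _windows_cffi.py:

    import trio
    from trio._core._windows_cffi import ffi, kernel32

    async def demo():
        with trio.hazmat.monitor_completion_key() as (key, queue):
            # Normally `key` is handed to native code; posting to the IOCP
            # ourselves stands in for a real completion arriving.
            kernel32.PostQueuedCompletionStatus(
                ffi.cast("HANDLE", trio.hazmat.current_iocp()),
                123, key, ffi.NULL,
            )
            batch = await queue.get_batch()
            print(batch[0].dwNumberOfBytesTransferred)  # -> 123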
@@ -79,6 +106,10 @@ # - other completion keys are available for user use +def reprO(lpOverlapped): + return hex(int(ffi.cast("uintptr_t", lpOverlapped))) + #return repr(ffi.cast("void *", lpOverlapped)) + def _check(success): if not success: raise_winerror() @@ -107,7 +138,6 @@ def _get_base_socket(sock): return base_ptr[0] -# We'll use CreateFile and DeviceIoControl instead of the Nt* versions def _afd_helper_handle(): # The "AFD" driver is exposed at the NT path "\Device\Afd". We're using # the Win32 CreateFile, though, so we have to pass a Win32 path. \\.\ is @@ -135,8 +165,48 @@ def _afd_helper_handle(): raise_winerror() return handle -# "readable" is AFD_POLL_RECEIVE | AFD_POLL_ACCEPT | AFD_POLL_ABORT | AFD_POLL_DISCONNECT | AFD_POLL_LOCAL_CLOSE -# "writable" is AFD_POLL_SEND | AFD_POLL_CONNECT_FAIL | AFD_POLL_ABORT | AFD_POLL_LOCAL_CLOSE + +# AFD_POLL has a finer-grained set of events than other APIs. We collapse them +# down into Unix-style "readable" and "writable". +# +# There's also a AFD_POLL_LOCAL_CLOSE that we could wait on, to potentially +# catch some cases where someone forgot to call notify_closing. But it's not +# reliable – e.g. if the socket has been dup'ed, then closing one of the +# handles doesn't trigger the event – and it's not available on Unix-like +# platforms. So it seems like including it here would be more likely to mask +# subtle bugs than to actually help anything. + +READABLE_FLAGS = ( + AFDPollFlags.AFD_POLL_RECEIVE + | AFDPollFlags.AFD_POLL_ACCEPT + | AFDPollFlags.AFD_POLL_DISCONNECT # other side sent an EOF + | AFDPollFlags.AFD_POLL_ABORT +) + +WRITABLE_FLAGS = ( + AFDPollFlags.AFD_POLL_SEND + | AFDPollFlags.AFD_POLL_CONNECT_FAIL + | AFDPollFlags.AFD_POLL_ABORT +) + + +# Annoyingly, while the API makes it *seem* like you can happily issue +# as many independent AFD_POLL operations as you want without them interfering +# with each other, in fact if you issue two AFD_POLL operations for the same +# socket at the same time, then Windows gets super confused. For example, if +# we issue one operation from wait_readable, and another independent operation +# from wait_writable, then Windows may complete the wait_writable operation +# when the socket becomes readable. +# +# To avoid this, we have to coalesce all operations on a single socket, which +# is slightly fiddly, though not really any worse than what we have to do for +# epoll. + +@attr.s(slots=True, eq=False): +class AFDWaiters: + read_task = attr.ib(default=None) + write_task = attr.ib(default=None) + current_lpOverlapped = attr.ib(default=None) @attr.s(slots=True, eq=False, frozen=True) @@ -145,10 +215,16 @@ class _WindowsStatistics: completion_key_monitors = attr.ib() tasks_waiting_socket_readable = attr.ib() tasks_waiting_socket_writable = attr.ib() - iocp_backlog = attr.ib() backend = attr.ib(default="windows") +# Maximum number of events to dequeue from the completion port on each pass +# through the run loop. Somewhat arbitrary. Should be large enough to collect +# a good set of tasks on each loop, but not so large to waste tons of memory. +# (Each call to trio.run allocates a buffer that's ~32x this number.) 
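+# (Checking the "~32x": ffi.sizeof("OVERLAPPED_ENTRY") comes to 32 bytes on
+# 64-bit Windows -- three pointer-sized fields plus a DWORD rounded up -- so
+# 1000 entries means a ~32 KiB buffer.)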
+MAX_EVENTS = 1000 + + @attr.s(frozen=True) class CompletionKeyEventInfo: lpOverlapped = attr.ib() @@ -157,8 +233,13 @@ class CompletionKeyEventInfo: class WindowsIOManager: def __init__(self): - # https://msdn.microsoft.com/en-us/library/windows/desktop/aa363862(v=vs.85).aspx - self._closed = True + # If this method raises an exception, then __del__ could run on a + # half-initialized object. So initialize everything that __del__ + # touches to known values up front, before we do anything that can + # fail. + self._iocp = None + self._afd = None + self._iocp = _check( kernel32.CreateIoCompletionPort( INVALID_HANDLE_VALUE, ffi.NULL, 0, 0 @@ -166,208 +247,116 @@ def __init__(self): ) self._afd = _afd_helper_handle() self.register_with_iocp(self._afd) - self._closed = False - self._iocp_queue = deque() - self._iocp_thread = None + self._events = ffi.new("OVERLAPPED_ENTRY[]", MAX_EVENTS) + + # {lpOverlapped: task} + # These tasks also carry (handle, lpOverlapped) in their + # custom_sleep_data, unless they've already been cancelled or + # rescheduled. self._overlapped_waiters = {} + # These are all listed in overlapped_waiters too; these extra dicts + # are to (a) catch when two tasks try to wait on the same socket at + # the same time, (b) find tasks to wake up in notify_closing. + # {SOCKET: task} + self._wait_readable_tasks = {} + self._wait_writable_tasks = {} self._posted_too_late_to_cancel = set() self._completion_key_queues = {} # Completion key 0 is reserved for regular IO events. # Completion key 1 is used by the fallback post from a regular # IO event's abort_fn to catch the user forgetting to call - # register_wiht_iocp. + # register_with_iocp. self._completion_key_counter = itertools.count(2) - # {stdlib socket object: task} - # except that wakeup socket is mapped to None - self._socket_waiters = {"read": {}, "write": {}} - self._main_thread_waker = WakeupSocketpair() - wakeup_sock = self._main_thread_waker.wakeup_sock - self._socket_waiters["read"][wakeup_sock] = None - - # This is necessary to allow control-C to interrupt select(). - # https://github.com/python-trio/trio/issues/42 - if is_main_thread(): - fileno = self._main_thread_waker.write_sock.fileno() - self._old_signal_wakeup_fd = signal.set_wakeup_fd(fileno) - def statistics(self): return _WindowsStatistics( tasks_waiting_overlapped=len(self._overlapped_waiters), completion_key_monitors=len(self._completion_key_queues), - tasks_waiting_socket_readable=len(self._socket_waiters["read"]), - tasks_waiting_socket_writable=len(self._socket_waiters["write"]), - iocp_backlog=len(self._iocp_queue), + tasks_waiting_socket_readable=len(self._wait_readable_tasks), + tasks_waiting_socket_writable=len(self._wait_writable_tasks), ) def close(self): - if not self._closed: - self._closed = True - _check(kernel32.CloseHandle(self._iocp)) - if self._iocp_thread is not None: - self._iocp_thread.join() - self._main_thread_waker.close() - if is_main_thread(): - signal.set_wakeup_fd(self._old_signal_wakeup_fd) + try: + if self._iocp is not None: + iocp = self._iocp + self._iocp = None + _check(kernel32.CloseHandle(iocp)) + finally: + if self._afd is not None: + afd = self._afd + self._afd = None + _check(kernel32.CloseHandle(afd)) def __del__(self): - # Need to make sure we clean up self._iocp (raw handle) and the IOCP - # thread. 
self.close() def handle_io(self, timeout): - # Step 0: the first time through, initialize the IOCP thread - if self._iocp_thread is None: - # The rare non-daemonic thread -- close() should always be called, - # even on error paths, and we want to join it there. - self._iocp_thread = threading.Thread( - target=self._iocp_thread_fn, name="trio-IOCP" - ) - self._iocp_thread.start() - - # Step 1: select for sockets, with the given timeout. - # If there are events queued from the IOCP thread, then the timeout is - # implicitly reduced to 0 b/c the wakeup socket has pending data in - # it. - def socket_ready(what, sock, result): - task = self._socket_waiters[what].pop(sock) - _core.reschedule(task, result) - - def socket_check(what, sock): - try: - select([sock], [sock], [sock], 0) - except OSError as exc: - socket_ready(what, sock, outcome.Error(exc)) - - def do_select(): - r_waiting = self._socket_waiters["read"] - w_waiting = self._socket_waiters["write"] - # We select for exceptional conditions on the writable set because - # on Windows, a failed non-blocking connect shows up as - # "exceptional". Everyone else uses "writable" for this, so we - # normalize it. - r, w1, w2 = select(r_waiting, w_waiting, w_waiting, timeout) - return r, set(w1 + w2) - + # arbitrary limit + received = ffi.new("PULONG") + milliseconds = round(1000 * timeout) + if timeout > 0 and milliseconds == 0: + milliseconds = 1 try: - r, w = do_select() - except OSError: - # Some socket was closed or similar. Track it down and get rid of - # it. - for what in ["read", "write"]: - for sock in self._socket_waiters[what]: - socket_check(what, sock) - r, w = do_select() - - for sock in r: - if sock is not self._main_thread_waker.wakeup_sock: - socket_ready("read", sock, outcome.Value(None)) - for sock in w: - socket_ready("write", sock, outcome.Value(None)) - - # Step 2: drain the wakeup socket. - # This must be done before checking the IOCP queue. - self._main_thread_waker.drain() - - # Step 3: process the IOCP queue. If new events arrive while we're - # processing the queue then we leave them for next time. - # XX should probably have some sort emergency bail out if the queue - # gets too long? - for _ in range(len(self._iocp_queue)): - msg = self._iocp_queue.popleft() - if isinstance(msg, BaseException): - # IOCP thread encountered some unexpected error -- give up and - # let the user know. - raise msg - batch, received = msg - for i in range(received): - entry = batch[i] - if entry.lpCompletionKey == 0: - # Regular I/O event, dispatch on lpOverlapped + _check(kernel32.GetQueuedCompletionStatusEx( + self._iocp, self._events, MAX_EVENTS, received, milliseconds, 0 + ) + ) + except OSError as exc: + if exc.winerror == ErrorCodes.WAIT_TIMEOUT: + return + raise + for i in range(received[0]): + entry = self._events[i] + if entry.lpCompletionKey == 0: + # Regular I/O event, dispatch on lpOverlapped + print(f"waking {reprO(entry.lpOverlapped)}") + waiter = self._overlapped_waiters.pop(entry.lpOverlapped) + _core.reschedule(waiter) + elif entry.lpCompletionKey == 1: + # Post made by a regular I/O event's abort_fn + # after it failed to cancel the I/O. If we still + # have a waiter with this lpOverlapped, we didn't + # get the regular I/O completion and almost + # certainly the user forgot to call + # register_with_iocp. 
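+                # (Subtle but load-bearing: entry.lpOverlapped is a fresh
+                # cdata object, yet the lookups below work because cffi
+                # compares and hashes pointers by address, matching the
+                # ffi.new() object stored when the wait began.)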
+ self._posted_too_late_to_cancel.remove(entry.lpOverlapped) + try: waiter = self._overlapped_waiters.pop(entry.lpOverlapped) - _core.reschedule(waiter) - elif entry.lpCompletionKey == 1: - # Post made by a regular I/O event's abort_fn - # after it failed to cancel the I/O. If we still - # have a waiter with this lpOverlapped, we didn't - # get the regular I/O completion and almost - # certainly the user forgot to call - # register_with_iocp. - self._posted_too_late_to_cancel.remove(entry.lpOverlapped) - try: - waiter = self._overlapped_waiters.pop( - entry.lpOverlapped - ) - except KeyError: - # Looks like the actual completion got here - # before this fallback post did -- we're in - # the "expected" case of too-late-to-cancel, - # where the user did nothing wrong and the - # main thread just got backlogged relative to - # the IOCP thread somehow. Nothing more to do. - pass - else: - exc = _core.TrioInternalError( - "Failed to cancel overlapped I/O in {} and didn't " - "receive the completion either. Did you forget to " - "call register_with_iocp()?".format(waiter.name) - ) - # Raising this out of handle_io ensures that - # the user will see our message even if some - # other task is in an uncancellable wait due - # to the same underlying forgot-to-register - # issue (if their CancelIoEx succeeds, we - # have no way of noticing that their completion - # won't arrive). Unfortunately it loses the - # task traceback. If you're debugging this - # error and can't tell where it's coming from, - # try changing this line to - # _core.reschedule(waiter, outcome.Error(exc)) - raise exc + except KeyError: + # Looks like the actual completion got here before this + # fallback post did -- we're in the "expected" case of + # too-late-to-cancel, where the user did nothing wrong. + # Nothing more to do. + pass else: - # dispatch on lpCompletionKey - queue = self._completion_key_queues[entry.lpCompletionKey] - overlapped = int(ffi.cast("uintptr_t", entry.lpOverlapped)) - transferred = entry.dwNumberOfBytesTransferred - info = CompletionKeyEventInfo( - lpOverlapped=overlapped, - dwNumberOfBytesTransferred=transferred, - ) - queue.put_nowait(info) - - def _iocp_thread_fn(self): - # This thread sits calling GetQueuedCompletionStatusEx forever. To - # signal that it should shut down, the main thread just closes the - # IOCP, which causes GetQueuedCompletionStatusEx to return with an - # error: - IOCP_CLOSED_ERRORS = { - # If the IOCP is closed while we're blocked in - # GetQueuedCompletionStatusEx, then we get this error: - ErrorCodes.ERROR_ABANDONED_WAIT_0, - # If the IOCP is already closed when we initiate a - # GetQueuedCompletionStatusEx, then we get this error: - ErrorCodes.ERROR_INVALID_HANDLE, - } - while True: - max_events = 1 - batch = ffi.new("OVERLAPPED_ENTRY[]", max_events) - received = ffi.new("PULONG") - # https://msdn.microsoft.com/en-us/library/windows/desktop/aa364988(v=vs.85).aspx - try: - _check( - kernel32.GetQueuedCompletionStatusEx( - self._iocp, batch, max_events, received, 0xffffffff, 0 + exc = _core.TrioInternalError( + "Failed to cancel overlapped I/O in {} and didn't " + "receive the completion either. Did you forget to " + "call register_with_iocp()?".format(waiter.name) ) + # Raising this out of handle_io ensures that + # the user will see our message even if some + # other task is in an uncancellable wait due + # to the same underlying forgot-to-register + # issue (if their CancelIoEx succeeds, we + # have no way of noticing that their completion + # won't arrive). 
Unfortunately it loses the + # task traceback. If you're debugging this + # error and can't tell where it's coming from, + # try changing this line to + # _core.reschedule(waiter, outcome.Error(exc)) + raise exc + else: + # dispatch on lpCompletionKey + queue = self._completion_key_queues[entry.lpCompletionKey] + overlapped = int(ffi.cast("uintptr_t", entry.lpOverlapped)) + transferred = entry.dwNumberOfBytesTransferred + info = CompletionKeyEventInfo( + lpOverlapped=overlapped, + dwNumberOfBytesTransferred=transferred, ) - except OSError as exc: - if exc.winerror in IOCP_CLOSED_ERRORS: - # The IOCP handle was closed; time to shut down. - return - else: - self._iocp_queue.append(exc) - return - self._iocp_queue.append((batch, received[0])) - self._main_thread_waker.wakeup_thread_and_signal_safe() + queue.put_nowait(info) @_public def current_iocp(self): @@ -384,7 +373,57 @@ def register_with_iocp(self, handle): # ability to do WaitForSingleObject(handle). We would never want to do # that anyway, so might as well get the extra speed (if any). # Ref: http://www.lenholgate.com/blog/2009/09/interesting-blog-posts-on-high-performance-servers.html - _check(kernel32.SetFileCompletionNotificationModes(handle, CompletionModes.FILE_SKIP_SET_EVENT_ON_HANDLE)) + _check( + kernel32.SetFileCompletionNotificationModes( + handle, CompletionModes.FILE_SKIP_SET_EVENT_ON_HANDLE + ) + ) + + def _try_cancel_io_ex_for_task(self, task): + if task.custom_sleep_data is None: + return + handle, lpOverlapped = task.custom_sleep_data + task.custom_sleep_data = None + try: + _check(kernel32.CancelIoEx(handle, lpOverlapped)) + except OSError as exc: + if exc.winerror == ErrorCodes.ERROR_NOT_FOUND: + # Too late to cancel. If this happens because the + # operation is already completed, we don't need to + # do anything; presumably the IOCP thread will be + # reporting back about that completion soon. But + # another possibility is that the operation was + # performed on a handle that wasn't registered + # with our IOCP (ie, the user forgot to call + # register_with_iocp), in which case we're just + # never going to see the completion. To avoid an + # uncancellable infinite sleep in the latter case, + # we'll PostQueuedCompletionStatus here, and if + # our post arrives before the original completion + # does, we'll assume the handle wasn't registered. + _check( + kernel32.PostQueuedCompletionStatus( + self._iocp, 0, 1, lpOverlapped + ) + ) + # Keep the lpOverlapped referenced so its address + # doesn't get reused until our posted completion + # status has been processed. Otherwise, we can + # get confused about which completion goes with + # which I/O. 
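+                # (Concretely: if nothing kept this cffi object alive, its
+                # OVERLAPPED memory could be freed, and a later ffi.new()
+                # could hand out the same address for an unrelated
+                # operation.)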
+ self._posted_too_late_to_cancel.add(lpOverlapped) + else: # pragma: no cover + raise _core.TrioInternalError( + "CancelIoEx failed with unexpected error" + ) from exc + + @_public + def notify_closing(self, handle): + handle = _get_base_socket(handle) + for tasks in [self._wait_readable_tasks, self._wait_writable_tasks]: + task = tasks.get(handle) + if task is not None: + self._try_cancel_io_ex_for_task(task) @_public async def wait_overlapped(self, handle, lpOverlapped): @@ -397,49 +436,13 @@ async def wait_overlapped(self, handle, lpOverlapped): ) task = _core.current_task() self._overlapped_waiters[lpOverlapped] = task + task.custom_sleep_data = (handle, lpOverlapped) raise_cancel = None def abort(raise_cancel_): - # https://msdn.microsoft.com/en-us/library/windows/desktop/aa363792(v=vs.85).aspx - # the _check here is probably wrong -- I guess we should just - # ignore errors? but at least it will let us learn what errors are - # possible -- the docs are pretty unclear. nonlocal raise_cancel raise_cancel = raise_cancel_ - try: - _check(kernel32.CancelIoEx(handle, lpOverlapped)) - except OSError as exc: - if exc.winerror == ErrorCodes.ERROR_NOT_FOUND: - # Too late to cancel. If this happens because the - # operation is already completed, we don't need to - # do anything; presumably the IOCP thread will be - # reporting back about that completion soon. But - # another possibility is that the operation was - # performed on a handle that wasn't registered - # with our IOCP (ie, the user forgot to call - # register_with_iocp), in which case we're just - # never going to see the completion. To avoid an - # uncancellable infinite sleep in the latter case, - # we'll PostQueuedCompletionStatus here, and if - # our post arrives before the original completion - # does, we'll assume the handle wasn't registered. - _check( - kernel32.PostQueuedCompletionStatus( - self._iocp, 0, 1, lpOverlapped - ) - ) - - # Keep the lpOverlapped referenced so its address - # doesn't get reused until our posted completion - # status has been processed. Otherwise, we can - # get confused about which completion goes with - # which I/O. 
- self._posted_too_late_to_cancel.add(lpOverlapped) - - else: # pragma: no cover - raise TrioInternalError( - "CancelIoEx failed with unexpected error" - ) from exc + self._try_cancel_io_ex_for_task(task) return _core.Abort.FAILED await _core.wait_task_rescheduled(abort) @@ -472,82 +475,68 @@ def monitor_completion_key(self): finally: del self._completion_key_queues[key] - async def _wait_socket(self, which, sock): - if not isinstance(sock, int): - sock = sock.fileno() - if sock in self._socket_waiters[which]: - raise _core.BusyResourceError( - "another task is already waiting to {} this socket" - .format(which) - ) - self._socket_waiters[which][sock] = _core.current_task() - - def abort(_): - del self._socket_waiters[which][sock] - return _core.Abort.SUCCEEDED - - await _core.wait_task_rescheduled(abort) - - @_public - async def wait_readable(self, sock): - await self._wait_socket("read", sock) - - @_public - async def wait_writable(self, sock): - await self._wait_socket("write", sock) - - @_public - def notify_closing(self, sock): - if not isinstance(sock, int): - sock = sock.fileno() - for mode in ["read", "write"]: - if sock in self._socket_waiters[mode]: - task = self._socket_waiters[mode].pop(sock) - exc = _core.ClosedResourceError( - "another task closed this socket" - ) - _core.reschedule(task, outcome.Error(exc)) - async def _perform_overlapped(self, handle, submit_fn): # submit_fn(lpOverlapped) submits some I/O # it may raise an OSError with ERROR_IO_PENDING # the handle must already be registered using # register_with_iocp(handle) - await _core.checkpoint_if_cancelled() lpOverlapped = ffi.new("LPOVERLAPPED") + print(f"submitting {reprO(lpOverlapped)}") try: submit_fn(lpOverlapped) except OSError as exc: if exc.winerror != ErrorCodes.ERROR_IO_PENDING: - await _core.cancel_shielded_checkpoint() raise await self.wait_overlapped(handle, lpOverlapped) return lpOverlapped - @_public - async def afd_poll(self, sock, events): + async def _afd_poll(self, sock, events, tasks): + base_handle = _get_base_socket(sock) + if base_handle in tasks: + raise _core.BusyResourceError + tasks[base_handle] = _core.current_task() + poll_info = ffi.new("AFD_POLL_INFO *") - poll_info.Timeout = 2 ** 63 - 1 # INT64_MAX + poll_info.Timeout = 2**63 - 1 # INT64_MAX poll_info.NumberOfHandles = 1 poll_info.Exclusive = 0 - poll_info.Handles[0].Handle = _get_base_socket(sock) + poll_info.Handles[0].Handle = base_handle poll_info.Handles[0].Status = 0 poll_info.Handles[0].Events = events def submit_afd_poll(lpOverlapped): - _check(kernel32.DeviceIoControl( - self._afd, - IoControlCodes.IOCTL_AFD_POLL, - poll_info, - ffi.sizeof("AFD_POLL_INFO"), - poll_info, - ffi.sizeof("AFD_POLL_INFO"), - ffi.NULL, - lpOverlapped, - )) - - await self._perform_overlapped(self._afd, submit_afd_poll) - return AFDPollFlags(poll_info.Handles[0].Events) + _check( + kernel32.DeviceIoControl( + self._afd, + IoControlCodes.IOCTL_AFD_POLL, + poll_info, + ffi.sizeof("AFD_POLL_INFO"), + poll_info, + ffi.sizeof("AFD_POLL_INFO"), + ffi.NULL, + lpOverlapped, + ) + ) + + try: + await self._perform_overlapped(self._afd, submit_afd_poll) + finally: + del tasks[base_handle] + + print("status", poll_info.Handles[0].Status) + print(repr(AFDPollFlags(poll_info.Handles[0].Events))) + + @_public + async def wait_readable(self, sock): + print("wait_readable start") + await self._afd_poll(sock, READABLE_FLAGS, self._wait_readable_tasks) + print("wait_readable finish") + + @_public + async def wait_writable(self, sock): + print("wait_writable start") + 
await self._afd_poll(sock, WRITABLE_FLAGS, self._wait_writable_tasks) + print("wait_writable finish") @_public async def write_overlapped(self, handle, data, file_offset=0): diff --git a/trio/_core/_windows_cffi.py b/trio/_core/_windows_cffi.py index d12f77d5f..95d1238e3 100644 --- a/trio/_core/_windows_cffi.py +++ b/trio/_core/_windows_cffi.py @@ -231,6 +231,7 @@ INVALID_HANDLE_VALUE = ffi.cast("HANDLE", -1) + class ErrorCodes(enum.IntEnum): STATUS_TIMEOUT = 0x102 WAIT_TIMEOUT = 0x102 @@ -264,16 +265,17 @@ class AFDPollFlags(enum.IntFlag): AFD_POLL_RECEIVE = 0x0001 AFD_POLL_RECEIVE_EXPEDITED = 0x0002 AFD_POLL_SEND = 0x0004 - AFD_POLL_DISCONNECT = 0x0008 - AFD_POLL_ABORT = 0x0010 - AFD_POLL_LOCAL_CLOSE = 0x0020 - AFD_POLL_ACCEPT = 0x0080 - AFD_POLL_CONNECT_FAIL = 0x0100 + AFD_POLL_DISCONNECT = 0x0008 + AFD_POLL_ABORT = 0x0010 + AFD_POLL_LOCAL_CLOSE = 0x0020 + AFD_POLL_ACCEPT = 0x0080 + AFD_POLL_CONNECT_FAIL = 0x0100 class WSAIoctls(enum.IntEnum): SIO_BASE_HANDLE = 0x48000022 + class CompletionModes(enum.IntFlag): FILE_SKIP_COMPLETION_PORT_ON_SUCCESS = 0x1 FILE_SKIP_SET_EVENT_ON_HANDLE = 0x2 @@ -287,6 +289,7 @@ class IoControlCodes(enum.IntEnum): # Generic helpers ################################################################ + def _handle(obj): # For now, represent handles as either cffi HANDLEs or as ints. If you # try to pass in a file descriptor instead, it's not going to work diff --git a/trio/_core/tests/test_io.py b/trio/_core/tests/test_io.py index f06f7777e..ae0d8f7a9 100644 --- a/trio/_core/tests/test_io.py +++ b/trio/_core/tests/test_io.py @@ -216,6 +216,7 @@ async def w_task(sock): assert record == [] b.send(b"x") await wait_all_tasks_blocked() + await trio.sleep(1) assert record == ["r_task"] drain_socket(b) await wait_all_tasks_blocked() From d17431b6c9e2d440ab27bb7c0691180cea86065f Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Fri, 25 Oct 2019 02:05:04 -0700 Subject: [PATCH 05/29] Rewritten and working (?) IOCP-only windows backend --- docs/source/reference-hazmat.rst | 6 +- newsfragments/52.feature.rst | 4 + trio/_core/_generated_io_windows.py | 21 +- trio/_core/_io_windows.py | 477 +++++++++++++++++----------- trio/_socket.py | 5 +- trio/_windows_pipes.py | 6 + 6 files changed, 309 insertions(+), 210 deletions(-) create mode 100644 newsfragments/52.feature.rst diff --git a/docs/source/reference-hazmat.rst b/docs/source/reference-hazmat.rst index 98519392b..7f32ebf98 100644 --- a/docs/source/reference-hazmat.rst +++ b/docs/source/reference-hazmat.rst @@ -166,9 +166,9 @@ All environments provide the following functions: .. function:: notify_closing(obj) Call this before closing a file descriptor (on Unix) or socket (on - Windows) that another task might be waiting on. This will cause any - `wait_readable` or `wait_writable` calls on the given object to - immediately wake up and raise `~trio.ClosedResourceError`. + Windows). This will cause any `wait_readable` or `wait_writable` + calls on the given object to immediately wake up and raise + `~trio.ClosedResourceError`. This doesn't actually close the object – you still have to do that yourself afterwards. Also, you want to be careful to make sure no diff --git a/newsfragments/52.feature.rst b/newsfragments/52.feature.rst new file mode 100644 index 000000000..69ba28ade --- /dev/null +++ b/newsfragments/52.feature.rst @@ -0,0 +1,4 @@ +Trio's Windows backend was rewritten to use IOCP exclusively, instead +of a hybrid of ``select`` + IOCP. This should make it much faster and +more scalable. 
It also simplifies the code internally, and paves the +way for future improvements. diff --git a/trio/_core/_generated_io_windows.py b/trio/_core/_generated_io_windows.py index 640f8b9ac..f0490f990 100644 --- a/trio/_core/_generated_io_windows.py +++ b/trio/_core/_generated_io_windows.py @@ -20,6 +20,13 @@ def register_with_iocp(handle): except AttributeError: raise RuntimeError('must be called from async context') +def notify_closing(handle): + locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True + try: + return GLOBAL_RUN_CONTEXT.runner.io_manager.notify_closing(handle) + except AttributeError: + raise RuntimeError('must be called from async context') + async def wait_overlapped(handle, lpOverlapped): locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True try: @@ -48,20 +55,6 @@ async def wait_writable(sock): except AttributeError: raise RuntimeError('must be called from async context') -def notify_closing(sock): - locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True - try: - return GLOBAL_RUN_CONTEXT.runner.io_manager.notify_closing(sock) - except AttributeError: - raise RuntimeError('must be called from async context') - -async def afd_poll(sock, events): - locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True - try: - return await GLOBAL_RUN_CONTEXT.runner.io_manager.afd_poll(sock, events) - except AttributeError: - raise RuntimeError('must be called from async context') - async def write_overlapped(handle, data, file_offset=0): locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True try: diff --git a/trio/_core/_io_windows.py b/trio/_core/_io_windows.py index 6d2f7929e..453adb297 100644 --- a/trio/_core/_io_windows.py +++ b/trio/_core/_io_windows.py @@ -1,30 +1,6 @@ -# you can't have multiple AFD_POLL's outstanding simultaneously on the same -# socket. They get mixed up and weird stuff happens -- like you get -# notifications for the first call's flag, but with the second call's -# lpOverlapped. -# -# so we need a muxing system like _io_epoll has -# I think we can set Exclusive=1 to cause the old one to disappear, maybe? -# though really... who knows what that even does. maybe CancelIoEx is safer. -# -# for this case, we can ignore CancelIoEx errors, or the possibility of -# forgetting register_with_iocp -# can even use a dedicated completion key -# -# track {handle: Waiters object (reader task, writer task, lpOverlapped)} -# track {lpOverlapped: Waiters object} -# when need to update wait, CancelIoEx(afd_handle, lpOverlapped) -# and drop from the second dict -# then submit new lpOverlapped and store it in both places -# when event comes in, look it up in lpOverlapped table and figure out who to -# wake up -# when notify_closing, wake up both tasks and then refresh the wait -# -# consider making notify_closing work for more operations - import itertools from contextlib import contextmanager -import errno +import enum import outcome import attr @@ -106,10 +82,19 @@ # - other completion keys are available for user use +# The completion keys we use +class CKeys(enum.IntEnum): + AFD_POLL = 0 + WAIT_OVERLAPPED = 1 + LATE_CANCEL = 2 + USER_DEFINED = 3 # and above + + def reprO(lpOverlapped): return hex(int(ffi.cast("uintptr_t", lpOverlapped))) #return repr(ffi.cast("void *", lpOverlapped)) + def _check(success): if not success: raise_winerror() @@ -198,30 +183,39 @@ def _afd_helper_handle(): # from wait_writable, then Windows may complete the wait_writable operation # when the socket becomes readable. 
# -# To avoid this, we have to coalesce all operations on a single socket, which -# is slightly fiddly, though not really any worse than what we have to do for -# epoll. - -@attr.s(slots=True, eq=False): +# To avoid this, we have to coalesce all the operations on a single socket +# into one, and when the set of waiters changes we have to throw away the old +# operation and start a new one. +@attr.s(slots=True, eq=False) class AFDWaiters: read_task = attr.ib(default=None) write_task = attr.ib(default=None) - current_lpOverlapped = attr.ib(default=None) + current_op = attr.ib(default=None) + + +# We also need to bundle up all the info for a single op into a standalone +# object, because we need to keep all these objects alive until the operation +# finishes, even if we're throwing it away. +@attr.s(slots=True, eq=False, frozen=True) +class AFDPollOp: + lpOverlapped = attr.ib() + poll_info = attr.ib() + waiters = attr.ib() @attr.s(slots=True, eq=False, frozen=True) class _WindowsStatistics: + tasks_waiting_readable = attr.ib() + tasks_waiting_writable = attr.ib() tasks_waiting_overlapped = attr.ib() completion_key_monitors = attr.ib() - tasks_waiting_socket_readable = attr.ib() - tasks_waiting_socket_writable = attr.ib() backend = attr.ib(default="windows") # Maximum number of events to dequeue from the completion port on each pass # through the run loop. Somewhat arbitrary. Should be large enough to collect # a good set of tasks on each loop, but not so large to waste tons of memory. -# (Each call to trio.run allocates a buffer that's ~32x this number.) +# (Each WindowsIOManager holds a buffer whose size is ~32x this number.) MAX_EVENTS = 1000 @@ -234,8 +228,8 @@ class CompletionKeyEventInfo: class WindowsIOManager: def __init__(self): # If this method raises an exception, then __del__ could run on a - # half-initialized object. So initialize everything that __del__ - # touches to known values up front, before we do anything that can + # half-initialized object. So we initialize everything that __del__ + # touches to safe values up front, before we do anything that can # fail. self._iocp = None self._afd = None @@ -245,36 +239,21 @@ def __init__(self): INVALID_HANDLE_VALUE, ffi.NULL, 0, 0 ) ) - self._afd = _afd_helper_handle() - self.register_with_iocp(self._afd) self._events = ffi.new("OVERLAPPED_ENTRY[]", MAX_EVENTS) + self._afd = _afd_helper_handle() + self._register_with_iocp(self._afd, CKeys.AFD_POLL) + # {lpOverlapped: AFDPollOp} + self._afd_ops = {} + # {socket handle: AFDWaiters} + self._afd_waiters = {} + # {lpOverlapped: task} - # These tasks also carry (handle, lpOverlapped) in their - # custom_sleep_data, unless they've already been cancelled or - # rescheduled. self._overlapped_waiters = {} - # These are all listed in overlapped_waiters too; these extra dicts - # are to (a) catch when two tasks try to wait on the same socket at - # the same time, (b) find tasks to wake up in notify_closing. - # {SOCKET: task} - self._wait_readable_tasks = {} - self._wait_writable_tasks = {} self._posted_too_late_to_cancel = set() - self._completion_key_queues = {} - # Completion key 0 is reserved for regular IO events. - # Completion key 1 is used by the fallback post from a regular - # IO event's abort_fn to catch the user forgetting to call - # register_with_iocp. 
- self._completion_key_counter = itertools.count(2) - def statistics(self): - return _WindowsStatistics( - tasks_waiting_overlapped=len(self._overlapped_waiters), - completion_key_monitors=len(self._completion_key_queues), - tasks_waiting_socket_readable=len(self._wait_readable_tasks), - tasks_waiting_socket_writable=len(self._wait_writable_tasks), - ) + self._completion_key_queues = {} + self._completion_key_counter = itertools.count(CKeys.USER_DEFINED) def close(self): try: @@ -291,29 +270,72 @@ def close(self): def __del__(self): self.close() + def statistics(self): + tasks_waiting_readable = 0 + tasks_waiting_writable = 0 + for waiter in self._afd_waiters.values(): + if waiter.read_task is not None: + tasks_waiting_readable += 1 + if waiter.write_task is not None: + tasks_waiting_writable += 1 + return _WindowsStatistics( + tasks_waiting_readable=tasks_waiting_readable, + tasks_waiting_writable=tasks_waiting_writable, + tasks_waiting_overlapped=len(self._overlapped_waiters), + completion_key_monitors=len(self._completion_key_queues), + ) + def handle_io(self, timeout): - # arbitrary limit received = ffi.new("PULONG") milliseconds = round(1000 * timeout) if timeout > 0 and milliseconds == 0: milliseconds = 1 try: - _check(kernel32.GetQueuedCompletionStatusEx( - self._iocp, self._events, MAX_EVENTS, received, milliseconds, 0 + _check( + kernel32.GetQueuedCompletionStatusEx( + self._iocp, self._events, MAX_EVENTS, received, + milliseconds, 0 + ) ) - ) except OSError as exc: if exc.winerror == ErrorCodes.WAIT_TIMEOUT: return raise for i in range(received[0]): entry = self._events[i] - if entry.lpCompletionKey == 0: + if entry.lpCompletionKey == CKeys.AFD_POLL: + print(f"AFD completion: {entry.lpOverlapped!r}") + lpo = entry.lpOverlapped + op = self._afd_ops.pop(lpo) + waiters = op.waiters + if waiters.current_op is not op: + # Stale op, nothing to do + print("stale, nothing to do") + pass + else: + waiters.current_op = None + # I don't think this can happen, so if it does let's crash + # and get a debug trace. + if lpo.Internal != 0: # pragma: no cover + code = ntdll.RtlNtStatusToDosError(lpo.Internal) + raise_winerror(code) + flags = op.poll_info.Handles[0].Events + print("flags: {AFDPollFlags(flags)!r}") + if waiters.read_task and flags & READABLE_FLAGS: + print("waking reader") + _core.reschedule(waiters.read_task) + waiters.read_task = None + if waiters.write_task and flags & WRITABLE_FLAGS: + print("waking writer") + _core.reschedule(waiters.write_task) + waiters.write_task = None + self._refresh_afd(op.poll_info.Handles[0].Handle) + elif entry.lpCompletionKey == CKeys.WAIT_OVERLAPPED: # Regular I/O event, dispatch on lpOverlapped print(f"waking {reprO(entry.lpOverlapped)}") waiter = self._overlapped_waiters.pop(entry.lpOverlapped) _core.reschedule(waiter) - elif entry.lpCompletionKey == 1: + elif entry.lpCompletionKey == CKeys.LATE_CANCEL: # Post made by a regular I/O event's abort_fn # after it failed to cancel the I/O. 
If we still # have a waiter with this lpOverlapped, we didn't @@ -358,17 +380,13 @@ def handle_io(self, timeout): ) queue.put_nowait(info) - @_public - def current_iocp(self): - return int(ffi.cast("uintptr_t", self._iocp)) - - @_public - def register_with_iocp(self, handle): + def _register_with_iocp(self, handle, completion_key): handle = _handle(handle) - # https://msdn.microsoft.com/en-us/library/windows/desktop/aa363862(v=vs.85).aspx - # INVALID_PARAMETER seems to be used for both "can't register - # because not opened in OVERLAPPED mode" and "already registered" - _check(kernel32.CreateIoCompletionPort(handle, self._iocp, 0, 0)) + _check( + kernel32.CreateIoCompletionPort( + handle, self._iocp, completion_key, 0 + ) + ) # Supposedly this makes things slightly faster, by disabling the # ability to do WaitForSingleObject(handle). We would never want to do # that anyway, so might as well get the extra speed (if any). @@ -379,51 +397,126 @@ def register_with_iocp(self, handle): ) ) - def _try_cancel_io_ex_for_task(self, task): - if task.custom_sleep_data is None: - return - handle, lpOverlapped = task.custom_sleep_data - task.custom_sleep_data = None - try: - _check(kernel32.CancelIoEx(handle, lpOverlapped)) - except OSError as exc: - if exc.winerror == ErrorCodes.ERROR_NOT_FOUND: - # Too late to cancel. If this happens because the - # operation is already completed, we don't need to - # do anything; presumably the IOCP thread will be - # reporting back about that completion soon. But - # another possibility is that the operation was - # performed on a handle that wasn't registered - # with our IOCP (ie, the user forgot to call - # register_with_iocp), in which case we're just - # never going to see the completion. To avoid an - # uncancellable infinite sleep in the latter case, - # we'll PostQueuedCompletionStatus here, and if - # our post arrives before the original completion - # does, we'll assume the handle wasn't registered. + ################################################################ + # AFD stuff + ################################################################ + + def _refresh_afd(self, base_handle): + waiters = self._afd_waiters[base_handle] + print(f"refreshing AFD for {base_handle!r}: {waiters!r}") + if waiters.current_op is not None: + try: + _check( + kernel32.CancelIoEx( + self._afd, waiters.current_op.lpOverlapped + ) + ) + except OSError as exc: + if exc.winerror != ErrorCodes.ERROR_NOT_FOUND: + raise + waiters.current_op = None + + flags = 0 + if waiters.read_task is not None: + flags |= READABLE_FLAGS + if waiters.write_task is not None: + flags |= WRITABLE_FLAGS + + if not flags: + print("nothing to do; clearing") + del self._afd_waiters[base_handle] + else: + lpOverlapped = ffi.new("LPOVERLAPPED") + print(f"new AFD: {reprO(lpOverlapped)} for {flags!r} on {base_handle!r}") + + poll_info = ffi.new("AFD_POLL_INFO *") + poll_info.Timeout = 2**63 - 1 # INT64_MAX + poll_info.NumberOfHandles = 1 + poll_info.Exclusive = 0 + poll_info.Handles[0].Handle = base_handle + poll_info.Handles[0].Status = 0 + poll_info.Handles[0].Events = flags + + try: _check( - kernel32.PostQueuedCompletionStatus( - self._iocp, 0, 1, lpOverlapped + kernel32.DeviceIoControl( + self._afd, + IoControlCodes.IOCTL_AFD_POLL, + poll_info, + ffi.sizeof("AFD_POLL_INFO"), + poll_info, + ffi.sizeof("AFD_POLL_INFO"), + ffi.NULL, + lpOverlapped, ) ) - # Keep the lpOverlapped referenced so its address - # doesn't get reused until our posted completion - # status has been processed. 
Otherwise, we can - # get confused about which completion goes with - # which I/O. - self._posted_too_late_to_cancel.add(lpOverlapped) - else: # pragma: no cover - raise _core.TrioInternalError( - "CancelIoEx failed with unexpected error" - ) from exc + except OSError as exc: + if exc.winerror != ErrorCodes.ERROR_IO_PENDING: + raise + + op = AFDPollOp(lpOverlapped, poll_info, waiters) + waiters.current_op = op + self._afd_ops[lpOverlapped] = op + + async def _afd_poll(self, sock, mode): + base_handle = _get_base_socket(sock) + waiters = self._afd_waiters.get(base_handle) + if waiters is None: + waiters = AFDWaiters() + self._afd_waiters[base_handle] = waiters + if getattr(waiters, mode) is not None: + raise _core.BusyResourceError + setattr(waiters, mode, _core.current_task()) + self._refresh_afd(base_handle) + + def abort_fn(_): + print(f"_afd_poll({base_handle!r}, {mode!r}) cancelled") + setattr(waiters, mode, None) + self._refresh_afd(base_handle) + return _core.Abort.SUCCEEDED + + await _core.wait_task_rescheduled(abort_fn) + + @_public + async def wait_readable(self, sock): + print("wait_readable start") + try: + await self._afd_poll(sock, "read_task") + finally: + print("wait_readable finish") + + @_public + async def wait_writable(self, sock): + print("wait_writable start") + try: + await self._afd_poll(sock, "write_task") + finally: + print("wait_writable finish") @_public def notify_closing(self, handle): handle = _get_base_socket(handle) - for tasks in [self._wait_readable_tasks, self._wait_writable_tasks]: - task = tasks.get(handle) - if task is not None: - self._try_cancel_io_ex_for_task(task) + waiters = self._afd_waiters.get(handle) + if waiters is not None: + if waiters.read_task is not None: + _core.reschedule( + waiters.read_task, outcome.Error(_core.ClosedResourceError()) + ) + waiters.read_task = None + if waiters.write_task is not None: + _core.reschedule( + waiters.write_task, outcome.Error(_core.ClosedResourceError()) + ) + waiters.write_task = None + self._refresh_afd(handle) + + ################################################################ + # Regular overlapped operations + ################################################################ + + @_public + def register_with_iocp(self, handle): + self._register_with_iocp(handle, CKeys.WAIT_OVERLAPPED) @_public async def wait_overlapped(self, handle, lpOverlapped): @@ -436,13 +529,43 @@ async def wait_overlapped(self, handle, lpOverlapped): ) task = _core.current_task() self._overlapped_waiters[lpOverlapped] = task - task.custom_sleep_data = (handle, lpOverlapped) raise_cancel = None def abort(raise_cancel_): nonlocal raise_cancel raise_cancel = raise_cancel_ - self._try_cancel_io_ex_for_task(task) + try: + _check(kernel32.CancelIoEx(handle, lpOverlapped)) + except OSError as exc: + if exc.winerror == ErrorCodes.ERROR_NOT_FOUND: + # Too late to cancel. If this happens because the + # operation is already completed, we don't need to + # do anything; presumably the IOCP thread will be + # reporting back about that completion soon. But + # another possibility is that the operation was + # performed on a handle that wasn't registered + # with our IOCP (ie, the user forgot to call + # register_with_iocp), in which case we're just + # never going to see the completion. To avoid an + # uncancellable infinite sleep in the latter case, + # we'll PostQueuedCompletionStatus here, and if + # our post arrives before the original completion + # does, we'll assume the handle wasn't registered. 
+ _check( + kernel32.PostQueuedCompletionStatus( + self._iocp, 0, CKeys.LATE_CANCEL, lpOverlapped + ) + ) + # Keep the lpOverlapped referenced so its address + # doesn't get reused until our posted completion + # status has been processed. Otherwise, we can + # get confused about which completion goes with + # which I/O. + self._posted_too_late_to_cancel.add(lpOverlapped) + else: # pragma: no cover + raise _core.TrioInternalError( + "CancelIoEx failed with unexpected error" + ) from exc return _core.Abort.FAILED await _core.wait_task_rescheduled(abort) @@ -464,22 +587,15 @@ def abort(raise_cancel_): else: raise_winerror(code) - @contextmanager - @_public - def monitor_completion_key(self): - key = next(self._completion_key_counter) - queue = _core.UnboundedQueue() - self._completion_key_queues[key] = queue - try: - yield (key, queue) - finally: - del self._completion_key_queues[key] - async def _perform_overlapped(self, handle, submit_fn): # submit_fn(lpOverlapped) submits some I/O # it may raise an OSError with ERROR_IO_PENDING # the handle must already be registered using # register_with_iocp(handle) + # This always does a schedule point, but it's possible that the + # operation will not be cancellable, depending on how Windows is + # feeling today. So we need to check for cancellation manually. + await _core.checkpoint_if_cancelled() lpOverlapped = ffi.new("LPOVERLAPPED") print(f"submitting {reprO(lpOverlapped)}") try: @@ -490,54 +606,6 @@ async def _perform_overlapped(self, handle, submit_fn): await self.wait_overlapped(handle, lpOverlapped) return lpOverlapped - async def _afd_poll(self, sock, events, tasks): - base_handle = _get_base_socket(sock) - if base_handle in tasks: - raise _core.BusyResourceError - tasks[base_handle] = _core.current_task() - - poll_info = ffi.new("AFD_POLL_INFO *") - poll_info.Timeout = 2**63 - 1 # INT64_MAX - poll_info.NumberOfHandles = 1 - poll_info.Exclusive = 0 - poll_info.Handles[0].Handle = base_handle - poll_info.Handles[0].Status = 0 - poll_info.Handles[0].Events = events - - def submit_afd_poll(lpOverlapped): - _check( - kernel32.DeviceIoControl( - self._afd, - IoControlCodes.IOCTL_AFD_POLL, - poll_info, - ffi.sizeof("AFD_POLL_INFO"), - poll_info, - ffi.sizeof("AFD_POLL_INFO"), - ffi.NULL, - lpOverlapped, - ) - ) - - try: - await self._perform_overlapped(self._afd, submit_afd_poll) - finally: - del tasks[base_handle] - - print("status", poll_info.Handles[0].Status) - print(repr(AFDPollFlags(poll_info.Handles[0].Events))) - - @_public - async def wait_readable(self, sock): - print("wait_readable start") - await self._afd_poll(sock, READABLE_FLAGS, self._wait_readable_tasks) - print("wait_readable finish") - - @_public - async def wait_writable(self, sock): - print("wait_writable start") - await self._afd_poll(sock, WRITABLE_FLAGS, self._wait_writable_tasks) - print("wait_writable finish") - @_public async def write_overlapped(self, handle, data, file_offset=0): with ffi.from_buffer(data) as cbuf: @@ -563,21 +631,48 @@ def submit_write(lpOverlapped): @_public async def readinto_overlapped(self, handle, buffer, file_offset=0): - with ffi.from_buffer(buffer, require_writable=True) as cbuf: - - def submit_read(lpOverlapped): - offset_fields = lpOverlapped.DUMMYUNIONNAME.DUMMYSTRUCTNAME - offset_fields.Offset = file_offset & 0xffffffff - offset_fields.OffsetHigh = file_offset >> 32 - _check( - kernel32.ReadFile( - _handle(handle), - ffi.cast("LPVOID", cbuf), - len(cbuf), - ffi.NULL, - lpOverlapped, + print("readinto_overlapped start") + t = 
_core.current_task() + print(f"{t._cancel_points}, {t._schedule_points}") + try: + with ffi.from_buffer(buffer, require_writable=True) as cbuf: + + def submit_read(lpOverlapped): + offset_fields = lpOverlapped.DUMMYUNIONNAME.DUMMYSTRUCTNAME + offset_fields.Offset = file_offset & 0xffffffff + offset_fields.OffsetHigh = file_offset >> 32 + _check( + kernel32.ReadFile( + _handle(handle), + ffi.cast("LPVOID", cbuf), + len(cbuf), + ffi.NULL, + lpOverlapped, + ) ) - ) - lpOverlapped = await self._perform_overlapped(handle, submit_read) - return lpOverlapped.InternalHigh + lpOverlapped = await self._perform_overlapped(handle, submit_read) + return lpOverlapped.InternalHigh + finally: + import sys + print("readinto_overlapped done", sys.exc_info()) + print(f"{t._cancel_points}, {t._schedule_points}") + + ################################################################ + # Raw IOCP operations + ################################################################ + + @_public + def current_iocp(self): + return int(ffi.cast("uintptr_t", self._iocp)) + + @contextmanager + @_public + def monitor_completion_key(self): + key = next(self._completion_key_counter) + queue = _core.UnboundedQueue() + self._completion_key_queues[key] = queue + try: + yield (key, queue) + finally: + del self._completion_key_queues[key] diff --git a/trio/_socket.py b/trio/_socket.py index 805bfd444..18403962d 100644 --- a/trio/_socket.py +++ b/trio/_socket.py @@ -452,8 +452,9 @@ def dup(self): return _SocketType(self._sock.dup()) def close(self): - trio.hazmat.notify_closing(self._sock) - self._sock.close() + if self._sock.fileno() != -1: + trio.hazmat.notify_closing(self._sock) + self._sock.close() async def bind(self, address): address = await self._resolve_local_address(address) diff --git a/trio/_windows_pipes.py b/trio/_windows_pipes.py index 67d6ba24d..04bcdc710 100644 --- a/trio/_windows_pipes.py +++ b/trio/_windows_pipes.py @@ -115,6 +115,12 @@ async def receive_some(self, max_bytes=None) -> bytes: # whenever the other end closes, regardless of direction. # Convert this to the Unix behavior of returning EOF to the # reader when the writer closes. + # + # And since we're not raising an exception, we have to + # checkpoint. But readinto_overlapped did raise an exception, + # so it might not have checkpointed for us. So we have to + # checkpoint manually. + await _core.checkpoint() return b"" else: del buffer[size:] From d73b3cfbc87eae6fd756a408107cdf2ddcc0cf5f Mon Sep 17 00:00:00 2001 From: "Nathaniel J. 
Smith" Date: Fri, 25 Oct 2019 02:37:08 -0700 Subject: [PATCH 06/29] Remove debug prints --- trio/_core/_io_windows.py | 60 +++++++++++---------------------------- 1 file changed, 17 insertions(+), 43 deletions(-) diff --git a/trio/_core/_io_windows.py b/trio/_core/_io_windows.py index 453adb297..ade794f75 100644 --- a/trio/_core/_io_windows.py +++ b/trio/_core/_io_windows.py @@ -304,13 +304,11 @@ def handle_io(self, timeout): for i in range(received[0]): entry = self._events[i] if entry.lpCompletionKey == CKeys.AFD_POLL: - print(f"AFD completion: {entry.lpOverlapped!r}") lpo = entry.lpOverlapped op = self._afd_ops.pop(lpo) waiters = op.waiters if waiters.current_op is not op: # Stale op, nothing to do - print("stale, nothing to do") pass else: waiters.current_op = None @@ -320,19 +318,15 @@ def handle_io(self, timeout): code = ntdll.RtlNtStatusToDosError(lpo.Internal) raise_winerror(code) flags = op.poll_info.Handles[0].Events - print("flags: {AFDPollFlags(flags)!r}") if waiters.read_task and flags & READABLE_FLAGS: - print("waking reader") _core.reschedule(waiters.read_task) waiters.read_task = None if waiters.write_task and flags & WRITABLE_FLAGS: - print("waking writer") _core.reschedule(waiters.write_task) waiters.write_task = None self._refresh_afd(op.poll_info.Handles[0].Handle) elif entry.lpCompletionKey == CKeys.WAIT_OVERLAPPED: # Regular I/O event, dispatch on lpOverlapped - print(f"waking {reprO(entry.lpOverlapped)}") waiter = self._overlapped_waiters.pop(entry.lpOverlapped) _core.reschedule(waiter) elif entry.lpCompletionKey == CKeys.LATE_CANCEL: @@ -403,7 +397,6 @@ def _register_with_iocp(self, handle, completion_key): def _refresh_afd(self, base_handle): waiters = self._afd_waiters[base_handle] - print(f"refreshing AFD for {base_handle!r}: {waiters!r}") if waiters.current_op is not None: try: _check( @@ -423,11 +416,9 @@ def _refresh_afd(self, base_handle): flags |= WRITABLE_FLAGS if not flags: - print("nothing to do; clearing") del self._afd_waiters[base_handle] else: lpOverlapped = ffi.new("LPOVERLAPPED") - print(f"new AFD: {reprO(lpOverlapped)} for {flags!r} on {base_handle!r}") poll_info = ffi.new("AFD_POLL_INFO *") poll_info.Timeout = 2**63 - 1 # INT64_MAX @@ -470,7 +461,6 @@ async def _afd_poll(self, sock, mode): self._refresh_afd(base_handle) def abort_fn(_): - print(f"_afd_poll({base_handle!r}, {mode!r}) cancelled") setattr(waiters, mode, None) self._refresh_afd(base_handle) return _core.Abort.SUCCEEDED @@ -479,19 +469,11 @@ def abort_fn(_): @_public async def wait_readable(self, sock): - print("wait_readable start") - try: - await self._afd_poll(sock, "read_task") - finally: - print("wait_readable finish") + await self._afd_poll(sock, "read_task") @_public async def wait_writable(self, sock): - print("wait_writable start") - try: - await self._afd_poll(sock, "write_task") - finally: - print("wait_writable finish") + await self._afd_poll(sock, "write_task") @_public def notify_closing(self, handle): @@ -597,7 +579,6 @@ async def _perform_overlapped(self, handle, submit_fn): # feeling today. So we need to check for cancellation manually. 
await _core.checkpoint_if_cancelled() lpOverlapped = ffi.new("LPOVERLAPPED") - print(f"submitting {reprO(lpOverlapped)}") try: submit_fn(lpOverlapped) except OSError as exc: @@ -631,32 +612,25 @@ def submit_write(lpOverlapped): @_public async def readinto_overlapped(self, handle, buffer, file_offset=0): - print("readinto_overlapped start") t = _core.current_task() - print(f"{t._cancel_points}, {t._schedule_points}") - try: - with ffi.from_buffer(buffer, require_writable=True) as cbuf: + with ffi.from_buffer(buffer, require_writable=True) as cbuf: - def submit_read(lpOverlapped): - offset_fields = lpOverlapped.DUMMYUNIONNAME.DUMMYSTRUCTNAME - offset_fields.Offset = file_offset & 0xffffffff - offset_fields.OffsetHigh = file_offset >> 32 - _check( - kernel32.ReadFile( - _handle(handle), - ffi.cast("LPVOID", cbuf), - len(cbuf), - ffi.NULL, - lpOverlapped, - ) + def submit_read(lpOverlapped): + offset_fields = lpOverlapped.DUMMYUNIONNAME.DUMMYSTRUCTNAME + offset_fields.Offset = file_offset & 0xffffffff + offset_fields.OffsetHigh = file_offset >> 32 + _check( + kernel32.ReadFile( + _handle(handle), + ffi.cast("LPVOID", cbuf), + len(cbuf), + ffi.NULL, + lpOverlapped, ) + ) - lpOverlapped = await self._perform_overlapped(handle, submit_read) - return lpOverlapped.InternalHigh - finally: - import sys - print("readinto_overlapped done", sys.exc_info()) - print(f"{t._cancel_points}, {t._schedule_points}") + lpOverlapped = await self._perform_overlapped(handle, submit_read) + return lpOverlapped.InternalHigh ################################################################ # Raw IOCP operations From c8efc3920c13f88e2ead2f773240a4525653a237 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Fri, 25 Oct 2019 02:37:46 -0700 Subject: [PATCH 07/29] yapf --- trio/_core/_io_windows.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/trio/_core/_io_windows.py b/trio/_core/_io_windows.py index ade794f75..dc66bfbd0 100644 --- a/trio/_core/_io_windows.py +++ b/trio/_core/_io_windows.py @@ -482,12 +482,14 @@ def notify_closing(self, handle): if waiters is not None: if waiters.read_task is not None: _core.reschedule( - waiters.read_task, outcome.Error(_core.ClosedResourceError()) + waiters.read_task, + outcome.Error(_core.ClosedResourceError()) ) waiters.read_task = None if waiters.write_task is not None: _core.reschedule( - waiters.write_task, outcome.Error(_core.ClosedResourceError()) + waiters.write_task, + outcome.Error(_core.ClosedResourceError()) ) waiters.write_task = None self._refresh_afd(handle) From 3fdfbe90cdf22bf274a77d888d236024003b61e7 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. 
Smith" Date: Fri, 25 Oct 2019 02:41:41 -0700 Subject: [PATCH 08/29] Remove a bit more debug code --- trio/_core/_io_windows.py | 1 - trio/_core/tests/test_io.py | 1 - 2 files changed, 2 deletions(-) diff --git a/trio/_core/_io_windows.py b/trio/_core/_io_windows.py index dc66bfbd0..48ee6a195 100644 --- a/trio/_core/_io_windows.py +++ b/trio/_core/_io_windows.py @@ -614,7 +614,6 @@ def submit_write(lpOverlapped): @_public async def readinto_overlapped(self, handle, buffer, file_offset=0): - t = _core.current_task() with ffi.from_buffer(buffer, require_writable=True) as cbuf: def submit_read(lpOverlapped): diff --git a/trio/_core/tests/test_io.py b/trio/_core/tests/test_io.py index ae0d8f7a9..f06f7777e 100644 --- a/trio/_core/tests/test_io.py +++ b/trio/_core/tests/test_io.py @@ -216,7 +216,6 @@ async def w_task(sock): assert record == [] b.send(b"x") await wait_all_tasks_blocked() - await trio.sleep(1) assert record == ["r_task"] drain_socket(b) await wait_all_tasks_blocked() From d6d8fac0adb03ab4b5306f683511353b47bd2b70 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Fri, 25 Oct 2019 02:44:37 -0700 Subject: [PATCH 09/29] Move LSP tests up to the top of the azure pipelines order --- azure-pipelines.yml | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index a65ed920a..228e8398e 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -45,6 +45,18 @@ jobs: # 64-bit: https://www.nuget.org/packages/python/ # 32-bit: https://www.nuget.org/packages/pythonx86/ matrix: + # The LSP tests can be super slow for some reason - like + # sometimes it just randomly takes 5 minutes to run the LSP + # installer. So we put them at the top, so they can get started + # earlier. + "with IFS LSP, Python 3.7, 64 bit": + python.version: '3.7.2' + python.pkg: 'python' + lsp: 'http://www.proxifier.com/download/ProxifierSetup.exe' + "with non-IFS LSP, Python 3.7, 64 bit": + python.version: '3.7.2' + python.pkg: 'python' + lsp: 'http://download.pctools.com/mirror/updates/9.0.0.2308-SDavfree-lite_en.exe' "Python 3.5, 32 bit": python.version: '3.5.4' python.pkg: 'pythonx86' @@ -63,14 +75,6 @@ jobs: "Python 3.7, 64 bit": python.version: '3.7.2' python.pkg: 'python' - "with IFS LSP, Python 3.7, 64 bit": - python.version: '3.7.2' - python.pkg: 'python' - lsp: 'http://www.proxifier.com/download/ProxifierSetup.exe' - "with non-IFS LSP, Python 3.7, 64 bit": - python.version: '3.7.2' - python.pkg: 'python' - lsp: 'http://download.pctools.com/mirror/updates/9.0.0.2308-SDavfree-lite_en.exe' steps: - task: NuGetToolInstaller@0 From 1a12bcdedd3e4002102b50a961de46886a3380d0 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. 
Smith" Date: Fri, 25 Oct 2019 02:48:01 -0700 Subject: [PATCH 10/29] Don't use enum.IntFlag on python 3.5 --- trio/_core/_windows_cffi.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/trio/_core/_windows_cffi.py b/trio/_core/_windows_cffi.py index 95d1238e3..c897ad426 100644 --- a/trio/_core/_windows_cffi.py +++ b/trio/_core/_windows_cffi.py @@ -1,6 +1,10 @@ import cffi import re import enum +try: + from enum import IntFlag +except ImportError: # python 3.5 + from enum import IntEnum as IntFlag ################################################################ # Functions and types @@ -261,7 +265,7 @@ class FileFlags(enum.IntEnum): # https://github.com/piscisaureus/wepoll/blob/master/src/afd.h -class AFDPollFlags(enum.IntFlag): +class AFDPollFlags(IntFlag): AFD_POLL_RECEIVE = 0x0001 AFD_POLL_RECEIVE_EXPEDITED = 0x0002 AFD_POLL_SEND = 0x0004 @@ -276,7 +280,7 @@ class WSAIoctls(enum.IntEnum): SIO_BASE_HANDLE = 0x48000022 -class CompletionModes(enum.IntFlag): +class CompletionModes(IntFlag): FILE_SKIP_COMPLETION_PORT_ON_SUCCESS = 0x1 FILE_SKIP_SET_EVENT_ON_HANDLE = 0x2 From 8c549f32409d1db759009ace9a99c3daae7b40dd Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Fri, 25 Oct 2019 02:51:11 -0700 Subject: [PATCH 11/29] Re-run gen_exports.py --- trio/_core/_generated_io_windows.py | 32 ++++++++++++++--------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/trio/_core/_generated_io_windows.py b/trio/_core/_generated_io_windows.py index f0490f990..78dd30db1 100644 --- a/trio/_core/_generated_io_windows.py +++ b/trio/_core/_generated_io_windows.py @@ -6,17 +6,17 @@ -def current_iocp(): +async def wait_readable(sock): locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True try: - return GLOBAL_RUN_CONTEXT.runner.io_manager.current_iocp() + return await GLOBAL_RUN_CONTEXT.runner.io_manager.wait_readable(sock) except AttributeError: raise RuntimeError('must be called from async context') -def register_with_iocp(handle): +async def wait_writable(sock): locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True try: - return GLOBAL_RUN_CONTEXT.runner.io_manager.register_with_iocp(handle) + return await GLOBAL_RUN_CONTEXT.runner.io_manager.wait_writable(sock) except AttributeError: raise RuntimeError('must be called from async context') @@ -27,44 +27,44 @@ def notify_closing(handle): except AttributeError: raise RuntimeError('must be called from async context') -async def wait_overlapped(handle, lpOverlapped): +def register_with_iocp(handle): locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True try: - return await GLOBAL_RUN_CONTEXT.runner.io_manager.wait_overlapped(handle, lpOverlapped) + return GLOBAL_RUN_CONTEXT.runner.io_manager.register_with_iocp(handle) except AttributeError: raise RuntimeError('must be called from async context') -def monitor_completion_key(): +async def wait_overlapped(handle, lpOverlapped): locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True try: - return GLOBAL_RUN_CONTEXT.runner.io_manager.monitor_completion_key() + return await GLOBAL_RUN_CONTEXT.runner.io_manager.wait_overlapped(handle, lpOverlapped) except AttributeError: raise RuntimeError('must be called from async context') -async def wait_readable(sock): +async def write_overlapped(handle, data, file_offset=0): locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True try: - return await GLOBAL_RUN_CONTEXT.runner.io_manager.wait_readable(sock) + return await GLOBAL_RUN_CONTEXT.runner.io_manager.write_overlapped(handle, data, file_offset) except AttributeError: raise RuntimeError('must be called 
from async context') -async def wait_writable(sock): +async def readinto_overlapped(handle, buffer, file_offset=0): locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True try: - return await GLOBAL_RUN_CONTEXT.runner.io_manager.wait_writable(sock) + return await GLOBAL_RUN_CONTEXT.runner.io_manager.readinto_overlapped(handle, buffer, file_offset) except AttributeError: raise RuntimeError('must be called from async context') -async def write_overlapped(handle, data, file_offset=0): +def current_iocp(): locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True try: - return await GLOBAL_RUN_CONTEXT.runner.io_manager.write_overlapped(handle, data, file_offset) + return GLOBAL_RUN_CONTEXT.runner.io_manager.current_iocp() except AttributeError: raise RuntimeError('must be called from async context') -async def readinto_overlapped(handle, buffer, file_offset=0): +def monitor_completion_key(): locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True try: - return await GLOBAL_RUN_CONTEXT.runner.io_manager.readinto_overlapped(handle, buffer, file_offset) + return GLOBAL_RUN_CONTEXT.runner.io_manager.monitor_completion_key() except AttributeError: raise RuntimeError('must be called from async context') From 1a8ecbe4020ed3ecef579de501c03a84d2ce90bb Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Fri, 25 Oct 2019 03:26:18 -0700 Subject: [PATCH 12/29] remove more debug code --- trio/_core/_io_windows.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/trio/_core/_io_windows.py b/trio/_core/_io_windows.py index 48ee6a195..dfffbbec8 100644 --- a/trio/_core/_io_windows.py +++ b/trio/_core/_io_windows.py @@ -90,11 +90,6 @@ class CKeys(enum.IntEnum): USER_DEFINED = 3 # and above -def reprO(lpOverlapped): - return hex(int(ffi.cast("uintptr_t", lpOverlapped))) - #return repr(ffi.cast("void *", lpOverlapped)) - - def _check(success): if not success: raise_winerror() From 45bd65c8d059092b70b84cdaa2a96ceb4b25233f Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Fri, 25 Oct 2019 03:26:34 -0700 Subject: [PATCH 13/29] remove stale comment --- trio/_core/tests/test_windows.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/trio/_core/tests/test_windows.py b/trio/_core/tests/test_windows.py index 6eba7bb5b..2fb8a9709 100644 --- a/trio/_core/tests/test_windows.py +++ b/trio/_core/tests/test_windows.py @@ -175,9 +175,3 @@ async def test_too_late_to_cancel(): # fallback completion that was posted when CancelIoEx failed. assert await _core.readinto_overlapped(read_handle, target) == 6 assert target[:6] == b"test2\n" - - -# XX: test setting the iomanager._iocp to something weird to make -# sure that the IOCP thread can send exceptions back to the main thread. -# --> it's not clear if this is actually possible? we just get -# ERROR_INVALID_HANDLE which looks like the IOCP was closed (not an error) From 86eb6effa875549f3d56d463fbc4e122d7222dfe Mon Sep 17 00:00:00 2001 From: "Nathaniel J. 
Smith" Date: Fri, 25 Oct 2019 03:26:43 -0700 Subject: [PATCH 14/29] Add test for how notify_closing handles bad input This should improve coverage on Windows backend --- trio/_core/tests/test_io.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/trio/_core/tests/test_io.py b/trio/_core/tests/test_io.py index f06f7777e..8b1730080 100644 --- a/trio/_core/tests/test_io.py +++ b/trio/_core/tests/test_io.py @@ -273,3 +273,19 @@ async def receiver(sock, key): assert results["send_a"] == results["recv_b"] assert results["send_b"] == results["recv_a"] + + +async def test_notify_closing_on_invalid_object(): + # It should either be a no-op (generally on Unix, where we don't know + # which fds are valid), or an OSError (on Windows, where we currently only + # support sockets, so we have to do some validation to figure out whether + # it's a socket or a regular handle). + got_oserror = False + got_no_error = False + try: + trio.hazmat.notify_closing(-1) + except OSError: + got_oserror = True + else: + got_no_error = True + assert got_oserror or got_no_error From 29b9d3019b5f061ad7fb40fdfe691c280ed89328 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Fri, 25 Oct 2019 03:29:02 -0700 Subject: [PATCH 15/29] Add some pragma: no cover to errors we think can't happen --- trio/_core/_io_windows.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/trio/_core/_io_windows.py b/trio/_core/_io_windows.py index dfffbbec8..b6b5eb6bf 100644 --- a/trio/_core/_io_windows.py +++ b/trio/_core/_io_windows.py @@ -293,9 +293,9 @@ def handle_io(self, timeout): ) ) except OSError as exc: - if exc.winerror == ErrorCodes.WAIT_TIMEOUT: - return - raise + if exc.winerror != ErrorCodes.WAIT_TIMEOUT: # pragma: no cover + raise + return for i in range(received[0]): entry = self._events[i] if entry.lpCompletionKey == CKeys.AFD_POLL: @@ -400,7 +400,7 @@ def _refresh_afd(self, base_handle): ) ) except OSError as exc: - if exc.winerror != ErrorCodes.ERROR_NOT_FOUND: + if exc.winerror != ErrorCodes.ERROR_NOT_FOUND: # pragma: no cover raise waiters.current_op = None @@ -437,7 +437,7 @@ def _refresh_afd(self, base_handle): ) ) except OSError as exc: - if exc.winerror != ErrorCodes.ERROR_IO_PENDING: + if exc.winerror != ErrorCodes.ERROR_IO_PENDING: # pragma: no cover raise op = AFDPollOp(lpOverlapped, poll_info, waiters) From 630910ce0a7ab985bdbb21fcae071aee9f649a18 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Fri, 25 Oct 2019 03:51:43 -0700 Subject: [PATCH 16/29] Rename Windows backend statistics attributes to match epoll backend No need for gratuitous inconsistencies. 
--- trio/_core/_io_windows.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/trio/_core/_io_windows.py b/trio/_core/_io_windows.py index b6b5eb6bf..1690d5635 100644 --- a/trio/_core/_io_windows.py +++ b/trio/_core/_io_windows.py @@ -200,8 +200,8 @@ class AFDPollOp: @attr.s(slots=True, eq=False, frozen=True) class _WindowsStatistics: - tasks_waiting_readable = attr.ib() - tasks_waiting_writable = attr.ib() + tasks_waiting_read = attr.ib() + tasks_waiting_write = attr.ib() tasks_waiting_overlapped = attr.ib() completion_key_monitors = attr.ib() backend = attr.ib(default="windows") @@ -266,16 +266,16 @@ def __del__(self): self.close() def statistics(self): - tasks_waiting_readable = 0 - tasks_waiting_writable = 0 + tasks_waiting_read = 0 + tasks_waiting_write = 0 for waiter in self._afd_waiters.values(): if waiter.read_task is not None: - tasks_waiting_readable += 1 + tasks_waiting_read += 1 if waiter.write_task is not None: - tasks_waiting_writable += 1 + tasks_waiting_write += 1 return _WindowsStatistics( - tasks_waiting_readable=tasks_waiting_readable, - tasks_waiting_writable=tasks_waiting_writable, + tasks_waiting_read=tasks_waiting_read, + tasks_waiting_write=tasks_waiting_write, tasks_waiting_overlapped=len(self._overlapped_waiters), completion_key_monitors=len(self._completion_key_queues), ) From 2f1519f3694d6a4dd7fdd6a68b16d00ec1f5f943 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Fri, 25 Oct 2019 03:56:20 -0700 Subject: [PATCH 17/29] Convert epoll statistics test into generic IO statistics test --- trio/_core/tests/test_epoll.py | 57 ---------------------------------- trio/_core/tests/test_io.py | 46 +++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 57 deletions(-) delete mode 100644 trio/_core/tests/test_epoll.py diff --git a/trio/_core/tests/test_epoll.py b/trio/_core/tests/test_epoll.py deleted file mode 100644 index f123855cf..000000000 --- a/trio/_core/tests/test_epoll.py +++ /dev/null @@ -1,57 +0,0 @@ -# Tests for the special features of the epoll IOManager -# (Tests for common functionality are in test_io) - -# epoll doesn't really have any special features ATM, so this is pretty short. - -import pytest - -import select -import socket as stdlib_socket - -using_epoll = hasattr(select, "epoll") -pytestmark = pytest.mark.skipif(not using_epoll, reason="epoll platforms only") - -from .test_io import fill_socket -from ... 
import _core -from ...testing import wait_all_tasks_blocked - - -async def test_epoll_statistics(): - a1, b1 = stdlib_socket.socketpair() - a2, b2 = stdlib_socket.socketpair() - a3, b3 = stdlib_socket.socketpair() - for sock in [a1, b1, a2, b2, a3, b3]: - sock.setblocking(False) - with a1, b1, a2, b2, a3, b3: - # let the call_soon_task settle down - await wait_all_tasks_blocked() - - statistics = _core.current_statistics() - print(statistics) - assert statistics.io_statistics.backend == "epoll" - # 1 for call_soon_task - assert statistics.io_statistics.tasks_waiting_read == 1 - assert statistics.io_statistics.tasks_waiting_write == 0 - - # We want: - # - one socket with a writer blocked - # - two sockets with a reader blocked - # - a socket with both blocked - fill_socket(a1) - fill_socket(a3) - async with _core.open_nursery() as nursery: - nursery.start_soon(_core.wait_writable, a1) - nursery.start_soon(_core.wait_readable, a2) - nursery.start_soon(_core.wait_readable, b2) - nursery.start_soon(_core.wait_writable, a3) - nursery.start_soon(_core.wait_readable, a3) - - await wait_all_tasks_blocked() - - statistics = _core.current_statistics() - print(statistics) - # 1 for call_soon_task - assert statistics.io_statistics.tasks_waiting_read == 4 - assert statistics.io_statistics.tasks_waiting_write == 2 - - nursery.cancel_scope.cancel() diff --git a/trio/_core/tests/test_io.py b/trio/_core/tests/test_io.py index 8b1730080..22734fc26 100644 --- a/trio/_core/tests/test_io.py +++ b/trio/_core/tests/test_io.py @@ -289,3 +289,49 @@ async def test_notify_closing_on_invalid_object(): else: got_no_error = True assert got_oserror or got_no_error + + +async def test_io_manager_statistics(): + def check(stats, expected_readers, expected_writers): + if stats.backend in ["epoll", "windows"]: + assert stats.tasks_waiting_read == expected_readers + assert stats.tasks_waiting_write == expected_writers + else: + stats.backend == "kqueue" + assert stats.tasks_waiting == expected_readers + expected_writers + + a1, b1 = stdlib_socket.socketpair() + a2, b2 = stdlib_socket.socketpair() + a3, b3 = stdlib_socket.socketpair() + for sock in [a1, b1, a2, b2, a3, b3]: + sock.setblocking(False) + with a1, b1, a2, b2, a3, b3: + # let the call_soon_task settle down + await wait_all_tasks_blocked() + + statistics = _core.current_statistics() + print(statistics) + # 1 for call_soon_task + check(statistics.io_statistics, 1, 0) + + # We want: + # - one socket with a writer blocked + # - two sockets with a reader blocked + # - a socket with both blocked + fill_socket(a1) + fill_socket(a3) + async with _core.open_nursery() as nursery: + nursery.start_soon(_core.wait_writable, a1) + nursery.start_soon(_core.wait_readable, a2) + nursery.start_soon(_core.wait_readable, b2) + nursery.start_soon(_core.wait_writable, a3) + nursery.start_soon(_core.wait_readable, a3) + + await wait_all_tasks_blocked() + + statistics = _core.current_statistics() + print(statistics) + # 1 for call_soon_task + check(statistics.io_statistics, 4, 2) + + nursery.cancel_scope.cancel() From c5585748913d1198fd9563c553ead0bd8eb93716 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Sun, 27 Oct 2019 00:56:02 -0700 Subject: [PATCH 18/29] Add notes-to-self/ to document the weird simultaneous-poll bug Mostly to convince myself I wasn't imagining it. 
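
The shape of the bug, condensed from the script's main() (this sketch
assumes the AFDLab and fill_socket helpers defined in the file below):

    async def main():
        afdlab = AFDLab()

        a, b = socket.socketpair()
        a.setblocking(False)
        b.setblocking(False)

        fill_socket(a)  # now `a` is not writable

        async with trio.open_nursery() as nursery:
            # 1. Poll for SEND on the full socket
            nursery.start_soon(afdlab.afd_poll, a, AFDPollFlags.AFD_POLL_SEND)
            await trio.sleep(2)
            # 2. Poll for RECEIVE on the same socket
            nursery.start_soon(afdlab.afd_poll, a, AFDPollFlags.AFD_POLL_RECEIVE)
            await trio.sleep(2)
            # 3. Send a byte. This should satisfy the RECEIVE poll, but
            #    instead the SEND poll completes with AFD_POLL_RECEIVE set.
            b.send(b"x")
            await trio.sleep(2)
            nursery.cancel_scope.cancel()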
---
 notes-to-self/afd-lab.py | 173 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 173 insertions(+)
 create mode 100644 notes-to-self/afd-lab.py

diff --git a/notes-to-self/afd-lab.py b/notes-to-self/afd-lab.py
new file mode 100644
index 000000000..e011b2e38
--- /dev/null
+++ b/notes-to-self/afd-lab.py
@@ -0,0 +1,173 @@
+# A little script to experiment with AFD polling.
+#
+# This cheats and uses a bunch of internal APIs. Don't follow its example. The
+# point is just to experiment with random junk that probably won't work, so we
+# can figure out what we actually do want to do internally.
+
+# Currently this demonstrates what seems to be a weird bug in the Windows
+# kernel. If you:
+#
+# 0. Set up a socket so that it's not writable.
+# 1. Submit a SEND poll operation.
+# 2. Submit a RECEIVE poll operation.
+# 3. Send some data through the socket, to trigger the RECEIVE.
+#
+# ...then the SEND poll operation completes with the RECEIVE flag set.
+#
+# This script's output on my machine:
+#
+# -- Iteration start --
+# Starting a poll for <AFDPollFlags.AFD_POLL_SEND: 4>
+# Starting a poll for <AFDPollFlags.AFD_POLL_RECEIVE: 1>
+# Sending another byte
+# Poll for <AFDPollFlags.AFD_POLL_SEND: 4>: got <AFDPollFlags.AFD_POLL_RECEIVE: 1>
+# Poll for <AFDPollFlags.AFD_POLL_RECEIVE: 1>: Cancelled()
+# -- Iteration start --
+# Starting a poll for <AFDPollFlags.AFD_POLL_SEND: 4>
+# Starting a poll for <AFDPollFlags.AFD_POLL_RECEIVE: 1>
+# Poll for <AFDPollFlags.AFD_POLL_RECEIVE: 1>: got <AFDPollFlags.AFD_POLL_RECEIVE: 1>
+# Sending another byte
+# Poll for <AFDPollFlags.AFD_POLL_SEND: 4>: got <AFDPollFlags.AFD_POLL_RECEIVE: 1>
+#
+# So what we're seeing is:
+#
+# On the first iteration, where there's initially no data in the socket, the
+# SEND completes with the RECEIVE flag set, and the RECEIVE operation doesn't
+# return at all, until we cancel it.
+#
+# On the second iteration, there's already data sitting in the socket from the
+# last loop. This time, the RECEIVE returns immediately with the RECEIVE flag
+# set, which makes sense -- when starting a RECEIVE poll, it does an immediate
+# check to see if there's data already, and if so it does an early exit. But
+# the bizarre thing is, when we then send *another* byte of data, the SEND
+# operation wakes up with the RECEIVE flag set.
+#
+# Why is this bizarre? Let me count the ways:
+#
+# - The SEND operation should never return RECEIVE.
+#
+# - If it does insist on returning RECEIVE, it should do it immediately, since
+#   there is already data to receive. But it doesn't.
+#
+# - And then when we send data into a socket that already has data in it, that
+#   shouldn't have any effect at all! But instead it wakes up the SEND.
+#
+# - Also, the RECEIVE call did an early check for data and exited out
+#   immediately, without going through the whole "register a callback to
+#   be notified when data arrives" dance. So even if you do have some bug
+#   in tracking which operations should be woken on which state transitions,
+#   there's no reason this operation would even touch that tracking data. Yet,
+#   if we take out the brief RECEIVE, then the SEND *doesn't* wake up.
+#
+# - Also, if I move the send() call up above the loop, so that there's already
+#   data in the socket when we start our first iteration, then you would think
+#   that would just make the first iteration act like it was the second
+#   iteration. But it doesn't. Instead it makes all the weird behavior
+#   disappear entirely.
+#
+# "What do we know … of the world and the universe about us? Our means of
+# receiving impressions are absurdly few, and our notions of surrounding
+# objects infinitely narrow. We see things only as we are constructed to see
+# them, and can gain no idea of their absolute nature.
With five feeble senses +# we pretend to comprehend the boundlessly complex cosmos, yet other beings +# with wider, stronger, or different range of senses might not only see very +# differently the things we see, but might see and study whole worlds of +# matter, energy, and life which lie close at hand yet can never be detected +# with the senses we have." + +import sys +import os.path +sys.path.insert(0, os.path.abspath(os.path.dirname(__file__) + r"\..")) + +import trio +print(trio.__file__) +import trio.testing +import socket + +from trio._core._windows_cffi import ( + ffi, kernel32, AFDPollFlags, IoControlCodes, ErrorCodes +) +from trio._core._io_windows import ( + _get_base_socket, _afd_helper_handle, _check +) + +class AFDLab: + def __init__(self): + self._afd = _afd_helper_handle() + trio.hazmat.register_with_iocp(self._afd) + + async def afd_poll(self, sock, flags, *, exclusive=0): + print(f"Starting a poll for {flags!r}") + lpOverlapped = ffi.new("LPOVERLAPPED") + poll_info = ffi.new("AFD_POLL_INFO *") + poll_info.Timeout = 2**63 - 1 # INT64_MAX + poll_info.NumberOfHandles = 1 + poll_info.Exclusive = exclusive + poll_info.Handles[0].Handle = _get_base_socket(sock) + poll_info.Handles[0].Status = 0 + poll_info.Handles[0].Events = flags + + try: + _check( + kernel32.DeviceIoControl( + self._afd, + IoControlCodes.IOCTL_AFD_POLL, + poll_info, + ffi.sizeof("AFD_POLL_INFO"), + poll_info, + ffi.sizeof("AFD_POLL_INFO"), + ffi.NULL, + lpOverlapped, + ) + ) + except OSError as exc: + if exc.winerror != ErrorCodes.ERROR_IO_PENDING: # pragma: no cover + raise + + try: + await trio.hazmat.wait_overlapped(self._afd, lpOverlapped) + except: + print(f"Poll for {flags!r}: {sys.exc_info()[1]!r}") + raise + out_flags = AFDPollFlags(poll_info.Handles[0].Events) + print(f"Poll for {flags!r}: got {out_flags!r}") + return out_flags + + +def fill_socket(sock): + try: + while True: + sock.send(b"x" * 65536) + except BlockingIOError: + pass + + +async def main(): + afdlab = AFDLab() + + a, b = socket.socketpair() + a.setblocking(False) + b.setblocking(False) + + fill_socket(a) + + while True: + print("-- Iteration start --") + async with trio.open_nursery() as nursery: + nursery.start_soon( + afdlab.afd_poll, + a, + AFDPollFlags.AFD_POLL_SEND, + ) + await trio.sleep(2) + nursery.start_soon( + afdlab.afd_poll, + a, + AFDPollFlags.AFD_POLL_RECEIVE, + ) + await trio.sleep(2) + print("Sending another byte") + b.send(b"x") + await trio.sleep(2) + nursery.cancel_scope.cancel() + +trio.run(main) From 0320a8bdb833d64951cc57f6cdb884799bbb9ad7 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. 
Smith" Date: Sun, 27 Oct 2019 00:58:23 -0700 Subject: [PATCH 19/29] Make our set of poll flags more complete --- trio/_core/_windows_cffi.py | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/trio/_core/_windows_cffi.py b/trio/_core/_windows_cffi.py index c897ad426..2a82135b0 100644 --- a/trio/_core/_windows_cffi.py +++ b/trio/_core/_windows_cffi.py @@ -264,16 +264,24 @@ class FileFlags(enum.IntEnum): TRUNCATE_EXISTING = 5 -# https://github.com/piscisaureus/wepoll/blob/master/src/afd.h class AFDPollFlags(IntFlag): + # These are drawn from a combination of: + # https://github.com/piscisaureus/wepoll/blob/master/src/afd.h + # https://github.com/reactos/reactos/blob/master/sdk/include/reactos/drivers/afd/shared.h AFD_POLL_RECEIVE = 0x0001 - AFD_POLL_RECEIVE_EXPEDITED = 0x0002 + AFD_POLL_RECEIVE_EXPEDITED = 0x0002 # OOB/urgent data AFD_POLL_SEND = 0x0004 - AFD_POLL_DISCONNECT = 0x0008 - AFD_POLL_ABORT = 0x0010 - AFD_POLL_LOCAL_CLOSE = 0x0020 - AFD_POLL_ACCEPT = 0x0080 - AFD_POLL_CONNECT_FAIL = 0x0100 + AFD_POLL_DISCONNECT = 0x0008 # received EOF (FIN) + AFD_POLL_ABORT = 0x0010 # received RST + AFD_POLL_LOCAL_CLOSE = 0x0020 # local socket object closed + AFD_POLL_CONNECT = 0x0040 # socket is successfully connected + AFD_POLL_ACCEPT = 0x0080 # you can call accept on this socket + AFD_POLL_CONNECT_FAIL = 0x0100 # connect() terminated unsuccessfully + # See WSAEventSelect docs for more details on these four: + AFD_POLL_QOS = 0x0200 + AFD_POLL_GROUP_QOS = 0x0400 + AFD_POLL_ROUTING_INTERFACE_CHANGE = 0x0800 + AFD_POLL_EVENT_ADDRESS_LIST_CHANGE = 0x1000 class WSAIoctls(enum.IntEnum): From a28dfa074ac29b4195593057bf94278d655adf7f Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Sun, 27 Oct 2019 00:58:41 -0700 Subject: [PATCH 20/29] Better comments --- trio/_core/_io_windows.py | 185 +++++++++++++++++++++++++++----------- 1 file changed, 133 insertions(+), 52 deletions(-) diff --git a/trio/_core/_io_windows.py b/trio/_core/_io_windows.py index 1690d5635..de48e76c9 100644 --- a/trio/_core/_io_windows.py +++ b/trio/_core/_io_windows.py @@ -32,57 +32,136 @@ # for discussion. This now just has some lower-level notes: # # How IOCP fits together: -# - each notification event (OVERLAPPED_ENTRY) contains: -# - the "completion key" (an integer) -# - pointer to OVERLAPPED -# - dwNumberOfBytesTransferred -# - and in addition, for regular I/O, the OVERLAPPED structure gets filled in -# with: -# - result code (named "Internal") -# - number of bytes transferred (named "InternalHigh"); redundant with -# dwNumberOfBytesTransferred *if* this is a regular I/O event. +# +# The general model is that you call some function like ReadFile or WriteFile +# to tell the kernel that you want it to perform some operation, and the +# kernel goes off and does that in the background, then at some point later it +# sends you a notification that the operation is complete. There are some more +# exotic APIs that don't quite fit this pattern, but most APIs do. +# +# Each background operation is tracked using an OVERLAPPED struct, that +# uniquely identifies that particular operation. +# +# An "IOCP" (or "I/O completion port") is an object that lets the kernel send +# us these notifications -- basically it's just a kernel->userspace queue. +# +# Each IOCP notification is represented by an OVERLAPPED_ENTRY struct, which +# contains 3 fields: +# - The "completion key". This is an opaque integer that we pick, and use +# however is convenient. 
+# - pointer to the OVERLAPPED struct for the completed operation. +# - dwNumberOfBytesTransferred (an integer). +# +# And in addition, for regular I/O, the OVERLAPPED structure gets filled in +# with: +# - result code (named "Internal") +# - number of bytes transferred (named "InternalHigh"); usually redundant +# with dwNumberOfBytesTransferred. # # There are also some other entries in OVERLAPPED which only matter on input: # - Offset and OffsetHigh which are inputs to {Read,Write}File and # otherwise always zero # - hEvent which is for if you aren't using IOCP; we always set it to zero. # -# PostQueuedCompletionStatus: lets you set the 3 magic scalars to whatever you -# want. +# That describes the usual pattern for operations and the usual meaning of +# these struct fields, but really these are just some arbitrary chunks of +# bytes that get passed back and forth, so some operations like to overload +# them to mean something else. +# +# You can also directly queue an OVERLAPPED_ENTRY object to an IOCP by calling +# PostQueuedCompletionStatus. When you use this you get to set all the +# OVERLAPPED_ENTRY fields to arbitrary values. +# +# You can request to cancel any operation if you know which handle it was +# issued on + the OVERLAPPED struct that identifies it (via CancelIoEx). This +# request might fail because the operation has already completed, or it might +# be queued to happen in the background, so you only find out whether it +# succeeded or failed later, when we get back the notification for the +# operation being complete. +# +# There are three types of operations that we support: +# +# == Regular I/O operations on handles (e.g. files or named pipes) == +# +# Implemented by: register_with_iocp, wait_overlapped +# +# To use these, you have to register the handle with your IOCP first. Once +# it's registered, any operations on that handle will automatically send +# completion events to that IOCP, with a completion key that you specify *when +# the handle is registered* (so you can't use different completion keys for +# different operations). # -# Regular I/O events: these are identified by the pointer-to-OVERLAPPED. The -# "completion key" is a property of a particular handle being operated on that -# is set when associating the handle with the IOCP. We don't use it, so should -# always set it to zero. +# We give these two dedicated completion keys: CKeys.WAIT_OVERLAPPED for +# regular operations, and CKeys.LATE_CANCEL that's used to make +# wait_overlapped cancellable even if the user forgot to call +# register_with_iocp. The problem here is that after we request the cancel, +# wait_overlapped keeps blocking until it sees the completion notification... +# but if the user forgot to register_with_iocp, then the completion will never +# come, so the cancellation will never resolve. To avoid this, whenever we try +# to cancel an I/O operation and the cancellation fails, we use +# PostQueuedCompletionStatus to send a CKeys.LATE_CANCEL notification. If this +# arrives before the real completion, we assume the user forgot to call +# register_with_iocp on their handle, and raise an error accordingly. # -# Socket state notifications: the public APIs that windows provides for this -# are all really awkward and don't integrate with IOCP. So we drop down to a -# lower level, and talk directly to the socket device driver in the kernel, -# which is called "AFD". 
The magic IOCTL_AFD_POLL operation lets us request a -# regular IOCP notification when a given socket enters a given state, which is -# exactly what we want. Unfortunately, this is a totally undocumented internal -# API. Fortunately libuv also does this, so we can be pretty confident that MS -# won't break it on us, and there is a *little* bit of information out there -# if you go digging. +# == Socket state notifications == # -# Job notifications: effectively uses PostQueuedCompletionStatus, the -# "completion key" is used to identify which job we're talking about, and the -# other two scalars are overloaded to contain arbitrary data. +# Implemented by: wait_readable, wait_writable # -# So our strategy is: -# - when binding handles to the IOCP, we always set the completion key to 0. -# when dispatching received events, when the completion key is 0 we dispatch -# based on lpOverlapped -# - when we try to cancel an I/O operation and the cancellation fails, -# we post a completion with completion key 1; if this arrives before the -# real completion (with completion key 0) we assume the user forgot to -# call register_with_iocp on their handle, and raise an error accordingly -# (without this logic we'd hang forever uninterruptibly waiting for the -# completion that never arrives) -# - other completion keys are available for user use - - -# The completion keys we use +# The public APIs that windows provides for this are all really awkward and +# don't integrate with IOCP. So we drop down to a lower level, and talk +# directly to the socket device driver in the kernel, which is called "AFD". +# Unfortunately, this is a totally undocumented internal API. Fortunately +# libuv also does this, so we can be pretty confident that MS won't break it +# on us, and there is a *little* bit of information out there if you go +# digging. +# +# Basically: we open a magic file that refers to the AFD driver, register the +# magic file with our IOCP, and then we can issue regular overlapped I/O +# operations on that handle. Specifically, the operation we use is called +# IOCTL_AFD_POLL, which lets us pass in a buffer describing which events we're +# interested in on a given socket (readable, writable, etc.). Later, when the +# operation completes, the kernel rewrites the buffer we passed in to record +# which events happened, and uses IOCP as normal to notify us that this +# operation has completed. +# +# There's some trickiness required to handle multiple tasks that are waiting +# on the same socket simultaneously, so instead of using the wait_overlapped +# machinery, we have some dedicated code to handle these operations, and a +# dedicated completion key CKeys.AFD_POLL. +# +# Sources of information: +# - https://github.com/python-trio/trio/issues/52 +# - Wepoll: https://github.com/piscisaureus/wepoll/ +# - libuv: https://github.com/libuv/libuv/ +# - ReactOS: https://github.com/reactos/reactos/ +# - Ancient leaked copies of the Windows NT and Winsock source code: +# https://github.com/pustladi/Windows-2000/blob/661d000d50637ed6fab2329d30e31775046588a9/private/net/sockets/winsock2/wsp/msafd/select.c#L59-L655 +# https://github.com/metoo10987/WinNT4/blob/f5c14e6b42c8f45c20fe88d14c61f9d6e0386b8e/private/ntos/afd/poll.c#L68-L707 +# - The WSAEventSelect docs (this exposes a finer-grained set of events than +# select()) +# +# +# == Everything else == +# +# There are also some weirder APIs for interacting with IOCP. 
For example, the
+# "Job" API lets you specify an IOCP handle and "completion key", and then in
+# the future whenever certain events happen it uses IOCP to send a
+# notification. These notifications don't correspond to any particular
+# operation; they're just spontaneous messages you get. The
+# "dwNumberOfBytesTransferred" field gets repurposed to carry an identifier
+# for the message type (e.g. JOB_OBJECT_MSG_EXIT_PROCESS), and the
+# "lpOverlapped" field gets repurposed to carry some arbitrary data that
+# depends on the message type (e.g. the pid of the process that exited).
+#
+# To handle these, we have monitor_completion_key, where we hand out an
+# unassigned completion key, let users set it up however they want, and then
+# get any events that arrive on that key.
+#
+# (Note: monitor_completion_key is not documented or fully baked; expect it to
+# change in the future.)
+
+
+# Our completion keys
 class CKeys(enum.IntEnum):
     AFD_POLL = 0
     WAIT_OVERLAPPED = 1
@@ -129,6 +208,10 @@ def _afd_helper_handle():
     # References:
     # https://blogs.msdn.microsoft.com/jeremykuhne/2016/05/02/dos-to-nt-a-paths-journey/
     # https://stackoverflow.com/a/21704022
+    #
+    # I'm actually not sure what the \Trio part at the end of the path does.
+    # Wepoll uses \Device\Afd\Wepoll, so I just copied them. (I'm guessing it
+    # might be visible in some debug tools, and is otherwise arbitrary?)
     rawname = r"\\.\GLOBALROOT\Device\Afd\Trio".encode("utf-16le") + b"\0\0"
     rawname_buf = ffi.from_buffer(rawname)
@@ -518,17 +601,15 @@ def abort(raise_cancel_):
            except OSError as exc:
                if exc.winerror == ErrorCodes.ERROR_NOT_FOUND:
                    # Too late to cancel. If this happens because the
-                    # operation is already completed, we don't need to
-                    # do anything; presumably the IOCP thread will be
-                    # reporting back about that completion soon. But
-                    # another possibility is that the operation was
-                    # performed on a handle that wasn't registered
-                    # with our IOCP (ie, the user forgot to call
-                    # register_with_iocp), in which case we're just
-                    # never going to see the completion. To avoid an
-                    # uncancellable infinite sleep in the latter case,
-                    # we'll PostQueuedCompletionStatus here, and if
-                    # our post arrives before the original completion
+                    # operation is already completed, we don't need to do
+                    # anything; we'll get a notification of that completion
+                    # soon. But another possibility is that the operation was
+                    # performed on a handle that wasn't registered with our
+                    # IOCP (ie, the user forgot to call register_with_iocp),
+                    # in which case we're just never going to see the
+                    # completion. To avoid an uncancellable infinite sleep in
+                    # the latter case, we'll PostQueuedCompletionStatus here,
+                    # and if our post arrives before the original completion
                    # does, we'll assume the handle wasn't registered.

From 9140391f4fef1f74e43bf1d02dae134a79d51259 Mon Sep 17 00:00:00 2001
From: "Nathaniel J.
Smith" Date: Sun, 27 Oct 2019 03:04:24 -0700 Subject: [PATCH 21/29] Let's be paranoid and double-check for weird broken network configs --- trio/_core/_io_windows.py | 22 ++++++++++++++++++++-- trio/_core/_windows_cffi.py | 1 + 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/trio/_core/_io_windows.py b/trio/_core/_io_windows.py index de48e76c9..9892c2389 100644 --- a/trio/_core/_io_windows.py +++ b/trio/_core/_io_windows.py @@ -1,6 +1,7 @@ import itertools from contextlib import contextmanager import enum +import socket import outcome import attr @@ -175,14 +176,14 @@ def _check(success): return success -def _get_base_socket(sock): +def _get_base_socket(sock, *, which=WSAIoctls.SIO_BASE_HANDLE): if hasattr(sock, "fileno"): sock = sock.fileno() base_ptr = ffi.new("HANDLE *") out_size = ffi.new("DWORD *") failed = ws2_32.WSAIoctl( ffi.cast("SOCKET", sock), - WSAIoctls.SIO_BASE_HANDLE, + which, ffi.NULL, 0, base_ptr, @@ -333,6 +334,23 @@ def __init__(self): self._completion_key_queues = {} self._completion_key_counter = itertools.count(CKeys.USER_DEFINED) + with socket.socket() as s: + # LSPs can't override this. + base_handle = _get_base_socket(s, which=WSAIoctls.SIO_BASE_HANDLE) + # LSPs can in theory override this, but we believe that it never + # actually happens in the wild. + select_handle = _get_base_socket( + s, which=WSAIoctls.SIO_BSP_HANDLE_SELECT + ) + if base_handle != select_handle: # pragma: no cover + raise RuntimeError( + "Unexpected network configuration detected. " + "Please file a bug at " + "https://github.com/python-trio/trio/issues/new, " + "and include the output of running: " + "netsh winsock show catalog" + ) + def close(self): try: if self._iocp is not None: diff --git a/trio/_core/_windows_cffi.py b/trio/_core/_windows_cffi.py index 2a82135b0..e2b95a911 100644 --- a/trio/_core/_windows_cffi.py +++ b/trio/_core/_windows_cffi.py @@ -286,6 +286,7 @@ class AFDPollFlags(IntFlag): class WSAIoctls(enum.IntEnum): SIO_BASE_HANDLE = 0x48000022 + SIO_BSP_HANDLE_SELECT = 0x4800001C class CompletionModes(IntFlag): From c6047160d0e5faa62f6cb017f233c883b2da1566 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. 
Smith" Date: Sun, 27 Oct 2019 03:11:59 -0700 Subject: [PATCH 22/29] Minor cleanups to test_io_manager_statistics --- trio/_core/tests/test_io.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/trio/_core/tests/test_io.py b/trio/_core/tests/test_io.py index 22734fc26..b2fd30166 100644 --- a/trio/_core/tests/test_io.py +++ b/trio/_core/tests/test_io.py @@ -292,13 +292,16 @@ async def test_notify_closing_on_invalid_object(): async def test_io_manager_statistics(): - def check(stats, expected_readers, expected_writers): - if stats.backend in ["epoll", "windows"]: - assert stats.tasks_waiting_read == expected_readers - assert stats.tasks_waiting_write == expected_writers + def check(*, expected_readers, expected_writers): + statistics = _core.current_statistics() + print(statistics) + iostats = statistics.io_statistics + if iostats.backend in ["epoll", "windows"]: + assert iostats.tasks_waiting_read == expected_readers + assert iostats.tasks_waiting_write == expected_writers else: - stats.backend == "kqueue" - assert stats.tasks_waiting == expected_readers + expected_writers + assert iostats.backend == "kqueue" + assert iostats.tasks_waiting == expected_readers + expected_writers a1, b1 = stdlib_socket.socketpair() a2, b2 = stdlib_socket.socketpair() @@ -309,10 +312,8 @@ def check(stats, expected_readers, expected_writers): # let the call_soon_task settle down await wait_all_tasks_blocked() - statistics = _core.current_statistics() - print(statistics) # 1 for call_soon_task - check(statistics.io_statistics, 1, 0) + check(expected_readers=1, expected_writers=0) # We want: # - one socket with a writer blocked @@ -329,9 +330,10 @@ def check(stats, expected_readers, expected_writers): await wait_all_tasks_blocked() - statistics = _core.current_statistics() - print(statistics) - # 1 for call_soon_task - check(statistics.io_statistics, 4, 2) + # +1 for call_soon_task + check(expected_readers=3 + 1, expected_writers=2) nursery.cancel_scope.cancel() + + # 1 for call_soon_task + check(expected_readers=1, expected_writers=0) From e4da787b43cff7352d13358025380423706b1f92 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Sun, 27 Oct 2019 03:26:33 -0700 Subject: [PATCH 23/29] Add explicit test that wait_* error out properly on invalid values --- trio/_core/tests/test_io.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/trio/_core/tests/test_io.py b/trio/_core/tests/test_io.py index b2fd30166..466558d8a 100644 --- a/trio/_core/tests/test_io.py +++ b/trio/_core/tests/test_io.py @@ -3,6 +3,7 @@ import socket as stdlib_socket import select import random +import errno from ... import _core from ...testing import wait_all_tasks_blocked, Sequencer, assert_checkpoints @@ -291,6 +292,20 @@ async def test_notify_closing_on_invalid_object(): assert got_oserror or got_no_error +async def test_wait_on_invalid_object(): + # We definitely want to raise an error everywhere if you pass in an + # invalid fd to wait_* + for wait in [trio.hazmat.wait_readable, trio.hazmat.wait_writable]: + with stdlib_socket.socket() as s: + fileno = s.fileno() + # We just closed the socket and don't do anything else in between, so + # we can be confident that the fileno hasn't be reassigned. 
From e4da787b43cff7352d13358025380423706b1f92 Mon Sep 17 00:00:00 2001
From: "Nathaniel J. Smith"
Date: Sun, 27 Oct 2019 03:26:33 -0700
Subject: [PATCH 23/29] Add explicit test that wait_* error out properly on
 invalid values

---
 trio/_core/tests/test_io.py | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/trio/_core/tests/test_io.py b/trio/_core/tests/test_io.py
index b2fd30166..466558d8a 100644
--- a/trio/_core/tests/test_io.py
+++ b/trio/_core/tests/test_io.py
@@ -3,6 +3,7 @@
 import socket as stdlib_socket
 import select
 import random
+import errno
 
 from ... import _core
 from ...testing import wait_all_tasks_blocked, Sequencer, assert_checkpoints
@@ -291,6 +292,20 @@ async def test_notify_closing_on_invalid_object():
     assert got_oserror or got_no_error
 
 
+async def test_wait_on_invalid_object():
+    # We definitely want to raise an error everywhere if you pass in an
+    # invalid fd to wait_*
+    for wait in [trio.hazmat.wait_readable, trio.hazmat.wait_writable]:
+        with stdlib_socket.socket() as s:
+            fileno = s.fileno()
+        # We just closed the socket and don't do anything else in between, so
+        # we can be confident that the fileno hasn't been reassigned.
+        with pytest.raises(OSError) as excinfo:
+            await wait(fileno)
+        exc = excinfo.value
+        assert exc.errno == errno.EBADF or exc.winerror == errno.ENOTSOCK
+
+
 async def test_io_manager_statistics():
     def check(*, expected_readers, expected_writers):
         statistics = _core.current_statistics()
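[If you want to poke at this behavior outside of pytest, here's a minimal
sketch of the same idea -- the Unix EBADF path is what you'd see below; on
Windows you'd hit the winerror branch instead:]

import socket
import trio

async def main():
    s = socket.socket()
    fileno = s.fileno()
    s.close()  # fileno is now stale
    try:
        await trio.hazmat.wait_readable(fileno)
    except OSError as exc:
        print("as expected:", exc)

trio.run(main)
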
From b2b84d0ea217824deac4ea2531062abebe716a78 Mon Sep 17 00:00:00 2001
From: "Nathaniel J. Smith"
Date: Sun, 27 Oct 2019 03:47:24 -0700
Subject: [PATCH 24/29] Apparently you get LOCAL_CLOSE notifications whether
 you want them or not

---
 trio/_core/_io_windows.py | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git a/trio/_core/_io_windows.py b/trio/_core/_io_windows.py
index 9892c2389..4576d5752 100644
--- a/trio/_core/_io_windows.py
+++ b/trio/_core/_io_windows.py
@@ -233,24 +233,30 @@ def _afd_helper_handle():
 
 
 # AFD_POLL has a finer-grained set of events than other APIs. We collapse them
 # down into Unix-style "readable" and "writable".
 #
-# There's also a AFD_POLL_LOCAL_CLOSE that we could wait on, to potentially
-# catch some cases where someone forgot to call notify_closing. But it's not
-# reliable – e.g. if the socket has been dup'ed, then closing one of the
-# handles doesn't trigger the event – and it's not available on Unix-like
-# platforms. So it seems like including it here would be more likely to mask
-# subtle bugs than to actually help anything.
+# Note: AFD_POLL_LOCAL_CLOSE isn't a reliable substitute for notify_closing(),
+# because even if the user closes the socket *handle*, the socket *object*
+# could still remain open, e.g. if the socket was dup'ed (possibly into
+# another process). Explicitly calling notify_closing() guarantees that
+# everyone waiting on the *handle* wakes up, which is what you'd expect.
+#
+# However, we can't avoid getting LOCAL_CLOSE notifications -- the kernel
+# delivers them whether we ask for them or not -- so better to include them
+# here for documentation, and so that when we check (delivered & requested) we
+# get a match.
 READABLE_FLAGS = (
     AFDPollFlags.AFD_POLL_RECEIVE
     | AFDPollFlags.AFD_POLL_ACCEPT
     | AFDPollFlags.AFD_POLL_DISCONNECT  # other side sent an EOF
     | AFDPollFlags.AFD_POLL_ABORT
+    | AFDPollFlags.AFD_POLL_LOCAL_CLOSE
 )
 
 WRITABLE_FLAGS = (
     AFDPollFlags.AFD_POLL_SEND
     | AFDPollFlags.AFD_POLL_CONNECT_FAIL
     | AFDPollFlags.AFD_POLL_ABORT
+    | AFDPollFlags.AFD_POLL_LOCAL_CLOSE
 )
 

From 2ce3bcc6ca9a95ecad5393dc549bbf67dce1f121 Mon Sep 17 00:00:00 2001
From: "Nathaniel J. Smith"
Date: Sun, 27 Oct 2019 16:26:05 -0700
Subject: [PATCH 25/29] Rewrite newsfragment to explain the change better

---
 newsfragments/52.feature.rst | 20 ++++++++++++++++----
 1 file changed, 16 insertions(+), 4 deletions(-)

diff --git a/newsfragments/52.feature.rst b/newsfragments/52.feature.rst
index 69ba28ade..79252dca4 100644
--- a/newsfragments/52.feature.rst
+++ b/newsfragments/52.feature.rst
@@ -1,4 +1,16 @@
-Trio's Windows backend was rewritten to use IOCP exclusively, instead
-of a hybrid of ``select`` + IOCP. This should make it much faster and
-more scalable. It also simplifies the code internally, and paves the
-way for future improvements.
+On Windows, the `IOCP subsystem
+`__
+is generally the best way to implement async I/O operations – but it's
+historically been weak at providing ``select``\-style readiness
+notifications, like `trio.hazmat.wait_readable` and
+`~trio.hazmat.wait_writable`. We aren't willing to give those up, so
+Trio's Windows backend used to use a hybrid of ``select`` + IOCP. This
+was complex, slow, and had `limited scalability
+`__.
+
+Fortunately, we found a way to implement ``wait_*`` with IOCP, so
+Trio's Windows backend has been completely rewritten, and now uses
+IOCP exclusively. As a user, the only difference you should notice is
+that Trio should now be faster and more scalable on Windows. This also
+simplified the code internally, which should allow for more
+improvements in the future.

From f9c3b548f5f633412727898ef2dab92f29d15f29 Mon Sep 17 00:00:00 2001
From: "Nathaniel J. Smith"
Date: Tue, 29 Oct 2019 11:53:44 -0700
Subject: [PATCH 26/29] Wording tweak

---
 newsfragments/52.feature.rst | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/newsfragments/52.feature.rst b/newsfragments/52.feature.rst
index 79252dca4..7da9fc8e3 100644
--- a/newsfragments/52.feature.rst
+++ b/newsfragments/52.feature.rst
@@ -4,8 +4,8 @@ is generally the best way to implement async I/O operations – but it's
 historically been weak at providing ``select``\-style readiness
 notifications, like `trio.hazmat.wait_readable` and
 `~trio.hazmat.wait_writable`. We aren't willing to give those up, so
-Trio's Windows backend used to use a hybrid of ``select`` + IOCP. This
-was complex, slow, and had `limited scalability
+previously Trio's Windows backend used a hybrid of ``select`` + IOCP.
+This was complex, slow, and had `limited scalability
 `__.
 
 Fortunately, we found a way to implement ``wait_*`` with IOCP, so
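[Since the newsfragment name-checks wait_readable and wait_writable, here's a
tiny usage sketch of the API being discussed. This is not from the patches,
and it works the same on any backend, not just the new Windows one:]

import socket
import trio
import trio.testing

async def main():
    a, b = socket.socketpair()
    with a, b:
        async def reader():
            # Parks this task until the kernel reports `a` readable
            await trio.hazmat.wait_readable(a)
            print("got:", a.recv(4096))

        async with trio.open_nursery() as nursery:
            nursery.start_soon(reader)
            await trio.testing.wait_all_tasks_blocked()
            b.send(b"ping")  # wakes the reader

trio.run(main)
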
From 7818f589326220c21f9b15cb9f7d257c6e14d44b Mon Sep 17 00:00:00 2001
From: "Nathaniel J. Smith"
Date: Tue, 29 Oct 2019 12:00:10 -0700
Subject: [PATCH 27/29] Update comments to clarify the impact of the
 IOCTL_AFD_POLL bug

---
 notes-to-self/afd-lab.py  |  3 +++
 trio/_core/_io_windows.py | 12 ++++++++----
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/notes-to-self/afd-lab.py b/notes-to-self/afd-lab.py
index e011b2e38..58a6c2279 100644
--- a/notes-to-self/afd-lab.py
+++ b/notes-to-self/afd-lab.py
@@ -14,6 +14,9 @@
 #
 # ...then the SEND poll operation completes with the RECEIVE flag set.
 #
+# (This bug is why our Windows backend jumps through hoops to avoid ever
+# issuing multiple polls simultaneously for the same socket.)
+#
 # This script's output on my machine:
 #
 # -- Iteration start --
diff --git a/trio/_core/_io_windows.py b/trio/_core/_io_windows.py
index 4576d5752..c4a5d176e 100644
--- a/trio/_core/_io_windows.py
+++ b/trio/_core/_io_windows.py
@@ -125,9 +125,12 @@
 # which events happened, and uses IOCP as normal to notify us that this
 # operation has completed.
 #
-# There's some trickiness required to handle multiple tasks that are waiting
-# on the same socket simultaneously, so instead of using the wait_overlapped
-# machinery, we have some dedicated code to handle these operations, and a
+# Unfortunately, the Windows kernel seems to have bugs if you try to issue
+# multiple simultaneous IOCTL_AFD_POLL operations on the same socket (see
+# notes-to-self/afd-lab.py). So if a user calls wait_readable and
+# wait_writable at the same time, we have to combine those into a single
+# IOCTL_AFD_POLL. This means we can't just use the wait_overlapped machinery.
+# Instead we have some dedicated code to handle these operations, and a
 # dedicated completion key CKeys.AFD_POLL.
 #
 # Sources of information:
@@ -139,7 +142,8 @@
 # https://github.com/pustladi/Windows-2000/blob/661d000d50637ed6fab2329d30e31775046588a9/private/net/sockets/winsock2/wsp/msafd/select.c#L59-L655
 # https://github.com/metoo10987/WinNT4/blob/f5c14e6b42c8f45c20fe88d14c61f9d6e0386b8e/private/ntos/afd/poll.c#L68-L707
 # - The WSAEventSelect docs (this exposes a finer-grained set of events than
-#   select())
+#   select(), so if you squint you can treat it as a source of information on
+#   the fine-grained AFD poll types)
 #
 #
 # == Everything else ==

From ef2d637575ae75faacb46e414a58bc107ba99916 Mon Sep 17 00:00:00 2001
From: "Nathaniel J. Smith"
Date: Thu, 31 Oct 2019 00:17:04 -0700
Subject: [PATCH 28/29] Add script to check how wait_readable scales with the
 number of sockets

---
 notes-to-self/socket-scaling.py | 64 +++++++++++++++++++++++++++++++++
 1 file changed, 64 insertions(+)
 create mode 100644 notes-to-self/socket-scaling.py

diff --git a/notes-to-self/socket-scaling.py b/notes-to-self/socket-scaling.py
new file mode 100644
index 000000000..61527e255
--- /dev/null
+++ b/notes-to-self/socket-scaling.py
@@ -0,0 +1,64 @@
+# Little script to measure how wait_readable scales with the number of
+# sockets. We look at three key measurements:
+#
+# - cost of issuing wait_readable
+# - cost of running the scheduler, while wait_readables are blocked in the
+#   background
+# - cost of cancelling wait_readable
+#
+# On Linux and macOS, these all appear to be ~O(1), as we'd expect.
+#
+# On Windows: with the old 'select'-based loop, the cost of scheduling grew
+# with the number of outstanding sockets, which was bad.
+#
+# With the new IOCP-based loop, the cost of scheduling is constant, which is
+# good. But, we find that the cost of cancelling a single wait_readable
+# appears to grow like O(n**2) or so in the number of outstanding
+# wait_readables. This is bad -- it means that cancelling all of the
+# outstanding operations here is something like O(n**3)! To avoid this, we
+# should consider creating multiple AFD helper handles and distributing the
+# AFD_POLL operations across them.
+#
+# To run this on Unix systems, you'll probably first have to run:
+#
+#   ulimit -n 31000
+#
+# or similar.
+
+import time
+import trio
+import trio.testing
+import socket
+
+async def main():
+    for total in [10, 100, 500, 1_000, 10_000, 20_000, 30_000]:
+        def pt(desc, *, count=total, item="socket"):
+            nonlocal last_time
+            now = time.perf_counter()
+            total_ms = (now - last_time) * 1000
+            per_us = total_ms * 1000 / count
+            print(f"{desc}: {total_ms:.2f} ms total, {per_us:.2f} µs/{item}")
+            last_time = now
+
+        print(f"\n-- {total} sockets --")
+        last_time = time.perf_counter()
+        sockets = []
+        for _ in range(total // 2):
+            a, b = socket.socketpair()
+            sockets += [a, b]
+        pt("socket creation")
+        async with trio.open_nursery() as nursery:
+            for s in sockets:
+                nursery.start_soon(trio.hazmat.wait_readable, s)
+            await trio.testing.wait_all_tasks_blocked()
+            pt("spawning wait tasks")
+            for _ in range(1000):
+                await trio.hazmat.cancel_shielded_checkpoint()
+            pt("scheduling 1000 times", count=1000, item="schedule")
+            nursery.cancel_scope.cancel()
+        pt("cancelling wait tasks")
+        for sock in sockets:
+            sock.close()
+        pt("closing sockets")
+
+trio.run(main)
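[On the "multiple AFD helper handles" idea the script's comments mention: a
purely hypothetical sketch of what distributing polls could look like. None of
these names exist in trio -- open_helper_handle stands in for something
wrapping trio's _afd_helper_handle() -- and the real version would live inside
the Windows IOManager:]

import itertools

class AfdHelperPool:
    """Hypothetical: spread AFD_POLL operations over several helper
    handles, so cancelling one wait doesn't have to contend with every
    other outstanding operation queued on a single handle."""

    def __init__(self, open_helper_handle, size=4):
        # open_helper_handle() is assumed to return a fresh AFD handle
        self._handles = [open_helper_handle() for _ in range(size)]
        self._next = itertools.cycle(self._handles)

    def handle_for_new_poll(self):
        # Simple round-robin assignment of new poll operations
        return next(self._next)
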
Smith" Date: Thu, 31 Oct 2019 00:18:51 -0700 Subject: [PATCH 29/29] Tweak newsfragment again --- newsfragments/52.feature.rst | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/newsfragments/52.feature.rst b/newsfragments/52.feature.rst index 7da9fc8e3..7ac7e4f3f 100644 --- a/newsfragments/52.feature.rst +++ b/newsfragments/52.feature.rst @@ -11,6 +11,9 @@ This was complex, slow, and had `limited scalability Fortunately, we found a way to implement ``wait_*`` with IOCP, so Trio's Windows backend has been completely rewritten, and now uses IOCP exclusively. As a user, the only difference you should notice is -that Trio should now be faster and more scalable on Windows. This also -simplified the code internally, which should allow for more -improvements in the future. +that Trio should now be faster on Windows, and can handle many more +sockets. This also simplified the code internally, which should allow +for more improvements in the future. + +However, this is somewhat experimental, so if you use Windows then +please keep an eye out and let us know if you run into any problems!