Skip to content

Commit

Permalink
Merge branch 'windows_message_pipes' into test
Browse files Browse the repository at this point in the history
  • Loading branch information
richardsheridan committed Dec 18, 2020
2 parents 8cf7610 + daf68a4 commit ec32582
Show file tree
Hide file tree
Showing 3 changed files with 251 additions and 17 deletions.
45 changes: 45 additions & 0 deletions trio/_core/_windows_cffi.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,25 @@
LPOVERLAPPED lpOverlapped
);
BOOL PeekNamedPipe(
HANDLE hNamedPipe,
LPVOID lpBuffer,
DWORD nBufferSize,
LPDWORD lpBytesRead,
LPDWORD lpTotalBytesAvail,
LPDWORD lpBytesLeftThisMessage
);
BOOL GetNamedPipeHandleStateA(
HANDLE hNamedPipe,
LPDWORD lpState,
LPDWORD lpCurInstances,
LPDWORD lpMaxCollectionCount,
LPDWORD lpCollectDataTimeout,
LPSTR lpUserName,
DWORD nMaxUserNameSize
);
// From https://github.com/piscisaureus/wepoll/blob/master/src/afd.h
typedef struct _AFD_POLL_HANDLE_INFO {
HANDLE Handle;
Expand Down Expand Up @@ -245,6 +264,7 @@ class ErrorCodes(enum.IntEnum):
ERROR_INVALID_PARMETER = 87
ERROR_NOT_FOUND = 1168
ERROR_NOT_SOCKET = 10038
ERROR_MORE_DATA = 234


class FileFlags(enum.IntEnum):
Expand Down Expand Up @@ -296,6 +316,13 @@ class IoControlCodes(enum.IntEnum):
IOCTL_AFD_POLL = 0x00012024


class PipeModes(enum.IntFlag):
PIPE_WAIT = 0x00000000
PIPE_NOWAIT = 0x00000001
PIPE_READMODE_BYTE = 0x00000000
PIPE_READMODE_MESSAGE = 0x00000002


################################################################
# Generic helpers
################################################################
Expand All @@ -321,3 +348,21 @@ def raise_winerror(winerror=None, *, filename=None, filename2=None):
_, msg = ffi.getwinerror(winerror)
# https://docs.python.org/3/library/exceptions.html#OSError
raise OSError(0, msg, filename, winerror, filename2)


def get_pipe_state(handle):
lpState = ffi.new("LPDWORD")
if not kernel32.GetNamedPipeHandleStateA(
_handle(handle), lpState, ffi.NULL, ffi.NULL, ffi.NULL, ffi.NULL, 0
):
raise_winerror() # pragma: no cover
return lpState[0]


def peek_pipe_message_left(handle):
left = ffi.new("LPDWORD")
if not kernel32.PeekNamedPipe(
_handle(handle), ffi.NULL, 0, ffi.NULL, ffi.NULL, left
):
raise_winerror() # pragma: no cover
return left[0]
78 changes: 76 additions & 2 deletions trio/_windows_pipes.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import sys
from typing import TYPE_CHECKING
from . import _core
from ._abc import SendStream, ReceiveStream
from ._abc import SendStream, ReceiveStream, SendChannel, ReceiveChannel
from ._util import ConflictDetector, Final
from ._core._windows_cffi import _handle, raise_winerror, kernel32, ffi
from ._core._windows_cffi import (
_handle,
raise_winerror,
kernel32,
ffi,
ErrorCodes,
peek_pipe_message_left,
)

assert sys.platform == "win32" or not TYPE_CHECKING

Expand Down Expand Up @@ -132,3 +139,70 @@ async def receive_some(self, max_bytes=None) -> bytes:

async def aclose(self):
await self._handle_holder.aclose()


class PipeSendChannel(SendChannel[bytes]):
"""Represents a message stream over a pipe object."""

def __init__(self, handle: int) -> None:
self._pss = PipeSendStream(handle)
# needed for "detach" via _handle_holder.handle = -1
self._handle_holder = self._pss._handle_holder

async def send(self, value: bytes):
# Works just fine if the pipe is message-oriented
await self._pss.send_all(value)

async def aclose(self):
await self._handle_holder.aclose()


class PipeReceiveChannel(ReceiveChannel[bytes]):
"""Represents a message stream over a pipe object."""

def __init__(self, handle: int) -> None:
self._handle_holder = _HandleHolder(handle)
self._conflict_detector = ConflictDetector(
"another task is currently using this pipe"
)

async def receive(self) -> bytes:
with self._conflict_detector:
buffer = bytearray(DEFAULT_RECEIVE_SIZE)
try:
received = await self._receive_some_into(buffer)
except OSError as e:
if e.winerror != ErrorCodes.ERROR_MORE_DATA:
raise # pragma: no cover
left = peek_pipe_message_left(self._handle_holder.handle)
# preallocate memory to avoid an extra copy of very large messages
newbuffer = bytearray(DEFAULT_RECEIVE_SIZE + left)
with memoryview(newbuffer) as view:
view[:DEFAULT_RECEIVE_SIZE] = buffer
await self._receive_some_into(view[DEFAULT_RECEIVE_SIZE:])
return newbuffer
else:
del buffer[received:]
return buffer

async def _receive_some_into(self, buffer) -> bytes:
if self._handle_holder.closed:
raise _core.ClosedResourceError("this pipe is already closed")
try:
return await _core.readinto_overlapped(self._handle_holder.handle, buffer)
except BrokenPipeError:
if self._handle_holder.closed:
raise _core.ClosedResourceError(
"another task closed this pipe"
) from None

# Windows raises BrokenPipeError on one end of a pipe
# whenever the other end closes, regardless of direction.
# Convert this to EndOfChannel.
#
# Do we have to checkpoint manually? We are raising an exception.
await _core.checkpoint()
raise _core.EndOfChannel

async def aclose(self):
await self._handle_holder.aclose()
145 changes: 130 additions & 15 deletions trio/tests/test_windows_pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,63 @@
from ..testing import wait_all_tasks_blocked, check_one_way_stream

if sys.platform == "win32":
from .._windows_pipes import PipeSendStream, PipeReceiveStream
from .._core._windows_cffi import _handle, kernel32
from .._windows_pipes import (
PipeSendStream,
PipeReceiveStream,
PipeSendChannel,
PipeReceiveChannel,
DEFAULT_RECEIVE_SIZE,
)
from .._core._windows_cffi import (
_handle,
kernel32,
PipeModes,
get_pipe_state,
)
from asyncio.windows_utils import pipe
from multiprocessing.connection import Pipe
else:
pytestmark = pytest.mark.skip(reason="windows only")
pipe = None # type: Any
PipeSendStream = None # type: Any
PipeReceiveStream = None # type: Any
PipeSendChannel = None # type: Any
PipeReceiveChannel = None # type: Any


async def make_pipe() -> "Tuple[PipeSendStream, PipeReceiveStream]":
"""Makes a new pair of pipes."""
async def make_pipe_stream() -> "Tuple[PipeSendStream, PipeReceiveStream]":
"""Makes a new pair of byte-oriented pipes."""
(r, w) = pipe()
assert not (PipeModes.PIPE_READMODE_MESSAGE & get_pipe_state(r))
return PipeSendStream(w), PipeReceiveStream(r)


async def make_pipe_channel() -> "Tuple[PipeSendChannel, PipeReceiveChannel]":
"""Makes a new pair of message-oriented pipes."""
(r_channel, w_channel) = Pipe(duplex=False)
(r, w) = r_channel.fileno(), w_channel.fileno()
# XXX: Check internal details haven't changed suddenly
assert (r_channel._handle, w_channel._handle) == (r, w)
# XXX: Sabotage _ConnectionBase __del__
(r_channel._handle, w_channel._handle) = (None, None)
# XXX: Check internal details haven't changed suddenly
assert r_channel.closed and w_channel.closed
assert PipeModes.PIPE_READMODE_MESSAGE & get_pipe_state(r)
return PipeSendChannel(w), PipeReceiveChannel(r)


async def test_pipe_typecheck():
with pytest.raises(TypeError):
PipeSendStream(1.0)
with pytest.raises(TypeError):
PipeReceiveStream(None)
with pytest.raises(TypeError):
PipeSendChannel(1.0)
with pytest.raises(TypeError):
PipeReceiveChannel(None)


async def test_pipe_error_on_close():
async def test_pipe_stream_error_on_close():
# Make sure we correctly handle a failure from kernel32.CloseHandle
r, w = pipe()

Expand All @@ -49,8 +82,40 @@ async def test_pipe_error_on_close():
await receive_stream.aclose()


async def test_pipes_combined():
write, read = await make_pipe()
async def test_pipe_channel_error_on_close():
# Make sure we correctly handle a failure from kernel32.CloseHandle
send_channel, receive_channel = await make_pipe_channel()

assert kernel32.CloseHandle(_handle(receive_channel._handle_holder.handle))
assert kernel32.CloseHandle(_handle(send_channel._handle_holder.handle))

with pytest.raises(OSError):
await send_channel.aclose()
with pytest.raises(OSError):
await receive_channel.aclose()


async def test_closed_resource_error():
send_stream, receive_stream = await make_pipe_stream()

await send_stream.aclose()
with pytest.raises(_core.ClosedResourceError):
await send_stream.send_all(b"Hello")

send_channel, receive_channel = await make_pipe_channel()

with pytest.raises(_core.ClosedResourceError):
async with _core.open_nursery() as nursery:
nursery.start_soon(receive_channel.receive)
await wait_all_tasks_blocked(0.01)
await receive_channel.aclose()
await send_channel.aclose()
with pytest.raises(_core.ClosedResourceError):
await send_channel.send(b"Hello")


async def test_pipe_streams_combined():
write, read = await make_pipe_stream()
count = 2 ** 20
replicas = 3

Expand All @@ -73,13 +138,37 @@ async def reader():

assert total_received == count * replicas

async with _core.open_nursery() as n:
n.start_soon(sender)
n.start_soon(reader)
async with _core.open_nursery() as nursery:
nursery.start_soon(sender)
nursery.start_soon(reader)


async def test_pipe_channels_combined():
async def sender():
async with write:
b = bytearray(count)
for _ in range(replicas):
await write.send(b)

async def reader():
async with read:
await wait_all_tasks_blocked()
total_received = 0
async for b in read:
total_received += len(b)

assert total_received == count * replicas

for count in (8, DEFAULT_RECEIVE_SIZE, 2 ** 20):
for replicas in (1, 2, 3):
write, read = await make_pipe_channel()
async with _core.open_nursery() as nursery:
nursery.start_soon(sender)
nursery.start_soon(reader)


async def test_async_with():
w, r = await make_pipe()
async def test_async_with_stream():
w, r = await make_pipe_stream()
async with w, r:
pass

Expand All @@ -89,8 +178,19 @@ async def test_async_with():
await r.receive_some(10)


async def test_close_during_write():
w, r = await make_pipe()
async def test_async_with_channel():
w, r = await make_pipe_channel()
async with w, r:
pass

with pytest.raises(_core.ClosedResourceError):
await w.send(None)
with pytest.raises(_core.ClosedResourceError):
await r.receive()


async def test_close_stream_during_write():
w, r = await make_pipe_stream()
async with _core.open_nursery() as nursery:

async def write_forever():
Expand All @@ -104,7 +204,22 @@ async def write_forever():
await w.aclose()


async def test_close_channel_during_write():
w, r = await make_pipe_channel()
async with _core.open_nursery() as nursery:

async def write_forever():
with pytest.raises(_core.ClosedResourceError) as excinfo:
while True:
await w.send(b"x" * 4096)
assert "another task" in str(excinfo.value)

nursery.start_soon(write_forever)
await wait_all_tasks_blocked(0.1)
await w.aclose()


async def test_pipe_fully():
# passing make_clogged_pipe tests wait_send_all_might_not_block, and we
# can't implement that on Windows
await check_one_way_stream(make_pipe, None)
await check_one_way_stream(make_pipe_stream, None)

0 comments on commit ec32582

Please sign in to comment.