Skip to content

Commit

Permalink
Add type annotations for windows_pipes and its test (#2812)
Browse files Browse the repository at this point in the history
* Add type annotations for windows_pipes and its test
  • Loading branch information
CoolCat467 authored Oct 17, 2023
1 parent f2cb7b1 commit b161fec
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 31 deletions.
4 changes: 0 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,6 @@ disallow_untyped_calls = false
# files not yet fully typed
[[tool.mypy.overrides]]
module = [
# internal
"trio/_windows_pipes",

# tests
"trio/testing/_fake_net",
"trio/_core/_tests/test_guest_mode",
Expand Down Expand Up @@ -115,7 +112,6 @@ module = [
"trio/_tests/test_tracing",
"trio/_tests/test_util",
"trio/_tests/test_wait_for_object",
"trio/_tests/test_windows_pipes",
"trio/_tests/tools/test_gen_exports",
]
check_untyped_defs = false
Expand Down
40 changes: 22 additions & 18 deletions trio/_tests/test_windows_pipes.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,41 @@
from __future__ import annotations

import sys
from typing import Any, Tuple
from typing import TYPE_CHECKING

import pytest

from .. import _core
from ..testing import check_one_way_stream, wait_all_tasks_blocked

# Mark all the tests in this file as being windows-only
pytestmark = pytest.mark.skipif(sys.platform != "win32", reason="windows only")

assert ( # Skip type checking when not on Windows
sys.platform == "win32" or not TYPE_CHECKING
)

if sys.platform == "win32":
from asyncio.windows_utils import pipe

from .._core._windows_cffi import _handle, kernel32
from .._windows_pipes import PipeReceiveStream, PipeSendStream
else:
pytestmark = pytest.mark.skip(reason="windows only")
pipe: Any = None
PipeSendStream: Any = None
PipeReceiveStream: Any = None


async def make_pipe() -> Tuple[PipeSendStream, PipeReceiveStream]:
async def make_pipe() -> tuple[PipeSendStream, PipeReceiveStream]:
"""Makes a new pair of pipes."""
(r, w) = pipe()
return PipeSendStream(w), PipeReceiveStream(r)


async def test_pipe_typecheck():
async def test_pipe_typecheck() -> None:
with pytest.raises(TypeError):
PipeSendStream(1.0)
PipeSendStream(1.0) # type: ignore[arg-type]
with pytest.raises(TypeError):
PipeReceiveStream(None)
PipeReceiveStream(None) # type: ignore[arg-type]


async def test_pipe_error_on_close():
async def test_pipe_error_on_close() -> None:
# Make sure we correctly handle a failure from kernel32.CloseHandle
r, w = pipe()

Expand All @@ -47,18 +51,18 @@ async def test_pipe_error_on_close():
await receive_stream.aclose()


async def test_pipes_combined():
async def test_pipes_combined() -> None:
write, read = await make_pipe()
count = 2**20
replicas = 3

async def sender():
async def sender() -> None:
async with write:
big = bytearray(count)
for _ in range(replicas):
await write.send_all(big)

async def reader():
async def reader() -> None:
async with read:
await wait_all_tasks_blocked()
total_received = 0
Expand All @@ -76,7 +80,7 @@ async def reader():
n.start_soon(reader)


async def test_async_with():
async def test_async_with() -> None:
w, r = await make_pipe()
async with w, r:
pass
Expand All @@ -87,11 +91,11 @@ async def test_async_with():
await r.receive_some(10)


async def test_close_during_write():
async def test_close_during_write() -> None:
w, r = await make_pipe()
async with _core.open_nursery() as nursery:

async def write_forever():
async def write_forever() -> None:
with pytest.raises(_core.ClosedResourceError) as excinfo:
while True:
await w.send_all(b"x" * 4096)
Expand All @@ -102,7 +106,7 @@ async def write_forever():
await w.aclose()


async def test_pipe_fully():
async def test_pipe_fully() -> None:
# passing make_clogged_pipe tests wait_send_all_might_not_block, and we
# can't implement that on Windows
await check_one_way_stream(make_pipe, None)
20 changes: 11 additions & 9 deletions trio/_windows_pipes.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import sys
from typing import TYPE_CHECKING

Expand All @@ -23,18 +25,18 @@ def __init__(self, handle: int) -> None:
_core.register_with_iocp(self.handle)

@property
def closed(self):
def closed(self) -> bool:
return self.handle == -1

def close(self):
def close(self) -> None:
if self.closed:
return
handle = self.handle
self.handle = -1
if not kernel32.CloseHandle(_handle(handle)):
raise_winerror()

def __del__(self):
def __del__(self) -> None:
self.close()


Expand All @@ -50,7 +52,7 @@ def __init__(self, handle: int) -> None:
"another task is currently using this pipe"
)

async def send_all(self, data: bytes):
async def send_all(self, data: bytes) -> None:
with self._conflict_detector:
if self._handle_holder.closed:
raise _core.ClosedResourceError("this pipe is already closed")
Expand All @@ -76,10 +78,10 @@ async def wait_send_all_might_not_block(self) -> None:
# not implemented yet, and probably not needed
await _core.checkpoint()

def close(self):
def close(self) -> None:
self._handle_holder.close()

async def aclose(self):
async def aclose(self) -> None:
self.close()
await _core.checkpoint()

Expand All @@ -94,7 +96,7 @@ def __init__(self, handle: int) -> None:
"another task is currently using this pipe"
)

async def receive_some(self, max_bytes=None) -> bytes:
async def receive_some(self, max_bytes: int | None = None) -> bytes:
with self._conflict_detector:
if self._handle_holder.closed:
raise _core.ClosedResourceError("this pipe is already closed")
Expand Down Expand Up @@ -133,9 +135,9 @@ async def receive_some(self, max_bytes=None) -> bytes:
del buffer[size:]
return buffer

def close(self):
def close(self) -> None:
self._handle_holder.close()

async def aclose(self):
async def aclose(self) -> None:
self.close()
await _core.checkpoint()

0 comments on commit b161fec

Please sign in to comment.