Skip to content

Make multiprocessing pipes generic #11137

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions stdlib/concurrent/futures/process.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ _global_shutdown: bool

class _ThreadWakeup:
_closed: bool
_reader: Connection
_writer: Connection
# Any: Unused send and recv methods
_reader: Connection[Any, Any]
_writer: Connection[Any, Any]
Comment on lines +22 to +24
Copy link
Member

@AlexWaygood AlexWaygood Mar 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe these should be Connection[object, object] or Connection[Unused, Unused]? Otherwise we might get complaints about false positives from the (very small) group of people who use mypy's --disallow-any-expr optional lint: https://mypy.readthedocs.io/en/stable/command_line.html#cmdoption-mypy-disallow-any-expr

Copy link
Collaborator Author

@Avasam Avasam Mar 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure for the first, _SendT, contravariant, used for params argument.
Idk about the second, _RecvT, covariant, used for return type.

reader1: Connection[int, object] = _ThreadWakeup()._reader  # OK
reader2: Connection[object, int] = _ThreadWakeup()._reader  # "Connection[object, object]" cannot be assigned to declared type "Connection[object, int]"

class MyThreadWakeup(_ThreadWakeup):
    _reader: Connection[int, int]  # OK

Arguably this is also a "private" name and I fail to come up with a real-world example

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One option is to use the stricter non-Any types for now, and see if anybody complains about false positives... But I'm also fine with the PR being merged as-is; I don't have a strong opinion here :-)

def close(self) -> None: ...
def wakeup(self) -> None: ...
def clear(self) -> None: ...
Expand Down
46 changes: 27 additions & 19 deletions stdlib/multiprocessing/connection.pyi
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the variance of the TypeVars correct here? It looks like maybe _SendT could be contravariant, and _RecvT covariant?

Copy link
Collaborator Author

@Avasam Avasam Dec 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Idk if the TypeVar variance is correct, please advise.

🤷 variance with Python's TypeVars is still something I get highly confused about and has not "clicked" at all for me. Like I understand the general concept of directionality (sometimes any base type can be used, sometimes any subtype can be used), and that you'd want a "wider" type for parameters, "narrower" type for return types. But I'm never sure which to use (if any) when it comes to TypeVars

Copy link
Member

@AlexWaygood AlexWaygood Dec 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think anybody who claims to be able to think about generics in Python without their head hurting sometimes is lying to you :)

In general, a good heuristic for a given TypeVar in a given class is that if the TypeVar only appears in return annotations in that class, it should be covariant; if it only appears in parameter annotations, it should be contravariant; if it appears in both, it has to be invariant

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted and bookmarked. It may not always apply, but heuristics I can follow.

Copy link
Collaborator

@Akuli Akuli Dec 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have explained my view of variance on typeshed a few times, and I'm too lazy to find a previous explanation, so here goes again :)

Most of the time you only need invariant (default) and covariant TypeVars. To distinguish them, think about lists and tuples with int and float items. A list[int] is not a valid list[float], because we don't want to allow this:

foo: list[int] = [1, 2, 3]
bar: list[float] = foo
bar.append(12.34)

We communicate this to type checkers by using an invariant TypeVar in list. It means that the item types must match exactly for lists to be compatible.

On the other hand, a tuple[int, ...] is also a valid tuple[float, ...], so the TypeVar in tuple is covariant. A tuple of a more specific type is also a valid tuple of a more general type.

Contravariance comes up less often, but it's basically the opposite of covariant: making T more general actually makes the generic type more specific. Consider Callable[[T], None] for example. A Callable[[float], object] (think time.sleep) is also a valid Callable[[int], object].

And yes, variance is hard :)

Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
import socket
import sys
import types
from _typeshed import ReadableBuffer
from _typeshed import Incomplete, ReadableBuffer
from collections.abc import Iterable
from typing import Any, SupportsIndex
from types import TracebackType
from typing import Any, Generic, SupportsIndex, TypeVar
from typing_extensions import Self, TypeAlias

__all__ = ["Client", "Listener", "Pipe", "wait"]

# https://docs.python.org/3/library/multiprocessing.html#address-formats
_Address: TypeAlias = str | tuple[str, int]

class _ConnectionBase:
# Defaulting to Any to avoid forcing generics on a lot of pre-existing code
_SendT = TypeVar("_SendT", contravariant=True, default=Any)
_RecvT = TypeVar("_RecvT", covariant=True, default=Any)

class _ConnectionBase(Generic[_SendT, _RecvT]):
def __init__(self, handle: SupportsIndex, readable: bool = True, writable: bool = True) -> None: ...
@property
def closed(self) -> bool: ... # undocumented
Expand All @@ -22,54 +26,58 @@ class _ConnectionBase:
def fileno(self) -> int: ...
def close(self) -> None: ...
def send_bytes(self, buf: ReadableBuffer, offset: int = 0, size: int | None = None) -> None: ...
def send(self, obj: Any) -> None: ...
def send(self, obj: _SendT) -> None: ...
def recv_bytes(self, maxlength: int | None = None) -> bytes: ...
def recv_bytes_into(self, buf: Any, offset: int = 0) -> int: ...
def recv(self) -> Any: ...
def recv(self) -> _RecvT: ...
def poll(self, timeout: float | None = 0.0) -> bool: ...
def __enter__(self) -> Self: ...
def __exit__(
self, exc_type: type[BaseException] | None, exc_value: BaseException | None, exc_tb: types.TracebackType | None
self, exc_type: type[BaseException] | None, exc_value: BaseException | None, exc_tb: TracebackType | None
) -> None: ...
def __del__(self) -> None: ...

class Connection(_ConnectionBase): ...
class Connection(_ConnectionBase[_SendT, _RecvT]): ...

if sys.platform == "win32":
class PipeConnection(_ConnectionBase): ...
class PipeConnection(_ConnectionBase[_SendT, _RecvT]): ...

class Listener:
def __init__(
self, address: _Address | None = None, family: str | None = None, backlog: int = 1, authkey: bytes | None = None
) -> None: ...
def accept(self) -> Connection: ...
def accept(self) -> Connection[Incomplete, Incomplete]: ...
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is Incomplete instead of Any since you'd like to make Listener generic as well?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this could be improved further, rather than "too complex for the type system" or "you can truly send anything through the connection". Just not something I care to do in the scope of making Pipe generic.

def close(self) -> None: ...
@property
def address(self) -> _Address: ...
@property
def last_accepted(self) -> _Address | None: ...
def __enter__(self) -> Self: ...
def __exit__(
self, exc_type: type[BaseException] | None, exc_value: BaseException | None, exc_tb: types.TracebackType | None
self, exc_type: type[BaseException] | None, exc_value: BaseException | None, exc_tb: TracebackType | None
) -> None: ...

# Any: send and recv methods unused
if sys.version_info >= (3, 12):
def deliver_challenge(connection: Connection, authkey: bytes, digest_name: str = "sha256") -> None: ...
def deliver_challenge(connection: Connection[Any, Any], authkey: bytes, digest_name: str = "sha256") -> None: ...

else:
def deliver_challenge(connection: Connection, authkey: bytes) -> None: ...
def deliver_challenge(connection: Connection[Any, Any], authkey: bytes) -> None: ...

def answer_challenge(connection: Connection, authkey: bytes) -> None: ...
def answer_challenge(connection: Connection[Any, Any], authkey: bytes) -> None: ...
def wait(
object_list: Iterable[Connection | socket.socket | int], timeout: float | None = None
) -> list[Connection | socket.socket | int]: ...
def Client(address: _Address, family: str | None = None, authkey: bytes | None = None) -> Connection: ...
object_list: Iterable[Connection[_SendT, _RecvT] | socket.socket | int], timeout: float | None = None
) -> list[Connection[_SendT, _RecvT] | socket.socket | int]: ...
def Client(address: _Address, family: str | None = None, authkey: bytes | None = None) -> Connection[Any, Any]: ...

# N.B. Keep this in sync with multiprocessing.context.BaseContext.Pipe.
# _ConnectionBase is the common base class of Connection and PipeConnection
# and can be used in cross-platform code.
#
# The two connections should have the same generic types but inverted (Connection[_T1, _T2], Connection[_T2, _T1]).
# However, TypeVars scoped entirely within a return annotation is unspecified in the spec.
if sys.platform != "win32":
def Pipe(duplex: bool = True) -> tuple[Connection, Connection]: ...
def Pipe(duplex: bool = True) -> tuple[Connection[Any, Any], Connection[Any, Any]]: ...

else:
def Pipe(duplex: bool = True) -> tuple[PipeConnection, PipeConnection]: ...
def Pipe(duplex: bool = True) -> tuple[PipeConnection[Any, Any], PipeConnection[Any, Any]]: ...
7 changes: 5 additions & 2 deletions stdlib/multiprocessing/context.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,13 @@ class BaseContext:
# N.B. Keep this in sync with multiprocessing.connection.Pipe.
# _ConnectionBase is the common base class of Connection and PipeConnection
# and can be used in cross-platform code.
#
# The two connections should have the same generic types but inverted (Connection[_T1, _T2], Connection[_T2, _T1]).
# However, TypeVars scoped entirely within a return annotation is unspecified in the spec.
if sys.platform != "win32":
def Pipe(self, duplex: bool = True) -> tuple[Connection, Connection]: ...
def Pipe(self, duplex: bool = True) -> tuple[Connection[Any, Any], Connection[Any, Any]]: ...
else:
def Pipe(self, duplex: bool = True) -> tuple[PipeConnection, PipeConnection]: ...
def Pipe(self, duplex: bool = True) -> tuple[PipeConnection[Any, Any], PipeConnection[Any, Any]]: ...

def Barrier(
self, parties: int, action: Callable[..., object] | None = None, timeout: float | None = None
Expand Down
6 changes: 4 additions & 2 deletions stdlib/multiprocessing/managers.pyi
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import queue
import sys
import threading
from _typeshed import SupportsKeysAndGetItem, SupportsRichComparison, SupportsRichComparisonT
from _typeshed import Incomplete, SupportsKeysAndGetItem, SupportsRichComparison, SupportsRichComparisonT
from collections.abc import Callable, Iterable, Iterator, Mapping, MutableMapping, MutableSequence, Sequence
from types import TracebackType
from typing import Any, AnyStr, ClassVar, Generic, SupportsIndex, TypeVar, overload
Expand Down Expand Up @@ -125,7 +125,9 @@ class Server:
self, registry: dict[str, tuple[Callable[..., Any], Any, Any, Any]], address: Any, authkey: bytes, serializer: str
) -> None: ...
def serve_forever(self) -> None: ...
def accept_connection(self, c: Connection, name: str) -> None: ...
def accept_connection(
self, c: Connection[tuple[str, str | None], tuple[str, str, Iterable[Incomplete], Mapping[str, Incomplete]]], name: str
) -> None: ...

class BaseManager:
if sys.version_info >= (3, 11):
Expand Down
4 changes: 2 additions & 2 deletions stdlib/multiprocessing/reduction.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ if sys.platform == "win32":
handle: int, target_process: int | None = None, inheritable: bool = False, *, source_process: int | None = None
) -> int: ...
def steal_handle(source_pid: int, handle: int) -> int: ...
def send_handle(conn: connection.PipeConnection, handle: int, destination_pid: int) -> None: ...
def recv_handle(conn: connection.PipeConnection) -> int: ...
def send_handle(conn: connection.PipeConnection[DupHandle, Any], handle: int, destination_pid: int) -> None: ...
def recv_handle(conn: connection.PipeConnection[Any, DupHandle]) -> int: ...

class DupHandle:
def __init__(self, handle: int, access: int, pid: int | None = None) -> None: ...
Expand Down
31 changes: 31 additions & 0 deletions test_cases/stdlib/multiprocessing/check_pipe_connections.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from __future__ import annotations

import sys
from multiprocessing.connection import Pipe

if sys.platform != "win32":
from multiprocessing.connection import Connection
else:
from multiprocessing.connection import PipeConnection as Connection


# Unfortunately, we cannot validate that both connections have the same, but inverted generic types,
# since TypeVars scoped entirely within a return annotation is unspecified in the spec.
# Pipe[str, int]() -> tuple[Connection[str, int], Connection[int, str]]

a: Connection[str, int]
b: Connection[int, str]
a, b = Pipe()

connections: tuple[Connection[str, int], Connection[int, str]] = Pipe()
a, b = connections

a.send("test")
a.send(0) # type: ignore
test1: str = b.recv()
test2: int = b.recv() # type: ignore

b.send("test") # type: ignore
b.send(0)
test3: str = a.recv() # type: ignore
test4: int = a.recv()