Skip to content

Add multiprocessing.sharedctypes stubs #5231

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
May 2, 2021
21 changes: 10 additions & 11 deletions stdlib/multiprocessing/__init__.pyi
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import sys
from collections.abc import Callable, Iterable
from logging import Logger
from multiprocessing import connection, pool, sharedctypes, synchronize
from multiprocessing import connection, context, pool, synchronize
from multiprocessing.context import (
AuthenticationError as AuthenticationError,
BaseContext,
Expand All @@ -17,7 +18,7 @@ from multiprocessing.process import active_children as active_children, current_
# These are technically functions that return instances of these Queue classes. See #4313 for discussion
from multiprocessing.queues import JoinableQueue as JoinableQueue, Queue as Queue, SimpleQueue as SimpleQueue
from multiprocessing.spawn import freeze_support as freeze_support
from typing import Any, Callable, Iterable, List, Optional, Sequence, Tuple, Union, overload
from typing import Any, Optional, Union, overload
from typing_extensions import Literal

if sys.version_info >= (3, 8):
Expand All @@ -32,6 +33,10 @@ if sys.platform != "win32":

# Sychronization primitives
_LockLike = Union[synchronize.Lock, synchronize.RLock]
RawValue = context._default_context.RawValue
RawArray = context._default_context.RawArray
Value = context._default_context.Value
Array = context._default_context.Array

def Barrier(parties: int, action: Optional[Callable[..., Any]] = ..., timeout: Optional[float] = ...) -> synchronize.Barrier: ...
def BoundedSemaphore(value: int = ...) -> synchronize.BoundedSemaphore: ...
Expand All @@ -40,29 +45,23 @@ def Event() -> synchronize.Event: ...
def Lock() -> synchronize.Lock: ...
def RLock() -> synchronize.RLock: ...
def Semaphore(value: int = ...) -> synchronize.Semaphore: ...
def Pipe(duplex: bool = ...) -> Tuple[connection.Connection, connection.Connection]: ...
def Pipe(duplex: bool = ...) -> tuple[connection.Connection, connection.Connection]: ...
def Pool(
processes: Optional[int] = ...,
initializer: Optional[Callable[..., Any]] = ...,
initargs: Iterable[Any] = ...,
maxtasksperchild: Optional[int] = ...,
) -> pool.Pool: ...

# Functions Array and Value are copied from context.pyi.
# See https://github.com/python/typeshed/blob/ac234f25927634e06d9c96df98d72d54dd80dfc4/stdlib/2and3/turtle.pyi#L284-L291
# for rationale
def Array(typecode_or_type: Any, size_or_initializer: Union[int, Sequence[Any]], *, lock: bool = ...) -> sharedctypes._Array: ...
def Value(typecode_or_type: Any, *args: Any, lock: bool = ...) -> sharedctypes._Value: ...

# ----- multiprocessing function stubs -----
def allow_connection_pickling() -> None: ...
def cpu_count() -> int: ...
def get_logger() -> Logger: ...
def log_to_stderr(level: Optional[Union[str, int]] = ...) -> Logger: ...
def Manager() -> SyncManager: ...
def set_executable(executable: str) -> None: ...
def set_forkserver_preload(module_names: List[str]) -> None: ...
def get_all_start_methods() -> List[str]: ...
def set_forkserver_preload(module_names: list[str]) -> None: ...
def get_all_start_methods() -> list[str]: ...
def get_start_method(allow_none: bool = ...) -> Optional[str]: ...
def set_start_method(method: str, force: Optional[bool] = ...) -> None: ...

Expand Down
71 changes: 51 additions & 20 deletions stdlib/multiprocessing/context.pyi
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import ctypes
import multiprocessing
import sys
from collections.abc import Callable, Iterable, Sequence
from ctypes import _CData
from logging import Logger
from multiprocessing import queues, sharedctypes, synchronize
from multiprocessing import queues, synchronize
from multiprocessing.process import BaseProcess
from typing import Any, Callable, Iterable, List, Optional, Sequence, Type, Union, overload
from multiprocessing.sharedctypes import SynchronizedArray, SynchronizedBase
from typing import Any, Optional, Type, TypeVar, Union, overload
from typing_extensions import Literal

_LockLike = Union[synchronize.Lock, synchronize.RLock]
_CT = TypeVar("_CT", bound=_CData)

class ProcessError(Exception): ...
class BufferTooShort(ProcessError): ...
Expand All @@ -28,7 +33,7 @@ class BaseContext(object):
@staticmethod
def parent_process() -> Optional[BaseProcess]: ...
@staticmethod
def active_children() -> List[BaseProcess]: ...
def active_children() -> list[BaseProcess]: ...
def cpu_count(self) -> int: ...
# TODO: change return to SyncManager once a stub exists in multiprocessing.managers
def Manager(self) -> Any: ...
Expand All @@ -53,28 +58,52 @@ class BaseContext(object):
initargs: Iterable[Any] = ...,
maxtasksperchild: Optional[int] = ...,
) -> multiprocessing.pool.Pool: ...
# TODO: typecode_or_type param is a ctype with a base class of _SimpleCData or array.typecode Need to figure out
# how to handle the ctype
# TODO: change return to RawValue once a stub exists in multiprocessing.sharedctypes
def RawValue(self, typecode_or_type: Any, *args: Any) -> Any: ...
# TODO: typecode_or_type param is a ctype with a base class of _SimpleCData or array.typecode Need to figure out
# how to handle the ctype
# TODO: change return to RawArray once a stub exists in multiprocessing.sharedctypes
def RawArray(self, typecode_or_type: Any, size_or_initializer: Union[int, Sequence[Any]]) -> Any: ...
# TODO: typecode_or_type param is a ctype with a base class of _SimpleCData or array.typecode Need to figure out
# how to handle the ctype
def Value(self, typecode_or_type: Any, *args: Any, lock: bool = ...) -> sharedctypes._Value: ...
# TODO: typecode_or_type param is a ctype with a base class of _SimpleCData or array.typecode Need to figure out
# how to handle the ctype
@overload
def RawValue(self, typecode_or_type: Type[_CT], *args: Any) -> _CT: ...
@overload
def RawValue(self, typecode_or_type: str, *args: Any) -> Any: ...
@overload
def RawArray(self, typecode_or_type: Type[_CT], size_or_initializer: Union[int, Sequence[Any]]) -> ctypes.Array[_CT]: ...
@overload
def RawArray(self, typecode_or_type: str, size_or_initializer: Union[int, Sequence[Any]]) -> Any: ...
@overload
def Value(self, typecode_or_type: Type[_CT], *args: Any, lock: Literal[False]) -> _CT: ...
@overload
def Value(self, typecode_or_type: Type[_CT], *args: Any, lock: Union[Literal[True], _LockLike]) -> SynchronizedBase[_CT]: ...
@overload
def Value(self, typecode_or_type: str, *args: Any, lock: Union[Literal[True], _LockLike]) -> SynchronizedBase[Any]: ...
@overload
def Value(self, typecode_or_type: Union[str, Type[_CData]], *args: Any, lock: Union[bool, _LockLike] = ...) -> Any: ...
@overload
def Array(
self, typecode_or_type: Any, size_or_initializer: Union[int, Sequence[Any]], *, lock: bool = ...
) -> sharedctypes._Array: ...
self, typecode_or_type: Type[_CT], size_or_initializer: Union[int, Sequence[Any]], *, lock: Literal[False]
) -> _CT: ...
@overload
def Array(
self,
typecode_or_type: Type[_CT],
size_or_initializer: Union[int, Sequence[Any]],
*,
lock: Union[Literal[True], _LockLike],
) -> SynchronizedArray[_CT]: ...
@overload
def Array(
self, typecode_or_type: str, size_or_initializer: Union[int, Sequence[Any]], *, lock: Union[Literal[True], _LockLike]
) -> SynchronizedArray[Any]: ...
@overload
def Array(
self,
typecode_or_type: Union[str, Type[_CData]],
size_or_initializer: Union[int, Sequence[Any]],
*,
lock: Union[bool, _LockLike] = ...,
) -> Any: ...
def freeze_support(self) -> None: ...
def get_logger(self) -> Logger: ...
def log_to_stderr(self, level: Optional[str] = ...) -> Logger: ...
def allow_connection_pickling(self) -> None: ...
def set_executable(self, executable: str) -> None: ...
def set_forkserver_preload(self, module_names: List[str]) -> None: ...
def set_forkserver_preload(self, module_names: list[str]) -> None: ...
if sys.platform != "win32":
@overload
def get_context(self, method: None = ...) -> DefaultContext: ...
Expand Down Expand Up @@ -111,7 +140,9 @@ class DefaultContext(BaseContext):
def __init__(self, context: BaseContext) -> None: ...
def set_start_method(self, method: Optional[str], force: bool = ...) -> None: ...
def get_start_method(self, allow_none: bool = ...) -> str: ...
def get_all_start_methods(self) -> List[str]: ...
def get_all_start_methods(self) -> list[str]: ...

_default_context: DefaultContext

if sys.platform != "win32":
class ForkProcess(BaseProcess):
Expand Down
124 changes: 91 additions & 33 deletions stdlib/multiprocessing/sharedctypes.pyi
Original file line number Diff line number Diff line change
@@ -1,43 +1,101 @@
from ctypes import _CData
import ctypes
from collections.abc import Callable, Iterable, Sequence
from ctypes import _CData, _SimpleCData, c_char
from multiprocessing.context import BaseContext
from multiprocessing.synchronize import _LockLike
from typing import Any, List, Optional, Sequence, Type, Union, overload
from typing import Any, Generic, Optional, Protocol, Type, TypeVar, Union, overload
from typing_extensions import Literal

class _Array:
value: Any = ...
def __init__(
self,
typecode_or_type: Union[str, Type[_CData]],
size_or_initializer: Union[int, Sequence[Any]],
*,
lock: Union[bool, _LockLike] = ...,
) -> None: ...
def acquire(self) -> bool: ...
def release(self) -> bool: ...
def get_lock(self) -> _LockLike: ...
def get_obj(self) -> Any: ...
@overload
def __getitem__(self, key: int) -> Any: ...
@overload
def __getitem__(self, key: slice) -> List[Any]: ...
def __getslice__(self, start: int, stop: int) -> Any: ...
def __setitem__(self, key: int, value: Any) -> None: ...

class _Value:
value: Any = ...
def __init__(self, typecode_or_type: Union[str, Type[_CData]], *args: Any, lock: Union[bool, _LockLike] = ...) -> None: ...
def get_lock(self) -> _LockLike: ...
def get_obj(self) -> Any: ...
def acquire(self) -> bool: ...
def release(self) -> bool: ...
_T = TypeVar("_T")
_CT = TypeVar("_CT", bound=_CData)

@overload
def RawValue(typecode_or_type: Type[_CT], *args: Any) -> _CT: ...
@overload
def RawValue(typecode_or_type: str, *args: Any) -> Any: ...
@overload
def RawArray(typecode_or_type: Type[_CT], size_or_initializer: Union[int, Sequence[Any]]) -> ctypes.Array[_CT]: ...
@overload
def RawArray(typecode_or_type: str, size_or_initializer: Union[int, Sequence[Any]]) -> Any: ...
@overload
def Value(typecode_or_type: Type[_CT], *args: Any, lock: Literal[False], ctx: Optional[BaseContext] = ...) -> _CT: ...
@overload
def Value(
typecode_or_type: Type[_CT], *args: Any, lock: Union[Literal[True], _LockLike], ctx: Optional[BaseContext] = ...
) -> SynchronizedBase[_CT]: ...
@overload
def Value(
typecode_or_type: str, *args: Any, lock: Union[Literal[True], _LockLike], ctx: Optional[BaseContext] = ...
) -> SynchronizedBase[Any]: ...
@overload
def Value(
typecode_or_type: Union[str, Type[_CData]], *args: Any, lock: Union[bool, _LockLike] = ..., ctx: Optional[BaseContext] = ...
) -> Any: ...
@overload
def Array(
typecode_or_type: Type[_CT],
size_or_initializer: Union[int, Sequence[Any]],
*,
lock: Literal[False],
ctx: Optional[BaseContext] = ...,
) -> _CT: ...
@overload
def Array(
typecode_or_type: Type[_CT],
size_or_initializer: Union[int, Sequence[Any]],
*,
lock: Union[Literal[True], _LockLike],
ctx: Optional[BaseContext] = ...,
) -> SynchronizedArray[_CT]: ...
@overload
def Array(
typecode_or_type: str,
size_or_initializer: Union[int, Sequence[Any]],
*,
lock: Union[Literal[True], _LockLike],
ctx: Optional[BaseContext] = ...,
) -> SynchronizedArray[Any]: ...
@overload
def Array(
typecode_or_type: Union[str, Type[_CData]],
size_or_initializer: Union[int, Sequence[Any]],
*,
lock: Union[bool, _LockLike] = ...,
ctx: Optional[BaseContext] = ...,
) -> _Array: ...
def Value(
typecode_or_type: Union[str, Type[_CData]], *args: Any, lock: Union[bool, _LockLike] = ..., ctx: Optional[BaseContext] = ...
) -> _Value: ...
) -> Any: ...
def copy(obj: _CT) -> _CT: ...
@overload
def synchronized(obj: _SimpleCData[_T], lock: Optional[_LockLike] = ..., ctx: Optional[Any] = ...) -> Synchronized[_T]: ...
@overload
def synchronized(obj: ctypes.Array[c_char], lock: Optional[_LockLike] = ..., ctx: Optional[Any] = ...) -> SynchronizedString: ...
@overload
def synchronized(obj: ctypes.Array[_CT], lock: Optional[_LockLike] = ..., ctx: Optional[Any] = ...) -> SynchronizedArray[_CT]: ...
@overload
def synchronized(obj: _CT, lock: Optional[_LockLike] = ..., ctx: Optional[Any] = ...) -> SynchronizedBase[_CT]: ...

class _AcquireFunc(Protocol):
def __call__(self, block: bool = ..., timeout: Optional[float] = ...) -> bool: ...

class SynchronizedBase(Generic[_CT]):
acquire: _AcquireFunc = ...
release: Callable[[], None] = ...
def __init__(self, obj: Any, lock: Optional[_LockLike] = ..., ctx: Optional[Any] = ...) -> None: ...
def __reduce__(self) -> tuple[Callable[..., Any], tuple[Any, _LockLike]]: ...
def get_obj(self) -> _CT: ...
def get_lock(self) -> _LockLike: ...
def __enter__(self) -> bool: ...
def __exit__(self, *args: Any) -> None: ...

class Synchronized(SynchronizedBase[_SimpleCData[_T]], Generic[_T]):
value: _T

class SynchronizedArray(SynchronizedBase[ctypes.Array[_CT]], Generic[_CT]):
def __len__(self) -> int: ...
def __getitem__(self, i: int) -> _CT: ...
def __setitem__(self, i: int, o: _CT) -> None: ...
def __getslice__(self, start: int, stop: int) -> list[_CT]: ...
def __setslice__(self, start: int, stop: int, values: Iterable[_CT]) -> None: ...

class SynchronizedString(SynchronizedArray[c_char]):
value: bytes
raw: bytes