diff --git a/docs/api.rst b/docs/api.rst
index 8da25428..5d2a6634 100644
--- a/docs/api.rst
+++ b/docs/api.rst
@@ -50,6 +50,12 @@ Running code in worker threads
 .. autofunction:: anyio.to_thread.run_sync
 .. autofunction:: anyio.to_thread.current_default_thread_limiter
 
+Running code in subinterpreters
+-------------------------------
+
+.. autofunction:: anyio.to_interpreter.run_sync
+.. autofunction:: anyio.to_interpreter.current_default_interpreter_limiter
+
 Running code in worker processes
 --------------------------------
 
@@ -189,6 +195,8 @@ Exceptions
 ----------
 
 .. autoexception:: anyio.BrokenResourceError
+.. autoexception:: anyio.BrokenWorkerIntepreter
+.. autoexception:: anyio.BrokenWorkerProcess
 .. autoexception:: anyio.BusyResourceError
 .. autoexception:: anyio.ClosedResourceError
 .. autoexception:: anyio.DelimiterNotFound
diff --git a/docs/index.rst b/docs/index.rst
index 1b78732d..c6d234ec 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -18,6 +18,7 @@ The manual
    networking
    threads
    subprocesses
+   subinterpreters
    fileio
    signals
    testing
diff --git a/docs/subinterpreters.rst b/docs/subinterpreters.rst
new file mode 100644
index 00000000..41bb121a
--- /dev/null
+++ b/docs/subinterpreters.rst
@@ -0,0 +1,51 @@
+Working with subinterpreters
+============================
+
+.. py:currentmodule:: anyio
+
+Subinterpreters offer a middle ground between worker threads and worker processes. They
+allow you to utilize multiple CPU cores to run Python code while avoiding the overhead
+and complexities of spawning subprocesses.
+
+.. warning:: Subinterpreter support is considered **experimental**. The underlying
+   Python API for managing subinterpreters has not been finalized yet, and has had
+   little real-world testing. As such, it is not recommended to use this feature for
+   anything important yet.
+
+Running a function in a worker interpreter
+------------------------------------------
+
+Running functions in a worker interpreter makes sense when:
+
+* The code you want to run in parallel is CPU intensive
+* The code is either pure Python code, or extension code that does not release the
+  Global Interpreter Lock (GIL)
+
+If the code you're trying to run only does blocking network I/O, or file I/O, then
+you're better off using :doc:`worker threads <threads>` instead.
+
+This is done by using :func:`.to_interpreter.run_sync`::
+
+    from anyio import run, to_interpreter
+
+    from yourothermodule import cpu_intensive_function
+
+    async def main():
+        result = await to_interpreter.run_sync(
+            cpu_intensive_function, 'Hello, ', 'world!'
+        )
+        print(result)
+
+    run(main)
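+
+Note that ``run_sync()`` only accepts positional arguments for the target callable. If
+you need to pass keyword arguments, wrap the callable with :func:`functools.partial`::
+
+    from functools import partial
+
+    from anyio import run, to_interpreter
+
+    async def main():
+        # sorted() is called in the worker interpreter with reverse=True
+        result = await to_interpreter.run_sync(
+            partial(sorted, reverse=True), ['a', 'b']
+        )
+        print(result)  # ['b', 'a']
+
+    run(main)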
+
+Limitations
+-----------
+
+* Subinterpreters are only supported on Python 3.13 or later
+* Code in the ``__main__`` module cannot be run with this (as a consequence, this
+  applies to any functions defined in the REPL)
+* The target functions cannot react to cancellation
+* Unlike with threads, the code running in the subinterpreter cannot share mutable data
+  with other interpreters/threads (however, sharing *immutable* data is fine)
diff --git a/docs/subprocesses.rst b/docs/subprocesses.rst
index e228fd4f..5245eae4 100644
--- a/docs/subprocesses.rst
+++ b/docs/subprocesses.rst
@@ -61,13 +61,16 @@ Running functions in worker processes
 -------------------------------------
 
 When you need to run CPU intensive code, worker processes are better than threads
-because current implementations of Python cannot run Python code in multiple threads at
+because, with the exception of the experimental free-threaded builds of Python 3.13 and
+later, current implementations of Python cannot run Python code in multiple threads at
 once.
 
 Exceptions to this rule are:
 
 #. Blocking I/O operations
 #. C extension code that explicitly releases the Global Interpreter Lock
+#. :doc:`Subinterpreter workers <subinterpreters>`
+   (experimental; available on Python 3.13 and later)
 
 If the code you wish to run does not belong in this category, it's best to use worker
 processes instead in order to take advantage of multiple CPU cores.
diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst
index 588eebbf..64fb0161 100644
--- a/docs/versionhistory.rst
+++ b/docs/versionhistory.rst
@@ -5,6 +5,8 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.
 
 **UNRELEASED**
 
+- Added **experimental** support for running functions in subinterpreters on Python
+  3.13 and later
 - Added support for the ``copy()``, ``copy_into()``, ``move()`` and ``move_into()``
   methods in ``anyio.Path``, available in Python 3.14
 - Changed ``TaskGroup`` on asyncio to always spawn tasks non-eagerly, even if using a
diff --git a/src/anyio/__init__.py b/src/anyio/__init__.py
index 0738e595..09831259 100644
--- a/src/anyio/__init__.py
+++ b/src/anyio/__init__.py
@@ -8,6 +8,7 @@ from ._core._eventloop import sleep_forever as sleep_forever
 from ._core._eventloop import sleep_until as sleep_until
 from ._core._exceptions import BrokenResourceError as BrokenResourceError
+from ._core._exceptions import BrokenWorkerIntepreter as BrokenWorkerIntepreter
 from ._core._exceptions import BrokenWorkerProcess as BrokenWorkerProcess
 from ._core._exceptions import BusyResourceError as BusyResourceError
 from ._core._exceptions import ClosedResourceError as ClosedResourceError
diff --git a/src/anyio/_core/_exceptions.py b/src/anyio/_core/_exceptions.py
index 97ea3130..16b94482 100644
--- a/src/anyio/_core/_exceptions.py
+++ b/src/anyio/_core/_exceptions.py
@@ -2,6 +2,8 @@
 
 import sys
 from collections.abc import Generator
+from textwrap import dedent
+from typing import Any
 
 if sys.version_info < (3, 11):
     from exceptiongroup import BaseExceptionGroup
@@ -21,6 +23,41 @@ class BrokenWorkerProcess(Exception):
     """
 
 
+class BrokenWorkerIntepreter(Exception):
+    """
+    Raised by :func:`~anyio.to_interpreter.run_sync` if an unexpected exception is
+    raised in the subinterpreter.
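+
+    The ``excinfo`` attribute holds the exception information object received from
+    the subinterpreter.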
+ """ + + def __init__(self, excinfo: Any): + # This was adapted from concurrent.futures.interpreter.ExecutionFailed + msg = excinfo.formatted + if not msg: + if excinfo.type and excinfo.msg: + msg = f"{excinfo.type.__name__}: {excinfo.msg}" + else: + msg = excinfo.type.__name__ or excinfo.msg + + super().__init__(msg) + self.excinfo = excinfo + + def __str__(self) -> str: + try: + formatted = self.excinfo.errdisplay + except Exception: + return super().__str__() + else: + return dedent( + f""" + {super().__str__()} + + Uncaught in the interpreter: + + {formatted} + """.strip() + ) + + class BusyResourceError(Exception): """ Raised when two tasks are trying to read from or write to the same resource diff --git a/src/anyio/to_interpreter.py b/src/anyio/to_interpreter.py new file mode 100644 index 00000000..bcde24d3 --- /dev/null +++ b/src/anyio/to_interpreter.py @@ -0,0 +1,218 @@ +from __future__ import annotations + +import atexit +import os +import pickle +import sys +from collections import deque +from collections.abc import Callable +from textwrap import dedent +from typing import Any, Final, TypeVar + +from . import current_time, to_thread +from ._core._exceptions import BrokenWorkerIntepreter +from ._core._synchronization import CapacityLimiter +from .lowlevel import RunVar + +if sys.version_info >= (3, 11): + from typing import TypeVarTuple, Unpack +else: + from typing_extensions import TypeVarTuple, Unpack + +UNBOUND: Final = 2 # I have no clue how this works, but it was used in the stdlib +FMT_UNPICKLED: Final = 0 +FMT_PICKLED: Final = 1 +DEFAULT_CPU_COUNT: Final = 8 # this is just an arbitrarily selected value +MAX_WORKER_IDLE_TIME = ( + 30 # seconds a subinterpreter can be idle before becoming eligible for pruning +) + +T_Retval = TypeVar("T_Retval") +PosArgsT = TypeVarTuple("PosArgsT") + +_idle_workers = RunVar[deque["Worker"]]("_available_workers") +_default_interpreter_limiter = RunVar[CapacityLimiter]("_default_interpreter_limiter") + + +class Worker: + _run_func = compile( + dedent(""" + import _interpqueues as queues + import _interpreters as interpreters + from pickle import loads, dumps, HIGHEST_PROTOCOL + + item = queues.get(queue_id)[0] + try: + func, args = loads(item) + retval = func(*args) + except BaseException as exc: + is_exception = True + retval = exc + else: + is_exception = False + + try: + queues.put(queue_id, (retval, is_exception), FMT_UNPICKLED, UNBOUND) + except interpreters.NotShareableError: + retval = dumps(retval, HIGHEST_PROTOCOL) + queues.put(queue_id, (retval, is_exception), FMT_PICKLED, UNBOUND) + """), + "", + "exec", + ) + + last_used: float = 0 + + _initialized: bool = False + _interpreter_id: int + _queue_id: int + + def initialize(self) -> None: + import _interpqueues as queues + import _interpreters as interpreters + + self._interpreter_id = interpreters.create() + self._queue_id = queues.create(2, FMT_UNPICKLED, UNBOUND) # type: ignore[call-arg] + self._initialized = True + interpreters.set___main___attrs( + self._interpreter_id, + { + "queue_id": self._queue_id, + "FMT_PICKLED": FMT_PICKLED, + "FMT_UNPICKLED": FMT_UNPICKLED, + "UNBOUND": UNBOUND, + }, + ) + + def destroy(self) -> None: + import _interpqueues as queues + import _interpreters as interpreters + + if self._initialized: + interpreters.destroy(self._interpreter_id) + queues.destroy(self._queue_id) + + def _call( + self, + func: Callable[..., T_Retval], + args: tuple[Any], + ) -> tuple[Any, bool]: + import _interpqueues as queues + import _interpreters as interpreters + + if not 
self._initialized:
+            self.initialize()
+
+        payload = pickle.dumps((func, args), pickle.HIGHEST_PROTOCOL)
+        queues.put(self._queue_id, payload, FMT_PICKLED, UNBOUND)  # type: ignore[call-arg]
+
+        res: Any
+        is_exception: bool
+        if exc_info := interpreters.exec(self._interpreter_id, self._run_func):  # type: ignore[func-returns-value,arg-type]
+            raise BrokenWorkerIntepreter(exc_info)
+
+        (res, is_exception), fmt = queues.get(self._queue_id)[:2]
+        if fmt == FMT_PICKLED:
+            res = pickle.loads(res)
+
+        return res, is_exception
+
+    async def call(
+        self,
+        func: Callable[..., T_Retval],
+        args: tuple[Any],
+        limiter: CapacityLimiter,
+    ) -> T_Retval:
+        result, is_exception = await to_thread.run_sync(
+            self._call,
+            func,
+            args,
+            limiter=limiter,
+        )
+        if is_exception:
+            raise result
+
+        return result
+
+
+def _stop_workers(workers: deque[Worker]) -> None:
+    for worker in workers:
+        worker.destroy()
+
+    workers.clear()
+
+
+async def run_sync(
+    func: Callable[[Unpack[PosArgsT]], T_Retval],
+    *args: Unpack[PosArgsT],
+    limiter: CapacityLimiter | None = None,
+) -> T_Retval:
+    """
+    Call the given function with the given arguments in a subinterpreter.
+
+    If the task waiting for the call to complete is cancelled, the call will still run
+    its course, but its return value (or any raised exception) will be ignored.
+
+    .. warning:: This feature is **experimental**. The upstream interpreter API has not
+        yet been finalized or thoroughly tested, so don't rely on this for anything
+        mission critical.
+
+    :param func: a callable
+    :param args: positional arguments for the callable
+    :param limiter: capacity limiter to use to limit the total number of
+        subinterpreters running (if omitted, the default limiter is used)
+    :return: the result of the call
+    :raises BrokenWorkerIntepreter: if there's an internal error in a subinterpreter
+
+    """
+    if sys.version_info <= (3, 13):
+        raise RuntimeError("subinterpreters require at least Python 3.13")
+
+    if limiter is None:
+        limiter = current_default_interpreter_limiter()
+
+    try:
+        idle_workers = _idle_workers.get()
+    except LookupError:
+        idle_workers = deque()
+        _idle_workers.set(idle_workers)
+        atexit.register(_stop_workers, idle_workers)
+
+    async with limiter:
+        try:
+            worker = idle_workers.pop()
+        except IndexError:
+            worker = Worker()
+
+        try:
+            return await worker.call(func, args, limiter)
+        finally:
+            # Prune workers that have been idle for too long
+            now = current_time()
+            while idle_workers:
+                if now - idle_workers[0].last_used <= MAX_WORKER_IDLE_TIME:
+                    break
+
+                await to_thread.run_sync(idle_workers.popleft().destroy, limiter=limiter)
+
+            worker.last_used = current_time()
+            idle_workers.append(worker)
+
+
+def current_default_interpreter_limiter() -> CapacityLimiter:
+    """
+    Return the capacity limiter that is used by default to limit the number of
+    concurrently running subinterpreters.
+
+    Defaults to the number of CPU cores.
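+    If the number of CPU cores cannot be determined, a fallback value of 8 is used.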
+ + :return: a capacity limiter object + + """ + try: + return _default_interpreter_limiter.get() + except LookupError: + limiter = CapacityLimiter(os.cpu_count() or DEFAULT_CPU_COUNT) + _default_interpreter_limiter.set(limiter) + return limiter diff --git a/tests/test_to_interpreter.py b/tests/test_to_interpreter.py new file mode 100644 index 00000000..79f3427f --- /dev/null +++ b/tests/test_to_interpreter.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +import sys +from collections.abc import AsyncGenerator +from functools import partial + +import pytest +from pytest import fixture + +from anyio import to_interpreter + +pytestmark = [ + pytest.mark.anyio, + pytest.mark.skipif(sys.version_info < (3, 13), reason="requires Python 3.13+"), +] + + +@fixture(autouse=True) +async def destroy_workers() -> AsyncGenerator[None]: + yield + idle_workers = to_interpreter._idle_workers.get() + for worker in idle_workers: + worker.destroy() + + idle_workers.clear() + + +async def test_run_sync() -> None: + """ + Test that the function runs in a different interpreter, and the same interpreter in + both calls. + + """ + import _interpreters + + main_interpreter_id, _ = _interpreters.get_current() + interpreter_id, _ = await to_interpreter.run_sync(_interpreters.get_current) + interpreter_id_2, _ = await to_interpreter.run_sync(_interpreters.get_current) + assert interpreter_id == interpreter_id_2 + assert interpreter_id != main_interpreter_id + + +async def test_args_kwargs() -> None: + """Test that partial() can be used to pass keyword arguments.""" + result = await to_interpreter.run_sync(partial(sorted, reverse=True), ["a", "b"]) + assert result == ["b", "a"] + + +async def test_exception() -> None: + """Test that exceptions are delivered properly.""" + with pytest.raises(ValueError, match="invalid literal for int"): + assert await to_interpreter.run_sync(int, "a")