From 4c6a20b27c24f5568c9660690b0b4221a3e94782 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?=
Date: Wed, 25 Dec 2024 12:56:48 +0200
Subject: [PATCH 01/11] Added support for subinterpreter workers

---
 docs/api.rst                   |   8 ++
 docs/index.rst                 |   1 +
 docs/subinterpreters.rst       |  51 ++++++++
 docs/subprocesses.rst          |   5 +-
 docs/versionhistory.rst        |   2 +
 src/anyio/_core/_exceptions.py |  37 ++++++
 src/anyio/to_interpreter.py    | 211 +++++++++++++++++++++++++++++++++
 tests/test_to_interpreter.py   |  39 ++++++
 8 files changed, 353 insertions(+), 1 deletion(-)
 create mode 100644 docs/subinterpreters.rst
 create mode 100644 src/anyio/to_interpreter.py
 create mode 100644 tests/test_to_interpreter.py

diff --git a/docs/api.rst b/docs/api.rst
index 8da25428..5d2a6634 100644
--- a/docs/api.rst
+++ b/docs/api.rst
@@ -50,6 +50,12 @@ Running code in worker threads
 .. autofunction:: anyio.to_thread.run_sync
 .. autofunction:: anyio.to_thread.current_default_thread_limiter
 
+Running code in subinterpreters
+-------------------------------
+
+.. autofunction:: anyio.to_interpreter.run_sync
+.. autofunction:: anyio.to_interpreter.current_default_interpreter_limiter
+
 Running code in worker processes
 --------------------------------
 
@@ -189,6 +195,8 @@ Exceptions
 ----------
 
 .. autoexception:: anyio.BrokenResourceError
+.. autoexception:: anyio.BrokenWorkerIntepreter
+.. autoexception:: anyio.BrokenWorkerProcess
 .. autoexception:: anyio.BusyResourceError
 .. autoexception:: anyio.ClosedResourceError
 .. autoexception:: anyio.DelimiterNotFound

diff --git a/docs/index.rst b/docs/index.rst
index 1b78732d..c6d234ec 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -18,6 +18,7 @@ The manual
    networking
    threads
    subprocesses
+   subinterpreters
    fileio
    signals
    testing

diff --git a/docs/subinterpreters.rst b/docs/subinterpreters.rst
new file mode 100644
index 00000000..214f7939
--- /dev/null
+++ b/docs/subinterpreters.rst
@@ -0,0 +1,51 @@
+Working with subinterpreters
+============================
+
+.. py:currentmodule:: anyio
+
+Subinterpreters offer a middle ground between worker threads and worker processes. They
+allow you to utilize multiple CPU cores to run Python code while avoiding the overhead
+and complexities of spawning subprocesses.
+
+.. warning:: Subinterpreter support is considered **experimental**. The underlying
+   Python API for managing subinterpreters has not been finalized yet, and has had
+   little real-world testing. As such, it is not recommended to use this feature for
+   anything important yet.
+
+Running a function in a worker interpreter
+------------------------------------------
+
+Running functions in a worker interpreter makes sense when:
+
+* The code you want to run in parallel is CPU intensive
+* The code is either pure Python code, or extension code that does not release the
+  Global Interpreter Lock (GIL)
+
+If the code you'try trying to run only does blocking network I/O, or file I/O, then
+you're better off using :doc:`worker thread <threads>` instead.
+
+This is done by using :func:`.to_interpreter.run_sync`::
+
+    import time
+
+    from anyio import run, to_interpreter
+
+    from yourothermodule import cpu_intensive_function
+
+    async def main():
+        result = await to_interpreter.run_sync(
+            cpu_intensive_function, 'Hello, ', 'world!'
+        )
+        print(result)
+
+    run(main)
+
+Limitations
+-----------
+
+* Subinterpreters are only supported on Python 3.13 or later
+* Code in the ``__main__`` module cannot be run with this (as a consequence, this
+  applies to any functions defined in the REPL)
+* The target functions cannot react to cancellation
+* Unlike with threads, the code running in the subinterpreter cannot share mutable data
+  with other interpreters/threads (however, sharing *immutable* data is fine)
diff --git a/docs/subprocesses.rst b/docs/subprocesses.rst
index e228fd4f..5245eae4 100644
--- a/docs/subprocesses.rst
+++ b/docs/subprocesses.rst
@@ -61,13 +61,16 @@ Running functions in worker processes
 -------------------------------------
 
 When you need to run CPU intensive code, worker processes are better than threads
-because current implementations of Python cannot run Python code in multiple threads at
+because, with the exception of the experimental free-threaded builds of Python 3.13 and
+later, current implementations of Python cannot run Python code in multiple threads at
 once.
 
 Exceptions to this rule are:
 
 #. Blocking I/O operations
 #. C extension code that explicitly releases the Global Interpreter Lock
+#. :doc:`Subinterpreter workers <subinterpreters>`
+   (experimental; available on Python 3.13 and later)
 
 If the code you wish to run does not belong in this category, it's best to use worker
 processes instead in order to take advantage of multiple CPU cores.

diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst
index 4901da4c..da5819cb 100644
--- a/docs/versionhistory.rst
+++ b/docs/versionhistory.rst
@@ -5,6 +5,8 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.
 
 **UNRELEASED**
 
+- Added **experimental** support for running functions in subinterpreters on Python
+  3.13 and later
 - Configure ``SO_RCVBUF``, ``SO_SNDBUF`` and ``TCP_NODELAY`` on the selector
   thread waker socket pair. This should improve the performance of ``wait_readable()``
   and ``wait_writable()`` when using the ``ProactorEventLoop``
+ """ + + def __init__(self, excinfo: Any): + # This was adapted from concurrent.futures.interpreter.ExecutionFailed + msg = excinfo.formatted + if not msg: + if excinfo.type and excinfo.msg: + msg = f"{excinfo.type.__name__}: {excinfo.msg}" + else: + msg = excinfo.type.__name__ or excinfo.msg + + super().__init__(msg) + self.excinfo = excinfo + + def __str__(self) -> str: + try: + formatted = self.excinfo.errdisplay + except Exception: + return super().__str__() + else: + return dedent( + f""" + {super().__str__()} + + Uncaught in the interpreter: + + {formatted} + """.strip() + ) + + class BusyResourceError(Exception): """ Raised when two tasks are trying to read from or write to the same resource diff --git a/src/anyio/to_interpreter.py b/src/anyio/to_interpreter.py new file mode 100644 index 00000000..68b1892e --- /dev/null +++ b/src/anyio/to_interpreter.py @@ -0,0 +1,211 @@ +from __future__ import annotations + +import atexit +import os +import pickle +import sys +from collections import deque +from collections.abc import Callable, Mapping +from textwrap import dedent +from typing import Any, TypeVar + +from . import to_thread +from ._core._exceptions import BrokenWorkerIntepreter +from ._core._synchronization import CapacityLimiter +from .lowlevel import RunVar + +if sys.version_info >= (3, 11): + from typing import TypeVarTuple, Unpack +else: + from typing_extensions import TypeVarTuple, Unpack + +UNBOUND = 2 # I have no clue how this works, but it was used in the stdlib +FMT_UNPICKLED = 0 +FMT_PICKLED = 1 +DEFAULT_CPU_COUNT = 8 # this is just an arbitrarily selected value + +T_Retval = TypeVar("T_Retval") +PosArgsT = TypeVarTuple("PosArgsT") + +_workers = RunVar[set["Worker"]]("_workers") +_idle_workers = RunVar[deque["Worker"]]("_available_workers") +_default_interpreter_limiter = RunVar[CapacityLimiter]("_default_interpreter_limiter") + + +class Worker: + _run_func = compile( + dedent(""" + import _interpqueues as queues + import _interpreters as interpreters + from pickle import loads, dumps, HIGHEST_PROTOCOL + + item = queues.get(queue_id)[0] + try: + func, args, kwargs = loads(item) + retval = func(*args, **kwargs) + except Exception as exc: + is_exception = True + retval = exc + else: + is_exception = False + + try: + queues.put(queue_id, (retval, is_exception), FMT_UNPICKLED, UNBOUND) + except interpreters.NotShareableError: + retval = dumps(retval, HIGHEST_PROTOCOL) + queues.put(queue_id, (retval, is_exception), FMT_PICKLED, UNBOUND) + """), + "", + "exec", + ) + + _interpreter_id: int + _queue_id: int + + async def initialize(self) -> None: + import _interpqueues as queues + import _interpreters as interpreters + + self._interpreter_id = interpreters.create() + self._queue_id = queues.create(2, FMT_UNPICKLED, UNBOUND) # type: ignore[call-arg] + interpreters.set___main___attrs( + self._interpreter_id, + { + "queue_id": self._queue_id, + "FMT_PICKLED": FMT_PICKLED, + "FMT_UNPICKLED": FMT_UNPICKLED, + "UNBOUND": UNBOUND, + }, + ) + + def destroy(self) -> None: + import _interpqueues as queues + import _interpreters as interpreters + + interpreters.destroy(self._interpreter_id) + queues.destroy(self._queue_id) + + def _call( + self, func: Callable[..., T_Retval], args: tuple[Any], kwargs: dict[str, Any] + ) -> tuple[Any, bool]: + import _interpqueues as queues + import _interpreters as interpreters + + payload = pickle.dumps((func, args, kwargs), pickle.HIGHEST_PROTOCOL) + queues.put(self._queue_id, payload, FMT_PICKLED, UNBOUND) # type: ignore[call-arg] + + res: Any + 
diff --git a/src/anyio/to_interpreter.py b/src/anyio/to_interpreter.py
new file mode 100644
index 00000000..68b1892e
--- /dev/null
+++ b/src/anyio/to_interpreter.py
@@ -0,0 +1,211 @@
+from __future__ import annotations
+
+import atexit
+import os
+import pickle
+import sys
+from collections import deque
+from collections.abc import Callable, Mapping
+from textwrap import dedent
+from typing import Any, TypeVar
+
+from . import to_thread
+from ._core._exceptions import BrokenWorkerIntepreter
+from ._core._synchronization import CapacityLimiter
+from .lowlevel import RunVar
+
+if sys.version_info >= (3, 11):
+    from typing import TypeVarTuple, Unpack
+else:
+    from typing_extensions import TypeVarTuple, Unpack
+
+UNBOUND = 2  # I have no clue how this works, but it was used in the stdlib
+FMT_UNPICKLED = 0
+FMT_PICKLED = 1
+DEFAULT_CPU_COUNT = 8  # this is just an arbitrarily selected value
+
+T_Retval = TypeVar("T_Retval")
+PosArgsT = TypeVarTuple("PosArgsT")
+
+_workers = RunVar[set["Worker"]]("_workers")
+_idle_workers = RunVar[deque["Worker"]]("_available_workers")
+_default_interpreter_limiter = RunVar[CapacityLimiter]("_default_interpreter_limiter")
+
+
+class Worker:
+    _run_func = compile(
+        dedent("""
+        import _interpqueues as queues
+        import _interpreters as interpreters
+        from pickle import loads, dumps, HIGHEST_PROTOCOL
+
+        item = queues.get(queue_id)[0]
+        try:
+            func, args, kwargs = loads(item)
+            retval = func(*args, **kwargs)
+        except Exception as exc:
+            is_exception = True
+            retval = exc
+        else:
+            is_exception = False
+
+        try:
+            queues.put(queue_id, (retval, is_exception), FMT_UNPICKLED, UNBOUND)
+        except interpreters.NotShareableError:
+            retval = dumps(retval, HIGHEST_PROTOCOL)
+            queues.put(queue_id, (retval, is_exception), FMT_PICKLED, UNBOUND)
+        """),
+        "<string>",
+        "exec",
+    )
+
+    _interpreter_id: int
+    _queue_id: int
+
+    async def initialize(self) -> None:
+        import _interpqueues as queues
+        import _interpreters as interpreters
+
+        self._interpreter_id = interpreters.create()
+        self._queue_id = queues.create(2, FMT_UNPICKLED, UNBOUND)  # type: ignore[call-arg]
+        interpreters.set___main___attrs(
+            self._interpreter_id,
+            {
+                "queue_id": self._queue_id,
+                "FMT_PICKLED": FMT_PICKLED,
+                "FMT_UNPICKLED": FMT_UNPICKLED,
+                "UNBOUND": UNBOUND,
+            },
+        )
+
+    def destroy(self) -> None:
+        import _interpqueues as queues
+        import _interpreters as interpreters
+
+        interpreters.destroy(self._interpreter_id)
+        queues.destroy(self._queue_id)
+
+    def _call(
+        self, func: Callable[..., T_Retval], args: tuple[Any], kwargs: dict[str, Any]
+    ) -> tuple[Any, bool]:
+        import _interpqueues as queues
+        import _interpreters as interpreters
+
+        payload = pickle.dumps((func, args, kwargs), pickle.HIGHEST_PROTOCOL)
+        queues.put(self._queue_id, payload, FMT_PICKLED, UNBOUND)  # type: ignore[call-arg]
+
+        res: Any
+        is_exception: bool
+        if exc_info := interpreters.exec(self._interpreter_id, self._run_func):  # type: ignore[func-returns-value,arg-type]
+            raise BrokenWorkerIntepreter(exc_info)
+
+        (res, is_exception), fmt = queues.get(self._queue_id)[:2]
+        if fmt == FMT_PICKLED:
+            res = pickle.loads(res)
+
+        return res, is_exception
+
+    async def call(
+        self,
+        func: Callable[..., T_Retval],
+        args: tuple[Any],
+        kwargs: dict[str, Any],
+        abandon_on_cancel: bool,
+        limiter: CapacityLimiter,
+    ) -> T_Retval:
+        result, is_exception = await to_thread.run_sync(
+            self._call,
+            func,
+            args,
+            kwargs,
+            abandon_on_cancel=abandon_on_cancel,
+            limiter=limiter,
+        )
+        if is_exception:
+            raise result
+
+        return result
+
+
+def _stop_workers(workers: set[Worker]) -> None:
+    for worker in workers:
+        worker.destroy()
+
+    workers.clear()
+
+
+async def run_sync(
+    func: Callable[[Unpack[PosArgsT]], T_Retval],
+    *args: Unpack[PosArgsT],
+    kwargs: Mapping[str, Any] | None = None,
+    abandon_on_cancel: bool = False,
+    limiter: CapacityLimiter | None = None,
+) -> T_Retval:
+    """
+    Call the given function with the given arguments in a subinterpreter.
+
+    If the ``abandon_on_cancel`` option is enabled and the task waiting for its
+    completion is cancelled, the call will still run its course but its return value
+    (or any raised exception) will be ignored.
+
+    .. warning:: This feature is **experimental**. The upstream interpreter API has not
+        yet been finalized or thoroughly tested, so don't rely on this for anything
+        mission critical.
+
+    :param func: a callable
+    :param args: positional arguments for the callable
+    :param kwargs: keyword arguments for the callable
+    :param abandon_on_cancel: ``True`` to abandon the call (leaving it to run
+        unchecked on its own) if the host task is cancelled, ``False`` to ignore
+        cancellations in the host task until the operation has completed in the
+        subinterpreter
+    :param limiter: capacity limiter to use to limit the total amount of subinterpreters
+        running (if omitted, the default limiter is used)
+    :return: the result of the call
+    :raises BrokenWorkerIntepreter: if there's an internal error in a subinterpreter
+
+    """
+    if sys.version_info <= (3, 13):
+        raise RuntimeError("subinterpreters require at least Python 3.13")
+
+    if limiter is None:
+        limiter = current_default_interpreter_limiter()
+
+    try:
+        idle_workers = _idle_workers.get()
+    except LookupError:
+        idle_workers = deque()
+        _idle_workers.set(idle_workers)
+        workers = set()
+        _workers.set(workers)
+        atexit.register(_stop_workers, workers)
+
+    async with limiter:
+        try:
+            worker = idle_workers.pop()
+        except IndexError:
+            worker = Worker()
+            await worker.initialize()
+
+        try:
+            return await worker.call(func, args, kwargs or {}, abandon_on_cancel, limiter)
+        finally:
+            idle_workers.append(worker)
+
+
+def current_default_interpreter_limiter() -> CapacityLimiter:
+    """
+    Return the capacity limiter that is used by default to limit the number of
+    concurrently running subinterpreters.
+
+    Defaults to the number of CPU cores.
+
+    :return: a capacity limiter object
+
+    """
+    try:
+        return _default_interpreter_limiter.get()
+    except LookupError:
+        limiter = CapacityLimiter(os.cpu_count() or DEFAULT_CPU_COUNT)
+        _default_interpreter_limiter.set(limiter)
+        return limiter
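
The round-trip that ``Worker`` encapsulates can be seen in isolation. The following is
a hedged sketch, runnable only on CPython 3.13+ where the private ``_interpreters``
and ``_interpqueues`` modules exist. The queue calls mirror the ones the patch itself
makes; passing source text to ``_interpreters.exec`` instead of a pre-compiled code
object is an assumption of this sketch, and these APIs are unstable and may change::

    import pickle
    from textwrap import dedent

    import _interpqueues as queues
    import _interpreters as interpreters

    FMT_UNPICKLED, FMT_PICKLED, UNBOUND = 0, 1, 2

    interp_id = interpreters.create()
    queue_id = queues.create(2, FMT_UNPICKLED, UNBOUND)
    interpreters.set___main___attrs(interp_id, {"queue_id": queue_id})

    # caller side: pickle (func, args) and enqueue it, as Worker._call does
    queues.put(queue_id, pickle.dumps((divmod, (13, 4))), FMT_PICKLED, UNBOUND)

    # worker side: pop the request, call the function, send back the result
    # (the literals 1 and 2 inside the script are FMT_PICKLED and UNBOUND)
    interpreters.exec(interp_id, dedent("""
        import pickle
        import _interpqueues as queues

        func, args = pickle.loads(queues.get(queue_id)[0])
        queues.put(queue_id, pickle.dumps(func(*args)), 1, 2)
        """))

    print(pickle.loads(queues.get(queue_id)[0]))  # prints (3, 1)
    interpreters.destroy(interp_id)
    queues.destroy(queue_id)
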
diff --git a/tests/test_to_interpreter.py b/tests/test_to_interpreter.py
new file mode 100644
index 00000000..dc940f32
--- /dev/null
+++ b/tests/test_to_interpreter.py
@@ -0,0 +1,39 @@
+from __future__ import annotations
+
+import sys
+
+import pytest
+
+from anyio import to_interpreter
+
+pytestmark = [
+    pytest.mark.anyio,
+    pytest.mark.skipif(sys.version_info < (3, 13), reason="requires Python 3.13+"),
+]
+
+
+async def test_run_sync() -> None:
+    """
+    Test that the function runs in a different interpreter, and the same interpreter in
+    both calls.
+
+    """
+    import _interpreters
+
+    main_interpreter_id, _ = _interpreters.get_current()
+    interpreter_id, _ = await to_interpreter.run_sync(_interpreters.get_current)
+    interpreter_id_2, _ = await to_interpreter.run_sync(_interpreters.get_current)
+    assert interpreter_id == interpreter_id_2
+    assert interpreter_id != main_interpreter_id
+
+
+async def test_args_kwargs() -> None:
+    """Test that partial() can be used to pass keyword arguments."""
+    result = await to_interpreter.run_sync(sorted, ["a", "b"], kwargs={"reverse": True})
+    assert result == ["b", "a"]
+
+
+async def test_exception() -> None:
+    """Test that exceptions are delivered properly."""
+    with pytest.raises(ValueError, match="invalid literal for int"):
+        assert await to_interpreter.run_sync(int, "a")
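
With the first patch applied, the concurrency cap can also be controlled per call
site rather than through the default limiter. A usage sketch against the patch-01
API, where ``cpu_intensive_function`` and its module are hypothetical stand-ins just
as in the docs above::

    from functools import partial

    from anyio import CapacityLimiter, create_task_group, run, to_interpreter

    from yourothermodule import cpu_intensive_function

    async def main() -> None:
        # cap this workload at two concurrent worker interpreters instead of
        # the default limiter's CPU-count-sized capacity
        limiter = CapacityLimiter(2)
        async with create_task_group() as tg:
            for item in range(10):
                tg.start_soon(
                    partial(to_interpreter.run_sync, limiter=limiter),
                    cpu_intensive_function,
                    item,
                )

    run(main)
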
From 59ca9793561333ce0495bb6ab213f9a9b85ba844 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?=
Date: Wed, 1 Jan 2025 01:31:21 +0200
Subject: [PATCH 02/11] Added missing re-export of BrokenWorkerIntepreter

---
 src/anyio/__init__.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/anyio/__init__.py b/src/anyio/__init__.py
index 0738e595..09831259 100644
--- a/src/anyio/__init__.py
+++ b/src/anyio/__init__.py
@@ -8,6 +8,7 @@
 from ._core._eventloop import sleep_forever as sleep_forever
 from ._core._eventloop import sleep_until as sleep_until
 from ._core._exceptions import BrokenResourceError as BrokenResourceError
+from ._core._exceptions import BrokenWorkerIntepreter as BrokenWorkerIntepreter
 from ._core._exceptions import BrokenWorkerProcess as BrokenWorkerProcess
 from ._core._exceptions import BusyResourceError as BusyResourceError
 from ._core._exceptions import ClosedResourceError as ClosedResourceError

From 27dd8f587ff9385404b072d03d73003727f5cf77 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?=
Date: Wed, 1 Jan 2025 18:14:54 +0200
Subject: [PATCH 03/11] Fixed test failures with uvloop

---
 src/anyio/to_interpreter.py  |  7 ++-----
 tests/test_to_interpreter.py | 12 ++++++++++++
 2 files changed, 14 insertions(+), 5 deletions(-)

diff --git a/src/anyio/to_interpreter.py b/src/anyio/to_interpreter.py
index 68b1892e..511299d8 100644
--- a/src/anyio/to_interpreter.py
+++ b/src/anyio/to_interpreter.py
@@ -27,7 +27,6 @@
 T_Retval = TypeVar("T_Retval")
 PosArgsT = TypeVarTuple("PosArgsT")
 
-_workers = RunVar[set["Worker"]]("_workers")
 _idle_workers = RunVar[deque["Worker"]]("_available_workers")
 _default_interpreter_limiter = RunVar[CapacityLimiter]("_default_interpreter_limiter")
 
@@ -127,7 +126,7 @@ async def call(
         return result
 
 
-def _stop_workers(workers: set[Worker]) -> None:
+def _stop_workers(workers: deque[Worker]) -> None:
     for worker in workers:
         worker.destroy()
 
@@ -176,9 +175,7 @@ async def run_sync(
     except LookupError:
         idle_workers = deque()
         _idle_workers.set(idle_workers)
-        workers = set()
-        _workers.set(workers)
-        atexit.register(_stop_workers, workers)
+        atexit.register(_stop_workers, idle_workers)
 
     async with limiter:
         try:
diff --git a/tests/test_to_interpreter.py b/tests/test_to_interpreter.py
index dc940f32..b73440e9 100644
--- a/tests/test_to_interpreter.py
+++ b/tests/test_to_interpreter.py
@@ -1,8 +1,10 @@
 from __future__ import annotations
 
 import sys
+from collections.abc import AsyncGenerator
 
 import pytest
+from pytest import fixture
 
 from anyio import to_interpreter
 
@@ -12,6 +14,16 @@
 ]
 
 
+@fixture(autouse=True)
+async def destroy_workers() -> AsyncGenerator[None]:
+    yield
+    idle_workers = to_interpreter._idle_workers.get()
+    for worker in idle_workers:
+        worker.destroy()
+
+    idle_workers.clear()
+
+
 async def test_run_sync() -> None:
     """
     Test that the function runs in a different interpreter, and the same interpreter in
From f96092b60afb6b4a3920a4737f872232ab7d9240 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?=
Date: Thu, 2 Jan 2025 15:23:01 +0200
Subject: [PATCH 04/11] Initialize subinterpreters in worker threads, not in the event loop

---
 src/anyio/to_interpreter.py | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/src/anyio/to_interpreter.py b/src/anyio/to_interpreter.py
index 511299d8..93239c12 100644
--- a/src/anyio/to_interpreter.py
+++ b/src/anyio/to_interpreter.py
@@ -58,15 +58,17 @@ class Worker:
         "exec",
     )
 
+    _initialized: bool = False
     _interpreter_id: int
     _queue_id: int
 
-    async def initialize(self) -> None:
+    def initialize(self) -> None:
         import _interpqueues as queues
         import _interpreters as interpreters
 
         self._interpreter_id = interpreters.create()
         self._queue_id = queues.create(2, FMT_UNPICKLED, UNBOUND)  # type: ignore[call-arg]
+        self._initialized = True
         interpreters.set___main___attrs(
             self._interpreter_id,
             {
@@ -81,8 +83,9 @@ def destroy(self) -> None:
         import _interpqueues as queues
         import _interpreters as interpreters
 
-        interpreters.destroy(self._interpreter_id)
-        queues.destroy(self._queue_id)
+        if self._initialized:
+            interpreters.destroy(self._interpreter_id)
+            queues.destroy(self._queue_id)
 
     def _call(
         self, func: Callable[..., T_Retval], args: tuple[Any], kwargs: dict[str, Any]
@@ -90,6 +93,9 @@ def _call(
         import _interpqueues as queues
         import _interpreters as interpreters
 
+        if not self._initialized:
+            self.initialize()
+
         payload = pickle.dumps((func, args, kwargs), pickle.HIGHEST_PROTOCOL)
         queues.put(self._queue_id, payload, FMT_PICKLED, UNBOUND)  # type: ignore[call-arg]
 
@@ -182,7 +188,6 @@ async def run_sync(
             worker = idle_workers.pop()
         except IndexError:
             worker = Worker()
-            await worker.initialize()
 
         try:
             return await worker.call(func, args, kwargs or {}, abandon_on_cancel, limiter)

From e3a8f5390e41ac69ef6a574256ff03edaabac898 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?=
Date: Thu, 2 Jan 2025 15:23:51 +0200
Subject: [PATCH 05/11] Catch base exceptions in the worker too

---
 src/anyio/to_interpreter.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/anyio/to_interpreter.py b/src/anyio/to_interpreter.py
index 93239c12..f8afd728 100644
--- a/src/anyio/to_interpreter.py
+++ b/src/anyio/to_interpreter.py
@@ -42,7 +42,7 @@ class Worker:
         try:
             func, args, kwargs = loads(item)
             retval = func(*args, **kwargs)
-        except Exception as exc:
+        except BaseException as exc:
             is_exception = True
             retval = exc
         else:

From 0b08fb65f14b6ce5c6c90bfd50b5984a9f6a9aa7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?=
Date: Thu, 2 Jan 2025 15:25:30 +0200
Subject: [PATCH 06/11] Tweaked the formatting in BrokenWorkerIntepreter.__str__()

---
 src/anyio/_core/_exceptions.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/anyio/_core/_exceptions.py b/src/anyio/_core/_exceptions.py
index f844d932..16b94482 100644
--- a/src/anyio/_core/_exceptions.py
+++ b/src/anyio/_core/_exceptions.py
@@ -49,12 +49,12 @@ def __str__(self) -> str:
         else:
             return dedent(
                 f"""
-                {super().__str__()}
+            {super().__str__()}
 
-                Uncaught in the interpreter:
+            Uncaught in the interpreter:
 
-                {formatted}
-                """.strip()
+            {formatted}
+            """.strip()
             )
 
 

From 1dc2499b482ce010735e7bff994866c097016fd3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?=
Date: Thu, 2 Jan 2025 21:02:46 +0200
Subject: [PATCH 07/11] Updated type annotations to accommodate upcoming Typeshed changes

The related PR: https://github.com/python/typeshed/pull/13355
---
 src/anyio/to_interpreter.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/src/anyio/to_interpreter.py b/src/anyio/to_interpreter.py
index f8afd728..3a4a616a 100644
--- a/src/anyio/to_interpreter.py
+++ b/src/anyio/to_interpreter.py
@@ -7,7 +7,7 @@
 from collections import deque
 from collections.abc import Callable, Mapping
 from textwrap import dedent
-from typing import Any, TypeVar
+from typing import Any, Final, TypeVar
 
 from . import to_thread
 from ._core._exceptions import BrokenWorkerIntepreter
@@ -19,10 +19,10 @@
 else:
     from typing_extensions import TypeVarTuple, Unpack
 
-UNBOUND = 2  # I have no clue how this works, but it was used in the stdlib
-FMT_UNPICKLED = 0
-FMT_PICKLED = 1
-DEFAULT_CPU_COUNT = 8  # this is just an arbitrarily selected value
+UNBOUND: Final = 2  # I have no clue how this works, but it was used in the stdlib
+FMT_UNPICKLED: Final = 0
+FMT_PICKLED: Final = 1
+DEFAULT_CPU_COUNT: Final = 8  # this is just an arbitrarily selected value
 
 T_Retval = TypeVar("T_Retval")
 PosArgsT = TypeVarTuple("PosArgsT")
From 2f4f261ca5f99417a02a98d2816195e0c5b1c6ef Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?=
Date: Sat, 4 Jan 2025 00:20:06 +0200
Subject: [PATCH 08/11] Removed the abandon_on_cancel option for now

---
 src/anyio/to_interpreter.py | 9 +--------
 1 file changed, 1 insertion(+), 8 deletions(-)

diff --git a/src/anyio/to_interpreter.py b/src/anyio/to_interpreter.py
index 3a4a616a..3d48ae0c 100644
--- a/src/anyio/to_interpreter.py
+++ b/src/anyio/to_interpreter.py
@@ -115,7 +115,6 @@ async def call(
         func: Callable[..., T_Retval],
         args: tuple[Any],
         kwargs: dict[str, Any],
-        abandon_on_cancel: bool,
         limiter: CapacityLimiter,
     ) -> T_Retval:
         result, is_exception = await to_thread.run_sync(
@@ -123,7 +122,6 @@ async def call(
             func,
             args,
             kwargs,
-            abandon_on_cancel=abandon_on_cancel,
             limiter=limiter,
         )
         if is_exception:
@@ -143,7 +141,6 @@ async def run_sync(
     func: Callable[[Unpack[PosArgsT]], T_Retval],
     *args: Unpack[PosArgsT],
     kwargs: Mapping[str, Any] | None = None,
-    abandon_on_cancel: bool = False,
     limiter: CapacityLimiter | None = None,
 ) -> T_Retval:
     """
@@ -160,10 +157,6 @@ async def run_sync(
     :param func: a callable
     :param args: positional arguments for the callable
     :param kwargs: keyword arguments for the callable
-    :param abandon_on_cancel: ``True`` to abandon the call (leaving it to run
-        unchecked on its own) if the host task is cancelled, ``False`` to ignore
-        cancellations in the host task until the operation has completed in the
-        subinterpreter
     :param limiter: capacity limiter to use to limit the total amount of subinterpreters
         running (if omitted, the default limiter is used)
     :return: the result of the call
@@ -190,7 +183,7 @@ async def run_sync(
             worker = Worker()
 
         try:
-            return await worker.call(func, args, kwargs or {}, abandon_on_cancel, limiter)
+            return await worker.call(func, args, kwargs or {}, limiter)
         finally:
             idle_workers.append(worker)
 

From ea0d7a7d43f285426b9a26f38b7b2deaa1ce178d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?=
Date: Sat, 4 Jan 2025 00:25:51 +0200
Subject: [PATCH 09/11] Update docs/subinterpreters.rst

Co-authored-by: Jordan Speicher
---
 docs/subinterpreters.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/subinterpreters.rst b/docs/subinterpreters.rst
index 214f7939..41bb121a 100644
--- a/docs/subinterpreters.rst
+++ b/docs/subinterpreters.rst
@@ -21,7 +21,7 @@ Running functions in a worker interpreter makes sense when:
 * The code is either pure Python code, or extension code that does not release the
   Global Interpreter Lock (GIL)
 
-If the code you'try trying to run only does blocking network I/O, or file I/O, then
+If the code you're trying to run only does blocking network I/O, or file I/O, then
 you're better off using :doc:`worker thread <threads>` instead.
 
 This is done by using :func:`.to_interpreter.run_sync`::
From 667b157e65a5309faef1d60975055c0466f0c487 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?=
Date: Sat, 4 Jan 2025 19:49:46 +0200
Subject: [PATCH 10/11] Prune idle workers at exit

---
 src/anyio/to_interpreter.py | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/src/anyio/to_interpreter.py b/src/anyio/to_interpreter.py
index 3d48ae0c..a88051d0 100644
--- a/src/anyio/to_interpreter.py
+++ b/src/anyio/to_interpreter.py
@@ -9,7 +9,7 @@
 from textwrap import dedent
 from typing import Any, Final, TypeVar
 
-from . import to_thread
+from . import current_time, to_thread
 from ._core._exceptions import BrokenWorkerIntepreter
 from ._core._synchronization import CapacityLimiter
 from .lowlevel import RunVar
@@ -23,6 +23,9 @@
 FMT_UNPICKLED: Final = 0
 FMT_PICKLED: Final = 1
 DEFAULT_CPU_COUNT: Final = 8  # this is just an arbitrarily selected value
+MAX_WORKER_IDLE_TIME = (
+    30  # seconds a subinterpreter can be idle before becoming eligible for pruning
+)
 
 T_Retval = TypeVar("T_Retval")
 PosArgsT = TypeVarTuple("PosArgsT")
@@ -58,6 +61,8 @@ class Worker:
         "exec",
     )
 
+    last_used: float = 0
+
     _initialized: bool = False
     _interpreter_id: int
     _queue_id: int
@@ -185,6 +190,15 @@ async def run_sync(
         try:
             return await worker.call(func, args, kwargs or {}, limiter)
         finally:
+            # Prune workers that have been idle for too long
+            now = current_time()
+            while idle_workers:
+                if now - idle_workers[0].last_used <= MAX_WORKER_IDLE_TIME:
+                    break
+
+                await to_thread.run_sync(idle_workers.popleft().destroy, limiter=limiter)
+
+            worker.last_used = current_time()
             idle_workers.append(worker)
 
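
The pruning loop above relies on an ordering invariant: released workers are always
appended on the right of the deque with a fresh ``last_used`` stamp, so the deque
stays sorted by idle time and the scan can stop at the first non-stale entry. A
condensed, synchronous restatement of that invariant, with hypothetical
``IdleWorker`` and ``release`` names standing in for the patch's types::

    import time
    from collections import deque

    MAX_IDLE = 30.0  # same threshold as MAX_WORKER_IDLE_TIME above

    class IdleWorker:
        last_used: float = 0.0

        def destroy(self) -> None:
            print("worker destroyed")

    idle: deque[IdleWorker] = deque()

    def release(worker: IdleWorker) -> None:
        now = time.monotonic()
        # the oldest workers sit on the left; stop at the first fresh one
        while idle and now - idle[0].last_used > MAX_IDLE:
            idle.popleft().destroy()

        worker.last_used = now
        idle.append(worker)
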
From 65ba5e1100057109af4659e7e1df4db77a1c7861 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?=
Date: Sun, 5 Jan 2025 01:11:46 +0200
Subject: [PATCH 11/11] Removed the kwargs parameter

We can add it back later, just needs to be consistent across the worker
thread/interpreter/process APIs
---
 src/anyio/to_interpreter.py  | 18 ++++++++----------
 tests/test_to_interpreter.py |  3 ++-
 2 files changed, 10 insertions(+), 11 deletions(-)

diff --git a/src/anyio/to_interpreter.py b/src/anyio/to_interpreter.py
index a88051d0..bcde24d3 100644
--- a/src/anyio/to_interpreter.py
+++ b/src/anyio/to_interpreter.py
@@ -5,7 +5,7 @@
 import pickle
 import sys
 from collections import deque
-from collections.abc import Callable, Mapping
+from collections.abc import Callable
 from textwrap import dedent
 from typing import Any, Final, TypeVar
 
@@ -43,8 +43,8 @@ class Worker:
 
         item = queues.get(queue_id)[0]
         try:
-            func, args, kwargs = loads(item)
-            retval = func(*args, **kwargs)
+            func, args = loads(item)
+            retval = func(*args)
         except BaseException as exc:
             is_exception = True
             retval = exc
@@ -93,7 +93,9 @@ def destroy(self) -> None:
             queues.destroy(self._queue_id)
 
     def _call(
-        self, func: Callable[..., T_Retval], args: tuple[Any], kwargs: dict[str, Any]
+        self,
+        func: Callable[..., T_Retval],
+        args: tuple[Any],
     ) -> tuple[Any, bool]:
         import _interpqueues as queues
         import _interpreters as interpreters
@@ -101,7 +103,7 @@ def _call(
         if not self._initialized:
             self.initialize()
 
-        payload = pickle.dumps((func, args, kwargs), pickle.HIGHEST_PROTOCOL)
+        payload = pickle.dumps((func, args), pickle.HIGHEST_PROTOCOL)
         queues.put(self._queue_id, payload, FMT_PICKLED, UNBOUND)  # type: ignore[call-arg]
 
         res: Any
@@ -119,14 +121,12 @@ async def call(
         self,
         func: Callable[..., T_Retval],
         args: tuple[Any],
-        kwargs: dict[str, Any],
         limiter: CapacityLimiter,
     ) -> T_Retval:
         result, is_exception = await to_thread.run_sync(
             self._call,
             func,
             args,
-            kwargs,
             limiter=limiter,
         )
         if is_exception:
@@ -145,7 +145,6 @@ def _stop_workers(workers: deque[Worker]) -> None:
 async def run_sync(
     func: Callable[[Unpack[PosArgsT]], T_Retval],
     *args: Unpack[PosArgsT],
-    kwargs: Mapping[str, Any] | None = None,
     limiter: CapacityLimiter | None = None,
 ) -> T_Retval:
     """
@@ -161,7 +160,6 @@ async def run_sync(
 
     :param func: a callable
     :param args: positional arguments for the callable
-    :param kwargs: keyword arguments for the callable
     :param limiter: capacity limiter to use to limit the total amount of subinterpreters
         running (if omitted, the default limiter is used)
     :return: the result of the call
@@ -188,7 +186,7 @@ async def run_sync(
             worker = Worker()
 
         try:
-            return await worker.call(func, args, kwargs or {}, limiter)
+            return await worker.call(func, args, limiter)
         finally:
             # Prune workers that have been idle for too long
             now = current_time()
diff --git a/tests/test_to_interpreter.py b/tests/test_to_interpreter.py
index b73440e9..79f3427f 100644
--- a/tests/test_to_interpreter.py
+++ b/tests/test_to_interpreter.py
@@ -2,6 +2,7 @@
 
 import sys
 from collections.abc import AsyncGenerator
+from functools import partial
 
 import pytest
 from pytest import fixture
@@ -41,7 +42,7 @@ async def test_run_sync() -> None:
 
 async def test_args_kwargs() -> None:
     """Test that partial() can be used to pass keyword arguments."""
-    result = await to_interpreter.run_sync(sorted, ["a", "b"], kwargs={"reverse": True})
+    result = await to_interpreter.run_sync(partial(sorted, reverse=True), ["a", "b"])
     assert result == ["b", "a"]
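
With the ``kwargs`` parameter gone, keyword arguments travel inside the pickled
callable instead, exactly as the updated test shows. As a standalone usage sketch
against the final API of this series::

    from functools import partial

    from anyio import run, to_interpreter

    async def main() -> None:
        result = await to_interpreter.run_sync(
            partial(sorted, reverse=True), ["a", "b", "c"]
        )
        print(result)  # ['c', 'b', 'a']

    run(main)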