From c13c60e8b7bf6b396b4726c86e1d0081603cef6f Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Mon, 10 Jun 2019 18:35:38 -0700 Subject: [PATCH] Rename run_sync_in_worker_thread -> run_sync_in_thread See gh-810 --- docs/source/history.rst | 4 +- docs/source/reference-core.rst | 18 ++++----- .../blocking-trio-portal-example.py | 2 +- docs/source/reference-hazmat.rst | 2 +- docs/source/reference-io.rst | 2 +- newsfragments/810.removal.rst | 5 +++ notes-to-self/blocking-read-hack.py | 2 +- notes-to-self/thread-dispatch-bench.py | 2 +- trio/__init__.py | 16 +++++++- trio/_core/_entry_queue.py | 2 +- trio/_core/_traps.py | 2 +- trio/_core/tests/test_run.py | 4 +- trio/_file_io.py | 10 ++--- trio/_path.py | 10 ++--- trio/_socket.py | 10 ++--- trio/_subprocess_platform/waitid.py | 4 +- trio/_sync.py | 6 +-- trio/_threads.py | 38 +++++++++---------- trio/_wait_for_object.py | 2 +- trio/tests/test_signals.py | 2 +- trio/tests/test_ssl.py | 2 +- trio/tests/test_threads.py | 28 +++++++------- trio/tests/test_util.py | 4 +- trio/tests/test_wait_for_object.py | 8 ++-- 24 files changed, 102 insertions(+), 83 deletions(-) create mode 100644 newsfragments/810.removal.rst diff --git a/docs/source/history.rst b/docs/source/history.rst index f860825b4c..00e48139c7 100644 --- a/docs/source/history.rst +++ b/docs/source/history.rst @@ -560,7 +560,7 @@ Upcoming breaking changes with warnings (i.e., stuff that in 0.2.0 functions that take and run a synchronous function. As part of this: * ``run_in_worker_thread`` is becoming - :func:`run_sync_in_worker_thread` + ``run_sync_in_worker_thread`` * We took the opportunity to refactor ``run_in_trio_thread`` and ``await_in_trio_thread`` into the new class @@ -655,7 +655,7 @@ CPython, or PyPy3 5.9+. 
Other changes ~~~~~~~~~~~~~ -* :func:`run_sync_in_worker_thread` now has a :ref:`robust mechanism +* :func:`run_sync_in_thread` now has a :ref:`robust mechanism for applying capacity limits to the number of concurrent threads ` (`#10 `__, `#57 diff --git a/docs/source/reference-core.rst b/docs/source/reference-core.rst index 1ec6d0bfd4..95a24781ab 100644 --- a/docs/source/reference-core.rst +++ b/docs/source/reference-core.rst @@ -1566,7 +1566,7 @@ In acknowledgment of this reality, Trio provides two useful utilities for working with real, operating-system level, :mod:`threading`\-module-style threads. First, if you're in Trio but need to push some blocking I/O into a thread, there's -:func:`run_sync_in_worker_thread`. And if you're in a thread and need +:func:`run_sync_in_thread`. And if you're in a thread and need to communicate back with Trio, you can use a :class:`BlockingTrioPortal`. @@ -1589,7 +1589,7 @@ are spawned and the system gets overloaded and crashes. Instead, the N threads start executing the first N jobs, while the other (100,000 - N) jobs sit in a queue and wait their turn. Which is generally what you want, and this is how -:func:`trio.run_sync_in_worker_thread` works by default. +:func:`trio.run_sync_in_thread` works by default. The downside of this kind of thread pool is that sometimes, you need more sophisticated logic for controlling how many threads are run at @@ -1636,10 +1636,10 @@ re-using threads, but has no admission control policy: if you give it responsible for providing the policy to make sure that this doesn't happen – but since it *only* has to worry about policy, it can be much simpler. In fact, all there is to it is the ``limiter=`` argument -passed to :func:`run_sync_in_worker_thread`. This defaults to a global +passed to :func:`run_sync_in_thread`. This defaults to a global :class:`CapacityLimiter` object, which gives us the classic fixed-size thread pool behavior. (See -:func:`current_default_worker_thread_limiter`.) 
But if you want to use +:func:`current_default_thread_limiter`.) But if you want to use "separate pools" for type A jobs and type B jobs, then it's just a matter of creating two separate :class:`CapacityLimiter` objects and passing them in when running these jobs. Or here's an example of @@ -1679,7 +1679,7 @@ time:: return USER_LIMITERS[user_id] except KeyError: per_user_limiter = trio.CapacityLimiter(MAX_THREADS_PER_USER) - global_limiter = trio.current_default_worker_thread_limiter() + global_limiter = trio.current_default_thread_limiter() # IMPORTANT: acquire the per_user_limiter before the global_limiter. # If we get 100 jobs for a user at the same time, we want # to only allow 3 of them at a time to even compete for the @@ -1690,17 +1690,17 @@ time:: async def run_in_worker_thread_for_user(user_id, async_fn, *args, **kwargs): - # *args belong to async_fn; **kwargs belong to run_sync_in_worker_thread + # *args belong to async_fn; **kwargs belong to run_sync_in_thread kwargs["limiter"] = get_user_limiter(user_id) - return await trio.run_sync_in_worker_thread(asycn_fn, *args, **kwargs) + return await trio.run_sync_in_thread(async_fn, *args, **kwargs) Putting blocking I/O into worker threads ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -.. autofunction:: run_sync_in_thread -.. autofunction:: run_sync_in_worker_thread +.. autofunction:: run_sync_in_thread -.. autofunction:: current_default_worker_thread_limiter +.. 
autofunction:: current_default_thread_limiter Getting back into the Trio thread from another thread diff --git a/docs/source/reference-core/blocking-trio-portal-example.py b/docs/source/reference-core/blocking-trio-portal-example.py index a84fb9f9e7..99200b932f 100644 --- a/docs/source/reference-core/blocking-trio-portal-example.py +++ b/docs/source/reference-core/blocking-trio-portal-example.py @@ -22,7 +22,7 @@ async def main(): # In a background thread, run: # thread_fn(portal, receive_from_trio, send_to_trio) nursery.start_soon( - trio.run_sync_in_worker_thread, + trio.run_sync_in_thread, thread_fn, portal, receive_from_trio, send_to_trio ) diff --git a/docs/source/reference-hazmat.rst b/docs/source/reference-hazmat.rst index b14c4cfdb6..99d0eed80f 100644 --- a/docs/source/reference-hazmat.rst +++ b/docs/source/reference-hazmat.rst @@ -442,7 +442,7 @@ This logic is a bit convoluted, but accomplishes all of the following: loop outside of the ``except BlockingIOError:`` block. These functions can also be useful in other situations. For example, -when :func:`trio.run_sync_in_worker_thread` schedules some work to run +when :func:`trio.run_sync_in_thread` schedules some work to run in a worker thread, it blocks until the work is finished (so it's a schedule point), but by default it doesn't allow cancellation. So to make sure that the call always acts as a checkpoint, it calls diff --git a/docs/source/reference-io.rst b/docs/source/reference-io.rst index e395f2c6fb..4979b5b127 100644 --- a/docs/source/reference-io.rst +++ b/docs/source/reference-io.rst @@ -479,7 +479,7 @@ To understand why, you need to know two things. First, right now no mainstream operating system offers a generic, reliable, native API for async file or filesystem operations, so we have to fake it by using threads (specifically, -:func:`run_sync_in_worker_thread`). This is cheap but isn't free: on a +:func:`run_sync_in_thread`). 
This is cheap but isn't free: on a typical PC, dispatching to a worker thread adds something like ~100 µs of overhead to each operation. ("µs" is pronounced "microseconds", and there are 1,000,000 µs in a second. Note that all the numbers here are diff --git a/newsfragments/810.removal.rst b/newsfragments/810.removal.rst new file mode 100644 index 0000000000..9ca960bcbc --- /dev/null +++ b/newsfragments/810.removal.rst @@ -0,0 +1,5 @@ +``run_sync_in_worker_thread`` was too much of a mouthful – now it's +just called `run_sync_in_thread` (though the old name still works with +a deprecation warning, for now). Similarly, +``current_default_worker_thread_limiter`` is becoming +`current_default_thread_limiter`. diff --git a/notes-to-self/blocking-read-hack.py b/notes-to-self/blocking-read-hack.py index 8f570c8bfd..83dbf379a9 100644 --- a/notes-to-self/blocking-read-hack.py +++ b/notes-to-self/blocking-read-hack.py @@ -29,7 +29,7 @@ async def kill_it_after_timeout(new_fd): async with trio.open_nursery() as nursery: nursery.start_soon(kill_it_after_timeout, new_fd) try: - data = await trio.run_sync_in_worker_thread(os.read, new_fd, count) + data = await trio.run_sync_in_thread(os.read, new_fd, count) except OSError as exc: if cancel_requested and exc.errno == errno.ENOTCONN: # Call was successfully cancelled. In a real version we'd diff --git a/notes-to-self/thread-dispatch-bench.py b/notes-to-self/thread-dispatch-bench.py index f85d256b4f..02c8c0750f 100644 --- a/notes-to-self/thread-dispatch-bench.py +++ b/notes-to-self/thread-dispatch-bench.py @@ -2,7 +2,7 @@ # minimal a fashion as possible. 
# # This is useful to get a sense of the *lower-bound* cost of -# run_sync_in_worker_thread +# run_sync_in_thread import threading from queue import Queue diff --git a/trio/__init__.py b/trio/__init__.py index 652253e2fe..4719bd2b0c 100644 --- a/trio/__init__.py +++ b/trio/__init__.py @@ -32,7 +32,7 @@ ) from ._threads import ( - run_sync_in_worker_thread, current_default_worker_thread_limiter, + run_sync_in_thread, current_default_thread_limiter, BlockingTrioPortal ) @@ -100,6 +100,20 @@ "library 'subprocess' module" ), ), + "run_sync_in_worker_thread": + _deprecate.DeprecatedAttribute( + run_sync_in_thread, + "0.12.0", + issue=810, + instead=run_sync_in_thread, + ), + "current_default_worker_thread_limiter": + _deprecate.DeprecatedAttribute( + current_default_thread_limiter, + "0.12.0", + issue=810, + instead=current_default_thread_limiter, + ), } # Having the public path in .__module__ attributes is important for: diff --git a/trio/_core/_entry_queue.py b/trio/_core/_entry_queue.py index c522b3bf74..f2d86f129a 100644 --- a/trio/_core/_entry_queue.py +++ b/trio/_core/_entry_queue.py @@ -132,7 +132,7 @@ class TrioToken: 1. It lets you re-enter the Trio run loop from external threads or signal handlers. This is the low-level primitive that - :func:`trio.run_sync_in_worker_thread` uses to receive results from + :func:`trio.run_sync_in_thread` uses to receive results from worker threads, that :func:`trio.open_signal_receiver` uses to receive notifications about signals, and so forth. diff --git a/trio/_core/_traps.py b/trio/_core/_traps.py index f28ff855be..037a4fe022 100644 --- a/trio/_core/_traps.py +++ b/trio/_core/_traps.py @@ -111,7 +111,7 @@ def abort_func(raise_cancel): At that point there are again two possibilities. You can simply ignore the cancellation altogether: wait for the operation to complete and then reschedule and continue as normal. (For example, this is what - :func:`trio.run_sync_in_worker_thread` does if cancellation is disabled.) 
+ :func:`trio.run_sync_in_thread` does if cancellation is disabled.) The other possibility is that the ``abort_func`` does succeed in cancelling the operation, but for some reason isn't able to report that right away. (Example: on Windows, it's possible to request that an diff --git a/trio/_core/tests/test_run.py b/trio/_core/tests/test_run.py index 1369e39fe6..5e356e5f3b 100644 --- a/trio/_core/tests/test_run.py +++ b/trio/_core/tests/test_run.py @@ -17,7 +17,7 @@ from .tutil import check_sequence_matches, gc_collect_harder from ... import _core -from ..._threads import run_sync_in_worker_thread +from ..._threads import run_sync_in_thread from ..._timeouts import sleep, fail_after from ..._util import aiter_compat from ...testing import ( @@ -552,7 +552,7 @@ async def test_cancel_scope_repr(mock_clock): scope.deadline = _core.current_time() + 10 assert "deadline is 10.00 seconds from now" in repr(scope) # when not in async context, can't get the current time - assert "deadline" not in await run_sync_in_worker_thread(repr, scope) + assert "deadline" not in await run_sync_in_thread(repr, scope) scope.cancel() assert "cancelled" in repr(scope) assert "exited" in repr(scope) diff --git a/trio/_file_io.py b/trio/_file_io.py index 4a9d84dc5d..14f83397c9 100644 --- a/trio/_file_io.py +++ b/trio/_file_io.py @@ -53,7 +53,7 @@ class AsyncIOWrapper(AsyncResource): """A generic :class:`~io.IOBase` wrapper that implements the :term:`asynchronous file object` interface. Wrapped methods that could block are executed in - :meth:`trio.run_sync_in_worker_thread`. + :meth:`trio.run_sync_in_thread`. All properties and methods defined in in :mod:`~io` are exposed by this wrapper, if they exist in the wrapped file object. 
@@ -80,7 +80,7 @@ def __getattr__(self, name): @async_wraps(self.__class__, self._wrapped.__class__, name) async def wrapper(*args, **kwargs): func = partial(meth, *args, **kwargs) - return await trio.run_sync_in_worker_thread(func) + return await trio.run_sync_in_thread(func) # cache the generated method setattr(self, name, wrapper) @@ -115,7 +115,7 @@ async def detach(self): """ - raw = await trio.run_sync_in_worker_thread(self._wrapped.detach) + raw = await trio.run_sync_in_thread(self._wrapped.detach) return wrap_file(raw) async def aclose(self): @@ -128,7 +128,7 @@ async def aclose(self): # ensure the underling file is closed during cancellation with trio.CancelScope(shield=True): - await trio.run_sync_in_worker_thread(self._wrapped.close) + await trio.run_sync_in_thread(self._wrapped.close) await trio.hazmat.checkpoint_if_cancelled() @@ -165,7 +165,7 @@ async def open_file( file = fspath(file) _file = wrap_file( - await trio.run_sync_in_worker_thread( + await trio.run_sync_in_thread( io.open, file, mode, buffering, encoding, errors, newline, closefd, opener ) diff --git a/trio/_path.py b/trio/_path.py index a2002fbe77..369da73d25 100644 --- a/trio/_path.py +++ b/trio/_path.py @@ -58,7 +58,7 @@ def iter_wrapper_factory(cls, meth_name): async def wrapper(self, *args, **kwargs): meth = getattr(self._wrapped, meth_name) func = partial(meth, *args, **kwargs) - items = await trio.run_sync_in_worker_thread(func) + items = await trio.run_sync_in_thread(func) return (rewrap_path(item) for item in items) return wrapper @@ -70,7 +70,7 @@ async def wrapper(self, *args, **kwargs): args = unwrap_paths(args) meth = getattr(self._wrapped, meth_name) func = partial(meth, *args, **kwargs) - value = await trio.run_sync_in_worker_thread(func) + value = await trio.run_sync_in_thread(func) return rewrap_path(value) return wrapper @@ -83,7 +83,7 @@ async def wrapper(cls, *args, **kwargs): args = unwrap_paths(args) meth = getattr(cls._wraps, meth_name) func = partial(meth, *args, 
**kwargs) - value = await trio.run_sync_in_worker_thread(func) + value = await trio.run_sync_in_thread(func) return rewrap_path(value) return wrapper @@ -145,7 +145,7 @@ def generate_iter(cls, attrs): class Path(metaclass=AsyncAutoWrapperType): """A :class:`pathlib.Path` wrapper that executes blocking methods in - :meth:`trio.run_sync_in_worker_thread`. + :meth:`trio.run_sync_in_thread`. """ @@ -185,7 +185,7 @@ async def open(self, *args, **kwargs): """ func = partial(self._wrapped.open, *args, **kwargs) - value = await trio.run_sync_in_worker_thread(func) + value = await trio.run_sync_in_thread(func) return trio.wrap_file(value) diff --git a/trio/_socket.py b/trio/_socket.py index 1b563f3d43..54f60b143f 100644 --- a/trio/_socket.py +++ b/trio/_socket.py @@ -6,7 +6,7 @@ import idna as _idna import trio -from ._threads import run_sync_in_worker_thread +from ._threads import run_sync_in_thread from ._util import fspath from ._core import RunVar, wait_socket_readable, wait_socket_writable @@ -178,7 +178,7 @@ def numeric_only_failure(exc): if hr is not None: return await hr.getaddrinfo(host, port, family, type, proto, flags) else: - return await run_sync_in_worker_thread( + return await run_sync_in_thread( _stdlib_socket.getaddrinfo, host, port, @@ -204,7 +204,7 @@ async def getnameinfo(sockaddr, flags): if hr is not None: return await hr.getnameinfo(sockaddr, flags) else: - return await run_sync_in_worker_thread( + return await run_sync_in_thread( _stdlib_socket.getnameinfo, sockaddr, flags, cancellable=True ) @@ -215,7 +215,7 @@ async def getprotobyname(name): Like :func:`socket.getprotobyname`, but async. 
""" - return await run_sync_in_worker_thread( + return await run_sync_in_thread( _stdlib_socket.getprotobyname, name, cancellable=True ) @@ -463,7 +463,7 @@ async def bind(self, address): ): # Use a thread for the filesystem traversal (unless it's an # abstract domain socket) - return await run_sync_in_worker_thread(self._sock.bind, address) + return await run_sync_in_thread(self._sock.bind, address) else: # POSIX actually says that bind can return EWOULDBLOCK and # complete asynchronously, like connect. But in practice AFAICT diff --git a/trio/_subprocess_platform/waitid.py b/trio/_subprocess_platform/waitid.py index 9b2f86f752..d411265427 100644 --- a/trio/_subprocess_platform/waitid.py +++ b/trio/_subprocess_platform/waitid.py @@ -5,7 +5,7 @@ from .. import _core, _subprocess from .._sync import CapacityLimiter, Event -from .._threads import run_sync_in_worker_thread +from .._threads import run_sync_in_thread try: from os import waitid @@ -74,7 +74,7 @@ async def _waitid_system_task(pid: int, event: Event) -> None: # call to trio.run is shutting down. try: - await run_sync_in_worker_thread( + await run_sync_in_thread( sync_wait_reapable, pid, cancellable=True, diff --git a/trio/_sync.py b/trio/_sync.py index 67518a5219..ab80197a5e 100644 --- a/trio/_sync.py +++ b/trio/_sync.py @@ -143,9 +143,9 @@ class CapacityLimiter: fixed number of seats, and if they're all taken then you have to wait for someone to get up before you can sit down. - By default, :func:`run_sync_in_worker_thread` uses a + By default, :func:`run_sync_in_thread` uses a :class:`CapacityLimiter` to limit the number of threads running at once; - see :func:`current_default_worker_thread_limiter` for details. + see :func:`current_default_thread_limiter` for details. 
If you're familiar with semaphores, then you can think of this as a restricted semaphore that's specialized for one common use case, with @@ -246,7 +246,7 @@ def acquire_on_behalf_of_nowait(self, borrower): Args: borrower: A :class:`trio.hazmat.Task` or arbitrary opaque object used to record who is borrowing this token. This is used by - :func:`run_sync_in_worker_thread` to allow threads to "hold + :func:`run_sync_in_thread` to allow threads to "hold tokens", with the intention in the future of using it to `allow deadlock detection and other useful things `__ diff --git a/trio/_threads.py b/trio/_threads.py index d316db6882..a05b2598a4 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -11,8 +11,8 @@ from ._core import enable_ki_protection, disable_ki_protection, RunVar __all__ = [ - "run_sync_in_worker_thread", - "current_default_worker_thread_limiter", + "run_sync_in_thread", + "current_default_thread_limiter", "BlockingTrioPortal", ] @@ -42,7 +42,7 @@ class BlockingTrioPortal: async def some_function(): portal = trio.BlockingTrioPortal() - await trio.run_sync_in_worker_thread(sync_fn, portal) + await trio.run_sync_in_thread(sync_fn, portal) Alternatively, you can pass an explicit :class:`trio.hazmat.TrioToken` to specify the :func:`trio.run` that you want your portal to connect to. @@ -227,12 +227,12 @@ def run_sync(self, fn, *args): # I pulled this number out of the air; it isn't based on anything. Probably we # should make some kind of measurements to pick a good value. DEFAULT_LIMIT = 40 -_worker_thread_counter = count() +_thread_counter = count() -def current_default_worker_thread_limiter(): +def current_default_thread_limiter(): """Get the default :class:`CapacityLimiter` used by - :func:`run_sync_in_worker_thread`. + :func:`run_sync_in_thread`. The most common reason to call this would be if you want to modify its :attr:`~CapacityLimiter.total_tokens` attribute. 
@@ -256,7 +256,7 @@ class ThreadPlaceholder: @enable_ki_protection -async def run_sync_in_worker_thread( +async def run_sync_in_thread( sync_fn, *args, cancellable=False, limiter=None ): """Convert a blocking operation into an async operation using a thread. @@ -264,7 +264,7 @@ async def run_sync_in_worker_thread( These two lines are equivalent:: sync_fn(*args) - await run_sync_in_worker_thread(sync_fn, *args) + await run_sync_in_thread(sync_fn, *args) except that if ``sync_fn`` takes a long time, then the first line will block the Trio loop while it runs, while the second line allows other Trio @@ -283,17 +283,17 @@ async def run_sync_in_worker_thread( anything providing compatible :meth:`~trio.CapacityLimiter.acquire_on_behalf_of` and :meth:`~trio.CapacityLimiter.release_on_behalf_of` - methods. :func:`run_sync_in_worker_thread` will call + methods. :func:`run_sync_in_thread` will call ``acquire_on_behalf_of`` before starting the thread, and ``release_on_behalf_of`` after the thread has finished. If None (the default), uses the default :class:`CapacityLimiter`, as - returned by :func:`current_default_worker_thread_limiter`. + returned by :func:`current_default_thread_limiter`. **Cancellation handling**: Cancellation is a tricky issue here, because neither Python nor the operating systems it runs on provide any general mechanism for cancelling an arbitrary synchronous function running in a - thread. :func:`run_sync_in_worker_thread` will always check for + thread. :func:`run_sync_in_thread` will always check for cancellation on entry, before starting the thread. But once the thread is running, there are two ways it can handle being cancelled: @@ -301,33 +301,33 @@ async def run_sync_in_worker_thread( keeps going, just like if we had called ``sync_fn`` synchronously. This is the default behavior. - * If ``cancellable=True``, then ``run_sync_in_worker_thread`` immediately + * If ``cancellable=True``, then ``run_sync_in_thread`` immediately raises :exc:`Cancelled`. 
In this case **the thread keeps running in background** – we just abandon it to do whatever it's going to do, and silently discard any return value or errors that it raises. Only use this if you know that the operation is safe and side-effect free. (For example: :func:`trio.socket.getaddrinfo` is implemented using - :func:`run_sync_in_worker_thread`, and it sets ``cancellable=True`` + :func:`run_sync_in_thread`, and it sets ``cancellable=True`` because it doesn't really affect anything if a stray hostname lookup keeps running in the background.) The ``limiter`` is only released after the thread has *actually* finished – which in the case of cancellation may be some time after - :func:`run_sync_in_worker_thread` has returned. (This is why it's - crucial that :func:`run_sync_in_worker_thread` takes care of acquiring + :func:`run_sync_in_thread` has returned. (This is why it's + crucial that :func:`run_sync_in_thread` takes care of acquiring and releasing the limiter.) If :func:`trio.run` finishes before the thread does, then the limiter release method will never be called at all. .. warning:: - You should not use :func:`run_sync_in_worker_thread` to call + You should not use :func:`run_sync_in_thread` to call long-running CPU-bound functions! In addition to the usual GIL-related reasons why using threads for CPU-bound work is not very effective in Python, there is an additional problem: on CPython, `CPU-bound threads tend to "starve out" IO-bound threads `__, so using - :func:`run_sync_in_worker_thread` for CPU-bound work is likely to + :func:`run_sync_in_thread` for CPU-bound work is likely to adversely affect the main thread running Trio. 
If you need to do this, you're better off using a worker process, or perhaps PyPy (which still has a GIL, but may do a better job of fairly allocating CPU time @@ -343,13 +343,13 @@ async def run_sync_in_worker_thread( await trio.hazmat.checkpoint_if_cancelled() token = trio.hazmat.current_trio_token() if limiter is None: - limiter = current_default_worker_thread_limiter() + limiter = current_default_thread_limiter() # Holds a reference to the task that's blocked in this function waiting # for the result – or None if this function was cancelled and we should # discard the result. task_register = [trio.hazmat.current_task()] - name = "trio-worker-{}".format(next(_worker_thread_counter)) + name = "trio-worker-{}".format(next(_thread_counter)) placeholder = ThreadPlaceholder(name) # This function gets scheduled into the Trio run loop to deliver the diff --git a/trio/_wait_for_object.py b/trio/_wait_for_object.py index 08cf482b8c..9feab9e03a 100644 --- a/trio/_wait_for_object.py +++ b/trio/_wait_for_object.py @@ -32,7 +32,7 @@ async def WaitForSingleObject(obj): # that we can use to cancel the thread. cancel_handle = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL) try: - await trio.run_sync_in_worker_thread( + await trio.run_sync_in_thread( WaitForMultipleObjects_sync, handle, cancel_handle, diff --git a/trio/tests/test_signals.py b/trio/tests/test_signals.py index d95fa6fff1..8121bc861f 100644 --- a/trio/tests/test_signals.py +++ b/trio/tests/test_signals.py @@ -55,7 +55,7 @@ async def naughty(): pass # pragma: no cover with pytest.raises(RuntimeError): - await trio.run_sync_in_worker_thread(trio.run, naughty) + await trio.run_sync_in_thread(trio.run, naughty) async def test_open_signal_receiver_conflict(): diff --git a/trio/tests/test_ssl.py b/trio/tests/test_ssl.py index c004f5a38e..8178f396fd 100644 --- a/trio/tests/test_ssl.py +++ b/trio/tests/test_ssl.py @@ -114,7 +114,7 @@ async def ssl_echo_server_raw(**kwargs): # nursery context manager to exit too. 
with a, b: nursery.start_soon( - trio.run_sync_in_worker_thread, + trio.run_sync_in_thread, partial(ssl_echo_serve_sync, b, **kwargs) ) diff --git a/trio/tests/test_threads.py b/trio/tests/test_threads.py index f54d7d4b83..dbbce975a4 100644 --- a/trio/tests/test_threads.py +++ b/trio/tests/test_threads.py @@ -92,7 +92,7 @@ def worker_thread(token): portal = BlockingTrioPortal(token) return portal.run_sync(threading.current_thread) - t = await run_sync_in_worker_thread(worker_thread, token) + t = await run_sync_in_thread(worker_thread, token) assert t == threading.current_thread() @@ -177,7 +177,7 @@ async def test_run_in_worker_thread(): def f(x): return (x, threading.current_thread()) - x, child_thread = await run_sync_in_worker_thread(f, 1) + x, child_thread = await run_sync_in_thread(f, 1) assert x == 1 assert child_thread != trio_thread @@ -185,7 +185,7 @@ def g(): raise ValueError(threading.current_thread()) with pytest.raises(ValueError) as excinfo: - await run_sync_in_worker_thread(g) + await run_sync_in_thread(g) print(excinfo.value.args) assert excinfo.value.args[0] != trio_thread @@ -202,7 +202,7 @@ def f(q): async def child(q, cancellable): record.append("start") try: - return await run_sync_in_worker_thread( + return await run_sync_in_thread( f, q, cancellable=cancellable ) finally: @@ -213,7 +213,7 @@ async def child(q, cancellable): async with _core.open_nursery() as nursery: nursery.start_soon(child, q, True) # Give it a chance to get started. (This is important because - # run_sync_in_worker_thread does a checkpoint_if_cancelled before + # run_sync_in_thread does a checkpoint_if_cancelled before # blocking on the thread, and we don't want to trigger this.) 
await wait_all_tasks_blocked() assert record == ["start"] @@ -262,7 +262,7 @@ def thread_fn(): async def main(): async def child(): - await run_sync_in_worker_thread(thread_fn, cancellable=True) + await run_sync_in_thread(thread_fn, cancellable=True) async with _core.open_nursery() as nursery: nursery.start_soon(child) @@ -291,13 +291,13 @@ async def test_run_in_worker_thread_limiter(MAX, cancel, use_default_limiter): # This test is a bit tricky. The goal is to make sure that if we set # limiter=CapacityLimiter(MAX), then in fact only MAX threads are ever # running at a time, even if there are more concurrent calls to - # run_sync_in_worker_thread, and even if some of those are cancelled. And + # run_sync_in_thread, and even if some of those are cancelled. And # also to make sure that the default limiter actually limits. COUNT = 2 * MAX gate = threading.Event() lock = threading.Lock() if use_default_limiter: - c = current_default_worker_thread_limiter() + c = current_default_thread_limiter() orig_total_tokens = c.total_tokens c.total_tokens = MAX limiter_arg = None @@ -344,7 +344,7 @@ def thread_fn(cancel_scope): async def run_thread(event): with _core.CancelScope() as cancel_scope: - await run_sync_in_worker_thread( + await run_sync_in_thread( thread_fn, cancel_scope, limiter=limiter_arg, @@ -411,7 +411,7 @@ def release_on_behalf_of(self, borrower): record.append("release") assert borrower == self._borrower - await run_sync_in_worker_thread(lambda: None, limiter=CustomLimiter()) + await run_sync_in_thread(lambda: None, limiter=CustomLimiter()) assert record == ["acquire", "release"] @@ -429,7 +429,7 @@ def release_on_behalf_of(self, borrower): bs = BadCapacityLimiter() with pytest.raises(ValueError) as excinfo: - await run_sync_in_worker_thread(lambda: None, limiter=bs) + await run_sync_in_thread(lambda: None, limiter=bs) assert excinfo.value.__context__ is None assert record == ["acquire", "release"] record = [] @@ -438,7 +438,7 @@ def release_on_behalf_of(self, 
borrower): # chains with it d = {} with pytest.raises(ValueError) as excinfo: - await run_sync_in_worker_thread(lambda: d["x"], limiter=bs) + await run_sync_in_thread(lambda: d["x"], limiter=bs) assert isinstance(excinfo.value.__context__, KeyError) assert record == ["acquire", "release"] @@ -450,12 +450,12 @@ def bad_start(self): monkeypatch.setattr(threading.Thread, "start", bad_start) - limiter = current_default_worker_thread_limiter() + limiter = current_default_thread_limiter() assert limiter.borrowed_tokens == 0 # We get an appropriate error, and the limiter is cleanly released with pytest.raises(RuntimeError) as excinfo: - await run_sync_in_worker_thread(lambda: None) # pragma: no cover + await run_sync_in_thread(lambda: None) # pragma: no cover assert "engines" in str(excinfo.value) assert limiter.borrowed_tokens == 0 diff --git a/trio/tests/test_util.py b/trio/tests/test_util.py index 77fdcd0835..1b8af8beff 100644 --- a/trio/tests/test_util.py +++ b/trio/tests/test_util.py @@ -6,7 +6,7 @@ import pytest from .. 
import _core -from trio import run_sync_in_worker_thread +from trio import run_sync_in_thread from .._util import ( signal_raise, ConflictDetector, fspath, is_main_thread, generic_function, Final, NoPublicConstructor @@ -160,7 +160,7 @@ async def test_is_main_thread(): def not_main_thread(): assert not is_main_thread() - await run_sync_in_worker_thread(not_main_thread) + await run_sync_in_thread(not_main_thread) def test_generic_function(): diff --git a/trio/tests/test_wait_for_object.py b/trio/tests/test_wait_for_object.py index a3ce477959..89753ca82a 100644 --- a/trio/tests/test_wait_for_object.py +++ b/trio/tests/test_wait_for_object.py @@ -12,7 +12,7 @@ if on_windows: from .._core._windows_cffi import ffi, kernel32 from .._wait_for_object import WaitForSingleObject, WaitForMultipleObjects_sync - from trio import run_sync_in_worker_thread + from trio import run_sync_in_thread async def test_WaitForMultipleObjects_sync(): @@ -80,7 +80,7 @@ async def test_WaitForMultipleObjects_sync_slow(): t0 = _core.current_time() async with _core.open_nursery() as nursery: nursery.start_soon( - run_sync_in_worker_thread, WaitForMultipleObjects_sync, handle1 + run_sync_in_thread, WaitForMultipleObjects_sync, handle1 ) await _timeouts.sleep(TIMEOUT) # If we would comment the line below, the above thread will be stuck, @@ -97,7 +97,7 @@ async def test_WaitForMultipleObjects_sync_slow(): t0 = _core.current_time() async with _core.open_nursery() as nursery: nursery.start_soon( - run_sync_in_worker_thread, WaitForMultipleObjects_sync, handle1, + run_sync_in_thread, WaitForMultipleObjects_sync, handle1, handle2 ) await _timeouts.sleep(TIMEOUT) @@ -114,7 +114,7 @@ async def test_WaitForMultipleObjects_sync_slow(): t0 = _core.current_time() async with _core.open_nursery() as nursery: nursery.start_soon( - run_sync_in_worker_thread, WaitForMultipleObjects_sync, handle1, + run_sync_in_thread, WaitForMultipleObjects_sync, handle1, handle2 ) await _timeouts.sleep(TIMEOUT)