Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bpo-32309: Implement asyncio.to_thread() #20143

Merged
merged 9 commits into from
May 19, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions Doc/library/asyncio-task.rst
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,86 @@ Waiting Primitives
# ...


Running in Threads
==================

.. coroutinefunction:: to_thread(func, /, \*args, \*\*kwargs)
aeros marked this conversation as resolved.
Show resolved Hide resolved

Asynchronously run function *func* in a separate thread.
aeros marked this conversation as resolved.
Show resolved Hide resolved

Any \*args and \*\*kwargs supplied for this function are directly passed
to *func*.

Return an :class:`asyncio.Future` which represents the eventual result of
*func*.

This coroutine function is primarily intended to be used for executing
IO-bound functions/methods that would otherwise block the event loop if
they were ran in the main thread. For example, the following code would
block the event loop::

# "async def" is just used here so we can submit to asyncio.gather()
async def blocking_io():
aeros marked this conversation as resolved.
Show resolved Hide resolved
print(f"start blocking_io at {time.strftime('%X')}")
# If done in the same thread, blocking IO (such as file operations) will
# block the event loop.
time.sleep(1)
print(f"blocking_io complete at {time.strftime('%X')}")

async def main():
print(f"started main at {time.strftime('%X')}")

await asyncio.gather(
blocking_io(),
asyncio.sleep(1))

print(f"finished main at {time.strftime('%X')}")


asyncio.run(main())

# Expected output:
#
# started main at 19:50:49
# start blocking_io at 19:50:49
# blocking_io complete at 19:50:50
# finished main at 19:50:51

However, by using `asyncio.to_thread()` to run `blocking_io()` in a
separate thread, we can avoid blocking the event loop::

def blocking_io():
print(f"start blocking_io at {time.strftime('%X')}")
time.sleep(1)
print(f"blocking_io complete at {time.strftime('%X')}")

async def main():
print(f"started main at {time.strftime('%X')}")

await asyncio.gather(
asyncio.to_thread(blocking_io),
asyncio.sleep(1))

print(f"finished main at {time.strftime('%X')}")


asyncio.run(main())

# Expected output:
#
# started main at 19:50:53
# start blocking_io at 19:50:53
# blocking_io complete at 19:50:54
# finished main at 19:50:54

.. note::

Due to the :term:`GIL`, `asyncio.to_thread()` can typically only be used
to make IO-bound functions non-blocking. However, for extension modules
that release the GIL or alternative Python implementations that don't
have one, `asyncio.to_thread()` can also be used for CPU-bound functions.
aeros marked this conversation as resolved.
Show resolved Hide resolved


Scheduling From Other Threads
=============================

Expand Down
6 changes: 6 additions & 0 deletions Doc/whatsnew/3.9.rst
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,12 @@ that schedules a shutdown for the default executor that waits on the
Added :class:`asyncio.PidfdChildWatcher`, a Linux-specific child watcher
implementation that polls process file descriptors. (:issue:`38692`)

Added a new :term:`coroutine` :func:`asyncio.to_thread`. It is mainly used for
running IO-bound functions in a separate thread to avoid blocking the event
loop, and essentially works as a high-level version of
:meth:`~asyncio.loop.run_in_executor` that can directly take keyword arguments.
(Contributed by Kyle Stanley and Yury Selivanov in :issue:`32309`.)

compileall
----------

Expand Down
2 changes: 2 additions & 0 deletions Lib/asyncio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from .streams import *
from .subprocess import *
from .tasks import *
from .threads import *
from .transports import *

# Exposed for _asynciomodule.c to implement now deprecated
Expand All @@ -35,6 +36,7 @@
streams.__all__ +
subprocess.__all__ +
tasks.__all__ +
threads.__all__ +
transports.__all__)

if sys.platform == 'win32': # pragma: no cover
Expand Down
21 changes: 21 additions & 0 deletions Lib/asyncio/threads.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""High-level support for working with threads in asyncio"""

import functools

from . import events


__all__ = "to_thread",


async def to_thread(func, /, *args, **kwargs):
"""Asynchronously run function *func* in a separate thread.

Any *args and **kwargs supplied for this function are directly passed
to *func*.

Return an asyncio.Future which represents the eventual result of *func*.
"""
loop = events.get_running_loop()
func_call = functools.partial(func, *args, **kwargs)
aeros marked this conversation as resolved.
Show resolved Hide resolved
return await loop.run_in_executor(None, func_call)
79 changes: 79 additions & 0 deletions Lib/test/test_asyncio/test_threads.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""Tests for asyncio/threads.py"""

import asyncio
import unittest

from unittest import mock
from test.test_asyncio import utils as test_utils


def tearDownModule():
asyncio.set_event_loop_policy(None)


class ToThreadTests(test_utils.TestCase):
def setUp(self):
super().setUp()
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)

def tearDown(self):
self.loop.run_until_complete(
self.loop.shutdown_default_executor())
self.loop.close()
asyncio.set_event_loop(None)
self.loop = None
super().tearDown()

def test_to_thread(self):
async def main():
return await asyncio.to_thread(sum, [40, 2])

result = self.loop.run_until_complete(main())
self.assertEqual(result, 42)

def test_to_thread_exception(self):
def raise_runtime():
raise RuntimeError("test")

async def main():
await asyncio.to_thread(raise_runtime)

with self.assertRaisesRegex(RuntimeError, "test"):
self.loop.run_until_complete(main())

def test_to_thread_once(self):
func = mock.Mock()

async def main():
await asyncio.to_thread(func)

self.loop.run_until_complete(main())
func.assert_called_once()

def test_to_thread_concurrent(self):
func = mock.Mock()

async def main():
futs = []
for _ in range(10):
fut = asyncio.to_thread(func)
futs.append(fut)
await asyncio.gather(*futs)

self.loop.run_until_complete(main())
self.assertEqual(func.call_count, 10)

def test_to_thread_args_kwargs(self):
# Unlike run_in_executor(), to_thread() should directly accept kwargs.
func = mock.Mock()

async def main():
await asyncio.to_thread(func, 'test', something=True)

self.loop.run_until_complete(main())
func.assert_called_once_with('test', something=True)


if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Added a new :term:`coroutine` :func:`asyncio.to_thread`. It is mainly used for
running IO-bound functions in a separate thread to avoid blocking the event
loop, and essentially works as a high-level version of
:meth:`~asyncio.loop.run_in_executor` that can directly take keyword arguments.