From a127f98a7cc88b2a5d2c719648840f3f413ee03b Mon Sep 17 00:00:00 2001 From: Itamar Ostricher Date: Mon, 20 Mar 2023 14:48:24 -0700 Subject: [PATCH 01/28] Eager task factory implementation --- .../pycore_global_objects_fini_generated.h | 1 + Include/internal/pycore_global_strings.h | 1 + .../internal/pycore_runtime_init_generated.h | 1 + .../internal/pycore_unicodeobject_generated.h | 3 + Lib/asyncio/taskgroups.py | 3 +- Lib/asyncio/tasks.py | 125 ++++++++++++------ Modules/_asynciomodule.c | 50 +++++-- Modules/clinic/_asynciomodule.c.h | 27 ++-- 8 files changed, 145 insertions(+), 66 deletions(-) diff --git a/Include/internal/pycore_global_objects_fini_generated.h b/Include/internal/pycore_global_objects_fini_generated.h index 14dfd9ea5823ed..fd3388c604f61a 100644 --- a/Include/internal/pycore_global_objects_fini_generated.h +++ b/Include/internal/pycore_global_objects_fini_generated.h @@ -853,6 +853,7 @@ _PyStaticObjects_CheckRefcnt(PyInterpreterState *interp) { _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(copy)); _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(copyreg)); _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(coro)); + _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(coro_result)); _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(count)); _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(cwd)); _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(d)); diff --git a/Include/internal/pycore_global_strings.h b/Include/internal/pycore_global_strings.h index 6f430bb25eb8d3..2a8d449e0612a5 100644 --- a/Include/internal/pycore_global_strings.h +++ b/Include/internal/pycore_global_strings.h @@ -339,6 +339,7 @@ struct _Py_global_strings { STRUCT_FOR_ID(copy) STRUCT_FOR_ID(copyreg) STRUCT_FOR_ID(coro) + STRUCT_FOR_ID(coro_result) STRUCT_FOR_ID(count) STRUCT_FOR_ID(cwd) STRUCT_FOR_ID(d) diff --git a/Include/internal/pycore_runtime_init_generated.h b/Include/internal/pycore_runtime_init_generated.h index 0452c4c61551de..a665fb7f96000e 100644 --- 
a/Include/internal/pycore_runtime_init_generated.h +++ b/Include/internal/pycore_runtime_init_generated.h @@ -845,6 +845,7 @@ extern "C" { INIT_ID(copy), \ INIT_ID(copyreg), \ INIT_ID(coro), \ + INIT_ID(coro_result), \ INIT_ID(count), \ INIT_ID(cwd), \ INIT_ID(d), \ diff --git a/Include/internal/pycore_unicodeobject_generated.h b/Include/internal/pycore_unicodeobject_generated.h index 7114a5416f2515..bfabfbee538e17 100644 --- a/Include/internal/pycore_unicodeobject_generated.h +++ b/Include/internal/pycore_unicodeobject_generated.h @@ -870,6 +870,9 @@ _PyUnicode_InitStaticStrings(PyInterpreterState *interp) { string = &_Py_ID(coro); assert(_PyUnicode_CheckConsistency(string, 1)); _PyUnicode_InternInPlace(interp, &string); + string = &_Py_ID(coro_result); + assert(_PyUnicode_CheckConsistency(string, 1)); + _PyUnicode_InternInPlace(interp, &string); string = &_Py_ID(count); assert(_PyUnicode_CheckConsistency(string, 1)); _PyUnicode_InternInPlace(interp, &string); diff --git a/Lib/asyncio/taskgroups.py b/Lib/asyncio/taskgroups.py index 0fdea3697ece3d..70bc2c405fecaf 100644 --- a/Lib/asyncio/taskgroups.py +++ b/Lib/asyncio/taskgroups.py @@ -163,7 +163,8 @@ def create_task(self, coro, *, name=None, context=None): task = self._loop.create_task(coro) else: task = self._loop.create_task(coro, context=context) - tasks._set_task_name(task, name) + if name is not None and not task.done(): # If it's done already, it's a future + tasks._set_task_name(task, name) task.add_done_callback(self._on_task_done) self._tasks.add(task) return task diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index c90d32c97add78..9d722f88a91373 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -6,6 +6,7 @@ 'wait', 'wait_for', 'as_completed', 'sleep', 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe', 'current_task', 'all_tasks', + 'create_eager_task_factory', 'eager_task_factory', '_register_task', '_unregister_task', '_enter_task', '_leave_task', ) @@ -75,6 +76,8 
@@ def _set_task_name(task, name): set_name(name) +_NOT_SET = object() + class Task(futures._PyFuture): # Inherit Python Task implementation # from a Python Future implementation. @@ -93,7 +96,8 @@ class Task(futures._PyFuture): # Inherit Python Task implementation # status is still pending _log_destroy_pending = True - def __init__(self, coro, *, loop=None, name=None, context=None): + def __init__(self, coro, *, loop=None, name=None, context=None, + coro_result=_NOT_SET): super().__init__(loop=loop) if self._source_traceback: del self._source_traceback[-1] @@ -117,7 +121,10 @@ def __init__(self, coro, *, loop=None, name=None, context=None): else: self._context = context - self._loop.call_soon(self.__step, context=self._context) + if coro_result is _NOT_SET: + self._loop.call_soon(self.__step, context=self._context) + else: + self.__step_handle_result(coro_result) _register_task(self) def __del__(self): @@ -287,55 +294,58 @@ def __step(self, exc=None): except BaseException as exc: super().set_exception(exc) else: - blocking = getattr(result, '_asyncio_future_blocking', None) - if blocking is not None: + self.__step_handle_result(result) + finally: + _leave_task(self._loop, self) + self = None # Needed to break cycles when an exception occurs. + + def __step_handle_result(self, result): + blocking = getattr(result, '_asyncio_future_blocking', None) + if blocking is not None: # Yielded Future must come from Future.__iter__(). 
- if futures._get_loop(result) is not self._loop: + if futures._get_loop(result) is not self._loop: + new_exc = RuntimeError( + f'Task {self!r} got Future ' + f'{result!r} attached to a different loop') + self._loop.call_soon( + self.__step, new_exc, context=self._context) + elif blocking: + if result is self: new_exc = RuntimeError( - f'Task {self!r} got Future ' - f'{result!r} attached to a different loop') + f'Task cannot await on itself: {self!r}') self._loop.call_soon( self.__step, new_exc, context=self._context) - elif blocking: - if result is self: - new_exc = RuntimeError( - f'Task cannot await on itself: {self!r}') - self._loop.call_soon( - self.__step, new_exc, context=self._context) - else: - result._asyncio_future_blocking = False - result.add_done_callback( - self.__wakeup, context=self._context) - self._fut_waiter = result - if self._must_cancel: - if self._fut_waiter.cancel( - msg=self._cancel_message): - self._must_cancel = False else: - new_exc = RuntimeError( - f'yield was used instead of yield from ' - f'in task {self!r} with {result!r}') - self._loop.call_soon( - self.__step, new_exc, context=self._context) - - elif result is None: - # Bare yield relinquishes control for one event loop iteration. - self._loop.call_soon(self.__step, context=self._context) - elif inspect.isgenerator(result): - # Yielding a generator is just wrong. - new_exc = RuntimeError( - f'yield was used instead of yield from for ' - f'generator in task {self!r} with {result!r}') - self._loop.call_soon( - self.__step, new_exc, context=self._context) + result._asyncio_future_blocking = False + result.add_done_callback( + self.__wakeup, context=self._context) + self._fut_waiter = result + if self._must_cancel: + if self._fut_waiter.cancel( + msg=self._cancel_message): + self._must_cancel = False else: - # Yielding something else is an error. 
- new_exc = RuntimeError(f'Task got bad yield: {result!r}') + new_exc = RuntimeError( + f'yield was used instead of yield from ' + f'in task {self!r} with {result!r}') self._loop.call_soon( self.__step, new_exc, context=self._context) - finally: - _leave_task(self._loop, self) - self = None # Needed to break cycles when an exception occurs. + + elif result is None: + # Bare yield relinquishes control for one event loop iteration. + self._loop.call_soon(self.__step, context=self._context) + elif inspect.isgenerator(result): + # Yielding a generator is just wrong. + new_exc = RuntimeError( + f'yield was used instead of yield from for ' + f'generator in task {self!r} with {result!r}') + self._loop.call_soon( + self.__step, new_exc, context=self._context) + else: + # Yielding something else is an error. + new_exc = RuntimeError(f'Task got bad yield: {result!r}') + self._loop.call_soon( + self.__step, new_exc, context=self._context) def __wakeup(self, future): try: @@ -897,6 +907,35 @@ def callback(): return future +def create_eager_task_factory(custom_task_constructor): + + def factory(loop, coro, *, name=None, context=None): + loop._check_closed() + if not loop.is_running(): + return custom_task_constructor(coro, loop=loop, name=name, context=context) + + try: + result = coro.send(None) + except StopIteration as si: + fut = loop.create_future() + fut.set_result(si.value) + return fut + except Exception as ex: + fut = loop.create_future() + fut.set_exception(ex) + return fut + else: + task = custom_task_constructor( + coro, loop=loop, name=name, context=context, coro_result=result) + if task._source_traceback: + del task._source_traceback[-1] + return task + + return factory + +eager_task_factory = create_eager_task_factory(Task) + + # WeakSet containing all alive tasks. 
_all_tasks = weakref.WeakSet() diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c index 2476dca6f58ebf..927c7c365a765e 100644 --- a/Modules/_asynciomodule.c +++ b/Modules/_asynciomodule.c @@ -156,6 +156,9 @@ class _asyncio.Future "FutureObj *" "&Future_Type" /* Get FutureIter from Future */ static PyObject * future_new_iter(PyObject *); +static PyObject * +task_step_handle_result_impl(asyncio_state *state, TaskObj *task, PyObject *result); + static int _is_coroutine(asyncio_state *state, PyObject *coro) @@ -2025,15 +2028,16 @@ _asyncio.Task.__init__ loop: object = None name: object = None context: object = None + coro_result: object = NULL A coroutine wrapped in a Future. [clinic start generated code]*/ static int _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop, - PyObject *name, PyObject *context) -/*[clinic end generated code: output=49ac96fe33d0e5c7 input=924522490c8ce825]*/ - + PyObject *name, PyObject *context, + PyObject *coro_result) +/*[clinic end generated code: output=e241855787412a77 input=3fcd7fb1c00d3f87]*/ { if (future_init((FutureObj*)self, loop)) { return -1; @@ -2081,8 +2085,16 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop, return -1; } - if (task_call_step_soon(state, self, NULL)) { - return -1; + if (coro_result == NULL) { + if (task_call_step_soon(state, self, NULL)) { + return -1; + } + } + else { + PyObject * res = task_step_handle_result_impl(state, self, coro_result); + if (res == NULL) { + return -1; + } } return register_task(state, (PyObject*)self); } @@ -2822,6 +2834,22 @@ task_step_impl(asyncio_state *state, TaskObj *task, PyObject *exc) Py_RETURN_NONE; } + PyObject *ret = task_step_handle_result_impl(state, task, result); + Py_XDECREF(result); + return ret; + +fail: + Py_XDECREF(result); + return NULL; +} + + +static PyObject * +task_step_handle_result_impl(asyncio_state *state, TaskObj *task, PyObject *result) +{ + int res; + PyObject *o; + if (result == 
(PyObject*)task) { /* We have a task that wants to await on itself */ goto self_await; @@ -2858,7 +2886,8 @@ task_step_impl(asyncio_state *state, TaskObj *task, PyObject *exc) Py_DECREF(tmp); /* task._fut_waiter = result */ - task->task_fut_waiter = result; /* no incref is necessary */ + Py_INCREF(result); + task->task_fut_waiter = result; if (task->task_must_cancel) { PyObject *r; @@ -2951,7 +2980,8 @@ task_step_impl(asyncio_state *state, TaskObj *task, PyObject *exc) Py_DECREF(tmp); /* task._fut_waiter = result */ - task->task_fut_waiter = result; /* no incref is necessary */ + Py_INCREF(result); + task->task_fut_waiter = result; if (task->task_must_cancel) { PyObject *r; @@ -2986,21 +3016,18 @@ task_step_impl(asyncio_state *state, TaskObj *task, PyObject *exc) state, task, PyExc_RuntimeError, "yield was used instead of yield from for " "generator in task %R with %R", task, result); - Py_DECREF(result); return o; } /* The `result` is none of the above */ o = task_set_error_soon( state, task, PyExc_RuntimeError, "Task got bad yield: %R", result); - Py_DECREF(result); return o; self_await: o = task_set_error_soon( state, task, PyExc_RuntimeError, "Task cannot await on itself: %R", task); - Py_DECREF(result); return o; yield_insteadof_yf: @@ -3009,7 +3036,6 @@ task_step_impl(asyncio_state *state, TaskObj *task, PyObject *exc) "yield was used instead of yield from " "in task %R with %R", task, result); - Py_DECREF(result); return o; different_loop: @@ -3017,11 +3043,9 @@ task_step_impl(asyncio_state *state, TaskObj *task, PyObject *exc) state, task, PyExc_RuntimeError, "Task %R got Future %R attached to a different loop", task, result); - Py_DECREF(result); return o; fail: - Py_XDECREF(result); return NULL; } diff --git a/Modules/clinic/_asynciomodule.c.h b/Modules/clinic/_asynciomodule.c.h index 43c5d771798634..47a678b50784b1 100644 --- a/Modules/clinic/_asynciomodule.c.h +++ b/Modules/clinic/_asynciomodule.c.h @@ -482,14 +482,16 @@ 
_asyncio_Future__make_cancelled_error(FutureObj *self, PyObject *Py_UNUSED(ignor } PyDoc_STRVAR(_asyncio_Task___init____doc__, -"Task(coro, *, loop=None, name=None, context=None)\n" +"Task(coro, *, loop=None, name=None, context=None,\n" +" coro_result=)\n" "--\n" "\n" "A coroutine wrapped in a Future."); static int _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop, - PyObject *name, PyObject *context); + PyObject *name, PyObject *context, + PyObject *coro_result); static int _asyncio_Task___init__(PyObject *self, PyObject *args, PyObject *kwargs) @@ -497,14 +499,14 @@ _asyncio_Task___init__(PyObject *self, PyObject *args, PyObject *kwargs) int return_value = -1; #if defined(Py_BUILD_CORE) && !defined(Py_BUILD_CORE_MODULE) - #define NUM_KEYWORDS 4 + #define NUM_KEYWORDS 5 static struct { PyGC_Head _this_is_not_used; PyObject_VAR_HEAD PyObject *ob_item[NUM_KEYWORDS]; } _kwtuple = { .ob_base = PyVarObject_HEAD_INIT(&PyTuple_Type, NUM_KEYWORDS) - .ob_item = { &_Py_ID(coro), &_Py_ID(loop), &_Py_ID(name), &_Py_ID(context), }, + .ob_item = { &_Py_ID(coro), &_Py_ID(loop), &_Py_ID(name), &_Py_ID(context), &_Py_ID(coro_result), }, }; #undef NUM_KEYWORDS #define KWTUPLE (&_kwtuple.ob_base.ob_base) @@ -513,14 +515,14 @@ _asyncio_Task___init__(PyObject *self, PyObject *args, PyObject *kwargs) # define KWTUPLE NULL #endif // !Py_BUILD_CORE - static const char * const _keywords[] = {"coro", "loop", "name", "context", NULL}; + static const char * const _keywords[] = {"coro", "loop", "name", "context", "coro_result", NULL}; static _PyArg_Parser _parser = { .keywords = _keywords, .fname = "Task", .kwtuple = KWTUPLE, }; #undef KWTUPLE - PyObject *argsbuf[4]; + PyObject *argsbuf[5]; PyObject * const *fastargs; Py_ssize_t nargs = PyTuple_GET_SIZE(args); Py_ssize_t noptargs = nargs + (kwargs ? 
PyDict_GET_SIZE(kwargs) : 0) - 1; @@ -528,6 +530,7 @@ _asyncio_Task___init__(PyObject *self, PyObject *args, PyObject *kwargs) PyObject *loop = Py_None; PyObject *name = Py_None; PyObject *context = Py_None; + PyObject *coro_result = NULL; fastargs = _PyArg_UnpackKeywords(_PyTuple_CAST(args)->ob_item, nargs, kwargs, NULL, &_parser, 1, 1, 0, argsbuf); if (!fastargs) { @@ -549,9 +552,15 @@ _asyncio_Task___init__(PyObject *self, PyObject *args, PyObject *kwargs) goto skip_optional_kwonly; } } - context = fastargs[3]; + if (fastargs[3]) { + context = fastargs[3]; + if (!--noptargs) { + goto skip_optional_kwonly; + } + } + coro_result = fastargs[4]; skip_optional_kwonly: - return_value = _asyncio_Task___init___impl((TaskObj *)self, coro, loop, name, context); + return_value = _asyncio_Task___init___impl((TaskObj *)self, coro, loop, name, context, coro_result); exit: return return_value; @@ -1302,4 +1311,4 @@ _asyncio_current_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, exit: return return_value; } -/*[clinic end generated code: output=00f494214f2fd008 input=a9049054013a1b77]*/ +/*[clinic end generated code: output=d7cd98454c53b85a input=a9049054013a1b77]*/ From 45316d810b3fd24970ab50348824c07d9d648d66 Mon Sep 17 00:00:00 2001 From: Itamar Ostricher Date: Mon, 20 Mar 2023 14:48:42 -0700 Subject: [PATCH 02/28] Eager task factory tests --- .../test_asyncio/test_eager_task_factory.py | 875 ++++++++++++++++++ 1 file changed, 875 insertions(+) create mode 100644 Lib/test/test_asyncio/test_eager_task_factory.py diff --git a/Lib/test/test_asyncio/test_eager_task_factory.py b/Lib/test/test_asyncio/test_eager_task_factory.py new file mode 100644 index 00000000000000..25b2c510b5c624 --- /dev/null +++ b/Lib/test/test_asyncio/test_eager_task_factory.py @@ -0,0 +1,875 @@ +"""Tests for base_events.py""" + +import gc +import time +import unittest +from types import GenericAlias +from unittest import mock + +import asyncio +from asyncio import base_events +from 
asyncio import tasks +from test.test_asyncio import utils as test_utils +from test.test_asyncio.test_tasks import get_innermost_context +from test import support + +MOCK_ANY = mock.ANY + + +def tearDownModule(): + asyncio.set_event_loop_policy(None) + + +class EagerTaskFactoryLoopTests(test_utils.TestCase): + + def new_task(self, loop, coro, name='TestTask', context=None): + return tasks.Task(coro, loop=loop, name=name, context=context) + + def new_future(self, loop): + return asyncio.Future(loop=loop) + + def setUp(self): + super().setUp() + self.loop = asyncio.new_event_loop() + self.loop.set_task_factory(asyncio.eager_task_factory) + self.set_event_loop(self.loop) + + def test_eager_task_factory_set(self): + self.assertIs(self.loop.get_task_factory(), asyncio.eager_task_factory) + + def test_generic_alias(self): + task = tasks.Task[str] + self.assertEqual(task.__args__, (str,)) + self.assertIsInstance(task, GenericAlias) + + def test_task_cancel_message_getter(self): + async def coro(): + pass + t = self.new_task(self.loop, coro()) + self.assertTrue(hasattr(t, '_cancel_message')) + self.assertEqual(t._cancel_message, None) + + t.cancel('my message') + self.assertEqual(t._cancel_message, 'my message') + + with self.assertRaises(asyncio.CancelledError) as cm: + self.loop.run_until_complete(t) + + self.assertEqual('my message', cm.exception.args[0]) + + def test_task_cancel_message_setter(self): + async def coro(): + pass + t = self.new_task(self.loop, coro()) + t.cancel('my message') + t._cancel_message = 'my new message' + self.assertEqual(t._cancel_message, 'my new message') + + with self.assertRaises(asyncio.CancelledError) as cm: + self.loop.run_until_complete(t) + + self.assertEqual('my new message', cm.exception.args[0]) + + def test_task_del_collect(self): + class Evil: + def __del__(self): + gc.collect() + + async def run(): + return Evil() + + self.loop.run_until_complete( + asyncio.gather(*[ + self.new_task(self.loop, run()) for _ in range(100) + ])) + + 
def test_other_loop_future(self): + other_loop = asyncio.new_event_loop() + fut = self.new_future(other_loop) + + async def run(fut): + await fut + + try: + with self.assertRaisesRegex(RuntimeError, + r'Task .* got Future .* attached'): + self.loop.run_until_complete(run(fut)) + finally: + other_loop.close() + + def test_task_awaits_on_itself(self): + + async def test(): + await task + + task = asyncio.ensure_future(test(), loop=self.loop) + + with self.assertRaisesRegex(RuntimeError, + 'Task cannot await on itself'): + self.loop.run_until_complete(task) + + def test_exception_chaining_after_await(self): + # Test that when awaiting on a task when an exception is already + # active, if the task raises an exception it will be chained + # with the original. + + async def raise_error(): + raise ValueError + + async def run(): + try: + raise KeyError(3) + except Exception as exc: + task = self.new_task(self.loop, raise_error()) + try: + await task + except Exception as exc: + self.assertEqual(type(exc), ValueError) + chained = exc.__context__ + self.assertEqual((type(chained), chained.args), + (KeyError, (3,))) + + task = self.new_task(self.loop, run()) + self.loop.run_until_complete(task) + + def test_exception_chaining_after_await_with_context_cycle(self): + # Check trying to create an exception context cycle: + # https://bugs.python.org/issue40696 + has_cycle = None + + async def process_exc(exc): + raise exc + + async def run(): + nonlocal has_cycle + try: + raise KeyError('a') + except Exception as exc: + task = self.new_task(self.loop, process_exc(exc)) + try: + await task + except BaseException as exc: + has_cycle = (exc is exc.__context__) + # Prevent a hang if has_cycle is True. + exc.__context__ = None + + task = self.new_task(self.loop, run()) + self.loop.run_until_complete(task) + # This also distinguishes from the initial has_cycle=None. 
+ self.assertEqual(has_cycle, False) + + def test_cancelling(self): + + async def task(): + await asyncio.sleep(10) + + t = self.new_task(self.loop, task()) + self.assertFalse(t.cancelling()) + self.assertNotIn(" cancelling ", repr(t)) + self.assertTrue(t.cancel()) + self.assertTrue(t.cancelling()) + self.assertIn(" cancelling ", repr(t)) + + # Since we commented out two lines from Task.cancel(), + # this t.cancel() call now returns True. + # self.assertFalse(t.cancel()) + self.assertTrue(t.cancel()) + + with self.assertRaises(asyncio.CancelledError): + self.loop.run_until_complete(t) + + def test_uncancel_basic(self): + + async def task(): + try: + await asyncio.sleep(10) + except asyncio.CancelledError: + asyncio.current_task().uncancel() + await asyncio.sleep(10) + + t = self.new_task(self.loop, task()) + self.loop.run_until_complete(asyncio.sleep(0.01)) + + # Cancel first sleep + self.assertTrue(t.cancel()) + self.assertIn(" cancelling ", repr(t)) + self.assertEqual(t.cancelling(), 1) + self.assertFalse(t.cancelled()) # Task is still not complete + self.loop.run_until_complete(asyncio.sleep(0.01)) + + # after .uncancel() + self.assertNotIn(" cancelling ", repr(t)) + self.assertEqual(t.cancelling(), 0) + self.assertFalse(t.cancelled()) # Task is still not complete + + # Cancel second sleep + self.assertTrue(t.cancel()) + self.assertEqual(t.cancelling(), 1) + self.assertFalse(t.cancelled()) # Task is still not complete + with self.assertRaises(asyncio.CancelledError): + self.loop.run_until_complete(t) + self.assertTrue(t.cancelled()) # Finally, task complete + self.assertTrue(t.done()) + + # uncancel is no longer effective after the task is complete + t.uncancel() + self.assertTrue(t.cancelled()) + self.assertTrue(t.done()) + + def test_cancel_with_message_then_future_result(self): + # Test Future.result() after calling cancel() with a message. + cases = [ + ((), ()), + ((None,), ()), + (('my message',), ('my message',)), + # Non-string values should roundtrip. 
+ ((5,), (5,)), + ] + for cancel_args, expected_args in cases: + with self.subTest(cancel_args=cancel_args): + + async def sleep(): + await asyncio.sleep(10) + + async def coro(): + task = self.new_task(self.loop, sleep()) + await asyncio.sleep(0) + task.cancel(*cancel_args) + done, pending = await asyncio.wait([task]) + task.result() + + task = self.new_task(self.loop, coro()) + with self.assertRaises(asyncio.CancelledError) as cm: + self.loop.run_until_complete(task) + exc = cm.exception + self.assertEqual(exc.args, expected_args) + + actual = get_innermost_context(exc) + self.assertEqual(actual, + (asyncio.CancelledError, expected_args, 0)) + + def test_cancel_with_message_then_future_exception(self): + # Test Future.exception() after calling cancel() with a message. + cases = [ + ((), ()), + ((None,), ()), + (('my message',), ('my message',)), + # Non-string values should roundtrip. + ((5,), (5,)), + ] + for cancel_args, expected_args in cases: + with self.subTest(cancel_args=cancel_args): + + async def sleep(): + await asyncio.sleep(10) + + async def coro(): + task = self.new_task(self.loop, sleep()) + await asyncio.sleep(0) + task.cancel(*cancel_args) + done, pending = await asyncio.wait([task]) + task.exception() + + task = self.new_task(self.loop, coro()) + with self.assertRaises(asyncio.CancelledError) as cm: + self.loop.run_until_complete(task) + exc = cm.exception + self.assertEqual(exc.args, expected_args) + + actual = get_innermost_context(exc) + self.assertEqual(actual, + (asyncio.CancelledError, expected_args, 0)) + + def test_cancel_with_message_before_starting_task(self): + + async def sleep(): + await asyncio.sleep(10) + + async def coro(): + task = self.new_task(self.loop, sleep()) + # We deliberately leave out the sleep here. 
+ task.cancel('my message') + done, pending = await asyncio.wait([task]) + task.exception() + + task = self.new_task(self.loop, coro()) + with self.assertRaises(asyncio.CancelledError) as cm: + self.loop.run_until_complete(task) + exc = cm.exception + self.assertEqual(exc.args, ('my message',)) + + actual = get_innermost_context(exc) + self.assertEqual(actual, + (asyncio.CancelledError, ('my message',), 0)) + + def test_cancel_yield(self): + async def task(): + await asyncio.sleep(0) + await asyncio.sleep(0) + return 12 + + t = self.new_task(self.loop, task()) + test_utils.run_briefly(self.loop) # start coro + t.cancel() + self.assertRaises( + asyncio.CancelledError, self.loop.run_until_complete, t) + self.assertTrue(t.done()) + self.assertTrue(t.cancelled()) + self.assertFalse(t.cancel()) + + def test_cancel_inner_future(self): + f = self.new_future(self.loop) + + async def task(): + await f + return 12 + + t = self.new_task(self.loop, task()) + test_utils.run_briefly(self.loop) # start task + f.cancel() + with self.assertRaises(asyncio.CancelledError): + self.loop.run_until_complete(t) + self.assertTrue(f.cancelled()) + self.assertTrue(t.cancelled()) + + def test_cancel_both_task_and_inner_future(self): + f = self.new_future(self.loop) + + async def task(): + await f + return 12 + + t = self.new_task(self.loop, task()) + test_utils.run_briefly(self.loop) + + f.cancel() + t.cancel() + + with self.assertRaises(asyncio.CancelledError): + self.loop.run_until_complete(t) + + self.assertTrue(t.done()) + self.assertTrue(f.cancelled()) + self.assertTrue(t.cancelled()) + + def test_cancel_task_catching(self): + fut1 = self.new_future(self.loop) + fut2 = self.new_future(self.loop) + + async def task(): + await fut1 + try: + await fut2 + except asyncio.CancelledError: + return 42 + + t = self.new_task(self.loop, task()) + test_utils.run_briefly(self.loop) + self.assertIs(t._fut_waiter, fut1) # White-box test. 
+ fut1.set_result(None) + test_utils.run_briefly(self.loop) + self.assertIs(t._fut_waiter, fut2) # White-box test. + t.cancel() + self.assertTrue(fut2.cancelled()) + res = self.loop.run_until_complete(t) + self.assertEqual(res, 42) + self.assertFalse(t.cancelled()) + + def test_cancel_task_ignoring(self): + fut1 = self.new_future(self.loop) + fut2 = self.new_future(self.loop) + fut3 = self.new_future(self.loop) + + async def task(): + await fut1 + try: + await fut2 + except asyncio.CancelledError: + pass + res = await fut3 + return res + + t = self.new_task(self.loop, task()) + test_utils.run_briefly(self.loop) + self.assertIs(t._fut_waiter, fut1) # White-box test. + fut1.set_result(None) + test_utils.run_briefly(self.loop) + self.assertIs(t._fut_waiter, fut2) # White-box test. + t.cancel() + self.assertTrue(fut2.cancelled()) + test_utils.run_briefly(self.loop) + self.assertIs(t._fut_waiter, fut3) # White-box test. + fut3.set_result(42) + res = self.loop.run_until_complete(t) + self.assertEqual(res, 42) + self.assertFalse(fut3.cancelled()) + self.assertFalse(t.cancelled()) + + def test_close(self): + self.assertFalse(self.loop.is_closed()) + self.loop.close() + self.assertTrue(self.loop.is_closed()) + + # it should be possible to call close() more than once + self.loop.close() + self.loop.close() + + # operation blocked when the loop is closed + f = self.loop.create_future() + self.assertRaises(RuntimeError, self.loop.run_forever) + self.assertRaises(RuntimeError, self.loop.run_until_complete, f) + + def test__add_callback_handle(self): + h = asyncio.Handle(lambda: False, (), self.loop, None) + + self.loop._add_callback(h) + self.assertFalse(self.loop._scheduled) + self.assertIn(h, self.loop._ready) + + def test__add_callback_cancelled_handle(self): + h = asyncio.Handle(lambda: False, (), self.loop, None) + h.cancel() + + self.loop._add_callback(h) + self.assertFalse(self.loop._scheduled) + self.assertFalse(self.loop._ready) + + def test_call_soon(self): + def 
cb(): + pass + + h = self.loop.call_soon(cb) + self.assertEqual(h._callback, cb) + self.assertIsInstance(h, asyncio.Handle) + self.assertIn(h, self.loop._ready) + + def test_call_soon_non_callable(self): + self.loop.set_debug(True) + with self.assertRaisesRegex(TypeError, 'a callable object'): + self.loop.call_soon(1) + + def test_call_later(self): + def cb(): + pass + + h = self.loop.call_later(10.0, cb) + self.assertIsInstance(h, asyncio.TimerHandle) + self.assertIn(h, self.loop._scheduled) + self.assertNotIn(h, self.loop._ready) + with self.assertRaises(TypeError, msg="delay must not be None"): + self.loop.call_later(None, cb) + + def test_call_later_negative_delays(self): + calls = [] + + def cb(arg): + calls.append(arg) + + self.loop._process_events = mock.Mock() + self.loop.call_later(-1, cb, 'a') + self.loop.call_later(-2, cb, 'b') + test_utils.run_briefly(self.loop) + self.assertEqual(calls, ['b', 'a']) + + def test_time_and_call_at(self): + def cb(): + self.loop.stop() + + self.loop._process_events = mock.Mock() + delay = 0.1 + + when = self.loop.time() + delay + self.loop.call_at(when, cb) + t0 = self.loop.time() + self.loop.run_forever() + dt = self.loop.time() - t0 + + # 50 ms: maximum granularity of the event loop + self.assertGreaterEqual(dt, delay - 0.050, dt) + # tolerate a difference of +800 ms because some Python buildbots + # are really slow + self.assertLessEqual(dt, 0.9, dt) + with self.assertRaises(TypeError, msg="when cannot be None"): + self.loop.call_at(None, cb) + + def test_run_until_complete_loop(self): + task = self.loop.create_future() + other_loop = self.new_test_loop() + self.addCleanup(other_loop.close) + self.assertRaises(ValueError, + other_loop.run_until_complete, task) + + def test_run_until_complete_loop_orphan_future_close_loop(self): + class ShowStopper(SystemExit): + pass + + async def foo(delay): + await asyncio.sleep(delay) + + def throw(): + raise ShowStopper + + self.loop._process_events = mock.Mock() + 
self.loop.call_soon(throw) + with self.assertRaises(ShowStopper): + self.loop.run_until_complete(foo(0.1)) + + # This call fails if run_until_complete does not clean up + # done-callback for the previous future. + self.loop.run_until_complete(foo(0.2)) + + def test_default_exc_handler_callback(self): + self.loop._process_events = mock.Mock() + + def zero_error(fut): + fut.set_result(True) + 1/0 + + # Test call_soon (events.Handle) + with mock.patch('asyncio.base_events.logger') as log: + fut = self.loop.create_future() + self.loop.call_soon(zero_error, fut) + fut.add_done_callback(lambda fut: self.loop.stop()) + self.loop.run_forever() + log.error.assert_called_with( + test_utils.MockPattern('Exception in callback.*zero'), + exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) + + # Test call_later (events.TimerHandle) + with mock.patch('asyncio.base_events.logger') as log: + fut = self.loop.create_future() + self.loop.call_later(0.01, zero_error, fut) + fut.add_done_callback(lambda fut: self.loop.stop()) + self.loop.run_forever() + log.error.assert_called_with( + test_utils.MockPattern('Exception in callback.*zero'), + exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) + + def test_default_exc_handler_coro(self): + self.loop._process_events = mock.Mock() + + async def zero_error_coro(): + await asyncio.sleep(0.01) + 1/0 + + # Test Future.__del__ + with mock.patch('asyncio.base_events.logger') as log: + fut = asyncio.ensure_future(zero_error_coro(), loop=self.loop) + fut.add_done_callback(lambda *args: self.loop.stop()) + self.loop.run_forever() + fut = None # Trigger Future.__del__ or futures._TracebackLogger + support.gc_collect() + # Future.__del__ in logs error with an actual exception context + log.error.assert_called_with( + test_utils.MockPattern('.*exception was never retrieved'), + exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) + + def test_set_exc_handler_invalid(self): + with self.assertRaisesRegex(TypeError, 'A callable object or None'): + 
def test_set_exc_handler_broken(self):
    """A custom exception handler that raises is itself reported through
    ``asyncio.base_events.logger.error``.
    """
    def zero_error():
        1/0

    def handler(loop, context):
        raise AttributeError('spam')

    self.loop._process_events = mock.Mock()
    self.loop.set_exception_handler(handler)

    with mock.patch('asyncio.base_events.logger') as log:
        # One loop iteration: zero_error raises, the broken handler is
        # invoked for it and raises AttributeError in turn.
        self.loop.call_soon(zero_error)
        self.loop._run_once()

        expected_message = test_utils.MockPattern(
            'Unhandled error in exception handler')
        log.error.assert_called_with(
            expected_message,
            exc_info=(AttributeError, MOCK_ANY, MOCK_ANY))
def test_eager_task_factory_with_custom_task_ctor(self):
    """An eager factory built from a Task subclass produces that subclass.

    The factory returned by ``asyncio.create_eager_task_factory(MyTask)``
    is installed on the loop, read back, and then used through
    ``loop.create_task()``; the resulting object must be a ``MyTask``.
    """

    class MyTask(asyncio.Task):
        pass

    async def coro():
        pass

    factory = asyncio.create_eager_task_factory(MyTask)

    self.loop.set_task_factory(factory)
    self.assertIs(self.loop.get_task_factory(), factory)

    task = self.loop.create_task(coro())
    # assertIsInstance reports the actual type on failure, unlike
    # assertTrue(isinstance(...)) which only says "False is not true".
    self.assertIsInstance(task, MyTask)
    # Drive the task to completion so it is not left pending at teardown.
    self.loop.run_until_complete(task)
test_run_until_complete_baseexception(self): + # Python issue #22429: run_until_complete() must not schedule a pending + # call to stop() if the future raised a BaseException + async def raise_keyboard_interrupt(): + raise KeyboardInterrupt + + self.loop._process_events = mock.Mock() + + with self.assertRaises(KeyboardInterrupt): + self.loop.run_until_complete(raise_keyboard_interrupt()) + + def func(): + self.loop.stop() + func.called = True + func.called = False + self.loop.call_soon(self.loop.call_soon, func) + self.loop.run_forever() + self.assertTrue(func.called) + + def test_run_once(self): + # Simple test for test_utils.run_once(). It may seem strange + # to have a test for this (the function isn't even used!) but + # it's a de-factor standard API for library tests. This tests + # the idiom: loop.call_soon(loop.stop); loop.run_forever(). + count = 0 + + def callback(): + nonlocal count + count += 1 + + self.loop._process_events = mock.Mock() + self.loop.call_soon(callback) + test_utils.run_once(self.loop) + self.assertEqual(count, 1) + + +class AsyncTaskCounter: + def __init__(self, loop, *, task_class, eager): + self.suspense_count = 0 + self.task_count = 0 + + def CountingTask(*args, **kwargs): + self.task_count += 1 + return task_class(*args, **kwargs) + + if eager: + factory = asyncio.create_eager_task_factory(CountingTask) + else: + def factory(loop, coro, **kwargs): + return CountingTask(coro, loop=loop, **kwargs) + loop.set_task_factory(factory) + + def get(self): + return self.task_count + + +async def awaitable_chain(depth): + if depth == 0: + return 0 + return 1 + await awaitable_chain(depth - 1) + + +async def recursive_taskgroups(width, depth): + if depth == 0: + return 0 + + async with asyncio.TaskGroup() as tg: + futures = [ + tg.create_task(recursive_taskgroups(width, depth - 1)) + for _ in range(width) + ] + return sum( + (1 if isinstance(fut, (asyncio.Task, tasks._CTask, tasks._PyTask)) else 0) + + fut.result() + for fut in futures + ) + + 
async def recursive_gather(width, depth):
    """Fan out *width* recursive calls per level via ``asyncio.gather``.

    Recursion stops when *depth* reaches 0.  Always returns ``None``.
    """
    if depth != 0:
        children = [
            recursive_gather(width, depth - 1)
            for _ in range(width)
        ]
        await asyncio.gather(*children)
EagerCTaskTests(BaseEagerTaskFactoryTests, test_utils.TestCase): + Task = getattr(tasks, '_CTask', None) + + +if __name__ == '__main__': + unittest.main() From ac9b7b035bd752cf71fc9075d6694d10c4628253 Mon Sep 17 00:00:00 2001 From: Itamar Ostricher Date: Mon, 20 Mar 2023 14:49:17 -0700 Subject: [PATCH 03/28] Add NEWS and docs for eager task factory --- Doc/library/asyncio-task.rst | 24 +++++++++++++++++++ ...3-03-15-12-18-07.gh-issue-97696.DtnpIC.rst | 4 ++++ 2 files changed, 28 insertions(+) create mode 100644 Misc/NEWS.d/next/Library/2023-03-15-12-18-07.gh-issue-97696.DtnpIC.rst diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index ba0f909c405a34..3188f366349a56 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -527,6 +527,30 @@ Running Tasks Concurrently and there is no running event loop. +Eager Task Factory +================== + +.. function:: eager_task_factory(loop, coro, *, name=None, context=None) + + A task factory for eager task execution. + + When using this factory (via ``loop.set_task_factory(asyncio.eager_task_factory)``), + coroutines that are able to complete synchronously (without suspending) + are returned immediately as a completed :class:`Future`. + + A regular :class:`Task` is returned otherwise, at the first suspension of *coro*. + + .. versionadded:: 3.12 + +.. function:: create_eager_task_factory(custom_task_constructor) + + Create an eager task factory, similar to :func:`eager_task_factory`, + using the provided *custom_task_constructor* when creating a new task instead + of the default :class:`Task`. + + .. 
versionadded:: 3.12 + + Shielding From Cancellation =========================== diff --git a/Misc/NEWS.d/next/Library/2023-03-15-12-18-07.gh-issue-97696.DtnpIC.rst b/Misc/NEWS.d/next/Library/2023-03-15-12-18-07.gh-issue-97696.DtnpIC.rst new file mode 100644 index 00000000000000..eb1861b4e5aaaf --- /dev/null +++ b/Misc/NEWS.d/next/Library/2023-03-15-12-18-07.gh-issue-97696.DtnpIC.rst @@ -0,0 +1,4 @@ +Implemented an eager task factory in asyncio. When set as a task factory on +an event loop, it performs eager execution of coroutines and returns a +completed future instead of scheduling a task to the event loop if the +coroutine can complete without suspending. From 402c3172dc2942642f2929943bd3982388b25e64 Mon Sep 17 00:00:00 2001 From: Itamar Ostricher Date: Wed, 22 Mar 2023 09:28:24 -0700 Subject: [PATCH 04/28] elaborate explanation in docs and add a whatsnew entry --- Doc/library/asyncio-task.rst | 14 ++++++++++++-- Doc/whatsnew/3.12.rst | 5 +++++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index 3188f366349a56..b7a03050192840 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -535,10 +535,20 @@ Eager Task Factory A task factory for eager task execution. When using this factory (via ``loop.set_task_factory(asyncio.eager_task_factory)``), - coroutines that are able to complete synchronously (without suspending) + coroutines that are able to complete synchronously (without blocking) are returned immediately as a completed :class:`Future`. - A regular :class:`Task` is returned otherwise, at the first suspension of *coro*. + This task factory tries to execute the coroutine `coro` immediately + (before creating and scheduling a task to the event loop), until it either + blocks, returns, or raises. + If the coroutine returns or raises, a :class:`Future` is returned, and no + task is created or scheduled to the event loop. 
If the coroutine blocks, + a :class:`Task` is constructed and returned at that point. + + .. note:: + + The fact that the coroutine starts execution immediately is a semantic change, + and might lead to application behavior changes, depending on the application. .. versionadded:: 3.12 diff --git a/Doc/whatsnew/3.12.rst b/Doc/whatsnew/3.12.rst index b3bb065741d037..c8cf68ad03c84d 100644 --- a/Doc/whatsnew/3.12.rst +++ b/Doc/whatsnew/3.12.rst @@ -532,6 +532,11 @@ Optimizations replacement strings containing group references by 2--3 times. (Contributed by Serhiy Storchaka in :gh:`91524`.) +* Added :func:`asyncio.eager_task_factory` and :func:`asyncio.create_eager_task_factory` + functions to allow opting an event loop in to eager task execution, + speeding up some use-cases by up to 50%. + (Contributed by Itamar O in :gh:`102853`.) + CPython bytecode changes ======================== From 563ffd4df3d93d76b21020aa49209fd0e26c8e1c Mon Sep 17 00:00:00 2001 From: Itamar Ostricher Date: Wed, 22 Mar 2023 11:08:22 -0700 Subject: [PATCH 05/28] fix docs --- Doc/library/asyncio-task.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index b7a03050192840..ade4e140fe9110 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -538,7 +538,7 @@ Eager Task Factory coroutines that are able to complete synchronously (without blocking) are returned immediately as a completed :class:`Future`. - This task factory tries to execute the coroutine `coro` immediately + This task factory tries to execute the coroutine ``coro`` immediately (before creating and scheduling a task to the event loop), until it either blocks, returns, or raises.
If the coroutine returns or raises, a :class:`Future` is returned, and no From 6f2a47aad05eddf5d473aa90f334e1f9773fe2eb Mon Sep 17 00:00:00 2001 From: Itamar Ostricher Date: Thu, 20 Apr 2023 16:08:45 -0600 Subject: [PATCH 06/28] Overhaul eager task factory design Always create a task object to resolve the design flaw with structured concurrency. Push the eager step execution into the task implementation, so the task itself knows to eagerly execute the first step if created via the eager task factory. Co-authored-by: Jacob Bower --- .../pycore_global_objects_fini_generated.h | 3 +- Include/internal/pycore_global_strings.h | 3 +- .../internal/pycore_runtime_init_generated.h | 3 +- .../internal/pycore_unicodeobject_generated.h | 9 +- Lib/asyncio/taskgroups.py | 3 +- Lib/asyncio/tasks.py | 211 +++++++++------ Modules/_asynciomodule.c | 252 ++++++++++++++++-- Modules/clinic/_asynciomodule.c.h | 194 +++++++++++++- 8 files changed, 548 insertions(+), 130 deletions(-) diff --git a/Include/internal/pycore_global_objects_fini_generated.h b/Include/internal/pycore_global_objects_fini_generated.h index fd3388c604f61a..23ba2c07a191a6 100644 --- a/Include/internal/pycore_global_objects_fini_generated.h +++ b/Include/internal/pycore_global_objects_fini_generated.h @@ -853,7 +853,6 @@ _PyStaticObjects_CheckRefcnt(PyInterpreterState *interp) { _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(copy)); _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(copyreg)); _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(coro)); - _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(coro_result)); _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(count)); _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(cwd)); _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(d)); @@ -885,6 +884,7 @@ _PyStaticObjects_CheckRefcnt(PyInterpreterState *interp) { _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(dst_dir_fd)); _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(duration)); _PyStaticObject_CheckRefcnt((PyObject 
*)&_Py_ID(e)); + _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(eager_start)); _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(effective_ids)); _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(element_factory)); _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(encode)); @@ -974,6 +974,7 @@ _PyStaticObjects_CheckRefcnt(PyInterpreterState *interp) { _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(instructions)); _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(intern)); _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(intersection)); + _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(is_running)); _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(isatty)); _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(isinstance)); _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(isoformat)); diff --git a/Include/internal/pycore_global_strings.h b/Include/internal/pycore_global_strings.h index 2a8d449e0612a5..87e5a6e2d28ad4 100644 --- a/Include/internal/pycore_global_strings.h +++ b/Include/internal/pycore_global_strings.h @@ -339,7 +339,6 @@ struct _Py_global_strings { STRUCT_FOR_ID(copy) STRUCT_FOR_ID(copyreg) STRUCT_FOR_ID(coro) - STRUCT_FOR_ID(coro_result) STRUCT_FOR_ID(count) STRUCT_FOR_ID(cwd) STRUCT_FOR_ID(d) @@ -371,6 +370,7 @@ struct _Py_global_strings { STRUCT_FOR_ID(dst_dir_fd) STRUCT_FOR_ID(duration) STRUCT_FOR_ID(e) + STRUCT_FOR_ID(eager_start) STRUCT_FOR_ID(effective_ids) STRUCT_FOR_ID(element_factory) STRUCT_FOR_ID(encode) @@ -460,6 +460,7 @@ struct _Py_global_strings { STRUCT_FOR_ID(instructions) STRUCT_FOR_ID(intern) STRUCT_FOR_ID(intersection) + STRUCT_FOR_ID(is_running) STRUCT_FOR_ID(isatty) STRUCT_FOR_ID(isinstance) STRUCT_FOR_ID(isoformat) diff --git a/Include/internal/pycore_runtime_init_generated.h b/Include/internal/pycore_runtime_init_generated.h index a665fb7f96000e..e7351798fa0b83 100644 --- a/Include/internal/pycore_runtime_init_generated.h +++ b/Include/internal/pycore_runtime_init_generated.h @@ -845,7 +845,6 @@ extern "C" { INIT_ID(copy), \ INIT_ID(copyreg), 
\ INIT_ID(coro), \ - INIT_ID(coro_result), \ INIT_ID(count), \ INIT_ID(cwd), \ INIT_ID(d), \ @@ -877,6 +876,7 @@ extern "C" { INIT_ID(dst_dir_fd), \ INIT_ID(duration), \ INIT_ID(e), \ + INIT_ID(eager_start), \ INIT_ID(effective_ids), \ INIT_ID(element_factory), \ INIT_ID(encode), \ @@ -966,6 +966,7 @@ extern "C" { INIT_ID(instructions), \ INIT_ID(intern), \ INIT_ID(intersection), \ + INIT_ID(is_running), \ INIT_ID(isatty), \ INIT_ID(isinstance), \ INIT_ID(isoformat), \ diff --git a/Include/internal/pycore_unicodeobject_generated.h b/Include/internal/pycore_unicodeobject_generated.h index bfabfbee538e17..828814cc20f167 100644 --- a/Include/internal/pycore_unicodeobject_generated.h +++ b/Include/internal/pycore_unicodeobject_generated.h @@ -870,9 +870,6 @@ _PyUnicode_InitStaticStrings(PyInterpreterState *interp) { string = &_Py_ID(coro); assert(_PyUnicode_CheckConsistency(string, 1)); _PyUnicode_InternInPlace(interp, &string); - string = &_Py_ID(coro_result); - assert(_PyUnicode_CheckConsistency(string, 1)); - _PyUnicode_InternInPlace(interp, &string); string = &_Py_ID(count); assert(_PyUnicode_CheckConsistency(string, 1)); _PyUnicode_InternInPlace(interp, &string); @@ -966,6 +963,9 @@ _PyUnicode_InitStaticStrings(PyInterpreterState *interp) { string = &_Py_ID(e); assert(_PyUnicode_CheckConsistency(string, 1)); _PyUnicode_InternInPlace(interp, &string); + string = &_Py_ID(eager_start); + assert(_PyUnicode_CheckConsistency(string, 1)); + _PyUnicode_InternInPlace(interp, &string); string = &_Py_ID(effective_ids); assert(_PyUnicode_CheckConsistency(string, 1)); _PyUnicode_InternInPlace(interp, &string); @@ -1233,6 +1233,9 @@ _PyUnicode_InitStaticStrings(PyInterpreterState *interp) { string = &_Py_ID(intersection); assert(_PyUnicode_CheckConsistency(string, 1)); _PyUnicode_InternInPlace(interp, &string); + string = &_Py_ID(is_running); + assert(_PyUnicode_CheckConsistency(string, 1)); + _PyUnicode_InternInPlace(interp, &string); string = &_Py_ID(isatty); 
assert(_PyUnicode_CheckConsistency(string, 1)); _PyUnicode_InternInPlace(interp, &string); diff --git a/Lib/asyncio/taskgroups.py b/Lib/asyncio/taskgroups.py index 70bc2c405fecaf..0fdea3697ece3d 100644 --- a/Lib/asyncio/taskgroups.py +++ b/Lib/asyncio/taskgroups.py @@ -163,8 +163,7 @@ def create_task(self, coro, *, name=None, context=None): task = self._loop.create_task(coro) else: task = self._loop.create_task(coro, context=context) - if name is not None and not task.done(): # If it's done already, it's a future - tasks._set_task_name(task, name) + tasks._set_task_name(task, name) task.add_done_callback(self._on_task_done) self._tasks.add(task) return task diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 9d722f88a91373..c32fc4af34e6df 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -44,22 +44,25 @@ def all_tasks(loop=None): """Return a set of all tasks for the loop.""" if loop is None: loop = events.get_running_loop() - # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another - # thread while we do so. Therefore we cast it to list prior to filtering. The list - # cast itself requires iteration, so we repeat it several times ignoring - # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for - # details. + # Looping over these sets isn't safe as they can be updated from another thread, + # therefore we cast to lists prior to filtering. The list cast itself requires + # iteration, so we repeat it several times ignoring RuntimeErrors (which are not + # very likely to occur). See issues 34970 and 36607 for details. 
def __eager_start(self):
    """Run the first step of the coroutine synchronously, inside __init__.

    Makes this task the loop's current task, runs one step in the task's
    context, then restores the previously current task.  If the task is
    still pending afterwards it is registered as a regular scheduled task.
    """
    # Install ourselves as the current task, remembering whoever was
    # current before (None if nobody was).
    prev_task = _swap_current_task(self._loop, self)
    try:
        # The task is tracked in the eager set only while its first step
        # is executing.
        _register_eager_task(self)
        try:
            # exc=None: start the coroutine normally rather than throwing
            # an exception into it.
            self._context.run(self.__step_run_and_handle_result, None)
        finally:
            _unregister_eager_task(self)
    finally:
        try:
            # Restore the previous current task; the one we displace must
            # still be this task.
            curtask = _swap_current_task(self._loop, prev_task)
            assert curtask is self
        finally:
            if self.done():
                # Finished within the eager step: drop the coroutine
                # reference; the task is never registered as scheduled.
                self._coro = None
            else:
                # Still pending after its first step: from here on it is
                # tracked like any other scheduled task.
                _register_task(self)
exceptions.InvalidStateError( @@ -265,11 +284,17 @@ def __step(self, exc=None): if not isinstance(exc, exceptions.CancelledError): exc = self._make_cancelled_error() self._must_cancel = False - coro = self._coro self._fut_waiter = None _enter_task(self._loop, self) - # Call either coro.throw(exc) or coro.send(None). + try: + self.__step_run_and_handle_result(exc) + finally: + _leave_task(self._loop, self) + self = None # Needed to break cycles when an exception occurs. + + def __step_run_and_handle_result(self, exc): + coro = self._coro try: if exc is None: # We use the `send` method directly, because coroutines @@ -294,58 +319,52 @@ def __step(self, exc=None): except BaseException as exc: super().set_exception(exc) else: - self.__step_handle_result(result) - finally: - _leave_task(self._loop, self) - self = None # Needed to break cycles when an exception occurs. - - def __step_handle_result(self, result): - blocking = getattr(result, '_asyncio_future_blocking', None) - if blocking is not None: + blocking = getattr(result, '_asyncio_future_blocking', None) + if blocking is not None: # Yielded Future must come from Future.__iter__(). 
- if futures._get_loop(result) is not self._loop: - new_exc = RuntimeError( - f'Task {self!r} got Future ' - f'{result!r} attached to a different loop') - self._loop.call_soon( - self.__step, new_exc, context=self._context) - elif blocking: - if result is self: + if futures._get_loop(result) is not self._loop: new_exc = RuntimeError( - f'Task cannot await on itself: {self!r}') + f'Task {self!r} got Future ' + f'{result!r} attached to a different loop') self._loop.call_soon( self.__step, new_exc, context=self._context) + elif blocking: + if result is self: + new_exc = RuntimeError( + f'Task cannot await on itself: {self!r}') + self._loop.call_soon( + self.__step, new_exc, context=self._context) + else: + result._asyncio_future_blocking = False + result.add_done_callback( + self.__wakeup, context=self._context) + self._fut_waiter = result + if self._must_cancel: + if self._fut_waiter.cancel( + msg=self._cancel_message): + self._must_cancel = False else: - result._asyncio_future_blocking = False - result.add_done_callback( - self.__wakeup, context=self._context) - self._fut_waiter = result - if self._must_cancel: - if self._fut_waiter.cancel( - msg=self._cancel_message): - self._must_cancel = False - else: + new_exc = RuntimeError( + f'yield was used instead of yield from ' + f'in task {self!r} with {result!r}') + self._loop.call_soon( + self.__step, new_exc, context=self._context) + + elif result is None: + # Bare yield relinquishes control for one event loop iteration. + self._loop.call_soon(self.__step, context=self._context) + elif inspect.isgenerator(result): + # Yielding a generator is just wrong. new_exc = RuntimeError( - f'yield was used instead of yield from ' - f'in task {self!r} with {result!r}') + f'yield was used instead of yield from for ' + f'generator in task {self!r} with {result!r}') + self._loop.call_soon( + self.__step, new_exc, context=self._context) + else: + # Yielding something else is an error. 
def create_eager_task_factory(custom_task_constructor):
    """Return a task factory that creates tasks with ``eager_start=True``.

    *custom_task_constructor* is invoked by the returned factory as
    ``custom_task_constructor(coro, loop=loop, name=name, context=context,
    eager_start=True)``.

    Raises TypeError if the constructor's signature can be inspected and
    shows that it accepts neither an ``eager_start`` parameter nor
    arbitrary keyword arguments (``**kwargs``).
    """
    try:
        params = inspect.signature(custom_task_constructor).parameters
    except ValueError:
        # Some builtin/extension callables expose no signature.  That does
        # not prove eager_start is unsupported, so skip the up-front check;
        # an incompatible constructor will still fail when first called.
        params = None

    if params is not None and "eager_start" not in params:
        # Wrappers such as ``def ctor(*args, **kwargs)`` forward
        # eager_start just fine even though they do not name it.
        takes_var_keyword = any(
            param.kind is inspect.Parameter.VAR_KEYWORD
            for param in params.values()
        )
        if not takes_var_keyword:
            raise TypeError(
                "Provided constructor does not support eager task execution")

    def factory(loop, coro, *, name=None, context=None):
        return custom_task_constructor(
            coro, loop=loop, name=name, context=context, eager_start=True)

    return factory
def _swap_current_task(loop, task):
    """Replace the current task recorded for *loop* and return the old one.

    Passing ``task=None`` removes the loop's entry instead of storing None.
    Returns the task that was current before the call, or None if the loop
    had no entry.
    """
    previous = _current_tasks.get(loop)
    if task is not None:
        _current_tasks[loop] = task
    else:
        del _current_tasks[loop]
    return previous
_c_register_task = _register_task + _c_register_eager_task = _register_eager_task _c_unregister_task = _unregister_task + _c_unregister_eager_task = _unregister_eager_task _c_enter_task = _enter_task _c_leave_task = _leave_task + _c_swap_current_task = _swap_current_task diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c index 927c7c365a765e..27dd83e97db335 100644 --- a/Modules/_asynciomodule.c +++ b/Modules/_asynciomodule.c @@ -8,6 +8,7 @@ #include "pycore_runtime_init.h" // _Py_ID() #include "pycore_moduleobject.h" // _PyModule_GetState() #include "structmember.h" // PyMemberDef +#include "cpython/context.h" #include // offsetof() @@ -31,8 +32,11 @@ typedef struct { all running event loops. {EventLoop: Task} */ PyObject *current_tasks; - /* WeakSet containing all alive tasks. */ - PyObject *all_tasks; + /* WeakSet containing all tasks scheduled to run on event loops. */ + PyObject *scheduled_tasks; + + /* Set containing all eagerly executing tasks. */ + PyObject *eager_tasks; /* An isinstance type cache for the 'is_coroutine()' function. 
*/ PyObject *iscoroutine_typecache; @@ -1833,6 +1837,7 @@ class _asyncio.Task "TaskObj *" "&Task_Type" static int task_call_step_soon(asyncio_state *state, TaskObj *, PyObject *); static PyObject * task_wakeup(TaskObj *, PyObject *); static PyObject * task_step(asyncio_state *, TaskObj *, PyObject *); +static int task_eager_start(asyncio_state *state, TaskObj *task); /* ----- Task._step wrapper */ @@ -1943,7 +1948,7 @@ static PyMethodDef TaskWakeupDef = { static int register_task(asyncio_state *state, PyObject *task) { - PyObject *res = PyObject_CallMethodOneArg(state->all_tasks, + PyObject *res = PyObject_CallMethodOneArg(state->scheduled_tasks, &_Py_ID(add), task); if (res == NULL) { return -1; @@ -1952,11 +1957,16 @@ register_task(asyncio_state *state, PyObject *task) return 0; } +static int +register_eager_task(asyncio_state *state, PyObject *task) +{ + return PySet_Add(state->eager_tasks, task); +} static int unregister_task(asyncio_state *state, PyObject *task) { - PyObject *res = PyObject_CallMethodOneArg(state->all_tasks, + PyObject *res = PyObject_CallMethodOneArg(state->scheduled_tasks, &_Py_ID(discard), task); if (res == NULL) { return -1; @@ -1965,6 +1975,11 @@ unregister_task(asyncio_state *state, PyObject *task) return 0; } +static int +unregister_eager_task(asyncio_state *state, PyObject *task) +{ + return PySet_Discard(state->eager_tasks, task); +} static int enter_task(asyncio_state *state, PyObject *loop, PyObject *task) @@ -2018,6 +2033,51 @@ leave_task(asyncio_state *state, PyObject *loop, PyObject *task) return _PyDict_DelItem_KnownHash(state->current_tasks, loop, hash); } +static PyObject * +swap_current_task(asyncio_state *state, PyObject *loop, PyObject *task) +{ + PyObject *prev_task; + Py_hash_t hash; + hash = PyObject_Hash(loop); + if (hash == -1) { + return NULL; + } + + prev_task = _PyDict_GetItem_KnownHash(state->current_tasks, loop, hash); + if (prev_task == NULL) { + prev_task = Py_None; + } + + if (task == Py_None) { + if 
(_PyDict_DelItem_KnownHash(state->current_tasks, loop, hash) == -1) { + return NULL; + } + } else { + if (_PyDict_SetItem_KnownHash(state->current_tasks, loop, task, hash) == -1) { + return NULL; + } + } + + Py_INCREF(prev_task); + + return prev_task; +} + +static int +is_loop_running(PyObject *loop) +{ + PyObject *func = PyObject_GetAttr(loop, &_Py_ID(is_running)); + if (func == NULL) { + PyErr_Format(PyExc_TypeError, "Loop missing is_running()"); + return -1; + } + PyObject *res = PyObject_CallNoArgs(func); + int retval = Py_IsTrue(res); + Py_DECREF(func); + Py_DECREF(res); + return !!retval; +} + /* ----- Task */ /*[clinic input] @@ -2028,7 +2088,7 @@ _asyncio.Task.__init__ loop: object = None name: object = None context: object = None - coro_result: object = NULL + eager_start: bool = False A coroutine wrapped in a Future. [clinic start generated code]*/ @@ -2036,8 +2096,8 @@ A coroutine wrapped in a Future. static int _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop, PyObject *name, PyObject *context, - PyObject *coro_result) -/*[clinic end generated code: output=e241855787412a77 input=3fcd7fb1c00d3f87]*/ + int eager_start) +/*[clinic end generated code: output=7aced2d27836f1a1 input=18e3f113a51b829d]*/ { if (future_init((FutureObj*)self, loop)) { return -1; @@ -2085,17 +2145,22 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop, return -1; } - if (coro_result == NULL) { - if (task_call_step_soon(state, self, NULL)) { + if (eager_start) { + int loop_running = is_loop_running(self->task_loop); + if (loop_running == -1) { return -1; } - } - else { - PyObject * res = task_step_handle_result_impl(state, self, coro_result); - if (res == NULL) { - return -1; + if (loop_running) { + if (task_eager_start(state, self)) { + return -1; + } + return 0; } } + + if (task_call_step_soon(state, self, NULL)) { + return -1; + } return register_task(state, (PyObject*)self); } @@ -2835,11 +2900,9 @@ task_step_impl(asyncio_state 
*state, TaskObj *task, PyObject *exc) } PyObject *ret = task_step_handle_result_impl(state, task, result); - Py_XDECREF(result); return ret; fail: - Py_XDECREF(result); return NULL; } @@ -2886,8 +2949,7 @@ task_step_handle_result_impl(asyncio_state *state, TaskObj *task, PyObject *resu Py_DECREF(tmp); /* task._fut_waiter = result */ - Py_INCREF(result); - task->task_fut_waiter = result; + task->task_fut_waiter = result; /* no incref is necessary */ if (task->task_must_cancel) { PyObject *r; @@ -2980,8 +3042,7 @@ task_step_handle_result_impl(asyncio_state *state, TaskObj *task, PyObject *resu Py_DECREF(tmp); /* task._fut_waiter = result */ - Py_INCREF(result); - task->task_fut_waiter = result; + task->task_fut_waiter = result; /* no incref is necessary */ if (task->task_must_cancel) { PyObject *r; @@ -3016,18 +3077,21 @@ task_step_handle_result_impl(asyncio_state *state, TaskObj *task, PyObject *resu state, task, PyExc_RuntimeError, "yield was used instead of yield from for " "generator in task %R with %R", task, result); + Py_DECREF(result); return o; } /* The `result` is none of the above */ o = task_set_error_soon( state, task, PyExc_RuntimeError, "Task got bad yield: %R", result); + Py_DECREF(result); return o; self_await: o = task_set_error_soon( state, task, PyExc_RuntimeError, "Task cannot await on itself: %R", task); + Py_DECREF(result); return o; yield_insteadof_yf: @@ -3036,6 +3100,7 @@ task_step_handle_result_impl(asyncio_state *state, TaskObj *task, PyObject *resu "yield was used instead of yield from " "in task %R with %R", task, result); + Py_DECREF(result); return o; different_loop: @@ -3043,9 +3108,11 @@ task_step_handle_result_impl(asyncio_state *state, TaskObj *task, PyObject *resu state, task, PyExc_RuntimeError, "Task %R got Future %R attached to a different loop", task, result); + Py_DECREF(result); return o; fail: + Py_XDECREF(result); return NULL; } @@ -3077,6 +3144,64 @@ task_step(asyncio_state *state, TaskObj *task, PyObject *exc) } } 
+static int +task_eager_start(asyncio_state *state, TaskObj *task) +{ + PyObject *prevtask = swap_current_task(state, task->task_loop, (PyObject *)task); + if (prevtask == NULL) { + return -1; + } + + if (register_eager_task(state, (PyObject *)task) == -1) { + Py_DECREF(prevtask); + return -1; + } + + if (PyContext_Enter(task->task_context) == -1) { + Py_DECREF(prevtask); + return -1; + } + + int retval = 0; + + PyObject *stepres = task_step_impl(state, task, NULL); + if (stepres == NULL) { + PyObject *exc = PyErr_GetRaisedException(); + _PyErr_ChainExceptions1(exc); + retval = -1; + } else { + Py_DECREF(stepres); + } + + PyObject *curtask = swap_current_task(state, task->task_loop, prevtask); + Py_DECREF(prevtask); + if (curtask == NULL) { + retval = -1; + } else { + assert(curtask == (PyObject *)task); + Py_DECREF(curtask); + } + + if (unregister_eager_task(state, (PyObject *)task) == -1) { + retval = -1; + } + + if (PyContext_Exit(task->task_context) == -1) { + retval = -1; + } + + if (task->task_state == STATE_PENDING) { + if (register_task(state, (PyObject *)task) == -1) { + retval = -1; + } + } else { + // This seems to really help performance on pyperformance benchmarks + Py_CLEAR(task->task_coro); + } + + return retval; +} + static PyObject * task_wakeup(TaskObj *task, PyObject *o) { @@ -3240,6 +3365,27 @@ _asyncio__register_task_impl(PyObject *module, PyObject *task) Py_RETURN_NONE; } +/*[clinic input] +_asyncio._register_eager_task + + task: object + +Register a new task in asyncio as executed by loop. + +Returns None. 
+[clinic start generated code]*/ + +static PyObject * +_asyncio__register_eager_task_impl(PyObject *module, PyObject *task) +/*[clinic end generated code: output=dfe1d45367c73f1a input=237f684683398c51]*/ +{ + asyncio_state *state = get_asyncio_state(module); + if (register_eager_task(state, task) < 0) { + return NULL; + } + Py_RETURN_NONE; +} + /*[clinic input] _asyncio._unregister_task @@ -3262,6 +3408,27 @@ _asyncio__unregister_task_impl(PyObject *module, PyObject *task) Py_RETURN_NONE; } +/*[clinic input] +_asyncio._unregister_eager_task + + task: object + +Unregister a task. + +Returns None. +[clinic start generated code]*/ + +static PyObject * +_asyncio__unregister_eager_task_impl(PyObject *module, PyObject *task) +/*[clinic end generated code: output=a426922bd07f23d1 input=9d07401ef14ee048]*/ +{ + asyncio_state *state = get_asyncio_state(module); + if (unregister_eager_task(state, task) < 0) { + return NULL; + } + Py_RETURN_NONE; +} + /*[clinic input] _asyncio._enter_task @@ -3313,6 +3480,27 @@ _asyncio__leave_task_impl(PyObject *module, PyObject *loop, PyObject *task) } +/*[clinic input] +_asyncio._swap_current_task + + loop: object + task: object + +Temporarily swap in the supplied task and return the original one (or None). + +This is intended for use during eager coroutine execution. 
+ +[clinic start generated code]*/ + +static PyObject * +_asyncio__swap_current_task_impl(PyObject *module, PyObject *loop, + PyObject *task) +/*[clinic end generated code: output=9f88de958df74c7e input=c9c72208d3d38b6c]*/ +{ + return swap_current_task(get_asyncio_state(module), loop, task); +} + + /*[clinic input] _asyncio.current_task @@ -3394,7 +3582,8 @@ module_traverse(PyObject *mod, visitproc visit, void *arg) Py_VISIT(state->asyncio_InvalidStateError); Py_VISIT(state->asyncio_CancelledError); - Py_VISIT(state->all_tasks); + Py_VISIT(state->scheduled_tasks); + Py_VISIT(state->eager_tasks); Py_VISIT(state->current_tasks); Py_VISIT(state->iscoroutine_typecache); @@ -3431,7 +3620,8 @@ module_clear(PyObject *mod) Py_CLEAR(state->asyncio_InvalidStateError); Py_CLEAR(state->asyncio_CancelledError); - Py_CLEAR(state->all_tasks); + Py_CLEAR(state->scheduled_tasks); + Py_CLEAR(state->eager_tasks); Py_CLEAR(state->current_tasks); Py_CLEAR(state->iscoroutine_typecache); @@ -3511,9 +3701,14 @@ module_init(asyncio_state *state) PyObject *weak_set; WITH_MOD("weakref") GET_MOD_ATTR(weak_set, "WeakSet"); - state->all_tasks = PyObject_CallNoArgs(weak_set); + state->scheduled_tasks = PyObject_CallNoArgs(weak_set); Py_CLEAR(weak_set); - if (state->all_tasks == NULL) { + if (state->scheduled_tasks == NULL) { + goto fail; + } + + state->eager_tasks = PySet_New(NULL); + if (state->eager_tasks == NULL) { goto fail; } @@ -3537,9 +3732,12 @@ static PyMethodDef asyncio_methods[] = { _ASYNCIO__GET_RUNNING_LOOP_METHODDEF _ASYNCIO__SET_RUNNING_LOOP_METHODDEF _ASYNCIO__REGISTER_TASK_METHODDEF + _ASYNCIO__REGISTER_EAGER_TASK_METHODDEF _ASYNCIO__UNREGISTER_TASK_METHODDEF + _ASYNCIO__UNREGISTER_EAGER_TASK_METHODDEF _ASYNCIO__ENTER_TASK_METHODDEF _ASYNCIO__LEAVE_TASK_METHODDEF + _ASYNCIO__SWAP_CURRENT_TASK_METHODDEF {NULL, NULL} }; @@ -3576,7 +3774,11 @@ module_exec(PyObject *mod) return -1; } - if (PyModule_AddObjectRef(mod, "_all_tasks", state->all_tasks) < 0) { + if 
(PyModule_AddObjectRef(mod, "_scheduled_tasks", state->scheduled_tasks) < 0) { + return -1; + } + + if (PyModule_AddObjectRef(mod, "_eager_tasks", state->eager_tasks) < 0) { return -1; } diff --git a/Modules/clinic/_asynciomodule.c.h b/Modules/clinic/_asynciomodule.c.h index 47a678b50784b1..6a780a80cd0bc4 100644 --- a/Modules/clinic/_asynciomodule.c.h +++ b/Modules/clinic/_asynciomodule.c.h @@ -482,8 +482,7 @@ _asyncio_Future__make_cancelled_error(FutureObj *self, PyObject *Py_UNUSED(ignor } PyDoc_STRVAR(_asyncio_Task___init____doc__, -"Task(coro, *, loop=None, name=None, context=None,\n" -" coro_result=)\n" +"Task(coro, *, loop=None, name=None, context=None, eager_start=False)\n" "--\n" "\n" "A coroutine wrapped in a Future."); @@ -491,7 +490,7 @@ PyDoc_STRVAR(_asyncio_Task___init____doc__, static int _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop, PyObject *name, PyObject *context, - PyObject *coro_result); + int eager_start); static int _asyncio_Task___init__(PyObject *self, PyObject *args, PyObject *kwargs) @@ -506,7 +505,7 @@ _asyncio_Task___init__(PyObject *self, PyObject *args, PyObject *kwargs) PyObject *ob_item[NUM_KEYWORDS]; } _kwtuple = { .ob_base = PyVarObject_HEAD_INIT(&PyTuple_Type, NUM_KEYWORDS) - .ob_item = { &_Py_ID(coro), &_Py_ID(loop), &_Py_ID(name), &_Py_ID(context), &_Py_ID(coro_result), }, + .ob_item = { &_Py_ID(coro), &_Py_ID(loop), &_Py_ID(name), &_Py_ID(context), &_Py_ID(eager_start), }, }; #undef NUM_KEYWORDS #define KWTUPLE (&_kwtuple.ob_base.ob_base) @@ -515,7 +514,7 @@ _asyncio_Task___init__(PyObject *self, PyObject *args, PyObject *kwargs) # define KWTUPLE NULL #endif // !Py_BUILD_CORE - static const char * const _keywords[] = {"coro", "loop", "name", "context", "coro_result", NULL}; + static const char * const _keywords[] = {"coro", "loop", "name", "context", "eager_start", NULL}; static _PyArg_Parser _parser = { .keywords = _keywords, .fname = "Task", @@ -530,7 +529,7 @@ _asyncio_Task___init__(PyObject 
*self, PyObject *args, PyObject *kwargs) PyObject *loop = Py_None; PyObject *name = Py_None; PyObject *context = Py_None; - PyObject *coro_result = NULL; + int eager_start = 0; fastargs = _PyArg_UnpackKeywords(_PyTuple_CAST(args)->ob_item, nargs, kwargs, NULL, &_parser, 1, 1, 0, argsbuf); if (!fastargs) { @@ -558,9 +557,12 @@ _asyncio_Task___init__(PyObject *self, PyObject *args, PyObject *kwargs) goto skip_optional_kwonly; } } - coro_result = fastargs[4]; + eager_start = PyObject_IsTrue(fastargs[4]); + if (eager_start < 0) { + goto exit; + } skip_optional_kwonly: - return_value = _asyncio_Task___init___impl((TaskObj *)self, coro, loop, name, context, coro_result); + return_value = _asyncio_Task___init___impl((TaskObj *)self, coro, loop, name, context, eager_start); exit: return return_value; @@ -1073,6 +1075,63 @@ _asyncio__register_task(PyObject *module, PyObject *const *args, Py_ssize_t narg return return_value; } +PyDoc_STRVAR(_asyncio__register_eager_task__doc__, +"_register_eager_task($module, /, task)\n" +"--\n" +"\n" +"Register a new task in asyncio as executed by loop.\n" +"\n" +"Returns None."); + +#define _ASYNCIO__REGISTER_EAGER_TASK_METHODDEF \ + {"_register_eager_task", _PyCFunction_CAST(_asyncio__register_eager_task), METH_FASTCALL|METH_KEYWORDS, _asyncio__register_eager_task__doc__}, + +static PyObject * +_asyncio__register_eager_task_impl(PyObject *module, PyObject *task); + +static PyObject * +_asyncio__register_eager_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + #if defined(Py_BUILD_CORE) && !defined(Py_BUILD_CORE_MODULE) + + #define NUM_KEYWORDS 1 + static struct { + PyGC_Head _this_is_not_used; + PyObject_VAR_HEAD + PyObject *ob_item[NUM_KEYWORDS]; + } _kwtuple = { + .ob_base = PyVarObject_HEAD_INIT(&PyTuple_Type, NUM_KEYWORDS) + .ob_item = { &_Py_ID(task), }, + }; + #undef NUM_KEYWORDS + #define KWTUPLE (&_kwtuple.ob_base.ob_base) + + #else // !Py_BUILD_CORE + # define 
KWTUPLE NULL + #endif // !Py_BUILD_CORE + + static const char * const _keywords[] = {"task", NULL}; + static _PyArg_Parser _parser = { + .keywords = _keywords, + .fname = "_register_eager_task", + .kwtuple = KWTUPLE, + }; + #undef KWTUPLE + PyObject *argsbuf[1]; + PyObject *task; + + args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 1, 1, 0, argsbuf); + if (!args) { + goto exit; + } + task = args[0]; + return_value = _asyncio__register_eager_task_impl(module, task); + +exit: + return return_value; +} + PyDoc_STRVAR(_asyncio__unregister_task__doc__, "_unregister_task($module, /, task)\n" "--\n" @@ -1130,6 +1189,63 @@ _asyncio__unregister_task(PyObject *module, PyObject *const *args, Py_ssize_t na return return_value; } +PyDoc_STRVAR(_asyncio__unregister_eager_task__doc__, +"_unregister_eager_task($module, /, task)\n" +"--\n" +"\n" +"Unregister a task.\n" +"\n" +"Returns None."); + +#define _ASYNCIO__UNREGISTER_EAGER_TASK_METHODDEF \ + {"_unregister_eager_task", _PyCFunction_CAST(_asyncio__unregister_eager_task), METH_FASTCALL|METH_KEYWORDS, _asyncio__unregister_eager_task__doc__}, + +static PyObject * +_asyncio__unregister_eager_task_impl(PyObject *module, PyObject *task); + +static PyObject * +_asyncio__unregister_eager_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + #if defined(Py_BUILD_CORE) && !defined(Py_BUILD_CORE_MODULE) + + #define NUM_KEYWORDS 1 + static struct { + PyGC_Head _this_is_not_used; + PyObject_VAR_HEAD + PyObject *ob_item[NUM_KEYWORDS]; + } _kwtuple = { + .ob_base = PyVarObject_HEAD_INIT(&PyTuple_Type, NUM_KEYWORDS) + .ob_item = { &_Py_ID(task), }, + }; + #undef NUM_KEYWORDS + #define KWTUPLE (&_kwtuple.ob_base.ob_base) + + #else // !Py_BUILD_CORE + # define KWTUPLE NULL + #endif // !Py_BUILD_CORE + + static const char * const _keywords[] = {"task", NULL}; + static _PyArg_Parser _parser = { + .keywords = _keywords, + .fname = "_unregister_eager_task", + 
.kwtuple = KWTUPLE, + }; + #undef KWTUPLE + PyObject *argsbuf[1]; + PyObject *task; + + args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 1, 1, 0, argsbuf); + if (!args) { + goto exit; + } + task = args[0]; + return_value = _asyncio__unregister_eager_task_impl(module, task); + +exit: + return return_value; +} + PyDoc_STRVAR(_asyncio__enter_task__doc__, "_enter_task($module, /, loop, task)\n" "--\n" @@ -1252,6 +1368,66 @@ _asyncio__leave_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, return return_value; } +PyDoc_STRVAR(_asyncio__swap_current_task__doc__, +"_swap_current_task($module, /, loop, task)\n" +"--\n" +"\n" +"Temporarily swap in the supplied task and return the original one (or None).\n" +"\n" +"This is intended for use during eager coroutine execution."); + +#define _ASYNCIO__SWAP_CURRENT_TASK_METHODDEF \ + {"_swap_current_task", _PyCFunction_CAST(_asyncio__swap_current_task), METH_FASTCALL|METH_KEYWORDS, _asyncio__swap_current_task__doc__}, + +static PyObject * +_asyncio__swap_current_task_impl(PyObject *module, PyObject *loop, + PyObject *task); + +static PyObject * +_asyncio__swap_current_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + #if defined(Py_BUILD_CORE) && !defined(Py_BUILD_CORE_MODULE) + + #define NUM_KEYWORDS 2 + static struct { + PyGC_Head _this_is_not_used; + PyObject_VAR_HEAD + PyObject *ob_item[NUM_KEYWORDS]; + } _kwtuple = { + .ob_base = PyVarObject_HEAD_INIT(&PyTuple_Type, NUM_KEYWORDS) + .ob_item = { &_Py_ID(loop), &_Py_ID(task), }, + }; + #undef NUM_KEYWORDS + #define KWTUPLE (&_kwtuple.ob_base.ob_base) + + #else // !Py_BUILD_CORE + # define KWTUPLE NULL + #endif // !Py_BUILD_CORE + + static const char * const _keywords[] = {"loop", "task", NULL}; + static _PyArg_Parser _parser = { + .keywords = _keywords, + .fname = "_swap_current_task", + .kwtuple = KWTUPLE, + }; + #undef KWTUPLE + PyObject *argsbuf[2]; + PyObject *loop; + 
PyObject *task; + + args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 2, 2, 0, argsbuf); + if (!args) { + goto exit; + } + loop = args[0]; + task = args[1]; + return_value = _asyncio__swap_current_task_impl(module, loop, task); + +exit: + return return_value; +} + PyDoc_STRVAR(_asyncio_current_task__doc__, "current_task($module, /, loop=None)\n" "--\n" @@ -1311,4 +1487,4 @@ _asyncio_current_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, exit: return return_value; } -/*[clinic end generated code: output=d7cd98454c53b85a input=a9049054013a1b77]*/ +/*[clinic end generated code: output=6b0e283177b07639 input=a9049054013a1b77]*/ From e7743f6cd455fc709d776306d5554d018744366b Mon Sep 17 00:00:00 2001 From: Itamar Ostricher Date: Thu, 20 Apr 2023 16:16:23 -0600 Subject: [PATCH 07/28] Fix task-counting tests for new impl --- Lib/test/test_asyncio/test_eager_task_factory.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/Lib/test/test_asyncio/test_eager_task_factory.py b/Lib/test/test_asyncio/test_eager_task_factory.py index 25b2c510b5c624..60751d1eb7290d 100644 --- a/Lib/test/test_asyncio/test_eager_task_factory.py +++ b/Lib/test/test_asyncio/test_eager_task_factory.py @@ -760,8 +760,10 @@ def __init__(self, loop, *, task_class, eager): self.suspense_count = 0 self.task_count = 0 - def CountingTask(*args, **kwargs): - self.task_count += 1 + def CountingTask(*args, eager_start=False, **kwargs): + if not eager_start: + self.task_count += 1 + kwargs["eager_start"] = eager_start return task_class(*args, **kwargs) if eager: @@ -821,11 +823,11 @@ def setUp(self): def test_awaitables_chain(self): observed_depth = self.loop.run_until_complete(awaitable_chain(100)) self.assertEqual(observed_depth, 100) - self.assertEqual(self.counter.get(), 1) + self.assertEqual(self.counter.get(), 0 if self.eager else 1) def test_recursive_taskgroups(self): num_tasks = self.loop.run_until_complete(recursive_taskgroups(5, 4)) - 
self.assertEqual(num_tasks, self.expected_task_count - 1) + # self.assertEqual(num_tasks, self.expected_task_count - 1) self.assertEqual(self.counter.get(), self.expected_task_count) def test_recursive_gather(self): @@ -840,7 +842,7 @@ class BaseNonEagerTaskFactoryTests(BaseTaskCountingTests): class BaseEagerTaskFactoryTests(BaseTaskCountingTests): eager = True - expected_task_count = 156 # 1 + 5 + 5^2 + 5^3 + expected_task_count = 0 class NonEagerTests(BaseNonEagerTaskFactoryTests, test_utils.TestCase): From 10a03a07d04a66a84fd1549ed761a0d4c71b289f Mon Sep 17 00:00:00 2001 From: Itamar Ostricher Date: Thu, 20 Apr 2023 20:58:05 -0600 Subject: [PATCH 08/28] Fix test_task_exc_handler_correct_context Co-authored-by: Jacob Bower --- Lib/asyncio/tasks.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index c32fc4af34e6df..782d1194c1be44 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -273,6 +273,7 @@ def __eager_start(self): finally: if self.done(): self._coro = None + self = None # Needed to break cycles when an exception occurs. else: _register_task(self) @@ -365,6 +366,8 @@ def __step_run_and_handle_result(self, exc): new_exc = RuntimeError(f'Task got bad yield: {result!r}') self._loop.call_soon( self.__step, new_exc, context=self._context) + finally: + self = None # Needed to break cycles when an exception occurs. 
def __wakeup(self, future): try: From 441fd92e0bade657f90584f1168e1c31b82f6e71 Mon Sep 17 00:00:00 2001 From: Itamar Ostricher Date: Mon, 24 Apr 2023 17:42:13 -0600 Subject: [PATCH 09/28] add jbower credit --- Doc/whatsnew/3.12.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Doc/whatsnew/3.12.rst b/Doc/whatsnew/3.12.rst index ccec6f61c8157f..3fafd3049b17cd 100644 --- a/Doc/whatsnew/3.12.rst +++ b/Doc/whatsnew/3.12.rst @@ -556,7 +556,7 @@ Optimizations * Added :func:`asyncio.eager_task_factory` and :func:`asyncio.create_eager_task_factory` functions to allow opting an event loop in to eager task execution, speeding up some use-cases by up to 50%. - (Contributed by Itamar O in :gh:`102853`) + (Contributed by Jacob Bower & Itamar O in :gh:`102853`) CPython bytecode changes From 4c46a7225c9f11ae386bad01e557a6aaf2830dea Mon Sep 17 00:00:00 2001 From: Itamar Ostricher Date: Mon, 24 Apr 2023 17:46:27 -0600 Subject: [PATCH 10/28] cleanup recursive_taskgroups test case --- Lib/test/test_asyncio/test_eager_task_factory.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/Lib/test/test_asyncio/test_eager_task_factory.py b/Lib/test/test_asyncio/test_eager_task_factory.py index 60751d1eb7290d..c9806a0d853d31 100644 --- a/Lib/test/test_asyncio/test_eager_task_factory.py +++ b/Lib/test/test_asyncio/test_eager_task_factory.py @@ -785,18 +785,13 @@ async def awaitable_chain(depth): async def recursive_taskgroups(width, depth): if depth == 0: - return 0 + return async with asyncio.TaskGroup() as tg: futures = [ tg.create_task(recursive_taskgroups(width, depth - 1)) for _ in range(width) ] - return sum( - (1 if isinstance(fut, (asyncio.Task, tasks._CTask, tasks._PyTask)) else 0) - + fut.result() - for fut in futures - ) async def recursive_gather(width, depth): @@ -827,7 +822,6 @@ def test_awaitables_chain(self): def test_recursive_taskgroups(self): num_tasks = self.loop.run_until_complete(recursive_taskgroups(5, 4)) - # 
self.assertEqual(num_tasks, self.expected_task_count - 1) self.assertEqual(self.counter.get(), self.expected_task_count) def test_recursive_gather(self): From 14b6f587f492adc1ae510267a294ad9017d68552 Mon Sep 17 00:00:00 2001 From: Itamar Ostricher Date: Tue, 25 Apr 2023 10:20:07 -0600 Subject: [PATCH 11/28] Update asyncio documentation with latest state of the PR --- Doc/library/asyncio-task.rst | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index ade4e140fe9110..fd7c4805f2b3ec 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -536,19 +536,20 @@ Eager Task Factory When using this factory (via ``loop.set_task_factory(asyncio.eager_task_factory)``), coroutines that are able to complete synchronously (without blocking) - are returned immediately as a completed :class:`Future`. + are returned immediately as a finished :class:`Task`. - This task factory tries to execute the coroutine ``coro`` immediately - (before creating and scheduling a task to the event loop), until it either - blocks, returns, or raises. - If the coroutine returns or raises, a :class:`Future` is returned, and no - task is created or scheduled to the event loop. If the coroutine blocks, - a :class:`Task` is constructed and returned at that point. + This task factory attempts to execute the coroutine ``coro`` immediately + (before scheduling the task to the event loop), until it either blocks, + returns, or raises. + If the coroutine returns or raises, a finished :class:`Task` is returned, + and the task is never scheduled to the event loop. If the coroutine blocks, + the (pending) :class:`Task` is scheduled and returned. .. note:: The fact that the coroutine starts execution immediately is a semantic change, and might lead to application behavior changes, depending on the application. + For example, the order of execution of tasks is likely to change. .. 
versionadded:: 3.12 From 0f9185c420fe657104464a07ba0fd85d7d3dc149 Mon Sep 17 00:00:00 2001 From: Itamar Ostricher Date: Tue, 25 Apr 2023 10:28:52 -0600 Subject: [PATCH 12/28] don't add coro to the task repr if coro is None --- Lib/asyncio/base_tasks.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/Lib/asyncio/base_tasks.py b/Lib/asyncio/base_tasks.py index 26298e638cbf0d..c907b683413732 100644 --- a/Lib/asyncio/base_tasks.py +++ b/Lib/asyncio/base_tasks.py @@ -15,11 +15,13 @@ def _task_repr_info(task): info.insert(1, 'name=%r' % task.get_name()) - coro = coroutines._format_coroutine(task._coro) - info.insert(2, f'coro=<{coro}>') - if task._fut_waiter is not None: - info.insert(3, f'wait_for={task._fut_waiter!r}') + info.insert(2, f'wait_for={task._fut_waiter!r}') + + if task._coro: + coro = coroutines._format_coroutine(task._coro) + info.insert(2, f'coro=<{coro}>') + return info From 679534ae0ccbde07cd8b80c52bfb15a6c93863d7 Mon Sep 17 00:00:00 2001 From: Itamar Ostricher Date: Tue, 25 Apr 2023 10:38:46 -0600 Subject: [PATCH 13/28] also update the NEWS entry --- .../2023-03-15-12-18-07.gh-issue-97696.DtnpIC.rst | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/Misc/NEWS.d/next/Library/2023-03-15-12-18-07.gh-issue-97696.DtnpIC.rst b/Misc/NEWS.d/next/Library/2023-03-15-12-18-07.gh-issue-97696.DtnpIC.rst index eb1861b4e5aaaf..d899590ffc10fb 100644 --- a/Misc/NEWS.d/next/Library/2023-03-15-12-18-07.gh-issue-97696.DtnpIC.rst +++ b/Misc/NEWS.d/next/Library/2023-03-15-12-18-07.gh-issue-97696.DtnpIC.rst @@ -1,4 +1,6 @@ -Implemented an eager task factory in asyncio. When set as a task factory on -an event loop, it performs eager execution of coroutines and returns a -completed future instead of scheduling a task to the event loop if the -coroutine can complete without suspending. +Implemented an eager task factory in asyncio. +When used as a task factory on an event loop, it performs eager execution of +coroutines. 
Coroutines that are able to complete synchronously (e.g. return or +raise without blocking) are returned immediately as a finished task, and the +task is never scheduled to the event loop. If the coroutine blocks, the +(pending) task is scheduled and returned. From 70bb3d424805aa9303af55a7b5ba7ba059ed2ef6 Mon Sep 17 00:00:00 2001 From: Itamar Ostricher Date: Tue, 25 Apr 2023 11:04:38 -0600 Subject: [PATCH 14/28] add error check when using _PyDict_GetItem_KnownHash in swap_current_task --- Modules/_asynciomodule.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c index 27dd83e97db335..a81f5f4357740a 100644 --- a/Modules/_asynciomodule.c +++ b/Modules/_asynciomodule.c @@ -2045,6 +2045,9 @@ swap_current_task(asyncio_state *state, PyObject *loop, PyObject *task) prev_task = _PyDict_GetItem_KnownHash(state->current_tasks, loop, hash); if (prev_task == NULL) { + if (PyErr_Occurred()) { + return NULL; + } prev_task = Py_None; } From 7edcf3f5bd4d04982ca02229a8189d0f800329b4 Mon Sep 17 00:00:00 2001 From: Itamar Ostricher Date: Tue, 25 Apr 2023 12:58:01 -0600 Subject: [PATCH 15/28] focus the eager task factory test suite on testing eager execution semantics --- .../test_asyncio/test_eager_task_factory.py | 750 ++---------------- 1 file changed, 60 insertions(+), 690 deletions(-) diff --git a/Lib/test/test_asyncio/test_eager_task_factory.py b/Lib/test/test_asyncio/test_eager_task_factory.py index c9806a0d853d31..0bd956b2952660 100644 --- a/Lib/test/test_asyncio/test_eager_task_factory.py +++ b/Lib/test/test_asyncio/test_eager_task_factory.py @@ -37,722 +37,92 @@ def setUp(self): def test_eager_task_factory_set(self): self.assertIs(self.loop.get_task_factory(), asyncio.eager_task_factory) - def test_generic_alias(self): - task = tasks.Task[str] - self.assertEqual(task.__args__, (str,)) - self.assertIsInstance(task, GenericAlias) + def test_await_future_during_eager_step(self): - def test_task_cancel_message_getter(self): - 
async def coro(): - pass - t = self.new_task(self.loop, coro()) - self.assertTrue(hasattr(t, '_cancel_message')) - self.assertEqual(t._cancel_message, None) + async def set_result(fut, val): + fut.set_result(val) - t.cancel('my message') - self.assertEqual(t._cancel_message, 'my message') + async def run(): + fut = self.loop.create_future() + t = self.loop.create_task(set_result(fut, 'my message')) + # assert the eager step completed the task + self.assertTrue(t.done()) + return await fut - with self.assertRaises(asyncio.CancelledError) as cm: - self.loop.run_until_complete(t) + result = self.loop.run_until_complete(run()) + self.assertEqual(result,'my message') - self.assertEqual('my message', cm.exception.args[0]) + def test_eager_completion(self): - def test_task_cancel_message_setter(self): async def coro(): - pass - t = self.new_task(self.loop, coro()) - t.cancel('my message') - t._cancel_message = 'my new message' - self.assertEqual(t._cancel_message, 'my new message') - - with self.assertRaises(asyncio.CancelledError) as cm: - self.loop.run_until_complete(t) - - self.assertEqual('my new message', cm.exception.args[0]) - - def test_task_del_collect(self): - class Evil: - def __del__(self): - gc.collect() + return 'hello' async def run(): - return Evil() + await asyncio.sleep(0.1) + t = self.loop.create_task(coro()) + # assert the eager step completed the task + self.assertTrue(t.done()) + return await t - self.loop.run_until_complete( - asyncio.gather(*[ - self.new_task(self.loop, run()) for _ in range(100) - ])) + result = self.loop.run_until_complete(run()) + self.assertEqual(result, 'hello') - def test_other_loop_future(self): - other_loop = asyncio.new_event_loop() - fut = self.new_future(other_loop) + def test_block_after_eager_step(self): - async def run(fut): - await fut + async def coro(): + await asyncio.sleep(0.1) + return 'finished after blocking' - try: - with self.assertRaisesRegex(RuntimeError, - r'Task .* got Future .* attached'): - 
self.loop.run_until_complete(run(fut)) - finally: - other_loop.close() + async def run(): + await asyncio.sleep(0.1) + t = self.loop.create_task(coro()) + self.assertFalse(t.done()) + result = await t + self.assertTrue(t.done()) + return result - def test_task_awaits_on_itself(self): + result = self.loop.run_until_complete(run()) + self.assertEqual(result, 'finished after blocking') - async def test(): - await task + def test_cancellation_after_eager_completion(self): - task = asyncio.ensure_future(test(), loop=self.loop) + async def coro(): + return 'finished without blocking' - with self.assertRaisesRegex(RuntimeError, - 'Task cannot await on itself'): - self.loop.run_until_complete(task) + async def run(): + await asyncio.sleep(0.1) + t = self.loop.create_task(coro()) + t.cancel() + result = await t + # finished task can't be cancelled + self.assertFalse(t.cancelled()) + return result - def test_exception_chaining_after_await(self): - # Test that when awaiting on a task when an exception is already - # active, if the task raises an exception it will be chained - # with the original. 
+ result = self.loop.run_until_complete(run()) + self.assertEqual(result, 'finished without blocking') - async def raise_error(): - raise ValueError + def test_cancellation_after_eager_step_blocks(self): - async def run(): - try: - raise KeyError(3) - except Exception as exc: - task = self.new_task(self.loop, raise_error()) - try: - await task - except Exception as exc: - self.assertEqual(type(exc), ValueError) - chained = exc.__context__ - self.assertEqual((type(chained), chained.args), - (KeyError, (3,))) - - task = self.new_task(self.loop, run()) - self.loop.run_until_complete(task) - - def test_exception_chaining_after_await_with_context_cycle(self): - # Check trying to create an exception context cycle: - # https://bugs.python.org/issue40696 - has_cycle = None - - async def process_exc(exc): - raise exc + async def coro(): + await asyncio.sleep(0.1) + return 'finished after blocking' async def run(): - nonlocal has_cycle + await asyncio.sleep(0.1) + t = self.loop.create_task(coro()) + t.cancel('cancellation message') + self.assertGreater(t.cancelling(), 0) try: - raise KeyError('a') - except Exception as exc: - task = self.new_task(self.loop, process_exc(exc)) - try: - await task - except BaseException as exc: - has_cycle = (exc is exc.__context__) - # Prevent a hang if has_cycle is True. - exc.__context__ = None - - task = self.new_task(self.loop, run()) - self.loop.run_until_complete(task) - # This also distinguishes from the initial has_cycle=None. - self.assertEqual(has_cycle, False) - - def test_cancelling(self): - - async def task(): - await asyncio.sleep(10) - - t = self.new_task(self.loop, task()) - self.assertFalse(t.cancelling()) - self.assertNotIn(" cancelling ", repr(t)) - self.assertTrue(t.cancel()) - self.assertTrue(t.cancelling()) - self.assertIn(" cancelling ", repr(t)) - - # Since we commented out two lines from Task.cancel(), - # this t.cancel() call now returns True. 
- # self.assertFalse(t.cancel()) - self.assertTrue(t.cancel()) - - with self.assertRaises(asyncio.CancelledError): - self.loop.run_until_complete(t) - - def test_uncancel_basic(self): - - async def task(): - try: - await asyncio.sleep(10) + result = await t except asyncio.CancelledError: - asyncio.current_task().uncancel() - await asyncio.sleep(10) - - t = self.new_task(self.loop, task()) - self.loop.run_until_complete(asyncio.sleep(0.01)) - - # Cancel first sleep - self.assertTrue(t.cancel()) - self.assertIn(" cancelling ", repr(t)) - self.assertEqual(t.cancelling(), 1) - self.assertFalse(t.cancelled()) # Task is still not complete - self.loop.run_until_complete(asyncio.sleep(0.01)) - - # after .uncancel() - self.assertNotIn(" cancelling ", repr(t)) - self.assertEqual(t.cancelling(), 0) - self.assertFalse(t.cancelled()) # Task is still not complete - - # Cancel second sleep - self.assertTrue(t.cancel()) - self.assertEqual(t.cancelling(), 1) - self.assertFalse(t.cancelled()) # Task is still not complete - with self.assertRaises(asyncio.CancelledError): - self.loop.run_until_complete(t) - self.assertTrue(t.cancelled()) # Finally, task complete - self.assertTrue(t.done()) - - # uncancel is no longer effective after the task is complete - t.uncancel() - self.assertTrue(t.cancelled()) - self.assertTrue(t.done()) - - def test_cancel_with_message_then_future_result(self): - # Test Future.result() after calling cancel() with a message. - cases = [ - ((), ()), - ((None,), ()), - (('my message',), ('my message',)), - # Non-string values should roundtrip. 
- ((5,), (5,)), - ] - for cancel_args, expected_args in cases: - with self.subTest(cancel_args=cancel_args): - - async def sleep(): - await asyncio.sleep(10) - - async def coro(): - task = self.new_task(self.loop, sleep()) - await asyncio.sleep(0) - task.cancel(*cancel_args) - done, pending = await asyncio.wait([task]) - task.result() - - task = self.new_task(self.loop, coro()) - with self.assertRaises(asyncio.CancelledError) as cm: - self.loop.run_until_complete(task) - exc = cm.exception - self.assertEqual(exc.args, expected_args) - - actual = get_innermost_context(exc) - self.assertEqual(actual, - (asyncio.CancelledError, expected_args, 0)) - - def test_cancel_with_message_then_future_exception(self): - # Test Future.exception() after calling cancel() with a message. - cases = [ - ((), ()), - ((None,), ()), - (('my message',), ('my message',)), - # Non-string values should roundtrip. - ((5,), (5,)), - ] - for cancel_args, expected_args in cases: - with self.subTest(cancel_args=cancel_args): - - async def sleep(): - await asyncio.sleep(10) - - async def coro(): - task = self.new_task(self.loop, sleep()) - await asyncio.sleep(0) - task.cancel(*cancel_args) - done, pending = await asyncio.wait([task]) - task.exception() - - task = self.new_task(self.loop, coro()) - with self.assertRaises(asyncio.CancelledError) as cm: - self.loop.run_until_complete(task) - exc = cm.exception - self.assertEqual(exc.args, expected_args) - - actual = get_innermost_context(exc) - self.assertEqual(actual, - (asyncio.CancelledError, expected_args, 0)) - - def test_cancel_with_message_before_starting_task(self): - - async def sleep(): - await asyncio.sleep(10) - - async def coro(): - task = self.new_task(self.loop, sleep()) - # We deliberately leave out the sleep here. 
- task.cancel('my message') - done, pending = await asyncio.wait([task]) - task.exception() + # finished task can't be cancelled + self.assertTrue(t.cancelled()) + raise - task = self.new_task(self.loop, coro()) with self.assertRaises(asyncio.CancelledError) as cm: - self.loop.run_until_complete(task) - exc = cm.exception - self.assertEqual(exc.args, ('my message',)) - - actual = get_innermost_context(exc) - self.assertEqual(actual, - (asyncio.CancelledError, ('my message',), 0)) - - def test_cancel_yield(self): - async def task(): - await asyncio.sleep(0) - await asyncio.sleep(0) - return 12 - - t = self.new_task(self.loop, task()) - test_utils.run_briefly(self.loop) # start coro - t.cancel() - self.assertRaises( - asyncio.CancelledError, self.loop.run_until_complete, t) - self.assertTrue(t.done()) - self.assertTrue(t.cancelled()) - self.assertFalse(t.cancel()) - - def test_cancel_inner_future(self): - f = self.new_future(self.loop) - - async def task(): - await f - return 12 - - t = self.new_task(self.loop, task()) - test_utils.run_briefly(self.loop) # start task - f.cancel() - with self.assertRaises(asyncio.CancelledError): - self.loop.run_until_complete(t) - self.assertTrue(f.cancelled()) - self.assertTrue(t.cancelled()) - - def test_cancel_both_task_and_inner_future(self): - f = self.new_future(self.loop) - - async def task(): - await f - return 12 - - t = self.new_task(self.loop, task()) - test_utils.run_briefly(self.loop) - - f.cancel() - t.cancel() - - with self.assertRaises(asyncio.CancelledError): - self.loop.run_until_complete(t) - - self.assertTrue(t.done()) - self.assertTrue(f.cancelled()) - self.assertTrue(t.cancelled()) - - def test_cancel_task_catching(self): - fut1 = self.new_future(self.loop) - fut2 = self.new_future(self.loop) - - async def task(): - await fut1 - try: - await fut2 - except asyncio.CancelledError: - return 42 - - t = self.new_task(self.loop, task()) - test_utils.run_briefly(self.loop) - self.assertIs(t._fut_waiter, fut1) # 
White-box test. - fut1.set_result(None) - test_utils.run_briefly(self.loop) - self.assertIs(t._fut_waiter, fut2) # White-box test. - t.cancel() - self.assertTrue(fut2.cancelled()) - res = self.loop.run_until_complete(t) - self.assertEqual(res, 42) - self.assertFalse(t.cancelled()) - - def test_cancel_task_ignoring(self): - fut1 = self.new_future(self.loop) - fut2 = self.new_future(self.loop) - fut3 = self.new_future(self.loop) - - async def task(): - await fut1 - try: - await fut2 - except asyncio.CancelledError: - pass - res = await fut3 - return res - - t = self.new_task(self.loop, task()) - test_utils.run_briefly(self.loop) - self.assertIs(t._fut_waiter, fut1) # White-box test. - fut1.set_result(None) - test_utils.run_briefly(self.loop) - self.assertIs(t._fut_waiter, fut2) # White-box test. - t.cancel() - self.assertTrue(fut2.cancelled()) - test_utils.run_briefly(self.loop) - self.assertIs(t._fut_waiter, fut3) # White-box test. - fut3.set_result(42) - res = self.loop.run_until_complete(t) - self.assertEqual(res, 42) - self.assertFalse(fut3.cancelled()) - self.assertFalse(t.cancelled()) - - def test_close(self): - self.assertFalse(self.loop.is_closed()) - self.loop.close() - self.assertTrue(self.loop.is_closed()) - - # it should be possible to call close() more than once - self.loop.close() - self.loop.close() - - # operation blocked when the loop is closed - f = self.loop.create_future() - self.assertRaises(RuntimeError, self.loop.run_forever) - self.assertRaises(RuntimeError, self.loop.run_until_complete, f) - - def test__add_callback_handle(self): - h = asyncio.Handle(lambda: False, (), self.loop, None) - - self.loop._add_callback(h) - self.assertFalse(self.loop._scheduled) - self.assertIn(h, self.loop._ready) - - def test__add_callback_cancelled_handle(self): - h = asyncio.Handle(lambda: False, (), self.loop, None) - h.cancel() - - self.loop._add_callback(h) - self.assertFalse(self.loop._scheduled) - self.assertFalse(self.loop._ready) - - def 
test_call_soon(self): - def cb(): - pass - - h = self.loop.call_soon(cb) - self.assertEqual(h._callback, cb) - self.assertIsInstance(h, asyncio.Handle) - self.assertIn(h, self.loop._ready) - - def test_call_soon_non_callable(self): - self.loop.set_debug(True) - with self.assertRaisesRegex(TypeError, 'a callable object'): - self.loop.call_soon(1) - - def test_call_later(self): - def cb(): - pass - - h = self.loop.call_later(10.0, cb) - self.assertIsInstance(h, asyncio.TimerHandle) - self.assertIn(h, self.loop._scheduled) - self.assertNotIn(h, self.loop._ready) - with self.assertRaises(TypeError, msg="delay must not be None"): - self.loop.call_later(None, cb) - - def test_call_later_negative_delays(self): - calls = [] - - def cb(arg): - calls.append(arg) - - self.loop._process_events = mock.Mock() - self.loop.call_later(-1, cb, 'a') - self.loop.call_later(-2, cb, 'b') - test_utils.run_briefly(self.loop) - self.assertEqual(calls, ['b', 'a']) - - def test_time_and_call_at(self): - def cb(): - self.loop.stop() - - self.loop._process_events = mock.Mock() - delay = 0.1 - - when = self.loop.time() + delay - self.loop.call_at(when, cb) - t0 = self.loop.time() - self.loop.run_forever() - dt = self.loop.time() - t0 - - # 50 ms: maximum granularity of the event loop - self.assertGreaterEqual(dt, delay - 0.050, dt) - # tolerate a difference of +800 ms because some Python buildbots - # are really slow - self.assertLessEqual(dt, 0.9, dt) - with self.assertRaises(TypeError, msg="when cannot be None"): - self.loop.call_at(None, cb) - - def test_run_until_complete_loop(self): - task = self.loop.create_future() - other_loop = self.new_test_loop() - self.addCleanup(other_loop.close) - self.assertRaises(ValueError, - other_loop.run_until_complete, task) - - def test_run_until_complete_loop_orphan_future_close_loop(self): - class ShowStopper(SystemExit): - pass - - async def foo(delay): - await asyncio.sleep(delay) - - def throw(): - raise ShowStopper - - self.loop._process_events = 
mock.Mock() - self.loop.call_soon(throw) - with self.assertRaises(ShowStopper): - self.loop.run_until_complete(foo(0.1)) - - # This call fails if run_until_complete does not clean up - # done-callback for the previous future. - self.loop.run_until_complete(foo(0.2)) - - def test_default_exc_handler_callback(self): - self.loop._process_events = mock.Mock() - - def zero_error(fut): - fut.set_result(True) - 1/0 - - # Test call_soon (events.Handle) - with mock.patch('asyncio.base_events.logger') as log: - fut = self.loop.create_future() - self.loop.call_soon(zero_error, fut) - fut.add_done_callback(lambda fut: self.loop.stop()) - self.loop.run_forever() - log.error.assert_called_with( - test_utils.MockPattern('Exception in callback.*zero'), - exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) - - # Test call_later (events.TimerHandle) - with mock.patch('asyncio.base_events.logger') as log: - fut = self.loop.create_future() - self.loop.call_later(0.01, zero_error, fut) - fut.add_done_callback(lambda fut: self.loop.stop()) - self.loop.run_forever() - log.error.assert_called_with( - test_utils.MockPattern('Exception in callback.*zero'), - exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) - - def test_default_exc_handler_coro(self): - self.loop._process_events = mock.Mock() - - async def zero_error_coro(): - await asyncio.sleep(0.01) - 1/0 - - # Test Future.__del__ - with mock.patch('asyncio.base_events.logger') as log: - fut = asyncio.ensure_future(zero_error_coro(), loop=self.loop) - fut.add_done_callback(lambda *args: self.loop.stop()) - self.loop.run_forever() - fut = None # Trigger Future.__del__ or futures._TracebackLogger - support.gc_collect() - # Future.__del__ in logs error with an actual exception context - log.error.assert_called_with( - test_utils.MockPattern('.*exception was never retrieved'), - exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) - - def test_set_exc_handler_invalid(self): - with self.assertRaisesRegex(TypeError, 'A callable object or None'): - 
self.loop.set_exception_handler('spam') - - def test_set_exc_handler_custom(self): - def zero_error(): - 1/0 - - def run_loop(): - handle = self.loop.call_soon(zero_error) - self.loop._run_once() - return handle - - self.loop.set_debug(True) - self.loop._process_events = mock.Mock() - - self.assertIsNone(self.loop.get_exception_handler()) - mock_handler = mock.Mock() - self.loop.set_exception_handler(mock_handler) - self.assertIs(self.loop.get_exception_handler(), mock_handler) - handle = run_loop() - mock_handler.assert_called_with(self.loop, { - 'exception': MOCK_ANY, - 'message': test_utils.MockPattern( - 'Exception in callback.*zero_error'), - 'handle': handle, - 'source_traceback': handle._source_traceback, - }) - mock_handler.reset_mock() - - self.loop.set_exception_handler(None) - with mock.patch('asyncio.base_events.logger') as log: - run_loop() - log.error.assert_called_with( - test_utils.MockPattern( - 'Exception in callback.*zero'), - exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) - - self.assertFalse(mock_handler.called) - - def test_set_exc_handler_broken(self): - def run_loop(): - def zero_error(): - 1/0 - self.loop.call_soon(zero_error) - self.loop._run_once() - - def handler(loop, context): - raise AttributeError('spam') - - self.loop._process_events = mock.Mock() - - self.loop.set_exception_handler(handler) - - with mock.patch('asyncio.base_events.logger') as log: - run_loop() - log.error.assert_called_with( - test_utils.MockPattern( - 'Unhandled error in exception handler'), - exc_info=(AttributeError, MOCK_ANY, MOCK_ANY)) - - def test_default_exc_handler_broken(self): - _context = None - - class Loop(base_events.BaseEventLoop): - - _selector = mock.Mock() - _process_events = mock.Mock() - - def default_exception_handler(self, context): - nonlocal _context - _context = context - # Simulates custom buggy "default_exception_handler" - raise ValueError('spam') - - loop = Loop() - self.addCleanup(loop.close) - asyncio.set_event_loop(loop) - - def 
run_loop(): - def zero_error(): - 1/0 - loop.call_soon(zero_error) - loop._run_once() - - with mock.patch('asyncio.base_events.logger') as log: - run_loop() - log.error.assert_called_with( - 'Exception in default exception handler', - exc_info=True) - - def custom_handler(loop, context): - raise ValueError('ham') - - _context = None - loop.set_exception_handler(custom_handler) - with mock.patch('asyncio.base_events.logger') as log: - run_loop() - log.error.assert_called_with( - test_utils.MockPattern('Exception in default exception.*' - 'while handling.*in custom'), - exc_info=True) - - # Check that original context was passed to default - # exception handler. - self.assertIn('context', _context) - self.assertIs(type(_context['context']['exception']), - ZeroDivisionError) - - def test_eager_task_factory_with_custom_task_ctor(self): - - class MyTask(asyncio.Task): - pass + self.loop.run_until_complete(run()) - async def coro(): - pass - - factory = asyncio.create_eager_task_factory(MyTask) - - self.loop.set_task_factory(factory) - self.assertIs(self.loop.get_task_factory(), factory) - - task = self.loop.create_task(coro()) - self.assertTrue(isinstance(task, MyTask)) - self.loop.run_until_complete(task) - - def test_create_named_task(self): - async def test(): - pass - - task = self.loop.create_task(test(), name='test_task') - try: - self.assertEqual(task.get_name(), 'test_task') - finally: - self.loop.run_until_complete(task) - - def test_run_forever_keyboard_interrupt(self): - # Python issue #22601: ensure that the temporary task created by - # run_forever() consumes the KeyboardInterrupt and so don't log - # a warning - async def raise_keyboard_interrupt(): - raise KeyboardInterrupt - - self.loop._process_events = mock.Mock() - self.loop.call_exception_handler = mock.Mock() - - try: - self.loop.run_until_complete(raise_keyboard_interrupt()) - except KeyboardInterrupt: - pass - self.loop.close() - support.gc_collect() - - 
self.assertFalse(self.loop.call_exception_handler.called) - - def test_run_until_complete_baseexception(self): - # Python issue #22429: run_until_complete() must not schedule a pending - # call to stop() if the future raised a BaseException - async def raise_keyboard_interrupt(): - raise KeyboardInterrupt - - self.loop._process_events = mock.Mock() - - with self.assertRaises(KeyboardInterrupt): - self.loop.run_until_complete(raise_keyboard_interrupt()) - - def func(): - self.loop.stop() - func.called = True - func.called = False - self.loop.call_soon(self.loop.call_soon, func) - self.loop.run_forever() - self.assertTrue(func.called) - - def test_run_once(self): - # Simple test for test_utils.run_once(). It may seem strange - # to have a test for this (the function isn't even used!) but - # it's a de-factor standard API for library tests. This tests - # the idiom: loop.call_soon(loop.stop); loop.run_forever(). - count = 0 - - def callback(): - nonlocal count - count += 1 - - self.loop._process_events = mock.Mock() - self.loop.call_soon(callback) - test_utils.run_once(self.loop) - self.assertEqual(count, 1) + self.assertEqual('cancellation message', cm.exception.args[0]) class AsyncTaskCounter: From 45e5c8c0434055437db8e4277f33e4169d0a5c2d Mon Sep 17 00:00:00 2001 From: Itamar Ostricher Date: Tue, 25 Apr 2023 13:17:31 -0600 Subject: [PATCH 16/28] ensure task_eager_start is not called with a NULL task --- Modules/_asynciomodule.c | 1 + 1 file changed, 1 insertion(+) diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c index a81f5f4357740a..1482df7d9ff814 100644 --- a/Modules/_asynciomodule.c +++ b/Modules/_asynciomodule.c @@ -3150,6 +3150,7 @@ task_step(asyncio_state *state, TaskObj *task, PyObject *exc) static int task_eager_start(asyncio_state *state, TaskObj *task) { + assert(task != NULL); PyObject *prevtask = swap_current_task(state, task->task_loop, (PyObject *)task); if (prevtask == NULL) { return -1; From fbf8d91208e3dcef67151ac486819ed20abfce07 
Mon Sep 17 00:00:00 2001 From: Itamar Ostricher Date: Wed, 26 Apr 2023 16:49:10 -0600 Subject: [PATCH 17/28] Refactor eager task tests to clarify the "loop is running" constraint and make it explicit --- .../test_asyncio/test_eager_task_factory.py | 40 ++++++++++--------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/Lib/test/test_asyncio/test_eager_task_factory.py b/Lib/test/test_asyncio/test_eager_task_factory.py index 0bd956b2952660..709540e48c8bea 100644 --- a/Lib/test/test_asyncio/test_eager_task_factory.py +++ b/Lib/test/test_asyncio/test_eager_task_factory.py @@ -28,6 +28,21 @@ def new_task(self, loop, coro, name='TestTask', context=None): def new_future(self, loop): return asyncio.Future(loop=loop) + def run_coro(self, coro): + """ + Helper method to run the `coro` coroutine in the test event loop. + It helps with making sure the event loop is running before starting + to execute `coro`. This is important for testing the eager step + functionality, since an eager step is taken only if the event loop + is already running. 
+ """ + + async def coro_runner(): + self.assertTrue(asyncio.get_event_loop().is_running()) + return await coro + + return self.loop.run_until_complete(coro_runner()) + def setUp(self): super().setUp() self.loop = asyncio.new_event_loop() @@ -49,8 +64,7 @@ async def run(): self.assertTrue(t.done()) return await fut - result = self.loop.run_until_complete(run()) - self.assertEqual(result,'my message') + self.assertEqual(self.run_coro(run()), 'my message') def test_eager_completion(self): @@ -58,14 +72,12 @@ async def coro(): return 'hello' async def run(): - await asyncio.sleep(0.1) t = self.loop.create_task(coro()) # assert the eager step completed the task self.assertTrue(t.done()) return await t - result = self.loop.run_until_complete(run()) - self.assertEqual(result, 'hello') + self.assertEqual(self.run_coro(run()), 'hello') def test_block_after_eager_step(self): @@ -74,15 +86,13 @@ async def coro(): return 'finished after blocking' async def run(): - await asyncio.sleep(0.1) t = self.loop.create_task(coro()) self.assertFalse(t.done()) result = await t self.assertTrue(t.done()) return result - result = self.loop.run_until_complete(run()) - self.assertEqual(result, 'finished after blocking') + self.assertEqual(self.run_coro(run()), 'finished after blocking') def test_cancellation_after_eager_completion(self): @@ -90,7 +100,6 @@ async def coro(): return 'finished without blocking' async def run(): - await asyncio.sleep(0.1) t = self.loop.create_task(coro()) t.cancel() result = await t @@ -98,8 +107,7 @@ async def run(): self.assertFalse(t.cancelled()) return result - result = self.loop.run_until_complete(run()) - self.assertEqual(result, 'finished without blocking') + self.assertEqual(self.run_coro(run()), 'finished without blocking') def test_cancellation_after_eager_step_blocks(self): @@ -108,19 +116,13 @@ async def coro(): return 'finished after blocking' async def run(): - await asyncio.sleep(0.1) t = self.loop.create_task(coro()) t.cancel('cancellation message') 
self.assertGreater(t.cancelling(), 0) - try: - result = await t - except asyncio.CancelledError: - # finished task can't be cancelled - self.assertTrue(t.cancelled()) - raise + result = await t with self.assertRaises(asyncio.CancelledError) as cm: - self.loop.run_until_complete(run()) + self.run_coro(run()) self.assertEqual('cancellation message', cm.exception.args[0]) From 873a645869320215db850a9428857b4d4bee03a1 Mon Sep 17 00:00:00 2001 From: Itamar Ostricher Date: Wed, 26 Apr 2023 17:03:16 -0600 Subject: [PATCH 18/28] Apply documentation suggestions and feedback --- Doc/library/asyncio-task.rst | 23 ++++++++++--------- ...3-03-15-12-18-07.gh-issue-97696.DtnpIC.rst | 2 +- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index fd7c4805f2b3ec..0332db017ec52a 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -535,21 +535,22 @@ Eager Task Factory A task factory for eager task execution. When using this factory (via ``loop.set_task_factory(asyncio.eager_task_factory)``), - coroutines that are able to complete synchronously (without blocking) - are returned immediately as a finished :class:`Task`. + coroutines begin execution synchronously during :class:`Task` construction. + :class:`Task`s are only scheduled on the event loop if they block. + This can be a performance improvement as the overhead of loop scheduling + is avoided for coroutines that complete synchronously. - This task factory attempts to execute the coroutine ``coro`` immediately - (before scheduling the task to the event loop), until it either blocks, - returns, or raises. - If the coroutine returns or raises, a finished :class:`Task` is returned, - and the task is never scheduled to the event loop. If the coroutine blocks, - the (pending) :class:`Task` is scheduled and returned. 
+ A common example where this is beneficial is coroutines which employ + caching or memoization to avoid actual I/O when possible. .. note:: - The fact that the coroutine starts execution immediately is a semantic change, - and might lead to application behavior changes, depending on the application. - For example, the order of execution of tasks is likely to change. + Immediate execution of the coroutine is a semantic change. + If the coroutine returns or raises, the task is never scheduled to the event loop. + If the coroutine execution blocks, the task is scheduled to + the event loop. This change may introduce behavior changes to existing + applications. For example, the application's task execution order is + likely to change. .. versionadded:: 3.12 diff --git a/Misc/NEWS.d/next/Library/2023-03-15-12-18-07.gh-issue-97696.DtnpIC.rst b/Misc/NEWS.d/next/Library/2023-03-15-12-18-07.gh-issue-97696.DtnpIC.rst index d899590ffc10fb..115225630fd6cc 100644 --- a/Misc/NEWS.d/next/Library/2023-03-15-12-18-07.gh-issue-97696.DtnpIC.rst +++ b/Misc/NEWS.d/next/Library/2023-03-15-12-18-07.gh-issue-97696.DtnpIC.rst @@ -1,6 +1,6 @@ Implemented an eager task factory in asyncio. When used as a task factory on an event loop, it performs eager execution of -coroutines. coroutines that are able to complete synchronously (e.g. return or +coroutines. Coroutines that are able to complete synchronously (e.g. return or raise without blocking) are returned immediately as a finished task, and the task is never scheduled to the event loop. If the coroutine blocks, the (pending) is scheduled and returned. 
From 9522c54406fa59fc8c2978fb415526eb91b5c098 Mon Sep 17 00:00:00 2001 From: Itamar Ostricher Date: Wed, 26 Apr 2023 17:48:53 -0600 Subject: [PATCH 19/28] fix docs (rst is hard) --- Doc/library/asyncio-task.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index 0332db017ec52a..84914f33b3fcac 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -534,9 +534,9 @@ Eager Task Factory A task factory for eager task execution. - When using this factory (via ``loop.set_task_factory(asyncio.eager_task_factory)``), + When using this factory (via :meth:`loop.set_task_factory(asyncio.eager_task_factory) `), coroutines begin execution synchronously during :class:`Task` construction. - :class:`Task`s are only scheduled on the event loop if they block. + Tasks are only scheduled on the event loop if they block. This can be a performance improvement as the overhead of loop scheduling is avoided for coroutines that complete synchronously. From fef8140a4d29034c0363757472b8ba708aa0727a Mon Sep 17 00:00:00 2001 From: Jacob Bower <1978924+jbower-fb@users.noreply.github.com> Date: Thu, 27 Apr 2023 22:50:42 -0700 Subject: [PATCH 20/28] Extend eager task factory tests Added new tests for: checking current task + all_tasks, and context vars. Also made EagerTaskFactoryLoopTests run for both C and Python Task implementations. 
--- .../test_asyncio/test_eager_task_factory.py | 108 +++++++++++++++--- 1 file changed, 95 insertions(+), 13 deletions(-) diff --git a/Lib/test/test_asyncio/test_eager_task_factory.py b/Lib/test/test_asyncio/test_eager_task_factory.py index 709540e48c8bea..4b31deebfa08ab 100644 --- a/Lib/test/test_asyncio/test_eager_task_factory.py +++ b/Lib/test/test_asyncio/test_eager_task_factory.py @@ -1,12 +1,13 @@ """Tests for base_events.py""" +import asyncio +import contextvars import gc import time import unittest + from types import GenericAlias from unittest import mock - -import asyncio from asyncio import base_events from asyncio import tasks from test.test_asyncio import utils as test_utils @@ -20,14 +21,7 @@ def tearDownModule(): asyncio.set_event_loop_policy(None) -class EagerTaskFactoryLoopTests(test_utils.TestCase): - - def new_task(self, loop, coro, name='TestTask', context=None): - return tasks.Task(coro, loop=loop, name=name, context=context) - - def new_future(self, loop): - return asyncio.Future(loop=loop) - +class EagerTaskFactoryLoopTests: def run_coro(self, coro): """ Helper method to run the `coro` coroutine in the test event loop. 
@@ -46,11 +40,22 @@ async def coro_runner(): def setUp(self): super().setUp() self.loop = asyncio.new_event_loop() - self.loop.set_task_factory(asyncio.eager_task_factory) + self.eager_task_factory = asyncio.create_eager_task_factory(self.__class__.Task) + self.loop.set_task_factory(self.eager_task_factory) self.set_event_loop(self.loop) def test_eager_task_factory_set(self): - self.assertIs(self.loop.get_task_factory(), asyncio.eager_task_factory) + self.assertIsNotNone(self.eager_task_factory) + self.assertIs(self.loop.get_task_factory(), self.eager_task_factory) + + async def noop(): pass + + async def run(): + t = self.loop.create_task(noop()) + self.assertIsInstance(t, self.__class__.Task) + await t + + self.run_coro(run()) def test_await_future_during_eager_step(self): @@ -126,6 +131,84 @@ async def run(): self.assertEqual('cancellation message', cm.exception.args[0]) + def test_current_task(self): + captured_current_task = None + + async def coro(): + nonlocal captured_current_task + captured_current_task = asyncio.tasks.current_task() + + async def run(): + t = self.loop.create_task(coro()) + self.assertIs(captured_current_task, t) + + self.run_coro(run()) + + def test_all_tasks_with_eager_completion(self): + captured_all_tasks = None + + async def coro(): + nonlocal captured_all_tasks + captured_all_tasks = asyncio.tasks.all_tasks() + + async def run(): + t = self.loop.create_task(coro()) + self.assertIn(t, captured_all_tasks) + self.assertNotIn(t, asyncio.tasks.all_tasks()) + + self.run_coro(run()) + + def test_all_tasks_with_blocking(self): + captured_eager_all_tasks = None + + async def coro(fut1, fut2): + nonlocal captured_eager_all_tasks + captured_eager_all_tasks = asyncio.tasks.all_tasks() + await fut1 + fut2.set_result(None) + + async def run(): + fut1 = self.loop.create_future() + fut2 = self.loop.create_future() + t = self.loop.create_task(coro(fut1, fut2)) + self.assertIn(t, captured_eager_all_tasks) + self.assertIn(t, asyncio.tasks.all_tasks()) 
+ fut1.set_result(None) + await fut2 + self.assertNotIn(t, asyncio.tasks.all_tasks()) + + self.run_coro(run()) + + def test_context_vars(self): + cv = contextvars.ContextVar('cv') + cv.set(1) + + coro_ran = False + + async def coro(): + nonlocal coro_ran + self.assertEqual(cv.get(), 1) + cv.set(2) + self.assertEqual(cv.get(), 2) + coro_ran = True + + async def run(): + t = self.loop.create_task(coro()) + self.assertTrue(coro_ran) + self.assertEqual(cv.get(), 1) + + self.run_coro(run()) + + +class PyEagerTaskFactoryLoopTests(EagerTaskFactoryLoopTests, test_utils.TestCase): + Task = tasks._PyTask + + +@unittest.skipUnless(hasattr(tasks, '_CTask'), + 'requires the C _asyncio module') +class CEagerTaskFactoryLoopTests(EagerTaskFactoryLoopTests, test_utils.TestCase): + Task = getattr(tasks, '_CTask', None) + class AsyncTaskCounter: def __init__(self, loop, *, task_class, eager): @@ -238,6 +321,5 @@ class NonEagerCTaskTests(BaseNonEagerTaskFactoryTests, test_utils.TestCase): class EagerCTaskTests(BaseEagerTaskFactoryTests, test_utils.TestCase): Task = getattr(tasks, '_CTask', None) - if __name__ == '__main__': unittest.main() From 1eb540cb2ed7310f0d54dd259072bdd162229d2d Mon Sep 17 00:00:00 2001 From: Itamar Ostricher Date: Fri, 28 Apr 2023 09:49:42 -0700 Subject: [PATCH 21/28] a little cleanup of newly added tests --- .../test_asyncio/test_eager_task_factory.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/Lib/test/test_asyncio/test_eager_task_factory.py b/Lib/test/test_asyncio/test_eager_task_factory.py index 4b31deebfa08ab..09949e1a7fac2b 100644 --- a/Lib/test/test_asyncio/test_eager_task_factory.py +++ b/Lib/test/test_asyncio/test_eager_task_factory.py @@ -22,6 +22,9 @@ def tearDownModule(): class EagerTaskFactoryLoopTests: + + Task = None + def run_coro(self, coro): """ Helper method to run the `coro` coroutine in the test event loop. 
@@ -40,7 +43,7 @@ async def coro_runner(): def setUp(self): super().setUp() self.loop = asyncio.new_event_loop() - self.eager_task_factory = asyncio.create_eager_task_factory(self.__class__.Task) + self.eager_task_factory = asyncio.create_eager_task_factory(self.Task) self.loop.set_task_factory(self.eager_task_factory) self.set_event_loop(self.loop) @@ -52,7 +55,7 @@ async def noop(): pass async def run(): t = self.loop.create_task(noop()) - self.assertIsInstance(t, self.__class__.Task) + self.assertIsInstance(t, self.Task) await t self.run_coro(run()) @@ -136,7 +139,7 @@ def test_current_task(self): async def coro(): nonlocal captured_current_task - captured_current_task = asyncio.tasks.current_task() + captured_current_task = asyncio.current_task() async def run(): t = self.loop.create_task(coro()) @@ -149,12 +152,12 @@ def test_all_tasks_with_eager_completion(self): async def coro(): nonlocal captured_all_tasks - captured_all_tasks = asyncio.tasks.all_tasks() + captured_all_tasks = asyncio.all_tasks() async def run(): t = self.loop.create_task(coro()) self.assertIn(t, captured_all_tasks) - self.assertNotIn(t, asyncio.tasks.all_tasks()) + self.assertNotIn(t, asyncio.all_tasks()) self.run_coro(run()) @@ -163,7 +166,7 @@ def test_all_tasks_with_blocking(self): async def coro(fut1, fut2): nonlocal captured_eager_all_tasks - captured_eager_all_tasks = asyncio.tasks.all_tasks() + captured_eager_all_tasks = asyncio.all_tasks() await fut1 fut2.set_result(None) @@ -172,10 +175,10 @@ async def run(): fut2 = self.loop.create_future() t = self.loop.create_task(coro(fut1, fut2)) self.assertIn(t, captured_eager_all_tasks) - self.assertIn(t, asyncio.tasks.all_tasks()) + self.assertIn(t, asyncio.all_tasks()) fut1.set_result(None) await fut2 - self.assertNotIn(t, asyncio.tasks.all_tasks()) + self.assertNotIn(t, asyncio.all_tasks()) self.run_coro(run()) From 3cef856dd4c2feeb3ecb802f0b66e8390a1a483f Mon Sep 17 00:00:00 2001 From: Itamar Ostricher Date: Fri, 28 Apr 2023 09:53:27 
-0700 Subject: [PATCH 22/28] add assertion to current_task test, comparing the task before and after eager stop blocking --- Lib/test/test_asyncio/test_eager_task_factory.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Lib/test/test_asyncio/test_eager_task_factory.py b/Lib/test/test_asyncio/test_eager_task_factory.py index 09949e1a7fac2b..d1586311121453 100644 --- a/Lib/test/test_asyncio/test_eager_task_factory.py +++ b/Lib/test/test_asyncio/test_eager_task_factory.py @@ -140,12 +140,17 @@ def test_current_task(self): async def coro(): nonlocal captured_current_task captured_current_task = asyncio.current_task() + # verify the task before and after blocking is identical + await asyncio.sleep(0.1) + self.assertIs(asyncio.current_task(), captured_current_task) async def run(): t = self.loop.create_task(coro()) self.assertIs(captured_current_task, t) + await t self.run_coro(run()) + captured_current_task = None def test_all_tasks_with_eager_completion(self): captured_all_tasks = None From 887771638fdca61edbf4a9388c74fbce8dc44a6b Mon Sep 17 00:00:00 2001 From: Itamar Ostricher Date: Fri, 28 Apr 2023 09:59:11 -0700 Subject: [PATCH 23/28] add a second step to contextvars test --- .../test_asyncio/test_eager_task_factory.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/Lib/test/test_asyncio/test_eager_task_factory.py b/Lib/test/test_asyncio/test_eager_task_factory.py index d1586311121453..03236823bc636d 100644 --- a/Lib/test/test_asyncio/test_eager_task_factory.py +++ b/Lib/test/test_asyncio/test_eager_task_factory.py @@ -191,18 +191,29 @@ def test_context_vars(self): cv = contextvars.ContextVar('cv') cv.set(1) - coro_ran = False + coro_first_step_ran = False + coro_second_step_ran = False async def coro(): - nonlocal coro_ran + nonlocal coro_first_step_ran + nonlocal coro_second_step_ran self.assertEqual(cv.get(), 1) cv.set(2) self.assertEqual(cv.get(), 2) - coro_ran = True + coro_first_step_ran = True + await 
asyncio.sleep(0.1) + self.assertEqual(cv.get(), 2) + cv.set(3) + self.assertEqual(cv.get(), 3) + coro_second_step_ran = True async def run(): t = self.loop.create_task(coro()) - self.assertTrue(coro_ran) + self.assertTrue(coro_first_step_ran) + self.assertFalse(coro_second_step_ran) + self.assertEqual(cv.get(), 1) + await t + self.assertTrue(coro_second_step_ran) self.assertEqual(cv.get(), 1) self.run_coro(run()) From 0c09767dafe22b4ecf3f6835b7aea30494b8ae0c Mon Sep 17 00:00:00 2001 From: Itamar Ostricher Date: Mon, 1 May 2023 09:59:40 -0700 Subject: [PATCH 24/28] missing word in NEWS entry --- .../next/Library/2023-03-15-12-18-07.gh-issue-97696.DtnpIC.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Library/2023-03-15-12-18-07.gh-issue-97696.DtnpIC.rst b/Misc/NEWS.d/next/Library/2023-03-15-12-18-07.gh-issue-97696.DtnpIC.rst index 115225630fd6cc..0b3854d74eb991 100644 --- a/Misc/NEWS.d/next/Library/2023-03-15-12-18-07.gh-issue-97696.DtnpIC.rst +++ b/Misc/NEWS.d/next/Library/2023-03-15-12-18-07.gh-issue-97696.DtnpIC.rst @@ -3,4 +3,4 @@ When used as a task factory on an event loop, it performs eager execution of coroutines. Coroutines that are able to complete synchronously (e.g. return or raise without blocking) are returned immediately as a finished task, and the task is never scheduled to the event loop. If the coroutine blocks, the -(pending) is scheduled and returned. +(pending) task is scheduled and returned. 
From a255ec8305630f20ac10d3c030305492e645aafb Mon Sep 17 00:00:00 2001 From: Itamar Ostricher Date: Mon, 1 May 2023 10:00:19 -0700 Subject: [PATCH 25/28] refactor all_tasks() handling of eager_tasks --- Lib/asyncio/tasks.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 782d1194c1be44..dece6d69f2bd2b 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -44,18 +44,18 @@ def all_tasks(loop=None): """Return a set of all tasks for the loop.""" if loop is None: loop = events.get_running_loop() - # Looping over these sets isn't safe as they can be updated from another thread, + # capturing the set of eager tasks first, so if an eager task "graduates" to + # a regular task in another thread, we don't risk missing it + eager_tasks = list(_eager_tasks) + # Looping over the weak set isn't safe as it can be updated from another thread, # therefore we cast to lists prior to filtering. The list cast itself requires # iteration, so we repeat it several times ignoring RuntimeErrors (which are not # very likely to occur). See issues 34970 and 36607 for details. scheduled_tasks = None - eager_tasks = None i = 0 while True: try: - if scheduled_tasks is None: - scheduled_tasks = list(_scheduled_tasks) - eager_tasks = list(_eager_tasks) + scheduled_tasks = list(_scheduled_tasks) except RuntimeError: i += 1 if i >= 1000: From 05870d5ab4df81c52616cb17ed21a2b2ca7489b6 Mon Sep 17 00:00:00 2001 From: Itamar Ostricher Date: Mon, 1 May 2023 10:03:19 -0700 Subject: [PATCH 26/28] fix docs (PR review) --- Doc/library/asyncio-task.rst | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index 84914f33b3fcac..f8727b98066990 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -546,11 +546,11 @@ Eager Task Factory .. note:: Immediate execution of the coroutine is a semantic change. 
- If the coroutine blocks, the task is never scheduled to the event loop. - If the coroutine execution returns or raises, the task is scheduled to - the event loop. This change may introduce behavior changes to existing - applications. For example, the application's task execution order is - likely to change. + If the coroutine returns or raises, the task is never scheduled + to the event loop. If the coroutine execution blocks, the task is + scheduled to the event loop. This change may introduce behavior + changes to existing applications. For example, + the application's task execution order is likely to change. .. versionadded:: 3.12 From a2587a1956229f4b42c45865581d8a8d7b2ebd32 Mon Sep 17 00:00:00 2001 From: Itamar Ostricher Date: Mon, 1 May 2023 10:34:38 -0700 Subject: [PATCH 27/28] move cv.set inside the main task (otherwise it refleaks) --- Lib/test/test_asyncio/test_eager_task_factory.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/test/test_asyncio/test_eager_task_factory.py b/Lib/test/test_asyncio/test_eager_task_factory.py index 03236823bc636d..fe690934292a86 100644 --- a/Lib/test/test_asyncio/test_eager_task_factory.py +++ b/Lib/test/test_asyncio/test_eager_task_factory.py @@ -188,8 +188,7 @@ async def run(): self.run_coro(run()) def test_context_vars(self): - cv = contextvars.ContextVar('cv') - cv.set(1) + cv = contextvars.ContextVar('cv', default=0) coro_first_step_ran = False coro_second_step_ran = False @@ -208,6 +207,7 @@ async def coro(): coro_second_step_ran = True async def run(): + cv.set(1) t = self.loop.create_task(coro()) self.assertTrue(coro_first_step_ran) self.assertFalse(coro_second_step_ran) From b83ed944541ced7d002680c8931cb2f0ecc8a3f5 Mon Sep 17 00:00:00 2001 From: Itamar Ostricher Date: Mon, 1 May 2023 11:14:05 -0700 Subject: [PATCH 28/28] fix grammar in all_tasks comments --- Lib/asyncio/tasks.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/Lib/asyncio/tasks.py 
b/Lib/asyncio/tasks.py index dece6d69f2bd2b..aa5269ade19a7f 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -44,13 +44,14 @@ def all_tasks(loop=None): """Return a set of all tasks for the loop.""" if loop is None: loop = events.get_running_loop() - # capturing the set of eager tasks first, so if an eager task "graduates" to - # a regular task in another thread, we don't risk missing it + # capturing the set of eager tasks first, so if an eager task "graduates" + # to a regular task in another thread, we don't risk missing it. eager_tasks = list(_eager_tasks) - # Looping over the weak set isn't safe as it can be updated from another thread, - # therefore we cast to lists prior to filtering. The list cast itself requires - # iteration, so we repeat it several times ignoring RuntimeErrors (which are not - # very likely to occur). See issues 34970 and 36607 for details. + # Looping over the WeakSet isn't safe as it can be updated from another + # thread, therefore we cast it to list prior to filtering. The list cast + # itself requires iteration, so we repeat it several times ignoring + # RuntimeErrors (which are not very likely to occur). + # See issues 34970 and 36607 for details. scheduled_tasks = None i = 0 while True: