diff --git a/Lib/asyncio/__init__.py b/Lib/asyncio/__init__.py index fed16ec7c67fac..4467ff9725a5fa 100644 --- a/Lib/asyncio/__init__.py +++ b/Lib/asyncio/__init__.py @@ -18,6 +18,7 @@ from .subprocess import * from .tasks import * from .taskgroups import * +from .taskscopes import * from .timeouts import * from .threads import * from .transports import * @@ -34,6 +35,8 @@ streams.__all__ + subprocess.__all__ + tasks.__all__ + + taskgroups.__all__ + + taskscopes.__all__ + threads.__all__ + timeouts.__all__ + transports.__all__) diff --git a/Lib/asyncio/taskgroups.py b/Lib/asyncio/taskgroups.py index 06b2e0db86a1fe..299bd52a019ff8 100644 --- a/Lib/asyncio/taskgroups.py +++ b/Lib/asyncio/taskgroups.py @@ -2,14 +2,13 @@ # license: PSFL. -__all__ = ["TaskGroup"] +__all__ = "TaskGroup", -from . import events from . import exceptions -from . import tasks +from . import taskscopes -class TaskGroup: +class TaskGroup(taskscopes.TaskScope): """Asynchronous context manager for managing groups of tasks. Example use: @@ -26,16 +25,8 @@ class TaskGroup: The exceptions are then combined and raised as an `ExceptionGroup`. """ def __init__(self): - self._entered = False - self._exiting = False - self._aborting = False - self._loop = None - self._parent_task = None - self._parent_cancel_requested = False - self._tasks = set() + super().__init__(delegate_errors=None) self._errors = [] - self._base_error = None - self._on_completed_fut = None def __repr__(self): info = [''] @@ -49,91 +40,20 @@ def __repr__(self): info.append('entered') info_str = ' '.join(info) - return f'' + return f'' - async def __aenter__(self): - if self._entered: - raise RuntimeError( - f"TaskGroup {self!r} has been already entered") - self._entered = True - - if self._loop is None: - self._loop = events.get_running_loop() - - self._parent_task = tasks.current_task(self._loop) - if self._parent_task is None: - raise RuntimeError( - f'TaskGroup {self!r} cannot determine the parent task') + def create_task(self, coro, *, name=None, context=None): + """Create a new task in this group and return it. - return self + Similar to `asyncio.create_task`. + """ + task = super().create_task(coro, name=name, context=context) + if not task.done(): + task.add_done_callback(self._handle_completion_as_group) + return task async def __aexit__(self, et, exc, tb): - self._exiting = True - - if (exc is not None and - self._is_base_error(exc) and - self._base_error is None): - self._base_error = exc - - propagate_cancellation_error = \ - exc if et is exceptions.CancelledError else None - if self._parent_cancel_requested: - # If this flag is set we *must* call uncancel(). - if self._parent_task.uncancel() == 0: - # If there are no pending cancellations left, - # don't propagate CancelledError. - propagate_cancellation_error = None - - if et is not None: - if not self._aborting: - # Our parent task is being cancelled: - # - # async with TaskGroup() as g: - # g.create_task(...) - # await ... # <- CancelledError - # - # or there's an exception in "async with": - # - # async with TaskGroup() as g: - # g.create_task(...) - # 1 / 0 - # - self._abort() - - # We use while-loop here because "self._on_completed_fut" - # can be cancelled multiple times if our parent task - # is being cancelled repeatedly (or even once, when - # our own cancellation is already in progress) - while self._tasks: - if self._on_completed_fut is None: - self._on_completed_fut = self._loop.create_future() - - try: - await self._on_completed_fut - except exceptions.CancelledError as ex: - if not self._aborting: - # Our parent task is being cancelled: - # - # async def wrapper(): - # async with TaskGroup() as g: - # g.create_task(foo) - # - # "wrapper" is being cancelled while "foo" is - # still running. - propagate_cancellation_error = ex - self._abort() - - self._on_completed_fut = None - - assert not self._tasks - - if self._base_error is not None: - raise self._base_error - - # Propagate CancelledError if there is one, except if there - # are other errors -- those have priority. - if propagate_cancellation_error and not self._errors: - raise propagate_cancellation_error + await super().__aexit__(et, exc, tb) if et is not None and et is not exceptions.CancelledError: self._errors.append(exc) @@ -148,95 +68,12 @@ async def __aexit__(self, et, exc, tb): finally: self._errors = None - def create_task(self, coro, *, name=None, context=None): - """Create a new task in this group and return it. - - Similar to `asyncio.create_task`. - """ - if not self._entered: - raise RuntimeError(f"TaskGroup {self!r} has not been entered") - if self._exiting and not self._tasks: - raise RuntimeError(f"TaskGroup {self!r} is finished") - if self._aborting: - raise RuntimeError(f"TaskGroup {self!r} is shutting down") - if context is None: - task = self._loop.create_task(coro) - else: - task = self._loop.create_task(coro, context=context) - tasks._set_task_name(task, name) - # optimization: Immediately call the done callback if the task is - # already done (e.g. if the coro was able to complete eagerly), - # and skip scheduling a done callback - if task.done(): - self._on_task_done(task) - else: - self._tasks.add(task) - task.add_done_callback(self._on_task_done) - return task - - # Since Python 3.8 Tasks propagate all exceptions correctly, - # except for KeyboardInterrupt and SystemExit which are - # still considered special. - - def _is_base_error(self, exc: BaseException) -> bool: - assert isinstance(exc, BaseException) - return isinstance(exc, (SystemExit, KeyboardInterrupt)) - - def _abort(self): - self._aborting = True - - for t in self._tasks: - if not t.done(): - t.cancel() - - def _on_task_done(self, task): - self._tasks.discard(task) - - if self._on_completed_fut is not None and not self._tasks: - if not self._on_completed_fut.done(): - self._on_completed_fut.set_result(True) - + def _handle_completion_as_group(self, task): if task.cancelled(): return - - exc = task.exception() - if exc is None: - return - - self._errors.append(exc) - if self._is_base_error(exc) and self._base_error is None: - self._base_error = exc - - if self._parent_task.done(): - # Not sure if this case is possible, but we want to handle - # it anyways. - self._loop.call_exception_handler({ - 'message': f'Task {task!r} has errored out but its parent ' - f'task {self._parent_task} is already completed', - 'exception': exc, - 'task': task, - }) - return - - if not self._aborting and not self._parent_cancel_requested: - # If parent task *is not* being cancelled, it means that we want - # to manually cancel it to abort whatever is being run right now - # in the TaskGroup. But we want to mark parent task as - # "not cancelled" later in __aexit__. Example situation that - # we need to handle: - # - # async def foo(): - # try: - # async with TaskGroup() as g: - # g.create_task(crash_soon()) - # await something # <- this needs to be canceled - # # by the TaskGroup, e.g. - # # foo() needs to be cancelled - # except Exception: - # # Ignore any exceptions raised in the TaskGroup - # pass - # await something_else # this line has to be called - # # after TaskGroup is finished. - self._abort() - self._parent_cancel_requested = True - self._parent_task.cancel() + if (exc := task.exception()) is not None: + self._errors.append(exc) + if not self._aborting and not self._parent_cancel_requested: + self._abort() + self._parent_cancel_requested = True + self._parent_task.cancel() diff --git a/Lib/asyncio/taskscopes.py b/Lib/asyncio/taskscopes.py new file mode 100644 index 00000000000000..dbfc039942c760 --- /dev/null +++ b/Lib/asyncio/taskscopes.py @@ -0,0 +1,268 @@ +# Adapted with permission from the EdgeDB project; +# license: PSFL. + + +__all__ = "TaskScope", + +from . import events +from . import exceptions +from . import tasks + + +_default_error_handler = object() + + +class TaskScope: + """Asynchronous context manager for managing a scope of subtasks. + + Example use: + + async with asyncio.TaskScope() as scope: + task1 = scope.create_task(some_coroutine(...)) + task2 = scope.create_task(other_coroutine(...)) + print("Both tasks have completed now.") + + All tasks are awaited when the context manager exits. + + Any exceptions other than `asyncio.CancelledError` raised within + a task will be handled differently depending on `delegate_errors`. + + If `delegate_errors` is not set, it will run + `loop.call_exception_handler()`. + If it is set `None`, it will silently ignore the exception. + If it is set as a callable function, it will invoke it using the same + context argument of `loop.call_exception_handler()`. + """ + def __init__(self, delegate_errors=_default_error_handler): + self._entered = False + self._exiting = False + self._aborting = False + self._loop = None + self._parent_task = None + self._parent_cancel_requested = False + self._tasks = set() + self._base_error = None + self._on_completed_fut = None + self._delegate_errors = delegate_errors + self._has_errors = False + + def __repr__(self): + info = [''] + if self._tasks: + info.append(f'tasks={len(self._tasks)}') + if self._aborting: + info.append('cancelling') + elif self._entered: + info.append('entered') + + info_str = ' '.join(info) + return f'' + + async def __aenter__(self): + if self._entered: + raise RuntimeError( + f'{type(self).__name__} {self!r} ' + f'has been already entered' + ) + self._entered = True + + if self._loop is None: + self._loop = events.get_running_loop() + + self._parent_task = tasks.current_task(self._loop) + if self._parent_task is None: + raise RuntimeError( + f'{type(self).__name__} {self!r} ' + f'cannot determine the parent task' + ) + + return self + + async def __aexit__(self, et, exc, tb): + self._exiting = True + + if (exc is not None and + self._is_base_error(exc) and + self._base_error is None): + self._base_error = exc + + propagate_cancellation_error = \ + exc if et is exceptions.CancelledError else None + if self._parent_cancel_requested: + # If this flag is set we *must* call uncancel(). + if self._parent_task.uncancel() == 0: + # If there are no pending cancellations left, + # don't propagate CancelledError. + propagate_cancellation_error = None + + if et is not None: + if not self._aborting: + # Our parent task is being cancelled: + # + # async with TaskGroup() as g: + # g.create_task(...) + # await ... # <- CancelledError + # + # or there's an exception in "async with": + # + # async with TaskGroup() as g: + # g.create_task(...) + # 1 / 0 + # + self._abort() + + # We use while-loop here because "self._on_completed_fut" + # can be cancelled multiple times if our parent task + # is being cancelled repeatedly (or even once, when + # our own cancellation is already in progress) + while self._tasks: + if self._on_completed_fut is None: + self._on_completed_fut = self._loop.create_future() + + try: + await self._on_completed_fut + except exceptions.CancelledError as ex: + if not self._aborting: + # Our parent task is being cancelled: + # + # async def wrapper(): + # async with TaskGroup() as g: + # g.create_task(foo) + # + # "wrapper" is being cancelled while "foo" is + # still running. + propagate_cancellation_error = ex + self._abort() + + self._on_completed_fut = None + + assert not self._tasks + + if self._base_error is not None: + raise self._base_error + + # Propagate CancelledError if there is one, except if there + # are other errors -- those have priority. + if propagate_cancellation_error and not self._has_errors: + raise propagate_cancellation_error + + if et is not None and et is not exceptions.CancelledError: + self._has_errors = True + + def create_task(self, coro, *, name=None, context=None): + """Create a new task in this group and return it. + + Similar to `asyncio.create_task`. + """ + if not self._entered: + raise RuntimeError( + f"{type(self).__name__} {self!r} has not been entered" + ) + if self._exiting and not self._tasks: + raise RuntimeError(f"{type(self).__name__} {self!r} is finished") + if self._aborting: + raise RuntimeError( + f"{type(self).__name__} {self!r} is shutting down" + ) + if context is None: + task = self._loop.create_task(coro) + else: + task = self._loop.create_task(coro, context=context) + tasks._set_task_name(task, name) + # optimization: Immediately call the done callback if the task is + # already done (e.g. if the coro was able to complete eagerly), + # and skip scheduling a done callback + if task.done(): + self._on_task_done(task) + else: + self._tasks.add(task) + task.add_done_callback(self._on_task_done) + return task + + # Since Python 3.8 Tasks propagate all exceptions correctly, + # except for KeyboardInterrupt and SystemExit which are + # still considered special. + + def _is_base_error(self, exc: BaseException) -> bool: + assert isinstance(exc, BaseException) + return isinstance(exc, (SystemExit, KeyboardInterrupt)) + + def _abort(self): + self._aborting = True + + for t in self._tasks: + if not t.done(): + t.cancel() + + shutdown = _abort # alias + + def _on_task_done(self, task): + self._tasks.discard(task) + + if self._on_completed_fut is not None and not self._tasks: + if not self._on_completed_fut.done(): + self._on_completed_fut.set_result(True) + + if task.cancelled(): + return + + exc = task.exception() + if exc is None: + return + + self._has_errors = True + match self._delegate_errors: + case None: + pass # deliberately set to ignore errors + case func if callable(func): + func({ + 'message': f'Task {task!r} has errored inside the parent ' + f'task {self._parent_task}', + 'exception': exc, + 'task': task, + }) + case default if default is _default_error_handler: + self._loop.call_exception_handler({ + 'message': f'Task {task!r} has errored inside the parent ' + f'task {self._parent_task}', + 'exception': exc, + 'task': task, + }) + + is_base_error = self._is_base_error(exc) + if is_base_error and self._base_error is None: + self._base_error = exc + + if self._parent_task.done(): + # Not sure if this case is possible, but we want to handle + # it anyways. + self._loop.call_exception_handler({ + 'message': f'Task {task!r} has errored out but its parent ' + f'task {self._parent_task} is already completed', + 'exception': exc, + 'task': task, + }) + return + + if is_base_error: + # If parent task *is not* being cancelled, it means that we want + # to manually cancel it to abort whatever is being run right now + # in the TaskGroup. But we want to mark parent task as + # "not cancelled" later in __aexit__. Example situation that + # we need to handle: + # + # async def foo(): + # try: + # async with TaskGroup() as g: + # g.create_task(crash_soon()) + # await something # <- this needs to be canceled + # # by the TaskGroup, e.g. + # # foo() needs to be cancelled + # except Exception: + # # Ignore any exceptions raised in the TaskGroup + # pass + # await something_else # this line has to be called + # # after TaskGroup is finished. + self._abort() + self._parent_cancel_requested = True + self._parent_task.cancel() diff --git a/Lib/test/test_asyncio/test_taskscopes.py b/Lib/test/test_asyncio/test_taskscopes.py new file mode 100644 index 00000000000000..b74edfcb58025f --- /dev/null +++ b/Lib/test/test_asyncio/test_taskscopes.py @@ -0,0 +1,571 @@ +# license: PSFL. + +import unittest +from unittest import mock + +import asyncio +from asyncio import taskscopes +from test.test_asyncio import utils as test_utils + + +# To prevent a warning "test altered the execution environment" +def tearDownModule(): + asyncio.set_event_loop_policy(None) + + +class MyExc(Exception): + pass + + +class MyBaseExc(BaseException): + pass + + +class TestTaskScope(unittest.IsolatedAsyncioTestCase): + + async def test_children_complete_on_child_error(self): + async def zero_division(): + 1 / 0 + + async def foo1(): + await asyncio.sleep(0.1) + return 42 + + async def foo2(): + await asyncio.sleep(0.2) + return 11 + + loop = asyncio.get_running_loop() + exc_handler = mock.Mock() + with mock.patch.object(loop, 'call_exception_handler', exc_handler): + async with taskscopes.TaskScope() as g: + t1 = g.create_task(foo1()) + t2 = g.create_task(foo2()) + t3 = g.create_task(zero_division()) + + self.assertEqual(t1.result(), 42) + self.assertEqual(t2.result(), 11) + exc_handler.assert_called_with({ + 'message': test_utils.MockPattern( + '^Task .* has errored inside the parent .*' + ), + 'exception': test_utils.MockInstanceOf(ZeroDivisionError), + 'task': t3, + }) + + async def test_inner_complete_on_child_error(self): + async def zero_division(): + 1 / 0 + + async def foo1(): + await asyncio.sleep(0.1) + return 42 + + async def foo2(): + await asyncio.sleep(0.2) + return 11 + + loop = asyncio.get_running_loop() + exc_handler = mock.Mock() + with mock.patch.object(loop, 'call_exception_handler', exc_handler): + async with taskscopes.TaskScope() as g: + t1 = g.create_task(foo1()) + t2 = g.create_task(zero_division()) + r1 = await foo2() + + self.assertEqual(t1.result(), 42) + self.assertEqual(r1, 11) + exc_handler.assert_called_with({ + 'message': test_utils.MockPattern( + '^Task .* has errored inside the parent .*' + ), + 'exception': test_utils.MockInstanceOf(ZeroDivisionError), + 'task': t2, + }) + + async def test_children_exceptions_propagate(self): + async def zero_division(): + 1 / 0 + + async def value_error(): + await asyncio.sleep(0.2) + raise ValueError + + async def foo1(): + await asyncio.sleep(0.4) + return 42 + + loop = asyncio.get_running_loop() + exc_handler = mock.Mock() + with mock.patch.object(loop, 'call_exception_handler', exc_handler): + async with taskscopes.TaskScope() as g: + t1 = g.create_task(zero_division()) + t2 = g.create_task(value_error()) + t3 = g.create_task(foo1()) + + exc_handler.assert_has_calls( + [ + mock.call({ + 'message': test_utils.MockPattern( + '^Task .* has errored inside the parent .*' + ), + 'exception': test_utils.MockInstanceOf(ZeroDivisionError), + 'task': t1, + }), + mock.call({ + 'message': test_utils.MockPattern( + '^Task .* has errored inside the parent .*' + ), + 'exception': test_utils.MockInstanceOf(ValueError), + 'task': t2, + }), + ], + any_order=True, + ) + self.assertEqual(t3.result(), 42) + + async def test_children_cancel_on_inner_failure(self): + async def zero_division(): + 1 / 0 + + async def foo1(): + await asyncio.sleep(0.2) + return 42 + + with self.assertRaises(ZeroDivisionError): + async with taskscopes.TaskScope() as g: + t1 = g.create_task(foo1()) + await zero_division() + + self.assertTrue(t1.cancelled()) + + async def test_cancellation_01(self): + + NUM = 0 + + async def foo(): + nonlocal NUM + try: + await asyncio.sleep(5) + except asyncio.CancelledError: + NUM += 1 + raise + + async def runner(): + async with taskscopes.TaskScope() as g: + for _ in range(5): + g.create_task(foo()) + + r = asyncio.create_task(runner()) + await asyncio.sleep(0.1) + + self.assertFalse(r.done()) + r.cancel() + with self.assertRaises(asyncio.CancelledError) as cm: + await r + + self.assertEqual(NUM, 5) + + async def test_delegate_error_ignore(self): + async def zero_division(): + 1 / 0 + + async def value_error(): + await asyncio.sleep(0.2) + raise ValueError + + async def foo1(): + await asyncio.sleep(0.4) + return 42 + + loop = asyncio.get_running_loop() + exc_handler = mock.Mock() + with mock.patch.object(loop, 'call_exception_handler', exc_handler): + async with taskscopes.TaskScope(delegate_errors=None) as g: + g.create_task(zero_division()) + g.create_task(value_error()) + g.create_task(foo1()) + + exc_handler.assert_not_called() + + async def test_delegate_error_custom(self): + async def zero_division(): + 1 / 0 + + async def value_error(): + await asyncio.sleep(0.2) + raise ValueError + + async def foo1(): + await asyncio.sleep(0.4) + return 42 + + catched_errors = [] + + def catch_error(context): + nonlocal catched_errors + catched_errors.append(context) + + async with taskscopes.TaskScope(delegate_errors=catch_error) as g: + t1 = g.create_task(zero_division()) + t2 = g.create_task(value_error()) + g.create_task(foo1()) + + match_count = 0 + for item in catched_errors: + match item["exception"]: + case ZeroDivisionError(): + self.assertIs(item["task"], t1) + match_count += 1 + case ValueError(): + self.assertIs(item["task"], t2) + match_count += 10 + self.assertEqual(match_count, 11) + + async def test_taskgroup_35(self): + + NUM = 0 + + async def foo(): + nonlocal NUM + try: + await asyncio.sleep(5) + except asyncio.CancelledError: + NUM += 1 + raise + + async def runner(): + nonlocal NUM + async with taskscopes.TaskScope() as g: + for _ in range(5): + g.create_task(foo()) + + try: + await asyncio.sleep(10) + except asyncio.CancelledError: + NUM += 10 + raise + + r = asyncio.create_task(runner()) + await asyncio.sleep(0.1) + + self.assertFalse(r.done()) + r.cancel() + with self.assertRaises(asyncio.CancelledError): + await r + + self.assertEqual(NUM, 15) + + async def test_taskgroup_36(self): + + async def foo(): + try: + await asyncio.sleep(10) + finally: + 1 / 0 + + async def runner(): + async with taskscopes.TaskScope() as g: + for _ in range(5): + g.create_task(foo()) + + await asyncio.sleep(10) + + loop = asyncio.get_running_loop() + exc_handler = mock.Mock() + with mock.patch.object(loop, 'call_exception_handler', exc_handler): + r = asyncio.create_task(runner()) + await asyncio.sleep(0.1) + + self.assertFalse(r.done()) + r.cancel() + with self.assertRaises(asyncio.CancelledError): + await r + + exc_handler.assert_called_with({ + 'message': test_utils.MockPattern( + '^Task .* has errored inside the parent .*' + ), + 'exception': test_utils.MockInstanceOf(ZeroDivisionError), + 'task': mock.ANY, + }) + self.assertEqual(len(exc_handler.call_args_list), 5) + + async def test_taskgroup_37(self): + + async def foo(): + try: + await asyncio.sleep(10) + finally: + 1 / 0 + + async def runner(): + async with taskscopes.TaskScope(): + async with taskscopes.TaskScope() as g2: + for _ in range(5): + g2.create_task(foo()) + + await asyncio.sleep(10) + + loop = asyncio.get_running_loop() + exc_handler = mock.Mock() + with mock.patch.object(loop, 'call_exception_handler', exc_handler): + r = asyncio.create_task(runner()) + await asyncio.sleep(0.1) + + self.assertFalse(r.done()) + r.cancel() + with self.assertRaises(asyncio.CancelledError): + await r + + exc_handler.assert_called_with({ + 'message': test_utils.MockPattern( + '^Task .* has errored inside the parent .*' + ), + 'exception': test_utils.MockInstanceOf(ZeroDivisionError), + 'task': mock.ANY, + }) + self.assertEqual(len(exc_handler.call_args_list), 5) + + async def test_taskgroup_38(self): + + async def foo(): + try: + await asyncio.sleep(10) + finally: + 1 / 0 + + async def runner(): + async with taskscopes.TaskScope() as g1: + g1.create_task(asyncio.sleep(10)) + + async with taskscopes.TaskScope() as g2: + for _ in range(5): + g2.create_task(foo()) + + await asyncio.sleep(10) + + loop = asyncio.get_running_loop() + exc_handler = mock.Mock() + with mock.patch.object(loop, 'call_exception_handler', exc_handler): + r = asyncio.create_task(runner()) + await asyncio.sleep(0.1) + + self.assertFalse(r.done()) + r.cancel() + with self.assertRaises(asyncio.CancelledError): + await r + + exc_handler.assert_called_with({ + 'message': test_utils.MockPattern( + '^Task .* has errored inside the parent .*' + ), + 'exception': test_utils.MockInstanceOf(ZeroDivisionError), + 'task': mock.ANY, + }) + self.assertEqual(len(exc_handler.call_args_list), 5) + + async def test_taskgroup_39(self): + + async def crash_soon(): + await asyncio.sleep(0.3) + 1 / 0 + + async def runner(): + async with taskscopes.TaskScope() as g1: + g1.create_task(crash_soon()) + try: + await asyncio.sleep(10) + except asyncio.CancelledError: + await asyncio.sleep(0.5) + raise + + loop = asyncio.get_running_loop() + exc_handler = mock.Mock() + with mock.patch.object(loop, 'call_exception_handler', exc_handler): + r = asyncio.create_task(runner()) + await asyncio.sleep(0.1) + + self.assertFalse(r.done()) + r.cancel() + with self.assertRaises(asyncio.CancelledError): + await r + + exc_handler.assert_called_with({ + 'message': test_utils.MockPattern( + '^Task .* has errored inside the parent .*' + ), + 'exception': test_utils.MockInstanceOf(ZeroDivisionError), + 'task': mock.ANY, + }) + + async def test_taskgroup_40(self): + + async def crash_soon(): + await asyncio.sleep(0.3) + 1 / 0 + + async def nested_runner(): + async with taskscopes.TaskScope() as g1: + g1.create_task(crash_soon()) + try: + await asyncio.sleep(10) + except asyncio.CancelledError: + await asyncio.sleep(0.5) + raise + + async def runner(): + t = asyncio.create_task(nested_runner()) + await t + + loop = asyncio.get_running_loop() + exc_handler = mock.Mock() + with mock.patch.object(loop, 'call_exception_handler', exc_handler): + r = asyncio.create_task(runner()) + await asyncio.sleep(0.1) + + self.assertFalse(r.done()) + r.cancel() + with self.assertRaises(asyncio.CancelledError): + await r + + exc_handler.assert_called_with({ + 'message': test_utils.MockPattern( + '^Task .* has errored inside the parent .*' + ), + 'exception': test_utils.MockInstanceOf(ZeroDivisionError), + 'task': mock.ANY, + }) + + async def test_taskgroup_41(self): + + NUM = 0 + + async def runner(): + nonlocal NUM + async with taskscopes.TaskScope(): + try: + await asyncio.sleep(10) + except asyncio.CancelledError: + NUM += 10 + raise + + r = asyncio.create_task(runner()) + await asyncio.sleep(0.1) + + self.assertFalse(r.done()) + r.cancel() + with self.assertRaises(asyncio.CancelledError): + await r + + self.assertEqual(NUM, 10) + + async def test_taskgroup_42(self): + + NUM = 0 + + async def runner(): + nonlocal NUM + async with taskscopes.TaskScope(): + try: + await asyncio.sleep(10) + except asyncio.CancelledError: + NUM += 10 + # This isn't a good idea, but we have to support + # this weird case. + raise MyExc + + r = asyncio.create_task(runner()) + await asyncio.sleep(0.1) + + self.assertFalse(r.done()) + r.cancel() + + with self.assertRaises(MyExc): + await r + + self.assertEqual(NUM, 10) + + async def test_taskgroup_43(self): + t1 = None + + async def crash_soon(): + await asyncio.sleep(0.1) + 1 / 0 + + async def nested(): + try: + await asyncio.sleep(10) + finally: + raise MyExc + + async def runner(): + nonlocal t1 + async with taskscopes.TaskScope() as g: + t1 = g.create_task(crash_soon()) + await nested() + + loop = asyncio.get_running_loop() + exc_handler = mock.Mock() + with mock.patch.object(loop, 'call_exception_handler', exc_handler): + r = asyncio.create_task(runner()) + with self.assertRaises(MyExc): + await r + + exc_handler.assert_called_with({ + 'message': test_utils.MockPattern( + '^Task .* has errored inside the parent .*' + ), + 'exception': test_utils.MockInstanceOf(ZeroDivisionError), + 'task': t1, + }) + + async def test_taskgroup_44(self): + + async def foo1(): + await asyncio.sleep(1) + return 42 + + async def foo2(): + await asyncio.sleep(2) + return 11 + + async def runner(): + async with taskscopes.TaskScope() as g: + g.create_task(foo1()) + g.create_task(foo2()) + + r = asyncio.create_task(runner()) + await asyncio.sleep(0.05) + r.cancel() + + with self.assertRaises(asyncio.CancelledError): + await r + + async def test_taskgroup_45(self): + + NUM = 0 + + async def foo1(): + nonlocal NUM + await asyncio.sleep(0.2) + NUM += 1 + + async def foo2(): + nonlocal NUM + await asyncio.sleep(0.3) + NUM += 2 + + async def runner(): + async with taskscopes.TaskScope() as g: + g.create_task(foo1()) + g.create_task(foo2()) + + r = asyncio.create_task(runner()) + await asyncio.sleep(0.05) + r.cancel() + + with self.assertRaises(asyncio.CancelledError): + await r + + self.assertEqual(NUM, 0) diff --git a/Misc/NEWS.d/next/Library/2023-05-28-05-06-23.gh-issue-101581.KKTDqD.rst b/Misc/NEWS.d/next/Library/2023-05-28-05-06-23.gh-issue-101581.KKTDqD.rst new file mode 100644 index 00000000000000..e16869a6c3c407 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2023-05-28-05-06-23.gh-issue-101581.KKTDqD.rst @@ -0,0 +1 @@ +Add :class:`asyncio.TaskScope` as a task cancellation scope primitive and now :class:`asyncio.TaskGroup` is an extension of it, allowing easier writing of safe coroutine lifecycle managers in 3rd party codes. Contribution by Andrea Tedeschi and Joongi Kim.