Skip to content

Commit

Permalink
Fixed TaskGroup.start() cancellation on asyncio (#597)
Browse files Browse the repository at this point in the history
  • Loading branch information
agronholm authored Jul 26, 2023
1 parent a6ab281 commit c7c4acf
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 17 deletions.
3 changes: 3 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.
whether there are running child tasks to be waited on
- On asyncio, cancel scopes will defer cancelling tasks that are scheduled to resume
with a finished future
- On asyncio and Python 3.9 or later, cancel scopes now only suppress cancellation
exceptions if the cancel message matches the scope
- Task groups on all backends now raise a single cancellation exception when an outer
cancel scope is cancelled, and no exceptions other than cancellation exceptions are
raised in the group
Expand Down Expand Up @@ -45,6 +47,7 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.
- Fixed ``MemoryObjectReceiveStream.receive()`` causing the receiving task on asyncio to
remain in a cancelled state if the operation was cancelled after an item was queued to
be received by the task (but before the task could actually receive the item)
- Fixed ``TaskGroup.start()`` on asyncio not responding to cancellation from the outside
- Fixed tasks started from ``BlockingPortal`` not notifying synchronous listeners
(``concurrent.futures.wait()``) when they're cancelled
- Removed unnecessary extra waiting cycle in ``Event.wait()`` on asyncio in the case
Expand Down
34 changes: 21 additions & 13 deletions src/anyio/_backends/_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from collections import OrderedDict, deque
from collections.abc import AsyncIterator, Iterable
from concurrent.futures import Future
from contextlib import suppress
from contextvars import Context, copy_context
from dataclasses import dataclass
from functools import partial, wraps
Expand Down Expand Up @@ -417,22 +418,23 @@ def __exit__(
self._deliver_cancellation_to_parent()

if isinstance(exc_val, CancelledError) and self._cancel_called:
self._cancelled_caught = self._uncancel()
self._cancelled_caught = self._uncancel(exc_val)
return self._cancelled_caught

return None

def _uncancel(self) -> bool:
if sys.version_info < (3, 11) or self._host_task is None:
def _uncancel(self, cancelled_exc: CancelledError) -> bool:
if sys.version_info < (3, 9) or self._host_task is None:
self._cancel_calls = 0
return True

# Uncancel all AnyIO cancellations
for i in range(self._cancel_calls):
self._host_task.uncancel()
if sys.version_info >= (3, 11):
for i in range(self._cancel_calls):
self._host_task.uncancel()

self._cancel_calls = 0
return not self._host_task.cancelling()
return f"Cancelled by cancel scope {id(self):x}" in cancelled_exc.args

def _timeout(self) -> None:
if self._deadline != math.inf:
Expand Down Expand Up @@ -471,7 +473,10 @@ def _deliver_cancellation(self) -> None:
waiter = task._fut_waiter # type: ignore[attr-defined]
if not isinstance(waiter, asyncio.Future) or not waiter.done():
self._cancel_calls += 1
task.cancel()
if sys.version_info >= (3, 9):
task.cancel(f"Cancelled by cancel scope {id(self):x}")
else:
task.cancel()

# Schedule another callback if there are still tasks left
if should_retry:
Expand Down Expand Up @@ -751,12 +756,15 @@ async def start(
# point between, the task group is cancelled and this method never proceeds to
# process the completed future. That's why we have to have a shielded cancel
# scope here.
with CancelScope(shield=True):
try:
return await future
except CancelledError:
task.cancel()
raise
try:
return await future
except CancelledError:
# Cancel the task and wait for it to exit before returning
task.cancel()
with CancelScope(shield=True), suppress(CancelledError):
await task

raise


#
Expand Down
8 changes: 4 additions & 4 deletions tests/test_taskgroups.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ async def start_another() -> None:
async with create_task_group() as tg:
await tg.start(taskfunc)

if sys.version_info < (3, 9):
pytest.xfail("Requires a way to detect cancellation source")

task = asyncio.get_running_loop().create_task(start_another())
await wait_all_tasks_blocked()
task.cancel()
Expand Down Expand Up @@ -259,11 +262,8 @@ def task_fn(*, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None:
await tg.start(task_fn) # type: ignore[arg-type]


async def test_start_cancel_after_error(anyio_backend_name: str) -> None:
async def test_start_cancel_after_error() -> None:
"""Regression test for #517."""
if anyio_backend_name == "asyncio":
pytest.xfail("Known issue with the asyncio backend")

sleep_completed = False

async def sleep_and_raise() -> None:
Expand Down

0 comments on commit c7c4acf

Please sign in to comment.