Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed TaskGroup.start() cancellation on asyncio #597

Merged
merged 5 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.
whether there are running child tasks to be waited on
- On asyncio, cancel scopes will defer cancelling tasks that are scheduled to resume
with a finished future
- On asyncio and Python 3.9 or later, cancel scopes now only suppress cancellation
exceptions if the cancel message matches the scope
- Task groups on all backends now raise a single cancellation exception when an outer
cancel scope is cancelled, and no exceptions other than cancellation exceptions are
raised in the group
Expand Down Expand Up @@ -45,6 +47,7 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.
- Fixed ``MemoryObjectReceiveStream.receive()`` causing the receiving task on asyncio to
remain in a cancelled state if the operation was cancelled after an item was queued to
be received by the task (but before the task could actually receive the item)
- Fixed ``TaskGroup.start()`` on asyncio not responding to cancellation from the outside
- Fixed tasks started from ``BlockingPortal`` not notifying synchronous listeners
(``concurrent.futures.wait()``) when they're cancelled
- Removed unnecessary extra waiting cycle in ``Event.wait()`` on asyncio in the case
Expand Down
34 changes: 21 additions & 13 deletions src/anyio/_backends/_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from collections import OrderedDict, deque
from collections.abc import AsyncIterator, Iterable
from concurrent.futures import Future
from contextlib import suppress
from contextvars import Context, copy_context
from dataclasses import dataclass
from functools import partial, wraps
Expand Down Expand Up @@ -417,22 +418,23 @@ def __exit__(
self._deliver_cancellation_to_parent()

if isinstance(exc_val, CancelledError) and self._cancel_called:
self._cancelled_caught = self._uncancel()
self._cancelled_caught = self._uncancel(exc_val)
return self._cancelled_caught

return None

def _uncancel(self) -> bool:
if sys.version_info < (3, 11) or self._host_task is None:
def _uncancel(self, cancelled_exc: CancelledError) -> bool:
if sys.version_info < (3, 9) or self._host_task is None:
self._cancel_calls = 0
return True

# Uncancel all AnyIO cancellations
for i in range(self._cancel_calls):
self._host_task.uncancel()
if sys.version_info >= (3, 11):
for i in range(self._cancel_calls):
self._host_task.uncancel()

self._cancel_calls = 0
return not self._host_task.cancelling()
return f"Cancelled by cancel scope {id(self):x}" in cancelled_exc.args

def _timeout(self) -> None:
if self._deadline != math.inf:
Expand Down Expand Up @@ -471,7 +473,10 @@ def _deliver_cancellation(self) -> None:
waiter = task._fut_waiter # type: ignore[attr-defined]
if not isinstance(waiter, asyncio.Future) or not waiter.done():
self._cancel_calls += 1
task.cancel()
if sys.version_info >= (3, 9):
task.cancel(f"Cancelled by cancel scope {id(self):x}")
else:
task.cancel()

# Schedule another callback if there are still tasks left
if should_retry:
Expand Down Expand Up @@ -751,12 +756,15 @@ async def start(
# point between, the task group is cancelled and this method never proceeds to
# process the completed future. That's why we have to have a shielded cancel
# scope here.
with CancelScope(shield=True):
try:
return await future
except CancelledError:
task.cancel()
raise
try:
return await future
except CancelledError:
# Cancel the task and wait for it to exit before returning
task.cancel()
with CancelScope(shield=True), suppress(CancelledError):
await task

raise


#
Expand Down
8 changes: 4 additions & 4 deletions tests/test_taskgroups.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ async def start_another() -> None:
async with create_task_group() as tg:
await tg.start(taskfunc)

if sys.version_info < (3, 9):
pytest.xfail("Requires a way to detect cancellation source")

task = asyncio.get_running_loop().create_task(start_another())
await wait_all_tasks_blocked()
task.cancel()
Expand Down Expand Up @@ -259,11 +262,8 @@ def task_fn(*, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None:
await tg.start(task_fn) # type: ignore[arg-type]


async def test_start_cancel_after_error(anyio_backend_name: str) -> None:
async def test_start_cancel_after_error() -> None:
"""Regression test for #517."""
if anyio_backend_name == "asyncio":
pytest.xfail("Known issue with the asyncio backend")

sleep_completed = False

async def sleep_and_raise() -> None:
Expand Down