Skip to content

Commit

Permalink
Initial gather (#5)
Browse files Browse the repository at this point in the history
* Initial gather

* Fix imports

* Fix type alias?

* Fixes

* Fix type hint

* Fix types

* Exclude overloads

* Reorganize

* Test for double enter

* Let's be strict

* Always upload coverage

* Improve coverage

* Fix import
  • Loading branch information
Tinche authored Nov 29, 2023
1 parent c047512 commit 710c673
Show file tree
Hide file tree
Showing 9 changed files with 467 additions and 46 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,15 @@ jobs:
export TOTAL=$(python -c "import json;print(json.load(open('coverage.json'))['totals']['percent_covered_display'])")
echo "total=$TOTAL" >> $GITHUB_ENV
# Report again and fail if under the threshold.
python -Im coverage report --fail-under=97
- name: "Upload HTML report."
uses: "actions/upload-artifact@v3"
with:
name: "html-report"
path: "htmlcov"
if: always()

- name: "Make badge"
if: github.ref == 'refs/heads/main'
Expand Down
95 changes: 66 additions & 29 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,49 +21,42 @@ To install _quattro_, simply:
$ pip install quattro
```

## Task Groups
## `quattro.gather`

On Python 3.11 and later, the [standard library TaskGroup](https://docs.python.org/3/library/asyncio-task.html#task-groups) implementation is used instead.
The TaskGroup implementation here can be considered a backport for older Python versions.

_quattro_ contains a TaskGroup implementation. TaskGroups are inspired by [Trio nurseries](https://trio.readthedocs.io/en/stable/reference-core.html#nurseries-and-spawning).
_quattro_ comes with an independent, simple implementation of [`asyncio.gather`](https://docs.python.org/3/library/asyncio-task.html#asyncio.gather) based on Task Groups.
The _quattro_ version is safer, and uses a task group under the hood to not leak tasks in cases of errors in child tasks.

```python
from quattro import TaskGroup
from quattro import gather

async def my_handler():
# We want to spawn some tasks, and ensure they are all handled before we return.
async def task_1():
...

async def task_2():
...

async with TaskGroup() as tg:
t1 = tg.create_task(task_1)
t2 = tg.create_task(task_2)

# The end of the `async with` block awaits the tasks, ensuring they are handled.
res_1, res_2 = await gather(long_query_1(), long_query_2())
```

TaskGroups are essential building blocks for achieving the concept of [structured concurrency](https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/).
In simple terms, structured concurrency means your code does not leak tasks - when a coroutine
finishes, all tasks spawned by that coroutine and all its children are also finished.
(In fancy terms, the execution flow becomes a directed acyclic graph.)
The `return_exceptions` argument can be used to make `gather()` catch and return exceptions as responses instead of letting them bubble out.

Structured concurrency can be achieved by using TaskGroups instead of `asyncio.create_task` to start background tasks.
TaskGroups essentially do two things:
```python
from quattro import gather

- when exiting from a TaskGroup `async with` block, the TaskGroup awaits all of its children, ensuring they are finished when it exits
- when a TaskGroup child task raises an exception, all other children and the task inside the context manager are cancelled
async def my_handler():
res_1, res_2 = await gather(
long_query_1(),
long_query_2(),
return_exceptions=True,
)

The implementation has been borrowed from the EdgeDB project.
# res_1 and res_2 may be instances of exceptions.
```

The differences to `asyncio.gather` are:
- `quattro.gather()` only accepts coroutines and not futures and generators, just like a TaskGroup.
- When `return_exceptions` is false (the default), an exception in a child task will cause an ExceptionGroup to bubble out of the top-level `gather()` call, just like in a TaskGroup.
- Results are returned as a tuple, not a list.

## Cancel Scopes

_quattro_ contains an independent, asyncio implementation of [Trio CancelScopes](https://trio.readthedocs.io/en/stable/reference-core.html#cancellation-and-timeouts).
Due to fundamental differences between asyncio and Trio the actual runtime behavior isn't
exactly the same, but close.
Due to fundamental differences between asyncio and Trio the actual runtime behavior isn't exactly the same, but close.

```python
from quattro import move_on_after
Expand Down Expand Up @@ -106,6 +99,7 @@ The differences are:
- The _quattro_ versions can be cancelled manually using `scope.cancel()`, and precancelled before they are entered
- The _quattro_ versions are available on all supported Python versions, not just 3.11+.


### asyncio and Trio differences

`fail_after` and `fail_at` raise `asyncio.Timeout` instead of `trio.Cancelled` exceptions when they fail.
Expand All @@ -128,10 +122,53 @@ This is a limitation of the underlying framework.

In _quattro_, cancellation scopes cannot be shielded.


## Task Groups

On Python 3.11 and later, the [standard library TaskGroup](https://docs.python.org/3/library/asyncio-task.html#task-groups) implementation is used instead.
The TaskGroup implementation here can be considered a backport for older Python versions.

_quattro_ contains a TaskGroup implementation. TaskGroups are inspired by [Trio nurseries](https://trio.readthedocs.io/en/stable/reference-core.html#nurseries-and-spawning).

```python
from quattro import TaskGroup

async def my_handler():
# We want to spawn some tasks, and ensure they are all handled before we return.
async def task_1():
...

async def task_2():
...

async with TaskGroup() as tg:
t1 = tg.create_task(task_1)
t2 = tg.create_task(task_2)

# The end of the `async with` block awaits the tasks, ensuring they are handled.
```

TaskGroups are essential building blocks for achieving the concept of [structured concurrency](https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/).
In simple terms, structured concurrency means your code does not leak tasks - when a coroutine
finishes, all tasks spawned by that coroutine and all its children are also finished.
(In fancy terms, the execution flow becomes a directed acyclic graph.)

Structured concurrency can be achieved by using TaskGroups instead of `asyncio.create_task` to start background tasks.
TaskGroups essentially do two things:

- when exiting from a TaskGroup `async with` block, the TaskGroup awaits all of its children, ensuring they are finished when it exits
- when a TaskGroup child task raises an exception, all other children and the task inside the context manager are cancelled

The implementation has been borrowed from the EdgeDB project.


## Changelog

### 23.1.0 (UNRELEASED)

- Introduce `quattro.gather`.
([#5](https://github.com/Tinche/quattro/pull/5))
- Add support for Python 3.12.
- Switch to [PDM](https://pdm.fming.dev/latest/).

### 22.2.0 (2022-12-27)
Expand Down
14 changes: 11 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ authors = [{name = "Tin Tvrtkovic", email = "tinchester@gmail.com"}]
classifiers = ["License :: OSI Approved :: Apache Software License"]
dynamic = ["description", "version"]
requires-python = ">=3.9"
dependencies = ["attrs", "exceptiongroup; python_version < '3.11'"]
dependencies = [
"attrs",
"exceptiongroup; python_version < '3.11'",
]
readme = "README.md"

[tool.pdm.dev-dependencies]
Expand All @@ -22,8 +25,8 @@ lint = [
"ruff>=0.0.272",
]

[tool.isort]
profile = "attrs"
[tool.mypy]
warn_unused_ignores = true

[[tool.mypy.overrides]]
module = "asyncio.taskgroups"
Expand Down Expand Up @@ -69,6 +72,11 @@ ignore = [
"PGH003", # leave my type: ignores alone
]

[tool.coverage.report]
exclude_also = [
"@overload",
]

[tool.hatch.version]
source = "vcs"
raw-options = { local_scheme = "no-local-version" }
27 changes: 15 additions & 12 deletions src/quattro/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
"""Task control for asyncio."""
from __future__ import annotations

from ._gather import gather
from .cancelscope import (
CancelScope,
cancel_stack,
Expand All @@ -9,21 +12,21 @@
)
from .taskgroup import TaskGroup


def get_current_effective_deadline() -> float:
return min(
[cs._deadline for cs in cancel_stack.get() if cs._deadline is not None],
default=float("inf"),
)


__all__ = [
"TaskGroup",
"TaskGroupError",
"CancelScope",
"fail_after",
"fail_at",
"gather",
"get_current_effective_deadline",
"move_on_after",
"move_on_at",
"CancelScope",
"get_current_effective_deadline",
"TaskGroup",
"TaskGroupError",
]


def get_current_effective_deadline() -> float:
return min(
[cs._deadline for cs in cancel_stack.get() if cs._deadline is not None],
default=float("inf"),
)
Loading

0 comments on commit 710c673

Please sign in to comment.