Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expand cancellation usability from native trio threads #2392

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
d9d02db
Add tests and stub for thread cancellation and reuse.
richardsheridan Jul 22, 2022
0c0ff10
make from_thread tests pass
richardsheridan Mar 4, 2023
5f86fb2
remove failing tests and untested feature
richardsheridan Mar 4, 2023
5f75c86
refactor system tasks to use messages as well
richardsheridan Mar 4, 2023
7ccf0f7
factor out trio_token checking logic
richardsheridan Mar 9, 2023
6d3b2d6
Add trio.from_thread.check_cancelled api to allow threads to efficien…
richardsheridan Aug 8, 2022
2d911ea
Document trio.from_thread.check_cancelled
richardsheridan Aug 8, 2022
bc6c82a
Add Newsfragment
richardsheridan Aug 8, 2022
644b913
Linting and fix docs
richardsheridan Aug 8, 2022
b87456f
use cancel_register in _send_message_to_host_task
richardsheridan Mar 9, 2023
f6b27b2
Document thread reuse semantics
richardsheridan Mar 10, 2023
2b088cc
test coverage for cancelled host task
richardsheridan Mar 11, 2023
832da41
unnest unprotected_fn
richardsheridan Mar 11, 2023
5a10e9b
flip _send_message_to_host_task.in_trio_thread semantics
richardsheridan Mar 11, 2023
a09aa84
Merge branch 'master' into from_thread_check_cancelled
richardsheridan Sep 3, 2023
787d286
Aesthetic refactor of API functions
richardsheridan Sep 3, 2023
881180f
fix typos in docs
richardsheridan Sep 3, 2023
c0e11d4
remove extra cvar toggling
richardsheridan Sep 3, 2023
1930cae
Transmute AttributeError to RuntimeError
richardsheridan Sep 3, 2023
b02dfe9
type consistency between from_thread_run and from_thread_run_sync
richardsheridan Sep 6, 2023
1bb9b79
split up nonblocking send of message from blocking reception of response
richardsheridan Sep 6, 2023
689d45c
thread messages do not necessarily have RetT
richardsheridan Sep 16, 2023
c1990d4
Update docs based on review comments
richardsheridan Oct 7, 2023
d4d10bb
Merge remote-tracking branch 'upstream/master' into from_thread_check…
richardsheridan Oct 7, 2023
daed7bb
fiddle type completeness
richardsheridan Oct 7, 2023
2be054a
fix test_from_thread_run_during_shutdown
richardsheridan Oct 7, 2023
a667a52
apply nits from code review
richardsheridan Oct 7, 2023
d6df308
document "extra" checkpoints needed to pick up context
richardsheridan Oct 8, 2023
9f4e79e
add TODOs for future assert_never type cleverness
richardsheridan Oct 8, 2023
0e18c93
implement cancellation semantics suggestions from code review
richardsheridan Oct 8, 2023
eab30c4
adjust coverage pragma
richardsheridan Oct 8, 2023
2f79f15
revise and document cancellation semantics
richardsheridan Oct 15, 2023
96e45c5
Apply suggestions from code review
richardsheridan Oct 18, 2023
ab092b0
Merge branch 'master' into from_thread_check_cancelled
richardsheridan Oct 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions docs/source/reference-core.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1823,8 +1823,22 @@ to spawn a child thread, and then use a :ref:`memory channel

.. literalinclude:: reference-core/from-thread-example.py

You can also perform a non-blocking check for cancellation from threads spawned
by `trio.to_thread.run_sync`.
.. note::

The ``from_thread.run*`` functions reuse the host task that called
:func:`trio.to_thread.run_sync` to run your provided function in the typical case,
namely when ``cancellable=False`` so Trio can be sure that the task will always be
richardsheridan marked this conversation as resolved.
Show resolved Hide resolved
around to perform the work. If you pass ``cancellable=True`` at the outset, or if
you provide a :class:`~trio.lowlevel.TrioToken` when calling back in to Trio, your
functions will be executed in a new system task. Therefore, the
:func:`~trio.lowlevel.current_task`, :func:`current_effective_deadline`, or other
task-tree specific values may differ depending on keyword argument values.

You can also use :func:`trio.from_thread.check_cancelled` to check for cancellation from
a thread that was spawned by :func:`trio.to_thread.run_sync`. If the call to
:func:`~trio.to_thread.run_sync` was cancelled (even if ``cancellable=False``!), then
:func:`~trio.from_thread.check_cancelled` will raise :func:`trio.Cancelled`.
It's like ``trio.from_thread.run(trio.sleep, 0)``, but much faster.

.. autofunction:: trio.from_thread.check_cancelled

Expand Down
6 changes: 5 additions & 1 deletion newsfragments/2392.feature.rst
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
If called from a thread spawned by `trio.to_thread.run_sync`, `trio.from_thread.run` and `trio.from_thread.run_sync` now reuse the task and cancellation status of the host task and have `trio.from_thread.check_cancelled` to efficiently poll for cancellation.
If called from a thread spawned by `trio.to_thread.run_sync`, `trio.from_thread.run` and
`trio.from_thread.run_sync` now reuse the task and cancellation status of the host task;
this means that context variables and cancel scopes naturally propagate 'through'
threads spawned by Trio. You can also use `trio.from_thread.check_cancelled`
to efficiently check for cancellation without reentering the Trio thread.
66 changes: 37 additions & 29 deletions trio/_tests/test_threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ async def async_fn(): # pragma: no cover
def thread_fn():
from_thread_run_sync(async_fn)

with pytest.raises(TypeError, match="expected a sync function"):
with pytest.raises(TypeError, match="expected a synchronous function"):
await to_thread_run_sync(thread_fn)


Expand Down Expand Up @@ -813,19 +813,23 @@ async def agen(token):
try:
yield
finally:
with pytest.raises(_core.RunFinishedError), _core.CancelScope(shield=True):
await to_thread_run_sync(
partial(from_thread_run, sleep, 0, trio_token=token)
)
record.append("ok")
with _core.CancelScope(shield=True):
try:
await to_thread_run_sync(
partial(from_thread_run, sleep, 0, trio_token=token)
)
except _core.RunFinishedError:
record.append("finished")
else:
record.append("clean")

async def main(use_system_task):
save.append(agen(_core.current_trio_token() if use_system_task else None))
await save[-1].asend(None)

_core.run(main, True) # System nursery will be closed and raise RunFinishedError
_core.run(main, False) # host task will not be rescheduled
assert record == ["ok"]
_core.run(main, False) # host task will be rescheduled as normal
assert record == ["finished", "clean"]


async def test_trio_token_weak_referenceable():
Expand Down Expand Up @@ -874,8 +878,8 @@ def sync_check():
from_thread_run_sync(cancel_scope.cancel)
try:
from_thread_run_sync(bool)
except _core.Cancelled:
queue.put(True)
except _core.Cancelled: # pragma: no cover
queue.put(True) # sync functions don't raise Cancelled
else:
queue.put(False)

Expand All @@ -889,7 +893,7 @@ def sync_check():
await to_thread_run_sync(sync_check, cancellable=True)

assert cancel_scope.cancelled_caught
assert await to_thread_run_sync(partial(queue.get, timeout=1))
assert not await to_thread_run_sync(partial(queue.get, timeout=1))

async def no_checkpoint():
return True
Expand All @@ -898,8 +902,8 @@ def async_check():
from_thread_run_sync(cancel_scope.cancel)
try:
assert from_thread_run(no_checkpoint)
except _core.Cancelled:
queue.put(True)
except _core.Cancelled: # pragma: no cover
queue.put(True) # async functions raise Cancelled at checkpoints
else:
queue.put(False)

Expand All @@ -913,7 +917,7 @@ def async_check():
await to_thread_run_sync(async_check, cancellable=True)

assert cancel_scope.cancelled_caught
assert await to_thread_run_sync(partial(queue.get, timeout=1))
assert not await to_thread_run_sync(partial(queue.get, timeout=1))

async def async_time_bomb():
cancel_scope.cancel()
Expand All @@ -929,14 +933,16 @@ async def async_time_bomb():
async def test_from_thread_check_cancelled():
q = stdlib_queue.Queue()

async def child(cancellable):
record.append("start")
try:
return await to_thread_run_sync(f, cancellable=cancellable)
except _core.Cancelled:
record.append("cancel")
finally:
record.append("exit")
async def child(cancellable, scope):
with scope:
record.append("start")
try:
return await to_thread_run_sync(f, cancellable=cancellable)
except _core.Cancelled:
record.append("cancel")
raise
finally:
record.append("exit")

def f():
try:
Expand All @@ -952,7 +958,7 @@ def f():
record = []
ev = threading.Event()
async with _core.open_nursery() as nursery:
nursery.start_soon(child, False)
nursery.start_soon(child, False, _core.CancelScope())
await wait_all_tasks_blocked()
assert record[0] == "start"
assert q.get(timeout=1) == "Not Cancelled"
Expand All @@ -964,14 +970,15 @@ def f():
# the appropriate cancel scope
record = []
ev = threading.Event()
scope = _core.CancelScope() # Nursery cancel scope gives false positives
async with _core.open_nursery() as nursery:
nursery.start_soon(child, False)
nursery.start_soon(child, False, scope)
await wait_all_tasks_blocked()
assert record[0] == "start"
assert q.get(timeout=1) == "Not Cancelled"
nursery.cancel_scope.cancel()
scope.cancel()
ev.set()
assert nursery.cancel_scope.cancelled_caught
assert scope.cancelled_caught
assert "cancel" in record
assert record[-1] == "exit"

Expand All @@ -988,13 +995,14 @@ def f(): # noqa: F811

record = []
ev = threading.Event()
scope = _core.CancelScope()
async with _core.open_nursery() as nursery:
nursery.start_soon(child, True)
nursery.start_soon(child, True, scope)
await wait_all_tasks_blocked()
assert record[0] == "start"
nursery.cancel_scope.cancel()
scope.cancel()
ev.set()
assert nursery.cancel_scope.cancelled_caught
assert scope.cancelled_caught
assert "cancel" in record
assert record[-1] == "exit"
assert q.get(timeout=1) == "Cancelled"
Expand Down
2 changes: 1 addition & 1 deletion trio/_tests/verify_types_darwin.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
],
"exportedSymbolCounts": {
"withAmbiguousType": 0,
"withKnownType": 632,
"withKnownType": 631,
"withUnknownType": 0
},
"ignoreUnknownTypesFromImports": true,
Expand Down
2 changes: 1 addition & 1 deletion trio/_tests/verify_types_linux.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
],
"exportedSymbolCounts": {
"withAmbiguousType": 0,
"withKnownType": 629,
"withKnownType": 628,
"withUnknownType": 0
},
"ignoreUnknownTypesFromImports": true,
Expand Down
6 changes: 3 additions & 3 deletions trio/_tests/verify_types_windows.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"warningCount": 0
},
"typeCompleteness": {
"completenessScore": 0.9857594936708861,
"completenessScore": 1,
"diagnostics": [
{
"message": "No docstring found for function \"trio.lowlevel.current_iocp\"",
Expand Down Expand Up @@ -64,8 +64,8 @@
],
"exportedSymbolCounts": {
"withAmbiguousType": 0,
"withKnownType": 623,
"withUnknownType": 9
"withKnownType": 631,
"withUnknownType": 0
},
"ignoreUnknownTypesFromImports": true,
"missingClassDocStringCount": 1,
Expand Down
Loading
Loading