Add trio.testing.wait_all_threads_completed #2937
Conversation
Codecov Report: All modified and coverable lines are covered by tests ✅

Additional details and impacted files:

@@           Coverage Diff           @@
##           master    #2937   +/-   ##
=======================================
  Coverage   99.64%   99.64%
=======================================
  Files         116      116
  Lines       17521    17591   +70
  Branches     3151     3157    +6
=======================================
+ Hits        17459    17529   +70
  Misses         43       43
  Partials       19       19
This looks pretty good, but one part I worry about is the incrementing and decrementing thread count system. As it stands I do think it would work, but I would feel a lot better if it were using a context manager instead:

from contextlib import contextmanager
from collections.abc import Generator

@contextmanager
def handling_thread() -> Generator[None, None, None]:
    _increment_active_threads()
    try:
        yield
    finally:
        _decrement_active_threads()

with handling_thread():
    # do all the things
    ...

Context managers are guaranteed to run their cleanup, similar to a try/finally block, but way nicer to work with.

Edit:

from typing import NamedTuple

class _ActiveThreadCount(NamedTuple):
    count: int = 0
    event: Event = Event()
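One note on that sketch: a NamedTuple is immutable, so a count field could not be incremented in place the way the later active_threads_local.count += 1 line does. If the counter is meant to be mutated, a mutable holder would be needed; a minimal sketch, assuming the same field names:

from dataclasses import dataclass, field

from trio import Event


@dataclass
class _ActiveThreadCount:
    # count is mutated in place; event is replaced with a fresh Event
    # whenever the count drops back to zero
    count: int = 0
    event: Event = field(default_factory=Event)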
Also I'd like you to add tests so you get 100% patch coverage.
Don't have any particular insight or opinion on if this version could be an "attractive nuisance" or whatever. But I do like that it's exposed in testing.
src/trio/_threads.py (Outdated)

            _decrement_active_threads()
            return msg_from_thread.unwrap()
        elif isinstance(msg_from_thread, Run):
            await msg_from_thread.run()
        elif isinstance(msg_from_thread, RunSync):
            msg_from_thread.run_sync()
        else:  # pragma: no cover, internal debugging guard TODO: use assert_never
            _decrement_active_threads()
After looking at this for a bit I agree with @CoolCat467 that the increment/decrement does seem unreliable: if await msg_from_thread.run() or msg_from_thread.run_sync() raises an exception, it won't decrement. So either a context manager, or wrapping all the involved lines in to_thread_run_sync in a try/finally, would probably be good.
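For illustration, the try/finally shape could look roughly like this (the increment/decrement helpers are the hypothetical names from this thread, and the body stands in for the real message loop inside to_thread_run_sync):

_increment_active_threads()
try:
    # hand the work off to the thread and dispatch messages coming back:
    # unwrap the final result, await msg_from_thread.run(), call
    # msg_from_thread.run_sync(), and so on
    ...
finally:
    # runs on normal return, on an exception from run()/run_sync(),
    # and on cancellation, so the active-thread count stays accurate
    _decrement_active_threads()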
src/trio/_threads.py
Outdated
raise TrioInternalError( | ||
"Tried to decrement active threads while _active_threads_local is unset" | ||
) from e |
This seems fine to have a pragma: no cover on, but I think it'd be fairly straightforward to manipulate _active_threads_local to force an error.
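For example, something along these lines (a sketch; the private names mirror the patch and may not match the final version) should hit that branch:

import pytest
import trio

from trio._threads import _decrement_active_threads  # private helper from this patch


def test_decrement_without_active_threads_local() -> None:
    # _active_threads_local has never been set in this context, so the
    # helper should raise TrioInternalError rather than silently doing nothing
    with pytest.raises(trio.TrioInternalError):
        _decrement_active_threads()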
This is the equivalent of trio.testing.wait_all_tasks_blocked but for threads managed by trio. This is useful when writing tests that use to_thread.
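A rough sketch of the kind of test this enables (illustrative only, not part of the patch):

import time

import trio
import trio.testing


async def test_wait_for_worker_thread() -> None:
    async def worker() -> None:
        # run a blocking function in a trio-managed worker thread
        await trio.to_thread.run_sync(time.sleep, 0.5)

    async with trio.open_nursery() as nursery:
        nursery.start_soon(worker)
        # let the worker task reach the to_thread call
        await trio.testing.wait_all_tasks_blocked()
        # returns only once no trio-managed thread is still running work
        await trio.testing.wait_all_threads_completed()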
Force-pushed from 1faf1c8 to 9148414
I believe I have addressed all comments. The codecov fail seems to be spurious (the diff is in lines with only comments in a completely unrelated file, if I understand it correctly).
For future reference, force pushing makes it far more difficult to compare the previous version to new changes.
Looks good, but I have a question about a part of the reset logic:
        active_threads_local.event.set()
        active_threads_local.event = Event()


async def wait_all_threads_completed() -> None:
    """Wait until no threads are still running tasks.

    This is intended to be used when testing code with trio.to_thread to
    make sure no tasks are still making progress in a thread. See the
    following code for a usage example::

        async def wait_all_settled():
            while True:
                await trio.testing.wait_all_threads_completed()
                await trio.testing.wait_all_tasks_blocked()
                if trio.testing.active_thread_count() == 0:
                    break
    """

    await checkpoint()

    try:
        active_threads_local = _active_threads_local.get()
    except LookupError:
        # If there were any active threads, _active_threads_local
        # would have been set
        return

    while active_threads_local.count != 0:
        await active_threads_local.event.wait()
Just wanting to be sure: I know the event wait call is doing what we want and will block until the thread count is zero and the event is set, but is there any way, with race conditions, that active_threads_local.event is reset to a new Event before the wait call sees that? Would it be beneficial to move the event resetting to before the active_threads_local.count += 1 line?
Ooh. Yeah, shouldn't it maybe be something like this:

try:
    active_threads_local = _active_threads_local.get()
except LookupError:
    active_threads_local = _ActiveThreadCount(1, Event())
    _active_threads_local.set(active_threads_local)
else:
    active_threads_local.count += 1
Yeah, probably something like that, plus adding a part in the else block there that resets the event object if the event it holds has already fired.
I don't understand where a race could arise here. This is all happening in the main thread so everything between awaits is effectively atomic.
I have similar code in a different project and it hasn't given me trouble... Although speaking of that, it would be good to assert active_threads_local.count >= 0
after decrementing it, because if a negative number sneaks in it'd be better to fail fast.
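Combining that with the pieces shown in the diffs above, the decrement helper could look something like this (a sketch, not necessarily the final code):

def _decrement_active_threads() -> None:
    try:
        active_threads_local = _active_threads_local.get()
    except LookupError as e:
        raise TrioInternalError(
            "Tried to decrement active threads while _active_threads_local is unset"
        ) from e
    active_threads_local.count -= 1
    # fail fast if the bookkeeping ever goes negative
    assert active_threads_local.count >= 0
    if active_threads_local.count == 0:
        # wake anyone blocked in wait_all_threads_completed(), then install
        # a fresh Event for the next batch of threads
        active_threads_local.event.set()
        active_threads_local.event = Event()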
Yeah, I don't think we have to care about thread safety here. Rather, the only times _active_threads_local can be changed (that we worry about) are at await points. After all, to_thread_run_sync can only be run from the main thread (the one that has your trio event loop).
Yeah, the codecov checks can be weird sometimes; if there seem to be duplicates of codecov in the run list, some may be stale and not getting updated, or checking against the wrong set of files, or something like that.
I didn't look at this too hard but the strategy makes sense to me.
If we don't have to worry about thread safety I think this is great!
Alternative to #2880