-
-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add JoinSet
#4335
Add JoinSet
#4335
Conversation
That was a previous iteration.
tokio/src/task/task_set.rs
Outdated
Some(Poll::Ready(Ok(res))) => { | ||
self.inner.with_entry_value(&entry, |jh| jh.take()); | ||
self.inner.remove(&entry); | ||
return Poll::Ready(Ok(Some(res))); | ||
} | ||
Some(Poll::Ready(Err(err))) => { | ||
self.inner.with_entry_value(&entry, |jh| jh.take()); | ||
self.inner.remove(&entry); | ||
return Poll::Ready(Err(err)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like the code could be deduped here. I don't think we need to re-map it to Result<Option<T>, ...>
Option<Result<...>>
should be fine?
tokio/src/task/task_set.rs
Outdated
/// This method is cancel safe. If `join_one` is used as the event in a `tokio::select!` | ||
/// statement and some other branch completes first, it is guaranteed that no tasks were | ||
/// removed from this `TaskSet`. | ||
pub async fn join_one(&mut self) -> Result<Option<T>, JoinError> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would probably keep this as Option<Result<...>>
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, it's a pain to use in a while let
loop if you do that though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am with Alice on this, Result is far more ergonomic for most expected use cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it worse? It seems like returning Option
means you can loop until all tasks are finished, while as Result
makes it easier for one task error to blow up the loop.
while let Some(result) = set.join_one().await {
}
// or
while let Ok(Some(val)) = set.join_one().await {
// loop canceled on first `Err`?
}
The Option
does make it more annoying if you want to use ?
(like set.join_one().await?
) in the condition. I have an opinion otherwise about which is better or worse.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was mostly thinking of the while let Some(result) = set.join_one().await?
use-case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I agree w/ @Darksonn but what is the precedence in std
? I'm not sure what the best way to search is. I can't think of any methods that returns Result<Option<_>, _>
and the only methods that return Option<Result<...>>
would be Iterator
, which doesn't really count.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not think that std has precedence on this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The impl looks fine to me. I had a minor API suggestion. I would also suggest we initially release this as an unstable API. We probably should document unstable APIs better though.
@Darksonn How would we add an API in the future for spawning tasks that don't abort when the set is dropped? |
Considering that task set is going to be used a lot for resource cleanup and management, we probably want something along the lines of a |
How should we handle panics in the sub-tasks when the user has called |
"clean shutdown" is a good call. What I would probably do is a call to This is a similar strategy as the |
I'm in agreement with Carl here. It would be best not to propagate the panics. |
I have reworked the I'm also adding a test for the coop budget as I realized that |
tokio/src/task/task_set.rs
Outdated
/// [`join_one`]: fn@Self::join_one | ||
pub async fn shutdown(&mut self) { | ||
self.abort_all(); | ||
while self.join_one().await.transpose().is_some() {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be possibly advantageous to bifurcate this operation, and have the function return impl future and be non-async. That way, we could abort the tasks, then return a future that awaits all tasks. This does add complexity though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Darksonn if we made this change in a future PR, would it be considered blocking?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, I don't think it would be considered a breaking change.
This seems about ready. I left a few questions and comments, but I don't think they block a merge here. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, I gave the IdleNotifiedSet
code a closer look, and it seems correct to me. I had some minor, non-blocking style suggestions; feel free to ignore any you disagree with.
tokio/src/util/idle_notified_set.rs
Outdated
{ | ||
let mut lock = self.lists.inner.lock(); | ||
lock.idle.push_front(entry.clone()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can't this just be
{ | |
let mut lock = self.lists.inner.lock(); | |
lock.idle.push_front(entry.clone()); | |
} | |
self.lists.inner.lock().idle.push_front(entry.clone()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see that I am not the only one afflicted by this lifetime/scoping paranoia. I tend to follow the same pattern even though it isn't necessary here I believe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay, this looks great to me! thanks @Darksonn for addressing my various nitpicks. :)
Adds `JoinSet` for managing multiple spawned tasks and joining them in completion order. Closes: tokio-rs#3903
# 1.16.2 (February 15, 2022) This release updates the minimum supported Rust version (MSRV) to 1.49, the `mio` dependency to v0.8, and the (optional) `parking_lot` dependency to v0.12. Additionally, it contains several bug fixes, as well as internal refactoring and performance improvements. ### Fixed - time: prevent panicking in `sleep` with large durations ([#4495]) - time: eliminate potential panics in `Instant` arithmetic on platforms where `Instant::now` is not monotonic ([#4461]) - io: fix `DuplexStream` not participating in cooperative yielding ([#4478]) - rt: fix potential double panic when dropping a `JoinHandle` ([#4430]) ### Changed - update minimum supported Rust version to 1.49 ([#4457]) - update `parking_lot` dependency to v0.12.0 ([#4459]) - update `mio` dependency to v0.8 ([#4449]) - rt: remove an unnecessary lock in the blocking pool ([#4436]) - rt: remove an unnecessary enum in the basic scheduler ([#4462]) - time: use bit manipulation instead of modulo to improve performance ([#4480]) - net: use `std::future::Ready` instead of our own `Ready` future ([#4271]) - replace deprecated `atomic::spin_loop_hint` with `hint::spin_loop` ([#4491]) - fix miri failures in intrusive linked lists ([#4397]) ### Documented - io: add an example for `tokio::process::ChildStdin` ([#4479]) ### Unstable The following changes only apply when building with `--cfg tokio_unstable`: - task: fix missing location information in `tracing` spans generated by `spawn_local` ([#4483]) - task: add `JoinSet` for managing sets of tasks ([#4335]) - metrics: fix compilation error on MIPS ([#4475]) - metrics: fix compilation error on arm32v7 ([#4453]) [#4495]: #4495 [#4461]: #4461 [#4478]: #4478 [#4430]: #4430 [#4457]: #4457 [#4459]: #4459 [#4449]: #4449 [#4462]: #4462 [#4436]: #4436 [#4480]: #4480 [#4271]: #4271 [#4491]: #4491 [#4397]: #4397 [#4479]: #4479 [#4483]: #4483 [#4335]: #4335 [#4475]: #4475 [#4453]: #4453
# 1.17.0 (February 16, 2022) This release updates the minimum supported Rust version (MSRV) to 1.49, the `mio` dependency to v0.8, and the (optional) `parking_lot` dependency to v0.12. Additionally, it contains several bug fixes, as well as internal refactoring and performance improvements. ### Fixed - time: prevent panicking in `sleep` with large durations ([#4495]) - time: eliminate potential panics in `Instant` arithmetic on platforms where `Instant::now` is not monotonic ([#4461]) - io: fix `DuplexStream` not participating in cooperative yielding ([#4478]) - rt: fix potential double panic when dropping a `JoinHandle` ([#4430]) ### Changed - update minimum supported Rust version to 1.49 ([#4457]) - update `parking_lot` dependency to v0.12.0 ([#4459]) - update `mio` dependency to v0.8 ([#4449]) - rt: remove an unnecessary lock in the blocking pool ([#4436]) - rt: remove an unnecessary enum in the basic scheduler ([#4462]) - time: use bit manipulation instead of modulo to improve performance ([#4480]) - net: use `std::future::Ready` instead of our own `Ready` future ([#4271]) - replace deprecated `atomic::spin_loop_hint` with `hint::spin_loop` ([#4491]) - fix miri failures in intrusive linked lists ([#4397]) ### Documented - io: add an example for `tokio::process::ChildStdin` ([#4479]) ### Unstable The following changes only apply when building with `--cfg tokio_unstable`: - task: fix missing location information in `tracing` spans generated by `spawn_local` ([#4483]) - task: add `JoinSet` for managing sets of tasks ([#4335]) - metrics: fix compilation error on MIPS ([#4475]) - metrics: fix compilation error on arm32v7 ([#4453]) [#4495]: #4495 [#4461]: #4461 [#4478]: #4478 [#4430]: #4430 [#4457]: #4457 [#4459]: #4459 [#4449]: #4449 [#4462]: #4462 [#4436]: #4436 [#4480]: #4480 [#4271]: #4271 [#4491]: #4491 [#4397]: #4397 [#4479]: #4479 [#4483]: #4483 [#4335]: #4335 [#4475]: #4475 [#4453]: #4453
## Motivation PR #4538 adds a prototype implementation of a `JoinMap` API in `tokio::task`. In [this comment][1] on that PR, @carllerche pointed out that a much simpler `JoinMap` type could be implemented outside of `tokio` (either in `tokio-util` or in user code) if we just modified `JoinSet` to return a task ID type when spawning new tasks, and when tasks complete. This seems like a better approach for the following reasons: * A `JoinMap`-like type need not become a permanent part of `tokio`'s stable API * Task IDs seem like something that could be generally useful outside of a `JoinMap` implementation ## Solution This branch adds a `tokio::task::Id` type that uniquely identifies a task relative to all currently spawned tasks. The ID is internally represented as a `NonZeroUsize` that's based on the address of the task's header. I thought that it was better to use addresses to generate IDs than assigning sequential IDs to tasks, because a sequential ID would mean adding an additional usize field to the task data somewhere, making it a word bigger. I've added methods to `JoinHandle` and `AbortHandle` for returning a task's ID. In addition, I modified `JoinSet` to add a `join_with_id` method that behaves identically to `join_one` but also returns an ID. This can be used to implement a `JoinMap` type. Note that because `join_with_id` must return a task ID regardless of whether the task completes successfully or returns a `JoinError` (in order to ensure that dead tasks are always cleaned up from a map), it inverts the ordering of the `Option` and `Result` returned by `join_one` --- which we've bikeshedded about a bit [here][2]. This makes the method's return type inconsistent with the existing `join_one` method, which feels not great. As I see it, there are three possible solutions to this: * change the existing `join_one` method to also swap the `Option` and `Result` nesting for consistency. * change `join_with_id` to return `Result<Option<(Id, T)>, (Id, JoinError)>>`, which feels gross... * add a task ID to `JoinError` as well. [1]: #4538 (comment) [2]: #4335 (comment)
## Motivation PR #4538 adds a prototype implementation of a `JoinMap` API in `tokio::task`. In [this comment][1] on that PR, @carllerche pointed out that a much simpler `JoinMap` type could be implemented outside of `tokio` (either in `tokio-util` or in user code) if we just modified `JoinSet` to return a task ID type when spawning new tasks, and when tasks complete. This seems like a better approach for the following reasons: * A `JoinMap`-like type need not become a permanent part of `tokio`'s stable API * Task IDs seem like something that could be generally useful outside of a `JoinMap` implementation ## Solution This branch adds a `tokio::task::Id` type that uniquely identifies a task relative to all currently spawned tasks. The ID is internally represented as a `NonZeroUsize` that's based on the address of the task's header. I thought that it was better to use addresses to generate IDs than assigning sequential IDs to tasks, because a sequential ID would mean adding an additional usize field to the task data somewhere, making it a word bigger. I've added methods to `JoinHandle` and `AbortHandle` for returning a task's ID. In addition, I modified `JoinSet` to add a `join_with_id` method that behaves identically to `join_one` but also returns an ID. This can be used to implement a `JoinMap` type. Note that because `join_with_id` must return a task ID regardless of whether the task completes successfully or returns a `JoinError` (in order to ensure that dead tasks are always cleaned up from a map), it inverts the ordering of the `Option` and `Result` returned by `join_one` --- which we've bikeshedded about a bit [here][2]. This makes the method's return type inconsistent with the existing `join_one` method, which feels not great. As I see it, there are three possible solutions to this: * change the existing `join_one` method to also swap the `Option` and `Result` nesting for consistency. * change `join_with_id` to return `Result<Option<(Id, T)>, (Id, JoinError)>>`, which feels gross... * add a task ID to `JoinError` as well. [1]: #4538 (comment) [2]: #4335 (comment)
## Motivation PR #4538 adds a prototype implementation of a `JoinMap` API in `tokio::task`. In [this comment][1] on that PR, @carllerche pointed out that a much simpler `JoinMap` type could be implemented outside of `tokio` (either in `tokio-util` or in user code) if we just modified `JoinSet` to return a task ID type when spawning new tasks, and when tasks complete. This seems like a better approach for the following reasons: * A `JoinMap`-like type need not become a permanent part of `tokio`'s stable API * Task IDs seem like something that could be generally useful outside of a `JoinMap` implementation ## Solution This branch adds a `tokio::task::Id` type that uniquely identifies a task relative to all other spawned tasks. Task IDs are assigned sequentially based on an atomic `usize` counter of spawned tasks. In addition, I modified `JoinSet` to add a `join_with_id` method that behaves identically to `join_one` but also returns an ID. This can be used to implement a `JoinMap` type. Note that because `join_with_id` must return a task ID regardless of whether the task completes successfully or returns a `JoinError`, I've also changed `JoinError` to carry the ID of the task that errored, and added a `JoinError::id` method for accessing it. Alternatively, we could have done one of the following: * have `join_with_id` return `Option<(Id, Result<T, JoinError>)>`, which would be inconsistent with the return type of `join_one` (which we've [already bikeshedded over once][2]...) * have `join_with_id` return `Result<Option<(Id, T)>, (Id, JoinError)>>`, which just feels gross. I thought adding the task ID to `JoinError` was the nicest option, and is potentially useful for other stuff as well, so it's probably a good API to have anyway. [1]: #4538 (comment) [2]: #4335 (comment) Closes #4538 Signed-off-by: Eliza Weisman <eliza@buoyant.io>
This PR adds a new
TaskSet
type that uses Tokio'slinked_list
utility to implement polling in a way that runs in constant time.Closes: #3903