Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FuturesUnordered: Limit max value of yield_every #2527

Merged
merged 1 commit into from
Dec 18, 2021

Conversation

taiki-e
Copy link
Member

@taiki-e taiki-e commented Nov 30, 2021

UPDATE(taiki-e): Note that this is not an approach we currently use; see #2551 for an approach we currently use.


Fixes #2526

The initial version of the #2333 did the same thing (tokio-rs/tokio#3493 (comment)), but at the time I thought it was unnecessary and removed it.

before

n: 10000, time: 315ms
n: 20000, time: 1235ms
n: 40000, time: 4753ms
n: 80000, time: 18912ms
(timeout)

after

n: 10000, time: 29ms
n: 20000, time: 55ms
n: 40000, time: 104ms
n: 80000, time: 202ms
n: 160000, time: 392ms

A little slower than the version using unconstrained, but a little faster than the version using tokio::spawn, on my machine.

with unconstrained

n: 10000, time: 27ms
n: 20000, time: 51ms
n: 40000, time: 93ms
n: 80000, time: 179ms
n: 160000, time: 361ms

with tokio::spawn

n: 10000, time: 50ms
n: 20000, time: 100ms
n: 40000, time: 168ms
n: 80000, time: 340ms
n: 160000, time: 741ms

fyi @jonhoo

@taiki-e taiki-e added A-stream Area: futures::stream 0.3-backport: pending The maintainer accepted to backport this to the 0.3 branch, but backport has not been done yet. labels Nov 30, 2021
@taiki-e taiki-e force-pushed the taiki-e/futures_unordered branch from 9afc0f3 to ebf3681 Compare November 30, 2021 13:15
@psarna
Copy link

psarna commented Nov 30, 2021

Sounds very reasonable to have a constant cap on the number of polls in order to avoid quadratic behavior in Tokio (and I guess also other runtimes that implement preemption). Thanks! Crossing my fingers that it get accepted and backported.

@pkolaczk
Copy link

pkolaczk commented Dec 1, 2021

Is there any branch with changes backported to 0.3? I tried to use it but I couldn't make it work with tokio IntervalStream:

error[E0599]: the method `map` exists for struct `IntervalStream`, but its trait bounds were not satisfied
  --> src/main.rs:90:22
   |
90 |                     .map(|_| deadline.next())
   |                      ^^^ method cannot be called on `IntervalStream` due to unsatisfied trait bounds
   |
  ::: /home/pkolaczk/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-stream-0.1.8/src/wrappers/interval.rs:12:1
   |
12 | pub struct IntervalStream {
   | -------------------------
   | |
   | doesn't satisfy `IntervalStream: FutureExt`
   | doesn't satisfy `IntervalStream: futures::Future`
   | doesn't satisfy `IntervalStream: futures::StreamExt`
   | doesn't satisfy `IntervalStream: futures::Stream`
   | doesn't satisfy `IntervalStream: std::iter::Iterator`
   |
   = note: the following trait bounds were not satisfied:
           `IntervalStream: futures::Future`
           which is required by `IntervalStream: FutureExt`
           `IntervalStream: futures::Stream`
           which is required by `IntervalStream: futures::StreamExt`
           `&IntervalStream: futures::Future`
           which is required by `&IntervalStream: FutureExt`
           `&IntervalStream: futures::Stream`
           which is required by `&IntervalStream: futures::StreamExt`
           `&mut IntervalStream: futures::Future`
           which is required by `&mut IntervalStream: FutureExt`
           `&mut IntervalStream: futures::Stream`
           which is required by `&mut IntervalStream: futures::StreamExt`
           `IntervalStream: std::iter::Iterator`
           which is required by `&mut IntervalStream: std::iter::Iterator`
   = help: items from traits can only be used if the trait is in scope
help: the following traits are implemented but not in scope; perhaps add a `use` for one of them:
   |
1  | use std::iter::Iterator;
   |
1  | use futures::FutureExt;
   |
1  | use futures::StreamExt;
   |
1  | use futures_util::future::future::FutureExt;
   |
     and 5 other candidates

I do have use futures::StreamExt in my code...

Copy link
Contributor

@jonhoo jonhoo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd personally make this a const the same way it was in the pre-#2333 implementation, and include a bit more docs (you can probably copy-paste that too from the old code), but otherwise 👍 for this change.

@taiki-e taiki-e force-pushed the taiki-e/futures_unordered branch from ebf3681 to d3747e6 Compare December 18, 2021 14:56
@taiki-e
Copy link
Member Author

taiki-e commented Dec 18, 2021

@jonhoo Thanks for the review! I've re-added the constant along with docs that was removed in #2333, and added docs about #2333 and this issue.

@taiki-e taiki-e merged commit afff1ce into master Dec 18, 2021
@taiki-e taiki-e deleted the taiki-e/futures_unordered branch December 18, 2021 15:06
This was referenced Dec 18, 2021
@taiki-e taiki-e added 0.3-backport: completed and removed 0.3-backport: pending The maintainer accepted to backport this to the 0.3 branch, but backport has not been done yet. labels Dec 18, 2021
@taiki-e
Copy link
Member Author

taiki-e commented Dec 18, 2021

Published in 0.3.19.

@@ -340,7 +340,7 @@ fn polled_only_once_at_most_per_iteration() {

let mut tasks = FuturesUnordered::from_iter(vec![F::default(); 33]);
assert!(tasks.poll_next_unpin(cx).is_pending());
assert_eq!(33, tasks.iter().filter(|f| f.polled).count());
assert_eq!(32, tasks.iter().filter(|f| f.polled).count());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this test be adjusted to ensure that eventually all of the contained futures are polled?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The purpose of the test is to make sure that the same future is not polled multiple times in a single poll_next call (#2333), so I think it's fine as is.

fn polled_only_once_at_most_per_iteration() {

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Quadratic complexity in FuturesUnordered
5 participants