Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FuturesUnordered: Limit max value of yield_every #2527

Merged
merged 1 commit into from
Dec 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 30 additions & 15 deletions futures-util/src/stream/futures_unordered/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use crate::task::AtomicWaker;
use alloc::sync::{Arc, Weak};
use core::cell::UnsafeCell;
use core::cmp;
use core::fmt::{self, Debug};
use core::iter::FromIterator;
use core::marker::PhantomData;
Expand All @@ -30,6 +31,33 @@ use self::task::Task;
mod ready_to_run_queue;
use self::ready_to_run_queue::{Dequeue, ReadyToRunQueue};

/// Constant used for a `FuturesUnordered` to determine how many times it is
/// allowed to poll underlying futures without yielding.
///
/// A single call to `poll_next` may potentially do a lot of work before
/// yielding. This happens in particular if the underlying futures are awoken
/// frequently but continue to return `Pending`. This is problematic if other
/// tasks are waiting on the executor, since they do not get to run. This value
/// caps the number of calls to `poll` on underlying futures a single call to
/// `poll_next` is allowed to make.
///
/// The value itself is chosen somewhat arbitrarily. It needs to be high enough
/// that amortize wakeup and scheduling costs, but low enough that we do not
/// starve other tasks for long.
///
/// See also https://github.com/rust-lang/futures-rs/issues/2047.
///
/// Note that using the length of the `FuturesUnordered` instead of this value
/// may cause problems if the number of futures is large.
/// See also https://github.com/rust-lang/futures-rs/pull/2527.
///
/// Additionally, polling the same future twice per iteration may cause another
/// problem. So, when using this value, it is necessary to limit the max value
/// based on the length of the `FuturesUnordered`.
/// (e.g., `cmp::min(self.len(), YIELD_EVERY)`)
/// See also https://github.com/rust-lang/futures-rs/pull/2333.
const YIELD_EVERY: usize = 32;

/// A set of futures which may complete in any order.
///
/// This structure is optimized to manage a large number of futures.
Expand Down Expand Up @@ -383,21 +411,8 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
type Item = Fut::Output;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Variable to determine how many times it is allowed to poll underlying
// futures without yielding.
//
// A single call to `poll_next` may potentially do a lot of work before
// yielding. This happens in particular if the underlying futures are awoken
// frequently but continue to return `Pending`. This is problematic if other
// tasks are waiting on the executor, since they do not get to run. This value
// caps the number of calls to `poll` on underlying futures a single call to
// `poll_next` is allowed to make.
//
// The value is the length of FuturesUnordered. This ensures that each
// future is polled only once at most per iteration.
//
// See also https://github.com/rust-lang/futures-rs/issues/2047.
let yield_every = self.len();
// See YIELD_EVERY docs for more.
let yield_every = cmp::min(self.len(), YIELD_EVERY);

// Keep track of how many child futures we have polled,
// in case we want to forcibly yield.
Expand Down
2 changes: 1 addition & 1 deletion futures/tests/stream_futures_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ fn polled_only_once_at_most_per_iteration() {

let mut tasks = FuturesUnordered::from_iter(vec![F::default(); 33]);
assert!(tasks.poll_next_unpin(cx).is_pending());
assert_eq!(33, tasks.iter().filter(|f| f.polled).count());
assert_eq!(32, tasks.iter().filter(|f| f.polled).count());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this test be adjusted to ensure that eventually all of the contained futures are polled?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The purpose of the test is to make sure that the same future is not polled multiple times in a single poll_next call (#2333), so I think it's fine as is.

fn polled_only_once_at_most_per_iteration() {


let mut tasks = FuturesUnordered::<F>::new();
assert_eq!(Poll::Ready(None), tasks.poll_next_unpin(cx));
Expand Down