From d3747e688a0356a03a45ad7ea9bec95e6381ad9c Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sat, 18 Dec 2021 23:56:35 +0900 Subject: [PATCH] FuturesUnordered: Limit max value of yield_every --- .../src/stream/futures_unordered/mod.rs | 45 ++++++++++++------- futures/tests/stream_futures_unordered.rs | 2 +- 2 files changed, 31 insertions(+), 16 deletions(-) diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index 6918a26b91..572bf6124d 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -6,6 +6,7 @@ use crate::task::AtomicWaker; use alloc::sync::{Arc, Weak}; use core::cell::UnsafeCell; +use core::cmp; use core::fmt::{self, Debug}; use core::iter::FromIterator; use core::marker::PhantomData; @@ -30,6 +31,33 @@ use self::task::Task; mod ready_to_run_queue; use self::ready_to_run_queue::{Dequeue, ReadyToRunQueue}; +/// Constant used for a `FuturesUnordered` to determine how many times it is +/// allowed to poll underlying futures without yielding. +/// +/// A single call to `poll_next` may potentially do a lot of work before +/// yielding. This happens in particular if the underlying futures are awoken +/// frequently but continue to return `Pending`. This is problematic if other +/// tasks are waiting on the executor, since they do not get to run. This value +/// caps the number of calls to `poll` on underlying futures a single call to +/// `poll_next` is allowed to make. +/// +/// The value itself is chosen somewhat arbitrarily. It needs to be high enough +/// that amortize wakeup and scheduling costs, but low enough that we do not +/// starve other tasks for long. +/// +/// See also https://github.com/rust-lang/futures-rs/issues/2047. +/// +/// Note that using the length of the `FuturesUnordered` instead of this value +/// may cause problems if the number of futures is large. +/// See also https://github.com/rust-lang/futures-rs/pull/2527. +/// +/// Additionally, polling the same future twice per iteration may cause another +/// problem. So, when using this value, it is necessary to limit the max value +/// based on the length of the `FuturesUnordered`. +/// (e.g., `cmp::min(self.len(), YIELD_EVERY)`) +/// See also https://github.com/rust-lang/futures-rs/pull/2333. +const YIELD_EVERY: usize = 32; + /// A set of futures which may complete in any order. /// /// This structure is optimized to manage a large number of futures. @@ -383,21 +411,8 @@ impl Stream for FuturesUnordered { type Item = Fut::Output; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // Variable to determine how many times it is allowed to poll underlying - // futures without yielding. - // - // A single call to `poll_next` may potentially do a lot of work before - // yielding. This happens in particular if the underlying futures are awoken - // frequently but continue to return `Pending`. This is problematic if other - // tasks are waiting on the executor, since they do not get to run. This value - // caps the number of calls to `poll` on underlying futures a single call to - // `poll_next` is allowed to make. - // - // The value is the length of FuturesUnordered. This ensures that each - // future is polled only once at most per iteration. - // - // See also https://github.com/rust-lang/futures-rs/issues/2047. - let yield_every = self.len(); + // See YIELD_EVERY docs for more. + let yield_every = cmp::min(self.len(), YIELD_EVERY); // Keep track of how many child futures we have polled, // in case we want to forcibly yield. diff --git a/futures/tests/stream_futures_unordered.rs b/futures/tests/stream_futures_unordered.rs index 4d18589714..d83e129f9c 100644 --- a/futures/tests/stream_futures_unordered.rs +++ b/futures/tests/stream_futures_unordered.rs @@ -340,7 +340,7 @@ fn polled_only_once_at_most_per_iteration() { let mut tasks = FuturesUnordered::from_iter(vec![F::default(); 33]); assert!(tasks.poll_next_unpin(cx).is_pending()); - assert_eq!(33, tasks.iter().filter(|f| f.polled).count()); + assert_eq!(32, tasks.iter().filter(|f| f.polled).count()); let mut tasks = FuturesUnordered::::new(); assert_eq!(Poll::Ready(None), tasks.poll_next_unpin(cx));