Skip to content

Commit

Permalink
tweak heuristic
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Feb 6, 2022
1 parent dd86ee1 commit b6afb23
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 5 deletions.
5 changes: 3 additions & 2 deletions futures-util/src/stream/futures_unordered/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
// Keep track of how many child futures we have polled,
// in case we want to forcibly yield.
let mut polled = 0;
let mut yielded = 0;

// Ensure `parent` is correctly set.
self.ready_to_run_queue.waker.register(cx.waker());
Expand Down Expand Up @@ -519,15 +520,15 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
let task = bomb.task.take().unwrap();
// If the future was awoken during polling, we assume
// the future wanted to explicitly yield.
let yielded = task.woken.load(Relaxed);
yielded += task.woken.load(Relaxed) as usize;
bomb.queue.link(task);

// If a future yields, we respect it and yield here.
// If all futures have been polled, we also yield here to
// avoid starving other tasks waiting on the executor.
// (polling the same future twice per iteration may cause
// the problem: https://github.com/rust-lang/futures-rs/pull/2333)
if yielded || polled == len {
if yielded >= 2 || polled == len {
cx.waker().wake_by_ref();
return Poll::Pending;
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/futures_unordered/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub(super) struct Task<Fut> {
// Whether or not this task is currently in the ready to run queue
pub(super) queued: AtomicBool,

// Whether the future waken before it finishes polling
// Whether the future was awoken during polling
// It is possible for this flag to be set to true after the polling,
// but it will be ignored.
pub(super) woken: AtomicBool,
Expand Down
2 changes: 0 additions & 2 deletions futures/tests/stream_futures_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,6 @@ fn futures_not_moved_after_poll() {
let fut = future::ready(()).pending_once().assert_unmoved();
let mut stream = vec![fut; 3].into_iter().collect::<FuturesUnordered<_>>();
assert_stream_pending!(stream);
assert_stream_pending!(stream);
assert_stream_pending!(stream);
assert_stream_next!(stream, ());
assert_stream_next!(stream, ());
assert_stream_next!(stream, ());
Expand Down

0 comments on commit b6afb23

Please sign in to comment.