Skip to content

Commit

Permalink
Fix incorrect termination of select_with_strategy streams (#2635)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource authored and taiki-e committed Aug 29, 2022
1 parent 7bff2be commit 5af1f52
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 3 deletions.
11 changes: 8 additions & 3 deletions futures-util/src/stream/select_with_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,18 +231,23 @@ where
St1: Stream,
St2: Stream<Item = St1::Item>,
{
match poll_side(select, side, cx) {
let first_done = match poll_side(select, side, cx) {
Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
Poll::Ready(None) => {
select.internal_state.finish(side);
true
}
Poll::Pending => (),
Poll::Pending => false,
};
let other = side.other();
match poll_side(select, other, cx) {
Poll::Ready(None) => {
select.internal_state.finish(other);
Poll::Ready(None)
if first_done {
Poll::Ready(None)
} else {
Poll::Pending
}
}
a => a,
}
Expand Down
42 changes: 42 additions & 0 deletions futures/tests/stream.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use std::cell::Cell;
use std::iter;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::Context;

use futures::channel::mpsc;
use futures::executor::block_on;
Expand All @@ -9,6 +13,7 @@ use futures::sink::SinkExt;
use futures::stream::{self, StreamExt};
use futures::task::Poll;
use futures::{ready, FutureExt};
use futures_core::Stream;
use futures_test::task::noop_context;

#[test]
Expand Down Expand Up @@ -419,3 +424,40 @@ fn ready_chunks() {
assert_eq!(s.next().await.unwrap(), vec![4]);
});
}

struct SlowStream {
times_should_poll: usize,
times_polled: Rc<Cell<usize>>,
}
impl Stream for SlowStream {
type Item = usize;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.times_polled.set(self.times_polled.get() + 1);
if self.times_polled.get() % 2 == 0 {
cx.waker().wake_by_ref();
return Poll::Pending;
}
if self.times_polled.get() >= self.times_should_poll {
return Poll::Ready(None);
}
Poll::Ready(Some(self.times_polled.get()))
}
}

#[test]
fn select_with_strategy_doesnt_terminate_early() {
for side in [stream::PollNext::Left, stream::PollNext::Right] {
let times_should_poll = 10;
let count = Rc::new(Cell::new(0));
let b = stream::iter([10, 20]);

let mut selected = stream::select_with_strategy(
SlowStream { times_should_poll, times_polled: count.clone() },
b,
|_: &mut ()| side,
);
block_on(async move { while selected.next().await.is_some() {} });
assert_eq!(count.get(), times_should_poll + 1);
}
}

0 comments on commit 5af1f52

Please sign in to comment.