Skip to content

Commit

Permalink
Fix bug in FuturesOrdered::push_front (#2664)
Browse files Browse the repository at this point in the history
  • Loading branch information
conradludgate authored and taiki-e committed Jan 30, 2023
1 parent 63f2ff2 commit e6ea210
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 10 deletions.
16 changes: 6 additions & 10 deletions futures-util/src/stream/futures_ordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pin_project! {
struct OrderWrapper<T> {
#[pin]
data: T, // A future or a future's output
index: usize,
index: isize,
}
}

Expand Down Expand Up @@ -95,8 +95,8 @@ where
pub struct FuturesOrdered<T: Future> {
in_progress_queue: FuturesUnordered<OrderWrapper<T>>,
queued_outputs: BinaryHeap<OrderWrapper<T::Output>>,
next_incoming_index: usize,
next_outgoing_index: usize,
next_incoming_index: isize,
next_outgoing_index: isize,
}

impl<T: Future> Unpin for FuturesOrdered<T> {}
Expand Down Expand Up @@ -160,13 +160,9 @@ impl<Fut: Future> FuturesOrdered<Fut> {
/// task notifications. This future will be the next future to be returned
/// complete.
pub fn push_front(&mut self, future: Fut) {
if self.next_outgoing_index == 0 {
self.push_back(future)
} else {
let wrapped = OrderWrapper { data: future, index: self.next_outgoing_index - 1 };
self.next_outgoing_index -= 1;
self.in_progress_queue.push(wrapped);
}
let wrapped = OrderWrapper { data: future, index: self.next_outgoing_index - 1 };
self.next_outgoing_index -= 1;
self.in_progress_queue.push(wrapped);
}
}

Expand Down
24 changes: 24 additions & 0 deletions futures/tests/stream_futures_ordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,27 @@ fn queue_never_unblocked() {
assert!(stream.poll_next_unpin(cx).is_pending());
assert!(stream.poll_next_unpin(cx).is_pending());
}

#[test]
fn test_push_front_negative() {
let (a_tx, a_rx) = oneshot::channel::<i32>();
let (b_tx, b_rx) = oneshot::channel::<i32>();
let (c_tx, c_rx) = oneshot::channel::<i32>();

let mut stream = FuturesOrdered::new();

let mut cx = noop_context();

stream.push_front(a_rx);
stream.push_front(b_rx);
stream.push_front(c_rx);

a_tx.send(1).unwrap();
b_tx.send(2).unwrap();
c_tx.send(3).unwrap();

// These should all be recieved in reverse order
assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx));
assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx));
assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx));
}

0 comments on commit e6ea210

Please sign in to comment.