Skip to content

Commit

Permalink
Add push_front and push_back to FuturesOrdered (#2591)
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros authored Jul 7, 2022
1 parent 2ebb94f commit 183f8c6
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 4 deletions.
32 changes: 30 additions & 2 deletions futures-util/src/stream/futures_ordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,39 @@ impl<Fut: Future> FuturesOrdered<Fut> {
/// This function will not call `poll` on the submitted future. The caller
/// must ensure that `FuturesOrdered::poll` is called in order to receive
/// task notifications.
#[deprecated(note = "use `push_back` instead")]
pub fn push(&mut self, future: Fut) {
self.push_back(future);
}

/// Pushes a future to the back of the queue.
///
/// This function submits the given future to the internal set for managing.
/// This function will not call `poll` on the submitted future. The caller
/// must ensure that `FuturesOrdered::poll` is called in order to receive
/// task notifications.
pub fn push_back(&mut self, future: Fut) {
let wrapped = OrderWrapper { data: future, index: self.next_incoming_index };
self.next_incoming_index += 1;
self.in_progress_queue.push(wrapped);
}

/// Pushes a future to the front of the queue.
///
/// This function submits the given future to the internal set for managing.
/// This function will not call `poll` on the submitted future. The caller
/// must ensure that `FuturesOrdered::poll` is called in order to receive
/// task notifications. This future will be the next future to be returned
/// complete.
pub fn push_front(&mut self, future: Fut) {
if self.next_outgoing_index == 0 {
self.push_back(future)
} else {
let wrapped = OrderWrapper { data: future, index: self.next_outgoing_index - 1 };
self.next_outgoing_index -= 1;
self.in_progress_queue.push(wrapped);
}
}
}

impl<Fut: Future> Default for FuturesOrdered<Fut> {
Expand Down Expand Up @@ -196,7 +224,7 @@ impl<Fut: Future> FromIterator<Fut> for FuturesOrdered<Fut> {
{
let acc = Self::new();
iter.into_iter().fold(acc, |mut acc, item| {
acc.push(item);
acc.push_back(item);
acc
})
}
Expand All @@ -214,7 +242,7 @@ impl<Fut: Future> Extend<Fut> for FuturesOrdered<Fut> {
I: IntoIterator<Item = Fut>,
{
for item in iter {
self.push(item);
self.push_back(item);
}
}
}
2 changes: 1 addition & 1 deletion futures-util/src/stream/stream/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ where
// our queue of futures.
while this.max.map(|max| this.in_progress_queue.len() < max.get()).unwrap_or(true) {
match this.stream.as_mut().poll_next(cx) {
Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut),
Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut),
Poll::Ready(None) | Poll::Pending => break,
}
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/try_stream/try_buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ where
// our queue of futures. Propagate errors from the stream immediately.
while this.max.map(|max| this.in_progress_queue.len() < max.get()).unwrap_or(true) {
match this.stream.as_mut().poll_next(cx)? {
Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut.into_future()),
Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut.into_future()),
Poll::Ready(None) | Poll::Pending => break,
}
}
Expand Down
64 changes: 64 additions & 0 deletions futures/tests/stream_futures_ordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use futures::channel::oneshot;
use futures::executor::{block_on, block_on_stream};
use futures::future::{self, join, Future, FutureExt, TryFutureExt};
use futures::stream::{FuturesOrdered, StreamExt};
use futures::task::Poll;
use futures_test::task::noop_context;
use std::any::Any;

Expand Down Expand Up @@ -45,6 +46,69 @@ fn works_2() {
assert!(stream.poll_next_unpin(&mut cx).is_ready());
}

#[test]
fn test_push_front() {
let (a_tx, a_rx) = oneshot::channel::<i32>();
let (b_tx, b_rx) = oneshot::channel::<i32>();
let (c_tx, c_rx) = oneshot::channel::<i32>();
let (d_tx, d_rx) = oneshot::channel::<i32>();

let mut stream = FuturesOrdered::new();

let mut cx = noop_context();

stream.push_back(a_rx);
stream.push_back(b_rx);
stream.push_back(c_rx);

a_tx.send(1).unwrap();
b_tx.send(2).unwrap();
c_tx.send(3).unwrap();

// 1 and 2 should be received in order
assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx));
assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx));

stream.push_front(d_rx);
d_tx.send(4).unwrap();

// we pushed `d_rx` to the front and sent 4, so we should recieve 4 next
// and then 3 after it
assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx));
assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx));
}

#[test]
fn test_push_back() {
let (a_tx, a_rx) = oneshot::channel::<i32>();
let (b_tx, b_rx) = oneshot::channel::<i32>();
let (c_tx, c_rx) = oneshot::channel::<i32>();
let (d_tx, d_rx) = oneshot::channel::<i32>();

let mut stream = FuturesOrdered::new();

let mut cx = noop_context();

stream.push_back(a_rx);
stream.push_back(b_rx);
stream.push_back(c_rx);

a_tx.send(1).unwrap();
b_tx.send(2).unwrap();
c_tx.send(3).unwrap();

// All results should be received in order

assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx));
assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx));

stream.push_back(d_rx);
d_tx.send(4).unwrap();

assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx));
assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx));
}

#[test]
fn from_iterator() {
let stream = vec![future::ready::<i32>(1), future::ready::<i32>(2), future::ready::<i32>(3)]
Expand Down

0 comments on commit 183f8c6

Please sign in to comment.