Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TryStreamExt::try_buffered #2245

Merged
merged 3 commits into from
Nov 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion futures-util/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub use self::try_stream::IntoAsyncRead;

#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
pub use self::try_stream::{TryBufferUnordered, TryForEachConcurrent};
pub use self::try_stream::{TryBufferUnordered, TryBuffered, TryForEachConcurrent};

// Primitive streams

Expand Down
82 changes: 81 additions & 1 deletion futures-util/src/stream/try_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ cfg_target_has_atomic! {
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_buffer_unordered::TryBufferUnordered;

#[cfg(feature = "alloc")]
mod try_buffered;
#[cfg(feature = "alloc")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_buffered::TryBuffered;

#[cfg(feature = "alloc")]
mod try_for_each_concurrent;
#[cfg(feature = "alloc")]
Expand Down Expand Up @@ -773,7 +779,7 @@ pub trait TryStreamExt: TryStream {
assert_future::<Result<Self::Ok, Self::Error>, _>(TryConcat::new(self))
}

/// Attempt to execute several futures from a stream concurrently.
/// Attempt to execute several futures from a stream concurrently (unordered).
///
/// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
/// that matches the stream's `Error` type.
Expand Down Expand Up @@ -842,6 +848,80 @@ pub trait TryStreamExt: TryStream {
)
}

/// Attempt to execute several futures from a stream concurrently.
///
/// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
/// that matches the stream's `Error` type.
///
/// This adaptor will buffer up to `n` futures and then return their
/// outputs in the order. If the underlying stream returns an error, it will
/// be immediately propagated.
///
/// The returned stream will be a stream of results, each containing either
/// an error or a future's output. An error can be produced either by the
/// underlying stream itself or by one of the futures it yielded.
///
/// This method is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
///
/// # Examples
///
/// Results are returned in the order of addition:
/// ```
/// # futures::executor::block_on(async {
/// use futures::channel::oneshot;
/// use futures::future::lazy;
/// use futures::stream::{self, StreamExt, TryStreamExt};
///
/// let (send_one, recv_one) = oneshot::channel();
/// let (send_two, recv_two) = oneshot::channel();
///
/// let mut buffered = lazy(move |cx| {
/// let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]);
///
/// let mut buffered = stream_of_futures.try_buffered(10);
///
/// assert!(buffered.try_poll_next_unpin(cx).is_pending());
///
/// send_two.send(2i32)?;
/// assert!(buffered.try_poll_next_unpin(cx).is_pending());
/// Ok::<_, i32>(buffered)
/// }).await?;
///
/// send_one.send(1i32)?;
/// assert_eq!(buffered.next().await, Some(Ok(1i32)));
/// assert_eq!(buffered.next().await, Some(Ok(2i32)));
///
/// assert_eq!(buffered.next().await, None);
/// # Ok::<(), i32>(()) }).unwrap();
/// ```
///
/// Errors from the underlying stream itself are propagated:
/// ```
/// # futures::executor::block_on(async {
/// use futures::channel::mpsc;
/// use futures::stream::{StreamExt, TryStreamExt};
///
/// let (sink, stream_of_futures) = mpsc::unbounded();
/// let mut buffered = stream_of_futures.try_buffered(10);
///
/// sink.unbounded_send(Ok(async { Ok(7i32) }))?;
/// assert_eq!(buffered.next().await, Some(Ok(7i32)));
///
/// sink.unbounded_send(Err("error in the stream"))?;
/// assert_eq!(buffered.next().await, Some(Err("error in the stream")));
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
/// ```
#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
fn try_buffered(self, n: usize) -> TryBuffered<Self>
where
Self::Ok: TryFuture<Error = Self::Error>,
Self: Sized,
{
TryBuffered::new(self, n)
}

// TODO: false positive warning from rustdoc. Verify once #43466 settles
//
/// A convenience method for calling [`TryStream::try_poll_next`] on [`Unpin`]
Expand Down
89 changes: 89 additions & 0 deletions futures-util/src/stream/try_stream/try_buffered.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use crate::stream::{Fuse, FuturesOrdered, StreamExt, IntoStream};
use crate::future::{IntoFuture, TryFutureExt};
use futures_core::future::TryFuture;
use futures_core::stream::{Stream, TryStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
use pin_project::pin_project;
use core::pin::Pin;

/// Stream for the [`try_buffered`](super::TryStreamExt::try_buffered) method.
#[pin_project]
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct TryBuffered<St>
where
St: TryStream,
St::Ok: TryFuture,
{
#[pin]
stream: Fuse<IntoStream<St>>,
in_progress_queue: FuturesOrdered<IntoFuture<St::Ok>>,
max: usize,
}

impl<St> TryBuffered<St>
where
St: TryStream,
St::Ok: TryFuture,
{
pub(super) fn new(stream: St, n: usize) -> TryBuffered<St> {
TryBuffered {
stream: IntoStream::new(stream).fuse(),
in_progress_queue: FuturesOrdered::new(),
max: n,
}
}

delegate_access_inner!(stream, St, (. .));
}

impl<St> Stream for TryBuffered<St>
where
St: TryStream,
St::Ok: TryFuture<Error = St::Error>,
{
type Item = Result<<St::Ok as TryFuture>::Ok, St::Error>;

fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let mut this = self.project();

// First up, try to spawn off as many futures as possible by filling up
// our queue of futures. Propagate errors from the stream immediately.
while this.in_progress_queue.len() < *this.max {
match this.stream.as_mut().poll_next(cx)? {
Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut.into_future()),
Poll::Ready(None) | Poll::Pending => break,
}
}

// Attempt to pull the next value from the in_progress_queue
match this.in_progress_queue.poll_next_unpin(cx) {
x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x,
Poll::Ready(None) => {}
}

// If more values are still coming from the stream, we're not done yet
if this.stream.is_done() {
Poll::Ready(None)
} else {
Poll::Pending
}
}
}

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Item, E> Sink<Item> for TryBuffered<S>
where
S: TryStream + Sink<Item, Error = E>,
S::Ok: TryFuture<Error = E>,
{
type Error = E;

delegate_sink!(stream, Item);
}
2 changes: 1 addition & 1 deletion futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ pub mod stream {
#[cfg(feature = "alloc")]
pub use futures_util::stream::{
// For TryStreamExt:
TryBufferUnordered, TryForEachConcurrent,
TryBufferUnordered, TryBuffered, TryForEachConcurrent,
};

#[cfg(feature = "std")]
Expand Down