From a2ce3b9b17a0b187cb075ffa38da2111930e9f07 Mon Sep 17 00:00:00 2001 From: Kyle Huey Date: Sun, 1 Nov 2020 21:39:02 -0800 Subject: [PATCH 1/3] Add TryStreamExt::try_buffered. --- futures-util/src/stream/try_stream/mod.rs | 81 +++++++++++++++++ .../src/stream/try_stream/try_buffered.rs | 89 +++++++++++++++++++ 2 files changed, 170 insertions(+) create mode 100644 futures-util/src/stream/try_stream/try_buffered.rs diff --git a/futures-util/src/stream/try_stream/mod.rs b/futures-util/src/stream/try_stream/mod.rs index 0c8e10840f..445002dbf0 100644 --- a/futures-util/src/stream/try_stream/mod.rs +++ b/futures-util/src/stream/try_stream/mod.rs @@ -114,6 +114,12 @@ cfg_target_has_atomic! { #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_buffer_unordered::TryBufferUnordered; + #[cfg(feature = "alloc")] + mod try_buffered; + #[cfg(feature = "alloc")] + #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 + pub use self::try_buffered::TryBuffered; + #[cfg(feature = "alloc")] mod try_for_each_concurrent; #[cfg(feature = "alloc")] @@ -842,6 +848,81 @@ pub trait TryStreamExt: TryStream { ) } + /// Attempt to execute several futures from a stream concurrently. + /// + /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type + /// that matches the stream's `Error` type. + /// + /// This adaptor will buffer up to `n` futures and then return their + /// outputs in the order. If the underlying stream returns an error, it will + /// be immediately propagated. + /// + /// The returned stream will be a stream of results, each containing either + /// an error or a future's output. An error can be produced either by the + /// underlying stream itself or by one of the futures it yielded. + /// + /// This method is only available when the `std` or `alloc` feature of this + /// library is activated, and it is activated by default. + /// + /// # Examples + /// + /// Results are returned in the order of addition: + /// ``` + /// # futures::executor::block_on(async { + /// use futures::channel::oneshot; + /// use futures::future::lazy; + /// use futures::stream::{self, StreamExt, TryStreamExt}; + /// use futures::task::Poll; + /// + /// let (send_one, recv_one) = oneshot::channel(); + /// let (send_two, recv_two) = oneshot::channel(); + /// + /// let mut buffered = lazy(move |cx| { + /// let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]); + /// + /// let mut buffered = stream_of_futures.try_buffered(10); + /// + /// assert_eq!(buffered.try_poll_next_unpin(cx), Poll::Pending); + /// + /// send_two.send(2i32)?; + /// assert_eq!(buffered.try_poll_next_unpin(cx), Poll::Pending); + /// Ok::<_, i32>(buffered) + /// }).await?; + /// + /// send_one.send(1i32)?; + /// assert_eq!(buffered.next().await, Some(Ok(1i32))); + /// assert_eq!(buffered.next().await, Some(Ok(2i32))); + /// + /// assert_eq!(buffered.next().await, None); + /// # Ok::<(), i32>(()) }).unwrap(); + /// ``` + /// + /// Errors from the underlying stream itself are propagated: + /// ``` + /// # futures::executor::block_on(async { + /// use futures::channel::mpsc; + /// use futures::stream::{StreamExt, TryStreamExt}; + /// + /// let (sink, stream_of_futures) = mpsc::unbounded(); + /// let mut buffered = stream_of_futures.try_buffered(10); + /// + /// sink.unbounded_send(Ok(async { Ok(7i32) }))?; + /// assert_eq!(buffered.next().await, Some(Ok(7i32))); + /// + /// sink.unbounded_send(Err("error in the stream"))?; + /// assert_eq!(buffered.next().await, Some(Err("error in the stream"))); + /// # Ok::<(), Box>(()) }).unwrap(); + /// ``` + #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] + #[cfg(feature = "alloc")] + fn try_buffered(self, n: usize) -> TryBuffered + where + Self::Ok: TryFuture, + Self: Sized, + { + TryBuffered::new(self, n) + } + // TODO: false positive warning from rustdoc. Verify once #43466 settles // /// A convenience method for calling [`TryStream::try_poll_next`] on [`Unpin`] diff --git a/futures-util/src/stream/try_stream/try_buffered.rs b/futures-util/src/stream/try_stream/try_buffered.rs new file mode 100644 index 0000000000..5e35b45cde --- /dev/null +++ b/futures-util/src/stream/try_stream/try_buffered.rs @@ -0,0 +1,89 @@ +use crate::stream::{Fuse, FuturesOrdered, StreamExt, IntoStream}; +use crate::future::{IntoFuture, TryFutureExt}; +use futures_core::future::TryFuture; +use futures_core::stream::{Stream, TryStream}; +use futures_core::task::{Context, Poll}; +#[cfg(feature = "sink")] +use futures_sink::Sink; +use pin_project::pin_project; +use core::pin::Pin; + +/// Stream for the [`try_buffered`](super::TryStreamExt::try_buffered) method. +#[pin_project] +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct TryBuffered +where + St: TryStream, + St::Ok: TryFuture, +{ + #[pin] + stream: Fuse>, + in_progress_queue: FuturesOrdered>, + max: usize, +} + +impl TryBuffered +where + St: TryStream, + St::Ok: TryFuture, +{ + pub(super) fn new(stream: St, n: usize) -> TryBuffered { + TryBuffered { + stream: IntoStream::new(stream).fuse(), + in_progress_queue: FuturesOrdered::new(), + max: n, + } + } + + delegate_access_inner!(stream, St, (. .)); +} + +impl Stream for TryBuffered +where + St: TryStream, + St::Ok: TryFuture, +{ + type Item = Result<::Ok, St::Error>; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let mut this = self.project(); + + // First up, try to spawn off as many futures as possible by filling up + // our queue of futures. Propagate errors from the stream immediately. + while this.in_progress_queue.len() < *this.max { + match this.stream.as_mut().poll_next(cx)? { + Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut.into_future()), + Poll::Ready(None) | Poll::Pending => break, + } + } + + // Attempt to pull the next value from the in_progress_queue + match this.in_progress_queue.poll_next_unpin(cx) { + x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x, + Poll::Ready(None) => {} + } + + // If more values are still coming from the stream, we're not done yet + if this.stream.is_done() { + Poll::Ready(None) + } else { + Poll::Pending + } + } +} + +// Forwarding impl of Sink from the underlying stream +#[cfg(feature = "sink")] +impl Sink for TryBuffered +where + S: TryStream + Sink, + S::Ok: TryFuture, +{ + type Error = E; + + delegate_sink!(stream, Item); +} From 7c99af94be5cf7bcb00feb844f728fbd58714188 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Mon, 2 Nov 2020 21:28:05 +0900 Subject: [PATCH 2/3] Tweak docs --- futures-util/src/stream/try_stream/mod.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/futures-util/src/stream/try_stream/mod.rs b/futures-util/src/stream/try_stream/mod.rs index 445002dbf0..6a48a4c4b4 100644 --- a/futures-util/src/stream/try_stream/mod.rs +++ b/futures-util/src/stream/try_stream/mod.rs @@ -779,7 +779,7 @@ pub trait TryStreamExt: TryStream { assert_future::, _>(TryConcat::new(self)) } - /// Attempt to execute several futures from a stream concurrently. + /// Attempt to execute several futures from a stream concurrently (unordered). /// /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type /// that matches the stream's `Error` type. @@ -872,7 +872,6 @@ pub trait TryStreamExt: TryStream { /// use futures::channel::oneshot; /// use futures::future::lazy; /// use futures::stream::{self, StreamExt, TryStreamExt}; - /// use futures::task::Poll; /// /// let (send_one, recv_one) = oneshot::channel(); /// let (send_two, recv_two) = oneshot::channel(); @@ -882,10 +881,10 @@ pub trait TryStreamExt: TryStream { /// /// let mut buffered = stream_of_futures.try_buffered(10); /// - /// assert_eq!(buffered.try_poll_next_unpin(cx), Poll::Pending); + /// assert!(buffered.try_poll_next_unpin(cx).is_pending()); /// /// send_two.send(2i32)?; - /// assert_eq!(buffered.try_poll_next_unpin(cx), Poll::Pending); + /// assert!(buffered.try_poll_next_unpin(cx).is_pending()); /// Ok::<_, i32>(buffered) /// }).await?; /// From 886681583f0c7935e7202a4b55bafbb1b1af63d1 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Mon, 2 Nov 2020 21:29:12 +0900 Subject: [PATCH 3/3] Re-export TryBuffered --- futures-util/src/stream/mod.rs | 2 +- futures/src/lib.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index ca9bc89ace..164e54afc8 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -55,7 +55,7 @@ pub use self::try_stream::IntoAsyncRead; #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] -pub use self::try_stream::{TryBufferUnordered, TryForEachConcurrent}; +pub use self::try_stream::{TryBufferUnordered, TryBuffered, TryForEachConcurrent}; // Primitive streams diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 3f04b7dbb8..fad70b8743 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -498,7 +498,7 @@ pub mod stream { #[cfg(feature = "alloc")] pub use futures_util::stream::{ // For TryStreamExt: - TryBufferUnordered, TryForEachConcurrent, + TryBufferUnordered, TryBuffered, TryForEachConcurrent, }; #[cfg(feature = "std")]