From f9825c121356fa97a8f01b799e119e29b338cca0 Mon Sep 17 00:00:00 2001 From: Dzenan Jupic <56133904+DzenanJupic@users.noreply.github.com> Date: Wed, 5 Jul 2023 13:11:08 +0200 Subject: [PATCH] Implemented `TryStreamExt::try_ready_chunks` --- futures-util/src/stream/mod.rs | 2 +- futures-util/src/stream/try_stream/mod.rs | 55 ++++++++ .../src/stream/try_stream/try_ready_chunks.rs | 126 ++++++++++++++++++ 3 files changed, 182 insertions(+), 1 deletion(-) create mode 100644 futures-util/src/stream/try_stream/try_ready_chunks.rs diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 5a1f766aaa..08f0649273 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -71,7 +71,7 @@ pub use self::try_stream::{TryBufferUnordered, TryBuffered, TryFlattenUnordered} pub use self::try_stream::TryForward; #[cfg(feature = "alloc")] -pub use self::try_stream::{TryChunks, TryChunksError}; +pub use self::try_stream::{TryChunks, TryChunksError, TryReadyChunks, TryReadyChunksError}; // Primitive streams diff --git a/futures-util/src/stream/try_stream/mod.rs b/futures-util/src/stream/try_stream/mod.rs index 4f83ead474..fc547237c5 100644 --- a/futures-util/src/stream/try_stream/mod.rs +++ b/futures-util/src/stream/try_stream/mod.rs @@ -121,6 +121,12 @@ mod try_chunks; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_chunks::{TryChunks, TryChunksError}; +#[cfg(feature = "alloc")] +mod try_ready_chunks; +#[cfg(feature = "alloc")] +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::try_ready_chunks::{TryReadyChunks, TryReadyChunksError}; + mod try_unfold; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_unfold::{try_unfold, TryUnfold}; @@ -557,6 +563,55 @@ pub trait TryStreamExt: TryStream { ) } + /// An adaptor for chunking up successful, ready items of the stream inside a vector. + /// + /// This combinator will attempt to pull successful items from this stream and buffer + /// them into a local vector. At most `capacity` items will get buffered + /// before they're yielded from the returned stream. If the underlying stream + /// returns `Poll::Pending`, and the collected chunk is not empty, it will + /// be immidiatly returned. + /// + /// Note that the vectors returned from this iterator may not always have + /// `capacity` elements. If the underlying stream ended and only a partial + /// vector was created, it'll be returned. Additionally if an error happens + /// from the underlying stream then the currently buffered items will be + /// yielded. + /// + /// This method is only available when the `std` or `alloc` feature of this + /// library is activated, and it is activated by default. + /// + /// This function is similar to + /// [`StreamExt::ready_chunks`](crate::stream::StreamExt::ready_chunks) but exits + /// early if an error occurs. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, TryReadyChunksError, TryStreamExt}; + /// + /// let stream = stream::iter(vec![Ok::(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)]); + /// let mut stream = stream.try_ready_chunks(2); + /// + /// assert_eq!(stream.try_next().await, Ok(Some(vec![1, 2]))); + /// assert_eq!(stream.try_next().await, Err(TryReadyChunksError(vec![3], 4))); + /// assert_eq!(stream.try_next().await, Ok(Some(vec![5, 6]))); + /// # }) + /// ``` + /// + /// # Panics + /// + /// This method will panic if `capacity` is zero. + #[cfg(feature = "alloc")] + fn try_ready_chunks(self, capacity: usize) -> TryReadyChunks + where + Self: Sized, + { + assert_stream::, TryReadyChunksError>, _>( + TryReadyChunks::new(self, capacity), + ) + } + /// Attempt to filter the values produced by this stream according to the /// provided asynchronous closure. /// diff --git a/futures-util/src/stream/try_stream/try_ready_chunks.rs b/futures-util/src/stream/try_stream/try_ready_chunks.rs new file mode 100644 index 0000000000..8b1470ea26 --- /dev/null +++ b/futures-util/src/stream/try_stream/try_ready_chunks.rs @@ -0,0 +1,126 @@ +use crate::stream::{Fuse, IntoStream, StreamExt}; + +use alloc::vec::Vec; +use core::fmt; +use core::pin::Pin; +use futures_core::stream::{FusedStream, Stream, TryStream}; +use futures_core::task::{Context, Poll}; +#[cfg(feature = "sink")] +use futures_sink::Sink; +use pin_project_lite::pin_project; + +pin_project! { + /// Stream for the [`try_ready_chunks`](super::TryStreamExt::try_ready_chunks) method. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct TryReadyChunks { + #[pin] + stream: Fuse>, + cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 + } +} + +impl TryReadyChunks { + pub(super) fn new(stream: St, capacity: usize) -> Self { + assert!(capacity > 0); + + Self { stream: IntoStream::new(stream).fuse(), cap: capacity } + } + + delegate_access_inner!(stream, St, (. .)); +} + +type TryReadyChunksStreamError = + TryReadyChunksError<::Ok, ::Error>; + +impl Stream for TryReadyChunks { + type Item = Result, TryReadyChunksStreamError>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.as_mut().project(); + + let mut items: Vec = Vec::new(); + + loop { + match this.stream.as_mut().poll_next(cx) { + // Flush all the collected data if the underlying stream doesn't + // contain more ready values + Poll::Pending => { + return if items.is_empty() { + Poll::Pending + } else { + Poll::Ready(Some(Ok(items))) + } + } + + // Push the ready item into the buffer and check whether it is full. + // If so, return the buffer. + Poll::Ready(Some(Ok(item))) => { + if items.is_empty() { + items.reserve_exact(*this.cap); + } + items.push(item); + if items.len() >= *this.cap { + return Poll::Ready(Some(Ok(items))); + } + } + + // Return the already collected items and the error. + Poll::Ready(Some(Err(e))) => { + return Poll::Ready(Some(Err(TryReadyChunksError(items, e)))); + } + + // Since the underlying stream ran out of values, return what we + // have buffered, if we have anything. + Poll::Ready(None) => { + let last = if items.is_empty() { None } else { Some(Ok(items)) }; + return Poll::Ready(last); + } + } + } + } + + fn size_hint(&self) -> (usize, Option) { + let (lower, upper) = self.stream.size_hint(); + let lower = lower / self.cap; + (lower, upper) + } +} + +impl FusedStream for TryReadyChunks { + fn is_terminated(&self) -> bool { + self.stream.is_terminated() + } +} + +// Forwarding impl of Sink from the underlying stream +#[cfg(feature = "sink")] +impl Sink for TryReadyChunks +where + S: TryStream + Sink, +{ + type Error = >::Error; + + delegate_sink!(stream, Item); +} + +/// Error indicating, that while chunk was collected inner stream produced an error. +/// +/// Contains all items that were collected before an error occurred, and the stream error itself. +#[derive(PartialEq, Eq)] +pub struct TryReadyChunksError(pub Vec, pub E); + +impl fmt::Debug for TryReadyChunksError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.1.fmt(f) + } +} + +impl fmt::Display for TryReadyChunksError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.1.fmt(f) + } +} + +#[cfg(feature = "std")] +impl std::error::Error for TryReadyChunksError {}