Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TryStreamExt::try_ready_chunks as failable version of StreamExt::ready_chunks #2757

Merged
merged 1 commit into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion futures-util/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ pub use self::try_stream::{TryBufferUnordered, TryBuffered, TryFlattenUnordered}
pub use self::try_stream::TryForward;

#[cfg(feature = "alloc")]
pub use self::try_stream::{TryChunks, TryChunksError};
pub use self::try_stream::{TryChunks, TryChunksError, TryReadyChunks, TryReadyChunksError};

// Primitive streams

Expand Down
55 changes: 55 additions & 0 deletions futures-util/src/stream/try_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ mod try_chunks;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_chunks::{TryChunks, TryChunksError};

#[cfg(feature = "alloc")]
mod try_ready_chunks;
#[cfg(feature = "alloc")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_ready_chunks::{TryReadyChunks, TryReadyChunksError};

mod try_unfold;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_unfold::{try_unfold, TryUnfold};
Expand Down Expand Up @@ -557,6 +563,55 @@ pub trait TryStreamExt: TryStream {
)
}

/// An adaptor for chunking up successful, ready items of the stream inside a vector.
///
/// This combinator will attempt to pull successful items from this stream and buffer
/// them into a local vector. At most `capacity` items will get buffered
/// before they're yielded from the returned stream. If the underlying stream
/// returns `Poll::Pending`, and the collected chunk is not empty, it will
/// be immidiatly returned.
///
/// Note that the vectors returned from this iterator may not always have
/// `capacity` elements. If the underlying stream ended and only a partial
/// vector was created, it'll be returned. Additionally if an error happens
/// from the underlying stream then the currently buffered items will be
/// yielded.
///
/// This method is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
///
/// This function is similar to
/// [`StreamExt::ready_chunks`](crate::stream::StreamExt::ready_chunks) but exits
/// early if an error occurs.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::stream::{self, TryReadyChunksError, TryStreamExt};
///
/// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)]);
/// let mut stream = stream.try_ready_chunks(2);
///
/// assert_eq!(stream.try_next().await, Ok(Some(vec![1, 2])));
/// assert_eq!(stream.try_next().await, Err(TryReadyChunksError(vec![3], 4)));
/// assert_eq!(stream.try_next().await, Ok(Some(vec![5, 6])));
/// # })
/// ```
///
/// # Panics
///
/// This method will panic if `capacity` is zero.
#[cfg(feature = "alloc")]
fn try_ready_chunks(self, capacity: usize) -> TryReadyChunks<Self>
where
Self: Sized,
{
assert_stream::<Result<Vec<Self::Ok>, TryReadyChunksError<Self::Ok, Self::Error>>, _>(
TryReadyChunks::new(self, capacity),
)
}

/// Attempt to filter the values produced by this stream according to the
/// provided asynchronous closure.
///
Expand Down
126 changes: 126 additions & 0 deletions futures-util/src/stream/try_stream/try_ready_chunks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
use crate::stream::{Fuse, IntoStream, StreamExt};

use alloc::vec::Vec;
use core::fmt;
use core::pin::Pin;
use futures_core::stream::{FusedStream, Stream, TryStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
use pin_project_lite::pin_project;

pin_project! {
/// Stream for the [`try_ready_chunks`](super::TryStreamExt::try_ready_chunks) method.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct TryReadyChunks<St: TryStream> {
#[pin]
stream: Fuse<IntoStream<St>>,
cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
}
}

impl<St: TryStream> TryReadyChunks<St> {
pub(super) fn new(stream: St, capacity: usize) -> Self {
assert!(capacity > 0);

Self { stream: IntoStream::new(stream).fuse(), cap: capacity }
}

delegate_access_inner!(stream, St, (. .));
}

type TryReadyChunksStreamError<St> =
TryReadyChunksError<<St as TryStream>::Ok, <St as TryStream>::Error>;

impl<St: TryStream> Stream for TryReadyChunks<St> {
type Item = Result<Vec<St::Ok>, TryReadyChunksStreamError<St>>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut().project();

let mut items: Vec<St::Ok> = Vec::new();

loop {
match this.stream.as_mut().poll_next(cx) {
// Flush all the collected data if the underlying stream doesn't
// contain more ready values
Poll::Pending => {
return if items.is_empty() {
Poll::Pending
} else {
Poll::Ready(Some(Ok(items)))
}
}

// Push the ready item into the buffer and check whether it is full.
// If so, return the buffer.
Poll::Ready(Some(Ok(item))) => {
if items.is_empty() {
items.reserve_exact(*this.cap);
}
items.push(item);
if items.len() >= *this.cap {
return Poll::Ready(Some(Ok(items)));
}
}

// Return the already collected items and the error.
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(Err(TryReadyChunksError(items, e))));
}

// Since the underlying stream ran out of values, return what we
// have buffered, if we have anything.
Poll::Ready(None) => {
let last = if items.is_empty() { None } else { Some(Ok(items)) };
return Poll::Ready(last);
}
}
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
let (lower, upper) = self.stream.size_hint();
let lower = lower / self.cap;
(lower, upper)
}
}

impl<St: TryStream + FusedStream> FusedStream for TryReadyChunks<St> {
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
}

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Item> Sink<Item> for TryReadyChunks<S>
where
S: TryStream + Sink<Item>,
{
type Error = <S as Sink<Item>>::Error;

delegate_sink!(stream, Item);
}

/// Error indicating, that while chunk was collected inner stream produced an error.
///
/// Contains all items that were collected before an error occurred, and the stream error itself.
#[derive(PartialEq, Eq)]
pub struct TryReadyChunksError<T, E>(pub Vec<T>, pub E);

impl<T, E: fmt::Debug> fmt::Debug for TryReadyChunksError<T, E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.1.fmt(f)
}
}

impl<T, E: fmt::Display> fmt::Display for TryReadyChunksError<T, E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.1.fmt(f)
}
}

#[cfg(feature = "std")]
impl<T, E: fmt::Debug + fmt::Display> std::error::Error for TryReadyChunksError<T, E> {}