Skip to content

Commit

Permalink
Add TryStreamExt::try_forward, remove TryStream bound from `Strea…
Browse files Browse the repository at this point in the history
…mExt::forward` (#2469)

Co-authored-by: mukund <yaymukund@gmail.com>
  • Loading branch information
ibraheemdev and mukund authored Sep 2, 2021
1 parent 37dfb05 commit c0e9368
Show file tree
Hide file tree
Showing 10 changed files with 165 additions and 34 deletions.
2 changes: 1 addition & 1 deletion futures-util/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ pub trait AsyncWriteExt: AsyncWrite {
/// use futures::io::AsyncWriteExt;
/// use futures::stream::{self, StreamExt};
///
/// let stream = stream::iter(vec![Ok([1, 2, 3]), Ok([4, 5, 6])]);
/// let stream = stream::iter(vec![[1, 2, 3], [4, 5, 6]]);
///
/// let mut writer = vec![];
///
Expand Down
4 changes: 4 additions & 0 deletions futures-util/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ pub use self::try_stream::IntoAsyncRead;
#[cfg(feature = "alloc")]
pub use self::try_stream::{TryBufferUnordered, TryBuffered};

#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
pub use self::try_stream::TryForward;

#[cfg(feature = "alloc")]
pub use self::try_stream::{TryChunks, TryChunksError};

Expand Down
6 changes: 3 additions & 3 deletions futures-util/src/stream/stream/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl<St, Si, Item> Forward<St, Si, Item> {
impl<St, Si, Item, E> FusedFuture for Forward<St, Si, Item>
where
Si: Sink<Item, Error = E>,
St: Stream<Item = Result<Item, E>>,
St: Stream<Item = Item>,
{
fn is_terminated(&self) -> bool {
self.sink.is_none()
Expand All @@ -40,7 +40,7 @@ where
impl<St, Si, Item, E> Future for Forward<St, Si, Item>
where
Si: Sink<Item, Error = E>,
St: Stream<Item = Result<Item, E>>,
St: Stream<Item = Item>,
{
type Output = Result<(), E>;

Expand All @@ -56,7 +56,7 @@ where
si.as_mut().start_send(buffered_item.take().unwrap())?;
}

match stream.as_mut().poll_next(cx)? {
match stream.as_mut().poll_next(cx) {
Poll::Ready(Some(item)) => {
*buffered_item = Some(item);
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/stream/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pin_project! {
}

impl<St> Fuse<St> {
pub(super) fn new(stream: St) -> Self {
pub(crate) fn new(stream: St) -> Self {
Self { stream, done: false }
}

Expand Down
18 changes: 6 additions & 12 deletions futures-util/src/stream/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ use alloc::boxed::Box;
#[cfg(feature = "alloc")]
use alloc::vec::Vec;
use core::pin::Pin;
#[cfg(feature = "sink")]
use futures_core::stream::TryStream;
#[cfg(feature = "alloc")]
use futures_core::stream::{BoxStream, LocalBoxStream};
use futures_core::{
Expand Down Expand Up @@ -86,9 +84,9 @@ delegate_all!(
/// Future for the [`forward`](super::StreamExt::forward) method.
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
Forward<St, Si>(
forward::Forward<St, Si, St::Ok>
forward::Forward<St, Si, St::Item>
): Debug + Future + FusedFuture + New[|x: St, y: Si| forward::Forward::new(x, y)]
where St: TryStream
where St: Stream
);

mod for_each;
Expand Down Expand Up @@ -1551,19 +1549,15 @@ pub trait StreamExt: Stream {
/// the sink is closed. Note that neither the original stream nor provided
/// sink will be output by this future. Pass the sink by `Pin<&mut S>`
/// (for example, via `forward(&mut sink)` inside an `async` fn/block) in
/// order to preserve access to the `Sink`. If the stream produces an error,
/// that error will be returned by this future without flushing/closing the sink.
/// order to preserve access to the `Sink`.
#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
fn forward<S>(self, sink: S) -> Forward<Self, S>
where
S: Sink<Self::Ok, Error = Self::Error>,
Self: TryStream + Sized,
// Self: TryStream + Sized + Stream<Item = Result<<Self as TryStream>::Ok, <Self as TryStream>::Error>>,
S: Sink<Self::Item>,
Self: Sized,
{
// TODO: type mismatch resolving `<Self as futures_core::Stream>::Item == std::result::Result<<Self as futures_core::TryStream>::Ok, <Self as futures_core::TryStream>::Error>`
// assert_future::<Result<(), Self::Error>, _>(Forward::new(self, sink))
Forward::new(self, sink)
assert_future::<Result<(), S::Error>, _>(Forward::new(self, sink))
}

/// Splits this `Stream + Sink` object into separate `Sink` and `Stream`
Expand Down
39 changes: 37 additions & 2 deletions futures-util/src/stream/try_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ use crate::fns::{
IntoFn, MapErrFn, MapOkFn,
};
use crate::future::assert_future;
use crate::stream::assert_stream;
use crate::stream::{Inspect, Map};
use crate::stream::{assert_stream, Inspect, Map};
#[cfg(feature = "alloc")]
use alloc::vec::Vec;
use core::pin::Pin;
Expand All @@ -20,6 +19,8 @@ use futures_core::{
stream::TryStream,
task::{Context, Poll},
};
#[cfg(feature = "sink")]
use futures_sink::Sink;

mod and_then;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
Expand Down Expand Up @@ -76,6 +77,19 @@ mod try_filter;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_filter::TryFilter;

#[cfg(feature = "sink")]
mod try_forward;

#[cfg(feature = "sink")]
delegate_all!(
/// Future for the [`try_forward`](super::TryStreamExt::try_forward) method.
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
TryForward<St, Si>(
try_forward::TryForward<St, Si, St::Ok>
): Debug + Future + FusedFuture + New[|x: St, y: Si| try_forward::TryForward::new(x, y)]
where St: TryStream
);

mod try_filter_map;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_filter_map::TryFilterMap;
Expand Down Expand Up @@ -287,6 +301,27 @@ pub trait TryStreamExt: TryStream {
assert_stream::<Result<Self::Ok, Fut::Error>, _>(OrElse::new(self, f))
}

/// A future that completes after the given stream has been fully processed
/// into the sink and the sink has been flushed and closed.
///
/// This future will drive the stream to keep producing items until it is
/// exhausted, sending each item to the sink. It will complete once the
/// stream is exhausted, the sink has received and flushed all items, and
/// the sink is closed. Note that neither the original stream nor provided
/// sink will be output by this future. Pass the sink by `Pin<&mut S>`
/// (for example, via `try_forward(&mut sink)` inside an `async` fn/block) in
/// order to preserve access to the `Sink`. If the stream produces an error,
/// that error will be returned by this future without flushing/closing the sink.
#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
fn try_forward<S>(self, sink: S) -> TryForward<Self, S>
where
S: Sink<Self::Ok, Error = Self::Error>,
Self: Sized,
{
assert_future::<Result<(), Self::Error>, _>(TryForward::new(self, sink))
}

/// Do something with the success value of this stream, afterwards passing
/// it on.
///
Expand Down
74 changes: 74 additions & 0 deletions futures-util/src/stream/try_stream/try_forward.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use crate::stream::{Fuse, IntoStream, Stream, TryStream};
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::ready;
use futures_core::task::{Context, Poll};
use futures_sink::Sink;
use pin_project_lite::pin_project;

pin_project! {
/// Future for the [`try_forward`](super::TryStreamExt::try_forward) method.
#[project = TryForwardProj]
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryForward<St, Si, Item> {
#[pin]
sink: Option<Si>,
#[pin]
stream: Fuse<IntoStream<St>>,
buffered_item: Option<Item>,
}
}

impl<St, Si, Item> TryForward<St, Si, Item> {
pub(crate) fn new(stream: St, sink: Si) -> Self {
Self { sink: Some(sink), stream: Fuse::new(IntoStream::new(stream)), buffered_item: None }
}
}

impl<St, Si, Item, E> FusedFuture for TryForward<St, Si, Item>
where
Si: Sink<Item, Error = E>,
St: TryStream<Ok = Item, Error = E>,
{
fn is_terminated(&self) -> bool {
self.sink.is_none()
}
}

impl<St, Si, Item, E> Future for TryForward<St, Si, Item>
where
Si: Sink<Item, Error = E>,
St: TryStream<Ok = Item, Error = E>,
{
type Output = Result<(), E>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let TryForwardProj { mut sink, mut stream, buffered_item } = self.project();
let mut si = sink.as_mut().as_pin_mut().expect("polled `TryForward` after completion");

loop {
// If we've got an item buffered already, we need to write it to the
// sink before we can do anything else
if buffered_item.is_some() {
ready!(si.as_mut().poll_ready(cx))?;
si.as_mut().start_send(buffered_item.take().unwrap())?;
}

match stream.as_mut().poll_next(cx)? {
Poll::Ready(Some(item)) => {
*buffered_item = Some(item);
}
Poll::Ready(None) => {
ready!(si.poll_close(cx))?;
sink.set(None);
return Poll::Ready(Ok(()));
}
Poll::Pending => {
ready!(si.poll_flush(cx))?;
return Poll::Pending;
}
}
}
}
}
34 changes: 23 additions & 11 deletions futures/tests/auto_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1281,17 +1281,29 @@ pub mod stream {
assert_impl!(ForEachConcurrent<(), PhantomPinned, PhantomPinned>: Unpin);
assert_not_impl!(ForEachConcurrent<PhantomPinned, (), ()>: Unpin);

assert_impl!(Forward<SendTryStream<()>, ()>: Send);
assert_not_impl!(Forward<SendTryStream, ()>: Send);
assert_not_impl!(Forward<SendTryStream<()>, *const ()>: Send);
assert_not_impl!(Forward<LocalTryStream, ()>: Send);
assert_impl!(Forward<SyncTryStream<()>, ()>: Sync);
assert_not_impl!(Forward<SyncTryStream, ()>: Sync);
assert_not_impl!(Forward<SyncTryStream<()>, *const ()>: Sync);
assert_not_impl!(Forward<LocalTryStream, ()>: Sync);
assert_impl!(Forward<UnpinTryStream, ()>: Unpin);
assert_not_impl!(Forward<UnpinTryStream, PhantomPinned>: Unpin);
assert_not_impl!(Forward<PinnedTryStream, ()>: Unpin);
assert_impl!(Forward<SendStream<()>, ()>: Send);
assert_not_impl!(Forward<SendStream, ()>: Send);
assert_not_impl!(Forward<SendStream<()>, *const ()>: Send);
assert_not_impl!(Forward<LocalStream, ()>: Send);
assert_impl!(Forward<SyncStream<()>, ()>: Sync);
assert_not_impl!(Forward<SyncStream, ()>: Sync);
assert_not_impl!(Forward<SyncStream<()>, *const ()>: Sync);
assert_not_impl!(Forward<LocalStream, ()>: Sync);
assert_impl!(Forward<UnpinStream, ()>: Unpin);
assert_not_impl!(Forward<UnpinStream, PhantomPinned>: Unpin);
assert_not_impl!(Forward<PinnedStream, ()>: Unpin);

assert_impl!(TryForward<SendTryStream<()>, ()>: Send);
assert_not_impl!(TryForward<SendTryStream, ()>: Send);
assert_not_impl!(TryForward<SendTryStream<()>, *const ()>: Send);
assert_not_impl!(TryForward<LocalTryStream, ()>: Send);
assert_impl!(TryForward<SyncTryStream<()>, ()>: Sync);
assert_not_impl!(TryForward<SyncTryStream, ()>: Sync);
assert_not_impl!(TryForward<SyncTryStream<()>, *const ()>: Sync);
assert_not_impl!(TryForward<LocalTryStream, ()>: Sync);
assert_impl!(TryForward<UnpinTryStream, ()>: Unpin);
assert_not_impl!(TryForward<UnpinTryStream, PhantomPinned>: Unpin);
assert_not_impl!(TryForward<PinnedTryStream, ()>: Unpin);

assert_impl!(Fuse<()>: Send);
assert_not_impl!(Fuse<*const ()>: Send);
Expand Down
2 changes: 1 addition & 1 deletion futures/tests/sink_fanout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ fn it_works() {
let collect_fut2 = rx2.collect::<Vec<_>>();
let (_, vec1, vec2) = block_on(async move { join!(fwd, collect_fut1, collect_fut2) });

let expected = (0..10).collect::<Vec<_>>();
let expected = (0..10).map(Ok::<_, ()>).collect::<Vec<_>>();

assert_eq!(vec1, expected);
assert_eq!(vec2, expected);
Expand Down
18 changes: 15 additions & 3 deletions futures/tests_disabled/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,14 +305,26 @@ fn chunks_panic_on_cap_zero() {
#[test]
fn forward() {
let v = Vec::new();
let v = block_on(iter_ok::<_, Never>(vec![0, 1]).forward(v)).unwrap().1;
let v = block_on(iter(vec![0, 1]).forward(v)).unwrap().1;
assert_eq!(v, vec![0, 1]);

let v = block_on(iter_ok::<_, Never>(vec![2, 3]).forward(v)).unwrap().1;
let v = block_on(iter(vec![2, 3]).forward(v)).unwrap().1;
assert_eq!(v, vec![0, 1, 2, 3]);

assert_done(move || iter(vec![4, 5]).forward(v).map(|(_, s)| s), Ok(vec![0, 1, 2, 3, 4, 5]));
}

#[test]
fn try_forward() {
let v = Vec::new();
let v = block_on(iter_ok::<_, Never>(vec![0, 1]).try_forward(v)).unwrap().1;
assert_eq!(v, vec![0, 1]);

let v = block_on(iter_ok::<_, Never>(vec![2, 3]).try_forward(v)).unwrap().1;
assert_eq!(v, vec![0, 1, 2, 3]);

assert_done(
move || iter_ok::<_, Never>(vec![4, 5]).forward(v).map(|(_, s)| s),
move || iter_ok::<_, Never>(vec![4, 5]).try_forward(v).map(|(_, s)| s),
Ok(vec![0, 1, 2, 3, 4, 5]),
);
}
Expand Down

0 comments on commit c0e9368

Please sign in to comment.