From 212107c539be94f0c090e75e940c087abbca2e7c Mon Sep 17 00:00:00 2001 From: olegnn Date: Sun, 6 Feb 2022 11:52:14 +0300 Subject: [PATCH 01/10] `FlattenUnordered`: improve wakers behavior --- .../src/stream/stream/flatten_unordered.rs | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/futures-util/src/stream/stream/flatten_unordered.rs b/futures-util/src/stream/stream/flatten_unordered.rs index 6361565f2b..07f971c55a 100644 --- a/futures-util/src/stream/stream/flatten_unordered.rs +++ b/futures-util/src/stream/stream/flatten_unordered.rs @@ -92,11 +92,17 @@ impl SharedPollState { let value = self .state .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { - let next_value = value | to_poll; - + // Waking process for this waker already started + if value & waking != NONE { + return None; + } + let mut next_value = value | to_poll; + // Only start the waking process if we're not in the polling phase and the stream isn't woken already if value & (WOKEN | POLLING) == NONE { - Some(next_value | waking) - } else if next_value != value { + next_value |= waking; + } + + if next_value != value { Some(next_value) } else { None @@ -141,11 +147,13 @@ impl SharedPollState { fn stop_waking(&self, waking: u8) -> u8 { self.state .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { - let next_value = value & !waking; - + let mut next_value = value & !waking; + // Waker will be called only if the current waking state is the same as the specified waker state if value & WAKING_ALL == waking { - Some(next_value | WOKEN) - } else if next_value != value { + next_value |= WOKEN; + } + + if next_value != value { Some(next_value) } else { None From 4441dbd3fb8c0d1ec059ca15ec8971d04198163b Mon Sep 17 00:00:00 2001 From: olegnn Date: Sat, 12 Mar 2022 22:10:40 +0400 Subject: [PATCH 02/10] `FlattenUnordered`: remove `Unpin` requirement, improve docs, simplify logic --- futures-util/src/stream/mod.rs | 5 +- .../src/stream/stream/flatten_unordered.rs | 129 ++++++++---------- futures-util/src/stream/stream/mod.rs | 21 ++- 3 files changed, 74 insertions(+), 81 deletions(-) diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index ad7730f9c5..00f692f8fa 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -39,7 +39,10 @@ pub use self::stream::Forward; #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] -pub use self::stream::{BufferUnordered, Buffered, ForEachConcurrent, TryForEachConcurrent}; +pub use self::stream::{ + BufferUnordered, Buffered, FlatMapUnordered, FlattenUnordered, ForEachConcurrent, + TryForEachConcurrent, +}; #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "sink")] diff --git a/futures-util/src/stream/stream/flatten_unordered.rs b/futures-util/src/stream/stream/flatten_unordered.rs index 07f971c55a..ab5995c614 100644 --- a/futures-util/src/stream/stream/flatten_unordered.rs +++ b/futures-util/src/stream/stream/flatten_unordered.rs @@ -22,8 +22,7 @@ use futures_task::{waker, ArcWake}; use crate::stream::FuturesUnordered; -/// There is nothing to poll and stream isn't being -/// polled or waking at the moment. +/// There is nothing to poll and stream isn't being polled/waking/woken at the moment. const NONE: u8 = 0; /// Inner streams need to be polled. @@ -32,26 +31,19 @@ const NEED_TO_POLL_INNER_STREAMS: u8 = 1; /// The base stream needs to be polled. const NEED_TO_POLL_STREAM: u8 = 0b10; -/// It needs to poll base stream and inner streams. +/// Both base stream and inner streams need to be polled. const NEED_TO_POLL_ALL: u8 = NEED_TO_POLL_INNER_STREAMS | NEED_TO_POLL_STREAM; /// The current stream is being polled at the moment. const POLLING: u8 = 0b100; -/// Inner streams are being woken at the moment. -const WAKING_INNER_STREAMS: u8 = 0b1000; - -/// The base stream is being woken at the moment. -const WAKING_STREAM: u8 = 0b10000; - -/// The base stream and inner streams are being woken at the moment. -const WAKING_ALL: u8 = WAKING_STREAM | WAKING_INNER_STREAMS; +/// Stream is being woken at the moment. +const WAKING: u8 = 0b1000; /// The stream was waked and will be polled. -const WOKEN: u8 = 0b100000; +const WOKEN: u8 = 0b10000; -/// Determines what needs to be polled, and is stream being polled at the -/// moment or not. +/// Internal polling state of the stream. #[derive(Clone, Debug)] struct SharedPollState { state: Arc, @@ -71,7 +63,7 @@ impl SharedPollState { let value = self .state .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { - if value & WAKING_ALL == NONE { + if value & WAKING == NONE { Some(POLLING) } else { None @@ -83,23 +75,20 @@ impl SharedPollState { Some((value, bomb)) } - /// Starts the waking process and performs bitwise or with the given value. + /// Attempts to start the waking process and performs bitwise or with the given value. + /// + /// If some waker is already in progress or stream is already woken/being polled, waking process won't start, however + /// state will be disjuncted with the given value. fn start_waking( &self, to_poll: u8, - waking: u8, ) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> { let value = self .state .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { - // Waking process for this waker already started - if value & waking != NONE { - return None; - } let mut next_value = value | to_poll; - // Only start the waking process if we're not in the polling phase and the stream isn't woken already if value & (WOKEN | POLLING) == NONE { - next_value |= waking; + next_value |= WAKING; } if next_value != value { @@ -110,8 +99,9 @@ impl SharedPollState { }) .ok()?; - if value & (WOKEN | POLLING) == NONE { - let bomb = PollStateBomb::new(self, move |state| state.stop_waking(waking)); + // Only start the waking process if we're not in the polling phase and the stream isn't woken already + if value & (WOKEN | POLLING | WAKING) == NONE { + let bomb = PollStateBomb::new(self, SharedPollState::stop_waking); Some((value, bomb)) } else { @@ -123,7 +113,7 @@ impl SharedPollState { /// - `!POLLING` allowing to use wakers /// - `WOKEN` if the state was changed during `POLLING` phase as waker will be called, /// or `will_be_woken` flag supplied - /// - `!WAKING_ALL` as + /// - `!WAKING` as /// * Wakers called during the `POLLING` phase won't propagate their calls /// * `POLLING` phase can't start if some of the wakers are active /// So no wrapped waker can touch the inner waker's cell, it's safe to poll again. @@ -138,20 +128,16 @@ impl SharedPollState { } next_value |= value; - Some(next_value & !POLLING & !WAKING_ALL) + Some(next_value & !POLLING & !WAKING) }) .unwrap() } /// Toggles state to non-waking, allowing to start polling. - fn stop_waking(&self, waking: u8) -> u8 { + fn stop_waking(&self) -> u8 { self.state .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { - let mut next_value = value & !waking; - // Waker will be called only if the current waking state is the same as the specified waker state - if value & WAKING_ALL == waking { - next_value |= WOKEN; - } + let next_value = value & !WAKING; if next_value != value { Some(next_value) @@ -201,16 +187,16 @@ impl u8> Drop for PollStateBomb<'_, F> { /// Will update state with the provided value on `wake_by_ref` call /// and then, if there is a need, call `inner_waker`. -struct InnerWaker { +struct WrappedWaker { inner_waker: UnsafeCell>, poll_state: SharedPollState, need_to_poll: u8, } -unsafe impl Send for InnerWaker {} -unsafe impl Sync for InnerWaker {} +unsafe impl Send for WrappedWaker {} +unsafe impl Sync for WrappedWaker {} -impl InnerWaker { +impl WrappedWaker { /// Replaces given waker's inner_waker for polling stream/futures which will /// update poll state on `wake_by_ref` call. Use only if you need several /// contexts. @@ -218,7 +204,7 @@ impl InnerWaker { /// ## Safety /// /// This function will modify waker's `inner_waker` via `UnsafeCell`, so - /// it should be used only during `POLLING` phase. + /// it should be used only during `POLLING` phase by one thread at the time. unsafe fn replace_waker(self_arc: &mut Arc, cx: &Context<'_>) -> Waker { *self_arc.inner_waker.get() = cx.waker().clone().into(); waker(self_arc.clone()) @@ -227,16 +213,11 @@ impl InnerWaker { /// Attempts to start the waking process for the waker with the given value. /// If succeeded, then the stream isn't yet woken and not being polled at the moment. fn start_waking(&self) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> { - self.poll_state.start_waking(self.need_to_poll, self.waking_state()) - } - - /// Returns the corresponding waking state toggled by this waker. - fn waking_state(&self) -> u8 { - self.need_to_poll << 3 + self.poll_state.start_waking(self.need_to_poll) } } -impl ArcWake for InnerWaker { +impl ArcWake for WrappedWaker { fn wake_by_ref(self_arc: &Arc) { if let Some((_, state_bomb)) = self_arc.start_waking() { // Safety: now state is not `POLLING` @@ -246,12 +227,8 @@ impl ArcWake for InnerWaker { // Stop waking to allow polling stream let poll_state_value = state_bomb.fire().unwrap(); - // Here we want to call waker only if stream isn't woken yet and - // also to optimize the case when two wakers are called at the same time. - // - // In this case the best strategy will be to propagate only the latest waker's awake, - // and then poll both entities in a single `poll_next` call - if poll_state_value & (WOKEN | WAKING_ALL) == self_arc.waking_state() { + // We want to call waker only if the stream isn't woken yet + if poll_state_value & (WOKEN | WAKING) == WAKING { // Wake up inner waker inner_waker.wake(); } @@ -308,14 +285,14 @@ pin_project! { #[must_use = "streams do nothing unless polled"] pub struct FlattenUnordered where St: Stream { #[pin] - inner_streams: FuturesUnordered>, + inner_streams: FuturesUnordered>>>, #[pin] stream: St, poll_state: SharedPollState, limit: Option, is_stream_done: bool, - inner_streams_waker: Arc, - stream_waker: Arc, + inner_streams_waker: Arc, + stream_waker: Arc, } } @@ -338,7 +315,7 @@ where impl FlattenUnordered where St: Stream, - St::Item: Stream + Unpin, + St::Item: Stream, { pub(super) fn new(stream: St, limit: Option) -> FlattenUnordered { let poll_state = SharedPollState::new(NEED_TO_POLL_STREAM); @@ -348,12 +325,12 @@ where stream, is_stream_done: false, limit: limit.and_then(NonZeroUsize::new), - inner_streams_waker: Arc::new(InnerWaker { + inner_streams_waker: Arc::new(WrappedWaker { inner_waker: UnsafeCell::new(None), poll_state: poll_state.clone(), need_to_poll: NEED_TO_POLL_INNER_STREAMS, }), - stream_waker: Arc::new(InnerWaker { + stream_waker: Arc::new(WrappedWaker { inner_waker: UnsafeCell::new(None), poll_state: poll_state.clone(), need_to_poll: NEED_TO_POLL_STREAM, @@ -369,7 +346,7 @@ impl FlattenUnorderedProj<'_, St> where St: Stream, { - /// Checks if current `inner_streams` size is less than optional limit. + /// Checks if current `inner_streams` size is greater than optional limit. fn is_exceeded_limit(&self) -> bool { self.limit.map_or(false, |limit| self.inner_streams.len() >= limit.get()) } @@ -378,7 +355,7 @@ where impl FusedStream for FlattenUnordered where St: FusedStream, - St::Item: FusedStream + Unpin, + St::Item: Stream, { fn is_terminated(&self) -> bool { self.stream.is_terminated() && self.inner_streams.is_empty() @@ -388,7 +365,7 @@ where impl Stream for FlattenUnordered where St: Stream, - St::Item: Stream + Unpin, + St::Item: Stream, { type Item = ::Item; @@ -407,8 +384,7 @@ where }; if poll_state_value & NEED_TO_POLL_STREAM != NONE { - // Safety: now state is `POLLING`. - let stream_waker = unsafe { InnerWaker::replace_waker(this.stream_waker, cx) }; + let mut stream_waker = None; // Here we need to poll the base stream. // @@ -424,15 +400,24 @@ where break; } else { - match this.stream.as_mut().poll_next(&mut Context::from_waker(&stream_waker)) { + // Initialize base stream waker if it's not yet initialized + if stream_waker.is_none() { + // Safety: now state is `POLLING`. + stream_waker + .replace(unsafe { WrappedWaker::replace_waker(this.stream_waker, cx) }); + } + let mut cx = Context::from_waker(stream_waker.as_ref().unwrap()); + + match this.stream.as_mut().poll_next(&mut cx) { Poll::Ready(Some(inner_stream)) => { + let next_item_fut = PollStreamFut::new(Box::pin(inner_stream)); // Add new stream to the inner streams bucket - this.inner_streams.as_mut().push(PollStreamFut::new(inner_stream)); + this.inner_streams.as_mut().push(next_item_fut); // Inner streams must be polled afterward poll_state_value |= NEED_TO_POLL_INNER_STREAMS; } Poll::Ready(None) => { - // Mark the stream as done + // Mark the base stream as done *this.is_stream_done = true; } Poll::Pending => { @@ -446,13 +431,10 @@ where if poll_state_value & NEED_TO_POLL_INNER_STREAMS != NONE { // Safety: now state is `POLLING`. let inner_streams_waker = - unsafe { InnerWaker::replace_waker(this.inner_streams_waker, cx) }; + unsafe { WrappedWaker::replace_waker(this.inner_streams_waker, cx) }; + let mut cx = Context::from_waker(&inner_streams_waker); - match this - .inner_streams - .as_mut() - .poll_next(&mut Context::from_waker(&inner_streams_waker)) - { + match this.inner_streams.as_mut().poll_next(&mut cx) { Poll::Ready(Some(Some((item, next_item_fut)))) => { // Push next inner stream item future to the list of inner streams futures this.inner_streams.as_mut().push(next_item_fut); @@ -472,15 +454,16 @@ where // We didn't have any `poll_next` panic, so it's time to deactivate the bomb state_bomb.deactivate(); + // Call the waker at the end of polling if let mut force_wake = // we need to poll the stream and didn't reach the limit yet need_to_poll_next & NEED_TO_POLL_STREAM != NONE && !this.is_exceeded_limit() - // or we need to poll inner streams again + // or we need to poll the inner streams again || need_to_poll_next & NEED_TO_POLL_INNER_STREAMS != NONE; // Stop polling and swap the latest state poll_state_value = this.poll_state.stop_polling(need_to_poll_next, force_wake); - // If state was changed during `POLLING` phase, need to manually call a waker + // If state was changed during `POLLING` phase, we also need to manually call a waker force_wake |= poll_state_value & NEED_TO_POLL_ALL != NONE; let is_done = *this.is_stream_done && this.inner_streams.is_empty(); diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index 46a425b363..26970bec82 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -219,7 +219,7 @@ delegate_all!( FlatMapUnordered( FlattenUnordered> ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, limit: Option, f: F| FlattenUnordered::new(Map::new(x, f), limit)] - where St: Stream, U: Stream, U: Unpin, F: FnMut(St::Item) -> U + where St: Stream, U: Stream, F: FnMut(St::Item) -> U ); #[cfg(not(futures_no_atomic_cas))] @@ -788,7 +788,14 @@ pub trait StreamExt: Stream { } /// Flattens a stream of streams into just one continuous stream. Polls - /// inner streams concurrently. + /// inner streams produced by the base stream concurrently. + /// + /// The only argument is an optional limit on the number of concurrently + /// polled streams. If this limit is not `None`, no more than `limit` streams + /// will be polled at the same time. The `limit` argument is of type + /// `Into>`, and so can be provided as either `None`, + /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as + /// no limit at all, and will have the same result as passing in `None`. /// /// # Examples /// @@ -825,10 +832,10 @@ pub trait StreamExt: Stream { #[cfg(feature = "alloc")] fn flatten_unordered(self, limit: impl Into>) -> FlattenUnordered where - Self::Item: Stream + Unpin, + Self::Item: Stream, Self: Sized, { - FlattenUnordered::new(self, limit.into()) + assert_stream::<::Item, _>(FlattenUnordered::new(self, limit.into())) } /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s. @@ -877,7 +884,7 @@ pub trait StreamExt: Stream { /// /// The first argument is an optional limit on the number of concurrently /// polled streams. If this limit is not `None`, no more than `limit` streams - /// will be polled concurrently. The `limit` argument is of type + /// will be polled at the same time. The `limit` argument is of type /// `Into>`, and so can be provided as either `None`, /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as /// no limit at all, and will have the same result as passing in `None`. @@ -911,11 +918,11 @@ pub trait StreamExt: Stream { f: F, ) -> FlatMapUnordered where - U: Stream + Unpin, + U: Stream, F: FnMut(Self::Item) -> U, Self: Sized, { - FlatMapUnordered::new(self, limit.into(), f) + assert_stream::(FlatMapUnordered::new(self, limit.into(), f)) } /// Combinator similar to [`StreamExt::fold`] that holds internal state From 47f33d9896081f822e240c74c033754ef7045170 Mon Sep 17 00:00:00 2001 From: olegnn Date: Sat, 12 Mar 2022 22:17:41 +0400 Subject: [PATCH 03/10] Basic `TryFlattenUnordered` --- futures-util/src/future/try_select.rs | 12 +- futures-util/src/stream/mod.rs | 4 +- futures-util/src/stream/try_stream/mod.rs | 62 +++++++++ .../src/stream/try_stream/try_chunks.rs | 4 +- .../try_stream/try_flatten_unordered.rs | 123 ++++++++++++++++++ futures/tests/stream_try_stream.rs | 25 ++++ 6 files changed, 222 insertions(+), 8 deletions(-) create mode 100644 futures-util/src/stream/try_stream/try_flatten_unordered.rs diff --git a/futures-util/src/future/try_select.rs b/futures-util/src/future/try_select.rs index 15a6216fde..915a8156f5 100644 --- a/futures-util/src/future/try_select.rs +++ b/futures-util/src/future/try_select.rs @@ -52,18 +52,20 @@ where A: TryFuture + Unpin, B: TryFuture + Unpin, { - super::assert_future::< - Result, Either<(A::Error, B), (B::Error, A)>>, - _, - >(TrySelect { inner: Some((future1, future2)) }) + super::assert_future::, EitherErr>, _>(TrySelect { + inner: Some((future1, future2)), + }) } +type EitherOk = Either<(::Ok, B), (::Ok, A)>; +type EitherErr = Either<(::Error, B), (::Error, A)>; + impl Future for TrySelect where A: TryFuture, B: TryFuture, { - type Output = Result, Either<(A::Error, B), (B::Error, A)>>; + type Output = Result, EitherErr>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice"); diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 00f692f8fa..2876a8446c 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -53,8 +53,8 @@ pub use self::stream::{ReuniteError, SplitSink, SplitStream}; mod try_stream; pub use self::try_stream::{ try_unfold, AndThen, ErrInto, InspectErr, InspectOk, IntoStream, MapErr, MapOk, OrElse, - TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryNext, TrySkipWhile, - TryStreamExt, TryTakeWhile, TryUnfold, + TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryFlattenUnordered, TryNext, + TrySkipWhile, TryStreamExt, TryTakeWhile, TryUnfold, }; #[cfg(feature = "io")] diff --git a/futures-util/src/stream/try_stream/mod.rs b/futures-util/src/stream/try_stream/mod.rs index d9f1f7912a..8026945224 100644 --- a/futures-util/src/stream/try_stream/mod.rs +++ b/futures-util/src/stream/try_stream/mod.rs @@ -14,6 +14,7 @@ use crate::stream::{assert_stream, Inspect, Map}; #[cfg(feature = "alloc")] use alloc::vec::Vec; use core::pin::Pin; +use futures_core::Stream; use futures_core::{ future::{Future, TryFuture}, stream::TryStream, @@ -98,6 +99,10 @@ mod try_flatten; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_flatten::TryFlatten; +mod try_flatten_unordered; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::try_flatten_unordered::TryFlattenUnordered; + mod try_collect; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_collect::TryCollect; @@ -628,6 +633,63 @@ pub trait TryStreamExt: TryStream { assert_stream::, _>(TryFilterMap::new(self, f)) } + /// Flattens a stream of streams into just one continuous stream. Produced streams + /// will be polled concurrently and any errors are passed through without looking at them. + /// + /// The only argument is an optional limit on the number of concurrently + /// polled streams. If this limit is not `None`, no more than `limit` streams + /// will be polled at the same time. The `limit` argument is of type + /// `Into>`, and so can be provided as either `None`, + /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as + /// no limit at all, and will have the same result as passing in `None`. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::channel::mpsc; + /// use futures::stream::{StreamExt, TryStreamExt}; + /// use std::thread; + /// + /// let (tx1, rx1) = mpsc::unbounded(); + /// let (tx2, rx2) = mpsc::unbounded(); + /// let (tx3, rx3) = mpsc::unbounded(); + /// + /// thread::spawn(move || { + /// tx1.unbounded_send(Ok(1)).unwrap(); + /// }); + /// thread::spawn(move || { + /// tx2.unbounded_send(Ok(2)).unwrap(); + /// tx2.unbounded_send(Err(3)).unwrap(); + /// tx2.unbounded_send(Ok(4)).unwrap(); + /// }); + /// thread::spawn(move || { + /// tx3.unbounded_send(Ok(rx1)).unwrap(); + /// tx3.unbounded_send(Ok(rx2)).unwrap(); + /// tx3.unbounded_send(Err(5)).unwrap(); + /// }); + /// + /// let stream = rx3.try_flatten_unordered(None); + /// let mut values: Vec<_> = stream.collect().await; + /// values.sort(); + /// + /// assert_eq!(values, vec![Ok(1), Ok(2), Ok(4), Err(3), Err(5)]); + /// # }); + /// ``` + fn try_flatten_unordered(self, limit: impl Into>) -> TryFlattenUnordered + where + Self: TryStream, + Self::Ok: TryStream + // Needed because either way compiler can't infer types properly... + + Stream::Ok, ::Error>>, + ::Error: From, + Self: Sized, + { + assert_stream::::Ok, ::Error>, _>( + TryFlattenUnordered::new(self, limit), + ) + } + /// Flattens a stream of streams into just one continuous stream. /// /// If this stream's elements are themselves streams then this combinator diff --git a/futures-util/src/stream/try_stream/try_chunks.rs b/futures-util/src/stream/try_stream/try_chunks.rs index aa7fca16ec..7ff9f5cf80 100644 --- a/futures-util/src/stream/try_stream/try_chunks.rs +++ b/futures-util/src/stream/try_stream/try_chunks.rs @@ -41,8 +41,10 @@ impl TryChunks { delegate_access_inner!(stream, St, (. .)); } +type TryChunksStreamError = TryChunksError<::Ok, ::Error>; + impl Stream for TryChunks { - type Item = Result, TryChunksError>; + type Item = Result, TryChunksStreamError>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.as_mut().project(); diff --git a/futures-util/src/stream/try_stream/try_flatten_unordered.rs b/futures-util/src/stream/try_stream/try_flatten_unordered.rs new file mode 100644 index 0000000000..58749249fc --- /dev/null +++ b/futures-util/src/stream/try_stream/try_flatten_unordered.rs @@ -0,0 +1,123 @@ +use core::pin::Pin; + +use futures_core::ready; +use futures_core::stream::{FusedStream, Stream, TryStream}; +use futures_core::task::{Context, Poll}; +#[cfg(feature = "sink")] +use futures_sink::Sink; + +use pin_project_lite::pin_project; + +use crate::future::Either; +use crate::stream::stream::FlattenUnordered; +use crate::StreamExt; + +delegate_all!( + /// Stream for the [`try_flatten_unordered`](super::TryStreamExt::try_flatten_unordered) method. + TryFlattenUnordered( + FlattenUnordered> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + + New[ + |stream: St, limit: impl Into>| + TryFlattenSuccessful::new(stream).flatten_unordered(limit) + ] + where + St: TryStream, + St::Ok: TryStream, + ::Error: From, + // Needed because either way compiler can't infer types properly... + St::Ok: Stream::Ok, ::Error>> +); + +pin_project! { + /// Flattens successful streams from the given stream, bubbling up the errors. + /// This's a wrapper for `TryFlattenUnordered` to reuse `FlattenUnordered` logic over `TryStream`. + #[derive(Debug)] + pub struct TryFlattenSuccessful { + #[pin] + stream: St, + } +} + +impl TryFlattenSuccessful { + fn new(stream: St) -> Self { + Self { stream } + } + + delegate_access_inner!(stream, St, ()); +} + +impl FusedStream for TryFlattenSuccessful +where + St: TryStream + FusedStream, + St::Ok: TryStream, + ::Error: From, +{ + fn is_terminated(&self) -> bool { + self.stream.is_terminated() + } +} + +/// Emits one item immediately, then stream will be terminated. +#[derive(Debug, Clone)] +pub struct One(Option); + +impl Unpin for One {} + +impl Stream for One { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(self.0.take()) + } + + fn size_hint(&self) -> (usize, Option) { + self.0.as_ref().map_or((0, Some(0)), |_| (1, Some(1))) + } +} + +type OneResult = One< + Result<<::Ok as TryStream>::Ok, <::Ok as TryStream>::Error>, +>; + +impl Stream for TryFlattenSuccessful +where + St: TryStream, + St::Ok: TryStream, + ::Error: From, +{ + // Item is either an inner stream or a stream containing a single error. + // This will allow using `Either`'s `Stream` implementation as both branches are actually streams of `Result`'s. + type Item = Either>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let item = ready!(self.project().stream.try_poll_next(cx)); + + let out = item.map(|res| match res { + // Emit inner stream as is + Ok(stream) => Either::Left(stream), + // Wrap an error into stream wrapper containing one item + err @ Err(_) => { + let res = err.map(|_: St::Ok| unreachable!()).map_err(Into::into); + + Either::Right(One(Some(res))) + } + }); + + Poll::Ready(out) + } +} + +// Forwarding impl of Sink from the underlying stream +#[cfg(feature = "sink")] +impl Sink for TryFlattenSuccessful +where + S: TryStream + Sink, + S::Ok: TryStream, + S::Ok: Stream::Ok, ::Error>>, + ::Error: From<::Error>, +{ + type Error = >::Error; + + delegate_sink!(stream, Item); +} diff --git a/futures/tests/stream_try_stream.rs b/futures/tests/stream_try_stream.rs index d83fc54b1c..f0c9a07e96 100644 --- a/futures/tests/stream_try_stream.rs +++ b/futures/tests/stream_try_stream.rs @@ -4,6 +4,7 @@ use futures::{ stream::{self, StreamExt, TryStreamExt}, task::Poll, }; +use futures_executor::block_on; use futures_test::task::noop_context; #[test] @@ -38,3 +39,27 @@ fn try_take_while_after_err() { .boxed(); assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx)); } + +#[test] +fn try_flatten_unordered() { + let s = stream::iter(1..5) + .map(|val: u32| { + if val % 2 == 0 { + Ok(stream::unfold((val, 1), |(val, pow)| async move { + Some((val.pow(pow), (val, pow + 1))) + }) + .take(3) + .map(move |val| if val % 2 == 0 { Ok(val) } else { Err(val) })) + } else { + Err(val) + } + }) + .try_flatten_unordered(None); + + block_on(async move { + assert_eq!( + vec![Err(1), Ok(2), Err(3), Ok(4), Ok(4), Ok(16), Ok(8), Ok(64)], + s.collect::>().await + ) + }) +} From f415babc6e79ce80abb5d71130bca69aadf4f189 Mon Sep 17 00:00:00 2001 From: olegnn Date: Sat, 12 Mar 2022 22:40:27 +0400 Subject: [PATCH 04/10] Improve test --- futures/tests/stream_try_stream.rs | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/futures/tests/stream_try_stream.rs b/futures/tests/stream_try_stream.rs index f0c9a07e96..25eae1b549 100644 --- a/futures/tests/stream_try_stream.rs +++ b/futures/tests/stream_try_stream.rs @@ -42,14 +42,14 @@ fn try_take_while_after_err() { #[test] fn try_flatten_unordered() { - let s = stream::iter(1..5) + let s = stream::iter(1..7) .map(|val: u32| { if val % 2 == 0 { Ok(stream::unfold((val, 1), |(val, pow)| async move { Some((val.pow(pow), (val, pow + 1))) }) .take(3) - .map(move |val| if val % 2 == 0 { Ok(val) } else { Err(val) })) + .map(move |val| if val % 16 != 0 { Ok(val) } else { Err(val) })) } else { Err(val) } @@ -58,7 +58,22 @@ fn try_flatten_unordered() { block_on(async move { assert_eq!( - vec![Err(1), Ok(2), Err(3), Ok(4), Ok(4), Ok(16), Ok(8), Ok(64)], + // All numbers can be divided by 16 and odds must be `Err` + // For all basic evens we must have powers from 1 to 3 + vec![ + Err(1), + Ok(2), + Err(3), + Ok(4), + Err(5), + Ok(6), + Ok(4), + Err(16), + Ok(36), + Ok(8), + Err(64), + Ok(216) + ], s.collect::>().await ) }) From 49f1b0835298791e4d80488ead3df9dd9b36dddc Mon Sep 17 00:00:00 2001 From: olegnn Date: Sat, 12 Mar 2022 22:54:58 +0400 Subject: [PATCH 05/10] Misc fixes except for 1.45 backward compatibility --- futures-util/src/stream/stream/flatten_unordered.rs | 2 +- futures-util/src/stream/try_stream/mod.rs | 3 +++ futures-util/src/stream/try_stream/try_flatten_unordered.rs | 1 - 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/futures-util/src/stream/stream/flatten_unordered.rs b/futures-util/src/stream/stream/flatten_unordered.rs index ab5995c614..3e9fd3f609 100644 --- a/futures-util/src/stream/stream/flatten_unordered.rs +++ b/futures-util/src/stream/stream/flatten_unordered.rs @@ -1,4 +1,4 @@ -use alloc::sync::Arc; +use alloc::{boxed::Box, sync::Arc}; use core::{ cell::UnsafeCell, convert::identity, diff --git a/futures-util/src/stream/try_stream/mod.rs b/futures-util/src/stream/try_stream/mod.rs index 8026945224..b9b23abfde 100644 --- a/futures-util/src/stream/try_stream/mod.rs +++ b/futures-util/src/stream/try_stream/mod.rs @@ -99,7 +99,9 @@ mod try_flatten; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_flatten::TryFlatten; +#[cfg(feature = "alloc")] mod try_flatten_unordered; +#[cfg(feature = "alloc")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_flatten_unordered::TryFlattenUnordered; @@ -676,6 +678,7 @@ pub trait TryStreamExt: TryStream { /// assert_eq!(values, vec![Ok(1), Ok(2), Ok(4), Err(3), Err(5)]); /// # }); /// ``` + #[cfg(feature = "alloc")] fn try_flatten_unordered(self, limit: impl Into>) -> TryFlattenUnordered where Self: TryStream, diff --git a/futures-util/src/stream/try_stream/try_flatten_unordered.rs b/futures-util/src/stream/try_stream/try_flatten_unordered.rs index 58749249fc..ed9ea219b9 100644 --- a/futures-util/src/stream/try_stream/try_flatten_unordered.rs +++ b/futures-util/src/stream/try_stream/try_flatten_unordered.rs @@ -114,7 +114,6 @@ impl Sink for TryFlattenSuccessful where S: TryStream + Sink, S::Ok: TryStream, - S::Ok: Stream::Ok, ::Error>>, ::Error: From<::Error>, { type Error = >::Error; From 1e482efc00579e9a171ab4a011073903c6e35445 Mon Sep 17 00:00:00 2001 From: olegnn Date: Sun, 13 Mar 2022 17:25:41 +0400 Subject: [PATCH 06/10] Fix 1.45 backward compatibility + feature gate --- futures-util/src/future/try_select.rs | 6 +-- futures-util/src/stream/mod.rs | 6 +-- futures-util/src/stream/try_stream/mod.rs | 16 +++--- .../try_stream/try_flatten_unordered.rs | 54 +++++++++++-------- 4 files changed, 47 insertions(+), 35 deletions(-) diff --git a/futures-util/src/future/try_select.rs b/futures-util/src/future/try_select.rs index 915a8156f5..bc282f7db1 100644 --- a/futures-util/src/future/try_select.rs +++ b/futures-util/src/future/try_select.rs @@ -12,6 +12,9 @@ pub struct TrySelect { impl Unpin for TrySelect {} +type EitherOk = Either<(::Ok, B), (::Ok, A)>; +type EitherErr = Either<(::Error, B), (::Error, A)>; + /// Waits for either one of two differently-typed futures to complete. /// /// This function will return a new future which awaits for either one of both @@ -57,9 +60,6 @@ where }) } -type EitherOk = Either<(::Ok, B), (::Ok, A)>; -type EitherErr = Either<(::Error, B), (::Error, A)>; - impl Future for TrySelect where A: TryFuture, diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 2876a8446c..43e6c050ca 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -53,8 +53,8 @@ pub use self::stream::{ReuniteError, SplitSink, SplitStream}; mod try_stream; pub use self::try_stream::{ try_unfold, AndThen, ErrInto, InspectErr, InspectOk, IntoStream, MapErr, MapOk, OrElse, - TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryFlattenUnordered, TryNext, - TrySkipWhile, TryStreamExt, TryTakeWhile, TryUnfold, + TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryNext, TrySkipWhile, + TryStreamExt, TryTakeWhile, TryUnfold, }; #[cfg(feature = "io")] @@ -64,7 +64,7 @@ pub use self::try_stream::IntoAsyncRead; #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] -pub use self::try_stream::{TryBufferUnordered, TryBuffered}; +pub use self::try_stream::{TryBufferUnordered, TryBuffered, TryFlattenUnordered}; #[cfg(feature = "sink")] #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] diff --git a/futures-util/src/stream/try_stream/mod.rs b/futures-util/src/stream/try_stream/mod.rs index b9b23abfde..4dfc680838 100644 --- a/futures-util/src/stream/try_stream/mod.rs +++ b/futures-util/src/stream/try_stream/mod.rs @@ -14,7 +14,7 @@ use crate::stream::{assert_stream, Inspect, Map}; #[cfg(feature = "alloc")] use alloc::vec::Vec; use core::pin::Pin; -use futures_core::Stream; + use futures_core::{ future::{Future, TryFuture}, stream::TryStream, @@ -99,8 +99,10 @@ mod try_flatten; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_flatten::TryFlatten; +#[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] mod try_flatten_unordered; +#[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_flatten_unordered::TryFlattenUnordered; @@ -678,14 +680,16 @@ pub trait TryStreamExt: TryStream { /// assert_eq!(values, vec![Ok(1), Ok(2), Ok(4), Err(3), Err(5)]); /// # }); /// ``` + #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] - fn try_flatten_unordered(self, limit: impl Into>) -> TryFlattenUnordered + fn try_flatten_unordered( + self, + limit: impl Into>, + ) -> TryFlattenUnordered where Self: TryStream, - Self::Ok: TryStream - // Needed because either way compiler can't infer types properly... - + Stream::Ok, ::Error>>, - ::Error: From, + Self::Ok: futures_core::Stream>, + E: From, Self: Sized, { assert_stream::::Ok, ::Error>, _>( diff --git a/futures-util/src/stream/try_stream/try_flatten_unordered.rs b/futures-util/src/stream/try_stream/try_flatten_unordered.rs index ed9ea219b9..53b76e2ac1 100644 --- a/futures-util/src/stream/try_stream/try_flatten_unordered.rs +++ b/futures-util/src/stream/try_stream/try_flatten_unordered.rs @@ -14,8 +14,8 @@ use crate::StreamExt; delegate_all!( /// Stream for the [`try_flatten_unordered`](super::TryStreamExt::try_flatten_unordered) method. - TryFlattenUnordered( - FlattenUnordered> + TryFlattenUnordered( + FlattenUnordered> ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[ |stream: St, limit: impl Into>| @@ -23,23 +23,31 @@ delegate_all!( ] where St: TryStream, - St::Ok: TryStream, - ::Error: From, - // Needed because either way compiler can't infer types properly... - St::Ok: Stream::Ok, ::Error>> + St::Ok: Stream>, + E: From ); pin_project! { /// Flattens successful streams from the given stream, bubbling up the errors. - /// This's a wrapper for `TryFlattenUnordered` to reuse `FlattenUnordered` logic over `TryStream`. + /// This's a wrapper for `FlattenUnordered` to reuse its logic over `TryStream`. #[derive(Debug)] - pub struct TryFlattenSuccessful { - #[pin] - stream: St, - } + pub struct TryFlattenSuccessful + where + St: TryStream, + St::Ok: Stream>, + E: From + { + #[pin] + stream: St, + } } -impl TryFlattenSuccessful { +impl TryFlattenSuccessful +where + St: TryStream, + St::Ok: Stream>, + E: From, +{ fn new(stream: St) -> Self { Self { stream } } @@ -47,11 +55,11 @@ impl TryFlattenSuccessful { delegate_access_inner!(stream, St, ()); } -impl FusedStream for TryFlattenSuccessful +impl FusedStream for TryFlattenSuccessful where St: TryStream + FusedStream, - St::Ok: TryStream, - ::Error: From, + St::Ok: Stream>, + E: From, { fn is_terminated(&self) -> bool { self.stream.is_terminated() @@ -80,11 +88,11 @@ type OneResult = One< Result<<::Ok as TryStream>::Ok, <::Ok as TryStream>::Error>, >; -impl Stream for TryFlattenSuccessful +impl Stream for TryFlattenSuccessful where St: TryStream, - St::Ok: TryStream, - ::Error: From, + St::Ok: Stream>, + E: From, { // Item is either an inner stream or a stream containing a single error. // This will allow using `Either`'s `Stream` implementation as both branches are actually streams of `Result`'s. @@ -110,13 +118,13 @@ where // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] -impl Sink for TryFlattenSuccessful +impl Sink for TryFlattenSuccessful where - S: TryStream + Sink, - S::Ok: TryStream, - ::Error: From<::Error>, + St: TryStream + Sink, + St::Ok: Stream>, + E: From<::Error>, { - type Error = >::Error; + type Error = >::Error; delegate_sink!(stream, Item); } From c21306c882e355c7832beabcdc8f08ae336349d3 Mon Sep 17 00:00:00 2001 From: olegnn Date: Sun, 13 Mar 2022 18:05:17 +0400 Subject: [PATCH 07/10] Minor tweaks --- futures-util/src/stream/try_stream/mod.rs | 1 - .../try_stream/try_flatten_unordered.rs | 21 ++++++++++--------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/futures-util/src/stream/try_stream/mod.rs b/futures-util/src/stream/try_stream/mod.rs index 4dfc680838..3a5d2c513b 100644 --- a/futures-util/src/stream/try_stream/mod.rs +++ b/futures-util/src/stream/try_stream/mod.rs @@ -687,7 +687,6 @@ pub trait TryStreamExt: TryStream { limit: impl Into>, ) -> TryFlattenUnordered where - Self: TryStream, Self::Ok: futures_core::Stream>, E: From, Self: Sized, diff --git a/futures-util/src/stream/try_stream/try_flatten_unordered.rs b/futures-util/src/stream/try_stream/try_flatten_unordered.rs index 53b76e2ac1..a40c5591cd 100644 --- a/futures-util/src/stream/try_stream/try_flatten_unordered.rs +++ b/futures-util/src/stream/try_stream/try_flatten_unordered.rs @@ -28,9 +28,10 @@ delegate_all!( ); pin_project! { - /// Flattens successful streams from the given stream, bubbling up the errors. + /// Emits either successful streams or single-item streams containing the underlying errors. /// This's a wrapper for `FlattenUnordered` to reuse its logic over `TryStream`. #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] pub struct TryFlattenSuccessful where St: TryStream, @@ -66,13 +67,13 @@ where } } -/// Emits one item immediately, then stream will be terminated. +/// Emits single item immediately, then stream will be terminated. #[derive(Debug, Clone)] -pub struct One(Option); +pub struct Single(Option); -impl Unpin for One {} +impl Unpin for Single {} -impl Stream for One { +impl Stream for Single { type Item = T; fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { @@ -84,7 +85,7 @@ impl Stream for One { } } -type OneResult = One< +type SingleResult = Single< Result<<::Ok as TryStream>::Ok, <::Ok as TryStream>::Error>, >; @@ -96,19 +97,19 @@ where { // Item is either an inner stream or a stream containing a single error. // This will allow using `Either`'s `Stream` implementation as both branches are actually streams of `Result`'s. - type Item = Either>; + type Item = Either>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let item = ready!(self.project().stream.try_poll_next(cx)); let out = item.map(|res| match res { - // Emit inner stream as is + // Emit successful inner stream as is Ok(stream) => Either::Left(stream), - // Wrap an error into stream wrapper containing one item + // Wrap an error into a stream containing a single item err @ Err(_) => { let res = err.map(|_: St::Ok| unreachable!()).map_err(Into::into); - Either::Right(One(Some(res))) + Either::Right(Single(Some(res))) } }); From 63c694b4a5f4ce39eed16fe8afd60af97279898b Mon Sep 17 00:00:00 2001 From: olegnn Date: Mon, 21 Mar 2022 18:27:31 +0400 Subject: [PATCH 08/10] Restore `Unpin` requirement, fix rest of exports --- futures-util/benches/flatten_unordered.rs | 7 +++--- futures-util/src/stream/mod.rs | 8 +++---- .../src/stream/stream/flatten_unordered.rs | 14 +++++------ futures-util/src/stream/stream/mod.rs | 6 ++--- futures-util/src/stream/try_stream/mod.rs | 2 +- .../try_stream/try_flatten_unordered.rs | 24 ++++++++++--------- futures/tests/stream_try_stream.rs | 1 + 7 files changed, 32 insertions(+), 30 deletions(-) diff --git a/futures-util/benches/flatten_unordered.rs b/futures-util/benches/flatten_unordered.rs index 64d5f9a4e3..b92f614914 100644 --- a/futures-util/benches/flatten_unordered.rs +++ b/futures-util/benches/flatten_unordered.rs @@ -5,7 +5,7 @@ use crate::test::Bencher; use futures::channel::oneshot; use futures::executor::block_on; -use futures::future::{self, FutureExt}; +use futures::future; use futures::stream::{self, StreamExt}; use futures::task::Poll; use std::collections::VecDeque; @@ -35,15 +35,14 @@ fn oneshot_streams(b: &mut Bencher) { }); let mut flatten = stream::unfold(rxs.into_iter(), |mut vals| { - async { + Box::pin(async { if let Some(next) = vals.next() { let val = next.await.unwrap(); Some((val, vals)) } else { None } - } - .boxed() + }) }) .flatten_unordered(None); diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 43e6c050ca..5a1f766aaa 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -18,10 +18,10 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream}; #[allow(clippy::module_inception)] mod stream; pub use self::stream::{ - Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach, - Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, SelectNextSome, - Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, TryFold, - TryForEach, Unzip, Zip, + All, Any, Chain, Collect, Concat, Count, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, + Fold, ForEach, Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, + SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, + TryFold, TryForEach, Unzip, Zip, }; #[cfg(feature = "std")] diff --git a/futures-util/src/stream/stream/flatten_unordered.rs b/futures-util/src/stream/stream/flatten_unordered.rs index 3e9fd3f609..66ba4d0d55 100644 --- a/futures-util/src/stream/stream/flatten_unordered.rs +++ b/futures-util/src/stream/stream/flatten_unordered.rs @@ -1,4 +1,4 @@ -use alloc::{boxed::Box, sync::Arc}; +use alloc::sync::Arc; use core::{ cell::UnsafeCell, convert::identity, @@ -285,7 +285,7 @@ pin_project! { #[must_use = "streams do nothing unless polled"] pub struct FlattenUnordered where St: Stream { #[pin] - inner_streams: FuturesUnordered>>>, + inner_streams: FuturesUnordered>, #[pin] stream: St, poll_state: SharedPollState, @@ -315,7 +315,7 @@ where impl FlattenUnordered where St: Stream, - St::Item: Stream, + St::Item: Stream + Unpin, { pub(super) fn new(stream: St, limit: Option) -> FlattenUnordered { let poll_state = SharedPollState::new(NEED_TO_POLL_STREAM); @@ -346,7 +346,7 @@ impl FlattenUnorderedProj<'_, St> where St: Stream, { - /// Checks if current `inner_streams` size is greater than optional limit. + /// Checks if current `inner_streams` bucket size is greater than optional limit. fn is_exceeded_limit(&self) -> bool { self.limit.map_or(false, |limit| self.inner_streams.len() >= limit.get()) } @@ -355,7 +355,7 @@ where impl FusedStream for FlattenUnordered where St: FusedStream, - St::Item: Stream, + St::Item: Stream + Unpin, { fn is_terminated(&self) -> bool { self.stream.is_terminated() && self.inner_streams.is_empty() @@ -365,7 +365,7 @@ where impl Stream for FlattenUnordered where St: Stream, - St::Item: Stream, + St::Item: Stream + Unpin, { type Item = ::Item; @@ -410,7 +410,7 @@ where match this.stream.as_mut().poll_next(&mut cx) { Poll::Ready(Some(inner_stream)) => { - let next_item_fut = PollStreamFut::new(Box::pin(inner_stream)); + let next_item_fut = PollStreamFut::new(inner_stream); // Add new stream to the inner streams bucket this.inner_streams.as_mut().push(next_item_fut); // Inner streams must be polled afterward diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index 26970bec82..384634b004 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -219,7 +219,7 @@ delegate_all!( FlatMapUnordered( FlattenUnordered> ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, limit: Option, f: F| FlattenUnordered::new(Map::new(x, f), limit)] - where St: Stream, U: Stream, F: FnMut(St::Item) -> U + where St: Stream, U: Stream, U: Unpin, F: FnMut(St::Item) -> U ); #[cfg(not(futures_no_atomic_cas))] @@ -832,7 +832,7 @@ pub trait StreamExt: Stream { #[cfg(feature = "alloc")] fn flatten_unordered(self, limit: impl Into>) -> FlattenUnordered where - Self::Item: Stream, + Self::Item: Stream + Unpin, Self: Sized, { assert_stream::<::Item, _>(FlattenUnordered::new(self, limit.into())) @@ -918,7 +918,7 @@ pub trait StreamExt: Stream { f: F, ) -> FlatMapUnordered where - U: Stream, + U: Stream + Unpin, F: FnMut(Self::Item) -> U, Self: Sized, { diff --git a/futures-util/src/stream/try_stream/mod.rs b/futures-util/src/stream/try_stream/mod.rs index 3a5d2c513b..8bd0b3435a 100644 --- a/futures-util/src/stream/try_stream/mod.rs +++ b/futures-util/src/stream/try_stream/mod.rs @@ -687,7 +687,7 @@ pub trait TryStreamExt: TryStream { limit: impl Into>, ) -> TryFlattenUnordered where - Self::Ok: futures_core::Stream>, + Self::Ok: futures_core::Stream> + Unpin, E: From, Self: Sized, { diff --git a/futures-util/src/stream/try_stream/try_flatten_unordered.rs b/futures-util/src/stream/try_stream/try_flatten_unordered.rs index a40c5591cd..e7fe9b83e6 100644 --- a/futures-util/src/stream/try_stream/try_flatten_unordered.rs +++ b/futures-util/src/stream/try_stream/try_flatten_unordered.rs @@ -15,15 +15,16 @@ use crate::StreamExt; delegate_all!( /// Stream for the [`try_flatten_unordered`](super::TryStreamExt::try_flatten_unordered) method. TryFlattenUnordered( - FlattenUnordered> + FlattenUnordered> ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[ |stream: St, limit: impl Into>| - TryFlattenSuccessful::new(stream).flatten_unordered(limit) + ResultToEither::new(stream).flatten_unordered(limit) ] where St: TryStream, St::Ok: Stream>, + St::Ok: Unpin, E: From ); @@ -32,10 +33,11 @@ pin_project! { /// This's a wrapper for `FlattenUnordered` to reuse its logic over `TryStream`. #[derive(Debug)] #[must_use = "streams do nothing unless polled"] - pub struct TryFlattenSuccessful + pub struct ResultToEither where St: TryStream, St::Ok: Stream>, + St::Ok: Unpin, E: From { #[pin] @@ -43,10 +45,10 @@ pin_project! { } } -impl TryFlattenSuccessful +impl ResultToEither where St: TryStream, - St::Ok: Stream>, + St::Ok: Stream> + Unpin, E: From, { fn new(stream: St) -> Self { @@ -56,10 +58,10 @@ where delegate_access_inner!(stream, St, ()); } -impl FusedStream for TryFlattenSuccessful +impl FusedStream for ResultToEither where St: TryStream + FusedStream, - St::Ok: Stream>, + St::Ok: Stream> + Unpin, E: From, { fn is_terminated(&self) -> bool { @@ -89,10 +91,10 @@ type SingleResult = Single< Result<<::Ok as TryStream>::Ok, <::Ok as TryStream>::Error>, >; -impl Stream for TryFlattenSuccessful +impl Stream for ResultToEither where St: TryStream, - St::Ok: Stream>, + St::Ok: Stream> + Unpin, E: From, { // Item is either an inner stream or a stream containing a single error. @@ -119,10 +121,10 @@ where // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] -impl Sink for TryFlattenSuccessful +impl Sink for ResultToEither where St: TryStream + Sink, - St::Ok: Stream>, + St::Ok: Stream> + Unpin, E: From<::Error>, { type Error = >::Error; diff --git a/futures/tests/stream_try_stream.rs b/futures/tests/stream_try_stream.rs index 25eae1b549..c9e3d09be4 100644 --- a/futures/tests/stream_try_stream.rs +++ b/futures/tests/stream_try_stream.rs @@ -54,6 +54,7 @@ fn try_flatten_unordered() { Err(val) } }) + .map_ok(Box::pin) .try_flatten_unordered(None); block_on(async move { From 2fad8272a6962fd3d3fbff7e17107c147e9affff Mon Sep 17 00:00:00 2001 From: olegnn Date: Tue, 22 Mar 2022 22:00:43 +0400 Subject: [PATCH 09/10] Fix types --- futures-util/src/stream/try_stream/mod.rs | 9 ++-- .../try_stream/try_flatten_unordered.rs | 42 ++++++++++--------- 2 files changed, 25 insertions(+), 26 deletions(-) diff --git a/futures-util/src/stream/try_stream/mod.rs b/futures-util/src/stream/try_stream/mod.rs index 8bd0b3435a..62090414f5 100644 --- a/futures-util/src/stream/try_stream/mod.rs +++ b/futures-util/src/stream/try_stream/mod.rs @@ -682,13 +682,10 @@ pub trait TryStreamExt: TryStream { /// ``` #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] - fn try_flatten_unordered( - self, - limit: impl Into>, - ) -> TryFlattenUnordered + fn try_flatten_unordered(self, limit: impl Into>) -> TryFlattenUnordered where - Self::Ok: futures_core::Stream> + Unpin, - E: From, + Self::Ok: TryStream + Unpin, + ::Error: From, Self: Sized, { assert_stream::::Ok, ::Error>, _>( diff --git a/futures-util/src/stream/try_stream/try_flatten_unordered.rs b/futures-util/src/stream/try_stream/try_flatten_unordered.rs index e7fe9b83e6..b9a61cddfa 100644 --- a/futures-util/src/stream/try_stream/try_flatten_unordered.rs +++ b/futures-util/src/stream/try_stream/try_flatten_unordered.rs @@ -12,10 +12,12 @@ use crate::future::Either; use crate::stream::stream::FlattenUnordered; use crate::StreamExt; +use super::IntoStream; + delegate_all!( /// Stream for the [`try_flatten_unordered`](super::TryStreamExt::try_flatten_unordered) method. - TryFlattenUnordered( - FlattenUnordered> + TryFlattenUnordered( + FlattenUnordered> ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[ |stream: St, limit: impl Into>| @@ -23,9 +25,9 @@ delegate_all!( ] where St: TryStream, - St::Ok: Stream>, + St::Ok: TryStream, St::Ok: Unpin, - E: From + ::Error: From ); pin_project! { @@ -33,23 +35,23 @@ pin_project! { /// This's a wrapper for `FlattenUnordered` to reuse its logic over `TryStream`. #[derive(Debug)] #[must_use = "streams do nothing unless polled"] - pub struct ResultToEither + pub struct ResultToEither where St: TryStream, - St::Ok: Stream>, + St::Ok: TryStream, St::Ok: Unpin, - E: From + ::Error: From { #[pin] stream: St, } } -impl ResultToEither +impl ResultToEither where St: TryStream, - St::Ok: Stream> + Unpin, - E: From, + St::Ok: TryStream + Unpin, + ::Error: From, { fn new(stream: St) -> Self { Self { stream } @@ -58,11 +60,11 @@ where delegate_access_inner!(stream, St, ()); } -impl FusedStream for ResultToEither +impl FusedStream for ResultToEither where St: TryStream + FusedStream, - St::Ok: Stream> + Unpin, - E: From, + St::Ok: TryStream + Unpin, + ::Error: From, { fn is_terminated(&self) -> bool { self.stream.is_terminated() @@ -91,22 +93,22 @@ type SingleResult = Single< Result<<::Ok as TryStream>::Ok, <::Ok as TryStream>::Error>, >; -impl Stream for ResultToEither +impl Stream for ResultToEither where St: TryStream, - St::Ok: Stream> + Unpin, - E: From, + St::Ok: TryStream + Unpin, + ::Error: From, { // Item is either an inner stream or a stream containing a single error. // This will allow using `Either`'s `Stream` implementation as both branches are actually streams of `Result`'s. - type Item = Either>; + type Item = Either, SingleResult>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let item = ready!(self.project().stream.try_poll_next(cx)); let out = item.map(|res| match res { // Emit successful inner stream as is - Ok(stream) => Either::Left(stream), + Ok(stream) => Either::Left(IntoStream::new(stream)), // Wrap an error into a stream containing a single item err @ Err(_) => { let res = err.map(|_: St::Ok| unreachable!()).map_err(Into::into); @@ -121,11 +123,11 @@ where // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] -impl Sink for ResultToEither +impl Sink for ResultToEither where St: TryStream + Sink, St::Ok: Stream> + Unpin, - E: From<::Error>, + ::Error: From<::Error>, { type Error = >::Error; From 2c48da2f44cb585b35baa026adbd00b238452e55 Mon Sep 17 00:00:00 2001 From: olegnn Date: Tue, 29 Mar 2022 16:40:26 +0400 Subject: [PATCH 10/10] Better naming --- .../try_stream/try_flatten_unordered.rs | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/futures-util/src/stream/try_stream/try_flatten_unordered.rs b/futures-util/src/stream/try_stream/try_flatten_unordered.rs index b9a61cddfa..aaad910bf0 100644 --- a/futures-util/src/stream/try_stream/try_flatten_unordered.rs +++ b/futures-util/src/stream/try_stream/try_flatten_unordered.rs @@ -17,11 +17,11 @@ use super::IntoStream; delegate_all!( /// Stream for the [`try_flatten_unordered`](super::TryStreamExt::try_flatten_unordered) method. TryFlattenUnordered( - FlattenUnordered> + FlattenUnordered> ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[ |stream: St, limit: impl Into>| - ResultToEither::new(stream).flatten_unordered(limit) + TryStreamOfTryStreamsIntoHomogeneousStreamOfTryStreams::new(stream).flatten_unordered(limit) ] where St: TryStream, @@ -35,7 +35,7 @@ pin_project! { /// This's a wrapper for `FlattenUnordered` to reuse its logic over `TryStream`. #[derive(Debug)] #[must_use = "streams do nothing unless polled"] - pub struct ResultToEither + pub struct TryStreamOfTryStreamsIntoHomogeneousStreamOfTryStreams where St: TryStream, St::Ok: TryStream, @@ -47,7 +47,7 @@ pin_project! { } } -impl ResultToEither +impl TryStreamOfTryStreamsIntoHomogeneousStreamOfTryStreams where St: TryStream, St::Ok: TryStream + Unpin, @@ -60,7 +60,7 @@ where delegate_access_inner!(stream, St, ()); } -impl FusedStream for ResultToEither +impl FusedStream for TryStreamOfTryStreamsIntoHomogeneousStreamOfTryStreams where St: TryStream + FusedStream, St::Ok: TryStream + Unpin, @@ -89,11 +89,9 @@ impl Stream for Single { } } -type SingleResult = Single< - Result<<::Ok as TryStream>::Ok, <::Ok as TryStream>::Error>, ->; +type SingleStreamResult = Single::Ok, ::Error>>; -impl Stream for ResultToEither +impl Stream for TryStreamOfTryStreamsIntoHomogeneousStreamOfTryStreams where St: TryStream, St::Ok: TryStream + Unpin, @@ -101,7 +99,7 @@ where { // Item is either an inner stream or a stream containing a single error. // This will allow using `Either`'s `Stream` implementation as both branches are actually streams of `Result`'s. - type Item = Either, SingleResult>; + type Item = Either, SingleStreamResult>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let item = ready!(self.project().stream.try_poll_next(cx)); @@ -123,7 +121,7 @@ where // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] -impl Sink for ResultToEither +impl Sink for TryStreamOfTryStreamsIntoHomogeneousStreamOfTryStreams where St: TryStream + Sink, St::Ok: Stream> + Unpin,