From 63c694b4a5f4ce39eed16fe8afd60af97279898b Mon Sep 17 00:00:00 2001 From: olegnn Date: Mon, 21 Mar 2022 18:27:31 +0400 Subject: [PATCH] Restore `Unpin` requirement, fix rest of exports --- futures-util/benches/flatten_unordered.rs | 7 +++--- futures-util/src/stream/mod.rs | 8 +++---- .../src/stream/stream/flatten_unordered.rs | 14 +++++------ futures-util/src/stream/stream/mod.rs | 6 ++--- futures-util/src/stream/try_stream/mod.rs | 2 +- .../try_stream/try_flatten_unordered.rs | 24 ++++++++++--------- futures/tests/stream_try_stream.rs | 1 + 7 files changed, 32 insertions(+), 30 deletions(-) diff --git a/futures-util/benches/flatten_unordered.rs b/futures-util/benches/flatten_unordered.rs index 64d5f9a4e3..b92f614914 100644 --- a/futures-util/benches/flatten_unordered.rs +++ b/futures-util/benches/flatten_unordered.rs @@ -5,7 +5,7 @@ use crate::test::Bencher; use futures::channel::oneshot; use futures::executor::block_on; -use futures::future::{self, FutureExt}; +use futures::future; use futures::stream::{self, StreamExt}; use futures::task::Poll; use std::collections::VecDeque; @@ -35,15 +35,14 @@ fn oneshot_streams(b: &mut Bencher) { }); let mut flatten = stream::unfold(rxs.into_iter(), |mut vals| { - async { + Box::pin(async { if let Some(next) = vals.next() { let val = next.await.unwrap(); Some((val, vals)) } else { None } - } - .boxed() + }) }) .flatten_unordered(None); diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 43e6c050ca..5a1f766aaa 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -18,10 +18,10 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream}; #[allow(clippy::module_inception)] mod stream; pub use self::stream::{ - Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach, - Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, SelectNextSome, - Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, TryFold, - TryForEach, Unzip, Zip, + All, Any, Chain, Collect, Concat, Count, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, + Fold, ForEach, Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, + SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, + TryFold, TryForEach, Unzip, Zip, }; #[cfg(feature = "std")] diff --git a/futures-util/src/stream/stream/flatten_unordered.rs b/futures-util/src/stream/stream/flatten_unordered.rs index 3e9fd3f609..66ba4d0d55 100644 --- a/futures-util/src/stream/stream/flatten_unordered.rs +++ b/futures-util/src/stream/stream/flatten_unordered.rs @@ -1,4 +1,4 @@ -use alloc::{boxed::Box, sync::Arc}; +use alloc::sync::Arc; use core::{ cell::UnsafeCell, convert::identity, @@ -285,7 +285,7 @@ pin_project! { #[must_use = "streams do nothing unless polled"] pub struct FlattenUnordered where St: Stream { #[pin] - inner_streams: FuturesUnordered>>>, + inner_streams: FuturesUnordered>, #[pin] stream: St, poll_state: SharedPollState, @@ -315,7 +315,7 @@ where impl FlattenUnordered where St: Stream, - St::Item: Stream, + St::Item: Stream + Unpin, { pub(super) fn new(stream: St, limit: Option) -> FlattenUnordered { let poll_state = SharedPollState::new(NEED_TO_POLL_STREAM); @@ -346,7 +346,7 @@ impl FlattenUnorderedProj<'_, St> where St: Stream, { - /// Checks if current `inner_streams` size is greater than optional limit. + /// Checks if current `inner_streams` bucket size is greater than optional limit. fn is_exceeded_limit(&self) -> bool { self.limit.map_or(false, |limit| self.inner_streams.len() >= limit.get()) } @@ -355,7 +355,7 @@ where impl FusedStream for FlattenUnordered where St: FusedStream, - St::Item: Stream, + St::Item: Stream + Unpin, { fn is_terminated(&self) -> bool { self.stream.is_terminated() && self.inner_streams.is_empty() @@ -365,7 +365,7 @@ where impl Stream for FlattenUnordered where St: Stream, - St::Item: Stream, + St::Item: Stream + Unpin, { type Item = ::Item; @@ -410,7 +410,7 @@ where match this.stream.as_mut().poll_next(&mut cx) { Poll::Ready(Some(inner_stream)) => { - let next_item_fut = PollStreamFut::new(Box::pin(inner_stream)); + let next_item_fut = PollStreamFut::new(inner_stream); // Add new stream to the inner streams bucket this.inner_streams.as_mut().push(next_item_fut); // Inner streams must be polled afterward diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index 26970bec82..384634b004 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -219,7 +219,7 @@ delegate_all!( FlatMapUnordered( FlattenUnordered> ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, limit: Option, f: F| FlattenUnordered::new(Map::new(x, f), limit)] - where St: Stream, U: Stream, F: FnMut(St::Item) -> U + where St: Stream, U: Stream, U: Unpin, F: FnMut(St::Item) -> U ); #[cfg(not(futures_no_atomic_cas))] @@ -832,7 +832,7 @@ pub trait StreamExt: Stream { #[cfg(feature = "alloc")] fn flatten_unordered(self, limit: impl Into>) -> FlattenUnordered where - Self::Item: Stream, + Self::Item: Stream + Unpin, Self: Sized, { assert_stream::<::Item, _>(FlattenUnordered::new(self, limit.into())) @@ -918,7 +918,7 @@ pub trait StreamExt: Stream { f: F, ) -> FlatMapUnordered where - U: Stream, + U: Stream + Unpin, F: FnMut(Self::Item) -> U, Self: Sized, { diff --git a/futures-util/src/stream/try_stream/mod.rs b/futures-util/src/stream/try_stream/mod.rs index 3a5d2c513b..8bd0b3435a 100644 --- a/futures-util/src/stream/try_stream/mod.rs +++ b/futures-util/src/stream/try_stream/mod.rs @@ -687,7 +687,7 @@ pub trait TryStreamExt: TryStream { limit: impl Into>, ) -> TryFlattenUnordered where - Self::Ok: futures_core::Stream>, + Self::Ok: futures_core::Stream> + Unpin, E: From, Self: Sized, { diff --git a/futures-util/src/stream/try_stream/try_flatten_unordered.rs b/futures-util/src/stream/try_stream/try_flatten_unordered.rs index a40c5591cd..e7fe9b83e6 100644 --- a/futures-util/src/stream/try_stream/try_flatten_unordered.rs +++ b/futures-util/src/stream/try_stream/try_flatten_unordered.rs @@ -15,15 +15,16 @@ use crate::StreamExt; delegate_all!( /// Stream for the [`try_flatten_unordered`](super::TryStreamExt::try_flatten_unordered) method. TryFlattenUnordered( - FlattenUnordered> + FlattenUnordered> ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[ |stream: St, limit: impl Into>| - TryFlattenSuccessful::new(stream).flatten_unordered(limit) + ResultToEither::new(stream).flatten_unordered(limit) ] where St: TryStream, St::Ok: Stream>, + St::Ok: Unpin, E: From ); @@ -32,10 +33,11 @@ pin_project! { /// This's a wrapper for `FlattenUnordered` to reuse its logic over `TryStream`. #[derive(Debug)] #[must_use = "streams do nothing unless polled"] - pub struct TryFlattenSuccessful + pub struct ResultToEither where St: TryStream, St::Ok: Stream>, + St::Ok: Unpin, E: From { #[pin] @@ -43,10 +45,10 @@ pin_project! { } } -impl TryFlattenSuccessful +impl ResultToEither where St: TryStream, - St::Ok: Stream>, + St::Ok: Stream> + Unpin, E: From, { fn new(stream: St) -> Self { @@ -56,10 +58,10 @@ where delegate_access_inner!(stream, St, ()); } -impl FusedStream for TryFlattenSuccessful +impl FusedStream for ResultToEither where St: TryStream + FusedStream, - St::Ok: Stream>, + St::Ok: Stream> + Unpin, E: From, { fn is_terminated(&self) -> bool { @@ -89,10 +91,10 @@ type SingleResult = Single< Result<<::Ok as TryStream>::Ok, <::Ok as TryStream>::Error>, >; -impl Stream for TryFlattenSuccessful +impl Stream for ResultToEither where St: TryStream, - St::Ok: Stream>, + St::Ok: Stream> + Unpin, E: From, { // Item is either an inner stream or a stream containing a single error. @@ -119,10 +121,10 @@ where // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] -impl Sink for TryFlattenSuccessful +impl Sink for ResultToEither where St: TryStream + Sink, - St::Ok: Stream>, + St::Ok: Stream> + Unpin, E: From<::Error>, { type Error = >::Error; diff --git a/futures/tests/stream_try_stream.rs b/futures/tests/stream_try_stream.rs index 25eae1b549..c9e3d09be4 100644 --- a/futures/tests/stream_try_stream.rs +++ b/futures/tests/stream_try_stream.rs @@ -54,6 +54,7 @@ fn try_flatten_unordered() { Err(val) } }) + .map_ok(Box::pin) .try_flatten_unordered(None); block_on(async move {