Skip to content

Commit

Permalink
Restore Unpin requirement, fix rest of exports
Browse files Browse the repository at this point in the history
  • Loading branch information
olegnn committed Mar 21, 2022
1 parent c21306c commit 63c694b
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 30 deletions.
7 changes: 3 additions & 4 deletions futures-util/benches/flatten_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::test::Bencher;

use futures::channel::oneshot;
use futures::executor::block_on;
use futures::future::{self, FutureExt};
use futures::future;
use futures::stream::{self, StreamExt};
use futures::task::Poll;
use std::collections::VecDeque;
Expand Down Expand Up @@ -35,15 +35,14 @@ fn oneshot_streams(b: &mut Bencher) {
});

let mut flatten = stream::unfold(rxs.into_iter(), |mut vals| {
async {
Box::pin(async {
if let Some(next) = vals.next() {
let val = next.await.unwrap();
Some((val, vals))
} else {
None
}
}
.boxed()
})
})
.flatten_unordered(None);

Expand Down
8 changes: 4 additions & 4 deletions futures-util/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream};
#[allow(clippy::module_inception)]
mod stream;
pub use self::stream::{
Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach,
Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, SelectNextSome,
Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, TryFold,
TryForEach, Unzip, Zip,
All, Any, Chain, Collect, Concat, Count, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten,
Fold, ForEach, Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan,
SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then,
TryFold, TryForEach, Unzip, Zip,
};

#[cfg(feature = "std")]
Expand Down
14 changes: 7 additions & 7 deletions futures-util/src/stream/stream/flatten_unordered.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use alloc::{boxed::Box, sync::Arc};
use alloc::sync::Arc;
use core::{
cell::UnsafeCell,
convert::identity,
Expand Down Expand Up @@ -285,7 +285,7 @@ pin_project! {
#[must_use = "streams do nothing unless polled"]
pub struct FlattenUnordered<St> where St: Stream {
#[pin]
inner_streams: FuturesUnordered<PollStreamFut<Pin<Box<St::Item>>>>,
inner_streams: FuturesUnordered<PollStreamFut<St::Item>>,
#[pin]
stream: St,
poll_state: SharedPollState,
Expand Down Expand Up @@ -315,7 +315,7 @@ where
impl<St> FlattenUnordered<St>
where
St: Stream,
St::Item: Stream,
St::Item: Stream + Unpin,
{
pub(super) fn new(stream: St, limit: Option<usize>) -> FlattenUnordered<St> {
let poll_state = SharedPollState::new(NEED_TO_POLL_STREAM);
Expand Down Expand Up @@ -346,7 +346,7 @@ impl<St> FlattenUnorderedProj<'_, St>
where
St: Stream,
{
/// Checks if current `inner_streams` size is greater than optional limit.
/// Checks if current `inner_streams` bucket size is greater than optional limit.
fn is_exceeded_limit(&self) -> bool {
self.limit.map_or(false, |limit| self.inner_streams.len() >= limit.get())
}
Expand All @@ -355,7 +355,7 @@ where
impl<St> FusedStream for FlattenUnordered<St>
where
St: FusedStream,
St::Item: Stream,
St::Item: Stream + Unpin,
{
fn is_terminated(&self) -> bool {
self.stream.is_terminated() && self.inner_streams.is_empty()
Expand All @@ -365,7 +365,7 @@ where
impl<St> Stream for FlattenUnordered<St>
where
St: Stream,
St::Item: Stream,
St::Item: Stream + Unpin,
{
type Item = <St::Item as Stream>::Item;

Expand Down Expand Up @@ -410,7 +410,7 @@ where

match this.stream.as_mut().poll_next(&mut cx) {
Poll::Ready(Some(inner_stream)) => {
let next_item_fut = PollStreamFut::new(Box::pin(inner_stream));
let next_item_fut = PollStreamFut::new(inner_stream);
// Add new stream to the inner streams bucket
this.inner_streams.as_mut().push(next_item_fut);
// Inner streams must be polled afterward
Expand Down
6 changes: 3 additions & 3 deletions futures-util/src/stream/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ delegate_all!(
FlatMapUnordered<St, U, F>(
FlattenUnordered<Map<St, F>>
): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, limit: Option<usize>, f: F| FlattenUnordered::new(Map::new(x, f), limit)]
where St: Stream, U: Stream, F: FnMut(St::Item) -> U
where St: Stream, U: Stream, U: Unpin, F: FnMut(St::Item) -> U
);

#[cfg(not(futures_no_atomic_cas))]
Expand Down Expand Up @@ -832,7 +832,7 @@ pub trait StreamExt: Stream {
#[cfg(feature = "alloc")]
fn flatten_unordered(self, limit: impl Into<Option<usize>>) -> FlattenUnordered<Self>
where
Self::Item: Stream,
Self::Item: Stream + Unpin,
Self: Sized,
{
assert_stream::<<Self::Item as Stream>::Item, _>(FlattenUnordered::new(self, limit.into()))
Expand Down Expand Up @@ -918,7 +918,7 @@ pub trait StreamExt: Stream {
f: F,
) -> FlatMapUnordered<Self, U, F>
where
U: Stream,
U: Stream + Unpin,
F: FnMut(Self::Item) -> U,
Self: Sized,
{
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/try_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ pub trait TryStreamExt: TryStream {
limit: impl Into<Option<usize>>,
) -> TryFlattenUnordered<Self, I, E>
where
Self::Ok: futures_core::Stream<Item = Result<I, E>>,
Self::Ok: futures_core::Stream<Item = Result<I, E>> + Unpin,
E: From<Self::Error>,
Self: Sized,
{
Expand Down
24 changes: 13 additions & 11 deletions futures-util/src/stream/try_stream/try_flatten_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@ use crate::StreamExt;
delegate_all!(
/// Stream for the [`try_flatten_unordered`](super::TryStreamExt::try_flatten_unordered) method.
TryFlattenUnordered<St, I, E>(
FlattenUnordered<TryFlattenSuccessful<St, I, E>>
FlattenUnordered<ResultToEither<St, I, E>>
): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)]
+ New[
|stream: St, limit: impl Into<Option<usize>>|
TryFlattenSuccessful::new(stream).flatten_unordered(limit)
ResultToEither::new(stream).flatten_unordered(limit)
]
where
St: TryStream,
St::Ok: Stream<Item = Result<I, E>>,
St::Ok: Unpin,
E: From<St::Error>
);

Expand All @@ -32,21 +33,22 @@ pin_project! {
/// This's a wrapper for `FlattenUnordered` to reuse its logic over `TryStream`.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct TryFlattenSuccessful<St, I, E>
pub struct ResultToEither<St, I, E>
where
St: TryStream,
St::Ok: Stream<Item = Result<I, E>>,
St::Ok: Unpin,
E: From<St::Error>
{
#[pin]
stream: St,
}
}

impl<St, I, E> TryFlattenSuccessful<St, I, E>
impl<St, I, E> ResultToEither<St, I, E>
where
St: TryStream,
St::Ok: Stream<Item = Result<I, E>>,
St::Ok: Stream<Item = Result<I, E>> + Unpin,
E: From<St::Error>,
{
fn new(stream: St) -> Self {
Expand All @@ -56,10 +58,10 @@ where
delegate_access_inner!(stream, St, ());
}

impl<St, I, E> FusedStream for TryFlattenSuccessful<St, I, E>
impl<St, I, E> FusedStream for ResultToEither<St, I, E>
where
St: TryStream + FusedStream,
St::Ok: Stream<Item = Result<I, E>>,
St::Ok: Stream<Item = Result<I, E>> + Unpin,
E: From<St::Error>,
{
fn is_terminated(&self) -> bool {
Expand Down Expand Up @@ -89,10 +91,10 @@ type SingleResult<St> = Single<
Result<<<St as TryStream>::Ok as TryStream>::Ok, <<St as TryStream>::Ok as TryStream>::Error>,
>;

impl<St, I, E> Stream for TryFlattenSuccessful<St, I, E>
impl<St, I, E> Stream for ResultToEither<St, I, E>
where
St: TryStream,
St::Ok: Stream<Item = Result<I, E>>,
St::Ok: Stream<Item = Result<I, E>> + Unpin,
E: From<St::Error>,
{
// Item is either an inner stream or a stream containing a single error.
Expand All @@ -119,10 +121,10 @@ where

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<St, I, E, Item> Sink<Item> for TryFlattenSuccessful<St, I, E>
impl<St, I, E, Item> Sink<Item> for ResultToEither<St, I, E>
where
St: TryStream + Sink<Item>,
St::Ok: Stream<Item = Result<I, E>>,
St::Ok: Stream<Item = Result<I, E>> + Unpin,
E: From<<St as TryStream>::Error>,
{
type Error = <St as Sink<Item>>::Error;
Expand Down
1 change: 1 addition & 0 deletions futures/tests/stream_try_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ fn try_flatten_unordered() {
Err(val)
}
})
.map_ok(Box::pin)
.try_flatten_unordered(None);

block_on(async move {
Expand Down

0 comments on commit 63c694b

Please sign in to comment.