Skip to content

Commit

Permalink
Add TryFlattenUnordered and improve FlattenUnordered (#2577)
Browse files Browse the repository at this point in the history
  • Loading branch information
olegnn authored and taiki-e committed Mar 11, 2023
1 parent 5e3693a commit a1d09a1
Show file tree
Hide file tree
Showing 9 changed files with 326 additions and 91 deletions.
7 changes: 3 additions & 4 deletions futures-util/benches/flatten_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::test::Bencher;

use futures::channel::oneshot;
use futures::executor::block_on;
use futures::future::{self, FutureExt};
use futures::future;
use futures::stream::{self, StreamExt};
use futures::task::Poll;
use std::collections::VecDeque;
Expand Down Expand Up @@ -35,15 +35,14 @@ fn oneshot_streams(b: &mut Bencher) {
});

let mut flatten = stream::unfold(rxs.into_iter(), |mut vals| {
async {
Box::pin(async {
if let Some(next) = vals.next() {
let val = next.await.unwrap();
Some((val, vals))
} else {
None
}
}
.boxed()
})
})
.flatten_unordered(None);

Expand Down
13 changes: 7 additions & 6 deletions futures-util/src/future/try_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ pub struct TrySelect<A, B> {

impl<A: Unpin, B: Unpin> Unpin for TrySelect<A, B> {}

type EitherOk<A, B> = Either<(<A as TryFuture>::Ok, B), (<B as TryFuture>::Ok, A)>;
type EitherErr<A, B> = Either<(<A as TryFuture>::Error, B), (<B as TryFuture>::Error, A)>;

/// Waits for either one of two differently-typed futures to complete.
///
/// This function will return a new future which awaits for either one of both
Expand Down Expand Up @@ -52,19 +55,17 @@ where
A: TryFuture + Unpin,
B: TryFuture + Unpin,
{
super::assert_future::<
Result<Either<(A::Ok, B), (B::Ok, A)>, Either<(A::Error, B), (B::Error, A)>>,
_,
>(TrySelect { inner: Some((future1, future2)) })
super::assert_future::<Result<EitherOk<A, B>, EitherErr<A, B>>, _>(TrySelect {
inner: Some((future1, future2)),
})
}

impl<A: Unpin, B: Unpin> Future for TrySelect<A, B>
where
A: TryFuture,
B: TryFuture,
{
#[allow(clippy::type_complexity)]
type Output = Result<Either<(A::Ok, B), (B::Ok, A)>, Either<(A::Error, B), (B::Error, A)>>;
type Output = Result<EitherOk<A, B>, EitherErr<A, B>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice");
Expand Down
15 changes: 10 additions & 5 deletions futures-util/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream};
#[allow(clippy::module_inception)]
mod stream;
pub use self::stream::{
Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach,
Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, SelectNextSome,
Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip,
All, Any, Chain, Collect, Concat, Count, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten,
Fold, ForEach, Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan,
SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then,
Unzip, Zip,
};

#[cfg(feature = "std")]
Expand All @@ -38,7 +39,9 @@ pub use self::stream::Forward;

#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
pub use self::stream::{BufferUnordered, Buffered, ForEachConcurrent};
pub use self::stream::{
BufferUnordered, Buffered, FlatMapUnordered, FlattenUnordered, ForEachConcurrent,
};

#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "sink")]
Expand All @@ -60,7 +63,9 @@ pub use self::try_stream::IntoAsyncRead;

#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
pub use self::try_stream::{TryBufferUnordered, TryBuffered, TryForEachConcurrent};
pub use self::try_stream::{
TryBufferUnordered, TryBuffered, TryFlattenUnordered, TryForEachConcurrent,
};

#[cfg(feature = "alloc")]
pub use self::try_stream::{TryChunks, TryChunksError};
Expand Down
Loading

0 comments on commit a1d09a1

Please sign in to comment.