Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add future::try_select #1622

Merged
merged 2 commits into from
May 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 2 additions & 47 deletions futures-util/src/try_future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ mod try_join_all;
#[cfg(feature = "alloc")]
pub use self::try_join_all::{try_join_all, TryJoinAll};

// TODO
// mod try_select;
// pub use self::try_select::{try_select, TrySelect};
mod try_select;
pub use self::try_select::{try_select, TrySelect};

#[cfg(feature = "alloc")]
mod select_ok;
Expand Down Expand Up @@ -319,50 +318,6 @@ pub trait TryFutureExt: TryFuture {
OrElse::new(self, f)
}

/* TODO
/// Waits for either one of two differently-typed futures to complete.
///
/// This function will return a new future which awaits for either this or
/// the `other` future to complete. The returned future will finish with
/// both the value resolved and a future representing the completion of the
/// other work.
///
/// Note that this function consumes the receiving futures and returns a
/// wrapped version of them.
///
/// Also note that if both this and the second future have the same
/// success/error type you can use the `Either::split` method to
/// conveniently extract out the value at the end.
///
/// # Examples
///
/// ```
/// use futures::future::{self, Either};
///
/// // A poor-man's join implemented on top of select
///
/// fn join<A, B, E>(a: A, b: B) -> Box<Future<Item=(A::Item, B::Item), Error=E>>
/// where A: Future<Error = E> + 'static,
/// B: Future<Error = E> + 'static,
/// E: 'static,
/// {
/// Box::new(a.select(b).then(|res| -> Box<Future<Item=_, Error=_>> {
/// match res {
/// Ok(Either::Left((x, b))) => Box::new(b.map(move |y| (x, y))),
/// Ok(Either::Right((y, a))) => Box::new(a.map(move |x| (x, y))),
/// Err(Either::Left((e, _))) => Box::new(future::err(e)),
/// Err(Either::Right((e, _))) => Box::new(future::err(e)),
/// }
/// }))
/// }}
/// ```
fn select<B>(self, other: B) -> Select<Self, B::Future>
where B: IntoFuture, Self: Sized
{
select::new(self, other.into_future())
}
*/

/// Unwraps this future's ouput, producing a future with this future's
/// [`Ok`](TryFuture::Ok) type as its
/// [`Output`](std::future::Future::Output) type.
Expand Down
80 changes: 80 additions & 0 deletions futures-util/src/try_future/try_select.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use core::pin::Pin;
use futures_core::future::{Future, TryFuture};
use futures_core::task::{Context, Poll};
use crate::future::Either;

/// Future for the [`try_select()`] function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct TrySelect<A: Unpin, B: Unpin> {
inner: Option<(A, B)>,
}

impl<A: Unpin, B: Unpin> Unpin for TrySelect<A, B> {}

/// Waits for either one of two differently-typed futures to complete.
///
/// This function will return a new future which awaits for either one of both
/// futures to complete. The returned future will finish with both the value
/// resolved and a future representing the completion of the other work.
///
/// Note that this function consumes the receiving futures and returns a
/// wrapped version of them.
///
/// Also note that if both this and the second future have the same
/// success/error type you can use the `Either::factor_first` method to
/// conveniently extract out the value at the end.
///
/// # Examples
///
/// ```
/// use futures::future::{self, Either, Future, FutureExt, TryFuture, TryFutureExt};
///
/// // A poor-man's try_join implemented on top of select
///
/// fn try_join<A, B, E>(a: A, b: B) -> impl TryFuture<Ok=(A::Ok, B::Ok), Error=E>
/// where A: TryFuture<Error = E> + Unpin + 'static,
/// B: TryFuture<Error = E> + Unpin + 'static,
/// E: 'static,
/// {
/// future::try_select(a, b).then(|res| -> Box<dyn Future<Output = Result<_, _>> + Unpin> {
/// match res {
/// Ok(Either::Left((x, b))) => Box::new(b.map_ok(move |y| (x, y))),
/// Ok(Either::Right((y, a))) => Box::new(a.map_ok(move |x| (x, y))),
/// Err(Either::Left((e, _))) => Box::new(future::err(e)),
/// Err(Either::Right((e, _))) => Box::new(future::err(e)),
/// }
/// })
/// }
/// ```
pub fn try_select<A, B>(future1: A, future2: B) -> TrySelect<A, B>
where A: TryFuture + Unpin, B: TryFuture + Unpin
{
TrySelect { inner: Some((future1, future2)) }
}

impl<A: Unpin, B: Unpin> Future for TrySelect<A, B>
where A: TryFuture, B: TryFuture
{
#[allow(clippy::type_complexity)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol-- no kidding

type Output = Result<
Either<(A::Ok, B), (B::Ok, A)>,
Either<(A::Error, B), (B::Error, A)>,
>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice");
match Pin::new(&mut a).try_poll(cx) {
Poll::Ready(Err(x)) => Poll::Ready(Err(Either::Left((x, b)))),
Poll::Ready(Ok(x)) => Poll::Ready(Ok(Either::Left((x, b)))),
Poll::Pending => match Pin::new(&mut b).try_poll(cx) {
Poll::Ready(Err(x)) => Poll::Ready(Err(Either::Right((x, a)))),
Poll::Ready(Ok(x)) => Poll::Ready(Ok(Either::Right((x, a)))),
Poll::Pending => {
self.inner = Some((a, b));
Poll::Pending
}
}
}
}
}
1 change: 1 addition & 0 deletions futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ pub mod future {
pub use futures_util::try_future::{
try_join, try_join3, try_join4, try_join5,
TryJoin, TryJoin3, TryJoin4, TryJoin5,
try_select, TrySelect,

TryFutureExt,
AndThen, ErrInto, FlattenSink, IntoFuture, MapErr, MapOk, OrElse,
Expand Down
37 changes: 19 additions & 18 deletions futures/tests/futures_ordered.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use futures::channel::oneshot;
use futures::executor::{block_on, block_on_stream};
use futures::future::{self, join, FutureExt};
use futures::future::{self, join, Future, FutureExt, TryFutureExt};
use futures::stream::{StreamExt, FuturesOrdered};
use futures_test::task::noop_context;
use std::any::Any;

#[test]
fn works_1() {
Expand Down Expand Up @@ -56,27 +57,27 @@ fn from_iterator() {
assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1,2,3]);
}

/* ToDo: This requires FutureExt::select to be implemented
#[test]
fn queue_never_unblocked() {
let (_a_tx, a_rx) = oneshot::channel::<Box<Any+Send>>();
let (b_tx, b_rx) = oneshot::channel::<Box<Any+Send>>();
let (c_tx, c_rx) = oneshot::channel::<Box<Any+Send>>();
let (_a_tx, a_rx) = oneshot::channel::<Box<dyn Any + Send>>();
let (b_tx, b_rx) = oneshot::channel::<Box<dyn Any + Send>>();
let (c_tx, c_rx) = oneshot::channel::<Box<dyn Any + Send>>();

let mut stream = vec![
Box::new(a_rx) as Box<Future<Item = _, Error = _>>,
Box::new(b_rx.select(c_rx).then(|res| Ok(Box::new(res) as Box<Any+Send>))) as _,
Box::new(a_rx) as Box<dyn Future<Output = _> + Unpin>,
Box::new(future::try_select(b_rx, c_rx)
.map_err(|e| e.factor_first().0)
.and_then(|e| future::ok(Box::new(e) as Box<dyn Any + Send>))) as _,
].into_iter().collect::<FuturesOrdered<_>>();

with_no_spawn_context(|cx| {
for _ in 0..10 {
assert!(stream.poll_next(cx).unwrap().is_pending());
}
let cx = &mut noop_context();
for _ in 0..10 {
assert!(stream.poll_next_unpin(cx).is_pending());
}

b_tx.send(Box::new(())).unwrap();
assert!(stream.poll_next(cx).unwrap().is_pending());
c_tx.send(Box::new(())).unwrap();
assert!(stream.poll_next(cx).unwrap().is_pending());
assert!(stream.poll_next(cx).unwrap().is_pending());
})
}*/
b_tx.send(Box::new(())).unwrap();
assert!(stream.poll_next_unpin(cx).is_pending());
c_tx.send(Box::new(())).unwrap();
assert!(stream.poll_next_unpin(cx).is_pending());
assert!(stream.poll_next_unpin(cx).is_pending());
}
28 changes: 13 additions & 15 deletions futures/tests/futures_unordered.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use futures::channel::oneshot;
use futures::executor::{block_on, block_on_stream};
use futures::future::{self, join, FutureExt};
use futures::future::{self, join, Future, FutureExt};
use futures::stream::{StreamExt, FuturesUnordered};
use futures::task::Poll;
use futures_test::{assert_stream_done, assert_stream_next};
Expand Down Expand Up @@ -57,30 +57,28 @@ fn from_iterator() {
assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1,2,3]);
}

/* ToDo: This requires FutureExt::select to be implemented
#[test]
fn finished_future() {
let (_a_tx, a_rx) = oneshot::channel::<i32>();
let (b_tx, b_rx) = oneshot::channel::<i32>();
let (c_tx, c_rx) = oneshot::channel::<i32>();

let mut stream = vec![
a_rx.boxed(),
b_rx.select(c_rx).boxed(),
Box::new(a_rx) as Box<dyn Future<Output = Result<_, _>> + Unpin>,
Box::new(future::select(b_rx, c_rx).map(|e| e.factor_first().0)) as _,
].into_iter().collect::<FuturesUnordered<_>>();

support::with_noop_waker_context(f)(|cx| {
for _ in 0..10 {
assert!(stream.poll_next_unpin(cx).is_pending());
}

b_tx.send(12).unwrap();
assert!(stream.poll_next_unpin(cx).is_ready());
c_tx.send(3).unwrap();
assert!(stream.poll_next_unpin(cx).is_pending());
let cx = &mut noop_context();
for _ in 0..10 {
assert!(stream.poll_next_unpin(cx).is_pending());
})
}*/
}

b_tx.send(12).unwrap();
c_tx.send(3).unwrap();
assert!(stream.poll_next_unpin(cx).is_ready());
assert!(stream.poll_next_unpin(cx).is_pending());
assert!(stream.poll_next_unpin(cx).is_pending());
}

#[test]
fn iter_mut_cancel() {
Expand Down
9 changes: 4 additions & 5 deletions futures/tests/shared.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use futures::channel::oneshot;
use futures::executor::{block_on, LocalPool};
use futures::future::{self, FutureExt, LocalFutureObj};
use futures::future::{self, FutureExt, TryFutureExt, LocalFutureObj};
use futures::task::LocalSpawn;
use std::cell::{Cell, RefCell};
use std::rc::Rc;
Expand Down Expand Up @@ -41,7 +41,6 @@ fn many_threads() {
send_shared_oneshot_and_wait_on_multiple_threads(1000);
}

/* ToDo: This requires FutureExt::select to be implemented
#[test]
fn drop_on_one_task_ok() {
let (tx, rx) = oneshot::channel::<u32>();
Expand All @@ -51,14 +50,14 @@ fn drop_on_one_task_ok() {
let (tx2, rx2) = oneshot::channel::<u32>();

let t1 = thread::spawn(|| {
let f = f1.map_err(|_| ()).map(|x| *x).select(rx2.map_err(|_| ()));
let f = future::try_select(f1.map_err(|_| ()), rx2.map_err(|_| ()));
drop(block_on(f));
});

let (tx3, rx3) = oneshot::channel::<u32>();

let t2 = thread::spawn(|| {
let _ = block_on(f2.map(|x| tx3.send(*x).unwrap()).map_err(|_| ()));
let _ = block_on(f2.map_ok(|x| tx3.send(x).unwrap()).map_err(|_| ()));
});

tx2.send(11).unwrap(); // cancel `f1`
Expand All @@ -68,7 +67,7 @@ fn drop_on_one_task_ok() {
let result = block_on(rx3).unwrap();
assert_eq!(result, 42);
t2.join().unwrap();
}*/
}

#[test]
fn drop_in_poll() {
Expand Down