From 5f329b9d91d58e0e9223fd2a389934d823ee46be Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Mon, 20 May 2019 00:50:19 +0900 Subject: [PATCH 1/2] Add future::try_select --- futures-util/src/try_future/mod.rs | 49 +------------- futures-util/src/try_future/try_select.rs | 80 +++++++++++++++++++++++ futures/src/lib.rs | 1 + 3 files changed, 83 insertions(+), 47 deletions(-) create mode 100644 futures-util/src/try_future/try_select.rs diff --git a/futures-util/src/try_future/mod.rs b/futures-util/src/try_future/mod.rs index b5fc31143b..37c8ab5a5e 100644 --- a/futures-util/src/try_future/mod.rs +++ b/futures-util/src/try_future/mod.rs @@ -21,9 +21,8 @@ mod try_join_all; #[cfg(feature = "alloc")] pub use self::try_join_all::{try_join_all, TryJoinAll}; -// TODO -// mod try_select; -// pub use self::try_select::{try_select, TrySelect}; +mod try_select; +pub use self::try_select::{try_select, TrySelect}; #[cfg(feature = "alloc")] mod select_ok; @@ -319,50 +318,6 @@ pub trait TryFutureExt: TryFuture { OrElse::new(self, f) } - /* TODO - /// Waits for either one of two differently-typed futures to complete. - /// - /// This function will return a new future which awaits for either this or - /// the `other` future to complete. The returned future will finish with - /// both the value resolved and a future representing the completion of the - /// other work. - /// - /// Note that this function consumes the receiving futures and returns a - /// wrapped version of them. - /// - /// Also note that if both this and the second future have the same - /// success/error type you can use the `Either::split` method to - /// conveniently extract out the value at the end. - /// - /// # Examples - /// - /// ``` - /// use futures::future::{self, Either}; - /// - /// // A poor-man's join implemented on top of select - /// - /// fn join(a: A, b: B) -> Box> - /// where A: Future + 'static, - /// B: Future + 'static, - /// E: 'static, - /// { - /// Box::new(a.select(b).then(|res| -> Box> { - /// match res { - /// Ok(Either::Left((x, b))) => Box::new(b.map(move |y| (x, y))), - /// Ok(Either::Right((y, a))) => Box::new(a.map(move |x| (x, y))), - /// Err(Either::Left((e, _))) => Box::new(future::err(e)), - /// Err(Either::Right((e, _))) => Box::new(future::err(e)), - /// } - /// })) - /// }} - /// ``` - fn select(self, other: B) -> Select - where B: IntoFuture, Self: Sized - { - select::new(self, other.into_future()) - } -*/ - /// Unwraps this future's ouput, producing a future with this future's /// [`Ok`](TryFuture::Ok) type as its /// [`Output`](std::future::Future::Output) type. diff --git a/futures-util/src/try_future/try_select.rs b/futures-util/src/try_future/try_select.rs new file mode 100644 index 0000000000..4f55fb36c1 --- /dev/null +++ b/futures-util/src/try_future/try_select.rs @@ -0,0 +1,80 @@ +use core::pin::Pin; +use futures_core::future::{Future, TryFuture}; +use futures_core::task::{Context, Poll}; +use crate::future::Either; + +/// Future for the [`try_select()`] function. +#[must_use = "futures do nothing unless you `.await` or poll them"] +#[derive(Debug)] +pub struct TrySelect { + inner: Option<(A, B)>, +} + +impl Unpin for TrySelect {} + +/// Waits for either one of two differently-typed futures to complete. +/// +/// This function will return a new future which awaits for either one of both +/// futures to complete. The returned future will finish with both the value +/// resolved and a future representing the completion of the other work. +/// +/// Note that this function consumes the receiving futures and returns a +/// wrapped version of them. +/// +/// Also note that if both this and the second future have the same +/// success/error type you can use the `Either::factor_first` method to +/// conveniently extract out the value at the end. +/// +/// # Examples +/// +/// ``` +/// use futures::future::{self, Either, Future, FutureExt, TryFuture, TryFutureExt}; +/// +/// // A poor-man's try_join implemented on top of select +/// +/// fn try_join(a: A, b: B) -> impl TryFuture +/// where A: TryFuture + Unpin + 'static, +/// B: TryFuture + Unpin + 'static, +/// E: 'static, +/// { +/// future::try_select(a, b).then(|res| -> Box> + Unpin> { +/// match res { +/// Ok(Either::Left((x, b))) => Box::new(b.map_ok(move |y| (x, y))), +/// Ok(Either::Right((y, a))) => Box::new(a.map_ok(move |x| (x, y))), +/// Err(Either::Left((e, _))) => Box::new(future::err(e)), +/// Err(Either::Right((e, _))) => Box::new(future::err(e)), +/// } +/// }) +/// } +/// ``` +pub fn try_select(future1: A, future2: B) -> TrySelect + where A: TryFuture + Unpin, B: TryFuture + Unpin +{ + TrySelect { inner: Some((future1, future2)) } +} + +impl Future for TrySelect + where A: TryFuture, B: TryFuture +{ + #[allow(clippy::type_complexity)] + type Output = Result< + Either<(A::Ok, B), (B::Ok, A)>, + Either<(A::Error, B), (B::Error, A)>, + >; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice"); + match Pin::new(&mut a).try_poll(cx) { + Poll::Ready(Err(x)) => Poll::Ready(Err(Either::Left((x, b)))), + Poll::Ready(Ok(x)) => Poll::Ready(Ok(Either::Left((x, b)))), + Poll::Pending => match Pin::new(&mut b).try_poll(cx) { + Poll::Ready(Err(x)) => Poll::Ready(Err(Either::Right((x, a)))), + Poll::Ready(Ok(x)) => Poll::Ready(Ok(Either::Right((x, a)))), + Poll::Pending => { + self.inner = Some((a, b)); + Poll::Pending + } + } + } + } +} diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 3602379992..d290e93e4a 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -247,6 +247,7 @@ pub mod future { pub use futures_util::try_future::{ try_join, try_join3, try_join4, try_join5, TryJoin, TryJoin3, TryJoin4, TryJoin5, + try_select, TrySelect, TryFutureExt, AndThen, ErrInto, FlattenSink, IntoFuture, MapErr, MapOk, OrElse, From fa63a5485257b8e0338d064c30f6c8cd9544c0b5 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Mon, 20 May 2019 16:00:15 +0900 Subject: [PATCH 2/2] Re-add tests that needed future::[try_]select --- futures/tests/futures_ordered.rs | 37 +++++++++++++++--------------- futures/tests/futures_unordered.rs | 28 +++++++++++----------- futures/tests/shared.rs | 9 ++++---- 3 files changed, 36 insertions(+), 38 deletions(-) diff --git a/futures/tests/futures_ordered.rs b/futures/tests/futures_ordered.rs index 7828149d84..d06b62f76c 100644 --- a/futures/tests/futures_ordered.rs +++ b/futures/tests/futures_ordered.rs @@ -1,8 +1,9 @@ use futures::channel::oneshot; use futures::executor::{block_on, block_on_stream}; -use futures::future::{self, join, FutureExt}; +use futures::future::{self, join, Future, FutureExt, TryFutureExt}; use futures::stream::{StreamExt, FuturesOrdered}; use futures_test::task::noop_context; +use std::any::Any; #[test] fn works_1() { @@ -56,27 +57,27 @@ fn from_iterator() { assert_eq!(block_on(stream.collect::>()), vec![1,2,3]); } -/* ToDo: This requires FutureExt::select to be implemented #[test] fn queue_never_unblocked() { - let (_a_tx, a_rx) = oneshot::channel::>(); - let (b_tx, b_rx) = oneshot::channel::>(); - let (c_tx, c_rx) = oneshot::channel::>(); + let (_a_tx, a_rx) = oneshot::channel::>(); + let (b_tx, b_rx) = oneshot::channel::>(); + let (c_tx, c_rx) = oneshot::channel::>(); let mut stream = vec![ - Box::new(a_rx) as Box>, - Box::new(b_rx.select(c_rx).then(|res| Ok(Box::new(res) as Box))) as _, + Box::new(a_rx) as Box + Unpin>, + Box::new(future::try_select(b_rx, c_rx) + .map_err(|e| e.factor_first().0) + .and_then(|e| future::ok(Box::new(e) as Box))) as _, ].into_iter().collect::>(); - with_no_spawn_context(|cx| { - for _ in 0..10 { - assert!(stream.poll_next(cx).unwrap().is_pending()); - } + let cx = &mut noop_context(); + for _ in 0..10 { + assert!(stream.poll_next_unpin(cx).is_pending()); + } - b_tx.send(Box::new(())).unwrap(); - assert!(stream.poll_next(cx).unwrap().is_pending()); - c_tx.send(Box::new(())).unwrap(); - assert!(stream.poll_next(cx).unwrap().is_pending()); - assert!(stream.poll_next(cx).unwrap().is_pending()); - }) -}*/ + b_tx.send(Box::new(())).unwrap(); + assert!(stream.poll_next_unpin(cx).is_pending()); + c_tx.send(Box::new(())).unwrap(); + assert!(stream.poll_next_unpin(cx).is_pending()); + assert!(stream.poll_next_unpin(cx).is_pending()); +} diff --git a/futures/tests/futures_unordered.rs b/futures/tests/futures_unordered.rs index b0d7c57387..bd7c48ba2e 100644 --- a/futures/tests/futures_unordered.rs +++ b/futures/tests/futures_unordered.rs @@ -1,6 +1,6 @@ use futures::channel::oneshot; use futures::executor::{block_on, block_on_stream}; -use futures::future::{self, join, FutureExt}; +use futures::future::{self, join, Future, FutureExt}; use futures::stream::{StreamExt, FuturesUnordered}; use futures::task::Poll; use futures_test::{assert_stream_done, assert_stream_next}; @@ -57,7 +57,6 @@ fn from_iterator() { assert_eq!(block_on(stream.collect::>()), vec![1,2,3]); } -/* ToDo: This requires FutureExt::select to be implemented #[test] fn finished_future() { let (_a_tx, a_rx) = oneshot::channel::(); @@ -65,22 +64,21 @@ fn finished_future() { let (c_tx, c_rx) = oneshot::channel::(); let mut stream = vec![ - a_rx.boxed(), - b_rx.select(c_rx).boxed(), + Box::new(a_rx) as Box> + Unpin>, + Box::new(future::select(b_rx, c_rx).map(|e| e.factor_first().0)) as _, ].into_iter().collect::>(); - support::with_noop_waker_context(f)(|cx| { - for _ in 0..10 { - assert!(stream.poll_next_unpin(cx).is_pending()); - } - - b_tx.send(12).unwrap(); - assert!(stream.poll_next_unpin(cx).is_ready()); - c_tx.send(3).unwrap(); - assert!(stream.poll_next_unpin(cx).is_pending()); + let cx = &mut noop_context(); + for _ in 0..10 { assert!(stream.poll_next_unpin(cx).is_pending()); - }) -}*/ + } + + b_tx.send(12).unwrap(); + c_tx.send(3).unwrap(); + assert!(stream.poll_next_unpin(cx).is_ready()); + assert!(stream.poll_next_unpin(cx).is_pending()); + assert!(stream.poll_next_unpin(cx).is_pending()); +} #[test] fn iter_mut_cancel() { diff --git a/futures/tests/shared.rs b/futures/tests/shared.rs index 4e97265198..8402bfe10b 100644 --- a/futures/tests/shared.rs +++ b/futures/tests/shared.rs @@ -1,6 +1,6 @@ use futures::channel::oneshot; use futures::executor::{block_on, LocalPool}; -use futures::future::{self, FutureExt, LocalFutureObj}; +use futures::future::{self, FutureExt, TryFutureExt, LocalFutureObj}; use futures::task::LocalSpawn; use std::cell::{Cell, RefCell}; use std::rc::Rc; @@ -41,7 +41,6 @@ fn many_threads() { send_shared_oneshot_and_wait_on_multiple_threads(1000); } -/* ToDo: This requires FutureExt::select to be implemented #[test] fn drop_on_one_task_ok() { let (tx, rx) = oneshot::channel::(); @@ -51,14 +50,14 @@ fn drop_on_one_task_ok() { let (tx2, rx2) = oneshot::channel::(); let t1 = thread::spawn(|| { - let f = f1.map_err(|_| ()).map(|x| *x).select(rx2.map_err(|_| ())); + let f = future::try_select(f1.map_err(|_| ()), rx2.map_err(|_| ())); drop(block_on(f)); }); let (tx3, rx3) = oneshot::channel::(); let t2 = thread::spawn(|| { - let _ = block_on(f2.map(|x| tx3.send(*x).unwrap()).map_err(|_| ())); + let _ = block_on(f2.map_ok(|x| tx3.send(x).unwrap()).map_err(|_| ())); }); tx2.send(11).unwrap(); // cancel `f1` @@ -68,7 +67,7 @@ fn drop_on_one_task_ok() { let result = block_on(rx3).unwrap(); assert_eq!(result, 42); t2.join().unwrap(); -}*/ +} #[test] fn drop_in_poll() {