From c5bbcad038001a76cbd1102e9aea41cc5010e3c2 Mon Sep 17 00:00:00 2001 From: Aaron Turon Date: Thu, 19 Apr 2018 10:11:25 -0700 Subject: [PATCH] Convert basic futures combinators to futures-core 0.3 This commit includes the introduction of a `FutureResult` trait along the lines of the RFC (with some slight revisions). It tackles all the "basic" combinators, but leaves off `shared`, `select`, and `join` for the moment. --- Cargo.toml | 2 +- futures-util/Cargo.toml | 22 +- futures-util/src/future/and_then.rs | 40 -- futures-util/src/future/catch_unwind.rs | 29 +- futures-util/src/future/chain.rs | 72 +-- futures-util/src/future/empty.rs | 18 +- futures-util/src/future/err_into.rs | 38 -- futures-util/src/future/flatten.rs | 25 +- futures-util/src/future/flatten_stream.rs | 71 ++- futures-util/src/future/fuse.rs | 31 +- futures-util/src/future/inspect.rs | 31 +- futures-util/src/future/inspect_err.rs | 41 -- futures-util/src/future/into_stream.rs | 30 +- futures-util/src/future/lazy.rs | 68 +-- futures-util/src/future/loop_fn.rs | 105 ----- futures-util/src/future/map.rs | 25 +- futures-util/src/future/map_err.rs | 37 -- futures-util/src/future/mod.rs | 515 ++++----------------- futures-util/src/future/or_else.rs | 40 -- futures-util/src/future/poll_fn.rs | 16 +- futures-util/src/future/recover.rs | 35 -- futures-util/src/future/then.rs | 23 +- futures-util/src/future/with_executor.rs | 12 +- futures-util/src/future_result/and_then.rs | 67 +++ futures-util/src/future_result/err_into.rs | 40 ++ futures-util/src/future_result/map_err.rs | 42 ++ futures-util/src/future_result/map_ok.rs | 42 ++ futures-util/src/future_result/mod.rs | 455 ++++++++++++++++++ futures-util/src/future_result/or_else.rs | 67 +++ futures-util/src/future_result/recover.rs | 37 ++ futures-util/src/lib.rs | 43 +- futures-util/src/lock.rs | 13 +- 32 files changed, 1081 insertions(+), 1051 deletions(-) delete mode 100644 futures-util/src/future/and_then.rs delete mode 100644 futures-util/src/future/err_into.rs delete mode 100644 futures-util/src/future/inspect_err.rs delete mode 100644 futures-util/src/future/loop_fn.rs delete mode 100644 futures-util/src/future/map_err.rs delete mode 100644 futures-util/src/future/or_else.rs delete mode 100644 futures-util/src/future/recover.rs create mode 100644 futures-util/src/future_result/and_then.rs create mode 100644 futures-util/src/future_result/err_into.rs create mode 100644 futures-util/src/future_result/map_err.rs create mode 100644 futures-util/src/future_result/map_ok.rs create mode 100644 futures-util/src/future_result/mod.rs create mode 100644 futures-util/src/future_result/or_else.rs create mode 100644 futures-util/src/future_result/recover.rs diff --git a/Cargo.toml b/Cargo.toml index f377d72994..2d26b6d00f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,5 +10,5 @@ members = [ # "futures-macro-await", # "futures-sink", # "futures-stable", -# "futures-util", + "futures-util", ] diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index 31eab056f0..510f72cd32 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "futures-util" -version = "0.2.0" +version = "0.3.0-alpha" authors = ["Alex Crichton "] license = "MIT/Apache-2.0" repository = "https://github.com/rust-lang-nursery/futures-rs" @@ -11,18 +11,20 @@ Common utilities and extension traits for the futures-rs library. """ [features] -std = ["futures-core/std", "futures-io/std", "futures-sink/std", "either/use_std"] -default = ["std", "futures-core/either", "futures-sink/either"] +# std = ["futures-core/std", "futures-io/std", "futures-sink/std", "either/use_std"] +std = ["futures-core/std", "either/use_std"] +# default = ["std", "futures-core/either", "futures-sink/either"] +default = ["std", "futures-core/either"] bench = [] [dependencies] -futures-core = { path = "../futures-core", version = "0.2.0", default-features = false } -futures-channel = { path = "../futures-channel", version = "0.2.0", default-features = false } -futures-io = { path = "../futures-io", version = "0.2.0", default-features = false } -futures-sink = { path = "../futures-sink", version = "0.2.0", default-features = false} +futures-core = { path = "../futures-core", version = "0.3.0-alpha", default-features = false } +futures-channel = { path = "../futures-channel", version = "0.3.0-alpha", default-features = false } +# futures-io = { path = "../futures-io", version = "0.2.0", default-features = false } +# futures-sink = { path = "../futures-sink", version = "0.2.0", default-features = false} either = { version = "1.4", default-features = false } [dev-dependencies] -futures = { path = "../futures", version = "0.2.0" } -futures-executor = { path = "../futures-executor", version = "0.2.0" } -futures-channel = { path = "../futures-channel", version = "0.2.0" } +# futures = { path = "../futures", version = "0.2.0" } +# futures-executor = { path = "../futures-executor", version = "0.2.0" } +# futures-channel = { path = "../futures-channel", version = "0.2.0" } diff --git a/futures-util/src/future/and_then.rs b/futures-util/src/future/and_then.rs deleted file mode 100644 index f0f0d09933..0000000000 --- a/futures-util/src/future/and_then.rs +++ /dev/null @@ -1,40 +0,0 @@ -use futures_core::{Future, IntoFuture, Poll}; -use futures_core::task; - -use super::chain::Chain; - -/// Future for the `and_then` combinator, chaining a computation onto the end of -/// another future which completes successfully. -/// -/// This is created by the `Future::and_then` method. -#[derive(Debug)] -#[must_use = "futures do nothing unless polled"] -pub struct AndThen where A: Future, B: IntoFuture { - state: Chain, -} - -pub fn new(future: A, f: F) -> AndThen - where A: Future, - B: IntoFuture, -{ - AndThen { - state: Chain::new(future, f), - } -} - -impl Future for AndThen - where A: Future, - B: IntoFuture, - F: FnOnce(A::Item) -> B, -{ - type Item = B::Item; - type Error = B::Error; - - fn poll(&mut self, cx: &mut task::Context) -> Poll { - self.state.poll(cx, |result, f| { - result.map(|e| { - Err(f(e).into_future()) - }) - }) - } -} diff --git a/futures-util/src/future/catch_unwind.rs b/futures-util/src/future/catch_unwind.rs index 58d29db3fa..c6a5800d5b 100644 --- a/futures-util/src/future/catch_unwind.rs +++ b/futures-util/src/future/catch_unwind.rs @@ -1,8 +1,9 @@ +use std::mem::Pin; use std::prelude::v1::*; use std::any::Any; use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe}; -use futures_core::{Future, Poll, Async}; +use futures_core::{Future, Poll}; use futures_core::task; /// Future for the `catch_unwind` combinator. @@ -11,35 +12,25 @@ use futures_core::task; #[derive(Debug)] #[must_use = "futures do nothing unless polled"] pub struct CatchUnwind where F: Future { - future: Option, + future: F, } pub fn new(future: F) -> CatchUnwind where F: Future + UnwindSafe, { - CatchUnwind { - future: Some(future), - } + CatchUnwind { future } } impl Future for CatchUnwind where F: Future + UnwindSafe, { - type Item = Result; - type Error = Box; + type Output = Result>; - fn poll(&mut self, cx: &mut task::Context) -> Poll { - let mut future = self.future.take().expect("cannot poll twice"); - let (res, future) = catch_unwind(AssertUnwindSafe(|| { - (future.poll(cx), future) - }))?; - match res { - Ok(Async::Pending) => { - self.future = Some(future); - Ok(Async::Pending) - } - Ok(Async::Ready(t)) => Ok(Async::Ready(Ok(t))), - Err(e) => Ok(Async::Ready(Err(e))), + fn poll(mut self: Pin, cx: &mut task::Context) -> Poll { + let fut = unsafe { pinned_field!(self, future) }; + match catch_unwind(AssertUnwindSafe(|| fut.poll(cx))) { + Ok(res) => res.map(Ok), + Err(e) => Poll::Ready(Err(e)) } } } diff --git a/futures-util/src/future/chain.rs b/futures-util/src/future/chain.rs index e84d04cb54..faa25d3110 100644 --- a/futures-util/src/future/chain.rs +++ b/futures-util/src/future/chain.rs @@ -1,49 +1,53 @@ -use core::mem; +use core::mem::Pin; -use futures_core::{Future, Poll, Async}; +use futures_core::{Future, Poll}; use futures_core::task; #[must_use = "futures do nothing unless polled"] #[derive(Debug)] -pub enum Chain where A: Future { - First(A, C), - Second(B), - Done, +pub enum Chain { + First(Fut1, Option), + Second(Fut2), } -impl Chain - where A: Future, - B: Future, +impl Chain + where Fut1: Future, + Fut2: Future, { - pub fn new(a: A, c: C) -> Chain { - Chain::First(a, c) + pub fn new(fut1: Fut1, data: Data) -> Chain { + Chain::First(fut1, Some(data)) } - pub fn poll(&mut self, cx: &mut task::Context, f: F) -> Poll - where F: FnOnce(Result, C) - -> Result, B::Error>, + pub fn poll(mut self: Pin, cx: &mut task::Context, f: F) -> Poll + where F: FnOnce(Fut1::Output, Data) -> Fut2, { - let a_result = match *self { - Chain::First(ref mut a, _) => { - match a.poll(cx) { - Ok(Async::Pending) => return Ok(Async::Pending), - Ok(Async::Ready(t)) => Ok(t), - Err(e) => Err(e), + let mut f = Some(f); + + loop { + // safe to `get_mut` here because we don't move out + let fut2 = match *unsafe { Pin::get_mut(&mut self) } { + Chain::First(ref mut fut1, ref mut data) => { + // safe to create a new `Pin` because `fut1` will never move + // before it's dropped. + match unsafe { Pin::new_unchecked(fut1) }.poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(t) => { + (f.take().unwrap())(t, data.take().unwrap()) + } + } } - } - Chain::Second(ref mut b) => return b.poll(cx), - Chain::Done => panic!("cannot poll a chained future twice"), - }; - let data = match mem::replace(self, Chain::Done) { - Chain::First(_, c) => c, - _ => panic!(), - }; - match f(a_result, data)? { - Ok(e) => Ok(Async::Ready(e)), - Err(mut b) => { - let ret = b.poll(cx); - *self = Chain::Second(b); - ret + Chain::Second(ref mut fut2) => { + // safe to create a new `Pin` because `fut2` will never move + // before it's dropped; once we're in `Chain::Second` we stay + // there forever. + return unsafe { Pin::new_unchecked(fut2) }.poll(cx) + } + }; + + // safe because we're using the `&mut` to do an assignment, not for moving out + unsafe { + // note: it's safe to move the `fut2` here because we haven't yet polled it + *Pin::get_mut(&mut self) = Chain::Second(fut2); } } } diff --git a/futures-util/src/future/empty.rs b/futures-util/src/future/empty.rs index b26c1500f9..c79f87d6da 100644 --- a/futures-util/src/future/empty.rs +++ b/futures-util/src/future/empty.rs @@ -1,8 +1,9 @@ //! Definition of the Empty combinator, a future that's never ready. +use core::mem::Pin; use core::marker; -use futures_core::{Future, Poll, Async}; +use futures_core::{Future, Poll}; use futures_core::task; /// A future which is never resolved. @@ -10,23 +11,22 @@ use futures_core::task; /// This future can be created with the `empty` function. #[derive(Debug)] #[must_use = "futures do nothing unless polled"] -pub struct Empty { - _data: marker::PhantomData<(T, E)>, +pub struct Empty { + _data: marker::PhantomData, } /// Creates a future which never resolves, representing a computation that never /// finishes. /// /// The returned future will forever return `Async::Pending`. -pub fn empty() -> Empty { +pub fn empty() -> Empty { Empty { _data: marker::PhantomData } } -impl Future for Empty { - type Item = T; - type Error = E; +impl Future for Empty { + type Output = T; - fn poll(&mut self, _: &mut task::Context) -> Poll { - Ok(Async::Pending) + fn poll(self: Pin, _: &mut task::Context) -> Poll { + Poll::Pending } } diff --git a/futures-util/src/future/err_into.rs b/futures-util/src/future/err_into.rs deleted file mode 100644 index b47ca8b466..0000000000 --- a/futures-util/src/future/err_into.rs +++ /dev/null @@ -1,38 +0,0 @@ -use core::marker::PhantomData; - -use futures_core::{Future, Poll, Async}; -use futures_core::task; - -/// Future for the `err_into` combinator, changing the error type of a future. -/// -/// This is created by the `Future::err_into` method. -#[derive(Debug)] -#[must_use = "futures do nothing unless polled"] -pub struct ErrInto where A: Future { - future: A, - f: PhantomData -} - -pub fn new(future: A) -> ErrInto - where A: Future -{ - ErrInto { - future: future, - f: PhantomData - } -} - -impl Future for ErrInto - where A::Error: Into -{ - type Item = A::Item; - type Error = E; - - fn poll(&mut self, cx: &mut task::Context) -> Poll { - let e = match self.future.poll(cx) { - Ok(Async::Pending) => return Ok(Async::Pending), - other => other, - }; - e.map_err(Into::into) - } -} diff --git a/futures-util/src/future/flatten.rs b/futures-util/src/future/flatten.rs index 68f1beaafa..9481f4d4d4 100644 --- a/futures-util/src/future/flatten.rs +++ b/futures-util/src/future/flatten.rs @@ -1,6 +1,7 @@ use core::fmt; +use core::mem::Pin; -use futures_core::{Future, IntoFuture, Poll}; +use futures_core::{Future, Poll}; use futures_core::task; use super::chain::Chain; @@ -11,14 +12,13 @@ use super::chain::Chain; /// /// This is created by the `Future::flatten` method. #[must_use = "futures do nothing unless polled"] -pub struct Flatten where A: Future, A::Item: IntoFuture { - state: Chain::Future, ()>, +pub struct Flatten where A: Future, A::Output: Future { + state: Chain, } impl fmt::Debug for Flatten where A: Future + fmt::Debug, - A::Item: IntoFuture, - <::Item as IntoFuture>::Future: fmt::Debug, + A::Output: Future + fmt::Debug, { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_struct("Flatten") @@ -29,7 +29,7 @@ impl fmt::Debug for Flatten pub fn new(future: A) -> Flatten where A: Future, - A::Item: IntoFuture, + A::Output: Future, { Flatten { state: Chain::new(future, ()), @@ -38,16 +38,11 @@ pub fn new(future: A) -> Flatten impl Future for Flatten where A: Future, - A::Item: IntoFuture, - <::Item as IntoFuture>::Error: From<::Error> + A::Output: Future, { - type Item = <::Item as IntoFuture>::Item; - type Error = <::Item as IntoFuture>::Error; + type Output = ::Output; - fn poll(&mut self, cx: &mut task::Context) -> Poll { - self.state.poll(cx, |a, ()| { - let future = a?.into_future(); - Ok(Err(future)) - }) + fn poll(mut self: Pin, cx: &mut task::Context) -> Poll { + unsafe { pinned_field!(self, state) }.poll(cx, |a, ()| a) } } diff --git a/futures-util/src/future/flatten_stream.rs b/futures-util/src/future/flatten_stream.rs index 5b7c1c2e2d..b6d71bac67 100644 --- a/futures-util/src/future/flatten_stream.rs +++ b/futures-util/src/future/flatten_stream.rs @@ -1,6 +1,7 @@ use core::fmt; +use core::mem::Pin; -use futures_core::{Async, Future, Poll, Stream}; +use futures_core::{Future, Poll, Stream}; use futures_core::task; /// Future for the `flatten_stream` combinator, flattening a @@ -8,16 +9,13 @@ use futures_core::task; /// /// This is created by the `Future::flatten_stream` method. #[must_use = "streams do nothing unless polled"] -pub struct FlattenStream - where F: Future, - ::Item: Stream, -{ +pub struct FlattenStream { state: State } impl fmt::Debug for FlattenStream where F: Future + fmt::Debug, - ::Item: Stream + fmt::Debug, + F::Output: fmt::Debug, { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_struct("FlattenStream") @@ -26,75 +24,58 @@ impl fmt::Debug for FlattenStream } } -pub fn new(f: F) -> FlattenStream - where F: Future, - ::Item: Stream, -{ +pub fn new(f: F) -> FlattenStream { FlattenStream { state: State::Future(f) } } #[derive(Debug)] -enum State - where F: Future, - ::Item: Stream, -{ +enum State { // future is not yet called or called and not ready Future(F), // future resolved to Stream - Stream(F::Item), - // EOF after future resolved to error - Eof, - // after EOF after future resolved to error - Done, + Stream(F::Output), } impl Stream for FlattenStream where F: Future, - ::Item: Stream, + F::Output: Stream, { - type Item = ::Item; - type Error = ::Error; + type Item = ::Item; - fn poll_next(&mut self, cx: &mut task::Context) -> Poll, Self::Error> { + fn poll_next(mut self: Pin, cx: &mut task::Context) -> Poll> { loop { - let (next_state, ret_opt) = match self.state { + // safety: data is never moved via the resulting &mut reference + let stream = match unsafe { Pin::get_mut(&mut self) }.state { State::Future(ref mut f) => { - match f.poll(cx) { - Ok(Async::Pending) => { + // safety: the future we're re-pinning here will never be moved; + // it will just be polled, then dropped in place + match unsafe { Pin::new_unchecked(f) }.poll(cx) { + Poll::Pending => { // State is not changed, early return. - return Ok(Async::Pending) + return Poll::Pending }, - Ok(Async::Ready(stream)) => { + Poll::Ready(stream) => { // Future resolved to stream. // We do not return, but poll that // stream in the next loop iteration. - (State::Stream(stream), None) - } - Err(e) => { - (State::Eof, Some(Err(e))) + stream } } } State::Stream(ref mut s) => { - // Just forward call to the stream, - // do not track its state. - return s.poll_next(cx); - } - State::Eof => { - (State::Done, Some(Ok(Async::Ready(None)))) - } - State::Done => { - panic!("poll called after eof"); + // safety: the stream we're repinning here will never be moved; + // it will just be polled, then dropped in place + return unsafe { Pin::new_unchecked(s) }.poll_next(cx); } }; - self.state = next_state; - if let Some(ret) = ret_opt { - return ret; + unsafe { + // safety: we use the &mut only for an assignment, which causes + // only an in-place drop + Pin::get_mut(&mut self).state = State::Stream(stream); } } } } - diff --git a/futures-util/src/future/fuse.rs b/futures-util/src/future/fuse.rs index 25caa3f3ee..b98c321549 100644 --- a/futures-util/src/future/fuse.rs +++ b/futures-util/src/future/fuse.rs @@ -1,4 +1,6 @@ -use futures_core::{Future, Poll, Async}; +use core::mem::Pin; + +use futures_core::{Future, Poll}; use futures_core::task; /// A future which "fuses" a future once it's been resolved. @@ -21,18 +23,23 @@ pub fn new(f: A) -> Fuse { } impl Future for Fuse { - type Item = A::Item; - type Error = A::Error; + type Output = A::Output; - fn poll(&mut self, cx: &mut task::Context) -> Poll { - let res = self.future.as_mut().map(|f| f.poll(cx)); - match res.unwrap_or(Ok(Async::Pending)) { - res @ Ok(Async::Ready(_)) | - res @ Err(_) => { - self.future = None; - res + fn poll(mut self: Pin, cx: &mut task::Context) -> Poll { + // safety: we use this &mut only for matching, not for movement + let v = match unsafe { Pin::get_mut(&mut self) }.future { + Some(ref mut fut) => { + // safety: this re-pinned future will never move before being dropped + match unsafe { Pin::new_unchecked(fut) }.poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(v) => v + } } - Ok(Async::Pending) => Ok(Async::Pending) - } + None => return Poll::Pending, + }; + + // safety: we use this &mut only for a replacement, which drops the future in place + unsafe { Pin::get_mut(&mut self) }.future = None; + Poll::Ready(v) } } diff --git a/futures-util/src/future/inspect.rs b/futures-util/src/future/inspect.rs index d0e1ba6416..f348734cff 100644 --- a/futures-util/src/future/inspect.rs +++ b/futures-util/src/future/inspect.rs @@ -1,4 +1,6 @@ -use futures_core::{Future, Poll, Async}; +use core::mem::Pin; + +use futures_core::{Future, Poll}; use futures_core::task; /// Do something with the item of a future, passing it on. @@ -13,7 +15,7 @@ pub struct Inspect where A: Future { pub fn new(future: A, f: F) -> Inspect where A: Future, - F: FnOnce(&A::Item), + F: FnOnce(&A::Output), { Inspect { future: future, @@ -23,19 +25,20 @@ pub fn new(future: A, f: F) -> Inspect impl Future for Inspect where A: Future, - F: FnOnce(&A::Item), + F: FnOnce(&A::Output), { - type Item = A::Item; - type Error = A::Error; + type Output = A::Output; + + fn poll(mut self: Pin, cx: &mut task::Context) -> Poll { + let e = match unsafe { pinned_field!(self, future) }.poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(e) => e, + }; - fn poll(&mut self, cx: &mut task::Context) -> Poll { - match self.future.poll(cx) { - Ok(Async::Pending) => Ok(Async::Pending), - Ok(Async::Ready(e)) => { - (self.f.take().expect("cannot poll Inspect twice"))(&e); - Ok(Async::Ready(e)) - }, - Err(e) => Err(e), - } + let f = unsafe { + Pin::get_mut(&mut self).f.take().expect("cannot poll Inspect twice") + }; + f(&e); + Poll::Ready(e) } } diff --git a/futures-util/src/future/inspect_err.rs b/futures-util/src/future/inspect_err.rs deleted file mode 100644 index ea0c312af5..0000000000 --- a/futures-util/src/future/inspect_err.rs +++ /dev/null @@ -1,41 +0,0 @@ -use futures_core::{Future, Poll, Async}; -use futures_core::task; - -/// Do something with the error of a future, passing it on. -/// -/// This is created by the [`FutureExt::inspect_err`] method. -#[derive(Debug)] -#[must_use = "futures do nothing unless polled"] -pub struct InspectErr where A: Future { - future: A, - f: Option, -} - -pub fn new(future: A, f: F) -> InspectErr - where A: Future, - F: FnOnce(&A::Error), -{ - InspectErr { - future: future, - f: Some(f), - } -} - -impl Future for InspectErr - where A: Future, - F: FnOnce(&A::Error), -{ - type Item = A::Item; - type Error = A::Error; - - fn poll(&mut self, cx: &mut task::Context) -> Poll { - match self.future.poll(cx) { - Ok(Async::Pending) => Ok(Async::Pending), - Ok(Async::Ready(e)) => Ok(Async::Ready(e)), - Err(e) => { - (self.f.take().expect("cannot poll InspectErr twice"))(&e); - Err(e) - }, - } - } -} diff --git a/futures-util/src/future/into_stream.rs b/futures-util/src/future/into_stream.rs index bdb0e82371..57779fed51 100644 --- a/futures-util/src/future/into_stream.rs +++ b/futures-util/src/future/into_stream.rs @@ -1,4 +1,6 @@ -use futures_core::{Async, Poll, Future, Stream}; +use core::mem::Pin; + +use futures_core::{Poll, Future, Stream}; use futures_core::task; /// A type which converts a `Future` into a `Stream` @@ -16,21 +18,23 @@ pub fn new(future: F) -> IntoStream { } impl Stream for IntoStream { - type Item = F::Item; - type Error = F::Error; + type Item = F::Output; - fn poll_next(&mut self, cx: &mut task::Context) -> Poll, Self::Error> { - let ret = match self.future { - None => return Ok(Async::Ready(None)), - Some(ref mut future) => { - match future.poll(cx) { - Ok(Async::Pending) => return Ok(Async::Pending), - Err(e) => Err(e), - Ok(Async::Ready(r)) => Ok(r), + fn poll_next(mut self: Pin, cx: &mut task::Context) -> Poll> { + // safety: we use this &mut only for matching, not for movement + let v = match unsafe { Pin::get_mut(&mut self) }.future { + Some(ref mut fut) => { + // safety: this re-pinned future will never move before being dropped + match unsafe { Pin::new_unchecked(fut) }.poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(v) => v } } + None => return Poll::Ready(None), }; - self.future = None; - ret.map(|r| Async::Ready(Some(r))) + + // safety: we use this &mut only for a replacement, which drops the future in place + unsafe { Pin::get_mut(&mut self) }.future = None; + Poll::Ready(Some(v)) } } diff --git a/futures-util/src/future/lazy.rs b/futures-util/src/future/lazy.rs index 4c1dc0468a..5732363b44 100644 --- a/futures-util/src/future/lazy.rs +++ b/futures-util/src/future/lazy.rs @@ -1,88 +1,56 @@ //! Definition of the Lazy combinator, deferring execution of a function until //! the future is polled. -use core::mem; +use core::mem::Pin; +use core::marker::Unpin; -use futures_core::{Future, IntoFuture, Poll}; +use futures_core::{Future, Poll}; use futures_core::task; -/// A future which defers creation of the actual future until the future -/// is `poll`ed. +/// A future which, when polled, invokes a closure and yields its result. /// /// This is created by the `lazy` function. #[derive(Debug)] #[must_use = "futures do nothing unless polled"] -pub struct Lazy { - inner: _Lazy, +pub struct Lazy { + f: Option } -#[derive(Debug)] -enum _Lazy { - First(F), - Second(R), - Moved, -} +// safe because we never generate `Pin` +unsafe impl Unpin for Lazy {} -/// Creates a new future which will eventually be the same as the one created -/// by the closure provided. +/// Creates a new future from a closure. /// /// The provided closure is only run once the future is polled. -/// Once run, however, this future is the same as the one the closure creates. /// /// # Examples /// /// ``` /// # extern crate futures; /// use futures::prelude::*; -/// use futures::future::{self, FutureResult}; +/// use futures::future; /// /// # fn main() { -/// let a = future::lazy(|_| future::ok::(1)); +/// let a = future::lazy(|_| 1); /// -/// let b = future::lazy(|_| -> FutureResult { +/// let b = future::lazy(|_| -> i32 { /// panic!("oh no!") /// }); /// drop(b); // closure is never run /// # } /// ``` -pub fn lazy(f: F) -> Lazy - where F: FnOnce(&mut task::Context) -> R, - R: IntoFuture -{ - Lazy { - inner: _Lazy::First(f), - } -} - -impl Lazy +pub fn lazy(f: F) -> Lazy where F: FnOnce(&mut task::Context) -> R, - R: IntoFuture, { - fn get(&mut self, cx: &mut task::Context) -> &mut R::Future { - match self.inner { - _Lazy::First(_) => {} - _Lazy::Second(ref mut f) => return f, - _Lazy::Moved => panic!(), // can only happen if `f()` panics - } - match mem::replace(&mut self.inner, _Lazy::Moved) { - _Lazy::First(f) => self.inner = _Lazy::Second(f(cx).into_future()), - _ => panic!(), // we already found First - } - match self.inner { - _Lazy::Second(ref mut f) => f, - _ => panic!(), // we just stored Second - } - } + Lazy { f: Some(f) } } -impl Future for Lazy +impl Future for Lazy where F: FnOnce(&mut task::Context) -> R, - R: IntoFuture, { - type Item = R::Item; - type Error = R::Error; + type Output = R; - fn poll(&mut self, cx: &mut task::Context) -> Poll { - self.get(cx).poll(cx) + fn poll(mut self: Pin, cx: &mut task::Context) -> Poll { + Poll::Ready((self.f.take().unwrap())(cx)) } } diff --git a/futures-util/src/future/loop_fn.rs b/futures-util/src/future/loop_fn.rs deleted file mode 100644 index f628c781eb..0000000000 --- a/futures-util/src/future/loop_fn.rs +++ /dev/null @@ -1,105 +0,0 @@ -//! Definition of the `LoopFn` combinator, implementing `Future` loops. - -use futures_core::{Async, Future, IntoFuture, Poll}; -use futures_core::task; - -/// An enum describing whether to `break` or `continue` a `loop_fn` loop. -#[derive(Debug)] -pub enum Loop { - /// Indicates that the loop has completed with output `T`. - Break(T), - - /// Indicates that the loop function should be called again with input - /// state `S`. - Continue(S), -} - -/// A future implementing a tail-recursive loop. -/// -/// Created by the `loop_fn` function. -#[must_use = "futures do nothing unless polled"] -#[derive(Debug)] -pub struct LoopFn where A: IntoFuture { - future: A::Future, - func: F, -} - -/// Creates a new future implementing a tail-recursive loop. -/// -/// The loop function is immediately called with `initial_state` and should -/// return a value that can be converted to a future. On successful completion, -/// this future should output a `Loop` to indicate the status of the -/// loop. -/// -/// `Loop::Break(T)` halts the loop and completes the future with output `T`. -/// -/// `Loop::Continue(S)` reinvokes the loop function with state `S`. The returned -/// future will be subsequently polled for a new `Loop` value. -/// -/// # Examples -/// -/// ``` -/// # extern crate futures; -/// use futures::prelude::*; -/// use futures::future::{self, ok, loop_fn, Loop, FutureResult}; -/// use futures::never::Never; -/// -/// struct Client { -/// ping_count: u8, -/// } -/// -/// impl Client { -/// fn new() -> Self { -/// Client { ping_count: 0 } -/// } -/// -/// fn send_ping(self) -> FutureResult { -/// ok(Client { ping_count: self.ping_count + 1 }) -/// } -/// -/// fn receive_pong(self) -> FutureResult<(Self, bool), Never> { -/// let done = self.ping_count >= 5; -/// ok((self, done)) -/// } -/// } -/// -/// # fn main() { -/// let ping_til_done = loop_fn(Client::new(), |client| { -/// client.send_ping() -/// .and_then(|client| client.receive_pong()) -/// .and_then(|(client, done)| { -/// if done { -/// Ok(Loop::Break(client)) -/// } else { -/// Ok(Loop::Continue(client)) -/// } -/// }) -/// }); -/// # } -/// ``` -pub fn loop_fn(initial_state: S, mut func: F) -> LoopFn - where F: FnMut(S) -> A, - A: IntoFuture>, -{ - LoopFn { - future: func(initial_state).into_future(), - func: func, - } -} - -impl Future for LoopFn - where F: FnMut(S) -> A, - A: IntoFuture>, -{ - type Item = T; - type Error = A::Error; - - fn poll(&mut self, cx: &mut task::Context) -> Poll { - loop { - match try_ready!(self.future.poll(cx)) { - Loop::Break(x) => return Ok(Async::Ready(x)), - Loop::Continue(s) => self.future = (self.func)(s).into_future(), - } - } - } -} diff --git a/futures-util/src/future/map.rs b/futures-util/src/future/map.rs index 6a1596ad53..3117698f15 100644 --- a/futures-util/src/future/map.rs +++ b/futures-util/src/future/map.rs @@ -1,4 +1,6 @@ -use futures_core::{Future, Poll, Async}; +use core::mem::Pin; + +use futures_core::{Future, Poll}; use futures_core::task; /// Future for the `map` combinator, changing the type of a future. @@ -22,18 +24,19 @@ pub fn new(future: A, f: F) -> Map impl Future for Map where A: Future, - F: FnOnce(A::Item) -> U, + F: FnOnce(A::Output) -> U, { - type Item = U; - type Error = A::Error; + type Output = U; + + fn poll(mut self: Pin, cx: &mut task::Context) -> Poll { + let e = match unsafe { pinned_field!(self, future) }.poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(e) => e, + }; - fn poll(&mut self, cx: &mut task::Context) -> Poll { - let e = match self.future.poll(cx) { - Ok(Async::Pending) => return Ok(Async::Pending), - Ok(Async::Ready(e)) => Ok(e), - Err(e) => Err(e), + let f = unsafe { + Pin::get_mut(&mut self).f.take().expect("cannot poll Map twice") }; - e.map(self.f.take().expect("cannot poll Map twice")) - .map(Async::Ready) + Poll::Ready(f(e)) } } diff --git a/futures-util/src/future/map_err.rs b/futures-util/src/future/map_err.rs deleted file mode 100644 index d46f82a7e8..0000000000 --- a/futures-util/src/future/map_err.rs +++ /dev/null @@ -1,37 +0,0 @@ -use futures_core::{Future, Poll, Async}; -use futures_core::task; - -/// Future for the `map_err` combinator, changing the error type of a future. -/// -/// This is created by the `Future::map_err` method. -#[derive(Debug)] -#[must_use = "futures do nothing unless polled"] -pub struct MapErr where A: Future { - future: A, - f: Option, -} - -pub fn new(future: A, f: F) -> MapErr - where A: Future -{ - MapErr { - future: future, - f: Some(f), - } -} - -impl Future for MapErr - where A: Future, - F: FnOnce(A::Error) -> U, -{ - type Item = A::Item; - type Error = U; - - fn poll(&mut self, cx: &mut task::Context) -> Poll { - let e = match self.future.poll(cx) { - Ok(Async::Pending) => return Ok(Async::Pending), - other => other, - }; - e.map_err(self.f.take().expect("cannot poll MapErr twice")) - } -} diff --git a/futures-util/src/future/mod.rs b/futures-util/src/future/mod.rs index 121bd8ff48..0524fe4cf7 100644 --- a/futures-util/src/future/mod.rs +++ b/futures-util/src/future/mod.rs @@ -3,80 +3,69 @@ //! This module contains a number of functions for working with `Future`s, //! including the `FutureExt` trait which adds methods to `Future` types. -use core::result; - -use futures_core::{Future, IntoFuture, Stream}; -use futures_sink::Sink; +use futures_core::{Future, Stream}; +// use futures_sink::Sink; // Primitive futures mod empty; mod lazy; mod poll_fn; -mod loop_fn; pub use self::empty::{empty, Empty}; pub use self::lazy::{lazy, Lazy}; pub use self::poll_fn::{poll_fn, PollFn}; -pub use self::loop_fn::{loop_fn, Loop, LoopFn}; // combinators -mod and_then; mod flatten; -mod flatten_sink; +// mod flatten_sink; mod flatten_stream; mod fuse; mod into_stream; -mod join; +// mod join; mod map; -mod map_err; -mod err_into; -mod or_else; -mod select; +// mod select; mod then; mod inspect; -mod inspect_err; -mod recover; // impl details mod chain; -pub use self::and_then::AndThen; pub use self::flatten::Flatten; -pub use self::flatten_sink::FlattenSink; +// pub use self::flatten_sink::FlattenSink; pub use self::flatten_stream::FlattenStream; pub use self::fuse::Fuse; pub use self::into_stream::IntoStream; -pub use self::join::{Join, Join3, Join4, Join5}; +// pub use self::join::{Join, Join3, Join4, Join5}; pub use self::map::Map; -pub use self::map_err::MapErr; -pub use self::err_into::ErrInto; -pub use self::or_else::OrElse; -pub use self::select::Select; +// pub use self::select::Select; pub use self::then::Then; pub use self::inspect::Inspect; -pub use self::inspect_err::InspectErr; -pub use self::recover::Recover; pub use either::Either; if_std! { mod catch_unwind; + + /* TODO mod join_all; mod select_all; mod select_ok; mod shared; - mod with_executor; - pub use self::catch_unwind::CatchUnwind; + pub use self::join_all::{join_all, JoinAll}; pub use self::select_all::{SelectAll, SelectAllNext, select_all}; pub use self::select_ok::{SelectOk, select_ok}; pub use self::shared::{Shared, SharedItem, SharedError}; + */ + + mod with_executor; + pub use self::catch_unwind::CatchUnwind; pub use self::with_executor::WithExecutor; } impl FutureExt for T where T: Future {} /// An extension trait for `Future`s that provides a variety of convenient -/// combinator functions. +/// adapters. pub trait FutureExt: Future { /// Map this future's result to a different type, returning a new future of /// the resulting type. @@ -85,10 +74,6 @@ pub trait FutureExt: Future { /// it will change the type of the underlying future. This is useful to /// chain along a computation once a future has been resolved. /// - /// The closure provided will only be called if this future is resolved - /// successfully. If this future returns an error, panics, or is dropped, - /// then the closure provided will never be invoked. - /// /// Note that this function consumes the receiving future and returns a /// wrapped version of it, similar to the existing `map` methods in the /// standard library. @@ -103,131 +88,27 @@ pub trait FutureExt: Future { /// use futures_executor::block_on; /// /// # fn main() { - /// let future = future::ok::(1); + /// let future = future::ready(1); /// let new_future = future.map(|x| x + 3); - /// assert_eq!(block_on(new_future), Ok(4)); - /// # } - /// ``` - /// - /// Calling `map` on an errored `Future` has no effect: - /// - /// ``` - /// # extern crate futures; - /// # extern crate futures_executor; - /// use futures::prelude::*; - /// use futures::future; - /// use futures_executor::block_on; - /// - /// # fn main() { - /// let future = future::err::(1); - /// let new_future = future.map(|x| x + 3); - /// assert_eq!(block_on(new_future), Err(1)); + /// assert_eq!(block_on(new_future), 4); /// # } /// ``` fn map(self, f: F) -> Map - where F: FnOnce(Self::Item) -> U, + where F: FnOnce(Self::Output) -> U, Self: Sized, { - assert_future::(map::new(self, f)) - } - - /// Map this future's error to a different error, returning a new future. - /// - /// This function is similar to the `Result::map_err` where it will change - /// the error type of the underlying future. This is useful for example to - /// ensure that futures have the same error type when used with combinators - /// like `select` and `join`. - /// - /// The closure provided will only be called if this future is resolved - /// with an error. If this future returns a success, panics, or is - /// dropped, then the closure provided will never be invoked. - /// - /// Note that this function consumes the receiving future and returns a - /// wrapped version of it. - /// - /// # Examples - /// - /// ``` - /// # extern crate futures; - /// # extern crate futures_executor; - /// use futures::future::err; - /// use futures::prelude::*; - /// use futures_executor::block_on; - /// - /// # fn main() { - /// let future = err::(1); - /// let new_future = future.map_err(|x| x + 3); - /// assert_eq!(block_on(new_future), Err(4)); - /// # } - /// ``` - /// - /// Calling `map_err` on a successful `Future` has no effect: - /// - /// ``` - /// # extern crate futures; - /// # extern crate futures_executor; - /// use futures::future::ok; - /// use futures::prelude::*; - /// use futures_executor::block_on; - /// - /// # fn main() { - /// let future = ok::(1); - /// let new_future = future.map_err(|x| x + 3); - /// assert_eq!(block_on(new_future), Ok(1)); - /// # } - /// ``` - fn map_err(self, f: F) -> MapErr - where F: FnOnce(Self::Error) -> E, - Self: Sized, - { - assert_future::(map_err::new(self, f)) - } - - /// Map this future's error to a new error type using the `Into` trait. - /// - /// This function does for futures what `try!` does for `Result`, - /// by letting the compiler infer the type of the resulting error. - /// Just as `map_err` above, this is useful for example to ensure - /// that futures have the same error type when used with - /// combinators like `select` and `join`. - /// - /// Note that this function consumes the receiving future and returns a - /// wrapped version of it. - /// - /// # Examples - /// - /// ``` - /// # extern crate futures; - /// use futures::prelude::*; - /// use futures::future; - /// - /// # fn main() { - /// let future_with_err_u8 = future::err::<(), u8>(1); - /// let future_with_err_u32 = future_with_err_u8.err_into::(); - /// # } - /// ``` - fn err_into(self) -> ErrInto - where Self: Sized, - Self::Error: Into - { - assert_future::(err_into::new(self)) + assert_future::(map::new(self, f)) } /// Chain on a computation for when a future finished, passing the result of /// the future to the provided closure `f`. /// - /// This function can be used to ensure a computation runs regardless of - /// the conclusion of the future. The closure provided will be yielded a - /// `Result` once the future is complete. - /// - /// The returned value of the closure must implement the `IntoFuture` trait + /// The returned value of the closure must implement the `Future` trait /// and can represent some more work to be done before the composed future - /// is finished. Note that the `Result` type implements the `IntoFuture` - /// trait so it is possible to simply alter the `Result` yielded to the - /// closure and return it. + /// is finished. /// - /// If this future is dropped or panics then the closure `f` will not be - /// run. + /// The closure `f` is only run *after* successful completion of the `self` + /// future. /// /// Note that this function consumes the receiving future and returns a /// wrapped version of it. @@ -240,116 +121,19 @@ pub trait FutureExt: Future { /// use futures::future; /// /// # fn main() { - /// let future_of_1 = future::ok::(1); - /// let future_of_4 = future_of_1.then(|x| { - /// x.map(|y| y + 3) - /// }); - /// - /// let future_of_err_1 = future::err::(1); - /// let future_of_4 = future_of_err_1.then(|x| { - /// match x { - /// Ok(_) => panic!("expected an error"), - /// Err(y) => future::ok::(y + 3), - /// } - /// }); - /// # } + /// let future_of_1 = future::ready(1); + /// let future_of_4 = future_of_1.then(|x| future::ready(x + 3)); + /// assert_eq!(block_on(future_of_4), 4); /// ``` fn then(self, f: F) -> Then - where F: FnOnce(result::Result) -> B, - B: IntoFuture, - Self: Sized, - { - assert_future::(then::new(self, f)) - } - - /// Execute another future after this one has resolved successfully. - /// - /// This function can be used to chain two futures together and ensure that - /// the final future isn't resolved until both have finished. The closure - /// provided is yielded the successful result of this future and returns - /// another value which can be converted into a future. - /// - /// Note that because `Result` implements the `IntoFuture` trait this method - /// can also be useful for chaining fallible and serial computations onto - /// the end of one future. - /// - /// If this future is dropped, panics, or completes with an error then the - /// provided closure `f` is never called. - /// - /// Note that this function consumes the receiving future and returns a - /// wrapped version of it. - /// - /// # Examples - /// - /// ``` - /// # extern crate futures; - /// use futures::prelude::*; - /// use futures::future::{self, FutureResult}; - /// - /// # fn main() { - /// let future_of_1 = future::ok::(1); - /// let future_of_4 = future_of_1.and_then(|x| { - /// Ok(x + 3) - /// }); - /// - /// let future_of_err_1 = future::err::(1); - /// future_of_err_1.and_then(|_| -> FutureResult { - /// panic!("should not be called in case of an error"); - /// }); - /// # } - /// ``` - fn and_then(self, f: F) -> AndThen - where F: FnOnce(Self::Item) -> B, - B: IntoFuture, - Self: Sized, - { - assert_future::(and_then::new(self, f)) - } - - /// Execute another future if this one resolves with an error. - /// - /// Return a future that passes along this future's value if it succeeds, - /// and otherwise passes the error to the closure `f` and waits for the - /// future it returns. The closure may also simply return a value that can - /// be converted into a future. - /// - /// Note that because `Result` implements the `IntoFuture` trait this method - /// can also be useful for chaining together fallback computations, where - /// when one fails, the next is attempted. - /// - /// If this future is dropped, panics, or completes successfully then the - /// provided closure `f` is never called. - /// - /// Note that this function consumes the receiving future and returns a - /// wrapped version of it. - /// - /// # Examples - /// - /// ``` - /// # extern crate futures; - /// use futures::prelude::*; - /// use futures::future::{self, FutureResult}; - /// - /// # fn main() { - /// let future_of_err_1 = future::err::(1); - /// let future_of_4 = future_of_err_1.or_else(|x| -> Result { - /// Ok(x + 3) - /// }); - /// - /// let future_of_1 = future::ok::(1); - /// future_of_1.or_else(|_| -> FutureResult { - /// panic!("should not be called in case of success"); - /// }); - /// # } - /// ``` - fn or_else(self, f: F) -> OrElse - where F: FnOnce(Self::Error) -> B, - B: IntoFuture, + where F: FnOnce(Self::Output) -> B, + B: Future, Self: Sized, { - assert_future::(or_else::new(self, f)) + assert_future::(then::new(self, f)) } + /* TODO /// Waits for either one of two differently-typed futures to complete. /// /// This function will return a new future which awaits for either this or @@ -484,38 +268,7 @@ pub trait FutureExt: Future { join::new5(self, b.into_future(), c.into_future(), d.into_future(), e.into_future()) } - - /// Wrap this future in an `Either` future, making it the left-hand variant - /// of that `Either`. - /// - /// This can be used in combination with the `right` method to write `if` - /// statements that evaluate to different futures in different branches. - /// - /// # Examples - /// - /// ``` - /// # extern crate futures; - /// use futures::executor::block_on; - /// use futures::future::*; - /// - /// # fn main() { - /// let x = 6; - /// let future = if x < 10 { - /// ok::<_, bool>(x).left() - /// } else { - /// empty().right() - /// }; - /// - /// assert_eq!(x, block_on(future).unwrap()); - /// # } - /// ``` - #[deprecated(note = "use `left_future` instead")] - fn left(self) -> Either - where B: Future, - Self: Sized - { - Either::Left(self) - } +*/ /// Wrap this future in an `Either` future, making it the left-hand variant /// of that `Either`. @@ -533,16 +286,16 @@ pub trait FutureExt: Future { /// # fn main() { /// let x = 6; /// let future = if x < 10 { - /// ok::<_, bool>(x).left_future() + /// ready(true).left_future() /// } else { - /// empty().right_future() + /// ready(false).right_future() /// }; /// - /// assert_eq!(x, block_on(future).unwrap()); + /// assert_eq!(true, block_on(future)); /// # } /// ``` fn left_future(self) -> Either - where B: Future, + where B: Future, Self: Sized { Either::Left(self) @@ -564,48 +317,15 @@ pub trait FutureExt: Future { /// # fn main() { /// let x = 6; /// let future = if x < 10 { - /// ok::<_, bool>(x).left() - /// } else { - /// empty().right() - /// }; - /// - /// assert_eq!(x, block_on(future).unwrap()); - /// # } - /// ``` - #[deprecated(note = "use `right_future` instead")] - fn right(self) -> Either - where A: Future, - Self: Sized, - { - Either::Right(self) - } - - /// Wrap this future in an `Either` future, making it the right-hand variant - /// of that `Either`. - /// - /// This can be used in combination with the `left_future` method to write `if` - /// statements that evaluate to different futures in different branches. - /// - /// # Examples - /// - /// ``` - /// # extern crate futures; - /// use futures::executor::block_on; - /// use futures::future::*; - /// - /// # fn main() { - /// let x = 6; - /// let future = if x < 10 { - /// ok::<_, bool>(x).left_future() + /// ready(true).left_future() /// } else { - /// empty().right_future() + /// ready(false).right_future() /// }; /// - /// assert_eq!(x, block_on(future).unwrap()); + /// assert_eq!(false, block_on(future)); /// # } - /// ``` fn right_future(self) -> Either - where A: Future, + where A: Future, Self: Sized, { Either::Right(self) @@ -626,15 +346,10 @@ pub trait FutureExt: Future { /// use futures_executor::block_on; /// /// # fn main() { - /// let future = future::ok::<_, bool>(17); + /// let future = future::ready::(17); /// let stream = future.into_stream(); - /// let collected: Vec<_> = block_on(stream.collect()).unwrap(); + /// let collected: Vec<_> = block_on(stream.collect()); /// assert_eq!(collected, vec![17]); - /// - /// let future = future::err::(19); - /// let stream = future.into_stream(); - /// let collected: Result, _> = block_on(stream.collect()); - /// assert_eq!(collected, Err(19)); /// # } /// ``` fn into_stream(self) -> IntoStream @@ -667,38 +382,20 @@ pub trait FutureExt: Future { /// use futures_executor::block_on; /// /// # fn main() { - /// let nested_future = future::ok::<_, u32>(future::ok::(1)); + /// let nested_future = future::ready(future::ready(1)); /// let future = nested_future.flatten(); - /// assert_eq!(block_on(future), Ok(1)); - /// # } - /// ``` - /// - /// Calling `flatten` on an errored `Future`, or if the inner `Future` is - /// errored, will result in an errored `Future`: - /// - /// ``` - /// # extern crate futures; - /// # extern crate futures_executor; - /// use futures::prelude::*; - /// use futures::future; - /// use futures_executor::block_on; - /// - /// # fn main() { - /// let nested_future = future::ok::<_, u32>(future::err::(1)); - /// let future = nested_future.flatten(); - /// assert_eq!(block_on(future), Err(1)); + /// assert_eq!(block_on(future), 1); /// # } /// ``` fn flatten(self) -> Flatten - where Self::Item: IntoFuture::Error>, + where Self::Output: Future, Self: Sized { let f = flatten::new(self); - assert_future::<<::Item as IntoFuture>::Item, - <::Item as IntoFuture>::Error, - _>(f) + assert_future::<<::Output as Future>::Output, _>(f) } + /* TODO /// Flatten the execution of this future when the successful result of this /// future is a sink. /// @@ -714,6 +411,7 @@ pub trait FutureExt: Future { { flatten_sink::new(self) } + */ /// Flatten the execution of this future when the successful result of this /// future is a stream. @@ -737,15 +435,15 @@ pub trait FutureExt: Future { /// /// # fn main() { /// let stream_items = vec![17, 18, 19]; - /// let future_of_a_stream = future::ok::<_, bool>(stream::iter_ok(stream_items)); + /// let future_of_a_stream = future::ready(stream::iter_ok(stream_items)); /// /// let stream = future_of_a_stream.flatten_stream(); - /// let list: Vec<_> = block_on(stream.collect()).unwrap(); + /// let list: Vec<_> = block_on(stream.collect()); /// assert_eq!(list, vec![17, 18, 19]); /// # } /// ``` fn flatten_stream(self) -> FlattenStream - where ::Item: Stream, + where Self::Output: Stream, Self: Sized { flatten_stream::new(self) @@ -770,14 +468,15 @@ pub trait FutureExt: Future { where Self: Sized { let f = fuse::new(self); - assert_future::(f) + assert_future::(f) } - /// Do something with the item of a future, passing it on. + /// Do something with the output of a future before passing it on. /// - /// When using futures, you'll often chain several of them together. - /// While working on such code, you might want to check out what's happening at - /// various parts in the pipeline. To do that, insert a call to `inspect`. + /// When using futures, you'll often chain several of them together. While + /// working on such code, you might want to check out what's happening at + /// various parts in the pipeline, without consuming the intermediate + /// value. To do that, insert a call to `inspect`. /// /// # Examples /// @@ -789,44 +488,16 @@ pub trait FutureExt: Future { /// use futures_executor::block_on; /// /// # fn main() { - /// let future = future::ok::(1); + /// let future = future::ready(1); /// let new_future = future.inspect(|&x| println!("about to resolve: {}", x)); - /// assert_eq!(block_on(new_future), Ok(1)); + /// assert_eq!(block_on(new_future), 1); /// # } /// ``` fn inspect(self, f: F) -> Inspect - where F: FnOnce(&Self::Item) -> (), + where F: FnOnce(&Self::Output) -> (), Self: Sized, { - assert_future::(inspect::new(self, f)) - } - - /// Do something with the error of a future, passing it on. - /// - /// When using futures, you'll often chain several of them together. - /// While working on such code, you might want to check out what's happening - /// to the errors at various parts in the pipeline. To do that, insert a - /// call to `inspect_err`. - /// - /// # Examples - /// - /// ``` - /// # extern crate futures; - /// use futures::prelude::*; - /// use futures::future; - /// use futures::executor::block_on; - /// - /// # fn main() { - /// let future = future::err::(1); - /// let new_future = future.inspect_err(|&x| println!("about to error: {}", x)); - /// assert_eq!(block_on(new_future), Err(1)); - /// # } - /// ``` - fn inspect_err(self, f: F) -> InspectErr - where F: FnOnce(&Self::Error) -> (), - Self: Sized, - { - assert_future::(inspect_err::new(self, f)) + assert_future::(inspect::new(self, f)) } /// Catches unwinding panics while polling the future. @@ -855,12 +526,12 @@ pub trait FutureExt: Future { /// use futures_executor::block_on; /// /// # fn main() { - /// let mut future = future::ok::(2); + /// let mut future = future::ready(2); /// assert!(block_on(future.catch_unwind()).is_ok()); /// - /// let mut future = future::lazy(|_| -> FutureResult { + /// let mut future = future::lazy(|_| -> ReadyFuture { /// panic!(); - /// future::ok::(2) + /// future::ready(2) /// }); /// assert!(block_on(future.catch_unwind()).is_err()); /// # } @@ -872,17 +543,16 @@ pub trait FutureExt: Future { catch_unwind::new(self) } + /* TODO /// Create a cloneable handle to this future where all handles will resolve /// to the same result. /// /// The shared() method provides a method to convert any future into a /// cloneable future. It enables a future to be polled by multiple threads. /// - /// The returned `Shared` future resolves successfully with - /// `SharedItem` or erroneously with `SharedError`. - /// Both `SharedItem` and `SharedError` implements `Deref` to allow shared - /// access to the underlying result. Ownership of `Self::Item` and - /// `Self::Error` cannot currently be reclaimed. + /// The returned `Shared` future resolves with `SharedItem`, + /// which implements `Deref` to allow shared access to the underlying + /// result. Ownership of the underlying value cannot currently be reclaimed. /// /// This method is only available when the `std` feature of this /// library is activated, and it is activated by default. @@ -897,12 +567,12 @@ pub trait FutureExt: Future { /// use futures_executor::block_on; /// /// # fn main() { - /// let future = future::ok::<_, bool>(6); + /// let future = future::ready(6); /// let shared1 = future.shared(); /// let shared2 = shared1.clone(); /// - /// assert_eq!(6, *block_on(shared1).unwrap()); - /// assert_eq!(6, *block_on(shared2).unwrap()); + /// assert_eq!(6, *block_on(shared1)); + /// assert_eq!(6, *block_on(shared2)); /// # } /// ``` /// @@ -916,13 +586,13 @@ pub trait FutureExt: Future { /// use futures_executor::block_on; /// /// # fn main() { - /// let future = future::ok::<_, bool>(6); + /// let future = future::ready(6); /// let shared1 = future.shared(); /// let shared2 = shared1.clone(); /// let join_handle = thread::spawn(move || { - /// assert_eq!(6, *block_on(shared2).unwrap()); + /// assert_eq!(6, *block_on(shared2)); /// }); - /// assert_eq!(6, *block_on(shared1).unwrap()); + /// assert_eq!(6, *block_on(shared1)); /// join_handle.join().unwrap(); /// # } /// ``` @@ -932,34 +602,7 @@ pub trait FutureExt: Future { { shared::new(self) } - - /// Handle errors generated by this future by converting them into - /// `Self::Item`. - /// - /// Because it can never produce an error, the returned `Recover` future can - /// conform to any specific `Error` type, including `Never`. - /// - /// # Examples - /// - /// ``` - /// # extern crate futures; - /// # extern crate futures_executor; - /// use futures::prelude::*; - /// use futures::future; - /// use futures_executor::block_on; - /// - /// # fn main() { - /// let future = future::err::<(), &str>("something went wrong"); - /// let new_future = future.recover::(|_| ()); - /// assert_eq!(block_on(new_future), Ok(())); - /// # } - /// ``` - fn recover(self, f: F) -> Recover - where Self: Sized, - F: FnOnce(Self::Error) -> Self::Item - { - recover::new(self, f) - } +*/ /// Assigns the provided `Executor` to be used when spawning tasks /// from within the future. @@ -975,9 +618,9 @@ pub trait FutureExt: Future { /// /// # fn main() { /// let pool = ThreadPool::new().expect("unable to create threadpool"); - /// let future = future::ok::<(), _>(()); + /// let future = future::ready(3); /// let spawn_future = spawn(future).with_executor(pool); - /// assert_eq!(block_on(spawn_future), Ok(())); + /// assert_eq!(block_on(spawn_future), 3); /// # } /// ``` #[cfg(feature = "std")] @@ -991,8 +634,8 @@ pub trait FutureExt: Future { // Just a helper function to ensure the futures we're returning all have the // right implementations. -fn assert_future(t: F) -> F - where F: Future, +fn assert_future(t: F) -> F + where F: Future, { t } diff --git a/futures-util/src/future/or_else.rs b/futures-util/src/future/or_else.rs deleted file mode 100644 index 6e9c506e78..0000000000 --- a/futures-util/src/future/or_else.rs +++ /dev/null @@ -1,40 +0,0 @@ -use futures_core::{Future, IntoFuture, Poll}; -use futures_core::task; -use super::chain::Chain; - -/// Future for the `or_else` combinator, chaining a computation onto the end of -/// a future which fails with an error. -/// -/// This is created by the `Future::or_else` method. -#[derive(Debug)] -#[must_use = "futures do nothing unless polled"] -pub struct OrElse where A: Future, B: IntoFuture { - state: Chain, -} - -pub fn new(future: A, f: F) -> OrElse - where A: Future, - B: IntoFuture, -{ - OrElse { - state: Chain::new(future, f), - } -} - -impl Future for OrElse - where A: Future, - B: IntoFuture, - F: FnOnce(A::Error) -> B, -{ - type Item = B::Item; - type Error = B::Error; - - fn poll(&mut self, cx: &mut task::Context) -> Poll { - self.state.poll(cx, |a, f| { - match a { - Ok(item) => Ok(Ok(item)), - Err(e) => Ok(Err(f(e).into_future())) - } - }) - } -} diff --git a/futures-util/src/future/poll_fn.rs b/futures-util/src/future/poll_fn.rs index 02be5f296e..db8302a906 100644 --- a/futures-util/src/future/poll_fn.rs +++ b/futures-util/src/future/poll_fn.rs @@ -1,5 +1,7 @@ //! Definition of the `PollFn` adapter combinator +use core::mem::Pin; + use futures_core::{Future, Poll}; use futures_core::task; @@ -34,18 +36,18 @@ pub struct PollFn { /// # } /// ``` pub fn poll_fn(f: F) -> PollFn - where F: FnMut(&mut task::Context) -> Poll + where F: FnMut(&mut task::Context) -> Poll { PollFn { inner: f } } -impl Future for PollFn - where F: FnMut(&mut task::Context) -> Poll +impl Future for PollFn + where F: FnMut(&mut task::Context) -> Poll { - type Item = T; - type Error = E; + type Output = T; - fn poll(&mut self, cx: &mut task::Context) -> Poll { - (self.inner)(cx) + fn poll(mut self: Pin, cx: &mut task::Context) -> Poll { + // safe because we never expose a Pin + (unsafe { Pin::get_mut(&mut self) }.inner)(cx) } } diff --git a/futures-util/src/future/recover.rs b/futures-util/src/future/recover.rs deleted file mode 100644 index 65743a0b8c..0000000000 --- a/futures-util/src/future/recover.rs +++ /dev/null @@ -1,35 +0,0 @@ -use core::marker::PhantomData; - -use futures_core::{Future, Poll, Async}; -use futures_core::task; - -/// Future for the `recover` combinator, handling errors by converting them into -/// an `Item`, compatible with any error type of the caller's choosing. -#[must_use = "futures do nothing unless polled"] -#[derive(Debug)] -pub struct Recover { - inner: A, - f: Option, - err: PhantomData, -} - -pub fn new(future: A, f: F) -> Recover - where A: Future -{ - Recover { inner: future, f: Some(f), err: PhantomData } -} - -impl Future for Recover - where A: Future, - F: FnOnce(A::Error) -> A::Item, -{ - type Item = A::Item; - type Error = E; - - fn poll(&mut self, cx: &mut task::Context) -> Poll { - match self.inner.poll(cx) { - Err(e) => Ok(Async::Ready((self.f.take().expect("Polled future::Recover after completion"))(e))), - Ok(x) => Ok(x), - } - } -} diff --git a/futures-util/src/future/then.rs b/futures-util/src/future/then.rs index 17e4e75142..a26feba262 100644 --- a/futures-util/src/future/then.rs +++ b/futures-util/src/future/then.rs @@ -1,4 +1,6 @@ -use futures_core::{Future, IntoFuture, Poll}; +use core::mem::Pin; + +use futures_core::{Future, Poll}; use futures_core::task; use super::chain::Chain; @@ -8,13 +10,13 @@ use super::chain::Chain; /// This is created by the `Future::then` method. #[derive(Debug)] #[must_use = "futures do nothing unless polled"] -pub struct Then where A: Future, B: IntoFuture { - state: Chain, +pub struct Then where A: Future, B: Future { + state: Chain, } pub fn new(future: A, f: F) -> Then where A: Future, - B: IntoFuture, + B: Future, { Then { state: Chain::new(future, f), @@ -23,15 +25,12 @@ pub fn new(future: A, f: F) -> Then impl Future for Then where A: Future, - B: IntoFuture, - F: FnOnce(Result) -> B, + B: Future, + F: FnOnce(A::Output) -> B, { - type Item = B::Item; - type Error = B::Error; + type Output = B::Output; - fn poll(&mut self, cx: &mut task::Context) -> Poll { - self.state.poll(cx, |a, f| { - Ok(Err(f(a).into_future())) - }) + fn poll(mut self: Pin, cx: &mut task::Context) -> Poll { + unsafe { pinned_field!(self, state) }.poll(cx, |a, f| f(a)) } } diff --git a/futures-util/src/future/with_executor.rs b/futures-util/src/future/with_executor.rs index e4517ee5bd..4f1310c691 100644 --- a/futures-util/src/future/with_executor.rs +++ b/futures-util/src/future/with_executor.rs @@ -1,3 +1,5 @@ +use core::mem::Pin; + use futures_core::{Future, Poll}; use futures_core::task; use futures_core::executor::Executor; @@ -24,10 +26,12 @@ impl Future for WithExecutor where F: Future, E: Executor, { - type Item = F::Item; - type Error = F::Error; + type Output = F::Output; - fn poll(&mut self, cx: &mut task::Context) -> Poll { - self.future.poll(&mut cx.with_executor(&mut self.executor)) + fn poll(mut self: Pin, cx: &mut task::Context) -> Poll { + let this = unsafe { Pin::get_mut(&mut self) }; + let fut = unsafe { Pin::new_unchecked(&mut this.future) }; + let exec = &mut this.executor; + fut.poll(&mut cx.with_executor(exec)) } } diff --git a/futures-util/src/future_result/and_then.rs b/futures-util/src/future_result/and_then.rs new file mode 100644 index 0000000000..86d2d3c785 --- /dev/null +++ b/futures-util/src/future_result/and_then.rs @@ -0,0 +1,67 @@ +use core::mem::Pin; + +use futures_core::{Future, Poll}; +use futures_core::task; + +use FutureResult; + +/// Future for the `and_then` combinator, chaining a computation onto the end of +/// another future which completes successfully. +/// +/// This is created by the `Future::and_then` method. +#[derive(Debug)] +#[must_use = "futures do nothing unless polled"] +pub struct AndThen { + state: State, +} + +#[derive(Debug)] +enum State { + First(Fut1, Option), + Second(Fut2), +} + +pub fn new(future: A, f: F) -> AndThen { + AndThen { + state: State::First(future, Some(f)), + } +} + +impl Future for AndThen + where A: FutureResult, + B: FutureResult, + F: FnOnce(A::Item) -> B, +{ + type Output = Result; + + fn poll(mut self: Pin, cx: &mut task::Context) -> Poll { + loop { + // safe to `get_mut` here because we don't move out + let fut2 = match unsafe { Pin::get_mut(&mut self) }.state { + State::First(ref mut fut1, ref mut data) => { + // safe to create a new `Pin` because `fut1` will never move + // before it's dropped. + match unsafe { Pin::new_unchecked(fut1) }.poll_result(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Ready(Ok(v)) => { + (data.take().unwrap())(v) + } + } + } + State::Second(ref mut fut2) => { + // safe to create a new `Pin` because `fut2` will never move + // before it's dropped; once we're in `Chain::Second` we stay + // there forever. + return unsafe { Pin::new_unchecked(fut2) }.poll_result(cx) + } + }; + + // safe because we're using the `&mut` to do an assignment, not for moving out + unsafe { + // note: it's safe to move the `fut2` here because we haven't yet polled it + Pin::get_mut(&mut self).state = State::Second(fut2); + } + } + } +} diff --git a/futures-util/src/future_result/err_into.rs b/futures-util/src/future_result/err_into.rs new file mode 100644 index 0000000000..cdb57a0899 --- /dev/null +++ b/futures-util/src/future_result/err_into.rs @@ -0,0 +1,40 @@ +use core::mem::Pin; +use core::marker::PhantomData; + +use futures_core::{Future, Poll}; +use futures_core::task; + +use FutureResult; + +/// Future for the `err_into` combinator, changing the error type of a future. +/// +/// This is created by the `Future::err_into` method. +#[derive(Debug)] +#[must_use = "futures do nothing unless polled"] +pub struct ErrInto { + future: A, + f: PhantomData +} + +pub fn new(future: A) -> ErrInto { + ErrInto { + future: future, + f: PhantomData + } +} + +impl Future for ErrInto + where A: FutureResult, + A::Error: Into, +{ + type Output = Result; + + fn poll(mut self: Pin, cx: &mut task::Context) -> Poll { + match unsafe { pinned_field!(self, future) }.poll_result(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(e) => { + Poll::Ready(e.map_err(Into::into)) + } + } + } +} diff --git a/futures-util/src/future_result/map_err.rs b/futures-util/src/future_result/map_err.rs new file mode 100644 index 0000000000..229ce9dd8b --- /dev/null +++ b/futures-util/src/future_result/map_err.rs @@ -0,0 +1,42 @@ +use core::mem::Pin; + +use futures_core::{Future, Poll}; +use futures_core::task; + +use FutureResult; + +/// Future for the `map_err` combinator, changing the type of a future. +/// +/// This is created by the `Future::map_err` method. +#[derive(Debug)] +#[must_use = "futures do nothing unless polled"] +pub struct MapErr { + future: A, + f: Option, +} + +pub fn new(future: A, f: F) -> MapErr { + MapErr { + future: future, + f: Some(f), + } +} + +impl Future for MapErr + where A: FutureResult, + F: FnOnce(A::Error) -> U, +{ + type Output = Result; + + fn poll(mut self: Pin, cx: &mut task::Context) -> Poll { + match unsafe { pinned_field!(self, future) }.poll_result(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(e) => { + let f = unsafe { + Pin::get_mut(&mut self).f.take().expect("cannot poll MapErr twice") + }; + Poll::Ready(e.map_err(f)) + } + } + } +} diff --git a/futures-util/src/future_result/map_ok.rs b/futures-util/src/future_result/map_ok.rs new file mode 100644 index 0000000000..c8b97582fb --- /dev/null +++ b/futures-util/src/future_result/map_ok.rs @@ -0,0 +1,42 @@ +use core::mem::Pin; + +use futures_core::{Future, Poll}; +use futures_core::task; + +use FutureResult; + +/// Future for the `map_ok` combinator, changing the type of a future. +/// +/// This is created by the `Future::map_ok` method. +#[derive(Debug)] +#[must_use = "futures do nothing unless polled"] +pub struct MapOk { + future: A, + f: Option, +} + +pub fn new(future: A, f: F) -> MapOk { + MapOk { + future: future, + f: Some(f), + } +} + +impl Future for MapOk + where A: FutureResult, + F: FnOnce(A::Item) -> U, +{ + type Output = Result; + + fn poll(mut self: Pin, cx: &mut task::Context) -> Poll { + match unsafe { pinned_field!(self, future) }.poll_result(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(e) => { + let f = unsafe { + Pin::get_mut(&mut self).f.take().expect("cannot poll MapOk twice") + }; + Poll::Ready(e.map(f)) + } + } + } +} diff --git a/futures-util/src/future_result/mod.rs b/futures-util/src/future_result/mod.rs new file mode 100644 index 0000000000..269803a57c --- /dev/null +++ b/futures-util/src/future_result/mod.rs @@ -0,0 +1,455 @@ +//! Futures +//! +//! This module contains a number of functions for working with `Future`s, +//! including the `FutureExt` trait which adds methods to `Future` types. + +use core::mem::Pin; + +use futures_core::{task, Future, Poll}; + +// combinators +mod and_then; +mod map_ok; +mod map_err; +mod err_into; +mod or_else; +mod recover; + +/* TODO +mod join; +mod select; +pub use self::join::{Join, Join3, Join4, Join5}; +pub use self::select::Select; + +if_std! { +mod join_all; +mod select_all; +mod select_ok; +pub use self::join_all::{join_all, JoinAll}; +pub use self::select_all::{SelectAll, SelectAllNext, select_all}; +pub use self::select_ok::{SelectOk, select_ok}; +} +*/ + +pub use self::and_then::AndThen; +pub use self::map_ok::MapOk; +pub use self::map_err::MapErr; +pub use self::err_into::ErrInto; +pub use self::or_else::OrElse; +pub use self::recover::Recover; + +impl FutureResult for F + where F: Future> +{ + type Item = T; + type Error = E; + + fn poll_result(self: Pin, cx: &mut task::Context) -> Poll { + self.poll(cx) + } +} + +/// A convenience for futures that return `Result` values that includes +/// a variety of adapters tailored to such futures. +pub trait FutureResult { + /// The type of successful values yielded by this future + type Item; + + /// The type of failures yielded by this future + type Error; + + /// Map this future's result to a different type, returning a new future of + /// the resulting type. + /// + /// This function is similar to the `Option::map` or `Iterator::map` where + /// it will change the type of the underlying future. This is useful to + /// chain along a computation once a future has been resolved. + /// + /// The closure provided will only be called if this future is resolved + /// successfully. If this future returns an error, panics, or is dropped, + /// then the closure provided will never be invoked. + /// + /// Note that this function consumes the receiving future and returns a + /// wrapped version of it, similar to the existing `map` methods in the + /// standard library. + /// + /// # Examples + /// + /// ``` + /// # extern crate futures; + /// # extern crate futures_executor; + /// use futures::prelude::*; + /// use futures::future; + /// use futures_executor::block_on; + /// + /// # fn main() { + /// let future = future::ok::(1); + /// let new_future = future.map(|x| x + 3); + /// assert_eq!(block_on(new_future), Ok(4)); + /// # } + /// ``` + /// + /// Calling `map` on an errored `Future` has no effect: + /// + /// ``` + /// # extern crate futures; + /// # extern crate futures_executor; + /// use futures::prelude::*; + /// use futures::future; + /// use futures_executor::block_on; + /// + /// # fn main() { + /// let future = future::err::(1); + /// let new_future = future.map(|x| x + 3); + /// assert_eq!(block_on(new_future), Err(1)); + /// # } + /// ``` + fn map_ok(self, f: F) -> MapOk + where F: FnOnce(Self::Item) -> U, + Self: Sized, + { + map_ok::new(self, f) + } + + /// Map this future's error to a different error, returning a new future. + /// + /// This function is similar to the `Result::map_err` where it will change + /// the error type of the underlying future. This is useful for example to + /// ensure that futures have the same error type when used with combinators + /// like `select` and `join`. + /// + /// The closure provided will only be called if this future is resolved + /// with an error. If this future returns a success, panics, or is + /// dropped, then the closure provided will never be invoked. + /// + /// Note that this function consumes the receiving future and returns a + /// wrapped version of it. + /// + /// # Examples + /// + /// ``` + /// # extern crate futures; + /// # extern crate futures_executor; + /// use futures::future::err; + /// use futures::prelude::*; + /// use futures_executor::block_on; + /// + /// # fn main() { + /// let future = err::(1); + /// let new_future = future.map_err(|x| x + 3); + /// assert_eq!(block_on(new_future), Err(4)); + /// # } + /// ``` + /// + /// Calling `map_err` on a successful `Future` has no effect: + /// + /// ``` + /// # extern crate futures; + /// # extern crate futures_executor; + /// use futures::future::ok; + /// use futures::prelude::*; + /// use futures_executor::block_on; + /// + /// # fn main() { + /// let future = ok::(1); + /// let new_future = future.map_err(|x| x + 3); + /// assert_eq!(block_on(new_future), Ok(1)); + /// # } + /// ``` + fn map_err(self, f: F) -> MapErr + where F: FnOnce(Self::Error) -> E, + Self: Sized, + { + map_err::new(self, f) + } + + /// Map this future's error to a new error type using the `Into` trait. + /// + /// This function does for futures what `try!` does for `Result`, + /// by letting the compiler infer the type of the resulting error. + /// Just as `map_err` above, this is useful for example to ensure + /// that futures have the same error type when used with + /// combinators like `select` and `join`. + /// + /// Note that this function consumes the receiving future and returns a + /// wrapped version of it. + /// + /// # Examples + /// + /// ``` + /// # extern crate futures; + /// use futures::prelude::*; + /// use futures::future; + /// + /// # fn main() { + /// let future_with_err_u8 = future::err::<(), u8>(1); + /// let future_with_err_u32 = future_with_err_u8.err_into::(); + /// # } + /// ``` + fn err_into(self) -> ErrInto + where Self: Sized, + Self::Error: Into + { + err_into::new(self) + } + + /// Execute another future after this one has resolved successfully. + /// + /// This function can be used to chain two futures together and ensure that + /// the final future isn't resolved until both have finished. The closure + /// provided is yielded the successful result of this future and returns + /// another value which can be converted into a future. + /// + /// Note that because `Result` implements the `IntoFuture` trait this method + /// can also be useful for chaining fallible and serial computations onto + /// the end of one future. + /// + /// If this future is dropped, panics, or completes with an error then the + /// provided closure `f` is never called. + /// + /// Note that this function consumes the receiving future and returns a + /// wrapped version of it. + /// + /// # Examples + /// + /// ``` + /// # extern crate futures; + /// use futures::prelude::*; + /// use futures::future::{self, FutureResult}; + /// + /// # fn main() { + /// let future_of_1 = future::ok::(1); + /// let future_of_4 = future_of_1.and_then(|x| { + /// Ok(x + 3) + /// }); + /// + /// let future_of_err_1 = future::err::(1); + /// future_of_err_1.and_then(|_| -> FutureResult { + /// panic!("should not be called in case of an error"); + /// }); + /// # } + /// ``` + fn and_then(self, f: F) -> AndThen + where F: FnOnce(Self::Item) -> B, + B: FutureResult, + Self: Sized, + { + and_then::new(self, f) + } + + /// Execute another future if this one resolves with an error. + /// + /// Return a future that passes along this future's value if it succeeds, + /// and otherwise passes the error to the closure `f` and waits for the + /// future it returns. The closure may also simply return a value that can + /// be converted into a future. + /// + /// Note that because `Result` implements the `IntoFuture` trait this method + /// can also be useful for chaining together fallback computations, where + /// when one fails, the next is attempted. + /// + /// If this future is dropped, panics, or completes successfully then the + /// provided closure `f` is never called. + /// + /// Note that this function consumes the receiving future and returns a + /// wrapped version of it. + /// + /// # Examples + /// + /// ``` + /// # extern crate futures; + /// use futures::prelude::*; + /// use futures::future::{self, FutureResult}; + /// + /// # fn main() { + /// let future_of_err_1 = future::err::(1); + /// let future_of_4 = future_of_err_1.or_else(|x| -> Result { + /// Ok(x + 3) + /// }); + /// + /// let future_of_1 = future::ok::(1); + /// future_of_1.or_else(|_| -> FutureResult { + /// panic!("should not be called in case of success"); + /// }); + /// # } + /// ``` + fn or_else(self, f: F) -> OrElse + where F: FnOnce(Self::Error) -> B, + B: FutureResult, + Self: Sized, + { + or_else::new(self, f) + } + + /* TODO + /// Waits for either one of two differently-typed futures to complete. + /// + /// This function will return a new future which awaits for either this or + /// the `other` future to complete. The returned future will finish with + /// both the value resolved and a future representing the completion of the + /// other work. + /// + /// Note that this function consumes the receiving futures and returns a + /// wrapped version of them. + /// + /// Also note that if both this and the second future have the same + /// success/error type you can use the `Either::split` method to + /// conveniently extract out the value at the end. + /// + /// # Examples + /// + /// ``` + /// # extern crate futures; + /// use futures::prelude::*; + /// use futures::future::{self, Either}; + /// + /// // A poor-man's join implemented on top of select + /// + /// fn join(a: A, b: B) -> Box> + /// where A: Future + 'static, + /// B: Future + 'static, + /// E: 'static, + /// { + /// Box::new(a.select(b).then(|res| -> Box> { + /// match res { + /// Ok(Either::Left((x, b))) => Box::new(b.map(move |y| (x, y))), + /// Ok(Either::Right((y, a))) => Box::new(a.map(move |x| (x, y))), + /// Err(Either::Left((e, _))) => Box::new(future::err(e)), + /// Err(Either::Right((e, _))) => Box::new(future::err(e)), + /// } + /// })) + /// } + /// # fn main() {} + /// ``` + fn select(self, other: B) -> Select + where B: IntoFuture, Self: Sized + { + select::new(self, other.into_future()) + } + + /// Joins the result of two futures, waiting for them both to complete. + /// + /// This function will return a new future which awaits both this and the + /// `other` future to complete. The returned future will finish with a tuple + /// of both results. + /// + /// Both futures must have the same error type, and if either finishes with + /// an error then the other will be dropped and that error will be + /// returned. + /// + /// Note that this function consumes the receiving future and returns a + /// wrapped version of it. + /// + /// # Examples + /// + /// ``` + /// # extern crate futures; + /// # extern crate futures_executor; + /// use futures::prelude::*; + /// use futures::future; + /// use futures_executor::block_on; + /// + /// # fn main() { + /// let a = future::ok::(1); + /// let b = future::ok::(2); + /// let pair = a.join(b); + /// + /// assert_eq!(block_on(pair), Ok((1, 2))); + /// # } + /// ``` + /// + /// If one or both of the joined `Future`s is errored, the resulting + /// `Future` will be errored: + /// + /// ``` + /// # extern crate futures; + /// # extern crate futures_executor; + /// use futures::prelude::*; + /// use futures::future; + /// use futures_executor::block_on; + /// + /// # fn main() { + /// let a = future::ok::(1); + /// let b = future::err::(2); + /// let pair = a.join(b); + /// + /// assert_eq!(block_on(pair), Err(2)); + /// # } + /// ``` + fn join(self, other: B) -> Join + where B: IntoFuture, + Self: Sized, + { + let f = join::new(self, other.into_future()); + assert_future::<(Self::Item, B::Item), Self::Error, _>(f) + } + + /// Same as `join`, but with more futures. + fn join3(self, b: B, c: C) -> Join3 + where B: IntoFuture, + C: IntoFuture, + Self: Sized, + { + join::new3(self, b.into_future(), c.into_future()) + } + + /// Same as `join`, but with more futures. + fn join4(self, b: B, c: C, d: D) + -> Join4 + where B: IntoFuture, + C: IntoFuture, + D: IntoFuture, + Self: Sized, + { + join::new4(self, b.into_future(), c.into_future(), d.into_future()) + } + + /// Same as `join`, but with more futures. + fn join5(self, b: B, c: C, d: D, e: E) + -> Join5 + where B: IntoFuture, + C: IntoFuture, + D: IntoFuture, + E: IntoFuture, + Self: Sized, + { + join::new5(self, b.into_future(), c.into_future(), d.into_future(), + e.into_future()) + } +*/ + + /// Handle errors generated by this future by converting them into + /// `Self::Item`. + /// + /// Because it can never produce an error, the returned `Recover` future can + /// conform to any specific `Error` type, including `Never`. + /// + /// # Examples + /// + /// ``` + /// # extern crate futures; + /// # extern crate futures_executor; + /// use futures::prelude::*; + /// use futures::future; + /// use futures_executor::block_on; + /// + /// # fn main() { + /// let future = future::err::<(), &str>("something went wrong"); + /// let new_future = future.recover::(|_| ()); + /// assert_eq!(block_on(new_future), Ok(())); + /// # } + /// ``` + fn recover(self, f: F) -> Recover + where Self: Sized, + F: FnOnce(Self::Error) -> Self::Item + { + recover::new(self, f) + } + + /// Poll this `FutureResult` as if it were a `Future`. + /// + /// This method is a stopgap for a compiler limitation that prevents us from + /// directly inheriting from the `Future` trait; in the future it won't be + /// needed. + fn poll_result(self: Pin, cx: &mut task::Context) -> Poll>; +} diff --git a/futures-util/src/future_result/or_else.rs b/futures-util/src/future_result/or_else.rs new file mode 100644 index 0000000000..a7188c8279 --- /dev/null +++ b/futures-util/src/future_result/or_else.rs @@ -0,0 +1,67 @@ +use core::mem::Pin; + +use futures_core::{Future, Poll}; +use futures_core::task; + +use FutureResult; + +/// Future for the `or_else` combinator, chaining a computation onto the end of +/// a future which fails with an error. +/// +/// This is created by the `Future::or_else` method. +#[derive(Debug)] +#[must_use = "futures do nothing unless polled"] +pub struct OrElse { + state: State, +} + +#[derive(Debug)] +enum State { + First(Fut1, Option), + Second(Fut2), +} + +pub fn new(future: A, f: F) -> OrElse { + OrElse { + state: State::First(future, Some(f)), + } +} + +impl Future for OrElse + where A: FutureResult, + B: FutureResult, + F: FnOnce(A::Error) -> B, +{ + type Output = Result; + + fn poll(mut self: Pin, cx: &mut task::Context) -> Poll { + loop { + // safe to `get_mut` here because we don't move out + let fut2 = match unsafe { Pin::get_mut(&mut self) }.state { + State::First(ref mut fut1, ref mut data) => { + // safe to create a new `Pin` because `fut1` will never move + // before it's dropped. + match unsafe { Pin::new_unchecked(fut1) }.poll_result(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)), + Poll::Ready(Err(e)) => { + (data.take().unwrap())(e) + } + } + } + State::Second(ref mut fut2) => { + // safe to create a new `Pin` because `fut2` will never move + // before it's dropped; once we're in `Chain::Second` we stay + // there forever. + return unsafe { Pin::new_unchecked(fut2) }.poll_result(cx) + } + }; + + // safe because we're using the `&mut` to do an assignment, not for moving out + unsafe { + // note: it's safe to move the `fut2` here because we haven't yet polled it + Pin::get_mut(&mut self).state = State::Second(fut2); + } + } + } +} diff --git a/futures-util/src/future_result/recover.rs b/futures-util/src/future_result/recover.rs new file mode 100644 index 0000000000..7ec1e43c02 --- /dev/null +++ b/futures-util/src/future_result/recover.rs @@ -0,0 +1,37 @@ +use core::mem::Pin; + +use futures_core::{Future, Poll}; +use futures_core::task; + +use FutureResult; + +/// Future for the `recover` combinator, handling errors by converting them into +/// an `Item`, compatible with any error type of the caller's choosing. +#[must_use = "futures do nothing unless polled"] +#[derive(Debug)] +pub struct Recover { + inner: A, + f: Option, +} + +pub fn new(future: A, f: F) -> Recover { + Recover { inner: future, f: Some(f) } +} + +impl Future for Recover + where A: FutureResult, + F: FnOnce(A::Error) -> A::Item, +{ + type Output = A::Item; + + fn poll(mut self: Pin, cx: &mut task::Context) -> Poll { + unsafe { pinned_field!(self, inner) }.poll_result(cx) + .map(|res| res.unwrap_or_else(|e| { + let f = unsafe { + Pin::get_mut(&mut self).f.take() + .expect("Polled future::Recover after completion") + }; + f(e) + })) + } +} diff --git a/futures-util/src/lib.rs b/futures-util/src/lib.rs index 970eab2db4..39afba0a5e 100644 --- a/futures-util/src/lib.rs +++ b/futures-util/src/lib.rs @@ -1,6 +1,8 @@ //! Combinators and utilities for working with `Future`s, `Stream`s, `Sink`s, //! and the `AsyncRead` and `AsyncWrite` traits. +#![feature(pin, arbitrary_self_types)] + #![no_std] #![deny(missing_docs, missing_debug_implementations, warnings)] #![doc(html_root_url = "https://docs.rs/futures/0.1")] @@ -11,12 +13,14 @@ extern crate futures_channel; extern crate futures_core; #[cfg(test)] extern crate futures_executor; -extern crate futures_io; -extern crate futures_sink; + +// extern crate futures_io; +// extern crate futures_sink; + extern crate either; -#[cfg(feature = "std")] -use futures_core::{Async, Future, Poll, task}; +//#[cfg(feature = "std")] +//use futures_core::{Future, Poll, task}; macro_rules! if_std { ($($i:item)*) => ($( @@ -26,9 +30,10 @@ macro_rules! if_std { } #[cfg(feature = "std")] -#[macro_use] +//#[macro_use] extern crate std; +/* macro_rules! delegate_sink { ($field:ident) => { fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { @@ -49,30 +54,36 @@ macro_rules! delegate_sink { } } +*/ +/* #[cfg(all(feature = "std", any(test, feature = "bench")))] pub mod lock; #[cfg(all(feature = "std", not(any(test, feature = "bench"))))] mod lock; +*/ pub mod future; pub use future::FutureExt; -#[cfg(feature = "std")] -pub mod io; -#[cfg(feature = "std")] -pub use io::{AsyncReadExt, AsyncWriteExt}; +pub mod future_result; +pub use future_result::FutureResult; + +// #[cfg(feature = "std")] +// pub mod io; +// #[cfg(feature = "std")] +// pub use io::{AsyncReadExt, AsyncWriteExt}; -pub mod stream; -pub use stream::StreamExt; +// pub mod stream; +// pub use stream::StreamExt; -pub mod sink; -pub use sink::SinkExt; +// pub mod sink; +// pub use sink::SinkExt; pub mod prelude { //! Prelude containing the extension traits, which add functionality to //! existing asynchronous types. - pub use {FutureExt, StreamExt, SinkExt}; - #[cfg(feature = "std")] - pub use {AsyncReadExt, AsyncWriteExt}; + // pub use {FutureExt, StreamExt, SinkExt}; + // #[cfg(feature = "std")] + // pub use {AsyncReadExt, AsyncWriteExt}; } diff --git a/futures-util/src/lock.rs b/futures-util/src/lock.rs index 44ed6f7d5c..ef032b16db 100644 --- a/futures-util/src/lock.rs +++ b/futures-util/src/lock.rs @@ -12,7 +12,7 @@ use std::sync::Arc; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::SeqCst; -use futures_core::{Async, Future, Poll}; +use futures_core::{Future, Poll}; use futures_core::task::{self, Waker}; /// A type of futures-powered synchronization primitive which is a mutex between @@ -80,11 +80,11 @@ impl BiLock { /// /// This function will panic if called outside the context of a future's /// task. - pub fn poll_lock(&self, cx: &mut task::Context) -> Async> { + pub fn poll_lock(&self, cx: &mut task::Context) -> Poll> { loop { match self.inner.state.swap(1, SeqCst) { // Woohoo, we grabbed the lock! - 0 => return Async::Ready(BiLockGuard { inner: self }), + 0 => return Poll::Ready(BiLockGuard { inner: self }), // Oops, someone else has locked the lock 1 => {} @@ -103,7 +103,7 @@ impl BiLock { match self.inner.state.compare_exchange(1, me, SeqCst, SeqCst) { // The lock is still locked, but we've now parked ourselves, so // just report that we're scheduled to receive a notification. - Ok(_) => return Async::Pending, + Ok(_) => return Poll::Pending, // Oops, looks like the lock was unlocked after our swap above // and before the compare_exchange. Deallocate what we just @@ -245,8 +245,7 @@ pub struct BiLockAcquire { } impl Future for BiLockAcquire { - type Item = BiLockAcquired; - type Error = (); + type Output = BiLockAcquired; fn poll(&mut self, cx: &mut task::Context) -> Poll, ()> { match self.inner.as_ref().expect("cannot poll after Ready").poll_lock(cx) { @@ -255,7 +254,7 @@ impl Future for BiLockAcquire { } Async::Pending => return Ok(Async::Pending), } - Ok(Async::Ready(BiLockAcquired { inner: self.inner.take() })) + Poll::Ready(BiLockAcquired { inner: self.inner.take() }) } }