Skip to content

Commit

Permalink
Convert basic futures combinators to futures-core 0.3
Browse files Browse the repository at this point in the history
This commit includes the introduction of a `FutureResult` trait along
the lines of the RFC (with some slight revisions).

It tackles all the "basic" combinators, but leaves off `shared`,
`select`, and `join` for the moment.
  • Loading branch information
aturon committed Apr 20, 2018
1 parent 4f9d185 commit c5bbcad
Show file tree
Hide file tree
Showing 32 changed files with 1,081 additions and 1,051 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ members = [
# "futures-macro-await",
# "futures-sink",
# "futures-stable",
# "futures-util",
"futures-util",
]
22 changes: 12 additions & 10 deletions futures-util/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "futures-util"
version = "0.2.0"
version = "0.3.0-alpha"
authors = ["Alex Crichton <alex@alexcrichton.com>"]
license = "MIT/Apache-2.0"
repository = "https://github.com/rust-lang-nursery/futures-rs"
Expand All @@ -11,18 +11,20 @@ Common utilities and extension traits for the futures-rs library.
"""

[features]
std = ["futures-core/std", "futures-io/std", "futures-sink/std", "either/use_std"]
default = ["std", "futures-core/either", "futures-sink/either"]
# std = ["futures-core/std", "futures-io/std", "futures-sink/std", "either/use_std"]
std = ["futures-core/std", "either/use_std"]
# default = ["std", "futures-core/either", "futures-sink/either"]
default = ["std", "futures-core/either"]
bench = []

[dependencies]
futures-core = { path = "../futures-core", version = "0.2.0", default-features = false }
futures-channel = { path = "../futures-channel", version = "0.2.0", default-features = false }
futures-io = { path = "../futures-io", version = "0.2.0", default-features = false }
futures-sink = { path = "../futures-sink", version = "0.2.0", default-features = false}
futures-core = { path = "../futures-core", version = "0.3.0-alpha", default-features = false }
futures-channel = { path = "../futures-channel", version = "0.3.0-alpha", default-features = false }
# futures-io = { path = "../futures-io", version = "0.2.0", default-features = false }
# futures-sink = { path = "../futures-sink", version = "0.2.0", default-features = false}
either = { version = "1.4", default-features = false }

[dev-dependencies]
futures = { path = "../futures", version = "0.2.0" }
futures-executor = { path = "../futures-executor", version = "0.2.0" }
futures-channel = { path = "../futures-channel", version = "0.2.0" }
# futures = { path = "../futures", version = "0.2.0" }
# futures-executor = { path = "../futures-executor", version = "0.2.0" }
# futures-channel = { path = "../futures-channel", version = "0.2.0" }
40 changes: 0 additions & 40 deletions futures-util/src/future/and_then.rs

This file was deleted.

29 changes: 10 additions & 19 deletions futures-util/src/future/catch_unwind.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::mem::Pin;
use std::prelude::v1::*;
use std::any::Any;
use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe};

use futures_core::{Future, Poll, Async};
use futures_core::{Future, Poll};
use futures_core::task;

/// Future for the `catch_unwind` combinator.
Expand All @@ -11,35 +12,25 @@ use futures_core::task;
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct CatchUnwind<F> where F: Future {
future: Option<F>,
future: F,
}

pub fn new<F>(future: F) -> CatchUnwind<F>
where F: Future + UnwindSafe,
{
CatchUnwind {
future: Some(future),
}
CatchUnwind { future }
}

impl<F> Future for CatchUnwind<F>
where F: Future + UnwindSafe,
{
type Item = Result<F::Item, F::Error>;
type Error = Box<Any + Send>;
type Output = Result<F::Output, Box<Any + Send>>;

fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
let mut future = self.future.take().expect("cannot poll twice");
let (res, future) = catch_unwind(AssertUnwindSafe(|| {
(future.poll(cx), future)
}))?;
match res {
Ok(Async::Pending) => {
self.future = Some(future);
Ok(Async::Pending)
}
Ok(Async::Ready(t)) => Ok(Async::Ready(Ok(t))),
Err(e) => Ok(Async::Ready(Err(e))),
fn poll(mut self: Pin<Self>, cx: &mut task::Context) -> Poll<Self::Output> {
let fut = unsafe { pinned_field!(self, future) };
match catch_unwind(AssertUnwindSafe(|| fut.poll(cx))) {
Ok(res) => res.map(Ok),
Err(e) => Poll::Ready(Err(e))
}
}
}
72 changes: 38 additions & 34 deletions futures-util/src/future/chain.rs
Original file line number Diff line number Diff line change
@@ -1,49 +1,53 @@
use core::mem;
use core::mem::Pin;

use futures_core::{Future, Poll, Async};
use futures_core::{Future, Poll};
use futures_core::task;

#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub enum Chain<A, B, C> where A: Future {
First(A, C),
Second(B),
Done,
pub enum Chain<Fut1, Fut2, Data> {
First(Fut1, Option<Data>),
Second(Fut2),
}

impl<A, B, C> Chain<A, B, C>
where A: Future,
B: Future,
impl<Fut1, Fut2, Data> Chain<Fut1, Fut2, Data>
where Fut1: Future,
Fut2: Future,
{
pub fn new(a: A, c: C) -> Chain<A, B, C> {
Chain::First(a, c)
pub fn new(fut1: Fut1, data: Data) -> Chain<Fut1, Fut2, Data> {
Chain::First(fut1, Some(data))
}

pub fn poll<F>(&mut self, cx: &mut task::Context, f: F) -> Poll<B::Item, B::Error>
where F: FnOnce(Result<A::Item, A::Error>, C)
-> Result<Result<B::Item, B>, B::Error>,
pub fn poll<F>(mut self: Pin<Self>, cx: &mut task::Context, f: F) -> Poll<Fut2::Output>
where F: FnOnce(Fut1::Output, Data) -> Fut2,
{
let a_result = match *self {
Chain::First(ref mut a, _) => {
match a.poll(cx) {
Ok(Async::Pending) => return Ok(Async::Pending),
Ok(Async::Ready(t)) => Ok(t),
Err(e) => Err(e),
let mut f = Some(f);

loop {
// safe to `get_mut` here because we don't move out
let fut2 = match *unsafe { Pin::get_mut(&mut self) } {
Chain::First(ref mut fut1, ref mut data) => {
// safe to create a new `Pin` because `fut1` will never move
// before it's dropped.
match unsafe { Pin::new_unchecked(fut1) }.poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(t) => {
(f.take().unwrap())(t, data.take().unwrap())
}
}
}
}
Chain::Second(ref mut b) => return b.poll(cx),
Chain::Done => panic!("cannot poll a chained future twice"),
};
let data = match mem::replace(self, Chain::Done) {
Chain::First(_, c) => c,
_ => panic!(),
};
match f(a_result, data)? {
Ok(e) => Ok(Async::Ready(e)),
Err(mut b) => {
let ret = b.poll(cx);
*self = Chain::Second(b);
ret
Chain::Second(ref mut fut2) => {
// safe to create a new `Pin` because `fut2` will never move
// before it's dropped; once we're in `Chain::Second` we stay
// there forever.
return unsafe { Pin::new_unchecked(fut2) }.poll(cx)
}
};

// safe because we're using the `&mut` to do an assignment, not for moving out
unsafe {
// note: it's safe to move the `fut2` here because we haven't yet polled it
*Pin::get_mut(&mut self) = Chain::Second(fut2);
}
}
}
Expand Down
18 changes: 9 additions & 9 deletions futures-util/src/future/empty.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,32 @@
//! Definition of the Empty combinator, a future that's never ready.
use core::mem::Pin;
use core::marker;

use futures_core::{Future, Poll, Async};
use futures_core::{Future, Poll};
use futures_core::task;

/// A future which is never resolved.
///
/// This future can be created with the `empty` function.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct Empty<T, E> {
_data: marker::PhantomData<(T, E)>,
pub struct Empty<T> {
_data: marker::PhantomData<T>,
}

/// Creates a future which never resolves, representing a computation that never
/// finishes.
///
/// The returned future will forever return `Async::Pending`.
pub fn empty<T, E>() -> Empty<T, E> {
pub fn empty<T>() -> Empty<T> {
Empty { _data: marker::PhantomData }
}

impl<T, E> Future for Empty<T, E> {
type Item = T;
type Error = E;
impl<T> Future for Empty<T> {
type Output = T;

fn poll(&mut self, _: &mut task::Context) -> Poll<T, E> {
Ok(Async::Pending)
fn poll(self: Pin<Self>, _: &mut task::Context) -> Poll<T> {
Poll::Pending
}
}
38 changes: 0 additions & 38 deletions futures-util/src/future/err_into.rs

This file was deleted.

25 changes: 10 additions & 15 deletions futures-util/src/future/flatten.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use core::fmt;
use core::mem::Pin;

use futures_core::{Future, IntoFuture, Poll};
use futures_core::{Future, Poll};
use futures_core::task;

use super::chain::Chain;
Expand All @@ -11,14 +12,13 @@ use super::chain::Chain;
///
/// This is created by the `Future::flatten` method.
#[must_use = "futures do nothing unless polled"]
pub struct Flatten<A> where A: Future, A::Item: IntoFuture {
state: Chain<A, <A::Item as IntoFuture>::Future, ()>,
pub struct Flatten<A> where A: Future, A::Output: Future {
state: Chain<A, A::Output, ()>,
}

impl<A> fmt::Debug for Flatten<A>
where A: Future + fmt::Debug,
A::Item: IntoFuture,
<<A as Future>::Item as IntoFuture>::Future: fmt::Debug,
A::Output: Future + fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Flatten")
Expand All @@ -29,7 +29,7 @@ impl<A> fmt::Debug for Flatten<A>

pub fn new<A>(future: A) -> Flatten<A>
where A: Future,
A::Item: IntoFuture,
A::Output: Future,
{
Flatten {
state: Chain::new(future, ()),
Expand All @@ -38,16 +38,11 @@ pub fn new<A>(future: A) -> Flatten<A>

impl<A> Future for Flatten<A>
where A: Future,
A::Item: IntoFuture,
<<A as Future>::Item as IntoFuture>::Error: From<<A as Future>::Error>
A::Output: Future,
{
type Item = <<A as Future>::Item as IntoFuture>::Item;
type Error = <<A as Future>::Item as IntoFuture>::Error;
type Output = <A::Output as Future>::Output;

fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
self.state.poll(cx, |a, ()| {
let future = a?.into_future();
Ok(Err(future))
})
fn poll(mut self: Pin<Self>, cx: &mut task::Context) -> Poll<Self::Output> {
unsafe { pinned_field!(self, state) }.poll(cx, |a, ()| a)
}
}
Loading

0 comments on commit c5bbcad

Please sign in to comment.