diff --git a/futures-core/src/lib.rs b/futures-core/src/lib.rs index 546f685772..fae221e17d 100644 --- a/futures-core/src/lib.rs +++ b/futures-core/src/lib.rs @@ -67,7 +67,7 @@ mod poll; pub use poll::Poll; pub mod future; -pub use future::{Future, TryFuture}; +pub use future::{Future, CoreFutureExt, TryFuture}; pub mod stream; pub use stream::Stream; diff --git a/futures-core/src/task/mod.rs b/futures-core/src/task/mod.rs index ff00ab6030..1c973f882a 100644 --- a/futures-core/src/task/mod.rs +++ b/futures-core/src/task/mod.rs @@ -2,9 +2,10 @@ use Future; -pub use core::task::{UnsafeWake, Waker}; +pub use core::task::{UnsafeWake, Waker, LocalWaker}; + #[cfg(feature = "std")] -pub use std::task::Wake; +pub use std::task::{Wake, local_waker, local_waker_from_nonlocal}; pub use core::task::Context; @@ -37,10 +38,4 @@ if_std! { .spawn_obj(TaskObj::new(PinBox::new(f))).unwrap() } } - - impl + Send + 'static> From> for TaskObj { - fn from(boxed: Box) -> Self { - TaskObj::from_poll_task(boxed) // ToDo: This is now wrong! Should be PinBox. Will be replaced with correct version from core soon! - } - } } diff --git a/futures-executor/src/lib.rs b/futures-executor/src/lib.rs index 5e7b605e7d..e48f53d2fe 100644 --- a/futures-executor/src/lib.rs +++ b/futures-executor/src/lib.rs @@ -1,6 +1,6 @@ //! Built-in executors and related tools. -#![feature(pin, arbitrary_self_types)] +#![feature(pin, arbitrary_self_types, futures_api)] #![no_std] #![deny(missing_docs)] diff --git a/futures-executor/src/local_pool.rs b/futures-executor/src/local_pool.rs index b8c07308f7..7baa19f5b0 100644 --- a/futures-executor/src/local_pool.rs +++ b/futures-executor/src/local_pool.rs @@ -5,8 +5,8 @@ use std::rc::{Rc, Weak}; use std::mem::PinMut; use std::marker::Unpin; -use futures_core::{Future, Poll, Stream}; -use futures_core::task::{Context, Waker, TaskObj}; +use futures_core::{Future, CoreFutureExt, Poll, Stream}; +use futures_core::task::{Context, LocalWaker, TaskObj, local_waker_from_nonlocal}; use futures_core::executor::{Executor, SpawnObjError, SpawnErrorKind}; use futures_util::stream::FuturesUnordered; use futures_util::stream::StreamExt; @@ -46,15 +46,15 @@ type Incoming = RefCell>; // Set up and run a basic single-threaded executor loop, invocing `f` on each // turn. -fn run_executor Poll>(mut f: F) -> T { +fn run_executor Poll>(mut f: F) -> T { let _enter = enter() .expect("cannot execute `LocalPool` executor from within \ another executor"); ThreadNotify::with_current(|thread| { - let waker = &Waker::from(thread.clone()); + let local_waker = local_waker_from_nonlocal(thread.clone()); loop { - if let Poll::Ready(t) = f(waker) { + if let Poll::Ready(t) = f(&local_waker) { return t; } thread.park(); @@ -102,7 +102,7 @@ impl LocalPool { /// The function will block the calling thread until *all* tasks in the pool /// are complete, including any spawned while running existing tasks. pub fn run(&mut self, exec: &mut Exec) where Exec: Executor + Sized { - run_executor(|waker| self.poll_pool(waker, exec)) + run_executor(|local_waker| self.poll_pool(local_waker, exec)) } /// Runs all the tasks in the pool until the given future completes. @@ -135,9 +135,9 @@ impl LocalPool { pub fn run_until(&mut self, mut f: F, exec: &mut Exec) -> F::Output where F: Future + Unpin, Exec: Executor + Sized { - run_executor(|waker| { + run_executor(|local_waker| { { - let mut main_cx = Context::new(waker, exec); + let mut main_cx = Context::new(local_waker, exec); // if our main task is done, so are we match f.poll_unpin(&mut main_cx) { @@ -146,17 +146,17 @@ impl LocalPool { } } - self.poll_pool(waker, exec); + self.poll_pool(local_waker, exec); Poll::Pending }) } // Make maximal progress on the entire pool of spawned task, returning `Ready` // if the pool is empty and `Pending` if no further progress can be made. - fn poll_pool(&mut self, waker: &Waker, exec: &mut Exec) -> Poll<()> + fn poll_pool(&mut self, local_waker: &LocalWaker, exec: &mut Exec) -> Poll<()> where Exec: Executor + Sized { // state for the FuturesUnordered, which will never be used - let mut pool_cx = Context::new(waker, exec); + let mut pool_cx = Context::new(local_waker, exec); loop { // empty the incoming queue of newly-spawned tasks @@ -204,7 +204,7 @@ pub fn block_on(f: F) -> F::Output { /// Turn a stream into a blocking iterator. /// -/// Whne `next` is called on the resulting `BlockingStream`, the caller +/// When `next` is called on the resulting `BlockingStream`, the caller /// will be blocked until the next element of the `Stream` becomes available. /// The default executor for the future is a global `ThreadPool`. pub fn block_on_stream(s: S) -> BlockingStream { @@ -270,6 +270,6 @@ impl Future for TaskContainer { type Output = (); fn poll(mut self: PinMut, cx: &mut Context) -> Poll<()> { - self.task.poll_task(cx) + self.task.poll_unpin(cx) } } diff --git a/futures-executor/src/spawn.rs b/futures-executor/src/spawn.rs index 6a7335c539..ea251187d8 100644 --- a/futures-executor/src/spawn.rs +++ b/futures-executor/src/spawn.rs @@ -1,5 +1,5 @@ -use futures_core::{Future, Poll}; -use futures_core::task::Context; +use futures_core::{Future, Poll, CoreFutureExt}; +use futures_core::task::{Context, ContextExt}; use futures_channel::oneshot::{channel, Sender, Receiver}; use futures_util::FutureExt; diff --git a/futures-executor/src/thread_pool.rs b/futures-executor/src/thread_pool.rs index 0320a5261c..71d32130a7 100644 --- a/futures-executor/src/thread_pool.rs +++ b/futures-executor/src/thread_pool.rs @@ -9,7 +9,7 @@ use std::fmt; use std::marker::Unpin; use futures_core::*; -use futures_core::task::{self, Wake, Waker, TaskObj}; +use futures_core::task::{self, Wake, TaskObj, local_waker_from_nonlocal}; use futures_core::executor::{Executor, SpawnObjError}; use enter; @@ -130,7 +130,7 @@ impl PoolState { loop { let msg = self.rx.lock().unwrap().recv().unwrap(); match msg { - Message::Run(r) => r.run(), + Message::Run(task_container) => task_container.run(), Message::Close => break, } } @@ -276,21 +276,20 @@ struct WakeHandle { } impl TaskContainer { - /// Actually run the task (invoking `poll` on its future) on the current - /// thread. + /// Actually run the task (invoking `poll`) on the current thread. pub fn run(self) { let TaskContainer { mut task, wake_handle, mut exec } = self; - let waker = Waker::from(wake_handle.clone()); + let local_waker = local_waker_from_nonlocal(wake_handle.clone()); - // SAFETY: the ownership of this `Task` object is evidence that + // SAFETY: the ownership of this `TaskContainer` object is evidence that // we are in the `POLLING`/`REPOLL` state for the mutex. unsafe { wake_handle.mutex.start_poll(); loop { let res = { - let mut cx = task::Context::new(&waker, &mut exec); - task.poll_task(&mut cx) + let mut cx = task::Context::new(&local_waker, &mut exec); + task.poll_unpin(&mut cx) }; match res { Poll::Pending => {} @@ -302,10 +301,10 @@ impl TaskContainer { exec: exec }; match wake_handle.mutex.wait(task_container) { - Ok(()) => return, // we've waited - Err(r) => { // someone's notified us - task = r.task; - exec = r.exec; + Ok(()) => return, // we've waited + Err(task_container) => { // someone's notified us + task = task_container.task; + exec = task_container.exec; } } } diff --git a/futures-executor/tests/local_pool.rs b/futures-executor/tests/local_pool.rs index e96187b788..f286753d79 100755 --- a/futures-executor/tests/local_pool.rs +++ b/futures-executor/tests/local_pool.rs @@ -1,6 +1,6 @@ #![allow(unused_imports)] -#![feature(pin, arbitrary_self_types)] +#![feature(pin, arbitrary_self_types, futures_api)] extern crate futures; extern crate futures_executor; diff --git a/futures-util/src/stream/futures_unordered.rs b/futures-util/src/stream/futures_unordered.rs index 4dfe91a0d6..79ef1dd2d0 100644 --- a/futures-util/src/stream/futures_unordered.rs +++ b/futures-util/src/stream/futures_unordered.rs @@ -8,13 +8,14 @@ use std::mem; use std::mem::PinMut; use std::marker::Unpin; use std::ptr; +use std::ptr::NonNull; use std::sync::atomic::Ordering::{Relaxed, SeqCst, Acquire, Release, AcqRel}; use std::sync::atomic::{AtomicPtr, AtomicBool}; use std::sync::{Arc, Weak}; use std::usize; -use futures_core::{Stream, Future, Poll}; -use futures_core::task::{self, AtomicWaker, UnsafeWake, Waker}; +use futures_core::{Stream, Future, Poll, CoreFutureExt}; +use futures_core::task::{self, AtomicWaker, UnsafeWake, Waker, LocalWaker}; /// A set of `Future`s which may complete in any order. /// @@ -346,8 +347,8 @@ impl Stream for FuturesUnordered // deallocating the node if need be. let res = { let notify = NodeToHandle(bomb.node.as_ref().unwrap()); - let waker = Waker::from(notify); - let mut cx = cx.with_waker(&waker); + let local_waker = LocalWaker::from(notify); + let mut cx = cx.with_waker(&local_waker); future.poll_unpin(&mut cx) }; @@ -461,8 +462,8 @@ impl Inner { /// The dequeue function from the 1024cores intrusive MPSC queue algorithm /// - /// Note that this unsafe as it required mutual exclusion (only one thread - /// can call this) to be guaranteed elsewhere. + /// Note that this is unsafe as it required mutual exclusion (only one + /// thread can call this) to be guaranteed elsewhere. unsafe fn dequeue(&self) -> Dequeue { let mut tail = *self.tail_readiness.get(); let mut next = (*tail).next_readiness.load(Acquire); @@ -534,13 +535,26 @@ impl<'a, T> Clone for NodeToHandle<'a, T> { } } +#[doc(hidden)] +impl<'a, T> From> for LocalWaker { + fn from(handle: NodeToHandle<'a, T>) -> LocalWaker { + unsafe { + let ptr: Arc> = handle.0.clone(); + let ptr: *mut ArcNode = mem::transmute(ptr); + let ptr = mem::transmute(ptr as *mut UnsafeWake); // Hide lifetime + LocalWaker::new(NonNull::new(ptr).unwrap()) + } + } +} + #[doc(hidden)] impl<'a, T> From> for Waker { fn from(handle: NodeToHandle<'a, T>) -> Waker { unsafe { - let ptr = handle.0.clone(); - let ptr = mem::transmute::>, *const ArcNode>(ptr); - Waker::new(hide_lt(ptr)) + let ptr: Arc> = handle.0.clone(); + let ptr: *mut ArcNode = mem::transmute(ptr); + let ptr = mem::transmute(ptr as *mut UnsafeWake); // Hide lifetime + Waker::new(NonNull::new(ptr).unwrap()) } } } @@ -574,10 +588,6 @@ unsafe impl UnsafeWake for ArcNode { } } -unsafe fn hide_lt(p: *const ArcNode) -> *const UnsafeWake { - mem::transmute(p as *const UnsafeWake) -} - impl Node { fn notify(me: &Arc>) { let inner = match me.queue.upgrade() { diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 8a57d8468b..24ff6963df 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -19,6 +19,8 @@ //! streams and sinks, and then spawned as independent tasks that are run to //! completion, but *do not block* the thread running them. +#![feature(futures_api)] + #![no_std] #![doc(html_root_url = "https://docs.rs/futures/0.2.0")]