Skip to content

Commit

Permalink
Update to latest nightly
Browse files Browse the repository at this point in the history
  • Loading branch information
MajorBreakfast committed Jun 20, 2018
1 parent 79eec05 commit f8286dc
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 51 deletions.
2 changes: 1 addition & 1 deletion futures-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ mod poll;
pub use poll::Poll;

pub mod future;
pub use future::{Future, TryFuture};
pub use future::{Future, CoreFutureExt, TryFuture};

pub mod stream;
pub use stream::Stream;
Expand Down
11 changes: 3 additions & 8 deletions futures-core/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
use Future;

pub use core::task::{UnsafeWake, Waker};
pub use core::task::{UnsafeWake, Waker, LocalWaker};

#[cfg(feature = "std")]
pub use std::task::Wake;
pub use std::task::{Wake, local_waker, local_waker_from_nonlocal};

pub use core::task::Context;

Expand Down Expand Up @@ -37,10 +38,4 @@ if_std! {
.spawn_obj(TaskObj::new(PinBox::new(f))).unwrap()
}
}

impl<F: Future<Output = ()> + Send + 'static> From<Box<F>> for TaskObj {
fn from(boxed: Box<F>) -> Self {
TaskObj::from_poll_task(boxed) // ToDo: This is now wrong! Should be PinBox. Will be replaced with correct version from core soon!
}
}
}
2 changes: 1 addition & 1 deletion futures-executor/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Built-in executors and related tools.
#![feature(pin, arbitrary_self_types)]
#![feature(pin, arbitrary_self_types, futures_api)]

#![no_std]
#![deny(missing_docs)]
Expand Down
26 changes: 13 additions & 13 deletions futures-executor/src/local_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use std::rc::{Rc, Weak};
use std::mem::PinMut;
use std::marker::Unpin;

use futures_core::{Future, Poll, Stream};
use futures_core::task::{Context, Waker, TaskObj};
use futures_core::{Future, CoreFutureExt, Poll, Stream};
use futures_core::task::{Context, LocalWaker, TaskObj, local_waker_from_nonlocal};
use futures_core::executor::{Executor, SpawnObjError, SpawnErrorKind};
use futures_util::stream::FuturesUnordered;
use futures_util::stream::StreamExt;
Expand Down Expand Up @@ -46,15 +46,15 @@ type Incoming = RefCell<Vec<TaskContainer>>;

// Set up and run a basic single-threaded executor loop, invocing `f` on each
// turn.
fn run_executor<T, F: FnMut(&Waker) -> Poll<T>>(mut f: F) -> T {
fn run_executor<T, F: FnMut(&LocalWaker) -> Poll<T>>(mut f: F) -> T {
let _enter = enter()
.expect("cannot execute `LocalPool` executor from within \
another executor");

ThreadNotify::with_current(|thread| {
let waker = &Waker::from(thread.clone());
let local_waker = local_waker_from_nonlocal(thread.clone());
loop {
if let Poll::Ready(t) = f(waker) {
if let Poll::Ready(t) = f(&local_waker) {
return t;
}
thread.park();
Expand Down Expand Up @@ -102,7 +102,7 @@ impl LocalPool {
/// The function will block the calling thread until *all* tasks in the pool
/// are complete, including any spawned while running existing tasks.
pub fn run<Exec>(&mut self, exec: &mut Exec) where Exec: Executor + Sized {
run_executor(|waker| self.poll_pool(waker, exec))
run_executor(|local_waker| self.poll_pool(local_waker, exec))
}

/// Runs all the tasks in the pool until the given future completes.
Expand Down Expand Up @@ -135,9 +135,9 @@ impl LocalPool {
pub fn run_until<F, Exec>(&mut self, mut f: F, exec: &mut Exec) -> F::Output
where F: Future + Unpin, Exec: Executor + Sized
{
run_executor(|waker| {
run_executor(|local_waker| {
{
let mut main_cx = Context::new(waker, exec);
let mut main_cx = Context::new(local_waker, exec);

// if our main task is done, so are we
match f.poll_unpin(&mut main_cx) {
Expand All @@ -146,17 +146,17 @@ impl LocalPool {
}
}

self.poll_pool(waker, exec);
self.poll_pool(local_waker, exec);
Poll::Pending
})
}

// Make maximal progress on the entire pool of spawned task, returning `Ready`
// if the pool is empty and `Pending` if no further progress can be made.
fn poll_pool<Exec>(&mut self, waker: &Waker, exec: &mut Exec) -> Poll<()>
fn poll_pool<Exec>(&mut self, local_waker: &LocalWaker, exec: &mut Exec) -> Poll<()>
where Exec: Executor + Sized {
// state for the FuturesUnordered, which will never be used
let mut pool_cx = Context::new(waker, exec);
let mut pool_cx = Context::new(local_waker, exec);

loop {
// empty the incoming queue of newly-spawned tasks
Expand Down Expand Up @@ -204,7 +204,7 @@ pub fn block_on<F: Future + Unpin>(f: F) -> F::Output {

/// Turn a stream into a blocking iterator.
///
/// Whne `next` is called on the resulting `BlockingStream`, the caller
/// When `next` is called on the resulting `BlockingStream`, the caller
/// will be blocked until the next element of the `Stream` becomes available.
/// The default executor for the future is a global `ThreadPool`.
pub fn block_on_stream<S: Stream>(s: S) -> BlockingStream<S> {
Expand Down Expand Up @@ -270,6 +270,6 @@ impl Future for TaskContainer {
type Output = ();

fn poll(mut self: PinMut<Self>, cx: &mut Context) -> Poll<()> {
self.task.poll_task(cx)
self.task.poll_unpin(cx)
}
}
4 changes: 2 additions & 2 deletions futures-executor/src/spawn.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use futures_core::{Future, Poll};
use futures_core::task::Context;
use futures_core::{Future, Poll, CoreFutureExt};
use futures_core::task::{Context, ContextExt};
use futures_channel::oneshot::{channel, Sender, Receiver};
use futures_util::FutureExt;

Expand Down
23 changes: 11 additions & 12 deletions futures-executor/src/thread_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::fmt;
use std::marker::Unpin;

use futures_core::*;
use futures_core::task::{self, Wake, Waker, TaskObj};
use futures_core::task::{self, Wake, TaskObj, local_waker_from_nonlocal};
use futures_core::executor::{Executor, SpawnObjError};

use enter;
Expand Down Expand Up @@ -130,7 +130,7 @@ impl PoolState {
loop {
let msg = self.rx.lock().unwrap().recv().unwrap();
match msg {
Message::Run(r) => r.run(),
Message::Run(task_container) => task_container.run(),
Message::Close => break,
}
}
Expand Down Expand Up @@ -276,21 +276,20 @@ struct WakeHandle {
}

impl TaskContainer {
/// Actually run the task (invoking `poll` on its future) on the current
/// thread.
/// Actually run the task (invoking `poll`) on the current thread.
pub fn run(self) {
let TaskContainer { mut task, wake_handle, mut exec } = self;
let waker = Waker::from(wake_handle.clone());
let local_waker = local_waker_from_nonlocal(wake_handle.clone());

// SAFETY: the ownership of this `Task` object is evidence that
// SAFETY: the ownership of this `TaskContainer` object is evidence that
// we are in the `POLLING`/`REPOLL` state for the mutex.
unsafe {
wake_handle.mutex.start_poll();

loop {
let res = {
let mut cx = task::Context::new(&waker, &mut exec);
task.poll_task(&mut cx)
let mut cx = task::Context::new(&local_waker, &mut exec);
task.poll_unpin(&mut cx)
};
match res {
Poll::Pending => {}
Expand All @@ -302,10 +301,10 @@ impl TaskContainer {
exec: exec
};
match wake_handle.mutex.wait(task_container) {
Ok(()) => return, // we've waited
Err(r) => { // someone's notified us
task = r.task;
exec = r.exec;
Ok(()) => return, // we've waited
Err(task_container) => { // someone's notified us
task = task_container.task;
exec = task_container.exec;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion futures-executor/tests/local_pool.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![allow(unused_imports)]

#![feature(pin, arbitrary_self_types)]
#![feature(pin, arbitrary_self_types, futures_api)]

extern crate futures;
extern crate futures_executor;
Expand Down
36 changes: 23 additions & 13 deletions futures-util/src/stream/futures_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ use std::mem;
use std::mem::PinMut;
use std::marker::Unpin;
use std::ptr;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{Relaxed, SeqCst, Acquire, Release, AcqRel};
use std::sync::atomic::{AtomicPtr, AtomicBool};
use std::sync::{Arc, Weak};
use std::usize;

use futures_core::{Stream, Future, Poll};
use futures_core::task::{self, AtomicWaker, UnsafeWake, Waker};
use futures_core::{Stream, Future, Poll, CoreFutureExt};
use futures_core::task::{self, AtomicWaker, UnsafeWake, Waker, LocalWaker};

/// A set of `Future`s which may complete in any order.
///
Expand Down Expand Up @@ -346,8 +347,8 @@ impl<T> Stream for FuturesUnordered<T>
// deallocating the node if need be.
let res = {
let notify = NodeToHandle(bomb.node.as_ref().unwrap());
let waker = Waker::from(notify);
let mut cx = cx.with_waker(&waker);
let local_waker = LocalWaker::from(notify);
let mut cx = cx.with_waker(&local_waker);
future.poll_unpin(&mut cx)
};

Expand Down Expand Up @@ -461,8 +462,8 @@ impl<T> Inner<T> {

/// The dequeue function from the 1024cores intrusive MPSC queue algorithm
///
/// Note that this unsafe as it required mutual exclusion (only one thread
/// can call this) to be guaranteed elsewhere.
/// Note that this is unsafe as it required mutual exclusion (only one
/// thread can call this) to be guaranteed elsewhere.
unsafe fn dequeue(&self) -> Dequeue<T> {
let mut tail = *self.tail_readiness.get();
let mut next = (*tail).next_readiness.load(Acquire);
Expand Down Expand Up @@ -534,13 +535,26 @@ impl<'a, T> Clone for NodeToHandle<'a, T> {
}
}

#[doc(hidden)]
impl<'a, T> From<NodeToHandle<'a, T>> for LocalWaker {
fn from(handle: NodeToHandle<'a, T>) -> LocalWaker {
unsafe {
let ptr: Arc<Node<T>> = handle.0.clone();
let ptr: *mut ArcNode<T> = mem::transmute(ptr);
let ptr = mem::transmute(ptr as *mut UnsafeWake); // Hide lifetime
LocalWaker::new(NonNull::new(ptr).unwrap())
}
}
}

#[doc(hidden)]
impl<'a, T> From<NodeToHandle<'a, T>> for Waker {
fn from(handle: NodeToHandle<'a, T>) -> Waker {
unsafe {
let ptr = handle.0.clone();
let ptr = mem::transmute::<Arc<Node<T>>, *const ArcNode<T>>(ptr);
Waker::new(hide_lt(ptr))
let ptr: Arc<Node<T>> = handle.0.clone();
let ptr: *mut ArcNode<T> = mem::transmute(ptr);
let ptr = mem::transmute(ptr as *mut UnsafeWake); // Hide lifetime
Waker::new(NonNull::new(ptr).unwrap())
}
}
}
Expand Down Expand Up @@ -574,10 +588,6 @@ unsafe impl<T> UnsafeWake for ArcNode<T> {
}
}

unsafe fn hide_lt<T>(p: *const ArcNode<T>) -> *const UnsafeWake {
mem::transmute(p as *const UnsafeWake)
}

impl<T> Node<T> {
fn notify(me: &Arc<Node<T>>) {
let inner = match me.queue.upgrade() {
Expand Down
2 changes: 2 additions & 0 deletions futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
//! streams and sinks, and then spawned as independent tasks that are run to
//! completion, but *do not block* the thread running them.
#![feature(futures_api)]

#![no_std]
#![doc(html_root_url = "https://docs.rs/futures/0.2.0")]

Expand Down

0 comments on commit f8286dc

Please sign in to comment.