diff --git a/benches/current_thread_executor.rs b/benches/current_thread_executor.rs new file mode 100644 index 0000000000..e69858a537 --- /dev/null +++ b/benches/current_thread_executor.rs @@ -0,0 +1,92 @@ +#![feature(test)] + +extern crate test; +extern crate futures; + +use futures::{task, Async}; +use futures::current_thread; +use futures::future::{lazy, poll_fn}; + +use test::Bencher; + +use std::cell::Cell; +use std::rc::Rc; + +#[bench] +fn execute_oneshot(b: &mut Bencher) { + const ITER: usize = 1000; + + b.iter(move || { + let cnt = Rc::new(Cell::new(0)); + + current_thread::run(|_| { + for _ in 0..ITER { + let cnt = cnt.clone(); + current_thread::spawn(lazy(move || { + cnt.set(1 + cnt.get()); + Ok::<(), ()>(()) + })); + } + }); + + assert_eq!(cnt.get(), ITER); + }); +} + +#[bench] +fn execute_yield_many(b: &mut Bencher) { + const YIELDS: usize = 500; + const TASKS: usize = 20; + + b.iter(move || { + let cnt = Rc::new(Cell::new(0)); + + current_thread::run(|_| { + for _ in 0..TASKS { + let cnt = cnt.clone(); + let mut rem = YIELDS; + + current_thread::spawn(poll_fn(move || { + cnt.set(1 + cnt.get()); + rem -= 1; + + if rem == 0 { + Ok::<_, ()>(().into()) + } else { + task::current().notify(); + Ok(Async::NotReady) + } + })); + } + }); + + assert_eq!(cnt.get(), YIELDS * TASKS); + }); +} + +#[bench] +fn execute_daisy(b: &mut Bencher) { + const DEPTH: usize = 1000; + + let cnt = Rc::new(Cell::new(0)); + + fn daisy(rem: usize, cnt: Rc>) { + if rem > 0 { + current_thread::spawn(lazy(move || { + cnt.set(1 + cnt.get()); + daisy(rem - 1, cnt); + Ok(()) + })); + } + } + + b.iter(move || { + cnt.set(0); + + current_thread::run(|_| { + daisy(DEPTH, cnt.clone()); + }); + + assert_eq!(cnt.get(), DEPTH); + }); +} diff --git a/src/current_thread.rs b/src/current_thread.rs new file mode 100644 index 0000000000..7db4c25f2c --- /dev/null +++ b/src/current_thread.rs @@ -0,0 +1,426 @@ +//! Execute tasks on the current thread +//! +//! This module implements an executor that keeps futures on the same thread +//! that they are submitted on. This allows it to execute futures that are +//! not `Send`. For more details on general executor concepts, like executing +//! futures, see [here]. +//! +//! Before being able to spawn futures with this module, an executor +//! context must be setup by calling [`run`]. From within that context [`spawn`] +//! may be called with the future to run in the background. +//! +//! ``` +//! # use futures::current_thread::*; +//! use futures::future::lazy; +//! +//! // Calling execute here results in a panic +//! // spawn(my_future); +//! +//! run(|_| { +//! // The execution context is setup, futures may be executed. +//! spawn(lazy(|| { +//! println!("called from the current thread executor"); +//! Ok(()) +//! })); +//! }); +//! ``` +//! +//! # Execution model +//! +//! When an execution context is setup with `run` the current thread will block +//! and all the futures managed by the executor are driven to completion. +//! Whenever a future receives a notification, it is pushed to the end of a +//! scheduled list. The executor will drain this list, advancing the state of +//! each future. +//! +//! All futures managed by this module will remain on the current thread, +//! as such, this module is able to safely execute futures that are not `Send`. +//! +//! Once a future is complete, it is dropped. Once all futures are completed, +//! [`run`] will unblock and return. +//! +//! This module makes a best effort to fairly schedule futures that it manages. +//! +//! 
[here]: https://tokio.rs/docs/going-deeper-futures/tasks/
+//! [`spawn`]: fn.spawn.html
+//! [`run`]: fn.run.html
+
+use Async;
+use executor::{self, Spawn, Sleep, Wakeup};
+use future::{Future, Executor, ExecuteError, ExecuteErrorKind};
+use scheduler;
+use task_impl::ThreadNotify;
+
+use std::prelude::v1::*;
+
+use std::{fmt, thread};
+use std::cell::Cell;
+use std::rc::Rc;
+
+/// Executes futures on the current thread.
+///
+/// All futures executed using this executor will be executed on the current
+/// thread. As such, `run` will wait for these futures to complete before
+/// returning.
+///
+/// For more details, see the [module level](index.html) documentation.
+#[derive(Debug, Clone)]
+pub struct TaskExecutor {
+    // Prevent the handle from moving across threads.
+    _p: ::std::marker::PhantomData<Rc<()>>,
+}
+
+/// A context yielded to the closure provided to `run`.
+///
+/// This context is mostly a future-proofing of the library to add future
+/// contextual information into it. Currently it only contains the `Enter`
+/// instance used to reserve the current thread for blocking on futures.
+#[derive(Debug)]
+pub struct Context<'a> {
+    enter: executor::Enter,
+    cancel: &'a Cell<bool>,
+}
+
+/// Implements the "blocking" logic for the current thread executor. A
+/// `TaskRunner` will be created during `run` and will sit on the stack until
+/// execution is complete.
+#[derive(Debug)]
+struct TaskRunner<T> {
+    /// Executes futures.
+    scheduler: Scheduler<T>,
+}
+
+struct CurrentRunner {
+    /// When set to true, the executor should return immediately, even if
+    /// there are still futures to run.
+    cancel: Cell<bool>,
+
+    /// Number of futures currently being executed by the runner.
+    num_futures: Cell<usize>,
+
+    /// Raw pointer to the current scheduler pusher.
+    ///
+    /// The raw pointer is required in order to store it in a thread-local
+    /// slot.
+    schedule: Cell<Option<*mut Schedule>>,
+}
+
+type Scheduler<T> = scheduler::Scheduler<Task, T>;
+type Schedule = scheduler::Schedule<Task>;
+
+struct Task(Spawn<Box<Future<Item = (), Error = ()>>>);
+
+/// Current thread's task runner state. The `schedule` slot is populated by
+/// `CurrentRunner::set_schedule`.
+thread_local!(static CURRENT: CurrentRunner = CurrentRunner {
+    cancel: Cell::new(false),
+    num_futures: Cell::new(0),
+    schedule: Cell::new(None),
+});
+
+/// Calls the given closure, then blocks until all futures submitted for
+/// execution complete.
+///
+/// In more detail, this function will block until:
+/// - All executing futures are complete, or
+/// - `cancel_all_spawned` is invoked.
+pub fn run<F, R>(f: F) -> R
+    where F: FnOnce(&mut Context) -> R
+{
+    ThreadNotify::with_current(|mut thread_notify| {
+        TaskRunner::enter(&mut thread_notify, f)
+    })
+}
+
+/// Calls the given closure with a custom sleep strategy.
+///
+/// This function is the same as `run` except that it allows customizing the
+/// sleep strategy.
+pub fn run_with_sleep<S, F, R>(sleep: &mut S, f: F) -> R
+    where F: FnOnce(&mut Context) -> R,
+          S: Sleep,
+{
+    TaskRunner::enter(sleep, f)
+}
+
+/// Executes a future on the current thread.
+///
+/// The provided future must complete or be canceled before `run` will return.
+///
+/// # Panics
+///
+/// This function can only be invoked from the context of a `run` call; any
+/// other use will result in a panic.
+pub fn spawn<F>(future: F)
+    where F: Future<Item = (), Error = ()> + 'static
+{
+    execute(future).unwrap_or_else(|_| {
+        panic!("cannot call `spawn` unless the thread is already \
+                in the context of a call to `run`")
+    })
+}
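To make the execution model concrete, here is a short usage sketch against the `run`/`spawn` API introduced by this patch. Because every spawned future stays on the calling thread, `!Send` types such as `Rc` can be captured freely:

```rust
extern crate futures;

use futures::current_thread;
use futures::future::lazy;

use std::rc::Rc;

fn main() {
    // `Rc` is not `Send`, so this future could not be spawned onto a
    // thread pool; the current-thread executor accepts it.
    let shared = Rc::new(5);

    current_thread::run(|_| {
        for _ in 0..2 {
            let shared = shared.clone();
            current_thread::spawn(lazy(move || {
                println!("value = {}", *shared);
                Ok::<(), ()>(())
            }));
        }
    });
    // `run` only returns once both spawned futures have completed.
}
```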
+/// Returns an executor that executes futures on the current thread.
+///
+/// The user of `TaskExecutor` must ensure that futures are only submitted
+/// within the context of a call to `run`.
+///
+/// For more details, see the [module level](index.html) documentation.
+pub fn task_executor() -> TaskExecutor {
+    TaskExecutor {
+        _p: ::std::marker::PhantomData,
+    }
+}
+
+impl<F> Executor<F> for TaskExecutor
+    where F: Future<Item = (), Error = ()> + 'static
+{
+    fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
+        execute(future)
+    }
+}
+
+impl<'a> Context<'a> {
+    /// Returns a reference to the executor `Enter` handle.
+    ///
+    /// This can be used to schedule callbacks to run when the `Enter` scope
+    /// is exited or otherwise pass the instance of `Enter` to other APIs
+    /// which work with `Enter`.
+    pub fn enter(&self) -> &executor::Enter {
+        &self.enter
+    }
+
+    /// Cancels *all* executing futures.
+    pub fn cancel_all_spawned(&self) {
+        self.cancel.set(true);
+    }
+}
+
+/// Submits a future to the current executor. This is done by
+/// checking the thread-local variable tracking the current executor.
+///
+/// If this function is not called in the context of an executor, i.e.
+/// outside of `run`, then `Err` is returned.
+///
+/// This function does not panic.
+fn execute<F>(future: F) -> Result<(), ExecuteError<F>>
+    where F: Future<Item = (), Error = ()> + 'static,
+{
+    CURRENT.with(|current| {
+        match current.schedule.get() {
+            Some(schedule) => {
+                let spawned = Task::new(future);
+
+                let num_futures = current.num_futures.get();
+                current.num_futures.set(num_futures + 1);
+
+                unsafe { (*schedule).schedule(spawned); }
+
+                Ok(())
+            }
+            None => {
+                Err(ExecuteError::new(ExecuteErrorKind::Shutdown, future))
+            }
+        }
+    })
+}
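Because `TaskExecutor` implements the `Executor` trait, it can be handed to code that is generic over executors. A sketch of that pattern follows; the `spawn_all` helper is illustrative only, not part of this patch:

```rust
extern crate futures;

use futures::current_thread;
use futures::future::{lazy, Executor, Future};

// Hypothetical helper, generic over any executor of boxed unit futures,
// to show how a `TaskExecutor` handle can be threaded through other code.
fn spawn_all<E>(executor: &E, count: usize)
    where E: Executor<Box<Future<Item = (), Error = ()>>>
{
    for i in 0..count {
        let fut: Box<Future<Item = (), Error = ()>> = Box::new(lazy(move || {
            println!("task {}", i);
            Ok::<(), ()>(())
        }));

        // Outside of `run` this returns an `ExecuteErrorKind::Shutdown`
        // error instead of panicking the way `spawn` does.
        if executor.execute(fut).is_err() {
            panic!("executor is shut down");
        }
    }
}

fn main() {
    current_thread::run(|_| {
        spawn_all(&current_thread::task_executor(), 3);
    });
}
```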
+impl<T> TaskRunner<T>
+    where T: Wakeup,
+{
+    /// Return a new `TaskRunner`
+    fn new(wakeup: T) -> TaskRunner<T> {
+        let scheduler = scheduler::Scheduler::new(wakeup);
+
+        TaskRunner {
+            scheduler: scheduler,
+        }
+    }
+
+    /// Enter a new `TaskRunner` context
+    ///
+    /// This function handles advancing the scheduler state and blocking while
+    /// listening for notified futures.
+    ///
+    /// First, a new task runner is created backed by the current
+    /// `ThreadNotify` handle. Passing `ThreadNotify` into the scheduler is
+    /// how scheduled futures unblock the thread, signalling that there is
+    /// more work to do.
+    ///
+    /// Before any future is polled, the scheduler must be set to a
+    /// thread-local variable so that `execute` is able to submit new futures
+    /// to the current executor. Because `Scheduler::schedule` requires
+    /// `&mut self`, this introduces a mutability hazard. This hazard is
+    /// minimized with some indirection. See `set_schedule` for more details.
+    ///
+    /// Once all context is set up, the init closure is invoked. This is the
+    /// "bootstrapping" process that submits the initial futures to the
+    /// scheduler. After this, the function loops and advances the scheduler
+    /// state until all futures complete. When no scheduled futures are ready
+    /// to be advanced, the thread is blocked using `S: Sleep`.
+    fn enter<S, F, R>(sleep: &mut S, f: F) -> R
+        where F: FnOnce(&mut Context) -> R,
+              S: Sleep<Wakeup = T>,
+    {
+        let mut runner = TaskRunner::new(sleep.wakeup());
+
+        CURRENT.with(|current| {
+            // Make sure that another task runner is not set.
+            //
+            // This should not ever be possible due to how `set_schedule`
+            // is set up, but better safe than sorry!
+            assert!(current.schedule.get().is_none());
+
+            let enter = executor::enter()
+                .expect("cannot execute `current_thread` executor from within \
+                         another executor");
+
+            // Enter an execution scope
+            let mut ctx = Context {
+                enter: enter,
+                cancel: &current.cancel,
+            };
+
+            // Set the scheduler to the TLS and perform setup work,
+            // returning a future to execute.
+            //
+            // This could possibly submit other futures for execution.
+            let ret = current.set_schedule(&mut runner.scheduler as &mut Schedule, || {
+                f(&mut ctx)
+            });
+
+            // Execute the runner.
+            //
+            // This function will not return until either
+            //
+            // a) All futures have completed execution
+            // b) `cancel_all_spawned` is called, forcing the executor to
+            //    return.
+            runner.run(sleep, current);
+
+            // Not technically required, but this makes the fact that `ctx`
+            // needs to live until this point explicit.
+            drop(ctx);
+
+            ret
+        })
+    }
+
+    fn run<S>(&mut self, sleep: &mut S, current: &CurrentRunner)
+        where S: Sleep,
+    {
+        use scheduler::Tick;
+
+        while current.is_running() {
+            // Try to advance the scheduler state
+            let res = self.scheduler.tick(|scheduler, spawned, notify| {
+                // `scheduler` is a `&mut Scheduler` reference returned back
+                // from the scheduler to us, but only within the context of
+                // this closure.
+                //
+                // This lets us push new futures into the scheduler. It also
+                // lets us pass the scheduler mutable reference into
+                // `set_schedule`, which sets the thread-local variable that
+                // `spawn` uses for submitting new futures to the
+                // "current" executor.
+                //
+                // See `set_schedule` documentation for more details on how we
+                // guard against mutable pointer aliasing.
+                current.set_schedule(scheduler as &mut Schedule, || {
+                    match spawned.0.poll_future_notify(notify, 0) {
+                        Ok(Async::Ready(_)) | Err(_) => {
+                            Async::Ready(())
+                        }
+                        Ok(Async::NotReady) => Async::NotReady,
+                    }
+                })
+            });
+
+            // Process the result of ticking the scheduler
+            match res {
+                // A future completed, so decrement the number of futures
+                // still being executed by the runner.
+                Tick::Data(_) => {
+                    let num_futures = current.num_futures.get();
+                    debug_assert!(num_futures > 0);
+                    current.num_futures.set(num_futures - 1);
+                },
+                Tick::Empty => {
+                    // The scheduler did not have any work to process.
+                    //
+                    // At this point, the scheduler is currently running given
+                    // that the `while` condition was true and no user code
+                    // has been executed.
+
+                    debug_assert!(current.is_running());
+
+                    // Block the current thread until a future managed by the
+                    // scheduler receives a readiness notification.
+                    sleep.sleep();
+                }
+                Tick::Inconsistent => {
+                    // Yield the thread and loop
+                    thread::yield_now();
+                }
+            }
+        }
+    }
+}
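The `Sleep`/`Wakeup` pair that the loop above parks on can be swapped via `run_with_sleep`. As an illustration, a condvar-backed strategy might look like the sketch below. All names here are hypothetical; `ThreadNotify` plays this role for plain `run`:

```rust
extern crate futures;

use futures::current_thread;
use futures::executor::{Sleep, Wakeup};

use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

// Hypothetical condvar-backed sleep strategy.
struct CondvarSleep {
    inner: Arc<Shared>,
}

struct Shared {
    woken: Mutex<bool>,
    condvar: Condvar,
}

struct CondvarWakeup {
    inner: Arc<Shared>,
}

impl CondvarSleep {
    fn new() -> CondvarSleep {
        CondvarSleep {
            inner: Arc::new(Shared {
                woken: Mutex::new(false),
                condvar: Condvar::new(),
            }),
        }
    }
}

impl Sleep for CondvarSleep {
    type Wakeup = CondvarWakeup;

    fn wakeup(&self) -> CondvarWakeup {
        CondvarWakeup { inner: self.inner.clone() }
    }

    fn sleep(&mut self) {
        // Block until some `Wakeup` handle signals us, consuming the signal.
        let mut woken = self.inner.woken.lock().unwrap();
        while !*woken {
            woken = self.inner.condvar.wait(woken).unwrap();
        }
        *woken = false;
    }

    fn sleep_timeout(&mut self, duration: Duration) {
        // Like `sleep`, but give up after `duration` even without a signal.
        let mut woken = self.inner.woken.lock().unwrap();
        if !*woken {
            let (guard, _) = self.inner.condvar
                .wait_timeout(woken, duration)
                .unwrap();
            woken = guard;
        }
        *woken = false;
    }
}

impl Wakeup for CondvarWakeup {
    fn wakeup(&self) {
        *self.inner.woken.lock().unwrap() = true;
        self.inner.condvar.notify_one();
    }
}

fn main() {
    let mut sleep = CondvarSleep::new();

    current_thread::run_with_sleep(&mut sleep, |_| {
        // Spawn futures as with `run`; the executor now parks on our condvar.
    });
}
```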
This function also takes
+    /// a `&mut` reference to the scheduler, which is essentially holding a
+    /// "lock" on that reference, preventing any other location in the code
+    /// from also getting that `&mut` reference.
+    ///
+    /// When `set_schedule` returns, the thread-local variable containing the
+    /// `&mut` reference is set back to `None`. This is done even if the
+    /// closure panics.
+    ///
+    /// This reduces the odds of introducing pointer aliasing.
+    fn set_schedule<F, R>(&self, schedule: &mut Schedule, f: F) -> R
+        where F: FnOnce() -> R
+    {
+        // Ensure that the runner is removed from the thread-local context
+        // when leaving the scope. This handles cases that involve panicking.
+        struct Reset<'a>(&'a CurrentRunner);
+
+        impl<'a> Drop for Reset<'a> {
+            fn drop(&mut self) {
+                self.0.schedule.set(None);
+            }
+        }
+
+        let _reset = Reset(self);
+
+        self.schedule.set(Some(schedule as *mut Schedule));
+
+        f()
+    }
+
+    fn is_running(&self) -> bool {
+        self.num_futures.get() > 0 && !self.cancel.get()
+    }
+}
+
+impl Task {
+    fn new<T: Future<Item = (), Error = ()> + 'static>(f: T) -> Self {
+        Task(executor::spawn(Box::new(f)))
+    }
+}
+
+impl fmt::Debug for Task {
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+        fmt.debug_struct("Task")
+            .finish()
+    }
+}
diff --git a/src/executor/enter.rs b/src/executor/enter.rs
new file mode 100644
index 0000000000..5b263cac13
--- /dev/null
+++ b/src/executor/enter.rs
@@ -0,0 +1,89 @@
+use std::prelude::v1::*;
+use std::cell::{RefCell, Cell};
+use std::fmt;
+
+thread_local!(static ENTERED: Cell<bool> = Cell::new(false));
+
+/// Represents an executor context.
+///
+/// For more details, see [`enter` documentation](fn.enter.html)
+pub struct Enter {
+    on_exit: RefCell<Vec<Box<Callback>>>,
+    permanent: bool,
+}
+
+/// Marks the current thread as being within the dynamic extent of an
+/// executor.
+///
+/// Executor implementations should call this function before blocking the
+/// thread. If `None` is returned, the executor should fail by panicking or
+/// taking some other action without blocking the current thread. This
+/// prevents deadlocks due to multiple executors competing for the same
+/// thread.
+///
+/// Returns `None` if the current thread is *already* marked as being within
+/// the dynamic extent of an executor.
+pub fn enter() -> Option<Enter> {
+    ENTERED.with(|c| {
+        if c.get() {
+            None
+        } else {
+            c.set(true);
+
+            Some(Enter {
+                on_exit: RefCell::new(Vec::new()),
+                permanent: false,
+            })
+        }
+    })
+}
+
+impl Enter {
+    /// Register a callback to be invoked if and when the thread
+    /// ceases to act as an executor.
+    pub fn on_exit<F>(&self, f: F) where F: FnOnce() + 'static {
+        self.on_exit.borrow_mut().push(Box::new(f));
+    }
+
+    /// Treat the remainder of execution on this thread as part of an
+    /// executor; used mostly for thread pool worker threads.
+    ///
+    /// All registered `on_exit` callbacks are *dropped* without being
+    /// invoked.
+    pub fn make_permanent(mut self) {
+        self.permanent = true;
+    }
+}
+
+impl fmt::Debug for Enter {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_struct("Enter").finish()
+    }
+}
+
+impl Drop for Enter {
+    fn drop(&mut self) {
+        ENTERED.with(|c| {
+            assert!(c.get());
+            if self.permanent {
+                return
+            }
+
+            let mut on_exit = self.on_exit.borrow_mut();
+            for callback in on_exit.drain(..)
{ + callback.call(); + } + c.set(false); + }); + } +} + +trait Callback: 'static { + fn call(self: Box); +} + +impl Callback for F { + fn call(self: Box) { + (*self)() + } +} diff --git a/src/executor.rs b/src/executor/mod.rs similarity index 71% rename from src/executor.rs rename to src/executor/mod.rs index b6b6d422a8..7833216ce0 100644 --- a/src/executor.rs +++ b/src/executor/mod.rs @@ -7,6 +7,12 @@ //! //! [online]: https://tokio.rs/docs/going-deeper-futures/tasks/ +#[cfg(feature = "use_std")] +mod enter; + +#[cfg(feature = "use_std")] +mod sleep; + #[allow(deprecated)] #[cfg(feature = "use_std")] pub use task_impl::{Unpark, Executor, Run}; @@ -14,3 +20,9 @@ pub use task_impl::{Unpark, Executor, Run}; pub use task_impl::{Spawn, spawn, Notify, with_notify}; pub use task_impl::{UnsafeNotify, NotifyHandle}; + +#[cfg(feature = "use_std")] +pub use self::enter::{enter, Enter}; + +#[cfg(feature = "use_std")] +pub use self::sleep::{Sleep, Wakeup}; diff --git a/src/executor/sleep.rs b/src/executor/sleep.rs new file mode 100644 index 0000000000..dbdaa8dbe0 --- /dev/null +++ b/src/executor/sleep.rs @@ -0,0 +1,22 @@ +use std::time::Duration; + +/// Puts the current thread to sleep. +pub trait Sleep { + /// Wake up handle. + type Wakeup: Wakeup; + + /// Get a new `Wakeup` handle. + fn wakeup(&self) -> Self::Wakeup; + + /// Put the current thread to sleep. + fn sleep(&mut self); + + /// Put the current thread to sleep for at most `duration`. + fn sleep_timeout(&mut self, duration: Duration); +} + +/// Wake up a sleeping thread. +pub trait Wakeup: Send + Sync + 'static { + /// Wake up the sleeping thread. + fn wakeup(&self); +} diff --git a/src/future/blocking.rs b/src/future/blocking.rs new file mode 100644 index 0000000000..10c96614c4 --- /dev/null +++ b/src/future/blocking.rs @@ -0,0 +1,100 @@ +use Async; +use future::Future; +use executor::{self, NotifyHandle}; +use task_impl::ThreadNotify; + +/// Provides thread-blocking operations on a future. +/// +/// See [`blocking`](fn.blocking.html) documentation for more details. +#[derive(Debug)] +#[must_use = "futures do nothing unless used"] +pub struct Blocking { + inner: executor::Spawn, +} + +/// Provides thread-blocking operations on a future. +/// +/// `blocking` consumes ownership of `future`, returning a `Blocking` backed by +/// the future. The `Blocking` value exposes thread-blocking operations that +/// allow getting the realized value of the future. For example, +/// `Blocking::wait` will block the current thread until the inner future has +/// completed and return the completed value. +/// +/// **These operations will block the current thread**. This means that they +/// should not be called while in the context of a task executor as this will +/// block the task executor's progress. +pub fn blocking(future: T) -> Blocking { + let inner = executor::spawn(future); + Blocking { inner: inner } +} + +impl Blocking { + /// Query the inner future to see if its value has become available. + /// + /// Unlike `Future::poll`, this function does **not** register interest if + /// the inner future is not in a ready state. + /// + /// This function will return immediately if the inner future is not ready. + pub fn try_take(&mut self) -> Option> { + match self.inner.poll_future_notify(&NotifyHandle::noop(), 0) { + Ok(Async::NotReady) => None, + Ok(Async::Ready(v)) => Some(Ok(v)), + Err(e) => Some(Err(e)), + } + } + + /// Block the current thread until this future is resolved. 
+ /// + /// This method will drive the inner future to completion via + /// `Future::poll`. **The current thread will be blocked** until the future + /// transitions to a ready state. Once the future is complete, the result of + /// this future is returned. + /// + /// > **Note:** This method is not appropriate to call on event loops or + /// > similar I/O situations because it will prevent the event + /// > loop from making progress (this blocks the thread). This + /// > method should only be called when it's guaranteed that the + /// > blocking work associated with this future will be completed + /// > by another thread. + /// + /// This method is only available when the `use_std` feature of this + /// library is activated, and it is activated by default. + /// + /// # Panics + /// + /// This method panics if called from within an executor. + /// + /// This method does not attempt to catch panics. If the `poll` function of + /// the inner future panics, the panic will be propagated to the caller. + pub fn wait(&mut self) -> Result { + let _enter = executor::enter() + .expect("cannot call `future::Blocking::wait` from within \ + another executor."); + + ThreadNotify::with_current(|notify| { + loop { + match self.inner.poll_future_notify(notify, 0)? { + Async::NotReady => notify.park(), + Async::Ready(e) => return Ok(e), + } + } + }) + } +} + +impl Blocking { + /// Get a shared reference to the inner future. + pub fn get_ref(&self) -> &T { + self.inner.get_ref() + } + + /// Get a mutable reference to the inner future. + pub fn get_mut(&mut self) -> &mut T { + self.inner.get_mut() + } + + /// Consume the `Blocking`, returning its inner future. + pub fn into_inner(self) -> T { + self.inner.into_inner() + } +} diff --git a/src/future/mod.rs b/src/future/mod.rs index 7cccd907b0..5ce55fb8ef 100644 --- a/src/future/mod.rs +++ b/src/future/mod.rs @@ -78,11 +78,13 @@ pub use self::either::Either; pub use self::inspect::Inspect; if_std! { + mod blocking; mod catch_unwind; mod join_all; mod select_all; mod select_ok; mod shared; + pub use self::blocking::{blocking, Blocking}; pub use self::catch_unwind::CatchUnwind; pub use self::join_all::{join_all, JoinAll}; pub use self::select_all::{SelectAll, SelectAllNext, select_all}; @@ -271,28 +273,10 @@ pub trait Future { /// error. fn poll(&mut self) -> Poll; - /// Block the current thread until this future is resolved. - /// - /// This method will consume ownership of this future, driving it to - /// completion via `poll` and blocking the current thread while it's waiting - /// for the value to become available. Once the future is resolved the - /// result of this future is returned. - /// - /// > **Note:** This method is not appropriate to call on event loops or - /// > similar I/O situations because it will prevent the event - /// > loop from making progress (this blocks the thread). This - /// > method should only be called when it's guaranteed that the - /// > blocking work associated with this future will be completed - /// > by another thread. - /// - /// This method is only available when the `use_std` feature of this - /// library is activated, and it is activated by default. - /// - /// # Panics - /// - /// This function does not attempt to catch panics. If the `poll` function - /// of this future panics, panics will be propagated to the caller. 
#[cfg(feature = "use_std")]
+    #[doc(hidden)]
+    #[allow(deprecated)]
+    #[deprecated(note = "use `future::blocking` instead")]
     fn wait(self) -> result::Result<Self::Item, Self::Error>
         where Self: Sized
     {
diff --git a/src/lib.rs b/src/lib.rs
index ac27d3bc5f..49d4b59e8d 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -202,6 +202,8 @@ mod lock;
 mod task_impl;
 mod resultstream;
+#[cfg(feature = "use_std")]
+mod scheduler;
 pub mod task;
 pub mod executor;
@@ -209,6 +211,8 @@ pub mod executor;
 pub mod sync;
 #[cfg(feature = "use_std")]
 pub mod unsync;
+#[cfg(feature = "use_std")]
+pub mod current_thread;
 if_std! {
diff --git a/src/scheduler.rs b/src/scheduler.rs
new file mode 100644
index 0000000000..1db32d3164
--- /dev/null
+++ b/src/scheduler.rs
@@ -0,0 +1,722 @@
+//! An unbounded set of futures.
+
+use Async;
+use executor::{self, UnsafeNotify, NotifyHandle, Wakeup};
+
+use std::cell::UnsafeCell;
+use std::fmt::{self, Debug};
+use std::marker::PhantomData;
+use std::mem;
+use std::ptr;
+use std::sync::atomic::Ordering::{Relaxed, SeqCst, Acquire, Release, AcqRel};
+use std::sync::atomic::{AtomicPtr, AtomicBool};
+use std::sync::{Arc, Weak};
+use std::usize;
+
+/// A generic task-aware scheduler.
+///
+/// This is used both by `FuturesUnordered` and the current-thread executor.
+pub struct Scheduler<T, W> {
+    inner: Arc<Inner<T, W>>,
+    nodes: List<T, W>,
+}
+
+/// Schedule new futures
+pub trait Schedule<T> {
+    /// Schedule a new future.
+    fn schedule(&mut self, item: T);
+}
+
+pub struct Notify<'a, T: 'a, W: 'a>(&'a Arc<Node<T, W>>);
+
+// A linked-list of nodes
+struct List<T, W> {
+    len: usize,
+    head: *const Node<T, W>,
+    tail: *const Node<T, W>,
+}
+
+unsafe impl<T: Send, W: Send> Send for Scheduler<T, W> {}
+unsafe impl<T: Sync, W: Sync> Sync for Scheduler<T, W> {}
+
+// Scheduler is implemented using two linked lists. The first linked list
+// tracks all items managed by a `Scheduler`. This list is stored on the
+// `Scheduler` struct and is **not** thread safe. The second linked list is an
+// implementation of the intrusive MPSC queue algorithm described by
+// 1024cores.net and is stored on `Inner`. This linked list can push items to
+// the back concurrently but only one consumer may pop from the front. To
+// enforce this requirement, all popping will be performed via fns on
+// `Scheduler` that take `&mut self`.
+//
+// When an item is submitted to the set, a node is allocated and inserted in
+// both linked lists. This means that all insertion operations **must** be
+// originated from `Scheduler` with `&mut self`. The next call to `tick` will
+// (eventually) see this node and call `poll` on the item.
+//
+// Nodes are wrapped in `Arc` cells which manage the lifetime of the node.
+// However, `Arc` handles are sometimes cast to `*const Node` pointers.
+// Specifically, when a node is stored in at least one of the two lists
+// described above, this represents a logical `Arc` handle. This is how
+// `Scheduler` maintains its reference to all nodes it manages. Each
+// `NotifyHandle` instance is an `Arc<Node<T, W>>` as well.
+//
+// When `Scheduler` drops, it clears the linked list of all nodes that it
+// manages. When doing so, it must attempt to decrement the reference count
+// (by dropping an Arc handle). However, it can **only** decrement the
+// reference count if the node is not currently stored in the mpsc channel.
+// If the node **is** "queued" in the mpsc channel, then the arc reference
+// count cannot be decremented. Once the node is popped from the mpsc channel,
+// then the final arc reference count can be decremented, thus freeing the
+// node.
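For readers unfamiliar with the 1024cores algorithm referenced above, here is a simplified, non-intrusive miniature of the same push/pop protocol. It is an illustration only; the real code below folds the links into the nodes themselves and manages their lifetimes with `Arc`:

```rust
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};

struct QueueNode<T> {
    next: AtomicPtr<QueueNode<T>>,
    value: Option<T>,
}

struct MpscQueue<T> {
    head: AtomicPtr<QueueNode<T>>, // producers push here
    tail: *mut QueueNode<T>,       // the single consumer pops here
}

impl<T> MpscQueue<T> {
    fn new() -> MpscQueue<T> {
        // The queue always holds one "stub" node; `head == tail == stub`
        // means empty. (Dropping the queue leaks remaining nodes; elided.)
        let stub = Box::into_raw(Box::new(QueueNode {
            next: AtomicPtr::new(ptr::null_mut()),
            value: None,
        }));
        MpscQueue { head: AtomicPtr::new(stub), tail: stub }
    }

    // May be called concurrently from any thread.
    fn push(&self, value: T) {
        let node = Box::into_raw(Box::new(QueueNode {
            next: AtomicPtr::new(ptr::null_mut()),
            value: Some(value),
        }));
        // Swing `head` to the new node, then link the old head to it.
        // Between these two steps the queue is observably "inconsistent",
        // which is what `Dequeue::Inconsistent` reports below.
        let prev = self.head.swap(node, Ordering::AcqRel);
        unsafe { (*prev).next.store(node, Ordering::Release) }
    }

    // Only the single consumer may call this (hence `&mut self`).
    fn pop(&mut self) -> Option<T> {
        unsafe {
            let next = (*self.tail).next.load(Ordering::Acquire);
            if next.is_null() {
                // Empty, or a producer is mid-push; the real implementation
                // distinguishes the two cases.
                return None;
            }
            // Free the consumed node and make the next one the new stub.
            let old = self.tail;
            self.tail = next;
            drop(Box::from_raw(old));
            (*next).value.take()
        }
    }
}
```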
+ +#[allow(missing_debug_implementations)] +struct Inner { + // The task using `Scheduler`. + wakeup: W, + + // Head/tail of the readiness queue + head_readiness: AtomicPtr>, + tail_readiness: UnsafeCell<*const Node>, + + // Used as part of the MPSC queue algorithm + stub: Arc>, +} + +struct Node { + // The item + item: UnsafeCell>, + + // Next pointer for linked list tracking all active nodes + next_all: UnsafeCell<*const Node>, + + // Previous node in linked list tracking all active nodes + prev_all: UnsafeCell<*const Node>, + + // Next pointer in readiness queue + next_readiness: AtomicPtr>, + + // Whether or not this node is currently in the mpsc queue. + queued: AtomicBool, + + // Queue that we'll be enqueued to when notified + queue: Weak>, +} + +/// Returned by the `Scheduler::tick` function, allowing the caller to decide +/// what action to take next. +pub enum Tick { + Data(T), + Empty, + Inconsistent, +} + +/// Returned by `Inner::dequeue`, representing either a dequeue success (with +/// the dequeued node), an empty list, or an inconsistent state. +/// +/// The inconsistent state is described in more detail at [1024cores], but +/// roughly indicates that a node will be ready to dequeue sometime shortly in +/// the future and the caller should try again soon. +/// +/// [1024cores]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue +enum Dequeue { + Data(*const Node), + Empty, + Inconsistent, +} + +impl Scheduler +where W: Wakeup, +{ + /// Constructs a new, empty `Scheduler` + /// + /// The returned `Scheduler` does not contain any items and, in this + /// state, `Scheduler::poll` will return `Ok(Async::Ready(None))`. + pub fn new(wakeup: W) -> Self { + let stub = Arc::new(Node { + item: UnsafeCell::new(None), + next_all: UnsafeCell::new(ptr::null()), + prev_all: UnsafeCell::new(ptr::null()), + next_readiness: AtomicPtr::new(ptr::null_mut()), + queued: AtomicBool::new(true), + queue: Weak::new(), + }); + let stub_ptr = &*stub as *const Node; + let inner = Arc::new(Inner { + wakeup: wakeup, + head_readiness: AtomicPtr::new(stub_ptr as *mut _), + tail_readiness: UnsafeCell::new(stub_ptr), + stub: stub, + }); + + Scheduler { + inner: inner, + nodes: List::new(), + } + } +} + +impl Scheduler { + /// Returns a reference to the wakeup handle + pub fn get_wakeup(&self) -> &W { + &self.inner.wakeup + } + + /// Returns the number of items contained in the set. + /// + /// This represents the total number of in-flight items. + pub fn len(&self) -> usize { + self.nodes.len + } + + /// Returns `true` if the set contains no items + pub fn is_empty(&self) -> bool { + self.nodes.len == 0 + } + + /// Advance the scheduler state. + /// + /// This function should be called whenever the caller is notified via a + /// wakeup. + pub fn tick(&mut self, mut f: F) -> Tick + where F: FnMut(&mut Self, &mut T, &Notify) -> Async + { + loop { + let node = match unsafe { self.inner.dequeue() } { + Dequeue::Empty => { + return Tick::Empty; + } + Dequeue::Inconsistent => { + return Tick::Inconsistent; + } + Dequeue::Data(node) => node, + }; + + debug_assert!(node != self.inner.stub()); + + unsafe { + if (*(*node).item.get()).is_none() { + // The node has already been released. However, while it was + // being released, another thread notified it, which + // resulted in it getting pushed into the mpsc channel. + // + // In this case, we just dec the ref count. 
+ let node = ptr2arc(node); + assert!((*node.next_all.get()).is_null()); + assert!((*node.prev_all.get()).is_null()); + continue + }; + + // We're going to need to be very careful if the `poll` + // function below panics. We need to (a) not leak memory and + // (b) ensure that we still don't have any use-after-frees. To + // manage this we do a few things: + // + // * This "bomb" here will call `release_node` if dropped + // abnormally. That way we'll be sure the memory management + // of the `node` is managed correctly. + // + // * We unlink the node from our internal queue to preemptively + // assume is is complete (will return Ready or panic), in + // which case we'll want to discard it regardless. + // + struct Bomb<'a, T: 'a, W: 'a> { + queue: &'a mut Scheduler, + node: Option>>, + } + + impl<'a, T, W> Drop for Bomb<'a, T, W> { + fn drop(&mut self) { + if let Some(node) = self.node.take() { + release_node(node); + } + } + } + + let mut bomb = Bomb { + node: Some(self.nodes.remove(node)), + queue: self, + }; + + // Now that the bomb holds the node, create a new scope. This + // scope ensures that the borrow will go out of scope before we + // mutate the node pointer in `bomb` again + let res = { + let node = bomb.node.as_ref().unwrap(); + + // Get a reference to the inner future. We already ensured + // that the item `is_some`. + let item = (*node.item.get()).as_mut().unwrap(); + + // Unset queued flag... this must be done before + // polling. This ensures that the item gets + // rescheduled if it is notified **during** a call + // to `poll`. + let prev = (*node).queued.swap(false, SeqCst); + assert!(prev); + + // Poll the underlying item with the appropriate `notify` + // implementation. This is where a large bit of the unsafety + // starts to stem from internally. The `notify` instance itself + // is basically just our `Arc>` and tracks the mpsc + // queue of ready items. + // + // Critically though `Node` won't actually access `T`, the + // item, while it's floating around inside of `Task` + // instances. These structs will basically just use `T` to size + // the internal allocation, appropriately accessing fields and + // deallocating the node if need be. + let queue = &mut *bomb.queue; + let notify = Notify(bomb.node.as_ref().unwrap()); + f(queue, item, ¬ify) + }; + + let ret = match res { + Async::NotReady => { + // The future is not done, push it back into the "all + // node" list. + let node = bomb.node.take().unwrap(); + bomb.queue.nodes.push_back(node); + continue; + } + Async::Ready(v) => { + // `bomb` will take care of unlinking and releasing the + // node. + Tick::Data(v) + } + }; + + return ret + } + } + } + + /// Returns an iterator that allows modifying each item in the set. + pub fn iter_mut(&mut self) -> IterMut { + self.nodes.iter_mut() + } +} + +impl Schedule for Scheduler { + fn schedule(&mut self, item: T) { + let node = Arc::new(Node { + item: UnsafeCell::new(Some(item)), + next_all: UnsafeCell::new(ptr::null_mut()), + prev_all: UnsafeCell::new(ptr::null_mut()), + next_readiness: AtomicPtr::new(ptr::null_mut()), + queued: AtomicBool::new(true), + queue: Arc::downgrade(&self.inner), + }); + + // Right now our node has a strong reference count of 1. We transfer + // ownership of this reference count to our internal linked list + // and we'll reclaim ownership through the `unlink` function below. + let ptr = self.nodes.push_back(node); + + // We'll need to get the item "into the system" to start tracking it, + // e.g. 
getting its unpark notifications going to us tracking which + // items are ready. To do that we unconditionally enqueue it for + // polling here. + self.inner.enqueue(ptr); + } +} + +fn release_node(node: Arc>) { + // The item is done, try to reset the queued flag. This will prevent + // `notify` from doing any work in the item + let prev = node.queued.swap(true, SeqCst); + + // Drop the item, even if it hasn't finished yet. This is safe + // because we're dropping the item on the thread that owns + // `Scheduler`, which correctly tracks T's lifetimes and such. + unsafe { + drop((*node.item.get()).take()); + } + + // If the queued flag was previously set then it means that this node + // is still in our internal mpsc queue. We then transfer ownership + // of our reference count to the mpsc queue, and it'll come along and + // free it later, noticing that the item is `None`. + // + // If, however, the queued flag was *not* set then we're safe to + // release our reference count on the internal node. The queued flag + // was set above so all item `enqueue` operations will not actually + // enqueue the node, so our node will never see the mpsc queue again. + // The node itself will be deallocated once all reference counts have + // been dropped by the various owning tasks elsewhere. + if prev { + mem::forget(node); + } +} + +impl Debug for Scheduler { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "Scheduler {{ ... }}") + } +} + +impl Drop for Scheduler { + fn drop(&mut self) { + // When a `Scheduler` is dropped we want to drop all items associated + // with it. At the same time though there may be tons of `Task` handles + // flying around which contain `Node` references inside them. We'll + // let those naturally get deallocated when the `Task` itself goes out + // of scope or gets notified. + while let Some(node) = self.nodes.pop_front() { + release_node(node); + } + + // Note that at this point we could still have a bunch of nodes in the + // mpsc queue. None of those nodes, however, have items associated + // with them so they're safe to destroy on any thread. At this point + // the `Scheduler` struct, the owner of the one strong reference + // to `Inner` will drop the strong reference. At that point + // whichever thread releases the strong refcount last (be it this + // thread or some other thread as part of an `upgrade`) will clear out + // the mpsc queue and free all remaining nodes. + // + // While that freeing operation isn't guaranteed to happen here, it's + // guaranteed to happen "promptly" as no more "blocking work" will + // happen while there's a strong refcount held. + } +} + +#[derive(Debug)] +/// Mutable iterator over all items in the unordered set. +pub struct IterMut<'a, T: 'a, W: 'a> { + node: *const Node, + len: usize, + _marker: PhantomData<&'a mut Scheduler> +} + +impl<'a, T, W> Iterator for IterMut<'a, T, W> { + type Item = &'a mut T; + + fn next(&mut self) -> Option<&'a mut T> { + if self.node.is_null() { + return None; + } + unsafe { + let item = (*(*self.node).item.get()).as_mut().unwrap(); + let next = *(*self.node).next_all.get(); + self.node = next; + self.len -= 1; + return Some(item); + } + } + + fn size_hint(&self) -> (usize, Option) { + (self.len, Some(self.len)) + } +} + +impl<'a, T, W> ExactSizeIterator for IterMut<'a, T, W> {} + +impl Inner { + /// The enqueue function from the 1024cores intrusive MPSC queue algorithm. 
+ fn enqueue(&self, node: *const Node) { + unsafe { + debug_assert!((*node).queued.load(Relaxed)); + + // This action does not require any coordination + (*node).next_readiness.store(ptr::null_mut(), Relaxed); + + // Note that these atomic orderings come from 1024cores + let node = node as *mut _; + let prev = self.head_readiness.swap(node, AcqRel); + (*prev).next_readiness.store(node, Release); + } + } + + /// The dequeue function from the 1024cores intrusive MPSC queue algorithm + /// + /// Note that this unsafe as it required mutual exclusion (only one thread + /// can call this) to be guaranteed elsewhere. + unsafe fn dequeue(&self) -> Dequeue { + let mut tail = *self.tail_readiness.get(); + let mut next = (*tail).next_readiness.load(Acquire); + + if tail == self.stub() { + if next.is_null() { + return Dequeue::Empty; + } + + *self.tail_readiness.get() = next; + tail = next; + next = (*next).next_readiness.load(Acquire); + } + + if !next.is_null() { + *self.tail_readiness.get() = next; + debug_assert!(tail != self.stub()); + return Dequeue::Data(tail); + } + + if self.head_readiness.load(Acquire) as *const _ != tail { + return Dequeue::Inconsistent; + } + + self.enqueue(self.stub()); + + next = (*tail).next_readiness.load(Acquire); + + if !next.is_null() { + *self.tail_readiness.get() = next; + return Dequeue::Data(tail); + } + + Dequeue::Inconsistent + } + + fn stub(&self) -> *const Node { + &*self.stub + } +} + +impl Drop for Inner { + fn drop(&mut self) { + // Once we're in the destructor for `Inner` we need to clear out the + // mpsc queue of nodes if there's anything left in there. + // + // Note that each node has a strong reference count associated with it + // which is owned by the mpsc queue. All nodes should have had their + // items dropped already by the `Scheduler` destructor above, + // so we're just pulling out nodes and dropping their refcounts. 
+ unsafe { + loop { + match self.dequeue() { + Dequeue::Empty => break, + Dequeue::Inconsistent => abort("inconsistent in drop"), + Dequeue::Data(ptr) => drop(ptr2arc(ptr)), + } + } + } + } +} + +impl List { + fn new() -> Self { + List { + len: 0, + head: ptr::null_mut(), + tail: ptr::null_mut(), + } + } + + /// Prepends an element to the back of the list + fn push_back(&mut self, node: Arc>) -> *const Node { + let ptr = arc2ptr(node); + + unsafe { + // Point to the current last node in the list + *(*ptr).prev_all.get() = self.tail; + *(*ptr).next_all.get() = ptr::null_mut(); + + if !self.tail.is_null() { + *(*self.tail).next_all.get() = ptr; + self.tail = ptr; + } else { + // This is the first node + self.tail = ptr; + self.head = ptr; + } + } + + self.len += 1; + + return ptr + } + + /// Pop an element from the front of the list + fn pop_front(&mut self) -> Option>> { + if self.head.is_null() { + // The list is empty + return None; + } + + self.len -= 1; + + unsafe { + // Convert the ptr to Arc<_> + let node = ptr2arc(self.head); + + // Update the head pointer + self.head = *node.next_all.get(); + + // If the pointer is null, then the list is empty + if self.head.is_null() { + self.tail = ptr::null_mut(); + } else { + *(*self.head).prev_all.get() = ptr::null_mut(); + } + + Some(node) + } + } + + /// Remove a specific node + unsafe fn remove(&mut self, node: *const Node) -> Arc> { + let node = ptr2arc(node); + let next = *node.next_all.get(); + let prev = *node.prev_all.get(); + *node.next_all.get() = ptr::null_mut(); + *node.prev_all.get() = ptr::null_mut(); + + if !next.is_null() { + *(*next).prev_all.get() = prev; + } else { + self.tail = prev; + } + + if !prev.is_null() { + *(*prev).next_all.get() = next; + } else { + self.head = next; + } + + self.len -= 1; + + return node + } + + fn iter_mut(&mut self) -> IterMut { + IterMut { + node: self.head, + len: self.len, + _marker: PhantomData + } + } +} + +impl<'a, T, W> Clone for Notify<'a, T, W> { + fn clone(&self) -> Self { + Notify(self.0) + } +} + +impl<'a, T: fmt::Debug, W: fmt::Debug> fmt::Debug for Notify<'a, T, W> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Notiy").finish() + } +} + +impl<'a, T, W: Wakeup> From> for NotifyHandle { + fn from(handle: Notify<'a, T, W>) -> NotifyHandle { + unsafe { + let ptr = handle.0.clone(); + let ptr = mem::transmute::>, *mut ArcNode>(ptr); + NotifyHandle::new(hide_lt(ptr)) + } + } +} + +struct ArcNode(PhantomData<(T, W)>); + +// We should never touch `T` on any thread other than the one owning +// `Scheduler`, so this should be a safe operation. 
+// +// `W` already requires `Sync + Send` +unsafe impl Send for ArcNode {} +unsafe impl Sync for ArcNode {} + +impl executor::Notify for ArcNode { + fn notify(&self, _id: usize) { + unsafe { + let me: *const ArcNode = self; + let me: *const *const ArcNode = &me; + let me = me as *const Arc>; + Node::notify(&*me) + } + } +} + +unsafe impl UnsafeNotify for ArcNode { + unsafe fn clone_raw(&self) -> NotifyHandle { + let me: *const ArcNode = self; + let me: *const *const ArcNode = &me; + let me = &*(me as *const Arc>); + Notify(me).into() + } + + unsafe fn drop_raw(&self) { + let mut me: *const ArcNode = self; + let me = &mut me as *mut *const ArcNode as *mut Arc>; + ptr::drop_in_place(me); + } +} + +unsafe fn hide_lt(p: *mut ArcNode) -> *mut UnsafeNotify { + mem::transmute(p as *mut UnsafeNotify) +} + +impl Node { + fn notify(me: &Arc>) { + let inner = match me.queue.upgrade() { + Some(inner) => inner, + None => return, + }; + + // It's our job to notify the node that it's ready to get polled, + // meaning that we need to enqueue it into the readiness queue. To + // do this we flag that we're ready to be queued, and if successful + // we then do the literal queueing operation, ensuring that we're + // only queued once. + // + // Once the node is inserted we be sure to notify the parent task, + // as it'll want to come along and pick up our node now. + // + // Note that we don't change the reference count of the node here, + // we're just enqueueing the raw pointer. The `Scheduler` + // implementation guarantees that if we set the `queued` flag true that + // there's a reference count held by the main `Scheduler` queue + // still. + let prev = me.queued.swap(true, SeqCst); + if !prev { + inner.enqueue(&**me); + inner.wakeup.wakeup(); + } + } +} + +impl Drop for Node { + fn drop(&mut self) { + // Currently a `Node` is sent across all threads for any lifetime, + // regardless of `T`. This means that for memory safety we can't + // actually touch `T` at any time except when we have a reference to the + // `Scheduler` itself. + // + // Consequently it *should* be the case that we always drop items from + // the `Scheduler` instance, but this is a bomb in place to catch + // any bugs in that logic. + unsafe { + if (*self.item.get()).is_some() { + abort("item still here when dropping"); + } + } + } +} + +fn arc2ptr(ptr: Arc) -> *const T { + let addr = &*ptr as *const T; + mem::forget(ptr); + return addr +} + +unsafe fn ptr2arc(ptr: *const T) -> Arc { + let anchor = mem::transmute::>(0x10); + let addr = &*anchor as *const T; + mem::forget(anchor); + let offset = addr as isize - 0x10; + mem::transmute::>(ptr as isize - offset) +} + +fn abort(s: &str) -> ! { + struct DoublePanic; + + impl Drop for DoublePanic { + fn drop(&mut self) { + panic!("panicking twice to abort the program"); + } + } + + let _bomb = DoublePanic; + panic!("{}", s); +} diff --git a/src/sink/blocking.rs b/src/sink/blocking.rs new file mode 100644 index 0000000000..5c4fa60b43 --- /dev/null +++ b/src/sink/blocking.rs @@ -0,0 +1,137 @@ +use {Async, AsyncSink}; +use sink::Sink; +use executor; +use task_impl::ThreadNotify; + +/// Provides thread-blocking operations on a sink. +/// +/// See [`blocking`](fn.blocking.html) documentation for more details. +#[derive(Debug)] +#[must_use = "sinks do nothing unless used"] +pub struct Blocking { + inner: Option>, +} + +/// Provides thread-blocking operations on a sink. +/// +/// `blocking` consumes ownership of `sink`, returning a `Blocking` backed by +/// the sink. 
The `Blocking` value exposes thread-blocking operations that allow
+/// sending values into the sink. For example, `Blocking::send` will block
+/// the current thread until the inner sink has capacity to accept the item
+/// being sent.
+///
+/// **These operations will block the current thread**. This means that they
+/// should not be called while in the context of a task executor as this will
+/// block the task executor's progress. Also, note that **this value will
+/// block the current thread on drop**. Dropping `Blocking` will ensure that
+/// the inner sink is fully flushed before completing the drop. This could
+/// block the current thread.
+///
+/// Any errors that happen in the process of flushing when `Blocking` is
+/// dropped will be ignored. Code that wishes to handle such errors must
+/// manually call flush before the value is dropped. This behavior is similar
+/// to [`BufWriter`] in std.
+///
+/// [`BufWriter`]: https://doc.rust-lang.org/std/io/struct.BufWriter.html
+pub fn blocking<T: Sink>(sink: T) -> Blocking<T> {
+    let inner = executor::spawn(sink);
+    Blocking { inner: Some(inner) }
+}
+
+impl<T: Sink> Blocking<T> {
+    /// Sends a value to this sink, blocking the current thread until it's
+    /// able to do so.
+    ///
+    /// This function will take the `value` provided and call the underlying
+    /// sink's `start_send` function until it's ready to accept the value. If
+    /// the function returns `NotReady` then the current thread is blocked
+    /// until it is otherwise ready to accept the value.
+    ///
+    /// # Return value
+    ///
+    /// If `Ok(())` is returned then the `value` provided was successfully
+    /// sent along the sink, and if `Err(e)` is returned then an error
+    /// occurred which prevented the value from being sent.
+    pub fn send(&mut self, mut value: T::SinkItem) -> Result<(), T::SinkError> {
+        let _enter = executor::enter()
+            .expect("cannot call `sink::Blocking::send` from within \
+                     another executor.");
+
+        ThreadNotify::with_current(|notify| {
+            loop {
+                let inner = self.inner.as_mut().unwrap();
+                match inner.start_send_notify(value, notify, 0) {
+                    Ok(AsyncSink::Ready) => return Ok(()),
+                    Ok(AsyncSink::NotReady(v)) => {
+                        value = v;
+                        notify.park();
+                    }
+                    Err(e) => return Err(e),
+                }
+            }
+        })
+    }
+
+    /// Flushes any buffered data in this sink, blocking the current thread
+    /// until it's entirely flushed.
+    ///
+    /// This function will call the underlying sink's `poll_complete` method
+    /// until it returns that it's ready to proceed. If the method returns
+    /// `NotReady` the current thread will be blocked until it's otherwise
+    /// ready to proceed.
+    pub fn flush(&mut self) -> Result<(), T::SinkError> {
+        let _enter = executor::enter()
+            .expect("cannot call `sink::Blocking::flush` from within \
+                     another executor.");
+
+        ThreadNotify::with_current(|notify| {
+            loop {
+                let inner = self.inner.as_mut().unwrap();
+                match inner.poll_flush_notify(notify, 0) {
+                    Ok(Async::Ready(_)) => return Ok(()),
+                    Ok(Async::NotReady) => notify.park(),
+                    Err(e) => return Err(e),
+                }
+            }
+        })
+    }
+
+    /// Get a shared reference to the inner sink.
+    pub fn get_ref(&self) -> &T {
+        self.inner.as_ref().unwrap().get_ref()
+    }
+
+    /// Get a mutable reference to the inner sink.
+    pub fn get_mut(&mut self) -> &mut T {
+        self.inner.as_mut().unwrap().get_mut()
+    }
+
+    /// Consume the `Blocking`, returning its inner sink.
+ pub fn into_inner(mut self) -> T { + self.inner.take().unwrap().into_inner() + } +} + +impl Drop for Blocking { + fn drop(&mut self) { + let _enter = match executor::enter() { + Some(enter) => enter, + // Cannot block from the context of an executor. This is considered + // an "error" and will just not flush. If this panicked, it would + // potentially end up in a double panic situation. + None => return, + }; + + ThreadNotify::with_current(|notify| { + if let Some(ref mut inner) = self.inner { + loop { + match inner.close_notify(notify, 0) { + Ok(Async::Ready(_)) => break, + Ok(Async::NotReady) => notify.park(), + Err(_) => break, + } + } + } + }) + } +} diff --git a/src/sink/mod.rs b/src/sink/mod.rs index e5ea97f92a..a3fdba3aae 100644 --- a/src/sink/mod.rs +++ b/src/sink/mod.rs @@ -24,10 +24,15 @@ mod map_err; mod fanout; if_std! { + mod blocking; mod buffer; mod wait; + pub use self::blocking::{blocking, Blocking}; pub use self::buffer::Buffer; + + #[doc(hidden)] + #[allow(deprecated)] pub use self::wait::Wait; // TODO: consider expanding this via e.g. FromIterator @@ -286,13 +291,10 @@ pub trait Sink { #[cfg(not(feature = "with-deprecated"))] fn close(&mut self) -> Poll<(), Self::SinkError>; - /// Creates a new object which will produce a synchronous sink. - /// - /// The sink returned does **not** implement the `Sink` trait, and instead - /// only has two methods: `send` and `flush`. These two methods correspond - /// to `start_send` and `poll_complete` above except are executed in a - /// blocking fashion. #[cfg(feature = "use_std")] + #[doc(hidden)] + #[allow(deprecated)] + #[deprecated(note = "use `sink::blocking` instead")] fn wait(self) -> Wait where Self: Sized { diff --git a/src/sink/wait.rs b/src/sink/wait.rs index 940a58862f..1f25d4a448 100644 --- a/src/sink/wait.rs +++ b/src/sink/wait.rs @@ -1,13 +1,11 @@ +#![allow(deprecated)] + use sink::Sink; use executor; -/// A sink combinator which converts an asynchronous sink to a **blocking -/// sink**. -/// -/// Created by the `Sink::wait` method, this function transforms any sink into a -/// blocking version. This is implemented by blocking the current thread when a -/// sink is otherwise unable to make progress. #[must_use = "sinks do nothing unless used"] +#[doc(hidden)] +#[deprecated(note = "use `sink::blocking` instead")] #[derive(Debug)] pub struct Wait { sink: executor::Spawn, @@ -33,6 +31,7 @@ impl Wait { /// If `Ok(())` is returned then the `value` provided was successfully sent /// along the sink, and if `Err(e)` is returned then an error occurred /// which prevented the value from being sent. + #[allow(deprecated)] pub fn send(&mut self, value: S::SinkItem) -> Result<(), S::SinkError> { self.sink.wait_send(value) } @@ -44,6 +43,7 @@ impl Wait { /// until it returns that it's ready to proceed. If the method returns /// `NotReady` the current thread will be blocked until it's otherwise /// ready to proceed. + #[allow(deprecated)] pub fn flush(&mut self) -> Result<(), S::SinkError> { self.sink.wait_flush() } @@ -53,6 +53,7 @@ impl Wait { /// This function will call the underlying sink's `close` method /// until it returns that it's closed. If the method returns /// `NotReady` the current thread will be blocked until it's otherwise closed. 
+ #[allow(deprecated)] pub fn close(&mut self) -> Result<(), S::SinkError> { self.sink.wait_close() } diff --git a/src/stream/blocking.rs b/src/stream/blocking.rs new file mode 100644 index 0000000000..d87d827c94 --- /dev/null +++ b/src/stream/blocking.rs @@ -0,0 +1,65 @@ +use Async; +use stream::Stream; +use executor; +use task_impl::ThreadNotify; + +/// Provides thread-blocking stream iteration. +/// +/// See [`blocking`](fn.blocking.html) documentation for more details. +#[derive(Debug)] +#[must_use = "iterators do nothing unless advanced"] +pub struct Blocking { + inner: executor::Spawn, +} + +/// Provides thread-blocking stream iteration. +/// +/// `blocking` consumes ownership of `stream`, returning a `Blocking` backed by +/// the stream. The `Blocking` value provides a thread-blocking iterator that +/// yields each consecutive value from stream. +/// +/// **Iteration will block the current thread**. This means that it should not +/// be performed while in the context of a task executor as this will block the +/// task executor's progress. +pub fn blocking(stream: T) -> Blocking { + let inner = executor::spawn(stream); + Blocking { inner: inner } +} + +impl Blocking { + /// Get a shared reference to the inner stream. + pub fn get_ref(&self) -> &T { + self.inner.get_ref() + } + + /// Get a mutable reference to the inner stream. + pub fn get_mut(&mut self) -> &mut T { + self.inner.get_mut() + } + + /// Consume the `Blocking`, returning its inner stream. + pub fn into_inner(self) -> T { + self.inner.into_inner() + } +} + +impl Iterator for Blocking { + type Item = Result; + + fn next(&mut self) -> Option { + let _enter = executor::enter() + .expect("cannot call `stream::Blocking::next` from within \ + another executor."); + + ThreadNotify::with_current(|notify| { + loop { + match self.inner.poll_stream_notify(notify, 0) { + Ok(Async::Ready(Some(v))) => return Some(Ok(v)), + Ok(Async::Ready(None)) => return None, + Ok(Async::NotReady) => notify.park(), + Err(e) => return Some(Err(e)), + } + } + }) + } +} diff --git a/src/stream/futures_unordered.rs b/src/stream/futures_unordered.rs index 2940fd3495..313bd206ae 100644 --- a/src/stream/futures_unordered.rs +++ b/src/stream/futures_unordered.rs @@ -1,18 +1,9 @@ //! An unbounded set of futures. -use std::cell::UnsafeCell; -use std::fmt::{self, Debug}; use std::iter::FromIterator; -use std::marker::PhantomData; -use std::mem; -use std::ptr; -use std::sync::atomic::Ordering::{Relaxed, SeqCst, Acquire, Release, AcqRel}; -use std::sync::atomic::{AtomicPtr, AtomicBool}; -use std::sync::{Arc, Weak}; -use std::usize; use {task, Stream, Future, Poll, Async}; -use executor::{Notify, UnsafeNotify, NotifyHandle}; +use scheduler::{self, Scheduler, Schedule}; use task_impl::{self, AtomicTask}; /// An unbounded set of futures. @@ -44,73 +35,9 @@ use task_impl::{self, AtomicTask}; /// `futures_unordered` function in the `stream` module, or you can start with an /// empty set with the `FuturesUnordered::new` constructor. #[must_use = "streams do nothing unless polled"] -pub struct FuturesUnordered { - inner: Arc>, - len: usize, - head_all: *const Node, -} - -unsafe impl Send for FuturesUnordered {} -unsafe impl Sync for FuturesUnordered {} - -// FuturesUnordered is implemented using two linked lists. One which links all -// futures managed by a `FuturesUnordered` and one that tracks futures that have -// been scheduled for polling. The first linked list is not thread safe and is -// only accessed by the thread that owns the `FuturesUnordered` value. 
The -// second linked list is an implementation of the intrusive MPSC queue algorithm -// described by 1024cores.net. -// -// When a future is submitted to the set a node is allocated and inserted in -// both linked lists. The next call to `poll` will (eventually) see this node -// and call `poll` on the future. -// -// Before a managed future is polled, the current task's `Notify` is replaced -// with one that is aware of the specific future being run. This ensures that -// task notifications generated by that specific future are visible to -// `FuturesUnordered`. When a notification is received, the node is scheduled -// for polling by being inserted into the concurrent linked list. -// -// Each node uses an `AtomicUsize` to track it's state. The node state is the -// reference count (the number of outstanding handles to the node) as well as a -// flag tracking if the node is currently inserted in the atomic queue. When the -// future is notified, it will only insert itself into the linked list if it -// isn't currently inserted. - -#[allow(missing_debug_implementations)] -struct Inner { - // The task using `FuturesUnordered`. - parent: AtomicTask, - - // Head/tail of the readiness queue - head_readiness: AtomicPtr>, - tail_readiness: UnsafeCell<*const Node>, - stub: Arc>, -} - -struct Node { - // The future - future: UnsafeCell>, - - // Next pointer for linked list tracking all active nodes - next_all: UnsafeCell<*const Node>, - - // Previous node in linked list tracking all active nodes - prev_all: UnsafeCell<*const Node>, - - // Next pointer in readiness queue - next_readiness: AtomicPtr>, - - // Queue that we'll be enqueued to when notified - queue: Weak>, - - // Whether or not this node is currently in the mpsc queue. - queued: AtomicBool, -} - -enum Dequeue { - Data(*const Node), - Empty, - Inconsistent, +#[derive(Debug)] +pub struct FuturesUnordered { + inner: Scheduler, } impl FuturesUnordered @@ -121,27 +48,8 @@ impl FuturesUnordered /// The returned `FuturesUnordered` does not contain any futures and, in this /// state, `FuturesUnordered::poll` will return `Ok(Async::Ready(None))`. pub fn new() -> FuturesUnordered { - let stub = Arc::new(Node { - future: UnsafeCell::new(None), - next_all: UnsafeCell::new(ptr::null()), - prev_all: UnsafeCell::new(ptr::null()), - next_readiness: AtomicPtr::new(ptr::null_mut()), - queued: AtomicBool::new(true), - queue: Weak::new(), - }); - let stub_ptr = &*stub as *const Node; - let inner = Arc::new(Inner { - parent: AtomicTask::new(), - head_readiness: AtomicPtr::new(stub_ptr as *mut _), - tail_readiness: UnsafeCell::new(stub_ptr), - stub: stub, - }); - - FuturesUnordered { - len: 0, - head_all: ptr::null_mut(), - inner: inner, - } + let inner = Scheduler::new(AtomicTask::new()); + FuturesUnordered { inner: inner } } } @@ -150,12 +58,12 @@ impl FuturesUnordered { /// /// This represents the total number of in-flight futures. pub fn len(&self) -> usize { - self.len + self.inner.len() } /// Returns `true` if the set contains no futures pub fn is_empty(&self) -> bool { - self.len == 0 + self.inner.is_empty() } /// Push a future into the set. @@ -165,99 +73,14 @@ impl FuturesUnordered { /// ensure that `FuturesUnordered::poll` is called in order to receive task /// notifications. 
     pub fn push(&mut self, future: T) {
-        let node = Arc::new(Node {
-            future: UnsafeCell::new(Some(future)),
-            next_all: UnsafeCell::new(ptr::null_mut()),
-            prev_all: UnsafeCell::new(ptr::null_mut()),
-            next_readiness: AtomicPtr::new(ptr::null_mut()),
-            queued: AtomicBool::new(true),
-            queue: Arc::downgrade(&self.inner),
-        });
-
-        // Right now our node has a strong reference count of 1. We transfer
-        // ownership of this reference count to our internal linked list
-        // and we'll reclaim ownership through the `unlink` function below.
-        let ptr = self.link(node);
-
-        // We'll need to get the future "into the system" to start tracking it,
-        // e.g. getting its unpark notifications going to us tracking which
-        // futures are ready. To do that we unconditionally enqueue it for
-        // polling here.
-        self.inner.enqueue(ptr);
+        self.inner.schedule(future)
     }
 
     /// Returns an iterator that allows modifying each future in the set.
     pub fn iter_mut(&mut self) -> IterMut<T> {
         IterMut {
-            node: self.head_all,
-            len: self.len,
-            _marker: PhantomData
-        }
-    }
-
-    fn release_node(&mut self, node: Arc<Node<T>>) {
-        // The future is done, try to reset the queued flag. This will prevent
-        // `notify` from doing any work in the future
-        let prev = node.queued.swap(true, SeqCst);
-
-        // Drop the future, even if it hasn't finished yet. This is safe
-        // because we're dropping the future on the thread that owns
-        // `FuturesUnordered`, which correctly tracks T's lifetimes and such.
-        unsafe {
-            drop((*node.future.get()).take());
-        }
-
-        // If the queued flag was previously set then it means that this node
-        // is still in our internal mpsc queue. We then transfer ownership
-        // of our reference count to the mpsc queue, and it'll come along and
-        // free it later, noticing that the future is `None`.
-        //
-        // If, however, the queued flag was *not* set then we're safe to
-        // release our reference count on the internal node. The queued flag
-        // was set above so all future `enqueue` operations will not actually
-        // enqueue the node, so our node will never see the mpsc queue again.
-        // The node itself will be deallocated once all reference counts have
-        // been dropped by the various owning tasks elsewhere.
-        if prev {
-            mem::forget(node);
-        }
-    }
-
-    /// Insert a new node into the internal linked list.
-    fn link(&mut self, node: Arc<Node<T>>) -> *const Node<T> {
-        let ptr = arc2ptr(node);
-        unsafe {
-            *(*ptr).next_all.get() = self.head_all;
-            if !self.head_all.is_null() {
-                *(*self.head_all).prev_all.get() = ptr;
-            }
-        }
-
-        self.head_all = ptr;
-        self.len += 1;
-        return ptr
-    }
-
-    /// Remove the node from the linked list tracking all nodes currently
-    /// managed by `FuturesUnordered`.
-    unsafe fn unlink(&mut self, node: *const Node<T>) -> Arc<Node<T>> {
-        let node = ptr2arc(node);
-        let next = *node.next_all.get();
-        let prev = *node.prev_all.get();
-        *node.next_all.get() = ptr::null_mut();
-        *node.prev_all.get() = ptr::null_mut();
-
-        if !next.is_null() {
-            *(*next).prev_all.get() = prev;
-        }
-
-        if !prev.is_null() {
-            *(*prev).next_all.get() = next;
-        } else {
-            self.head_all = next;
+            inner: self.inner.iter_mut(),
         }
-        self.len -= 1;
-        return node
     }
 }
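Not part of the diff: the public `iter_mut` behavior is unchanged by the scheduler-backed rewrite. A small sketch, mirroring the `iter_mut_cancel` test further down:

```rust
use futures::stream::futures_unordered;
use futures::sync::oneshot;

let (tx, rx) = oneshot::channel::<u32>();
let mut set = futures_unordered(vec![rx]);

// `iter_mut` hands out `&mut` access to each pending future, here used to
// cancel the receiver in place.
for rx in set.iter_mut() {
    rx.close();
}
assert!(tx.is_canceled());
```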
@@ -268,156 +91,44 @@ impl<T> Stream for FuturesUnordered<T>
     type Error = T::Error;
 
     fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
-        // Ensure `parent` is correctly set.
-        self.inner.parent.register();
-
-        loop {
-            let node = match unsafe { self.inner.dequeue() } {
-                Dequeue::Empty => {
-                    if self.is_empty() {
-                        return Ok(Async::Ready(None));
-                    } else {
-                        return Ok(Async::NotReady)
-                    }
-                }
-                Dequeue::Inconsistent => {
-                    // At this point, it may be worth yielding the thread &
-                    // spinning a few times... but for now, just yield using the
-                    // task system.
-                    task::current().notify();
-                    return Ok(Async::NotReady);
-                }
-                Dequeue::Data(node) => node,
-            };
+        use scheduler::Tick;
 
-            debug_assert!(node != self.inner.stub());
-
-            unsafe {
-                let mut future = match (*(*node).future.get()).take() {
-                    Some(future) => future,
-
-                    // If the future has already gone away then we're just
-                    // cleaning out this node. See the comment in
-                    // `release_node` for more information, but we're basically
-                    // just taking ownership of our reference count here.
-                    None => {
-                        let node = ptr2arc(node);
-                        assert!((*node.next_all.get()).is_null());
-                        assert!((*node.prev_all.get()).is_null());
-                        continue
-                    }
-                };
+        // Ensure `parent` is correctly set.
+        self.inner.get_wakeup().register();
 
-                // Unset queued flag... this must be done before
-                // polling. This ensures that the future gets
-                // rescheduled if it is notified **during** a call
-                // to `poll`.
-                let prev = (*node).queued.swap(false, SeqCst);
-                assert!(prev);
+        let res = self.inner.tick(|_, f, notify| {
+            match task_impl::with_notify(notify, 0, || f.poll()) {
+                Ok(Async::Ready(v)) => Async::Ready(Ok(v)),
+                Ok(Async::NotReady) => Async::NotReady,
+                Err(e) => Async::Ready(Err(e)),
+            }
+        });
 
-                // We're going to need to be very careful if the `poll`
-                // function below panics. We need to (a) not leak memory and
-                // (b) ensure that we still don't have any use-after-frees. To
-                // manage this we do a few things:
-                //
-                // * This "bomb" here will call `release_node` if dropped
-                //   abnormally. That way we'll be sure the memory management
-                //   of the `node` is managed correctly.
-                // * The future was extracted above (taken ownership). That way
-                //   if it panics we're guaranteed that the future is
-                //   dropped on this thread and doesn't accidentally get
-                //   dropped on a different thread (bad).
-                // * We unlink the node from our internal queue to preemptively
-                //   assume it'll panic, in which case we'll want to discard it
-                //   regardless.
-                struct Bomb<'a, T: 'a> {
-                    queue: &'a mut FuturesUnordered<T>,
-                    node: Option<Arc<Node<T>>>,
+        match res {
+            Tick::Data(Ok(v)) => Ok(Async::Ready(Some(v))),
+            Tick::Data(Err(e)) => Err(e),
+            Tick::Empty => {
+                if self.is_empty() {
+                    Ok(Async::Ready(None))
+                } else {
+                    Ok(Async::NotReady)
                 }
-                impl<'a, T> Drop for Bomb<'a, T> {
-                    fn drop(&mut self) {
-                        if let Some(node) = self.node.take() {
-                            self.queue.release_node(node);
-                        }
-                    }
-                }
-                let mut bomb = Bomb {
-                    node: Some(self.unlink(node)),
-                    queue: self,
-                };
-
-                // Poll the underlying future with the appropriate `notify`
-                // implementation. This is where a large bit of the unsafety
-                // starts to stem from internally. The `notify` instance itself
-                // is basically just our `Arc<Node<T>>` and tracks the mpsc
-                // queue of ready futures.
-                //
-                // Critically though `Node` won't actually access `T`, the
-                // future, while it's floating around inside of `Task`
-                // instances. These structs will basically just use `T` to size
-                // the internal allocation, appropriately accessing fields and
-                // deallocating the node if need be.
-                let res = {
-                    let notify = NodeToHandle(bomb.node.as_ref().unwrap());
-                    task_impl::with_notify(&notify, 0, || {
-                        future.poll()
-                    })
-                };
-
-                let ret = match res {
-                    Ok(Async::NotReady) => {
-                        let node = bomb.node.take().unwrap();
-                        *node.future.get() = Some(future);
-                        bomb.queue.link(node);
-                        continue
-                    }
-                    Ok(Async::Ready(e)) => Ok(Async::Ready(Some(e))),
-                    Err(e) => Err(e),
-                };
-                return ret
             }
-        }
-    }
-}
-
-impl<T> Debug for FuturesUnordered<T> {
-    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
-        write!(fmt, "FuturesUnordered {{ ... }}")
-    }
-}
-
-impl<T> Drop for FuturesUnordered<T> {
-    fn drop(&mut self) {
-        // When a `FuturesUnordered` is dropped we want to drop all futures associated
-        // with it. At the same time though there may be tons of `Task` handles
-        // flying around which contain `Node` references inside them. We'll
-        // let those naturally get deallocated when the `Task` itself goes out
-        // of scope or gets notified.
-        unsafe {
-            while !self.head_all.is_null() {
-                let head = self.head_all;
-                let node = self.unlink(head);
-                self.release_node(node);
+            Tick::Inconsistent => {
+                // At this point, it may be worth yielding the thread &
+                // spinning a few times... but for now, just yield using the
+                // task system.
+                //
+                // TODO: Don't do this here
+                task::current().notify();
+                return Ok(Async::NotReady);
             }
         }
-
-        // Note that at this point we could still have a bunch of nodes in the
-        // mpsc queue. None of those nodes, however, have futures associated
-        // with them so they're safe to destroy on any thread. At this point
-        // the `FuturesUnordered` struct, the owner of the one strong reference
-        // to `Inner` will drop the strong reference. At that point
-        // whichever thread releases the strong refcount last (be it this
-        // thread or some other thread as part of an `upgrade`) will clear out
-        // the mpsc queue and free all remaining nodes.
-        //
-        // While that freeing operation isn't guaranteed to happen here, it's
-        // guaranteed to happen "promptly" as no more "blocking work" will
-        // happen while there's a strong refcount held.
     }
 }
 
 impl<F: Future> FromIterator<F> for FuturesUnordered<F> {
-    fn from_iter<T>(iter: T) -> Self
+    fn from_iter<T>(iter: T) -> Self
         where T: IntoIterator<Item = F>
     {
         let mut new = FuturesUnordered::new();
@@ -431,242 +142,19 @@ impl<F: Future> FromIterator<F> for FuturesUnordered<F> {
 #[derive(Debug)]
 /// Mutable iterator over all futures in the unordered set.
 pub struct IterMut<'a, F: 'a> {
-    node: *const Node<F>,
-    len: usize,
-    _marker: PhantomData<&'a mut FuturesUnordered<F>>
+    inner: scheduler::IterMut<'a, F, AtomicTask>,
 }
 
 impl<'a, F> Iterator for IterMut<'a, F> {
     type Item = &'a mut F;
 
     fn next(&mut self) -> Option<&'a mut F> {
-        if self.node.is_null() {
-            return None;
-        }
-        unsafe {
-            let future = (*(*self.node).future.get()).as_mut().unwrap();
-            let next = *(*self.node).next_all.get();
-            self.node = next;
-            self.len -= 1;
-            return Some(future);
-        }
+        self.inner.next()
     }
 
     fn size_hint(&self) -> (usize, Option<usize>) {
-        (self.len, Some(self.len))
+        self.inner.size_hint()
    }
 }
 
 impl<'a, F> ExactSizeIterator for IterMut<'a, F> {}
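Not part of the diff: thanks to the `FromIterator` impl kept above, a set can be `collect`ed straight from an iterator of futures, as the `from_iterator` tests below do. A quick sketch:

```rust
use futures::future::ok;
use futures::stream::FuturesUnordered;

// Build the set directly via `collect`; `len` reports the in-flight count.
let set: FuturesUnordered<_> = vec![ok::<u32, ()>(1), ok(2), ok(3)]
    .into_iter()
    .collect();
assert_eq!(set.len(), 3);
```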
-
-impl<T> Inner<T> {
-    /// The enqueue function from the 1024cores intrusive MPSC queue algorithm.
-    fn enqueue(&self, node: *const Node<T>) {
-        unsafe {
-            debug_assert!((*node).queued.load(Relaxed));
-
-            // This action does not require any coordination
-            (*node).next_readiness.store(ptr::null_mut(), Relaxed);
-
-            // Note that these atomic orderings come from 1024cores
-            let node = node as *mut _;
-            let prev = self.head_readiness.swap(node, AcqRel);
-            (*prev).next_readiness.store(node, Release);
-        }
-    }
-
-    /// The dequeue function from the 1024cores intrusive MPSC queue algorithm
-    ///
-    /// Note that this is unsafe as it requires mutual exclusion (only one
-    /// thread can call this) to be guaranteed elsewhere.
-    unsafe fn dequeue(&self) -> Dequeue<T> {
-        let mut tail = *self.tail_readiness.get();
-        let mut next = (*tail).next_readiness.load(Acquire);
-
-        if tail == self.stub() {
-            if next.is_null() {
-                return Dequeue::Empty;
-            }
-
-            *self.tail_readiness.get() = next;
-            tail = next;
-            next = (*next).next_readiness.load(Acquire);
-        }
-
-        if !next.is_null() {
-            *self.tail_readiness.get() = next;
-            debug_assert!(tail != self.stub());
-            return Dequeue::Data(tail);
-        }
-
-        if self.head_readiness.load(Acquire) as *const _ != tail {
-            return Dequeue::Inconsistent;
-        }
-
-        self.enqueue(self.stub());
-
-        next = (*tail).next_readiness.load(Acquire);
-
-        if !next.is_null() {
-            *self.tail_readiness.get() = next;
-            return Dequeue::Data(tail);
-        }
-
-        Dequeue::Inconsistent
-    }
-
-    fn stub(&self) -> *const Node<T> {
-        &*self.stub
-    }
-}
-
-impl<T> Drop for Inner<T> {
-    fn drop(&mut self) {
-        // Once we're in the destructor for `Inner` we need to clear out the
-        // mpsc queue of nodes if there's anything left in there.
-        //
-        // Note that each node has a strong reference count associated with it
-        // which is owned by the mpsc queue. All nodes should have had their
-        // futures dropped already by the `FuturesUnordered` destructor above,
-        // so we're just pulling out nodes and dropping their refcounts.
-        unsafe {
-            loop {
-                match self.dequeue() {
-                    Dequeue::Empty => break,
-                    Dequeue::Inconsistent => abort("inconsistent in drop"),
-                    Dequeue::Data(ptr) => drop(ptr2arc(ptr)),
-                }
-            }
-        }
-    }
-}
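Not part of the diff: for readers unfamiliar with the 1024cores queue deleted above, here is a simplified, self-contained sketch of the same idea in its non-intrusive form. It is illustrative only: nodes are heap-allocated rather than intrusive, no `Drop` or `Send`/`Sync` impls are provided (remaining nodes leak at the end), and the `Empty`/`Inconsistent` distinction is collapsed into `None`, so a real consumer would simply retry later.

```rust
use std::ptr;
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};

struct Node<T> {
    next: AtomicPtr<Node<T>>,
    value: Option<T>,
}

struct MpscQueue<T> {
    head: AtomicPtr<Node<T>>, // swapped by any producer
    tail: *mut Node<T>,       // touched only by the single consumer
}

impl<T> MpscQueue<T> {
    fn new() -> MpscQueue<T> {
        // The queue always holds one "stub" node; `head == tail == stub`
        // means empty.
        let stub = Box::into_raw(Box::new(Node {
            next: AtomicPtr::new(ptr::null_mut()),
            value: None,
        }));
        MpscQueue { head: AtomicPtr::new(stub), tail: stub }
    }

    // Producers: publish the node by swinging `head`, then link the old
    // head to it. Between those two steps the list is momentarily broken,
    // which is what the intrusive version reports as `Inconsistent`.
    fn push(&self, value: T) {
        let node = Box::into_raw(Box::new(Node {
            next: AtomicPtr::new(ptr::null_mut()),
            value: Some(value),
        }));
        let prev = self.head.swap(node, AcqRel);
        unsafe { (*prev).next.store(node, Release) }
    }

    // Single consumer: advance `tail` along the `next` links, freeing the
    // node left behind. `None` covers both "empty" and "producer mid-push".
    fn pop(&mut self) -> Option<T> {
        unsafe {
            let next = (*self.tail).next.load(Acquire);
            if next.is_null() {
                return None;
            }
            let old = self.tail;
            self.tail = next;
            drop(Box::from_raw(old));
            (*next).value.take()
        }
    }
}

fn main() {
    let mut q = MpscQueue::new();
    q.push(1u32);
    q.push(2);
    assert_eq!(q.pop(), Some(1));
    assert_eq!(q.pop(), Some(2));
    assert_eq!(q.pop(), None);
}
```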
-
-#[allow(missing_debug_implementations)]
-struct NodeToHandle<'a, T: 'a>(&'a Arc<Node<T>>);
-
-impl<'a, T> Clone for NodeToHandle<'a, T> {
-    fn clone(&self) -> Self {
-        NodeToHandle(self.0)
-    }
-}
-
-impl<'a, T> From<NodeToHandle<'a, T>> for NotifyHandle {
-    fn from(handle: NodeToHandle<'a, T>) -> NotifyHandle {
-        unsafe {
-            let ptr = handle.0.clone();
-            let ptr = mem::transmute::<Arc<Node<T>>, *mut ArcNode<T>>(ptr);
-            NotifyHandle::new(hide_lt(ptr))
-        }
-    }
-}
-
-struct ArcNode<T>(PhantomData<T>);
-
-// We should never touch `T` on any thread other than the one owning
-// `FuturesUnordered`, so this should be a safe operation.
-unsafe impl<T> Send for ArcNode<T> {}
-unsafe impl<T> Sync for ArcNode<T> {}
-
-impl<T> Notify for ArcNode<T> {
-    fn notify(&self, _id: usize) {
-        unsafe {
-            let me: *const ArcNode<T> = self;
-            let me: *const *const ArcNode<T> = &me;
-            let me = me as *const Arc<Node<T>>;
-            Node::notify(&*me)
-        }
-    }
-}
-
-unsafe impl<T> UnsafeNotify for ArcNode<T> {
-    unsafe fn clone_raw(&self) -> NotifyHandle {
-        let me: *const ArcNode<T> = self;
-        let me: *const *const ArcNode<T> = &me;
-        let me = &*(me as *const Arc<Node<T>>);
-        NodeToHandle(me).into()
-    }
-
-    unsafe fn drop_raw(&self) {
-        let mut me: *const ArcNode<T> = self;
-        let me = &mut me as *mut *const ArcNode<T> as *mut Arc<Node<T>>;
-        ptr::drop_in_place(me);
-    }
-}
-
-unsafe fn hide_lt<T>(p: *mut ArcNode<T>) -> *mut UnsafeNotify {
-    mem::transmute(p as *mut UnsafeNotify)
-}
-
-impl<T> Node<T> {
-    fn notify(me: &Arc<Node<T>>) {
-        let inner = match me.queue.upgrade() {
-            Some(inner) => inner,
-            None => return,
-        };
-
-        // It's our job to notify the node that it's ready to get polled,
-        // meaning that we need to enqueue it into the readiness queue. To
-        // do this we flag that we're ready to be queued, and if successful
-        // we then do the literal queueing operation, ensuring that we're
-        // only queued once.
-        //
-        // Once the node is inserted we make sure to notify the parent task,
-        // as it'll want to come along and pick up our node now.
-        //
-        // Note that we don't change the reference count of the node here,
-        // we're just enqueueing the raw pointer. The `FuturesUnordered`
-        // implementation guarantees that if we set the `queued` flag true that
-        // there's a reference count held by the main `FuturesUnordered` queue
-        // still.
-        let prev = me.queued.swap(true, SeqCst);
-        if !prev {
-            inner.enqueue(&**me);
-            inner.parent.notify();
-        }
-    }
-}
-
-impl<T> Drop for Node<T> {
-    fn drop(&mut self) {
-        // Currently a `Node` is sent across all threads for any lifetime,
-        // regardless of `T`. This means that for memory safety we can't
-        // actually touch `T` at any time except when we have a reference to the
-        // `FuturesUnordered` itself.
-        //
-        // Consequently it *should* be the case that we always drop futures from
-        // the `FuturesUnordered` instance, but this is a bomb in place to catch
-        // any bugs in that logic.
-        unsafe {
-            if (*self.future.get()).is_some() {
-                abort("future still here when dropping");
-            }
-        }
-    }
-}
-
-fn arc2ptr<T>(ptr: Arc<T>) -> *const T {
-    let addr = &*ptr as *const T;
-    mem::forget(ptr);
-    return addr
-}
-
-unsafe fn ptr2arc<T>(ptr: *const T) -> Arc<T> {
-    let anchor = mem::transmute::<usize, Arc<T>>(0x10);
-    let addr = &*anchor as *const T;
-    mem::forget(anchor);
-    let offset = addr as isize - 0x10;
-    mem::transmute::<isize, Arc<T>>(ptr as isize - offset)
-}
-
-fn abort(s: &str) -> ! {
-    struct DoublePanic;
-
-    impl Drop for DoublePanic {
-        fn drop(&mut self) {
-            panic!("panicking twice to abort the program");
-        }
-    }
-
-    let _bomb = DoublePanic;
-    panic!("{}", s);
-}
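Not part of the diff: the rewrite leaves the public surface intact, so `FuturesUnordered` still composes with the rest of the crate. A minimal sketch combining it with the new `stream::blocking`:

```rust
use futures::future::ok;
use futures::stream::{self, FuturesUnordered};

let mut set = FuturesUnordered::new();
set.push(ok::<u32, ()>(1));
set.push(ok::<u32, ()>(2));
assert_eq!(set.len(), 2);

// `FuturesUnordered` is itself a `Stream` yielding results as futures
// complete, so it can be drained synchronously with `stream::blocking`.
let done: Vec<Result<u32, ()>> = stream::blocking(set).collect();
assert_eq!(done.len(), 2);
```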
diff --git a/src/stream/mod.rs b/src/stream/mod.rs
index 01ae6d698a..606146b3fb 100644
--- a/src/stream/mod.rs
+++ b/src/stream/mod.rs
@@ -99,6 +99,7 @@ use sink::{Sink};
 if_std! {
     use std;
 
+    mod blocking;
     mod buffered;
     mod buffer_unordered;
     mod catch_unwind;
@@ -109,16 +110,20 @@ if_std! {
     mod split;
     pub mod futures_unordered;
     mod futures_ordered;
+    pub use self::blocking::{blocking, Blocking};
     pub use self::buffered::Buffered;
     pub use self::buffer_unordered::BufferUnordered;
     pub use self::catch_unwind::CatchUnwind;
     pub use self::chunks::Chunks;
     pub use self::collect::Collect;
-    pub use self::wait::Wait;
     pub use self::split::{SplitStream, SplitSink};
     pub use self::futures_unordered::FuturesUnordered;
     pub use self::futures_ordered::{futures_ordered, FuturesOrdered};
 
+    #[doc(hidden)]
+    #[allow(deprecated)]
+    pub use self::wait::Wait;
+
     #[doc(hidden)]
     #[cfg(feature = "with-deprecated")]
     #[allow(deprecated)]
@@ -218,29 +223,10 @@ pub trait Stream {
     // item? basically just says "please make more progress internally"
     // seems crucial for buffering to actually make any sense.
 
-    /// Creates an iterator which blocks the current thread until each item of
-    /// this stream is resolved.
-    ///
-    /// This method will consume ownership of this stream, returning an
-    /// implementation of a standard iterator. This iterator will *block the
-    /// current thread* on each call to `next` if the item in the stream isn't
-    /// ready yet.
-    ///
-    /// > **Note:** This method is not appropriate to call on event loops or
-    /// >           similar I/O situations because it will prevent the event
-    /// >           loop from making progress (this blocks the thread). This
-    /// >           method should only be called when it's guaranteed that the
-    /// >           blocking work associated with this stream will be completed
-    /// >           by another thread.
-    ///
-    /// This method is only available when the `use_std` feature of this
-    /// library is activated, and it is activated by default.
-    ///
-    /// # Panics
-    ///
-    /// The returned iterator does not attempt to catch panics. If the `poll`
-    /// function panics, panics will be propagated to the caller of `next`.
     #[cfg(feature = "use_std")]
+    #[doc(hidden)]
+    #[allow(deprecated)]
+    #[deprecated(note = "use `stream::blocking` instead")]
     fn wait(self) -> Wait<Self>
         where Self: Sized
     {
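Not part of the diff: the caller-side migration is mechanical; swap the deprecated method for the free function:

```rust
use futures::stream::{self, iter_ok};

let s = iter_ok::<_, ()>(vec![1u32, 2, 3]);

// Deprecated: `let mut iter = s.wait();`
let mut iter = stream::blocking(s);
assert_eq!(iter.next(), Some(Ok(1)));
```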
diff --git a/src/stream/wait.rs b/src/stream/wait.rs
index 80acb6c2a6..7dbabfef08 100644
--- a/src/stream/wait.rs
+++ b/src/stream/wait.rs
@@ -1,13 +1,11 @@
+#![allow(deprecated)]
+
 use stream::Stream;
 use executor;
 
-/// A stream combinator which converts an asynchronous stream to a **blocking
-/// iterator**.
-///
-/// Created by the `Stream::wait` method, this function transforms any stream
-/// into a standard iterator. This is implemented by blocking the current thread
-/// while items on the underlying stream aren't ready yet.
 #[must_use = "iterators do nothing unless advanced"]
+#[doc(hidden)]
+#[deprecated(note = "use `stream::blocking` instead")]
 #[derive(Debug)]
 pub struct Wait<S> {
     stream: executor::Spawn<S>,
 }
@@ -47,6 +45,7 @@ pub fn new<S: Stream>(s: S) -> Wait<S> {
 impl<S: Stream> Iterator for Wait<S> {
     type Item = Result<S::Item, S::Error>;
 
+    #[allow(deprecated)]
     fn next(&mut self) -> Option<Self::Item> {
         self.stream.wait_stream()
     }
diff --git a/src/task_impl/atomic_task.rs b/src/task_impl/atomic_task.rs
index a89e20c70f..7fe6989dd9 100644
--- a/src/task_impl/atomic_task.rs
+++ b/src/task_impl/atomic_task.rs
@@ -175,6 +175,13 @@ impl AtomicTask {
     }
 }
 
+#[cfg(feature = "use_std")]
+impl ::executor::Wakeup for AtomicTask {
+    fn wakeup(&self) {
+        self.notify();
+    }
+}
+
 impl Default for AtomicTask {
     fn default() -> Self {
         AtomicTask::new()
diff --git a/src/task_impl/mod.rs b/src/task_impl/mod.rs
index 132173459f..4f9a536115 100644
--- a/src/task_impl/mod.rs
+++ b/src/task_impl/mod.rs
@@ -616,6 +616,19 @@ impl NotifyHandle {
         NotifyHandle { inner: inner }
     }
 
+    /// Return a no-op notify handle
+    pub fn noop() -> NotifyHandle {
+        struct Noop;
+
+        impl Notify for Noop {
+            fn notify(&self, _id: usize) {}
+        }
+
+        const NOOP: &'static Noop = &Noop;
+
+        NotifyHandle::from(NOOP)
+    }
+
     /// Invokes the underlying instance of `Notify` with the provided `id`.
     pub fn notify(&self, id: usize) {
         unsafe { (*self.inner).notify(id) }
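Not part of the diff: one plausible use of the new `NotifyHandle::noop`, sketched against the existing `poll_future_notify` API. It polls exactly once without registering for wakeup, the pattern behind "try"-style helpers such as the `try_take` exercised in the new tests.

```rust
use futures::{executor, Async};
use futures::executor::NotifyHandle;
use futures::future::ok;

let mut spawn = executor::spawn(ok::<u32, ()>(1));

// If this returned `NotReady`, nothing would ever wake us, so a no-op
// handle is only suitable for opportunistic, single-shot polls.
let res = spawn.poll_future_notify(&NotifyHandle::noop(), 0);
assert_eq!(res, Ok(Async::Ready(1)));
```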
diff --git a/src/task_impl/std/mod.rs b/src/task_impl/std/mod.rs
index 2472c8124e..c8d7fb803d 100644
--- a/src/task_impl/std/mod.rs
+++ b/src/task_impl/std/mod.rs
@@ -7,6 +7,7 @@ use std::mem;
 use std::ptr;
 use std::sync::{Arc, Mutex, Condvar, Once, ONCE_INIT};
 use std::sync::atomic::{AtomicUsize, Ordering};
+use std::time::{Duration, Instant};
 
 use {Future, Stream, Sink, Poll, Async, StartSend, AsyncSink};
 use super::core;
@@ -230,12 +231,10 @@ impl<F: Future> Spawn<F> {
         self.enter(BorrowedUnpark::Old(&unpark), |f| f.poll())
     }
 
-    /// Waits for the internal future to complete, blocking this thread's
-    /// execution until it does.
-    ///
-    /// This function will call `poll_future` in a loop, waiting for the future
-    /// to complete. When a future cannot make progress it will use
-    /// `thread::park` to block the current thread.
+    #[cfg(feature = "use_std")]
+    #[doc(hidden)]
+    #[allow(deprecated)]
+    #[deprecated(note = "use `future::blocking` instead")]
     pub fn wait_future(&mut self) -> Result<F::Item, F::Error> {
         ThreadNotify::with_current(|notify| {
@@ -293,8 +292,9 @@ impl<S: Stream> Spawn<S> {
         self.enter(BorrowedUnpark::Old(&unpark), |s| s.poll())
     }
 
-    /// Like `wait_future`, except only waits for the next element to arrive on
-    /// the underlying stream.
+    #[doc(hidden)]
+    #[allow(deprecated)]
+    #[deprecated(note = "use `stream::blocking` instead")]
     pub fn wait_stream(&mut self) -> Option<Result<S::Item, S::Error>> {
         ThreadNotify::with_current(|notify| {
@@ -335,11 +335,9 @@ impl<S: Sink> Spawn<S> {
         self.enter(BorrowedUnpark::Old(unpark), |s| s.poll_complete())
     }
 
-    /// Blocks the current thread until it's able to send `value` on this sink.
-    ///
-    /// This function will send the `value` on the sink that this task wraps. If
-    /// the sink is not ready to send the value yet then the current thread will
-    /// be blocked until it's able to send the value.
+    #[doc(hidden)]
+    #[allow(deprecated)]
+    #[deprecated(note = "use `sink::blocking` instead")]
     pub fn wait_send(&mut self, mut value: S::SinkItem) -> Result<(), S::SinkError> {
         ThreadNotify::with_current(|notify| {
@@ -354,14 +352,9 @@ impl<S: Sink> Spawn<S> {
         })
     }
 
-    /// Blocks the current thread until it's able to flush this sink.
-    ///
-    /// This function will call the underlying sink's `poll_complete` method
-    /// until it returns that it's ready, proxying out errors upwards to the
-    /// caller if one occurs.
-    ///
-    /// The thread will be blocked until `poll_complete` returns that it's
-    /// ready.
+    #[doc(hidden)]
+    #[allow(deprecated)]
+    #[deprecated(note = "use `sink::blocking` instead")]
    pub fn wait_flush(&mut self) -> Result<(), S::SinkError> {
         ThreadNotify::with_current(|notify| {
@@ -374,11 +367,9 @@ impl<S: Sink> Spawn<S> {
         })
     }
 
-    /// Blocks the current thread until it's able to close this sink.
-    ///
-    /// This function will close the sink that this task wraps. If the sink
-    /// is not ready to be closed yet, then the current thread will be blocked
-    /// until it's closed.
+    #[doc(hidden)]
+    #[allow(deprecated)]
+    #[deprecated(note = "use `future::blocking` instead")]
     pub fn wait_close(&mut self) -> Result<(), S::SinkError> {
         ThreadNotify::with_current(|notify| {
@@ -481,7 +472,7 @@ impl Notify for RunInner {
 
 // ===== ThreadNotify =====
 
-struct ThreadNotify {
+pub struct ThreadNotify {
     state: AtomicUsize,
     mutex: Mutex<()>,
     condvar: Condvar,
@@ -500,13 +491,17 @@ thread_local! {
 }
 
 impl ThreadNotify {
-    fn with_current<F, R>(f: F) -> R
+    pub fn with_current<F, R>(f: F) -> R
         where F: FnOnce(&Arc<ThreadNotify>) -> R,
     {
         CURRENT_THREAD_NOTIFY.with(|notify| f(notify))
     }
 
-    fn park(&self) {
+    pub fn park(&self) {
+        self.park_timeout(None);
+    }
+
+    pub fn park_timeout(&self, dur: Option<Duration>) {
         // If currently notified, then we skip sleeping. This is checked outside
         // of the lock to avoid acquiring a mutex if not necessary.
         match self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) {
@@ -531,9 +526,26 @@ impl ThreadNotify {
             _ => unreachable!(),
         }
 
-        // Loop until we've been notified
+        // Track (until, remaining)
+        let mut time = dur.map(|dur| (Instant::now() + dur, dur));
+
         loop {
-            m = self.condvar.wait(m).unwrap();
+            m = match time {
+                Some((until, rem)) => {
+                    let (guard, _) = self.condvar.wait_timeout(m, rem).unwrap();
+                    let now = Instant::now();
+
+                    if now >= until {
+                        // Timed out... exit sleep state
+                        self.state.store(IDLE, Ordering::SeqCst);
+                        return;
+                    }
+
+                    time = Some((until, until - now));
+                    guard
+                }
+                None => self.condvar.wait(m).unwrap(),
+            };
 
             // Transition back to idle, loop otherwise
             if NOTIFY == self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) {
@@ -541,10 +553,8 @@ impl ThreadNotify {
             }
         }
     }
-}
 
-impl Notify for ThreadNotify {
-    fn notify(&self, _unpark_id: usize) {
+    fn unpark(&self) {
         // First, try transitioning from IDLE -> NOTIFY, this does not require a
         // lock.
match self.state.compare_and_swap(IDLE, NOTIFY, Ordering::SeqCst) { @@ -567,6 +577,40 @@ impl Notify for ThreadNotify { } } +impl Notify for ThreadNotify { + fn notify(&self, _unpark_id: usize) { + self.unpark(); + } +} + +impl<'a> ::executor::Sleep for &'a Arc { + type Wakeup = Arc; + + fn wakeup(&self) -> Self::Wakeup { + (*self).clone() + } + + fn sleep(&mut self) { + self.park(); + } + + fn sleep_timeout(&mut self, duration: Duration) { + self.park_timeout(Some(duration)); + } +} + +impl ::executor::Wakeup for Arc { + fn wakeup(&self) { + self.unpark(); + } +} + +impl fmt::Debug for ThreadNotify { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("ThreadNotify").finish() + } +} + // ===== UnparkEvent ===== /// For the duration of the given callback, add an "unpark event" to be diff --git a/tests/all.rs b/tests/all.rs index bdd67315c5..1711e4eea2 100644 --- a/tests/all.rs +++ b/tests/all.rs @@ -356,8 +356,11 @@ fn select2() { #[test] fn option() { - assert_eq!(Ok(Some(())), Some(ok::<(), ()>(())).wait()); - assert_eq!(Ok(None), > as Future>::wait(None)); + let some = Some(ok::<(), ()>(())); + let none: Option> = None; + + assert_eq!(Ok(Some(())), blocking(some).wait()); + assert_eq!(Ok(None), blocking(none).wait()); } #[test] diff --git a/tests/bilock.rs b/tests/bilock.rs index 78d873635a..61c8bb5b99 100644 --- a/tests/bilock.rs +++ b/tests/bilock.rs @@ -5,7 +5,7 @@ use std::thread; use futures::prelude::*; use futures::executor; use futures::stream; -use futures::future; +use futures::future::{self, blocking}; use futures::sync::BiLock; mod support; @@ -66,8 +66,8 @@ fn concurrent() { }) }); - let t1 = thread::spawn(move || a.wait()); - let b = b.wait().expect("b error"); + let t1 = thread::spawn(move || blocking(a).wait()); + let b = blocking(b).wait().expect("b error"); let a = t1.join().unwrap().expect("a error"); match a.poll_lock() { diff --git a/tests/blocking.rs b/tests/blocking.rs new file mode 100644 index 0000000000..371fc72039 --- /dev/null +++ b/tests/blocking.rs @@ -0,0 +1,16 @@ +extern crate futures; + +use futures::future::blocking; +use futures::unsync::oneshot; + +#[test] +fn future_try_take() { + let (tx, rx) = oneshot::channel::(); + let mut rx = blocking(rx); + + assert!(rx.try_take().is_none()); + + tx.send(1).unwrap(); + + assert_eq!(Some(Ok(1)), rx.try_take()); +} diff --git a/tests/buffer_unordered.rs b/tests/buffer_unordered.rs index 005bbd9835..e3335cd3f8 100644 --- a/tests/buffer_unordered.rs +++ b/tests/buffer_unordered.rs @@ -3,7 +3,9 @@ extern crate futures; use std::sync::mpsc as std_mpsc; use std::thread; +use futures::future::blocking; use futures::prelude::*; +use futures::stream; use futures::sync::oneshot; use futures::sync::mpsc; @@ -18,20 +20,20 @@ fn works() { let t1 = thread::spawn(move || { for _ in 0..N+1 { let (mytx, myrx) = oneshot::channel(); - tx = tx.send(myrx).wait().unwrap(); + tx = blocking(tx.send(myrx)).wait().unwrap(); tx3.send(mytx).unwrap(); } rx2.recv().unwrap(); for _ in 0..N { let (mytx, myrx) = oneshot::channel(); - tx = tx.send(myrx).wait().unwrap(); + tx = blocking(tx.send(myrx)).wait().unwrap(); tx3.send(mytx).unwrap(); } }); let (tx4, rx4) = std_mpsc::channel(); let t2 = thread::spawn(move || { - for item in rx.map_err(|_| panic!()).buffer_unordered(N).wait() { + for item in stream::blocking(rx.map_err(|_| panic!()).buffer_unordered(N)) { tx4.send(item.unwrap()).unwrap(); } }); diff --git a/tests/channel.rs b/tests/channel.rs index 58c611b5ad..eba81b1190 100644 --- a/tests/channel.rs +++ 
b/tests/channel.rs @@ -3,7 +3,8 @@ extern crate futures; use std::sync::atomic::*; use futures::prelude::*; -use futures::future::result; +use futures::future::{blocking, result}; +use futures::stream; use futures::sync::mpsc; mod support; @@ -18,7 +19,7 @@ fn sequence() { let amt = 20; send(amt, tx).forget(); - let mut rx = rx.wait(); + let mut rx = stream::blocking(rx); for i in (1..amt + 1).rev() { assert_eq!(rx.next(), Some(Ok(i))); } @@ -45,9 +46,9 @@ fn drop_sender() { #[test] fn drop_rx() { let (tx, rx) = mpsc::channel::(1); - let tx = tx.send(1).wait().ok().unwrap(); + let tx = blocking(tx.send(1)).wait().ok().unwrap(); drop(rx); - assert!(tx.send(1).wait().is_err()); + assert!(blocking(tx.send(1)).wait().is_err()); } #[test] @@ -63,10 +64,10 @@ fn drop_order() { } } - let tx = tx.send(A).wait().unwrap(); + let tx = blocking(tx.send(A)).wait().unwrap(); assert_eq!(DROPS.load(Ordering::SeqCst), 0); drop(rx); assert_eq!(DROPS.load(Ordering::SeqCst), 1); - assert!(tx.send(A).wait().is_err()); + assert!(blocking(tx.send(A)).wait().is_err()); assert_eq!(DROPS.load(Ordering::SeqCst), 2); } diff --git a/tests/current_thread_executor.rs b/tests/current_thread_executor.rs new file mode 100644 index 0000000000..e109d56037 --- /dev/null +++ b/tests/current_thread_executor.rs @@ -0,0 +1,190 @@ +extern crate futures; + +use futures::{task, Future, Poll, Async}; +use futures::future::{blocking, empty, lazy}; +use futures::current_thread::*; + +use std::cell::{Cell, RefCell}; +use std::rc::Rc; + +#[test] +fn spawning_from_init_future() { + let cnt = Rc::new(Cell::new(0)); + + run(|_| { + let cnt = cnt.clone(); + + spawn(lazy(move || { + cnt.set(1 + cnt.get()); + Ok(()) + })); + }); + + assert_eq!(1, cnt.get()); +} + +#[test] +fn block_waits_for_non_daemon() { + use futures::sync::oneshot; + use std::thread; + use std::time::Duration; + + let cnt = Rc::new(Cell::new(0)); + + run(|_| { + let cnt = cnt.clone(); + + let (tx, rx) = oneshot::channel(); + + thread::spawn(|| { + thread::sleep(Duration::from_millis(1000)); + tx.send(()).unwrap(); + }); + + spawn(rx.then(move |_| { + cnt.set(1 + cnt.get()); + Ok(()) + })); + }); + + assert_eq!(1, cnt.get()); +} + +#[test] +#[should_panic] +fn spawning_out_of_executor_context() { + spawn(lazy(|| Ok(()))); +} + +#[test] +fn spawn_many() { + const ITER: usize = 200; + + let cnt = Rc::new(Cell::new(0)); + + run(|_| { + for _ in 0..ITER { + let cnt = cnt.clone(); + spawn(lazy(move || { + cnt.set(1 + cnt.get()); + Ok::<(), ()>(()) + })); + } + }); + + assert_eq!(cnt.get(), ITER); +} + +struct Never(Rc<()>); + +impl Future for Never { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + Ok(Async::NotReady) + } +} + +#[test] +fn outstanding_tasks_are_dropped_on_cancel() { + let mut rc = Rc::new(()); + + run(|ctx| { + spawn(Never(rc.clone())); + ctx.cancel_all_spawned(); + }); + + // Ensure the daemon is dropped + assert!(Rc::get_mut(&mut rc).is_some()); +} + +#[test] +#[should_panic] +fn nesting_run() { + run(|_| { + run(|_| { + }); + }); +} + +#[test] +#[should_panic] +fn run_in_future() { + run(|_| { + spawn(lazy(|| { + run(|_| { + }); + Ok::<(), ()>(()) + })); + }); +} + +#[test] +#[should_panic] +fn blocking_within_init() { + run(|_| { + let _ = blocking(empty::<(), ()>()).wait(); + }); +} + +#[test] +#[should_panic] +fn blocking_in_future() { + run(|_| { + spawn(lazy(|| { + let _ = blocking(empty::<(), ()>()).wait(); + Ok::<(), ()>(()) + })); + }); +} + +#[test] +fn tasks_are_scheduled_fairly() { + let state = 
Rc::new(RefCell::new([0, 0])); + + struct Spin { + state: Rc>, + idx: usize, + } + + impl Future for Spin { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + let mut state = self.state.borrow_mut(); + + if self.idx == 0 { + let diff = state[0] - state[1]; + + assert!(diff.abs() <= 1); + + if state[0] >= 50 { + return Ok(().into()); + } + } + + state[self.idx] += 1; + + if state[self.idx] >= 100 { + return Ok(().into()); + } + + task::current().notify(); + Ok(Async::NotReady) + } + } + + run(|_| { + spawn(Spin { + state: state.clone(), + idx: 0, + }); + + spawn(Spin { + state: state, + idx: 1, + }); + }); +} diff --git a/tests/future_flatten_stream.rs b/tests/future_flatten_stream.rs index 442d381fd7..1d071fe0bc 100644 --- a/tests/future_flatten_stream.rs +++ b/tests/future_flatten_stream.rs @@ -14,7 +14,7 @@ fn successful_future() { let stream = future_of_a_stream.flatten_stream(); - let mut iter = stream.wait(); + let mut iter = stream::blocking(stream); assert_eq!(Ok(17), iter.next().unwrap()); assert_eq!(Ok(19), iter.next().unwrap()); assert_eq!(None, iter.next()); @@ -37,7 +37,7 @@ impl Stream for PanickingStream { fn failed_future() { let future_of_a_stream = err::, _>(10); let stream = future_of_a_stream.flatten_stream(); - let mut iter = stream.wait(); + let mut iter = stream::blocking(stream); assert_eq!(Err(10), iter.next().unwrap()); assert_eq!(None, iter.next()); } diff --git a/tests/futures_ordered.rs b/tests/futures_ordered.rs index 229a8e58c0..6559aef77a 100644 --- a/tests/futures_ordered.rs +++ b/tests/futures_ordered.rs @@ -2,6 +2,7 @@ extern crate futures; use std::any::Any; +use futures::future::blocking; use futures::sync::oneshot; use futures::stream::futures_ordered; use futures::prelude::*; @@ -59,7 +60,7 @@ fn from_iterator() { ok::(3) ].into_iter().collect::>(); assert_eq!(stream.len(), 3); - assert_eq!(stream.collect().wait(), Ok(vec![1,2,3])); + assert_eq!(blocking(stream.collect()).wait(), Ok(vec![1,2,3])); } #[test] diff --git a/tests/futures_unordered.rs b/tests/futures_unordered.rs index 9b8c08d01b..7e23291c0b 100644 --- a/tests/futures_unordered.rs +++ b/tests/futures_unordered.rs @@ -2,8 +2,9 @@ extern crate futures; use std::any::Any; +use futures::future::blocking; use futures::sync::oneshot; -use futures::stream::futures_unordered; +use futures::stream::{self, futures_unordered}; use futures::prelude::*; mod support; @@ -16,15 +17,15 @@ fn works_1() { let stream = futures_unordered(vec![a_rx, b_rx, c_rx]); - let mut spawn = futures::executor::spawn(stream); + let mut stream = stream::blocking(stream); b_tx.send(99).unwrap(); - assert_eq!(Some(Ok(99)), spawn.wait_stream()); + assert_eq!(Some(Ok(99)), stream.next()); a_tx.send(33).unwrap(); c_tx.send(33).unwrap(); - assert_eq!(Some(Ok(33)), spawn.wait_stream()); - assert_eq!(Some(Ok(33)), spawn.wait_stream()); - assert_eq!(None, spawn.wait_stream()); + assert_eq!(Some(Ok(33)), stream.next()); + assert_eq!(Some(Ok(33)), stream.next()); + assert_eq!(None, stream.next()); } #[test] @@ -57,7 +58,7 @@ fn from_iterator() { ok::(3) ].into_iter().collect::>(); assert_eq!(stream.len(), 3); - assert_eq!(stream.collect().wait(), Ok(vec![1,2,3])); + assert_eq!(blocking(stream.collect()).wait(), Ok(vec![1,2,3])); } #[test] @@ -100,11 +101,11 @@ fn iter_mut_cancel() { assert!(b_tx.is_canceled()); assert!(c_tx.is_canceled()); - let mut spawn = futures::executor::spawn(stream); - assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream()); - 
assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream()); - assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream()); - assert_eq!(None, spawn.wait_stream()); + let mut stream = stream::blocking(stream); + assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), stream.next()); + assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), stream.next()); + assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), stream.next()); + assert_eq!(None, stream.next()); } #[test] diff --git a/tests/inspect.rs b/tests/inspect.rs index c16372ed91..adc819888b 100644 --- a/tests/inspect.rs +++ b/tests/inspect.rs @@ -1,7 +1,7 @@ extern crate futures; use futures::prelude::*; -use futures::future::{ok, err}; +use futures::future::{blocking, ok, err}; #[test] fn smoke() { @@ -9,14 +9,14 @@ fn smoke() { { let work = ok::(40).inspect(|val| { counter += *val; }); - assert_eq!(work.wait(), Ok(40)); + assert_eq!(blocking(work).wait(), Ok(40)); } assert_eq!(counter, 40); { let work = err::(4).inspect(|val| { counter += *val; }); - assert_eq!(work.wait(), Err(4)); + assert_eq!(blocking(work).wait(), Err(4)); } assert_eq!(counter, 40); diff --git a/tests/mpsc-close.rs b/tests/mpsc-close.rs index 253e015705..0e8f9ad7f2 100644 --- a/tests/mpsc-close.rs +++ b/tests/mpsc-close.rs @@ -2,6 +2,7 @@ extern crate futures; use std::thread; +use futures::future::blocking; use futures::prelude::*; use futures::sync::mpsc::*; @@ -10,12 +11,12 @@ fn smoke() { let (mut sender, receiver) = channel(1); let t = thread::spawn(move ||{ - while let Ok(s) = sender.send(42).wait() { + while let Ok(s) = blocking(sender.send(42)).wait() { sender = s; } }); - receiver.take(3).for_each(|_| Ok(())).wait().unwrap(); + blocking(receiver.take(3).for_each(|_| Ok(()))).wait().unwrap(); t.join().unwrap() } diff --git a/tests/mpsc.rs b/tests/mpsc.rs index 8df98d490f..dd97970741 100644 --- a/tests/mpsc.rs +++ b/tests/mpsc.rs @@ -4,8 +4,8 @@ extern crate futures; use futures::prelude::*; -use futures::future::{lazy, ok}; -use futures::stream::unfold; +use futures::future::{blocking, lazy, ok}; +use futures::stream::{self, unfold}; use futures::sync::mpsc; use futures::sync::oneshot; @@ -24,9 +24,9 @@ impl AssertSend for mpsc::Receiver {} #[test] fn send_recv() { let (tx, rx) = mpsc::channel::(16); - let mut rx = rx.wait(); + let mut rx = stream::blocking(rx); - tx.send(1).wait().unwrap(); + blocking(tx.send(1)).wait().unwrap(); assert_eq!(rx.next().unwrap(), Ok(1)); } @@ -36,7 +36,7 @@ fn send_recv_no_buffer() { let (mut tx, mut rx) = mpsc::channel::(0); // Run on a task context - lazy(move || { + blocking(lazy(move || { assert!(tx.poll_complete().unwrap().is_ready()); assert!(tx.poll_ready().unwrap().is_ready()); @@ -62,29 +62,29 @@ fn send_recv_no_buffer() { assert!(tx.poll_ready().unwrap().is_ready()); Ok::<(), ()>(()) - }).wait().unwrap(); + })).wait().unwrap(); } #[test] fn send_shared_recv() { let (tx1, rx) = mpsc::channel::(16); let tx2 = tx1.clone(); - let mut rx = rx.wait(); + let mut rx = stream::blocking(rx); - tx1.send(1).wait().unwrap(); + blocking(tx1.send(1)).wait().unwrap(); assert_eq!(rx.next().unwrap(), Ok(1)); - tx2.send(2).wait().unwrap(); + blocking(tx2.send(2)).wait().unwrap(); assert_eq!(rx.next().unwrap(), Ok(2)); } #[test] fn send_recv_threads() { let (tx, rx) = mpsc::channel::(16); - let mut rx = rx.wait(); + let mut rx = stream::blocking(rx); thread::spawn(move|| { - tx.send(1).wait().unwrap(); + blocking(tx.send(1)).wait().unwrap(); }); assert_eq!(rx.next().unwrap(), Ok(1)); @@ -93,14 
+93,15 @@ fn send_recv_threads() { #[test] fn send_recv_threads_no_capacity() { let (tx, rx) = mpsc::channel::(0); - let mut rx = rx.wait(); + let mut rx = stream::blocking(rx); let (readytx, readyrx) = mpsc::channel::<()>(2); - let mut readyrx = readyrx.wait(); + let mut readyrx = stream::blocking(readyrx); + let t = thread::spawn(move|| { let readytx = readytx.sink_map_err(|_| panic!()); - let (a, b) = tx.send(1).join(readytx.send(())).wait().unwrap(); - a.send(2).join(b.send(())).wait().unwrap(); + let (a, b) = blocking(tx.send(1).join(readytx.send(()))).wait().unwrap(); + blocking(a.send(2).join(b.send(()))).wait().unwrap(); }); drop(readyrx.next().unwrap()); @@ -116,7 +117,7 @@ fn recv_close_gets_none() { let (mut tx, mut rx) = mpsc::channel::(10); // Run on a task context - lazy(move || { + blocking(lazy(move || { rx.close(); assert_eq!(rx.poll(), Ok(Async::Ready(None))); @@ -125,7 +126,7 @@ fn recv_close_gets_none() { drop(tx); Ok::<(), ()>(()) - }).wait().unwrap(); + })).wait().unwrap(); } @@ -134,12 +135,12 @@ fn tx_close_gets_none() { let (_, mut rx) = mpsc::channel::(10); // Run on a task context - lazy(move || { + blocking(lazy(move || { assert_eq!(rx.poll(), Ok(Async::Ready(None))); assert_eq!(rx.poll(), Ok(Async::Ready(None))); Ok::<(), ()>(()) - }).wait().unwrap(); + })).wait().unwrap(); } #[test] @@ -214,7 +215,7 @@ fn stress_shared_unbounded() { const AMT: u32 = 10000; const NTHREADS: u32 = 8; let (tx, rx) = mpsc::unbounded::(); - let mut rx = rx.wait(); + let mut rx = stream::blocking(rx); let t = thread::spawn(move|| { for _ in 0..AMT * NTHREADS { @@ -246,7 +247,7 @@ fn stress_shared_bounded_hard() { const AMT: u32 = 10000; const NTHREADS: u32 = 8; let (tx, rx) = mpsc::channel::(0); - let mut rx = rx.wait(); + let mut rx = stream::blocking(rx); let t = thread::spawn(move|| { for _ in 0..AMT * NTHREADS { @@ -263,7 +264,7 @@ fn stress_shared_bounded_hard() { thread::spawn(move|| { for _ in 0..AMT { - tx = tx.send(1).wait().unwrap(); + tx = blocking(tx.send(1)).wait().unwrap(); } }); } @@ -298,7 +299,7 @@ fn stress_receiver_multi_task_bounded_hard() { match lock.take() { Some(mut rx) => { if i % 5 == 0 { - let (item, rest) = rx.into_future().wait().ok().unwrap(); + let (item, rest) = blocking(rx.into_future()).wait().ok().unwrap(); if item.is_none() { break; @@ -309,7 +310,7 @@ fn stress_receiver_multi_task_bounded_hard() { } else { // Just poll let n = n.clone(); - let r = lazy(move || { + let r = blocking(lazy(move || { let r = match rx.poll().unwrap() { Async::Ready(Some(_)) => { n.fetch_add(1, Ordering::Relaxed); @@ -326,7 +327,7 @@ fn stress_receiver_multi_task_bounded_hard() { }; Ok::(r) - }).wait().unwrap(); + })).wait().unwrap(); if r { break; @@ -342,7 +343,7 @@ fn stress_receiver_multi_task_bounded_hard() { } for i in 0..AMT { - tx = tx.send(i).wait().unwrap(); + tx = blocking(tx.send(i)).wait().unwrap(); } drop(tx); @@ -368,7 +369,7 @@ fn stress_drop_sender() { } for _ in 0..10000 { - assert_eq!(list().wait().collect::, _>>(), + assert_eq!(stream::blocking(list()).collect::, _>>(), Ok(vec![1, 2, 3])); } } @@ -387,7 +388,7 @@ fn stress_close_receiver_iter() { } }); - let mut rx = rx.wait(); + let mut rx = stream::blocking(rx); // Read one message to make sure thread effectively started assert_eq!(Some(Ok(1)), rx.next()); @@ -449,15 +450,15 @@ fn stress_poll_ready() { for _ in 0..NTHREADS { let sender = tx.clone(); threads.push(thread::spawn(move || { - SenderTask { + blocking(SenderTask { sender: sender, count: AMT, - }.wait() + }).wait() })); } drop(tx); - let 
mut rx = rx.wait(); + let mut rx = stream::blocking(rx); for _ in 0..AMT * NTHREADS { assert!(rx.next().is_some()); } @@ -496,7 +497,7 @@ fn try_send_1() { } } }); - for (i, j) in rx.wait().enumerate() { + for (i, j) in stream::blocking(rx).enumerate() { assert_eq!(i, j.unwrap()); } t.join().unwrap(); @@ -511,18 +512,18 @@ fn try_send_2() { let (readytx, readyrx) = oneshot::channel::<()>(); let th = thread::spawn(|| { - lazy(|| { + blocking(lazy(|| { assert!(tx.start_send("fail").unwrap().is_not_ready()); Ok::<_, ()>(()) - }).wait().unwrap(); + })).wait().unwrap(); drop(readytx); - tx.send("goodbye").wait().unwrap(); + blocking(tx.send("goodbye")).wait().unwrap(); }); - let mut rx = rx.wait(); + let mut rx = stream::blocking(rx); - drop(readyrx.wait()); + drop(blocking(readyrx).wait()); assert_eq!(rx.next(), Some(Ok("hello"))); assert_eq!(rx.next(), Some(Ok("goodbye"))); assert!(rx.next().is_none()); @@ -533,7 +534,7 @@ fn try_send_2() { #[test] fn try_send_fail() { let (mut tx, rx) = mpsc::channel(0); - let mut rx = rx.wait(); + let mut rx = stream::blocking(rx); tx.try_send("hello").unwrap(); diff --git a/tests/oneshot.rs b/tests/oneshot.rs index 45c1996876..3bf61d83ad 100644 --- a/tests/oneshot.rs +++ b/tests/oneshot.rs @@ -4,7 +4,7 @@ use std::sync::mpsc; use std::thread; use futures::prelude::*; -use futures::future::{lazy, ok}; +use futures::future::{blocking, lazy, ok}; use futures::sync::oneshot::*; mod support; @@ -85,7 +85,7 @@ fn close_wakes() { rx.close(); rx2.recv().unwrap(); }); - WaitForCancel { tx: tx }.wait().unwrap(); + blocking(WaitForCancel { tx: tx }).wait().unwrap(); tx2.send(()).unwrap(); t.join().unwrap(); } diff --git a/tests/ready_queue.rs b/tests/ready_queue.rs index b0dc2375ba..2e9f3df92d 100644 --- a/tests/ready_queue.rs +++ b/tests/ready_queue.rs @@ -4,8 +4,8 @@ use std::panic::{self, AssertUnwindSafe}; use futures::prelude::*; use futures::Async::*; -use futures::future; -use futures::stream::FuturesUnordered; +use futures::future::{self, blocking}; +use futures::stream::{self, FuturesUnordered}; use futures::sync::oneshot; trait AssertSendSync: Send + Sync {} @@ -13,7 +13,7 @@ impl AssertSendSync for FuturesUnordered<()> {} #[test] fn basic_usage() { - future::lazy(move || { + blocking(future::lazy(move || { let mut queue = FuturesUnordered::new(); let (tx1, rx1) = oneshot::channel(); let (tx2, rx2) = oneshot::channel(); @@ -38,12 +38,12 @@ fn basic_usage() { assert_eq!(Ready(None), queue.poll().unwrap()); Ok::<_, ()>(()) - }).wait().unwrap(); + })).wait().unwrap(); } #[test] fn resolving_errors() { - future::lazy(move || { + blocking(future::lazy(move || { let mut queue = FuturesUnordered::new(); let (tx1, rx1) = oneshot::channel(); let (tx2, rx2) = oneshot::channel(); @@ -68,12 +68,12 @@ fn resolving_errors() { assert_eq!(Ready(None), queue.poll().unwrap()); Ok::<_, ()>(()) - }).wait().unwrap(); + })).wait().unwrap(); } #[test] fn dropping_ready_queue() { - future::lazy(move || { + blocking(future::lazy(move || { let mut queue = FuturesUnordered::new(); let (mut tx1, rx1) = oneshot::channel::<()>(); let (mut tx2, rx2) = oneshot::channel::<()>(); @@ -94,7 +94,7 @@ fn dropping_ready_queue() { assert!(tx3.poll_cancel().unwrap().is_ready()); Ok::<_, ()>(()) - }).wait().unwrap(); + })).wait().unwrap(); } #[test] @@ -126,7 +126,7 @@ fn stress() { barrier.wait(); - let mut sync = queue.wait(); + let mut sync = stream::blocking(queue); let mut rx: Vec<_> = (&mut sync) .take(n) @@ -148,7 +148,7 @@ fn stress() { #[test] fn panicking_future_dropped() { - 
future::lazy(move || { + blocking(future::lazy(move || { let mut queue = FuturesUnordered::new(); queue.push(future::poll_fn(|| -> Poll { panic!() @@ -160,5 +160,5 @@ fn panicking_future_dropped() { assert_eq!(Ready(None), queue.poll().unwrap()); Ok::<_, ()>(()) - }).wait().unwrap(); + })).wait().unwrap(); } diff --git a/tests/recurse.rs b/tests/recurse.rs index 4eb024ac95..c7010ba6c0 100644 --- a/tests/recurse.rs +++ b/tests/recurse.rs @@ -2,7 +2,7 @@ extern crate futures; use std::sync::mpsc::channel; -use futures::future::ok; +use futures::future::{blocking, ok}; use futures::prelude::*; #[test] @@ -17,7 +17,7 @@ fn lots() { let (tx, rx) = channel(); ::std::thread::spawn(|| { - doit(1_000).map(move |_| tx.send(()).unwrap()).wait() + blocking(doit(1_000).map(move |_| tx.send(()).unwrap())).wait() }); rx.recv().unwrap(); } diff --git a/tests/select_all.rs b/tests/select_all.rs index 7780aa306d..552f248943 100644 --- a/tests/select_all.rs +++ b/tests/select_all.rs @@ -1,7 +1,6 @@ extern crate futures; -use futures::prelude::*; -use futures::future::{ok, select_all, err}; +use futures::future::{blocking, ok, select_all, err}; #[test] fn smoke() { @@ -11,15 +10,15 @@ fn smoke() { ok(3), ]; - let (i, idx, v) = select_all(v).wait().ok().unwrap(); + let (i, idx, v) = blocking(select_all(v)).wait().ok().unwrap(); assert_eq!(i, 1); assert_eq!(idx, 0); - let (i, idx, v) = select_all(v).wait().err().unwrap(); + let (i, idx, v) = blocking(select_all(v)).wait().err().unwrap(); assert_eq!(i, 2); assert_eq!(idx, 0); - let (i, idx, v) = select_all(v).wait().ok().unwrap(); + let (i, idx, v) = blocking(select_all(v)).wait().ok().unwrap(); assert_eq!(i, 3); assert_eq!(idx, 0); diff --git a/tests/select_ok.rs b/tests/select_ok.rs index 85f39e2d39..c823e87878 100644 --- a/tests/select_ok.rs +++ b/tests/select_ok.rs @@ -11,12 +11,12 @@ fn ignore_err() { ok(4), ]; - let (i, v) = select_ok(v).wait().ok().unwrap(); + let (i, v) = blocking(select_ok(v)).wait().ok().unwrap(); assert_eq!(i, 3); assert_eq!(v.len(), 1); - let (i, v) = select_ok(v).wait().ok().unwrap(); + let (i, v) = blocking(select_ok(v)).wait().ok().unwrap(); assert_eq!(i, 4); assert!(v.is_empty()); @@ -30,11 +30,11 @@ fn last_err() { err(3), ]; - let (i, v) = select_ok(v).wait().ok().unwrap(); + let (i, v) = blocking(select_ok(v)).wait().ok().unwrap(); assert_eq!(i, 1); assert_eq!(v.len(), 2); - let i = select_ok(v).wait().err().unwrap(); + let i = blocking(select_ok(v)).wait().err().unwrap(); assert_eq!(i, 3); } diff --git a/tests/shared.rs b/tests/shared.rs index 99d2b381ea..77b4e0f473 100644 --- a/tests/shared.rs +++ b/tests/shared.rs @@ -8,7 +8,7 @@ use std::thread; use futures::sync::oneshot; use futures::prelude::*; -use futures::future; +use futures::future::{self, blocking}; fn send_shared_oneshot_and_wait_on_multiple_threads(threads_number: u32) { let (tx, rx) = oneshot::channel::(); @@ -16,11 +16,11 @@ fn send_shared_oneshot_and_wait_on_multiple_threads(threads_number: u32) { let threads = (0..threads_number).map(|_| { let cloned_future = f.clone(); thread::spawn(move || { - assert_eq!(*cloned_future.wait().unwrap(), 6); + assert_eq!(*blocking(cloned_future).wait().unwrap(), 6); }) }).collect::>(); tx.send(6).unwrap(); - assert_eq!(*f.wait().unwrap(), 6); + assert_eq!(*blocking(f).wait().unwrap(), 6); for f in threads { f.join().unwrap(); } @@ -51,20 +51,20 @@ fn drop_on_one_task_ok() { let t1 = thread::spawn(|| { let f = f1.map_err(|_| ()).map(|x| *x).select(rx2.map_err(|_| ())); - drop(f.wait()); + drop(blocking(f).wait()); }); let 
(tx3, rx3) = oneshot::channel::(); let t2 = thread::spawn(|| { - let _ = f2.map(|x| tx3.send(*x).unwrap()).map_err(|_| ()).wait(); + let _ = blocking(f2.map(|x| tx3.send(*x).unwrap()).map_err(|_| ())).wait(); }); tx2.send(11).unwrap(); // cancel `f1` t1.join().unwrap(); tx.send(42).unwrap(); // Should cause `f2` and then `rx3` to get resolved. - let result = rx3.wait().unwrap(); + let result = blocking(rx3).wait().unwrap(); assert_eq!(result, 42); t2.join().unwrap(); } @@ -79,7 +79,7 @@ fn drop_in_poll() { }).shared(); let future2 = Box::new(future.clone()) as Box>; *slot.borrow_mut() = Some(future2); - assert_eq!(*future.wait().unwrap(), 1); + assert_eq!(*blocking(future).wait().unwrap(), 1); } #[test] diff --git a/tests/sink.rs b/tests/sink.rs index cb2fdcf26d..9fdc383745 100644 --- a/tests/sink.rs +++ b/tests/sink.rs @@ -7,7 +7,7 @@ use std::cell::{Cell, RefCell}; use std::sync::atomic::{Ordering, AtomicBool}; use futures::prelude::*; -use futures::future::ok; +use futures::future::{blocking, ok}; use futures::stream; use futures::sync::{oneshot, mpsc}; use futures::task::{self, Task}; @@ -30,10 +30,10 @@ fn vec_sink() { fn send() { let v = Vec::new(); - let v = v.send(0).wait().unwrap(); + let v = blocking(v.send(0)).wait().unwrap(); assert_eq!(v, vec![0]); - let v = v.send(1).wait().unwrap(); + let v = blocking(v.send(1)).wait().unwrap(); assert_eq!(v, vec![0, 1]); assert_done(move || v.send(2), @@ -44,10 +44,10 @@ fn send() { fn send_all() { let v = Vec::new(); - let (v, _) = v.send_all(stream::iter_ok(vec![0, 1])).wait().unwrap(); + let (v, _) = blocking(v.send_all(stream::iter_ok(vec![0, 1]))).wait().unwrap(); assert_eq!(v, vec![0, 1]); - let (v, _) = v.send_all(stream::iter_ok(vec![2, 3])).wait().unwrap(); + let (v, _) = blocking(v.send_all(stream::iter_ok(vec![2, 3]))).wait().unwrap(); assert_eq!(v, vec![0, 1, 2, 3]); assert_done( @@ -109,7 +109,7 @@ impl Future for StartSendFut { fn mpsc_blocking_start_send() { let (mut tx, mut rx) = mpsc::channel::(0); - futures::future::lazy(|| { + blocking(futures::future::lazy(|| { assert_eq!(tx.start_send(0).unwrap(), AsyncSink::Ready); let flag = Flag::new(); @@ -125,7 +125,7 @@ fn mpsc_blocking_start_send() { sassert_next(&mut rx, 1); Ok::<(), ()>(()) - }).wait().unwrap(); + })).wait().unwrap(); } #[test] @@ -152,7 +152,7 @@ fn with_flush() { _ => panic!() }; - assert_eq!(sink.send(1).wait().unwrap().get_ref(), &[1, 2]); + assert_eq!(blocking(sink.send(1)).wait().unwrap().get_ref(), &[1, 2]); } #[test] @@ -161,9 +161,9 @@ fn with_as_map() { let sink = Vec::new().with(|item| -> Result { Ok(item * 2) }); - let sink = sink.send(0).wait().unwrap(); - let sink = sink.send(1).wait().unwrap(); - let sink = sink.send(2).wait().unwrap(); + let sink = blocking(sink.send(0)).wait().unwrap(); + let sink = blocking(sink.send(1)).wait().unwrap(); + let sink = blocking(sink.send(2)).wait().unwrap(); assert_eq!(sink.get_ref(), &[0, 2, 4]); } @@ -173,10 +173,10 @@ fn with_flat_map() { let sink = Vec::new().with_flat_map(|item| { stream::iter_ok(vec![item; item]) }); - let sink = sink.send(0).wait().unwrap(); - let sink = sink.send(1).wait().unwrap(); - let sink = sink.send(2).wait().unwrap(); - let sink = sink.send(3).wait().unwrap(); + let sink = blocking(sink.send(0)).wait().unwrap(); + let sink = blocking(sink.send(1)).wait().unwrap(); + let sink = blocking(sink.send(2)).wait().unwrap(); + let sink = blocking(sink.send(3)).wait().unwrap(); assert_eq!(sink.get_ref(), &[1,2,2,3,3,3]); } @@ -251,13 +251,13 @@ fn with_flush_propagate() { // test that a 
 buffer is a no-nop around a sink that always accepts sends
 fn buffer_noop() {
     let sink = Vec::new().buffer(0);
-    let sink = sink.send(0).wait().unwrap();
-    let sink = sink.send(1).wait().unwrap();
+    let sink = blocking(sink.send(0)).wait().unwrap();
+    let sink = blocking(sink.send(1)).wait().unwrap();
     assert_eq!(sink.get_ref(), &[0, 1]);

     let sink = Vec::new().buffer(1);
-    let sink = sink.send(0).wait().unwrap();
-    let sink = sink.send(1).wait().unwrap();
+    let sink = blocking(sink.send(0)).wait().unwrap();
+    let sink = blocking(sink.send(1)).wait().unwrap();
     assert_eq!(sink.get_ref(), &[0, 1]);
 }

@@ -335,8 +335,8 @@ fn buffer() {
     let (sink, allow) = manual_allow::<u32>();
     let sink = sink.buffer(2);

-    let sink = StartSendFut::new(sink, 0).wait().unwrap();
-    let sink = StartSendFut::new(sink, 1).wait().unwrap();
+    let sink = blocking(StartSendFut::new(sink, 0)).wait().unwrap();
+    let sink = blocking(StartSendFut::new(sink, 1)).wait().unwrap();

     let flag = Flag::new();
     let mut task = executor::spawn(sink.send(2));
diff --git a/tests/split.rs b/tests/split.rs
index 7a0667f135..0eb02122fd 100644
--- a/tests/split.rs
+++ b/tests/split.rs
@@ -1,5 +1,6 @@
 extern crate futures;

+use futures::future::blocking;
 use futures::prelude::*;
 use futures::stream::iter_ok;

@@ -41,7 +42,7 @@ fn test_split() {
         let (sink, stream) = j.split();
         let j = sink.reunite(stream).expect("test_split: reunite error");
         let (sink, stream) = j.split();
-        sink.send_all(stream).wait().unwrap();
+        blocking(sink.send_all(stream)).wait().unwrap();
     }
     assert_eq!(dest, vec![10, 20, 30]);
 }
diff --git a/tests/stream.rs b/tests/stream.rs
index eb7560351d..ec46f23e3d 100644
--- a/tests/stream.rs
+++ b/tests/stream.rs
@@ -2,9 +2,8 @@
 extern crate futures;

 use futures::prelude::*;
-use futures::executor;
-use futures::future::{err, ok};
-use futures::stream::{empty, iter_ok, poll_fn, Peekable};
+use futures::future::{blocking, err, ok};
+use futures::stream::{self, empty, iter_ok, poll_fn, Peekable};
 use futures::sync::oneshot;
 use futures::sync::mpsc;

@@ -136,9 +135,8 @@ fn skip() {

 #[test]
 fn skip_passes_errors_through() {
-    let mut s = iter(vec![Err(1), Err(2), Ok(3), Ok(4), Ok(5)])
-        .skip(1)
-        .wait();
+    let mut s = stream::blocking(iter(vec![Err(1), Err(2), Ok(3), Ok(4), Ok(5)])
+        .skip(1));
     assert_eq!(s.next(), Some(Err(1)));
     assert_eq!(s.next(), Some(Err(2)));
     assert_eq!(s.next(), Some(Ok(4)));
@@ -164,15 +162,14 @@ fn take_while() {

 #[test]
 fn take_passes_errors_through() {
-    let mut s = iter(vec![Err(1), Err(2), Ok(3), Ok(4), Err(4)])
-        .take(1)
-        .wait();
+    let mut s = stream::blocking(iter(vec![Err(1), Err(2), Ok(3), Ok(4), Err(4)])
+        .take(1));
     assert_eq!(s.next(), Some(Err(1)));
     assert_eq!(s.next(), Some(Err(2)));
     assert_eq!(s.next(), Some(Ok(3)));
     assert_eq!(s.next(), None);

-    let mut s = iter(vec![Ok(1), Err(2)]).take(1).wait();
+    let mut s = stream::blocking(iter(vec![Ok(1), Err(2)]).take(1));
     assert_eq!(s.next(), Some(Ok(1)));
     assert_eq!(s.next(), None);
 }
@@ -184,7 +181,7 @@ fn peekable() {

 #[test]
 fn fuse() {
-    let mut stream = list().fuse().wait();
+    let mut stream = stream::blocking(list().fuse());
     assert_eq!(stream.next(), Some(Ok(1)));
     assert_eq!(stream.next(), Some(Ok(2)));
     assert_eq!(stream.next(), Some(Ok(3)));
@@ -208,7 +205,7 @@ fn buffered() {
     c.send(3).unwrap();
     sassert_empty(&mut rx);
     a.send(5).unwrap();
-    let mut rx = rx.wait();
+    let mut rx = stream::blocking(rx);
     assert_eq!(rx.next(), Some(Ok(5)));
     assert_eq!(rx.next(), Some(Ok(3)));
     assert_eq!(rx.next(), None);
@@ -226,7 +223,7 @@ fn buffered() {
     c.send(3).unwrap();
     sassert_empty(&mut rx);
     a.send(5).unwrap();
-    let mut rx = rx.wait();
+    let mut rx = stream::blocking(rx);
     assert_eq!(rx.next(), Some(Ok(5)));
     assert_eq!(rx.next(), Some(Ok(3)));
     assert_eq!(rx.next(), None);
@@ -244,7 +241,7 @@ fn unordered() {
     let mut rx = rx.buffer_unordered(2);
     sassert_empty(&mut rx);

-    let mut rx = rx.wait();
+    let mut rx = stream::blocking(rx);
     c.send(3).unwrap();
     assert_eq!(rx.next(), Some(Ok(3)));
     a.send(5).unwrap();
@@ -265,7 +262,7 @@ fn unordered() {
     c.send(3).unwrap();
     sassert_empty(&mut rx);
     a.send(5).unwrap();
-    let mut rx = rx.wait();
+    let mut rx = stream::blocking(rx);
     assert_eq!(rx.next(), Some(Ok(5)));
     assert_eq!(rx.next(), Some(Ok(3)));
     assert_eq!(rx.next(), None);
@@ -305,14 +302,14 @@ fn peek() {
         }
     }

-    Peek {
+    blocking(Peek {
         inner: list().peekable(),
-    }.wait().unwrap()
+    }).wait().unwrap()
 }

 #[test]
 fn wait() {
-    assert_eq!(list().wait().collect::<Result<Vec<_>, _>>(),
+    assert_eq!(stream::blocking(list()).collect::<Result<Vec<_>, _>>(),
                Ok(vec![1, 2, 3]));
 }

@@ -321,10 +318,10 @@ fn chunks() {
     assert_done(|| list().chunks(3).collect(), Ok(vec![vec![1, 2, 3]]));
     assert_done(|| list().chunks(1).collect(), Ok(vec![vec![1], vec![2], vec![3]]));
     assert_done(|| list().chunks(2).collect(), Ok(vec![vec![1, 2], vec![3]]));
-    let mut list = executor::spawn(err_list().chunks(3));
-    let i = list.wait_stream().unwrap().unwrap();
+    let mut list = stream::blocking(err_list().chunks(3));
+    let i = list.next().unwrap().unwrap();
     assert_eq!(i, vec![1, 2]);
-    let i = list.wait_stream().unwrap().unwrap_err();
+    let i = list.next().unwrap().unwrap_err();
     assert_eq!(i, 3);
 }

@@ -352,10 +349,10 @@ fn select() {
 #[test]
 fn forward() {
     let v = Vec::new();
-    let v = iter_ok::<_, ()>(vec![0, 1]).forward(v).wait().unwrap().1;
+    let v = blocking(iter_ok::<_, ()>(vec![0, 1]).forward(v)).wait().unwrap().1;
     assert_eq!(v, vec![0, 1]);

-    let v = iter_ok::<_, ()>(vec![2, 3]).forward(v).wait().unwrap().1;
+    let v = blocking(iter_ok::<_, ()>(vec![2, 3]).forward(v)).wait().unwrap().1;
     assert_eq!(v, vec![0, 1, 2, 3]);

     assert_done(move || iter_ok(vec![4, 5]).forward(v).map(|(_, s)| s),
@@ -396,7 +393,7 @@ fn stream_poll_fn() {
         Ok(Async::Ready(Some(counter)))
     });

-    assert_eq!(read_stream.wait().count(), 5);
+    assert_eq!(stream::blocking(read_stream).count(), 5);
 }

 #[test]
diff --git a/tests/stream_catch_unwind.rs b/tests/stream_catch_unwind.rs
index a06748d09a..89382b9eea 100644
--- a/tests/stream_catch_unwind.rs
+++ b/tests/stream_catch_unwind.rs
@@ -1,6 +1,6 @@
 extern crate futures;

-use futures::stream;
+use futures::stream::{self, blocking};
 use futures::prelude::*;

 #[test]
@@ -9,7 +9,7 @@ fn panic_in_the_middle_of_the_stream() {
     // panic on second element
     let stream_panicking = stream.map(|o| o.unwrap());

-    let mut iter = stream_panicking.catch_unwind().wait();
+    let mut iter = blocking(stream_panicking.catch_unwind());

     assert_eq!(Ok(10), iter.next().unwrap().ok().unwrap());
     assert!(iter.next().unwrap().is_err());
@@ -20,7 +20,7 @@ fn no_panic() {
 fn no_panic() {
     let stream = stream::iter_ok::<_, bool>(vec![10, 11, 12]);

-    let mut iter = stream.catch_unwind().wait();
+    let mut iter = blocking(stream.catch_unwind());

     assert_eq!(Ok(10), iter.next().unwrap().ok().unwrap());
     assert_eq!(Ok(11), iter.next().unwrap().ok().unwrap());
diff --git a/tests/support/mod.rs b/tests/support/mod.rs
index 297749777a..a996f2497e 100644
--- a/tests/support/mod.rs
+++ b/tests/support/mod.rs
@@ -5,7 +5,7 @@
 use std::sync::Arc;
 use std::thread;

 use futures::{Future, IntoFuture, Async, Poll};
-use futures::future::FutureResult;
+use futures::future::{blocking, FutureResult};
 use futures::stream::Stream;
 use futures::executor::{self, NotifyHandle, Notify};
 use futures::task;
@@ -23,7 +23,7 @@ pub fn assert_done<T, F>(f: F, result: Result<T::Item, T::Error>)
           T::Error: Eq + fmt::Debug,
           F: FnOnce() -> T,
 {
-    assert_eq!(f().wait(), result);
+    assert_eq!(blocking(f()).wait(), result);
 }

 pub fn assert_empty<T: Future, F: FnMut() -> T>(mut f: F) {
@@ -104,7 +104,7 @@ impl<F> ForgetExt for F
           F::Error: Send
 {
     fn forget(self) {
-        thread::spawn(|| self.wait());
+        thread::spawn(|| blocking(self).wait());
     }
 }
diff --git a/tests/unsync-oneshot.rs b/tests/unsync-oneshot.rs
index 55b0ca5ac2..2a7fe07773 100644
--- a/tests/unsync-oneshot.rs
+++ b/tests/unsync-oneshot.rs
@@ -1,8 +1,8 @@
 extern crate futures;

 use futures::prelude::*;
-use futures::future;
-use futures::unsync::oneshot::{channel, Canceled, spawn};
+use futures::future::{self, blocking};
+use futures::sync::oneshot::*;

 mod support;
 use support::local_executor;
@@ -11,13 +11,13 @@ use support::local_executor;
 fn smoke() {
     let (tx, rx) = channel();
     tx.send(33).unwrap();
-    assert_eq!(rx.wait().unwrap(), 33);
+    assert_eq!(blocking(rx).wait().unwrap(), 33);
 }

 #[test]
 fn canceled() {
     let (_, rx) = channel::<()>();
-    assert_eq!(rx.wait().unwrap_err(), Canceled);
+    assert_eq!(blocking(rx).wait().unwrap_err(), Canceled);
 }

 #[test]
@@ -34,7 +34,7 @@ fn tx_complete_rx_unparked() {
         tx.send(55).unwrap();
         Ok(11)
     }));
-    assert_eq!(res.wait().unwrap(), (55, 11));
+    assert_eq!(blocking(res).wait().unwrap(), (55, 11));
 }

 #[test]
@@ -45,7 +45,7 @@ fn tx_dropped_rx_unparked() {
         let _tx = tx;
         Ok(11)
     }));
-    assert_eq!(res.wait().unwrap_err(), Canceled);
+    assert_eq!(blocking(res).wait().unwrap_err(), Canceled);
 }

diff --git a/tests/unsync.rs b/tests/unsync.rs
index b5ae8d0fbf..09632ed153 100644
--- a/tests/unsync.rs
+++ b/tests/unsync.rs
@@ -7,17 +7,17 @@ mod support;

 use futures::prelude::*;
 use futures::unsync::oneshot;
 use futures::unsync::mpsc::{self, SendError};
-use futures::future::lazy;
-use futures::stream::{iter_ok, unfold};
+use futures::future::{blocking, lazy};
+use futures::stream::{self, iter_ok, unfold};

 use support::local_executor::Core;

 #[test]
 fn mpsc_send_recv() {
     let (tx, rx) = mpsc::channel::<i32>(1);
-    let mut rx = rx.wait();
+    let mut rx = stream::blocking(rx);

-    tx.send(42).wait().unwrap();
+    blocking(tx.send(42)).wait().unwrap();

     assert_eq!(rx.next(), Some(Ok(42)));
     assert_eq!(rx.next(), None);
@@ -27,20 +27,20 @@ fn mpsc_send_recv() {
 fn mpsc_rx_notready() {
     let (_tx, mut rx) = mpsc::channel::<i32>(1);

-    lazy(|| {
+    blocking(lazy(|| {
         assert_eq!(rx.poll().unwrap(), Async::NotReady);
         Ok(()) as Result<(), ()>
-    }).wait().unwrap();
+    })).wait().unwrap();
 }

 #[test]
 fn mpsc_rx_end() {
     let (_, mut rx) = mpsc::channel::<i32>(1);

-    lazy(|| {
+    blocking(lazy(|| {
         assert_eq!(rx.poll().unwrap(), Async::Ready(None));
         Ok(()) as Result<(), ()>
-    }).wait().unwrap();
+    })).wait().unwrap();
 }

 #[test]
@@ -54,10 +54,10 @@ fn mpsc_tx_clone_weak_rc() {
     }).wait().unwrap();

     drop(tx); // rc = 1
-    lazy(|| {
+    blocking(lazy(|| {
         assert_eq!(rx.poll().unwrap(), Async::NotReady);
         Ok(()) as Result<(), ()>
-    }).wait().unwrap();
+    })).wait().unwrap();

     drop(tx_clone); // rc = 0
     lazy(|| {
@@ -69,46 +69,46 @@
 #[test]
 fn mpsc_tx_notready() {
     let (tx, _rx) = mpsc::channel::<i32>(1);
-    let tx = tx.send(1).wait().unwrap();
-    lazy(move || {
+    let tx = blocking(tx.send(1)).wait().unwrap();
+    blocking(lazy(move || {
         assert!(tx.send(2).poll().unwrap().is_not_ready());
         Ok(()) as Result<(), ()>
-    }).wait().unwrap();
+    })).wait().unwrap();
 }

 #[test]
 fn mpsc_tx_err() {
     let (tx, _) = mpsc::channel::<i32>(1);
-    lazy(move || {
+    blocking(lazy(move || {
         assert!(tx.send(2).poll().is_err());
         Ok(()) as Result<(), ()>
-    }).wait().unwrap();
+    })).wait().unwrap();
 }

 #[test]
 fn mpsc_backpressure() {
     let (tx, rx) = mpsc::channel::<i32>(1);
-    lazy(move || {
+    blocking(lazy(move || {
         iter_ok(vec![1, 2, 3])
             .forward(tx)
             .map_err(|e: SendError<i32>| panic!("{}", e))
             .join(rx.take(3).collect().map(|xs| {
                 assert_eq!(xs, [1, 2, 3]);
             }))
-    }).wait().unwrap();
+    })).wait().unwrap();
 }

 #[test]
 fn mpsc_unbounded() {
     let (tx, rx) = mpsc::unbounded::<i32>();
-    lazy(move || {
+    blocking(lazy(move || {
         iter_ok(vec![1, 2, 3])
             .forward(tx)
             .map_err(|e: SendError<i32>| panic!("{}", e))
             .join(rx.take(3).collect().map(|xs| {
                 assert_eq!(xs, [1, 2, 3]);
             }))
-    }).wait().unwrap();
+    })).wait().unwrap();
 }

 #[test]
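Taken together, the hunks in this section apply one mechanical migration: blocking on a future moves from `f.wait()` to `blocking(f).wait()`, and draining a stream moves from `s.wait()` to the `stream::blocking(s)` iterator. Below is a minimal standalone sketch of the two patterns, assuming the `future::blocking` and `stream::blocking` adapters from this branch behave the way the test updates above use them (the future adapter exposes a `wait` method; the stream adapter implements `Iterator` over `Result` items); exact bounds and signatures are not shown in this diff, so treat this as illustrative rather than the adapters' definitive API.

```rust
extern crate futures;

use futures::future::{blocking, lazy};
use futures::stream::{self, iter_ok};

fn main() {
    // Futures: `f.wait()` becomes `blocking(f).wait()`; the adapter is
    // constructed explicitly, then driven to completion on this thread.
    let answer = blocking(lazy(|| Ok::<u32, ()>(42))).wait().unwrap();
    assert_eq!(answer, 42);

    // Streams: `s.wait()` becomes `stream::blocking(s)`, which is consumed
    // directly as an `Iterator` of `Result` items -- note no `.wait()` call.
    let mut items = stream::blocking(iter_ok::<_, ()>(vec![1, 2, 3]));
    assert_eq!(items.next(), Some(Ok(1)));
    assert_eq!(items.next(), Some(Ok(2)));
    assert_eq!(items.next(), Some(Ok(3)));
    assert_eq!(items.next(), None);
}
```

One consequence visible in `tests/stream.rs`: because the blocking stream adapter is an ordinary iterator, the `chunks` test no longer needs `executor::spawn` plus `wait_stream`; plain `next` calls replace both.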