From b018a193d4bef9e695103a0c7c8be2aac276f9f4 Mon Sep 17 00:00:00 2001 From: Niko Matsakis Date: Mon, 17 Apr 2017 12:41:08 -0400 Subject: [PATCH 1/9] introduce internal API and rebase future support to use it --- Cargo.toml | 1 + rayon-core/Cargo.toml | 3 - rayon-core/src/future/README.md | 273 ------------------- rayon-core/src/internal/mod.rs | 6 + rayon-core/src/internal/task.rs | 74 +++++ rayon-core/src/internal/worker.rs | 53 ++++ rayon-core/src/job.rs | 2 + rayon-core/src/lib.rs | 10 +- rayon-core/src/log.rs | 8 - rayon-core/src/registry.rs | 62 ++++- rayon-core/src/scope/internal.rs | 61 +++++ rayon-core/src/scope/mod.rs | 55 +--- rayon-core/src/spawn/mod.rs | 77 ------ rayon-core/src/spawn/test.rs | 57 +--- rayon-core/src/thread_pool/internal.rs | 58 ++++ rayon-core/src/thread_pool/mod.rs | 42 +-- {rayon-core/src => src}/future/mod.rs | 213 ++++++--------- {rayon-core/src => src}/future/test.rs | 54 +++- src/lib.rs | 10 +- src/prelude.rs | 3 + tests/compile-fail-unstable/future_escape.rs | 8 +- 21 files changed, 472 insertions(+), 658 deletions(-) delete mode 100644 rayon-core/src/future/README.md create mode 100644 rayon-core/src/internal/mod.rs create mode 100644 rayon-core/src/internal/task.rs create mode 100644 rayon-core/src/internal/worker.rs create mode 100644 rayon-core/src/scope/internal.rs create mode 100644 rayon-core/src/thread_pool/internal.rs rename {rayon-core/src => src}/future/mod.rs (71%) rename {rayon-core/src => src}/future/test.rs (83%) diff --git a/Cargo.toml b/Cargo.toml index f824ba0d0..fe7c3f385 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ exclude = ["ci"] [dependencies] rayon-core = { version = "1.2", path = "rayon-core" } +futures = "0.1.7" # This is a public dependency! 
[dependencies.either] diff --git a/rayon-core/Cargo.toml b/rayon-core/Cargo.toml index 8361a0a50..9a45e70ec 100644 --- a/rayon-core/Cargo.toml +++ b/rayon-core/Cargo.toml @@ -17,7 +17,4 @@ coco = "0.1.1" libc = "0.2.16" lazy_static = "0.2.2" -# only if #[cfg(rayon_unstable)], will be removed eventually -futures = "0.1.7" - [dev-dependencies] diff --git a/rayon-core/src/future/README.md b/rayon-core/src/future/README.md deleted file mode 100644 index 6c02bacc8..000000000 --- a/rayon-core/src/future/README.md +++ /dev/null @@ -1,273 +0,0 @@ -# Future integration into Rayon - -## How futures work - -Let's start with a brief coverage of how futures work. Our example will -be a simple chain of futures: - - F_map -> F_socket - -Here `F_socket` is a future that maps to a TCP socket. It returns a -`Vec` of data read from that socket. `F_map` is a future will take -that data and do some transformation. (Note that the real futures for -reading from sockets etc do not work in this way, this is just an -example.) - -The idea of futures is that each future offers a `poll()` method. When -`poll()` is invoked, the future will attempt to execute. Typically, -this often involves recursively calling `poll()` on other futures. So, -in our example, `F_map` when it starts would call `F_socket.poll()` to -see if the data is ready. The idea is that `poll()` returns one of -three values: - -- `Ok(Async::Ready(R))` -- the future has completed, here is the result `R`. -- `Err(E)` -- the future has completed and resulted in an error `E`. -- `Ok(Async::NotReady)` -- the future is not yet complete. - -The last one is the most interesting. It means that the future is -blocked on *some event X*, typically an I/O event (i.e., we are -waiting for more data to arrive on a TCP socket). - -When a future returns `NotReady`, it also has one additional job. It -must register the "current task" (think for now of the current thread) -to be re-awoken when the event X has occurred. 
For most futures, this -job is delegated to another future: e.g., in our example, `F_map` -invokes `F_socket.poll()`. So if `F_socket.poll()` returns not-ready, -then it will have registered the current thread already, and hence -`F_map` can merely propagates the `NotReady` result further up. - -### The current task and executor - -A key concept of the futures.rs library is that of an *executor*. The -executor is the runtime that first invokes the top-level future -(`T_map`, in our example). This is precisely the role that Rayon -plays. Note that in any futures system there may be many -interoperating execturs though. - -Part of an executors job is to maintain some thread-local storage -(TLS) when a future is executing. In particular, it must setup the -"current task" (basically a unique integer, although it's an opaque -type) as well as an "unpark object" of type -`Arc`. [The `Unpark` trait][unpark] offers a single method -(`unpark()`) which can be invoked when the task should be -re-awoken. So `F_socket` might, for example, get the current -`Arc` object and store it for use by an I/O thread. The I/O -thread might invoke `epoll()` or `select()` or whatever and, when it -detects the socket has more data, invoke the `unpark()` method. - -[unpark]: https://docs.rs/futures/0.1/futures/executor/trait.Unpark.html - -## Rayon's futures integration - -When you spawn a future of type `F` into rayon, the idea is that it is -going to start independently executing in the thread-pool. Meanwhile, -the `spawn_future()` method returns to you your own future (let's call -it `F'`) that you can use to poll and monitor its progress. Internally -within Rayon, however, we only allocate a single `Arc` to represent -both of these things -- an `Arc>`, to be precise -- and -this `Arc` hence serves two distinct roles. - -The operations on `F'` (the handle returned to the user) are specified -by the trait `ScopeFutureTrait` and are very simple. 
The user can -either `poll()` the future, which is checking to see if rayon is done -executing it yet, or `cancel()` the future. `cancel()` occurs when -`F'` is dropped, which indicates that the user no longer has interest -in the result. - -### Future reference counting - -Each spawned future is represents by an `Arc`. This `Arc` actually has -some interesting structure. Each of the edges in the diagram below -represents something that is "kept alive" by holding a ref count (in -some way, usually via an `Arc`): - - F' ---+ [ deque ] --+ - | | - v v - +---> /---------------------\ - | | registry: | ------> [rayon registry] - | | contents: --------\ | - | | | scope | | ------> [spawning scope] - | | | unpark | | --+ - | | | this | | --+ (self references) - | | | ... | | | - | | \-----------------/ | | - | \---------------------/ | - +-------------------------------+ - -Let's walk through them: - -- The incoming edge from `F'` represents the edge from the future that was returned - to the caller of `spawn_future`. This ensures that the future arc will - not be freed so long as the caller is still interesting in looking at - its result. -- The incoming edge from `[ deque ]` represents the fact that when the - future is enqueued into a thread-local deque (which it only - sometimes is), that deque holds a ref. This is done by transmuting - the `Arc` into a `*const Job` object (and hence the `*const` - logically holds the ref that was owned by the `Arc`). When the job - is executed, it is transmuted back and the resulting `Arc` is - eventually dropped, releasing the ref. -- The `registry` field holds onto an `Arc` and hence keeps - some central registry alive. This doesn't really do much but prevent - the `Registry` from being dropped. In particular, this doesn't - prevent the threads in a registry from terminating while the future - is unscheduled etc (though other fields in the future do). -- The `scope` field (of type `S`) is the "enclosing scope". 
This scope - is an abstract value that implements the `FutureScope<'scope>` trait - -- this means that it is responsible for ensuring that `'scope` does - not end until one of the `FutureScope` methods are invoked (which - occurs when the future has finished executing). For example, if the - future is spawned inside a `scope()` call, then the `S` will be a - wrapper (`ScopeFutureScope`) around a `*const Scope<'scope>`. When - the future is created one job is allocated for this future in the - scope, and the scope counter is decremented once the future is - marked as completing. - - In general, the job of the `scope` field is to ensure that the - future type (`F`) remains valid. After all, since `F: 'scope`, `F` - is known to be valid until the lifetime `'scope` ends, and that - lifetime cannot end until the `scope` methods are invoked, so we - know that `F` must stay valid until one of those methods are - invoked. - - All of our data of type `F` is stored in the field `spawn` (not - shown here). This field is always set to `None` before the scope - counter is decremented. See the section on lifetime safety for more - details. -- The `unpark` and `self` fields both store an `Arc` which is actually - this same future. Thus the future has a ref count cycle (two of - them...) and cannot be freed until this cycle is broken. Both of - these fields are actually `Option>` fields and will be set - to `None` once the future is complete, breakin the cycle and - allowing it to be freed when other references are dropped. - -### The future state machine - -Internally, futures go through various states, depicted here: - - PARKED <----+ - | | - v | - UNPARKED | - | | - v | - EXECUTING --+ - | | ^ - | v | - | EXECUTING_UNPARKED - | - v - COMPLETE - -When they are first created, futures begin as *PARKED*. A *PARKED* -future is one that is waiting for something to happen. It is not -scheduled in the deque of any thread. 
Even before we return from -`spawn_future()`, however, we will transition into *UNPARKED*. An -*UNPARKED* future is one that is waiting to be executed. It is -enqueued in the deque of some Rayon thread and hence will execute when -the thread gets around to it. - -Once the future begins to execute (it itself is a Rayon job), it -transitions into the *EXECUTING* state. This means that it is busy -calling `F.poll()`, basically. While it calls `poll()`, it also sets -up its `contents.unpark` field as the current "unpark" instance. Hence -if `F` returns `NotReady`, it will clone this `unpark` field and hold -onto it to signal us the future is ready to execute again. - -For now let's assume that `F` is complete and hence readys either -`Ok(Ready(_))` or `Err(_)`. In that case, the future can transition to -`COMPLETE`. At this point, many bits of state that are no longer -needed (e.g., the future itself, but also the `this` and `unpark` -fields) are set to `None` and dropped, and the result is stored in the -`result` field. (Moreover, we may have to signal other tasks, but that -is discussed in a future section.) - -If `F` returns `Ok(Async::NotReady)`, then we would typically -transition to the `PARKED` state and await the call to -`unpark()`. When `unpark()` is called, it would move the future into -the `UNPARK` state and inject it into the registry. - -However, due to the vagaries of thread-scheduling, it *can* happen -that `unpark()` is called before we exit the `EXECUTING` state. For -example, we might invoke `F.poll()`, which send the `Unpark` instance -to the I/O thread, which detects I/O, and invokes `unpark()`, all -before `F.poll()` has returned. In that case, the `unpark()` method -will transition the state (atomically, of course) to -`EXECUTING_UNPARKED`. In that case, instead of transitioning to -`PARKED` when `F.poll()` returns, the future will simply transition -right back to `EXECUTING` and try calling `poll()` again. This can -repeat a few times. 
- -### Lifetime safety - -Of course, Rayon's signature feature is that it allows you to use a -future `F` that includes references, so long as those references -outlive the lifetime of the scope `'scope`. So why is this safe? - -The basic idea of why this is safe is as follows. The `ScopeFuture` -struct holds a ref on the scope itself (via the field `scope`). -Until this ref is decremented, the scope will not end (and hence -`'scope` is still active). This ref is only decremented while the -future transitions into the *COMPLETE* state -- so anytime before -then, we know we don't have to worry, the references are still valid. - -As we transition into the *COMPLETE* state is where things get more -interesting. You'll notice that signaling the `self.scope` job as done -the *last* thing that happens during that transition. Importantly, -before that is done, we drop all access that we have to the type `F`: -that is, we store `None` into the fields that might reference values -of type `F`. This implies that we know that, whatever happens after we -transition into *COMPLETE*, we can't access any of the references -found in `F` anymore. - -This is good, because there *are* still active refs to the -`ScopeFuture` after we enter the *COMPLETE* state. There are two -sources of these: unpark values and the future result. - -**Unpark values.** We may have given away `Arc` values -- -these are trait objects, but they are actually refs to our -`ScopeFuture`. Note that `Arc: 'static`, so these could be -floating about for any length of time (we had to transmute away the -lifetimes to give them out). This is ok because (a) the `Arc` keeps -the `ScopeFuture` alive and (b) the only thing you can do is to call -`unpark()`, which will promptly return since the state is *COMPLETE* -(and, anyhow, as we saw above, it doesn't have access to any -references anyhow). 
- -**Future result.** The other, more interesting reference to the -`ScopeFuture` is the value that we gave back to the user when we -spawned the future in the first place. This value is more interesting -because it can be used to do non-trivial things, unlike the -`Arc`. If you look carefully at this handle, you will see that -its type has been designed to hide the type `F`. In fact, it only -reveals the types `T` and `E` which are the ok/err result types of the -future `F`. This is intentonal: suppose that the type `F` includes -some references, but those references don't appear in the result. We -want the "result" future to be able to escape the scope, then, to any -place where the types `T` and `E` are still in scope. If we exposed -`F` here that would not be possible. (Hiding `F` also requires a -transmute to an object type, in this case an internal trait called -`ScopeFutureTrait`.) Note though that it is possible for `T` and `E` -to have references in them. They could even be references tied to the -scope. - -So what can a user do with this result future? They have two -operations available: poll and cancel. Let's look at cancel first, -since it's simpler. If the state is *COMPLETE*, then `cancel()` is an -immediate no-op, so we know that it can't be used to access any -references that may be invalid. In any case, the only thing it does is -to set a field to true and invoke `unpark()`, and we already examined -the possible effects of `unpark()` in the previous section.q - -So what about `poll()`? This is how the user gets the final result out -of the future. The important thing that it does is to access (and -effectively nullify) the field `result`, which stores the result of -the future and hence may have access to `T` and `E` values. These -values may contain references...so how we know that they are still in -scope? 
The answer is that those types are exposed in the user's type -of the future, and hence the basic Rust type system should guarantee -that any references are still valid, or else the user shouldn't be -able to call `poll()`. (The same is true at the time of cancellation, -but that's not important, since `cancel()` doesn't do anything of -interest.) - - diff --git a/rayon-core/src/internal/mod.rs b/rayon-core/src/internal/mod.rs new file mode 100644 index 000000000..294cff58d --- /dev/null +++ b/rayon-core/src/internal/mod.rs @@ -0,0 +1,6 @@ +//! The internal directory contains internal APIs not meant to be +//! exposed to "end-users" of Rayon, but rather which are useful for +//! constructing abstractions. + +pub mod task; +pub mod worker; diff --git a/rayon-core/src/internal/task.rs b/rayon-core/src/internal/task.rs new file mode 100644 index 000000000..0d689a92c --- /dev/null +++ b/rayon-core/src/internal/task.rs @@ -0,0 +1,74 @@ +use std::any::Any; +use std::sync::Arc; + +/// Represents a task that can be scheduled onto the Rayon +/// thread-pool. Once a task is scheduled, it will execute exactly +/// once (eventually). +pub trait Task: Send { + fn execute(this: Arc); +} + +/// Represents a handle onto some Rayon scope. This could be either a +/// local scope created by the `scope()` function or the global scope +/// for a thread-pool. To get a scope-handle, you can invoke +/// `ToScopeHandle::to_scope_handle()` on either a `scope` value or a +/// `ThreadPool`. +/// +/// The existence of `ScopeHandle` offers a guarantee: +/// +/// - The Rust lifetime `'scope` will not end until the scope-handle +/// is dropped, or until you invoke `panicked()` or `ok()`. +/// +/// This trait is intended to be used as follows: +/// +/// - You have a parallel task of type `T` to perform where `T: 's`, +/// meaning that any references that `T` contains outlive the lifetime +/// `'s`.
+/// - You obtain a scope handle `h` of type `H` where `H: +/// ScopeHandle<'s>`; typically this would be by invoking +/// `to_scope_handle()` on a Rayon scope (of type `Scope<'s>`) or a +/// thread-pool (in which case `'s == 'static`). +/// - You invoke `h.spawn_task()` to start your job(s). This may be done +/// many times. +/// - Note that `h.spawn_task()` is an unsafe method. You must ensure +/// that your parallel jobs have completed before moving to +/// the next step. +/// - Eventually, when all invocations are complete, you invoke +/// either `panicked()` or `ok()`. +pub unsafe trait ScopeHandle<'scope>: 'scope { + /// Enqueues a task for execution within the thread-pool. The task + /// will eventually be invoked, and once it is, the `Arc` will be + /// dropped. + /// + /// **Unsafe:** The caller must guarantee that the scope handle + /// (`self`) will not be dropped (nor will `ok()` or `panicked()` + /// be called) until the task executes. Otherwise, the lifetime + /// `'scope` may end while the task is still pending. + unsafe fn spawn_task(&self, task: Arc); + + /// Indicates that some sub-task of this scope panicked with the + /// given `err`. This panic will be propagated back to the user as + /// appropriate, depending on how this scope handle was derived. + /// + /// This takes ownership of the scope handle, meaning that once + /// you invoke `panicked`, the scope is permitted to terminate + /// (and, in particular, the Rust lifetime `'scope` may end). + fn panicked(self, err: Box); + + /// Indicates that the sub-tasks of this scope that you have + /// spawned concluded successfully. + /// + /// This takes ownership of the scope handle, meaning that once + /// you invoke `ok`, the scope is permitted to terminate + /// (and, in particular, the Rust lifetime `'scope` may end). + fn ok(self); +} + +/// Converts a Rayon structure (typically a `Scope` or `ThreadPool`) +/// into a "scope handle". See the `ScopeHandle` trait for more +/// details.
+pub trait ToScopeHandle<'scope> { + type ScopeHandle: ScopeHandle<'scope>; + fn to_scope_handle(&self) -> Self::ScopeHandle; +} + diff --git a/rayon-core/src/internal/worker.rs b/rayon-core/src/internal/worker.rs new file mode 100644 index 000000000..a42177628 --- /dev/null +++ b/rayon-core/src/internal/worker.rs @@ -0,0 +1,53 @@ +use latch::LatchProbe; +use registry; + +/// Represents the active worker thread. +pub struct WorkerThread<'w> { + thread: &'w registry::WorkerThread +} + +impl<'w> WorkerThread<'w> { + /// Causes the worker thread to wait until `f()` returns true. + /// While the thread is waiting, it will attempt to steal work + /// from other threads, and may go to sleep if there is no work to + /// steal. + /// + /// **Dead-lock warning: This is a low-level interface and cannot + /// be used to wait on arbitrary conditions.** In particular, if + /// the Rayon thread goes to sleep, it will only be awoken when + /// new rayon events occur (e.g., `spawn()` or `join()` is + /// invoked, or one of the methods on a `ScopeHandle`). Therefore, + /// you must ensure that, once the condition `f()` becomes true, + /// some "rayon event" will also occur to ensure that waiting + /// threads are awoken. + pub unsafe fn wait_until_true(&self, f: F) where F: Fn() -> bool { + struct DummyLatch<'a, F: 'a> { f: &'a F } + + impl<'a, F: Fn() -> bool> LatchProbe for DummyLatch<'a, F> { + fn probe(&self) -> bool { + (self.f)() + } + } + + self.thread.wait_until(&DummyLatch { f: &f }); + } +} + +/// If the current thread is a Rayon worker thread, then the callback +/// is invoked with a reference to the worker-thread and the result of +/// that callback is returned with `Some`. Otherwise, if we are not on +/// a Rayon worker thread, `None` is immediately returned.
+pub fn if_in_worker_thread(if_true: F) -> Option + where F: FnOnce(&WorkerThread) -> R, +{ + let worker_thread = registry::WorkerThread::current(); + if worker_thread.is_null() { + None + } else { + unsafe { + let wt = WorkerThread { thread: &*worker_thread }; + Some(if_true(&wt)) + } + } +} + diff --git a/rayon-core/src/job.rs b/rayon-core/src/job.rs index 60170c740..a2d694c85 100644 --- a/rayon-core/src/job.rs +++ b/rayon-core/src/job.rs @@ -35,6 +35,8 @@ unsafe impl Send for JobRef {} unsafe impl Sync for JobRef {} impl JobRef { + /// Unsafe: caller asserts that `data` will remain valid until the + /// job is executed. pub unsafe fn new(data: *const T) -> JobRef where T: Job { diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs index d191540f9..7d532b1b1 100644 --- a/rayon-core/src/lib.rs +++ b/rayon-core/src/lib.rs @@ -38,8 +38,6 @@ use std::fmt; extern crate coco; #[macro_use] extern crate lazy_static; -#[cfg(rayon_unstable)] -extern crate futures; extern crate libc; extern crate num_cpus; extern crate rand; @@ -51,8 +49,6 @@ mod latch; mod join; mod job; mod registry; -#[cfg(rayon_unstable)] -mod future; mod scope; mod sleep; mod spawn; @@ -61,16 +57,14 @@ mod thread_pool; mod unwind; mod util; +#[cfg(rayon_unstable)] +pub mod internal; pub use thread_pool::ThreadPool; pub use thread_pool::current_thread_index; pub use thread_pool::current_thread_has_pending_tasks; pub use join::join; pub use scope::{scope, Scope}; pub use spawn::spawn; -#[cfg(rayon_unstable)] -pub use spawn::spawn_future; -#[cfg(rayon_unstable)] -pub use future::RayonFuture; /// Returns the number of threads in the current registry. 
If this /// code is executing within a Rayon thread-pool, then this will be diff --git a/rayon-core/src/log.rs b/rayon-core/src/log.rs index 647e31fe6..16269f954 100644 --- a/rayon-core/src/log.rs +++ b/rayon-core/src/log.rs @@ -34,14 +34,6 @@ pub enum Event { JobPanickedErrorNotStored { owner_thread: usize }, ScopeCompletePanicked { owner_thread: usize }, ScopeCompleteNoPanic { owner_thread: usize }, - - FutureExecute { state: usize }, - FutureExecuteReady, - FutureExecuteNotReady, - FutureExecuteErr, - FutureInstallWaitingTask { state: usize }, - FutureUnparkWaitingTask, - FutureComplete, } pub const DUMP_LOGS: bool = cfg!(debug_assertions); diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs index 7d691df10..ce1ea7f60 100644 --- a/rayon-core/src/registry.rs +++ b/rayon-core/src/registry.rs @@ -1,6 +1,8 @@ use ::{Configuration, ExitHandler, PanicHandler, StartHandler}; use coco::deque::{self, Worker, Stealer}; -use job::{JobRef, StackJob}; +use job::{Job, JobRef, StackJob}; +#[cfg(rayon_unstable)] +use internal::task::Task; use latch::{LatchProbe, Latch, CountLatch, LockLatch}; #[allow(unused_imports)] use log::Event::*; @@ -237,8 +239,8 @@ impl Registry { /// worker thread for the registry, this will push onto the /// deque. Else, it will inject from the outside (which is slower). pub fn inject_or_push(&self, job_ref: JobRef) { + let worker_thread = WorkerThread::current(); unsafe { - let worker_thread = WorkerThread::current(); if !worker_thread.is_null() && (*worker_thread).registry().id() == self.id() { (*worker_thread).push(job_ref); } else { @@ -247,9 +249,57 @@ impl Registry { } } - /// Unsafe: caller asserts that injected jobs will remain valid - /// until they are executed. - pub unsafe fn inject(&self, injected_jobs: &[JobRef]) { + /// Unsafe: the caller must guarantee that `task` will stay valid + /// until it executes. 
+ #[cfg(rayon_unstable)] + pub unsafe fn submit_task(&self, task: Arc) + where T: Task + { + let task_job = TaskJob::new(task); + let task_job_ref = TaskJob::into_job_ref(task_job); + return self.inject_or_push(task_job_ref); + + /// A little newtype wrapper for `T`, just because I did not + /// want to implement `Job` for all `T: Task`. + #[allow(dead_code)] + struct TaskJob { + data: T + } + + impl TaskJob { + fn new(arc: Arc) -> Arc { + // `TaskJob` has the same layout as `T`, so we can safely + // transmute this `T` into a `TaskJob`. This lets us write our + // impls of `Job` for `TaskJob`, making them more restricted. + // Since `Job` is a private trait, this is not strictly necessary, + // I don't think, but makes me feel better. + unsafe { mem::transmute(arc) } + } + + pub fn into_task(this: Arc>) -> Arc { + // Same logic as `new()` + unsafe { mem::transmute(this) } + } + + unsafe fn into_job_ref(this: Arc) -> JobRef { + let this: *const Self = mem::transmute(this); + JobRef::new(this) + } + } + + impl Job for TaskJob { + unsafe fn execute(this: *const Self) { + let this: Arc = mem::transmute(this); + let task: Arc = TaskJob::into_task(this); + Task::execute(task); + } + } + } + + /// Push a job into the "external jobs" queue; it will be taken by + /// whatever worker has nothing to do. Use this if you know that + /// you are not on a worker of this registry.
+ pub fn inject(&self, injected_jobs: &[JobRef]) { log!(InjectJobs { count: injected_jobs.len() }); { let state = self.state.lock().unwrap(); @@ -612,3 +662,5 @@ unsafe fn in_worker_cold(op: OP) -> R job.latch.wait(); job.into_result() } + + diff --git a/rayon-core/src/scope/internal.rs b/rayon-core/src/scope/internal.rs new file mode 100644 index 000000000..d5671e1ab --- /dev/null +++ b/rayon-core/src/scope/internal.rs @@ -0,0 +1,61 @@ +#[cfg(rayon_unstable)] +use internal::task::{ScopeHandle, ToScopeHandle, Task}; +use std::any::Any; +use std::mem; +use std::sync::Arc; +use super::Scope; + +#[cfg(rayon_unstable)] +impl<'scope> ToScopeHandle<'scope> for Scope<'scope> { + type ScopeHandle = LocalScopeHandle<'scope>; + + fn to_scope_handle(&self) -> Self::ScopeHandle { + unsafe { LocalScopeHandle::new(self) } + } +} + +pub struct LocalScopeHandle<'scope> { + scope: *const Scope<'scope> +} + +impl<'scope> LocalScopeHandle<'scope> { + /// Caller guarantees that `*scope` will remain valid + /// until the scope completes. Since we acquire a ref, + /// that means it will remain valid until we release it. + unsafe fn new(scope: &Scope<'scope>) -> Self { + scope.job_completed_latch.increment(); + LocalScopeHandle { scope: scope } + } +} + +impl<'scope> Drop for LocalScopeHandle<'scope> { + fn drop(&mut self) { + unsafe { + if !self.scope.is_null() { + (*self.scope).job_completed_ok(); + } + } + } +} + +/// We assert that the `Self` type remains valid until a +/// method is called, and that `'scope` will not end until +/// that point. 
+#[cfg(rayon_unstable)] +unsafe impl<'scope> ScopeHandle<'scope> for LocalScopeHandle<'scope> { + unsafe fn spawn_task(&self, task: Arc) { + let scope = &*self.scope; + (*scope.owner_thread).registry().submit_task(task); + } + + fn ok(self) { + mem::drop(self); + } + + fn panicked(self, err: Box) { + unsafe { + (*self.scope).job_panicked(err); + mem::forget(self); // no need to run dtor now + } + } +} diff --git a/rayon-core/src/scope/mod.rs b/rayon-core/src/scope/mod.rs index 935cb9d82..f1009d27a 100644 --- a/rayon-core/src/scope/mod.rs +++ b/rayon-core/src/scope/mod.rs @@ -1,5 +1,3 @@ -#[cfg(rayon_unstable)] -use future::{self, Future, RayonFuture}; use latch::{Latch, CountLatch}; use log::Event::*; use job::HeapJob; @@ -7,13 +5,13 @@ use std::any::Any; use std::marker::PhantomData; use std::mem; use std::ptr; -use std::sync::Arc; use std::sync::atomic::{AtomicPtr, Ordering}; -use registry::{in_worker, Registry, WorkerThread}; +use registry::{in_worker, WorkerThread}; use unwind; #[cfg(test)] mod test; +mod internal; pub struct Scope<'scope> { /// thread where `scope()` was executed (note that individual jobs @@ -284,55 +282,6 @@ impl<'scope> Scope<'scope> { } } - #[cfg(rayon_unstable)] - pub fn spawn_future(&self, future: F) -> RayonFuture - where F: Future + Send + 'scope - { - // We assert that the scope is allocated in a stable location - // (an enclosing stack frame, to be exact) which will remain - // valid until the scope ends. - let future_scope = unsafe { ScopeFutureScope::new(self) }; - - return future::new_rayon_future(future, future_scope); - - struct ScopeFutureScope<'scope> { - scope: *const Scope<'scope> - } - - impl<'scope> ScopeFutureScope<'scope> { - /// Caller guarantees that `*scope` will remain valid - /// until the scope completes. Since we acquire a ref, - /// that means it will remain valid until we release it. 
- unsafe fn new(scope: &Scope<'scope>) -> Self { - scope.job_completed_latch.increment(); - ScopeFutureScope { scope: scope } - } - } - - /// We assert that the `Self` type remains valid until a - /// method is called, and that `'scope` will not end until - /// that point. - unsafe impl<'scope> future::FutureScope<'scope> for ScopeFutureScope<'scope> { - fn registry(&self) -> Arc { - unsafe { - (*(*self.scope).owner_thread).registry().clone() - } - } - - fn future_completed(self) { - unsafe { - (*self.scope).job_completed_ok(); - } - } - - fn future_panicked(self, err: Box) { - unsafe { - (*self.scope).job_panicked(err); - } - } - } - } - /// Executes `func` as a job, either aborting or executing as /// appropriate. /// diff --git a/rayon-core/src/spawn/mod.rs b/rayon-core/src/spawn/mod.rs index caf7c20a4..393140afe 100644 --- a/rayon-core/src/spawn/mod.rs +++ b/rayon-core/src/spawn/mod.rs @@ -1,10 +1,7 @@ -#[cfg(rayon_unstable)] -use future::{self, Future, RayonFuture}; #[allow(unused_imports)] use latch::{Latch, SpinLatch}; use job::*; use registry::Registry; -use std::any::Any; use std::mem; use std::sync::Arc; use unwind; @@ -96,79 +93,5 @@ pub unsafe fn spawn_in(func: F, registry: &Arc) mem::forget(abort_guard); } -/// Spawns a future in the static scope, scheduling it to execute on -/// Rayon's threadpool. Returns a new future that can be used to poll -/// for the result. Since this future is executing in the static scope, -/// it cannot hold references to things in the enclosing stack frame; -/// if you would like to hold such references, use [the `scope()` -/// function][scope] to create a scope. -/// -/// [scope]: fn.scope.html -/// -/// # Panic handling -/// -/// If this future should panic, that panic will be propagated when -/// `poll()` is invoked on the return value. -#[cfg(rayon_unstable)] -pub fn spawn_future(future: F) -> RayonFuture - where F: Future + Send + 'static -{ - /// We assert that the current registry cannot yet have terminated. 
- unsafe { spawn_future_in(future, Registry::current()) } -} - -/// Internal helper function. -/// -/// Unsafe because caller must guarantee that `registry` has not yet terminated. -#[cfg(rayon_unstable)] -pub unsafe fn spawn_future_in(future: F, registry: Arc) -> RayonFuture - where F: Future + Send + 'static -{ - let scope = StaticFutureScope::new(registry.clone()); - - future::new_rayon_future(future, scope) -} - -#[cfg(rayon_unstable)] -struct StaticFutureScope { - registry: Arc -} - -#[cfg(rayon_unstable)] -impl StaticFutureScope { - /// Caller asserts that the registry has not yet terminated. - unsafe fn new(registry: Arc) -> Self { - registry.increment_terminate_count(); - StaticFutureScope { registry: registry } - } -} - -/// We assert that: -/// -/// (a) the scope valid remains valid until a completion method -/// is called. In this case, "remains valid" means that the -/// registry is not terminated. This is true because we -/// acquire a "termination count" in `StaticFutureScope::new()` -/// which is not released until `future_panicked()` or -/// `future_completed()` is invoked. -/// (b) the lifetime `'static` will not end until a completion -/// method is called. This is true because `'static` doesn't -/// end until the end of the program. 
-#[cfg(rayon_unstable)] -unsafe impl future::FutureScope<'static> for StaticFutureScope { - fn registry(&self) -> Arc { - self.registry.clone() - } - - fn future_panicked(self, err: Box) { - self.registry.handle_panic(err); - self.registry.terminate(); - } - - fn future_completed(self) { - self.registry.terminate(); - } -} - #[cfg(test)] mod test; diff --git a/rayon-core/src/spawn/test.rs b/rayon-core/src/spawn/test.rs index a56b7def1..7e93df39a 100644 --- a/rayon-core/src/spawn/test.rs +++ b/rayon-core/src/spawn/test.rs @@ -1,15 +1,10 @@ -#[cfg(rayon_unstable)] -use futures::{lazy, Future}; - use scope; use std::any::Any; -use std::sync::{Arc, Mutex}; +use std::sync::Mutex; use std::sync::mpsc::channel; use {Configuration, ThreadPool}; use super::spawn; -#[cfg(rayon_unstable)] -use super::spawn_future; #[test] fn spawn_then_join_in_worker() { @@ -52,56 +47,6 @@ fn panic_fwd() { assert_eq!(1, rx.recv().unwrap()); } -#[test] -#[cfg(rayon_unstable)] -fn async_future_map() { - let data = Arc::new(Mutex::new(format!("Hello, "))); - - let a = spawn_future(lazy({ - let data = data.clone(); - move || Ok::<_, ()>(data) - })); - let future = spawn_future(a.map(|data| { - let mut v = data.lock().unwrap(); - v.push_str("world!"); - })); - let () = future.wait().unwrap(); - - // future must have executed for the scope to have ended, even - // though we never invoked `wait` to observe its result - assert_eq!(&data.lock().unwrap()[..], "Hello, world!"); -} - -#[test] -#[should_panic(expected = "Hello, world!")] -#[cfg(rayon_unstable)] -fn async_future_panic_prop() { - let future = spawn_future(lazy(move || Ok::<(), ()>(argh()))); - let _ = future.rayon_wait(); // should panic, not return a value - - fn argh() -> () { - if true { - panic!("Hello, world!"); - } - } -} - -#[test] -#[cfg(rayon_unstable)] -fn async_future_scope_interact() { - let future = spawn_future(lazy(move || Ok::(22))); - - let mut vec = vec![]; - scope(|s| { - let future = s.spawn_future(future.map(|x| x * 
2)); - s.spawn(|_| { - vec.push(future.rayon_wait().unwrap()); - }); // just because - }); - - assert_eq!(vec![44], vec); -} - /// Test what happens when the thread-pool is dropped but there are /// still active asynchronous tasks. We expect the thread-pool to stay /// alive and executing until those threads are complete. diff --git a/rayon-core/src/thread_pool/internal.rs b/rayon-core/src/thread_pool/internal.rs new file mode 100644 index 000000000..b02637bc8 --- /dev/null +++ b/rayon-core/src/thread_pool/internal.rs @@ -0,0 +1,58 @@ +#[cfg(rayon_unstable)] +use internal::task::{ScopeHandle, ToScopeHandle, Task}; +use registry::Registry; +use std::any::Any; +use std::sync::Arc; +use super::ThreadPool; + +#[cfg(rayon_unstable)] +impl ToScopeHandle<'static> for ThreadPool { + type ScopeHandle = ThreadPoolScopeHandle; + + fn to_scope_handle(&self) -> Self::ScopeHandle { + unsafe { ThreadPoolScopeHandle::new(self.registry.clone()) } + } +} + +pub struct ThreadPoolScopeHandle { + registry: Arc +} + +impl ThreadPoolScopeHandle { + /// Caller asserts that the registry has not yet terminated. + unsafe fn new(registry: Arc) -> Self { + registry.increment_terminate_count(); + ThreadPoolScopeHandle { registry: registry } + } +} + +impl Drop for ThreadPoolScopeHandle { + fn drop(&mut self) { + self.registry.terminate(); + } +} + +/// We assert that: +/// +/// (a) the scope remains valid until a completion method +/// is called. In this case, "remains valid" means that the +/// registry is not terminated. This is true because we +/// acquire a "termination count" in `ThreadPoolScopeHandle::new()` +/// which is not released until the handle is dropped, as +/// happens when `ok()` or `panicked()` consumes it. +/// (b) the lifetime `'static` will not end until a completion +/// method is called. This is true because `'static` doesn't +/// end until the end of the program.
+#[cfg(rayon_unstable)] +unsafe impl ScopeHandle<'static> for ThreadPoolScopeHandle { + unsafe fn spawn_task(&self, task: Arc) { + self.registry.submit_task(task); + } + + fn ok(self) { + } + + fn panicked(self, err: Box) { + self.registry.handle_panic(err); + } +} diff --git a/rayon-core/src/thread_pool/mod.rs b/rayon-core/src/thread_pool/mod.rs index cf74931fb..e6d9bd2a3 100644 --- a/rayon-core/src/thread_pool/mod.rs +++ b/rayon-core/src/thread_pool/mod.rs @@ -1,6 +1,4 @@ use Configuration; -#[cfg(rayon_unstable)] -use future::{Future, RayonFuture}; use latch::LockLatch; #[allow(unused_imports)] use log::Event::*; @@ -12,6 +10,7 @@ use std::sync::Arc; use std::error::Error; use registry::{Registry, WorkerThread}; +mod internal; mod test; /// # ThreadPool /// @@ -242,45 +241,6 @@ impl ThreadPool { // We assert that `self.registry` has not terminated. unsafe { spawn::spawn_in(op, &self.registry) } } - - /// Spawns an asynchronous future in the thread pool. `spawn_future()` will inject - /// jobs into the threadpool that are not tied to your current stack frame. This means - /// `ThreadPool`'s `spawn` methods are not scoped. As a result, it cannot access data - /// owned by the stack. - /// - /// `spawn_future()` returns a `RayonFuture`, allowing you to chain - /// multiple jobs togther. - /// - /// ## Using `spawn_future()` - /// - /// ```rust - /// # extern crate rayon_core as rayon; - /// extern crate futures; - /// use futures::{future, Future}; - /// # fn main() { - /// - /// let pool = rayon::ThreadPool::new(rayon::Configuration::new().num_threads(8)).unwrap(); - /// - /// let a = pool.spawn_future(future::lazy(move || Ok::<_, ()>(format!("Hello, ")))); - /// let b = pool.spawn_future(a.map(|mut data| { - /// data.push_str("world"); - /// data - /// })); - /// let result = b.wait().unwrap(); // `Err` is impossible, so use `unwrap()` here - /// println!("{:?}", result); // prints: "Hello, world!" 
- /// # } - /// ``` - /// - /// See also: [the `spawn_future()` function defined on scopes][spawn_future]. - /// - /// [spawn_future]: struct.Scope.html#method.spawn_future - #[cfg(rayon_unstable)] - pub fn spawn_future(&self, future: F) -> RayonFuture - where F: Future + Send + 'static - { - // We assert that `self.registry` has not yet terminated. - unsafe { spawn::spawn_future_in(future, self.registry.clone()) } - } } impl Drop for ThreadPool { diff --git a/rayon-core/src/future/mod.rs b/src/future/mod.rs similarity index 71% rename from rayon-core/src/future/mod.rs rename to src/future/mod.rs index 75a4bcfe0..fdbb72a92 100644 --- a/rayon-core/src/future/mod.rs +++ b/src/future/mod.rs @@ -1,29 +1,20 @@ -//! Future support in Rayon. This module *primary* consists of -//! internal APIs that are exposed through `Scope::spawn_future` and -//! `::spawn_future`. However, the type `RayonFuture` is a public -//! type exposed to all users. +//! Future support in Rayon. //! //! See `README.md` for details. 
-use latch::{LatchProbe}; -#[allow(warnings)] -use log::Event::*; -use futures::{Async, Poll}; +use futures::{Async, Future, Poll}; use futures::executor; use futures::future::CatchUnwind; use futures::task::{self, Spawn, Task, Unpark}; -use job::{Job, JobRef}; -use registry::{Registry, WorkerThread}; +use rayon_core::internal::task::{Task as RayonTask, ScopeHandle, ToScopeHandle}; +use rayon_core::internal::worker; use std::any::Any; -use std::panic::AssertUnwindSafe; +use std::panic::{self, AssertUnwindSafe}; use std::mem; use std::sync::Arc; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::*; use std::sync::Mutex; -use unwind; - -pub use futures::Future; const STATE_PARKED: usize = 0; const STATE_UNPARKED: usize = 1; @@ -31,6 +22,36 @@ const STATE_EXECUTING: usize = 2; const STATE_EXECUTING_UNPARKED: usize = 3; const STATE_COMPLETE: usize = 4; +pub trait ScopeFutureExt<'scope> { + fn spawn_future(&self, future: F) -> RayonFuture + where F: Future + Send + 'scope; +} + +impl<'scope, T> ScopeFutureExt<'scope> for T + where T: ToScopeHandle<'scope> +{ + fn spawn_future(&self, future: F) -> RayonFuture + where F: Future + Send + 'scope + { + let inner = ScopeFuture::spawn(future, self.to_scope_handle()); + + // We assert that it is safe to hide the type `F` (and, in + // particular, the lifetimes in it). This is true because the API + // offered by a `RayonFuture` only permits access to the result of + // the future (of type `F::Item` or `F::Error`) and those types + // *are* exposed in the `RayonFuture` type. See + // README.md for details. + unsafe { + return RayonFuture { inner: hide_lifetime(inner) }; + } + + unsafe fn hide_lifetime<'l, T, E>(x: Arc + 'l>) + -> Arc> { + mem::transmute(x) + } + } +} + /// Represents the result of a future that has been spawned in the /// Rayon threadpool. 
/// @@ -39,67 +60,27 @@ const STATE_COMPLETE: usize = 4; /// Any panics that occur while computing the spawned future will be /// propagated when this future is polled. pub struct RayonFuture { - // Warning: Public end-user API! inner: Arc, Box>>, } -/// Unsafe because implementor must guarantee: -/// -/// 1. That the type `Self` remains dynamically valid until one of the -/// completion methods is called. -/// 2. That the lifetime `'scope` cannot end until one of those -/// methods is called. -/// -/// NB. Although this is public, it is not exposed to outside users. -pub unsafe trait FutureScope<'scope> { - fn registry(&self) -> Arc; - fn future_panicked(self, err: Box); - fn future_completed(self); -} - -/// Create a `RayonFuture` that will execute `F` and yield its result, -/// propagating any panics. -/// -/// NB. Although this is public, it is not exposed to outside users. -pub fn new_rayon_future<'scope, F, S>(future: F, scope: S) -> RayonFuture - where F: Future + Send + 'scope, S: FutureScope<'scope>, -{ - let inner = ScopeFuture::spawn(future, scope); - - // We assert that it is safe to hide the type `F` (and, in - // particular, the lifetimes in it). This is true because the API - // offered by a `RayonFuture` only permits access to the result of - // the future (of type `F::Item` or `F::Error`) and those types - // *are* exposed in the `RayonFuture` type. See - // README.md for details. - unsafe { - return RayonFuture { inner: hide_lifetime(inner) }; - } - - unsafe fn hide_lifetime<'l, T, E>(x: Arc + 'l>) - -> Arc> { - mem::transmute(x) - } -} - impl RayonFuture { pub fn rayon_wait(mut self) -> Result { - // NB: End-user API! - let worker_thread = WorkerThread::current(); - if worker_thread.is_null() { - self.wait() - } else { - // Assert that uses of `worker_thread` pointer below are - // valid (because we are on the worker-thread). + worker::if_in_worker_thread(|worker_thread| { + // In Rayon worker thread: spin. 
Unsafe because we must be + // sure that `self.inner.probe()` will trigger some Rayon + // event once it becomes true -- and it will, as when the + // future moves to the complete state, we will invoke + // either `ScopeHandle::panicked()` or `ScopeHandle::ok()` + // on our scope handle. unsafe { - (*worker_thread).wait_until(&*self.inner); - debug_assert!(self.inner.probe()); - self.poll().map(|a_v| match a_v { - Async::Ready(v) => v, - Async::NotReady => panic!("probe() returned true but poll not ready") - }) + worker_thread.wait_until_true(|| self.inner.probe()); } - } + self.poll().map(|a_v| match a_v { + Async::Ready(v) => v, + Async::NotReady => panic!("probe() returned true but poll not ready") + }) + }) + .unwrap_or_else(|| self.wait()) } } @@ -108,11 +89,9 @@ impl Future for RayonFuture { type Error = E; fn wait(self) -> Result { - if WorkerThread::current().is_null() { - executor::spawn(self).wait_future() - } else { - panic!("using `wait()` in a Rayon thread is unwise; try `rayon_wait()`") - } + worker::if_in_worker_thread( + |_| panic!("using `wait()` in a Rayon thread is unwise; try `rayon_wait()`")); + executor::spawn(self).wait_future() } fn poll(&mut self) -> Poll { @@ -120,7 +99,7 @@ impl Future for RayonFuture { Ok(Async::Ready(Ok(v))) => Ok(Async::Ready(v)), Ok(Async::Ready(Err(e))) => Err(e), Ok(Async::NotReady) => Ok(Async::NotReady), - Err(e) => unwind::resume_unwinding(e), + Err(e) => panic::resume_unwind(e), } } } @@ -134,10 +113,9 @@ impl Drop for RayonFuture { /// //////////////////////////////////////////////////////////////////////// struct ScopeFuture<'scope, F, S> - where F: Future + Send + 'scope, S: FutureScope<'scope>, + where F: Future + Send + 'scope, S: ScopeHandle<'scope>, { state: AtomicUsize, - registry: Arc, contents: Mutex>, } @@ -146,7 +124,7 @@ type CUItem = as Future>::Item; type CUError = as Future>::Error; struct ScopeFutureContents<'scope, F, S> - where F: Future + Send + 'scope, S: FutureScope<'scope>, + where F: 
Future + Send + 'scope, S: ScopeHandle<'scope>, { spawn: Option>>, unpark: Option>, @@ -168,14 +146,14 @@ struct ScopeFutureContents<'scope, F, S> // Assert that the `*const` is safe to transmit between threads: unsafe impl<'scope, F, S> Send for ScopeFuture<'scope, F, S> - where F: Future + Send + 'scope, S: FutureScope<'scope>, + where F: Future + Send + 'scope, S: ScopeHandle<'scope>, {} unsafe impl<'scope, F, S> Sync for ScopeFuture<'scope, F, S> - where F: Future + Send + 'scope, S: FutureScope<'scope>, + where F: Future + Send + 'scope, S: ScopeHandle<'scope>, {} impl<'scope, F, S> ScopeFuture<'scope, F, S> - where F: Future + Send + 'scope, S: FutureScope<'scope>, + where F: Future + Send + 'scope, S: ScopeHandle<'scope>, { fn spawn(future: F, scope: S) -> Arc { // Using `AssertUnwindSafe` is valid here because (a) the data @@ -185,7 +163,6 @@ impl<'scope, F, S> ScopeFuture<'scope, F, S> let future: Arc = Arc::new(ScopeFuture:: { state: AtomicUsize::new(STATE_PARKED), - registry: scope.registry(), contents: Mutex::new(ScopeFutureContents { spawn: None, unpark: None, @@ -212,14 +189,6 @@ impl<'scope, F, S> ScopeFuture<'scope, F, S> future } - /// Creates a `JobRef` from this job -- note that this hides all - /// lifetimes, so it is up to you to ensure that this JobRef - /// doesn't outlive any data that it closes over. - unsafe fn into_job_ref(this: Arc) -> JobRef { - let this: *const Self = mem::transmute(this); - JobRef::new(this) - } - fn make_unpark(this: &Arc) -> Arc { // Hide any lifetimes in `self`. This is safe because, until // `self` is dropped, the counter is not decremented, and so @@ -257,15 +226,17 @@ impl<'scope, F, S> ScopeFuture<'scope, F, S> // previous execution might have moved us to the // PARKED state but not yet released the lock. let contents = self.contents.lock().unwrap(); + let task_ref = contents.this.clone() + .expect("this ref already dropped"); - // Assert that `job_ref` remains valid until - // it is executed. 
That's true because - // `job_ref` holds a ref on the `Arc` and - // because, until `job_ref` completes, the - // references in the future are valid. + // We assert that `contents.scope` will not + // be dropped until the task is executed. This + // is true because we only drop + // `contents.scope` from within `RayonTask::execute()`. unsafe { - let job_ref = Self::into_job_ref(contents.this.clone().unwrap()); - self.registry.inject_or_push(job_ref); + contents.scope.as_ref() + .expect("scope already dropped") + .spawn_task(task_ref); } return; } @@ -339,26 +310,22 @@ impl<'scope, F, S> ScopeFuture<'scope, F, S> } impl<'scope, F, S> Unpark for ScopeFuture<'scope, F, S> - where F: Future + Send + 'scope, S: FutureScope<'scope>, + where F: Future + Send + 'scope, S: ScopeHandle<'scope>, { fn unpark(&self) { self.unpark_inherent(); } } -impl<'scope, F, S> Job for ScopeFuture<'scope, F, S> - where F: Future + Send + 'scope, S: FutureScope<'scope>, +impl<'scope, F, S> RayonTask for ScopeFuture<'scope, F, S> + where F: Future + Send + 'scope, S: ScopeHandle<'scope>, { - unsafe fn execute(this: *const Self) { - let this: Arc = mem::transmute(this); - + fn execute(this: Arc) { // *generally speaking* there should be no contention for the // lock, but it is possible -- we can end execution, get re-enqeueud, // and re-executed, before we have time to return from this fn let mut contents = this.contents.lock().unwrap(); - log!(FutureExecute { state: this.state.load(Relaxed) }); - this.begin_execute_state(); loop { if contents.canceled { @@ -366,17 +333,14 @@ impl<'scope, F, S> Job for ScopeFuture<'scope, F, S> } else { match contents.poll() { Ok(Async::Ready(v)) => { - log!(FutureExecuteReady); return contents.complete(Ok(Async::Ready(v))); } Ok(Async::NotReady) => { - log!(FutureExecuteNotReady); if this.end_execute_state() { return; } } Err(err) => { - log!(FutureExecuteErr); return contents.complete(Err(err)); } } @@ -386,7 +350,7 @@ impl<'scope, F, S> Job for
ScopeFuture<'scope, F, S> } impl<'scope, F, S> ScopeFutureContents<'scope, F, S> - where F: Future + Send + 'scope, S: FutureScope<'scope>, + where F: Future + Send + 'scope, S: ScopeHandle<'scope>, { fn poll(&mut self) -> Poll, CUError> { let unpark = self.unpark.clone().unwrap(); @@ -394,8 +358,6 @@ impl<'scope, F, S> ScopeFutureContents<'scope, F, S> } fn complete(&mut self, value: Poll, CUError>) { - log!(FutureComplete); - // So, this is subtle. We know that the type `F` may have some // data which is only valid until the end of the scope, and we // also know that the scope doesn't end until `self.counter` @@ -420,8 +382,7 @@ impl<'scope, F, S> ScopeFutureContents<'scope, F, S> // somewhere useful if we can. let mut err = None; if let Some(waiting_task) = self.waiting_task.take() { - log!(FutureUnparkWaitingTask); - match unwind::halt_unwinding(|| waiting_task.unpark()) { + match panic::catch_unwind(AssertUnwindSafe(|| waiting_task.unpark())) { Ok(()) => { } Err(e) => { err = Some(e); } } @@ -432,30 +393,33 @@ impl<'scope, F, S> ScopeFutureContents<'scope, F, S> // to `new_rayon_future()` ensures it for us. let scope = self.scope.take().unwrap(); if let Some(err) = err { - scope.future_panicked(err); + scope.panicked(err); } else { - scope.future_completed(); + scope.ok(); } } } -impl<'scope, F, S> LatchProbe for ScopeFuture<'scope, F, S> - where F: Future + Send, S: FutureScope<'scope>, -{ - fn probe(&self) -> bool { - self.state.load(Acquire) == STATE_COMPLETE - } -} +trait ScopeFutureTrait: Send + Sync { + /// Returns true when future is in the COMPLETE state. + fn probe(&self) -> bool; -/// NB. Although this is public, it is not exposed to outside users. -pub trait ScopeFutureTrait: Send + Sync + LatchProbe { + /// Execute the `poll` operation of a future: read the result if + /// it is ready, return `Async::NotReady` otherwise. fn poll(&self) -> Poll; + + /// Indicate that we no longer care about the result of the future. 
+ /// Corresponds to `Drop` in the future trait. fn cancel(&self); } impl<'scope, F, S> ScopeFutureTrait, CUError> for ScopeFuture<'scope, F, S> - where F: Future + Send, S: FutureScope<'scope>, + where F: Future + Send, S: ScopeHandle<'scope>, { + fn probe(&self) -> bool { + self.state.load(Acquire) == STATE_COMPLETE + } + fn poll(&self) -> Poll, CUError> { // Important: due to transmute hackery, not all the fields are // truly known to be valid at this point. In particular, the @@ -467,7 +431,6 @@ impl<'scope, F, S> ScopeFutureTrait, CUError> for ScopeFuture<'scop let r = mem::replace(&mut contents.result, Ok(Async::NotReady)); return r; } else { - log!(FutureInstallWaitingTask { state: state }); contents.waiting_task = Some(task::park()); Ok(Async::NotReady) } diff --git a/rayon-core/src/future/test.rs b/src/future/test.rs similarity index 83% rename from rayon-core/src/future/test.rs rename to src/future/test.rs index 29d578ac4..dc9383fac 100644 --- a/rayon-core/src/future/test.rs +++ b/src/future/test.rs @@ -2,9 +2,10 @@ use futures::{self, Async, Future}; use futures::future::lazy; use futures::sync::oneshot; use futures::task::{self, Unpark}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicUsize, Ordering}; use ::{scope, ThreadPool, Configuration}; +use super::ScopeFutureExt; /// Basic test of using futures to data on the stack frame. 
#[test] @@ -218,3 +219,54 @@ fn double_unpark() { } } } + +#[test] +fn async_future_map() { + let data = Arc::new(Mutex::new(format!("Hello, "))); + + let pool = ThreadPool::global(); + let a = pool.spawn_future(lazy({ + let data = data.clone(); + move || Ok::<_, ()>(data) + })); + let future = pool.spawn_future(a.map(|data| { + let mut v = data.lock().unwrap(); + v.push_str("world!"); + })); + let () = future.wait().unwrap(); + + // future must have executed for the scope to have ended, even + // though we never invoked `wait` to observe its result + assert_eq!(&data.lock().unwrap()[..], "Hello, world!"); +} + +#[test] +#[should_panic(expected = "Hello, world!")] +fn async_future_panic_prop() { + let pool = ThreadPool::global(); + let future = pool.spawn_future(lazy(move || Ok::<(), ()>(argh()))); + let _ = future.rayon_wait(); // should panic, not return a value + + fn argh() -> () { + if true { + panic!("Hello, world!"); + } + } +} + +#[test] +fn async_future_scope_interact() { + let pool = ThreadPool::global(); + let future = pool.spawn_future(lazy(move || Ok::(22))); + + let mut vec = vec![]; + scope(|s| { + let future = s.spawn_future(future.map(|x| x * 2)); + s.spawn(|_| { + vec.push(future.rayon_wait().unwrap()); + }); // just because + }); + + assert_eq!(vec![44], vec); +} + diff --git a/src/lib.rs b/src/lib.rs index 20498466c..44cb9e1a7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,6 +10,9 @@ extern crate rayon_core; extern crate either; +#[cfg(rayon_unstable)] +extern crate futures; + #[cfg(test)] extern crate rand; @@ -21,6 +24,9 @@ mod private; mod split_producer; +#[cfg(rayon_unstable)] +pub mod future; + pub mod collections; pub mod iter; pub mod option; @@ -43,7 +49,3 @@ pub use rayon_core::ThreadPool; pub use rayon_core::join; pub use rayon_core::{scope, Scope}; pub use rayon_core::spawn; -#[cfg(rayon_unstable)] -pub use rayon_core::spawn_future; -#[cfg(rayon_unstable)] -pub use rayon_core::RayonFuture; diff --git a/src/prelude.rs 
b/src/prelude.rs index a13d6a378..3683f5d43 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -12,3 +12,6 @@ pub use iter::ParallelIterator; pub use slice::ParallelSlice; pub use slice::ParallelSliceMut; pub use str::ParallelString; + +#[cfg(rayon_unstable)] +pub use future::ScopeFutureExt; diff --git a/tests/compile-fail-unstable/future_escape.rs b/tests/compile-fail-unstable/future_escape.rs index 9d187c626..e33208f09 100644 --- a/tests/compile-fail-unstable/future_escape.rs +++ b/tests/compile-fail-unstable/future_escape.rs @@ -2,13 +2,13 @@ extern crate futures; extern crate rayon; use futures::future::lazy; -use rayon::scope; +use rayon::prelude::*; fn a() { let data = &mut [format!("Hello, ")]; let mut future = None; - scope(|s| { + rayon::scope(|s| { let data = &mut *data; future = Some(s.spawn_future(lazy(move || Ok::<_, ()>(&mut data[0])))); }); @@ -21,7 +21,7 @@ fn b() { let data = &mut [format!("Hello, ")]; let mut future = None; - scope(|s| { + rayon::scope(|s| { future = Some(s.spawn_future(lazy(move || Ok::<_, ()>(&mut data[0])))); }); @@ -32,7 +32,7 @@ fn b() { fn c() { let mut future = None; let data = &mut [format!("Hello, ")]; - scope(|s| { + rayon::scope(|s| { future = Some(s.spawn_future(lazy(move || Ok::<_, ()>(&mut data[0])))); }); } //~ ERROR borrowed value does not live long enough From 71aeed3922fcd019ca696d108d6b0f86e219af69 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Sat, 9 Sep 2017 15:15:12 -0700 Subject: [PATCH 2/9] create a separate rayon-futures crate --- .travis.yml | 5 + Cargo.toml | 3 +- rayon-core/Cargo.toml | 2 +- rayon-futures/Cargo.toml | 15 ++ rayon-futures/LICENSE-APACHE | 201 ++++++++++++++++++ rayon-futures/LICENSE-MIT | 25 +++ src/future/mod.rs => rayon-futures/src/lib.rs | 3 + {src/future => rayon-futures/src}/test.rs | 2 +- src/lib.rs | 6 - src/prelude.rs | 3 - 10 files changed, 252 insertions(+), 13 deletions(-) create mode 100644 rayon-futures/Cargo.toml create mode 100644 rayon-futures/LICENSE-APACHE create mode 
100644 rayon-futures/LICENSE-MIT rename src/future/mod.rs => rayon-futures/src/lib.rs (99%) rename {src/future => rayon-futures/src}/test.rs (99%) diff --git a/.travis.yml b/.travis.yml index 8e5bd7a8c..3cf8d8b24 100644 --- a/.travis.yml +++ b/.travis.yml @@ -36,3 +36,8 @@ script: cargo test -p rayon-demo && ./ci/highlander.sh fi + - | + if [ -n "$RUSTFLAGS" ]; then + cargo build -p rayon-futures && + cargo test -p rayon-futures + fi diff --git a/Cargo.toml b/Cargo.toml index fe7c3f385..128193c5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,12 +9,11 @@ repository = "https://github.com/nikomatsakis/rayon" documentation = "https://docs.rs/rayon/" [workspace] -members = ["rayon-demo", "rayon-core"] +members = ["rayon-demo", "rayon-core", "rayon-futures"] exclude = ["ci"] [dependencies] rayon-core = { version = "1.2", path = "rayon-core" } -futures = "0.1.7" # This is a public dependency! [dependencies.either] diff --git a/rayon-core/Cargo.toml b/rayon-core/Cargo.toml index 9a45e70ec..800a2f317 100644 --- a/rayon-core/Cargo.toml +++ b/rayon-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rayon-core" -version = "1.2.1" +version = "1.2.2" authors = ["Niko Matsakis ", "Josh Stone "] description = "Core APIs for Rayon" diff --git a/rayon-futures/Cargo.toml b/rayon-futures/Cargo.toml new file mode 100644 index 000000000..9eb749aab --- /dev/null +++ b/rayon-futures/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "rayon-futures" +version = "0.1.0" +authors = ["Niko Matsakis ", + "Josh Stone "] +description = "Futures integration into Rayon" +license = "Apache-2.0/MIT" +repository = "https://github.com/nikomatsakis/rayon" +documentation = "https://docs.rs/rayon-futures/" + +[dependencies] +rayon-core = { version = "1.2.2", path = "../rayon-core" } +futures = "0.1.7" + +[dev-dependencies] diff --git a/rayon-futures/LICENSE-APACHE b/rayon-futures/LICENSE-APACHE new file mode 100644 index 000000000..16fe87b06 --- /dev/null +++ b/rayon-futures/LICENSE-APACHE @@ -0,0 +1,201 @@ + 
Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. 
For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the 
following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright [yyyy] [name of copyright owner] + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/rayon-futures/LICENSE-MIT b/rayon-futures/LICENSE-MIT new file mode 100644 index 000000000..25597d583 --- /dev/null +++ b/rayon-futures/LICENSE-MIT @@ -0,0 +1,25 @@ +Copyright (c) 2010 The Rust Project Developers + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. 
+ +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/src/future/mod.rs b/rayon-futures/src/lib.rs similarity index 99% rename from src/future/mod.rs rename to rayon-futures/src/lib.rs index fdbb72a92..f6f98f92a 100644 --- a/src/future/mod.rs +++ b/rayon-futures/src/lib.rs @@ -2,6 +2,9 @@ //! //! See `README.md` for details. +extern crate futures; +extern crate rayon_core; + use futures::{Async, Future, Poll}; use futures::executor; use futures::future::CatchUnwind; diff --git a/src/future/test.rs b/rayon-futures/src/test.rs similarity index 99% rename from src/future/test.rs rename to rayon-futures/src/test.rs index dc9383fac..9054c53c5 100644 --- a/src/future/test.rs +++ b/rayon-futures/src/test.rs @@ -4,7 +4,7 @@ use futures::sync::oneshot; use futures::task::{self, Unpark}; use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicUsize, Ordering}; -use ::{scope, ThreadPool, Configuration}; +use rayon_core::{scope, ThreadPool, Configuration}; use super::ScopeFutureExt; /// Basic test of using futures to data on the stack frame. 
diff --git a/src/lib.rs b/src/lib.rs index 44cb9e1a7..a21baafc3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,9 +10,6 @@ extern crate rayon_core; extern crate either; -#[cfg(rayon_unstable)] -extern crate futures; - #[cfg(test)] extern crate rand; @@ -24,9 +21,6 @@ mod private; mod split_producer; -#[cfg(rayon_unstable)] -pub mod future; - pub mod collections; pub mod iter; pub mod option; diff --git a/src/prelude.rs b/src/prelude.rs index 3683f5d43..a13d6a378 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -12,6 +12,3 @@ pub use iter::ParallelIterator; pub use slice::ParallelSlice; pub use slice::ParallelSliceMut; pub use str::ParallelString; - -#[cfg(rayon_unstable)] -pub use future::ScopeFutureExt; From 0d043823b058bf93aafc5ba4633fc0583c79756a Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Sat, 9 Sep 2017 15:19:24 -0700 Subject: [PATCH 3/9] restore the futures README (probably needs updating) --- rayon-futures/README.md | 277 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 277 insertions(+) create mode 100644 rayon-futures/README.md diff --git a/rayon-futures/README.md b/rayon-futures/README.md new file mode 100644 index 000000000..65428ebd8 --- /dev/null +++ b/rayon-futures/README.md @@ -0,0 +1,277 @@ +# Future integration into Rayon + +**NOTE:** `rayon-futures` currently requires unstable features of +`rayon-core`, which may only be enabled with `rustc --cfg`, +e.g. by setting `RUSTFLAGS='--cfg rayon_unstable'` in the environment. + +## How futures work + +Let's start with a brief coverage of how futures work. Our example will +be a simple chain of futures: + + F_map -> F_socket + +Here `F_socket` is a future that maps to a TCP socket. It returns a +`Vec` of data read from that socket. `F_map` is a future that will take +that data and do some transformation. (Note that the real futures for +reading from sockets etc do not work in this way, this is just an +example.) + +The idea of futures is that each future offers a `poll()` method.
When +`poll()` is invoked, the future will attempt to execute. Typically, +this often involves recursively calling `poll()` on other futures. So, +in our example, `F_map` when it starts would call `F_socket.poll()` to +see if the data is ready. The idea is that `poll()` returns one of +three values: + +- `Ok(Async::Ready(R))` -- the future has completed, here is the result `R`. +- `Err(E)` -- the future has completed and resulted in an error `E`. +- `Ok(Async::NotReady)` -- the future is not yet complete. + +The last one is the most interesting. It means that the future is +blocked on *some event X*, typically an I/O event (i.e., we are +waiting for more data to arrive on a TCP socket). + +When a future returns `NotReady`, it also has one additional job. It +must register the "current task" (think for now of the current thread) +to be re-awoken when the event X has occurred. For most futures, this +job is delegated to another future: e.g., in our example, `F_map` +invokes `F_socket.poll()`. So if `F_socket.poll()` returns not-ready, +then it will have registered the current thread already, and hence +`F_map` can merely propagate the `NotReady` result further up. + +### The current task and executor + +A key concept of the futures.rs library is that of an *executor*. The +executor is the runtime that first invokes the top-level future +(`F_map`, in our example). This is precisely the role that Rayon +plays. Note that in any futures system there may be many +interoperating executors though. + +Part of an executor's job is to maintain some thread-local storage +(TLS) when a future is executing. In particular, it must set up the +"current task" (basically a unique integer, although it's an opaque +type) as well as an "unpark object" of type +`Arc<Unpark>`. [The `Unpark` trait][unpark] offers a single method +(`unpark()`) which can be invoked when the task should be +re-awoken. So `F_socket` might, for example, get the current +`Arc<Unpark>` object and store it for use by an I/O thread.
The I/O +thread might invoke `epoll()` or `select()` or whatever and, when it +detects the socket has more data, invoke the `unpark()` method. + +[unpark]: https://docs.rs/futures/0.1/futures/executor/trait.Unpark.html + +## Rayon's futures integration + +When you spawn a future of type `F` into rayon, the idea is that it is +going to start independently executing in the thread-pool. Meanwhile, +the `spawn_future()` method returns to you your own future (let's call +it `F'`) that you can use to poll and monitor its progress. Internally +within Rayon, however, we only allocate a single `Arc` to represent +both of these things -- an `Arc<ScopeFuture<F>>`, to be precise -- and +this `Arc` hence serves two distinct roles. + +The operations on `F'` (the handle returned to the user) are specified +by the trait `ScopeFutureTrait` and are very simple. The user can +either `poll()` the future, which is checking to see if rayon is done +executing it yet, or `cancel()` the future. `cancel()` occurs when +`F'` is dropped, which indicates that the user no longer has interest +in the result. + +### Future reference counting + +Each spawned future is represented by an `Arc`. This `Arc` actually has +some interesting structure. Each of the edges in the diagram below +represents something that is "kept alive" by holding a ref count (in +some way, usually via an `Arc`): + + F' ---+ [ deque ] --+ + | | + v v + +---> /---------------------\ + | | registry: | ------> [rayon registry] + | | contents: --------\ | + | | | scope | | ------> [spawning scope] + | | | unpark | | --+ + | | | this | | --+ (self references) + | | | ... | | | + | | \-----------------/ | | + | \---------------------/ | + +-------------------------------+ + +Let's walk through them: + +- The incoming edge from `F'` represents the edge from the future that was returned + to the caller of `spawn_future`. This ensures that the future arc will + not be freed so long as the caller is still interested in looking at + its result.
+- The incoming edge from `[ deque ]` represents the fact that when the + future is enqueued into a thread-local deque (which it only + sometimes is), that deque holds a ref. This is done by transmuting + the `Arc` into a `*const Job` object (and hence the `*const` + logically holds the ref that was owned by the `Arc`). When the job + is executed, it is transmuted back and the resulting `Arc` is + eventually dropped, releasing the ref. +- The `registry` field holds onto an `Arc` and hence keeps + some central registry alive. This doesn't really do much but prevent + the `Registry` from being dropped. In particular, this doesn't + prevent the threads in a registry from terminating while the future + is unscheduled etc (though other fields in the future do). +- The `scope` field (of type `S`) is the "enclosing scope". This scope + is an abstract value that implements the `FutureScope<'scope>` trait + -- this means that it is responsible for ensuring that `'scope` does + not end until one of the `FutureScope` methods are invoked (which + occurs when the future has finished executing). For example, if the + future is spawned inside a `scope()` call, then the `S` will be a + wrapper (`ScopeFutureScope`) around a `*const Scope<'scope>`. When + the future is created one job is allocated for this future in the + scope, and the scope counter is decremented once the future is + marked as completing. + - In general, the job of the `scope` field is to ensure that the + future type (`F`) remains valid. After all, since `F: 'scope`, `F` + is known to be valid until the lifetime `'scope` ends, and that + lifetime cannot end until the `scope` methods are invoked, so we + know that `F` must stay valid until one of those methods are + invoked. + - All of our data of type `F` is stored in the field `spawn` (not + shown here). This field is always set to `None` before the scope + counter is decremented. See the section on lifetime safety for more + details. 
+- The `unpark` and `this` fields both store an `Arc` which is actually + this same future. Thus the future has a ref count cycle (two of + them...) and cannot be freed until this cycle is broken. Both of + these fields are actually `Option<Arc<..>>` fields and will be set + to `None` once the future is complete, breaking the cycle and + allowing it to be freed when other references are dropped. + +### The future state machine + +Internally, futures go through various states, depicted here: + + PARKED <----+ + | | + v | + UNPARKED | + | | + v | + EXECUTING --+ + | | ^ + | v | + | EXECUTING_UNPARKED + | + v + COMPLETE + +When they are first created, futures begin as *PARKED*. A *PARKED* +future is one that is waiting for something to happen. It is not +scheduled in the deque of any thread. Even before we return from +`spawn_future()`, however, we will transition into *UNPARKED*. An +*UNPARKED* future is one that is waiting to be executed. It is +enqueued in the deque of some Rayon thread and hence will execute when +the thread gets around to it. + +Once the future begins to execute (it itself is a Rayon job), it +transitions into the *EXECUTING* state. This means that it is busy +calling `F.poll()`, basically. While it calls `poll()`, it also sets +up its `contents.unpark` field as the current "unpark" instance. Hence +if `F` returns `NotReady`, it will clone this `unpark` field and hold +onto it to signal us the future is ready to execute again. + +For now let's assume that `F` is complete and hence returns either +`Ok(Ready(_))` or `Err(_)`. In that case, the future can transition to +`COMPLETE`. At this point, many bits of state that are no longer +needed (e.g., the future itself, but also the `this` and `unpark` +fields) are set to `None` and dropped, and the result is stored in the +`result` field. (Moreover, we may have to signal other tasks, but that +is discussed in a future section.)
+ +If `F` returns `Ok(Async::NotReady)`, then we would typically +transition to the `PARKED` state and await the call to +`unpark()`. When `unpark()` is called, it would move the future into +the `UNPARKED` state and inject it into the registry. + +However, due to the vagaries of thread-scheduling, it *can* happen +that `unpark()` is called before we exit the `EXECUTING` state. For +example, we might invoke `F.poll()`, which sends the `Unpark` instance +to the I/O thread, which detects I/O, and invokes `unpark()`, all +before `F.poll()` has returned. In that case, the `unpark()` method +will transition the state (atomically, of course) to +`EXECUTING_UNPARKED`. In that case, instead of transitioning to +`PARKED` when `F.poll()` returns, the future will simply transition +right back to `EXECUTING` and try calling `poll()` again. This can +repeat a few times. + +### Lifetime safety + +Of course, Rayon's signature feature is that it allows you to use a +future `F` that includes references, so long as those references +outlive the lifetime of the scope `'scope`. So why is this safe? + +The basic idea of why this is safe is as follows. The `ScopeFuture` +struct holds a ref on the scope itself (via the field `scope`). +Until this ref is decremented, the scope will not end (and hence +`'scope` is still active). This ref is only decremented while the +future transitions into the *COMPLETE* state -- so anytime before +then, we know we don't have to worry, the references are still valid. + +As we transition into the *COMPLETE* state is where things get more +interesting. You'll notice that signaling the `self.scope` job as done +is the *last* thing that happens during that transition. Importantly, +before that is done, we drop all access that we have to the type `F`: +that is, we store `None` into the fields that might reference values +of type `F`.
This implies that we know that, whatever happens after we +transition into *COMPLETE*, we can't access any of the references +found in `F` anymore. + +This is good, because there *are* still active refs to the +`ScopeFuture` after we enter the *COMPLETE* state. There are two +sources of these: unpark values and the future result. + +**Unpark values.** We may have given away `Arc<Unpark>` values -- +these are trait objects, but they are actually refs to our +`ScopeFuture`. Note that `Arc<Unpark>: 'static`, so these could be +floating about for any length of time (we had to transmute away the +lifetimes to give them out). This is ok because (a) the `Arc` keeps +the `ScopeFuture` alive and (b) the only thing you can do is to call +`unpark()`, which will promptly return since the state is *COMPLETE* +(and, as we saw above, it doesn't have access to any +references anyhow). + +**Future result.** The other, more interesting reference to the +`ScopeFuture` is the value that we gave back to the user when we +spawned the future in the first place. This value is more interesting +because it can be used to do non-trivial things, unlike the +`Arc<Unpark>`. If you look carefully at this handle, you will see that +its type has been designed to hide the type `F`. In fact, it only +reveals the types `T` and `E` which are the ok/err result types of the +future `F`. This is intentional: suppose that the type `F` includes +some references, but those references don't appear in the result. We +want the "result" future to be able to escape the scope, then, to any +place where the types `T` and `E` are still in scope. If we exposed +`F` here that would not be possible. (Hiding `F` also requires a +transmute to an object type, in this case an internal trait called +`ScopeFutureTrait`.) Note though that it is possible for `T` and `E` +to have references in them. They could even be references tied to the +scope. + +So what can a user do with this result future?
They have two +operations available: poll and cancel. Let's look at cancel first, +since it's simpler. If the state is *COMPLETE*, then `cancel()` is an +immediate no-op, so we know that it can't be used to access any +references that may be invalid. In any case, the only thing it does is +to set a field to true and invoke `unpark()`, and we already examined +the possible effects of `unpark()` in the previous section. + +So what about `poll()`? This is how the user gets the final result out +of the future. The important thing that it does is to access (and +effectively nullify) the field `result`, which stores the result of +the future and hence may have access to `T` and `E` values. These +values may contain references...so how do we know that they are still in +scope? The answer is that those types are exposed in the user's type +of the future, and hence the basic Rust type system should guarantee +that any references are still valid, or else the user shouldn't be +able to call `poll()`. (The same is true at the time of cancellation, +but that's not important, since `cancel()` doesn't do anything of +interest.)
+ + From edd07673aff5978820273b976d3281be97e79f3c Mon Sep 17 00:00:00 2001 From: Stefan Schindler Date: Wed, 6 Sep 2017 10:22:25 +0200 Subject: [PATCH 4/9] Focus on the expected error by ignoring the unused warning --- tests/compile-fail/quicksort_race1.rs | 2 +- tests/compile-fail/quicksort_race2.rs | 2 +- tests/compile-fail/quicksort_race3.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/compile-fail/quicksort_race1.rs b/tests/compile-fail/quicksort_race1.rs index d00861ab9..aafa2a7aa 100644 --- a/tests/compile-fail/quicksort_race1.rs +++ b/tests/compile-fail/quicksort_race1.rs @@ -6,7 +6,7 @@ fn quick_sort(v: &mut [T]) { } let mid = partition(v); - let (lo, hi) = v.split_at_mut(mid); + let (lo, _hi) = v.split_at_mut(mid); rayon::join(|| quick_sort(lo), || quick_sort(lo)); //~ ERROR E0524 } diff --git a/tests/compile-fail/quicksort_race2.rs b/tests/compile-fail/quicksort_race2.rs index 145404b1c..a94426a22 100644 --- a/tests/compile-fail/quicksort_race2.rs +++ b/tests/compile-fail/quicksort_race2.rs @@ -6,7 +6,7 @@ fn quick_sort(v: &mut [T]) { } let mid = partition(v); - let (lo, hi) = v.split_at_mut(mid); + let (lo, _hi) = v.split_at_mut(mid); rayon::join(|| quick_sort(lo), || quick_sort(v)); //~ ERROR E0500 } diff --git a/tests/compile-fail/quicksort_race3.rs b/tests/compile-fail/quicksort_race3.rs index 8f3c24430..2bee4dedd 100644 --- a/tests/compile-fail/quicksort_race3.rs +++ b/tests/compile-fail/quicksort_race3.rs @@ -6,7 +6,7 @@ fn quick_sort(v: &mut [T]) { } let mid = partition(v); - let (lo, hi) = v.split_at_mut(mid); + let (_lo, hi) = v.split_at_mut(mid); rayon::join(|| quick_sort(hi), || quick_sort(hi)); //~ ERROR E0524 } From db6e26533a5d11ce386e82c10cdad787d1a9f98b Mon Sep 17 00:00:00 2001 From: Stefan Schindler Date: Wed, 6 Sep 2017 10:45:47 +0200 Subject: [PATCH 5/9] Fix compile tests --- tests/compile-fail/no_send_par_iter.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git 
a/tests/compile-fail/no_send_par_iter.rs b/tests/compile-fail/no_send_par_iter.rs index 04be9d207..f0593556e 100644 --- a/tests/compile-fail/no_send_par_iter.rs +++ b/tests/compile-fail/no_send_par_iter.rs @@ -14,14 +14,14 @@ fn main() { let x = Some(NoSend(null())); x.par_iter() - .map(|&x| x) //~ ERROR Send` is not satisfied - .count(); //~ ERROR Send` is not satisfied + .map(|&x| x) //~ ERROR E0277 + .count(); //~ ERROR E0599 x.par_iter() - .filter_map(|&x| Some(x)) //~ ERROR Send` is not satisfied - .count(); //~ ERROR Send` is not satisfied + .filter_map(|&x| Some(x)) //~ ERROR E0277 + .count(); //~ ERROR E0599 x.par_iter() - .cloned() //~ ERROR Send` is not satisfied - .count(); //~ ERROR Send` is not satisfied + .cloned() //~ ERROR E0277 + .count(); //~ ERROR E0599 } From 126af23b0dbd0c3cd2c416948d77000887e6774b Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Fri, 15 Sep 2017 11:24:02 -0700 Subject: [PATCH 6/9] test rayon-futures on appveyor too --- appveyor.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/appveyor.yml b/appveyor.yml index 640348608..406f5cea9 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -59,3 +59,7 @@ test_script: cargo test -p rayon-core && cargo test -p rayon-demo ) + - if not [%RUSTFLAGS%]==[%^RUSTFLAGS%] ( + cargo build -p rayon-futures && + cargo test -p rayon-futures + ) From cd48e4325a4997c5ef16c31909a1bd8299e78961 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Fri, 15 Sep 2017 11:36:50 -0700 Subject: [PATCH 7/9] move rayon-futures compile-fail --- rayon-futures/Cargo.toml | 1 + rayon-futures/src/test.rs | 17 +++++++++++++++++ rayon-futures/tests/compile-fail/README.md | 11 +++++++++++ .../tests/compile-fail}/future_escape.rs | 3 ++- 4 files changed, 31 insertions(+), 1 deletion(-) create mode 100644 rayon-futures/tests/compile-fail/README.md rename {tests/compile-fail-unstable => rayon-futures/tests/compile-fail}/future_escape.rs (93%) diff --git a/rayon-futures/Cargo.toml b/rayon-futures/Cargo.toml index 
9eb749aab..339c6ec77 100644 --- a/rayon-futures/Cargo.toml +++ b/rayon-futures/Cargo.toml @@ -13,3 +13,4 @@ rayon-core = { version = "1.2.2", path = "../rayon-core" } futures = "0.1.7" [dev-dependencies] +compiletest_rs = "0.2.1" diff --git a/rayon-futures/src/test.rs b/rayon-futures/src/test.rs index 9054c53c5..5e30532bc 100644 --- a/rayon-futures/src/test.rs +++ b/rayon-futures/src/test.rs @@ -1,12 +1,29 @@ +extern crate compiletest_rs as compiletest; + use futures::{self, Async, Future}; use futures::future::lazy; use futures::sync::oneshot; use futures::task::{self, Unpark}; +use std::path::PathBuf; use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicUsize, Ordering}; use rayon_core::{scope, ThreadPool, Configuration}; use super::ScopeFutureExt; +fn run_compiletest(mode: &str, path: &str) { + let mut config = compiletest::default_config(); + config.mode = mode.parse().ok().expect("Invalid mode"); + config.src_base = PathBuf::from(path); + config.target_rustcflags = Some("-L ../target/debug/ -L ../target/debug/deps/".to_owned()); + + compiletest::run_tests(&config); +} + +#[test] +fn negative_tests_compile_fail() { + run_compiletest("compile-fail", "tests/compile-fail"); +} + /// Basic test of using futures to data on the stack frame. #[test] fn future_test() { diff --git a/rayon-futures/tests/compile-fail/README.md b/rayon-futures/tests/compile-fail/README.md new file mode 100644 index 000000000..58a148514 --- /dev/null +++ b/rayon-futures/tests/compile-fail/README.md @@ -0,0 +1,11 @@ +This directory contains test files that are **not expected to +compile**. It is useful for writing tests that things which ought not +to type-check do not, in fact, type-check. + +To write a test, just write a `.rs` file using Rayon. Then, for each +compilation error, write a `//~ ERROR E123` annotation on the line +where the error occurs. `E123` should be the error code that is issued +by rustc. 
This should be reasonably robust against future compiler +changes, though in some cases the errors may start showing up on +different lines etc as compiler heuristics change. + diff --git a/tests/compile-fail-unstable/future_escape.rs b/rayon-futures/tests/compile-fail/future_escape.rs similarity index 93% rename from tests/compile-fail-unstable/future_escape.rs rename to rayon-futures/tests/compile-fail/future_escape.rs index e33208f09..e72e0ac6e 100644 --- a/tests/compile-fail-unstable/future_escape.rs +++ b/rayon-futures/tests/compile-fail/future_escape.rs @@ -1,8 +1,9 @@ extern crate futures; extern crate rayon; +extern crate rayon_futures; use futures::future::lazy; -use rayon::prelude::*; +use rayon_futures::ScopeFutureExt; fn a() { let data = &mut [format!("Hello, ")]; From a14568c6767c5be201f04568220f045598fbd479 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Fri, 15 Sep 2017 12:13:35 -0700 Subject: [PATCH 8/9] only test futures on nightly --- .travis.yml | 4 +++- appveyor.yml | 6 ++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 3cf8d8b24..f73809fa3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -39,5 +39,7 @@ script: - | if [ -n "$RUSTFLAGS" ]; then cargo build -p rayon-futures && - cargo test -p rayon-futures + if [ $TRAVIS_RUST_VERSION == nightly ]; then + cargo test -p rayon-futures + fi fi diff --git a/appveyor.yml b/appveyor.yml index 406f5cea9..077e5312f 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -59,7 +59,9 @@ test_script: cargo test -p rayon-core && cargo test -p rayon-demo ) - - if not [%RUSTFLAGS%]==[%^RUSTFLAGS%] ( + - if not "%RUSTFLAGS%"=="%^RUSTFLAGS%" ( cargo build -p rayon-futures && - cargo test -p rayon-futures + if [%CHANNEL%]==[nightly] ( + cargo test -p rayon-futures + ) ) From 3c00397a80c7e343be3d602dce811fac45054bbb Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Sat, 16 Sep 2017 15:05:56 -0700 Subject: [PATCH 9/9] update to glium 0.17 --- rayon-demo/Cargo.toml | 2 +- 
rayon-demo/src/nbody/visualize.rs | 49 ++++++++++++++++++------------- 2 files changed, 30 insertions(+), 21 deletions(-) diff --git a/rayon-demo/Cargo.toml b/rayon-demo/Cargo.toml index 1a2e67151..3a47f2c61 100644 --- a/rayon-demo/Cargo.toml +++ b/rayon-demo/Cargo.toml @@ -8,7 +8,7 @@ publish = false rayon = { path = "../" } cgmath = "0.14" docopt = "0.7" -glium = "0.16" +glium = "0.17" rand = "0.3" rustc-serialize = "0.3" time = "0.1" diff --git a/rayon-demo/src/nbody/visualize.rs b/rayon-demo/src/nbody/visualize.rs index 47f1e2409..fc6ae7757 100644 --- a/rayon-demo/src/nbody/visualize.rs +++ b/rayon-demo/src/nbody/visualize.rs @@ -1,8 +1,9 @@ use cgmath::{self, Angle, Vector3, Matrix4, EuclideanSpace, Point3, Rad}; -use glium::{DisplayBuild, Program, Surface}; +use glium::{Display, Program, Surface}; use glium::{IndexBuffer, VertexBuffer}; use glium::{DrawParameters, Depth, DepthTest}; -use glium::glutin::{ElementState, Event, WindowBuilder}; +use glium::glutin::{EventsLoop, WindowBuilder, ContextBuilder}; +use glium::glutin::{ElementState, Event, WindowEvent}; use glium::glutin::VirtualKeyCode as Key; use glium::index::PrimitiveType; use rand::{self, Rng, SeedableRng, XorShiftRng}; @@ -70,12 +71,13 @@ struct Instance { implement_vertex!(Instance, color, world_position); pub fn visualize_benchmarks(num_bodies: usize, mut mode: ExecutionMode) { - let display = WindowBuilder::new() + let mut events_loop = EventsLoop::new(); + let window = WindowBuilder::new() .with_dimensions(800, 600) - .with_title("nbody demo".to_string()) - .with_depth_buffer(24) - .build_glium() - .unwrap(); + .with_title("nbody demo".to_string()); + let context = ContextBuilder::new() + .with_depth_buffer(24); + let display = Display::new(window, context, &events_loop).unwrap(); let mut benchmark = NBodyBenchmark::new(num_bodies, &mut rand::thread_rng()); @@ -174,21 +176,28 @@ pub fn visualize_benchmarks(num_bodies: usize, mut mode: ExecutionMode) { &params).unwrap(); target.finish().unwrap(); -
for event in display.poll_events() { - match event { - Event::Closed => break 'main, - Event::KeyboardInput(ElementState::Pressed, _, Some(Key::Escape)) => break 'main, - Event::KeyboardInput(ElementState::Pressed, _, Some(Key::P)) => { - mode = ExecutionMode::Par; + let mut done = false; + events_loop.poll_events(|event| { + if let Event::WindowEvent { event, .. } = event { + match event { + WindowEvent::Closed => done = true, + WindowEvent::KeyboardInput { input, .. } => { + if let ElementState::Pressed = input.state { + match input.virtual_keycode { + Some(Key::Escape) => done = true, + Some(Key::P) => mode = ExecutionMode::Par, + Some(Key::R) => mode = ExecutionMode::ParReduce, + Some(Key::S) => mode = ExecutionMode::Seq, + _ => () + } + } + }, + _ => () } - Event::KeyboardInput(ElementState::Pressed, _, Some(Key::S)) => { - mode = ExecutionMode::Seq; - } - Event::KeyboardInput(ElementState::Pressed, _, Some(Key::R)) => { - mode = ExecutionMode::ParReduce; - } - _ => () } + }); + if done { + break 'main; } } }