Extract rayon-futures as a separate crate #436

Merged · 9 commits · Sep 16, 2017
7 changes: 7 additions & 0 deletions .travis.yml
@@ -36,3 +36,10 @@ script:
cargo test -p rayon-demo &&
./ci/highlander.sh
fi
- |
if [ -n "$RUSTFLAGS" ]; then
cargo build -p rayon-futures &&
if [ $TRAVIS_RUST_VERSION == nightly ]; then
cargo test -p rayon-futures
fi
fi
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -9,7 +9,7 @@ repository = "https://github.com/nikomatsakis/rayon"
documentation = "https://docs.rs/rayon/"

[workspace]
members = ["rayon-demo", "rayon-core"]
members = ["rayon-demo", "rayon-core", "rayon-futures"]
exclude = ["ci"]

[dependencies]
6 changes: 6 additions & 0 deletions appveyor.yml
@@ -59,3 +59,9 @@ test_script:
cargo test -p rayon-core &&
cargo test -p rayon-demo
)
- if not "%RUSTFLAGS%"=="%^RUSTFLAGS%" (
cargo build -p rayon-futures &&
if [%CHANNEL%]==[nightly] (
cargo test -p rayon-futures
)
)
5 changes: 1 addition & 4 deletions rayon-core/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "rayon-core"
version = "1.2.1"
version = "1.2.2"
authors = ["Niko Matsakis <niko@alum.mit.edu>",
"Josh Stone <cuviper@gmail.com>"]
description = "Core APIs for Rayon"
@@ -17,7 +17,4 @@ coco = "0.1.1"
libc = "0.2.16"
lazy_static = "0.2.2"

# only if #[cfg(rayon_unstable)], will be removed eventually
futures = "0.1.7"

[dev-dependencies]
6 changes: 6 additions & 0 deletions rayon-core/src/internal/mod.rs
@@ -0,0 +1,6 @@
//! The internal directory contains internal APIs not meant to be
//! exposed to "end-users" of Rayon, but rather which are useful for
//! constructing abstractions.

pub mod task;
pub mod worker;
74 changes: 74 additions & 0 deletions rayon-core/src/internal/task.rs
@@ -0,0 +1,74 @@
use std::any::Any;
use std::sync::Arc;

/// Represents a task that can be scheduled onto the Rayon
/// thread-pool. Once a task is scheduled, it will execute exactly
/// once (eventually).
pub trait Task: Send {
fn execute(this: Arc<Self>);
}
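
The `Task` contract above can be illustrated with a standalone sketch. The trait is restated locally for self-containment, and `Counter` is a made-up task type, not part of rayon-core:

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

// Restatement of the `Task` trait for a self-contained example.
trait Task: Send {
    fn execute(this: Arc<Self>);
}

// A made-up task type: counts how many times it has been executed.
struct Counter {
    hits: AtomicUsize,
}

impl Task for Counter {
    fn execute(this: Arc<Self>) {
        // The scheduler promises to call this exactly once, eventually,
        // dropping the `Arc` afterwards.
        this.hits.fetch_add(1, Ordering::SeqCst);
    }
}

fn main() {
    let task = Arc::new(Counter { hits: AtomicUsize::new(0) });
    Task::execute(task.clone()); // stand-in for the scheduler's one call
    assert_eq!(task.hits.load(Ordering::SeqCst), 1);
}
```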

/// Represents a handle onto some Rayon scope. This could be either a
/// local scope created by the `scope()` function or the global scope
/// for a thread-pool. To get a scope-handle, you can invoke
/// `ToScopeHandle::to_scope_handle()` on either a `scope` value or a
/// `ThreadPool`.
///
/// The existence of a `ScopeHandle` offers a guarantee:
///
/// - The Rust lifetime `'scope` will not end until the scope-handle
/// is dropped, or until you invoke `panicked()` or `ok()`.
///
/// This trait is intended to be used as follows:
///
/// - You have a parallel task of type `T` to perform where `T: 's`,
/// meaning that any references that `T` contains outlive the lifetime
/// `'s`.
/// - You obtain a scope handle `h` of type `H` where `H:
/// ScopeHandle<'s>`; typically this would be by invoking
/// `to_scope_handle()` on a Rayon scope (of type `Scope<'s>`) or a
/// thread-pool (in which case `'s == 'static`).
/// - You invoke `h.spawn()` to start your job(s). This may be done
/// many times.
/// - Note that `h.spawn()` is an unsafe method. You must ensure
/// that your parallel jobs have completed before moving to
/// the next step.
/// - Eventually, when all invocations are complete, you invoke
/// either `panicked()` or `ok()`.
pub unsafe trait ScopeHandle<'scope>: 'scope {
/// Enqueues a task for execution within the thread-pool. The task
/// will eventually be invoked, and once it is, the `Arc` will be
/// dropped.
///
/// **Unsafe:** The caller must guarantee that the scope handle
/// (`self`) will not be dropped (nor will `ok()` or `panicked()`
/// be called) until the task executes. Otherwise, the lifetime
/// `'scope` may end while the task is still pending.
unsafe fn spawn_task<T: Task + 'scope>(&self, task: Arc<T>);

/// Indicates that some sub-task of this scope panicked with the
/// given `err`. This panic will be propagated back to the user as
/// appropriate, depending on how this scope handle was derived.
///
/// This takes ownership of the scope handle, meaning that once
/// you invoke `panicked`, the scope is permitted to terminate
/// (and, in particular, the Rust lifetime `'scope` may end).
fn panicked(self, err: Box<Any + Send>);

/// Indicates that the sub-tasks of this scope that you have
/// spawned concluded successfully.
///
/// This takes ownership of the scope handle, meaning that once
/// you invoke `ok`, the scope is permitted to terminate
/// (and, in particular, the Rust lifetime `'scope` may end).
fn ok(self);
}

/// Converts a Rayon structure (typically a `Scope` or `ThreadPool`)
/// into a "scope handle". See the `ScopeHandle` trait for more
/// details.
pub trait ToScopeHandle<'scope> {
type ScopeHandle: ScopeHandle<'scope>;
fn to_scope_handle(&self) -> Self::ScopeHandle;
}
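
The spawn-then-`ok()` protocol documented above can be mimicked with a toy handle that tracks outstanding jobs in a shared counter. `ToyHandle` and its methods are illustrative stand-ins, not the real rayon-core types:

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

// Toy stand-in for a scope handle: a shared counter plays the role of the
// scope's completion latch.
struct ToyHandle {
    pending: Arc<AtomicUsize>,
}

impl ToyHandle {
    // Analogue of `spawn_task`: registers one more outstanding job.
    fn spawn_task(&self) {
        self.pending.fetch_add(1, Ordering::SeqCst);
    }

    // Called when a spawned job finishes (in the real API, the scheduler
    // is responsible for this, not the user).
    fn task_finished(&self) {
        self.pending.fetch_sub(1, Ordering::SeqCst);
    }

    // Analogue of `ok()`: consumes the handle, allowing the scope (and
    // the lifetime `'scope`) to end. The caller must already have waited
    // for all spawned jobs, which is exactly the unsafe obligation the
    // trait documents.
    fn ok(self) {
        assert_eq!(self.pending.load(Ordering::SeqCst), 0);
    }
}

fn main() {
    let pending = Arc::new(AtomicUsize::new(0));
    let handle = ToyHandle { pending: pending.clone() };
    handle.spawn_task();
    handle.task_finished();
    handle.ok(); // the scope may now terminate
}
```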

53 changes: 53 additions & 0 deletions rayon-core/src/internal/worker.rs
@@ -0,0 +1,53 @@
use latch::LatchProbe;
use registry;

/// Represents the active worker thread.
pub struct WorkerThread<'w> {
thread: &'w registry::WorkerThread
}

impl<'w> WorkerThread<'w> {
/// Causes the worker thread to wait until `f()` returns true.
/// While the thread is waiting, it will attempt to steal work
/// from other threads, and may go to sleep if there is no work to
/// steal.
///
/// **Deadlock warning: This is a low-level interface and cannot
/// be used to wait on arbitrary conditions.** In particular, if
/// the Rayon thread goes to sleep, it will only be awoken when
/// new rayon events occur (e.g., `spawn()` or `join()` is
/// invoked, or one of the methods on a `ScopeHandle`). Therefore,
/// you must ensure that, once the condition `f()` becomes true,
/// some "rayon event" will also occur to ensure that waiting
/// threads are awoken.
pub unsafe fn wait_until_true<F>(&self, f: F) where F: Fn() -> bool {
struct DummyLatch<'a, F: 'a> { f: &'a F }

impl<'a, F: Fn() -> bool> LatchProbe for DummyLatch<'a, F> {
fn probe(&self) -> bool {
(self.f)()
}
}

self.thread.wait_until(&DummyLatch { f: &f });
}
}
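
The closure-to-latch adapter inside `wait_until_true` can be shown standalone. Here `wait_until` is a plain spin loop standing in for the real work-stealing wait, and the other names merely mirror the internals above:

```rust
use std::cell::Cell;

// Restated from the snippet above: anything with `probe()` can be waited on.
trait LatchProbe {
    fn probe(&self) -> bool;
}

// Adapter wrapping a closure so it can be used as a latch.
struct DummyLatch<'a, F: 'a> {
    f: &'a F,
}

impl<'a, F: Fn() -> bool> LatchProbe for DummyLatch<'a, F> {
    fn probe(&self) -> bool {
        (self.f)()
    }
}

// Spin-loop stand-in for the real `wait_until`, which steals work or sleeps
// between probes. Returns how often the probe reported "not ready".
fn wait_until(probe: &dyn LatchProbe) -> usize {
    let mut not_ready = 0;
    while !probe.probe() {
        not_ready += 1;
    }
    not_ready
}

fn main() {
    let n = Cell::new(0);
    // Condition becomes true on the third probe.
    let f = || {
        n.set(n.get() + 1);
        n.get() >= 3
    };
    assert_eq!(wait_until(&DummyLatch { f: &f }), 2);
}
```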

/// If the current thread is a Rayon worker thread, then the callback
/// is invoked with a reference to the worker thread, and the result of
/// that callback is returned with `Some`. Otherwise, if we are not on
/// a Rayon worker thread, `None` is immediately returned.
pub fn if_in_worker_thread<F,R>(if_true: F) -> Option<R>
where F: FnOnce(&WorkerThread) -> R,
{
let worker_thread = registry::WorkerThread::current();
if worker_thread.is_null() {
None
} else {
unsafe {
let wt = WorkerThread { thread: &*worker_thread };
Some(if_true(&wt))
}
}
}
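
The null check above hinges on a per-thread "current worker" pointer. A self-contained sketch of that pattern follows; the thread-local and the `WorkerThread` field here are assumptions for illustration, not the real registry types:

```rust
use std::cell::Cell;
use std::ptr;

// Illustrative worker type; the real `registry::WorkerThread` is richer.
struct WorkerThread {
    index: usize,
}

thread_local! {
    // Null except on threads that have installed themselves as workers.
    static CURRENT: Cell<*const WorkerThread> = Cell::new(ptr::null());
}

fn if_in_worker_thread<F, R>(if_true: F) -> Option<R>
    where F: FnOnce(&WorkerThread) -> R,
{
    CURRENT.with(|current| {
        let p = current.get();
        if p.is_null() {
            None
        } else {
            // Defensible here because a non-null pointer refers to a
            // worker that outlives this call.
            unsafe { Some(if_true(&*p)) }
        }
    })
}

fn main() {
    // Plain threads see `None`.
    assert!(if_in_worker_thread(|wt| wt.index).is_none());

    // Simulate becoming a worker thread by installing a pointer.
    let wt = WorkerThread { index: 3 };
    CURRENT.with(|c| c.set(&wt as *const WorkerThread));
    assert_eq!(if_in_worker_thread(|wt| wt.index), Some(3));
}
```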

2 changes: 2 additions & 0 deletions rayon-core/src/job.rs
@@ -35,6 +35,8 @@ unsafe impl Send for JobRef {}
unsafe impl Sync for JobRef {}

impl JobRef {
/// Unsafe: caller asserts that `data` will remain valid until the
/// job is executed.
pub unsafe fn new<T>(data: *const T) -> JobRef
where T: Job
{
10 changes: 2 additions & 8 deletions rayon-core/src/lib.rs
@@ -38,8 +38,6 @@ use std::fmt;
extern crate coco;
#[macro_use]
extern crate lazy_static;
#[cfg(rayon_unstable)]
extern crate futures;
extern crate libc;
extern crate num_cpus;
extern crate rand;
@@ -51,8 +49,6 @@ mod latch;
mod join;
mod job;
mod registry;
#[cfg(rayon_unstable)]
mod future;
mod scope;
mod sleep;
mod spawn;
@@ -61,16 +57,14 @@ mod thread_pool;
mod unwind;
mod util;

#[cfg(rayon_unstable)]
pub mod internal;
pub use thread_pool::ThreadPool;
pub use thread_pool::current_thread_index;
pub use thread_pool::current_thread_has_pending_tasks;
pub use join::join;
pub use scope::{scope, Scope};
pub use spawn::spawn;
#[cfg(rayon_unstable)]
pub use spawn::spawn_future;
#[cfg(rayon_unstable)]
pub use future::RayonFuture;

/// Returns the number of threads in the current registry. If this
/// code is executing within a Rayon thread-pool, then this will be
8 changes: 0 additions & 8 deletions rayon-core/src/log.rs
@@ -34,14 +34,6 @@ pub enum Event {
JobPanickedErrorNotStored { owner_thread: usize },
ScopeCompletePanicked { owner_thread: usize },
ScopeCompleteNoPanic { owner_thread: usize },

FutureExecute { state: usize },
FutureExecuteReady,
FutureExecuteNotReady,
FutureExecuteErr,
FutureInstallWaitingTask { state: usize },
FutureUnparkWaitingTask,
FutureComplete,
}

pub const DUMP_LOGS: bool = cfg!(debug_assertions);
62 changes: 57 additions & 5 deletions rayon-core/src/registry.rs
@@ -1,6 +1,8 @@
use ::{Configuration, ExitHandler, PanicHandler, StartHandler};
use coco::deque::{self, Worker, Stealer};
use job::{JobRef, StackJob};
use job::{Job, JobRef, StackJob};
#[cfg(rayon_unstable)]
use internal::task::Task;
use latch::{LatchProbe, Latch, CountLatch, LockLatch};
#[allow(unused_imports)]
use log::Event::*;
@@ -237,8 +239,8 @@ impl Registry {
/// worker thread for the registry, this will push onto the
/// deque. Else, it will inject from the outside (which is slower).
pub fn inject_or_push(&self, job_ref: JobRef) {
let worker_thread = WorkerThread::current();
unsafe {
let worker_thread = WorkerThread::current();
if !worker_thread.is_null() && (*worker_thread).registry().id() == self.id() {
(*worker_thread).push(job_ref);
} else {
Expand All @@ -247,9 +249,57 @@ impl Registry {
}
}

/// Unsafe: caller asserts that injected jobs will remain valid
/// until they are executed.
pub unsafe fn inject(&self, injected_jobs: &[JobRef]) {
/// Unsafe: the caller must guarantee that `task` will stay valid
/// until it executes.
#[cfg(rayon_unstable)]
pub unsafe fn submit_task<T>(&self, task: Arc<T>)
where T: Task
{
let task_job = TaskJob::new(task);
let task_job_ref = TaskJob::into_job_ref(task_job);
return self.inject_or_push(task_job_ref);

/// A little newtype wrapper for `T`, just because I did not
/// want to implement `Job` for all `T: Task`.
#[allow(dead_code)]
struct TaskJob<T: Task> {
data: T
}

impl<T: Task> TaskJob<T> {
fn new(arc: Arc<T>) -> Arc<Self> {
// `TaskJob<T>` has the same layout as `T`, so we can safely
// transmute this `T` into a `TaskJob<T>`. This lets us write our
// impls of `Job` for `TaskJob<T>`, making them more restricted.
// Since `Job` is a private trait, this is not strictly necessary,
// I don't think, but makes me feel better.
unsafe { mem::transmute(arc) }
}

pub fn into_task(this: Arc<TaskJob<T>>) -> Arc<T> {
// Same logic as `new()`
unsafe { mem::transmute(this) }
}

unsafe fn into_job_ref(this: Arc<Self>) -> JobRef {
let this: *const Self = mem::transmute(this);
JobRef::new(this)
}
}

impl<T: Task> Job for TaskJob<T> {
unsafe fn execute(this: *const Self) {
let this: Arc<Self> = mem::transmute(this);
let task: Arc<T> = TaskJob::into_task(this);
Task::execute(task);
}
}
}
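
The layout argument that `TaskJob::new` relies on can be sketched more explicitly using today's `#[repr(transparent)]` (not yet available when this PR landed) and an `into_raw`/`from_raw` round-trip instead of transmuting the `Arc`. `Wrapper` is an illustrative stand-in for `TaskJob`:

```rust
use std::sync::Arc;

// `#[repr(transparent)]` guarantees `Wrapper` has exactly the layout of
// `u32`, which is what makes the pointer cast below defensible — the same
// same-layout argument the `mem::transmute` in the diff depends on.
#[repr(transparent)]
struct Wrapper {
    data: u32,
}

fn wrap(arc: Arc<u32>) -> Arc<Wrapper> {
    // Round-trip through raw pointers rather than transmuting the `Arc`.
    unsafe { Arc::from_raw(Arc::into_raw(arc) as *const Wrapper) }
}

fn main() {
    let wrapped = wrap(Arc::new(7));
    assert_eq!(wrapped.data, 7);
}
```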

/// Push a job into the "external jobs" queue; it will be taken by
/// whatever worker has nothing to do. Use this if you know that
/// you are not on a worker of this registry.
pub fn inject(&self, injected_jobs: &[JobRef]) {
log!(InjectJobs { count: injected_jobs.len() });
{
let state = self.state.lock().unwrap();
@@ -612,3 +662,5 @@ unsafe fn in_worker_cold<OP, R>(op: OP) -> R
job.latch.wait();
job.into_result()
}


61 changes: 61 additions & 0 deletions rayon-core/src/scope/internal.rs
@@ -0,0 +1,61 @@
#[cfg(rayon_unstable)]
use internal::task::{ScopeHandle, ToScopeHandle, Task};
use std::any::Any;
use std::mem;
use std::sync::Arc;
use super::Scope;

#[cfg(rayon_unstable)]
impl<'scope> ToScopeHandle<'scope> for Scope<'scope> {
type ScopeHandle = LocalScopeHandle<'scope>;

fn to_scope_handle(&self) -> Self::ScopeHandle {
unsafe { LocalScopeHandle::new(self) }
}
}

pub struct LocalScopeHandle<'scope> {
scope: *const Scope<'scope>
}

impl<'scope> LocalScopeHandle<'scope> {
/// Caller guarantees that `*scope` will remain valid
/// until the scope completes. Since we acquire a ref,
/// that means it will remain valid until we release it.
unsafe fn new(scope: &Scope<'scope>) -> Self {
scope.job_completed_latch.increment();
LocalScopeHandle { scope: scope }
}
}

impl<'scope> Drop for LocalScopeHandle<'scope> {
fn drop(&mut self) {
unsafe {
if !self.scope.is_null() {
(*self.scope).job_completed_ok();
}
}
}
}

/// We assert that the `Self` type remains valid until a
/// method is called, and that `'scope` will not end until
/// that point.
#[cfg(rayon_unstable)]
unsafe impl<'scope> ScopeHandle<'scope> for LocalScopeHandle<'scope> {
unsafe fn spawn_task<T: Task + 'scope>(&self, task: Arc<T>) {
let scope = &*self.scope;
(*scope.owner_thread).registry().submit_task(task);
}

fn ok(self) {
mem::drop(self);
}

fn panicked(self, err: Box<Any + Send>) {
unsafe {
(*self.scope).job_panicked(err);
mem::forget(self); // no need to run dtor now
}
}
}
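
The increment-on-create / complete-on-drop discipline of `LocalScopeHandle` can be mimicked with a plain atomic counter; `Latch` and `Guard` here are toy types standing in for rayon's latch machinery:

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

// Toy latch: just a counter of outstanding handles.
struct Latch(AtomicUsize);

// Analogue of `LocalScopeHandle`: increments on creation, decrements when
// dropped, mirroring `job_completed_latch.increment()` in `new()` and
// `job_completed_ok()` in `Drop`.
struct Guard {
    latch: Arc<Latch>,
}

impl Guard {
    fn new(latch: Arc<Latch>) -> Guard {
        latch.0.fetch_add(1, Ordering::SeqCst);
        Guard { latch }
    }
}

impl Drop for Guard {
    fn drop(&mut self) {
        self.latch.0.fetch_sub(1, Ordering::SeqCst);
    }
}

fn main() {
    let latch = Arc::new(Latch(AtomicUsize::new(0)));
    {
        let _guard = Guard::new(latch.clone());
        assert_eq!(latch.0.load(Ordering::SeqCst), 1);
    }
    // Guard dropped: the scope may now complete.
    assert_eq!(latch.0.load(Ordering::SeqCst), 0);
}
```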