diff --git a/Cargo.toml b/Cargo.toml
index 0fc9c3d60..207f2aeeb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,25 +1,25 @@
 [package]
-name = "rayon"
+name = "rustc-rayon"
 # Reminder to update html_root_url in lib.rs when updating version
-version = "1.5.3"
+version = "0.4.0"
 authors = ["Niko Matsakis <niko@alum.mit.edu>", "Josh Stone <cuviper@gmail.com>"]
-description = "Simple work-stealing parallelism for Rust"
+description = "Simple work-stealing parallelism for Rust - fork for rustc"
 edition = "2018"
 license = "MIT OR Apache-2.0"
-repository = "https://github.com/rayon-rs/rayon"
-documentation = "https://docs.rs/rayon/"
+repository = "https://github.com/rust-lang/rustc-rayon"
+documentation = "https://docs.rs/rustc-rayon/"
 readme = "README.md"
 keywords = ["parallel", "thread", "concurrency", "join", "performance"]
 categories = ["concurrency"]
 exclude = ["/ci/*", "/scripts/*", "/.github/*", "/bors.toml"]
 
 [workspace]
-members = ["rayon-demo", "rayon-core"]
+members = ["rayon-core"]
 exclude = ["ci"]
 
 [dependencies]
-rayon-core = { version = "1.9.2", path = "rayon-core" }
+rayon-core = { version = "0.4", path = "rayon-core", package = "rustc-rayon-core" }
 crossbeam-deque = "0.8.1"
 
 # This is a public dependency!
diff --git a/README.md b/README.md
index b8156db7c..6a0d54651 100644
--- a/README.md
+++ b/README.md
@@ -1,133 +1,10 @@
-# Rayon
+# rustc-rayon
 
-[![Rayon crate](https://img.shields.io/crates/v/rayon.svg)](https://crates.io/crates/rayon)
-[![Rayon documentation](https://docs.rs/rayon/badge.svg)](https://docs.rs/rayon)
-![minimum rustc 1.36](https://img.shields.io/badge/rustc-1.36+-red.svg)
-[![build status](https://github.com/rayon-rs/rayon/workflows/master/badge.svg)](https://github.com/rayon-rs/rayon/actions)
-[![Join the chat at https://gitter.im/rayon-rs/Lobby](https://badges.gitter.im/rayon-rs/Lobby.svg)](https://gitter.im/rayon-rs/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
-
-Rayon is a data-parallelism library for Rust. It is extremely
-lightweight and makes it easy to convert a sequential computation into
-a parallel one. It also guarantees data-race freedom. (You may also
-enjoy [this blog post][blog] about Rayon, which gives more background
-and details about how it works, or [this video][video], from the Rust
-Belt Rust conference.) Rayon is
-[available on crates.io](https://crates.io/crates/rayon), and
-[API Documentation is available on docs.rs](https://docs.rs/rayon/).
-
-[blog]: https://smallcultfollowing.com/babysteps/blog/2015/12/18/rayon-data-parallelism-in-rust/
-[video]: https://www.youtube.com/watch?v=gof_OEv71Aw
-
-## Parallel iterators and more
-
-Rayon makes it drop-dead simple to convert sequential iterators into
-parallel ones: usually, you just change your `foo.iter()` call into
-`foo.par_iter()`, and Rayon does the rest:
-
-```rust
-use rayon::prelude::*;
-fn sum_of_squares(input: &[i32]) -> i32 {
-    input.par_iter() // <-- just change that!
-         .map(|&i| i * i)
-         .sum()
-}
-```
-
-[Parallel iterators] take care of deciding how to divide your data
-into tasks; they will dynamically adapt for maximum performance. If you
-need more flexibility than that, Rayon also offers the [join] and
-[scope] functions, which let you create parallel tasks on your own.
-For even more control, you can create [custom threadpools] rather than
-using Rayon's default, global threadpool.
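As a quick illustration of the `join` primitive mentioned in that paragraph, here is a minimal sketch of a fork-join recursion (the `partition` helper is an ad-hoc Lomuto partition written for this example; it is not part of Rayon):

```rust
use rayon::join;

// Ad-hoc Lomuto partition around the last element; returns the pivot's
// final index.
fn partition(v: &mut [i32]) -> usize {
    let pivot = v.len() - 1;
    let mut i = 0;
    for j in 0..pivot {
        if v[j] <= v[pivot] {
            v.swap(i, j);
            i += 1;
        }
    }
    v.swap(i, pivot);
    i
}

// join runs both closures, potentially in parallel, and returns when
// both have finished.
fn quicksort(v: &mut [i32]) {
    if v.len() > 1 {
        let mid = partition(v);
        let (lo, hi) = v.split_at_mut(mid);
        join(|| quicksort(lo), || quicksort(&mut hi[1..]));
    }
}
```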
-
-[Parallel iterators]: https://docs.rs/rayon/*/rayon/iter/index.html
-[join]: https://docs.rs/rayon/*/rayon/fn.join.html
-[scope]: https://docs.rs/rayon/*/rayon/fn.scope.html
-[custom threadpools]: https://docs.rs/rayon/*/rayon/struct.ThreadPool.html
-
-## No data races
-
-You may have heard that parallel execution can produce all kinds of
-crazy bugs. Well, rest easy. Rayon's APIs all guarantee **data-race
-freedom**, which generally rules out most parallel bugs (though not
-all). In other words, **if your code compiles**, it typically does the
-same thing it did before.
-
-For the most part, parallel iterators in particular are guaranteed to
-produce the same results as their sequential counterparts. One caveat:
-If your iterator has side effects (for example, sending messages to
-other threads through a [Rust channel] or writing to disk), those side
-effects may occur in a different order. Note also that, in some cases,
-parallel iterators offer alternative versions of the sequential
-iterator methods that can have higher performance.
-
-[Rust channel]: https://doc.rust-lang.org/std/sync/mpsc/fn.channel.html
-
-## Using Rayon
-
-[Rayon is available on crates.io](https://crates.io/crates/rayon). The
-recommended way to use it is to add a line into your Cargo.toml such
-as:
-
-```toml
-[dependencies]
-rayon = "1.5"
-```
-
-To use the Parallel Iterator APIs, a number of traits have to be in
-scope. The easiest way to bring those things into scope is to use the
-[Rayon prelude](https://docs.rs/rayon/*/rayon/prelude/index.html). In
-each module where you would like to use the parallel iterator APIs,
-just add:
-
-```rust
-use rayon::prelude::*;
-```
-
-Rayon currently requires `rustc 1.36.0` or greater.
-
-### Usage with WebAssembly
-
-Rayon can work on the Web via WebAssembly, but requires an adapter
-and some project configuration to account for differences between
-WebAssembly threads and threads on other platforms.
-
-Check out [wasm-bindgen-rayon](https://github.com/GoogleChromeLabs/wasm-bindgen-rayon)
-docs for more details.
-
-## Contribution
-
-Rayon is an open source project! If you'd like to contribute to Rayon,
-check out [the list of "help wanted" issues](https://github.com/rayon-rs/rayon/issues?q=is%3Aissue+is%3Aopen+label%3A%22help+wanted%22).
-These are all (or should be) issues that are suitable for getting
-started, and they generally include a detailed set of instructions for
-what to do. Please ask questions if anything is unclear! Also, check
-out the [Guide to Development](https://github.com/rayon-rs/rayon/wiki/Guide-to-Development)
-page on the wiki. Note that all code submitted in PRs to Rayon is
-assumed to [be licensed under Rayon's dual MIT/Apache2 licensing](https://github.com/rayon-rs/rayon/blob/master/README.md#license).
-
-## Quick demo
-
-To see Rayon in action, check out the `rayon-demo` directory, which
-includes a number of demos of code using Rayon. For example, run this
-command to get a visualization of an nbody simulation. To see the
-effect of using Rayon, press `s` to run sequentially and `p` to run in
-parallel.
-
-```text
-> cd rayon-demo
-> cargo run --release -- nbody visualize
-```
-
-For more information on demos, try:
-
-```text
-> cd rayon-demo
-> cargo run --release -- --help
-```
-
-## Other questions?
-
-See [the Rayon FAQ][faq].
-
-[faq]: https://github.com/rayon-rs/rayon/blob/master/FAQ.md
+rustc-rayon is a fork of [the Rayon crate](https://github.com/rayon-rs/rayon/).
+It adds a few "in progress" features that rustc is using, mostly around
+deadlock detection. These features are not stable and should not be used
+by others -- though they may find their way into rayon proper at some
+point. In general, if you are not rustc, you should be using the real
+rayon crate, not rustc-rayon. =)
 
 ## License
 
-Rayon is distributed under the terms of both the MIT license and the
+rustc-rayon is a fork of rayon. rayon is distributed under the terms of both the MIT license and the
 Apache License (Version 2.0). See [LICENSE-APACHE](LICENSE-APACHE) and
 [LICENSE-MIT](LICENSE-MIT) for details. Opening a pull request is
 assumed to signal agreement with these licensing terms.
diff --git a/rayon-core/Cargo.toml b/rayon-core/Cargo.toml
index db0bb48e9..96cf5897c 100644
--- a/rayon-core/Cargo.toml
+++ b/rayon-core/Cargo.toml
@@ -1,14 +1,13 @@
 [package]
-name = "rayon-core"
-version = "1.9.3" # reminder to update html_root_url attribute
+name = "rustc-rayon-core"
+version = "0.4.1" # reminder to update html_root_url attribute
 authors = ["Niko Matsakis <niko@alum.mit.edu>", "Josh Stone <cuviper@gmail.com>"]
-description = "Core APIs for Rayon"
+description = "Core APIs for Rayon - fork for rustc"
 license = "MIT OR Apache-2.0"
-repository = "https://github.com/rayon-rs/rayon"
-documentation = "https://docs.rs/rayon/"
+repository = "https://github.com/rust-lang/rustc-rayon"
+documentation = "https://docs.rs/rustc-rayon-core/"
 edition = "2018"
-links = "rayon-core"
 build = "build.rs"
 readme = "README.md"
 keywords = ["parallel", "thread", "concurrency", "join", "performance"]
diff --git a/rayon-core/README.md b/rayon-core/README.md
index 0c4936267..94c667017 100644
--- a/rayon-core/README.md
+++ b/rayon-core/README.md
@@ -1,3 +1,5 @@
+Note: This is an unstable fork made for use in rustc
+
 Rayon-core represents the "core, stable" APIs of Rayon: join, scope, and so forth, as well as the ability to create custom thread-pools with ThreadPool.
 
 Maybe worth mentioning: users are not necessarily intended to directly access rayon-core; all its APIs are mirrored in the rayon crate. To that end, the examples in the docs use rayon::join and so forth rather than rayon_core::join.
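The fork's headline additions are the builder hooks introduced in the `rayon-core` changes below (`deadlock_handler`, `acquire_thread_handler`, `release_thread_handler`). A rough sketch of how a client could wire up the deadlock hook follows; this is hypothetical usage, not rustc's actual configuration:

```rust
use rayon_core::{ThreadPool, ThreadPoolBuilder};

// Hypothetical client setup. The closure passed to `deadlock_handler` runs
// when Rayon observes that no worker is active and at least one is blocked
// in user code (see the sleep module changes below).
fn build_pool() -> ThreadPool {
    ThreadPoolBuilder::new()
        .num_threads(4)
        .deadlock_handler(|| {
            // rustc uses this hook to recover from query cycles; an ordinary
            // client might simply report the situation and abort.
            eprintln!("all workers are blocked or asleep: deadlock");
        })
        .build()
        .expect("failed to build thread pool")
}
```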
diff --git a/rayon-core/src/job.rs b/rayon-core/src/job.rs
index a71f1b0e9..0ee0f323f 100644
--- a/rayon-core/src/job.rs
+++ b/rayon-core/src/job.rs
@@ -1,4 +1,6 @@
 use crate::latch::Latch;
+use crate::tlv;
+use crate::tlv::Tlv;
 use crate::unwind;
 use crossbeam_deque::{Injector, Steal};
 use std::any::Any;
@@ -73,6 +75,7 @@ where
     pub(super) latch: L,
     func: UnsafeCell<Option<F>>,
     result: UnsafeCell<JobResult<R>>,
+    tlv: Tlv,
 }
 
 impl<L, F, R> StackJob<L, F, R>
 where
     F: FnOnce(bool) -> R + Send,
     R: Send,
 {
-    pub(super) fn new(func: F, latch: L) -> StackJob<L, F, R> {
+    pub(super) fn new(tlv: Tlv, func: F, latch: L) -> StackJob<L, F, R> {
         StackJob {
             latch,
             func: UnsafeCell::new(Some(func)),
             result: UnsafeCell::new(JobResult::None),
+            tlv,
         }
     }
@@ -114,6 +118,7 @@ where
         }
 
         let this = &*this;
+        tlv::set(this.tlv);
         let abort = unwind::AbortIfPanic;
         let func = (*this.func.get()).take().unwrap();
         (*this.result.get()) = match unwind::halt_unwinding(call(func)) {
@@ -136,15 +141,17 @@ where
     BODY: FnOnce() + Send,
 {
     job: UnsafeCell<Option<BODY>>,
+    tlv: Tlv,
 }
 
 impl<BODY> HeapJob<BODY>
 where
     BODY: FnOnce() + Send,
 {
-    pub(super) fn new(func: BODY) -> Self {
+    pub(super) fn new(tlv: Tlv, func: BODY) -> Self {
         HeapJob {
             job: UnsafeCell::new(Some(func)),
+            tlv,
         }
     }
@@ -163,6 +170,7 @@ where
 {
     unsafe fn execute(this: *const Self) {
         let this: Box<Self> = mem::transmute(this);
+        tlv::set(this.tlv);
         let job = (*this.job.get()).take().unwrap();
         job();
     }
diff --git a/rayon-core/src/join/mod.rs b/rayon-core/src/join/mod.rs
index d72c7e61c..2a842be80 100644
--- a/rayon-core/src/join/mod.rs
+++ b/rayon-core/src/join/mod.rs
@@ -1,6 +1,7 @@
 use crate::job::StackJob;
 use crate::latch::SpinLatch;
 use crate::registry::{self, WorkerThread};
+use crate::tlv::{self, Tlv};
 use crate::unwind;
 
 use std::any::Any;
@@ -130,10 +131,11 @@ where
     }
 
     registry::in_worker(|worker_thread, injected| unsafe {
+        let tlv = tlv::get();
         // Create virtual wrapper for task b; this all has to be
         // done here so that the stack frame can keep it all live
         // long enough.
-        let job_b = StackJob::new(call_b(oper_b), SpinLatch::new(worker_thread));
+        let job_b = StackJob::new(tlv, call_b(oper_b), SpinLatch::new(worker_thread));
         let job_b_ref = job_b.as_job_ref();
         worker_thread.push(job_b_ref);
@@ -141,7 +143,7 @@ where
         let status_a = unwind::halt_unwinding(call_a(oper_a, injected));
         let result_a = match status_a {
             Ok(v) => v,
-            Err(err) => join_recover_from_panic(worker_thread, &job_b.latch, err),
+            Err(err) => join_recover_from_panic(worker_thread, &job_b.latch, err, tlv),
         };
 
         // Now that task A has finished, try to pop job B from the
@@ -155,6 +157,10 @@ where
             // Found it! Let's run it.
             //
             // Note that this could panic, but it's ok if we unwind here.
+
+            // Restore the TLV since we might have run some jobs overwriting it when waiting for job b.
+            tlv::set(tlv);
+
             let result_b = job_b.run_inline(injected);
             return (result_a, result_b);
         } else {
@@ -169,6 +175,9 @@ where
             }
         }
 
+        // Restore the TLV since we might have run some jobs overwriting it when waiting for job b.
+        tlv::set(tlv);
+
         (result_a, job_b.into_result())
     })
 }
@@ -181,7 +190,12 @@
 unsafe fn join_recover_from_panic(
     worker_thread: &WorkerThread,
     job_b_latch: &SpinLatch<'_>,
     err: Box<dyn Any + Send>,
+    tlv: Tlv,
 ) -> ! {
     worker_thread.wait_until(job_b_latch);
+
+    // Restore the TLV since we might have run some jobs overwriting it when waiting for job b.
+    tlv::set(tlv);
+
     unwind::resume_unwinding(err)
 }
diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs
index 246b80070..3cf41a212 100644
--- a/rayon-core/src/lib.rs
+++ b/rayon-core/src/lib.rs
@@ -45,9 +45,6 @@
 //! succeed.
 
 #![doc(html_root_url = "https://docs.rs/rayon-core/1.9")]
-#![deny(missing_debug_implementations)]
-#![deny(missing_docs)]
-#![deny(unreachable_pub)]
 #![warn(rust_2018_idioms)]
 
 use std::any::Any;
@@ -72,18 +69,23 @@ mod sleep;
 mod spawn;
 mod thread_pool;
 mod unwind;
+mod worker_local;
 
 mod compile_fail;
 mod test;
 
+pub mod tlv;
+
 pub use self::join::{join, join_context};
 pub use self::registry::ThreadBuilder;
+pub use self::registry::{mark_blocked, mark_unblocked, Registry};
 pub use self::scope::{in_place_scope, scope, Scope};
 pub use self::scope::{in_place_scope_fifo, scope_fifo, ScopeFifo};
 pub use self::spawn::{spawn, spawn_fifo};
 pub use self::thread_pool::current_thread_has_pending_tasks;
 pub use self::thread_pool::current_thread_index;
 pub use self::thread_pool::ThreadPool;
+pub use worker_local::WorkerLocal;
 
 use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};
@@ -166,6 +168,9 @@ pub struct ThreadPoolBuilder<S = DefaultSpawn> {
     /// The stack size for the created worker threads
     stack_size: Option<usize>,
 
+    /// Closure invoked on deadlock.
+    deadlock_handler: Option<Box<DeadlockHandler>>,
+
     /// Closure invoked on worker thread start.
     start_handler: Option<Box<StartHandler>>,
 
@@ -175,6 +180,12 @@ pub struct ThreadPoolBuilder<S = DefaultSpawn> {
     /// Closure invoked to spawn threads.
     spawn_handler: S,
 
+    /// Closure invoked when starting computations in a thread.
+    acquire_thread_handler: Option<Box<AcquireThreadHandler>>,
+
+    /// Closure invoked when blocking in a thread.
+    release_thread_handler: Option<Box<ReleaseThreadHandler>>,
+
     /// If false, worker threads will execute spawned jobs in a
     /// "depth-first" fashion. If true, they will do a "breadth-first"
     /// fashion. Depth-first is the default.
@@ -193,6 +204,9 @@ pub struct Configuration {
 /// may be invoked multiple times in parallel.
 type PanicHandler = dyn Fn(Box<dyn Any + Send>) + Send + Sync;
 
+/// The type for a closure that gets invoked when the Rayon thread pool deadlocks
+type DeadlockHandler = dyn Fn() + Send + Sync;
+
 /// The type for a closure that gets invoked when a thread starts. The
 /// closure is passed the index of the thread on which it is invoked.
 /// Note that this same closure may be invoked multiple times in parallel.
@@ -213,12 +227,23 @@ impl Default for ThreadPoolBuilder {
             stack_size: None,
             start_handler: None,
             exit_handler: None,
+            deadlock_handler: None,
+            acquire_thread_handler: None,
+            release_thread_handler: None,
             spawn_handler: DefaultSpawn,
             breadth_first: false,
         }
     }
 }
 
+/// The type for a closure that gets invoked before starting computations in a thread.
+/// Note that this same closure may be invoked multiple times in parallel.
+type AcquireThreadHandler = dyn Fn() + Send + Sync;
+
+/// The type for a closure that gets invoked before blocking in a thread.
+/// Note that this same closure may be invoked multiple times in parallel.
+type ReleaseThreadHandler = dyn Fn() + Send + Sync;
+
 impl ThreadPoolBuilder {
     /// Creates and returns a valid rayon thread pool builder, but does not initialize it.
     pub fn new() -> Self {
@@ -319,7 +344,12 @@ impl ThreadPoolBuilder {
                 Ok(())
             })
             .build()?;
-        Ok(with_pool(&pool))
+        let result = unwind::halt_unwinding(|| with_pool(&pool));
+        pool.wait_until_stopped();
+        match result {
+            Ok(result) => Ok(result),
+            Err(err) => unwind::resume_unwinding(err),
+        }
     });
 
     match result {
@@ -397,6 +427,9 @@ impl<S> ThreadPoolBuilder<S> {
             stack_size: self.stack_size,
             start_handler: self.start_handler,
             exit_handler: self.exit_handler,
+            deadlock_handler: self.deadlock_handler,
+            acquire_thread_handler: self.acquire_thread_handler,
+            release_thread_handler: self.release_thread_handler,
             breadth_first: self.breadth_first,
         }
     }
@@ -555,6 +588,48 @@ impl<S> ThreadPoolBuilder<S> {
         self.breadth_first
     }
 
+    /// Takes the current acquire thread callback, leaving `None`.
+    fn take_acquire_thread_handler(&mut self) -> Option<Box<AcquireThreadHandler>> {
+        self.acquire_thread_handler.take()
+    }
+
+    /// Set a callback to be invoked when starting computations in a thread.
+    pub fn acquire_thread_handler<H>(mut self, acquire_thread_handler: H) -> Self
+    where
+        H: Fn() + Send + Sync + 'static,
+    {
+        self.acquire_thread_handler = Some(Box::new(acquire_thread_handler));
+        self
+    }
+
+    /// Takes the current release thread callback, leaving `None`.
+    fn take_release_thread_handler(&mut self) -> Option<Box<ReleaseThreadHandler>> {
+        self.release_thread_handler.take()
+    }
+
+    /// Set a callback to be invoked when blocking in a thread.
+    pub fn release_thread_handler<H>(mut self, release_thread_handler: H) -> Self
+    where
+        H: Fn() + Send + Sync + 'static,
+    {
+        self.release_thread_handler = Some(Box::new(release_thread_handler));
+        self
+    }
+
+    /// Takes the current deadlock callback, leaving `None`.
+    fn take_deadlock_handler(&mut self) -> Option<Box<DeadlockHandler>> {
+        self.deadlock_handler.take()
+    }
+
+    /// Set a callback to be invoked on deadlock.
+    pub fn deadlock_handler<H>(mut self, deadlock_handler: H) -> Self
+    where
+        H: Fn() + Send + Sync + 'static,
+    {
+        self.deadlock_handler = Some(Box::new(deadlock_handler));
+        self
+    }
+
     /// Takes the current thread start callback, leaving `None`.
     fn take_start_handler(&mut self) -> Option<Box<StartHandler>> {
         self.start_handler.take()
     }
@@ -717,8 +792,11 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
             ref get_thread_name,
             ref panic_handler,
             ref stack_size,
+            ref deadlock_handler,
             ref start_handler,
             ref exit_handler,
+            ref acquire_thread_handler,
+            ref release_thread_handler,
             spawn_handler: _,
             ref breadth_first,
         } = *self;
@@ -733,16 +811,22 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
         }
         let get_thread_name = get_thread_name.as_ref().map(|_| ClosurePlaceholder);
         let panic_handler = panic_handler.as_ref().map(|_| ClosurePlaceholder);
+        let deadlock_handler = deadlock_handler.as_ref().map(|_| ClosurePlaceholder);
         let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder);
         let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder);
+        let acquire_thread_handler = acquire_thread_handler.as_ref().map(|_| ClosurePlaceholder);
+        let release_thread_handler = release_thread_handler.as_ref().map(|_| ClosurePlaceholder);
 
         f.debug_struct("ThreadPoolBuilder")
             .field("num_threads", num_threads)
             .field("get_thread_name", &get_thread_name)
             .field("panic_handler", &panic_handler)
             .field("stack_size", &stack_size)
+            .field("deadlock_handler", &deadlock_handler)
             .field("start_handler", &start_handler)
             .field("exit_handler", &exit_handler)
+            .field("acquire_thread_handler", &acquire_thread_handler)
+            .field("release_thread_handler", &release_thread_handler)
             .field("breadth_first", &breadth_first)
             .finish()
     }
diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs
index 7405fe8e7..6c8fefd35 100644
--- a/rayon-core/src/registry.rs
+++ b/rayon-core/src/registry.rs
@@ -3,9 +3,11 @@ use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LockLatch, SpinLatch};
 use crate::log::Event::*;
 use crate::log::Logger;
 use crate::sleep::Sleep;
+use crate::tlv::Tlv;
 use crate::unwind;
 use crate::{
-    ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
+    AcquireThreadHandler, DeadlockHandler, ErrorKind, ExitHandler, PanicHandler,
+    ReleaseThreadHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
 };
 use crossbeam_deque::{Injector, Steal, Stealer, Worker};
 use std::any::Any;
@@ -130,14 +132,17 @@ where
     }
 }
 
-pub(super) struct Registry {
+pub struct Registry {
     logger: Logger,
     thread_infos: Vec<ThreadInfo>,
     sleep: Sleep,
     injected_jobs: Injector<JobRef>,
     panic_handler: Option<Box<PanicHandler>>,
+    pub(crate) deadlock_handler: Option<Box<DeadlockHandler>>,
     start_handler: Option<Box<StartHandler>>,
     exit_handler: Option<Box<ExitHandler>>,
+    pub(crate) acquire_thread_handler: Option<Box<AcquireThreadHandler>>,
+    pub(crate) release_thread_handler: Option<Box<ReleaseThreadHandler>>,
 
     // When this latch reaches 0, it means that all work on this
     // registry must be complete. This is ensured in the following ways:
@@ -240,8 +245,11 @@ impl Registry {
             injected_jobs: Injector::new(),
             terminate_count: AtomicUsize::new(1),
             panic_handler: builder.take_panic_handler(),
+            deadlock_handler: builder.take_deadlock_handler(),
             start_handler: builder.take_start_handler(),
             exit_handler: builder.take_exit_handler(),
+            acquire_thread_handler: builder.take_acquire_thread_handler(),
+            release_thread_handler: builder.take_release_thread_handler(),
         });
 
         // If we return early or panic, make sure to terminate existing threads.
@@ -266,7 +274,7 @@ impl Registry {
         Ok(registry)
     }
 
-    pub(super) fn current() -> Arc<Registry> {
+    pub fn current() -> Arc<Registry> {
         unsafe {
             let worker_thread = WorkerThread::current();
             let registry = if worker_thread.is_null() {
@@ -350,11 +358,24 @@ impl Registry {
     /// Waits for the worker threads to stop. This is used for testing
     /// -- so we can check that termination actually works.
-    #[cfg(test)]
     pub(super) fn wait_until_stopped(&self) {
+        self.release_thread();
         for info in &self.thread_infos {
             info.stopped.wait();
         }
+        self.acquire_thread();
+    }
+
+    pub(crate) fn acquire_thread(&self) {
+        if let Some(ref acquire_thread_handler) = self.acquire_thread_handler {
+            acquire_thread_handler();
+        }
+    }
+
+    pub(crate) fn release_thread(&self) {
+        if let Some(ref release_thread_handler) = self.release_thread_handler {
+            release_thread_handler();
+        }
     }
 
     /// ////////////////////////////////////////////////////////////////////////
@@ -406,7 +427,7 @@ impl Registry {
             .new_injected_jobs(usize::MAX, injected_jobs.len() as u32, queue_was_empty);
     }
 
-    fn has_injected_job(&self) -> bool {
+    pub(crate) fn has_injected_job(&self) -> bool {
         !self.injected_jobs.is_empty()
     }
@@ -462,6 +483,7 @@ impl Registry {
         // This thread isn't a member of *any* thread pool, so just block.
         debug_assert!(WorkerThread::current().is_null());
         let job = StackJob::new(
+            Tlv::null(),
             |injected| {
                 let worker_thread = WorkerThread::current();
                 assert!(injected && !worker_thread.is_null());
@@ -470,7 +492,9 @@ impl Registry {
             l,
         );
         self.inject(&[job.as_job_ref()]);
+        self.release_thread();
         job.latch.wait_and_reset(); // Make sure we can use the same latch again next time.
+        self.acquire_thread();
 
         // flush accumulated logs as we exit the thread
         self.logger.log(|| Flush);
@@ -490,6 +514,7 @@ impl Registry {
         debug_assert!(current_thread.registry().id() != self.id());
         let latch = SpinLatch::cross(current_thread);
         let job = StackJob::new(
+            Tlv::null(),
             |injected| {
                 let worker_thread = WorkerThread::current();
                 assert!(injected && !worker_thread.is_null());
@@ -548,6 +573,24 @@ impl Registry {
     }
 }
 
+/// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
+/// if no other worker thread is active
+#[inline]
+pub fn mark_blocked() {
+    let worker_thread = WorkerThread::current();
+    assert!(!worker_thread.is_null());
+    unsafe {
+        let registry = &(*worker_thread).registry;
+        registry.sleep.mark_blocked(&registry.deadlock_handler)
+    }
+}
+
+/// Mark a previously blocked Rayon worker thread as unblocked
+#[inline]
+pub fn mark_unblocked(registry: &Registry) {
+    registry.sleep.mark_unblocked()
+}
+
 #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
 pub(super) struct RegistryId {
     addr: usize,
@@ -597,12 +640,12 @@ pub(super) struct WorkerThread {
     /// local queue used for `spawn_fifo` indirection
     fifo: JobFifo,
 
-    index: usize,
+    pub(crate) index: usize,
 
     /// A weak random number generator.
     rng: XorShift64Star,
 
-    registry: Arc<Registry>,
+    pub(crate) registry: Arc<Registry>,
 }
 
 // This is a bit sketchy, but basically: the WorkerThread is
@@ -731,7 +774,7 @@ impl WorkerThread {
         } else {
             self.registry
                 .sleep
-                .no_work_found(&mut idle_state, latch, || self.registry.has_injected_job())
+                .no_work_found(&mut idle_state, latch, &self.registry)
         }
     }
 
@@ -833,6 +876,7 @@ unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usize) {
         worker: index,
         terminate_addr: my_terminate_latch.as_core_latch().addr(),
     });
+    registry.acquire_thread();
     worker_thread.wait_until(my_terminate_latch);
 
     // Should not be any work left in our queue.
@@ -856,6 +900,8 @@ unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usize) {
         }
         // We're already exiting the thread, there's nothing else to do.
     }
+
+    registry.release_thread();
 }
 
 /// If already in a worker-thread, just execute `op`. Otherwise,
diff --git a/rayon-core/src/scope/mod.rs b/rayon-core/src/scope/mod.rs
index f8d90ce20..bd99d701a 100644
--- a/rayon-core/src/scope/mod.rs
+++ b/rayon-core/src/scope/mod.rs
@@ -8,6 +8,7 @@ use crate::job::{HeapJob, JobFifo};
 use crate::latch::{CountLatch, CountLockLatch, Latch};
 use crate::registry::{global_registry, in_worker, Registry, WorkerThread};
+use crate::tlv::{self, Tlv};
 use crate::unwind;
 use std::any::Any;
 use std::fmt;
@@ -75,6 +76,9 @@ struct ScopeBase<'scope> {
     /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because
     /// the closures are only *moved* across threads to be executed.
     marker: PhantomData<Box<dyn FnOnce(&Scope<'scope>) + Send + Sync + 'scope>>,
+
+    /// The TLV at the scope's creation. Used to set the TLV for spawned jobs.
+    tlv: Tlv,
 }
 
 /// Creates a "fork-join" scope `s` and invokes the closure with a
@@ -540,7 +544,7 @@ impl<'scope> Scope<'scope> {
     {
         self.base.increment();
         unsafe {
-            let job_ref = Box::new(HeapJob::new(move || {
+            let job_ref = Box::new(HeapJob::new(self.base.tlv, move || {
                 self.base.execute_job(move || body(self))
             }))
             .as_job_ref();
@@ -581,7 +585,7 @@ impl<'scope> ScopeFifo<'scope> {
     {
         self.base.increment();
         unsafe {
-            let job_ref = Box::new(HeapJob::new(move || {
+            let job_ref = Box::new(HeapJob::new(self.base.tlv, move || {
                 self.base.execute_job(move || body(self))
             }))
             .as_job_ref();
@@ -612,6 +616,7 @@ impl<'scope> ScopeBase<'scope> {
             panic: AtomicPtr::new(ptr::null_mut()),
             job_completed_latch: ScopeLatch::new(owner),
             marker: PhantomData,
+            tlv: tlv::get(),
         }
     }
 
@@ -627,6 +632,10 @@ impl<'scope> ScopeBase<'scope> {
     {
         let result = self.execute_job_closure(func);
         self.job_completed_latch.wait(owner);
+
+        // Restore the TLV if we ran some jobs while waiting
+        tlv::set(self.tlv);
+
         self.maybe_propagate_panic();
         result.unwrap() // only None if `op` panicked, and that would have been propagated
     }
@@ -680,6 +689,10 @@ impl<'scope> ScopeBase<'scope> {
         let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed);
         if !panic.is_null() {
             let value = unsafe { Box::from_raw(panic) };
+
+            // Restore the TLV if we ran some jobs while waiting
+            tlv::set(self.tlv);
+
             unwind::resume_unwinding(*value);
         }
     }
diff --git a/rayon-core/src/sleep/README.md b/rayon-core/src/sleep/README.md
index c62c3975d..bcd74af97 100644
--- a/rayon-core/src/sleep/README.md
+++ b/rayon-core/src/sleep/README.md
@@ -216,4 +216,37 @@ Meanwhile, the sleepy thread does the following:
 Either PushFence or SleepFence must come first:
 
 * If PushFence comes first, then PushJob must be visible to ReadJob.
-* If SleepFence comes first, then IncSleepers is visible to ReadSleepers.
\ No newline at end of file
+* If SleepFence comes first, then IncSleepers is visible to ReadSleepers.
+
+# Deadlock detection
+
+This module tracks a number of variables in order to detect deadlocks due to user code blocking.
+These variables are stored in the `SleepData` struct which itself is kept behind a mutex.
+It contains the following fields:
+- `worker_count` - The number of threads in the thread pool.
+- `active_threads` - The number of threads in the thread pool which are running
+  and aren't blocked in user code or sleeping.
+- `blocked_threads` - The number of threads which are blocked in user code.
+  This doesn't include threads blocked by Rayon.
+
+User code can indicate blocking by calling `mark_blocked` before blocking and
+calling `mark_unblocked` before unblocking a thread.
+This will adjust `active_threads` and `blocked_threads` accordingly.
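+
+For illustration, the intended bracketing looks roughly like this (a sketch;
+`worker_rx`, `main_tx`, `job`, and `registry` are stand-ins for whatever the
+user code actually blocks on):
+
+```rust
+// On a Rayon worker thread, before parking on a non-Rayon resource:
+rayon_core::mark_blocked();
+let job = worker_rx.recv().unwrap(); // blocked in user code
+
+// On the thread that wakes the worker, before making the resource available:
+rayon_core::mark_unblocked(&registry);
+main_tx.send(job).unwrap();
+```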
+
+When we tickle the thread pool in `Sleep::tickle_cold`, we set `active_threads` to
+`worker_count` - `blocked_threads` since we wake up all Rayon threads, but not threads blocked
+by user code.
+
+A deadlock is detected by checking if `active_threads` is 0 and `blocked_threads` is above 0.
+If we ignored `blocked_threads` we would have a deadlock
+immediately when creating the thread pool.
+We would also deadlock once the thread pool ran out of work.
+It is not possible for Rayon itself to deadlock.
+Deadlocks can only be caused by user code blocking, so this condition doesn't miss any deadlocks.
+
+We check for the deadlock condition when threads fall asleep, in `mark_blocked`
+and in `Sleep::sleep`.
+If a deadlock is detected we call the user-provided deadlock handler while we hold the
+lock to `SleepData`. This means the deadlock handler cannot call `mark_blocked` and
+`mark_unblocked`. The user is expected to handle the deadlock in some non-Rayon thread.
+Once the deadlock handler returns, the thread which called the deadlock handler will go to sleep.
diff --git a/rayon-core/src/sleep/mod.rs b/rayon-core/src/sleep/mod.rs
index 2c4ac7c28..fb113fac0 100644
--- a/rayon-core/src/sleep/mod.rs
+++ b/rayon-core/src/sleep/mod.rs
@@ -4,6 +4,8 @@
 use crate::latch::CoreLatch;
 use crate::log::Event::*;
 use crate::log::Logger;
+use crate::registry::Registry;
+use crate::DeadlockHandler;
 use crossbeam_utils::CachePadded;
 use std::sync::atomic::Ordering;
 use std::sync::{Condvar, Mutex};
@@ -14,6 +16,29 @@ mod counters;
 pub(crate) use self::counters::THREADS_MAX;
 use self::counters::{AtomicCounters, JobsEventCounter};
 
+struct SleepData {
+    /// The number of threads in the thread pool.
+    worker_count: usize,
+
+    /// The number of threads in the thread pool which are running and
+    /// aren't blocked in user code or sleeping.
+    active_threads: usize,
+
+    /// The number of threads which are blocked in user code.
+    /// This doesn't include threads blocked by this module.
+    blocked_threads: usize,
+}
+
+impl SleepData {
+    /// Checks if the conditions for a deadlock hold and if so calls the deadlock handler
+    #[inline]
+    pub fn deadlock_check(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
+        if self.active_threads == 0 && self.blocked_threads > 0 {
+            (deadlock_handler.as_ref().unwrap())();
+        }
+    }
+}
+
 /// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping
 /// of workers. It has callbacks that are invoked periodically at significant events,
 /// such as when workers are looping and looking for work, when latches are set, or when
@@ -29,6 +54,8 @@ pub(super) struct Sleep {
     worker_sleep_states: Vec<CachePadded<WorkerSleepState>>,
 
     counters: AtomicCounters,
+
+    data: Mutex<SleepData>,
 }
 
 /// An instance of this struct is created when a thread becomes idle.
@@ -68,9 +95,38 @@ impl Sleep {
             logger,
             worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(),
             counters: AtomicCounters::new(),
+            data: Mutex::new(SleepData {
+                worker_count: n_threads,
+                active_threads: n_threads,
+                blocked_threads: 0,
+            }),
         }
     }
 
+    /// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
+    /// if no other worker thread is active
+    #[inline]
+    pub fn mark_blocked(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
+        let mut data = self.data.lock().unwrap();
+        debug_assert!(data.active_threads > 0);
+        debug_assert!(data.blocked_threads < data.worker_count);
+        data.active_threads -= 1;
+        data.blocked_threads += 1;
+
+        data.deadlock_check(deadlock_handler);
+    }
+
+    /// Mark a previously blocked Rayon worker thread as unblocked
+    #[inline]
+    pub fn mark_unblocked(&self) {
+        let mut data = self.data.lock().unwrap();
+        debug_assert!(data.active_threads < data.worker_count);
+        debug_assert!(data.blocked_threads > 0);
+        data.active_threads += 1;
+        data.blocked_threads -= 1;
+    }
+
     #[inline]
     pub(super) fn start_looking(&self, worker_index: usize, latch: &CoreLatch) -> IdleState {
         self.logger.log(|| ThreadIdle {
@@ -105,7 +161,7 @@ impl Sleep {
         &self,
         idle_state: &mut IdleState,
         latch: &CoreLatch,
-        has_injected_jobs: impl FnOnce() -> bool,
+        registry: &Registry,
     ) {
         if idle_state.rounds < ROUNDS_UNTIL_SLEEPY {
             thread::yield_now();
@@ -119,7 +175,7 @@ impl Sleep {
             thread::yield_now();
         } else {
             debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING);
-            self.sleep(idle_state, latch, has_injected_jobs);
+            self.sleep(idle_state, latch, registry);
         }
     }
@@ -137,12 +193,7 @@ impl Sleep {
     }
 
     #[cold]
-    fn sleep(
-        &self,
-        idle_state: &mut IdleState,
-        latch: &CoreLatch,
-        has_injected_jobs: impl FnOnce() -> bool,
-    ) {
+    fn sleep(&self, idle_state: &mut IdleState, latch: &CoreLatch, registry: &Registry) {
         let worker_index = idle_state.worker_index;
 
         if !latch.get_sleepy() {
@@ -209,12 +260,19 @@ impl Sleep {
         // - that job triggers the rollover over the JEC such that we don't see it
         // - we are the last active worker thread
         std::sync::atomic::fence(Ordering::SeqCst);
-        if has_injected_jobs() {
+        if registry.has_injected_job() {
             // If we see an externally injected job, then we have to 'wake
             // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by
             // the one that wakes us.)
             self.counters.sub_sleeping_thread();
         } else {
+            {
+                // Decrement the number of active threads and check for a deadlock
+                let mut data = self.data.lock().unwrap();
+                data.active_threads -= 1;
+                data.deadlock_check(&registry.deadlock_handler);
+            }
+
             // If we don't see an injected job (the normal case), then flag
             // ourselves as asleep and wait till we are notified.
             //
@@ -223,10 +281,12 @@ impl Sleep {
             // that whoever is coming to wake us will have to wait until we
             // release the mutex in the call to `wait`, so they will see this
             // boolean as true.)
+            registry.release_thread();
             *is_blocked = true;
             while *is_blocked {
                 is_blocked = sleep_state.condvar.wait(is_blocked).unwrap();
             }
+            registry.acquire_thread();
         }
 
         // Update other state:
@@ -372,6 +432,9 @@ impl Sleep {
         // do.
         self.counters.sub_sleeping_thread();
 
+        // Increment the number of active threads
+        self.data.lock().unwrap().active_threads += 1;
+
         self.logger.log(|| ThreadNotify { worker: index });
 
         true
diff --git a/rayon-core/src/spawn/mod.rs b/rayon-core/src/spawn/mod.rs
index dfa47818e..8c25b379c 100644
--- a/rayon-core/src/spawn/mod.rs
+++ b/rayon-core/src/spawn/mod.rs
@@ -1,5 +1,6 @@
 use crate::job::*;
 use crate::registry::Registry;
+use crate::tlv::Tlv;
 use crate::unwind;
 use std::mem;
 use std::sync::Arc;
@@ -91,7 +92,7 @@ where
     // executed. This ref is decremented at the (*) below.
     registry.increment_terminate_count();
 
-    Box::new(HeapJob::new({
+    Box::new(HeapJob::new(Tlv::null(), {
         let registry = Arc::clone(registry);
         move || {
             match unwind::halt_unwinding(func) {
diff --git a/rayon-core/src/thread_pool/mod.rs b/rayon-core/src/thread_pool/mod.rs
index 5edaedc37..0791fc4e9 100644
--- a/rayon-core/src/thread_pool/mod.rs
+++ b/rayon-core/src/thread_pool/mod.rs
@@ -277,6 +277,12 @@ impl ThreadPool {
         // We assert that `self.registry` has not terminated.
         unsafe { spawn::spawn_fifo_in(op, &self.registry) }
     }
+
+    pub(crate) fn wait_until_stopped(self) {
+        let registry = self.registry.clone();
+        drop(self);
+        registry.wait_until_stopped();
+    }
 }
 
 impl Drop for ThreadPool {
diff --git a/rayon-core/src/tlv.rs b/rayon-core/src/tlv.rs
new file mode 100644
index 000000000..ce22f7aa0
--- /dev/null
+++ b/rayon-core/src/tlv.rs
@@ -0,0 +1,31 @@
+//! Allows access to Rayon's thread-local value,
+//! which is preserved when moving jobs across threads
+
+use std::{cell::Cell, ptr};
+
+thread_local!(pub static TLV: Cell<*const ()> = const { Cell::new(ptr::null()) });
+
+#[derive(Copy, Clone)]
+pub(crate) struct Tlv(pub(crate) *const ());
+
+impl Tlv {
+    #[inline]
+    pub(crate) fn null() -> Self {
+        Self(ptr::null())
+    }
+}
+
+unsafe impl Sync for Tlv {}
+unsafe impl Send for Tlv {}
+
+/// Sets the current thread-local value
+#[inline]
+pub(crate) fn set(value: Tlv) {
+    TLV.with(|tlv| tlv.set(value.0));
+}
+
+/// Returns the current thread-local value
+#[inline]
+pub(crate) fn get() -> Tlv {
+    TLV.with(|tlv| Tlv(tlv.get()))
+}
diff --git a/rayon-core/src/worker_local.rs b/rayon-core/src/worker_local.rs
new file mode 100644
index 000000000..85d51687c
--- /dev/null
+++ b/rayon-core/src/worker_local.rs
@@ -0,0 +1,78 @@
+use crate::registry::{Registry, WorkerThread};
+use std::fmt;
+use std::ops::Deref;
+use std::sync::Arc;
+
+#[repr(align(64))]
+#[derive(Debug)]
+struct CacheAligned<T>(T);
+
+/// Holds worker-local values for each thread in a thread pool.
+/// You can only access the worker-local value through the Deref impl
+/// on the thread pool it was constructed on. It will panic otherwise.
+pub struct WorkerLocal<T> {
+    locals: Vec<CacheAligned<T>>,
+    registry: Arc<Registry>,
+}
+
+/// We prevent concurrent access to the underlying value in the
+/// Deref impl, thus any values safe to send across threads can
+/// be used with WorkerLocal.
+unsafe impl<T: Send> Sync for WorkerLocal<T> {}
+
+impl<T> WorkerLocal<T> {
+    /// Creates a new worker local where the `initial` closure computes the
+    /// value this worker local should take for each thread in the thread pool.
+    #[inline]
+    pub fn new<F: FnMut(usize) -> T>(mut initial: F) -> WorkerLocal<T> {
+        let registry = Registry::current();
+        WorkerLocal {
+            locals: (0..registry.num_threads())
+                .map(|i| CacheAligned(initial(i)))
+                .collect(),
+            registry,
+        }
+    }
+
+    /// Returns the worker-local value for each thread
+    #[inline]
+    pub fn into_inner(self) -> Vec<T> {
+        self.locals.into_iter().map(|c| c.0).collect()
+    }
+
+    fn current(&self) -> &T {
+        unsafe {
+            let worker_thread = WorkerThread::current();
+            if worker_thread.is_null()
+                || &*(*worker_thread).registry as *const _ != &*self.registry as *const _
+            {
+                panic!("WorkerLocal can only be used on the thread pool it was created on")
+            }
+            &self.locals[(*worker_thread).index].0
+        }
+    }
+}
+
+impl<T> WorkerLocal<Vec<T>> {
+    /// Joins the elements of all the worker locals into one Vec
+    pub fn join(self) -> Vec<T> {
+        self.into_inner().into_iter().flat_map(|v| v).collect()
+    }
+}
+
+impl<T: fmt::Debug> fmt::Debug for WorkerLocal<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("WorkerLocal")
+            .field("registry", &self.registry.id())
+            .finish()
+    }
+}
+
+impl<T> Deref for WorkerLocal<T> {
+    type Target = T;
+
+    #[inline(always)]
+    fn deref(&self) -> &T {
+        self.current()
+    }
+}
diff --git a/rayon-demo/Cargo.toml b/rayon-demo/Cargo.toml
index 987eb3aa2..d08d8f783 100644
--- a/rayon-demo/Cargo.toml
+++ b/rayon-demo/Cargo.toml
@@ -6,7 +6,7 @@ authors = ["Niko Matsakis <niko@alum.mit.edu>"]
 publish = false
 
 [dependencies]
-rayon = { path = "../" }
+rustc-rayon = { path = "../" }
 cgmath = "0.18"
 docopt = "1"
 fixedbitset = "0.4"
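Finally, a sketch of how the `WorkerLocal` type added above can be used from the pool it was created on (`per_worker_counts` is a hypothetical example, not part of the crate):

```rust
use rayon_core::{scope, ThreadPool, WorkerLocal};
use std::cell::Cell;

// Count how many spawned tasks each worker ran. Cell supplies the interior
// mutability, since WorkerLocal's Deref only hands out a shared reference.
fn per_worker_counts(pool: &ThreadPool) -> Vec<usize> {
    // WorkerLocal::new must run inside the pool so that Registry::current()
    // resolves to this pool's registry.
    let counts = pool.install(|| WorkerLocal::new(|_| Cell::new(0usize)));
    pool.install(|| {
        scope(|s| {
            for _ in 0..100 {
                s.spawn(|_| counts.set(counts.get() + 1));
            }
        })
    });
    counts.into_inner().into_iter().map(Cell::into_inner).collect()
}
```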