Skip to content

Commit

Permalink
Merge #1019
Browse files Browse the repository at this point in the history
1019: Add a fallback when threading is unsupported r=cuviper a=cuviper

Fixes #861.

Co-authored-by: Josh Stone <cuviper@gmail.com>
  • Loading branch information
bors[bot] and cuviper authored Feb 22, 2023
2 parents 94c16e0 + 26c249f commit 76a3030
Show file tree
Hide file tree
Showing 23 changed files with 249 additions and 23 deletions.
12 changes: 10 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,24 @@ jobs:
- run: cargo test --verbose --package rayon
- run: cargo test --verbose --package rayon-core

# wasm won't actually work without threading, but it builds
# wasm32-unknown-unknown builds, and even has the runtime fallback for
# unsupported threading, but we don't have an environment to execute in.
# wasm32-wasi can test the fallback by running in wasmtime.
wasm:
name: WebAssembly
runs-on: ubuntu-latest
env:
CARGO_TARGET_WASM32_WASI_RUNNER: /home/runner/.wasmtime/bin/wasmtime
steps:
- uses: actions/checkout@v3
- uses: dtolnay/rust-toolchain@stable
with:
target: wasm32-unknown-unknown
targets: wasm32-unknown-unknown,wasm32-wasi
- run: cargo check --verbose --target wasm32-unknown-unknown
- run: cargo check --verbose --target wasm32-wasi
- run: curl https://wasmtime.dev/install.sh -sSf | bash
- run: cargo test --verbose --target wasm32-wasi --package rayon
- run: cargo test --verbose --target wasm32-wasi --package rayon-core

fmt:
name: Format
Expand Down
46 changes: 46 additions & 0 deletions rayon-core/src/broadcast/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ fn broadcast_global() {
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_global() {
let (tx, rx) = crossbeam_channel::unbounded();
crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
Expand All @@ -22,13 +23,15 @@ fn spawn_broadcast_global() {
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_pool() {
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
let v = pool.broadcast(|ctx| ctx.index());
assert!(v.into_iter().eq(0..7));
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_pool() {
let (tx, rx) = crossbeam_channel::unbounded();
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
Expand All @@ -40,13 +43,15 @@ fn spawn_broadcast_pool() {
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_self() {
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
let v = pool.install(|| crate::broadcast(|ctx| ctx.index()));
assert!(v.into_iter().eq(0..7));
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_self() {
let (tx, rx) = crossbeam_channel::unbounded();
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
Expand All @@ -58,6 +63,7 @@ fn spawn_broadcast_self() {
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_mutual() {
let count = AtomicUsize::new(0);
let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
Expand All @@ -73,6 +79,7 @@ fn broadcast_mutual() {
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_mutual() {
let (tx, rx) = crossbeam_channel::unbounded();
let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
Expand All @@ -90,6 +97,7 @@ fn spawn_broadcast_mutual() {
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_mutual_sleepy() {
let count = AtomicUsize::new(0);
let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
Expand All @@ -108,6 +116,7 @@ fn broadcast_mutual_sleepy() {
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_mutual_sleepy() {
let (tx, rx) = crossbeam_channel::unbounded();
let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
Expand All @@ -130,6 +139,7 @@ fn spawn_broadcast_mutual_sleepy() {
}

#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn broadcast_panic_one() {
let count = AtomicUsize::new(0);
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
Expand All @@ -146,6 +156,7 @@ fn broadcast_panic_one() {
}

#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn spawn_broadcast_panic_one() {
let (tx, rx) = crossbeam_channel::unbounded();
let (panic_tx, panic_rx) = crossbeam_channel::unbounded();
Expand All @@ -166,6 +177,7 @@ fn spawn_broadcast_panic_one() {
}

#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn broadcast_panic_many() {
let count = AtomicUsize::new(0);
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
Expand All @@ -182,6 +194,7 @@ fn broadcast_panic_many() {
}

#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn spawn_broadcast_panic_many() {
let (tx, rx) = crossbeam_channel::unbounded();
let (panic_tx, panic_rx) = crossbeam_channel::unbounded();
Expand All @@ -202,6 +215,7 @@ fn spawn_broadcast_panic_many() {
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_sleep_race() {
let test_duration = time::Duration::from_secs(1);
let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
Expand All @@ -214,3 +228,35 @@ fn broadcast_sleep_race() {
});
}
}

#[test]
fn broadcast_after_spawn_broadcast() {
let (tx, rx) = crossbeam_channel::unbounded();

// Queue a non-blocking spawn_broadcast.
crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());

// This blocking broadcast runs after all prior broadcasts.
crate::broadcast(|_| {});

// The spawn_broadcast **must** have run by now on all threads.
let mut v: Vec<_> = rx.try_iter().collect();
v.sort_unstable();
assert!(v.into_iter().eq(0..crate::current_num_threads()));
}

#[test]
fn broadcast_after_spawn() {
let (tx, rx) = crossbeam_channel::bounded(1);

// Queue a regular spawn on a thread-local deque.
crate::registry::in_worker(move |_, _| {
crate::spawn(move || tx.send(22).unwrap());
});

// Broadcast runs after the local deque is empty.
crate::broadcast(|_| {});

// The spawn **must** have run by now.
assert_eq!(22, rx.try_recv().unwrap());
}
6 changes: 6 additions & 0 deletions rayon-core/src/join/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ fn sort() {
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn sort_in_pool() {
let rng = seeded_rng();
let mut data: Vec<u32> = rng.sample_iter(&Standard).take(12 * 1024).collect();
Expand Down Expand Up @@ -77,6 +78,7 @@ fn panic_propagate_both() {
}

#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_b_still_executes() {
let mut x = false;
match unwind::halt_unwinding(|| join(|| panic!("Hello, world!"), || x = true)) {
Expand All @@ -86,6 +88,7 @@ fn panic_b_still_executes() {
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn join_context_both() {
// If we're not in a pool, both should be marked stolen as they're injected.
let (a_migrated, b_migrated) = join_context(|a| a.migrated(), |b| b.migrated());
Expand All @@ -94,6 +97,7 @@ fn join_context_both() {
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn join_context_neither() {
// If we're already in a 1-thread pool, neither job should be stolen.
let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
Expand All @@ -104,6 +108,7 @@ fn join_context_neither() {
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn join_context_second() {
use std::sync::Barrier;

Expand All @@ -127,6 +132,7 @@ fn join_context_second() {
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn join_counter_overflow() {
const MAX: u32 = 500_000;

Expand Down
22 changes: 21 additions & 1 deletion rayon-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,23 @@
//! [`join()`]: struct.ThreadPool.html#method.join
//! [`ThreadPoolBuilder`]: struct.ThreadPoolBuilder.html
//!
//! ## Restricting multiple versions
//! # Global fallback when threading is unsupported
//!
//! Rayon uses `std` APIs for threading, but some targets have incomplete implementations that
//! always return `Unsupported` errors. The WebAssembly `wasm32-unknown-unknown` and `wasm32-wasi`
//! targets are notable examples of this. Rather than panicking on the unsupported error when
//! creating the implicit global threadpool, Rayon configures a fallback mode instead.
//!
//! This fallback mode mostly functions as if it were using a single-threaded "pool", like setting
//! `RAYON_NUM_THREADS=1`. For example, `join` will execute its two closures sequentially, since
//! there is no other thread to share the work. However, since the pool is not running independent
//! of the main thread, non-blocking calls like `spawn` may not execute at all, unless a lower-
//! priority call like `broadcast` gives them an opening. The fallback mode does not try to emulate
//! anything like thread preemption or `async` task switching.
//!
//! Explicit `ThreadPoolBuilder` methods always report their error without any fallback.
//!
//! # Restricting multiple versions
//!
//! In order to ensure proper coordination between threadpools, and especially
//! to make sure there's only one global threadpool, `rayon-core` is actively
Expand Down Expand Up @@ -707,6 +723,10 @@ impl ThreadPoolBuildError {
fn new(kind: ErrorKind) -> ThreadPoolBuildError {
ThreadPoolBuildError { kind }
}

fn is_unsupported(&self) -> bool {
matches!(&self.kind, ErrorKind::IOError(e) if e.kind() == io::ErrorKind::Unsupported)
}
}

const GLOBAL_POOL_ALREADY_INITIALIZED: &str =
Expand Down
76 changes: 59 additions & 17 deletions rayon-core/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl ThreadBuilder {
/// Executes the main loop for this thread. This will not return until the
/// thread pool is dropped.
pub fn run(self) {
unsafe { main_loop(self.worker, self.stealer, self.registry, self.index) }
unsafe { main_loop(self) }
}
}

Expand Down Expand Up @@ -164,7 +164,7 @@ static THE_REGISTRY_SET: Once = Once::new();
/// initialization has not already occurred, use the default
/// configuration.
pub(super) fn global_registry() -> &'static Arc<Registry> {
set_global_registry(|| Registry::new(ThreadPoolBuilder::new()))
set_global_registry(default_global_registry)
.or_else(|err| unsafe { THE_REGISTRY.as_ref().ok_or(err) })
.expect("The global thread pool has not been initialized.")
}
Expand Down Expand Up @@ -198,6 +198,46 @@ where
result
}

fn default_global_registry() -> Result<Arc<Registry>, ThreadPoolBuildError> {
let result = Registry::new(ThreadPoolBuilder::new());

// If we're running in an environment that doesn't support threads at all, we can fall back to
// using the current thread alone. This is crude, and probably won't work for non-blocking
// calls like `spawn` or `broadcast_spawn`, but a lot of stuff does work fine.
//
// Notably, this allows current WebAssembly targets to work even though their threading support
// is stubbed out, and we won't have to change anything if they do add real threading.
let unsupported = matches!(&result, Err(e) if e.is_unsupported());
if unsupported && WorkerThread::current().is_null() {
let builder = ThreadPoolBuilder::new()
.num_threads(1)
.spawn_handler(|thread| {
// Rather than starting a new thread, we're just taking over the current thread
// *without* running the main loop, so we can still return from here.
// The WorkerThread is leaked, but we never shutdown the global pool anyway.
let worker_thread = Box::leak(Box::new(WorkerThread::from(thread)));
let registry = &*worker_thread.registry;
let index = worker_thread.index;

unsafe {
WorkerThread::set_current(worker_thread);

// let registry know we are ready to do work
Latch::set(&registry.thread_infos[index].primed);
}

Ok(())
});

let fallback_result = Registry::new(builder);
if fallback_result.is_ok() {
return fallback_result;
}
}

result
}

struct Terminator<'a>(&'a Arc<Registry>);

impl<'a> Drop for Terminator<'a> {
Expand Down Expand Up @@ -655,6 +695,19 @@ thread_local! {
static WORKER_THREAD_STATE: Cell<*const WorkerThread> = Cell::new(ptr::null());
}

impl From<ThreadBuilder> for WorkerThread {
fn from(thread: ThreadBuilder) -> Self {
Self {
worker: thread.worker,
stealer: thread.stealer,
fifo: JobFifo::new(),
index: thread.index,
rng: XorShift64Star::new(),
registry: thread.registry,
}
}
}

impl Drop for WorkerThread {
fn drop(&mut self) {
// Undo `set_current`
Expand Down Expand Up @@ -851,22 +904,11 @@ impl WorkerThread {

/// ////////////////////////////////////////////////////////////////////////
unsafe fn main_loop(
worker: Worker<JobRef>,
stealer: Stealer<JobRef>,
registry: Arc<Registry>,
index: usize,
) {
let worker_thread = &WorkerThread {
worker,
stealer,
fifo: JobFifo::new(),
index,
rng: XorShift64Star::new(),
registry,
};
unsafe fn main_loop(thread: ThreadBuilder) {
let worker_thread = &WorkerThread::from(thread);
WorkerThread::set_current(worker_thread);
let registry = &*worker_thread.registry;
let index = worker_thread.index;

// let registry know we are ready to do work
Latch::set(&registry.thread_infos[index].primed);
Expand Down Expand Up @@ -924,7 +966,7 @@ where
// invalidated until we return.
op(&*owner_thread, false)
} else {
global_registry().in_worker_cold(op)
global_registry().in_worker(op)
}
}
}
Expand Down
Loading

0 comments on commit 76a3030

Please sign in to comment.