Skip to content

Commit

Permalink
Refactor scope latches to reduce matching
Browse files Browse the repository at this point in the history
The former `enum ScopeLatch` forced a `match` during both `increment`
and `set` (decrement), even though both variants only need to update an
`AtomicUsize` most of the time. rayon-rs#1057 helped hide that for `increment`,
but `set` branching still showed up in perf profiles.

Now this is refactored to a unified `CountLatch` that has a direct field
for its `counter` used in the frequent case, and then an internal enum
for the one-time notification variants. Therefore, most of its updates
will have no `match` reached at all.

The only other use of the former `CountLatch` was the one-shot
termination latch in `WorkerThread`, so that's now renamed to
`OnceLatch`.
  • Loading branch information
cuviper committed Jun 21, 2023
1 parent d5e8f8d commit 32d3774
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 168 deletions.
5 changes: 2 additions & 3 deletions rayon-core/src/broadcast/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::job::{ArcJob, StackJob};
use crate::latch::LatchRef;
use crate::latch::{CountLatch, LatchRef};
use crate::registry::{Registry, WorkerThread};
use crate::scope::ScopeLatch;
use std::fmt;
use std::marker::PhantomData;
use std::sync::Arc;
Expand Down Expand Up @@ -107,7 +106,7 @@ where

let n_threads = registry.num_threads();
let current_thread = WorkerThread::current().as_ref();
let latch = ScopeLatch::with_count(n_threads, current_thread);
let latch = CountLatch::with_count(n_threads, current_thread);
let jobs: Vec<_> = (0..n_threads)
.map(|_| StackJob::new(&f, LatchRef::new(&latch)))
.collect();
Expand Down
163 changes: 107 additions & 56 deletions rayon-core/src/latch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,13 @@ impl CoreLatch {
}
}

impl AsCoreLatch for CoreLatch {
#[inline]
fn as_core_latch(&self) -> &CoreLatch {
self
}
}

/// Spin latches are the simplest, most efficient kind, but they do
/// not support a `wait()` operation. They just have a boolean flag
/// that becomes true when `set()` is called.
Expand Down Expand Up @@ -269,96 +276,114 @@ impl Latch for LockLatch {
}
}

/// Counting latches are used to implement scopes. They track a
/// counter. Unlike other latches, calling `set()` does not
/// necessarily make the latch be considered `set()`; instead, it just
/// decrements the counter. The latch is only "set" (in the sense that
/// `probe()` returns true) once the counter reaches zero.
/// Once latches are used to implement one-time blocking, primarily
/// for the termination flag of the threads in the pool.
///
/// Note: like a `SpinLatch`, count laches are always associated with
/// Note: like a `SpinLatch`, once-latches are always associated with
/// some registry that is probing them, which must be tickled when
/// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a
/// reference to that registry. This is because in some cases the
/// registry owns the count-latch, and that would create a cycle. So a
/// `CountLatch` must be given a reference to its owning registry when
/// registry owns the once-latch, and that would create a cycle. So a
/// `OnceLatch` must be given a reference to its owning registry when
/// it is set. For this reason, it does not implement the `Latch`
/// trait (but it doesn't have to, as it is not used in those generic
/// contexts).
#[derive(Debug)]
pub(super) struct CountLatch {
// counter is first to nudge layout like CountLockLatch
counter: AtomicUsize,
pub(super) struct OnceLatch {
core_latch: CoreLatch,
}

impl CountLatch {
#[inline]
pub(super) fn new() -> CountLatch {
Self::with_count(1)
}

impl OnceLatch {
#[inline]
pub(super) fn with_count(n: usize) -> CountLatch {
CountLatch {
pub(super) fn new() -> OnceLatch {
Self {
core_latch: CoreLatch::new(),
counter: AtomicUsize::new(n),
}
}

#[inline]
pub(super) fn increment(&self) {
debug_assert!(!self.core_latch.probe());
self.counter.fetch_add(1, Ordering::Relaxed);
}

/// Decrements the latch counter by one. If this is the final
/// count, then the latch is **set**, and calls to `probe()` will
/// return true. Returns whether the latch was set.
#[inline]
pub(super) unsafe fn set(this: *const Self) -> bool {
if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
CoreLatch::set(&(*this).core_latch);
true
} else {
false
}
}

/// Decrements the latch counter by one and possibly set it. If
/// the latch is set, then the specific worker thread is tickled,
/// Set the latch, then tickle the specific worker thread,
/// which should be the one that owns this latch.
#[inline]
pub(super) unsafe fn set_and_tickle_one(
this: *const Self,
registry: &Registry,
target_worker_index: usize,
) {
if Self::set(this) {
if CoreLatch::set(&(*this).core_latch) {
registry.notify_worker_latch_is_set(target_worker_index);
}
}
}

impl AsCoreLatch for CountLatch {
impl AsCoreLatch for OnceLatch {
#[inline]
fn as_core_latch(&self) -> &CoreLatch {
&self.core_latch
}
}

/// Counting latches are used to implement scopes. They track a
/// counter. Unlike other latches, calling `set()` does not
/// necessarily make the latch be considered `set()`; instead, it just
/// decrements the counter. The latch is only "set" (in the sense that
/// `probe()` returns true) once the counter reaches zero.
#[derive(Debug)]
pub(super) struct CountLockLatch {
// counter is first to nudge layout like CountLatch
pub(super) struct CountLatch {
counter: AtomicUsize,
lock_latch: LockLatch,
kind: CountLatchKind,
}

impl CountLockLatch {
#[inline]
pub(super) fn with_count(n: usize) -> CountLockLatch {
CountLockLatch {
lock_latch: LockLatch::new(),
counter: AtomicUsize::new(n),
enum CountLatchKind {
/// A latch for scopes created on a rayon thread which will participate in work-
/// stealing while it waits for completion. This thread is not necessarily part
/// of the same registry as the scope itself!
Stealing {
latch: CoreLatch,
/// If a worker thread in registry A calls `in_place_scope` on a ThreadPool
/// with registry B, when a job completes in a thread of registry B, we may
/// need to call `notify_worker_latch_is_set()` to wake the thread in registry A.
/// That means we need a reference to registry A (since at that point we will
/// only have a reference to registry B), so we stash it here.
registry: Arc<Registry>,
/// The index of the worker to wake in `registry`
worker_index: usize,
},

/// A latch for scopes created on a non-rayon thread which will block to wait.
Blocking { latch: LockLatch },
}

impl std::fmt::Debug for CountLatchKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
CountLatchKind::Stealing { latch, .. } => {
f.debug_tuple("Stealing").field(latch).finish()
}
CountLatchKind::Blocking { latch, .. } => {
f.debug_tuple("Blocking").field(latch).finish()
}
}
}
}

impl CountLatch {
pub(super) fn new(owner: Option<&WorkerThread>) -> Self {
Self::with_count(1, owner)
}

pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self {
Self {
counter: AtomicUsize::new(count),
kind: match owner {
Some(owner) => CountLatchKind::Stealing {
latch: CoreLatch::new(),
registry: Arc::clone(owner.registry()),
worker_index: owner.index(),
},
None => CountLatchKind::Blocking {
latch: LockLatch::new(),
},
},
}
}

Expand All @@ -368,16 +393,42 @@ impl CountLockLatch {
debug_assert!(old_counter != 0);
}

pub(super) fn wait(&self) {
self.lock_latch.wait();
pub(super) fn wait(&self, owner: Option<&WorkerThread>) {
match &self.kind {
CountLatchKind::Stealing {
latch,
registry,
worker_index,
} => unsafe {
let owner = owner.expect("owner thread");
debug_assert_eq!(registry.id(), owner.registry().id());
debug_assert_eq!(*worker_index, owner.index());
owner.wait_until(latch);
},
CountLatchKind::Blocking { latch } => latch.wait(),
}
}
}

impl Latch for CountLockLatch {
impl Latch for CountLatch {
#[inline]
unsafe fn set(this: *const Self) {
if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
LockLatch::set(&(*this).lock_latch);
// NOTE: Once we call `set` on the internal `latch`,
// the target may proceed and invalidate `this`!
match (*this).kind {
CountLatchKind::Stealing {
ref latch,
ref registry,
worker_index,
} => {
let registry = Arc::clone(registry);
if CoreLatch::set(latch) {
registry.notify_worker_latch_is_set(worker_index);
}
}
CountLatchKind::Blocking { ref latch } => LockLatch::set(latch),
}
}
}
}
Expand Down
11 changes: 4 additions & 7 deletions rayon-core/src/registry.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::job::{JobFifo, JobRef, StackJob};
use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LatchRef, LockLatch, SpinLatch};
use crate::latch::{AsCoreLatch, CoreLatch, Latch, LatchRef, LockLatch, OnceLatch, SpinLatch};
use crate::log::Event::*;
use crate::log::Logger;
use crate::sleep::Sleep;
Expand Down Expand Up @@ -610,7 +610,7 @@ impl Registry {
pub(super) fn terminate(&self) {
if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 {
for (i, thread_info) in self.thread_infos.iter().enumerate() {
unsafe { CountLatch::set_and_tickle_one(&thread_info.terminate, self, i) };
unsafe { OnceLatch::set_and_tickle_one(&thread_info.terminate, self, i) };
}
}
}
Expand Down Expand Up @@ -640,10 +640,7 @@ struct ThreadInfo {
/// This latch is *set* by the `terminate` method on the
/// `Registry`, once the registry's main "terminate" counter
/// reaches zero.
///
/// NB. We use a `CountLatch` here because it has no lifetimes and is
/// meant for async use, but the count never gets higher than one.
terminate: CountLatch,
terminate: OnceLatch,

/// the "stealer" half of the worker's deque
stealer: Stealer<JobRef>,
Expand All @@ -654,7 +651,7 @@ impl ThreadInfo {
ThreadInfo {
primed: LockLatch::new(),
stopped: LockLatch::new(),
terminate: CountLatch::new(),
terminate: OnceLatch::new(),
stealer,
}
}
Expand Down
Loading

0 comments on commit 32d3774

Please sign in to comment.