From 4c94258a82cb2bc1eea00dc0b4fcf481b7769fe2 Mon Sep 17 00:00:00 2001
From: Nikolai Vazquez
Date: Sun, 24 Nov 2024 17:31:15 -0500
Subject: [PATCH] Reuse threads across multi-threaded benchmarks

Once Divan spawns threads, it will keep them around for later benchmarks
to reuse. The result is that when running Divan benchmarks under a
sampling profiler, the profiler's output will be cleaner and easier to
understand.

Fixes #37.
Closes #44.
---
 CHANGELOG.md       |   4 +
 src/bench/mod.rs   |  85 ++++-------
 src/bench/tests.rs |   5 +
 src/divan.rs       |   6 +-
 src/lib.rs         |   1 +
 src/thread_pool.rs | 350 +++++++++++++++++++++++++++++++++++++++++++++
 src/util/mod.rs    |  17 +++
 src/util/sync.rs   |   2 +-
 8 files changed, 409 insertions(+), 61 deletions(-)
 create mode 100644 src/thread_pool.rs

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2666732..12e0cbf 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,10 @@ Versioning](http://semver.org/spec/v2.0.0.html).
 
 ### Added
 
+- Thread pool for reusing threads across multi-threaded benchmarks. The result
+  is that when running Divan benchmarks under a sampling profiler, the
+  profiler's output will be cleaner and easier to understand.
+
 - Track the maximum number of allocations during a benchmark.
 
 ### Changed
diff --git a/src/bench/mod.rs b/src/bench/mod.rs
index b1dfc7d..1e79c3a 100644
--- a/src/bench/mod.rs
+++ b/src/bench/mod.rs
@@ -4,7 +4,6 @@ use std::{
     mem::{self, MaybeUninit},
     num::NonZeroUsize,
     sync::Barrier,
-    thread,
 };
 
 use crate::{
@@ -18,6 +17,7 @@ use crate::{
     },
     divan::SharedContext,
     stats::{RawSample, SampleCollection, Stats, StatsSet, TimeSample},
+    thread_pool::BENCH_POOL,
     time::{FineDuration, Timestamp, UntaggedTimestamp},
     util::{self, sync::SyncWrap, Unit},
 };
@@ -607,7 +607,6 @@ impl<'a> BenchContext<'a> {
         let is_test = current_mode.is_test();
 
         let record_sample = self.sample_recorder(gen_input, benched, drop_input);
-        let mut defer_store = DeferStore::default();
 
         let thread_count = self.thread_count.get();
         let aux_thread_count = thread_count - 1;
@@ -617,7 +616,7 @@ impl<'a> BenchContext<'a> {
         // Per-thread sample info returned by `record_sample`. These are
         // processed locally to emit user-facing sample info. As a result, this
         // only contains `thread_count` many elements at a time.
-        let mut raw_samples = Vec::<RawSample>::new();
+        let mut raw_samples = Vec::<Option<RawSample>>::new();
 
         // The time spent benchmarking, in picoseconds.
         //
@@ -678,7 +677,7 @@ impl<'a> BenchContext<'a> {
         let barrier = if is_single_thread { None } else { Some(Barrier::new(thread_count)) };
 
         // Sample loop helper:
-        let record_sample = |defer_store: &mut DeferStore<I, O>| -> RawSample {
+        let record_sample = || -> RawSample {
             let mut counter_totals: [u128; KnownCounterKind::COUNT] =
                 [0; KnownCounterKind::COUNT];
 
@@ -697,57 +696,31 @@ impl<'a> BenchContext<'a> {
             };
 
             // Sample loop:
-            let ([start, end], alloc_info) = record_sample(
-                sample_size as usize,
-                barrier.as_ref(),
-                defer_store,
-                &mut count_input,
-            );
+            let ([start, end], alloc_info) =
+                record_sample(sample_size as usize, barrier.as_ref(), &mut count_input);
 
             RawSample { start, end, timer, alloc_info, counter_totals }
         };
 
         // Sample loop:
         raw_samples.clear();
-        if is_single_thread {
-            let sample = record_sample(&mut defer_store);
-            if !is_test {
-                raw_samples.push(sample);
+        BENCH_POOL.par_extend(&mut raw_samples, aux_thread_count, |_| record_sample());
+
+        // Convert `&[Option<RawSample>]` to `&[RawSample]`.
+        let raw_samples: &[RawSample] = {
+            if let Some(thread) = raw_samples
+                .iter()
+                .enumerate()
+                .find_map(|(thread, sample)| sample.is_none().then_some(thread))
+            {
+                panic!("Divan benchmarking thread {thread} panicked");
             }
-        } else {
-            // TODO: Reuse auxiliary threads across samples.
-            thread::scope(|scope| {
-                let thread_handles: Vec<_> = (0..aux_thread_count)
-                    .map(|_| scope.spawn(|| record_sample(&mut DeferStore::default())))
-                    .collect();
-
-                let local_sample = record_sample(&mut defer_store);
-
-                if !is_test {
-                    raw_samples.extend(
-                        thread_handles
-                            .into_iter()
-                            .map(|handle| {
-                                // Propagate panics to behave the same as
-                                // automatic joining.
-                                handle
-                                    .join()
-                                    .unwrap_or_else(|error| std::panic::resume_unwind(error))
-                            })
-                            .chain(Some(local_sample)),
-                    );
-                }
-            });
-        }
 
-        #[cfg(test)]
-        if is_test {
-            // '--test' should run the expected number of times but not
-            // allocate any samples.
-            assert_eq!(raw_samples.capacity(), 0);
-        } else {
-            assert_eq!(raw_samples.len(), thread_count);
-        }
+            unsafe {
+                assert_eq!(size_of::<RawSample>(), size_of::<Option<RawSample>>());
+                std::slice::from_raw_parts(raw_samples.as_ptr().cast(), raw_samples.len())
+            }
+        };
 
         // If testing, exit the benchmarking loop immediately after timing a
         // single run.
@@ -794,7 +767,7 @@ impl<'a> BenchContext<'a> {
                 .clamp_to(timer_precision)
         };
 
-        for raw_sample in &raw_samples {
+        for raw_sample in raw_samples {
             let sample_index = self.samples.time_samples.len();
 
             self.samples
@@ -849,12 +822,8 @@ impl<'a> BenchContext<'a> {
         gen_input: impl Fn() -> I,
         benched: impl Fn(&UnsafeCell<MaybeUninit<I>>) -> O,
        drop_input: impl Fn(&UnsafeCell<MaybeUninit<I>>),
-    ) -> impl Fn(
-        usize,
-        Option<&Barrier>,
-        &mut DeferStore<I, O>,
-        &mut dyn FnMut(&I),
-    ) -> ([Timestamp; 2], ThreadAllocInfo) {
+    ) -> impl Fn(usize, Option<&Barrier>, &mut dyn FnMut(&I)) -> ([Timestamp; 2], ThreadAllocInfo)
+    {
         // We defer:
         // - Usage of `gen_input` values.
         // - Drop destructor for `O`, preventing it from affecting sample
@@ -864,12 +833,10 @@ impl<'a> BenchContext<'a> {
 
         let timer_kind = self.shared_context.timer.kind();
 
-        move |sample_size: usize,
-              barrier: Option<&Barrier>,
-              defer_store: &mut DeferStore<I, O>,
-              count_input: &mut dyn FnMut(&I)| {
-            let mut saved_alloc_info = ThreadAllocInfo::new();
+        move |sample_size: usize, barrier: Option<&Barrier>, count_input: &mut dyn FnMut(&I)| {
+            let mut defer_store = DeferStore::<I, O>::default();
 
+            let mut saved_alloc_info = ThreadAllocInfo::new();
             let mut save_alloc_info = || {
                 if crate::alloc::IGNORE_ALLOC.get() {
                     return;
diff --git a/src/bench/tests.rs b/src/bench/tests.rs
index da7072e..22f006f 100644
--- a/src/bench/tests.rs
+++ b/src/bench/tests.rs
@@ -6,6 +6,8 @@ use std::{
     sync::atomic::{AtomicUsize, Ordering::SeqCst},
 };
 
+use crate::util::defer;
+
 use super::*;
 use crate::{
     config::Action,
@@ -33,6 +35,9 @@ const THREAD_COUNTS: &[usize] = if cfg!(miri) {
 
 #[track_caller]
 fn test_bencher(test: &mut dyn FnMut(Bencher)) {
+    // Silence Miri about leaking threads.
+    let _drop_threads = defer(|| BENCH_POOL.drop_threads());
+
     let bench_options = BenchOptions {
         sample_count: Some(SAMPLE_COUNT),
         sample_size: Some(SAMPLE_SIZE),
diff --git a/src/divan.rs b/src/divan.rs
index e35a40e..801131f 100644
--- a/src/divan.rs
+++ b/src/divan.rs
@@ -13,9 +13,11 @@ use crate::{
         PrivBytesFormat,
     },
     entry::{AnyBenchEntry, BenchEntryRunner, EntryTree},
+    thread_pool::BENCH_POOL,
     time::{Timer, TimerKind},
     tree_painter::{TreeColumn, TreePainter},
-    util, Bencher,
+    util::{self, defer},
+    Bencher,
 };
 
 /// The benchmark runner.
@@ -94,6 +96,8 @@ impl Divan {
     }
 
     pub(crate) fn run_action(&self, action: Action) {
+        let _drop_threads = defer(|| BENCH_POOL.drop_threads());
+
         let mut tree: Vec<EntryTree> = if cfg!(miri) {
             // Miri does not work with our linker tricks.
             Vec::new()
diff --git a/src/lib.rs b/src/lib.rs
index 31b6c7d..ad24b46 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -25,6 +25,7 @@ mod config;
 mod divan;
 mod entry;
 mod stats;
+mod thread_pool;
 mod time;
 mod tree_painter;
 mod util;
diff --git a/src/thread_pool.rs b/src/thread_pool.rs
new file mode 100644
index 0000000..9f813d9
--- /dev/null
+++ b/src/thread_pool.rs
@@ -0,0 +1,350 @@
+use std::{
+    num::NonZeroUsize,
+    panic::AssertUnwindSafe,
+    ptr::NonNull,
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        mpsc, Mutex, PoisonError,
+    },
+    thread::Thread,
+};
+
+use crate::util::{defer, sync::SyncWrap};
+
+/// Single shared thread pool for running benchmarks on.
+pub(crate) static BENCH_POOL: ThreadPool = ThreadPool::new();
+
+/// Reusable threads for broadcasting tasks.
+///
+/// This thread pool runs only a single task at a time, since only one benchmark
+/// should run at a time. Invoking `broadcast` from two threads will cause one
+/// thread to wait for the other to finish.
+///
+/// # How It Works
+///
+/// Upon calling `broadcast`:
+///
+/// 1. The main thread creates a `Task`, which is a pointer to a `TaskShared`
+///    pinned on the stack. `TaskShared` stores the function to run, along with
+///    other fields for coordinating threads.
+///
+/// 2. New threads are spawned if the requested amount is not available. Each
+///    receives tasks over an associated channel.
+///
+/// 3. The main thread sends the `Task` over the channels to the requested
+///    amount of threads. Upon receiving the task, each auxiliary thread will
+///    execute it and then decrement the task's reference count.
+///
+/// 4. The main thread executes the `Task` like auxiliary threads. It then waits
+///    until the reference count is 0 before returning.
+pub(crate) struct ThreadPool {
+    threads: Mutex<Vec<mpsc::SyncSender<Task>>>,
+}
+
+impl ThreadPool {
+    const fn new() -> Self {
+        Self { threads: Mutex::new(Vec::new()) }
+    }
+
+    /// Performs the given task and pushes the results into a `vec`.
+    #[inline]
+    pub fn par_extend<T, F>(&self, vec: &mut Vec<Option<T>>, aux_threads: usize, task: F)
+    where
+        F: Sync + Fn(usize) -> T,
+        T: Sync + Send,
+    {
+        unsafe {
+            let old_len = vec.len();
+            let additional = aux_threads + 1;
+
+            vec.reserve_exact(additional);
+            vec.spare_capacity_mut().iter_mut().for_each(|val| {
+                val.write(None);
+            });
+            vec.set_len(old_len + additional);
+
+            let ptr = SyncWrap::new(vec.as_mut_ptr().add(old_len));
+
+            self.broadcast(aux_threads, move |index| {
+                ptr.add(index).write(Some(task(index)));
+            });
+        }
+    }
+
+    /// Performs the given task across the current thread and auxiliary worker
+    /// threads.
+    ///
+    /// This function returns once all threads complete the task.
+    #[inline]
+    pub fn broadcast<F>(&self, aux_threads: usize, task: F)
+    where
+        F: Sync + Fn(usize),
+    {
+        // SAFETY: The `TaskShared` instance is guaranteed to be accessible to
+        // all threads until this function returns, because this thread waits
+        // until the task's `ref_count` is 0 before continuing.
+        unsafe {
+            let task = TaskShared::new(aux_threads, task);
+            let task = Task { shared: NonNull::from(&task).cast() };
+
+            self.broadcast_task(aux_threads, task);
+        }
+    }
+
+    /// Type-erased monomorphized implementation for `broadcast`.
+    unsafe fn broadcast_task(&self, aux_threads: usize, task: Task) {
+        // Send task to auxiliary threads.
+        if aux_threads > 0 {
+            let threads = &mut *self.threads.lock().unwrap_or_else(PoisonError::into_inner);
+
+            // Spawn more threads if necessary.
+            if let Some(additional) = NonZeroUsize::new(aux_threads.saturating_sub(threads.len())) {
+                spawn(additional, threads);
+            }
+
+            for thread in &threads[..aux_threads] {
+                thread.send(task).unwrap();
+            }
+        }
+
+        // Run the task on the main thread.
+        let main_result = std::panic::catch_unwind(AssertUnwindSafe(|| task.run(0)));
+
+        // Wait for other threads to finish writing their results.
+        //
+        // SAFETY: The acquire memory ordering ensures that all writes performed
+        // by the task on other threads will become visible to this thread after
+        // returning from `broadcast`.
+        while task.shared.as_ref().ref_count.load(Ordering::Acquire) > 0 {
+            std::thread::park();
+        }
+
+        // Don't drop our result until other threads finish, in case the panic
+        // error's drop handler itself also panics.
+        drop(main_result);
+    }
+
+    pub fn drop_threads(&self) {
+        *self.threads.lock().unwrap_or_else(PoisonError::into_inner) = Default::default();
+    }
+
+    #[cfg(test)]
+    fn aux_thread_count(&self) -> usize {
+        self.threads.lock().unwrap_or_else(PoisonError::into_inner).len()
+    }
+}
+
+/// Type-erased function and metadata.
+#[derive(Clone, Copy)]
+struct Task {
+    shared: NonNull<TaskShared<()>>,
+}
+
+unsafe impl Send for Task {}
+unsafe impl Sync for Task {}
+
+impl Task {
+    /// Runs this task on behalf of `thread_id`.
+    ///
+    /// # Safety
+    ///
+    /// The caller must ensure:
+    ///
+    /// - This task has not outlived the `TaskShared` it came from, or else
+    ///   there will be a use-after-free.
+    ///
+    /// - `thread_id` is within the number of `broadcast` threads requested, so
+    ///   that it can be used to index input or output buffers.
+    #[inline]
+    unsafe fn run(&self, thread_id: usize) {
+        let shared_ptr = self.shared.as_ptr();
+        let shared = &*shared_ptr;
+
+        (shared.task_fn_ptr)(shared_ptr.cast(), thread_id);
+    }
+}
+
+/// Data stored on the main thread that gets shared with auxiliary threads.
+///
+/// # Memory Layout
+///
+/// Since the benchmark may have thrashed the cache, this type's fields are
+/// ordered by usage. This type is also placed on its own cache line.
+#[repr(C)]
+struct TaskShared<F> {
+    /// Once an auxiliary thread sets `ref_count` to 0, it should notify the
+    /// main thread to wake up.
+    main_thread: Thread,
+
+    /// The number of auxiliary threads executing the task.
+    ///
+    /// Once this is 0, the main thread can read any results the task produced.
+    ref_count: AtomicUsize,
+
+    /// Performs `*result = Some(task_fn(thread))`.
+    task_fn_ptr: unsafe fn(task: *const TaskShared<()>, thread: usize),
+
+    /// Stores the closure state of the provided task.
+    ///
+    /// This must be stored as the last field so that all other fields are in
+    /// the same place regardless of this field's type.
+    task_fn: F,
+}
+
+impl<F> TaskShared<F> {
+    #[inline]
+    fn new(aux_threads: usize, task_fn: F) -> Self
+    where
+        F: Sync + Fn(usize),
+    {
+        unsafe fn call<F>(task: *const TaskShared<()>, thread: usize)
+        where
+            F: Fn(usize),
+        {
+            let task_fn = &(*task.cast::<TaskShared<F>>()).task_fn;
+
+            task_fn(thread);
+        }
+
+        Self {
+            main_thread: std::thread::current(),
+            ref_count: AtomicUsize::new(aux_threads),
+            task_fn_ptr: call::<F>,
+            task_fn,
+        }
+    }
+}
+
+/// Spawns N additional threads and appends their channels to the list.
+///
+/// Threads are given names like `divan-3` to indicate that they belong to Divan.
+#[cold]
+fn spawn(additional: NonZeroUsize, threads: &mut Vec<mpsc::SyncSender<Task>>) {
+    let next_thread_id = threads.len() + 1;
+
+    threads.extend((next_thread_id..(next_thread_id + additional.get())).map(|thread_id| {
+        // Create single-task channel.
+        //
+        // If we used 0 (rendezvous channel), then the main thread would block
+        // while sending a task until the auxiliary thread accepts the task.
+        // Using a capacity of 1 allows the main thread to immediately progress
+        // onto finishing its local work, including sending the task to all
+        // remaining threads.
+        let (sender, receiver) = mpsc::sync_channel::<Task>(1);
+
+        let work = move || {
+            // Abort the process if the caught panic error itself panics when
+            // dropped.
+            let panic_guard = defer(|| std::process::abort());
+
+            while let Ok(task) = receiver.recv() {
+                // Run the task on this auxiliary thread.
+                //
+                // SAFETY: The task is valid until `ref_count == 0`.
+                let result =
+                    std::panic::catch_unwind(AssertUnwindSafe(|| unsafe { task.run(thread_id) }));
+
+                // Decrement the `ref_count` to notify the main thread that we
+                // finished our work.
+                //
+                // SAFETY: This release operation makes writes within the task
+                // become visible to the main thread.
+                unsafe {
+                    // Clone the main thread's handle for unparking because the
+                    // `TaskShared` will be invalidated when `ref_count` is 0.
+                    let main_thread = task.shared.as_ref().main_thread.clone();
+
+                    if task.shared.as_ref().ref_count.fetch_sub(1, Ordering::Release) == 1 {
+                        main_thread.unpark();
+                    }
+                }
+
+                // Don't drop our result until after notifying the main thread,
+                // in case the panic error's drop handler itself also panics.
+                drop(result);
+            }
+
+            std::mem::forget(panic_guard);
+        };
+
+        std::thread::Builder::new()
+            .name(format!("divan-{thread_id}"))
+            .spawn(work)
+            .expect("failed to spawn thread");
+
+        sender
+    }));
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Make every thread write its ID to a buffer and then check that the
+    /// buffer contains all IDs.
+    #[test]
+    fn extend() {
+        static TEST_POOL: ThreadPool = ThreadPool::new();
+
+        fn test(aux_threads: usize, final_aux_threads: usize) {
+            let total_threads = aux_threads + 1;
+
+            let mut results = Vec::new();
+            let expected = (0..total_threads).map(Some).collect::<Vec<_>>();
+
+            TEST_POOL.par_extend(&mut results, aux_threads, |index| index);
+
+            assert_eq!(results, expected);
+            assert_eq!(TEST_POOL.aux_thread_count(), final_aux_threads);
+        }
+
+        test(0, 0);
+        test(1, 1);
+        test(2, 2);
+        test(3, 3);
+        test(4, 4);
+        test(8, 8);
+
+        // Decreasing auxiliary threads on later calls should still leave
+        // previously spawned threads running.
+        test(4, 8);
+        test(0, 8);
+
+        // Silence Miri about leaking threads.
+        TEST_POOL.drop_threads();
+    }
+
+    /// Execute a task that takes longer on all other threads than the main
+    /// thread.
+    #[test]
+    fn broadcast_sleep() {
+        use std::time::Duration;
+
+        static TEST_POOL: ThreadPool = ThreadPool::new();
+
+        TEST_POOL.broadcast(10, |thread_id| {
+            if thread_id > 0 {
+                std::thread::sleep(Duration::from_millis(10));
+            }
+        });
+
+        // Silence Miri about leaking threads.
+        TEST_POOL.drop_threads();
+    }
+
+    /// Checks that thread ID 0 refers to the main thread.
+    #[test]
+    fn broadcast_thread_id() {
+        static TEST_POOL: ThreadPool = ThreadPool::new();
+
+        let main_thread = std::thread::current().id();
+
+        TEST_POOL.broadcast(10, |thread_id| {
+            let is_main = main_thread == std::thread::current().id();
+            assert_eq!(is_main, thread_id == 0);
+        });
+
+        // Silence Miri about leaking threads.
+        TEST_POOL.drop_threads();
+    }
+}
diff --git a/src/util/mod.rs b/src/util/mod.rs
index 6788384..1b54520 100644
--- a/src/util/mod.rs
+++ b/src/util/mod.rs
@@ -1,4 +1,5 @@
 use std::{
+    mem::ManuallyDrop,
     num::NonZeroUsize,
     sync::atomic::{AtomicUsize, Ordering::Relaxed},
 };
@@ -16,6 +17,22 @@ pub mod ty;
 #[non_exhaustive]
 pub struct Unit;
 
+#[inline]
+pub(crate) fn defer<F: FnOnce()>(f: F) -> impl Drop {
+    struct Defer<F: FnOnce()>(ManuallyDrop<F>);
+
+    impl<F: FnOnce()> Drop for Defer<F> {
+        #[inline]
+        fn drop(&mut self) {
+            let f = unsafe { ManuallyDrop::take(&mut self.0) };
+
+            f();
+        }
+    }
+
+    Defer(ManuallyDrop::new(f))
+}
+
 /// Returns the index of `ptr` in the slice, assuming it is in the slice.
 #[inline]
 pub(crate) fn slice_ptr_index<T>(slice: &[T], ptr: *const T) -> usize {
diff --git a/src/util/sync.rs b/src/util/sync.rs
index 73fec91..d84f07b 100644
--- a/src/util/sync.rs
+++ b/src/util/sync.rs
@@ -9,7 +9,7 @@ use std::{
 
 /// Makes the wrapped value [`Send`] + [`Sync`] even though it isn't.
 pub struct SyncWrap<T> {
-    value: T,
+    pub value: T,
 }
 
 unsafe impl<T> Sync for SyncWrap<T> {}
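
Note on the rendezvous used above: the park/unpark handshake described in `ThreadPool`'s
"How It Works" docs can be illustrated with a small standalone sketch. The code below is
not part of the patch and does not use Divan's types; `broadcast_sketch` is a hypothetical
name, and the real pool additionally reuses long-lived threads (fed through
`sync_channel(1)` senders) and type-erases the closure behind a `Task` pointer. The sketch
only mirrors steps 3 and 4: auxiliary threads decrement a shared count with `Release`
ordering and unpark the main thread, which parks until the count reaches zero and then
observes their writes via an `Acquire` load.

    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Runs `task` on this thread (as thread 0) and on `aux_threads` scoped
    /// worker threads, returning once every thread has finished.
    fn broadcast_sketch(aux_threads: usize, task: impl Fn(usize) + Sync) {
        let ref_count = AtomicUsize::new(aux_threads);
        let main_thread = std::thread::current();

        std::thread::scope(|scope| {
            for thread_id in 1..=aux_threads {
                let (ref_count, main_thread, task) = (&ref_count, &main_thread, &task);

                scope.spawn(move || {
                    task(thread_id);

                    // Publish this thread's writes, then wake the main thread
                    // if this was the last worker to finish.
                    if ref_count.fetch_sub(1, Ordering::Release) == 1 {
                        main_thread.unpark();
                    }
                });
            }

            // Thread ID 0 is the main thread, as in `ThreadPool::broadcast`.
            task(0);

            // `park` can wake spuriously, so re-check the count in a loop; the
            // `Acquire` load pairs with the workers' `Release` decrements.
            while ref_count.load(Ordering::Acquire) > 0 {
                std::thread::park();
            }
        });
    }

    fn main() {
        broadcast_sketch(4, |thread_id| println!("ran on thread {thread_id}"));
    }

Unlike the pool in the patch, the sketch's threads exit when the scope ends, so nothing
equivalent to `drop_threads` is needed.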