diff --git a/crates/turbo-tasks-malloc/src/counter.rs b/crates/turbo-tasks-malloc/src/counter.rs index 74199f2c77527..abefbebb15a78 100644 --- a/crates/turbo-tasks-malloc/src/counter.rs +++ b/crates/turbo-tasks-malloc/src/counter.rs @@ -4,6 +4,8 @@ use std::{ sync::atomic::{AtomicUsize, Ordering}, }; +use crate::AllocationCounters; + static ALLOCATED: AtomicUsize = AtomicUsize::new(0); const KB: usize = 1024; /// When global counter is updates we will keep a thread-local buffer of this @@ -13,23 +15,6 @@ const TARGET_BUFFER: usize = 100 * KB; /// global counter. const MAX_BUFFER: usize = 200 * KB; -#[derive(Default)] -pub struct AllocationInfo { - pub allocations: usize, - pub deallocations: usize, - pub allocation_count: usize, - pub deallocation_count: usize, -} - -impl AllocationInfo { - pub fn is_empty(&self) -> bool { - self.allocations == 0 - && self.deallocations == 0 - && self.allocation_count == 0 - && self.deallocation_count == 0 - } -} - #[derive(Default)] struct ThreadLocalCounter { /// Thread-local buffer of allocated bytes that have been added to the @@ -37,13 +22,13 @@ struct ThreadLocalCounter { /// means the global counter is always equal or greater than the real /// value. buffer: usize, - allocation_info: AllocationInfo, + allocation_counters: AllocationCounters, } impl ThreadLocalCounter { fn add(&mut self, size: usize) { - self.allocation_info.allocations += size; - self.allocation_info.allocation_count += 1; + self.allocation_counters.allocations += size; + self.allocation_counters.allocation_count += 1; if self.buffer >= size { self.buffer -= size; } else { @@ -54,8 +39,8 @@ impl ThreadLocalCounter { } fn remove(&mut self, size: usize) { - self.allocation_info.deallocations += size; - self.allocation_info.deallocation_count += 1; + self.allocation_counters.deallocations += size; + self.allocation_counters.deallocation_count += 1; self.buffer += size; if self.buffer > MAX_BUFFER { let offset = self.buffer - TARGET_BUFFER; @@ -69,6 +54,7 @@ impl ThreadLocalCounter { ALLOCATED.fetch_sub(self.buffer, Ordering::Relaxed); self.buffer = 0; } + self.allocation_counters = AllocationCounters::default(); } } @@ -80,8 +66,8 @@ pub fn get() -> usize { ALLOCATED.load(Ordering::Relaxed) } -pub fn pop_allocations() -> AllocationInfo { - with_local_counter(|local| std::mem::take(&mut local.allocation_info)) +pub fn allocation_counters() -> AllocationCounters { + with_local_counter(|local| local.allocation_counters.clone()) } fn with_local_counter(f: impl FnOnce(&mut ThreadLocalCounter) -> T) -> T { diff --git a/crates/turbo-tasks-malloc/src/lib.rs b/crates/turbo-tasks-malloc/src/lib.rs index 3e0d1462ef19a..5624cc279befc 100644 --- a/crates/turbo-tasks-malloc/src/lib.rs +++ b/crates/turbo-tasks-malloc/src/lib.rs @@ -1,9 +1,50 @@ mod counter; -use std::alloc::{GlobalAlloc, Layout}; +use std::{ + alloc::{GlobalAlloc, Layout}, + marker::PhantomData, +}; use self::counter::{add, flush, get, remove}; +#[derive(Default, Clone, Debug)] +pub struct AllocationInfo { + pub allocations: usize, + pub deallocations: usize, + pub allocation_count: usize, + pub deallocation_count: usize, +} + +impl AllocationInfo { + pub fn is_empty(&self) -> bool { + self.allocations == 0 + && self.deallocations == 0 + && self.allocation_count == 0 + && self.deallocation_count == 0 + } +} + +#[derive(Default, Clone, Debug)] +pub struct AllocationCounters { + pub allocations: usize, + pub deallocations: usize, + pub allocation_count: usize, + pub deallocation_count: usize, + _not_send: PhantomData<*mut ()>, +} + +impl AllocationCounters { + pub fn until_now(&self) -> AllocationInfo { + let new = TurboMalloc::allocation_counters(); + AllocationInfo { + allocations: new.allocations - self.allocations, + deallocations: new.deallocations - self.deallocations, + allocation_count: new.allocation_count - self.allocation_count, + deallocation_count: new.deallocation_count - self.deallocation_count, + } + } +} + /// Turbo's preferred global allocator. This is a new type instead of a type /// alias because you can't use type aliases to instantiate unit types (E0423). pub struct TurboMalloc; @@ -17,8 +58,8 @@ impl TurboMalloc { flush(); } - pub fn pop_allocations() -> self::counter::AllocationInfo { - self::counter::pop_allocations() + pub fn allocation_counters() -> AllocationCounters { + self::counter::allocation_counters() } } diff --git a/crates/turbo-tasks-memory/src/memory_backend.rs b/crates/turbo-tasks-memory/src/memory_backend.rs index 2d43eac6bb996..612359852b7ad 100644 --- a/crates/turbo-tasks-memory/src/memory_backend.rs +++ b/crates/turbo-tasks-memory/src/memory_backend.rs @@ -321,11 +321,12 @@ impl Backend for MemoryBackend { task_id: TaskId, duration: Duration, instant: Instant, + memory_usage: usize, stateful: bool, turbo_tasks: &dyn TurboTasksBackendApi, ) -> bool { let reexecute = self.with_task(task_id, |task| { - task.execution_completed(duration, instant, stateful, self, turbo_tasks) + task.execution_completed(duration, instant, memory_usage, stateful, self, turbo_tasks) }); if !reexecute { self.run_gc(false, turbo_tasks); diff --git a/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs b/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs index ca76792764f12..07d2305d7e571 100644 --- a/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs +++ b/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs @@ -1154,6 +1154,7 @@ impl Backend for MemoryBackendWithPersistedGraph

{ task: TaskId, duration: Duration, _instant: Instant, + _memory_usage: usize, _stateful: bool, turbo_tasks: &dyn TurboTasksBackendApi>, ) -> bool { diff --git a/crates/turbo-tasks-memory/src/task.rs b/crates/turbo-tasks-memory/src/task.rs index d8a11cdbf0d5d..3dba4c3657875 100644 --- a/crates/turbo-tasks-memory/src/task.rs +++ b/crates/turbo-tasks-memory/src/task.rs @@ -961,6 +961,7 @@ impl Task { &self, duration: Duration, instant: Instant, + _memory_usage: usize, stateful: bool, backend: &MemoryBackend, turbo_tasks: &dyn TurboTasksBackendApi, diff --git a/crates/turbo-tasks/Cargo.toml b/crates/turbo-tasks/Cargo.toml index 330affe279574..2b95232eff703 100644 --- a/crates/turbo-tasks/Cargo.toml +++ b/crates/turbo-tasks/Cargo.toml @@ -42,6 +42,7 @@ tokio = { workspace = true, features = ["full"] } tracing = { workspace = true } turbo-tasks-hash = { workspace = true } turbo-tasks-macros = { workspace = true } +turbo-tasks-malloc = { workspace = true } [dev-dependencies] serde_test = "1.0.157" diff --git a/crates/turbo-tasks/src/backend.rs b/crates/turbo-tasks/src/backend.rs index bb12e9af88bfd..90025f90feb01 100644 --- a/crates/turbo-tasks/src/backend.rs +++ b/crates/turbo-tasks/src/backend.rs @@ -228,6 +228,7 @@ pub trait Backend: Sync + Send { task: TaskId, duration: Duration, instant: Instant, + memory_usage: usize, stateful: bool, turbo_tasks: &dyn TurboTasksBackendApi, ) -> bool; diff --git a/crates/turbo-tasks/src/capture_future.rs b/crates/turbo-tasks/src/capture_future.rs new file mode 100644 index 0000000000000..30b87aefdbb01 --- /dev/null +++ b/crates/turbo-tasks/src/capture_future.rs @@ -0,0 +1,76 @@ +use std::{ + future::Future, + pin::Pin, + sync::{Arc, Mutex}, + task::{Context, Poll}, + time::{Duration, Instant}, +}; + +use pin_project_lite::pin_project; +use tokio::{task::futures::TaskLocalFuture, task_local}; +use turbo_tasks_malloc::{AllocationInfo, TurboMalloc}; + +task_local! { + static EXTRA: Arc>; +} + +pin_project! { + pub struct CaptureFuture> { + cell: Arc>, + #[pin] + future: TaskLocalFuture>, F>, + duration: Duration, + allocations: usize, + deallocations: usize, + } +} + +impl> CaptureFuture { + pub fn new(future: F) -> Self { + let cell = Arc::new(Mutex::new((Duration::ZERO, 0, 0))); + Self { + future: EXTRA.scope(cell.clone(), future), + cell, + duration: Duration::ZERO, + allocations: 0, + deallocations: 0, + } + } +} + +pub fn add_duration(duration: Duration) { + EXTRA.with(|cell| cell.lock().unwrap().0 += duration); +} + +pub fn add_allocation_info(alloc_info: AllocationInfo) { + EXTRA.with(|cell| { + let mut guard = cell.lock().unwrap(); + guard.1 += alloc_info.allocations; + guard.2 += alloc_info.deallocations; + }); +} + +impl> Future for CaptureFuture { + type Output = (T, Duration, Instant, usize); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let start = Instant::now(); + let start_allocations = TurboMalloc::allocation_counters(); + let result = this.future.poll(cx); + let elapsed = start.elapsed(); + let allocations = start_allocations.until_now(); + *this.duration += elapsed; + *this.allocations += allocations.allocations; + *this.deallocations += allocations.deallocations; + match result { + Poll::Ready(r) => { + let (duration, allocations, deallocations) = *this.cell.lock().unwrap(); + let memory_usage = (*this.allocations + allocations) + .saturating_sub(*this.deallocations + deallocations); + Poll::Ready((r, *this.duration + duration, start + elapsed, memory_usage)) + } + Poll::Pending => Poll::Pending, + } + } +} diff --git a/crates/turbo-tasks/src/lib.rs b/crates/turbo-tasks/src/lib.rs index a61eb78c3bc4a..d6054f40d3039 100644 --- a/crates/turbo-tasks/src/lib.rs +++ b/crates/turbo-tasks/src/lib.rs @@ -35,6 +35,7 @@ #![feature(never_type)] pub mod backend; +mod capture_future; mod collectibles; mod completion; pub mod debug; @@ -64,7 +65,6 @@ pub mod registry; pub mod small_duration; mod state; pub mod task; -mod timed_future; pub mod trace; mod trait_ref; pub mod util; diff --git a/crates/turbo-tasks/src/manager.rs b/crates/turbo-tasks/src/manager.rs index a194964a7123f..d648e89bbb77e 100644 --- a/crates/turbo-tasks/src/manager.rs +++ b/crates/turbo-tasks/src/manager.rs @@ -21,17 +21,18 @@ use nohash_hasher::BuildNoHashHasher; use serde::{de::Visitor, Deserialize, Serialize}; use tokio::{runtime::Handle, select, task_local}; use tracing::{info_span, instrument, trace_span, Instrument, Level}; +use turbo_tasks_malloc::TurboMalloc; use crate::{ backend::{Backend, CellContent, PersistentTaskType, TaskExecutionSpec, TransientTaskType}, + capture_future::{ + CaptureFuture, {self}, + }, event::{Event, EventListener}, id::{BackendJobId, FunctionId, TraitTypeId}, id_factory::IdFactory, raw_vc::{CellId, RawVc}, registry, - timed_future::{ - TimedFuture, {self}, - }, trace::TraceRawVcs, util::StaticOrArc, Completion, ConcreteTaskInput, InvalidationReason, InvalidationReasonSet, SharedReference, @@ -478,8 +479,9 @@ impl TurboTasks { }; async { - let (result, duration, instant) = - TimedFuture::new(AssertUnwindSafe(future).catch_unwind()).await; + let (result, duration, instant, memory_usage) = + CaptureFuture::new(AssertUnwindSafe(future).catch_unwind()) + .await; let result = result.map_err(|any| match any.downcast::() { Ok(owned) => Some(Cow::Owned(*owned)), @@ -491,7 +493,12 @@ impl TurboTasks { this.backend.task_execution_result(task_id, result, &*this); let stateful = this.finish_current_task_state(); this.backend.task_execution_completed( - task_id, duration, instant, stateful, &*this, + task_id, + duration, + instant, + memory_usage, + stateful, + &*this, ) } .instrument(span) @@ -1413,16 +1420,18 @@ pub fn emit(collectible: Vc) { pub async fn spawn_blocking(func: impl FnOnce() -> T + Send + 'static) -> T { let span = trace_span!("blocking operation").or_current(); - let (r, d) = tokio::task::spawn_blocking(|| { + let (result, duration, alloc_info) = tokio::task::spawn_blocking(|| { let _guard = span.entered(); let start = Instant::now(); + let start_allocations = TurboMalloc::allocation_counters(); let r = func(); - (r, start.elapsed()) + (r, start.elapsed(), start_allocations.until_now()) }) .await .unwrap(); - timed_future::add_duration(d); - r + capture_future::add_duration(duration); + capture_future::add_allocation_info(alloc_info); + result } pub fn spawn_thread(func: impl FnOnce() + Send + 'static) { diff --git a/crates/turbo-tasks/src/timed_future.rs b/crates/turbo-tasks/src/timed_future.rs deleted file mode 100644 index 5575a86e6d1b9..0000000000000 --- a/crates/turbo-tasks/src/timed_future.rs +++ /dev/null @@ -1,58 +0,0 @@ -use std::{ - future::Future, - pin::Pin, - sync::{Arc, Mutex}, - task::{Context, Poll}, - time::{Duration, Instant}, -}; - -use pin_project_lite::pin_project; -use tokio::{task::futures::TaskLocalFuture, task_local}; - -task_local! { - static EXTRA_DURATION: Arc>; -} - -pin_project! { - pub struct TimedFuture> { - cell: Arc>, - #[pin] - future: TaskLocalFuture>, F>, - duration: Duration, - } -} - -impl> TimedFuture { - pub fn new(future: F) -> Self { - let cell = Arc::new(Mutex::new(Duration::ZERO)); - Self { - future: EXTRA_DURATION.scope(cell.clone(), future), - cell, - duration: Duration::ZERO, - } - } -} - -pub fn add_duration(duration: Duration) { - EXTRA_DURATION.with(|cell| *cell.lock().unwrap() += duration); -} - -impl> Future for TimedFuture { - type Output = (T, Duration, Instant); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - let start = Instant::now(); - let result = this.future.poll(cx); - let elapsed = start.elapsed(); - *this.duration += elapsed; - match result { - Poll::Ready(r) => Poll::Ready(( - r, - *this.duration + *this.cell.lock().unwrap(), - start + elapsed, - )), - Poll::Pending => Poll::Pending, - } - } -} diff --git a/crates/turbopack-trace-server/src/reader/turbopack.rs b/crates/turbopack-trace-server/src/reader/turbopack.rs index 29fe0fad9b40b..f42c599edd415 100644 --- a/crates/turbopack-trace-server/src/reader/turbopack.rs +++ b/crates/turbopack-trace-server/src/reader/turbopack.rs @@ -1,5 +1,5 @@ use std::{ - collections::{HashMap, HashSet}, + collections::{hash_map::Entry, HashMap, HashSet}, sync::Arc, }; @@ -13,12 +13,21 @@ use crate::{ store_container::{StoreContainer, StoreWriteGuard}, }; +#[derive(Default)] +struct AllocationInfo { + allocations: u64, + deallocations: u64, + allocation_count: u64, + deallocation_count: u64, +} + pub struct TurbopackFormat { store: Arc, active_ids: HashMap, queued_rows: HashMap>>, outdated_spans: HashSet, thread_stacks: HashMap>, + thread_allocation_counters: HashMap, self_time_started: HashMap<(SpanIndex, u64), u64>, } @@ -30,6 +39,7 @@ impl TurbopackFormat { queued_rows: HashMap::new(), outdated_spans: HashSet::new(), thread_stacks: HashMap::new(), + thread_allocation_counters: HashMap::new(), self_time_started: HashMap::new(), } } @@ -226,6 +236,58 @@ impl TurbopackFormat { } } } + TraceRow::AllocationCounters { + ts: _, + thread_id, + allocations, + allocation_count, + deallocations, + deallocation_count, + } => { + let info = AllocationInfo { + allocations, + deallocations, + allocation_count, + deallocation_count, + }; + let mut diff = AllocationInfo::default(); + match self.thread_allocation_counters.entry(thread_id) { + Entry::Occupied(mut entry) => { + let counter = entry.get_mut(); + diff.allocations = info.allocations - counter.allocations; + diff.deallocations = info.deallocations - counter.deallocations; + diff.allocation_count = info.allocation_count - counter.allocation_count; + diff.deallocation_count = + info.deallocation_count - counter.deallocation_count; + counter.allocations = info.allocations; + counter.deallocations = info.deallocations; + counter.allocation_count = info.allocation_count; + counter.deallocation_count = info.deallocation_count; + } + Entry::Vacant(entry) => { + entry.insert(info); + } + } + let stack = self.thread_stacks.entry(thread_id).or_default(); + if let Some(&id) = stack.last() { + if diff.allocations > 0 { + store.add_allocation( + id, + diff.allocations, + diff.allocation_count, + &mut self.outdated_spans, + ); + } + if diff.deallocations > 0 { + store.add_deallocation( + id, + diff.deallocations, + diff.deallocation_count, + &mut self.outdated_spans, + ); + } + } + } } } } diff --git a/crates/turbopack-trace-utils/src/raw_trace.rs b/crates/turbopack-trace-utils/src/raw_trace.rs index 627af1267a631..4caf1188c5919 100644 --- a/crates/turbopack-trace-utils/src/raw_trace.rs +++ b/crates/turbopack-trace-utils/src/raw_trace.rs @@ -37,17 +37,14 @@ impl LookupSpan<'a>> RawTraceLayer { } fn report_allocations(&self, ts: u64, thread_id: u64) { - let allocation_info = turbo_tasks_malloc::TurboMalloc::pop_allocations(); - if allocation_info.is_empty() { - return; - } - self.write(TraceRow::Allocation { + let allocation_counters = turbo_tasks_malloc::TurboMalloc::allocation_counters(); + self.write(TraceRow::AllocationCounters { ts, thread_id, - allocations: allocation_info.allocations as u64, - deallocations: allocation_info.deallocations as u64, - allocation_count: allocation_info.allocation_count as u64, - deallocation_count: allocation_info.deallocation_count as u64, + allocations: allocation_counters.allocations as u64, + deallocations: allocation_counters.deallocations as u64, + allocation_count: allocation_counters.allocation_count as u64, + deallocation_count: allocation_counters.deallocation_count as u64, }); } } diff --git a/crates/turbopack-trace-utils/src/tracing.rs b/crates/turbopack-trace-utils/src/tracing.rs index 0f38d654dea55..76e4ab159d297 100644 --- a/crates/turbopack-trace-utils/src/tracing.rs +++ b/crates/turbopack-trace-utils/src/tracing.rs @@ -84,6 +84,22 @@ pub enum TraceRow<'a> { /// Deallocation count deallocation_count: u64, }, + /// Data about (de)allocations per thread counters. Actual allocations can + /// be computed from the difference. + AllocationCounters { + /// Timestamp + ts: u64, + /// The thread id of the thread where allocations happend. + thread_id: u64, + /// Allocations + allocations: u64, + /// Allocation count + allocation_count: u64, + /// Deallocations + deallocations: u64, + /// Deallocation count + deallocation_count: u64, + }, } #[derive(Debug, Serialize, Deserialize)]