Skip to content

Commit

Permalink
refactor memory tracking (vercel/turborepo#8130)
Browse files Browse the repository at this point in the history
### Description

refactor memory for tracing
track memory for tasks

### Testing Instructions

<!--
  Give a quick description of steps to test your changes.
-->
  • Loading branch information
sokra authored May 16, 2024
1 parent a6b6f0c commit a31540b
Show file tree
Hide file tree
Showing 14 changed files with 241 additions and 107 deletions.
34 changes: 10 additions & 24 deletions crates/turbo-tasks-malloc/src/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use std::{
sync::atomic::{AtomicUsize, Ordering},
};

use crate::AllocationCounters;

static ALLOCATED: AtomicUsize = AtomicUsize::new(0);
const KB: usize = 1024;
/// When global counter is updates we will keep a thread-local buffer of this
Expand All @@ -13,37 +15,20 @@ const TARGET_BUFFER: usize = 100 * KB;
/// global counter.
const MAX_BUFFER: usize = 200 * KB;

#[derive(Default)]
pub struct AllocationInfo {
pub allocations: usize,
pub deallocations: usize,
pub allocation_count: usize,
pub deallocation_count: usize,
}

impl AllocationInfo {
pub fn is_empty(&self) -> bool {
self.allocations == 0
&& self.deallocations == 0
&& self.allocation_count == 0
&& self.deallocation_count == 0
}
}

#[derive(Default)]
struct ThreadLocalCounter {
/// Thread-local buffer of allocated bytes that have been added to the
/// global counter desprite not being allocated yet. It is unsigned so that
/// means the global counter is always equal or greater than the real
/// value.
buffer: usize,
allocation_info: AllocationInfo,
allocation_counters: AllocationCounters,
}

impl ThreadLocalCounter {
fn add(&mut self, size: usize) {
self.allocation_info.allocations += size;
self.allocation_info.allocation_count += 1;
self.allocation_counters.allocations += size;
self.allocation_counters.allocation_count += 1;
if self.buffer >= size {
self.buffer -= size;
} else {
Expand All @@ -54,8 +39,8 @@ impl ThreadLocalCounter {
}

fn remove(&mut self, size: usize) {
self.allocation_info.deallocations += size;
self.allocation_info.deallocation_count += 1;
self.allocation_counters.deallocations += size;
self.allocation_counters.deallocation_count += 1;
self.buffer += size;
if self.buffer > MAX_BUFFER {
let offset = self.buffer - TARGET_BUFFER;
Expand All @@ -69,6 +54,7 @@ impl ThreadLocalCounter {
ALLOCATED.fetch_sub(self.buffer, Ordering::Relaxed);
self.buffer = 0;
}
self.allocation_counters = AllocationCounters::default();
}
}

Expand All @@ -80,8 +66,8 @@ pub fn get() -> usize {
ALLOCATED.load(Ordering::Relaxed)
}

pub fn pop_allocations() -> AllocationInfo {
with_local_counter(|local| std::mem::take(&mut local.allocation_info))
pub fn allocation_counters() -> AllocationCounters {
with_local_counter(|local| local.allocation_counters.clone())
}

fn with_local_counter<T>(f: impl FnOnce(&mut ThreadLocalCounter) -> T) -> T {
Expand Down
47 changes: 44 additions & 3 deletions crates/turbo-tasks-malloc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,50 @@
mod counter;

use std::alloc::{GlobalAlloc, Layout};
use std::{
alloc::{GlobalAlloc, Layout},
marker::PhantomData,
};

use self::counter::{add, flush, get, remove};

#[derive(Default, Clone, Debug)]
pub struct AllocationInfo {
pub allocations: usize,
pub deallocations: usize,
pub allocation_count: usize,
pub deallocation_count: usize,
}

impl AllocationInfo {
pub fn is_empty(&self) -> bool {
self.allocations == 0
&& self.deallocations == 0
&& self.allocation_count == 0
&& self.deallocation_count == 0
}
}

#[derive(Default, Clone, Debug)]
pub struct AllocationCounters {
pub allocations: usize,
pub deallocations: usize,
pub allocation_count: usize,
pub deallocation_count: usize,
_not_send: PhantomData<*mut ()>,
}

impl AllocationCounters {
pub fn until_now(&self) -> AllocationInfo {
let new = TurboMalloc::allocation_counters();
AllocationInfo {
allocations: new.allocations - self.allocations,
deallocations: new.deallocations - self.deallocations,
allocation_count: new.allocation_count - self.allocation_count,
deallocation_count: new.deallocation_count - self.deallocation_count,
}
}
}

/// Turbo's preferred global allocator. This is a new type instead of a type
/// alias because you can't use type aliases to instantiate unit types (E0423).
pub struct TurboMalloc;
Expand All @@ -17,8 +58,8 @@ impl TurboMalloc {
flush();
}

pub fn pop_allocations() -> self::counter::AllocationInfo {
self::counter::pop_allocations()
pub fn allocation_counters() -> AllocationCounters {
self::counter::allocation_counters()
}
}

Expand Down
3 changes: 2 additions & 1 deletion crates/turbo-tasks-memory/src/memory_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,11 +321,12 @@ impl Backend for MemoryBackend {
task_id: TaskId,
duration: Duration,
instant: Instant,
memory_usage: usize,
stateful: bool,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>,
) -> bool {
let reexecute = self.with_task(task_id, |task| {
task.execution_completed(duration, instant, stateful, self, turbo_tasks)
task.execution_completed(duration, instant, memory_usage, stateful, self, turbo_tasks)
});
if !reexecute {
self.run_gc(false, turbo_tasks);
Expand Down
1 change: 1 addition & 0 deletions crates/turbo-tasks-memory/src/memory_backend_with_pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1154,6 +1154,7 @@ impl<P: PersistedGraph> Backend for MemoryBackendWithPersistedGraph<P> {
task: TaskId,
duration: Duration,
_instant: Instant,
_memory_usage: usize,
_stateful: bool,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackendWithPersistedGraph<P>>,
) -> bool {
Expand Down
1 change: 1 addition & 0 deletions crates/turbo-tasks-memory/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -961,6 +961,7 @@ impl Task {
&self,
duration: Duration,
instant: Instant,
_memory_usage: usize,
stateful: bool,
backend: &MemoryBackend,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>,
Expand Down
1 change: 1 addition & 0 deletions crates/turbo-tasks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
turbo-tasks-hash = { workspace = true }
turbo-tasks-macros = { workspace = true }
turbo-tasks-malloc = { workspace = true }

[dev-dependencies]
serde_test = "1.0.157"
Expand Down
1 change: 1 addition & 0 deletions crates/turbo-tasks/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ pub trait Backend: Sync + Send {
task: TaskId,
duration: Duration,
instant: Instant,
memory_usage: usize,
stateful: bool,
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) -> bool;
Expand Down
76 changes: 76 additions & 0 deletions crates/turbo-tasks/src/capture_future.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
time::{Duration, Instant},
};

use pin_project_lite::pin_project;
use tokio::{task::futures::TaskLocalFuture, task_local};
use turbo_tasks_malloc::{AllocationInfo, TurboMalloc};

task_local! {
static EXTRA: Arc<Mutex<(Duration, usize, usize)>>;
}

pin_project! {
pub struct CaptureFuture<T, F: Future<Output = T>> {
cell: Arc<Mutex<(Duration, usize, usize)>>,
#[pin]
future: TaskLocalFuture<Arc<Mutex<(Duration, usize, usize)>>, F>,
duration: Duration,
allocations: usize,
deallocations: usize,
}
}

impl<T, F: Future<Output = T>> CaptureFuture<T, F> {
pub fn new(future: F) -> Self {
let cell = Arc::new(Mutex::new((Duration::ZERO, 0, 0)));
Self {
future: EXTRA.scope(cell.clone(), future),
cell,
duration: Duration::ZERO,
allocations: 0,
deallocations: 0,
}
}
}

pub fn add_duration(duration: Duration) {
EXTRA.with(|cell| cell.lock().unwrap().0 += duration);
}

pub fn add_allocation_info(alloc_info: AllocationInfo) {
EXTRA.with(|cell| {
let mut guard = cell.lock().unwrap();
guard.1 += alloc_info.allocations;
guard.2 += alloc_info.deallocations;
});
}

impl<T, F: Future<Output = T>> Future for CaptureFuture<T, F> {
type Output = (T, Duration, Instant, usize);

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let start = Instant::now();
let start_allocations = TurboMalloc::allocation_counters();
let result = this.future.poll(cx);
let elapsed = start.elapsed();
let allocations = start_allocations.until_now();
*this.duration += elapsed;
*this.allocations += allocations.allocations;
*this.deallocations += allocations.deallocations;
match result {
Poll::Ready(r) => {
let (duration, allocations, deallocations) = *this.cell.lock().unwrap();
let memory_usage = (*this.allocations + allocations)
.saturating_sub(*this.deallocations + deallocations);
Poll::Ready((r, *this.duration + duration, start + elapsed, memory_usage))
}
Poll::Pending => Poll::Pending,
}
}
}
2 changes: 1 addition & 1 deletion crates/turbo-tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#![feature(never_type)]

pub mod backend;
mod capture_future;
mod collectibles;
mod completion;
pub mod debug;
Expand Down Expand Up @@ -64,7 +65,6 @@ pub mod registry;
pub mod small_duration;
mod state;
pub mod task;
mod timed_future;
pub mod trace;
mod trait_ref;
pub mod util;
Expand Down
29 changes: 19 additions & 10 deletions crates/turbo-tasks/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@ use nohash_hasher::BuildNoHashHasher;
use serde::{de::Visitor, Deserialize, Serialize};
use tokio::{runtime::Handle, select, task_local};
use tracing::{info_span, instrument, trace_span, Instrument, Level};
use turbo_tasks_malloc::TurboMalloc;

use crate::{
backend::{Backend, CellContent, PersistentTaskType, TaskExecutionSpec, TransientTaskType},
capture_future::{
CaptureFuture, {self},
},
event::{Event, EventListener},
id::{BackendJobId, FunctionId, TraitTypeId},
id_factory::IdFactory,
raw_vc::{CellId, RawVc},
registry,
timed_future::{
TimedFuture, {self},
},
trace::TraceRawVcs,
util::StaticOrArc,
Completion, ConcreteTaskInput, InvalidationReason, InvalidationReasonSet, SharedReference,
Expand Down Expand Up @@ -478,8 +479,9 @@ impl<B: Backend + 'static> TurboTasks<B> {
};

async {
let (result, duration, instant) =
TimedFuture::new(AssertUnwindSafe(future).catch_unwind()).await;
let (result, duration, instant, memory_usage) =
CaptureFuture::new(AssertUnwindSafe(future).catch_unwind())
.await;

let result = result.map_err(|any| match any.downcast::<String>() {
Ok(owned) => Some(Cow::Owned(*owned)),
Expand All @@ -491,7 +493,12 @@ impl<B: Backend + 'static> TurboTasks<B> {
this.backend.task_execution_result(task_id, result, &*this);
let stateful = this.finish_current_task_state();
this.backend.task_execution_completed(
task_id, duration, instant, stateful, &*this,
task_id,
duration,
instant,
memory_usage,
stateful,
&*this,
)
}
.instrument(span)
Expand Down Expand Up @@ -1413,16 +1420,18 @@ pub fn emit<T: VcValueTrait + Send>(collectible: Vc<T>) {

pub async fn spawn_blocking<T: Send + 'static>(func: impl FnOnce() -> T + Send + 'static) -> T {
let span = trace_span!("blocking operation").or_current();
let (r, d) = tokio::task::spawn_blocking(|| {
let (result, duration, alloc_info) = tokio::task::spawn_blocking(|| {
let _guard = span.entered();
let start = Instant::now();
let start_allocations = TurboMalloc::allocation_counters();
let r = func();
(r, start.elapsed())
(r, start.elapsed(), start_allocations.until_now())
})
.await
.unwrap();
timed_future::add_duration(d);
r
capture_future::add_duration(duration);
capture_future::add_allocation_info(alloc_info);
result
}

pub fn spawn_thread(func: impl FnOnce() + Send + 'static) {
Expand Down
Loading

0 comments on commit a31540b

Please sign in to comment.