Skip to content

Commit

Permalink
refactor memory tracking
Browse files Browse the repository at this point in the history
refactor memory for tracing
track memory for tasks
  • Loading branch information
sokra committed May 15, 2024
1 parent edd2118 commit 6f6ae57
Show file tree
Hide file tree
Showing 14 changed files with 220 additions and 94 deletions.
24 changes: 5 additions & 19 deletions crates/turbo-tasks-malloc/src/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use std::{
sync::atomic::{AtomicUsize, Ordering},
};

use crate::AllocationInfo;

static ALLOCATED: AtomicUsize = AtomicUsize::new(0);
const KB: usize = 1024;
/// When global counter is updates we will keep a thread-local buffer of this
Expand All @@ -13,23 +15,6 @@ const TARGET_BUFFER: usize = 100 * KB;
/// global counter.
const MAX_BUFFER: usize = 200 * KB;

#[derive(Default)]
pub struct AllocationInfo {
pub allocations: usize,
pub deallocations: usize,
pub allocation_count: usize,
pub deallocation_count: usize,
}

impl AllocationInfo {
pub fn is_empty(&self) -> bool {
self.allocations == 0
&& self.deallocations == 0
&& self.allocation_count == 0
&& self.deallocation_count == 0
}
}

#[derive(Default)]
struct ThreadLocalCounter {
/// Thread-local buffer of allocated bytes that have been added to the
Expand Down Expand Up @@ -69,6 +54,7 @@ impl ThreadLocalCounter {
ALLOCATED.fetch_sub(self.buffer, Ordering::Relaxed);
self.buffer = 0;
}
self.allocation_info = AllocationInfo::default();
}
}

Expand All @@ -80,8 +66,8 @@ pub fn get() -> usize {
ALLOCATED.load(Ordering::Relaxed)
}

pub fn pop_allocations() -> AllocationInfo {
with_local_counter(|local| std::mem::take(&mut local.allocation_info))
pub fn allocations() -> AllocationInfo {
with_local_counter(|local| local.allocation_info.clone())
}

fn with_local_counter<T>(f: impl FnOnce(&mut ThreadLocalCounter) -> T) -> T {
Expand Down
31 changes: 29 additions & 2 deletions crates/turbo-tasks-malloc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,33 @@ use std::alloc::{GlobalAlloc, Layout};

use self::counter::{add, flush, get, remove};

#[derive(Default, Clone)]
pub struct AllocationInfo {
pub allocations: usize,
pub deallocations: usize,
pub allocation_count: usize,
pub deallocation_count: usize,
}

impl AllocationInfo {
pub fn is_empty(&self) -> bool {
self.allocations == 0
&& self.deallocations == 0
&& self.allocation_count == 0
&& self.deallocation_count == 0
}

pub fn until_now(&self) -> Self {
let new = TurboMalloc::allocations();
Self {
allocations: new.allocations - self.allocations,
deallocations: new.deallocations - self.deallocations,
allocation_count: new.allocation_count - self.allocation_count,
deallocation_count: new.deallocation_count - self.deallocation_count,
}
}
}

/// Turbo's preferred global allocator. This is a new type instead of a type
/// alias because you can't use type aliases to instantiate unit types (E0423).
pub struct TurboMalloc;
Expand All @@ -17,8 +44,8 @@ impl TurboMalloc {
flush();
}

pub fn pop_allocations() -> self::counter::AllocationInfo {
self::counter::pop_allocations()
pub fn allocations() -> AllocationInfo {
self::counter::allocations()
}
}

Expand Down
3 changes: 2 additions & 1 deletion crates/turbo-tasks-memory/src/memory_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,11 +321,12 @@ impl Backend for MemoryBackend {
task_id: TaskId,
duration: Duration,
instant: Instant,
memory_usage: usize,
stateful: bool,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>,
) -> bool {
let reexecute = self.with_task(task_id, |task| {
task.execution_completed(duration, instant, stateful, self, turbo_tasks)
task.execution_completed(duration, instant, memory_usage, stateful, self, turbo_tasks)
});
if !reexecute {
self.run_gc(false, turbo_tasks);
Expand Down
1 change: 1 addition & 0 deletions crates/turbo-tasks-memory/src/memory_backend_with_pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1154,6 +1154,7 @@ impl<P: PersistedGraph> Backend for MemoryBackendWithPersistedGraph<P> {
task: TaskId,
duration: Duration,
_instant: Instant,
_memory_usage: usize,
_stateful: bool,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackendWithPersistedGraph<P>>,
) -> bool {
Expand Down
1 change: 1 addition & 0 deletions crates/turbo-tasks-memory/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -961,6 +961,7 @@ impl Task {
&self,
duration: Duration,
instant: Instant,
_memory_usage: usize,
stateful: bool,
backend: &MemoryBackend,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>,
Expand Down
1 change: 1 addition & 0 deletions crates/turbo-tasks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
turbo-tasks-hash = { workspace = true }
turbo-tasks-macros = { workspace = true }
turbo-tasks-malloc = { workspace = true }

[dev-dependencies]
serde_test = "1.0.157"
Expand Down
1 change: 1 addition & 0 deletions crates/turbo-tasks/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ pub trait Backend: Sync + Send {
task: TaskId,
duration: Duration,
instant: Instant,
memory_usage: usize,
stateful: bool,
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) -> bool;
Expand Down
79 changes: 79 additions & 0 deletions crates/turbo-tasks/src/capture_future.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
time::{Duration, Instant},
};

use pin_project_lite::pin_project;
use tokio::{task::futures::TaskLocalFuture, task_local};
use turbo_tasks_malloc::{AllocationInfo, TurboMalloc};

task_local! {
static EXTRA: Arc<Mutex<(Duration, usize, usize)>>;
}

pin_project! {
pub struct CaptureFuture<T, F: Future<Output = T>> {
cell: Arc<Mutex<(Duration, usize, usize)>>,
#[pin]
future: TaskLocalFuture<Arc<Mutex<(Duration, usize, usize)>>, F>,
duration: Duration,
allocations: usize,
deallocations: usize,
}
}

impl<T, F: Future<Output = T>> CaptureFuture<T, F> {
pub fn new(future: F) -> Self {
let cell = Arc::new(Mutex::new((Duration::ZERO, 0, 0)));
Self {
future: EXTRA.scope(cell.clone(), future),
cell,
duration: Duration::ZERO,
allocations: 0,
deallocations: 0,
}
}
}

pub fn add_duration(duration: Duration) {
EXTRA.with(|cell| cell.lock().unwrap().0 += duration);
}

pub fn add_allocation_info(alloc_info: AllocationInfo) {
EXTRA.with(|cell| {
let mut guard = cell.lock().unwrap();
guard.1 += alloc_info.allocations;
guard.2 += alloc_info.deallocations;
});
}

impl<T, F: Future<Output = T>> Future for CaptureFuture<T, F> {
type Output = (T, Duration, Instant, usize);

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let start = Instant::now();
let start_allocations = TurboMalloc::allocations();
let result = this.future.poll(cx);
let elapsed = start.elapsed();
let allocations = start_allocations.until_now();
*this.duration += elapsed;
*this.allocations += allocations.allocations;
*this.deallocations += allocations.deallocations;
match result {
Poll::Ready(r) => {
let (duration, allocations, deallocations) = *this.cell.lock().unwrap();
Poll::Ready((
r,
*this.duration + duration,
start + elapsed,
allocations.saturating_sub(deallocations),
))
}
Poll::Pending => Poll::Pending,
}
}
}
2 changes: 1 addition & 1 deletion crates/turbo-tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#![feature(never_type)]

pub mod backend;
mod capture_future;
mod collectibles;
mod completion;
pub mod debug;
Expand Down Expand Up @@ -64,7 +65,6 @@ pub mod registry;
pub mod small_duration;
mod state;
pub mod task;
mod timed_future;
pub mod trace;
mod trait_ref;
pub mod util;
Expand Down
29 changes: 19 additions & 10 deletions crates/turbo-tasks/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@ use nohash_hasher::BuildNoHashHasher;
use serde::{de::Visitor, Deserialize, Serialize};
use tokio::{runtime::Handle, select, task_local};
use tracing::{info_span, instrument, trace_span, Instrument, Level};
use turbo_tasks_malloc::TurboMalloc;

use crate::{
backend::{Backend, CellContent, PersistentTaskType, TaskExecutionSpec, TransientTaskType},
capture_future::{
CaptureFuture, {self},
},
event::{Event, EventListener},
id::{BackendJobId, FunctionId, TraitTypeId},
id_factory::IdFactory,
raw_vc::{CellId, RawVc},
registry,
timed_future::{
TimedFuture, {self},
},
trace::TraceRawVcs,
util::StaticOrArc,
Completion, ConcreteTaskInput, InvalidationReason, InvalidationReasonSet, SharedReference,
Expand Down Expand Up @@ -478,8 +479,9 @@ impl<B: Backend + 'static> TurboTasks<B> {
};

async {
let (result, duration, instant) =
TimedFuture::new(AssertUnwindSafe(future).catch_unwind()).await;
let (result, duration, instant, memory_usage) =
CaptureFuture::new(AssertUnwindSafe(future).catch_unwind())
.await;

let result = result.map_err(|any| match any.downcast::<String>() {
Ok(owned) => Some(Cow::Owned(*owned)),
Expand All @@ -491,7 +493,12 @@ impl<B: Backend + 'static> TurboTasks<B> {
this.backend.task_execution_result(task_id, result, &*this);
let stateful = this.finish_current_task_state();
this.backend.task_execution_completed(
task_id, duration, instant, stateful, &*this,
task_id,
duration,
instant,
memory_usage,
stateful,
&*this,
)
}
.instrument(span)
Expand Down Expand Up @@ -1413,16 +1420,18 @@ pub fn emit<T: VcValueTrait + Send>(collectible: Vc<T>) {

pub async fn spawn_blocking<T: Send + 'static>(func: impl FnOnce() -> T + Send + 'static) -> T {
let span = trace_span!("blocking operation").or_current();
let (r, d) = tokio::task::spawn_blocking(|| {
let (result, duration, alloc_info) = tokio::task::spawn_blocking(|| {
let _guard = span.entered();
let start = Instant::now();
let start_allocations = TurboMalloc::allocations();
let r = func();
(r, start.elapsed())
(r, start.elapsed(), start_allocations.until_now())
})
.await
.unwrap();
timed_future::add_duration(d);
r
capture_future::add_duration(duration);
capture_future::add_allocation_info(alloc_info);
result
}

pub fn spawn_thread(func: impl FnOnce() + Send + 'static) {
Expand Down
58 changes: 0 additions & 58 deletions crates/turbo-tasks/src/timed_future.rs

This file was deleted.

Loading

0 comments on commit 6f6ae57

Please sign in to comment.