Skip to content

Commit

Permalink
Store aggregate read/execute count statistics (vercel/turborepo#8286)
Browse files Browse the repository at this point in the history
### Why?

I want to determine percent "cache hit" rates for tasks. Tasks with very
low task hit rates should likely have their annotations removed.

Eventually, we might be able to use this information in a more automated
way, by leaving the annotation in, but skipping the caching for
low-cache-hit tasks.

### What?

This implementation only logs persistent tasks, which should compromise
all or the majority of tasks we care about for memory usage.

The implementation should bail out quickly if caching is disabled, so it
should be okay to leave in release builds, which is important for making
it easy to gather statistics from willing users.

### Testing

Run included unit tests!

This is used as part of
canary...bgw/cache-hit-stats
  • Loading branch information
bgw authored Jul 2, 2024
1 parent 22efbbb commit f421658
Show file tree
Hide file tree
Showing 6 changed files with 474 additions and 38 deletions.
4 changes: 3 additions & 1 deletion crates/turbo-tasks-memory/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ parking_lot = { workspace = true }
priority-queue = "1.3.0"
ref-cast = "1.0.20"
rustc-hash = { workspace = true }
serde = { workspace = true }
smallvec = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
Expand All @@ -38,8 +39,9 @@ criterion = { workspace = true, features = ["async_tokio"] }
lazy_static = { workspace = true }
loom = "0.7.2"
rand = { workspace = true, features = ["small_rng"] }
regex = { workspace = true }
rstest = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["full"] }
turbo-tasks-testing = { workspace = true }

Expand Down
2 changes: 2 additions & 0 deletions crates/turbo-tasks-memory/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ mod memory_backend;
mod memory_backend_with_pg;
mod output;
mod task;
mod task_statistics;

pub use memory_backend::MemoryBackend;
pub use memory_backend_with_pg::MemoryBackendWithPersistedGraph;
pub use task_statistics::{TaskStatistics, TaskStatisticsApi};
52 changes: 52 additions & 0 deletions crates/turbo-tasks-memory/src/memory_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::{
gc::{GcQueue, PERCENTAGE_IDLE_TARGET_MEMORY, PERCENTAGE_TARGET_MEMORY},
output::Output,
task::{Task, DEPENDENCIES_TO_TRACK},
task_statistics::TaskStatisticsApi,
};

fn prehash_task_type(task_type: PersistentTaskType) -> PreHashed<PersistentTaskType> {
Expand All @@ -49,6 +50,7 @@ pub struct MemoryBackend {
memory_limit: usize,
gc_queue: Option<GcQueue>,
idle_gc_active: AtomicBool,
task_statistics: TaskStatisticsApi,
}

impl Default for MemoryBackend {
Expand All @@ -71,6 +73,7 @@ impl MemoryBackend {
memory_limit,
gc_queue: (memory_limit != usize::MAX).then(GcQueue::new),
idle_gc_active: AtomicBool::new(false),
task_statistics: TaskStatisticsApi::default(),
}
}

Expand Down Expand Up @@ -231,6 +234,10 @@ impl MemoryBackend {
});
}
}

pub fn task_statistics(&self) -> &TaskStatisticsApi {
&self.task_statistics
}
}

impl Backend for MemoryBackend {
Expand Down Expand Up @@ -529,8 +536,53 @@ impl Backend for MemoryBackend {
self.lookup_and_connect_task(parent_task, &self.task_cache, &task_type, turbo_tasks)
{
// fast pass without creating a new task
self.task_statistics().map(|stats| match &*task_type {
PersistentTaskType::ResolveNative(function_id, ..)
| PersistentTaskType::Native(function_id, ..) => {
stats.increment_cache_hit(*function_id);
}
PersistentTaskType::ResolveTrait(trait_type, name, inputs) => {
// HACK: Resolve the first argument (`self`) in order to attribute the cache hit
// to the concrete trait implementation, rather than the dynamic trait method.
// This ensures cache hits and misses are both attributed to the same thing.
//
// Because this task already resolved, in most cases `self` should either be
// resolved, or already in the process of being resolved.
//
// However, `self` could become unloaded due to cache eviction, and this might
// trigger an otherwise unnecessary re-evalutation.
//
// This is a potentially okay trade-off as long as we don't log statistics by
// default. The alternative would be to store function ids on completed
// ResolveTrait tasks.
let trait_type = *trait_type;
let name = name.clone();
let this = inputs
.first()
.cloned()
.expect("No arguments for trait call");
let stats = Arc::clone(stats);
turbo_tasks.run_once(Box::pin(async move {
let function_id =
PersistentTaskType::resolve_trait_method(trait_type, name, this)
.await?;
stats.increment_cache_hit(function_id);
Ok(())
}));
}
});
task
} else {
self.task_statistics().map(|stats| match &*task_type {
PersistentTaskType::Native(function_id, ..) => {
stats.increment_cache_miss(*function_id);
}
PersistentTaskType::ResolveTrait(..) | PersistentTaskType::ResolveNative(..) => {
// these types re-execute themselves as `Native` after
// resolving their arguments, skip counting their
// executions here to avoid double-counting
}
});
// It's important to avoid overallocating memory as this will go into the task
// cache and stay there forever. We can to be as small as possible.
let (task_type_hash, mut task_type) = PreHashed::into_parts(task_type);
Expand Down
86 changes: 86 additions & 0 deletions crates/turbo-tasks-memory/src/task_statistics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use std::{
hash::BuildHasherDefault,
sync::{Arc, OnceLock},
};

use dashmap::DashMap;
use rustc_hash::FxHasher;
use serde::{ser::SerializeMap, Serialize, Serializer};
use turbo_tasks::{registry, FunctionId};

/// An API for optionally enabling, updating, and reading aggregated statistics.
#[derive(Default)]
pub struct TaskStatisticsApi {
inner: OnceLock<Arc<TaskStatistics>>,
}

impl TaskStatisticsApi {
pub fn enable(&self) -> &Arc<TaskStatistics> {
self.inner.get_or_init(|| {
Arc::new(TaskStatistics {
inner: DashMap::with_hasher(Default::default()),
})
})
}

pub fn is_enabled(&self) -> bool {
self.inner.get().is_some()
}

// Calls `func` if statistics have been enabled (via
// [`TaskStatisticsApi::enable`]).
pub fn map<T>(&self, func: impl FnOnce(&Arc<TaskStatistics>) -> T) -> Option<T> {
self.get().map(func)
}

// Calls `func` if statistics have been enabled (via
// [`TaskStatisticsApi::enable`]).
pub fn get(&self) -> Option<&Arc<TaskStatistics>> {
self.inner.get()
}
}

/// A type representing the enabled state of [`TaskStatisticsApi`]. Implements
/// [`serde::Serialize`].
pub struct TaskStatistics {
inner: DashMap<FunctionId, TaskFunctionStatistics, BuildHasherDefault<FxHasher>>,
}

impl TaskStatistics {
pub(crate) fn increment_cache_hit(&self, function_id: FunctionId) {
self.with_task_type_statistics(function_id, |stats| stats.cache_hit += 1)
}

pub(crate) fn increment_cache_miss(&self, function_id: FunctionId) {
self.with_task_type_statistics(function_id, |stats| stats.cache_miss += 1)
}

fn with_task_type_statistics(
&self,
task_function_id: FunctionId,
func: impl Fn(&mut TaskFunctionStatistics),
) {
func(self.inner.entry(task_function_id).or_default().value_mut())
}
}

/// Statistics for an individual function.
#[derive(Default, Serialize)]
struct TaskFunctionStatistics {
cache_hit: u32,
cache_miss: u32,
}

impl Serialize for TaskStatistics {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut map = serializer.serialize_map(Some(self.inner.len()))?;
for entry in &self.inner {
let key = registry::get_function_global_name(*entry.key());
map.serialize_entry(key, entry.value())?;
}
map.end()
}
}
Loading

0 comments on commit f421658

Please sign in to comment.