
Commit 7cc6c22

Turbopack: add tracing to turbo-persistence (#78777)

### What?

Add support for tracing and add a few trace spans.
1 parent 03a65fd commit 7cc6c22
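
The spans added in this commit follow the standard `tracing` idiom: create a span and hold the entered guard for as long as the traced work runs. A minimal sketch of that idiom, assuming only the `tracing` crate (the function is illustrative, not code from this commit; the span name is borrowed from the diff below):

```rust
use tracing::info_span;

fn compact_database() {
    // Entering the span returns a guard; everything recorded until the guard is
    // dropped (at the end of this scope) is attributed to "compact database".
    let _span = info_span!("compact database").entered();

    // ... the actual compaction work would happen here ...
}
```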

File tree

5 files changed: +31 -0 lines

turbopack/crates/turbo-persistence/src/db.rs

Lines changed: 9 additions & 0 deletions
@@ -18,6 +18,7 @@ use lzzzz::lz4::decompress;
 use memmap2::Mmap;
 use parking_lot::{Mutex, RwLock};
 use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
+use tracing::Span;

 use crate::{
     arc_slice::ArcSlice,
@@ -548,6 +549,7 @@ impl TurboPersistence {
         max_merge_sequence: usize,
         max_merge_size: usize,
     ) -> Result<()> {
+        let _span = tracing::info_span!("compact database").entered();
         if self
             .active_write_operation
             .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
@@ -653,11 +655,13 @@ impl TurboPersistence {
         let path = &self.path;

         let log_mutex = Mutex::new(());
+        let span = Span::current();
         let result = sst_by_family
             .into_par_iter()
             .with_min_len(1)
             .enumerate()
             .map(|(family, ssts_with_ranges)| {
+                let _span = span.clone().entered();
                 let coverage = total_coverage(&ssts_with_ranges, (0, u64::MAX));
                 if coverage <= max_coverage {
                     return Ok((Vec::new(), Vec::new()));
@@ -710,10 +714,12 @@ impl TurboPersistence {
                     .collect::<Vec<_>>();

                 // Merge SST files
+                let span = tracing::trace_span!("merge files");
                 let merge_result = merge_jobs
                     .into_par_iter()
                     .with_min_len(1)
                     .map(|indicies| {
+                        let _span = span.clone().entered();
                         fn create_sst_file(
                             family: u32,
                             entries: &[LookupEntry],
@@ -722,6 +728,7 @@ impl TurboPersistence {
                             path: &Path,
                             seq: u32,
                         ) -> Result<(u32, File)> {
+                            let _span = tracing::trace_span!("write merged sst file").entered();
                             let builder = StaticSortedFileBuilder::new(
                                 family,
                                 entries,
@@ -865,10 +872,12 @@ impl TurboPersistence {
                     .collect::<Vec<_>>();

                 // Move SST files
+                let span = tracing::trace_span!("move files");
                 let mut new_sst_files = move_jobs
                     .into_par_iter()
                     .with_min_len(1)
                     .map(|(index, seq)| {
+                        let _span = span.clone().entered();
                         let index = ssts_with_ranges[index].index;
                         let sst = &static_sorted_files[index];
                         let src_path = self.path.join(format!("{:08}.sst", sst.sequence_number()));
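
The recurring pattern above is capturing `Span::current()` before handing work to rayon and entering a clone of it inside each parallel closure: rayon worker threads do not inherit the caller's current span, so without this the per-file work would not show up under the "compact database" span. A minimal sketch of the pattern, assuming only `rayon` and `tracing` (the function and data are illustrative, not from this commit):

```rust
use rayon::prelude::*;
use tracing::Span;

fn process_in_parallel(items: Vec<u32>) -> Vec<u32> {
    // Capture the span that is current on the calling thread.
    let span = Span::current();
    items
        .into_par_iter()
        .map(|item| {
            // Re-enter a clone of the captured span on whichever worker thread
            // runs this task, so the work is attributed to the parent span.
            let _span = span.clone().entered();
            item * 2
        })
        .collect()
}
```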

turbopack/crates/turbo-persistence/src/static_sorted_file_builder.rs

Lines changed: 6 additions & 0 deletions
@@ -103,6 +103,7 @@ impl StaticSortedFileBuilder {
     }

     /// Computes a AQMF from the keys of all entries.
+    #[tracing::instrument(level = "trace", skip_all)]
     fn compute_aqmf<E: Entry>(&mut self, entries: &[E]) {
         let mut filter = qfilter::Filter::new(entries.len() as u64, AQMF_FALSE_POSITIVE_RATE)
             // This won't fail as we limit the number of entries per SST file
@@ -117,6 +118,7 @@ impl StaticSortedFileBuilder {
     }

     /// Computes compression dictionaries from keys and values of all entries
+    #[tracing::instrument(level = "trace", skip_all)]
     fn compute_compression_dictionary<E: Entry>(
         &mut self,
         entries: &[E],
@@ -202,6 +204,7 @@ impl StaticSortedFileBuilder {
     }

     /// Compute index, key and value blocks.
+    #[tracing::instrument(level = "trace", skip_all)]
     fn compute_blocks<E: Entry>(&mut self, entries: &[E]) {
         // TODO implement multi level index
         // TODO place key and value block near to each other
@@ -352,16 +355,19 @@ impl StaticSortedFileBuilder {
     }

     /// Compresses an index or key block.
+    #[tracing::instrument(level = "trace", skip_all)]
     fn compress_key_block(&self, block: &[u8]) -> (u32, Vec<u8>) {
         self.compress_block(block, &self.key_compression_dictionary)
     }

     /// Compresses a value block.
+    #[tracing::instrument(level = "trace", skip_all)]
     fn compress_value_block(&self, block: &[u8]) -> (u32, Vec<u8>) {
         self.compress_block(block, &self.value_compression_dictionary)
     }

     /// Writes the SST file.
+    #[tracing::instrument(level = "trace", skip_all)]
     pub fn write(&self, file: &Path) -> io::Result<File> {
         let mut file = BufWriter::new(File::create(file)?);
         // magic number and version
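
These hunks use `#[tracing::instrument]` instead of manual spans: the attribute wraps the function so each call creates and enters a trace-level span named after the function, and `skip_all` keeps the (potentially large) arguments from being recorded as span fields. A minimal sketch, assuming only the `tracing` crate (the function body and signature are hypothetical):

```rust
// Each call to `compute_blocks` creates and enters a span named "compute_blocks".
// `skip_all` means no arguments are captured as fields, which avoids requiring
// `Debug` on them and keeps large buffers out of the trace.
#[tracing::instrument(level = "trace", skip_all)]
fn compute_blocks(entries: &[Vec<u8>]) -> usize {
    entries.iter().map(|e| e.len()).sum()
}
```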

turbopack/crates/turbo-persistence/src/write_batch.rs

Lines changed: 12 additions & 0 deletions
@@ -138,6 +138,7 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
         Ok(collector)
     }

+    #[tracing::instrument(level = "trace", skip(self, collector))]
     fn flush_thread_local_collector(
         &self,
         family: u32,
@@ -235,6 +236,7 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
     ///
     /// Caller must ensure that no concurrent put or delete operation is happening on the flushed
     /// family.
+    #[tracing::instrument(level = "trace", skip(self))]
     pub unsafe fn flush(&self, family: u32) -> Result<()> {
         // Flush the thread local collectors to the global collector.
         let mut collectors = Vec::new();
@@ -290,12 +292,14 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {

     /// Finishes the write batch by returning the new sequence number and the new SST files. This
     /// writes all outstanding thread local data to disk.
+    #[tracing::instrument(level = "trace", skip(self))]
     pub(crate) fn finish(&mut self) -> Result<FinishResult> {
         let mut new_blob_files = Vec::new();
         let shared_error = Mutex::new(Ok(()));

         // First, we flush all thread local collectors to the global collectors.
         scope(|scope| {
+            let _span = tracing::trace_span!("flush thread local collectors").entered();
             let mut collectors = [const { Vec::new() }; FAMILIES];
             for cell in self.thread_locals.iter_mut() {
                 let state = cell.get_mut();
@@ -312,7 +316,9 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
             for mut collector in thread_local_collectors {
                 let this = &self;
                 let shared_error = &shared_error;
+                let span = Span::current();
                 scope.spawn(move |_| {
+                    let _span = span.entered();
                     if let Err(err) =
                         this.flush_thread_local_collector(family as u32, &mut collector)
                     {
@@ -324,13 +330,16 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
            }
        });

+        let _span = tracing::trace_span!("flush collectors").entered();
+
        // Now we reduce the global collectors in parallel
        let mut new_sst_files = take(self.new_sst_files.get_mut());
        let shared_new_sst_files = Mutex::new(&mut new_sst_files);

        let new_collectors = [(); FAMILIES]
            .map(|_| Mutex::new(GlobalCollectorState::Unsharded(self.get_new_collector())));
        let collectors = replace(&mut self.collectors, new_collectors);
+        let span = Span::current();
        collectors
            .into_par_iter()
            .enumerate()
@@ -348,6 +357,7 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
                }
            })
            .try_for_each(|(family, mut collector)| {
+                let _span = span.clone().entered();
                let family = family as u32;
                if !collector.is_empty() {
                    let sst = self.create_sst_file(family, collector.sorted())?;
@@ -370,6 +380,7 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {

     /// Creates a new blob file with the given value.
     /// Returns a tuple of (sequence number, file).
+    #[tracing::instrument(level = "trace", skip(self, value), fields(value_len = value.len()))]
     fn create_blob(&self, value: &[u8]) -> Result<(u32, File)> {
         let seq = self.current_sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
         let mut buffer = Vec::new();
@@ -387,6 +398,7 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {

     /// Creates a new SST file with the given collector data.
     /// Returns a tuple of (sequence number, file).
+    #[tracing::instrument(level = "trace", skip(self, collector_data))]
     fn create_sst_file(
         &self,
         family: u32,
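
Note the `create_blob` attribute above: it skips the raw `value` but still records its size via `fields(value_len = value.len())`, so the trace shows how much data each blob write handled without copying the payload into the span. A minimal sketch of that combination, assuming only the `tracing` crate (the free function is illustrative; the real method takes `&self` and returns the sequence number and file):

```rust
// The span records a `value_len` field computed from the argument, while the
// argument itself is skipped and never stored in the trace.
#[tracing::instrument(level = "trace", skip(value), fields(value_len = value.len()))]
fn create_blob(value: &[u8]) -> std::io::Result<()> {
    // ... compress and write `value` to a blob file ...
    Ok(())
}
```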

turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs

Lines changed: 3 additions & 0 deletions
@@ -184,9 +184,11 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
         {
             let _span = tracing::trace_span!("update task data").entered();
             process_task_data(snapshots, Some(batch))?;
+            let span = tracing::trace_span!("flush task data").entered();
             [KeySpace::TaskMeta, KeySpace::TaskData]
                 .into_par_iter()
                 .try_for_each(|key_space| {
+                    let _span = span.clone().entered();
                     // Safety: We already finished all processing of the task data and task
                     // meta
                     unsafe { batch.flush(key_space) }
@@ -208,6 +210,7 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
             .into_par_iter()
             .with_max_len(1)
             .map(|updates| {
+                let _span = _span.clone().entered();
                 let mut max_task_id = 0;

                 let mut task_type_bytes = Vec::new();

turbopack/crates/turbopack-trace-utils/src/tracing_presets.rs

Lines changed: 1 addition & 0 deletions
@@ -71,6 +71,7 @@ pub static TRACING_TURBO_TASKS_TARGETS: Lazy<Vec<&str>> = Lazy::new(|| {
         "turbo_tasks_hash=trace",
         "turbo_tasks_memory=trace",
         "turbo_tasks_backend=trace",
+        "turbo_persistence=trace",
     ],
 ]
 .concat()
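
The new `"turbo_persistence=trace"` entry is a filter directive that enables the spans added above whenever the turbo-tasks tracing preset is active. Outside of Turbopack's own trace tooling, the same directive could be applied with a plain `tracing_subscriber` setup; a hedged sketch, assuming `tracing_subscriber` with its `env-filter` feature (this is not how the repo wires it up, just an illustration of the directive syntax):

```rust
use tracing_subscriber::EnvFilter;

fn main() {
    // "info" for everything else, trace-level spans for the turbo_persistence crate,
    // mirroring the directive added to the preset above.
    let filter = EnvFilter::new("info,turbo_persistence=trace");
    tracing_subscriber::fmt().with_env_filter(filter).init();

    // ... run the workload; turbo-persistence spans would now be emitted ...
}
```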
