From 039faafe5abb1897ef6818fc2fefed457acc4f8c Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Tue, 5 Aug 2025 21:00:33 +0200 Subject: [PATCH] Turbopack: use block in place for db writes --- turbopack/crates/turbo-persistence/src/db.rs | 186 ++++++++++-------- .../src/parallel_scheduler.rs | 11 ++ .../crates/turbo-persistence/src/tests.rs | 7 + .../turbo-persistence/src/write_batch.rs | 9 +- .../src/database/turbo/parallel_scheduler.rs | 9 +- turbopack/crates/turbo-tasks/src/effect.rs | 31 +-- turbopack/crates/turbo-tasks/src/lib.rs | 4 +- turbopack/crates/turbo-tasks/src/spawn.rs | 11 +- 8 files changed, 151 insertions(+), 117 deletions(-) diff --git a/turbopack/crates/turbo-persistence/src/db.rs b/turbopack/crates/turbo-persistence/src/db.rs index b6c91e3e38804..9d95aee336cba 100644 --- a/turbopack/crates/turbo-persistence/src/db.rs +++ b/turbopack/crates/turbo-persistence/src/db.rs @@ -508,12 +508,15 @@ impl TurboPersistence { sst_filter.apply_filter(meta_file); } - for (_, file) in new_sst_files.iter() { - file.sync_all()?; - } - for (_, file) in new_blob_files.iter() { - file.sync_all()?; - } + self.parallel_scheduler.block_in_place(|| { + for (_, file) in new_sst_files.iter() { + file.sync_all()?; + } + for (_, file) in new_blob_files.iter() { + file.sync_all()?; + } + anyhow::Ok(()) + })?; let new_meta_info = new_meta_files .iter() @@ -566,86 +569,88 @@ impl TurboPersistence { inner.current_sequence_number = seq; } - if has_delete_file { - sst_seq_numbers_to_delete.sort_unstable(); - meta_seq_numbers_to_delete.sort_unstable(); - blob_seq_numbers_to_delete.sort_unstable(); - // Write *.del file, marking the selected files as to delete - let mut buf = Vec::with_capacity( - (sst_seq_numbers_to_delete.len() - + meta_seq_numbers_to_delete.len() - + blob_seq_numbers_to_delete.len()) - * size_of::(), - ); - for seq in sst_seq_numbers_to_delete.iter() { - buf.write_u32::(*seq)?; - } - for seq in meta_seq_numbers_to_delete.iter() { - buf.write_u32::(*seq)?; - } - for seq in blob_seq_numbers_to_delete.iter() { - buf.write_u32::(*seq)?; - } - let mut file = File::create(self.path.join(format!("{seq:08}.del")))?; - file.write_all(&buf)?; - file.sync_all()?; - } - - let mut current_file = OpenOptions::new() - .write(true) - .truncate(false) - .read(false) - .open(self.path.join("CURRENT"))?; - current_file.write_u32::(seq)?; - current_file.sync_all()?; - - for seq in sst_seq_numbers_to_delete.iter() { - fs::remove_file(self.path.join(format!("{seq:08}.sst")))?; - } - for seq in meta_seq_numbers_to_delete.iter() { - fs::remove_file(self.path.join(format!("{seq:08}.meta")))?; - } - for seq in blob_seq_numbers_to_delete.iter() { - fs::remove_file(self.path.join(format!("{seq:08}.blob")))?; - } - - { - let mut log = self.open_log()?; - writeln!(log, "Time {time}")?; - let span = time.until(Timestamp::now())?; - writeln!(log, "Commit {seq:08} {keys_written} keys in {span:#}")?; - for (seq, family, ssts, obsolete) in new_meta_info { - writeln!(log, "{seq:08} META family:{family}",)?; - for (seq, min, max, size) in ssts { - writeln!( - log, - " {seq:08} SST {min:016x}-{max:016x} {} MiB", - size / 1024 / 1024 - )?; + self.parallel_scheduler.block_in_place(|| { + if has_delete_file { + sst_seq_numbers_to_delete.sort_unstable(); + meta_seq_numbers_to_delete.sort_unstable(); + blob_seq_numbers_to_delete.sort_unstable(); + // Write *.del file, marking the selected files as to delete + let mut buf = Vec::with_capacity( + (sst_seq_numbers_to_delete.len() + + meta_seq_numbers_to_delete.len() + + blob_seq_numbers_to_delete.len()) + * size_of::(), + ); + for seq in sst_seq_numbers_to_delete.iter() { + buf.write_u32::(*seq)?; } - for seq in obsolete { - writeln!(log, " {seq:08} OBSOLETE SST")?; + for seq in meta_seq_numbers_to_delete.iter() { + buf.write_u32::(*seq)?; } + for seq in blob_seq_numbers_to_delete.iter() { + buf.write_u32::(*seq)?; + } + let mut file = File::create(self.path.join(format!("{seq:08}.del")))?; + file.write_all(&buf)?; + file.sync_all()?; } - new_sst_files.sort_unstable_by_key(|(seq, _)| *seq); - for (seq, _) in new_sst_files.iter() { - writeln!(log, "{seq:08} NEW SST")?; - } - new_blob_files.sort_unstable_by_key(|(seq, _)| *seq); - for (seq, _) in new_blob_files.iter() { - writeln!(log, "{seq:08} NEW BLOB")?; - } + + let mut current_file = OpenOptions::new() + .write(true) + .truncate(false) + .read(false) + .open(self.path.join("CURRENT"))?; + current_file.write_u32::(seq)?; + current_file.sync_all()?; + for seq in sst_seq_numbers_to_delete.iter() { - writeln!(log, "{seq:08} SST DELETED")?; + fs::remove_file(self.path.join(format!("{seq:08}.sst")))?; } for seq in meta_seq_numbers_to_delete.iter() { - writeln!(log, "{seq:08} META DELETED")?; + fs::remove_file(self.path.join(format!("{seq:08}.meta")))?; } for seq in blob_seq_numbers_to_delete.iter() { - writeln!(log, "{seq:08} BLOB DELETED")?; + fs::remove_file(self.path.join(format!("{seq:08}.blob")))?; } - } + { + let mut log = self.open_log()?; + writeln!(log, "Time {time}")?; + let span = time.until(Timestamp::now())?; + writeln!(log, "Commit {seq:08} {keys_written} keys in {span:#}")?; + for (seq, family, ssts, obsolete) in new_meta_info { + writeln!(log, "{seq:08} META family:{family}",)?; + for (seq, min, max, size) in ssts { + writeln!( + log, + " {seq:08} SST {min:016x}-{max:016x} {} MiB", + size / 1024 / 1024 + )?; + } + for seq in obsolete { + writeln!(log, " {seq:08} OBSOLETE SST")?; + } + } + new_sst_files.sort_unstable_by_key(|(seq, _)| *seq); + for (seq, _) in new_sst_files.iter() { + writeln!(log, "{seq:08} NEW SST")?; + } + new_blob_files.sort_unstable_by_key(|(seq, _)| *seq); + for (seq, _) in new_blob_files.iter() { + writeln!(log, "{seq:08} NEW BLOB")?; + } + for seq in sst_seq_numbers_to_delete.iter() { + writeln!(log, "{seq:08} SST DELETED")?; + } + for seq in meta_seq_numbers_to_delete.iter() { + writeln!(log, "{seq:08} META DELETED")?; + } + for seq in blob_seq_numbers_to_delete.iter() { + writeln!(log, "{seq:08} BLOB DELETED")?; + } + } + anyhow::Ok(()) + })?; Ok(()) } @@ -837,7 +842,7 @@ impl TurboPersistence { }); } - { + self.parallel_scheduler.block_in_place(|| { let metrics = compute_metrics(&ssts_with_ranges, 0..=u64::MAX); let guard = log_mutex.lock(); let mut log = self.open_log()?; @@ -859,7 +864,8 @@ impl TurboPersistence { } } drop(guard); - } + anyhow::Ok(()) + })?; // Later we will remove the merged files let sst_seq_numbers_to_delete = merge_jobs @@ -912,7 +918,8 @@ impl TurboPersistence { }); } - fn create_sst_file( + fn create_sst_file( + parallel_scheduler: &S, entries: &[LookupEntry], total_key_size: usize, total_value_size: usize, @@ -921,12 +928,14 @@ impl TurboPersistence { ) -> Result<(u32, File, StaticSortedFileBuilderMeta<'static>)> { let _span = tracing::trace_span!("write merged sst file").entered(); - let (meta, file) = write_static_stored_file( - entries, - total_key_size, - total_value_size, - &path.join(format!("{seq:08}.sst")), - )?; + let (meta, file) = parallel_scheduler.block_in_place(|| { + write_static_stored_file( + entries, + total_key_size, + total_value_size, + &path.join(format!("{seq:08}.sst")), + ) + })?; Ok((seq, file, meta)) } @@ -993,6 +1002,7 @@ impl TurboPersistence { keys_written += entries.len() as u64; new_sst_files.push(create_sst_file( + &self.parallel_scheduler, &entries, selected_total_key_size, selected_total_value_size, @@ -1023,6 +1033,7 @@ impl TurboPersistence { keys_written += entries.len() as u64; new_sst_files.push(create_sst_file( + &self.parallel_scheduler, &entries, total_key_size, total_value_size, @@ -1046,6 +1057,7 @@ impl TurboPersistence { keys_written += part1.len() as u64; new_sst_files.push(create_sst_file( + &self.parallel_scheduler, part1, // We don't know the exact sizes so we estimate them last_entries_total_sizes.0 / 2, @@ -1056,6 +1068,7 @@ impl TurboPersistence { keys_written += part2.len() as u64; new_sst_files.push(create_sst_file( + &self.parallel_scheduler, part2, last_entries_total_sizes.0 / 2, last_entries_total_sizes.1 / 2, @@ -1126,7 +1139,8 @@ impl TurboPersistence { let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1; let meta_file = { let _span = tracing::trace_span!("write meta file").entered(); - meta_file_builder.write(&self.path, seq)? + self.parallel_scheduler + .block_in_place(|| meta_file_builder.write(&self.path, seq))? }; Ok(PartialResultPerFamily { diff --git a/turbopack/crates/turbo-persistence/src/parallel_scheduler.rs b/turbopack/crates/turbo-persistence/src/parallel_scheduler.rs index 52a9d626090fc..1ec00c98b6e9b 100644 --- a/turbopack/crates/turbo-persistence/src/parallel_scheduler.rs +++ b/turbopack/crates/turbo-persistence/src/parallel_scheduler.rs @@ -1,4 +1,8 @@ pub trait ParallelScheduler: Clone + Sync + Send { + fn block_in_place(&self, f: impl FnOnce() -> R + Send) -> R + where + R: Send; + fn parallel_for_each(&self, items: &[T], f: impl Fn(&T) + Send + Sync) where T: Sync; @@ -55,6 +59,13 @@ pub trait ParallelScheduler: Clone + Sync + Send { pub struct SerialScheduler; impl ParallelScheduler for SerialScheduler { + fn block_in_place(&self, f: impl FnOnce() -> R + Send) -> R + where + R: Send, + { + f() + } + fn parallel_for_each(&self, items: &[T], f: impl Fn(&T) + Send + Sync) where T: Sync, diff --git a/turbopack/crates/turbo-persistence/src/tests.rs b/turbopack/crates/turbo-persistence/src/tests.rs index 6e0b42b92fe78..a52f7e9dd44b0 100644 --- a/turbopack/crates/turbo-persistence/src/tests.rs +++ b/turbopack/crates/turbo-persistence/src/tests.rs @@ -14,6 +14,13 @@ use crate::{ struct RayonParallelScheduler; impl ParallelScheduler for RayonParallelScheduler { + fn block_in_place(&self, f: impl FnOnce() -> R + Send) -> R + where + R: Send, + { + f() + } + fn parallel_for_each(&self, items: &[T], f: impl Fn(&T) + Send + Sync) where T: Sync, diff --git a/turbopack/crates/turbo-persistence/src/write_batch.rs b/turbopack/crates/turbo-persistence/src/write_batch.rs index 81a954d0ef18e..f6cbd44acc6fd 100644 --- a/turbopack/crates/turbo-persistence/src/write_batch.rs +++ b/turbopack/crates/turbo-persistence/src/write_batch.rs @@ -413,9 +413,12 @@ impl let seq = self.current_sequence_number.fetch_add(1, Ordering::SeqCst) + 1; let path = self.db_path.join(format!("{seq:08}.sst")); - let (meta, file) = - write_static_stored_file(entries, total_key_size, total_value_size, &path) - .with_context(|| format!("Unable to write SST file {seq:08}.sst"))?; + let (meta, file) = self + .parallel_scheduler + .block_in_place(|| { + write_static_stored_file(entries, total_key_size, total_value_size, &path) + }) + .with_context(|| format!("Unable to write SST file {seq:08}.sst"))?; #[cfg(feature = "verify_sst_content")] { diff --git a/turbopack/crates/turbo-tasks-backend/src/database/turbo/parallel_scheduler.rs b/turbopack/crates/turbo-tasks-backend/src/database/turbo/parallel_scheduler.rs index dd78022c3640b..a8c11256115fc 100644 --- a/turbopack/crates/turbo-tasks-backend/src/database/turbo/parallel_scheduler.rs +++ b/turbopack/crates/turbo-tasks-backend/src/database/turbo/parallel_scheduler.rs @@ -1,10 +1,17 @@ use turbo_persistence::ParallelScheduler; -use turbo_tasks::parallel; +use turbo_tasks::{block_in_place, parallel}; #[derive(Clone, Copy, Default)] pub struct TurboTasksParallelScheduler; impl ParallelScheduler for TurboTasksParallelScheduler { + fn block_in_place(&self, f: impl FnOnce() -> R + Send) -> R + where + R: Send, + { + block_in_place(f) + } + fn parallel_for_each(&self, items: &[T], f: impl Fn(&T) + Send + Sync) where T: Sync, diff --git a/turbopack/crates/turbo-tasks/src/effect.rs b/turbopack/crates/turbo-tasks/src/effect.rs index 0b893b2a128ad..6373306853477 100644 --- a/turbopack/crates/turbo-tasks/src/effect.rs +++ b/turbopack/crates/turbo-tasks/src/effect.rs @@ -1,6 +1,5 @@ use std::{ any::{Any, TypeId}, - borrow::Cow, future::Future, mem::replace, panic, @@ -8,20 +7,20 @@ use std::{ sync::Arc, }; -use anyhow::{Result, anyhow}; +use anyhow::Result; use auto_hash_map::AutoSet; use futures::{StreamExt, TryStreamExt}; use parking_lot::Mutex; use rustc_hash::{FxHashMap, FxHashSet}; use tokio::task_local; -use tracing::{Instrument, Span}; +use tracing::Instrument; use crate::{ self as turbo_tasks, CollectiblesSource, NonLocalValue, ReadRef, ResolvedVc, TryJoinIterExt, debug::ValueDebugFormat, emit, event::{Event, EventListener}, - manager::turbo_tasks_future_scope, + spawn, trace::TraceRawVcs, util::SharedError, }; @@ -98,28 +97,10 @@ impl EffectInstance { listener.await; } State::NotStarted(EffectInner { future }) => { - let join_handle = tokio::spawn(ApplyEffectsContext::in_current_scope( - turbo_tasks_future_scope(turbo_tasks::turbo_tasks(), future) - .instrument(Span::current()), - )); + let join_handle = spawn(ApplyEffectsContext::in_current_scope(future)); let result = match join_handle.await { - Ok(Err(err)) => Err(SharedError::new(err)), - Err(err) => { - let any = err.into_panic(); - let panic = match any.downcast::() { - Ok(owned) => Some(Cow::Owned(*owned)), - Err(any) => match any.downcast::<&'static str>() { - Ok(str) => Some(Cow::Borrowed(*str)), - Err(_) => None, - }, - }; - Err(SharedError::new(if let Some(panic) = panic { - anyhow!("Task effect panicked: {panic}") - } else { - anyhow!("Task effect panicked") - })) - } - Ok(Ok(())) => Ok(()), + Err(err) => Err(SharedError::new(err)), + Ok(()) => Ok(()), }; let event = { let mut guard = self.inner.lock(); diff --git a/turbopack/crates/turbo-tasks/src/lib.rs b/turbopack/crates/turbo-tasks/src/lib.rs index c3aab4cfdad54..4a994d7d556a1 100644 --- a/turbopack/crates/turbo-tasks/src/lib.rs +++ b/turbopack/crates/turbo-tasks/src/lib.rs @@ -120,7 +120,9 @@ pub use read_ref::ReadRef; use rustc_hash::FxHasher; pub use serialization_invalidation::SerializationInvalidator; pub use shrink_to_fit::ShrinkToFit; -pub use spawn::{JoinHandle, block_for_future, spawn, spawn_blocking, spawn_thread}; +pub use spawn::{ + JoinHandle, block_for_future, block_in_place, spawn, spawn_blocking, spawn_thread, +}; pub use state::{State, TransientState}; pub use task::{SharedReference, TypedSharedReference, task_input::TaskInput}; pub use task_execution_reason::TaskExecutionReason; diff --git a/turbopack/crates/turbo-tasks/src/spawn.rs b/turbopack/crates/turbo-tasks/src/spawn.rs index 7a217194a0c9c..278b14fb000a4 100644 --- a/turbopack/crates/turbo-tasks/src/spawn.rs +++ b/turbopack/crates/turbo-tasks/src/spawn.rs @@ -8,7 +8,7 @@ use std::{ use anyhow::Result; use futures::{FutureExt, ready}; -use tokio::{runtime::Handle, task::block_in_place}; +use tokio::runtime::Handle; use tracing::{Instrument, Span, info_span}; use turbo_tasks_malloc::{AllocationInfo, TurboMalloc}; @@ -94,6 +94,15 @@ pub fn spawn_thread(func: impl FnOnce() + Send + 'static) { }); } +/// Tells the scheduler about blocking work happening in the current thread. +/// It will make sure to allocate extra threads for the pool. +pub fn block_in_place(f: impl FnOnce() -> R + Send) -> R +where + R: Send, +{ + tokio::task::block_in_place(f) +} + /// Blocking waits for a future to complete. This blocks the current thread potentially staling /// other concurrent futures (but not other concurrent tasks). Try to avoid this method infavor of /// awaiting the future instead.