Merged
186 changes: 100 additions & 86 deletions turbopack/crates/turbo-persistence/src/db.rs
@@ -508,12 +508,15 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
             sst_filter.apply_filter(meta_file);
         }
 
-        for (_, file) in new_sst_files.iter() {
-            file.sync_all()?;
-        }
-        for (_, file) in new_blob_files.iter() {
-            file.sync_all()?;
-        }
+        self.parallel_scheduler.block_in_place(|| {
+            for (_, file) in new_sst_files.iter() {
+                file.sync_all()?;
+            }
+            for (_, file) in new_blob_files.iter() {
+                file.sync_all()?;
+            }
+            anyhow::Ok(())
+        })?;
 
         let new_meta_info = new_meta_files
             .iter()
@@ -566,86 +569,88 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
             inner.current_sequence_number = seq;
         }
 
-        if has_delete_file {
-            sst_seq_numbers_to_delete.sort_unstable();
-            meta_seq_numbers_to_delete.sort_unstable();
-            blob_seq_numbers_to_delete.sort_unstable();
-            // Write *.del file, marking the selected files as to delete
-            let mut buf = Vec::with_capacity(
-                (sst_seq_numbers_to_delete.len()
-                    + meta_seq_numbers_to_delete.len()
-                    + blob_seq_numbers_to_delete.len())
-                    * size_of::<u32>(),
-            );
-            for seq in sst_seq_numbers_to_delete.iter() {
-                buf.write_u32::<BE>(*seq)?;
-            }
-            for seq in meta_seq_numbers_to_delete.iter() {
-                buf.write_u32::<BE>(*seq)?;
-            }
-            for seq in blob_seq_numbers_to_delete.iter() {
-                buf.write_u32::<BE>(*seq)?;
-            }
-            let mut file = File::create(self.path.join(format!("{seq:08}.del")))?;
-            file.write_all(&buf)?;
-            file.sync_all()?;
-        }
-
-        let mut current_file = OpenOptions::new()
-            .write(true)
-            .truncate(false)
-            .read(false)
-            .open(self.path.join("CURRENT"))?;
-        current_file.write_u32::<BE>(seq)?;
-        current_file.sync_all()?;
-
-        for seq in sst_seq_numbers_to_delete.iter() {
-            fs::remove_file(self.path.join(format!("{seq:08}.sst")))?;
-        }
-        for seq in meta_seq_numbers_to_delete.iter() {
-            fs::remove_file(self.path.join(format!("{seq:08}.meta")))?;
-        }
-        for seq in blob_seq_numbers_to_delete.iter() {
-            fs::remove_file(self.path.join(format!("{seq:08}.blob")))?;
-        }
-
-        {
-            let mut log = self.open_log()?;
-            writeln!(log, "Time {time}")?;
-            let span = time.until(Timestamp::now())?;
-            writeln!(log, "Commit {seq:08} {keys_written} keys in {span:#}")?;
-            for (seq, family, ssts, obsolete) in new_meta_info {
-                writeln!(log, "{seq:08} META family:{family}",)?;
-                for (seq, min, max, size) in ssts {
-                    writeln!(
-                        log,
-                        " {seq:08} SST {min:016x}-{max:016x} {} MiB",
-                        size / 1024 / 1024
-                    )?;
-                }
-                for seq in obsolete {
-                    writeln!(log, " {seq:08} OBSOLETE SST")?;
-                }
-            }
-            new_sst_files.sort_unstable_by_key(|(seq, _)| *seq);
-            for (seq, _) in new_sst_files.iter() {
-                writeln!(log, "{seq:08} NEW SST")?;
-            }
-            new_blob_files.sort_unstable_by_key(|(seq, _)| *seq);
-            for (seq, _) in new_blob_files.iter() {
-                writeln!(log, "{seq:08} NEW BLOB")?;
-            }
-            for seq in sst_seq_numbers_to_delete.iter() {
-                writeln!(log, "{seq:08} SST DELETED")?;
-            }
-            for seq in meta_seq_numbers_to_delete.iter() {
-                writeln!(log, "{seq:08} META DELETED")?;
-            }
-            for seq in blob_seq_numbers_to_delete.iter() {
-                writeln!(log, "{seq:08} BLOB DELETED")?;
-            }
-        }
+        self.parallel_scheduler.block_in_place(|| {
+            if has_delete_file {
+                sst_seq_numbers_to_delete.sort_unstable();
+                meta_seq_numbers_to_delete.sort_unstable();
+                blob_seq_numbers_to_delete.sort_unstable();
+                // Write *.del file, marking the selected files as to delete
+                let mut buf = Vec::with_capacity(
+                    (sst_seq_numbers_to_delete.len()
+                        + meta_seq_numbers_to_delete.len()
+                        + blob_seq_numbers_to_delete.len())
+                        * size_of::<u32>(),
+                );
+                for seq in sst_seq_numbers_to_delete.iter() {
+                    buf.write_u32::<BE>(*seq)?;
+                }
+                for seq in meta_seq_numbers_to_delete.iter() {
+                    buf.write_u32::<BE>(*seq)?;
+                }
+                for seq in blob_seq_numbers_to_delete.iter() {
+                    buf.write_u32::<BE>(*seq)?;
+                }
+                let mut file = File::create(self.path.join(format!("{seq:08}.del")))?;
+                file.write_all(&buf)?;
+                file.sync_all()?;
+            }
+
+            let mut current_file = OpenOptions::new()
+                .write(true)
+                .truncate(false)
+                .read(false)
+                .open(self.path.join("CURRENT"))?;
+            current_file.write_u32::<BE>(seq)?;
+            current_file.sync_all()?;
+
+            for seq in sst_seq_numbers_to_delete.iter() {
+                fs::remove_file(self.path.join(format!("{seq:08}.sst")))?;
+            }
+            for seq in meta_seq_numbers_to_delete.iter() {
+                fs::remove_file(self.path.join(format!("{seq:08}.meta")))?;
+            }
+            for seq in blob_seq_numbers_to_delete.iter() {
+                fs::remove_file(self.path.join(format!("{seq:08}.blob")))?;
+            }
+
+            {
+                let mut log = self.open_log()?;
+                writeln!(log, "Time {time}")?;
+                let span = time.until(Timestamp::now())?;
+                writeln!(log, "Commit {seq:08} {keys_written} keys in {span:#}")?;
+                for (seq, family, ssts, obsolete) in new_meta_info {
+                    writeln!(log, "{seq:08} META family:{family}",)?;
+                    for (seq, min, max, size) in ssts {
+                        writeln!(
+                            log,
+                            " {seq:08} SST {min:016x}-{max:016x} {} MiB",
+                            size / 1024 / 1024
+                        )?;
+                    }
+                    for seq in obsolete {
+                        writeln!(log, " {seq:08} OBSOLETE SST")?;
+                    }
+                }
+                new_sst_files.sort_unstable_by_key(|(seq, _)| *seq);
+                for (seq, _) in new_sst_files.iter() {
+                    writeln!(log, "{seq:08} NEW SST")?;
+                }
+                new_blob_files.sort_unstable_by_key(|(seq, _)| *seq);
+                for (seq, _) in new_blob_files.iter() {
+                    writeln!(log, "{seq:08} NEW BLOB")?;
+                }
+                for seq in sst_seq_numbers_to_delete.iter() {
+                    writeln!(log, "{seq:08} SST DELETED")?;
+                }
+                for seq in meta_seq_numbers_to_delete.iter() {
+                    writeln!(log, "{seq:08} META DELETED")?;
+                }
+                for seq in blob_seq_numbers_to_delete.iter() {
+                    writeln!(log, "{seq:08} BLOB DELETED")?;
+                }
+            }
+            anyhow::Ok(())
+        })?;
         Ok(())
     }

@@ -837,7 +842,7 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
             });
         }
 
-        {
+        self.parallel_scheduler.block_in_place(|| {
             let metrics = compute_metrics(&ssts_with_ranges, 0..=u64::MAX);
             let guard = log_mutex.lock();
             let mut log = self.open_log()?;
@@ -859,7 +864,8 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
                 }
             }
             drop(guard);
-        }
+            anyhow::Ok(())
+        })?;
 
         // Later we will remove the merged files
         let sst_seq_numbers_to_delete = merge_jobs
@@ -912,7 +918,8 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
         });
     }
 
-fn create_sst_file(
+fn create_sst_file<S: ParallelScheduler>(
+    parallel_scheduler: &S,
     entries: &[LookupEntry],
     total_key_size: usize,
    total_value_size: usize,
@@ -921,12 +928,14 @@
 ) -> Result<(u32, File, StaticSortedFileBuilderMeta<'static>)>
 {
     let _span = tracing::trace_span!("write merged sst file").entered();
-    let (meta, file) = write_static_stored_file(
-        entries,
-        total_key_size,
-        total_value_size,
-        &path.join(format!("{seq:08}.sst")),
-    )?;
+    let (meta, file) = parallel_scheduler.block_in_place(|| {
+        write_static_stored_file(
+            entries,
+            total_key_size,
+            total_value_size,
+            &path.join(format!("{seq:08}.sst")),
+        )
+    })?;
     Ok((seq, file, meta))
 }

@@ -993,6 +1002,7 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
 
             keys_written += entries.len() as u64;
             new_sst_files.push(create_sst_file(
+                &self.parallel_scheduler,
                 &entries,
                 selected_total_key_size,
                 selected_total_value_size,
@@ -1023,6 +1033,7 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
 
             keys_written += entries.len() as u64;
             new_sst_files.push(create_sst_file(
+                &self.parallel_scheduler,
                 &entries,
                 total_key_size,
                 total_value_size,
@@ -1046,6 +1057,7 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
 
                 keys_written += part1.len() as u64;
                 new_sst_files.push(create_sst_file(
+                    &self.parallel_scheduler,
                     part1,
                     // We don't know the exact sizes so we estimate them
                     last_entries_total_sizes.0 / 2,
@@ -1056,6 +1068,7 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
 
                 keys_written += part2.len() as u64;
                 new_sst_files.push(create_sst_file(
+                    &self.parallel_scheduler,
                     part2,
                     last_entries_total_sizes.0 / 2,
                     last_entries_total_sizes.1 / 2,
@@ -1126,7 +1139,8 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
         let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
         let meta_file = {
             let _span = tracing::trace_span!("write meta file").entered();
-            meta_file_builder.write(&self.path, seq)?
+            self.parallel_scheduler
+                .block_in_place(|| meta_file_builder.write(&self.path, seq))?
         };
 
         Ok(PartialResultPerFamily {
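The change above is mechanical but worth spelling out: every blocking filesystem step on the commit path (fsyncing new SST and blob files, writing the `*.del` marker and the `CURRENT` pointer, removing obsolete files, appending to the log) now runs inside the scheduler's `block_in_place` hook, so a cooperative runtime can move queued work off the worker thread before it stalls on disk I/O. A minimal sketch of the pattern, assuming `turbo_persistence::ParallelScheduler` is in scope (`sync_files` is a hypothetical helper, not part of this PR):

```rust
use std::fs::File;
use std::io;

use turbo_persistence::ParallelScheduler;

// Hypothetical helper showing the commit-path pattern: declare a batch of
// fsyncs as blocking work so the scheduler can compensate (for example by
// shifting queued tasks to other worker threads) before we block on disk.
fn sync_files<S: ParallelScheduler>(scheduler: &S, files: &[File]) -> io::Result<()> {
    scheduler.block_in_place(|| {
        for file in files {
            file.sync_all()?; // blocking disk I/O
        }
        Ok(())
    })
}
```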
11 changes: 11 additions & 0 deletions turbopack/crates/turbo-persistence/src/parallel_scheduler.rs
@@ -1,4 +1,8 @@
 pub trait ParallelScheduler: Clone + Sync + Send {
+    fn block_in_place<R>(&self, f: impl FnOnce() -> R + Send) -> R
+    where
+        R: Send;
+
     fn parallel_for_each<T>(&self, items: &[T], f: impl Fn(&T) + Send + Sync)
     where
         T: Sync;
@@ -55,6 +59,13 @@ pub trait ParallelScheduler: Clone + Sync + Send {
 pub struct SerialScheduler;
 
 impl ParallelScheduler for SerialScheduler {
+    fn block_in_place<R>(&self, f: impl FnOnce() -> R + Send) -> R
+    where
+        R: Send,
+    {
+        f()
+    }
+
     fn parallel_for_each<T>(&self, items: &[T], f: impl Fn(&T) + Send + Sync)
     where
         T: Sync,
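`SerialScheduler` runs the closure inline, since there is no executor to notify. The hook pays off on an async runtime; below is a self-contained sketch of what a Tokio-backed scheduler could look like. `TokioParallelScheduler` is an assumption for illustration (not part of this PR), the trait is pared down to the two methods visible in this diff (the real trait has additional methods, elided above), and a multi-threaded Tokio runtime is assumed:

```rust
// Pared-down copy of the trait so the sketch stands alone; the real
// turbo_persistence::ParallelScheduler has more required methods.
trait ParallelScheduler: Clone + Sync + Send {
    fn block_in_place<R>(&self, f: impl FnOnce() -> R + Send) -> R
    where
        R: Send;

    fn parallel_for_each<T>(&self, items: &[T], f: impl Fn(&T) + Send + Sync)
    where
        T: Sync;
}

#[derive(Clone, Copy, Default)]
struct TokioParallelScheduler;

impl ParallelScheduler for TokioParallelScheduler {
    fn block_in_place<R>(&self, f: impl FnOnce() -> R + Send) -> R
    where
        R: Send,
    {
        // Asks the multi-threaded Tokio runtime to move queued tasks off
        // this worker before running the blocking closure in place.
        // (Panics on a current-thread runtime.)
        tokio::task::block_in_place(f)
    }

    fn parallel_for_each<T>(&self, items: &[T], f: impl Fn(&T) + Send + Sync)
    where
        T: Sync,
    {
        // Serial fallback; enough for the sketch.
        items.iter().for_each(f);
    }
}
```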
7 changes: 7 additions & 0 deletions turbopack/crates/turbo-persistence/src/tests.rs
@@ -14,6 +14,13 @@ use crate::{
 struct RayonParallelScheduler;
 
 impl ParallelScheduler for RayonParallelScheduler {
+    fn block_in_place<R>(&self, f: impl FnOnce() -> R + Send) -> R
+    where
+        R: Send,
+    {
+        f()
+    }
+
     fn parallel_for_each<T>(&self, items: &[T], f: impl Fn(&T) + Send + Sync)
     where
         T: Sync,
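For the Rayon-based test scheduler, `block_in_place` is likewise a pass-through: Rayon workers are plain OS threads, not cooperatively scheduled tasks, so there is nothing to notify. The `parallel_for_each` body is elided from this diff; a plausible implementation (an assumption, not necessarily the PR's code) delegates to Rayon's parallel iterators:

```rust
use rayon::prelude::*;

// Plausible body for RayonParallelScheduler::parallel_for_each, shown as a
// free function because the surrounding impl is elided from the diff.
fn parallel_for_each<T>(items: &[T], f: impl Fn(&T) + Send + Sync)
where
    T: Sync,
{
    items.par_iter().for_each(f);
}
```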
9 changes: 6 additions & 3 deletions turbopack/crates/turbo-persistence/src/write_batch.rs
@@ -413,9 +413,12 @@ impl<K: StoreKey + Send + Sync, S: ParallelScheduler, const FAMILIES: usize>
         let seq = self.current_sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
 
         let path = self.db_path.join(format!("{seq:08}.sst"));
-        let (meta, file) =
-            write_static_stored_file(entries, total_key_size, total_value_size, &path)
-                .with_context(|| format!("Unable to write SST file {seq:08}.sst"))?;
+        let (meta, file) = self

Contributor comment from @vercel (vercel bot), Aug 13, 2025:
The create_blob function contains multiple blocking I/O operations (File::create, write_all, flush) that should be wrapped with block_in_place for consistency with the pattern established in this changeset.

Patch Details:
```diff
diff --git a/turbopack/crates/turbo-persistence/src/write_batch.rs b/turbopack/crates/turbo-persistence/src/write_batch.rs
index f6cbd44acc..d2a00f7885 100644
--- a/turbopack/crates/turbo-persistence/src/write_batch.rs
+++ b/turbopack/crates/turbo-persistence/src/write_batch.rs
@@ -394,10 +394,13 @@ impl<K: StoreKey + Send + Sync, S: ParallelScheduler, const FAMILIES: usize>
             .context("Compression of value for blob file failed")?;
 
         let file = self.db_path.join(format!("{seq:08}.blob"));
-        let mut file = File::create(&file).context("Unable to create blob file")?;
-        file.write_all(&buffer)
-            .context("Unable to write blob file")?;
-        file.flush().context("Unable to flush blob file")?;
+        let file = self.parallel_scheduler.block_in_place(|| {
+            let mut file = File::create(&file).context("Unable to create blob file")?;
+            file.write_all(&buffer)
+                .context("Unable to write blob file")?;
+            file.flush().context("Unable to flush blob file")?;
+            Ok(file)
+        })?;
         Ok((seq, file))
     }

Analysis

The create_blob function performs several blocking I/O operations:

  • File::create(&file) on line 397
  • file.write_all(&buffer) on line 398
  • file.flush() on line 400

These are all synchronous file system operations that can block the current thread, but they're not wrapped with self.parallel_scheduler.block_in_place(). This is inconsistent with the pattern established throughout this changeset where blocking I/O operations are being wrapped to properly inform the async scheduler.

This inconsistency could lead to issues in async contexts where the scheduler expects to be notified about blocking work so it can allocate additional threads to maintain throughput.


Recommendation

Wrap all the blocking I/O operations in the create_blob function with self.parallel_scheduler.block_in_place():

```rust
fn create_blob(&self, value: &[u8]) -> Result<(u32, File)> {
    let seq = self.current_sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
    let mut buffer = Vec::new();
    buffer.write_u32::<BE>(value.len() as u32)?;
    lz4::compress_to_vec(value, &mut buffer, ACC_LEVEL_DEFAULT)
        .context("Compression of value for blob file failed")?;

    let file = self.db_path.join(format!("{seq:08}.blob"));
    let file = self.parallel_scheduler.block_in_place(|| {
        let mut file = File::create(&file).context("Unable to create blob file")?;
        file.write_all(&buffer)
            .context("Unable to write blob file")?;
        file.flush().context("Unable to flush blob file")?;
        anyhow::Ok(file)
    })?;
    Ok((seq, file))
}
```

+            .parallel_scheduler
+            .block_in_place(|| {
+                write_static_stored_file(entries, total_key_size, total_value_size, &path)
+            })
+            .with_context(|| format!("Unable to write SST file {seq:08}.sst"))?;

Contributor comment:
The file.sync_all() call in create_sst_file is a blocking I/O operation that should be wrapped with block_in_place for consistency with the pattern established in this changeset.

Patch Details:
```diff
diff --git a/turbopack/crates/turbo-persistence/src/write_batch.rs b/turbopack/crates/turbo-persistence/src/write_batch.rs
index f6cbd44acc..6365e37cfc 100644
--- a/turbopack/crates/turbo-persistence/src/write_batch.rs
+++ b/turbopack/crates/turbo-persistence/src/write_batch.rs
@@ -434,7 +434,9 @@ impl<K: StoreKey + Send + Sync, S: ParallelScheduler, const FAMILIES: usize>
                 static_sorted_file_builder::Entry,
             };
 
-            file.sync_all()?;
+            self.parallel_scheduler.block_in_place(|| {
+                file.sync_all()
+            })?;
             let sst = StaticSortedFile::open(
                 &self.db_path,
                 StaticSortedFileMetaData {
```

Analysis

In the create_sst_file function, there's a blocking I/O operation file.sync_all()? on line 437 that's not wrapped with self.parallel_scheduler.block_in_place(). This is inconsistent with the pattern established throughout this changeset, where all blocking I/O operations are being wrapped with block_in_place to properly inform the async scheduler about blocking work.

The function already wraps the write_static_stored_file call with block_in_place (lines 416-420), but the subsequent sync_all() call is left unwrapped. This could cause issues in async contexts where the scheduler needs to be informed about blocking operations to properly manage thread allocation.

This is particularly important because sync_all() is a synchronous file system operation that forces all buffered data to be written to disk, which can be a relatively expensive blocking operation that should be scheduled appropriately.


Recommendation

Wrap the file.sync_all()? operation with self.parallel_scheduler.block_in_place():

```rust
self.parallel_scheduler.block_in_place(|| {
    file.sync_all()
})?;
```

This maintains consistency with the other blocking I/O operations in the same function and across the codebase.


         #[cfg(feature = "verify_sst_content")]
         {
@@ -1,10 +1,17 @@
 use turbo_persistence::ParallelScheduler;
-use turbo_tasks::parallel;
+use turbo_tasks::{block_in_place, parallel};
 
 #[derive(Clone, Copy, Default)]
 pub struct TurboTasksParallelScheduler;
 
 impl ParallelScheduler for TurboTasksParallelScheduler {
+    fn block_in_place<R>(&self, f: impl FnOnce() -> R + Send) -> R
+    where
+        R: Send,
+    {
+        block_in_place(f)
+    }
+
     fn parallel_for_each<T>(&self, items: &[T], f: impl Fn(&T) + Send + Sync)
     where
         T: Sync,