11 changes: 3 additions & 8 deletions turbopack/crates/turbo-persistence-tools/src/main.rs
@@ -35,7 +35,6 @@ fn main() -> Result<()> {
         amqf_entries,
         sst_size,
         key_compression_dictionary_size,
-        value_compression_dictionary_size,
         block_count,
     } in meta_file.entries
     {
@@ -45,15 +44,11 @@ fn main() -> Result<()> {
         );
         println!(" AMQF {amqf_entries} entries = {} KiB", amqf_size / 1024);
         println!(
-            " {} KiB = {} kiB key compression dict + {} KiB value compression dict + \
-             {block_count} blocks (avg {} bytes/block)",
+            " {} KiB = {} kiB key compression dict + {block_count} blocks (avg {} \
+             bytes/block)",
             sst_size / 1024,
             key_compression_dictionary_size / 1024,
-            value_compression_dictionary_size / 1024,
-            (sst_size
-                - key_compression_dictionary_size as u64
-                - value_compression_dictionary_size as u64)
-                / block_count as u64
+            (sst_size - key_compression_dictionary_size as u64) / block_count as u64
         );
     }
     if !meta_file.obsolete_sst_files.is_empty() {
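With the value dictionary gone, the tool derives the per-block average from the SST size minus only the key dictionary. A runnable sketch of that arithmetic with made-up numbers (the variable names mirror the diff; the sizes are illustrative only):

```rust
fn main() {
    // Illustrative values only; real numbers come from the parsed meta file.
    let sst_size: u64 = 2 * 1024 * 1024; // 2 MiB SST file
    let key_compression_dictionary_size: u16 = 4096; // 4 KiB key dictionary
    let block_count: u16 = 128;

    // Same expression as the updated println! argument.
    let avg_bytes_per_block =
        (sst_size - key_compression_dictionary_size as u64) / block_count as u64;

    println!(
        "{} KiB = {} KiB key compression dict + {block_count} blocks (avg {avg_bytes_per_block} bytes/block)",
        sst_size / 1024,
        key_compression_dictionary_size / 1024,
    );
}
```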
2 changes: 0 additions & 2 deletions turbopack/crates/turbo-persistence/README.md
@@ -45,7 +45,6 @@ A meta file can contain metadata about multiple SST files. The metadata is store
 - foreach described SST file
   - 4 bytes sequence number of the SST file
   - 2 bytes key Compression Dictionary length
-  - 2 bytes value Compression Dictionary length
   - 2 bytes block count
   - 8 bytes min hash
   - 8 bytes max hash
@@ -59,7 +58,6 @@ A meta file can contain metadata about multiple SST files. The metadata is store
 The SST file contains only data without any header.

 - serialized key Compression Dictionary
-- serialized value Compression Dictionary
 - foreach block
   - 4 bytes uncompressed block length
   - compressed data
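Taken together, the remaining layout is just the key dictionary followed by length-prefixed blocks. A minimal writer sketch for one block, assuming the same big-endian byteorder convention the meta-file code uses (the README fragment does not state the SST block endianness explicitly):

```rust
use byteorder::{WriteBytesExt, BE};
use std::io::{Result, Write};

/// Writes one SST block as described above: a 4-byte uncompressed
/// length followed by the compressed payload.
fn write_block<W: Write>(out: &mut W, uncompressed_len: u32, compressed: &[u8]) -> Result<()> {
    out.write_u32::<BE>(uncompressed_len)?;
    out.write_all(compressed)?;
    Ok(())
}

fn main() -> Result<()> {
    let mut buf = Vec::new();
    // Fake payload standing in for compressed data.
    write_block(&mut buf, 42, b"...compressed bytes...")?;
    assert_eq!(&buf[..4], 42u32.to_be_bytes().as_slice());
    Ok(())
}
```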
6 changes: 3 additions & 3 deletions turbopack/crates/turbo-persistence/src/collector.rs
@@ -92,11 +92,11 @@ impl<K: StoreKey, const SIZE_SHIFT: usize> Collector<K, SIZE_SHIFT> {
         self.entries.push(entry);
     }

-    /// Sorts the entries and returns them along with the total key and value sizes. This doesn't
+    /// Sorts the entries and returns them along with the total key size. This doesn't
@vercel vercel bot (Contributor) commented on Aug 17, 2025:

The Collector::is_full() method uses total_value_size in its calculation but the sorted() method doesn't return it, creating inconsistent behavior between fullness detection and data usage.

📝 Patch Details
diff --git a/turbopack/crates/turbo-persistence/src/collector.rs b/turbopack/crates/turbo-persistence/src/collector.rs
index b955d6102b..3c51a200d6 100644
--- a/turbopack/crates/turbo-persistence/src/collector.rs
+++ b/turbopack/crates/turbo-persistence/src/collector.rs
@@ -92,11 +92,11 @@ impl<K: StoreKey, const SIZE_SHIFT: usize> Collector<K, SIZE_SHIFT> {
         self.entries.push(entry);
     }
 
-    /// Sorts the entries and returns them along with the total key size. This doesn't
+    /// Sorts the entries and returns them along with the total key size and total value size. This doesn't
     /// clear the entries.
-    pub fn sorted(&mut self) -> (&[CollectorEntry<K>], usize) {
+    pub fn sorted(&mut self) -> (&[CollectorEntry<K>], usize, usize) {
         self.entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
-        (&self.entries, self.total_key_size)
+        (&self.entries, self.total_key_size, self.total_value_size)
     }
 
     /// Clears the collector.
diff --git a/turbopack/crates/turbo-persistence/src/write_batch.rs b/turbopack/crates/turbo-persistence/src/write_batch.rs
index 4426f8795a..68dfa64db6 100644
--- a/turbopack/crates/turbo-persistence/src/write_batch.rs
+++ b/turbopack/crates/turbo-persistence/src/write_batch.rs
@@ -407,9 +407,9 @@ impl<K: StoreKey + Send + Sync, S: ParallelScheduler, const FAMILIES: usize>
     fn create_sst_file(
         &self,
         family: u32,
-        collector_data: (&[CollectorEntry<K>], usize),
+        collector_data: (&[CollectorEntry<K>], usize, usize),
     ) -> Result<(u32, File)> {
-        let (entries, total_key_size) = collector_data;
+        let (entries, total_key_size, _total_value_size) = collector_data;
         let seq = self.current_sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
 
         let path = self.db_path.join(format!("{seq:08}.sst"));

Analysis

The Collector struct maintains both total_key_size and total_value_size (lines 15-16) and uses both in the is_full() method (lines 38-39) to determine when the collector should be flushed. However, the sorted() method (line 99) only returns total_key_size, not total_value_size.

This creates an inconsistency where the collector decides it's "full" based on both key and value sizes, but the calling code only receives the key size. This could lead to situations where the collector reports being full due to large values, but the caller receives a small key size and makes incorrect decisions about file splitting or resource allocation.


Recommendation

If value compression dictionary support is being removed completely, update the is_full() method to only consider total_key_size in the threshold check. Change lines 38-39 to: self.total_key_size > DATA_THRESHOLD_PER_INITIAL_FILE >> SIZE_SHIFT. Alternatively, if value sizes are still relevant, update the sorted() method to return both sizes.
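A self-contained sketch of the first option, with fullness keyed off the key size alone; the constant's value and the struct shape are assumptions, since only fragments of collector.rs appear in this review:

```rust
// Assumed value; the crate defines the real DATA_THRESHOLD_PER_INITIAL_FILE.
const DATA_THRESHOLD_PER_INITIAL_FILE: usize = 64 * 1024 * 1024;

struct Collector<const SIZE_SHIFT: usize> {
    total_key_size: usize,
}

impl<const SIZE_SHIFT: usize> Collector<SIZE_SHIFT> {
    /// Fullness keyed off the key size alone, matching what sorted() returns.
    fn is_full(&self) -> bool {
        self.total_key_size > DATA_THRESHOLD_PER_INITIAL_FILE >> SIZE_SHIFT
    }
}

fn main() {
    let collector = Collector::<1> { total_key_size: 48 * 1024 * 1024 };
    // Threshold is 64 MiB >> 1 = 32 MiB, so 48 MiB of keys reports full.
    assert!(collector.is_full());
}
```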

     /// clear the entries.
-    pub fn sorted(&mut self) -> (&[CollectorEntry<K>], usize, usize) {
+    pub fn sorted(&mut self) -> (&[CollectorEntry<K>], usize) {
         self.entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
-        (&self.entries, self.total_key_size, self.total_value_size)
+        (&self.entries, self.total_key_size)
     }

     /// Clears the collector.
34 changes: 9 additions & 25 deletions turbopack/crates/turbo-persistence/src/db.rs
@@ -898,8 +898,6 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
                 amqf,
                 key_compression_dictionary_length: entry
                     .key_compression_dictionary_length(),
-                value_compression_dictionary_length: entry
-                    .value_compression_dictionary_length(),
                 block_count: entry.block_count(),
                 size: entry.size(),
                 entries: 0,
@@ -914,7 +912,6 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
     parallel_scheduler: &S,
     entries: &[LookupEntry<'l>],
     total_key_size: usize,
-    total_value_size: usize,
     path: &Path,
     seq: u32,
 ) -> Result<(u32, File, StaticSortedFileBuilderMeta<'static>)>
@@ -924,7 +921,6 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
     write_static_stored_file(
         entries,
         total_key_size,
-        total_value_size,
         &path.join(format!("{seq:08}.sst")),
     )
 })?;
@@ -959,7 +955,7 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
 let mut current: Option<LookupEntry<'_>> = None;
 let mut entries = Vec::new();
 let mut last_entries = Vec::new();
-let mut last_entries_total_sizes = (0, 0);
+let mut last_entries_total_key_size = 0;
 for entry in iter {
     let entry = entry?;

@@ -975,15 +971,10 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
         > DATA_THRESHOLD_PER_COMPACTED_FILE
         || entries.len() >= MAX_ENTRIES_PER_COMPACTED_FILE
     {
-        let (
-            selected_total_key_size,
-            selected_total_value_size,
-        ) = last_entries_total_sizes;
+        let selected_total_key_size =
+            last_entries_total_key_size;
         swap(&mut entries, &mut last_entries);
-        last_entries_total_sizes = (
-            total_key_size - key_size,
-            total_value_size - value_size,
-        );
+        last_entries_total_key_size = total_key_size - key_size;
         total_key_size = key_size;
         total_value_size = value_size;

@@ -997,7 +988,6 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
                 &self.parallel_scheduler,
                 &entries,
                 selected_total_key_size,
-                selected_total_value_size,
                 path,
                 seq,
             )?);
@@ -1015,7 +1005,8 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
             }
             if let Some(entry) = current {
                 total_key_size += entry.key.len();
-                total_value_size += entry.value.uncompressed_size_in_sst();
+                // Obsolete as we no longer need total_value_size
+                // total_value_size += entry.value.uncompressed_size_in_sst();
                 entries.push(entry);
             }

@@ -1028,7 +1019,6 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
                 &self.parallel_scheduler,
                 &entries,
                 total_key_size,
-                total_value_size,
                 path,
                 seq,
             )?);
@@ -1039,8 +1029,7 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
         if !last_entries.is_empty() {
             last_entries.append(&mut entries);

-            last_entries_total_sizes.0 += total_key_size;
-            last_entries_total_sizes.1 += total_value_size;
+            last_entries_total_key_size += total_key_size;

             let (part1, part2) = last_entries.split_at(last_entries.len() / 2);

@@ -1052,8 +1041,7 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
                 &self.parallel_scheduler,
                 part1,
                 // We don't know the exact sizes so we estimate them
-                last_entries_total_sizes.0 / 2,
-                last_entries_total_sizes.1 / 2,
+                last_entries_total_key_size / 2,
                 path,
                 seq1,
             )?);
@@ -1062,8 +1050,7 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
             new_sst_files.push(create_sst_file(
                 &self.parallel_scheduler,
                 part2,
-                last_entries_total_sizes.0 / 2,
-                last_entries_total_sizes.1 / 2,
+                last_entries_total_key_size / 2,
                 path,
                 seq2,
             )?);
@@ -1263,8 +1250,6 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
                     amqf_entries: amqf.len(),
                     key_compression_dictionary_size: entry
                         .key_compression_dictionary_length(),
-                    value_compression_dictionary_size: entry
-                        .value_compression_dictionary_length(),
                     block_count: entry.block_count(),
                 }
})
@@ -1302,6 +1287,5 @@ pub struct MetaFileEntryInfo {
     pub amqf_entries: usize,
     pub sst_size: u64,
     pub key_compression_dictionary_size: u16,
-    pub value_compression_dictionary_size: u16,
     pub block_count: u16,
 }
3 changes: 0 additions & 3 deletions turbopack/crates/turbo-persistence/src/lookup_entry.rs
@@ -22,7 +22,6 @@ pub enum LazyLookupValue<'l> {
     Medium {
         uncompressed_size: u32,
         block: &'l [u8],
-        dictionary: &'l [u8],
     },
 }

@@ -79,11 +78,9 @@ impl Entry for LookupEntry<'_> {
             LazyLookupValue::Medium {
                 uncompressed_size,
                 block,
-                dictionary,
             } => EntryValue::MediumCompressed {
                 uncompressed_size: *uncompressed_size,
                 block,
-                dictionary,
             },
         }
     }
5 changes: 0 additions & 5 deletions turbopack/crates/turbo-persistence/src/meta_file.rs
@@ -144,10 +144,6 @@ impl MetaEntry {
         self.sst_data.key_compression_dictionary_length
     }

-    pub fn value_compression_dictionary_length(&self) -> u16 {
-        self.sst_data.value_compression_dictionary_length
-    }
-
     pub fn block_count(&self) -> u16 {
         self.sst_data.block_count
     }
@@ -222,7 +218,6 @@ impl MetaFile {
                 sst_data: StaticSortedFileMetaData {
                     sequence_number: file.read_u32::<BE>()?,
                     key_compression_dictionary_length: file.read_u16::<BE>()?,
-                    value_compression_dictionary_length: file.read_u16::<BE>()?,
@vercel vercel bot (Contributor) commented on Aug 13, 2025:

Meta file reading skips the value_compression_dictionary_length field that exists in existing files, causing all subsequent fields to be misaligned and corrupted.

📝 Patch Details
diff --git a/turbopack/crates/turbo-persistence/src/meta_file.rs b/turbopack/crates/turbo-persistence/src/meta_file.rs
index 3c0f1b3aa7..3ac01098f1 100644
--- a/turbopack/crates/turbo-persistence/src/meta_file.rs
+++ b/turbopack/crates/turbo-persistence/src/meta_file.rs
@@ -218,7 +218,12 @@ impl MetaFile {
                 sst_data: StaticSortedFileMetaData {
                     sequence_number: file.read_u32::<BE>()?,
                     key_compression_dictionary_length: file.read_u16::<BE>()?,
-                    block_count: file.read_u16::<BE>()?,
+                    // Read and discard the obsolete value_compression_dictionary_length field
+                    // for backward compatibility with existing meta files
+                    block_count: {
+                        let _unused_value_compression_dictionary_length = file.read_u16::<BE>()?;
+                        file.read_u16::<BE>()?
+                    },
                 },
                 family,
                 min_hash: file.read_u64::<BE>()?,

Analysis

The code removes reading of the value_compression_dictionary_length field from the meta file format (lines 220-221), but existing meta files written with the previous format still contain this 2-byte field. This causes a format mismatch where:

  1. Old format: sequence_number (4 bytes) + key_compression_dictionary_length (2 bytes) + value_compression_dictionary_length (2 bytes) + block_count (2 bytes) + ...
  2. New format: sequence_number (4 bytes) + key_compression_dictionary_length (2 bytes) + block_count (2 bytes) + ...

When reading existing meta files, the new code will interpret the value_compression_dictionary_length bytes as the block_count, causing all subsequent fields (min_hash, max_hash, etc.) to be read from wrong offsets, leading to corrupted metadata and potential crashes or data corruption.


Recommendation

Either implement a versioned meta file format that can handle both old and new formats, or add migration logic to read and skip the value_compression_dictionary_length field when present in existing files. A simple fix would be to read and discard the field: let _unused = file.read_u16::<BE>()?; after reading key_compression_dictionary_length.
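A self-contained sketch of that read-and-skip migration, using the same byteorder calls as the crate; the is_old_format flag is hypothetical, since the diff shows no version-detection mechanism:

```rust
use byteorder::{ReadBytesExt, BE};
use std::io::{Cursor, Read, Result};

/// Reads the fixed-width prefix of one per-SST metadata record, skipping the
/// obsolete value dictionary length in old files so later fields stay aligned.
fn read_entry_prefix<R: Read>(file: &mut R, is_old_format: bool) -> Result<(u32, u16, u16)> {
    let sequence_number = file.read_u32::<BE>()?;
    let key_compression_dictionary_length = file.read_u16::<BE>()?;
    if is_old_format {
        // Read and discard the field old meta files still carry.
        let _obsolete_value_dict_length = file.read_u16::<BE>()?;
    }
    let block_count = file.read_u16::<BE>()?;
    Ok((sequence_number, key_compression_dictionary_length, block_count))
}

fn main() -> Result<()> {
    // Old-format bytes: seq=1, key dict=8, value dict=0 (obsolete), blocks=3.
    let old = [0, 0, 0, 1, 0, 8, 0, 0, 0, 3];
    let parsed = read_entry_prefix(&mut Cursor::new(&old[..]), true)?;
    assert_eq!(parsed, (1, 8, 3));
    Ok(())
}
```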

                     block_count: file.read_u16::<BE>()?,
                 },
                 family,
@@ -58,7 +58,6 @@ impl<'a> MetaFileBuilder<'a> {
         for (sequence_number, sst) in &self.entries {
             file.write_u32::<BE>(*sequence_number)?;
             file.write_u16::<BE>(sst.key_compression_dictionary_length)?;
-            file.write_u16::<BE>(sst.value_compression_dictionary_length)?;
             file.write_u16::<BE>(sst.block_count)?;
             file.write_u64::<BE>(sst.min_hash)?;
             file.write_u64::<BE>(sst.max_hash)?;
32 changes: 11 additions & 21 deletions turbopack/crates/turbo-persistence/src/static_sorted_file.rs
@@ -65,8 +65,6 @@ pub struct StaticSortedFileMetaData {
     pub sequence_number: u32,
     /// The length of the key compression dictionary.
     pub key_compression_dictionary_length: u16,
-    /// The length of the value compression dictionary.
-    pub value_compression_dictionary_length: u16,
     /// The number of blocks in the SST file.
     pub block_count: u16,
 }
@@ -79,21 +77,14 @@ impl StaticSortedFileMetaData {

     pub fn blocks_start(&self) -> usize {
         let k: usize = self.key_compression_dictionary_length.into();
-        let v: usize = self.value_compression_dictionary_length.into();
-        k + v
+        k
     }

     pub fn key_compression_dictionary_range(&self) -> Range<usize> {
         let start = 0;
         let end: usize = self.key_compression_dictionary_length.into();
         start..end
     }
-
-    pub fn value_compression_dictionary_range(&self) -> Range<usize> {
-        let start = self.key_compression_dictionary_length as usize;
-        let end = start + self.value_compression_dictionary_length as usize;
-        start..end
-    }
 }

 /// A memory mapped SST file.
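After this change the layout math reduces to "blocks start where the key dictionary ends". A toy illustration of that invariant (the struct and values are stand-ins for StaticSortedFileMetaData):

```rust
use std::ops::Range;

// Stand-in for StaticSortedFileMetaData with the value dictionary removed.
struct Meta {
    key_compression_dictionary_length: u16,
}

impl Meta {
    fn key_compression_dictionary_range(&self) -> Range<usize> {
        0..self.key_compression_dictionary_length as usize
    }

    fn blocks_start(&self) -> usize {
        // Blocks now follow the key dictionary directly.
        self.key_compression_dictionary_length as usize
    }
}

fn main() {
    let meta = Meta { key_compression_dictionary_length: 4096 };
    // Invariant: the block region begins exactly where the key dict ends.
    assert_eq!(meta.blocks_start(), meta.key_compression_dictionary_range().end);
}
```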
@@ -310,7 +301,7 @@ impl StaticSortedFile {
         match value_block_cache.get_value_or_guard(&(self.meta.sequence_number, block), None) {
             GuardResult::Value(block) => block,
             GuardResult::Guard(guard) => {
-                let block = self.read_value_block(block)?;
+                let block = self.read_small_value_block(block)?;
                 let _ = guard.insert(block.clone());
                 block
             }
@@ -323,33 +314,34 @@ impl StaticSortedFile {
     fn read_key_block(&self, block_index: u16) -> Result<ArcSlice<u8>> {
         self.read_block(
             block_index,
-            &self.mmap[self.meta.key_compression_dictionary_range()],
+            Some(&self.mmap[self.meta.key_compression_dictionary_range()]),
             false,
         )
     }

     /// Reads a value block from the file.
+    fn read_small_value_block(&self, block_index: u16) -> Result<ArcSlice<u8>> {
+        self.read_block(block_index, None, false)
+    }
+
+    /// Reads a value block from the file.
     fn read_value_block(&self, block_index: u16) -> Result<ArcSlice<u8>> {
-        self.read_block(
-            block_index,
-            &self.mmap[self.meta.value_compression_dictionary_range()],
-            false,
-        )
+        self.read_block(block_index, None, true)
     }

     /// Reads a block from the file.
     fn read_block(
         &self,
         block_index: u16,
-        compression_dictionary: &[u8],
+        compression_dictionary: Option<&[u8]>,
         long_term: bool,
     ) -> Result<ArcSlice<u8>> {
         let (uncompressed_length, block) = self.get_compressed_block(block_index)?;

         let buffer = decompress_into_arc(
             uncompressed_length,
             block,
-            Some(compression_dictionary),
+            compression_dictionary,
             long_term,
         )?;
         Ok(ArcSlice::from(buffer))
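The three readers now differ only in what they pass for the dictionary and the cache-lifetime hint. A sketch of the new calling convention with decompression stubbed out (the real work happens in the crate-internal decompress_into_arc):

```rust
/// Stub that stands in for decompress_into_arc; only the parameter
/// plumbing is the point here.
fn read_block(block: &[u8], dictionary: Option<&[u8]>, long_term: bool) -> Vec<u8> {
    let _ = (dictionary, long_term); // dict-assisted vs. plain, cache hint
    block.to_vec()
}

/// Key blocks still decompress against the shared key dictionary.
fn read_key_block(block: &[u8], key_dict: &[u8]) -> Vec<u8> {
    read_block(block, Some(key_dict), false)
}

/// Small value blocks: no dictionary, short-lived (they go into the cache).
fn read_small_value_block(block: &[u8]) -> Vec<u8> {
    read_block(block, None, false)
}

/// Medium value blocks: no dictionary, long-term buffer.
fn read_value_block(block: &[u8]) -> Vec<u8> {
    read_block(block, None, true)
}

fn main() {
    let data = b"compressed".as_slice();
    assert_eq!(read_small_value_block(data), data);
}
```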
@@ -496,8 +488,6 @@ impl<'l> StaticSortedFileIter<'l> {
                     LazyLookupValue::Medium {
                         uncompressed_size,
                         block,
-                        dictionary: &self.this.mmap
-                            [self.this.meta.value_compression_dictionary_range()],
                     }
                 } else {
                     let value = self