Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
05e5753
Turbopack: increase time before persisting
sokra Aug 6, 2025
da38be4
tests need to run in multi-thread pool
sokra Aug 2, 2025
12b5d31
fix hit count tracking
sokra Aug 2, 2025
fa7618f
Turbopack: avoid using rayon in favor of tokio tasks
sokra Jul 31, 2025
0c3d99a
fixup
sokra Jul 31, 2025
a73266d
fix result lifetime
sokra Aug 1, 2025
651f7d0
add tests
sokra Aug 1, 2025
abdd173
make turbo_tasks context optional
sokra Aug 1, 2025
df08cc4
fixup tokio context
sokra Aug 1, 2025
cf92846
clippy
sokra Aug 1, 2025
166811e
clippy
sokra Aug 2, 2025
9881fa0
rename methods for clippy
sokra Aug 2, 2025
9481a43
crash earlier without memory corruption when not in multi-thread pool
sokra Aug 2, 2025
608346b
run node-file-trace tests multi threaded
sokra Aug 3, 2025
7dffaec
fix test name
sokra Aug 4, 2025
f8a6064
Turbopack: show timing event for database compaction
sokra Aug 5, 2025
38539c7
avoid initial compaction
sokra Aug 5, 2025
473441b
fixup timing event for compact
sokra Aug 5, 2025
fd463db
Update messages
sokra Aug 5, 2025
ad91351
Turbopack: use block in place for db writes
sokra Aug 5, 2025
8388df2
fix initial duplication set
sokra Aug 5, 2025
64264b0
improve compaction config
sokra Aug 5, 2025
4662966
update test case
sokra Aug 6, 2025
636651d
Turbopack: parallel drop data before shutdown
sokra Jul 30, 2025
66f570c
fixup
sokra Aug 1, 2025
7749a9e
Turbopack: Decompress medium values lazily to reduce memory during co…
sokra Aug 4, 2025
7ac1de8
clippy
sokra Aug 4, 2025
775824e
fixup
sokra Aug 6, 2025
c1dcb69
refactor (de)compression code
sokra Aug 6, 2025
994ebac
fixup
sokra Aug 6, 2025
847dcee
Turbopack: bigger small value blocks
sokra Aug 5, 2025
cb53ea3
Turbopack: remove value compression dictionary
sokra Aug 4, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion packages/next/src/server/dev/hot-reloader-turbopack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ export async function createHotReloaderTurbopack(
}
)
backgroundLogCompilationEvents(project, {
eventTypes: ['StartupCacheInvalidationEvent'],
eventTypes: ['StartupCacheInvalidationEvent', 'TimingEvent'],
})
setBundlerFindSourceMapImplementation(
getSourceMapFromTurbopack.bind(null, project, projectPath)
Expand Down
15 changes: 5 additions & 10 deletions turbopack/crates/turbo-persistence-tools/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::path::PathBuf;

use anyhow::{Context, Result, bail};
use turbo_persistence::{MetaFileEntryInfo, TurboPersistence};
use turbo_persistence::{MetaFileEntryInfo, SerialScheduler, TurboPersistence};

fn main() -> Result<()> {
// Get CLI argument
Expand All @@ -16,7 +16,7 @@ fn main() -> Result<()> {
bail!("The provided path does not exist: {}", path.display());
}

let db = TurboPersistence::open_read_only(path)?;
let db: TurboPersistence<SerialScheduler> = TurboPersistence::open_read_only(path)?;
let meta_info = db
.meta_info()
.context("Failed to retrieve meta information")?;
Expand All @@ -35,7 +35,6 @@ fn main() -> Result<()> {
amqf_entries,
sst_size,
key_compression_dictionary_size,
value_compression_dictionary_size,
block_count,
} in meta_file.entries
{
Expand All @@ -45,15 +44,11 @@ fn main() -> Result<()> {
);
println!(" AMQF {amqf_entries} entries = {} KiB", amqf_size / 1024);
println!(
" {} KiB = {} kiB key compression dict + {} KiB value compression dict + \
{block_count} blocks (avg {} bytes/block)",
" {} KiB = {} kiB key compression dict + {block_count} blocks (avg {} \
bytes/block)",
sst_size / 1024,
key_compression_dictionary_size / 1024,
value_compression_dictionary_size / 1024,
(sst_size
- key_compression_dictionary_size as u64
- value_compression_dictionary_size as u64)
/ block_count as u64
(sst_size - key_compression_dictionary_size as u64) / block_count as u64
);
}
if !meta_file.obsolete_sst_files.is_empty() {
Expand Down
2 changes: 1 addition & 1 deletion turbopack/crates/turbo-persistence/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ memmap2 = "0.9.5"
parking_lot = { workspace = true }
qfilter = { version = "0.2.4", features = ["serde"] }
quick_cache = { workspace = true }
rayon = { workspace = true }
rustc-hash = { workspace = true }
smallvec = { workspace = true}
thread_local = { workspace = true }
Expand All @@ -32,6 +31,7 @@ zstd = { version = "0.13.2", features = ["zdict_builder"] }

[dev-dependencies]
rand = { workspace = true, features = ["small_rng"] }
rayon = { workspace = true }
tempfile = { workspace = true }

[lints]
Expand Down
2 changes: 0 additions & 2 deletions turbopack/crates/turbo-persistence/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ A meta file can contain metadata about multiple SST files. The metadata is store
- foreach described SST file
- 4 bytes sequence number of the SST file
- 2 bytes key Compression Dictionary length
- 2 bytes value Compression Dictionary length
- 2 bytes block count
- 8 bytes min hash
- 8 bytes max hash
Expand All @@ -59,7 +58,6 @@ A meta file can contain metadata about multiple SST files. The metadata is store
The SST file contains only data without any header.

- serialized key Compression Dictionary
- serialized value Compression Dictionary
- foreach block
- 4 bytes uncompressed block length
- compressed data
Expand Down
15 changes: 12 additions & 3 deletions turbopack/crates/turbo-persistence/src/collector.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::mem::take;

use crate::{
ValueBuffer,
collector_entry::{CollectorEntry, CollectorEntryValue, EntryKey},
Expand Down Expand Up @@ -90,11 +92,11 @@ impl<K: StoreKey, const SIZE_SHIFT: usize> Collector<K, SIZE_SHIFT> {
self.entries.push(entry);
}

/// Sorts the entries and returns them along with the total key and value sizes. This doesn't
/// Sorts the entries and returns them along with the total key size. This doesn't
/// clear the entries.
pub fn sorted(&mut self) -> (&[CollectorEntry<K>], usize, usize) {
pub fn sorted(&mut self) -> (&[CollectorEntry<K>], usize) {
self.entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
(&self.entries, self.total_key_size, self.total_value_size)
(&self.entries, self.total_key_size)
}

/// Clears the collector.
Expand All @@ -111,4 +113,11 @@ impl<K: StoreKey, const SIZE_SHIFT: usize> Collector<K, SIZE_SHIFT> {
self.total_value_size = 0;
self.entries.drain(..)
}

/// Clears the collector and drops the capacity.
///
/// Unlike `clear`, this replaces the entries vec with a fresh empty one via
/// `take`, so the old heap allocation is freed immediately instead of being
/// retained for reuse. Useful when the collector will not be refilled.
pub fn drop_contents(&mut self) {
    // `take` swaps in `Vec::default()`; dropping the old Vec releases its buffer.
    drop(take(&mut self.entries));
    self.total_key_size = 0;
    self.total_value_size = 0;
}
}
21 changes: 14 additions & 7 deletions turbopack/crates/turbo-persistence/src/compaction/selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ impl Default for CompactConfig {
optimal_merge_count: 8,
max_merge_count: 32,
max_merge_bytes: 500 * MB,
min_merge_duplication_bytes: MB,
optimal_merge_duplication_bytes: 10 * MB,
min_merge_duplication_bytes: 50 * MB,
optimal_merge_duplication_bytes: 100 * MB,
max_merge_segment_count: 8,
}
}
Expand Down Expand Up @@ -233,13 +233,20 @@ pub fn get_merge_segments<T: Compactable>(
// We have reached the maximum number of merge jobs, so we stop here.
break;
}
let mut current_range = start_compactable.range();
let start_compactable_range = start_compactable.range();
let start_compactable_size = start_compactable.size();
let mut current_range = start_compactable_range.clone();

// We might need to restart the search if we need to extend the range.
'search: loop {
let mut current_set = smallvec![start_index];
let mut current_size = start_compactable.size();
let mut current_size = start_compactable_size;
let mut duplication = IntervalMap::<Option<DuplicationInfo>>::new();
duplication.update(start_compactable_range.clone(), |dup_info| {
dup_info
.get_or_insert_default()
.add(start_compactable_size, &start_compactable_range);
});
let mut current_skip = 0;

// We will capture compactables in the current_range until we find a optimal merge
Expand Down Expand Up @@ -609,8 +616,8 @@ mod tests {
min_merge_count: 2,
optimal_merge_count: 4,
max_merge_bytes: 5000,
min_merge_duplication_bytes: 200,
optimal_merge_duplication_bytes: 500,
min_merge_duplication_bytes: 500,
optimal_merge_duplication_bytes: 1000,
max_merge_segment_count: 4,
};
let jobs = get_merge_segments(&containers, &config);
Expand Down Expand Up @@ -653,7 +660,7 @@ mod tests {
println!("Number of compactions: {number_of_compactions}");

let metrics = compute_metrics(&containers, 0..=KEY_RANGE);
assert!(number_of_compactions < 40);
assert!(number_of_compactions < 30);
assert!(containers.len() < 30);
assert!(metrics.duplication < 0.5);
}
Expand Down
56 changes: 56 additions & 0 deletions turbopack/crates/turbo-persistence/src/compression.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use std::{mem::MaybeUninit, sync::Arc};

use anyhow::{Context, Result};
use lzzzz::lz4::{ACC_LEVEL_DEFAULT, decompress, decompress_with_dict};

#[tracing::instrument(level = "trace", skip_all)]
/// Decompresses an LZ4 `block` of known `uncompressed_length` directly into a
/// freshly allocated `Arc<[u8]>`, optionally using `compression_dictionary`.
///
/// The buffer is allocated inside the Arc up front so the decompressed bytes
/// never need to be copied into a separate Arc afterwards (and we avoid the
/// double indirection of `Arc<Vec<u8>>`).
///
/// `_long_term` is currently unused; it is kept in the signature so callers can
/// express the intended lifetime of the data (presumably for future cache /
/// allocation tuning — TODO confirm).
///
/// # Errors
/// Returns an error if LZ4 decompression fails. Panics if the decompressed
/// size does not match `uncompressed_length`.
pub fn decompress_into_arc(
    uncompressed_length: u32,
    block: &[u8],
    compression_dictionary: Option<&[u8]>,
    _long_term: bool,
) -> Result<Arc<[u8]>> {
    // We directly allocate the buffer in an Arc to avoid copying it into an Arc and avoiding
    // double indirection. This is a dynamically sized arc.
    let buffer: Arc<[MaybeUninit<u8>]> = Arc::new_zeroed_slice(uncompressed_length as usize);
    // Assume that the buffer is initialized.
    let buffer = Arc::into_raw(buffer);
    // Safety: Assuming that the buffer is initialized is safe because we just created it as
    // zeroed slice and u8 doesn't require initialization.
    let mut buffer = unsafe { Arc::from_raw(buffer as *mut [u8]) };
    // Safety: We know that the buffer is not shared yet — it was created just above and no
    // clone has been handed out, so the refcount is exactly 1.
    let decompressed = unsafe { Arc::get_mut_unchecked(&mut buffer) };
    let bytes_writes = if let Some(dict) = compression_dictionary {
        // Safety: decompress_with_dict will only write to `decompressed` and not read from it.
        decompress_with_dict(block, decompressed, dict)?
    } else {
        // Safety: decompress will only write to `decompressed` and not read from it.
        decompress(block, decompressed)?
    };
    // The caller-provided length is trusted metadata; a mismatch indicates corruption
    // or a bug, so fail loudly rather than return a partially filled buffer.
    assert_eq!(
        bytes_writes, uncompressed_length as usize,
        "Decompressed length does not match expected length"
    );
    // Safety: The buffer is now fully initialized and can be used.
    Ok(buffer)
}

#[tracing::instrument(level = "trace", skip_all)]
/// LZ4-compresses `block` and appends the compressed bytes to `buffer`.
///
/// When `dict` is provided, the compressor is seeded with it as a compression
/// dictionary. `_long_term` is currently unused (kept for callers to express
/// intended data lifetime).
///
/// # Errors
/// Returns an error if the compressor cannot be created or compression fails.
pub fn compress_into_buffer(
    block: &[u8],
    dict: Option<&[u8]>,
    _long_term: bool,
    buffer: &mut Vec<u8>,
) -> Result<()> {
    // Pick the dictionary-seeded or plain compressor depending on the caller.
    let compressor = match dict {
        Some(dict) => lzzzz::lz4::Compressor::with_dict(dict),
        None => lzzzz::lz4::Compressor::new(),
    };
    let mut compressor = compressor.context("LZ4 compressor creation failed")?;
    // Append the compressed frame to the caller-supplied output buffer.
    compressor
        .next_to_vec(block, buffer, ACC_LEVEL_DEFAULT)
        .context("Compression failed")?;
    Ok(())
}
Loading