
Commit 1fa24a4

Turbopack: Lazy decompress medium value compressed blocks (#82257)

### What?

During compaction we can lazily recompress medium-sized values to avoid the large temporary memory usage.

1 parent a874eb5 commit 1fa24a4
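The idea, roughly: instead of decompressing every medium-sized value up front while compaction merges entries (which materializes large temporary buffers), the compressed block and its dictionary are carried along and only decompressed when the bytes are actually needed. A minimal sketch of that pattern, with hypothetical names rather than the types introduced in this commit:

```rust
// Sketch of lazy decompression (hypothetical `Value` type, not this commit's API).
use anyhow::Result;
use lzzzz::lz4::decompress_with_dict;

enum Value<'a> {
    /// Already decompressed.
    Plain(Vec<u8>),
    /// Still compressed; borrows the block and dictionary from the mmapped SST file.
    Compressed {
        uncompressed_size: u32,
        block: &'a [u8],
        dictionary: &'a [u8],
    },
}

impl Value<'_> {
    /// Decompress only when the bytes are actually required.
    fn bytes(&self) -> Result<Vec<u8>> {
        match self {
            Value::Plain(v) => Ok(v.clone()),
            Value::Compressed {
                uncompressed_size,
                block,
                dictionary,
            } => {
                let mut out = vec![0u8; *uncompressed_size as usize];
                decompress_with_dict(block, &mut out, dictionary)?;
                Ok(out)
            }
        }
    }
}
```

In the actual change, the new `LazyLookupValue` enum in lookup_entry.rs plays this role, and `decompress_into_arc` in the new compression module performs the deferred decompression.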

8 files changed: +200 -85 lines changed
turbopack/crates/turbo-persistence/src/compression.rs (new file)

Lines changed: 56 additions & 0 deletions
@@ -0,0 +1,56 @@
+use std::{mem::MaybeUninit, sync::Arc};
+
+use anyhow::{Context, Result};
+use lzzzz::lz4::{ACC_LEVEL_DEFAULT, decompress, decompress_with_dict};
+
+#[tracing::instrument(level = "trace", skip_all)]
+pub fn decompress_into_arc(
+    uncompressed_length: u32,
+    block: &[u8],
+    compression_dictionary: Option<&[u8]>,
+    _long_term: bool,
+) -> Result<Arc<[u8]>> {
+    // We directly allocate the buffer in an Arc to avoid copying it into an Arc and avoiding
+    // double indirection. This is a dynamically sized arc.
+    let buffer: Arc<[MaybeUninit<u8>]> = Arc::new_zeroed_slice(uncompressed_length as usize);
+    // Assume that the buffer is initialized.
+    let buffer = Arc::into_raw(buffer);
+    // Safety: Assuming that the buffer is initialized is safe because we just created it as
+    // zeroed slice and u8 doesn't require initialization.
+    let mut buffer = unsafe { Arc::from_raw(buffer as *mut [u8]) };
+    // Safety: We know that the buffer is not shared yet.
+    let decompressed = unsafe { Arc::get_mut_unchecked(&mut buffer) };
+    let bytes_writes = if let Some(dict) = compression_dictionary {
+        // Safety: decompress_with_dict will only write to `decompressed` and not read from it.
+        decompress_with_dict(block, decompressed, dict)?
+    } else {
+        // Safety: decompress will only write to `decompressed` and not read from it.
+        decompress(block, decompressed)?
+    };
+    assert_eq!(
+        bytes_writes, uncompressed_length as usize,
+        "Decompressed length does not match expected length"
+    );
+    // Safety: The buffer is now fully initialized and can be used.
+    Ok(buffer)
+}
+
+#[tracing::instrument(level = "trace", skip_all)]
+pub fn compress_into_buffer(
+    block: &[u8],
+    dict: Option<&[u8]>,
+    _long_term: bool,
+    buffer: &mut Vec<u8>,
+) -> Result<()> {
+    let mut compressor = if let Some(dict) = dict {
+        lzzzz::lz4::Compressor::with_dict(dict)
+    } else {
+        lzzzz::lz4::Compressor::new()
+    }
+    .context("LZ4 compressor creation failed")?;
+    let acc_factor = ACC_LEVEL_DEFAULT;
+    compressor
+        .next_to_vec(block, buffer, acc_factor)
+        .context("Compression failed")?;
+    Ok(())
+}
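As a rough usage sketch (not part of the commit), the two helpers are meant to round-trip: `compress_into_buffer` appends an LZ4 block to a caller-provided buffer, and `decompress_into_arc` rebuilds the original bytes directly inside an `Arc<[u8]>`, assuming the caller tracked the uncompressed length the way the SST format does (it is stored next to the block):

```rust
// Hypothetical round-trip, assuming this module is reachable as `crate::compression`
// inside turbo-persistence.
use anyhow::Result;

fn roundtrip(value: &[u8]) -> Result<()> {
    let mut compressed = Vec::new();
    // No dictionary, not long-term.
    crate::compression::compress_into_buffer(value, None, false, &mut compressed)?;
    // The SST format stores the uncompressed length next to the block;
    // here we simply reuse `value.len()`.
    let restored =
        crate::compression::decompress_into_arc(value.len() as u32, &compressed, None, false)?;
    assert_eq!(&*restored, value);
    Ok(())
}
```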

turbopack/crates/turbo-persistence/src/db.rs

Lines changed: 11 additions & 19 deletions
@@ -3,19 +3,15 @@ use std::{
     collections::HashSet,
     fs::{self, File, OpenOptions, ReadDir},
     io::{BufWriter, Write},
-    mem::{MaybeUninit, swap, transmute},
+    mem::swap,
     ops::RangeInclusive,
     path::{Path, PathBuf},
-    sync::{
-        Arc,
-        atomic::{AtomicBool, AtomicU32, Ordering},
-    },
+    sync::atomic::{AtomicBool, AtomicU32, Ordering},
 };
 
 use anyhow::{Context, Result, bail};
 use byteorder::{BE, ReadBytesExt, WriteBytesExt};
 use jiff::Timestamp;
-use lzzzz::lz4::decompress;
 use memmap2::Mmap;
 use parking_lot::{Mutex, RwLock};
 
@@ -24,6 +20,7 @@ use crate::{
     QueryKey,
     arc_slice::ArcSlice,
     compaction::selector::{Compactable, compute_metrics, get_merge_segments},
+    compression::decompress_into_arc,
     constants::{
         AMQF_AVG_SIZE, AMQF_CACHE_SIZE, DATA_THRESHOLD_PER_COMPACTED_FILE, KEY_BLOCK_AVG_SIZE,
         KEY_BLOCK_CACHE_SIZE, MAX_ENTRIES_PER_COMPACTED_FILE, VALUE_BLOCK_AVG_SIZE,
@@ -392,14 +389,9 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
         #[cfg(target_os = "linux")]
         mmap.advise(memmap2::Advice::Unmergeable)?;
         let mut compressed = &mmap[..];
-        let uncompressed_length = compressed.read_u32::<BE>()? as usize;
-
-        let buffer = Arc::new_zeroed_slice(uncompressed_length);
-        // Safety: MaybeUninit<u8> can be safely transmuted to u8.
-        let mut buffer = unsafe { transmute::<Arc<[MaybeUninit<u8>]>, Arc<[u8]>>(buffer) };
-        // Safety: We know that the buffer is not shared yet.
-        let decompressed = unsafe { Arc::get_mut_unchecked(&mut buffer) };
-        decompress(compressed, decompressed)?;
+        let uncompressed_length = compressed.read_u32::<BE>()?;
+
+        let buffer = decompress_into_arc(uncompressed_length, compressed, None, true)?;
         Ok(ArcSlice::from(buffer))
     }
 
@@ -918,9 +910,9 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
         });
     }
 
-    fn create_sst_file<S: ParallelScheduler>(
+    fn create_sst_file<'l, S: ParallelScheduler>(
        parallel_scheduler: &S,
-        entries: &[LookupEntry],
+        entries: &[LookupEntry<'l>],
         total_key_size: usize,
         total_value_size: usize,
         path: &Path,
@@ -964,7 +956,7 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
 
         let mut total_key_size = 0;
         let mut total_value_size = 0;
-        let mut current: Option<LookupEntry> = None;
+        let mut current: Option<LookupEntry<'_>> = None;
         let mut entries = Vec::new();
         let mut last_entries = Vec::new();
         let mut last_entries_total_sizes = (0, 0);
@@ -975,7 +967,7 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
             if let Some(current) = current.take() {
                 if current.key != entry.key {
                     let key_size = current.key.len();
-                    let value_size = current.value.size_in_sst();
+                    let value_size = current.value.uncompressed_size_in_sst();
                     total_key_size += key_size;
                     total_value_size += value_size;
 
@@ -1023,7 +1015,7 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
         }
         if let Some(entry) = current {
             total_key_size += entry.key.len();
-            total_value_size += entry.value.size_in_sst();
+            total_value_size += entry.value.uncompressed_size_in_sst();
             entries.push(entry);
         }
 
turbopack/crates/turbo-persistence/src/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -8,6 +8,7 @@ mod arc_slice;
 mod collector;
 mod collector_entry;
 mod compaction;
+mod compression;
 mod constants;
 mod db;
 mod key;

turbopack/crates/turbo-persistence/src/lookup_entry.rs

Lines changed: 35 additions & 11 deletions
@@ -14,28 +14,43 @@ pub enum LookupValue {
     Blob { sequence_number: u32 },
 }
 
-impl LookupValue {
+/// A value from a SST file lookup.
+pub enum LazyLookupValue<'l> {
+    /// A LookupValue
+    Eager(LookupValue),
+    /// A medium sized value that is still compressed.
+    Medium {
+        uncompressed_size: u32,
+        block: &'l [u8],
+        dictionary: &'l [u8],
+    },
+}
+
+impl LazyLookupValue<'_> {
     /// Returns the size of the value in the SST file.
-    pub fn size_in_sst(&self) -> usize {
+    pub fn uncompressed_size_in_sst(&self) -> usize {
         match self {
-            LookupValue::Slice { value } => value.len(),
-            LookupValue::Deleted => 0,
-            LookupValue::Blob { .. } => 0,
+            LazyLookupValue::Eager(LookupValue::Slice { value }) => value.len(),
+            LazyLookupValue::Eager(LookupValue::Deleted) => 0,
+            LazyLookupValue::Eager(LookupValue::Blob { .. }) => 0,
+            LazyLookupValue::Medium {
+                uncompressed_size, ..
+            } => *uncompressed_size as usize,
         }
     }
 }
 
 /// An entry from a SST file lookup.
-pub struct LookupEntry {
+pub struct LookupEntry<'l> {
     /// The hash of the key.
     pub hash: u64,
     /// The key.
     pub key: ArcSlice<u8>,
     /// The value.
-    pub value: LookupValue,
+    pub value: LazyLookupValue<'l>,
 }
 
-impl Entry for LookupEntry {
+impl Entry for LookupEntry<'_> {
     fn key_hash(&self) -> u64 {
         self.hash
     }
@@ -50,17 +65,26 @@ impl Entry for LookupEntry {
 
     fn value(&self) -> EntryValue<'_> {
         match &self.value {
-            LookupValue::Deleted => EntryValue::Deleted,
-            LookupValue::Slice { value } => {
+            LazyLookupValue::Eager(LookupValue::Deleted) => EntryValue::Deleted,
+            LazyLookupValue::Eager(LookupValue::Slice { value }) => {
                 if value.len() > MAX_SMALL_VALUE_SIZE {
                     EntryValue::Medium { value }
                 } else {
                     EntryValue::Small { value }
                 }
             }
-            LookupValue::Blob { sequence_number } => EntryValue::Large {
+            LazyLookupValue::Eager(LookupValue::Blob { sequence_number }) => EntryValue::Large {
                 blob: *sequence_number,
             },
+            LazyLookupValue::Medium {
+                uncompressed_size,
+                block,
+                dictionary,
+            } => EntryValue::MediumCompressed {
+                uncompressed_size: *uncompressed_size,
+                block,
+                dictionary,
+            },
         }
     }
 }
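How the new `EntryValue::MediumCompressed` variant is consumed lies outside this excerpt; presumably the SST writer either copies the compressed block through unchanged or defers to `decompress_into_arc` when it actually needs the bytes. A hypothetical sketch of the on-demand path:

```rust
// Hypothetical consumer of a still-compressed medium value (function name assumed,
// not part of this commit excerpt).
use std::sync::Arc;

use anyhow::Result;

fn medium_value_bytes(
    uncompressed_size: u32,
    block: &[u8],
    dictionary: &[u8],
) -> Result<Arc<[u8]>> {
    // Defer allocation and decompression until the value is actually needed,
    // instead of materializing every medium value up front during compaction.
    crate::compression::decompress_into_arc(uncompressed_size, block, Some(dictionary), false)
}
```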

turbopack/crates/turbo-persistence/src/merge_iter.rs

Lines changed: 11 additions & 11 deletions
@@ -6,27 +6,27 @@ use crate::lookup_entry::LookupEntry;
 
 /// An active iterator that is being merged. It has peeked the next element and can be compared
 /// according to that element. The `order` is used when multiple iterators have the same key.
-struct ActiveIterator<T: Iterator<Item = Result<LookupEntry>>> {
+struct ActiveIterator<'l, T: Iterator<Item = Result<LookupEntry<'l>>>> {
     iter: T,
     order: usize,
-    entry: LookupEntry,
+    entry: LookupEntry<'l>,
 }
 
-impl<T: Iterator<Item = Result<LookupEntry>>> PartialEq for ActiveIterator<T> {
+impl<'l, T: Iterator<Item = Result<LookupEntry<'l>>>> PartialEq for ActiveIterator<'l, T> {
     fn eq(&self, other: &Self) -> bool {
         self.entry.hash == other.entry.hash && *self.entry.key == *other.entry.key
     }
 }
 
-impl<T: Iterator<Item = Result<LookupEntry>>> Eq for ActiveIterator<T> {}
+impl<'l, T: Iterator<Item = Result<LookupEntry<'l>>>> Eq for ActiveIterator<'l, T> {}
 
-impl<T: Iterator<Item = Result<LookupEntry>>> PartialOrd for ActiveIterator<T> {
+impl<'l, T: Iterator<Item = Result<LookupEntry<'l>>>> PartialOrd for ActiveIterator<'l, T> {
     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
         Some(self.cmp(other))
     }
 }
 
-impl<T: Iterator<Item = Result<LookupEntry>>> Ord for ActiveIterator<T> {
+impl<'l, T: Iterator<Item = Result<LookupEntry<'l>>>> Ord for ActiveIterator<'l, T> {
     fn cmp(&self, other: &Self) -> Ordering {
         self.entry
             .hash
@@ -39,11 +39,11 @@ impl<T: Iterator<Item = Result<LookupEntry>>> Ord for ActiveIterator<T> {
 
 /// An iterator that merges multiple sorted iterators into a single sorted iterator. Internal it
 /// uses an heap of iterators to iterate them in order.
-pub struct MergeIter<T: Iterator<Item = Result<LookupEntry>>> {
-    heap: BinaryHeap<ActiveIterator<T>>,
+pub struct MergeIter<'l, T: Iterator<Item = Result<LookupEntry<'l>>>> {
+    heap: BinaryHeap<ActiveIterator<'l, T>>,
 }
 
-impl<T: Iterator<Item = Result<LookupEntry>>> MergeIter<T> {
+impl<'l, T: Iterator<Item = Result<LookupEntry<'l>>>> MergeIter<'l, T> {
     pub fn new(iters: impl Iterator<Item = T>) -> Result<Self> {
         let mut heap = BinaryHeap::new();
         for (order, mut iter) in iters.enumerate() {
@@ -56,8 +56,8 @@ impl<T: Iterator<Item = Result<LookupEntry>>> MergeIter<T> {
     }
 }
 
-impl<T: Iterator<Item = Result<LookupEntry>>> Iterator for MergeIter<T> {
-    type Item = Result<LookupEntry>;
+impl<'l, T: Iterator<Item = Result<LookupEntry<'l>>>> Iterator for MergeIter<'l, T> {
+    type Item = Result<LookupEntry<'l>>;
 
     fn next(&mut self) -> Option<Self::Item> {
         let ActiveIterator {