Skip to content

Commit c154eb9

Browse files
committed
Turbopack: limit compaction merging by size instead of count
1 parent 3e6abe3 commit c154eb9

File tree

4 files changed

+93
-13
lines changed

4 files changed

+93
-13
lines changed

turbopack/crates/turbo-persistence/src/compaction/selector.rs

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ type Range = (u64, u64);
1919
pub trait Compactable {
2020
/// Returns the range of the compactable.
2121
fn range(&self) -> Range;
22+
23+
/// Returns the size of the compactable.
24+
fn size(&self) -> usize;
2225
}
2326

2427
fn is_overlapping(a: &Range, b: &Range) -> bool {
@@ -55,11 +58,14 @@ pub fn total_coverage<T: Compactable>(compactables: &[T], full_range: Range) ->
5558

5659
/// Configuration for the compaction algorithm.
5760
pub struct CompactConfig {
61+
/// The minimum number of files to merge at once.
62+
pub min_merge: usize,
63+
5864
/// The maximum number of files to merge at once.
5965
pub max_merge: usize,
6066

61-
/// The minimum number of files to merge at once.
62-
pub min_merge: usize,
67+
/// The maximum size of all files to merge at once.
68+
pub max_merge_size: usize,
6369
}
6470

6571
/// For a list of compactables, computes merge and move jobs that are expected to perform best.
@@ -102,6 +108,7 @@ fn get_compaction_jobs_internal<T: Compactable>(
102108
let start_range = compactables[start].range();
103109
let mut range = start_range;
104110

111+
let mut merge_job_size = compactables[start].size();
105112
let mut merge_job = Vec::new();
106113
merge_job.push(start);
107114
let mut merge_job_input_spread = spread(&start_range) as f32;
@@ -116,8 +123,13 @@ fn get_compaction_jobs_internal<T: Compactable>(
116123
if is_overlapping(&range, &range_for_i) {
117124
let mut extended_range = range;
118125
if !extend_range(&mut extended_range, &range_for_i) {
126+
let size = compactables[i].size();
127+
if merge_job_size + size > config.max_merge_size {
128+
break 'outer;
129+
}
119130
used_compactables[i] = true;
120131
merge_job.push(i);
132+
merge_job_size += compactables[i].size();
121133
merge_job_input_spread += spread(&range_for_i) as f32;
122134
} else {
123135
let s = spread(&range);
@@ -208,6 +220,7 @@ mod tests {
208220
use std::{
209221
fmt::Debug,
210222
mem::{swap, take},
223+
usize,
211224
};
212225

213226
use rand::{Rng, SeedableRng};
@@ -216,22 +229,32 @@ mod tests {
216229

217230
struct TestCompactable {
218231
range: Range,
232+
size: usize,
219233
}
220234

221235
impl Compactable for TestCompactable {
222236
fn range(&self) -> Range {
223237
self.range
224238
}
239+
240+
fn size(&self) -> usize {
241+
self.size
242+
}
225243
}
226244

227-
fn compact<const N: usize>(ranges: [(u64, u64); N], max_merge: usize) -> CompactionJobs {
245+
fn compact<const N: usize>(
246+
ranges: [(u64, u64); N],
247+
max_merge: usize,
248+
max_merge_size: usize,
249+
) -> CompactionJobs {
228250
let compactables = ranges
229251
.iter()
230-
.map(|&range| TestCompactable { range })
252+
.map(|&range| TestCompactable { range, size: 100 })
231253
.collect::<Vec<_>>();
232254
let config = CompactConfig {
233255
max_merge,
234256
min_merge: 2,
257+
max_merge_size,
235258
};
236259
get_compaction_jobs(&compactables, &config)
237260
}
@@ -255,6 +278,32 @@ mod tests {
255278
(30, 40),
256279
],
257280
3,
281+
usize::MAX,
282+
);
283+
assert_eq!(merge_jobs, vec![vec![0, 1, 2], vec![4, 5, 6]]);
284+
assert_eq!(move_jobs, vec![3, 8]);
285+
}
286+
287+
#[test]
288+
fn test_compaction_jobs_by_size() {
289+
let CompactionJobs {
290+
merge_jobs,
291+
move_jobs,
292+
..
293+
} = compact(
294+
[
295+
(0, 10),
296+
(10, 30),
297+
(9, 13),
298+
(0, 30),
299+
(40, 44),
300+
(41, 42),
301+
(41, 47),
302+
(90, 100),
303+
(30, 40),
304+
],
305+
usize::MAX,
306+
300,
258307
);
259308
assert_eq!(merge_jobs, vec![vec![0, 1, 2], vec![4, 5, 6]]);
260309
assert_eq!(move_jobs, vec![3, 8]);
@@ -293,6 +342,7 @@ mod tests {
293342
let config = CompactConfig {
294343
max_merge: 4,
295344
min_merge: 2,
345+
max_merge_size: usize::MAX,
296346
};
297347
let jobs = get_compaction_jobs(&containers, &config);
298348
if !jobs.is_empty() {
@@ -337,6 +387,10 @@ mod tests {
337387
fn range(&self) -> Range {
338388
(self.keys[0], *self.keys.last().unwrap())
339389
}
390+
391+
fn size(&self) -> usize {
392+
self.keys.len()
393+
}
340394
}
341395

342396
impl Debug for Container {

turbopack/crates/turbo-persistence/src/db.rs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::{
99
atomic::{AtomicBool, AtomicU32, Ordering},
1010
Arc,
1111
},
12+
usize,
1213
};
1314

1415
use anyhow::{bail, Context, Result};
@@ -534,15 +535,20 @@ impl TurboPersistence {
534535
/// Runs a full compaction on the database. This will rewrite all SST files, removing all
535536
/// duplicate keys and separating all key ranges into unique files.
536537
pub fn full_compact(&self) -> Result<()> {
537-
self.compact(0.0, usize::MAX)?;
538+
self.compact(0.0, usize::MAX, usize::MAX)?;
538539
Ok(())
539540
}
540541

541542
/// Runs a (partial) compaction. Compaction will only be performed if the coverage of the SST
542543
/// files is above the given threshold. The coverage is the average number of SST files that
543544
/// need to be read to find a key. It also limits the maximum number of SST files that are
544545
/// merged at once, which is the main factor for the runtime of the compaction.
545-
pub fn compact(&self, max_coverage: f32, max_merge_sequence: usize) -> Result<()> {
546+
pub fn compact(
547+
&self,
548+
max_coverage: f32,
549+
max_merge_sequence: usize,
550+
max_merge_size: usize,
551+
) -> Result<()> {
546552
if self
547553
.active_write_operation
548554
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
@@ -568,6 +574,7 @@ impl TurboPersistence {
568574
&mut indicies_to_delete,
569575
max_coverage,
570576
max_merge_sequence,
577+
max_merge_size,
571578
)?;
572579
}
573580

@@ -594,6 +601,7 @@ impl TurboPersistence {
594601
indicies_to_delete: &mut Vec<usize>,
595602
max_coverage: f32,
596603
max_merge_sequence: usize,
604+
max_merge_size: usize,
597605
) -> Result<()> {
598606
if static_sorted_files.is_empty() {
599607
return Ok(());
@@ -602,18 +610,29 @@ impl TurboPersistence {
602610
struct SstWithRange {
603611
index: usize,
604612
range: StaticSortedFileRange,
613+
size: usize,
605614
}
606615

607616
impl Compactable for SstWithRange {
608617
fn range(&self) -> (u64, u64) {
609618
(self.range.min_hash, self.range.max_hash)
610619
}
620+
621+
fn size(&self) -> usize {
622+
self.size
623+
}
611624
}
612625

613626
let ssts_with_ranges = static_sorted_files
614627
.iter()
615628
.enumerate()
616-
.flat_map(|(index, sst)| sst.range().ok().map(|range| SstWithRange { index, range }))
629+
.flat_map(|(index, sst)| {
630+
sst.range().ok().map(|range| SstWithRange {
631+
index,
632+
range,
633+
size: sst.size(),
634+
})
635+
})
617636
.collect::<Vec<_>>();
618637

619638
let families = ssts_with_ranges
@@ -651,8 +670,9 @@ impl TurboPersistence {
651670
} = get_compaction_jobs(
652671
&ssts_with_ranges,
653672
&CompactConfig {
654-
max_merge: max_merge_sequence,
655673
min_merge: 2,
674+
max_merge: max_merge_sequence,
675+
max_merge_size,
656676
},
657677
);
658678

turbopack/crates/turbo-persistence/src/tests.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::time::Instant;
1+
use std::{time::Instant, usize};
22

33
use anyhow::Result;
44
use rayon::iter::{IntoParallelIterator, ParallelIterator};
@@ -433,7 +433,7 @@ fn persist_changes() -> Result<()> {
433433
{
434434
let db = TurboPersistence::open(path.to_path_buf())?;
435435

436-
db.compact(1.0, 3)?;
436+
db.compact(1.0, 3, usize::MAX)?;
437437

438438
check(&db, 1, 13)?;
439439
check(&db, 2, 22)?;

turbopack/crates/turbo-tasks-backend/src/database/turbo.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ use crate::database::{
1414
};
1515

1616
const COMPACT_MAX_COVERAGE: f32 = 10.0;
17-
const COMPACT_MAX_MERGE_SEQUENCE: usize = 8;
17+
const COMPACT_MAX_MERGE_SEQUENCE: usize = 16;
18+
const COMPACT_MAX_MERGE_SIZE: usize = 512 * 1024 * 1024; // 1GiB
1819

1920
pub struct TurboKeyValueDatabase {
2021
db: Arc<TurboPersistence>,
@@ -30,8 +31,13 @@ impl TurboKeyValueDatabase {
3031
};
3132
// start compaction in background if the database is not empty
3233
if !db.is_empty() {
33-
let handle =
34-
spawn(move || db.compact(COMPACT_MAX_COVERAGE, COMPACT_MAX_MERGE_SEQUENCE));
34+
let handle = spawn(move || {
35+
db.compact(
36+
COMPACT_MAX_COVERAGE,
37+
COMPACT_MAX_MERGE_SEQUENCE,
38+
COMPACT_MAX_MERGE_SIZE,
39+
)
40+
});
3541
this.compact_join_handle.get_mut().replace(handle);
3642
}
3743
Ok(this)

0 commit comments

Comments
 (0)