Skip to content

Commit 6ceb64c

Browse files
committed
Turbopack: use block in place for db writes
1 parent 6897339 commit 6ceb64c

File tree

8 files changed

+151
-117
lines changed

8 files changed

+151
-117
lines changed

turbopack/crates/turbo-persistence/src/db.rs

Lines changed: 100 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -508,12 +508,15 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
508508
sst_filter.apply_filter(meta_file);
509509
}
510510

511-
for (_, file) in new_sst_files.iter() {
512-
file.sync_all()?;
513-
}
514-
for (_, file) in new_blob_files.iter() {
515-
file.sync_all()?;
516-
}
511+
self.parallel_scheduler.block_in_place(|| {
512+
for (_, file) in new_sst_files.iter() {
513+
file.sync_all()?;
514+
}
515+
for (_, file) in new_blob_files.iter() {
516+
file.sync_all()?;
517+
}
518+
anyhow::Ok(())
519+
})?;
517520

518521
let new_meta_info = new_meta_files
519522
.iter()
@@ -566,86 +569,88 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
566569
inner.current_sequence_number = seq;
567570
}
568571

569-
if has_delete_file {
570-
sst_seq_numbers_to_delete.sort_unstable();
571-
meta_seq_numbers_to_delete.sort_unstable();
572-
blob_seq_numbers_to_delete.sort_unstable();
573-
// Write *.del file, marking the selected files as to delete
574-
let mut buf = Vec::with_capacity(
575-
(sst_seq_numbers_to_delete.len()
576-
+ meta_seq_numbers_to_delete.len()
577-
+ blob_seq_numbers_to_delete.len())
578-
* size_of::<u32>(),
579-
);
580-
for seq in sst_seq_numbers_to_delete.iter() {
581-
buf.write_u32::<BE>(*seq)?;
582-
}
583-
for seq in meta_seq_numbers_to_delete.iter() {
584-
buf.write_u32::<BE>(*seq)?;
585-
}
586-
for seq in blob_seq_numbers_to_delete.iter() {
587-
buf.write_u32::<BE>(*seq)?;
588-
}
589-
let mut file = File::create(self.path.join(format!("{seq:08}.del")))?;
590-
file.write_all(&buf)?;
591-
file.sync_all()?;
592-
}
593-
594-
let mut current_file = OpenOptions::new()
595-
.write(true)
596-
.truncate(false)
597-
.read(false)
598-
.open(self.path.join("CURRENT"))?;
599-
current_file.write_u32::<BE>(seq)?;
600-
current_file.sync_all()?;
601-
602-
for seq in sst_seq_numbers_to_delete.iter() {
603-
fs::remove_file(self.path.join(format!("{seq:08}.sst")))?;
604-
}
605-
for seq in meta_seq_numbers_to_delete.iter() {
606-
fs::remove_file(self.path.join(format!("{seq:08}.meta")))?;
607-
}
608-
for seq in blob_seq_numbers_to_delete.iter() {
609-
fs::remove_file(self.path.join(format!("{seq:08}.blob")))?;
610-
}
611-
612-
{
613-
let mut log = self.open_log()?;
614-
writeln!(log, "Time {time}")?;
615-
let span = time.until(Timestamp::now())?;
616-
writeln!(log, "Commit {seq:08} {keys_written} keys in {span:#}")?;
617-
for (seq, family, ssts, obsolete) in new_meta_info {
618-
writeln!(log, "{seq:08} META family:{family}",)?;
619-
for (seq, min, max, size) in ssts {
620-
writeln!(
621-
log,
622-
" {seq:08} SST {min:016x}-{max:016x} {} MiB",
623-
size / 1024 / 1024
624-
)?;
572+
self.parallel_scheduler.block_in_place(|| {
573+
if has_delete_file {
574+
sst_seq_numbers_to_delete.sort_unstable();
575+
meta_seq_numbers_to_delete.sort_unstable();
576+
blob_seq_numbers_to_delete.sort_unstable();
577+
// Write *.del file, marking the selected files as to delete
578+
let mut buf = Vec::with_capacity(
579+
(sst_seq_numbers_to_delete.len()
580+
+ meta_seq_numbers_to_delete.len()
581+
+ blob_seq_numbers_to_delete.len())
582+
* size_of::<u32>(),
583+
);
584+
for seq in sst_seq_numbers_to_delete.iter() {
585+
buf.write_u32::<BE>(*seq)?;
625586
}
626-
for seq in obsolete {
627-
writeln!(log, " {seq:08} OBSOLETE SST")?;
587+
for seq in meta_seq_numbers_to_delete.iter() {
588+
buf.write_u32::<BE>(*seq)?;
628589
}
590+
for seq in blob_seq_numbers_to_delete.iter() {
591+
buf.write_u32::<BE>(*seq)?;
592+
}
593+
let mut file = File::create(self.path.join(format!("{seq:08}.del")))?;
594+
file.write_all(&buf)?;
595+
file.sync_all()?;
629596
}
630-
new_sst_files.sort_unstable_by_key(|(seq, _)| *seq);
631-
for (seq, _) in new_sst_files.iter() {
632-
writeln!(log, "{seq:08} NEW SST")?;
633-
}
634-
new_blob_files.sort_unstable_by_key(|(seq, _)| *seq);
635-
for (seq, _) in new_blob_files.iter() {
636-
writeln!(log, "{seq:08} NEW BLOB")?;
637-
}
597+
598+
let mut current_file = OpenOptions::new()
599+
.write(true)
600+
.truncate(false)
601+
.read(false)
602+
.open(self.path.join("CURRENT"))?;
603+
current_file.write_u32::<BE>(seq)?;
604+
current_file.sync_all()?;
605+
638606
for seq in sst_seq_numbers_to_delete.iter() {
639-
writeln!(log, "{seq:08} SST DELETED")?;
607+
fs::remove_file(self.path.join(format!("{seq:08}.sst")))?;
640608
}
641609
for seq in meta_seq_numbers_to_delete.iter() {
642-
writeln!(log, "{seq:08} META DELETED")?;
610+
fs::remove_file(self.path.join(format!("{seq:08}.meta")))?;
643611
}
644612
for seq in blob_seq_numbers_to_delete.iter() {
645-
writeln!(log, "{seq:08} BLOB DELETED")?;
613+
fs::remove_file(self.path.join(format!("{seq:08}.blob")))?;
646614
}
647-
}
648615

616+
{
617+
let mut log = self.open_log()?;
618+
writeln!(log, "Time {time}")?;
619+
let span = time.until(Timestamp::now())?;
620+
writeln!(log, "Commit {seq:08} {keys_written} keys in {span:#}")?;
621+
for (seq, family, ssts, obsolete) in new_meta_info {
622+
writeln!(log, "{seq:08} META family:{family}",)?;
623+
for (seq, min, max, size) in ssts {
624+
writeln!(
625+
log,
626+
" {seq:08} SST {min:016x}-{max:016x} {} MiB",
627+
size / 1024 / 1024
628+
)?;
629+
}
630+
for seq in obsolete {
631+
writeln!(log, " {seq:08} OBSOLETE SST")?;
632+
}
633+
}
634+
new_sst_files.sort_unstable_by_key(|(seq, _)| *seq);
635+
for (seq, _) in new_sst_files.iter() {
636+
writeln!(log, "{seq:08} NEW SST")?;
637+
}
638+
new_blob_files.sort_unstable_by_key(|(seq, _)| *seq);
639+
for (seq, _) in new_blob_files.iter() {
640+
writeln!(log, "{seq:08} NEW BLOB")?;
641+
}
642+
for seq in sst_seq_numbers_to_delete.iter() {
643+
writeln!(log, "{seq:08} SST DELETED")?;
644+
}
645+
for seq in meta_seq_numbers_to_delete.iter() {
646+
writeln!(log, "{seq:08} META DELETED")?;
647+
}
648+
for seq in blob_seq_numbers_to_delete.iter() {
649+
writeln!(log, "{seq:08} BLOB DELETED")?;
650+
}
651+
}
652+
anyhow::Ok(())
653+
})?;
649654
Ok(())
650655
}
651656

@@ -837,7 +842,7 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
837842
});
838843
}
839844

840-
{
845+
self.parallel_scheduler.block_in_place(|| {
841846
let metrics = compute_metrics(&ssts_with_ranges, 0..=u64::MAX);
842847
let guard = log_mutex.lock();
843848
let mut log = self.open_log()?;
@@ -859,7 +864,8 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
859864
}
860865
}
861866
drop(guard);
862-
}
867+
anyhow::Ok(())
868+
})?;
863869

864870
// Later we will remove the merged files
865871
let sst_seq_numbers_to_delete = merge_jobs
@@ -912,7 +918,8 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
912918
});
913919
}
914920

915-
fn create_sst_file(
921+
fn create_sst_file<S: ParallelScheduler>(
922+
parallel_scheduler: &S,
916923
entries: &[LookupEntry],
917924
total_key_size: usize,
918925
total_value_size: usize,
@@ -921,12 +928,14 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
921928
) -> Result<(u32, File, StaticSortedFileBuilderMeta<'static>)>
922929
{
923930
let _span = tracing::trace_span!("write merged sst file").entered();
924-
let (meta, file) = write_static_stored_file(
925-
entries,
926-
total_key_size,
927-
total_value_size,
928-
&path.join(format!("{seq:08}.sst")),
929-
)?;
931+
let (meta, file) = parallel_scheduler.block_in_place(|| {
932+
write_static_stored_file(
933+
entries,
934+
total_key_size,
935+
total_value_size,
936+
&path.join(format!("{seq:08}.sst")),
937+
)
938+
})?;
930939
Ok((seq, file, meta))
931940
}
932941

@@ -993,6 +1002,7 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
9931002

9941003
keys_written += entries.len() as u64;
9951004
new_sst_files.push(create_sst_file(
1005+
&self.parallel_scheduler,
9961006
&entries,
9971007
selected_total_key_size,
9981008
selected_total_value_size,
@@ -1023,6 +1033,7 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
10231033

10241034
keys_written += entries.len() as u64;
10251035
new_sst_files.push(create_sst_file(
1036+
&self.parallel_scheduler,
10261037
&entries,
10271038
total_key_size,
10281039
total_value_size,
@@ -1046,6 +1057,7 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
10461057

10471058
keys_written += part1.len() as u64;
10481059
new_sst_files.push(create_sst_file(
1060+
&self.parallel_scheduler,
10491061
part1,
10501062
// We don't know the exact sizes so we estimate them
10511063
last_entries_total_sizes.0 / 2,
@@ -1056,6 +1068,7 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
10561068

10571069
keys_written += part2.len() as u64;
10581070
new_sst_files.push(create_sst_file(
1071+
&self.parallel_scheduler,
10591072
part2,
10601073
last_entries_total_sizes.0 / 2,
10611074
last_entries_total_sizes.1 / 2,
@@ -1126,7 +1139,8 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
11261139
let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
11271140
let meta_file = {
11281141
let _span = tracing::trace_span!("write meta file").entered();
1129-
meta_file_builder.write(&self.path, seq)?
1142+
self.parallel_scheduler
1143+
.block_in_place(|| meta_file_builder.write(&self.path, seq))?
11301144
};
11311145

11321146
Ok(PartialResultPerFamily {

turbopack/crates/turbo-persistence/src/parallel_scheduler.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
pub trait ParallelScheduler: Clone + Sync + Send {
2+
fn block_in_place<R>(&self, f: impl FnOnce() -> R + Send) -> R
3+
where
4+
R: Send;
5+
26
fn parallel_for_each<T>(&self, items: &[T], f: impl Fn(&T) + Send + Sync)
37
where
48
T: Sync;
@@ -55,6 +59,13 @@ pub trait ParallelScheduler: Clone + Sync + Send {
5559
pub struct SerialScheduler;
5660

5761
impl ParallelScheduler for SerialScheduler {
62+
fn block_in_place<R>(&self, f: impl FnOnce() -> R + Send) -> R
63+
where
64+
R: Send,
65+
{
66+
f()
67+
}
68+
5869
fn parallel_for_each<T>(&self, items: &[T], f: impl Fn(&T) + Send + Sync)
5970
where
6071
T: Sync,

turbopack/crates/turbo-persistence/src/tests.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,13 @@ use crate::{
1414
struct RayonParallelScheduler;
1515

1616
impl ParallelScheduler for RayonParallelScheduler {
17+
fn block_in_place<R>(&self, f: impl FnOnce() -> R + Send) -> R
18+
where
19+
R: Send,
20+
{
21+
f()
22+
}
23+
1724
fn parallel_for_each<T>(&self, items: &[T], f: impl Fn(&T) + Send + Sync)
1825
where
1926
T: Sync,

turbopack/crates/turbo-persistence/src/write_batch.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -413,9 +413,12 @@ impl<K: StoreKey + Send + Sync, S: ParallelScheduler, const FAMILIES: usize>
413413
let seq = self.current_sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
414414

415415
let path = self.db_path.join(format!("{seq:08}.sst"));
416-
let (meta, file) =
417-
write_static_stored_file(entries, total_key_size, total_value_size, &path)
418-
.with_context(|| format!("Unable to write SST file {seq:08}.sst"))?;
416+
let (meta, file) = self
417+
.parallel_scheduler
418+
.block_in_place(|| {
419+
write_static_stored_file(entries, total_key_size, total_value_size, &path)
420+
})
421+
.with_context(|| format!("Unable to write SST file {seq:08}.sst"))?;
419422

420423
#[cfg(feature = "verify_sst_content")]
421424
{

turbopack/crates/turbo-tasks-backend/src/database/turbo/parallel_scheduler.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,17 @@
11
use turbo_persistence::ParallelScheduler;
2-
use turbo_tasks::parallel;
2+
use turbo_tasks::{block_in_place, parallel};
33

44
#[derive(Clone, Copy, Default)]
55
pub struct TurboTasksParallelScheduler;
66

77
impl ParallelScheduler for TurboTasksParallelScheduler {
8+
fn block_in_place<R>(&self, f: impl FnOnce() -> R + Send) -> R
9+
where
10+
R: Send,
11+
{
12+
block_in_place(f)
13+
}
14+
815
fn parallel_for_each<T>(&self, items: &[T], f: impl Fn(&T) + Send + Sync)
916
where
1017
T: Sync,

0 commit comments

Comments
 (0)