
Commit 69310a8

Turbopack: use block in place for db writes (#82380)
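Adds a `block_in_place` method to the `ParallelScheduler` trait in `turbo-persistence` and wraps the blocking filesystem work on the database write path in it: fsyncing new SST and blob files, writing the `*.del` and `CURRENT` files, removing obsolete files, appending to the log, and writing SST/meta files during compaction and write batches. This lets a scheduler built on a cooperative runtime be told before a worker thread blocks on disk I/O; the serial and Rayon-based schedulers simply run the closure inline, while `TurboTasksParallelScheduler` forwards to `turbo_tasks::block_in_place`.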
1 parent 450b07e commit 69310a8

8 files changed: +151 -117 lines changed


turbopack/crates/turbo-persistence/src/db.rs

Lines changed: 100 additions & 86 deletions
@@ -508,12 +508,15 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
             sst_filter.apply_filter(meta_file);
         }

-        for (_, file) in new_sst_files.iter() {
-            file.sync_all()?;
-        }
-        for (_, file) in new_blob_files.iter() {
-            file.sync_all()?;
-        }
+        self.parallel_scheduler.block_in_place(|| {
+            for (_, file) in new_sst_files.iter() {
+                file.sync_all()?;
+            }
+            for (_, file) in new_blob_files.iter() {
+                file.sync_all()?;
+            }
+            anyhow::Ok(())
+        })?;

         let new_meta_info = new_meta_files
             .iter()

@@ -566,86 +569,88 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
             inner.current_sequence_number = seq;
         }

-        if has_delete_file {
-            sst_seq_numbers_to_delete.sort_unstable();
-            meta_seq_numbers_to_delete.sort_unstable();
-            blob_seq_numbers_to_delete.sort_unstable();
-            // Write *.del file, marking the selected files as to delete
-            let mut buf = Vec::with_capacity(
-                (sst_seq_numbers_to_delete.len()
-                    + meta_seq_numbers_to_delete.len()
-                    + blob_seq_numbers_to_delete.len())
-                    * size_of::<u32>(),
-            );
-            for seq in sst_seq_numbers_to_delete.iter() {
-                buf.write_u32::<BE>(*seq)?;
-            }
-            for seq in meta_seq_numbers_to_delete.iter() {
-                buf.write_u32::<BE>(*seq)?;
-            }
-            for seq in blob_seq_numbers_to_delete.iter() {
-                buf.write_u32::<BE>(*seq)?;
-            }
-            let mut file = File::create(self.path.join(format!("{seq:08}.del")))?;
-            file.write_all(&buf)?;
-            file.sync_all()?;
-        }
-
-        let mut current_file = OpenOptions::new()
-            .write(true)
-            .truncate(false)
-            .read(false)
-            .open(self.path.join("CURRENT"))?;
-        current_file.write_u32::<BE>(seq)?;
-        current_file.sync_all()?;
-
-        for seq in sst_seq_numbers_to_delete.iter() {
-            fs::remove_file(self.path.join(format!("{seq:08}.sst")))?;
-        }
-        for seq in meta_seq_numbers_to_delete.iter() {
-            fs::remove_file(self.path.join(format!("{seq:08}.meta")))?;
-        }
-        for seq in blob_seq_numbers_to_delete.iter() {
-            fs::remove_file(self.path.join(format!("{seq:08}.blob")))?;
-        }
-
-        {
-            let mut log = self.open_log()?;
-            writeln!(log, "Time {time}")?;
-            let span = time.until(Timestamp::now())?;
-            writeln!(log, "Commit {seq:08} {keys_written} keys in {span:#}")?;
-            for (seq, family, ssts, obsolete) in new_meta_info {
-                writeln!(log, "{seq:08} META family:{family}",)?;
-                for (seq, min, max, size) in ssts {
-                    writeln!(
-                        log,
-                        " {seq:08} SST {min:016x}-{max:016x} {} MiB",
-                        size / 1024 / 1024
-                    )?;
-                }
-                for seq in obsolete {
-                    writeln!(log, " {seq:08} OBSOLETE SST")?;
-                }
-            }
-            new_sst_files.sort_unstable_by_key(|(seq, _)| *seq);
-            for (seq, _) in new_sst_files.iter() {
-                writeln!(log, "{seq:08} NEW SST")?;
-            }
-            new_blob_files.sort_unstable_by_key(|(seq, _)| *seq);
-            for (seq, _) in new_blob_files.iter() {
-                writeln!(log, "{seq:08} NEW BLOB")?;
-            }
-            for seq in sst_seq_numbers_to_delete.iter() {
-                writeln!(log, "{seq:08} SST DELETED")?;
-            }
-            for seq in meta_seq_numbers_to_delete.iter() {
-                writeln!(log, "{seq:08} META DELETED")?;
-            }
-            for seq in blob_seq_numbers_to_delete.iter() {
-                writeln!(log, "{seq:08} BLOB DELETED")?;
-            }
-        }
+        self.parallel_scheduler.block_in_place(|| {
+            if has_delete_file {
+                sst_seq_numbers_to_delete.sort_unstable();
+                meta_seq_numbers_to_delete.sort_unstable();
+                blob_seq_numbers_to_delete.sort_unstable();
+                // Write *.del file, marking the selected files as to delete
+                let mut buf = Vec::with_capacity(
+                    (sst_seq_numbers_to_delete.len()
+                        + meta_seq_numbers_to_delete.len()
+                        + blob_seq_numbers_to_delete.len())
+                        * size_of::<u32>(),
+                );
+                for seq in sst_seq_numbers_to_delete.iter() {
+                    buf.write_u32::<BE>(*seq)?;
+                }
+                for seq in meta_seq_numbers_to_delete.iter() {
+                    buf.write_u32::<BE>(*seq)?;
+                }
+                for seq in blob_seq_numbers_to_delete.iter() {
+                    buf.write_u32::<BE>(*seq)?;
+                }
+                let mut file = File::create(self.path.join(format!("{seq:08}.del")))?;
+                file.write_all(&buf)?;
+                file.sync_all()?;
+            }
+
+            let mut current_file = OpenOptions::new()
+                .write(true)
+                .truncate(false)
+                .read(false)
+                .open(self.path.join("CURRENT"))?;
+            current_file.write_u32::<BE>(seq)?;
+            current_file.sync_all()?;
+
+            for seq in sst_seq_numbers_to_delete.iter() {
+                fs::remove_file(self.path.join(format!("{seq:08}.sst")))?;
+            }
+            for seq in meta_seq_numbers_to_delete.iter() {
+                fs::remove_file(self.path.join(format!("{seq:08}.meta")))?;
+            }
+            for seq in blob_seq_numbers_to_delete.iter() {
+                fs::remove_file(self.path.join(format!("{seq:08}.blob")))?;
+            }
+
+            {
+                let mut log = self.open_log()?;
+                writeln!(log, "Time {time}")?;
+                let span = time.until(Timestamp::now())?;
+                writeln!(log, "Commit {seq:08} {keys_written} keys in {span:#}")?;
+                for (seq, family, ssts, obsolete) in new_meta_info {
+                    writeln!(log, "{seq:08} META family:{family}",)?;
+                    for (seq, min, max, size) in ssts {
+                        writeln!(
+                            log,
+                            " {seq:08} SST {min:016x}-{max:016x} {} MiB",
+                            size / 1024 / 1024
+                        )?;
+                    }
+                    for seq in obsolete {
+                        writeln!(log, " {seq:08} OBSOLETE SST")?;
+                    }
+                }
+                new_sst_files.sort_unstable_by_key(|(seq, _)| *seq);
+                for (seq, _) in new_sst_files.iter() {
+                    writeln!(log, "{seq:08} NEW SST")?;
+                }
+                new_blob_files.sort_unstable_by_key(|(seq, _)| *seq);
+                for (seq, _) in new_blob_files.iter() {
+                    writeln!(log, "{seq:08} NEW BLOB")?;
+                }
+                for seq in sst_seq_numbers_to_delete.iter() {
+                    writeln!(log, "{seq:08} SST DELETED")?;
+                }
+                for seq in meta_seq_numbers_to_delete.iter() {
+                    writeln!(log, "{seq:08} META DELETED")?;
+                }
+                for seq in blob_seq_numbers_to_delete.iter() {
+                    writeln!(log, "{seq:08} BLOB DELETED")?;
+                }
+            }
+            anyhow::Ok(())
+        })?;

         Ok(())
     }

@@ -837,7 +842,7 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
             });
         }

-        {
+        self.parallel_scheduler.block_in_place(|| {
             let metrics = compute_metrics(&ssts_with_ranges, 0..=u64::MAX);
             let guard = log_mutex.lock();
             let mut log = self.open_log()?;

@@ -859,7 +864,8 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
                 }
             }
             drop(guard);
-        }
+            anyhow::Ok(())
+        })?;

         // Later we will remove the merged files
         let sst_seq_numbers_to_delete = merge_jobs

@@ -912,7 +918,8 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
     });
 }

-fn create_sst_file(
+fn create_sst_file<S: ParallelScheduler>(
+    parallel_scheduler: &S,
     entries: &[LookupEntry],
     total_key_size: usize,
     total_value_size: usize,

@@ -921,12 +928,14 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
 ) -> Result<(u32, File, StaticSortedFileBuilderMeta<'static>)>
 {
     let _span = tracing::trace_span!("write merged sst file").entered();
-    let (meta, file) = write_static_stored_file(
-        entries,
-        total_key_size,
-        total_value_size,
-        &path.join(format!("{seq:08}.sst")),
-    )?;
+    let (meta, file) = parallel_scheduler.block_in_place(|| {
+        write_static_stored_file(
+            entries,
+            total_key_size,
+            total_value_size,
+            &path.join(format!("{seq:08}.sst")),
+        )
+    })?;
     Ok((seq, file, meta))
 }

@@ -993,6 +1002,7 @@ impl<S: ParallelScheduler> TurboPersistence<S> {

                 keys_written += entries.len() as u64;
                 new_sst_files.push(create_sst_file(
+                    &self.parallel_scheduler,
                     &entries,
                     selected_total_key_size,
                     selected_total_value_size,

@@ -1023,6 +1033,7 @@ impl<S: ParallelScheduler> TurboPersistence<S> {

                 keys_written += entries.len() as u64;
                 new_sst_files.push(create_sst_file(
+                    &self.parallel_scheduler,
                     &entries,
                     total_key_size,
                     total_value_size,

@@ -1046,6 +1057,7 @@ impl<S: ParallelScheduler> TurboPersistence<S> {

                 keys_written += part1.len() as u64;
                 new_sst_files.push(create_sst_file(
+                    &self.parallel_scheduler,
                     part1,
                     // We don't know the exact sizes so we estimate them
                     last_entries_total_sizes.0 / 2,

@@ -1056,6 +1068,7 @@ impl<S: ParallelScheduler> TurboPersistence<S> {

                 keys_written += part2.len() as u64;
                 new_sst_files.push(create_sst_file(
+                    &self.parallel_scheduler,
                     part2,
                     last_entries_total_sizes.0 / 2,
                     last_entries_total_sizes.1 / 2,

@@ -1126,7 +1139,8 @@ impl<S: ParallelScheduler> TurboPersistence<S> {
         let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
         let meta_file = {
             let _span = tracing::trace_span!("write meta file").entered();
-            meta_file_builder.write(&self.path, seq)?
+            self.parallel_scheduler
+                .block_in_place(|| meta_file_builder.write(&self.path, seq))?
         };

         Ok(PartialResultPerFamily {
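One detail worth noting in the hunks above: when the closure passed to `block_in_place` uses `?`, its return type has to be pinned down so the error type behind `?` can be inferred, which is what the trailing `anyhow::Ok(())` does; the outer `?` then propagates the closure's result. A minimal, self-contained sketch of the same pattern (the free `block_in_place` function and `sync_all_blocking` are illustrative stand-ins, not the crate's API; requires the `anyhow` crate):

```rust
use std::fs::File;

use anyhow::Result;

// Stand-in for ParallelScheduler::block_in_place: just runs the closure.
fn block_in_place<R: Send>(f: impl FnOnce() -> R + Send) -> R {
    f()
}

fn sync_all_blocking(files: &[File]) -> Result<()> {
    block_in_place(|| {
        for file in files {
            // sync_all() blocks until data and metadata reach the disk.
            file.sync_all()?;
        }
        // Pins the closure's return type to anyhow::Result<()> so the `?` above
        // knows which error type to convert into.
        anyhow::Ok(())
    })?;
    Ok(())
}
```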

turbopack/crates/turbo-persistence/src/parallel_scheduler.rs

Lines changed: 11 additions & 0 deletions
@@ -1,4 +1,8 @@
 pub trait ParallelScheduler: Clone + Sync + Send {
+    fn block_in_place<R>(&self, f: impl FnOnce() -> R + Send) -> R
+    where
+        R: Send;
+
     fn parallel_for_each<T>(&self, items: &[T], f: impl Fn(&T) + Send + Sync)
     where
         T: Sync;

@@ -55,6 +59,13 @@ pub trait ParallelScheduler: Clone + Sync + Send {
 pub struct SerialScheduler;

 impl ParallelScheduler for SerialScheduler {
+    fn block_in_place<R>(&self, f: impl FnOnce() -> R + Send) -> R
+    where
+        R: Send,
+    {
+        f()
+    }
+
     fn parallel_for_each<T>(&self, items: &[T], f: impl Fn(&T) + Send + Sync)
     where
         T: Sync,
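The new trait method takes a `Send` closure and returns its result, so persistence code can stay generic over the scheduler. A small usage sketch under the same signature; the trait is reduced to just this method here, and `fsync_files` is a hypothetical helper, not part of the crate (any implementor, such as the `SerialScheduler` above, could be passed in):

```rust
use std::fs::File;
use std::io;

// Reduced copy of the trait for a self-contained example; the real trait also
// has the parallel_* methods shown in the diff above.
pub trait ParallelScheduler: Clone + Sync + Send {
    fn block_in_place<R>(&self, f: impl FnOnce() -> R + Send) -> R
    where
        R: Send;
}

// Hypothetical helper: mark a batch of fsyncs as blocking work so the scheduler
// can react before the worker thread stalls on disk I/O.
fn fsync_files<S: ParallelScheduler>(scheduler: &S, files: &[File]) -> io::Result<()> {
    scheduler.block_in_place(|| {
        for file in files {
            file.sync_all()?;
        }
        Ok(())
    })
}
```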

turbopack/crates/turbo-persistence/src/tests.rs

Lines changed: 7 additions & 0 deletions
@@ -14,6 +14,13 @@ use crate::{
 struct RayonParallelScheduler;

 impl ParallelScheduler for RayonParallelScheduler {
+    fn block_in_place<R>(&self, f: impl FnOnce() -> R + Send) -> R
+    where
+        R: Send,
+    {
+        f()
+    }
+
     fn parallel_for_each<T>(&self, items: &[T], f: impl Fn(&T) + Send + Sync)
     where
         T: Sync,
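For the Rayon-backed scheduler used in the tests, `block_in_place` just runs the closure inline, presumably because Rayon workers are ordinary OS threads that are allowed to block; there is no cooperative executor to notify.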

turbopack/crates/turbo-persistence/src/write_batch.rs

Lines changed: 6 additions & 3 deletions
@@ -413,9 +413,12 @@ impl<K: StoreKey + Send + Sync, S: ParallelScheduler, const FAMILIES: usize>
         let seq = self.current_sequence_number.fetch_add(1, Ordering::SeqCst) + 1;

         let path = self.db_path.join(format!("{seq:08}.sst"));
-        let (meta, file) =
-            write_static_stored_file(entries, total_key_size, total_value_size, &path)
-                .with_context(|| format!("Unable to write SST file {seq:08}.sst"))?;
+        let (meta, file) = self
+            .parallel_scheduler
+            .block_in_place(|| {
+                write_static_stored_file(entries, total_key_size, total_value_size, &path)
+            })
+            .with_context(|| format!("Unable to write SST file {seq:08}.sst"))?;

         #[cfg(feature = "verify_sst_content")]
         {
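Note the composition here: the closure returns the `Result` from `write_static_stored_file` unchanged, and `.with_context(...)` is applied to whatever `block_in_place` hands back, so the SST file name is attached to the error outside the blocking section.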

turbopack/crates/turbo-tasks-backend/src/database/turbo/parallel_scheduler.rs

Lines changed: 8 additions & 1 deletion
@@ -1,10 +1,17 @@
 use turbo_persistence::ParallelScheduler;
-use turbo_tasks::parallel;
+use turbo_tasks::{block_in_place, parallel};

 #[derive(Clone, Copy, Default)]
 pub struct TurboTasksParallelScheduler;

 impl ParallelScheduler for TurboTasksParallelScheduler {
+    fn block_in_place<R>(&self, f: impl FnOnce() -> R + Send) -> R
+    where
+        R: Send,
+    {
+        block_in_place(f)
+    }
+
     fn parallel_for_each<T>(&self, items: &[T], f: impl Fn(&T) + Send + Sync)
     where
         T: Sync,
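`TurboTasksParallelScheduler` forwards to `turbo_tasks::block_in_place`, whose implementation is not part of this diff. As an assumption for illustration only, a helper with that shape could defer to tokio's `block_in_place` when running on a multi-threaded tokio runtime and fall back to calling the closure directly otherwise (requires tokio with the multi-threaded runtime enabled):

```rust
use tokio::runtime::{Handle, RuntimeFlavor};

// Hypothetical sketch, not the actual turbo_tasks implementation.
pub fn block_in_place<R>(f: impl FnOnce() -> R) -> R {
    let on_multi_thread_runtime = Handle::try_current()
        .map(|handle| matches!(handle.runtime_flavor(), RuntimeFlavor::MultiThread))
        .unwrap_or(false);
    if on_multi_thread_runtime {
        // Tells tokio this worker thread is about to block so queued tasks can be
        // moved to other workers (panics on a current-thread runtime, hence the check).
        tokio::task::block_in_place(f)
    } else {
        // Outside a runtime (or on a current-thread runtime) just run the closure.
        f()
    }
}
```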
