From ad06558723224732c5ef9a69abbc4aefa3a5e6bc Mon Sep 17 00:00:00 2001 From: Niklas Mischkulnig <4586894+mischnic@users.noreply.github.com> Date: Thu, 14 Aug 2025 20:28:29 +0200 Subject: [PATCH] Revert "Turbopack: avoid using rayon in favor of tokio tasks (#82256)" This reverts commit 823b9f50cd21214260cc03890d20b81364fc22ce. --- Cargo.lock | 1 + .../turbo-persistence-tools/src/main.rs | 4 +- turbopack/crates/turbo-persistence/Cargo.toml | 2 +- .../crates/turbo-persistence/src/collector.rs | 9 - turbopack/crates/turbo-persistence/src/db.rs | 676 +++++++++--------- turbopack/crates/turbo-persistence/src/lib.rs | 10 +- .../src/parallel_scheduler.rs | 137 ---- .../crates/turbo-persistence/src/tests.rs | 219 +----- .../turbo-persistence/src/write_batch.rs | 154 ++-- .../crates/turbo-tasks-backend/Cargo.toml | 1 + .../turbo-tasks-backend/src/backend/mod.rs | 9 +- .../src/backend/storage.rs | 72 +- .../src/database/{turbo/mod.rs => turbo.rs} | 28 +- .../src/database/turbo/parallel_scheduler.rs | 76 -- .../src/kv_backing_storage.rs | 211 +++--- .../turbo-tasks-backend/tests/all_in_one.rs | 2 +- .../crates/turbo-tasks-backend/tests/basic.rs | 2 +- .../crates/turbo-tasks-backend/tests/bug.rs | 2 +- .../crates/turbo-tasks-backend/tests/bug2.rs | 2 +- .../turbo-tasks-backend/tests/call_types.rs | 6 +- .../turbo-tasks-backend/tests/collectibles.rs | 14 +- .../crates/turbo-tasks-backend/tests/debug.rs | 18 +- .../turbo-tasks-backend/tests/detached.rs | 4 +- .../tests/dirty_in_progress.rs | 2 +- .../tests/emptied_cells.rs | 2 +- .../tests/filter_unused_args.rs | 2 +- .../turbo-tasks-backend/tests/immutable.rs | 2 +- .../turbo-tasks-backend/tests/local_tasks.rs | 2 +- .../turbo-tasks-backend/tests/operation_vc.rs | 2 +- .../turbo-tasks-backend/tests/panics.rs | 2 +- .../turbo-tasks-backend/tests/performance.rs | 22 +- .../tests/random_change.rs | 2 +- .../tests/read_ref_cell.rs | 2 +- .../turbo-tasks-backend/tests/recompute.rs | 4 +- .../tests/recompute_collectibles.rs | 2 +- .../turbo-tasks-backend/tests/resolved_vc.rs | 10 +- .../tests/shrink_to_fit.rs | 2 +- .../tests/task_statistics.rs | 12 +- .../tests/trace_transient.rs | 2 +- .../tests/trait_ref_cell.rs | 2 +- .../tests/trait_ref_cell_mode.rs | 4 +- .../tests/transient_collectible.rs | 2 +- .../turbo-tasks-backend/tests/transient_vc.rs | 2 +- .../crates/turbo-tasks-fetch/tests/fetch.rs | 12 +- turbopack/crates/turbo-tasks-fs/src/lib.rs | 38 +- .../crates/turbo-tasks-fs/src/watcher.rs | 51 +- turbopack/crates/turbo-tasks/src/lib.rs | 6 +- turbopack/crates/turbo-tasks/src/manager.rs | 43 +- turbopack/crates/turbo-tasks/src/parallel.rs | 308 -------- turbopack/crates/turbo-tasks/src/scope.rs | 309 +------- turbopack/crates/turbo-tasks/src/util.rs | 123 ---- .../crates/turbopack/tests/node-file-trace.rs | 53 +- 52 files changed, 837 insertions(+), 1847 deletions(-) delete mode 100644 turbopack/crates/turbo-persistence/src/parallel_scheduler.rs rename turbopack/crates/turbo-tasks-backend/src/database/{turbo/mod.rs => turbo.rs} (89%) delete mode 100644 turbopack/crates/turbo-tasks-backend/src/database/turbo/parallel_scheduler.rs delete mode 100644 turbopack/crates/turbo-tasks/src/parallel.rs diff --git a/Cargo.lock b/Cargo.lock index 4cea0eeeadb36..886d0a2c8ffeb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9252,6 +9252,7 @@ dependencies = [ "parking_lot", "pot", "rand 0.9.0", + "rayon", "regex", "ringmap", "rstest", diff --git a/turbopack/crates/turbo-persistence-tools/src/main.rs b/turbopack/crates/turbo-persistence-tools/src/main.rs index 
6a384bae92ab4..25fbe6d31201b 100644 --- a/turbopack/crates/turbo-persistence-tools/src/main.rs +++ b/turbopack/crates/turbo-persistence-tools/src/main.rs @@ -3,7 +3,7 @@ use std::path::PathBuf; use anyhow::{Context, Result, bail}; -use turbo_persistence::{MetaFileEntryInfo, SerialScheduler, TurboPersistence}; +use turbo_persistence::{MetaFileEntryInfo, TurboPersistence}; fn main() -> Result<()> { // Get CLI argument @@ -16,7 +16,7 @@ fn main() -> Result<()> { bail!("The provided path does not exist: {}", path.display()); } - let db: TurboPersistence = TurboPersistence::open_read_only(path)?; + let db = TurboPersistence::open_read_only(path)?; let meta_info = db .meta_info() .context("Failed to retrieve meta information")?; diff --git a/turbopack/crates/turbo-persistence/Cargo.toml b/turbopack/crates/turbo-persistence/Cargo.toml index ac4e7e5cc45f3..d771b66855296 100644 --- a/turbopack/crates/turbo-persistence/Cargo.toml +++ b/turbopack/crates/turbo-persistence/Cargo.toml @@ -22,6 +22,7 @@ memmap2 = "0.9.5" parking_lot = { workspace = true } qfilter = { version = "0.2.4", features = ["serde"] } quick_cache = { workspace = true } +rayon = { workspace = true } rustc-hash = { workspace = true } smallvec = { workspace = true } thread_local = { workspace = true } @@ -31,7 +32,6 @@ zstd = { version = "0.13.2", features = ["zdict_builder"] } [dev-dependencies] rand = { workspace = true, features = ["small_rng"] } -rayon = { workspace = true } tempfile = { workspace = true } [lints] diff --git a/turbopack/crates/turbo-persistence/src/collector.rs b/turbopack/crates/turbo-persistence/src/collector.rs index 6637ea2c13e3c..ea8b04ab16e70 100644 --- a/turbopack/crates/turbo-persistence/src/collector.rs +++ b/turbopack/crates/turbo-persistence/src/collector.rs @@ -1,5 +1,3 @@ -use std::mem::take; - use crate::{ ValueBuffer, collector_entry::{CollectorEntry, CollectorEntryValue, EntryKey}, @@ -113,11 +111,4 @@ impl Collector { self.total_value_size = 0; self.entries.drain(..) } - - /// Clears the collector and drops the capacity - pub fn drop_contents(&mut self) { - drop(take(&mut self.entries)); - self.total_key_size = 0; - self.total_value_size = 0; - } } diff --git a/turbopack/crates/turbo-persistence/src/db.rs b/turbopack/crates/turbo-persistence/src/db.rs index 9936c9349cc8b..58e35e7d19175 100644 --- a/turbopack/crates/turbo-persistence/src/db.rs +++ b/turbopack/crates/turbo-persistence/src/db.rs @@ -18,6 +18,8 @@ use jiff::Timestamp; use lzzzz::lz4::decompress; use memmap2::Mmap; use parking_lot::{Mutex, RwLock}; +use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; +use tracing::Span; pub use crate::compaction::selector::CompactConfig; use crate::{ @@ -34,7 +36,6 @@ use crate::{ merge_iter::MergeIter, meta_file::{AmqfCache, MetaFile, MetaLookupResult, StaticSortedFileRange}, meta_file_builder::MetaFileBuilder, - parallel_scheduler::ParallelScheduler, sst_filter::SstFilter, static_sorted_file::{BlockCache, SstLookupResult}, static_sorted_file_builder::{StaticSortedFileBuilderMeta, write_static_stored_file}, @@ -107,8 +108,7 @@ struct TrackedStats { /// TurboPersistence is a persistent key-value store. It is limited to a single writer at a time /// using a single write batch. It allows for concurrent reads. -pub struct TurboPersistence { - parallel_scheduler: S, +pub struct TurboPersistence { /// The path to the directory where the database is stored path: PathBuf, /// If true, the database is opened in read-only mode. 
In this mode, no writes are allowed and @@ -148,26 +148,9 @@ pub struct CommitOptions { keys_written: u64, } -impl TurboPersistence { - /// Open a TurboPersistence database at the given path. - /// This will read the directory and might performance cleanup when the database was not closed - /// properly. Cleanup only requires to read a few bytes from a few files and to delete - /// files, so it's fast. - pub fn open(path: PathBuf) -> Result { - Self::open_with_parallel_scheduler(path, Default::default()) - } - - /// Open a TurboPersistence database at the given path in read only mode. - /// This will read the directory. No Cleanup is performed. - pub fn open_read_only(path: PathBuf) -> Result { - Self::open_read_only_with_parallel_scheduler(path, Default::default()) - } -} - -impl TurboPersistence { - fn new(path: PathBuf, read_only: bool, parallel_scheduler: S) -> Self { +impl TurboPersistence { + fn new(path: PathBuf, read_only: bool) -> Self { Self { - parallel_scheduler, path, read_only, inner: RwLock::new(Inner { @@ -205,19 +188,16 @@ impl TurboPersistence { /// This will read the directory and might performance cleanup when the database was not closed /// properly. Cleanup only requires to read a few bytes from a few files and to delete /// files, so it's fast. - pub fn open_with_parallel_scheduler(path: PathBuf, parallel_scheduler: S) -> Result { - let mut db = Self::new(path, false, parallel_scheduler); + pub fn open(path: PathBuf) -> Result { + let mut db = Self::new(path, false); db.open_directory(false)?; Ok(db) } /// Open a TurboPersistence database at the given path in read only mode. /// This will read the directory. No Cleanup is performed. - pub fn open_read_only_with_parallel_scheduler( - path: PathBuf, - parallel_scheduler: S, - ) -> Result { - let mut db = Self::new(path, true, parallel_scheduler); + pub fn open_read_only(path: PathBuf) -> Result { + let mut db = Self::new(path, true); db.open_directory(false)?; Ok(db) } @@ -361,12 +341,16 @@ impl TurboPersistence { meta_files.retain(|seq| !deleted_files.contains(seq)); meta_files.sort_unstable(); - let mut meta_files = self - .parallel_scheduler - .parallel_map_collect::<_, _, Result>>(&meta_files, |&seq| { + let span = Span::current(); + let mut meta_files = meta_files + .into_par_iter() + .with_min_len(1) + .map(|seq| { + let _span = span.enter(); let meta_file = MetaFile::open(&self.path, seq)?; Ok(meta_file) - })?; + }) + .collect::>>()?; let mut sst_filter = SstFilter::new(); for meta_file in meta_files.iter_mut().rev() { @@ -414,7 +398,7 @@ impl TurboPersistence { /// This data will only become visible after the WriteBatch is committed. pub fn write_batch( &self, - ) -> Result> { + ) -> Result> { if self.read_only { bail!("Cannot write to a read-only database"); } @@ -429,11 +413,7 @@ impl TurboPersistence { ); } let current = self.inner.read().current_sequence_number; - Ok(WriteBatch::new( - self.path.clone(), - current, - self.parallel_scheduler.clone(), - )) + Ok(WriteBatch::new(self.path.clone(), current)) } fn open_log(&self) -> Result> { @@ -452,7 +432,7 @@ impl TurboPersistence { /// visible to readers. 
pub fn commit_write_batch( &self, - mut write_batch: WriteBatch, + mut write_batch: WriteBatch, ) -> Result<()> { if self.read_only { unreachable!("It's not possible to create a write batch for a read-only database"); @@ -495,13 +475,15 @@ impl TurboPersistence { new_meta_files.sort_unstable_by_key(|(seq, _)| *seq); - let mut new_meta_files = self - .parallel_scheduler - .parallel_map_collect_owned::<_, _, Result>>(new_meta_files, |(seq, file)| { + let mut new_meta_files = new_meta_files + .into_par_iter() + .with_min_len(1) + .map(|(seq, file)| { file.sync_all()?; let meta_file = MetaFile::open(&self.path, seq)?; Ok(meta_file) - })?; + }) + .collect::>>()?; let mut sst_filter = SstFilter::new(); for meta_file in new_meta_files.iter_mut().rev() { @@ -795,6 +777,7 @@ impl TurboPersistence { let path = &self.path; let log_mutex = Mutex::new(()); + let span = Span::current(); struct PartialResultPerFamily { new_meta_file: Option<(u32, File)>, @@ -806,337 +789,336 @@ impl TurboPersistence { let mut compact_config = compact_config.clone(); let merge_jobs = sst_by_family - .into_iter() - .enumerate() - .filter_map(|(family, ssts_with_ranges)| { + .iter() + .map(|ssts_with_ranges| { if compact_config.max_merge_segment_count == 0 { - return None; + return Vec::new(); } let (merge_jobs, real_merge_job_size) = - get_merge_segments(&ssts_with_ranges, &compact_config); + get_merge_segments(ssts_with_ranges, &compact_config); compact_config.max_merge_segment_count -= real_merge_job_size; - Some((family, ssts_with_ranges, merge_jobs)) + merge_jobs }) .collect::>(); - let result = self - .parallel_scheduler - .parallel_map_collect_owned::<_, _, Result>>( - merge_jobs, - |(family, ssts_with_ranges, merge_jobs)| { - let family = family as u32; - - if merge_jobs.is_empty() { - return Ok(PartialResultPerFamily { - new_meta_file: None, - new_sst_files: Vec::new(), - sst_seq_numbers_to_delete: Vec::new(), - blob_seq_numbers_to_delete: Vec::new(), - keys_written: 0, - }); - } + let result = sst_by_family + .into_par_iter() + .zip(merge_jobs.into_par_iter()) + .with_min_len(1) + .enumerate() + .map(|(family, (ssts_with_ranges, merge_jobs))| { + let family = family as u32; + let _span = span.clone().entered(); + + if merge_jobs.is_empty() { + return Ok(PartialResultPerFamily { + new_meta_file: None, + new_sst_files: Vec::new(), + sst_seq_numbers_to_delete: Vec::new(), + blob_seq_numbers_to_delete: Vec::new(), + keys_written: 0, + }); + } - { - let metrics = compute_metrics(&ssts_with_ranges, 0..=u64::MAX); - let guard = log_mutex.lock(); - let mut log = self.open_log()?; - writeln!( - log, - "Compaction for family {family} (coverage: {}, overlap: {}, \ - duplication: {} / {} MiB):", - metrics.coverage, - metrics.overlap, - metrics.duplication, - metrics.duplicated_size / 1024 / 1024 - )?; - for job in merge_jobs.iter() { - writeln!(log, " merge")?; - for i in job.iter() { - let seq = ssts_with_ranges[*i].seq; - let (min, max) = ssts_with_ranges[*i].range().into_inner(); - writeln!(log, " {seq:08} {min:016x}-{max:016x}")?; - } + { + let metrics = compute_metrics(&ssts_with_ranges, 0..=u64::MAX); + let guard = log_mutex.lock(); + let mut log = self.open_log()?; + writeln!( + log, + "Compaction for family {family} (coverage: {}, overlap: {}, duplication: \ + {} / {} MiB):", + metrics.coverage, + metrics.overlap, + metrics.duplication, + metrics.duplicated_size / 1024 / 1024 + )?; + for job in merge_jobs.iter() { + writeln!(log, " merge")?; + for i in job.iter() { + let seq = ssts_with_ranges[*i].seq; + let (min, 
max) = ssts_with_ranges[*i].range().into_inner(); + writeln!(log, " {seq:08} {min:016x}-{max:016x}")?; } - drop(guard); } + drop(guard); + } - // Later we will remove the merged files - let sst_seq_numbers_to_delete = merge_jobs - .iter() - .filter(|l| l.len() > 1) - .flat_map(|l| l.iter().copied()) - .map(|index| ssts_with_ranges[index].seq) - .collect::>(); - - // Merge SST files - let span = tracing::trace_span!("merge files"); - enum PartialMergeResult<'l> { - Merged { - new_sst_files: Vec<(u32, File, StaticSortedFileBuilderMeta<'static>)>, - blob_seq_numbers_to_delete: Vec, - keys_written: u64, - }, - Move { + // Later we will remove the merged files + let sst_seq_numbers_to_delete = merge_jobs + .iter() + .filter(|l| l.len() > 1) + .flat_map(|l| l.iter().copied()) + .map(|index| ssts_with_ranges[index].seq) + .collect::>(); + + // Merge SST files + let span = tracing::trace_span!("merge files"); + enum PartialMergeResult<'l> { + Merged { + new_sst_files: Vec<(u32, File, StaticSortedFileBuilderMeta<'static>)>, + blob_seq_numbers_to_delete: Vec, + keys_written: u64, + }, + Move { + seq: u32, + meta: StaticSortedFileBuilderMeta<'l>, + }, + } + let merge_result = merge_jobs + .into_par_iter() + .with_min_len(1) + .map(|indices| { + let _span = span.clone().entered(); + if indices.len() == 1 { + // If we only have one file, we can just move it + let index = indices[0]; + let meta_index = ssts_with_ranges[index].meta_index; + let index_in_meta = ssts_with_ranges[index].index_in_meta; + let meta_file = &meta_files[meta_index]; + let entry = meta_file.entry(index_in_meta); + let amqf = Cow::Borrowed(entry.raw_amqf(meta_file.amqf_data())); + let meta = StaticSortedFileBuilderMeta { + min_hash: entry.min_hash(), + max_hash: entry.max_hash(), + amqf, + key_compression_dictionary_length: entry + .key_compression_dictionary_length(), + value_compression_dictionary_length: entry + .value_compression_dictionary_length(), + block_count: entry.block_count(), + size: entry.size(), + entries: 0, + }; + return Ok(PartialMergeResult::Move { + seq: entry.sequence_number(), + meta, + }); + } + + fn create_sst_file( + entries: &[LookupEntry], + total_key_size: usize, + total_value_size: usize, + path: &Path, seq: u32, - meta: StaticSortedFileBuilderMeta<'l>, - }, - } - let merge_result = self - .parallel_scheduler - .parallel_map_collect_owned::<_, _, Result>>(merge_jobs, |indices| { - let _span = span.clone().entered(); - if indices.len() == 1 { - // If we only have one file, we can just move it - let index = indices[0]; - let meta_index = ssts_with_ranges[index].meta_index; - let index_in_meta = ssts_with_ranges[index].index_in_meta; - let meta_file = &meta_files[meta_index]; - let entry = meta_file.entry(index_in_meta); - let amqf = Cow::Borrowed(entry.raw_amqf(meta_file.amqf_data())); - let meta = StaticSortedFileBuilderMeta { - min_hash: entry.min_hash(), - max_hash: entry.max_hash(), - amqf, - key_compression_dictionary_length: entry - .key_compression_dictionary_length(), - value_compression_dictionary_length: entry - .value_compression_dictionary_length(), - block_count: entry.block_count(), - size: entry.size(), - entries: 0, - }; - return Ok(PartialMergeResult::Move { - seq: entry.sequence_number(), - meta, - }); - } + ) -> Result<(u32, File, StaticSortedFileBuilderMeta<'static>)> + { + let _span = tracing::trace_span!("write merged sst file").entered(); + let (meta, file) = write_static_stored_file( + entries, + total_key_size, + total_value_size, + &path.join(format!("{seq:08}.sst")), + )?; + 
Ok((seq, file, meta)) + } - fn create_sst_file( - entries: &[LookupEntry], - total_key_size: usize, - total_value_size: usize, - path: &Path, - seq: u32, - ) -> Result<(u32, File, StaticSortedFileBuilderMeta<'static>)> - { - let _span = tracing::trace_span!("write merged sst file").entered(); - let (meta, file) = write_static_stored_file( - entries, - total_key_size, - total_value_size, - &path.join(format!("{seq:08}.sst")), - )?; - Ok((seq, file, meta)) - } + let mut new_sst_files = Vec::new(); - let mut new_sst_files = Vec::new(); - - // Iterate all SST files - let iters = indices - .iter() - .map(|&index| { - let meta_index = ssts_with_ranges[index].meta_index; - let index_in_meta = ssts_with_ranges[index].index_in_meta; - let meta = &meta_files[meta_index]; - meta.entry(index_in_meta) - .sst(meta)? - .iter(key_block_cache, value_block_cache) - }) - .collect::>>()?; - - let iter = MergeIter::new(iters.into_iter())?; - - // TODO figure out how to delete blobs when they are no longer - // referenced - let blob_seq_numbers_to_delete: Vec = Vec::new(); - - let mut keys_written = 0; - - let mut total_key_size = 0; - let mut total_value_size = 0; - let mut current: Option = None; - let mut entries = Vec::new(); - let mut last_entries = Vec::new(); - let mut last_entries_total_sizes = (0, 0); - for entry in iter { - let entry = entry?; - - // Remove duplicates - if let Some(current) = current.take() { - if current.key != entry.key { - let key_size = current.key.len(); - let value_size = current.value.size_in_sst(); - total_key_size += key_size; - total_value_size += value_size; - - if total_key_size + total_value_size - > DATA_THRESHOLD_PER_COMPACTED_FILE - || entries.len() >= MAX_ENTRIES_PER_COMPACTED_FILE - { - let ( + // Iterate all SST files + let iters = indices + .iter() + .map(|&index| { + let meta_index = ssts_with_ranges[index].meta_index; + let index_in_meta = ssts_with_ranges[index].index_in_meta; + let meta = &meta_files[meta_index]; + meta.entry(index_in_meta) + .sst(meta)? 
+ .iter(key_block_cache, value_block_cache) + }) + .collect::>>()?; + + let iter = MergeIter::new(iters.into_iter())?; + + // TODO figure out how to delete blobs when they are no longer + // referenced + let blob_seq_numbers_to_delete: Vec = Vec::new(); + + let mut keys_written = 0; + + let mut total_key_size = 0; + let mut total_value_size = 0; + let mut current: Option = None; + let mut entries = Vec::new(); + let mut last_entries = Vec::new(); + let mut last_entries_total_sizes = (0, 0); + for entry in iter { + let entry = entry?; + + // Remove duplicates + if let Some(current) = current.take() { + if current.key != entry.key { + let key_size = current.key.len(); + let value_size = current.value.size_in_sst(); + total_key_size += key_size; + total_value_size += value_size; + + if total_key_size + total_value_size + > DATA_THRESHOLD_PER_COMPACTED_FILE + || entries.len() >= MAX_ENTRIES_PER_COMPACTED_FILE + { + let (selected_total_key_size, selected_total_value_size) = + last_entries_total_sizes; + swap(&mut entries, &mut last_entries); + last_entries_total_sizes = ( + total_key_size - key_size, + total_value_size - value_size, + ); + total_key_size = key_size; + total_value_size = value_size; + + if !entries.is_empty() { + let seq = + sequence_number.fetch_add(1, Ordering::SeqCst) + 1; + + keys_written += entries.len() as u64; + new_sst_files.push(create_sst_file( + &entries, selected_total_key_size, selected_total_value_size, - ) = last_entries_total_sizes; - swap(&mut entries, &mut last_entries); - last_entries_total_sizes = ( - total_key_size - key_size, - total_value_size - value_size, - ); - total_key_size = key_size; - total_value_size = value_size; - - if !entries.is_empty() { - let seq = sequence_number - .fetch_add(1, Ordering::SeqCst) - + 1; - - keys_written += entries.len() as u64; - new_sst_files.push(create_sst_file( - &entries, - selected_total_key_size, - selected_total_value_size, - path, - seq, - )?); - - entries.clear(); - } - } + path, + seq, + )?); - entries.push(current); - } else { - // Override value + entries.clear(); + } } + + entries.push(current); + } else { + // Override value } - current = Some(entry); - } - if let Some(entry) = current { - total_key_size += entry.key.len(); - total_value_size += entry.value.size_in_sst(); - entries.push(entry); } + current = Some(entry); + } + if let Some(entry) = current { + total_key_size += entry.key.len(); + total_value_size += entry.value.size_in_sst(); + entries.push(entry); + } - // If we have one set of entries left, write them to a new SST file - if last_entries.is_empty() && !entries.is_empty() { - let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1; - - keys_written += entries.len() as u64; - new_sst_files.push(create_sst_file( - &entries, - total_key_size, - total_value_size, - path, - seq, - )?); - } else - // If we have two sets of entries left, merge them and - // split it into two SST files, to avoid having a - // single SST file that is very small. 
- if !last_entries.is_empty() { - last_entries.append(&mut entries); - - last_entries_total_sizes.0 += total_key_size; - last_entries_total_sizes.1 += total_value_size; - - let (part1, part2) = last_entries.split_at(last_entries.len() / 2); - - let seq1 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1; - let seq2 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1; - - keys_written += part1.len() as u64; - new_sst_files.push(create_sst_file( - part1, - // We don't know the exact sizes so we estimate them - last_entries_total_sizes.0 / 2, - last_entries_total_sizes.1 / 2, - path, - seq1, - )?); - - keys_written += part2.len() as u64; - new_sst_files.push(create_sst_file( - part2, - last_entries_total_sizes.0 / 2, - last_entries_total_sizes.1 / 2, - path, - seq2, - )?); - } - Ok(PartialMergeResult::Merged { - new_sst_files, - blob_seq_numbers_to_delete, - keys_written, - }) - }) - .with_context(|| { - format!("Failed to merge database files for family {family}") - })?; - - let Some((sst_files_len, blob_delete_len)) = merge_result - .iter() - .map(|r| { - if let PartialMergeResult::Merged { - new_sst_files, - blob_seq_numbers_to_delete, - keys_written: _, - } = r - { - (new_sst_files.len(), blob_seq_numbers_to_delete.len()) - } else { - (0, 0) - } + // If we have one set of entries left, write them to a new SST file + if last_entries.is_empty() && !entries.is_empty() { + let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1; + + keys_written += entries.len() as u64; + new_sst_files.push(create_sst_file( + &entries, + total_key_size, + total_value_size, + path, + seq, + )?); + } else + // If we have two sets of entries left, merge them and + // split it into two SST files, to avoid having a + // single SST file that is very small. + if !last_entries.is_empty() { + last_entries.append(&mut entries); + + last_entries_total_sizes.0 += total_key_size; + last_entries_total_sizes.1 += total_value_size; + + let (part1, part2) = last_entries.split_at(last_entries.len() / 2); + + let seq1 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1; + let seq2 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1; + + keys_written += part1.len() as u64; + new_sst_files.push(create_sst_file( + part1, + // We don't know the exact sizes so we estimate them + last_entries_total_sizes.0 / 2, + last_entries_total_sizes.1 / 2, + path, + seq1, + )?); + + keys_written += part2.len() as u64; + new_sst_files.push(create_sst_file( + part2, + last_entries_total_sizes.0 / 2, + last_entries_total_sizes.1 / 2, + path, + seq2, + )?); + } + Ok(PartialMergeResult::Merged { + new_sst_files, + blob_seq_numbers_to_delete, + keys_written, }) - .reduce(|(a1, a2), (b1, b2)| (a1 + b1, a2 + b2)) - else { - unreachable!() - }; - - let mut new_sst_files = Vec::with_capacity(sst_files_len); - let mut blob_seq_numbers_to_delete = Vec::with_capacity(blob_delete_len); - - let mut meta_file_builder = MetaFileBuilder::new(family); - - let mut keys_written = 0; - for result in merge_result { - match result { - PartialMergeResult::Merged { - new_sst_files: merged_new_sst_files, - blob_seq_numbers_to_delete: merged_blob_seq_numbers_to_delete, - keys_written: merged_keys_written, - } => { - for (seq, file, meta) in merged_new_sst_files { - meta_file_builder.add(seq, meta); - new_sst_files.push((seq, file)); - } - blob_seq_numbers_to_delete - .extend(merged_blob_seq_numbers_to_delete); - keys_written += merged_keys_written; - } - PartialMergeResult::Move { seq, meta } => { + }) + .collect::>>() + .with_context(|| { + format!("Failed to 
merge database files for family {family}") + })?; + + let Some((sst_files_len, blob_delete_len)) = merge_result + .iter() + .map(|r| { + if let PartialMergeResult::Merged { + new_sst_files, + blob_seq_numbers_to_delete, + keys_written: _, + } = r + { + (new_sst_files.len(), blob_seq_numbers_to_delete.len()) + } else { + (0, 0) + } + }) + .reduce(|(a1, a2), (b1, b2)| (a1 + b1, a2 + b2)) + else { + unreachable!() + }; + + let mut new_sst_files = Vec::with_capacity(sst_files_len); + let mut blob_seq_numbers_to_delete = Vec::with_capacity(blob_delete_len); + + let mut meta_file_builder = MetaFileBuilder::new(family); + + let mut keys_written = 0; + for result in merge_result { + match result { + PartialMergeResult::Merged { + new_sst_files: merged_new_sst_files, + blob_seq_numbers_to_delete: merged_blob_seq_numbers_to_delete, + keys_written: merged_keys_written, + } => { + for (seq, file, meta) in merged_new_sst_files { meta_file_builder.add(seq, meta); + new_sst_files.push((seq, file)); } + blob_seq_numbers_to_delete.extend(merged_blob_seq_numbers_to_delete); + keys_written += merged_keys_written; + } + PartialMergeResult::Move { seq, meta } => { + meta_file_builder.add(seq, meta); } } + } - for &seq in sst_seq_numbers_to_delete.iter() { - meta_file_builder.add_obsolete_sst_file(seq); - } + for &seq in sst_seq_numbers_to_delete.iter() { + meta_file_builder.add_obsolete_sst_file(seq); + } - let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1; - let meta_file = { - let _span = tracing::trace_span!("write meta file").entered(); - meta_file_builder.write(&self.path, seq)? - }; - - Ok(PartialResultPerFamily { - new_meta_file: Some((seq, meta_file)), - new_sst_files, - sst_seq_numbers_to_delete, - blob_seq_numbers_to_delete, - keys_written, - }) - }, - )?; + let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1; + let meta_file = { + let _span = tracing::trace_span!("write meta file").entered(); + meta_file_builder.write(&self.path, seq)? 
+ }; + + Ok(PartialResultPerFamily { + new_meta_file: Some((seq, meta_file)), + new_sst_files, + sst_seq_numbers_to_delete, + blob_seq_numbers_to_delete, + keys_written, + }) + }) + .collect::>>()?; for PartialResultPerFamily { new_meta_file: inner_new_meta_file, diff --git a/turbopack/crates/turbo-persistence/src/lib.rs b/turbopack/crates/turbo-persistence/src/lib.rs index f944e4b4d1202..70c87199f396c 100644 --- a/turbopack/crates/turbo-persistence/src/lib.rs +++ b/turbopack/crates/turbo-persistence/src/lib.rs @@ -13,21 +13,19 @@ mod db; mod key; mod lookup_entry; mod merge_iter; -mod meta_file; -mod meta_file_builder; -mod parallel_scheduler; -mod sst_filter; mod static_sorted_file; mod static_sorted_file_builder; -mod value_buf; mod write_batch; +mod meta_file; +mod meta_file_builder; +mod sst_filter; #[cfg(test)] mod tests; +mod value_buf; pub use arc_slice::ArcSlice; pub use db::{CompactConfig, MetaFileEntryInfo, MetaFileInfo, TurboPersistence}; pub use key::{KeyBase, QueryKey, StoreKey}; -pub use parallel_scheduler::{ParallelScheduler, SerialScheduler}; pub use value_buf::ValueBuffer; pub use write_batch::WriteBatch; diff --git a/turbopack/crates/turbo-persistence/src/parallel_scheduler.rs b/turbopack/crates/turbo-persistence/src/parallel_scheduler.rs deleted file mode 100644 index 52a9d626090fc..0000000000000 --- a/turbopack/crates/turbo-persistence/src/parallel_scheduler.rs +++ /dev/null @@ -1,137 +0,0 @@ -pub trait ParallelScheduler: Clone + Sync + Send { - fn parallel_for_each(&self, items: &[T], f: impl Fn(&T) + Send + Sync) - where - T: Sync; - - fn try_parallel_for_each<'l, T, E>( - &self, - items: &'l [T], - f: impl (Fn(&'l T) -> Result<(), E>) + Send + Sync, - ) -> Result<(), E> - where - T: Sync, - E: Send + 'static; - - fn try_parallel_for_each_mut<'l, T, E>( - &self, - items: &'l mut [T], - f: impl (Fn(&'l mut T) -> Result<(), E>) + Send + Sync, - ) -> Result<(), E> - where - T: Send + Sync, - E: Send + 'static; - - fn try_parallel_for_each_owned( - &self, - items: Vec, - f: impl (Fn(T) -> Result<(), E>) + Send + Sync, - ) -> Result<(), E> - where - T: Send + Sync, - E: Send + 'static; - - fn parallel_map_collect<'l, Item, PerItemResult, Result>( - &self, - items: &'l [Item], - f: impl Fn(&'l Item) -> PerItemResult + Send + Sync, - ) -> Result - where - Item: Sync, - PerItemResult: Send + Sync + 'l, - Result: FromIterator; - - fn parallel_map_collect_owned( - &self, - items: Vec, - f: impl Fn(Item) -> PerItemResult + Send + Sync, - ) -> Result - where - Item: Send + Sync, - PerItemResult: Send + Sync, - Result: FromIterator; -} - -#[derive(Clone, Copy, Default)] -pub struct SerialScheduler; - -impl ParallelScheduler for SerialScheduler { - fn parallel_for_each(&self, items: &[T], f: impl Fn(&T) + Send + Sync) - where - T: Sync, - { - for item in items { - f(item); - } - } - - fn try_parallel_for_each<'l, T, E>( - &self, - items: &'l [T], - f: impl (Fn(&'l T) -> Result<(), E>) + Send + Sync, - ) -> Result<(), E> - where - T: Sync, - E: Send, - { - for item in items { - f(item)?; - } - Ok(()) - } - - fn try_parallel_for_each_mut<'l, T, E>( - &self, - items: &'l mut [T], - f: impl (Fn(&'l mut T) -> Result<(), E>) + Send + Sync, - ) -> Result<(), E> - where - T: Sync, - E: Send, - { - for item in items { - f(item)?; - } - Ok(()) - } - - fn try_parallel_for_each_owned( - &self, - items: Vec, - f: impl (Fn(T) -> Result<(), E>) + Send + Sync, - ) -> Result<(), E> - where - T: Sync, - E: Send, - { - for item in items { - f(item)?; - } - Ok(()) - } - - fn parallel_map_collect<'l, 
Item, PerItemResult, Result>( - &self, - items: &'l [Item], - f: impl Fn(&'l Item) -> PerItemResult + Send + Sync, - ) -> Result - where - Item: Sync, - PerItemResult: Send + Sync + 'l, - Result: FromIterator, - { - items.iter().map(f).collect() - } - - fn parallel_map_collect_owned( - &self, - items: Vec, - f: impl Fn(Item) -> PerItemResult + Send + Sync, - ) -> Result - where - Item: Send + Sync, - PerItemResult: Send + Sync, - Result: FromIterator, - { - items.into_iter().map(f).collect() - } -} diff --git a/turbopack/crates/turbo-persistence/src/tests.rs b/turbopack/crates/turbo-persistence/src/tests.rs index 6e0b42b92fe78..5c123611d8759 100644 --- a/turbopack/crates/turbo-persistence/src/tests.rs +++ b/turbopack/crates/turbo-persistence/src/tests.rs @@ -6,116 +6,28 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator}; use crate::{ constants::MAX_MEDIUM_VALUE_SIZE, db::{CompactConfig, TurboPersistence}, - parallel_scheduler::ParallelScheduler, write_batch::WriteBatch, }; -#[derive(Clone, Copy)] -struct RayonParallelScheduler; - -impl ParallelScheduler for RayonParallelScheduler { - fn parallel_for_each(&self, items: &[T], f: impl Fn(&T) + Send + Sync) - where - T: Sync, - { - items.into_par_iter().for_each(f); - } - - fn try_parallel_for_each<'l, T, E>( - &self, - items: &'l [T], - f: impl (Fn(&'l T) -> Result<(), E>) + Send + Sync, - ) -> Result<(), E> - where - T: Sync, - E: Send, - { - items.into_par_iter().try_for_each(f) - } - - fn try_parallel_for_each_mut<'l, T, E>( - &self, - items: &'l mut [T], - f: impl (Fn(&'l mut T) -> Result<(), E>) + Send + Sync, - ) -> Result<(), E> - where - T: Send + Sync, - E: Send, - { - items.into_par_iter().try_for_each(f) - } - - fn try_parallel_for_each_owned( - &self, - items: Vec, - f: impl (Fn(T) -> Result<(), E>) + Send + Sync, - ) -> Result<(), E> - where - T: Send + Sync, - E: Send, - { - items.into_par_iter().try_for_each(f) - } - - fn parallel_map_collect<'l, Item, PerItemResult, Result>( - &self, - items: &'l [Item], - f: impl Fn(&'l Item) -> PerItemResult + Send + Sync, - ) -> Result - where - Item: Sync, - PerItemResult: Send + Sync, - Result: FromIterator, - { - items - .into_par_iter() - .map(f) - .collect_vec_list() - .into_iter() - .flatten() - .collect() - } - - fn parallel_map_collect_owned( - &self, - items: Vec, - f: impl Fn(Item) -> PerItemResult + Send + Sync, - ) -> Result - where - Item: Send + Sync, - PerItemResult: Send + Sync, - Result: FromIterator, - { - items - .into_par_iter() - .map(f) - .collect_vec_list() - .into_iter() - .flatten() - .collect() - } -} - #[test] fn full_cycle() -> Result<()> { let mut test_cases = Vec::new(); type TestCases = Vec<( &'static str, - Box, RayonParallelScheduler, 16>) -> Result<()>>, - Box) -> Result<()>>, + Box, 16>) -> Result<()>>, + Box Result<()>>, )>; fn test_case( test_cases: &mut TestCases, name: &'static str, - write: impl Fn(&mut WriteBatch, RayonParallelScheduler, 16>) -> Result<()> + 'static, - read: impl Fn(&TurboPersistence) -> Result<()> + 'static, + write: impl Fn(&mut WriteBatch, 16>) -> Result<()> + 'static, + read: impl Fn(&TurboPersistence) -> Result<()> + 'static, ) { test_cases.push(( name, - Box::new(write) - as Box, RayonParallelScheduler, 16>) -> Result<()>>, - Box::new(read) as Box) -> Result<()>>, + Box::new(write) as Box, 16>) -> Result<()>>, + Box::new(read) as Box Result<()>>, )); } @@ -303,10 +215,7 @@ fn full_cycle() -> Result<()> { { let start = Instant::now(); - let db = TurboPersistence::open_with_parallel_scheduler( - path.to_path_buf(), - 
RayonParallelScheduler, - )?; + let db = TurboPersistence::open(path.to_path_buf())?; let mut batch = db.write_batch()?; write(&mut batch)?; db.commit_write_batch(batch)?; @@ -322,10 +231,7 @@ fn full_cycle() -> Result<()> { } { let start = Instant::now(); - let db = TurboPersistence::open_with_parallel_scheduler( - path.to_path_buf(), - RayonParallelScheduler, - )?; + let db = TurboPersistence::open(path.to_path_buf())?; println!("{name} restore time: {:?}", start.elapsed()); let start = Instant::now(); read(&db)?; @@ -351,10 +257,7 @@ fn full_cycle() -> Result<()> { } { let start = Instant::now(); - let db = TurboPersistence::open_with_parallel_scheduler( - path.to_path_buf(), - RayonParallelScheduler, - )?; + let db = TurboPersistence::open(path.to_path_buf())?; println!("{name} restore time after compact: {:?}", start.elapsed()); let start = Instant::now(); read(&db)?; @@ -388,10 +291,7 @@ fn full_cycle() -> Result<()> { { let start = Instant::now(); - let db = TurboPersistence::open_with_parallel_scheduler( - path.to_path_buf(), - RayonParallelScheduler, - )?; + let db = TurboPersistence::open(path.to_path_buf())?; let mut batch = db.write_batch()?; for (_, write, _) in test_cases.iter() { write(&mut batch)?; @@ -411,10 +311,7 @@ fn full_cycle() -> Result<()> { } { let start = Instant::now(); - let db = TurboPersistence::open_with_parallel_scheduler( - path.to_path_buf(), - RayonParallelScheduler, - )?; + let db = TurboPersistence::open(path.to_path_buf())?; println!("All restore time: {:?}", start.elapsed()); for (name, _, read) in test_cases.iter() { let start = Instant::now(); @@ -446,10 +343,7 @@ fn full_cycle() -> Result<()> { { let start = Instant::now(); - let db = TurboPersistence::open_with_parallel_scheduler( - path.to_path_buf(), - RayonParallelScheduler, - )?; + let db = TurboPersistence::open(path.to_path_buf())?; println!("All restore time after compact: {:?}", start.elapsed()); for (name, _, read) in test_cases.iter() { @@ -489,17 +383,13 @@ fn persist_changes() -> Result<()> { let path = tempdir.path(); const READ_COUNT: u32 = 2_000; // we'll read every 10th value, so writes are 10x this value - fn put( - b: &WriteBatch<(u8, [u8; 4]), RayonParallelScheduler, 1>, - key: u8, - value: u8, - ) -> Result<()> { + fn put(b: &WriteBatch<(u8, [u8; 4]), 1>, key: u8, value: u8) -> Result<()> { for i in 0..(READ_COUNT * 10) { b.put(0, (key, i.to_be_bytes()), vec![value].into())?; } Ok(()) } - fn check(db: &TurboPersistence, key: u8, value: u8) -> Result<()> { + fn check(db: &TurboPersistence, key: u8, value: u8) -> Result<()> { for i in 0..READ_COUNT { // read every 10th item let i = i * 10; @@ -512,10 +402,7 @@ fn persist_changes() -> Result<()> { } { - let db = TurboPersistence::open_with_parallel_scheduler( - path.to_path_buf(), - RayonParallelScheduler, - )?; + let db = TurboPersistence::open(path.to_path_buf())?; let b = db.write_batch::<_, 1>()?; put(&b, 1, 11)?; put(&b, 2, 21)?; @@ -531,10 +418,7 @@ fn persist_changes() -> Result<()> { println!("---"); { - let db = TurboPersistence::open_with_parallel_scheduler( - path.to_path_buf(), - RayonParallelScheduler, - )?; + let db = TurboPersistence::open(path.to_path_buf())?; let b = db.write_batch::<_, 1>()?; put(&b, 1, 12)?; put(&b, 2, 22)?; @@ -548,10 +432,7 @@ fn persist_changes() -> Result<()> { } { - let db = TurboPersistence::open_with_parallel_scheduler( - path.to_path_buf(), - RayonParallelScheduler, - )?; + let db = TurboPersistence::open(path.to_path_buf())?; let b = db.write_batch::<_, 1>()?; put(&b, 1, 13)?; 
db.commit_write_batch(b)?; @@ -565,10 +446,7 @@ fn persist_changes() -> Result<()> { println!("---"); { - let db = TurboPersistence::open_with_parallel_scheduler( - path.to_path_buf(), - RayonParallelScheduler, - )?; + let db = TurboPersistence::open(path.to_path_buf())?; check(&db, 1, 13)?; check(&db, 2, 22)?; @@ -579,10 +457,7 @@ fn persist_changes() -> Result<()> { println!("---"); { - let db = TurboPersistence::open_with_parallel_scheduler( - path.to_path_buf(), - RayonParallelScheduler, - )?; + let db = TurboPersistence::open(path.to_path_buf())?; db.compact(&CompactConfig { optimal_merge_count: 4, @@ -600,10 +475,7 @@ fn persist_changes() -> Result<()> { println!("---"); { - let db = TurboPersistence::open_with_parallel_scheduler( - path.to_path_buf(), - RayonParallelScheduler, - )?; + let db = TurboPersistence::open(path.to_path_buf())?; check(&db, 1, 13)?; check(&db, 2, 22)?; @@ -621,17 +493,13 @@ fn partial_compaction() -> Result<()> { let path = tempdir.path(); const READ_COUNT: u32 = 2_000; // we'll read every 10th value, so writes are 10x this value - fn put( - b: &WriteBatch<(u8, [u8; 4]), RayonParallelScheduler, 1>, - key: u8, - value: u8, - ) -> Result<()> { + fn put(b: &WriteBatch<(u8, [u8; 4]), 1>, key: u8, value: u8) -> Result<()> { for i in 0..(READ_COUNT * 10) { b.put(0, (key, i.to_be_bytes()), vec![value].into())?; } Ok(()) } - fn check(db: &TurboPersistence, key: u8, value: u8) -> Result<()> { + fn check(db: &TurboPersistence, key: u8, value: u8) -> Result<()> { for i in 0..READ_COUNT { // read every 10th item let i = i * 10; @@ -648,10 +516,7 @@ fn partial_compaction() -> Result<()> { println!("--- Iteration {i} ---"); println!("Add more entries"); { - let db = TurboPersistence::open_with_parallel_scheduler( - path.to_path_buf(), - RayonParallelScheduler, - )?; + let db = TurboPersistence::open(path.to_path_buf())?; let b = db.write_batch::<_, 1>()?; put(&b, i, i)?; put(&b, i + 1, i)?; @@ -670,10 +535,7 @@ fn partial_compaction() -> Result<()> { println!("Compaction"); { - let db = TurboPersistence::open_with_parallel_scheduler( - path.to_path_buf(), - RayonParallelScheduler, - )?; + let db = TurboPersistence::open(path.to_path_buf())?; db.compact(&CompactConfig { optimal_merge_count: 4, @@ -694,10 +556,7 @@ fn partial_compaction() -> Result<()> { println!("Restore check"); { - let db = TurboPersistence::open_with_parallel_scheduler( - path.to_path_buf(), - RayonParallelScheduler, - )?; + let db = TurboPersistence::open(path.to_path_buf())?; for j in 0..i { check(&db, j, j)?; @@ -721,11 +580,7 @@ fn merge_file_removal() -> Result<()> { let _ = fs::remove_dir_all(path); const READ_COUNT: u32 = 2_000; // we'll read every 10th value, so writes are 10x this value - fn put( - b: &WriteBatch<(u8, [u8; 4]), RayonParallelScheduler, 1>, - key: u8, - value: u32, - ) -> Result<()> { + fn put(b: &WriteBatch<(u8, [u8; 4]), 1>, key: u8, value: u32) -> Result<()> { for i in 0..(READ_COUNT * 10) { b.put( 0, @@ -735,7 +590,7 @@ fn merge_file_removal() -> Result<()> { } Ok(()) } - fn check(db: &TurboPersistence, key: u8, value: u32) -> Result<()> { + fn check(db: &TurboPersistence, key: u8, value: u32) -> Result<()> { for i in 0..READ_COUNT { // read every 10th item let i = i * 10; @@ -753,10 +608,7 @@ fn merge_file_removal() -> Result<()> { { println!("--- Init ---"); - let db = TurboPersistence::open_with_parallel_scheduler( - path.to_path_buf(), - RayonParallelScheduler, - )?; + let db = TurboPersistence::open(path.to_path_buf())?; let b = db.write_batch::<_, 1>()?; for j in 
0..=255 { put(&b, j, 0)?; @@ -772,10 +624,7 @@ fn merge_file_removal() -> Result<()> { let i = i * 37; println!("Add more entries"); { - let db = TurboPersistence::open_with_parallel_scheduler( - path.to_path_buf(), - RayonParallelScheduler, - )?; + let db = TurboPersistence::open(path.to_path_buf())?; let b = db.write_batch::<_, 1>()?; for j in iter_bits(i) { println!("Put {j} = {i}"); @@ -793,10 +642,7 @@ fn merge_file_removal() -> Result<()> { println!("Compaction"); { - let db = TurboPersistence::open_with_parallel_scheduler( - path.to_path_buf(), - RayonParallelScheduler, - )?; + let db = TurboPersistence::open(path.to_path_buf())?; db.compact(&CompactConfig { optimal_merge_count: 4, @@ -814,10 +660,7 @@ fn merge_file_removal() -> Result<()> { println!("Restore check"); { - let db = TurboPersistence::open_with_parallel_scheduler( - path.to_path_buf(), - RayonParallelScheduler, - )?; + let db = TurboPersistence::open(path.to_path_buf())?; for j in 0..32 { check(&db, j, expected_values[j as usize])?; diff --git a/turbopack/crates/turbo-persistence/src/write_batch.rs b/turbopack/crates/turbo-persistence/src/write_batch.rs index 81a954d0ef18e..490cf38e88a90 100644 --- a/turbopack/crates/turbo-persistence/src/write_batch.rs +++ b/turbopack/crates/turbo-persistence/src/write_batch.rs @@ -9,11 +9,15 @@ use std::{ use anyhow::{Context, Result}; use byteorder::{BE, WriteBytesExt}; -use either::Either; use lzzzz::lz4::{self, ACC_LEVEL_DEFAULT}; use parking_lot::Mutex; +use rayon::{ + iter::{Either, IndexedParallelIterator, IntoParallelIterator, ParallelIterator}, + scope, +}; use smallvec::SmallVec; use thread_local::ThreadLocal; +use tracing::Span; use crate::{ ValueBuffer, @@ -22,7 +26,6 @@ use crate::{ constants::{MAX_MEDIUM_VALUE_SIZE, THREAD_LOCAL_SIZE_SHIFT}, key::StoreKey, meta_file_builder::MetaFileBuilder, - parallel_scheduler::ParallelScheduler, static_sorted_file_builder::{StaticSortedFileBuilderMeta, write_static_stored_file}, }; @@ -65,9 +68,7 @@ enum GlobalCollectorState { } /// A write batch. -pub struct WriteBatch { - /// Parallel scheduler - parallel_scheduler: S, +pub struct WriteBatch { /// The database path db_path: PathBuf, /// The current sequence number counter. Increased for every new SST file or blob file. @@ -83,16 +84,13 @@ pub struct WriteBatch>, } -impl - WriteBatch -{ +impl WriteBatch { /// Creates a new write batch for a database. - pub(crate) fn new(path: PathBuf, current: u32, parallel_scheduler: S) -> Self { + pub(crate) fn new(path: PathBuf, current: u32) -> Self { const { assert!(FAMILIES <= usize_from_u32(u32::MAX)); }; Self { - parallel_scheduler, db_path: path, current_sequence_number: AtomicU32::new(current), thread_locals: ThreadLocal::new(), @@ -225,12 +223,13 @@ impl } } - self.parallel_scheduler - .try_parallel_for_each_owned(collectors, |mut collector| { - self.flush_thread_local_collector(family, &mut collector)?; - drop(collector); - anyhow::Ok(()) - })?; + let span = Span::current(); + collectors.into_par_iter().try_for_each(|mut collector| { + let _span = span.clone().entered(); + self.flush_thread_local_collector(family, &mut collector)?; + drop(collector); + anyhow::Ok(()) + })?; // Now we flush the global collector(s). 
let mut collector_state = self.collectors[usize_from_u32(family)].lock(); @@ -243,22 +242,22 @@ impl } } GlobalCollectorState::Sharded(_) => { - let GlobalCollectorState::Sharded(mut shards) = replace( + let GlobalCollectorState::Sharded(shards) = replace( &mut *collector_state, GlobalCollectorState::Unsharded(Collector::new()), ) else { unreachable!(); }; - self.parallel_scheduler - .try_parallel_for_each_mut(&mut shards, |collector| { - if !collector.is_empty() { - let sst = self.create_sst_file(family, collector.sorted())?; - collector.clear(); - self.new_sst_files.lock().push(sst); - collector.drop_contents(); - } - anyhow::Ok(()) - })?; + shards.into_par_iter().try_for_each(|mut collector| { + let _span = span.clone().entered(); + if !collector.is_empty() { + let sst = self.create_sst_file(family, collector.sorted())?; + collector.clear(); + self.new_sst_files.lock().push(sst); + drop(collector); + } + anyhow::Ok(()) + })?; } } @@ -270,9 +269,10 @@ impl #[tracing::instrument(level = "trace", skip(self))] pub(crate) fn finish(&mut self) -> Result { let mut new_blob_files = Vec::new(); + let shared_error = Mutex::new(Ok(())); // First, we flush all thread local collectors to the global collectors. - { + scope(|scope| { let _span = tracing::trace_span!("flush thread local collectors").entered(); let mut collectors = [const { Vec::new() }; FAMILIES]; for cell in self.thread_locals.iter_mut() { @@ -286,24 +286,23 @@ impl } } } - let to_flush = collectors - .into_iter() - .enumerate() - .flat_map(|(family, collector)| { - collector - .into_iter() - .map(move |collector| (family as u32, collector)) - }) - .collect::>(); - self.parallel_scheduler.try_parallel_for_each_owned( - to_flush, - |(family, mut collector)| { - self.flush_thread_local_collector(family, &mut collector)?; - drop(collector); - anyhow::Ok(()) - }, - )?; - } + for (family, thread_local_collectors) in collectors.into_iter().enumerate() { + for mut collector in thread_local_collectors { + let this = &self; + let shared_error = &shared_error; + let span = Span::current(); + scope.spawn(move |_| { + let _span = span.entered(); + if let Err(err) = + this.flush_thread_local_collector(family as u32, &mut collector) + { + *shared_error.lock() = Err(err); + } + drop(collector); + }); + } + } + }); let _span = tracing::trace_span!("flush collectors").entered(); @@ -314,24 +313,25 @@ impl let new_collectors = [(); FAMILIES].map(|_| Mutex::new(GlobalCollectorState::Unsharded(Collector::new()))); let collectors = replace(&mut self.collectors, new_collectors); - let collectors = collectors - .into_iter() + let span = Span::current(); + collectors + .into_par_iter() .enumerate() .flat_map(|(family, state)| { let collector = state.into_inner(); match collector { GlobalCollectorState::Unsharded(collector) => { - Either::Left([(family, collector)].into_iter()) - } - GlobalCollectorState::Sharded(shards) => { - Either::Right(shards.into_iter().map(move |collector| (family, collector))) + Either::Left([(family, collector)].into_par_iter()) } + GlobalCollectorState::Sharded(shards) => Either::Right( + shards + .into_par_iter() + .map(move |collector| (family, collector)), + ), } }) - .collect::>(); - self.parallel_scheduler.try_parallel_for_each_owned( - collectors, - |(family, mut collector)| { + .try_for_each(|(family, mut collector)| { + let _span = span.clone().entered(); let family = family as u32; if !collector.is_empty() { let sst = self.create_sst_file(family, collector.sorted())?; @@ -340,37 +340,33 @@ impl 
shared_new_sst_files.lock().push(sst); } anyhow::Ok(()) - }, - )?; + })?; + + shared_error.into_inner()?; // Not we need to write the new meta files. let new_meta_collectors = [(); FAMILIES].map(|_| Mutex::new(Vec::new())); let meta_collectors = replace(&mut self.meta_collectors, new_meta_collectors); let keys_written = AtomicU64::new(0); - let file_to_write = meta_collectors - .into_iter() + let new_meta_files = meta_collectors + .into_par_iter() .map(|mutex| mutex.into_inner()) .enumerate() .filter(|(_, sst_files)| !sst_files.is_empty()) - .collect::>(); - let new_meta_files = self - .parallel_scheduler - .parallel_map_collect_owned::<_, _, Result>>( - file_to_write, - |(family, sst_files)| { - let family = family as u32; - let mut entries = 0; - let mut builder = MetaFileBuilder::new(family); - for (seq, sst) in sst_files { - entries += sst.entries; - builder.add(seq, sst); - } - keys_written.fetch_add(entries, Ordering::Relaxed); - let seq = self.current_sequence_number.fetch_add(1, Ordering::SeqCst) + 1; - let file = builder.write(&self.db_path, seq)?; - Ok((seq, file)) - }, - )?; + .map(|(family, sst_files)| { + let family = family as u32; + let mut entries = 0; + let mut builder = MetaFileBuilder::new(family); + for (seq, sst) in sst_files { + entries += sst.entries; + builder.add(seq, sst); + } + keys_written.fetch_add(entries, Ordering::Relaxed); + let seq = self.current_sequence_number.fetch_add(1, Ordering::SeqCst) + 1; + let file = builder.write(&self.db_path, seq)?; + Ok((seq, file)) + }) + .collect::>>()?; // Finally we return the new files and sequence number. let seq = self.current_sequence_number.load(Ordering::SeqCst); diff --git a/turbopack/crates/turbo-tasks-backend/Cargo.toml b/turbopack/crates/turbo-tasks-backend/Cargo.toml index 3554ce1f31b20..eb3ee57b72093 100644 --- a/turbopack/crates/turbo-tasks-backend/Cargo.toml +++ b/turbopack/crates/turbo-tasks-backend/Cargo.toml @@ -40,6 +40,7 @@ once_cell = { workspace = true } parking_lot = { workspace = true } pot = "3.0.0" rand = { workspace = true } +rayon = { workspace = true } ringmap = { workspace = true, features = ["serde"] } rustc-hash = { workspace = true } serde = { workspace = true } diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index fbba6b1ab13d4..0bf9249ecb622 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -1267,6 +1267,7 @@ impl TurboTasksBackendInner { return task_id; } + self.track_cache_miss(&task_type); let tx = self .should_restore() .then(|| self.backing_storage.start_read_transaction()) @@ -1278,7 +1279,6 @@ impl TurboTasksBackendInner { .forward_lookup_task_cache(tx.as_ref(), &task_type) .expect("Failed to lookup task id") } { - self.track_cache_hit(&task_type); let _ = self.task_cache.try_insert(Arc::new(task_type), task_id); task_id } else { @@ -1287,14 +1287,12 @@ impl TurboTasksBackendInner { let task_id = if let Err(existing_task_id) = self.task_cache.try_insert(task_type.clone(), task_id) { - self.track_cache_hit(&task_type); // Safety: We just created the id and failed to insert it. 
unsafe { self.persisted_task_id_factory.reuse(task_id); } existing_task_id } else { - self.track_cache_miss(&task_type); task_id }; if let Some(log) = &self.persisted_task_cache_log { @@ -1329,10 +1327,10 @@ impl TurboTasksBackendInner { return task_id; } + self.track_cache_miss(&task_type); let task_type = Arc::new(task_type); let task_id = self.transient_task_id_factory.get(); - if let Err(existing_task_id) = self.task_cache.try_insert(task_type.clone(), task_id) { - self.track_cache_hit(&task_type); + if let Err(existing_task_id) = self.task_cache.try_insert(task_type, task_id) { // Safety: We just created the id and failed to insert it. unsafe { self.transient_task_id_factory.reuse(task_id); @@ -1341,7 +1339,6 @@ impl TurboTasksBackendInner { return existing_task_id; } - self.track_cache_miss(&task_type); self.connect_child(parent_task, task_id, turbo_tasks); task_id diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs index ba83f1da5a209..ceab626298854 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs @@ -6,8 +6,9 @@ use std::{ }; use bitfield::bitfield; +use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator}; use smallvec::SmallVec; -use turbo_tasks::{FxDashMap, TaskId, parallel}; +use turbo_tasks::{FxDashMap, TaskId}; use crate::{ backend::dynamic_storage::DynamicStorage, @@ -663,43 +664,48 @@ impl Storage { // The number of shards is much larger than the number of threads, so the effect of the // locks held is negligible. - parallel::map_collect::<_, _, Vec<_>>(self.modified.shards(), |shard| { - let mut direct_snapshots: Vec<(TaskId, Box)> = Vec::new(); - let mut modified: SmallVec<[TaskId; 4]> = SmallVec::new(); - { - // Take the snapshots from the modified map - let guard = shard.write(); - // Safety: guard must outlive the iterator. - for bucket in unsafe { guard.iter() } { - // Safety: the guard guarantees that the bucket is not removed and the ptr - // is valid. - let (key, shared_value) = unsafe { bucket.as_mut() }; - let modified_state = shared_value.get_mut(); - match modified_state { - ModifiedState::Modified => { - modified.push(*key); - } - ModifiedState::Snapshot(snapshot) => { - if let Some(snapshot) = snapshot.take() { - direct_snapshots.push((*key, snapshot)); + self.modified + .shards() + .par_iter() + .with_max_len(1) + .map(|shard| { + let mut direct_snapshots: Vec<(TaskId, Box)> = Vec::new(); + let mut modified: SmallVec<[TaskId; 4]> = SmallVec::new(); + { + // Take the snapshots from the modified map + let guard = shard.write(); + // Safety: guard must outlive the iterator. + for bucket in unsafe { guard.iter() } { + // Safety: the guard guarantees that the bucket is not removed and the ptr + // is valid. + let (key, shared_value) = unsafe { bucket.as_mut() }; + let modified_state = shared_value.get_mut(); + match modified_state { + ModifiedState::Modified => { + modified.push(*key); + } + ModifiedState::Snapshot(snapshot) => { + if let Some(snapshot) = snapshot.take() { + direct_snapshots.push((*key, snapshot)); + } } } } + // Safety: guard must outlive the iterator. + drop(guard); } - // Safety: guard must outlive the iterator. 
- drop(guard); - } - SnapshotShard { - direct_snapshots, - modified, - storage: self, - guard: Some(guard.clone()), - process, - preprocess, - process_snapshot, - } - }) + SnapshotShard { + direct_snapshots, + modified, + storage: self, + guard: Some(guard.clone()), + process, + preprocess, + process_snapshot, + } + }) + .collect::>() } /// Start snapshot mode. diff --git a/turbopack/crates/turbo-tasks-backend/src/database/turbo/mod.rs b/turbopack/crates/turbo-tasks-backend/src/database/turbo.rs similarity index 89% rename from turbopack/crates/turbo-tasks-backend/src/database/turbo/mod.rs rename to turbopack/crates/turbo-tasks-backend/src/database/turbo.rs index f2934b3c857ad..be6c5ee684a83 100644 --- a/turbopack/crates/turbo-tasks-backend/src/database/turbo/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/database/turbo.rs @@ -1,21 +1,21 @@ -use std::{cmp::max, path::PathBuf, sync::Arc, thread::available_parallelism}; +use std::{ + cmp::max, + path::PathBuf, + sync::Arc, + thread::{JoinHandle, available_parallelism, spawn}, +}; use anyhow::{Ok, Result}; use parking_lot::Mutex; -use tokio::{runtime::Handle, task::block_in_place}; use turbo_persistence::{ ArcSlice, CompactConfig, KeyBase, StoreKey, TurboPersistence, ValueBuffer, }; -use turbo_tasks::{JoinHandle, spawn}; use crate::database::{ key_value_database::{KeySpace, KeyValueDatabase}, - turbo::parallel_scheduler::TurboTasksParallelScheduler, write_batch::{BaseWriteBatch, ConcurrentWriteBatch, WriteBatch, WriteBuffer}, }; -mod parallel_scheduler; - const MB: u64 = 1024 * 1024; const COMPACT_CONFIG: CompactConfig = CompactConfig { min_merge_count: 3, @@ -28,7 +28,7 @@ const COMPACT_CONFIG: CompactConfig = CompactConfig { }; pub struct TurboKeyValueDatabase { - db: Arc>, + db: Arc, compact_join_handle: Mutex>>>, is_ci: bool, is_short_session: bool, @@ -84,7 +84,7 @@ impl KeyValueDatabase for TurboKeyValueDatabase { ) -> Result, Self::ConcurrentWriteBatch<'_>>> { // Wait for the compaction to finish if let Some(join_handle) = self.compact_join_handle.lock().take() { - join(join_handle)?; + join_handle.join().unwrap()?; } // Start a new write batch Ok(WriteBatch::concurrent(TurboWriteBatch { @@ -100,7 +100,7 @@ impl KeyValueDatabase for TurboKeyValueDatabase { fn shutdown(&self) -> Result<()> { // Wait for the compaction to finish if let Some(join_handle) = self.compact_join_handle.lock().take() { - join(join_handle)?; + join_handle.join().unwrap()?; } // Compact the database on shutdown self.db.compact(&CompactConfig { @@ -118,8 +118,8 @@ impl KeyValueDatabase for TurboKeyValueDatabase { } pub struct TurboWriteBatch<'a> { - batch: turbo_persistence::WriteBatch, TurboTasksParallelScheduler, 5>, - db: &'a Arc>, + batch: turbo_persistence::WriteBatch, 5>, + db: &'a Arc, compact_join_handle: Option<&'a Mutex>>>>, } @@ -144,7 +144,7 @@ impl<'a> BaseWriteBatch<'a> for TurboWriteBatch<'a> { if let Some(compact_join_handle) = self.compact_join_handle { // Start a new compaction in the background let db = self.db.clone(); - let handle = spawn(async move { + let handle = spawn(move || { db.compact(&CompactConfig { max_merge_segment_count: available_parallelism() .map_or(4, |c| max(4, c.get() / 2)), @@ -220,7 +220,3 @@ impl<'l> From> for ValueBuffer<'l> { } } } - -fn join(handle: JoinHandle>) -> Result<()> { - block_in_place(|| Handle::current().block_on(handle)) -} diff --git a/turbopack/crates/turbo-tasks-backend/src/database/turbo/parallel_scheduler.rs b/turbopack/crates/turbo-tasks-backend/src/database/turbo/parallel_scheduler.rs deleted 
file mode 100644 index dd78022c3640b..0000000000000 --- a/turbopack/crates/turbo-tasks-backend/src/database/turbo/parallel_scheduler.rs +++ /dev/null @@ -1,76 +0,0 @@ -use turbo_persistence::ParallelScheduler; -use turbo_tasks::parallel; - -#[derive(Clone, Copy, Default)] -pub struct TurboTasksParallelScheduler; - -impl ParallelScheduler for TurboTasksParallelScheduler { - fn parallel_for_each(&self, items: &[T], f: impl Fn(&T) + Send + Sync) - where - T: Sync, - { - parallel::for_each(items, f); - } - - fn try_parallel_for_each<'l, T, E>( - &self, - items: &'l [T], - f: impl (Fn(&'l T) -> Result<(), E>) + Send + Sync, - ) -> Result<(), E> - where - T: Sync, - E: Send + 'static, - { - parallel::try_for_each(items, f) - } - - fn try_parallel_for_each_mut<'l, T, E>( - &self, - items: &'l mut [T], - f: impl (Fn(&'l mut T) -> Result<(), E>) + Send + Sync, - ) -> Result<(), E> - where - T: Send + Sync, - E: Send + 'static, - { - parallel::try_for_each_mut(items, f) - } - - fn try_parallel_for_each_owned( - &self, - items: Vec, - f: impl (Fn(T) -> Result<(), E>) + Send + Sync, - ) -> Result<(), E> - where - T: Send + Sync, - E: Send + 'static, - { - parallel::try_for_each_owned(items, f) - } - - fn parallel_map_collect<'l, T, I, R>( - &self, - items: &'l [T], - f: impl Fn(&'l T) -> I + Send + Sync, - ) -> R - where - T: Sync, - I: Send + Sync + 'l, - R: FromIterator, - { - parallel::map_collect(items, f) - } - - fn parallel_map_collect_owned( - &self, - items: Vec, - f: impl Fn(T) -> I + Send + Sync, - ) -> R - where - T: Send + Sync, - I: Send + Sync, - R: FromIterator, - { - parallel::map_collect_owned(items, f) - } -} diff --git a/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs b/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs index 3127a15ab7066..c4b84310d651f 100644 --- a/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs @@ -1,18 +1,21 @@ use std::{ borrow::Borrow, + cmp::max, env, path::PathBuf, sync::{Arc, LazyLock, Mutex, PoisonError, Weak}, }; use anyhow::{Context, Result, anyhow}; +use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; use serde::{Deserialize, Serialize}; use smallvec::SmallVec; +use tracing::Span; use turbo_tasks::{ SessionId, TaskId, backend::CachedTaskType, panic_hooks::{PanicHookGuard, register_panic_hook}, - parallel, + turbo_tasks_scope, }; use crate::{ @@ -328,15 +331,14 @@ impl BackingStorageSealed let _span = tracing::trace_span!("update task data").entered(); process_task_data(snapshots, Some(batch))?; let span = tracing::trace_span!("flush task data").entered(); - parallel::try_for_each( - &[KeySpace::TaskMeta, KeySpace::TaskData], - |&key_space| { + [KeySpace::TaskMeta, KeySpace::TaskData] + .into_par_iter() + .try_for_each(|key_space| { let _span = span.clone().entered(); // Safety: We already finished all processing of the task data and task // meta unsafe { batch.flush(key_space) } - }, - )?; + })?; } let mut next_task_id = get_next_free_task_id::< @@ -350,9 +352,10 @@ impl BackingStorageSealed items = task_cache_updates.iter().map(|m| m.len()).sum::() ) .entered(); - let result = parallel::map_collect_owned::<_, _, Result>>( - task_cache_updates, - |updates| { + let result = task_cache_updates + .into_par_iter() + .with_max_len(1) + .map(|updates| { let _span = _span.clone().entered(); let mut max_task_id = 0; @@ -387,11 +390,15 @@ impl BackingStorageSealed } Ok(max_task_id) - }, - )? 
- .into_iter() - .max() - .unwrap_or(0); + }) + .reduce( + || Ok(0), + |a, b| -> anyhow::Result<_> { + let a_max = a?; + let b_max = b?; + Ok(max(a_max, b_max)) + }, + )?; next_task_id = next_task_id.max(result); } @@ -403,11 +410,64 @@ impl BackingStorageSealed )?; } WriteBatch::Serial(batch) => { + let mut task_items_result = Ok(Vec::new()); + turbo_tasks::scope(|s| { + s.spawn(|_| { + task_items_result = + process_task_data(snapshots, None::<&T::ConcurrentWriteBatch<'_>>); + }); + + let mut next_task_id = + get_next_free_task_id::< + T::SerialWriteBatch<'_>, + T::ConcurrentWriteBatch<'_>, + >(&mut WriteBatchRef::serial(batch))?; + + { + let _span = tracing::trace_span!( + "update task cache", + items = task_cache_updates.iter().map(|m| m.len()).sum::() + ) + .entered(); + let mut task_type_bytes = Vec::new(); + for (task_type, task_id) in task_cache_updates.into_iter().flatten() { + let task_id = *task_id; + serialize_task_type(&task_type, &mut task_type_bytes, task_id)?; + + batch + .put( + KeySpace::ForwardTaskCache, + WriteBuffer::Borrowed(&task_type_bytes), + WriteBuffer::Borrowed(&task_id.to_le_bytes()), + ) + .with_context(|| { + anyhow!("Unable to write task cache {task_type:?} => {task_id}") + })?; + batch + .put( + KeySpace::ReverseTaskCache, + WriteBuffer::Borrowed(IntKey::new(task_id).as_ref()), + WriteBuffer::Borrowed(&task_type_bytes), + ) + .with_context(|| { + anyhow!("Unable to write task cache {task_id} => {task_type:?}") + })?; + next_task_id = next_task_id.max(task_id + 1); + } + } + + save_infra::, T::ConcurrentWriteBatch<'_>>( + &mut WriteBatchRef::serial(batch), + next_task_id, + session_id, + operations, + )?; + anyhow::Ok(()) + })?; + { let _span = tracing::trace_span!("update tasks").entered(); - let task_items = - process_task_data(snapshots, None::<&T::ConcurrentWriteBatch<'_>>)?; - for (task_id, meta, data) in task_items.into_iter().flatten() { + for (task_id, meta, data) in task_items_result?.into_iter().flatten() { let key = IntKey::new(*task_id); let key = key.as_ref(); if let Some(meta) = meta { @@ -425,54 +485,7 @@ impl BackingStorageSealed })?; } } - batch.flush(KeySpace::TaskMeta)?; - batch.flush(KeySpace::TaskData)?; - } - - let mut next_task_id = get_next_free_task_id::< - T::SerialWriteBatch<'_>, - T::ConcurrentWriteBatch<'_>, - >(&mut WriteBatchRef::serial(batch))?; - - { - let _span = tracing::trace_span!( - "update task cache", - items = task_cache_updates.iter().map(|m| m.len()).sum::() - ) - .entered(); - let mut task_type_bytes = Vec::new(); - for (task_type, task_id) in task_cache_updates.into_iter().flatten() { - let task_id = *task_id; - serialize_task_type(&task_type, &mut task_type_bytes, task_id)?; - - batch - .put( - KeySpace::ForwardTaskCache, - WriteBuffer::Borrowed(&task_type_bytes), - WriteBuffer::Borrowed(&task_id.to_le_bytes()), - ) - .with_context(|| { - anyhow!("Unable to write task cache {task_type:?} => {task_id}") - })?; - batch - .put( - KeySpace::ReverseTaskCache, - WriteBuffer::Borrowed(IntKey::new(task_id).as_ref()), - WriteBuffer::Borrowed(&task_type_bytes), - ) - .with_context(|| { - anyhow!("Unable to write task cache {task_id} => {task_type:?}") - })?; - next_task_id = next_task_id.max(task_id + 1); - } } - - save_infra::, T::ConcurrentWriteBatch<'_>>( - &mut WriteBatchRef::serial(batch), - next_task_id, - session_id, - operations, - )?; } } @@ -690,38 +703,48 @@ where > + Send + Sync, { - parallel::map_collect_owned::<_, _, Result>>(tasks, |tasks| { - let mut result = Vec::new(); - for (task_id, meta, data) in tasks 
{ - if let Some(batch) = batch { - let key = IntKey::new(*task_id); - let key = key.as_ref(); - if let Some(meta) = meta { - batch.put( - KeySpace::TaskMeta, - WriteBuffer::Borrowed(key), - WriteBuffer::SmallVec(meta), - )?; - } - if let Some(data) = data { - batch.put( - KeySpace::TaskData, - WriteBuffer::Borrowed(key), - WriteBuffer::SmallVec(data), - )?; + let span = Span::current(); + let turbo_tasks = turbo_tasks::turbo_tasks(); + let handle = tokio::runtime::Handle::current(); + tasks + .into_par_iter() + .map(|tasks| { + let _span = span.clone().entered(); + let _guard = handle.clone().enter(); + turbo_tasks_scope(turbo_tasks.clone(), || { + let mut result = Vec::new(); + for (task_id, meta, data) in tasks { + if let Some(batch) = batch { + let key = IntKey::new(*task_id); + let key = key.as_ref(); + if let Some(meta) = meta { + batch.put( + KeySpace::TaskMeta, + WriteBuffer::Borrowed(key), + WriteBuffer::SmallVec(meta), + )?; + } + if let Some(data) = data { + batch.put( + KeySpace::TaskData, + WriteBuffer::Borrowed(key), + WriteBuffer::SmallVec(data), + )?; + } + } else { + // Store the new task data + result.push(( + task_id, + meta.map(WriteBuffer::SmallVec), + data.map(WriteBuffer::SmallVec), + )); + } } - } else { - // Store the new task data - result.push(( - task_id, - meta.map(WriteBuffer::SmallVec), - data.map(WriteBuffer::SmallVec), - )); - } - } - Ok(result) - }) + Ok(result) + }) + }) + .collect::>>() } fn serialize(task: TaskId, data: &Vec) -> Result> { diff --git a/turbopack/crates/turbo-tasks-backend/tests/all_in_one.rs b/turbopack/crates/turbo-tasks-backend/tests/all_in_one.rs index add31c32ecd35..f9321cfd797fb 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/all_in_one.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/all_in_one.rs @@ -9,7 +9,7 @@ use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn all_in_one() { run(®ISTRATION, || async { let a: Vc = Vc::cell(4242); diff --git a/turbopack/crates/turbo-tasks-backend/tests/basic.rs b/turbopack/crates/turbo-tasks-backend/tests/basic.rs index a22cb96ade456..a12da0b8578d8 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/basic.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/basic.rs @@ -8,7 +8,7 @@ use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn basic() { run(®ISTRATION, || async { let output1 = func_without_args(); diff --git a/turbopack/crates/turbo-tasks-backend/tests/bug.rs b/turbopack/crates/turbo-tasks-backend/tests/bug.rs index 5d225bdb8c48e..f7e8097a1b7aa 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/bug.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/bug.rs @@ -24,7 +24,7 @@ struct TaskSpec { #[turbo_tasks::value(transparent)] struct TasksSpec(Vec); -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn graph_bug() { // see https://github.com/vercel/next.js/pull/79451 run(®ISTRATION, || async { diff --git a/turbopack/crates/turbo-tasks-backend/tests/bug2.rs b/turbopack/crates/turbo-tasks-backend/tests/bug2.rs index a1495eeeca91b..df3115b8aa3da 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/bug2.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/bug2.rs @@ -33,7 +33,7 @@ pub struct TaskSpec { #[turbo_tasks::value(transparent)] struct Iteration(State); -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn 
graph_bug() { run(®ISTRATION, move || async move { let spec = vec![ diff --git a/turbopack/crates/turbo-tasks-backend/tests/call_types.rs b/turbopack/crates/turbo-tasks-backend/tests/call_types.rs index f06430ada2bd0..17875d2630d78 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/call_types.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/call_types.rs @@ -8,7 +8,7 @@ use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn functions() { run(®ISTRATION, || async { assert_eq!(*fn_plain().await?, 42); @@ -53,7 +53,7 @@ async fn async_fn_vc_arg(n: Vc) -> Result> { Ok(Vc::cell(*n.await?)) } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn methods() { run(®ISTRATION, || async { assert_eq!(*Value::static_method().await?, 42); @@ -106,7 +106,7 @@ impl Value { } } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn trait_methods() { run(®ISTRATION, || async { assert_eq!(*Value::static_trait_method().await?, 42); diff --git a/turbopack/crates/turbo-tasks-backend/tests/collectibles.rs b/turbopack/crates/turbo-tasks-backend/tests/collectibles.rs index 945845a86e3a2..a86c0e09343d0 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/collectibles.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/collectibles.rs @@ -14,7 +14,7 @@ use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn transitive_emitting() { run(®ISTRATION, || async { let result_op = my_transitive_emitting_function(rcstr!(""), rcstr!("")); @@ -32,7 +32,7 @@ async fn transitive_emitting() { .unwrap() } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn transitive_emitting_indirect() { run(®ISTRATION, || async { let result_op = my_transitive_emitting_function(rcstr!(""), rcstr!("")); @@ -50,7 +50,7 @@ async fn transitive_emitting_indirect() { .unwrap() } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn multi_emitting() { run(®ISTRATION, || async { let result_op = my_multi_emitting_function(); @@ -68,7 +68,7 @@ async fn multi_emitting() { .unwrap() } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn taking_collectibles() { run(®ISTRATION, || async { let result_op = my_collecting_function(); @@ -84,7 +84,7 @@ async fn taking_collectibles() { .unwrap() } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn taking_collectibles_extra_layer() { run(®ISTRATION, || async { let result_op = my_collecting_function_indirect(); @@ -100,7 +100,7 @@ async fn taking_collectibles_extra_layer() { .unwrap() } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn taking_collectibles_parallel() { run(®ISTRATION, || async { let result_op = my_transitive_emitting_function(rcstr!(""), rcstr!("a")); @@ -142,7 +142,7 @@ async fn taking_collectibles_parallel() { .unwrap() } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn taking_collectibles_with_resolve() { run(®ISTRATION, || async { let result_op = my_transitive_emitting_function_with_resolve(rcstr!("resolve")); diff --git a/turbopack/crates/turbo-tasks-backend/tests/debug.rs b/turbopack/crates/turbo-tasks-backend/tests/debug.rs index ccc833eeb85d8..854d57b234395 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/debug.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/debug.rs @@ -9,7 +9,7 @@ use turbo_tasks_testing::{Registration, register, run}; 
static REGISTRATION: Registration = register!(); -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn primitive_debug() { run(®ISTRATION, || async { let a: Vc = Vc::cell(42); @@ -20,7 +20,7 @@ async fn primitive_debug() { .unwrap() } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn transparent_debug() { run(®ISTRATION, || async { let a: Vc = Transparent(42).cell(); @@ -32,7 +32,7 @@ async fn transparent_debug() { .unwrap() } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn enum_none_debug() { run(®ISTRATION, || async { let a: Vc = Enum::None.cell(); @@ -44,7 +44,7 @@ async fn enum_none_debug() { .unwrap() } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn enum_transparent_debug() { run(®ISTRATION, || async { let a: Vc = Enum::Transparent(Transparent(42).resolved_cell()).cell(); @@ -60,7 +60,7 @@ async fn enum_transparent_debug() { .unwrap() } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn enum_inner_vc_debug() { run(®ISTRATION, || async { let a: Vc = Enum::Enum(Enum::None.resolved_cell()).cell(); @@ -76,7 +76,7 @@ async fn enum_inner_vc_debug() { .unwrap() } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn struct_unit_debug() { run(®ISTRATION, || async { let a: Vc = StructUnit.cell(); @@ -87,7 +87,7 @@ async fn struct_unit_debug() { .unwrap() } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn struct_transparent_debug() { run(®ISTRATION, || async { let a: Vc = StructWithTransparent { @@ -106,7 +106,7 @@ async fn struct_transparent_debug() { .unwrap() } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn struct_vec_debug() { run(®ISTRATION, || async { let a: Vc = StructWithVec { vec: vec![] }.cell(); @@ -135,7 +135,7 @@ async fn struct_vec_debug() { .unwrap() } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn struct_ignore_debug() { run(®ISTRATION, || async { let a: Vc = StructWithIgnore { diff --git a/turbopack/crates/turbo-tasks-backend/tests/detached.rs b/turbopack/crates/turbo-tasks-backend/tests/detached.rs index b1c80929fad6a..c76c23590f8ab 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/detached.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/detached.rs @@ -15,7 +15,7 @@ use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn test_spawns_detached() -> anyhow::Result<()> { run(®ISTRATION, || async { // HACK: The watch channel we use has an incorrect implementation of `TraceRawVcs`, just @@ -82,7 +82,7 @@ async fn spawns_detached( Vc::cell(()) } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn test_spawns_detached_changing() -> anyhow::Result<()> { run(®ISTRATION, || async { // HACK: The watch channel we use has an incorrect implementation of `TraceRawVcs` diff --git a/turbopack/crates/turbo-tasks-backend/tests/dirty_in_progress.rs b/turbopack/crates/turbo-tasks-backend/tests/dirty_in_progress.rs index 89aa8998fae80..8171cead7dd40 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/dirty_in_progress.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/dirty_in_progress.rs @@ -11,7 +11,7 @@ use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn dirty_in_progress() { run(®ISTRATION, || async { let cases = [ diff --git a/turbopack/crates/turbo-tasks-backend/tests/emptied_cells.rs 
b/turbopack/crates/turbo-tasks-backend/tests/emptied_cells.rs index 87c2d6672e468..4a3ddce3bfa73 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/emptied_cells.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/emptied_cells.rs @@ -8,7 +8,7 @@ use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn recompute() { run(®ISTRATION, || async { let input = ChangingInput { diff --git a/turbopack/crates/turbo-tasks-backend/tests/filter_unused_args.rs b/turbopack/crates/turbo-tasks-backend/tests/filter_unused_args.rs index 3193382110215..b7081174940c6 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/filter_unused_args.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/filter_unused_args.rs @@ -8,7 +8,7 @@ use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn filtered_trait_method_args() -> Result<()> { run(®ISTRATION, || async { let uses_arg = UsesArg.cell(); diff --git a/turbopack/crates/turbo-tasks-backend/tests/immutable.rs b/turbopack/crates/turbo-tasks-backend/tests/immutable.rs index 0c716c7544744..d90a4cb2f78de 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/immutable.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/immutable.rs @@ -8,7 +8,7 @@ use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn hidden_mutate() { run(®ISTRATION, || async { let input = create_input().resolve().await?; diff --git a/turbopack/crates/turbo-tasks-backend/tests/local_tasks.rs b/turbopack/crates/turbo-tasks-backend/tests/local_tasks.rs index f66363d374635..e2a6a7abdfa74 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/local_tasks.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/local_tasks.rs @@ -8,7 +8,7 @@ use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn test_local_task_id() -> Result<()> { run(®ISTRATION, || async { let local_vc = get_local_task_id(); diff --git a/turbopack/crates/turbo-tasks-backend/tests/operation_vc.rs b/turbopack/crates/turbo-tasks-backend/tests/operation_vc.rs index 457971d0667c7..8000ddc8b26e3 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/operation_vc.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/operation_vc.rs @@ -26,7 +26,7 @@ fn use_operations() -> Vc { forty_two.connect() } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn test_use_operations() -> Result<()> { run(®ISTRATION, || async { assert_eq!(*use_operations().await?, 42); diff --git a/turbopack/crates/turbo-tasks-backend/tests/panics.rs b/turbopack/crates/turbo-tasks-backend/tests/panics.rs index 8b9458ab4f046..d321e825f1430 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/panics.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/panics.rs @@ -25,7 +25,7 @@ static FILE_PATH_REGEX: LazyLock = // // This test depends on the process-wide global panic handler. This test must be run in its own // process in isolation of any other tests. 
-#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn test_panic_hook() { let prev_hook = take_hook(); set_hook(Box::new(move |info| { diff --git a/turbopack/crates/turbo-tasks-backend/tests/performance.rs b/turbopack/crates/turbo-tasks-backend/tests/performance.rs index 13b76582af633..904843fad2a63 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/performance.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/performance.rs @@ -142,7 +142,7 @@ fn check_skip() -> bool { false } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn many_calls_to_many_children() { if check_skip() { return; @@ -157,7 +157,7 @@ async fn many_calls_to_many_children() { .unwrap(); } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn many_calls_to_uncached_many_children() { if check_skip() { return; @@ -189,7 +189,7 @@ fn run_big_graph_test(counts: Vec) -> impl Future> + Se ) } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn many_calls_to_big_graph_1() { if check_skip() { return; @@ -199,7 +199,7 @@ async fn many_calls_to_big_graph_1() { .unwrap(); } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn many_calls_to_big_graph_2() { if check_skip() { return; @@ -211,7 +211,7 @@ async fn many_calls_to_big_graph_2() { .unwrap(); } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn many_calls_to_big_graph_3() { if check_skip() { return; @@ -221,7 +221,7 @@ async fn many_calls_to_big_graph_3() { .unwrap(); } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn many_calls_to_big_graph_4() { if check_skip() { return; @@ -231,7 +231,7 @@ async fn many_calls_to_big_graph_4() { .unwrap(); } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn many_calls_to_big_graph_5() { if check_skip() { return; @@ -243,7 +243,7 @@ async fn many_calls_to_big_graph_5() { .unwrap(); } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn many_calls_to_big_graph_6() { if check_skip() { return; @@ -255,7 +255,7 @@ async fn many_calls_to_big_graph_6() { .unwrap(); } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn many_calls_to_big_graph_7() { if check_skip() { return; @@ -270,7 +270,7 @@ async fn many_calls_to_big_graph_7() { .unwrap(); } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn many_calls_to_big_graph_8() { if check_skip() { return; @@ -282,7 +282,7 @@ async fn many_calls_to_big_graph_8() { .unwrap(); } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn many_calls_to_big_graph_9() { if check_skip() { return; diff --git a/turbopack/crates/turbo-tasks-backend/tests/random_change.rs b/turbopack/crates/turbo-tasks-backend/tests/random_change.rs index 089490ab1c79c..841c4564af444 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/random_change.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/random_change.rs @@ -9,7 +9,7 @@ use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn random_change() { run(®ISTRATION, || async { let state = make_state(); diff --git a/turbopack/crates/turbo-tasks-backend/tests/read_ref_cell.rs b/turbopack/crates/turbo-tasks-backend/tests/read_ref_cell.rs index 66c51c9e4f1ad..d7ccf3b37b6cf 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/read_ref_cell.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/read_ref_cell.rs @@ -10,7 +10,7 @@ use turbo_tasks_testing::{Registration, register, run}; static 
REGISTRATION: Registration = register!(); -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn read_ref() { run(®ISTRATION, || async { let counter = Counter::cell(Counter { diff --git a/turbopack/crates/turbo-tasks-backend/tests/recompute.rs b/turbopack/crates/turbo-tasks-backend/tests/recompute.rs index dcad783b06e08..17a69e9c151d3 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/recompute.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/recompute.rs @@ -8,7 +8,7 @@ use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn recompute() { run(®ISTRATION, || async { let input = ChangingInput { @@ -58,7 +58,7 @@ async fn recompute() { .unwrap() } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn immutable_analysis() { run(®ISTRATION, || async { let input = ChangingInput { diff --git a/turbopack/crates/turbo-tasks-backend/tests/recompute_collectibles.rs b/turbopack/crates/turbo-tasks-backend/tests/recompute_collectibles.rs index d7c0be301ac70..54074af628add 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/recompute_collectibles.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/recompute_collectibles.rs @@ -9,7 +9,7 @@ use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn recompute() { run(®ISTRATION, || async { let input = ChangingInput::new(1).resolve().await?; diff --git a/turbopack/crates/turbo-tasks-backend/tests/resolved_vc.rs b/turbopack/crates/turbo-tasks-backend/tests/resolved_vc.rs index a0b9914b7f8bb..da3a69ca62dce 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/resolved_vc.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/resolved_vc.rs @@ -23,7 +23,7 @@ fn assert_resolved(input: ResolvedVc) { assert!(input_vc.is_resolved()); } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn test_conversion() -> Result<()> { run(®ISTRATION, || async { let unresolved: Vc = Vc::cell(42); @@ -38,7 +38,7 @@ async fn test_conversion() -> Result<()> { .await } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn test_cell_construction() -> Result<()> { run(®ISTRATION, || async { let a: ResolvedVc = ResolvedVc::cell(42); @@ -50,7 +50,7 @@ async fn test_cell_construction() -> Result<()> { .await } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn test_resolved_vc_as_arg() -> Result<()> { run(®ISTRATION, || async { let unresolved: Vc = returns_int(42); @@ -62,7 +62,7 @@ async fn test_resolved_vc_as_arg() -> Result<()> { .await } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn test_into_future() -> Result<()> { run(®ISTRATION, || async { let mut resolved = ResolvedVc::cell(42); @@ -78,7 +78,7 @@ async fn test_into_future() -> Result<()> { .await } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn test_sidecast() -> Result<()> { run(®ISTRATION, || async { let concrete_value = ImplementsAAndB.resolved_cell(); diff --git a/turbopack/crates/turbo-tasks-backend/tests/shrink_to_fit.rs b/turbopack/crates/turbo-tasks-backend/tests/shrink_to_fit.rs index dc82e82174de5..524a78950acf2 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/shrink_to_fit.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/shrink_to_fit.rs @@ -11,7 +11,7 @@ static REGISTRATION: Registration = register!(); #[turbo_tasks::value(transparent)] struct Wrapper(Vec); 
-#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn test_shrink_to_fit() -> Result<()> { run(®ISTRATION, || async { // `Vec::shrink_to_fit` is implicitly called when a cell is constructed. diff --git a/turbopack/crates/turbo-tasks-backend/tests/task_statistics.rs b/turbopack/crates/turbo-tasks-backend/tests/task_statistics.rs index 869c944bcb5c7..8a391ace095aa 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/task_statistics.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/task_statistics.rs @@ -13,7 +13,7 @@ use turbo_tasks_testing::{Registration, register, run_without_cache_check}; static REGISTRATION: Registration = register!(); -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn test_simple_task() -> Result<()> { run_without_cache_check(®ISTRATION, async move { enable_stats(); @@ -39,7 +39,7 @@ async fn test_simple_task() -> Result<()> { .await } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn test_await_same_vc_multiple_times() -> Result<()> { run_without_cache_check(®ISTRATION, async move { enable_stats(); @@ -61,7 +61,7 @@ async fn test_await_same_vc_multiple_times() -> Result<()> { .await } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn test_vc_receiving_task() -> Result<()> { run_without_cache_check(®ISTRATION, async move { enable_stats(); @@ -93,7 +93,7 @@ async fn test_vc_receiving_task() -> Result<()> { .await } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn test_trait_methods() -> Result<()> { run_without_cache_check(®ISTRATION, async move { enable_stats(); @@ -130,7 +130,7 @@ async fn test_trait_methods() -> Result<()> { .await } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn test_dyn_trait_methods() -> Result<()> { run_without_cache_check(®ISTRATION, async move { enable_stats(); @@ -174,7 +174,7 @@ async fn test_dyn_trait_methods() -> Result<()> { } // creates Vcs, but doesn't ever execute them -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn test_no_execution() -> Result<()> { run_without_cache_check(®ISTRATION, async move { enable_stats(); diff --git a/turbopack/crates/turbo-tasks-backend/tests/trace_transient.rs b/turbopack/crates/turbo-tasks-backend/tests/trace_transient.rs index f553a83a52c5b..74c21fcaebb65 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/trace_transient.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/trace_transient.rs @@ -18,7 +18,7 @@ Adder::add_method (read cell of type turbo-tasks@TODO::::primitives::u64) unknown transient task (read cell of type turbo-tasks@TODO::::primitives::u16) unknown transient task (read cell of type turbo-tasks@TODO::::primitives::u32)"; -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn test_trace_transient() { let result = run_without_cache_check(®ISTRATION, async { read_incorrect_task_input_operation(IncorrectTaskInput( diff --git a/turbopack/crates/turbo-tasks-backend/tests/trait_ref_cell.rs b/turbopack/crates/turbo-tasks-backend/tests/trait_ref_cell.rs index 2372947303360..c556e8d422489 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/trait_ref_cell.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/trait_ref_cell.rs @@ -10,7 +10,7 @@ use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn trait_ref() { run(®ISTRATION, || async { let counter = Counter::cell(Counter { diff --git a/turbopack/crates/turbo-tasks-backend/tests/trait_ref_cell_mode.rs 
b/turbopack/crates/turbo-tasks-backend/tests/trait_ref_cell_mode.rs index 3b8d1cb15c02a..15917f62563bf 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/trait_ref_cell_mode.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/trait_ref_cell_mode.rs @@ -9,7 +9,7 @@ static REGISTRATION: Registration = register!(); // Test that with `cell = "shared"`, the cell will be re-used as long as the // value is equal. -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn test_trait_ref_shared_cell_mode() { run(®ISTRATION, || async { let input = CellIdSelector { @@ -44,7 +44,7 @@ async fn test_trait_ref_shared_cell_mode() { // Test that with `cell = "new"`, the cell will is never re-used, even if the // value is equal. -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn test_trait_ref_new_cell_mode() { run(®ISTRATION, || async { let input = CellIdSelector { diff --git a/turbopack/crates/turbo-tasks-backend/tests/transient_collectible.rs b/turbopack/crates/turbo-tasks-backend/tests/transient_collectible.rs index b144319ed4763..216e8a285dbf8 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/transient_collectible.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/transient_collectible.rs @@ -10,7 +10,7 @@ static REGISTRATION: Registration = register!(); const EXPECTED_MSG: &str = "Collectible is transient, transient collectibles cannot be emitted from persistent tasks"; -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn test_transient_emit_from_persistent() { let result = run_without_cache_check(®ISTRATION, async { emit_incorrect_task_input_operation(IncorrectTaskInput(U32Wrapper(123).resolved_cell())) diff --git a/turbopack/crates/turbo-tasks-backend/tests/transient_vc.rs b/turbopack/crates/turbo-tasks-backend/tests/transient_vc.rs index 100008c755c5c..7db072310c915 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/transient_vc.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/transient_vc.rs @@ -7,7 +7,7 @@ use turbo_tasks_testing::{Registration, register, run_without_cache_check}; static REGISTRATION: Registration = register!(); -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn test_transient_vc() -> Result<()> { run_without_cache_check(®ISTRATION, async { test_transient_operation(TransientValue::new(123)) diff --git a/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs b/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs index b44f2a3a00522..a325eefa0f445 100644 --- a/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs +++ b/turbopack/crates/turbo-tasks-fetch/tests/fetch.rs @@ -18,7 +18,7 @@ static REGISTRATION: Registration = register!(turbo_tasks_fetch::register); /// acquire and hold this lock to prevent potential flakiness. static GLOBAL_TEST_LOCK: TokioMutex<()> = TokioMutex::const_new(()); -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn basic_get() { let _guard = GLOBAL_TEST_LOCK.lock().await; run(®ISTRATION, || async { @@ -49,7 +49,7 @@ async fn basic_get() { .unwrap() } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn sends_user_agent() { let _guard = GLOBAL_TEST_LOCK.lock().await; run(®ISTRATION, || async { @@ -85,7 +85,7 @@ async fn sends_user_agent() { // This is temporary behavior. // TODO: Implement invalidation that respects Cache-Control headers. 
-#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn invalidation_does_not_invalidate() { let _guard = GLOBAL_TEST_LOCK.lock().await; run(®ISTRATION, || async { @@ -130,7 +130,7 @@ fn get_issue_context() -> Vc { DiskFileSystem::new(rcstr!("root"), rcstr!("/")).root() } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn errors_on_failed_connection() { let _guard = GLOBAL_TEST_LOCK.lock().await; run(®ISTRATION, || async { @@ -161,7 +161,7 @@ async fn errors_on_failed_connection() { .unwrap() } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn errors_on_404() { let _guard = GLOBAL_TEST_LOCK.lock().await; run(®ISTRATION, || async { @@ -196,7 +196,7 @@ async fn errors_on_404() { .unwrap() } -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn client_cache() { // a simple fetch that should always succeed async fn simple_fetch(path: &str, client: FetchClient) -> anyhow::Result<()> { diff --git a/turbopack/crates/turbo-tasks-fs/src/lib.rs b/turbopack/crates/turbo-tasks-fs/src/lib.rs index 2ae2dc3036f0c..9a17774a49c37 100644 --- a/turbopack/crates/turbo-tasks-fs/src/lib.rs +++ b/turbopack/crates/turbo-tasks-fs/src/lib.rs @@ -46,6 +46,7 @@ use dunce::simplified; use indexmap::IndexSet; use jsonc_parser::{ParseOptions, parse_to_serde_value}; use mime::Mime; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; use rustc_hash::FxHashSet; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -55,7 +56,7 @@ use turbo_rcstr::{RcStr, rcstr}; use turbo_tasks::{ ApplyEffectsContext, Completion, InvalidationReason, Invalidator, NonLocalValue, ReadRef, ResolvedVc, TaskInput, ValueToString, Vc, debug::ValueDebugFormat, effect, - mark_session_dependent, mark_stateful, parallel, trace::TraceRawVcs, + mark_session_dependent, mark_stateful, trace::TraceRawVcs, }; use turbo_tasks_hash::{DeterministicHash, DeterministicHasher, hash_xxh3_hash64}; use turbo_unix_path::{ @@ -308,14 +309,19 @@ impl DiskFileSystemInner { fn invalidate(&self) { let _span = tracing::info_span!("invalidate filesystem", name = &*self.root).entered(); + let span = tracing::Span::current(); + let handle = tokio::runtime::Handle::current(); let invalidator_map = take(&mut *self.invalidator_map.lock().unwrap()); let dir_invalidator_map = take(&mut *self.dir_invalidator_map.lock().unwrap()); - let invalidators = invalidator_map - .into_iter() - .chain(dir_invalidator_map) - .flat_map(|(_, invalidators)| invalidators.into_keys()) - .collect::>(); - parallel::for_each_owned(invalidators, |invalidator| invalidator.invalidate()); + let iter = invalidator_map + .into_par_iter() + .chain(dir_invalidator_map.into_par_iter()) + .flat_map(|(_, invalidators)| invalidators.into_par_iter()); + iter.for_each(|(i, _)| { + let _span = span.clone().entered(); + let _guard = handle.enter(); + i.invalidate() + }); } /// Invalidates every tracked file in the filesystem. 
@@ -326,19 +332,23 @@ impl DiskFileSystemInner { reason: impl Fn(&Path) -> R + Sync, ) { let _span = tracing::info_span!("invalidate filesystem", name = &*self.root).entered(); + let span = tracing::Span::current(); + let handle = tokio::runtime::Handle::current(); let invalidator_map = take(&mut *self.invalidator_map.lock().unwrap()); let dir_invalidator_map = take(&mut *self.dir_invalidator_map.lock().unwrap()); - let invalidators = invalidator_map - .into_iter() - .chain(dir_invalidator_map) + let iter = invalidator_map + .into_par_iter() + .chain(dir_invalidator_map.into_par_iter()) .flat_map(|(path, invalidators)| { + let _span = span.clone().entered(); let reason_for_path = reason(&path); invalidators - .into_keys() + .into_par_iter() .map(move |i| (reason_for_path.clone(), i)) - }) - .collect::>(); - parallel::for_each_owned(invalidators, |(reason, invalidator)| { + }); + iter.for_each(|(reason, (invalidator, _))| { + let _span = span.clone().entered(); + let _guard = handle.enter(); invalidator.invalidate_with_reason(reason) }); } diff --git a/turbopack/crates/turbo-tasks-fs/src/watcher.rs b/turbopack/crates/turbo-tasks-fs/src/watcher.rs index a9f58e0f2d6ca..e39a7a022d700 100644 --- a/turbopack/crates/turbo-tasks-fs/src/watcher.rs +++ b/turbopack/crates/turbo-tasks-fs/src/watcher.rs @@ -16,12 +16,13 @@ use notify::{ Config, EventKind, PollWatcher, RecommendedWatcher, RecursiveMode, Watcher, event::{MetadataKind, ModifyKind, RenameMode}, }; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; use rustc_hash::FxHashSet; use serde::{Deserialize, Serialize}; use tracing::instrument; use turbo_rcstr::RcStr; use turbo_tasks::{ - FxIndexSet, InvalidationReason, InvalidationReasonKind, Invalidator, parallel, spawn_thread, + FxIndexSet, InvalidationReason, InvalidationReasonKind, Invalidator, spawn_thread, util::StaticOrArc, }; @@ -396,30 +397,40 @@ impl DiskWatcher { // // Best is to start_watching before starting to read { - let _span = tracing::info_span!("invalidate filesystem").entered(); + let span = tracing::info_span!("invalidate filesystem"); + let _span = span.clone().entered(); let invalidator_map = take(&mut *fs_inner.invalidator_map.lock().unwrap()); let dir_invalidator_map = take(&mut *fs_inner.dir_invalidator_map.lock().unwrap()); - let iter = invalidator_map.into_iter().chain(dir_invalidator_map); + let iter = invalidator_map + .into_par_iter() + .chain(dir_invalidator_map.into_par_iter()); + let handle = tokio::runtime::Handle::current(); if report_invalidation_reason { - let invalidators = iter - .flat_map(|(path, invalidators)| { - let reason = WatchStart { - name: fs_inner.name.clone(), - // this path is just used for display purposes - path: RcStr::from(path.to_string_lossy()), - }; - invalidators.into_iter().map(move |i| (reason.clone(), i)) - }) - .collect::>(); - parallel::for_each_owned(invalidators, |(reason, (invalidator, _))| { - invalidator.invalidate_with_reason(reason); + iter.flat_map(|(path, invalidators)| { + let _span = span.clone().entered(); + let reason = WatchStart { + name: fs_inner.name.clone(), + // this path is just used for display purposes + path: RcStr::from(path.to_string_lossy()), + }; + invalidators + .into_par_iter() + .map(move |i| (reason.clone(), i)) + }) + .for_each(|(reason, (invalidator, _))| { + let _span = span.clone().entered(); + let _guard = handle.enter(); + invalidator.invalidate_with_reason(reason) }); } else { - let invalidators = iter - .flat_map(|(_, invalidators)| invalidators.into_keys()) - .collect::>(); - 
parallel::for_each_owned(invalidators, |invalidator| { - invalidator.invalidate(); + iter.flat_map(|(_, invalidators)| { + let _span = span.clone().entered(); + invalidators.into_par_iter().map(move |i| i) + }) + .for_each(|(invalidator, _)| { + let _span = span.clone().entered(); + let _guard = handle.enter(); + invalidator.invalidate() }); } } diff --git a/turbopack/crates/turbo-tasks/src/lib.rs b/turbopack/crates/turbo-tasks/src/lib.rs index 841e4b1f8c455..fc68a0cea6e45 100644 --- a/turbopack/crates/turbo-tasks/src/lib.rs +++ b/turbopack/crates/turbo-tasks/src/lib.rs @@ -37,8 +37,6 @@ #![feature(never_type)] #![feature(downcast_unchecked)] #![feature(ptr_metadata)] -#![feature(sync_unsafe_cell)] -#![feature(vec_into_raw_parts)] pub mod backend; mod capture_future; @@ -66,14 +64,13 @@ mod no_move_vec; mod once_map; mod output; pub mod panic_hooks; -pub mod parallel; pub mod persisted_graph; pub mod primitives; mod raw_vc; mod read_options; mod read_ref; pub mod registry; -pub mod scope; +mod scope; mod serialization_invalidation; pub mod small_duration; mod spawn; @@ -118,6 +115,7 @@ pub use raw_vc::{CellId, RawVc, ReadRawVcFuture, ResolveTypeError}; pub use read_options::ReadCellOptions; pub use read_ref::ReadRef; use rustc_hash::FxHasher; +pub use scope::scope; pub use serialization_invalidation::SerializationInvalidator; pub use shrink_to_fit::ShrinkToFit; pub use spawn::{JoinHandle, spawn, spawn_blocking, spawn_thread}; diff --git a/turbopack/crates/turbo-tasks/src/manager.rs b/turbopack/crates/turbo-tasks/src/manager.rs index 393c5224da390..b7f2cfc96b4c6 100644 --- a/turbopack/crates/turbo-tasks/src/manager.rs +++ b/turbopack/crates/turbo-tasks/src/manager.rs @@ -1058,30 +1058,27 @@ impl TurboTasks { } pub async fn stop_and_wait(&self) { - turbo_tasks_future_scope(self.pin(), async move { - self.backend.stopping(self); - self.stopped.store(true, Ordering::Release); - { - let listener = self - .event - .listen_with_note(|| || "wait for stop".to_string()); - if self.currently_scheduled_tasks.load(Ordering::Acquire) != 0 { - listener.await; - } + self.backend.stopping(self); + self.stopped.store(true, Ordering::Release); + { + let listener = self + .event + .listen_with_note(|| || "wait for stop".to_string()); + if self.currently_scheduled_tasks.load(Ordering::Acquire) != 0 { + listener.await; } + } + { + let listener = self.event_background.listen(); + if self + .currently_scheduled_background_jobs + .load(Ordering::Acquire) + != 0 { - let listener = self.event_background.listen(); - if self - .currently_scheduled_background_jobs - .load(Ordering::Acquire) - != 0 - { - listener.await; - } + listener.await; } - self.backend.stop(self); - }) - .await; + } + self.backend.stop(self); } #[track_caller] @@ -1678,10 +1675,6 @@ pub fn turbo_tasks() -> Arc { TURBO_TASKS.with(|arc| arc.clone()) } -pub fn try_turbo_tasks() -> Option> { - TURBO_TASKS.try_with(|arc| arc.clone()).ok() -} - pub fn with_turbo_tasks(func: impl FnOnce(&Arc) -> T) -> T { TURBO_TASKS.with(|arc| func(arc)) } diff --git a/turbopack/crates/turbo-tasks/src/parallel.rs b/turbopack/crates/turbo-tasks/src/parallel.rs deleted file mode 100644 index e20b67bf67892..0000000000000 --- a/turbopack/crates/turbo-tasks/src/parallel.rs +++ /dev/null @@ -1,308 +0,0 @@ -//! Parallel for each and map using tokio tasks. -//! -//! This avoid the problem of sleeping threads with mimalloc when using rayon in combination with -//! tokio. It also avoid having multiple thread pools. -//! 
see also https://pwy.io/posts/mimalloc-cigarette/ - -use std::{sync::LazyLock, thread::available_parallelism}; - -use crate::{scope::scope_and_block, util::into_chunks}; - -/// Calculates a good chunk size for parallel processing based on the number of available threads. -/// This is used to ensure that the workload is evenly distributed across the threads. -fn good_chunk_size(len: usize) -> usize { - static GOOD_CHUNK_COUNT: LazyLock = - LazyLock::new(|| available_parallelism().map_or(16, |c| c.get() * 4)); - let min_chunk_count = *GOOD_CHUNK_COUNT; - len.div_ceil(min_chunk_count) -} - -pub fn for_each<'l, T, F>(items: &'l [T], f: F) -where - T: Sync, - F: Fn(&'l T) + Send + Sync, -{ - let len = items.len(); - if len <= 1 { - for item in items { - f(item); - } - return; - } - let chunk_size = good_chunk_size(len); - let f = &f; - let _results = scope_and_block(len.div_ceil(chunk_size), |scope| { - for chunk in items.chunks(chunk_size) { - scope.spawn(async move { - for item in chunk { - f(item); - } - }) - } - }); -} - -pub fn for_each_owned(items: Vec, f: impl Fn(T) + Send + Sync) -where - T: Send + Sync, -{ - let len = items.len(); - if len <= 1 { - for item in items { - f(item); - } - return; - } - let chunk_size = good_chunk_size(len); - let f = &f; - let _results = scope_and_block(len.div_ceil(chunk_size), |scope| { - for chunk in into_chunks(items, chunk_size) { - scope.spawn(async move { - // SAFETY: Even when f() panics we drop all items in the chunk. - for item in chunk { - f(item); - } - }) - } - }); -} - -pub fn try_for_each<'l, T, E>( - items: &'l [T], - f: impl (Fn(&'l T) -> Result<(), E>) + Send + Sync, -) -> Result<(), E> -where - T: Sync, - E: Send + 'static, -{ - let len = items.len(); - if len <= 1 { - for item in items { - f(item)?; - } - return Ok(()); - } - let chunk_size = good_chunk_size(len); - let f = &f; - scope_and_block(len.div_ceil(chunk_size), |scope| { - for chunk in items.chunks(chunk_size) { - scope.spawn(async move { - for item in chunk { - f(item)?; - } - Ok(()) - }) - } - }) - .collect::>() -} - -pub fn try_for_each_mut<'l, T, E>( - items: &'l mut [T], - f: impl (Fn(&'l mut T) -> Result<(), E>) + Send + Sync, -) -> Result<(), E> -where - T: Send + Sync, - E: Send + 'static, -{ - let len = items.len(); - if len <= 1 { - for item in items { - f(item)?; - } - return Ok(()); - } - let chunk_size = good_chunk_size(len); - let f = &f; - scope_and_block(len.div_ceil(chunk_size), |scope| { - for chunk in items.chunks_mut(chunk_size) { - scope.spawn(async move { - for item in chunk { - f(item)?; - } - Ok(()) - }) - } - }) - .collect::>() -} - -pub fn try_for_each_owned( - items: Vec, - f: impl (Fn(T) -> Result<(), E>) + Send + Sync, -) -> Result<(), E> -where - T: Send + Sync, - E: Send + 'static, -{ - let len = items.len(); - if len <= 1 { - for item in items { - f(item)?; - } - return Ok(()); - } - let chunk_size = good_chunk_size(len); - let f = &f; - scope_and_block(len.div_ceil(chunk_size), |scope| { - for chunk in into_chunks(items, chunk_size) { - scope.spawn(async move { - for item in chunk { - f(item)?; - } - Ok(()) - }) - } - }) - .collect::>() -} - -pub fn map_collect<'l, Item, PerItemResult, Result>( - items: &'l [Item], - f: impl Fn(&'l Item) -> PerItemResult + Send + Sync, -) -> Result -where - Item: Sync, - PerItemResult: Send + Sync + 'l, - Result: FromIterator, -{ - let len = items.len(); - if len == 0 { - return Result::from_iter(std::iter::empty()); // No items to process, return empty - // collection - } - let chunk_size = 
good_chunk_size(len); - let f = &f; - scope_and_block(len.div_ceil(chunk_size), |scope| { - for chunk in items.chunks(chunk_size) { - scope.spawn(async move { chunk.iter().map(f).collect::>() }) - } - }) - .flatten() - .collect() -} - -pub fn map_collect_owned<'l, Item, PerItemResult, Result>( - items: Vec, - f: impl Fn(Item) -> PerItemResult + Send + Sync, -) -> Result -where - Item: Send + Sync, - PerItemResult: Send + Sync + 'l, - Result: FromIterator, -{ - let len = items.len(); - if len == 0 { - return Result::from_iter(std::iter::empty()); // No items to process, return empty - // collection; - } - let chunk_size = good_chunk_size(len); - let f = &f; - scope_and_block(len.div_ceil(chunk_size), |scope| { - for chunk in into_chunks(items, chunk_size) { - scope.spawn(async move { chunk.map(f).collect::>() }) - } - }) - .flatten() - .collect() -} - -#[cfg(test)] -mod tests { - use std::{ - panic::{AssertUnwindSafe, catch_unwind}, - sync::atomic::{AtomicI32, Ordering}, - }; - - use super::*; - - #[tokio::test(flavor = "multi_thread")] - async fn test_parallel_for_each() { - let input = vec![1, 2, 3, 4, 5]; - let sum = AtomicI32::new(0); - for_each(&input, |&x| { - sum.fetch_add(x, Ordering::SeqCst); - }); - assert_eq!(sum.load(Ordering::SeqCst), 15); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_parallel_try_for_each() { - let input = vec![1, 2, 3, 4, 5]; - let result = try_for_each(&input, |&x| { - if x % 2 == 0 { - Ok(()) - } else { - Err(format!("Odd number {x} encountered")) - } - }); - assert!(result.is_err()); - assert_eq!(result.unwrap_err(), "Odd number 1 encountered"); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_parallel_try_for_each_mut() { - let mut input = vec![1, 2, 3, 4, 5]; - let result = try_for_each_mut(&mut input, |x| { - *x += 10; - if *x % 2 == 0 { - Ok(()) - } else { - Err(format!("Odd number {} encountered", *x)) - } - }); - assert!(result.is_err()); - assert_eq!(result.unwrap_err(), "Odd number 11 encountered"); - assert_eq!(input, vec![11, 12, 13, 14, 15]); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_parallel_for_each_owned() { - let input = vec![1, 2, 3, 4, 5]; - let sum = AtomicI32::new(0); - for_each_owned(input, |x| { - sum.fetch_add(x, Ordering::SeqCst); - }); - assert_eq!(sum.load(Ordering::SeqCst), 15); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_parallel_map_collect() { - let input = vec![1, 2, 3, 4, 5]; - let result: Vec<_> = map_collect(&input, |&x| x * 2); - assert_eq!(result, vec![2, 4, 6, 8, 10]); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_parallel_map_collect_owned() { - let input = vec![1, 2, 3, 4, 5]; - let result: Vec<_> = map_collect_owned(input, |x| x * 2); - assert_eq!(result, vec![2, 4, 6, 8, 10]); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_parallel_map_collect_owned_many() { - let input = vec![1; 1000]; - let result: Vec<_> = map_collect_owned(input, |x| x * 2); - assert_eq!(result, vec![2; 1000]); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_panic_in_scope() { - let result = catch_unwind(AssertUnwindSafe(|| { - let mut input = vec![1; 1000]; - input[744] = 2; - for_each(&input, |x| { - if *x == 2 { - panic!("Intentional panic"); - } - }); - panic!("Should not get here") - })); - assert!(result.is_err()); - assert_eq!( - result.unwrap_err().downcast_ref::<&str>(), - Some(&"Intentional panic") - ); - } -} diff --git a/turbopack/crates/turbo-tasks/src/scope.rs b/turbopack/crates/turbo-tasks/src/scope.rs index 
4c474b35f22a3..bfe5e355df358 100644 --- a/turbopack/crates/turbo-tasks/src/scope.rs +++ b/turbopack/crates/turbo-tasks/src/scope.rs @@ -1,289 +1,52 @@ -//! A scoped tokio spawn implementation that allow a non-'static lifetime for tasks. +use std::sync::Arc; -use std::{ - any::Any, - marker::PhantomData, - panic::{self, AssertUnwindSafe, catch_unwind}, - pin::Pin, - sync::{ - Arc, - atomic::{AtomicUsize, Ordering}, - }, - thread::{self, Thread}, -}; +use crate::{TurboTasksApi, turbo_tasks, turbo_tasks_scope}; -use futures::FutureExt; -use parking_lot::Mutex; -use tokio::{runtime::Handle, task::block_in_place}; -use tracing::{Instrument, Span, info_span}; - -use crate::{ - TurboTasksApi, - manager::{try_turbo_tasks, turbo_tasks_future_scope}, -}; - -struct ScopeInner { - main_thread: Thread, - remaining_tasks: AtomicUsize, - /// The first panic that occurred in the tasks, by task index. - /// The usize value is the index of the task. - panic: Mutex, usize)>>, +/// A wrapper around [`rayon::Scope`] that preserves the [`turbo_tasks_scope`]. +pub struct Scope<'scope, 'a> { + scope: &'a rayon::Scope<'scope>, + handle: tokio::runtime::Handle, + turbo_tasks: Arc, + span: tracing::Span, } -impl ScopeInner { - fn on_task_finished(&self, panic: Option<(Box, usize)>) { - if let Some((err, index)) = panic { - let mut old_panic = self.panic.lock(); - if old_panic.as_ref().is_none_or(|&(_, i)| i > index) { - *old_panic = Some((err, index)); - } - } - if self.remaining_tasks.fetch_sub(1, Ordering::Release) == 1 { - self.main_thread.unpark(); - } - } - - fn wait(&self) { - let _span = info_span!("blocking").entered(); - while self.remaining_tasks.load(Ordering::Acquire) != 0 { - thread::park(); - } - if let Some((err, _)) = self.panic.lock().take() { - panic::resume_unwind(err); - } - } -} - -/// Scope to allow spawning tasks with a limited lifetime. -/// -/// Dropping this Scope will wait for all tasks to complete. -pub struct Scope<'scope, 'env: 'scope, R: Send + 'env> { - results: &'scope [Mutex>], - index: AtomicUsize, - inner: Arc, - handle: Handle, - turbo_tasks: Option>, - span: Span, - /// Invariance over 'env, to make sure 'env cannot shrink, - /// which is necessary for soundness. - /// - /// see https://doc.rust-lang.org/src/std/thread/scoped.rs.html#12-29 - env: PhantomData<&'env mut &'env ()>, -} - -impl<'scope, 'env: 'scope, R: Send + 'env> Scope<'scope, 'env, R> { - /// Creates a new scope. - /// - /// # Safety - /// - /// The caller must ensure `Scope` is dropped and not forgotten. - unsafe fn new(results: &'scope [Mutex>]) -> Self { - Self { - results, - index: AtomicUsize::new(0), - inner: Arc::new(ScopeInner { - main_thread: thread::current(), - remaining_tasks: AtomicUsize::new(0), - panic: Mutex::new(None), - }), - handle: Handle::current(), - turbo_tasks: try_turbo_tasks(), - span: Span::current(), - env: PhantomData, - } - } - - /// Spawns a new task in the scope. - pub fn spawn(&self, f: F) +impl<'scope> Scope<'scope, '_> { + pub fn spawn(&self, body: Body) where - F: Future + Send + 'env, + Body: FnOnce(&Scope<'scope, '_>) + Send + 'scope, { - let index = self.index.fetch_add(1, Ordering::Relaxed); - assert!(index < self.results.len(), "Too many tasks spawned"); - let result_cell: &Mutex> = &self.results[index]; - - let f: Box + Send + 'scope> = Box::new(async move { - let result = f.await; - *result_cell.lock() = Some(result); - }); - let f: *mut (dyn Future + Send + 'scope) = Box::into_raw(f); - // SAFETY: Scope ensures (e. g. 
in Drop) that spawned tasks is awaited before the - // lifetime `'env` ends. - #[allow( - clippy::unnecessary_cast, - reason = "Clippy thinks this is unnecessary, but it actually changes the lifetime" - )] - let f = f as *mut (dyn Future + Send + 'static); - // SAFETY: We just called `Box::into_raw`. - let f = unsafe { Box::from_raw(f) }; - // We pin the future in the box in memory to be able to await it. - let f = Pin::from(f); - - let turbo_tasks = self.turbo_tasks.clone(); let span = self.span.clone(); - - let inner = self.inner.clone(); - inner.remaining_tasks.fetch_add(1, Ordering::Relaxed); - self.handle.spawn(async move { - let result = AssertUnwindSafe( - async move { - if let Some(turbo_tasks) = turbo_tasks { - // Ensure that the turbo tasks context is maintained across the task. - turbo_tasks_future_scope(turbo_tasks, f).await; - } else { - // If no turbo tasks context is available, just run the future. - f.await; - } - } - .instrument(span), - ) - .catch_unwind() - .await; - let panic = result.err().map(|e| (e, index)); - inner.on_task_finished(panic); + let handle = self.handle.clone(); + let turbo_tasks = self.turbo_tasks.clone(); + self.scope.spawn(|scope| { + let _span = span.clone().entered(); + let _guard = handle.enter(); + turbo_tasks_scope(turbo_tasks.clone(), || { + body(&Scope { + scope, + span, + handle, + turbo_tasks, + }) + }) }); } } -impl<'scope, 'env: 'scope, R: Send + 'env> Drop for Scope<'scope, 'env, R> { - fn drop(&mut self) { - self.inner.wait(); - } -} - -/// Helper method to spawn tasks in parallel, ensuring that all tasks are awaited and errors are -/// handled. Also ensures turbo tasks and tracing context are maintained across the tasks. -/// -/// Be aware that although this function avoids starving other independently spawned tasks, any -/// other code running concurrently in the same task will be suspended during the call to -/// block_in_place. This can happen e.g. when using the `join!` macro. To avoid this issue, call -/// `scope_and_block` in `spawn_blocking`. -pub fn scope_and_block<'env, F, R>(number_of_tasks: usize, f: F) -> impl Iterator +/// A wrapper around [`rayon::in_place_scope`] that preserves the [`turbo_tasks_scope`]. +pub fn scope<'scope, Op, R>(op: Op) -> R where - R: Send + 'env, - F: for<'scope> FnOnce(&'scope Scope<'scope, 'env, R>) + 'env, + Op: FnOnce(&Scope<'scope, '_>) -> R, { - block_in_place(|| { - let mut results = Vec::with_capacity(number_of_tasks); - for _ in 0..number_of_tasks { - results.push(Mutex::new(None)); - } - let results = results.into_boxed_slice(); - let result = { - // SAFETY: We drop the Scope later. 
- let scope = unsafe { Scope::new(&results) }; - catch_unwind(AssertUnwindSafe(|| f(&scope))) - }; - if let Err(panic) = result { - panic::resume_unwind(panic); - } - results.into_iter().map(|mutex| { - mutex - .into_inner() - .expect("All values are set when the scope returns without panic") + let span = tracing::Span::current(); + let handle = tokio::runtime::Handle::current(); + let turbo_tasks = turbo_tasks(); + rayon::in_place_scope(|scope| { + op(&Scope { + scope, + span, + handle, + turbo_tasks, }) }) } - -#[cfg(test)] -mod tests { - use std::panic::{AssertUnwindSafe, catch_unwind}; - - use super::*; - - #[tokio::test(flavor = "multi_thread")] - async fn test_scope() { - let results = scope_and_block(1000, |scope| { - for i in 0..1000 { - scope.spawn(async move { i }); - } - }); - results.enumerate().for_each(|(i, result)| { - assert_eq!(result, i); - }); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_empty_scope() { - let results = scope_and_block(0, |scope| { - if false { - scope.spawn(async move { 42 }); - } - }); - assert_eq!(results.count(), 0); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_single_task() { - let results = scope_and_block(1, |scope| { - scope.spawn(async move { 42 }); - }) - .collect::>(); - assert_eq!(results, vec![42]); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_task_finish_before_scope() { - let results = scope_and_block(1, |scope| { - scope.spawn(async move { 42 }); - thread::sleep(std::time::Duration::from_millis(100)); - }) - .collect::>(); - assert_eq!(results, vec![42]); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_task_finish_after_scope() { - let results = scope_and_block(1, |scope| { - scope.spawn(async move { - thread::sleep(std::time::Duration::from_millis(100)); - 42 - }); - }) - .collect::>(); - assert_eq!(results, vec![42]); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_panic_in_scope_factory() { - let result = catch_unwind(AssertUnwindSafe(|| { - let _results = scope_and_block(1000, |scope| { - for i in 0..500 { - scope.spawn(async move { i }); - } - panic!("Intentional panic"); - }); - unreachable!(); - })); - assert!(result.is_err()); - assert_eq!( - result.unwrap_err().downcast_ref::<&str>(), - Some(&"Intentional panic") - ); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_panic_in_scope_task() { - let result = catch_unwind(AssertUnwindSafe(|| { - let _results = scope_and_block(1000, |scope| { - for i in 0..1000 { - scope.spawn(async move { - if i == 500 { - panic!("Intentional panic"); - } else if i == 501 { - panic!("Wrong intentional panic"); - } else { - i - } - }); - } - }); - unreachable!(); - })); - assert!(result.is_err()); - assert_eq!( - result.unwrap_err().downcast_ref::<&str>(), - Some(&"Intentional panic") - ); - } -} diff --git a/turbopack/crates/turbo-tasks/src/util.rs b/turbopack/crates/turbo-tasks/src/util.rs index ecc57de367173..bb39dff895224 100644 --- a/turbopack/crates/turbo-tasks/src/util.rs +++ b/turbopack/crates/turbo-tasks/src/util.rs @@ -1,10 +1,8 @@ use std::{ - cell::SyncUnsafeCell, error::Error as StdError, fmt::{Debug, Display}, future::Future, hash::{Hash, Hasher}, - mem::ManuallyDrop, ops::Deref, pin::Pin, sync::Arc, @@ -261,124 +259,3 @@ impl Fn(Pin<&mut F>, &mut Context<'a>) -> Poll> (this.wrapper)(this.future, cx) } } - -/// Similar to slice::chunks but for owned data. Chunks are Send and Sync to allow to use it for -/// parallelism. 
-pub fn into_chunks<T>(data: Vec<T>, chunk_size: usize) -> IntoChunks<T> {
-    let (ptr, length, capacity) = data.into_raw_parts();
-    // SAFETY: changing a pointer from T to SyncUnsafeCell<ManuallyDrop<T>> is safe as both types
-    // have repr(transparent).
-    let ptr = ptr as *mut SyncUnsafeCell<ManuallyDrop<T>>;
-    // SAFETY: The ptr, length and capacity were from into_raw_parts(). This is the only place where
-    // we use ptr.
-    let data =
-        unsafe { Vec::<SyncUnsafeCell<ManuallyDrop<T>>>::from_raw_parts(ptr, length, capacity) };
-    IntoChunks {
-        data: Arc::new(data),
-        index: 0,
-        chunk_size,
-    }
-}
-
-pub struct IntoChunks<T> {
-    data: Arc<Vec<SyncUnsafeCell<ManuallyDrop<T>>>>,
-    index: usize,
-    chunk_size: usize,
-}
-
-impl<T> Iterator for IntoChunks<T> {
-    type Item = Chunk<T>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        if self.index < self.data.len() {
-            let end = self.data.len().min(self.index + self.chunk_size);
-            let item = Chunk {
-                data: Arc::clone(&self.data),
-                index: self.index,
-                end,
-            };
-            self.index = end;
-            Some(item)
-        } else {
-            None
-        }
-    }
-}
-
-impl<T> IntoChunks<T> {
-    fn next_item(&mut self) -> Option<T> {
-        if self.index < self.data.len() {
-            // SAFETY: We are the only owner of this chunk of data and we make sure that this item
-            // is no longer dropped by moving the index
-            let item = unsafe { ManuallyDrop::take(&mut *self.data[self.index].get()) };
-            self.index += 1;
-            Some(item)
-        } else {
-            None
-        }
-    }
-}
-
-impl<T> Drop for IntoChunks<T> {
-    fn drop(&mut self) {
-        // To avoid leaking memory we need to drop the remaining items
-        while self.next_item().is_some() {}
-    }
-}
-
-pub struct Chunk<T> {
-    data: Arc<Vec<SyncUnsafeCell<ManuallyDrop<T>>>>,
-    index: usize,
-    end: usize,
-}
-
-impl<T> Iterator for Chunk<T> {
-    type Item = T;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        if self.index < self.end {
-            // SAFETY: We are the only owner of this chunk of data and we make sure that this item
-            // is no longer dropped by moving the index
-            let item = unsafe { ManuallyDrop::take(&mut *self.data[self.index].get()) };
-            self.index += 1;
-            Some(item)
-        } else {
-            None
-        }
-    }
-}
-
-impl<T> Drop for Chunk<T> {
-    fn drop(&mut self) {
-        // To avoid leaking memory we need to drop the remaining items
-        while self.next().is_some() {}
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn test_chunk_iterator() {
-        let data = [(); 10]
-            .into_iter()
-            .enumerate()
-            .map(|(i, _)| Arc::new(i))
-            .collect::<Vec<_>>();
-        let mut chunks = into_chunks(data.clone(), 3);
-        let mut first_chunk = chunks.next().unwrap();
-        let second_chunk = chunks.next().unwrap();
-        drop(chunks);
-        assert_eq!(
-            second_chunk.into_iter().map(|a| *a).collect::<Vec<_>>(),
-            vec![3, 4, 5]
-        );
-        assert_eq!(*first_chunk.next().unwrap(), 0);
-        assert_eq!(*first_chunk.next().unwrap(), 1);
-        drop(first_chunk);
-        for arc in data {
-            assert_eq!(Arc::strong_count(&arc), 1);
-        }
-    }
-}
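For context on the helper removed above, a minimal usage sketch (illustrative only, not part of the patch; `process_owned_in_chunks` and the element type are assumed names): each `Chunk` yields owned values moved out of the original `Vec`, and whatever a worker does not consume is released by the `Drop` impls, as `test_chunk_iterator` exercises.

use std::thread;

// Illustrative sketch: hand owned values to worker threads chunk by chunk,
// using the `into_chunks` helper deleted above. `Chunk<T>` is Send/Sync when
// `T` is, so each chunk can be moved onto its own scoped thread.
fn process_owned_in_chunks(values: Vec<String>) {
    thread::scope(|s| {
        for chunk in into_chunks(values, 16) {
            s.spawn(move || {
                for value in chunk {
                    // `value` is an owned String moved out of the original Vec;
                    // unconsumed items are dropped together with the Chunk.
                    let _ = value.len();
                }
            });
        }
    });
}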
node_file_trace(input, "persistent_cache", 2, 240, |directory_path| { + node_file_trace(input, "persistent_cache", false, 2, 240, |directory_path| { TurboTasks::new(TurboTasksBackend::new( turbo_tasks_backend::BackendOptions::default(), turbo_tasks_backend::default_backing_storage( @@ -302,18 +302,31 @@ fn node_file_trace_persistent(#[case] input: CaseInput) { #[cfg(feature = "bench_against_node_nft")] #[apply(test_cases)] -fn bench_against_node_nft(#[case] input: CaseInput) { - bench_against_node_nft_inner(input); +fn bench_against_node_nft_st(#[case] input: CaseInput) { + bench_against_node_nft_inner(input, false); } #[cfg(feature = "bench_against_node_nft")] -fn bench_against_node_nft_inner(input: CaseInput) { - node_file_trace(input, "noop_backing_storage", 1, 120, |_| { - TurboTasks::new(TurboTasksBackend::new( - turbo_tasks_backend::BackendOptions::default(), - turbo_tasks_backend::noop_backing_storage(), - )) - }); +#[apply(test_cases)] +fn bench_against_node_nft_mt(#[case] input: CaseInput) { + bench_against_node_nft_inner(input, true); +} + +#[cfg(feature = "bench_against_node_nft")] +fn bench_against_node_nft_inner(input: CaseInput, multi_threaded: bool) { + node_file_trace( + input, + "noop_backing_storage", + multi_threaded, + 1, + 120, + |_| { + TurboTasks::new(TurboTasksBackend::new( + turbo_tasks_backend::BackendOptions::default(), + turbo_tasks_backend::noop_backing_storage(), + )) + }, + ); } #[turbo_tasks::function(operation)] @@ -388,6 +401,7 @@ fn node_file_trace( expected_stderr, }: CaseInput, mode: &str, + multi_threaded: bool, run_count: i32, timeout_len: u64, create_turbo_tasks: impl Fn(&Path) -> Arc>, @@ -396,9 +410,15 @@ fn node_file_trace( LazyLock::new(|| Arc::new(Mutex::new(Vec::new()))); let r = &mut { - let mut builder = tokio::runtime::Builder::new_multi_thread(); + let mut builder = if multi_threaded { + tokio::runtime::Builder::new_multi_thread() + } else { + tokio::runtime::Builder::new_current_thread() + }; builder.enable_all(); - builder.max_blocking_threads(20); + if !multi_threaded { + builder.max_blocking_threads(20); + } builder.build().unwrap() }; r.block_on(async move { @@ -470,7 +490,12 @@ fn node_file_trace( bench_suites_lock.push(BenchSuite { suite: input .trim_start_matches("node-file-trace/integration/") - .to_string(), + .to_string() + + (if multi_threaded { + " (multi-threaded)" + } else { + "" + }), is_faster, rust_duration, node_duration,