
Commit e21ac64

Turbopack: use parallel execution helpers (#82667)
1 parent 532018c commit e21ac64

7 files changed: +111 −159 lines changed

Cargo.lock

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default.

turbopack/crates/turbo-tasks-backend/Cargo.toml

Lines changed: 0 additions & 1 deletion
@@ -40,7 +40,6 @@ once_cell = { workspace = true }
 parking_lot = { workspace = true }
 pot = "3.0.0"
 rand = { workspace = true }
-rayon = { workspace = true }
 ringmap = { workspace = true, features = ["serde"] }
 rustc-hash = { workspace = true }
 serde = { workspace = true }

turbopack/crates/turbo-tasks-backend/src/backend/storage.rs

Lines changed: 33 additions & 39 deletions
@@ -6,9 +6,8 @@ use std::{
 };
 
 use bitfield::bitfield;
-use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
 use smallvec::SmallVec;
-use turbo_tasks::{FxDashMap, TaskId};
+use turbo_tasks::{FxDashMap, TaskId, parallel};
 
 use crate::{
     backend::dynamic_storage::DynamicStorage,
@@ -664,48 +663,43 @@ impl Storage {
 
         // The number of shards is much larger than the number of threads, so the effect of the
         // locks held is negligible.
-        self.modified
-            .shards()
-            .par_iter()
-            .with_max_len(1)
-            .map(|shard| {
-                let mut direct_snapshots: Vec<(TaskId, Box<InnerStorageSnapshot>)> = Vec::new();
-                let mut modified: SmallVec<[TaskId; 4]> = SmallVec::new();
-                {
-                    // Take the snapshots from the modified map
-                    let guard = shard.write();
-                    // Safety: guard must outlive the iterator.
-                    for bucket in unsafe { guard.iter() } {
-                        // Safety: the guard guarantees that the bucket is not removed and the ptr
-                        // is valid.
-                        let (key, shared_value) = unsafe { bucket.as_mut() };
-                        let modified_state = shared_value.get_mut();
-                        match modified_state {
-                            ModifiedState::Modified => {
-                                modified.push(*key);
-                            }
-                            ModifiedState::Snapshot(snapshot) => {
-                                if let Some(snapshot) = snapshot.take() {
-                                    direct_snapshots.push((*key, snapshot));
+        parallel::map_collect::<_, _, Vec<_>>(self.modified.shards(), |shard| {
+            let mut direct_snapshots: Vec<(TaskId, Box<InnerStorageSnapshot>)> = Vec::new();
+            let mut modified: SmallVec<[TaskId; 4]> = SmallVec::new();
+            {
+                // Take the snapshots from the modified map
+                let guard = shard.write();
+                // Safety: guard must outlive the iterator.
+                for bucket in unsafe { guard.iter() } {
+                    // Safety: the guard guarantees that the bucket is not removed and the ptr
+                    // is valid.
+                    let (key, shared_value) = unsafe { bucket.as_mut() };
+                    let modified_state = shared_value.get_mut();
+                    match modified_state {
+                        ModifiedState::Modified => {
+                            modified.push(*key);
+                        }
+                        ModifiedState::Snapshot(snapshot) => {
+                            if let Some(snapshot) = snapshot.take() {
+                                direct_snapshots.push((*key, snapshot));
                             }
                         }
                     }
-                    // Safety: guard must outlive the iterator.
-                    drop(guard);
                 }
+                // Safety: guard must outlive the iterator.
+                drop(guard);
+            }
 
-                SnapshotShard {
-                    direct_snapshots,
-                    modified,
-                    storage: self,
-                    guard: Some(guard.clone()),
-                    process,
-                    preprocess,
-                    process_snapshot,
-                }
-            })
-            .collect::<Vec<_>>()
+            SnapshotShard {
+                direct_snapshots,
+                modified,
+                storage: self,
+                guard: Some(guard.clone()),
+                process,
+                preprocess,
+                process_snapshot,
+            }
+        })
     }
 
     /// Start snapshot mode.
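
The call above replaces rayon's `self.modified.shards().par_iter().with_max_len(1).map(..).collect()` with `parallel::map_collect`, which maps a closure over each shard and collects the results. As a rough illustration of that shape (not the actual turbo_tasks::parallel implementation, which is not part of this diff), here is a std-only sketch that processes one stand-in shard per thread and collects the results in order:

use std::thread;

fn main() {
    // Stand-ins for `self.modified.shards()`.
    let shards: Vec<Vec<u64>> = vec![vec![1, 2], vec![3, 4], vec![5]];

    // Map each shard to a per-shard result on its own thread and collect the
    // results in shard order, mirroring the one-item-per-job behavior that
    // `.with_max_len(1)` forced in the rayon version.
    let sums: Vec<u64> = thread::scope(|scope| {
        let handles: Vec<_> = shards
            .iter()
            .map(|shard| scope.spawn(move || shard.iter().sum::<u64>()))
            .collect();
        handles
            .into_iter()
            .map(|handle| handle.join().unwrap())
            .collect()
    });

    assert_eq!(sums, vec![3, 7, 5]);
    println!("per-shard sums: {sums:?}");
}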

turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs

Lines changed: 44 additions & 61 deletions
@@ -1,21 +1,18 @@
 use std::{
     borrow::Borrow,
-    cmp::max,
     env,
     path::PathBuf,
     sync::{Arc, LazyLock, Mutex, PoisonError, Weak},
 };
 
 use anyhow::{Context, Result, anyhow};
-use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
 use serde::{Deserialize, Serialize};
 use smallvec::SmallVec;
-use tracing::Span;
 use turbo_tasks::{
     SessionId, TaskId,
     backend::CachedTaskType,
     panic_hooks::{PanicHookGuard, register_panic_hook},
-    turbo_tasks_scope,
+    parallel,
 };
 
 use crate::{
@@ -331,14 +328,15 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
             let _span = tracing::trace_span!("update task data").entered();
             process_task_data(snapshots, Some(batch))?;
             let span = tracing::trace_span!("flush task data").entered();
-            [KeySpace::TaskMeta, KeySpace::TaskData]
-                .into_par_iter()
-                .try_for_each(|key_space| {
+            parallel::try_for_each(
+                &[KeySpace::TaskMeta, KeySpace::TaskData],
+                |&key_space| {
                     let _span = span.clone().entered();
                     // Safety: We already finished all processing of the task data and task
                     // meta
                     unsafe { batch.flush(key_space) }
-                })?;
+                },
+            )?;
         }
 
         let mut next_task_id = get_next_free_task_id::<
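
For the flush above, `parallel::try_for_each` takes over from rayon's `into_par_iter().try_for_each(..)`: run a fallible closure once per element and propagate the first error. Below is a self-contained sketch of that shape using only std threads; `flush_key_space` and the `String` error type are made up for illustration, and this is not the actual turbo_tasks::parallel implementation:

use std::thread;

// Hypothetical stand-in for `unsafe { batch.flush(key_space) }`.
fn flush_key_space(key_space: &str) -> Result<(), String> {
    if key_space.is_empty() {
        return Err("empty key space".to_string());
    }
    println!("flushed {key_space}");
    Ok(())
}

fn main() -> Result<(), String> {
    let key_spaces = ["TaskMeta", "TaskData"];

    // Run the fallible closure for each element on its own thread, then
    // surface the first error, mirroring the shape of `parallel::try_for_each`.
    thread::scope(|scope| {
        let handles: Vec<_> = key_spaces
            .iter()
            .copied()
            .map(|key_space| scope.spawn(move || flush_key_space(key_space)))
            .collect();
        handles
            .into_iter()
            .map(|handle| handle.join().unwrap())
            .collect::<Result<(), String>>()
    })?;

    Ok(())
}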
@@ -352,10 +350,9 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
                 items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
             )
             .entered();
-            let result = task_cache_updates
-                .into_par_iter()
-                .with_max_len(1)
-                .map(|updates| {
+            let result = parallel::map_collect_owned::<_, _, Result<Vec<_>>>(
+                task_cache_updates,
+                |updates| {
                     let _span = _span.clone().entered();
                     let mut max_task_id = 0;
 
@@ -390,15 +387,11 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
                     }
 
                     Ok(max_task_id)
-                })
-                .reduce(
-                    || Ok(0),
-                    |a, b| -> anyhow::Result<_> {
-                        let a_max = a?;
-                        let b_max = b?;
-                        Ok(max(a_max, b_max))
-                    },
-                )?;
+                },
+            )?
+            .into_iter()
+            .max()
+            .unwrap_or(0);
             next_task_id = next_task_id.max(result);
         }
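
Here `parallel::map_collect_owned` consumes `task_cache_updates`, computes a per-chunk maximum in parallel, and collects the fallible results into a `Result<Vec<_>>`; the rayon `.reduce(|| Ok(0), ..)` over `Result`s becomes a plain sequential `.into_iter().max().unwrap_or(0)` on the collected values. A std-only sketch of that pattern, with made-up batches standing in for the update chunks (again, not the actual turbo_tasks::parallel implementation):

use std::thread;

fn main() -> Result<(), String> {
    // Stand-ins for the owned `task_cache_updates` chunks.
    let updates: Vec<Vec<u32>> = vec![vec![3, 9, 4], vec![12, 7], vec![5]];

    // Each chunk is processed on its own thread and yields a fallible
    // per-chunk maximum; collecting into `Result<Vec<_>, _>` stops at the
    // first error, like the `Result<Vec<_>>` collection above.
    let per_chunk_max: Result<Vec<u32>, String> = thread::scope(|scope| {
        let handles: Vec<_> = updates
            .into_iter()
            .map(|chunk| {
                scope.spawn(move || {
                    chunk
                        .into_iter()
                        .max()
                        .ok_or_else(|| "empty chunk".to_string())
                })
            })
            .collect();
        handles
            .into_iter()
            .map(|handle| handle.join().unwrap())
            .collect()
    });

    // The parallel reduce is now an ordinary sequential reduction.
    let next_task_id = per_chunk_max?.into_iter().max().unwrap_or(0);
    assert_eq!(next_task_id, 12);
    println!("next free task id candidate: {next_task_id}");
    Ok(())
}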

@@ -697,48 +690,38 @@
     > + Send
         + Sync,
 {
-    let span = Span::current();
-    let turbo_tasks = turbo_tasks::turbo_tasks();
-    let handle = tokio::runtime::Handle::current();
-    tasks
-        .into_par_iter()
-        .map(|tasks| {
-            let _span = span.clone().entered();
-            let _guard = handle.clone().enter();
-            turbo_tasks_scope(turbo_tasks.clone(), || {
-                let mut result = Vec::new();
-                for (task_id, meta, data) in tasks {
-                    if let Some(batch) = batch {
-                        let key = IntKey::new(*task_id);
-                        let key = key.as_ref();
-                        if let Some(meta) = meta {
-                            batch.put(
-                                KeySpace::TaskMeta,
-                                WriteBuffer::Borrowed(key),
-                                WriteBuffer::SmallVec(meta),
-                            )?;
-                        }
-                        if let Some(data) = data {
-                            batch.put(
-                                KeySpace::TaskData,
-                                WriteBuffer::Borrowed(key),
-                                WriteBuffer::SmallVec(data),
-                            )?;
-                        }
-                    } else {
-                        // Store the new task data
-                        result.push((
-                            task_id,
-                            meta.map(WriteBuffer::SmallVec),
-                            data.map(WriteBuffer::SmallVec),
-                        ));
-                    }
+    parallel::map_collect_owned::<_, _, Result<Vec<_>>>(tasks, |tasks| {
+        let mut result = Vec::new();
+        for (task_id, meta, data) in tasks {
+            if let Some(batch) = batch {
+                let key = IntKey::new(*task_id);
+                let key = key.as_ref();
+                if let Some(meta) = meta {
+                    batch.put(
+                        KeySpace::TaskMeta,
+                        WriteBuffer::Borrowed(key),
+                        WriteBuffer::SmallVec(meta),
+                    )?;
+                }
+                if let Some(data) = data {
+                    batch.put(
+                        KeySpace::TaskData,
+                        WriteBuffer::Borrowed(key),
+                        WriteBuffer::SmallVec(data),
+                    )?;
                 }
+            } else {
+                // Store the new task data
+                result.push((
+                    task_id,
+                    meta.map(WriteBuffer::SmallVec),
+                    data.map(WriteBuffer::SmallVec),
+                ));
+            }
+        }
 
-                Ok(result)
-            })
-        })
-        .collect::<Result<Vec<_>>>()
+        Ok(result)
+    })
 }
 
 fn serialize(task: TaskId, data: &Vec<CachedDataItem>) -> Result<SmallVec<[u8; 16]>> {
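
In the deleted lines above, every rayon closure had to capture and re-enter the ambient context itself: `Span::current()`, the tokio `Handle`, and `turbo_tasks_scope` were cloned into each `.map(|tasks| ..)` body. The `parallel::map_collect_owned` call drops all of that plumbing, which suggests the helpers re-establish the caller's context on their worker threads. A std-only toy illustrating the underlying issue, with a thread-local `Cell` standing in for that ambient context (not how turbo-tasks actually implements it):

use std::cell::Cell;
use std::thread;

thread_local! {
    // Stand-in for ambient, thread-local context such as the current tracing
    // span, the tokio runtime handle, or the turbo-tasks scope.
    static CURRENT_SCOPE: Cell<Option<u32>> = Cell::new(None);
}

fn main() {
    CURRENT_SCOPE.with(|scope| scope.set(Some(42)));
    let captured = CURRENT_SCOPE.with(|scope| scope.get());

    thread::scope(|scope| {
        for chunk in ["meta", "data"] {
            scope.spawn(move || {
                // A fresh worker thread starts without the caller's
                // thread-local context, so the rayon version re-entered it in
                // every closure; a shared helper can do this in one place.
                CURRENT_SCOPE.with(|cell| cell.set(captured));
                let ctx = CURRENT_SCOPE.with(|cell| cell.get());
                println!("processing {chunk} with context {ctx:?}");
            });
        }
    });
}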

turbopack/crates/turbo-tasks-fs/Cargo.toml

Lines changed: 0 additions & 1 deletion
@@ -37,7 +37,6 @@ jsonc-parser = { version = "0.26.3", features = ["serde"] }
 mime = { workspace = true }
 notify = { workspace = true }
 parking_lot = { workspace = true }
-rayon = { workspace = true }
 regex = { workspace = true }
 rustc-hash = { workspace = true }
 serde = { workspace = true, features = ["rc"] }

turbopack/crates/turbo-tasks-fs/src/lib.rs

Lines changed: 14 additions & 24 deletions
@@ -46,7 +46,6 @@ use dunce::simplified;
 use indexmap::IndexSet;
 use jsonc_parser::{ParseOptions, parse_to_serde_value};
 use mime::Mime;
-use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use rustc_hash::FxHashSet;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
@@ -56,7 +55,7 @@ use turbo_rcstr::{RcStr, rcstr};
 use turbo_tasks::{
     ApplyEffectsContext, Completion, InvalidationReason, Invalidator, NonLocalValue, ReadRef,
     ResolvedVc, TaskInput, ValueToString, Vc, debug::ValueDebugFormat, effect,
-    mark_session_dependent, mark_stateful, trace::TraceRawVcs,
+    mark_session_dependent, mark_stateful, parallel, trace::TraceRawVcs,
 };
 use turbo_tasks_hash::{DeterministicHash, DeterministicHasher, hash_xxh3_hash64};
 use turbo_unix_path::{
@@ -309,19 +308,14 @@ impl DiskFileSystemInner {
 
     fn invalidate(&self) {
         let _span = tracing::info_span!("invalidate filesystem", name = &*self.root).entered();
-        let span = tracing::Span::current();
-        let handle = tokio::runtime::Handle::current();
         let invalidator_map = take(&mut *self.invalidator_map.lock().unwrap());
         let dir_invalidator_map = take(&mut *self.dir_invalidator_map.lock().unwrap());
-        let iter = invalidator_map
-            .into_par_iter()
-            .chain(dir_invalidator_map.into_par_iter())
-            .flat_map(|(_, invalidators)| invalidators.into_par_iter());
-        iter.for_each(|(i, _)| {
-            let _span = span.clone().entered();
-            let _guard = handle.enter();
-            i.invalidate()
-        });
+        let invalidators = invalidator_map
+            .into_iter()
+            .chain(dir_invalidator_map)
+            .flat_map(|(_, invalidators)| invalidators.into_keys())
+            .collect::<Vec<_>>();
+        parallel::for_each_owned(invalidators, |invalidator| invalidator.invalidate());
     }
 
     /// Invalidates every tracked file in the filesystem.
@@ -332,23 +326,19 @@ impl DiskFileSystemInner {
         reason: impl Fn(&Path) -> R + Sync,
     ) {
         let _span = tracing::info_span!("invalidate filesystem", name = &*self.root).entered();
-        let span = tracing::Span::current();
-        let handle = tokio::runtime::Handle::current();
         let invalidator_map = take(&mut *self.invalidator_map.lock().unwrap());
         let dir_invalidator_map = take(&mut *self.dir_invalidator_map.lock().unwrap());
-        let iter = invalidator_map
-            .into_par_iter()
-            .chain(dir_invalidator_map.into_par_iter())
+        let invalidators = invalidator_map
+            .into_iter()
+            .chain(dir_invalidator_map)
             .flat_map(|(path, invalidators)| {
-                let _span = span.clone().entered();
                 let reason_for_path = reason(&path);
                 invalidators
-                    .into_par_iter()
+                    .into_keys()
                    .map(move |i| (reason_for_path.clone(), i))
-            });
-        iter.for_each(|(reason, (invalidator, _))| {
-            let _span = span.clone().entered();
-            let _guard = handle.enter();
+            })
+            .collect::<Vec<_>>();
+        parallel::for_each_owned(invalidators, |(reason, invalidator)| {
             invalidator.invalidate_with_reason(reason)
         });
     }
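
Both invalidation paths now collect the invalidators into an owned `Vec` with ordinary sequential iterators (`into_iter`, `into_keys`) and hand that to `parallel::for_each_owned`, which runs the side-effecting closure over each owned item in parallel; the per-closure span and runtime-handle bookkeeping from the rayon version disappears. A std-only sketch of that shape, with made-up `(reason, id)` pairs standing in for the collected `(reason, invalidator)` tuples (not the actual turbo_tasks::parallel implementation):

use std::thread;

fn main() {
    // Stand-ins for the collected `(reason, invalidator)` pairs; the real
    // closure calls `invalidator.invalidate_with_reason(reason)`.
    let invalidators: Vec<(String, u32)> = vec![
        ("changed: a.txt".to_string(), 1),
        ("changed: b.txt".to_string(), 2),
        ("changed: c.txt".to_string(), 3),
    ];

    // Fan the owned items out to threads purely for their side effects,
    // mirroring the shape of `parallel::for_each_owned`.
    thread::scope(|scope| {
        for (reason, id) in invalidators {
            scope.spawn(move || {
                println!("invalidating {id}: {reason}");
            });
        }
    });
}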
