Skip to content

Commit 602fa70

Browse files
committed
Turbopack: use parallel execution helpers
1 parent b7da0c5 commit 602fa70

File tree

7 files changed

+111
-159
lines changed

7 files changed

+111
-159
lines changed

Cargo.lock

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

turbopack/crates/turbo-tasks-backend/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ once_cell = { workspace = true }
4040
parking_lot = { workspace = true }
4141
pot = "3.0.0"
4242
rand = { workspace = true }
43-
rayon = { workspace = true }
4443
ringmap = { workspace = true, features = ["serde"] }
4544
rustc-hash = { workspace = true }
4645
serde = { workspace = true }

turbopack/crates/turbo-tasks-backend/src/backend/storage.rs

Lines changed: 33 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,8 @@ use std::{
66
};
77

88
use bitfield::bitfield;
9-
use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
109
use smallvec::SmallVec;
11-
use turbo_tasks::{FxDashMap, TaskId};
10+
use turbo_tasks::{FxDashMap, TaskId, parallel};
1211

1312
use crate::{
1413
backend::dynamic_storage::DynamicStorage,
@@ -664,48 +663,43 @@ impl Storage {
664663

665664
// The number of shards is much larger than the number of threads, so the effect of the
666665
// locks held is negligible.
667-
self.modified
668-
.shards()
669-
.par_iter()
670-
.with_max_len(1)
671-
.map(|shard| {
672-
let mut direct_snapshots: Vec<(TaskId, Box<InnerStorageSnapshot>)> = Vec::new();
673-
let mut modified: SmallVec<[TaskId; 4]> = SmallVec::new();
674-
{
675-
// Take the snapshots from the modified map
676-
let guard = shard.write();
677-
// Safety: guard must outlive the iterator.
678-
for bucket in unsafe { guard.iter() } {
679-
// Safety: the guard guarantees that the bucket is not removed and the ptr
680-
// is valid.
681-
let (key, shared_value) = unsafe { bucket.as_mut() };
682-
let modified_state = shared_value.get_mut();
683-
match modified_state {
684-
ModifiedState::Modified => {
685-
modified.push(*key);
686-
}
687-
ModifiedState::Snapshot(snapshot) => {
688-
if let Some(snapshot) = snapshot.take() {
689-
direct_snapshots.push((*key, snapshot));
690-
}
666+
parallel::map_collect::<_, _, Vec<_>>(self.modified.shards(), |shard| {
667+
let mut direct_snapshots: Vec<(TaskId, Box<InnerStorageSnapshot>)> = Vec::new();
668+
let mut modified: SmallVec<[TaskId; 4]> = SmallVec::new();
669+
{
670+
// Take the snapshots from the modified map
671+
let guard = shard.write();
672+
// Safety: guard must outlive the iterator.
673+
for bucket in unsafe { guard.iter() } {
674+
// Safety: the guard guarantees that the bucket is not removed and the ptr
675+
// is valid.
676+
let (key, shared_value) = unsafe { bucket.as_mut() };
677+
let modified_state = shared_value.get_mut();
678+
match modified_state {
679+
ModifiedState::Modified => {
680+
modified.push(*key);
681+
}
682+
ModifiedState::Snapshot(snapshot) => {
683+
if let Some(snapshot) = snapshot.take() {
684+
direct_snapshots.push((*key, snapshot));
691685
}
692686
}
693687
}
694-
// Safety: guard must outlive the iterator.
695-
drop(guard);
696688
}
689+
// Safety: guard must outlive the iterator.
690+
drop(guard);
691+
}
697692

698-
SnapshotShard {
699-
direct_snapshots,
700-
modified,
701-
storage: self,
702-
guard: Some(guard.clone()),
703-
process,
704-
preprocess,
705-
process_snapshot,
706-
}
707-
})
708-
.collect::<Vec<_>>()
693+
SnapshotShard {
694+
direct_snapshots,
695+
modified,
696+
storage: self,
697+
guard: Some(guard.clone()),
698+
process,
699+
preprocess,
700+
process_snapshot,
701+
}
702+
})
709703
}
710704

711705
/// Start snapshot mode.

turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs

Lines changed: 44 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,18 @@
11
use std::{
22
borrow::Borrow,
3-
cmp::max,
43
env,
54
path::PathBuf,
65
sync::{Arc, LazyLock, Mutex, PoisonError, Weak},
76
};
87

98
use anyhow::{Context, Result, anyhow};
10-
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
119
use serde::{Deserialize, Serialize};
1210
use smallvec::SmallVec;
13-
use tracing::Span;
1411
use turbo_tasks::{
1512
SessionId, TaskId,
1613
backend::CachedTaskType,
1714
panic_hooks::{PanicHookGuard, register_panic_hook},
18-
turbo_tasks_scope,
15+
parallel,
1916
};
2017

2118
use crate::{
@@ -331,14 +328,15 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
331328
let _span = tracing::trace_span!("update task data").entered();
332329
process_task_data(snapshots, Some(batch))?;
333330
let span = tracing::trace_span!("flush task data").entered();
334-
[KeySpace::TaskMeta, KeySpace::TaskData]
335-
.into_par_iter()
336-
.try_for_each(|key_space| {
331+
parallel::try_for_each(
332+
&[KeySpace::TaskMeta, KeySpace::TaskData],
333+
|&key_space| {
337334
let _span = span.clone().entered();
338335
// Safety: We already finished all processing of the task data and task
339336
// meta
340337
unsafe { batch.flush(key_space) }
341-
})?;
338+
},
339+
)?;
342340
}
343341

344342
let mut next_task_id = get_next_free_task_id::<
@@ -352,10 +350,9 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
352350
items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
353351
)
354352
.entered();
355-
let result = task_cache_updates
356-
.into_par_iter()
357-
.with_max_len(1)
358-
.map(|updates| {
353+
let result = parallel::map_collect_owned::<_, _, Result<Vec<_>>>(
354+
task_cache_updates,
355+
|updates| {
359356
let _span = _span.clone().entered();
360357
let mut max_task_id = 0;
361358

@@ -390,15 +387,11 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
390387
}
391388

392389
Ok(max_task_id)
393-
})
394-
.reduce(
395-
|| Ok(0),
396-
|a, b| -> anyhow::Result<_> {
397-
let a_max = a?;
398-
let b_max = b?;
399-
Ok(max(a_max, b_max))
400-
},
401-
)?;
390+
},
391+
)?
392+
.into_iter()
393+
.max()
394+
.unwrap_or(0);
402395
next_task_id = next_task_id.max(result);
403396
}
404397

@@ -697,48 +690,38 @@ where
697690
> + Send
698691
+ Sync,
699692
{
700-
let span = Span::current();
701-
let turbo_tasks = turbo_tasks::turbo_tasks();
702-
let handle = tokio::runtime::Handle::current();
703-
tasks
704-
.into_par_iter()
705-
.map(|tasks| {
706-
let _span = span.clone().entered();
707-
let _guard = handle.clone().enter();
708-
turbo_tasks_scope(turbo_tasks.clone(), || {
709-
let mut result = Vec::new();
710-
for (task_id, meta, data) in tasks {
711-
if let Some(batch) = batch {
712-
let key = IntKey::new(*task_id);
713-
let key = key.as_ref();
714-
if let Some(meta) = meta {
715-
batch.put(
716-
KeySpace::TaskMeta,
717-
WriteBuffer::Borrowed(key),
718-
WriteBuffer::SmallVec(meta),
719-
)?;
720-
}
721-
if let Some(data) = data {
722-
batch.put(
723-
KeySpace::TaskData,
724-
WriteBuffer::Borrowed(key),
725-
WriteBuffer::SmallVec(data),
726-
)?;
727-
}
728-
} else {
729-
// Store the new task data
730-
result.push((
731-
task_id,
732-
meta.map(WriteBuffer::SmallVec),
733-
data.map(WriteBuffer::SmallVec),
734-
));
735-
}
693+
parallel::map_collect_owned::<_, _, Result<Vec<_>>>(tasks, |tasks| {
694+
let mut result = Vec::new();
695+
for (task_id, meta, data) in tasks {
696+
if let Some(batch) = batch {
697+
let key = IntKey::new(*task_id);
698+
let key = key.as_ref();
699+
if let Some(meta) = meta {
700+
batch.put(
701+
KeySpace::TaskMeta,
702+
WriteBuffer::Borrowed(key),
703+
WriteBuffer::SmallVec(meta),
704+
)?;
705+
}
706+
if let Some(data) = data {
707+
batch.put(
708+
KeySpace::TaskData,
709+
WriteBuffer::Borrowed(key),
710+
WriteBuffer::SmallVec(data),
711+
)?;
736712
}
713+
} else {
714+
// Store the new task data
715+
result.push((
716+
task_id,
717+
meta.map(WriteBuffer::SmallVec),
718+
data.map(WriteBuffer::SmallVec),
719+
));
720+
}
721+
}
737722

738-
Ok(result)
739-
})
740-
})
741-
.collect::<Result<Vec<_>>>()
723+
Ok(result)
724+
})
742725
}
743726

744727
fn serialize(task: TaskId, data: &Vec<CachedDataItem>) -> Result<SmallVec<[u8; 16]>> {

turbopack/crates/turbo-tasks-fs/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ jsonc-parser = { version = "0.26.3", features = ["serde"] }
3737
mime = { workspace = true }
3838
notify = { workspace = true }
3939
parking_lot = { workspace = true }
40-
rayon = { workspace = true }
4140
regex = { workspace = true }
4241
rustc-hash = { workspace = true }
4342
serde = { workspace = true, features = ["rc"] }

turbopack/crates/turbo-tasks-fs/src/lib.rs

Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ use dunce::simplified;
4646
use indexmap::IndexSet;
4747
use jsonc_parser::{ParseOptions, parse_to_serde_value};
4848
use mime::Mime;
49-
use rayon::iter::{IntoParallelIterator, ParallelIterator};
5049
use rustc_hash::FxHashSet;
5150
use serde::{Deserialize, Serialize};
5251
use serde_json::Value;
@@ -56,7 +55,7 @@ use turbo_rcstr::{RcStr, rcstr};
5655
use turbo_tasks::{
5756
ApplyEffectsContext, Completion, InvalidationReason, Invalidator, NonLocalValue, ReadRef,
5857
ResolvedVc, TaskInput, ValueToString, Vc, debug::ValueDebugFormat, effect,
59-
mark_session_dependent, mark_stateful, trace::TraceRawVcs,
58+
mark_session_dependent, mark_stateful, parallel, trace::TraceRawVcs,
6059
};
6160
use turbo_tasks_hash::{DeterministicHash, DeterministicHasher, hash_xxh3_hash64};
6261
use turbo_unix_path::{
@@ -309,19 +308,14 @@ impl DiskFileSystemInner {
309308

310309
fn invalidate(&self) {
311310
let _span = tracing::info_span!("invalidate filesystem", name = &*self.root).entered();
312-
let span = tracing::Span::current();
313-
let handle = tokio::runtime::Handle::current();
314311
let invalidator_map = take(&mut *self.invalidator_map.lock().unwrap());
315312
let dir_invalidator_map = take(&mut *self.dir_invalidator_map.lock().unwrap());
316-
let iter = invalidator_map
317-
.into_par_iter()
318-
.chain(dir_invalidator_map.into_par_iter())
319-
.flat_map(|(_, invalidators)| invalidators.into_par_iter());
320-
iter.for_each(|(i, _)| {
321-
let _span = span.clone().entered();
322-
let _guard = handle.enter();
323-
i.invalidate()
324-
});
313+
let invalidators = invalidator_map
314+
.into_iter()
315+
.chain(dir_invalidator_map)
316+
.flat_map(|(_, invalidators)| invalidators.into_keys())
317+
.collect::<Vec<_>>();
318+
parallel::for_each_owned(invalidators, |invalidator| invalidator.invalidate());
325319
}
326320

327321
/// Invalidates every tracked file in the filesystem.
@@ -332,23 +326,19 @@ impl DiskFileSystemInner {
332326
reason: impl Fn(&Path) -> R + Sync,
333327
) {
334328
let _span = tracing::info_span!("invalidate filesystem", name = &*self.root).entered();
335-
let span = tracing::Span::current();
336-
let handle = tokio::runtime::Handle::current();
337329
let invalidator_map = take(&mut *self.invalidator_map.lock().unwrap());
338330
let dir_invalidator_map = take(&mut *self.dir_invalidator_map.lock().unwrap());
339-
let iter = invalidator_map
340-
.into_par_iter()
341-
.chain(dir_invalidator_map.into_par_iter())
331+
let invalidators = invalidator_map
332+
.into_iter()
333+
.chain(dir_invalidator_map)
342334
.flat_map(|(path, invalidators)| {
343-
let _span = span.clone().entered();
344335
let reason_for_path = reason(&path);
345336
invalidators
346-
.into_par_iter()
337+
.into_keys()
347338
.map(move |i| (reason_for_path.clone(), i))
348-
});
349-
iter.for_each(|(reason, (invalidator, _))| {
350-
let _span = span.clone().entered();
351-
let _guard = handle.enter();
339+
})
340+
.collect::<Vec<_>>();
341+
parallel::for_each_owned(invalidators, |(reason, invalidator)| {
352342
invalidator.invalidate_with_reason(reason)
353343
});
354344
}

0 commit comments

Comments
 (0)