Skip to content

Commit 8491337

Browse files
committed
Turbopack: use parallel execution helpers
1 parent 7d130db commit 8491337

File tree

7 files changed

+161
-215
lines changed

7 files changed

+161
-215
lines changed

Cargo.lock

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

turbopack/crates/turbo-tasks-backend/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ once_cell = { workspace = true }
4040
parking_lot = { workspace = true }
4141
pot = "3.0.0"
4242
rand = { workspace = true }
43-
rayon = { workspace = true }
4443
ringmap = { workspace = true, features = ["serde"] }
4544
rustc-hash = { workspace = true }
4645
serde = { workspace = true }

turbopack/crates/turbo-tasks-backend/src/backend/storage.rs

Lines changed: 33 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,8 @@ use std::{
66
};
77

88
use bitfield::bitfield;
9-
use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
109
use smallvec::SmallVec;
11-
use turbo_tasks::{FxDashMap, TaskId};
10+
use turbo_tasks::{FxDashMap, TaskId, parallel};
1211

1312
use crate::{
1413
backend::dynamic_storage::DynamicStorage,
@@ -664,48 +663,43 @@ impl Storage {
664663

665664
// The number of shards is much larger than the number of threads, so the effect of the
666665
// locks held is negligible.
667-
self.modified
668-
.shards()
669-
.par_iter()
670-
.with_max_len(1)
671-
.map(|shard| {
672-
let mut direct_snapshots: Vec<(TaskId, Box<InnerStorageSnapshot>)> = Vec::new();
673-
let mut modified: SmallVec<[TaskId; 4]> = SmallVec::new();
674-
{
675-
// Take the snapshots from the modified map
676-
let guard = shard.write();
677-
// Safety: guard must outlive the iterator.
678-
for bucket in unsafe { guard.iter() } {
679-
// Safety: the guard guarantees that the bucket is not removed and the ptr
680-
// is valid.
681-
let (key, shared_value) = unsafe { bucket.as_mut() };
682-
let modified_state = shared_value.get_mut();
683-
match modified_state {
684-
ModifiedState::Modified => {
685-
modified.push(*key);
686-
}
687-
ModifiedState::Snapshot(snapshot) => {
688-
if let Some(snapshot) = snapshot.take() {
689-
direct_snapshots.push((*key, snapshot));
690-
}
666+
parallel::map_collect::<_, _, Vec<_>>(self.modified.shards(), |shard| {
667+
let mut direct_snapshots: Vec<(TaskId, Box<InnerStorageSnapshot>)> = Vec::new();
668+
let mut modified: SmallVec<[TaskId; 4]> = SmallVec::new();
669+
{
670+
// Take the snapshots from the modified map
671+
let guard = shard.write();
672+
// Safety: guard must outlive the iterator.
673+
for bucket in unsafe { guard.iter() } {
674+
// Safety: the guard guarantees that the bucket is not removed and the ptr
675+
// is valid.
676+
let (key, shared_value) = unsafe { bucket.as_mut() };
677+
let modified_state = shared_value.get_mut();
678+
match modified_state {
679+
ModifiedState::Modified => {
680+
modified.push(*key);
681+
}
682+
ModifiedState::Snapshot(snapshot) => {
683+
if let Some(snapshot) = snapshot.take() {
684+
direct_snapshots.push((*key, snapshot));
691685
}
692686
}
693687
}
694-
// Safety: guard must outlive the iterator.
695-
drop(guard);
696688
}
689+
// Safety: guard must outlive the iterator.
690+
drop(guard);
691+
}
697692

698-
SnapshotShard {
699-
direct_snapshots,
700-
modified,
701-
storage: self,
702-
guard: Some(guard.clone()),
703-
process,
704-
preprocess,
705-
process_snapshot,
706-
}
707-
})
708-
.collect::<Vec<_>>()
693+
SnapshotShard {
694+
direct_snapshots,
695+
modified,
696+
storage: self,
697+
guard: Some(guard.clone()),
698+
process,
699+
preprocess,
700+
process_snapshot,
701+
}
702+
})
709703
}
710704

711705
/// Start snapshot mode.

turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs

Lines changed: 94 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,18 @@
11
use std::{
22
borrow::Borrow,
3-
cmp::max,
43
env,
54
path::PathBuf,
65
sync::{Arc, LazyLock, Mutex, PoisonError, Weak},
76
};
87

98
use anyhow::{Context, Result, anyhow};
10-
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
119
use serde::{Deserialize, Serialize};
1210
use smallvec::SmallVec;
13-
use tracing::Span;
1411
use turbo_tasks::{
1512
SessionId, TaskId,
1613
backend::CachedTaskType,
1714
panic_hooks::{PanicHookGuard, register_panic_hook},
18-
turbo_tasks_scope,
15+
parallel,
1916
};
2017

2118
use crate::{
@@ -331,14 +328,15 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
331328
let _span = tracing::trace_span!("update task data").entered();
332329
process_task_data(snapshots, Some(batch))?;
333330
let span = tracing::trace_span!("flush task data").entered();
334-
[KeySpace::TaskMeta, KeySpace::TaskData]
335-
.into_par_iter()
336-
.try_for_each(|key_space| {
331+
parallel::try_for_each(
332+
&[KeySpace::TaskMeta, KeySpace::TaskData],
333+
|&key_space| {
337334
let _span = span.clone().entered();
338335
// Safety: We already finished all processing of the task data and task
339336
// meta
340337
unsafe { batch.flush(key_space) }
341-
})?;
338+
},
339+
)?;
342340
}
343341

344342
let mut next_task_id = get_next_free_task_id::<
@@ -352,10 +350,9 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
352350
items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
353351
)
354352
.entered();
355-
let result = task_cache_updates
356-
.into_par_iter()
357-
.with_max_len(1)
358-
.map(|updates| {
353+
let result = parallel::map_collect_owned::<_, _, Result<Vec<_>>>(
354+
task_cache_updates,
355+
|updates| {
359356
let _span = _span.clone().entered();
360357
let mut max_task_id = 0;
361358

@@ -390,15 +387,11 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
390387
}
391388

392389
Ok(max_task_id)
393-
})
394-
.reduce(
395-
|| Ok(0),
396-
|a, b| -> anyhow::Result<_> {
397-
let a_max = a?;
398-
let b_max = b?;
399-
Ok(max(a_max, b_max))
400-
},
401-
)?;
390+
},
391+
)?
392+
.into_iter()
393+
.max()
394+
.unwrap_or(0);
402395
next_task_id = next_task_id.max(result);
403396
}
404397

@@ -410,64 +403,11 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
410403
)?;
411404
}
412405
WriteBatch::Serial(batch) => {
413-
let mut task_items_result = Ok(Vec::new());
414-
turbo_tasks::scope(|s| {
415-
s.spawn(|_| {
416-
task_items_result =
417-
process_task_data(snapshots, None::<&T::ConcurrentWriteBatch<'_>>);
418-
});
419-
420-
let mut next_task_id =
421-
get_next_free_task_id::<
422-
T::SerialWriteBatch<'_>,
423-
T::ConcurrentWriteBatch<'_>,
424-
>(&mut WriteBatchRef::serial(batch))?;
425-
426-
{
427-
let _span = tracing::trace_span!(
428-
"update task cache",
429-
items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
430-
)
431-
.entered();
432-
let mut task_type_bytes = Vec::new();
433-
for (task_type, task_id) in task_cache_updates.into_iter().flatten() {
434-
let task_id = *task_id;
435-
serialize_task_type(&task_type, &mut task_type_bytes, task_id)?;
436-
437-
batch
438-
.put(
439-
KeySpace::ForwardTaskCache,
440-
WriteBuffer::Borrowed(&task_type_bytes),
441-
WriteBuffer::Borrowed(&task_id.to_le_bytes()),
442-
)
443-
.with_context(|| {
444-
anyhow!("Unable to write task cache {task_type:?} => {task_id}")
445-
})?;
446-
batch
447-
.put(
448-
KeySpace::ReverseTaskCache,
449-
WriteBuffer::Borrowed(IntKey::new(task_id).as_ref()),
450-
WriteBuffer::Borrowed(&task_type_bytes),
451-
)
452-
.with_context(|| {
453-
anyhow!("Unable to write task cache {task_id} => {task_type:?}")
454-
})?;
455-
next_task_id = next_task_id.max(task_id + 1);
456-
}
457-
}
458-
459-
save_infra::<T::SerialWriteBatch<'_>, T::ConcurrentWriteBatch<'_>>(
460-
&mut WriteBatchRef::serial(batch),
461-
next_task_id,
462-
session_id,
463-
operations,
464-
)?;
465-
anyhow::Ok(())
466-
})?;
467-
468406
{
469407
let _span = tracing::trace_span!("update tasks").entered();
470-
for (task_id, meta, data) in task_items_result?.into_iter().flatten() {
408+
let task_items =
409+
process_task_data(snapshots, None::<&T::ConcurrentWriteBatch<'_>>)?;
410+
for (task_id, meta, data) in task_items.into_iter().flatten() {
471411
let key = IntKey::new(*task_id);
472412
let key = key.as_ref();
473413
if let Some(meta) = meta {
@@ -485,7 +425,54 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
485425
})?;
486426
}
487427
}
428+
batch.flush(KeySpace::TaskMeta)?;
429+
batch.flush(KeySpace::TaskData)?;
430+
}
431+
432+
let mut next_task_id = get_next_free_task_id::<
433+
T::SerialWriteBatch<'_>,
434+
T::ConcurrentWriteBatch<'_>,
435+
>(&mut WriteBatchRef::serial(batch))?;
436+
437+
{
438+
let _span = tracing::trace_span!(
439+
"update task cache",
440+
items = task_cache_updates.iter().map(|m| m.len()).sum::<usize>()
441+
)
442+
.entered();
443+
let mut task_type_bytes = Vec::new();
444+
for (task_type, task_id) in task_cache_updates.into_iter().flatten() {
445+
let task_id = *task_id;
446+
serialize_task_type(&task_type, &mut task_type_bytes, task_id)?;
447+
448+
batch
449+
.put(
450+
KeySpace::ForwardTaskCache,
451+
WriteBuffer::Borrowed(&task_type_bytes),
452+
WriteBuffer::Borrowed(&task_id.to_le_bytes()),
453+
)
454+
.with_context(|| {
455+
anyhow!("Unable to write task cache {task_type:?} => {task_id}")
456+
})?;
457+
batch
458+
.put(
459+
KeySpace::ReverseTaskCache,
460+
WriteBuffer::Borrowed(IntKey::new(task_id).as_ref()),
461+
WriteBuffer::Borrowed(&task_type_bytes),
462+
)
463+
.with_context(|| {
464+
anyhow!("Unable to write task cache {task_id} => {task_type:?}")
465+
})?;
466+
next_task_id = next_task_id.max(task_id + 1);
467+
}
488468
}
469+
470+
save_infra::<T::SerialWriteBatch<'_>, T::ConcurrentWriteBatch<'_>>(
471+
&mut WriteBatchRef::serial(batch),
472+
next_task_id,
473+
session_id,
474+
operations,
475+
)?;
489476
}
490477
}
491478

@@ -703,48 +690,38 @@ where
703690
> + Send
704691
+ Sync,
705692
{
706-
let span = Span::current();
707-
let turbo_tasks = turbo_tasks::turbo_tasks();
708-
let handle = tokio::runtime::Handle::current();
709-
tasks
710-
.into_par_iter()
711-
.map(|tasks| {
712-
let _span = span.clone().entered();
713-
let _guard = handle.clone().enter();
714-
turbo_tasks_scope(turbo_tasks.clone(), || {
715-
let mut result = Vec::new();
716-
for (task_id, meta, data) in tasks {
717-
if let Some(batch) = batch {
718-
let key = IntKey::new(*task_id);
719-
let key = key.as_ref();
720-
if let Some(meta) = meta {
721-
batch.put(
722-
KeySpace::TaskMeta,
723-
WriteBuffer::Borrowed(key),
724-
WriteBuffer::SmallVec(meta),
725-
)?;
726-
}
727-
if let Some(data) = data {
728-
batch.put(
729-
KeySpace::TaskData,
730-
WriteBuffer::Borrowed(key),
731-
WriteBuffer::SmallVec(data),
732-
)?;
733-
}
734-
} else {
735-
// Store the new task data
736-
result.push((
737-
task_id,
738-
meta.map(WriteBuffer::SmallVec),
739-
data.map(WriteBuffer::SmallVec),
740-
));
741-
}
693+
parallel::map_collect_owned::<_, _, Result<Vec<_>>>(tasks, |tasks| {
694+
let mut result = Vec::new();
695+
for (task_id, meta, data) in tasks {
696+
if let Some(batch) = batch {
697+
let key = IntKey::new(*task_id);
698+
let key = key.as_ref();
699+
if let Some(meta) = meta {
700+
batch.put(
701+
KeySpace::TaskMeta,
702+
WriteBuffer::Borrowed(key),
703+
WriteBuffer::SmallVec(meta),
704+
)?;
705+
}
706+
if let Some(data) = data {
707+
batch.put(
708+
KeySpace::TaskData,
709+
WriteBuffer::Borrowed(key),
710+
WriteBuffer::SmallVec(data),
711+
)?;
742712
}
713+
} else {
714+
// Store the new task data
715+
result.push((
716+
task_id,
717+
meta.map(WriteBuffer::SmallVec),
718+
data.map(WriteBuffer::SmallVec),
719+
));
720+
}
721+
}
743722

744-
Ok(result)
745-
})
746-
})
747-
.collect::<Result<Vec<_>>>()
723+
Ok(result)
724+
})
748725
}
749726

750727
fn serialize(task: TaskId, data: &Vec<CachedDataItem>) -> Result<SmallVec<[u8; 16]>> {

0 commit comments

Comments (0)