Skip to content

Commit

Permalink
Chunking Refactoring followup fixes (vercel/turborepo#6172)
Browse files Browse the repository at this point in the history
### Description

* fix double emitting problem
* some allocation, resolving, and tracing improvements

### Testing Instructions

<!--
  Give a quick description of steps to test your changes.
-->
  • Loading branch information
sokra authored Oct 13, 2023
1 parent 83b82ae commit ab2e77a
Show file tree
Hide file tree
Showing 13 changed files with 145 additions and 83 deletions.
14 changes: 9 additions & 5 deletions crates/turbo-tasks/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
fmt,
fmt::{Debug, Display, Write},
future::Future,
mem::take,
pin::Pin,
sync::Arc,
time::{Duration, Instant},
Expand Down Expand Up @@ -351,19 +352,22 @@ pub trait Backend: Sync + Send {
impl PersistentTaskType {
pub async fn run_resolve_native<B: Backend + 'static>(
fn_id: FunctionId,
inputs: Vec<ConcreteTaskInput>,
mut inputs: Vec<ConcreteTaskInput>,
turbo_tasks: Arc<dyn TurboTasksBackendApi<B>>,
) -> Result<RawVc> {
let span = tracing::trace_span!(
"turbo_tasks::resolve_call",
name = &registry::get_function(fn_id).name.as_str()
);
async move {
let mut resolved_inputs = Vec::with_capacity(inputs.len());
for input in inputs.into_iter() {
resolved_inputs.push(input.resolve().await?)
for i in 0..inputs.len() {
let input = unsafe { take(inputs.get_unchecked_mut(i)) };
let input = input.resolve().await?;
unsafe {
*inputs.get_unchecked_mut(i) = input;
}
}
Ok(turbo_tasks.native_call(fn_id, resolved_inputs))
Ok(turbo_tasks.native_call(fn_id, inputs))
}
.instrument(span)
.await
Expand Down
30 changes: 26 additions & 4 deletions crates/turbo-tasks/src/keyed_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,20 @@ struct KeyedCell {
cell_ref: CurrentCellRef,
}

impl KeyedCell {}

#[turbo_tasks::value_impl]
impl KeyedCell {
#[turbo_tasks::function]
fn new(_task: TaskId, _key: ConcreteTaskInput, value_type_id: ValueTypeId) -> Vc<Self> {
fn new_local(_task: TaskId, _key: ConcreteTaskInput, value_type_id: ValueTypeId) -> Vc<Self> {
let cell_ref = find_cell_by_type(value_type_id);
KeyedCell {
cell: cell_ref.into(),
cell_ref,
}
.cell()
}

#[turbo_tasks::function]
fn new_global(_key: ConcreteTaskInput, value_type_id: ValueTypeId) -> Vc<Self> {
let cell_ref = find_cell_by_type(value_type_id);
KeyedCell {
cell: cell_ref.into(),
Expand All @@ -44,7 +52,7 @@ pub async fn keyed_cell<T: PartialEq + Eq + VcValueType, K: TaskInput>(
key: K,
content: T,
) -> Result<Vc<T>> {
let cell = KeyedCell::new(
let cell = KeyedCell::new_local(
current_task("keyed_cell"),
key.into_concrete(),
T::get_value_type_id(),
Expand All @@ -53,3 +61,17 @@ pub async fn keyed_cell<T: PartialEq + Eq + VcValueType, K: TaskInput>(
cell.cell_ref.compare_and_update_shared(content);
Ok(cell.cell.into())
}

/// Cells a value in a cell with a given key. A key MUST only be used once for
/// the whole application.
///
/// This allows to create singleton Vcs for values while avoiding to pass the
/// whole value as argument and creating a large task key.
pub async fn global_keyed_cell<T: PartialEq + Eq + VcValueType, K: TaskInput>(
key: K,
content: T,
) -> Result<Vc<T>> {
let cell = KeyedCell::new_global(key.into_concrete(), T::get_value_type_id()).await?;
cell.cell_ref.compare_and_update_shared(content);
Ok(cell.cell.into())
}
2 changes: 1 addition & 1 deletion crates/turbo-tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub use invalidation::{
DynamicEqHash, InvalidationReason, InvalidationReasonKind, InvalidationReasonSet,
};
pub use join_iter_ext::{JoinIterExt, TryFlatJoinIterExt, TryJoinIterExt};
pub use keyed_cell::keyed_cell;
pub use keyed_cell::{global_keyed_cell, keyed_cell};
pub use manager::{
dynamic_call, emit, get_invalidator, mark_finished, mark_stateful, run_once,
run_once_with_reason, spawn_blocking, spawn_thread, trait_call, turbo_tasks, CurrentCellRef,
Expand Down
3 changes: 2 additions & 1 deletion crates/turbo-tasks/src/task/concrete_task_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ impl<'de> Deserialize<'de> for SharedValue {
/// converted back into the argument types. This is handled by the [`TaskInput`]
/// trait.
#[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, Hash, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[derive(Debug, Hash, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Default)]
pub enum ConcreteTaskInput {
TaskOutput(TaskId),
TaskCell(TaskId, CellId),
Expand All @@ -337,6 +337,7 @@ pub enum ConcreteTaskInput {
I32(i32),
U32(u32),
U64(u64),
#[default]
Nothing,
SharedValue(SharedValue),
TransientSharedValue(TransientSharedValue),
Expand Down
16 changes: 12 additions & 4 deletions crates/turbo-tasks/src/task/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,16 @@ macro_rules! task_fn_impl {
)*

Ok(Box::new(move || {
let span = macro_helpers::tracing::trace_span!("turbo_tasks::function", name);
let _entered = span.enter();
let task_fn = task_fn.clone();
$(
let $arg = $arg.clone();
)*

Box::pin(macro_helpers::tracing::Instrument::instrument(async move {
Output::try_into_raw_vc((task_fn)($($arg),*))
}, macro_helpers::tracing::trace_span!("turbo_tasks::function", name)))
}, span.clone()))
}))
}
}
Expand Down Expand Up @@ -172,6 +174,8 @@ macro_rules! task_fn_impl {
)*

Ok(Box::new(move || {
let span = macro_helpers::tracing::trace_span!("turbo_tasks::function", name);
let _entered = span.enter();
let task_fn = task_fn.clone();
$(
let $arg = $arg.clone();
Expand All @@ -181,7 +185,7 @@ macro_rules! task_fn_impl {
let result = Output::try_into_raw_vc((task_fn)($($arg),*).await);
macro_helpers::notify_scheduled_tasks();
result
}, macro_helpers::tracing::trace_span!("turbo_tasks::function", name)))
}, span.clone()))
}))
}
}
Expand Down Expand Up @@ -213,6 +217,8 @@ macro_rules! task_fn_impl {
)*

Ok(Box::new(move || {
let span = macro_helpers::tracing::trace_span!("turbo_tasks::function", name);
let _entered = span.enter();
let task_fn = task_fn.clone();
let recv = recv.clone();
$(
Expand All @@ -225,7 +231,7 @@ macro_rules! task_fn_impl {
let result = Output::try_into_raw_vc((task_fn)(recv, $($arg),*));
macro_helpers::notify_scheduled_tasks();
result
}, macro_helpers::tracing::trace_span!("turbo_tasks::function", name)))
}, span.clone()))
}))
}
}
Expand Down Expand Up @@ -271,6 +277,8 @@ macro_rules! task_fn_impl {
)*

Ok(Box::new(move || {
let span = macro_helpers::tracing::trace_span!("turbo_tasks::function", name);
let _entered = span.enter();
let task_fn = task_fn.clone();
let recv = recv.clone();
$(
Expand All @@ -283,7 +291,7 @@ macro_rules! task_fn_impl {
let result = <F as $async_fn_trait<&Recv, $($arg,)*>>::Output::try_into_raw_vc((task_fn)(recv, $($arg),*).await);
macro_helpers::notify_scheduled_tasks();
result
}, macro_helpers::tracing::trace_span!("turbo_tasks::function", name)))
}, span.clone()))
}))
}
}
Expand Down
7 changes: 6 additions & 1 deletion crates/turbopack-build/src/chunking_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,11 +326,16 @@ impl ChunkingContext for BuildChunkingContext {
)
.await?;

let assets: Vec<Vc<Box<dyn OutputAsset>>> = chunks
let mut assets: Vec<Vc<Box<dyn OutputAsset>>> = chunks
.iter()
.map(|chunk| self.generate_chunk(*chunk))
.collect();

// Resolve assets
for asset in assets.iter_mut() {
*asset = asset.resolve().await?;
}

Ok(Vc::cell(assets))
}

Expand Down
26 changes: 15 additions & 11 deletions crates/turbopack-convert-trace/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -602,10 +602,10 @@ fn main() {
if graph && !items.is_empty() {
let parent_name = &*span.name;
let mut groups = IndexMap::new();
let mut self_items = Vec::new();
let mut self_items = 0;
fn add_items_to_groups<'a>(
groups: &mut IndexMap<Cow<'a, str>, Vec<SpanItem>>,
self_items: &mut Vec<SpanItem>,
groups: &mut IndexMap<(Cow<'a, str>, usize), Vec<SpanItem>>,
self_items: &mut usize,
spans: &mut Vec<Span<'a>>,
parent_count: &mut u32,
parent_name: &str,
Expand All @@ -615,7 +615,17 @@ fn main() {
for item in items {
match item {
SpanItem::SelfTime { .. } => {
self_items.push(item);
if let Some(((key, _), last)) = groups.last_mut() {
if key == &Cow::Borrowed("SELF_TIME") {
last.push(item);
continue;
}
}
groups.insert(
(Cow::Borrowed("SELF_TIME"), *self_items),
vec![item],
);
*self_items += 1;
}
SpanItem::Child(id) => {
let key = spans[id].name.clone();
Expand All @@ -634,7 +644,7 @@ fn main() {
);
add_to_span_counter();
} else {
let group = groups.entry(key).or_default();
let group = groups.entry((key, 0)).or_default();
if !group.is_empty() {
add_to_span_counter();
}
Expand All @@ -653,12 +663,6 @@ fn main() {
items,
&mut add_to_span_counter,
);
if !self_items.is_empty() {
groups
.entry(Cow::Borrowed("SELF_TIME"))
.or_default()
.append(&mut self_items);
}
let groups = groups.into_values().collect::<Vec<_>>();
let mut new_items = Vec::new();
for group in groups {
Expand Down
23 changes: 7 additions & 16 deletions crates/turbopack-core/src/chunk/chunking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@ use indexmap::IndexMap;
use once_cell::sync::Lazy;
use regex::Regex;
use tracing::Level;
use turbo_tasks::{keyed_cell, ReadRef, TryJoinIterExt, ValueToString, Vc};
use turbo_tasks::{ReadRef, TryJoinIterExt, ValueToString, Vc};

use super::{
AsyncModuleInfo, Chunk, ChunkItem, ChunkItemsWithAsyncModuleInfo, ChunkType, ChunkingContext,
};
use super::{AsyncModuleInfo, Chunk, ChunkItem, ChunkType, ChunkingContext};
use crate::output::{OutputAsset, OutputAssets};

/// Creates chunks based on heuristics for the passed `chunk_items`. Also
Expand All @@ -40,7 +38,6 @@ pub async fn make_chunks(
}

let mut referenced_output_assets = Vc::cell(referenced_output_assets);
let other_referenced_output_assets = Vc::cell(Vec::new());

let mut chunks = Vec::new();
for (ty, chunk_items) in map {
Expand All @@ -65,7 +62,7 @@ pub async fn make_chunks(
chunking_context,
chunks: &mut chunks,
referenced_output_assets: &mut referenced_output_assets,
empty_referenced_output_assets: other_referenced_output_assets,
empty_referenced_output_assets: OutputAssets::empty().resolve().await?,
};

app_vendors_split(
Expand Down Expand Up @@ -128,16 +125,10 @@ async fn make_chunk(
split_context.chunks.push(
split_context.ty.chunk(
split_context.chunking_context,
keyed_cell(
take(key),
ChunkItemsWithAsyncModuleInfo(
chunk_items
.into_iter()
.map(|(chunk_item, async_info, ..)| (chunk_item, async_info))
.collect(),
),
)
.await?,
chunk_items
.into_iter()
.map(|(chunk_item, async_info, ..)| (chunk_item, async_info))
.collect(),
replace(
split_context.referenced_output_assets,
split_context.empty_referenced_output_assets,
Expand Down
5 changes: 1 addition & 4 deletions crates/turbopack-core/src/chunk/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ pub trait ChunkType: ValueToString {
fn chunk(
&self,
chunking_context: Vc<Box<dyn ChunkingContext>>,
chunk_items: Vc<ChunkItemsWithAsyncModuleInfo>,
chunk_items: Vec<ChunkItemWithAsyncModuleInfo>,
referenced_output_assets: Vc<OutputAssets>,
) -> Vc<Box<dyn Chunk>>;

Expand Down Expand Up @@ -611,9 +611,6 @@ impl AsyncModuleInfo {

pub type ChunkItemWithAsyncModuleInfo = (Vc<Box<dyn ChunkItem>>, Option<Vc<AsyncModuleInfo>>);

#[turbo_tasks::value(transparent)]
pub struct ChunkItemsWithAsyncModuleInfo(Vec<ChunkItemWithAsyncModuleInfo>);

pub trait ChunkItemExt: Send {
/// Returns the module id of this chunk item.
fn id(self: Vc<Self>) -> Vc<ModuleId>;
Expand Down
32 changes: 32 additions & 0 deletions crates/turbopack-core/src/resolve/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,35 @@ pub fn node_cjs_resolve_options(root: Vc<FileSystemPath>) -> Vc<ResolveOptions>
}
.cell()
}

#[turbo_tasks::function]
pub fn node_esm_resolve_options(root: Vc<FileSystemPath>) -> Vc<ResolveOptions> {
let conditions: ResolutionConditions = [
("node".to_string(), ConditionValue::Set),
("import".to_string(), ConditionValue::Set),
]
.into();
ResolveOptions {
extensions: vec![],
modules: vec![ResolveModules::Nested(
root,
vec!["node_modules".to_string()],
)],
into_package: vec![
ResolveIntoPackage::ExportsField {
conditions: conditions.clone(),
unspecified_conditions: ConditionValue::Unset,
},
ResolveIntoPackage::MainField("main".to_string()),
ResolveIntoPackage::Default("index.js".to_string()),
ResolveIntoPackage::Default("index.json".to_string()),
ResolveIntoPackage::Default("index.node".to_string()),
],
in_package: vec![ResolveInPackage::ImportsField {
conditions,
unspecified_conditions: ConditionValue::Unset,
}],
..Default::default()
}
.cell()
}
Loading

0 comments on commit ab2e77a

Please sign in to comment.