diff --git a/turbopack/crates/turbo-tasks/Cargo.toml b/turbopack/crates/turbo-tasks/Cargo.toml index 8bf86edb4d0dbc..40584135378c07 100644 --- a/turbopack/crates/turbo-tasks/Cargo.toml +++ b/turbopack/crates/turbo-tasks/Cargo.toml @@ -12,6 +12,7 @@ bench = false default = [] tokio_tracing = ["tokio/tracing"] hanging_detection = [] +local_resolution = [] [lints] workspace = true diff --git a/turbopack/crates/turbo-tasks/src/manager.rs b/turbopack/crates/turbo-tasks/src/manager.rs index 79f487d08021ba..8f6ac04af8584e 100644 --- a/turbopack/crates/turbo-tasks/src/manager.rs +++ b/turbopack/crates/turbo-tasks/src/manager.rs @@ -40,13 +40,13 @@ use crate::{ raw_vc::{CellId, RawVc}, registry::{self, get_function}, serialization_invalidation::SerializationInvalidator, - task::shared_reference::TypedSharedReference, + task::{local_task::LocalTask, shared_reference::TypedSharedReference}, trace::TraceRawVcs, trait_helpers::get_trait_method, util::StaticOrArc, vc::ReadVcFuture, - Completion, FunctionMeta, InvalidationReason, InvalidationReasonSet, SharedReference, TaskId, - TaskIdSet, ValueTypeId, Vc, VcRead, VcValueTrait, VcValueType, + Completion, FunctionMeta, InvalidationReason, InvalidationReasonSet, OutputContent, + SharedReference, TaskId, TaskIdSet, ValueTypeId, Vc, VcRead, VcValueTrait, VcValueType, }; pub trait TurboTasksCallApi: Sync + Send { @@ -317,7 +317,7 @@ pub struct UpdateInfo { placeholder_for_future_fields: (), } -#[derive(Clone, Copy)] +#[derive(Clone, Copy, Debug, Eq, PartialEq)] pub enum TaskPersistence { /// Tasks that may be persisted across sessions using serialization. Persistent, @@ -406,8 +406,11 @@ struct CurrentGlobalTaskState { /// `CurrentGlobalTaskState`) when the task finishes executing. local_cells: Vec, + /// Local tasks created while this global task has been running. Indexed by `LocalTaskId`. + local_tasks: Vec, + /// Tracks currently running local tasks, and defers cleanup of the global task until those - /// complete. + /// complete. Also used by `detached_for_testing`. local_task_tracker: TaskTracker, backend_state: Box, @@ -421,10 +424,51 @@ impl CurrentGlobalTaskState { stateful: false, cell_counters: Some(AutoMap::default()), local_cells: Vec::new(), + local_tasks: Vec::new(), local_task_tracker: TaskTracker::new(), backend_state, } } + + fn assert_task_id(&self, expected_task_id: TaskId) { + if self.task_id != expected_task_id { + unimplemented!( + "Local tasks can currently only be scheduled/awaited within their parent task" + ); + } + } + + /// Create a [`LocalTask::Unscheduled`]. + #[cfg(feature = "local_resolution")] + fn create_local_task( + &mut self, + ty: CachedTaskType, + // if this is a `CachedTaskType::Resolve*`, we'll spawn another task with this persistence + persistence: TaskPersistence, + ) -> LocalTaskId { + use crate::task::local_task; + + self.local_tasks + .push(LocalTask::Unscheduled(Arc::new(local_task::Unscheduled { + ty, + persistence, + }))); + // generate a one-indexed id + if cfg!(debug_assertions) { + LocalTaskId::from(u32::try_from(self.local_tasks.len()).unwrap()) + } else { + unsafe { LocalTaskId::new_unchecked(self.local_tasks.len() as u32) } + } + } + + fn get_local_task(&self, local_task_id: LocalTaskId) -> &LocalTask { + // local task ids are one-indexed (they use NonZeroU32) + &self.local_tasks[(*local_task_id as usize) - 1] + } + + fn get_mut_local_task(&mut self, local_task_id: LocalTaskId) -> &mut LocalTask { + &mut self.local_tasks[(*local_task_id as usize) - 1] + } } /// Information specific to the current "local" task. A local task re-uses it's parent global task's @@ -434,8 +478,8 @@ impl CurrentGlobalTaskState { /// of the global task. #[derive(Clone)] struct CurrentLocalTaskState { - /// A unique identifier created for each unique [`CurrentLocalTaskState`]. Used to check that - /// [`CurrentTaskState::local_cells`] are valid for the current [`RawVc::LocalCell`]. + /// A unique identifier created for each unique[`CurrentLocalTaskState`]. Used to check that + /// [`CurrentTaskState::local_cells`] are valid for the currant [`RawVc::LocalCell`]. execution_id: ExecutionId, /// The function's metadata if this is a persistent task. Contains information about arguments @@ -633,28 +677,32 @@ impl TurboTasks { if registry::get_function(func).arg_meta.is_resolved(&*arg) { return self.native_call(func, arg, persistence); } + let task_type = CachedTaskType::ResolveNative { + fn_type: func, + this: None, + arg, + }; + #[cfg(feature = "local_resolution")] + return CURRENT_GLOBAL_TASK_STATE.with(move |gts| { + let mut gts_write = gts.write().unwrap(); + let local_task_id = gts_write.create_local_task(task_type, persistence); + RawVc::LocalOutput(gts_write.task_id, local_task_id) + }); + #[cfg(not(feature = "local_resolution"))] match persistence { TaskPersistence::LocalCells => { todo!("bgw: local tasks"); } TaskPersistence::Transient => { RawVc::TaskOutput(self.backend.get_or_create_transient_task( - CachedTaskType::ResolveNative { - fn_type: func, - this: None, - arg, - }, + task_type, current_task("turbo_function calls"), self, )) } TaskPersistence::Persistent => { RawVc::TaskOutput(self.backend.get_or_create_persistent_task( - CachedTaskType::ResolveNative { - fn_type: func, - this: None, - arg, - }, + task_type, current_task("turbo_function calls"), self, )) @@ -677,7 +725,14 @@ impl TurboTasks { this: Some(this), arg, }; - match persistence { + #[cfg(feature = "local_resolution")] + return CURRENT_GLOBAL_TASK_STATE.with(move |gts| { + let mut gts_write = gts.write().unwrap(); + let local_task_id = gts_write.create_local_task(task_type, persistence); + RawVc::LocalOutput(gts_write.task_id, local_task_id) + }); + #[cfg(not(feature = "local_resolution"))] + return match persistence { TaskPersistence::LocalCells => { todo!("bgw: local tasks"); } @@ -695,7 +750,7 @@ impl TurboTasks { self, )) } - } + }; } pub fn trait_call( @@ -727,7 +782,15 @@ impl TurboTasks { this, arg, }; - match persistence { + + #[cfg(feature = "local_resolution")] + return CURRENT_GLOBAL_TASK_STATE.with(move |gts| { + let mut gts_write = gts.write().unwrap(); + let local_task_id = gts_write.create_local_task(task_type, persistence); + RawVc::LocalOutput(gts_write.task_id, local_task_id) + }); + #[cfg(not(feature = "local_resolution"))] + return match persistence { TaskPersistence::LocalCells => { todo!("bgw: local tasks"); } @@ -745,7 +808,7 @@ impl TurboTasks { self, )) } - } + }; } #[track_caller] @@ -753,9 +816,6 @@ impl TurboTasks { self.begin_primary_job(); self.scheduled_tasks.fetch_add(1, Ordering::AcqRel); - #[cfg(feature = "tokio_tracing")] - let description = self.backend.get_task_description(task_id); - let this = self.pin(); let future = async move { let mut schedule_again = true; @@ -831,6 +891,102 @@ impl TurboTasks { let future = TURBO_TASKS.scope(self.pin(), future).in_current_span(); + #[cfg(feature = "tokio_tracing")] + { + let description = self.backend.get_task_description(task_id); + tokio::task::Builder::new() + .name(&description) + .spawn(future) + .unwrap(); + } + #[cfg(not(feature = "tokio_tracing"))] + tokio::task::spawn(future); + } + + fn schedule_local_task(&self, parent_task_id: TaskId, local_task_id: LocalTaskId) { + let Some((global_task_state, unscheduled_local_task)) = + CURRENT_GLOBAL_TASK_STATE.with(|gts| { + let mut gts_write = gts.write().unwrap(); + gts_write.assert_task_id(parent_task_id); + let local_task = gts_write.get_mut_local_task(local_task_id); + let LocalTask::Unscheduled(unscheduled_local_task) = local_task else { + return None; + }; + let unscheduled_local_task = Arc::clone(unscheduled_local_task); + *local_task = LocalTask::Scheduled { + done_event: Event::new({ + let ult = Arc::clone(&unscheduled_local_task); + move || format!("LocalTask({})::done_event", ult.ty) + }), + }; + + Some((Arc::clone(gts), unscheduled_local_task)) + }) + else { + // it's either already scheduled or already done + return; + }; + + let local_task_state = CurrentLocalTaskState::new( + self.execution_id_factory.get(), + unscheduled_local_task + .ty + .try_get_function_id() + .map(|func_id| &get_function(func_id).function_meta), + ); + + #[cfg(feature = "tokio_tracing")] + let description = format!( + "[local] (parent: {}) {}", + self.backend.get_task_description(parent_task_id), + unscheduled_local_task.ty, + ); + + let this = self.pin(); + let future = async move { + let TaskExecutionSpec { future, span } = unscheduled_local_task.start_execution(&*this); + async move { + let (result, _duration, _memory_usage) = + CaptureFuture::new(AssertUnwindSafe(future).catch_unwind()).await; + + let result = result.map_err(|any| match any.downcast::() { + Ok(owned) => Some(Cow::Owned(*owned)), + Err(any) => match any.downcast::<&'static str>() { + Ok(str) => Some(Cow::Borrowed(*str)), + Err(_) => None, + }, + }); + let local_task = LocalTask::Done { + output: match result { + Ok(Ok(raw_vc)) => OutputContent::Link(raw_vc), + Ok(Err(err)) => OutputContent::Error(err.into()), + Err(panic_err) => OutputContent::Panic(panic_err.map(Box::new)), + }, + }; + + let done_event = CURRENT_GLOBAL_TASK_STATE.with(move |gts| { + let mut gts_write = gts.write().unwrap(); + let scheduled_task = + std::mem::replace(gts_write.get_mut_local_task(local_task_id), local_task); + let LocalTask::Scheduled { done_event } = scheduled_task else { + panic!("local task finished, but was not in the scheduled state?"); + }; + done_event + }); + done_event.notify(usize::MAX) + } + .instrument(span) + .await + }; + let future = global_task_state + .read() + .unwrap() + .local_task_tracker + .track_future(future); + let future = CURRENT_LOCAL_TASK_STATE.scope(local_task_state, future); + let future = CURRENT_GLOBAL_TASK_STATE.scope(global_task_state, future); + let future = TURBO_TASKS.scope(self.pin(), future).in_current_span(); + #[cfg(feature = "tokio_tracing")] tokio::task::Builder::new() .name(&description) @@ -1325,22 +1481,42 @@ impl TurboTasksApi for TurboTasks { fn try_read_local_output( &self, - _parent_task_id: TaskId, - _local_task_id: LocalTaskId, - _consistency: ReadConsistency, + parent_task_id: TaskId, + local_task_id: LocalTaskId, + consistency: ReadConsistency, ) -> Result> { - todo!("bgw: local outputs"); + // we don't currently support reading a local output outside of it's own task, so + // tracked/untracked is currently irrelevant + self.try_read_local_output_untracked(parent_task_id, local_task_id, consistency) } /// INVALIDATION: Be careful with this, it will not track dependencies, so /// using it could break cache invalidation. fn try_read_local_output_untracked( &self, - _parent_task_id: TaskId, - _local_task_id: LocalTaskId, + parent_task_id: TaskId, + local_task_id: LocalTaskId, + // we don't currently support reading a local output outside of it's own task, so + // consistency is currently irrelevant _consistency: ReadConsistency, ) -> Result> { - todo!("bgw: local outputs"); + CURRENT_GLOBAL_TASK_STATE.with(|gts| loop { + let gts_read = gts.read().unwrap(); + gts_read.assert_task_id(parent_task_id); + match gts_read.get_local_task(local_task_id) { + LocalTask::Unscheduled(..) => { + drop(gts_read); + self.schedule_local_task(parent_task_id, local_task_id); + continue; + } + LocalTask::Scheduled { done_event } => { + return Ok(Err(done_event.listen())); + } + LocalTask::Done { output } => { + return Ok(Ok(output.read_untracked()?)); + } + } + }) } fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap { @@ -1673,7 +1849,10 @@ pub fn with_turbo_tasks_for_testing( current_task, Box::new(()), ))), - CURRENT_LOCAL_TASK_STATE.scope(CurrentLocalTaskState::new(execution_id, None), f), + CURRENT_LOCAL_TASK_STATE.scope( + CurrentLocalTaskState::new(execution_id, /* function_meta */ None), + f, + ), ), ) } @@ -2029,12 +2208,17 @@ pub(crate) fn read_local_cell( } pub(crate) async fn read_local_output( - _this: &dyn TurboTasksApi, - _task_id: TaskId, - _local_output_id: LocalTaskId, - _consistency: ReadConsistency, + this: &dyn TurboTasksApi, + parent_task_id: TaskId, + local_task_id: LocalTaskId, + consistency: ReadConsistency, ) -> Result { - todo!("bgw: local outputs"); + loop { + match this.try_read_local_output(parent_task_id, local_task_id, consistency)? { + Ok(raw_vc) => return Ok(raw_vc), + Err(event_listener) => event_listener.await, + } + } } /// Panics if the [`ExecutionId`] does not match the current task's diff --git a/turbopack/crates/turbo-tasks/src/task/local_task.rs b/turbopack/crates/turbo-tasks/src/task/local_task.rs new file mode 100644 index 00000000000000..657e1f7a873032 --- /dev/null +++ b/turbopack/crates/turbo-tasks/src/task/local_task.rs @@ -0,0 +1,82 @@ +use std::sync::Arc; + +use crate::{ + backend::{Backend, CachedTaskType, TaskExecutionSpec}, + event::Event, + registry, OutputContent, TaskPersistence, TurboTasksBackendApi, +}; + +/// A potentially in-flight local task stored in `CurrentGlobalTaskState::local_tasks`. +pub enum LocalTask { + Unscheduled(Arc), + Scheduled { done_event: Event }, + Done { output: OutputContent }, +} + +pub struct Unscheduled { + pub ty: CachedTaskType, + /// if this is a `CachedTaskType::Resolve*`, we'll spawn another task with this persistence + pub persistence: TaskPersistence, +} + +impl Unscheduled { + pub fn start_execution<'a>( + &'a self, + turbo_tasks: &dyn TurboTasksBackendApi, + ) -> TaskExecutionSpec<'a> { + let Self { ty, persistence } = self; + match ty { + CachedTaskType::Native { + fn_type: native_fn_id, + this, + arg, + } => { + debug_assert_eq!(persistence, &TaskPersistence::LocalCells); + let func = registry::get_function(*native_fn_id); + let span = func.span(); + let entered = span.enter(); + let future = func.execute(*this, &**arg); + drop(entered); + TaskExecutionSpec { future, span } + } + CachedTaskType::ResolveNative { + fn_type: native_fn_id, + this, + arg, + } => { + let func = registry::get_function(*native_fn_id); + let span = func.resolve_span(); + let entered = span.enter(); + let future = Box::pin(CachedTaskType::run_resolve_native( + *native_fn_id, + *this, + &**arg, + *persistence, + turbo_tasks.pin(), + )); + drop(entered); + TaskExecutionSpec { future, span } + } + CachedTaskType::ResolveTrait { + trait_type: trait_type_id, + method_name: name, + this, + arg, + } => { + let trait_type = registry::get_trait(*trait_type_id); + let span = trait_type.resolve_span(name); + let entered = span.enter(); + let future = Box::pin(CachedTaskType::run_resolve_trait( + *trait_type_id, + name.clone(), + *this, + &**arg, + *persistence, + turbo_tasks.pin(), + )); + drop(entered); + TaskExecutionSpec { future, span } + } + } + } +} diff --git a/turbopack/crates/turbo-tasks/src/task/mod.rs b/turbopack/crates/turbo-tasks/src/task/mod.rs index 963a3a74a99d1b..4ad8840c556223 100644 --- a/turbopack/crates/turbo-tasks/src/task/mod.rs +++ b/turbopack/crates/turbo-tasks/src/task/mod.rs @@ -1,4 +1,5 @@ pub(crate) mod function; +pub mod local_task; pub(crate) mod shared_reference; pub(crate) mod task_input; pub(crate) mod task_output;