From 4d3880dad1c83f8a2fa8d3b29e3af11778953971 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Mon, 7 Oct 2024 18:22:58 +0200 Subject: [PATCH 01/14] add support for session dependent tasks --- .../turbo-tasks-backend/src/backend/mod.rs | 106 +++++++++++-- .../backend/operation/aggregation_update.rs | 129 ++++++++-------- .../src/backend/operation/invalidate.rs | 64 ++++++-- .../src/backend/operation/mod.rs | 6 +- .../src/backing_storage.rs | 4 +- .../crates/turbo-tasks-backend/src/data.rs | 145 ++++++++++++++++-- .../src/lmdb_backing_storage.rs | 25 ++- .../crates/turbo-tasks-testing/src/lib.rs | 2 +- turbopack/crates/turbo-tasks/src/backend.rs | 2 +- turbopack/crates/turbo-tasks/src/id.rs | 1 + turbopack/crates/turbo-tasks/src/lib.rs | 5 +- turbopack/crates/turbo-tasks/src/manager.rs | 13 +- turbopack/crates/turbo-tasks/src/state.rs | 4 +- 13 files changed, 384 insertions(+), 122 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index 2d3b594ba1fa3..130f60552195d 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -31,8 +31,8 @@ use turbo_tasks::{ event::{Event, EventListener}, registry, util::IdFactoryWithReuse, - CellId, FunctionId, RawVc, ReadConsistency, TaskId, TraitTypeId, TurboTasksBackendApi, - ValueTypeId, TRANSIENT_TASK_BIT, + CellId, FunctionId, RawVc, ReadConsistency, SessionId, TaskId, TraitTypeId, + TurboTasksBackendApi, ValueTypeId, TRANSIENT_TASK_BIT, }; pub use self::{operation::AnyOperation, storage::TaskDataCategory}; @@ -43,13 +43,13 @@ use crate::{ AggregationUpdateQueue, CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, OutdatedEdge, }, - storage::{get, get_many, iter_many, remove, Storage}, + storage::{get, get_many, get_mut, iter_many, remove, Storage}, }, backing_storage::{BackingStorage, ReadTransaction}, data::{ ActiveType, AggregationNumber, CachedDataItem, CachedDataItemIndex, CachedDataItemKey, CachedDataItemValue, CachedDataUpdate, CellRef, CollectibleRef, CollectiblesRef, - InProgressCellState, InProgressState, OutputValue, RootState, + DirtyState, InProgressCellState, InProgressState, OutputValue, RootState, }, utils::{bi_map::BiMap, chunked_vec::ChunkedVec, ptr_eq_arc::PtrEqArc, sharded::Sharded}, }; @@ -99,6 +99,7 @@ pub struct TurboTasksBackend(Arc); struct TurboTasksBackendInner { start_time: Instant, + session_id: SessionId, persisted_task_id_factory: IdFactoryWithReuse, transient_task_id_factory: IdFactoryWithReuse, @@ -149,6 +150,7 @@ impl TurboTasksBackendInner { (available_parallelism().map_or(4, |v| v.get()) * 64).next_power_of_two(); Self { start_time: Instant::now(), + session_id: backing_storage.next_session_id(), persisted_task_id_factory: IdFactoryWithReuse::new( *backing_storage.next_free_task_id() as u64, (TRANSIENT_TASK_BIT - 1) as u64, @@ -183,6 +185,10 @@ impl TurboTasksBackendInner { ExecuteContext::new(self, turbo_tasks) } + fn session_id(&self) -> SessionId { + self.session_id + } + /// # Safety /// /// `tx` must be a transaction from this TurboTasksBackendInner instance. 
@@ -347,12 +353,22 @@ impl TurboTasksBackendInner { task = ctx.task(task_id, TaskDataCategory::All); } + let is_dirty = get!(task, Dirty) + .map(|dirty_state| { + dirty_state + .clean_in_session + .map(|clean_in_session| clean_in_session != self.session_id) + .unwrap_or(true) + }) + .unwrap_or(false); + // Check the dirty count of the root node let dirty_tasks = get!(task, AggregatedDirtyContainerCount) .copied() - .unwrap_or_default(); - let root = get!(task, AggregateRoot); - if dirty_tasks > 0 { + .unwrap_or_default() + .get(self.session_id); + if dirty_tasks > 0 || is_dirty { + let root = get!(task, AggregateRoot); // When there are dirty task, subscribe to the all_clean_event let root = if let Some(root) = root { root @@ -591,6 +607,7 @@ impl TurboTasksBackendInner { { new_items = true; if let Err(err) = self.backing_storage.save_snapshot( + self.session_id, suspended_operations, persisted_task_cache_log, persisted_storage_meta_log, @@ -813,6 +830,7 @@ impl TurboTasksBackendInner { stale: false, once_task, done_event, + session_dependent: false, }, }); @@ -1033,6 +1051,7 @@ impl TurboTasksBackendInner { done_event, once_task: _, stale, + session_dependent, } = in_progress else { panic!("Task execution completed, but task is not in progress: {task:#?}"); @@ -1157,21 +1176,53 @@ impl TurboTasksBackendInner { .collect::>() }; - let was_dirty = task.remove(&CachedDataItemKey::Dirty {}).is_some(); - let data_update = if was_dirty { - let dirty_containers = get!(task, AggregatedDirtyContainerCount) + let new_dirty_state = if session_dependent { + Some(DirtyState { + clean_in_session: Some(self.session_id), + }) + } else { + None + }; + + let old_dirty = if let Some(new_dirty_state) = new_dirty_state { + task.insert(CachedDataItem::Dirty { + value: new_dirty_state, + }) + } else { + task.remove(&CachedDataItemKey::Dirty {}) + }; + + let old_dirty_state = old_dirty.map(|old_dirty| match old_dirty { + CachedDataItemValue::Dirty { value } => value, + _ => unreachable!(), + }); + + let data_update = if old_dirty_state.is_some() || new_dirty_state.is_some() { + let mut dirty_containers = get!(task, AggregatedDirtyContainerCount) .copied() .unwrap_or_default(); - if dirty_containers == 0 { - if let Some(root_state) = get!(task, AggregateRoot) { - root_state.all_clean_event.notify(usize::MAX); - if matches!(root_state.ty, ActiveType::CachedActiveUntilClean) { - task.remove(&CachedDataItemKey::AggregateRoot {}); + if let Some(old_dirty_state) = old_dirty_state { + dirty_containers.update_with_dirty_state(&old_dirty_state); + } + let aggregated_update = match (old_dirty_state, new_dirty_state) { + (None, None) => unreachable!(), + (Some(old), None) => dirty_containers.undo_update_with_dirty_state(&old), + (None, Some(new)) => dirty_containers.update_with_dirty_state(&new), + (Some(old), Some(new)) => dirty_containers.replace_dirty_state(&old, &new), + }; + if !aggregated_update.is_default() { + if aggregated_update.get(self.session_id) < 0 { + if let Some(root_state) = get!(task, AggregateRoot) { + root_state.all_clean_event.notify(usize::MAX); + if matches!(root_state.ty, ActiveType::CachedActiveUntilClean) { + task.remove(&CachedDataItemKey::AggregateRoot {}); + } } } AggregationUpdateJob::data_update( &mut task, - AggregatedDataUpdate::new().no_longer_dirty_container(task_id), + AggregatedDataUpdate::new() + .dirty_container_update(task_id, aggregated_update), ) } else { None @@ -1422,6 +1473,21 @@ impl TurboTasksBackendInner { ); } + fn mark_own_task_as_session_dependent( + &self, + task: TaskId, 
+ turbo_tasks: &dyn TurboTasksBackendApi, + ) { + let mut ctx = self.execute_context(turbo_tasks); + let mut task = ctx.task(task, TaskDataCategory::Data); + if let Some(InProgressState::InProgress { + session_dependent, .. + }) = get_mut!(task, InProgress) + { + *session_dependent = true; + } + } + fn connect_task( &self, task: TaskId, @@ -1677,6 +1743,14 @@ impl Backend for TurboTasksBackend { self.0.update_task_cell(task_id, cell, content, turbo_tasks); } + fn mark_own_task_as_session_dependent( + &self, + task: TaskId, + turbo_tasks: &dyn TurboTasksBackendApi, + ) { + self.0.mark_own_task_as_session_dependent(task, turbo_tasks); + } + fn connect_task( &self, task: TaskId, diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs index 5e1a6133de45e..f7ab5c29cca9d 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs @@ -2,7 +2,7 @@ use std::{cmp::max, collections::VecDeque, num::NonZeroU32}; use serde::{Deserialize, Serialize}; use smallvec::SmallVec; -use turbo_tasks::TaskId; +use turbo_tasks::{SessionId, TaskId}; use crate::{ backend::{ @@ -11,7 +11,8 @@ use crate::{ TaskDataCategory, }, data::{ - ActiveType, AggregationNumber, CachedDataItem, CachedDataItemKey, CollectibleRef, RootState, + ActiveType, AggregationNumber, CachedDataItem, CachedDataItemKey, CollectibleRef, + DirtyContainerCount, RootState, }, }; @@ -116,41 +117,51 @@ impl AggregationUpdateJob { #[derive(Default, Serialize, Deserialize, Clone, Debug)] pub struct AggregatedDataUpdate { - dirty_container_update: Option<(TaskId, i32)>, + dirty_container_update: Option<(TaskId, DirtyContainerCount)>, collectibles_update: Vec<(CollectibleRef, i32)>, } impl AggregatedDataUpdate { fn from_task(task: &mut TaskGuard<'_>) -> Self { let aggregation = get_aggregation_number(task); - let mut dirty = get!(task, Dirty).is_some(); + let mut dirty_container_count = Default::default(); let mut collectibles_update: Vec<_> = get_many!(task, Collectible { collectible } => (collectible, 1)); if is_aggregating_node(aggregation) { - let dirty_container_count = get!(task, AggregatedDirtyContainerCount) + dirty_container_count = get!(task, AggregatedDirtyContainerCount) .copied() - .unwrap_or(0); - if dirty_container_count > 0 { - dirty = true; - } - for collectible in iter_many!( + .unwrap_or_default(); + let collectibles = iter_many!( task, AggregatedCollectible { collectible } count if count > 0 => { collectible } - ) { + ); + for collectible in collectibles { collectibles_update.push((collectible, 1)); } } - if dirty { - Self::new() - .dirty_container(task.id()) - .collectibles_update(collectibles_update) - } else { - Self::new().collectibles_update(collectibles_update) + if let Some(dirty) = get!(task, Dirty) { + dirty_container_count.update_with_dirty_state(dirty); } + + let mut result = Self::new().collectibles_update(collectibles_update); + if !dirty_container_count.is_default() { + let DirtyContainerCount { + count, + count_in_session, + } = dirty_container_count; + result = result.dirty_container_update( + task.id(), + DirtyContainerCount { + count: if count > 0 { 1 } else { 0 }, + count_in_session: count_in_session.map(|(s, c)| (s, if c > 0 { 1 } else { 0 })), + }, + ); + } + result } fn invert(mut self) -> Self { @@ -159,7 +170,7 @@ impl AggregatedDataUpdate { collectibles_update, } = &mut 
self; if let Some((_, value)) = dirty_container_update.as_mut() { - *value = -*value; + *value = value.invert() } for (_, value) in collectibles_update.iter_mut() { *value = -*value; @@ -170,6 +181,7 @@ impl AggregatedDataUpdate { fn apply( &self, task: &mut TaskGuard<'_>, + session_id: SessionId, queue: &mut AggregationUpdateQueue, ) -> AggregatedDataUpdate { let Self { @@ -180,51 +192,47 @@ impl AggregatedDataUpdate { if let Some((dirty_container_id, count)) = dirty_container_update { // When a dirty container count is increased and the task is considered as active // `AggregateRoot` we need to schedule the dirty tasks in the new dirty container - if *count > 0 && task.has_key(&CachedDataItemKey::AggregateRoot {}) { + let current_session_update = count.get(session_id); + if current_session_update > 0 && task.has_key(&CachedDataItemKey::AggregateRoot {}) { queue.push(AggregationUpdateJob::FindAndScheduleDirty { task_ids: vec![*dirty_container_id], }) } - let mut added = false; - let mut removed = false; + let mut aggregated_update = Default::default(); update!( task, AggregatedDirtyContainer { task: *dirty_container_id }, - |old: Option| { - let old = old.unwrap_or(0); - let new = old + *count; - if old <= 0 && new > 0 { - added = true; - } else if old > 0 && new <= 0 { - removed = true; - } - (new != 0).then_some(new) + |old: Option| { + let mut new = old.unwrap_or_default(); + aggregated_update = new.update_count(count); + (!new.is_default()).then_some(new) } ); - let mut count_update = 0; - if added { - count_update += 1; - } else if removed { - count_update -= 1; - } - let dirty = task.has_key(&CachedDataItemKey::Dirty {}); + + let dirty_state = get!(task, Dirty).copied(); let task_id = task.id(); - update!(task, AggregatedDirtyContainerCount, |old: Option| { - let old = old.unwrap_or(0); - let new = old + count_update; - if !dirty { - if old <= 0 && new > 0 { - result.dirty_container_update = Some((task_id, 1)); - } else if old > 0 && new <= 0 { - result.dirty_container_update = Some((task_id, -1)); - } + update!(task, AggregatedDirtyContainerCount, |old: Option< + DirtyContainerCount, + >| { + let mut new = old.unwrap_or_default(); + if let Some(dirty_state) = dirty_state { + new.update_with_dirty_state(&dirty_state); + } + let aggregated_update = new.update_count(&aggregated_update); + if let Some(dirty_state) = dirty_state { + new.undo_update_with_dirty_state(&dirty_state); } - (new != 0).then_some(new) + if !aggregated_update.is_default() { + result.dirty_container_update = Some((task_id, aggregated_update)); + } + (!new.is_default()).then_some(new) }); if let Some((_, count)) = result.dirty_container_update.as_ref() { - if *count < 0 { + if count.get(session_id) < 0 { + // When the current task is no longer dirty, we need to fire the aggregate root + // events and do some cleanup if let Some(root_state) = get!(task, AggregateRoot) { root_state.all_clean_event.notify(usize::MAX); if matches!(root_state.ty, ActiveType::CachedActiveUntilClean) { @@ -294,13 +302,8 @@ impl AggregatedDataUpdate { } } - pub fn dirty_container(mut self, task_id: TaskId) -> Self { - self.dirty_container_update = Some((task_id, 1)); - self - } - - pub fn no_longer_dirty_container(mut self, task_id: TaskId) -> Self { - self.dirty_container_update = Some((task_id, -1)); + pub fn dirty_container_update(mut self, task_id: TaskId, count: DirtyContainerCount) -> Self { + self.dirty_container_update = Some((task_id, count)); self } @@ -497,7 +500,7 @@ impl AggregationUpdateQueue { // followers let data = 
AggregatedDataUpdate::from_task(&mut task); let followers = get_followers(&task); - let diff = data.apply(&mut upper, self); + let diff = data.apply(&mut upper, ctx.session_id(), self); if !upper_ids.is_empty() && !diff.is_empty() { // Notify uppers about changed aggregated data @@ -545,7 +548,7 @@ impl AggregationUpdateQueue { // followers let data = AggregatedDataUpdate::from_task(&mut task).invert(); let followers = get_followers(&task); - let diff = data.apply(&mut upper, self); + let diff = data.apply(&mut upper, ctx.session_id(), self); if !upper_ids.is_empty() && !diff.is_empty() { self.push(AggregationUpdateJob::AggregatedDataUpdate { upper_ids: upper_ids.clone(), @@ -591,8 +594,8 @@ impl AggregationUpdateQueue { value: RootState::new(ActiveType::CachedActiveUntilClean, task_id), }); } - let dirty_containers: Vec<_> = - get_many!(task, AggregatedDirtyContainer { task } count if count > 0 => task); + let session_id = ctx.session_id(); + let dirty_containers: Vec<_> = get_many!(task, AggregatedDirtyContainer { task } count if count.get(session_id) > 0 => task); if !dirty_containers.is_empty() { self.push(AggregationUpdateJob::FindAndScheduleDirty { task_ids: dirty_containers, @@ -610,7 +613,7 @@ impl AggregationUpdateQueue { ) { for upper_id in upper_ids { let mut upper = ctx.task(upper_id, TaskDataCategory::Meta); - let diff = update.apply(&mut upper, self); + let diff = update.apply(&mut upper, ctx.session_id(), self); if !diff.is_empty() { let upper_ids = get_uppers(&upper); if !upper_ids.is_empty() { @@ -659,7 +662,7 @@ impl AggregationUpdateQueue { for upper_id in upper_ids.iter() { // remove data from upper let mut upper = ctx.task(*upper_id, TaskDataCategory::Meta); - let diff = data.apply(&mut upper, self); + let diff = data.apply(&mut upper, ctx.session_id(), self); if !diff.is_empty() { let upper_ids = get_uppers(&upper); self.push(AggregationUpdateJob::AggregatedDataUpdate { @@ -744,7 +747,7 @@ impl AggregationUpdateQueue { for upper_id in upper_ids.iter() { // add data to upper let mut upper = ctx.task(*upper_id, TaskDataCategory::Meta); - let diff = data.apply(&mut upper, self); + let diff = data.apply(&mut upper, ctx.session_id(), self); if !diff.is_empty() { let upper_ids = get_uppers(&upper); self.push(AggregationUpdateJob::AggregatedDataUpdate { @@ -846,7 +849,7 @@ impl AggregationUpdateQueue { let diffs = upper_data_updates .into_iter() .filter_map(|data| { - let diff = data.apply(&mut upper, self); + let diff = data.apply(&mut upper, ctx.session_id(), self); (!diff.is_empty()).then_some(diff) }) .collect::>(); diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs index c3d9162050f5d..c03cf974d3340 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs @@ -13,7 +13,7 @@ use crate::{ storage::{get, get_mut}, TaskDataCategory, }, - data::{CachedDataItem, CachedDataItemKey, InProgressState}, + data::{CachedDataItem, CachedDataItemKey, CachedDataItemValue, DirtyState, InProgressState}, }; #[derive(Serialize, Deserialize, Clone, Default)] @@ -91,22 +91,54 @@ pub fn make_task_dirty_internal( *stale = true; } } - if task.add(CachedDataItem::Dirty { value: () }) { - let dirty_container = get!(task, AggregatedDirtyContainerCount) - .copied() - .unwrap_or_default(); - if dirty_container == 0 { - queue.extend(AggregationUpdateJob::data_update( - task, - 
AggregatedDataUpdate::new().dirty_container(task_id), - )); + let old = task.insert(CachedDataItem::Dirty { + value: DirtyState { + clean_in_session: None, + }, + }); + let mut dirty_container = match old { + Some(CachedDataItemValue::Dirty { + value: DirtyState { + clean_in_session: None, + }, + }) => { + // already dirty + return; } - let root = task.has_key(&CachedDataItemKey::AggregateRoot {}); - if root { - let description = ctx.backend.get_task_desc_fn(task_id); - if task.add(CachedDataItem::new_scheduled(description)) { - ctx.schedule(task_id); - } + Some(CachedDataItemValue::Dirty { + value: DirtyState { + clean_in_session: Some(session_id), + }, + }) => { + // Got dirty in that one session only + let mut dirty_container = get!(task, AggregatedDirtyContainerCount) + .copied() + .unwrap_or_default(); + dirty_container.update_session_dependent(session_id, 1); + dirty_container + } + None => { + // Get dirty for all sessions + get!(task, AggregatedDirtyContainerCount) + .copied() + .unwrap_or_default() + } + _ => unreachable!(), + }; + let aggregated_update = dirty_container.update_with_dirty_state(&DirtyState { + clean_in_session: None, + }); + if !aggregated_update.is_default() { + queue.extend(AggregationUpdateJob::data_update( + task, + AggregatedDataUpdate::new().dirty_container_update(task_id, aggregated_update), + )); + } + let root = task.has_key(&CachedDataItemKey::AggregateRoot {}); + if root { + let description = ctx.backend.get_task_desc_fn(task_id); + if task.add(CachedDataItem::new_scheduled(description)) { + ctx.schedule(task_id); } } } diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index 515b16900a933..11fa77772d6b3 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -12,7 +12,7 @@ use std::{ }; use serde::{Deserialize, Serialize}; -use turbo_tasks::{KeyValuePair, TaskId, TurboTasksBackendApi}; +use turbo_tasks::{KeyValuePair, SessionId, TaskId, TurboTasksBackendApi}; use crate::{ backend::{ @@ -97,6 +97,10 @@ impl<'a> ExecuteContext<'a> { } } + pub fn session_id(&self) -> SessionId { + self.backend.session_id() + } + pub fn task(&mut self, task_id: TaskId, category: TaskDataCategory) -> TaskGuard<'a> { let mut task = self.backend.storage.access_mut(task_id); if !task.persistance_state().is_restored(category) { diff --git a/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs b/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs index 59be1ab742020..699518c1533b6 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use anyhow::Result; -use turbo_tasks::{backend::CachedTaskType, TaskId}; +use turbo_tasks::{backend::CachedTaskType, SessionId, TaskId}; use crate::{ backend::{AnyOperation, TaskDataCategory}, @@ -14,9 +14,11 @@ pub struct ReadTransaction(pub *const ()); pub trait BackingStorage { fn next_free_task_id(&self) -> TaskId; + fn next_session_id(&self) -> SessionId; fn uncompleted_operations(&self) -> Vec; fn save_snapshot( &self, + session_id: SessionId, operations: Vec>, task_cache_updates: Vec, TaskId)>>, meta_updates: Vec>, diff --git a/turbopack/crates/turbo-tasks-backend/src/data.rs b/turbopack/crates/turbo-tasks-backend/src/data.rs index ea42f2ed1a3dd..2bb7691d98d89 100644 --- 
a/turbopack/crates/turbo-tasks-backend/src/data.rs +++ b/turbopack/crates/turbo-tasks-backend/src/data.rs @@ -1,9 +1,11 @@ +use std::cmp::Ordering; + use serde::{Deserialize, Serialize}; use turbo_tasks::{ event::{Event, EventListener}, registry, util::SharedError, - CellId, KeyValuePair, TaskId, TraitTypeId, TypedSharedReference, ValueTypeId, + CellId, KeyValuePair, SessionId, TaskId, TraitTypeId, TypedSharedReference, ValueTypeId, }; use crate::backend::{indexed::Indexed, TaskDataCategory}; @@ -83,6 +85,134 @@ transient_traits!(RootState); impl Eq for RootState {} +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +pub struct DirtyState { + pub clean_in_session: Option, +} + +fn add_with_diff(v: &mut i32, u: i32) -> i32 { + let old = *v; + *v += u; + if old <= 0 && *v > 0 { + 1 + } else if old > 0 && *v <= 0 { + -1 + } else { + 0 + } +} + +#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +pub struct DirtyContainerCount { + pub count: i32, + pub count_in_session: Option<(SessionId, i32)>, +} + +impl DirtyContainerCount { + pub fn get(&self, session: SessionId) -> i32 { + if let Some((s, count)) = self.count_in_session { + if s == session { + return count; + } + } + self.count + } + + pub fn update(&mut self, count: i32) -> DirtyContainerCount { + self.update_count(&DirtyContainerCount { + count, + count_in_session: None, + }) + } + + pub fn update_session_dependent( + &mut self, + ignore_session: SessionId, + count: i32, + ) -> DirtyContainerCount { + self.update_count(&DirtyContainerCount { + count, + count_in_session: Some((ignore_session, 0)), + }) + } + + pub fn update_count(&mut self, count: &DirtyContainerCount) -> DirtyContainerCount { + let mut diff = DirtyContainerCount::default(); + match ( + self.count_in_session.as_mut(), + count.count_in_session.as_ref(), + ) { + (None, None) => {} + (Some((s, c)), None) => { + let d = add_with_diff(c, count.count); + diff.count_in_session = Some((*s, d)); + } + (None, Some((s, c))) => { + let mut new = self.count; + let d = add_with_diff(&mut new, *c); + self.count_in_session = Some((*s, new)); + diff.count_in_session = Some((*s, d)); + } + (Some((s1, c1)), Some((s2, c2))) => match (*s1).cmp(s2) { + Ordering::Less => { + let mut new = self.count; + let d = add_with_diff(&mut new, *c2); + self.count_in_session = Some((*s2, new)); + diff.count_in_session = Some((*s2, d)); + } + Ordering::Equal => { + let d = add_with_diff(c1, *c2); + diff.count_in_session = Some((*s1, d)); + } + Ordering::Greater => { + let d = add_with_diff(c1, count.count); + diff.count_in_session = Some((*s1, d)); + } + }, + } + let d = add_with_diff(&mut self.count, count.count); + diff.count = d; + diff + } + + pub fn update_with_dirty_state(&mut self, dirty: &DirtyState) -> DirtyContainerCount { + if let Some(clean_in_session) = dirty.clean_in_session { + self.update_session_dependent(clean_in_session, 1) + } else { + self.update(1) + } + } + + pub fn undo_update_with_dirty_state(&mut self, dirty: &DirtyState) -> DirtyContainerCount { + if let Some(clean_in_session) = dirty.clean_in_session { + self.update_session_dependent(clean_in_session, -1) + } else { + self.update(-1) + } + } + + pub fn replace_dirty_state( + &mut self, + old: &DirtyState, + new: &DirtyState, + ) -> DirtyContainerCount { + let mut diff = self.undo_update_with_dirty_state(old); + diff.update_count(&self.update_with_dirty_state(new)); + diff + } + + pub fn is_default(&self) -> bool { + self.count == 0 && self.count_in_session.map(|(_, c)| c == 
0).unwrap_or(true) + } + + pub fn invert(&self) -> Self { + Self { + count: -self.count, + count_in_session: self.count_in_session.map(|(s, c)| (s, -c)), + } + } +} + #[derive(Debug, Clone, Copy)] pub enum ActiveType { RootTask, @@ -102,6 +232,7 @@ pub enum InProgressState { stale: bool, #[allow(dead_code)] once_task: bool, + session_dependent: bool, done_event: Event, }, } @@ -149,10 +280,7 @@ pub enum CachedDataItem { // State Dirty { - value: (), - }, - DirtyWhenPersisted { - value: (), + value: DirtyState, }, // Children @@ -217,14 +345,14 @@ pub enum CachedDataItem { // Aggregated Data AggregatedDirtyContainer { task: TaskId, - value: i32, + value: DirtyContainerCount, }, AggregatedCollectible { collectible: CollectibleRef, value: i32, }, AggregatedDirtyContainerCount { - value: i32, + value: DirtyContainerCount, }, // Transient Root Type @@ -284,7 +412,6 @@ impl CachedDataItem { !collectible.cell.task.is_transient() } CachedDataItem::Dirty { .. } => true, - CachedDataItem::DirtyWhenPersisted { .. } => true, CachedDataItem::Child { task, .. } => !task.is_transient(), CachedDataItem::CellData { .. } => true, CachedDataItem::CellTypeMaxIndex { .. } => true, @@ -349,7 +476,6 @@ impl CachedDataItemKey { !collectible.cell.task.is_transient() } CachedDataItemKey::Dirty { .. } => true, - CachedDataItemKey::DirtyWhenPersisted { .. } => true, CachedDataItemKey::Child { task, .. } => !task.is_transient(), CachedDataItemKey::CellData { .. } => true, CachedDataItemKey::CellTypeMaxIndex { .. } => true, @@ -403,7 +529,6 @@ impl CachedDataItemKey { CachedDataItemKey::AggregationNumber { .. } | CachedDataItemKey::Dirty { .. } - | CachedDataItemKey::DirtyWhenPersisted { .. } | CachedDataItemKey::Follower { .. } | CachedDataItemKey::Upper { .. } | CachedDataItemKey::AggregatedDirtyContainer { .. 
} diff --git a/turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage.rs b/turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage.rs index d7c81929fe038..0255c1ced3377 100644 --- a/turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage.rs @@ -17,7 +17,7 @@ use lmdb::{ }; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use tracing::Span; -use turbo_tasks::{backend::CachedTaskType, KeyValuePair, TaskId}; +use turbo_tasks::{backend::CachedTaskType, KeyValuePair, SessionId, TaskId}; use crate::{ backend::{AnyOperation, TaskDataCategory}, @@ -28,6 +28,7 @@ use crate::{ const META_KEY_OPERATIONS: u32 = 0; const META_KEY_NEXT_FREE_TASK_ID: u32 = 1; +const META_KEY_SESSION_ID: u32 = 2; struct IntKey([u8; 4]); @@ -138,6 +139,15 @@ impl BackingStorage for LmdbBackingStorage { TaskId::from(get(self).unwrap_or(1)) } + fn next_session_id(&self) -> SessionId { + fn get(this: &LmdbBackingStorage) -> Result { + let tx = this.env.begin_rw_txn()?; + let session_id = as_u32(tx.get(this.infra_db, &IntKey::new(META_KEY_SESSION_ID)))?; + Ok(session_id) + } + SessionId::from(get(self).unwrap_or(0) + 1) + } + fn uncompleted_operations(&self) -> Vec { fn get(this: &LmdbBackingStorage) -> Result> { let tx = this.env.begin_ro_txn()?; @@ -151,6 +161,7 @@ impl BackingStorage for LmdbBackingStorage { #[tracing::instrument(level = "trace", skip_all, fields(operations = operations.len(), task_cache_updates = task_cache_updates.len(), data_updates = data_updates.len()))] fn save_snapshot( &self, + session_id: SessionId, operations: Vec>, task_cache_updates: Vec, TaskId)>>, meta_updates: Vec>, @@ -208,6 +219,18 @@ impl BackingStorage for LmdbBackingStorage { }); }); + { + let _span = + tracing::trace_span!("update session id", session_id = ?session_id).entered(); + tx.put( + self.infra_db, + &IntKey::new(META_KEY_SESSION_ID), + &session_id.to_be_bytes(), + WriteFlags::empty(), + ) + .with_context(|| anyhow!("Unable to write next session id"))?; + } + let mut next_task_id = as_u32(tx.get(self.infra_db, &IntKey::new(META_KEY_NEXT_FREE_TASK_ID))) .unwrap_or(1); diff --git a/turbopack/crates/turbo-tasks-testing/src/lib.rs b/turbopack/crates/turbo-tasks-testing/src/lib.rs index 370194d94d151..1ea96cf8f6329 100644 --- a/turbopack/crates/turbo-tasks-testing/src/lib.rs +++ b/turbopack/crates/turbo-tasks-testing/src/lib.rs @@ -313,7 +313,7 @@ impl TurboTasksApi for VcStorage { // no-op } - fn mark_own_task_as_dirty_when_persisted(&self, _task: TaskId) { + fn mark_own_task_as_session_dependent(&self, _task: TaskId) { // no-op } diff --git a/turbopack/crates/turbo-tasks/src/backend.rs b/turbopack/crates/turbo-tasks/src/backend.rs index 42792a3f69506..3baaef2b92dd7 100644 --- a/turbopack/crates/turbo-tasks/src/backend.rs +++ b/turbopack/crates/turbo-tasks/src/backend.rs @@ -717,7 +717,7 @@ pub trait Backend: Sync + Send { // Do nothing by default } - fn mark_own_task_as_dirty_when_persisted( + fn mark_own_task_as_session_dependent( &self, _task: TaskId, _turbo_tasks: &dyn TurboTasksBackendApi, diff --git a/turbopack/crates/turbo-tasks/src/id.rs b/turbopack/crates/turbo-tasks/src/id.rs index 99c7b0997a047..ed3162a2f6d64 100644 --- a/turbopack/crates/turbo-tasks/src/id.rs +++ b/turbopack/crates/turbo-tasks/src/id.rs @@ -78,6 +78,7 @@ define_id!(ValueTypeId: u32); define_id!(TraitTypeId: u32); define_id!(BackendJobId: u32); define_id!(ExecutionId: u64, derive(Debug)); +define_id!(SessionId: u32, derive(Debug, Serialize, Deserialize), 
serde(transparent)); define_id!( LocalCellId: u32, derive(Debug), diff --git a/turbopack/crates/turbo-tasks/src/lib.rs b/turbopack/crates/turbo-tasks/src/lib.rs index 026c9484bfba0..e0d764f198dd0 100644 --- a/turbopack/crates/turbo-tasks/src/lib.rs +++ b/turbopack/crates/turbo-tasks/src/lib.rs @@ -86,7 +86,8 @@ pub use collectibles::CollectiblesSource; pub use completion::{Completion, Completions}; pub use display::ValueToString; pub use id::{ - ExecutionId, FunctionId, LocalTaskId, TaskId, TraitTypeId, ValueTypeId, TRANSIENT_TASK_BIT, + ExecutionId, FunctionId, LocalTaskId, SessionId, TaskId, TraitTypeId, ValueTypeId, + TRANSIENT_TASK_BIT, }; pub use invalidation::{ get_invalidator, DynamicEqHash, InvalidationReason, InvalidationReasonKind, @@ -96,7 +97,7 @@ pub use join_iter_ext::{JoinIterExt, TryFlatJoinIterExt, TryJoinIterExt}; pub use key_value_pair::KeyValuePair; pub use magic_any::MagicAny; pub use manager::{ - dynamic_call, dynamic_this_call, emit, mark_dirty_when_persisted, mark_finished, mark_stateful, + dynamic_call, dynamic_this_call, emit, mark_finished, mark_session_dependent, mark_stateful, prevent_gc, run_once, run_once_with_reason, spawn_blocking, spawn_thread, trait_call, turbo_tasks, turbo_tasks_scope, CurrentCellRef, ReadConsistency, TaskPersistence, TurboTasks, TurboTasksApi, TurboTasksBackendApi, TurboTasksBackendApiExt, TurboTasksCallApi, Unused, diff --git a/turbopack/crates/turbo-tasks/src/manager.rs b/turbopack/crates/turbo-tasks/src/manager.rs index e5dd9cc4b06ae..3b06e1ef04a85 100644 --- a/turbopack/crates/turbo-tasks/src/manager.rs +++ b/turbopack/crates/turbo-tasks/src/manager.rs @@ -183,7 +183,7 @@ pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send { fn read_own_task_cell(&self, task: TaskId, index: CellId) -> Result; fn update_own_task_cell(&self, task: TaskId, index: CellId, content: CellContent); fn mark_own_task_as_finished(&self, task: TaskId); - fn mark_own_task_as_dirty_when_persisted(&self, task: TaskId); + fn mark_own_task_as_session_dependent(&self, task: TaskId); fn connect_task(&self, task: TaskId); @@ -1408,9 +1408,8 @@ impl TurboTasksApi for TurboTasks { self.backend.mark_own_task_as_finished(task, self); } - fn mark_own_task_as_dirty_when_persisted(&self, task: TaskId) { - self.backend - .mark_own_task_as_dirty_when_persisted(task, self); + fn mark_own_task_as_session_dependent(&self, task: TaskId) { + self.backend.mark_own_task_as_session_dependent(task, self); } /// Creates a future that inherits the current task id and task state. The current global task @@ -1704,11 +1703,9 @@ pub fn current_task_for_testing() -> TaskId { } /// Marks the current task as dirty when restored from persistent cache. 
-pub fn mark_dirty_when_persisted() { +pub fn mark_session_dependent() { with_turbo_tasks(|tt| { - tt.mark_own_task_as_dirty_when_persisted(current_task( - "turbo_tasks::mark_dirty_when_persisted()", - )) + tt.mark_own_task_as_session_dependent(current_task("turbo_tasks::mark_session_dependent()")) }); } diff --git a/turbopack/crates/turbo-tasks/src/state.rs b/turbopack/crates/turbo-tasks/src/state.rs index e62b410aca236..90bc7928ab9b1 100644 --- a/turbopack/crates/turbo-tasks/src/state.rs +++ b/turbopack/crates/turbo-tasks/src/state.rs @@ -9,7 +9,7 @@ use parking_lot::{Mutex, MutexGuard}; use serde::{Deserialize, Serialize}; use crate::{ - get_invalidator, mark_dirty_when_persisted, mark_stateful, trace::TraceRawVcs, Invalidator, + get_invalidator, mark_session_dependent, mark_stateful, trace::TraceRawVcs, Invalidator, SerializationInvalidator, }; @@ -266,7 +266,7 @@ impl TransientState { /// as dependency of the state and will be invalidated when the state /// changes. pub fn get(&self) -> StateRef<'_, Option> { - mark_dirty_when_persisted(); + mark_session_dependent(); let invalidator = get_invalidator(); let mut inner = self.inner.lock(); inner.add_invalidator(invalidator); From 94936964706a98fc9cdba93ec1e4ef9f68c1b9e9 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Mon, 7 Oct 2024 18:23:47 +0200 Subject: [PATCH 02/14] make filesystem session dependent instead of invalidating on startup --- turbopack/crates/turbo-tasks-fs/src/lib.rs | 23 ++++++++----------- .../crates/turbo-tasks-fs/src/watcher.rs | 4 +--- 2 files changed, 10 insertions(+), 17 deletions(-) diff --git a/turbopack/crates/turbo-tasks-fs/src/lib.rs b/turbopack/crates/turbo-tasks-fs/src/lib.rs index 8cdb819cdd95d..d08b88c31beda 100644 --- a/turbopack/crates/turbo-tasks-fs/src/lib.rs +++ b/turbopack/crates/turbo-tasks-fs/src/lib.rs @@ -58,8 +58,8 @@ use tokio::{ }; use tracing::Instrument; use turbo_tasks::{ - mark_stateful, trace::TraceRawVcs, Completion, Invalidator, RcStr, ReadRef, ResolvedVc, - SerializationInvalidator, ValueToString, Vc, + mark_session_dependent, trace::TraceRawVcs, Completion, Invalidator, RcStr, ReadRef, + ResolvedVc, ValueToString, Vc, }; use turbo_tasks_hash::{ hash_xxh3_hash128, hash_xxh3_hash64, DeterministicHash, DeterministicHasher, @@ -206,8 +206,6 @@ pub struct DiskFileSystem { invalidator_map: Arc, #[turbo_tasks(debug_ignore, trace_ignore)] dir_invalidator_map: Arc, - #[turbo_tasks(debug_ignore, trace_ignore)] - serialization_invalidator: SerializationInvalidator, /// Lock that makes invalidation atomic. It will keep a write lock during /// watcher invalidation and a read lock during other operations. 
#[turbo_tasks(debug_ignore, trace_ignore)] @@ -228,7 +226,6 @@ impl DiskFileSystem { fn register_invalidator(&self, path: &Path) -> Result<()> { let invalidator = turbo_tasks::get_invalidator(); self.invalidator_map.insert(path_to_key(path), invalidator); - self.serialization_invalidator.invalidate(); #[cfg(not(any(target_os = "macos", target_os = "windows")))] if let Some(dir) = path.parent() { self.watcher.ensure_watching(dir, self.root_path())?; @@ -244,7 +241,6 @@ impl DiskFileSystem { let mut invalidator_map = self.invalidator_map.lock().unwrap(); let old_invalidators = invalidator_map.insert(path_to_key(path), [invalidator].into()); drop(invalidator_map); - self.serialization_invalidator.invalidate(); #[cfg(not(any(target_os = "macos", target_os = "windows")))] if let Some(dir) = path.parent() { self.watcher.ensure_watching(dir, self.root_path())?; @@ -258,7 +254,6 @@ impl DiskFileSystem { let invalidator = turbo_tasks::get_invalidator(); self.dir_invalidator_map .insert(path_to_key(path), invalidator); - self.serialization_invalidator.invalidate(); #[cfg(not(any(target_os = "macos", target_os = "windows")))] self.watcher.ensure_watching(path, self.root_path())?; Ok(()) @@ -285,7 +280,6 @@ impl DiskFileSystem { let _guard = handle.enter(); i.invalidate() }); - self.serialization_invalidator.invalidate(); } pub fn invalidate_with_reason(&self) { @@ -309,7 +303,6 @@ impl DiskFileSystem { let _guard = handle.enter(); invalidator.invalidate_with_reason(reason) }); - self.serialization_invalidator.invalidate(); } pub fn start_watching(&self, poll_interval: Option) -> Result<()> { @@ -345,9 +338,7 @@ impl DiskFileSystem { invalidator_map, dir_invalidator_map, poll_interval, - self.serialization_invalidator.clone(), )?; - self.serialization_invalidator.invalidate(); Ok(()) } @@ -424,7 +415,7 @@ impl DiskFileSystem { /// ignore specific subpaths from each. 
#[turbo_tasks::function] pub async fn new(name: RcStr, root: RcStr, ignored_subpaths: Vec) -> Result> { - let serialization_invalidator = mark_stateful(); + mark_session_dependent(); // create the directory for the filesystem on disk, if it doesn't exist fs::create_dir_all(&root).await?; @@ -435,7 +426,6 @@ impl DiskFileSystem { invalidation_lock: Default::default(), invalidator_map: Arc::new(InvalidatorMap::new()), dir_invalidator_map: Arc::new(InvalidatorMap::new()), - serialization_invalidator, watcher: Arc::new(DiskWatcher::new( ignored_subpaths.into_iter().map(PathBuf::from).collect(), )), @@ -449,6 +439,7 @@ impl DiskFileSystem { &self, fs_path: Vc, ) -> Result> { + mark_session_dependent(); let full_path = self.to_sys_path(fs_path).await?; self.register_dir_invalidator(&full_path)?; @@ -572,6 +563,7 @@ impl FileSystem for DiskFileSystem { #[turbo_tasks::function(fs)] async fn read_link(&self, fs_path: Vc) -> Result> { + mark_session_dependent(); let full_path = self.to_sys_path(fs_path).await?; self.register_invalidator(&full_path)?; @@ -656,6 +648,7 @@ impl FileSystem for DiskFileSystem { #[turbo_tasks::function(fs)] async fn track(&self, fs_path: Vc) -> Result> { + mark_session_dependent(); let full_path = self.to_sys_path(fs_path).await?; self.register_invalidator(&full_path)?; Ok(Completion::new()) @@ -667,6 +660,7 @@ impl FileSystem for DiskFileSystem { fs_path: Vc, content: Vc, ) -> Result> { + mark_session_dependent(); let full_path = self.to_sys_path(fs_path).await?; let full_path = validate_path_length(&full_path)?; @@ -695,7 +689,6 @@ impl FileSystem for DiskFileSystem { for i in old_invalidators { self.invalidator_map.insert(key.clone(), i); } - self.serialization_invalidator.invalidate(); } return Ok(Completion::unchanged()); } @@ -784,6 +777,7 @@ impl FileSystem for DiskFileSystem { fs_path: Vc, target: Vc, ) -> Result> { + mark_session_dependent(); let full_path = self.to_sys_path(fs_path).await?; // TODO(sokra) preform a untracked read here, register an invalidator and get // all existing invalidators @@ -866,6 +860,7 @@ impl FileSystem for DiskFileSystem { #[turbo_tasks::function(fs)] async fn metadata(&self, fs_path: Vc) -> Result> { + mark_session_dependent(); let full_path = self.to_sys_path(fs_path).await?; self.register_invalidator(&full_path)?; diff --git a/turbopack/crates/turbo-tasks-fs/src/watcher.rs b/turbopack/crates/turbo-tasks-fs/src/watcher.rs index 9b71b2f86f7dc..50450d1360e6a 100644 --- a/turbopack/crates/turbo-tasks-fs/src/watcher.rs +++ b/turbopack/crates/turbo-tasks-fs/src/watcher.rs @@ -18,7 +18,7 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator}; use serde::{Deserialize, Serialize}; use tokio::sync::RwLock; use tracing::instrument; -use turbo_tasks::{spawn_thread, Invalidator, RcStr, SerializationInvalidator}; +use turbo_tasks::{spawn_thread, Invalidator, RcStr}; use crate::{ format_absolute_fs_path, @@ -144,7 +144,6 @@ impl DiskWatcher { invalidator_map: Arc, dir_invalidator_map: Arc, poll_interval: Option, - serialization_invalidator: SerializationInvalidator, ) -> Result<()> { let mut watcher_guard = self.watcher.lock().unwrap(); if watcher_guard.is_some() { @@ -215,7 +214,6 @@ impl DiskWatcher { invalidator.invalidate() }); } - serialization_invalidator.invalidate(); } watcher_guard.replace(watcher); From 214dedced02b48df0077a311eaa2d3e137108e19 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Tue, 8 Oct 2024 21:39:01 +0200 Subject: [PATCH 03/14] make DiskFileSystem::new not session dependent --- crates/next-api/src/project.rs | 
8 +++-- turbopack/crates/node-file-trace/src/lib.rs | 2 +- .../turbo-tasks-fs/examples/hash_directory.rs | 2 +- .../turbo-tasks-fs/examples/hash_glob.rs | 2 +- .../crates/turbo-tasks-fs/src/embed/dir.rs | 2 +- .../crates/turbo-tasks-fs/src/embed/file.rs | 2 +- .../turbo-tasks-fs/src/invalidator_map.rs | 10 ++++-- turbopack/crates/turbo-tasks-fs/src/lib.rs | 32 ++++++++++++------- turbopack/crates/turbopack-cli/src/util.rs | 4 +-- .../crates/turbopack/examples/turbopack.rs | 2 +- 10 files changed, 43 insertions(+), 23 deletions(-) diff --git a/crates/next-api/src/project.rs b/crates/next-api/src/project.rs index bee43fb477d4f..efb87c07160ec 100644 --- a/crates/next-api/src/project.rs +++ b/crates/next-api/src/project.rs @@ -223,7 +223,9 @@ impl ProjectContainer { let project = self.project(); let project_fs = project.project_fs().strongly_consistent().await?; if watch.enable { - project_fs.start_watching_with_invalidation_reason(watch.poll_interval)?; + project_fs + .start_watching_with_invalidation_reason(watch.poll_interval) + .await?; } else { project_fs.invalidate_with_reason(); } @@ -304,7 +306,9 @@ impl ProjectContainer { if !ReadRef::ptr_eq(&prev_project_fs, &project_fs) { if watch.enable { // TODO stop watching: prev_project_fs.stop_watching()?; - project_fs.start_watching_with_invalidation_reason(watch.poll_interval)?; + project_fs + .start_watching_with_invalidation_reason(watch.poll_interval) + .await?; } else { project_fs.invalidate_with_reason(); } diff --git a/turbopack/crates/node-file-trace/src/lib.rs b/turbopack/crates/node-file-trace/src/lib.rs index d36c75ae8b794..197248c429954 100644 --- a/turbopack/crates/node-file-trace/src/lib.rs +++ b/turbopack/crates/node-file-trace/src/lib.rs @@ -190,7 +190,7 @@ impl Args { async fn create_fs(name: &str, root: &str, watch: bool) -> Result>> { let fs = DiskFileSystem::new(name.into(), root.into(), vec![]); if watch { - fs.await?.start_watching(None)?; + fs.await?.start_watching(None).await?; } else { fs.await?.invalidate_with_reason(); } diff --git a/turbopack/crates/turbo-tasks-fs/examples/hash_directory.rs b/turbopack/crates/turbo-tasks-fs/examples/hash_directory.rs index 505931e29c03a..a1728fd9fadaa 100644 --- a/turbopack/crates/turbo-tasks-fs/examples/hash_directory.rs +++ b/turbopack/crates/turbo-tasks-fs/examples/hash_directory.rs @@ -32,7 +32,7 @@ async fn main() -> Result<()> { Box::pin(async { let root = current_dir().unwrap().to_str().unwrap().into(); let disk_fs = DiskFileSystem::new("project".into(), root, vec![]); - disk_fs.await?.start_watching(None)?; + disk_fs.await?.start_watching(None).await?; // Smart Pointer cast let fs: Vc> = Vc::upcast(disk_fs); diff --git a/turbopack/crates/turbo-tasks-fs/examples/hash_glob.rs b/turbopack/crates/turbo-tasks-fs/examples/hash_glob.rs index 1bdc1029e1f03..b5102fd80903a 100644 --- a/turbopack/crates/turbo-tasks-fs/examples/hash_glob.rs +++ b/turbopack/crates/turbo-tasks-fs/examples/hash_glob.rs @@ -29,7 +29,7 @@ async fn main() -> Result<()> { Box::pin(async { let root = current_dir().unwrap().to_str().unwrap().into(); let disk_fs = DiskFileSystem::new("project".into(), root, vec![]); - disk_fs.await?.start_watching(None)?; + disk_fs.await?.start_watching(None).await?; // Smart Pointer cast let fs: Vc> = Vc::upcast(disk_fs); diff --git a/turbopack/crates/turbo-tasks-fs/src/embed/dir.rs b/turbopack/crates/turbo-tasks-fs/src/embed/dir.rs index 3dea50975096d..0a834c682a103 100644 --- a/turbopack/crates/turbo-tasks-fs/src/embed/dir.rs +++ 
b/turbopack/crates/turbo-tasks-fs/src/embed/dir.rs @@ -12,7 +12,7 @@ pub async fn directory_from_relative_path( path: RcStr, ) -> Result>> { let disk_fs = DiskFileSystem::new(name, path, vec![]); - disk_fs.await?.start_watching(None)?; + disk_fs.await?.start_watching(None).await?; Ok(Vc::upcast(disk_fs)) } diff --git a/turbopack/crates/turbo-tasks-fs/src/embed/file.rs b/turbopack/crates/turbo-tasks-fs/src/embed/file.rs index d5161254f1d1c..84b806b524cab 100644 --- a/turbopack/crates/turbo-tasks-fs/src/embed/file.rs +++ b/turbopack/crates/turbo-tasks-fs/src/embed/file.rs @@ -23,7 +23,7 @@ pub async fn content_from_relative_path( root_path.to_string_lossy().into(), vec![], ); - disk_fs.await?.start_watching(None)?; + disk_fs.await?.start_watching(None).await?; let fs_path = disk_fs.root().join(path.into()); Ok(fs_path.read()) diff --git a/turbopack/crates/turbo-tasks-fs/src/invalidator_map.rs b/turbopack/crates/turbo-tasks-fs/src/invalidator_map.rs index 7bb6b0d734ecf..7fd36737a05b9 100644 --- a/turbopack/crates/turbo-tasks-fs/src/invalidator_map.rs +++ b/turbopack/crates/turbo-tasks-fs/src/invalidator_map.rs @@ -12,13 +12,19 @@ pub struct InvalidatorMap { map: Mutex>>, } -impl InvalidatorMap { - pub fn new() -> Self { +impl Default for InvalidatorMap { + fn default() -> Self { Self { queue: ConcurrentQueue::unbounded(), map: Default::default(), } } +} + +impl InvalidatorMap { + pub fn new() -> Self { + Self::default() + } pub fn lock(&self) -> LockResult>>> { let mut guard = self.map.lock()?; diff --git a/turbopack/crates/turbo-tasks-fs/src/lib.rs b/turbopack/crates/turbo-tasks-fs/src/lib.rs index d08b88c31beda..1202fe1a80db2 100644 --- a/turbopack/crates/turbo-tasks-fs/src/lib.rs +++ b/turbopack/crates/turbo-tasks-fs/src/lib.rs @@ -58,8 +58,8 @@ use tokio::{ }; use tracing::Instrument; use turbo_tasks::{ - mark_session_dependent, trace::TraceRawVcs, Completion, Invalidator, RcStr, ReadRef, - ResolvedVc, ValueToString, Vc, + mark_session_dependent, mark_stateful, trace::TraceRawVcs, Completion, Invalidator, RcStr, + ReadRef, ResolvedVc, ValueToString, Vc, }; use turbo_tasks_hash::{ hash_xxh3_hash128, hash_xxh3_hash64, DeterministicHash, DeterministicHasher, @@ -203,8 +203,10 @@ pub struct DiskFileSystem { #[serde(skip)] mutex_map: MutexMap, #[turbo_tasks(debug_ignore, trace_ignore)] + #[serde(skip)] invalidator_map: Arc, #[turbo_tasks(debug_ignore, trace_ignore)] + #[serde(skip)] dir_invalidator_map: Arc, /// Lock that makes invalidation atomic. It will keep a write lock during /// watcher invalidation and a read lock during other operations. 
@@ -305,27 +307,37 @@ impl DiskFileSystem { }); } - pub fn start_watching(&self, poll_interval: Option) -> Result<()> { - self.start_watching_internal(false, poll_interval) + pub async fn start_watching(&self, poll_interval: Option) -> Result<()> { + self.start_watching_internal(false, poll_interval).await } - pub fn start_watching_with_invalidation_reason( + pub async fn start_watching_with_invalidation_reason( &self, poll_interval: Option, ) -> Result<()> { - self.start_watching_internal(true, poll_interval) + self.start_watching_internal(true, poll_interval).await } - fn start_watching_internal( + #[tracing::instrument(level = "info", name = "start filesystem watching", skip_all, fields(path = %self.root))] + async fn start_watching_internal( &self, report_invalidation_reason: bool, poll_interval: Option, ) -> Result<()> { - let _span = tracing::info_span!("start filesystem watching", path = &*self.root).entered(); let invalidator_map = self.invalidator_map.clone(); let dir_invalidator_map = self.dir_invalidator_map.clone(); let root_path = self.root_path().to_path_buf(); + // create the directory for the filesystem on disk, if it doesn't exist + retry_future(|| { + let path = root_path.as_path(); + fs::create_dir_all(&root_path).instrument(tracing::info_span!( + "create root directory", + path = display(path.display()) + )) + }) + .await?; + let report_invalidation_reason = report_invalidation_reason.then(|| (self.name.clone(), root_path.clone())); let invalidation_lock = self.invalidation_lock.clone(); @@ -415,9 +427,7 @@ impl DiskFileSystem { /// ignore specific subpaths from each. #[turbo_tasks::function] pub async fn new(name: RcStr, root: RcStr, ignored_subpaths: Vec) -> Result> { - mark_session_dependent(); - // create the directory for the filesystem on disk, if it doesn't exist - fs::create_dir_all(&root).await?; + mark_stateful(); let instance = DiskFileSystem { name, diff --git a/turbopack/crates/turbopack-cli/src/util.rs b/turbopack/crates/turbopack-cli/src/util.rs index 5841b061f177c..69ebdf0da9d4f 100644 --- a/turbopack/crates/turbopack-cli/src/util.rs +++ b/turbopack/crates/turbopack-cli/src/util.rs @@ -62,13 +62,13 @@ pub fn normalize_entries(entries: &Option>) -> Vec { #[turbo_tasks::function] pub async fn project_fs(project_dir: RcStr) -> Result>> { let disk_fs = DiskFileSystem::new("project".into(), project_dir, vec![]); - disk_fs.await?.start_watching(None)?; + disk_fs.await?.start_watching(None).await?; Ok(Vc::upcast(disk_fs)) } #[turbo_tasks::function] pub async fn output_fs(project_dir: RcStr) -> Result>> { let disk_fs = DiskFileSystem::new("output".into(), project_dir, vec![]); - disk_fs.await?.start_watching(None)?; + disk_fs.await?.start_watching(None).await?; Ok(Vc::upcast(disk_fs)) } diff --git a/turbopack/crates/turbopack/examples/turbopack.rs b/turbopack/crates/turbopack/examples/turbopack.rs index ea2910af93717..81d1b12bac23c 100644 --- a/turbopack/crates/turbopack/examples/turbopack.rs +++ b/turbopack/crates/turbopack/examples/turbopack.rs @@ -34,7 +34,7 @@ async fn main() -> Result<()> { Box::pin(async { let root: RcStr = current_dir().unwrap().to_str().unwrap().into(); let disk_fs = DiskFileSystem::new(PROJECT_FILESYSTEM_NAME.into(), root, vec![]); - disk_fs.await?.start_watching(None)?; + disk_fs.await?.start_watching(None).await?; // Smart Pointer cast let fs: Vc> = Vc::upcast(disk_fs); From 2e0353a1d051ae4426a5b9398b859d80eab33c28 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Wed, 9 Oct 2024 10:55:00 +0200 Subject: [PATCH 04/14] fix race 
condition when scheduling dirty tasks --- .../crates/turbo-tasks-backend/src/backend/mod.rs | 10 ++-------- .../src/backend/operation/aggregation_update.rs | 9 ++++++--- turbopack/crates/turbo-tasks-backend/src/data.rs | 6 ++++++ 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index 130f60552195d..5410a1bd12a1f 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -353,14 +353,8 @@ impl TurboTasksBackendInner { task = ctx.task(task_id, TaskDataCategory::All); } - let is_dirty = get!(task, Dirty) - .map(|dirty_state| { - dirty_state - .clean_in_session - .map(|clean_in_session| clean_in_session != self.session_id) - .unwrap_or(true) - }) - .unwrap_or(false); + let is_dirty = + get!(task, Dirty).map_or(false, |dirty_state| dirty_state.get(self.session_id)); // Check the dirty count of the root node let dirty_tasks = get!(task, AggregatedDirtyContainerCount) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs index f7ab5c29cca9d..b2f5b8e75a002 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs @@ -198,6 +198,7 @@ impl AggregatedDataUpdate { task_ids: vec![*dirty_container_id], }) } + let mut aggregated_update = Default::default(); update!( task, @@ -581,20 +582,22 @@ impl AggregationUpdateQueue { } if let Some(task_id) = popped { let mut task = ctx.task(task_id, TaskDataCategory::Meta); - #[allow(clippy::collapsible_if, reason = "readablility")] - if task.has_key(&CachedDataItemKey::Dirty {}) { + let session_id = ctx.session_id(); + let dirty = get!(task, Dirty).map_or(false, |d| d.get(session_id)); + if dirty { let description = ctx.backend.get_task_desc_fn(task_id); if task.add(CachedDataItem::new_scheduled(description)) { ctx.turbo_tasks.schedule(task_id); } } if is_aggregating_node(get_aggregation_number(&task)) { + // TODO if it has an `AggregateRoot` we can skip visiting the nested nodes since + // this would already be scheduled by the `AggregateRoot` if !task.has_key(&CachedDataItemKey::AggregateRoot {}) { task.insert(CachedDataItem::AggregateRoot { value: RootState::new(ActiveType::CachedActiveUntilClean, task_id), }); } - let session_id = ctx.session_id(); let dirty_containers: Vec<_> = get_many!(task, AggregatedDirtyContainer { task } count if count.get(session_id) > 0 => task); if !dirty_containers.is_empty() { self.push(AggregationUpdateJob::FindAndScheduleDirty { diff --git a/turbopack/crates/turbo-tasks-backend/src/data.rs b/turbopack/crates/turbo-tasks-backend/src/data.rs index 2bb7691d98d89..d3d3ed27fb834 100644 --- a/turbopack/crates/turbo-tasks-backend/src/data.rs +++ b/turbopack/crates/turbo-tasks-backend/src/data.rs @@ -90,6 +90,12 @@ pub struct DirtyState { pub clean_in_session: Option, } +impl DirtyState { + pub fn get(&self, session: SessionId) -> bool { + self.clean_in_session != Some(session) + } +} + fn add_with_diff(v: &mut i32, u: i32) -> i32 { let old = *v; *v += u; From 3a651f775494dd9f6fb2edf6423b54216f555f9d Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Wed, 9 Oct 2024 12:07:35 +0200 Subject: [PATCH 05/14] fix tracing counts for sharded lists --- .../src/lmdb_backing_storage.rs | 42 
++++++++++++------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage.rs b/turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage.rs index 0255c1ced3377..964f7db062913 100644 --- a/turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage.rs @@ -158,7 +158,7 @@ impl BackingStorage for LmdbBackingStorage { get(self).unwrap_or_default() } - #[tracing::instrument(level = "trace", skip_all, fields(operations = operations.len(), task_cache_updates = task_cache_updates.len(), data_updates = data_updates.len()))] + #[tracing::instrument(level = "trace", skip_all, fields(operations = operations.len()))] fn save_snapshot( &self, session_id: SessionId, @@ -184,15 +184,19 @@ impl BackingStorage for LmdbBackingStorage { // Start organizing the updates in parallel s.spawn(|_| { let task_meta_updates = { - let _span = - tracing::trace_span!("organize task meta", updates = meta_updates.len()) - .entered(); + let _span = tracing::trace_span!( + "organize task meta", + updates = meta_updates.iter().map(|m| m.len()).sum::() + ) + .entered(); organize_task_data(meta_updates) }; let items_result = { - let _span = - tracing::trace_span!("restore task meta", tasks = task_meta_updates.len()) - .entered(); + let _span = tracing::trace_span!( + "restore task meta", + tasks = task_meta_updates.iter().map(|m| m.len()).sum::() + ) + .entered(); restore_task_data(self, self.meta_db, task_meta_updates) }; task_meta_items_result = items_result.and_then(|items| { @@ -202,15 +206,19 @@ impl BackingStorage for LmdbBackingStorage { }); s.spawn(|_| { let task_data_updates = { - let _span = - tracing::trace_span!("organize task data", updates = data_updates.len()) - .entered(); + let _span = tracing::trace_span!( + "organize task data", + updates = data_updates.iter().map(|m| m.len()).sum::() + ) + .entered(); organize_task_data(data_updates) }; let items_result = { - let _span = - tracing::trace_span!("restore task data", tasks = task_data_updates.len()) - .entered(); + let _span = tracing::trace_span!( + "restore task data", + tasks = task_data_updates.iter().map(|m| m.len()).sum::() + ) + .entered(); restore_task_data(self, self.data_db, task_data_updates) }; task_data_items_result = items_result.and_then(|items| { @@ -235,9 +243,11 @@ impl BackingStorage for LmdbBackingStorage { as_u32(tx.get(self.infra_db, &IntKey::new(META_KEY_NEXT_FREE_TASK_ID))) .unwrap_or(1); { - let _span = - tracing::trace_span!("update task cache", items = task_cache_updates.len()) - .entered(); + let _span = tracing::trace_span!( + "update task cache", + items = task_cache_updates.iter().map(|m| m.len()).sum::() + ) + .entered(); for (task_type, task_id) in task_cache_updates.into_iter().flatten() { let task_id = *task_id; let task_type_bytes = pot::to_vec(&*task_type).with_context(|| { From 548f3914e443c864e4762c75499ffdcae4964fb7 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 10 Oct 2024 08:42:31 +0200 Subject: [PATCH 06/14] remove outdated edges before removing in_progress schedule tasks when adding AggregatedRoot for strongly consistent read --- .../turbo-tasks-backend/src/backend/mod.rs | 365 ++++++++++-------- .../backend/operation/cleanup_old_edges.rs | 12 +- 2 files changed, 206 insertions(+), 171 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index 5410a1bd12a1f..10178751ff166 
100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -41,7 +41,7 @@ use crate::{ operation::{ get_aggregation_number, is_root_node, AggregatedDataUpdate, AggregationUpdateJob, AggregationUpdateQueue, CleanupOldEdgesOperation, ConnectChildOperation, - ExecuteContext, OutdatedEdge, + ExecuteContext, Operation, OutdatedEdge, }, storage::{get, get_many, get_mut, iter_many, remove, Storage}, }, @@ -363,6 +363,7 @@ impl TurboTasksBackendInner { .get(self.session_id); if dirty_tasks > 0 || is_dirty { let root = get!(task, AggregateRoot); + let mut task_ids_to_schedule: Vec<_> = Vec::new(); // When there are dirty task, subscribe to the all_clean_event let root = if let Some(root) = root { root @@ -373,6 +374,11 @@ impl TurboTasksBackendInner { task.add_new(CachedDataItem::AggregateRoot { value: RootState::new(ActiveType::CachedActiveUntilClean, task_id), }); + // A newly added AggregateRoot need to make sure to schedule the tasks + task_ids_to_schedule = get_many!(task, AggregatedDirtyContainer { task } count if count.get(self.session_id) > 0 => task); + if is_dirty { + task_ids_to_schedule.push(task_id); + } get!(task, AggregateRoot).unwrap() }; let listener = root.all_clean_event.listen_with_note(move || { @@ -381,6 +387,15 @@ impl TurboTasksBackendInner { reader ) }); + drop(task); + if !task_ids_to_schedule.is_empty() { + let mut queue = AggregationUpdateQueue::new(); + queue.push(AggregationUpdateJob::FindAndScheduleDirty { + task_ids: task_ids_to_schedule, + }); + queue.execute(&mut ctx); + } + return Ok(Err(listener)); } } @@ -1036,205 +1051,231 @@ impl TurboTasksBackendInner { ) -> bool { let mut ctx = self.execute_context(turbo_tasks); let mut task = ctx.task(task_id, TaskDataCategory::All); - let Some(CachedDataItemValue::InProgress { value: in_progress }) = - task.remove(&CachedDataItemKey::InProgress {}) - else { + let Some(in_progress) = get!(task, InProgress) else { panic!("Task execution completed, but task is not in progress: {task:#?}"); }; - let InProgressState::InProgress { - done_event, - once_task: _, - stale, - session_dependent, - } = in_progress - else { + let &InProgressState::InProgress { stale, .. } = in_progress else { panic!("Task execution completed, but task is not in progress: {task:#?}"); }; - // TODO handle stateful - let _ = stateful; - + // If the task is stale, reschedule it if stale { + let Some(InProgressState::InProgress { done_event, .. 
}) = remove!(task, InProgress) + else { + unreachable!(); + }; task.add_new(CachedDataItem::InProgress { value: InProgressState::Scheduled { done_event }, }); - drop(task); - drop(ctx); - } else { - // handle cell counters: update max index and remove cells that are no longer used - let mut removed_cells = HashMap::new(); - let mut old_counters: HashMap<_, _> = - get_many!(task, CellTypeMaxIndex { cell_type } max_index => (cell_type, max_index)); - for (&cell_type, &max_index) in cell_counters.iter() { - if let Some(old_max_index) = old_counters.remove(&cell_type) { - if old_max_index != max_index { - task.insert(CachedDataItem::CellTypeMaxIndex { - cell_type, - value: max_index, - }); - if old_max_index > max_index { - removed_cells.insert(cell_type, max_index + 1..=old_max_index); - } - } - } else { - task.add_new(CachedDataItem::CellTypeMaxIndex { + return true; + } + + // TODO handle stateful + let _ = stateful; + + // handle cell counters: update max index and remove cells that are no longer used + let mut removed_cells = HashMap::new(); + let mut old_counters: HashMap<_, _> = + get_many!(task, CellTypeMaxIndex { cell_type } max_index => (cell_type, max_index)); + for (&cell_type, &max_index) in cell_counters.iter() { + if let Some(old_max_index) = old_counters.remove(&cell_type) { + if old_max_index != max_index { + task.insert(CachedDataItem::CellTypeMaxIndex { cell_type, value: max_index, }); + if old_max_index > max_index { + removed_cells.insert(cell_type, max_index + 1..=old_max_index); + } } + } else { + task.add_new(CachedDataItem::CellTypeMaxIndex { + cell_type, + value: max_index, + }); } - for (cell_type, old_max_index) in old_counters { - task.remove(&CachedDataItemKey::CellTypeMaxIndex { cell_type }); - removed_cells.insert(cell_type, 0..=old_max_index); - } - let mut removed_data = Vec::new(); - for (&cell_type, range) in removed_cells.iter() { - for index in range.clone() { - removed_data.extend( - task.remove(&CachedDataItemKey::CellData { - cell: CellId { - type_id: cell_type, - index, - }, - }) - .into_iter(), - ); - } + } + for (cell_type, old_max_index) in old_counters { + task.remove(&CachedDataItemKey::CellTypeMaxIndex { cell_type }); + removed_cells.insert(cell_type, 0..=old_max_index); + } + let mut removed_data = Vec::new(); + for (&cell_type, range) in removed_cells.iter() { + for index in range.clone() { + removed_data.extend( + task.remove(&CachedDataItemKey::CellData { + cell: CellId { + type_id: cell_type, + index, + }, + }) + .into_iter(), + ); } + } - // find all outdated data items (removed cells, outdated edges) - let old_edges = if task.is_indexed() { - task.iter(CachedDataItemIndex::Children) - .filter_map(|(key, _)| match *key { - CachedDataItemKey::OutdatedChild { task } => { - Some(OutdatedEdge::Child(task)) - } - _ => None, - }) - .chain( - task.iter(CachedDataItemIndex::Dependencies) - .filter_map(|(key, _)| match *key { - CachedDataItemKey::OutdatedCellDependency { target } => { - Some(OutdatedEdge::CellDependency(target)) - } - CachedDataItemKey::OutdatedOutputDependency { target } => { - Some(OutdatedEdge::OutputDependency(target)) - } - _ => None, - }), - ) - .chain( - task.iter(CachedDataItemIndex::CellDependent) - .filter_map(|(key, _)| match *key { - CachedDataItemKey::CellDependent { cell, task } - if removed_cells - .get(&cell.type_id) - .map_or(false, |range| range.contains(&cell.index)) => - { - Some(OutdatedEdge::RemovedCellDependent(task)) - } - _ => None, - }), - ) - .collect::>() - } else { - task.iter_all() - .filter_map(|(key, 
value)| match *key { - CachedDataItemKey::OutdatedChild { task } => { - Some(OutdatedEdge::Child(task)) - } - CachedDataItemKey::OutdatedCollectible { collectible } => { - let CachedDataItemValue::OutdatedCollectible { value } = *value else { - unreachable!(); - }; - Some(OutdatedEdge::Collectible(collectible, value)) - } + // find all outdated data items (removed cells, outdated edges) + let old_edges = if task.is_indexed() { + task.iter(CachedDataItemIndex::Children) + .filter_map(|(key, _)| match *key { + CachedDataItemKey::OutdatedChild { task } => Some(OutdatedEdge::Child(task)), + _ => None, + }) + .chain(task.iter(CachedDataItemIndex::Dependencies).filter_map( + |(key, _)| match *key { CachedDataItemKey::OutdatedCellDependency { target } => { Some(OutdatedEdge::CellDependency(target)) } CachedDataItemKey::OutdatedOutputDependency { target } => { Some(OutdatedEdge::OutputDependency(target)) } - CachedDataItemKey::OutdatedCollectiblesDependency { target } => { - Some(OutdatedEdge::CollectiblesDependency(target)) - } - CachedDataItemKey::CellDependent { cell, task } - if removed_cells - .get(&cell.type_id) - .map_or(false, |range| range.contains(&cell.index)) => - { - Some(OutdatedEdge::RemovedCellDependent(task)) - } _ => None, - }) - .collect::>() - }; - - let new_dirty_state = if session_dependent { - Some(DirtyState { - clean_in_session: Some(self.session_id), + }, + )) + .chain( + task.iter(CachedDataItemIndex::CellDependent).filter_map( + |(key, _)| match *key { + CachedDataItemKey::CellDependent { cell, task } + if removed_cells + .get(&cell.type_id) + .map_or(false, |range| range.contains(&cell.index)) => + { + Some(OutdatedEdge::RemovedCellDependent(task)) + } + _ => None, + }, + ), + ) + .collect::>() + } else { + task.iter_all() + .filter_map(|(key, value)| match *key { + CachedDataItemKey::OutdatedChild { task } => Some(OutdatedEdge::Child(task)), + CachedDataItemKey::OutdatedCollectible { collectible } => { + let CachedDataItemValue::OutdatedCollectible { value } = *value else { + unreachable!(); + }; + Some(OutdatedEdge::Collectible(collectible, value)) + } + CachedDataItemKey::OutdatedCellDependency { target } => { + Some(OutdatedEdge::CellDependency(target)) + } + CachedDataItemKey::OutdatedOutputDependency { target } => { + Some(OutdatedEdge::OutputDependency(target)) + } + CachedDataItemKey::OutdatedCollectiblesDependency { target } => { + Some(OutdatedEdge::CollectiblesDependency(target)) + } + CachedDataItemKey::CellDependent { cell, task } + if removed_cells + .get(&cell.type_id) + .map_or(false, |range| range.contains(&cell.index)) => + { + Some(OutdatedEdge::RemovedCellDependent(task)) + } + _ => None, }) - } else { - None - }; + .collect::>() + }; + drop(task); - let old_dirty = if let Some(new_dirty_state) = new_dirty_state { - task.insert(CachedDataItem::Dirty { - value: new_dirty_state, - }) - } else { - task.remove(&CachedDataItemKey::Dirty {}) - }; + // Remove outdated edges first, before removing in_progress+dirty flag. + // We need to make sure all outdated edges are removed before the task can potentially be + // scheduled and executed again + CleanupOldEdgesOperation::run(task_id, old_edges, &mut ctx); - let old_dirty_state = old_dirty.map(|old_dirty| match old_dirty { - CachedDataItemValue::Dirty { value } => value, - _ => unreachable!(), + // When restoring from persistent caching the following might not be executed (since we can + // suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and + // would be executed again. 
+ + let mut task = ctx.task(task_id, TaskDataCategory::All); + let Some(in_progress) = remove!(task, InProgress) else { + panic!("Task execution completed, but task is not in progress: {task:#?}"); + }; + let InProgressState::InProgress { + done_event, + once_task: _, + stale: _, + session_dependent, + } = in_progress + else { + panic!("Task execution completed, but task is not in progress: {task:#?}"); + }; + + // If the task is stale, reschedule it + if stale { + task.add_new(CachedDataItem::InProgress { + value: InProgressState::Scheduled { done_event }, }); + return true; + } - let data_update = if old_dirty_state.is_some() || new_dirty_state.is_some() { - let mut dirty_containers = get!(task, AggregatedDirtyContainerCount) - .copied() - .unwrap_or_default(); - if let Some(old_dirty_state) = old_dirty_state { - dirty_containers.update_with_dirty_state(&old_dirty_state); - } - let aggregated_update = match (old_dirty_state, new_dirty_state) { - (None, None) => unreachable!(), - (Some(old), None) => dirty_containers.undo_update_with_dirty_state(&old), - (None, Some(new)) => dirty_containers.update_with_dirty_state(&new), - (Some(old), Some(new)) => dirty_containers.replace_dirty_state(&old, &new), - }; - if !aggregated_update.is_default() { - if aggregated_update.get(self.session_id) < 0 { - if let Some(root_state) = get!(task, AggregateRoot) { - root_state.all_clean_event.notify(usize::MAX); - if matches!(root_state.ty, ActiveType::CachedActiveUntilClean) { - task.remove(&CachedDataItemKey::AggregateRoot {}); - } + // Update the dirty state + let new_dirty_state = if session_dependent { + Some(DirtyState { + clean_in_session: Some(self.session_id), + }) + } else { + None + }; + + let old_dirty = if let Some(new_dirty_state) = new_dirty_state { + task.insert(CachedDataItem::Dirty { + value: new_dirty_state, + }) + } else { + task.remove(&CachedDataItemKey::Dirty {}) + }; + + let old_dirty_state = old_dirty.map(|old_dirty| match old_dirty { + CachedDataItemValue::Dirty { value } => value, + _ => unreachable!(), + }); + + let data_update = if old_dirty_state.is_some() || new_dirty_state.is_some() { + let mut dirty_containers = get!(task, AggregatedDirtyContainerCount) + .copied() + .unwrap_or_default(); + if let Some(old_dirty_state) = old_dirty_state { + dirty_containers.update_with_dirty_state(&old_dirty_state); + } + let aggregated_update = match (old_dirty_state, new_dirty_state) { + (None, None) => unreachable!(), + (Some(old), None) => dirty_containers.undo_update_with_dirty_state(&old), + (None, Some(new)) => dirty_containers.update_with_dirty_state(&new), + (Some(old), Some(new)) => dirty_containers.replace_dirty_state(&old, &new), + }; + if !aggregated_update.is_default() { + if aggregated_update.get(self.session_id) < 0 { + if let Some(root_state) = get!(task, AggregateRoot) { + root_state.all_clean_event.notify(usize::MAX); + if matches!(root_state.ty, ActiveType::CachedActiveUntilClean) { + task.remove(&CachedDataItemKey::AggregateRoot {}); } } - AggregationUpdateJob::data_update( - &mut task, - AggregatedDataUpdate::new() - .dirty_container_update(task_id, aggregated_update), - ) - } else { - None } + AggregationUpdateJob::data_update( + &mut task, + AggregatedDataUpdate::new().dirty_container_update(task_id, aggregated_update), + ) } else { None - }; - - drop(task); + } + } else { + None + }; - done_event.notify(usize::MAX); + drop(task); - CleanupOldEdgesOperation::run(task_id, old_edges, data_update, ctx); + done_event.notify(usize::MAX); - drop(removed_data) + if let 
Some(data_update) = data_update { + AggregationUpdateQueue::run(data_update, &mut ctx); } drop(removed_data); + + false } fn run_backend_job<'a>( diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs index a92e687006215..2e5e29b087d2c 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs @@ -45,20 +45,14 @@ pub enum OutdatedEdge { } impl CleanupOldEdgesOperation { - pub fn run( - task_id: TaskId, - outdated: Vec<OutdatedEdge>, - data_update: Option<AggregationUpdateJob>, - mut ctx: ExecuteContext<'_>, - ) { - let mut queue = AggregationUpdateQueue::new(); - queue.extend(data_update); + pub fn run(task_id: TaskId, outdated: Vec<OutdatedEdge>, ctx: &mut ExecuteContext<'_>) { + let queue = AggregationUpdateQueue::new(); CleanupOldEdgesOperation::RemoveEdges { task_id, outdated, queue, } - .execute(&mut ctx); + .execute(ctx); } } From d956e841c6e9a33762982eb1d2a210175647cc8d Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 10 Oct 2024 09:39:49 +0200 Subject: [PATCH 07/14] remove extra scheduling --- .../src/backend/operation/connect_child.rs | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs index 626698d7e9145..8321cfb3a25ce 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs @@ -38,12 +38,6 @@ impl ConnectChildOperation { task: child_task_id, value: (), }) { - // When task is added to a AggregateRoot is need to be scheduled, // indirect connections are handled by the aggregation update.
- let mut should_schedule = false; - if parent_task.has_key(&CachedDataItemKey::AggregateRoot {}) { - should_schedule = true; - } // Update the task aggregation let mut queue = AggregationUpdateQueue::new(); @@ -108,18 +102,6 @@ impl ConnectChildOperation { } drop(parent_task); - { - let mut task = ctx.task(child_task_id, TaskDataCategory::Data); - should_schedule = should_schedule || !task.has_key(&CachedDataItemKey::Output {}); - if should_schedule { - let description = ctx.backend.get_task_desc_fn(child_task_id); - should_schedule = task.add(CachedDataItem::new_scheduled(description)); - } - } - if should_schedule { - ctx.schedule(child_task_id); - } - ConnectChildOperation::UpdateAggregation { aggregation_update: queue, } From 15aeb60e0120c0fc56e6a872386a87167b338646 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 10 Oct 2024 09:40:43 +0200 Subject: [PATCH 08/14] schedule task without output --- .../src/backend/operation/aggregation_update.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs index b2f5b8e75a002..290bf7d0f8c70 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs @@ -583,8 +583,10 @@ impl AggregationUpdateQueue { if let Some(task_id) = popped { let mut task = ctx.task(task_id, TaskDataCategory::Meta); let session_id = ctx.session_id(); + // Task need to be scheduled if it's dirty or doesn't have output let dirty = get!(task, Dirty).map_or(false, |d| d.get(session_id)); - if dirty { + let should_schedule = dirty || !task.has_key(&CachedDataItemKey::Output {}); + if should_schedule { let description = ctx.backend.get_task_desc_fn(task_id); if task.add(CachedDataItem::new_scheduled(description)) { ctx.turbo_tasks.schedule(task_id); From a7ba338d2b18d32739b7a5a680b118868b994af1 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 10 Oct 2024 11:04:09 +0200 Subject: [PATCH 09/14] schedule tasks when they are added as inner to a aggregate root --- .../backend/operation/aggregation_update.rs | 29 ++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs index 290bf7d0f8c70..bcfb7219e1cda 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs @@ -516,6 +516,14 @@ impl AggregationUpdateQueue { new_follower_ids: followers, }); } + + if upper.has_key(&CachedDataItemKey::AggregateRoot {}) { + // If the upper node is an `AggregateRoot` we need to schedule the + // dirty tasks in the new dirty container + self.push(AggregationUpdateJob::FindAndScheduleDirty { + task_ids: vec![task_id], + }); + } } // notify uppers about lost follower @@ -716,6 +724,7 @@ impl AggregationUpdateQueue { get_aggregation_number(&follower) }; let mut upper_ids_as_follower = Vec::new(); + let mut is_aggregate_root = false; upper_ids.retain(|&upper_id| { let upper = ctx.task(upper_id, TaskDataCategory::Meta); // decide if it should be an inner or follower @@ -729,6 +738,9 @@ impl AggregationUpdateQueue { false } else { // It's an inner node, continue with the list + if 
upper.has_key(&CachedDataItemKey::AggregateRoot {}) { + is_aggregate_root = true; + } true } }); @@ -782,6 +794,11 @@ impl AggregationUpdateQueue { 1 ) }); + if is_aggregate_root { + self.push(AggregationUpdateJob::FindAndScheduleDirty { + task_ids: vec![new_follower_id], + }); + } if !upper_ids_as_follower.is_empty() { self.push(AggregationUpdateJob::InnerOfUppersHasNewFollower { upper_ids: upper_ids_as_follower, @@ -805,8 +822,10 @@ impl AggregationUpdateQueue { .collect::>(); let mut followers_of_upper = Vec::new(); + let is_aggregate_root; { let upper = ctx.task(upper_id, TaskDataCategory::Meta); + is_aggregate_root = upper.has_key(&CachedDataItemKey::AggregateRoot {}); // decide if it should be an inner or follower let upper_aggregation_number = get_aggregation_number(&upper); @@ -828,7 +847,7 @@ impl AggregationUpdateQueue { let mut upper_data_updates = Vec::new(); let mut upper_new_followers = Vec::new(); - for (follower_id, _) in followers_with_aggregation_number { + for &(follower_id, _) in followers_with_aggregation_number.iter() { let mut follower = ctx.task(follower_id, TaskDataCategory::Meta); if update_count!(follower, Upper { task: upper_id }, 1) { // It's a new upper @@ -876,6 +895,14 @@ impl AggregationUpdateQueue { }); } } + if is_aggregate_root { + self.push(AggregationUpdateJob::FindAndScheduleDirty { + task_ids: followers_with_aggregation_number + .into_iter() + .map(|(id, _)| id) + .collect(), + }); + } if !followers_of_upper.is_empty() { let mut upper = ctx.task(upper_id, TaskDataCategory::Meta); followers_of_upper From 39e715f61609813658af94b0c04e058ad1ac8bb0 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 10 Oct 2024 11:34:49 +0200 Subject: [PATCH 10/14] place output in meta data --- turbopack/crates/turbo-tasks-backend/src/data.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/data.rs b/turbopack/crates/turbo-tasks-backend/src/data.rs index d3d3ed27fb834..c45f947fe40dd 100644 --- a/turbopack/crates/turbo-tasks-backend/src/data.rs +++ b/turbopack/crates/turbo-tasks-backend/src/data.rs @@ -513,8 +513,7 @@ impl CachedDataItemKey { pub fn category(&self) -> TaskDataCategory { match self { - CachedDataItemKey::Output { .. } - | CachedDataItemKey::Collectible { .. } + CachedDataItemKey::Collectible { .. } | CachedDataItemKey::Child { .. } | CachedDataItemKey::CellData { .. } | CachedDataItemKey::CellTypeMaxIndex { .. } @@ -533,7 +532,8 @@ impl CachedDataItemKey { | CachedDataItemKey::OutdatedChild { .. } | CachedDataItemKey::Error { .. } => TaskDataCategory::Data, - CachedDataItemKey::AggregationNumber { .. } + CachedDataItemKey::Output { .. } + | CachedDataItemKey::AggregationNumber { .. } | CachedDataItemKey::Dirty { .. } | CachedDataItemKey::Follower { .. } | CachedDataItemKey::Upper { .. 
} From 3df73e391818e869b8fc096786a5e971c9474b54 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 10 Oct 2024 11:52:47 +0200 Subject: [PATCH 11/14] connecting tasks without output should schedule them --- .../src/backend/operation/connect_child.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs index 8321cfb3a25ce..f51f1db161337 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs @@ -102,6 +102,18 @@ impl ConnectChildOperation { } drop(parent_task); + { + let mut task = ctx.task(child_task_id, TaskDataCategory::Data); + if !task.has_key(&CachedDataItemKey::Output {}) { + let description = ctx.backend.get_task_desc_fn(child_task_id); + let should_schedule = task.add(CachedDataItem::new_scheduled(description)); + drop(task); + if should_schedule { + ctx.schedule(child_task_id); + } + } + } + ConnectChildOperation::UpdateAggregation { aggregation_update: queue, } From 3e3158a292ffa5cdbed66f0246d5c98ec3b0e294 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Tue, 24 Sep 2024 11:48:33 +0200 Subject: [PATCH 12/14] reuse outdated children --- .../src/backend/operation/connect_child.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs index f51f1db161337..b390785070878 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs @@ -31,9 +31,15 @@ pub enum ConnectChildOperation { impl ConnectChildOperation { pub fn run(parent_task_id: TaskId, child_task_id: TaskId, mut ctx: ExecuteContext<'_>) { let mut parent_task = ctx.task(parent_task_id, TaskDataCategory::All); - parent_task.remove(&CachedDataItemKey::OutdatedChild { - task: child_task_id, - }); + // Quick skip if the child was already connected before + if parent_task + .remove(&CachedDataItemKey::OutdatedChild { + task: child_task_id, + }) + .is_some() + { + return; + } if parent_task.add(CachedDataItem::Child { task: child_task_id, value: (), From d76c606141355322cfdccb55d5b2fa73327ae578 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 10 Oct 2024 20:36:20 +0200 Subject: [PATCH 13/14] mark read as session dependent --- turbopack/crates/turbo-tasks-fs/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/turbopack/crates/turbo-tasks-fs/src/lib.rs b/turbopack/crates/turbo-tasks-fs/src/lib.rs index 1202fe1a80db2..be153e3cebb5d 100644 --- a/turbopack/crates/turbo-tasks-fs/src/lib.rs +++ b/turbopack/crates/turbo-tasks-fs/src/lib.rs @@ -517,6 +517,7 @@ impl Debug for DiskFileSystem { impl FileSystem for DiskFileSystem { #[turbo_tasks::function(fs)] async fn read(&self, fs_path: Vc) -> Result> { + mark_session_dependent(); let full_path = self.to_sys_path(fs_path).await?; self.register_invalidator(&full_path)?; From 075ee01d5239605653889286b8b36315987707ce Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 10 Oct 2024 20:36:45 +0200 Subject: [PATCH 14/14] mark command line env as session dependent --- turbopack/crates/turbo-tasks-env/src/command_line.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git 
a/turbopack/crates/turbo-tasks-env/src/command_line.rs b/turbopack/crates/turbo-tasks-env/src/command_line.rs index e3ffa7e70765c..5d2977d3e7460 100644 --- a/turbopack/crates/turbo-tasks-env/src/command_line.rs +++ b/turbopack/crates/turbo-tasks-env/src/command_line.rs @@ -1,5 +1,5 @@ use indexmap::IndexMap; -use turbo_tasks::{RcStr, Vc}; +use turbo_tasks::{mark_session_dependent, RcStr, Vc}; use crate::{sorted_env_vars, EnvMap, ProcessEnv, GLOBAL_ENV_LOCK}; @@ -25,6 +25,7 @@ fn env_snapshot() -> IndexMap<RcStr, RcStr> { impl ProcessEnv for CommandLineProcessEnv { #[turbo_tasks::function] fn read_all(&self) -> Vc<EnvMap> { + mark_session_dependent(); Vc::cell(env_snapshot()) } }
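
Taken together, the series makes the dirty flag session-aware: a task that calls mark_session_dependent() (patches 13 and 14) is not marked clean globally when it finishes, but only for the current session, so it stays cached within a session and is re-executed once a new session starts from the persistent cache. Below is a minimal, self-contained sketch of the DirtyState::get semantics introduced earlier in the series. It is illustrative only and not part of any patch; SessionId is modeled here as a plain newtype rather than the id type provided by the backend.

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct SessionId(u32);

#[derive(Clone, Copy, Default, Debug)]
struct DirtyState {
    // `None`: dirty in every session. `Some(s)`: the task was re-executed in
    // session `s` and is only considered clean for that session.
    clean_in_session: Option<SessionId>,
}

impl DirtyState {
    // A task counts as dirty for `session` unless it was cleaned in exactly
    // that session.
    fn get(&self, session: SessionId) -> bool {
        self.clean_in_session != Some(session)
    }
}

fn main() {
    let current = SessionId(1);
    let next = SessionId(2);

    // A session-dependent task that just finished executing in `current`
    // keeps its Dirty item but records which session it is clean for.
    let session_dependent = DirtyState {
        clean_in_session: Some(current),
    };
    assert!(!session_dependent.get(current)); // clean for this session
    assert!(session_dependent.get(next)); // dirty again in the next session

    // An invalidated task is dirty regardless of the session.
    let invalidated = DirtyState::default();
    assert!(invalidated.get(current) && invalidated.get(next));
}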