From a654851c1b58c576f44a8ec6240ed5d8e1093af9 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Wed, 13 Nov 2024 13:32:57 +0100 Subject: [PATCH 1/3] fix hanging waiting on in progress cells --- .../turbo-tasks-auto-hash-map/src/lib.rs | 1 + .../turbo-tasks-auto-hash-map/src/map.rs | 80 ++++++++++++ .../turbo-tasks-backend/src/backend/mod.rs | 123 +++++++++++------- .../src/backend/operation/mod.rs | 65 +++++++++ .../src/backend/storage.rs | 40 ++++++ .../crates/turbo-tasks-backend/src/data.rs | 3 + 6 files changed, 266 insertions(+), 46 deletions(-) diff --git a/turbopack/crates/turbo-tasks-auto-hash-map/src/lib.rs b/turbopack/crates/turbo-tasks-auto-hash-map/src/lib.rs index 17361c57a72c8..e003b024b4f5c 100644 --- a/turbopack/crates/turbo-tasks-auto-hash-map/src/lib.rs +++ b/turbopack/crates/turbo-tasks-auto-hash-map/src/lib.rs @@ -1,4 +1,5 @@ #![feature(hash_raw_entry)] +#![feature(hash_extract_if)] pub mod map; pub mod set; diff --git a/turbopack/crates/turbo-tasks-auto-hash-map/src/map.rs b/turbopack/crates/turbo-tasks-auto-hash-map/src/map.rs index 03ff1982d0166..36f2e6d8aa31b 100644 --- a/turbopack/crates/turbo-tasks-auto-hash-map/src/map.rs +++ b/turbopack/crates/turbo-tasks-auto-hash-map/src/map.rs @@ -235,6 +235,16 @@ impl AutoMap(&'l mut self, f: F) -> ExtractIfIter<'l, K, V, I, F> + where + F: for<'a, 'b> FnMut(&'a K, &'b mut V) -> bool, + { + match self { + AutoMap::List(list) => ExtractIfIter::List { list, index: 0, f }, + AutoMap::Map(map) => ExtractIfIter::Map(map.extract_if(f)), + } + } + /// see [HashMap::shrink_to_fit](https://doc.rust-lang.org/std/collections/struct.HashMap.html#method.shrink_to_fit) pub fn shrink_to_fit(&mut self) { match self { @@ -843,6 +853,43 @@ where } } +pub enum ExtractIfIter<'l, K, V, const I: usize, F> +where + F: for<'a, 'b> FnMut(&'a K, &'b mut V) -> bool, +{ + List { + list: &'l mut SmallVec<[(K, V); I]>, + index: usize, + f: F, + }, + Map(std::collections::hash_map::ExtractIf<'l, K, V, F>), +} + +impl<'l, K, V, const I: usize, F> Iterator for ExtractIfIter<'l, K, V, I, F> +where + F: for<'a, 'b> FnMut(&'a K, &'b mut V) -> bool, +{ + type Item = (K, V); + + fn next(&mut self) -> Option { + match self { + ExtractIfIter::List { list, index, f } => { + while *index < list.len() { + let (key, value) = &mut list[*index]; + if f(key, value) { + let item = list.swap_remove(*index); + return Some(item); + } else { + *index += 1; + } + } + None + } + ExtractIfIter::Map(extract_if) => extract_if.next(), + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -863,4 +910,37 @@ mod tests { } assert_eq!(map.remove(&(MAX_LIST_SIZE * 2)), None); } + + #[test] + fn test_extract_if_map() { + let mut map = AutoMap::new(); + for i in 0..MAX_LIST_SIZE * 2 { + map.insert(i, i); + } + let iter = map.extract_if(|_, v| *v % 2 == 0); + assert_eq!(iter.count(), MAX_LIST_SIZE); + assert_eq!(map.len(), MAX_LIST_SIZE); + } + + #[test] + fn test_extract_if_list() { + let mut map = AutoMap::new(); + for i in 0..MIN_HASH_SIZE { + map.insert(i, i); + } + let iter = map.extract_if(|_, v| *v % 2 == 0); + assert_eq!(iter.count(), MIN_HASH_SIZE / 2); + assert_eq!(map.len(), MIN_HASH_SIZE / 2); + } + + #[test] + fn test_extract_if_list2() { + let mut map = AutoMap::new(); + for i in 0..MIN_HASH_SIZE { + map.insert(i, i); + } + let iter = map.extract_if(|_, v| *v < 5); + assert_eq!(iter.count(), 5); + assert_eq!(map.len(), MIN_HASH_SIZE - 5); + } } diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index e13c069d3eb6c..6fe5e6af479e2 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -604,7 +604,7 @@ impl TurboTasksBackendInner { ctx.get_task_description(task_id) ); }; - if cell.index > *max_id { + if cell.index >= *max_id { add_cell_dependency(self, task, reader, cell, task_id, &mut ctx); bail!( "Cell {cell:?} no longer exists in task {} (index out of bounds)", @@ -1228,7 +1228,6 @@ impl TurboTasksBackendInner { let _ = stateful; // handle cell counters: update max index and remove cells that are no longer used - let mut removed_cells = HashMap::new(); let mut old_counters: HashMap<_, _> = get_many!(task, CellTypeMaxIndex { cell_type } max_index => (*cell_type, *max_index)); for (&cell_type, &max_index) in cell_counters.iter() { @@ -1238,9 +1237,6 @@ impl TurboTasksBackendInner { cell_type, value: max_index, }); - if old_max_index > max_index { - removed_cells.insert(cell_type, max_index + 1..=old_max_index); - } } } else { task.add_new(CachedDataItem::CellTypeMaxIndex { @@ -1249,28 +1245,46 @@ impl TurboTasksBackendInner { }); } } - for (cell_type, old_max_index) in old_counters { + for (cell_type, _) in old_counters { task.remove(&CachedDataItemKey::CellTypeMaxIndex { cell_type }); - removed_cells.insert(cell_type, 0..=old_max_index); } + let mut removed_data = Vec::new(); - for (&cell_type, range) in removed_cells.iter() { - for index in range.clone() { - removed_data.extend( - task.remove(&CachedDataItemKey::CellData { - cell: CellId { - type_id: cell_type, - index, - }, - }) - .into_iter(), - ); - } - } + let mut old_edges = Vec::new(); + // Remove no longer existing cells and notify in progress cells // find all outdated data items (removed cells, outdated edges) - let old_edges = if task.is_indexed() { - let mut old_edges = Vec::new(); + if task.is_indexed() { + removed_data.extend(task.extract_if( + CachedDataItemIndex::InProgressCell, + |key, value| { + match (key, value) { + ( + &CachedDataItemKey::InProgressCell { cell }, + CachedDataItemValue::InProgressCell { value }, + ) if cell_counters + .get(&cell.type_id) + .map_or(true, |start_index| cell.index >= *start_index) => + { + value.event.notify(usize::MAX); + true + } + _ => false, + } + }, + )); + removed_data.extend(task.extract_if(CachedDataItemIndex::CellData, |key, _| { + match key { + &CachedDataItemKey::CellData { cell } + if cell_counters + .get(&cell.type_id) + .map_or(true, |start_index| cell.index >= *start_index) => + { + true + } + _ => false, + } + })); if self.should_track_children() { old_edges.extend(task.iter(CachedDataItemIndex::Children).filter_map( |(key, _)| match *key { @@ -1306,9 +1320,9 @@ impl TurboTasksBackendInner { |(key, _)| { match *key { CachedDataItemKey::CellDependent { cell, task } - if removed_cells + if cell_counters .get(&cell.type_id) - .map_or(false, |range| range.contains(&cell.index)) => + .map_or(true, |start_index| cell.index >= *start_index) => { Some(OutdatedEdge::RemovedCellDependent(task, cell.type_id)) } @@ -1317,36 +1331,53 @@ impl TurboTasksBackendInner { }, )); } - old_edges } else { - task.iter_all() - .filter_map(|(key, value)| match *key { - CachedDataItemKey::OutdatedChild { task } => Some(OutdatedEdge::Child(task)), - CachedDataItemKey::OutdatedCollectible { collectible } => { - let CachedDataItemValue::OutdatedCollectible { value } = *value else { - unreachable!(); - }; - Some(OutdatedEdge::Collectible(collectible, value)) + removed_data.extend(task.extract_if_all(|key, value| { + match (key, value) { + ( + &CachedDataItemKey::InProgressCell { cell }, + CachedDataItemValue::InProgressCell { value }, + ) if cell_counters + .get(&cell.type_id) + .map_or(true, |start_index| cell.index >= *start_index) => + { + value.event.notify(usize::MAX); + return true; } - CachedDataItemKey::OutdatedCellDependency { target } => { - Some(OutdatedEdge::CellDependency(target)) + (&CachedDataItemKey::CellData { cell }, _) + if cell_counters + .get(&cell.type_id) + .map_or(true, |start_index| cell.index >= *start_index) => + { + return true; + } + (&CachedDataItemKey::OutdatedChild { task }, _) => { + old_edges.push(OutdatedEdge::Child(task)); + } + ( + &CachedDataItemKey::OutdatedCollectible { collectible }, + &CachedDataItemValue::OutdatedCollectible { value }, + ) => old_edges.push(OutdatedEdge::Collectible(collectible, value)), + (&CachedDataItemKey::OutdatedCellDependency { target }, _) => { + old_edges.push(OutdatedEdge::CellDependency(target)); } - CachedDataItemKey::OutdatedOutputDependency { target } => { - Some(OutdatedEdge::OutputDependency(target)) + (&CachedDataItemKey::OutdatedOutputDependency { target }, _) => { + old_edges.push(OutdatedEdge::OutputDependency(target)); } - CachedDataItemKey::OutdatedCollectiblesDependency { target } => { - Some(OutdatedEdge::CollectiblesDependency(target)) + (&CachedDataItemKey::OutdatedCollectiblesDependency { target }, _) => { + old_edges.push(OutdatedEdge::CollectiblesDependency(target)); } - CachedDataItemKey::CellDependent { cell, task } - if removed_cells + (&CachedDataItemKey::CellDependent { cell, task }, _) + if cell_counters .get(&cell.type_id) - .map_or(false, |range| range.contains(&cell.index)) => + .map_or(true, |start_index| cell.index >= *start_index) => { - Some(OutdatedEdge::RemovedCellDependent(task, cell.type_id)) + old_edges.push(OutdatedEdge::RemovedCellDependent(task, cell.type_id)); } - _ => None, - }) - .collect::>() + _ => {} + } + false + })); }; drop(task); diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index b7cbf9d432360..39f399877e4e9 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -11,6 +11,7 @@ use std::{ mem::{take, transmute}, }; +use either::Either; use serde::{Deserialize, Serialize}; use turbo_tasks::{KeyValuePair, SessionId, TaskId, TurboTasksBackendApi}; @@ -370,6 +371,16 @@ pub trait TaskGuard: Debug { index: CachedDataItemIndex, ) -> impl Iterator; fn iter_all(&self) -> impl Iterator; + fn extract_if<'l, F>( + &'l mut self, + index: CachedDataItemIndex, + f: F, + ) -> impl Iterator + where + F: for<'a, 'b> FnMut(&'a CachedDataItemKey, &'b CachedDataItemValue) -> bool + 'l; + fn extract_if_all<'l, F>(&'l mut self, f: F) -> impl Iterator + where + F: for<'a, 'b> FnMut(&'a CachedDataItemKey, &'b CachedDataItemValue) -> bool + 'l; fn invalidate_serialization(&mut self); } @@ -586,6 +597,60 @@ impl TaskGuard for TaskGuardImpl<'_, B> { self.task.iter_all() } + fn extract_if<'l, F>( + &'l mut self, + index: CachedDataItemIndex, + f: F, + ) -> impl Iterator + where + F: for<'a, 'b> FnMut(&'a CachedDataItemKey, &'b CachedDataItemValue) -> bool + 'l, + { + if !self.backend.should_persist() || self.task_id.is_transient() { + return Either::Left(self.task.extract_if(Some(index), f)); + } + Either::Right(self.task.extract_if(Some(index), f).inspect(|item| { + if item.is_persistent() { + let key = item.key(); + let value = item.value(); + self.backend + .persisted_storage_log(key.category()) + .unwrap() + .lock(self.task_id) + .push(CachedDataUpdate { + key, + task: self.task_id, + value: None, + old_value: Some(value), + }); + } + })) + } + + fn extract_if_all<'l, F>(&'l mut self, f: F) -> impl Iterator + where + F: for<'a, 'b> FnMut(&'a CachedDataItemKey, &'b CachedDataItemValue) -> bool + 'l, + { + if !self.backend.should_persist() || self.task_id.is_transient() { + return Either::Left(self.task.extract_if_all(f)); + } + Either::Right(self.task.extract_if_all(f).inspect(|item| { + if item.is_persistent() { + let key = item.key(); + let value = item.value(); + self.backend + .persisted_storage_log(key.category()) + .unwrap() + .lock(self.task_id) + .push(CachedDataUpdate { + key, + task: self.task_id, + value: None, + old_value: Some(value), + }); + } + })) + } + fn invalidate_serialization(&mut self) { if !self.backend.should_persist() { return; diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs index 5531e6a8e9321..b51578cd77042 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs @@ -1,3 +1,4 @@ +use core::panic; use std::{ hash::{BuildHasherDefault, Hash}, mem::take, @@ -223,6 +224,16 @@ where } } + fn index_map_mut( + &mut self, + index: ::Index, + ) -> Option<&mut AutoMap> { + match self { + InnerStorage::Plain { map, .. } => Some(map), + InnerStorage::Indexed { map, .. } => map.get_mut(&index), + } + } + pub fn add(&mut self, item: T) -> bool { let (key, value) = item.into_key_and_value(); match self.get_or_create_map_mut(&key).entry(key) { @@ -279,6 +290,35 @@ where } } } + + pub fn extract_if<'l, F>( + &'l mut self, + index: ::Index, + mut f: F, + ) -> impl Iterator + use<'l, T, F> + where + F: for<'a, 'b> FnMut(&'a T::Key, &'b T::Value) -> bool + 'l, + { + self.index_map_mut(index) + .map(move |m| m.extract_if(move |k, v| f(k, v))) + .into_iter() + .flatten() + .map(|(key, value)| T::from_key_and_value(key, value)) + } + + pub fn extract_if_all<'l, F>(&'l mut self, mut f: F) -> impl Iterator + use<'l, T, F> + where + F: for<'a, 'b> FnMut(&'a T::Key, &'b T::Value) -> bool + 'l, + { + match self { + InnerStorage::Plain { map, .. } => map + .extract_if(move |k, v| f(k, v)) + .map(|(key, value)| T::from_key_and_value(key, value)), + InnerStorage::Indexed { .. } => { + panic!("Do not use extract_if_all with indexed storage") + } + } + } } impl InnerStorage diff --git a/turbopack/crates/turbo-tasks-backend/src/data.rs b/turbopack/crates/turbo-tasks-backend/src/data.rs index 9e628e94ac3c0..36bf68ec8fcd6 100644 --- a/turbopack/crates/turbo-tasks-backend/src/data.rs +++ b/turbopack/crates/turbo-tasks-backend/src/data.rs @@ -600,6 +600,7 @@ pub enum CachedDataItemIndex { OutputDependent, CollectiblesDependent, Dependencies, + InProgressCell, } #[allow(non_upper_case_globals, dead_code)] @@ -631,6 +632,7 @@ pub mod indicies { CachedDataItemIndex::Dependencies; pub const OutdatedCollectibleDependency: CachedDataItemIndex = CachedDataItemIndex::Dependencies; + pub const InProgressCell: CachedDataItemIndex = CachedDataItemIndex::InProgressCell; } impl Indexed for CachedDataItemKey { @@ -672,6 +674,7 @@ impl Indexed for CachedDataItemKey { CachedDataItemKey::OutdatedCollectiblesDependency { .. } => { Some(CachedDataItemIndex::Dependencies) } + CachedDataItemKey::InProgressCell { .. } => Some(CachedDataItemIndex::InProgressCell), _ => None, } } From 2c3de6d022296ffbee917ee8ffa19f17c2739322 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Wed, 13 Nov 2024 13:47:34 +0100 Subject: [PATCH 2/3] clippy --- .../crates/turbo-tasks-auto-hash-map/src/map.rs | 2 +- .../crates/turbo-tasks-backend/src/backend/mod.rs | 15 +++++---------- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/turbopack/crates/turbo-tasks-auto-hash-map/src/map.rs b/turbopack/crates/turbo-tasks-auto-hash-map/src/map.rs index 36f2e6d8aa31b..1934f12999439 100644 --- a/turbopack/crates/turbo-tasks-auto-hash-map/src/map.rs +++ b/turbopack/crates/turbo-tasks-auto-hash-map/src/map.rs @@ -865,7 +865,7 @@ where Map(std::collections::hash_map::ExtractIf<'l, K, V, F>), } -impl<'l, K, V, const I: usize, F> Iterator for ExtractIfIter<'l, K, V, I, F> +impl Iterator for ExtractIfIter<'_, K, V, I, F> where F: for<'a, 'b> FnMut(&'a K, &'b mut V) -> bool, { diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index 6fe5e6af479e2..36270c84b0002 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -1274,16 +1274,11 @@ impl TurboTasksBackendInner { }, )); removed_data.extend(task.extract_if(CachedDataItemIndex::CellData, |key, _| { - match key { - &CachedDataItemKey::CellData { cell } - if cell_counters - .get(&cell.type_id) - .map_or(true, |start_index| cell.index >= *start_index) => - { - true - } - _ => false, - } + matches!(key, &CachedDataItemKey::CellData { cell } if cell_counters + .get(&cell.type_id) + .map_or(true, |start_index| cell.index >= *start_index) && { + true + }) })); if self.should_track_children() { old_edges.extend(task.iter(CachedDataItemIndex::Children).filter_map( From 19acb87efd52508450901f3779cf988550b28dd0 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 14 Nov 2024 11:39:37 +0100 Subject: [PATCH 3/3] review fixups --- turbopack/crates/turbo-tasks-backend/src/backend/mod.rs | 4 +--- turbopack/crates/turbo-tasks-backend/src/backend/storage.rs | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index 36270c84b0002..82de596a9adce 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -1276,9 +1276,7 @@ impl TurboTasksBackendInner { removed_data.extend(task.extract_if(CachedDataItemIndex::CellData, |key, _| { matches!(key, &CachedDataItemKey::CellData { cell } if cell_counters .get(&cell.type_id) - .map_or(true, |start_index| cell.index >= *start_index) && { - true - }) + .map_or(true, |start_index| cell.index >= *start_index)) })); if self.should_track_children() { old_edges.extend(task.iter(CachedDataItemIndex::Children).filter_map( diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs index b51578cd77042..2bcb9cc674016 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs @@ -1,8 +1,8 @@ -use core::panic; use std::{ hash::{BuildHasherDefault, Hash}, mem::take, ops::{Deref, DerefMut}, + panic, thread::available_parallelism, };