Skip to content

Commit 88925b2

Browse files
sokra authored and wyattjoh committed
[Turbopack] fix hanging waiting on in progress cells (#72758)
### What? We need to notify the in-progress cells when a task has finished or its cells are updated. Also fixes an off-by-one bug with max cell indices when reading.
1 parent a8c68ee commit 88925b2

File tree

6 files changed

+259
-46
lines changed

6 files changed

+259
-46
lines changed

turbopack/crates/turbo-tasks-auto-hash-map/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#![feature(hash_raw_entry)]
2+
#![feature(hash_extract_if)]
23

34
pub mod map;
45
pub mod set;

turbopack/crates/turbo-tasks-auto-hash-map/src/map.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,16 @@ impl<K: Eq + Hash, V, H: BuildHasher + Default, const I: usize> AutoMap<K, V, H,
235235
}
236236
}
237237

238+
/// Returns an iterator that removes and yields the entries for which `f`
/// returns `true`.
///
/// The predicate receives a shared reference to the key and a mutable
/// reference to the value. For the `List` representation the iterator scans
/// the backing list starting at index 0; for the `Map` representation it
/// delegates to the inner map's own `extract_if`.
pub fn extract_if<'l, F>(&'l mut self, f: F) -> ExtractIfIter<'l, K, V, I, F>
239+
where
240+
F: for<'a, 'b> FnMut(&'a K, &'b mut V) -> bool,
241+
{
242+
match self {
243+
AutoMap::List(list) => ExtractIfIter::List { list, index: 0, f },
244+
AutoMap::Map(map) => ExtractIfIter::Map(map.extract_if(f)),
245+
}
246+
}
247+
238248
/// see [HashMap::shrink_to_fit](https://doc.rust-lang.org/std/collections/struct.HashMap.html#method.shrink_to_fit)
239249
pub fn shrink_to_fit(&mut self) {
240250
match self {
@@ -843,6 +853,43 @@ where
843853
}
844854
}
845855

856+
/// Iterator returned by `AutoMap::extract_if`, with one variant per `AutoMap`
/// representation.
///
/// * `List` borrows the backing `SmallVec` mutably and keeps the scan
///   position (`index`) together with the predicate `f`.
/// * `Map` wraps the standard library's `hash_map::ExtractIf`.
pub enum ExtractIfIter<'l, K, V, const I: usize, F>
857+
where
858+
F: for<'a, 'b> FnMut(&'a K, &'b mut V) -> bool,
859+
{
860+
List {
861+
list: &'l mut SmallVec<[(K, V); I]>,
862+
index: usize,
863+
f: F,
864+
},
865+
Map(std::collections::hash_map::ExtractIf<'l, K, V, F>),
866+
}
867+
868+
/// Yields the removed `(key, value)` pairs one at a time. Entries are only
/// tested (and removed) as the iterator is advanced.
impl<K, V, const I: usize, F> Iterator for ExtractIfIter<'_, K, V, I, F>
869+
where
870+
F: for<'a, 'b> FnMut(&'a K, &'b mut V) -> bool,
871+
{
872+
type Item = (K, V);
873+
874+
fn next(&mut self) -> Option<Self::Item> {
875+
match self {
876+
ExtractIfIter::List { list, index, f } => {
877+
// Scan forward from the saved position until a matching entry is found.
while *index < list.len() {
878+
let (key, value) = &mut list[*index];
879+
if f(key, value) {
880+
// `swap_remove` fills the vacated slot with the last element, so
// `index` is deliberately left unchanged: the element now at this
// position has not been tested yet. Element order is not preserved.
let item = list.swap_remove(*index);
881+
return Some(item);
882+
} else {
883+
*index += 1;
884+
}
885+
}
886+
None
887+
}
888+
// The hash-map case is handled entirely by the wrapped std iterator.
ExtractIfIter::Map(extract_if) => extract_if.next(),
889+
}
890+
}
891+
}
892+
846893
#[cfg(test)]
847894
mod tests {
848895
use super::*;
@@ -863,4 +910,37 @@ mod tests {
863910
}
864911
assert_eq!(map.remove(&(MAX_LIST_SIZE * 2)), None);
865912
}
913+
914+
// Inserts `MAX_LIST_SIZE * 2` entries (per the test name, intended to exercise
// the hash-map representation), extracts the even values, and checks that half
// of the entries were yielded and half remain.
#[test]
915+
fn test_extract_if_map() {
916+
let mut map = AutoMap::new();
917+
for i in 0..MAX_LIST_SIZE * 2 {
918+
map.insert(i, i);
919+
}
920+
let iter = map.extract_if(|_, v| *v % 2 == 0);
921+
assert_eq!(iter.count(), MAX_LIST_SIZE);
922+
assert_eq!(map.len(), MAX_LIST_SIZE);
923+
}
924+
925+
// Inserts `MIN_HASH_SIZE` entries (per the test name, intended to exercise the
// list representation), extracts the even values, and checks that half of the
// entries were yielded and half remain.
#[test]
926+
fn test_extract_if_list() {
927+
let mut map = AutoMap::new();
928+
for i in 0..MIN_HASH_SIZE {
929+
map.insert(i, i);
930+
}
931+
let iter = map.extract_if(|_, v| *v % 2 == 0);
932+
assert_eq!(iter.count(), MIN_HASH_SIZE / 2);
933+
assert_eq!(map.len(), MIN_HASH_SIZE / 2);
934+
}
935+
936+
// Same setup as the other list test, but the predicate matches the values
// below 5, i.e. the first five inserted entries; exactly those five must be
// yielded and the rest must remain.
#[test]
937+
fn test_extract_if_list2() {
938+
let mut map = AutoMap::new();
939+
for i in 0..MIN_HASH_SIZE {
940+
map.insert(i, i);
941+
}
942+
let iter = map.extract_if(|_, v| *v < 5);
943+
assert_eq!(iter.count(), 5);
944+
assert_eq!(map.len(), MIN_HASH_SIZE - 5);
945+
}
866946
}

turbopack/crates/turbo-tasks-backend/src/backend/mod.rs

Lines changed: 70 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -604,7 +604,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
604604
ctx.get_task_description(task_id)
605605
);
606606
};
607-
if cell.index > *max_id {
607+
if cell.index >= *max_id {
608608
add_cell_dependency(self, task, reader, cell, task_id, &mut ctx);
609609
bail!(
610610
"Cell {cell:?} no longer exists in task {} (index out of bounds)",
@@ -1228,7 +1228,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
12281228
let _ = stateful;
12291229

12301230
// handle cell counters: update max index and remove cells that are no longer used
1231-
let mut removed_cells = HashMap::new();
12321231
let mut old_counters: HashMap<_, _> =
12331232
get_many!(task, CellTypeMaxIndex { cell_type } max_index => (*cell_type, *max_index));
12341233
for (&cell_type, &max_index) in cell_counters.iter() {
@@ -1238,9 +1237,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
12381237
cell_type,
12391238
value: max_index,
12401239
});
1241-
if old_max_index > max_index {
1242-
removed_cells.insert(cell_type, max_index + 1..=old_max_index);
1243-
}
12441240
}
12451241
} else {
12461242
task.add_new(CachedDataItem::CellTypeMaxIndex {
@@ -1249,28 +1245,39 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
12491245
});
12501246
}
12511247
}
1252-
for (cell_type, old_max_index) in old_counters {
1248+
for (cell_type, _) in old_counters {
12531249
task.remove(&CachedDataItemKey::CellTypeMaxIndex { cell_type });
1254-
removed_cells.insert(cell_type, 0..=old_max_index);
12551250
}
1251+
12561252
let mut removed_data = Vec::new();
1257-
for (&cell_type, range) in removed_cells.iter() {
1258-
for index in range.clone() {
1259-
removed_data.extend(
1260-
task.remove(&CachedDataItemKey::CellData {
1261-
cell: CellId {
1262-
type_id: cell_type,
1263-
index,
1264-
},
1265-
})
1266-
.into_iter(),
1267-
);
1268-
}
1269-
}
1253+
let mut old_edges = Vec::new();
12701254

1255+
// Remove no longer existing cells and notify in progress cells
12711256
// find all outdated data items (removed cells, outdated edges)
1272-
let old_edges = if task.is_indexed() {
1273-
let mut old_edges = Vec::new();
1257+
if task.is_indexed() {
1258+
removed_data.extend(task.extract_if(
1259+
CachedDataItemIndex::InProgressCell,
1260+
|key, value| {
1261+
match (key, value) {
1262+
(
1263+
&CachedDataItemKey::InProgressCell { cell },
1264+
CachedDataItemValue::InProgressCell { value },
1265+
) if cell_counters
1266+
.get(&cell.type_id)
1267+
.map_or(true, |start_index| cell.index >= *start_index) =>
1268+
{
1269+
value.event.notify(usize::MAX);
1270+
true
1271+
}
1272+
_ => false,
1273+
}
1274+
},
1275+
));
1276+
removed_data.extend(task.extract_if(CachedDataItemIndex::CellData, |key, _| {
1277+
matches!(key, &CachedDataItemKey::CellData { cell } if cell_counters
1278+
.get(&cell.type_id)
1279+
.map_or(true, |start_index| cell.index >= *start_index))
1280+
}));
12741281
if self.should_track_children() {
12751282
old_edges.extend(task.iter(CachedDataItemIndex::Children).filter_map(
12761283
|(key, _)| match *key {
@@ -1306,9 +1313,9 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
13061313
|(key, _)| {
13071314
match *key {
13081315
CachedDataItemKey::CellDependent { cell, task }
1309-
if removed_cells
1316+
if cell_counters
13101317
.get(&cell.type_id)
1311-
.map_or(false, |range| range.contains(&cell.index)) =>
1318+
.map_or(true, |start_index| cell.index >= *start_index) =>
13121319
{
13131320
Some(OutdatedEdge::RemovedCellDependent(task, cell.type_id))
13141321
}
@@ -1317,36 +1324,53 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
13171324
},
13181325
));
13191326
}
1320-
old_edges
13211327
} else {
1322-
task.iter_all()
1323-
.filter_map(|(key, value)| match *key {
1324-
CachedDataItemKey::OutdatedChild { task } => Some(OutdatedEdge::Child(task)),
1325-
CachedDataItemKey::OutdatedCollectible { collectible } => {
1326-
let CachedDataItemValue::OutdatedCollectible { value } = *value else {
1327-
unreachable!();
1328-
};
1329-
Some(OutdatedEdge::Collectible(collectible, value))
1328+
removed_data.extend(task.extract_if_all(|key, value| {
1329+
match (key, value) {
1330+
(
1331+
&CachedDataItemKey::InProgressCell { cell },
1332+
CachedDataItemValue::InProgressCell { value },
1333+
) if cell_counters
1334+
.get(&cell.type_id)
1335+
.map_or(true, |start_index| cell.index >= *start_index) =>
1336+
{
1337+
value.event.notify(usize::MAX);
1338+
return true;
13301339
}
1331-
CachedDataItemKey::OutdatedCellDependency { target } => {
1332-
Some(OutdatedEdge::CellDependency(target))
1340+
(&CachedDataItemKey::CellData { cell }, _)
1341+
if cell_counters
1342+
.get(&cell.type_id)
1343+
.map_or(true, |start_index| cell.index >= *start_index) =>
1344+
{
1345+
return true;
13331346
}
1334-
CachedDataItemKey::OutdatedOutputDependency { target } => {
1335-
Some(OutdatedEdge::OutputDependency(target))
1347+
(&CachedDataItemKey::OutdatedChild { task }, _) => {
1348+
old_edges.push(OutdatedEdge::Child(task));
13361349
}
1337-
CachedDataItemKey::OutdatedCollectiblesDependency { target } => {
1338-
Some(OutdatedEdge::CollectiblesDependency(target))
1350+
(
1351+
&CachedDataItemKey::OutdatedCollectible { collectible },
1352+
&CachedDataItemValue::OutdatedCollectible { value },
1353+
) => old_edges.push(OutdatedEdge::Collectible(collectible, value)),
1354+
(&CachedDataItemKey::OutdatedCellDependency { target }, _) => {
1355+
old_edges.push(OutdatedEdge::CellDependency(target));
1356+
}
1357+
(&CachedDataItemKey::OutdatedOutputDependency { target }, _) => {
1358+
old_edges.push(OutdatedEdge::OutputDependency(target));
1359+
}
1360+
(&CachedDataItemKey::OutdatedCollectiblesDependency { target }, _) => {
1361+
old_edges.push(OutdatedEdge::CollectiblesDependency(target));
13391362
}
1340-
CachedDataItemKey::CellDependent { cell, task }
1341-
if removed_cells
1363+
(&CachedDataItemKey::CellDependent { cell, task }, _)
1364+
if cell_counters
13421365
.get(&cell.type_id)
1343-
.map_or(false, |range| range.contains(&cell.index)) =>
1366+
.map_or(true, |start_index| cell.index >= *start_index) =>
13441367
{
1345-
Some(OutdatedEdge::RemovedCellDependent(task, cell.type_id))
1368+
old_edges.push(OutdatedEdge::RemovedCellDependent(task, cell.type_id));
13461369
}
1347-
_ => None,
1348-
})
1349-
.collect::<Vec<_>>()
1370+
_ => {}
1371+
}
1372+
false
1373+
}));
13501374
};
13511375
drop(task);
13521376

turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use std::{
1111
mem::{take, transmute},
1212
};
1313

14+
use either::Either;
1415
use serde::{Deserialize, Serialize};
1516
use turbo_tasks::{KeyValuePair, SessionId, TaskId, TurboTasksBackendApi};
1617

@@ -370,6 +371,16 @@ pub trait TaskGuard: Debug {
370371
index: CachedDataItemIndex,
371372
) -> impl Iterator<Item = (&CachedDataItemKey, &CachedDataItemValue)>;
372373
fn iter_all(&self) -> impl Iterator<Item = (&CachedDataItemKey, &CachedDataItemValue)>;
374+
/// Returns an iterator over the `CachedDataItem`s extracted from this task
/// for which `f` returns `true`, restricted to the given `index`.
///
/// The predicate sees each item's key and value by shared reference and may
/// borrow from the environment for as long as `self` is borrowed (`'l`).
// NOTE(review): presumably items are only removed as the returned iterator is
// consumed -- confirm against the storage implementation.
fn extract_if<'l, F>(
375+
&'l mut self,
376+
index: CachedDataItemIndex,
377+
f: F,
378+
) -> impl Iterator<Item = CachedDataItem>
379+
where
380+
F: for<'a, 'b> FnMut(&'a CachedDataItemKey, &'b CachedDataItemValue) -> bool + 'l;
381+
/// Like `extract_if`, but takes no `CachedDataItemIndex`: the predicate `f`
/// is applied without restricting to a single index.
fn extract_if_all<'l, F>(&'l mut self, f: F) -> impl Iterator<Item = CachedDataItem>
382+
where
383+
F: for<'a, 'b> FnMut(&'a CachedDataItemKey, &'b CachedDataItemValue) -> bool + 'l;
373384
fn invalidate_serialization(&mut self);
374385
}
375386

@@ -586,6 +597,60 @@ impl<B: BackingStorage> TaskGuard for TaskGuardImpl<'_, B> {
586597
self.task.iter_all()
587598
}
588599

600+
/// Extracts the items of `index` matching `f` from the underlying task
/// storage.
///
/// If the backend does not persist, or the task is transient, the extracted
/// items are returned as-is. Otherwise every extracted item that reports
/// `is_persistent()` is additionally recorded in the persisted storage log
/// as a removal (`value: None`, `old_value: Some(..)`). The logging happens
/// in an `inspect` adapter, i.e. as each item is pulled from the iterator.
fn extract_if<'l, F>(
601+
&'l mut self,
602+
index: CachedDataItemIndex,
603+
f: F,
604+
) -> impl Iterator<Item = CachedDataItem>
605+
where
606+
F: for<'a, 'b> FnMut(&'a CachedDataItemKey, &'b CachedDataItemValue) -> bool + 'l,
607+
{
608+
// Nothing to log when persistence is off or the task is transient.
if !self.backend.should_persist() || self.task_id.is_transient() {
609+
return Either::Left(self.task.extract_if(Some(index), f));
610+
}
611+
// `Either` lets both branches share one `impl Iterator` return type.
Either::Right(self.task.extract_if(Some(index), f).inspect(|item| {
612+
if item.is_persistent() {
613+
let key = item.key();
614+
let value = item.value();
615+
self.backend
616+
.persisted_storage_log(key.category())
617+
// NOTE(review): assumes a log exists for every category whenever
// `should_persist()` is true -- confirm.
.unwrap()
618+
.lock(self.task_id)
619+
.push(CachedDataUpdate {
620+
key,
621+
task: self.task_id,
622+
value: None,
623+
old_value: Some(value),
624+
});
625+
}
626+
}))
627+
}
628+
629+
/// Extracts the items matching `f` from the underlying task storage without
/// restricting to a single index.
///
/// If the backend does not persist, or the task is transient, the extracted
/// items are returned as-is. Otherwise every extracted item that reports
/// `is_persistent()` is additionally recorded in the persisted storage log
/// as a removal (`value: None`, `old_value: Some(..)`). The logging happens
/// in an `inspect` adapter, i.e. as each item is pulled from the iterator.
fn extract_if_all<'l, F>(&'l mut self, f: F) -> impl Iterator<Item = CachedDataItem>
630+
where
631+
F: for<'a, 'b> FnMut(&'a CachedDataItemKey, &'b CachedDataItemValue) -> bool + 'l,
632+
{
633+
// Nothing to log when persistence is off or the task is transient.
if !self.backend.should_persist() || self.task_id.is_transient() {
634+
return Either::Left(self.task.extract_if_all(f));
635+
}
636+
// `Either` lets both branches share one `impl Iterator` return type.
Either::Right(self.task.extract_if_all(f).inspect(|item| {
637+
if item.is_persistent() {
638+
let key = item.key();
639+
let value = item.value();
640+
self.backend
641+
.persisted_storage_log(key.category())
642+
// NOTE(review): assumes a log exists for every category whenever
// `should_persist()` is true -- confirm.
.unwrap()
643+
.lock(self.task_id)
644+
.push(CachedDataUpdate {
645+
key,
646+
task: self.task_id,
647+
value: None,
648+
old_value: Some(value),
649+
});
650+
}
651+
}))
652+
}
653+
589654
fn invalidate_serialization(&mut self) {
590655
if !self.backend.should_persist() {
591656
return;

0 commit comments

Comments
 (0)