From 5729e4da599e479b49ff0a56e845a721bc83441f Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Wed, 12 Mar 2025 11:55:43 +0100 Subject: [PATCH 1/2] when reading a non yet existing cell from a in progress tasks, wait for the computation to finish --- .../turbo-tasks-backend/src/backend/mod.rs | 146 ++++++++---------- 1 file changed, 66 insertions(+), 80 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index ac651fded253a..a5a5f81ffed29 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -676,20 +676,29 @@ impl TurboTasksBackendInner { ))); } + let in_progress = get!(task, InProgress); + if matches!(in_progress, Some(InProgressState::InProgress(..))) { + return Ok(Err(self.listen_to_cell(&mut task, task_id, reader, cell).0)); + } + let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled)); + let is_scheduled = matches!(in_progress, Some(InProgressState::Scheduled { .. })); + // Check cell index range (cell might not exist at all) - let Some(max_id) = get!( + let max_id = get!( task, CellTypeMaxIndex { cell_type: cell.type_id } - ) else { + ) + .copied(); + let Some(max_id) = max_id else { add_cell_dependency(self, task, reader, cell, task_id, &mut ctx); bail!( "Cell {cell:?} no longer exists in task {} (no cell of this type exists)", ctx.get_task_description(task_id) ); }; - if cell.index >= *max_id { + if cell.index >= max_id { add_cell_dependency(self, task, reader, cell, task_id, &mut ctx); bail!( "Cell {cell:?} no longer exists in task {} (index out of bounds)", @@ -700,19 +709,9 @@ impl TurboTasksBackendInner { // Cell should exist, but data was dropped or is not serializable. We need to recompute the // task the get the cell content. - let reader_desc = reader.map(|r| self.get_task_desc_fn(r)); - let note = move || { - if let Some(reader_desc) = reader_desc.as_ref() { - format!("try_read_task_cell from {}", reader_desc()) - } else { - "try_read_task_cell (untracked)".to_string() - } - }; - - // Register event listener for cell computation - if let Some(in_progress) = get!(task, InProgressCell { cell }) { - // Someone else is already computing the cell - let listener = in_progress.event.listen_with_note(note); + // Listen to the cell and potentially schedule the task + let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, reader, cell); + if !new_listener { return Ok(Err(listener)); } @@ -723,58 +722,47 @@ impl TurboTasksBackendInner { ) .entered(); - // We create the event and potentially schedule the task - let in_progress = InProgressCellState::new(task_id, cell); + // Schedule the task, if not already scheduled + if is_cancelled { + bail!("{} was canceled", ctx.get_task_description(task_id)); + } else if !is_scheduled { + if task.add(CachedDataItem::new_scheduled( + self.get_task_desc_fn(task_id), + )) { + turbo_tasks.schedule(task_id); + } + } + Ok(Err(listener)) + } + + fn listen_to_cell( + &self, + task: &mut impl TaskGuard, + task_id: TaskId, + reader: Option, + cell: CellId, + ) -> (EventListener, bool) { + let reader_desc = reader.map(|r| self.get_task_desc_fn(r)); + let note = move || { + if let Some(reader_desc) = reader_desc.as_ref() { + format!("try_read_task_cell (in progress) from {}", reader_desc()) + } else { + "try_read_task_cell (in progress, untracked)".to_string() + } + }; + if let Some(in_progress) = get!(task, InProgressCell { cell }) { + // Someone else is already computing the cell + let listener = in_progress.event.listen_with_note(note); + return (listener, false); + } + let in_progress = InProgressCellState::new(task_id, cell); let listener = in_progress.event.listen_with_note(note); task.add_new(CachedDataItem::InProgressCell { cell, value: in_progress, }); - - // Schedule the task, if not already scheduled - if let Some(existing) = get!(task, InProgress) { - match existing { - InProgressState::InProgress(box InProgressStateInner { stale, .. }) => { - if !*stale { - let idx = get!( - task, - CellTypeMaxIndex { - cell_type: cell.type_id - } - ) - .copied() - .unwrap_or_default(); - if cell.index <= idx { - // The current execution is past the cell, so we need to reexecute. - let Some(InProgressState::InProgress(box InProgressStateInner { - stale, - .. - })) = get_mut!(task, InProgress) - else { - unreachable!(); - }; - *stale = true; - } else { - // The cell will still be written in the current execution, so we can - // just continue here. - } - } - } - InProgressState::Scheduled { .. } => { - // Already scheduled - } - InProgressState::Canceled => { - bail!("{} was canceled", ctx.get_task_description(task_id)); - } - } - } else if task.add(CachedDataItem::new_scheduled( - self.get_task_desc_fn(task_id), - )) { - turbo_tasks.schedule(task_id); - } - - Ok(Err(listener)) + (listener, true) } fn lookup_task_type(&self, task_id: TaskId) -> Option> { @@ -1385,25 +1373,8 @@ impl TurboTasksBackendInner { old_edges.extend(iter_many!(task, Child { task } => task).map(OutdatedEdge::Child)); } - // Remove no longer existing cells and notify in progress cells + // Remove no longer existing cells and // find all outdated data items (removed cells, outdated edges) - removed_data.extend( - task.extract_if(CachedDataItemType::InProgressCell, |key, value| { - match (key, value) { - ( - CachedDataItemKey::InProgressCell { cell }, - CachedDataItemValueRef::InProgressCell { value }, - ) if cell_counters - .get(&cell.type_id) - .is_none_or(|start_index| cell.index >= *start_index) => - { - value.event.notify(usize::MAX); - true - } - _ => false, - } - }), - ); removed_data.extend(task.extract_if(CachedDataItemType::CellData, |key, _| { matches!(key, CachedDataItemKey::CellData { cell } if cell_counters .get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index)) @@ -1540,6 +1511,21 @@ impl TurboTasksBackendInner { return true; } + // Notify in progress cells + removed_data.extend(task.extract_if( + CachedDataItemType::InProgressCell, + |key, value| match (key, value) { + ( + CachedDataItemKey::InProgressCell { .. }, + CachedDataItemValueRef::InProgressCell { value }, + ) => { + value.event.notify(usize::MAX); + true + } + _ => false, + }, + )); + // Update the dirty state let new_dirty_state = if session_dependent { Some(DirtyState { From 1c951dd7d5ea521322857ce92d85fb1f3ab24040 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 13 Mar 2025 01:21:40 +0100 Subject: [PATCH 2/2] clippy --- .../crates/turbo-tasks-backend/src/backend/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index a5a5f81ffed29..c53a986bfddd9 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -725,12 +725,12 @@ impl TurboTasksBackendInner { // Schedule the task, if not already scheduled if is_cancelled { bail!("{} was canceled", ctx.get_task_description(task_id)); - } else if !is_scheduled { - if task.add(CachedDataItem::new_scheduled( + } else if !is_scheduled + && task.add(CachedDataItem::new_scheduled( self.get_task_desc_fn(task_id), - )) { - turbo_tasks.schedule(task_id); - } + )) + { + turbo_tasks.schedule(task_id); } Ok(Err(listener))