Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Turbopack: when reading a non yet existing cell from a in progress tasks, wait for the computation to finish #77029

Merged
merged 2 commits into from
Mar 13, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 66 additions & 80 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -676,20 +676,29 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
)));
}

let in_progress = get!(task, InProgress);
if matches!(in_progress, Some(InProgressState::InProgress(..))) {
return Ok(Err(self.listen_to_cell(&mut task, task_id, reader, cell).0));
}
let is_cancelled = matches!(in_progress, Some(InProgressState::Canceled));
let is_scheduled = matches!(in_progress, Some(InProgressState::Scheduled { .. }));

// Check cell index range (cell might not exist at all)
let Some(max_id) = get!(
let max_id = get!(
task,
CellTypeMaxIndex {
cell_type: cell.type_id
}
) else {
)
.copied();
let Some(max_id) = max_id else {
add_cell_dependency(self, task, reader, cell, task_id, &mut ctx);
bail!(
"Cell {cell:?} no longer exists in task {} (no cell of this type exists)",
ctx.get_task_description(task_id)
);
};
if cell.index >= *max_id {
if cell.index >= max_id {
add_cell_dependency(self, task, reader, cell, task_id, &mut ctx);
bail!(
"Cell {cell:?} no longer exists in task {} (index out of bounds)",
Expand All @@ -700,19 +709,9 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
// Cell should exist, but data was dropped or is not serializable. We need to recompute the
// task the get the cell content.

let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
let note = move || {
if let Some(reader_desc) = reader_desc.as_ref() {
format!("try_read_task_cell from {}", reader_desc())
} else {
"try_read_task_cell (untracked)".to_string()
}
};

// Register event listener for cell computation
if let Some(in_progress) = get!(task, InProgressCell { cell }) {
// Someone else is already computing the cell
let listener = in_progress.event.listen_with_note(note);
// Listen to the cell and potentially schedule the task
let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, reader, cell);
if !new_listener {
return Ok(Err(listener));
}

Expand All @@ -723,58 +722,47 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
)
.entered();

// We create the event and potentially schedule the task
let in_progress = InProgressCellState::new(task_id, cell);
// Schedule the task, if not already scheduled
if is_cancelled {
bail!("{} was canceled", ctx.get_task_description(task_id));
} else if !is_scheduled
&& task.add(CachedDataItem::new_scheduled(
self.get_task_desc_fn(task_id),
))
{
turbo_tasks.schedule(task_id);
}

Ok(Err(listener))
}

fn listen_to_cell(
&self,
task: &mut impl TaskGuard,
task_id: TaskId,
reader: Option<TaskId>,
cell: CellId,
) -> (EventListener, bool) {
let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
let note = move || {
if let Some(reader_desc) = reader_desc.as_ref() {
format!("try_read_task_cell (in progress) from {}", reader_desc())
} else {
"try_read_task_cell (in progress, untracked)".to_string()
}
};
if let Some(in_progress) = get!(task, InProgressCell { cell }) {
// Someone else is already computing the cell
let listener = in_progress.event.listen_with_note(note);
return (listener, false);
}
let in_progress = InProgressCellState::new(task_id, cell);
let listener = in_progress.event.listen_with_note(note);
task.add_new(CachedDataItem::InProgressCell {
cell,
value: in_progress,
});

// Schedule the task, if not already scheduled
if let Some(existing) = get!(task, InProgress) {
match existing {
InProgressState::InProgress(box InProgressStateInner { stale, .. }) => {
if !*stale {
let idx = get!(
task,
CellTypeMaxIndex {
cell_type: cell.type_id
}
)
.copied()
.unwrap_or_default();
if cell.index <= idx {
// The current execution is past the cell, so we need to reexecute.
let Some(InProgressState::InProgress(box InProgressStateInner {
stale,
..
})) = get_mut!(task, InProgress)
else {
unreachable!();
};
*stale = true;
} else {
// The cell will still be written in the current execution, so we can
// just continue here.
}
}
}
InProgressState::Scheduled { .. } => {
// Already scheduled
}
InProgressState::Canceled => {
bail!("{} was canceled", ctx.get_task_description(task_id));
}
}
} else if task.add(CachedDataItem::new_scheduled(
self.get_task_desc_fn(task_id),
)) {
turbo_tasks.schedule(task_id);
}

Ok(Err(listener))
(listener, true)
}

fn lookup_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
Expand Down Expand Up @@ -1385,25 +1373,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
old_edges.extend(iter_many!(task, Child { task } => task).map(OutdatedEdge::Child));
}

// Remove no longer existing cells and notify in progress cells
// Remove no longer existing cells and
// find all outdated data items (removed cells, outdated edges)
removed_data.extend(
task.extract_if(CachedDataItemType::InProgressCell, |key, value| {
match (key, value) {
(
CachedDataItemKey::InProgressCell { cell },
CachedDataItemValueRef::InProgressCell { value },
) if cell_counters
.get(&cell.type_id)
.is_none_or(|start_index| cell.index >= *start_index) =>
{
value.event.notify(usize::MAX);
true
}
_ => false,
}
}),
);
removed_data.extend(task.extract_if(CachedDataItemType::CellData, |key, _| {
matches!(key, CachedDataItemKey::CellData { cell } if cell_counters
.get(&cell.type_id).is_none_or(|start_index| cell.index >= *start_index))
Expand Down Expand Up @@ -1540,6 +1511,21 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
return true;
}

// Notify in progress cells
removed_data.extend(task.extract_if(
CachedDataItemType::InProgressCell,
|key, value| match (key, value) {
(
CachedDataItemKey::InProgressCell { .. },
CachedDataItemValueRef::InProgressCell { value },
) => {
value.event.notify(usize::MAX);
true
}
_ => false,
},
));

// Update the dirty state
let new_dirty_state = if session_dependent {
Some(DirtyState {
Expand Down
Loading