Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Turbopack: handle task cancelation #76831

Merged
merged 3 commits into from
Mar 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/napi/src/next_api/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,8 +543,8 @@ async fn project_on_exit_internal(project: &ProjectInstance) {
pub async fn project_shutdown(
#[napi(ts_arg_type = "{ __napiType: \"Project\" }")] project: External<ProjectInstance>,
) {
project_on_exit_internal(&project).await;
project.turbo_tasks.stop_and_wait().await;
project_on_exit_internal(&project).await;
}

#[napi(object)]
Expand Down
30 changes: 29 additions & 1 deletion packages/next/src/shared/lib/turbopack/manifest-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,16 @@ export class TurbopackManifestLoader {
mergeActionIds(manifest.node, m.node)
mergeActionIds(manifest.edge, m.edge)
}
for (const key in manifest.node) {
const entry = manifest.node[key]
entry.workers = sortObjectByKey(entry.workers)
entry.layer = sortObjectByKey(entry.layer)
}
for (const key in manifest.edge) {
const entry = manifest.edge[key]
entry.workers = sortObjectByKey(entry.workers)
entry.layer = sortObjectByKey(entry.layer)
}

return manifest
}
Expand Down Expand Up @@ -247,6 +257,7 @@ export class TurbopackManifestLoader {
for (const m of manifests) {
Object.assign(manifest.pages, m.pages)
}
manifest.pages = sortObjectByKey(manifest.pages)
return manifest
}

Expand Down Expand Up @@ -398,6 +409,7 @@ export class TurbopackManifestLoader {
// polyfillFiles should always be the same, so we can overwrite instead of actually merging
if (m.polyfillFiles.length) manifest.polyfillFiles = m.polyfillFiles
}
manifest.pages = sortObjectByKey(manifest.pages) as BuildManifest['pages']
return manifest
}

Expand Down Expand Up @@ -550,6 +562,8 @@ export class TurbopackManifestLoader {
manifest.pagesUsingSizeAdjust =
manifest.pagesUsingSizeAdjust || m.pagesUsingSizeAdjust
}
manifest.app = sortObjectByKey(manifest.app)
manifest.pages = sortObjectByKey(manifest.pages)
return manifest
}

Expand Down Expand Up @@ -620,6 +634,8 @@ export class TurbopackManifestLoader {
instrumentation = m.instrumentation
}
}
manifest.functions = sortObjectByKey(manifest.functions)
manifest.middleware = sortObjectByKey(manifest.middleware)
const updateFunctionDefinition = (
fun: EdgeFunctionDefinition
): EdgeFunctionDefinition => {
Expand Down Expand Up @@ -696,7 +712,7 @@ export class TurbopackManifestLoader {
for (const m of manifests) {
Object.assign(manifest, m)
}
return manifest
return sortObjectByKey(manifest)
}

private async writePagesManifest(): Promise<void> {
Expand Down Expand Up @@ -733,3 +749,15 @@ export class TurbopackManifestLoader {
}
}
}

function sortObjectByKey(obj: Record<string, any>) {
return Object.keys(obj)
.sort()
.reduce(
(acc, key) => {
acc[key] = obj[key]
return acc
},
{} as Record<string, any>
)
}
45 changes: 41 additions & 4 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
this: &TurboTasksBackendInner<B>,
task: &impl TaskGuard,
reader: Option<TaskId>,
ctx: &impl ExecuteContext<'_>,
) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
{
match get!(task, InProgress) {
Expand All @@ -457,10 +458,18 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
marked_as_completed,
done_event,
..
})) if !*marked_as_completed => {
Some(Ok(Err(listen_to_done_event(this, reader, done_event))))
})) => {
if !*marked_as_completed {
Some(Ok(Err(listen_to_done_event(this, reader, done_event))))
} else {
None
}
}
_ => None,
Some(InProgressState::Canceled) => Some(Err(anyhow::anyhow!(
"{} was canceled",
ctx.get_task_description(task.id())
))),
None => None,
}
}

Expand Down Expand Up @@ -545,7 +554,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
}
}

if let Some(value) = check_in_progress(self, &task, reader) {
if let Some(value) = check_in_progress(self, &task, reader, &ctx) {
return value;
}

Expand Down Expand Up @@ -755,6 +764,9 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
InProgressState::Scheduled { .. } => {
// Already scheduled
}
InProgressState::Canceled => {
bail!("{} was canceled", ctx.get_task_description(task_id));
}
}
} else if task.add(CachedDataItem::new_scheduled(
self.get_task_desc_fn(task_id),
Expand Down Expand Up @@ -1077,6 +1089,27 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
.map(|task_type| task_type.fn_type)
}

fn task_execution_canceled(
&self,
task_id: TaskId,
turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
) {
let mut ctx = self.execute_context(turbo_tasks);
let mut task = ctx.task(task_id, TaskDataCategory::Data);
if let Some(in_progress) = remove!(task, InProgress) {
match in_progress {
InProgressState::Scheduled { done_event } => done_event.notify(usize::MAX),
InProgressState::InProgress(box InProgressStateInner { done_event, .. }) => {
done_event.notify(usize::MAX)
}
InProgressState::Canceled => {}
}
}
task.add_new(CachedDataItem::InProgress {
value: InProgressState::Canceled,
});
}

fn try_start_task_execution(
&self,
task_id: TaskId,
Expand Down Expand Up @@ -2036,6 +2069,10 @@ impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
type TaskState = ();
fn new_task_state(&self, _task: TaskId) -> Self::TaskState {}

fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
self.0.task_execution_canceled(task, turbo_tasks)
}

fn try_start_task_execution(
&self,
task_id: TaskId,
Expand Down
1 change: 1 addition & 0 deletions turbopack/crates/turbo-tasks-backend/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ pub struct InProgressStateInner {
pub enum InProgressState {
Scheduled { done_event: Event },
InProgress(Box<InProgressStateInner>),
Canceled,
}

transient_traits!(InProgressState);
Expand Down
8 changes: 8 additions & 0 deletions turbopack/crates/turbo-tasks-memory/src/memory_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,14 @@ impl Backend for MemoryBackend {
}
}

fn task_execution_canceled(
&self,
_task: TaskId,
_turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) {
todo!()
}

fn try_start_task_execution<'a>(
&'a self,
task: TaskId,
Expand Down
2 changes: 2 additions & 0 deletions turbopack/crates/turbo-tasks/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,8 @@ pub trait Backend: Sync + Send {
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
) -> Option<TaskExecutionSpec<'a>>;

fn task_execution_canceled(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>);

fn task_execution_result(
&self,
task: TaskId,
Expand Down
1 change: 1 addition & 0 deletions turbopack/crates/turbo-tasks/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,7 @@ impl<B: Backend + 'static> TurboTasks<B> {
)));
let single_execution_future = async {
if this.stopped.load(Ordering::Acquire) {
this.backend.task_execution_canceled(task_id, &*this);
return false;
}

Expand Down
Loading