Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(turbopack): Only perform strongly consistent reads/resolves on VcOperation #75016

Merged
merged 6 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 35 additions & 21 deletions crates/napi/src/next_api/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,17 @@ impl Deref for ExternalEndpoint {
// Await the source and return fatal issues if there are any, otherwise
// propagate any actual error results.
async fn strongly_consistent_catch_collectables<R: VcValueType + Send>(
source: OperationVc<R>,
source_op: OperationVc<R>,
) -> Result<(
Option<ReadRef<R>>,
Arc<Vec<ReadRef<PlainIssue>>>,
Arc<Vec<ReadRef<PlainDiagnostic>>>,
Arc<Effects>,
)> {
let result = source.connect().strongly_consistent().await;
let issues = get_issues(source).await?;
let diagnostics = get_diagnostics(source).await?;
let effects = Arc::new(get_effects(source).await?);
let result = source_op.read_strongly_consistent().await;
let issues = get_issues(source_op).await?;
let diagnostics = get_diagnostics(source_op).await?;
let effects = Arc::new(get_effects(source_op).await?);

let result = if result.is_err() && issues.iter().any(|i| i.severity <= IssueSeverity::Error) {
None
Expand All @@ -131,13 +131,13 @@ struct WrittenEndpointWithIssues {
effects: Arc<Effects>,
}

#[turbo_tasks::function]
async fn get_written_endpoint_with_issues(
#[turbo_tasks::function(operation)]
async fn get_written_endpoint_with_issues_operation(
endpoint_op: OperationVc<Box<dyn Endpoint>>,
) -> Result<Vc<WrittenEndpointWithIssues>> {
let write_to_disk = endpoint_write_to_disk_operation(endpoint_op);
let write_to_disk_op = endpoint_write_to_disk_operation(endpoint_op);
let (written, issues, diagnostics, effects) =
strongly_consistent_catch_collectables(write_to_disk).await?;
strongly_consistent_catch_collectables(write_to_disk_op).await?;
Ok(WrittenEndpointWithIssues {
written,
issues,
Expand All @@ -156,13 +156,16 @@ pub async fn endpoint_write_to_disk(
let endpoint_op = ***endpoint;
let (written, issues, diags) = turbo_tasks
.run_once(async move {
let operation = get_written_endpoint_with_issues(endpoint_op);
let written_entrypoint_with_issues_op =
get_written_endpoint_with_issues_operation(endpoint_op);
let WrittenEndpointWithIssues {
written,
issues,
diagnostics,
effects,
} = &*operation.strongly_consistent().await?;
} = &*written_entrypoint_with_issues_op
.read_strongly_consistent()
.await?;
effects.apply().await?;

Ok((written.clone(), issues.clone(), diagnostics.clone()))
Expand All @@ -189,8 +192,8 @@ pub fn endpoint_server_changed_subscribe(
func,
move || {
async move {
let vc = subscribe_issues_and_diags(endpoint, issues);
let result = vc.strongly_consistent().await?;
let issues_and_diags_op = subscribe_issues_and_diags_operation(endpoint, issues);
let result = issues_and_diags_op.read_strongly_consistent().await?;
result.effects.apply().await?;
Ok(result)
}
Expand Down Expand Up @@ -237,16 +240,16 @@ impl PartialEq for EndpointIssuesAndDiags {

impl Eq for EndpointIssuesAndDiags {}

#[turbo_tasks::function]
async fn subscribe_issues_and_diags(
#[turbo_tasks::function(operation)]
async fn subscribe_issues_and_diags_operation(
endpoint_op: OperationVc<Box<dyn Endpoint>>,
should_include_issues: bool,
) -> Result<Vc<EndpointIssuesAndDiags>> {
let changed = endpoint_server_changed_operation(endpoint_op);
let changed_op = endpoint_server_changed_operation(endpoint_op);

if should_include_issues {
let (changed_value, issues, diagnostics, effects) =
strongly_consistent_catch_collectables(changed).await?;
strongly_consistent_catch_collectables(changed_op).await?;
Ok(EndpointIssuesAndDiags {
changed: changed_value,
issues,
Expand All @@ -255,7 +258,7 @@ async fn subscribe_issues_and_diags(
}
.cell())
} else {
let changed_value = changed.connect().strongly_consistent().await?;
let changed_value = changed_op.read_strongly_consistent().await?;
Ok(EndpointIssuesAndDiags {
changed: Some(changed_value),
issues: Arc::new(vec![]),
Expand All @@ -266,22 +269,33 @@ async fn subscribe_issues_and_diags(
}
}

#[turbo_tasks::function(operation)]
fn endpoint_client_changed_operation(
endpoint_op: OperationVc<Box<dyn Endpoint>>,
) -> Vc<Completion> {
endpoint_op.connect().client_changed()
}

#[napi(ts_return_type = "{ __napiType: \"RootTask\" }")]
pub fn endpoint_client_changed_subscribe(
#[napi(ts_arg_type = "{ __napiType: \"Endpoint\" }")] endpoint: External<ExternalEndpoint>,
func: JsFunction,
) -> napi::Result<External<RootTask>> {
let turbo_tasks = endpoint.turbo_tasks().clone();
let endpoint = ***endpoint;
let endpoint_op = ***endpoint;
subscribe(
turbo_tasks,
func,
move || {
async move {
let changed = endpoint.connect().client_changed();
let changed_op = endpoint_client_changed_operation(endpoint_op);
// We don't capture issues and diagnostics here since we don't want to be
// notified when they change
changed.strongly_consistent().await?;
//
// This must be a *read*, not just a resolve, because we need the root task created
// by `subscribe` to re-run when the `Completion`'s value changes (via equality),
// even if the cell id doesn't change.
let _ = changed_op.read_strongly_consistent().await?;
Ok(())
}
.instrument(tracing::info_span!("client changes subscription"))
Expand Down
72 changes: 38 additions & 34 deletions crates/napi/src/next_api/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@
let container = turbo_tasks
.run_once(async move {
let project = ProjectContainer::new("next.js".into(), options.dev);
let project = project.resolve().await?;
let project = project.to_resolved().await?;
project.initialize(options).await?;
Ok(project)
})
Expand All @@ -423,7 +423,7 @@
Ok(External::new_with_size_hint(
ProjectInstance {
turbo_tasks,
container,
container: *container,
exit_receiver: tokio::sync::Mutex::new(Some(exit_receiver)),
},
100,
Expand Down Expand Up @@ -659,16 +659,13 @@
effects: Arc<Effects>,
}

#[turbo_tasks::function]
async fn get_entrypoints_with_issues(
#[turbo_tasks::function(operation)]
async fn get_entrypoints_with_issues_operation(
container: ResolvedVc<ProjectContainer>,
) -> Result<Vc<EntrypointsWithIssues>> {
let entrypoints_operation =
EntrypointsOperation::new(project_container_entrypoints_operation(container));
let entrypoints = entrypoints_operation
.connect()
.strongly_consistent()
.await?;
let entrypoints = entrypoints_operation.read_strongly_consistent().await?;
let issues = get_issues(entrypoints_operation).await?;
let diagnostics = get_diagnostics(entrypoints_operation).await?;
let effects = Arc::new(get_effects(entrypoints_operation).await?);
Expand Down Expand Up @@ -702,13 +699,16 @@
func,
move || {
async move {
let operation = get_entrypoints_with_issues(container);
let entrypoints_with_issues_op =
get_entrypoints_with_issues_operation(container.to_resolved().await?);
let EntrypointsWithIssues {
entrypoints,
issues,
diagnostics,
effects,
} = &*operation.strongly_consistent().await?;
} = &*entrypoints_with_issues_op
.read_strongly_consistent()
.await?;
effects.apply().await?;
Ok((entrypoints.clone(), issues.clone(), diagnostics.clone()))
}
Expand Down Expand Up @@ -771,17 +771,17 @@
effects: Arc<Effects>,
}

#[turbo_tasks::function]
async fn hmr_update(
#[turbo_tasks::function(operation)]
async fn hmr_update_with_issues_operation(
project: ResolvedVc<Project>,
identifier: RcStr,
state: ResolvedVc<VersionState>,
) -> Result<Vc<HmrUpdateWithIssues>> {
let update_operation = project_hmr_update_operation(project, identifier, state);
let update = update_operation.connect().strongly_consistent().await?;
let issues = get_issues(update_operation).await?;
let diagnostics = get_diagnostics(update_operation).await?;
let effects = Arc::new(get_effects(update_operation).await?);
let update_op = project_hmr_update_operation(project, identifier, state);
let update = update_op.read_strongly_consistent().await?;
let issues = get_issues(update_op).await?;
let diagnostics = get_diagnostics(update_op).await?;
let effects = Arc::new(get_effects(update_op).await?);
Ok(HmrUpdateWithIssues {
update,
issues,
Expand Down Expand Up @@ -819,11 +819,15 @@
let identifier: RcStr = outer_identifier.clone().into();
let session = session.clone();
async move {
let project = project.project().resolve().await?;
let state = project.hmr_version_state(identifier.clone(), session);

let operation = hmr_update(project, identifier.clone(), state);
let update = operation.strongly_consistent().await?;
let project = project.project().to_resolved().await?;
let state = project
.hmr_version_state(identifier.clone(), session)
.to_resolved()
.await?;

let update_op =
hmr_update_with_issues_operation(project, identifier.clone(), state);
let update = update_op.read_strongly_consistent().await?;
let HmrUpdateWithIssues {
update,
issues,
Expand Down Expand Up @@ -898,18 +902,15 @@
effects: Arc<Effects>,
}

#[turbo_tasks::function]
async fn get_hmr_identifiers_with_issues(
#[turbo_tasks::function(operation)]
async fn get_hmr_identifiers_with_issues_operation(
container: ResolvedVc<ProjectContainer>,
) -> Result<Vc<HmrIdentifiersWithIssues>> {
let hmr_identifiers_operation = project_container_hmr_identifiers_operation(container);
let hmr_identifiers = hmr_identifiers_operation
.connect()
.strongly_consistent()
.await?;
let issues = get_issues(hmr_identifiers_operation).await?;
let diagnostics = get_diagnostics(hmr_identifiers_operation).await?;
let effects = Arc::new(get_effects(hmr_identifiers_operation).await?);
let hmr_identifiers_op = project_container_hmr_identifiers_operation(container);
let hmr_identifiers = hmr_identifiers_op.read_strongly_consistent().await?;
let issues = get_issues(hmr_identifiers_op).await?;
let diagnostics = get_diagnostics(hmr_identifiers_op).await?;
let effects = Arc::new(get_effects(hmr_identifiers_op).await?);
Ok(HmrIdentifiersWithIssues {
identifiers: hmr_identifiers,
issues,
Expand Down Expand Up @@ -937,13 +938,16 @@
turbo_tasks.clone(),
func,
move || async move {
let operation = get_hmr_identifiers_with_issues(container);
let hmr_identifiers_with_issues_op =
get_hmr_identifiers_with_issues_operation(container.to_resolved().await?);
let HmrIdentifiersWithIssues {
identifiers,
issues,
diagnostics,
effects,
} = &*operation.strongly_consistent().await?;
} = &*hmr_identifiers_with_issues_op
.read_strongly_consistent()
.await?;
effects.apply().await?;

Ok((identifiers.clone(), issues.clone(), diagnostics.clone()))
Expand Down Expand Up @@ -1012,7 +1016,7 @@
}
}

/// Subscribes to lifecycle events of the compilation.

Check warning on line 1019 in crates/napi/src/next_api/project.rs

View workflow job for this annotation

GitHub Actions / rustdoc check / build

public documentation for `project_update_info_subscribe` links to private item `UpdateMessage::Start`

Check warning on line 1019 in crates/napi/src/next_api/project.rs

View workflow job for this annotation

GitHub Actions / rustdoc check / build

public documentation for `project_update_info_subscribe` links to private item `UpdateMessage::End`

Check warning on line 1019 in crates/napi/src/next_api/project.rs

View workflow job for this annotation

GitHub Actions / rustdoc check / build

public documentation for `project_update_info_subscribe` links to private item `UpdateMessage::End`

Check warning on line 1019 in crates/napi/src/next_api/project.rs

View workflow job for this annotation

GitHub Actions / rustdoc check / build

public documentation for `project_update_info_subscribe` links to private item `UpdateMessage::Start`
///
/// Emits an [UpdateMessage::Start] event when any computation starts.
/// Emits an [UpdateMessage::End] event when there was no computation for the
Expand Down
10 changes: 6 additions & 4 deletions crates/next-api/src/module_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -587,10 +587,12 @@ pub async fn get_reduced_graphs_for_endpoint(
// TODO get rid of this function once everything inside of
// `get_reduced_graphs_for_endpoint_inner` calls `take_collectibles()` when needed
let result_op = get_reduced_graphs_for_endpoint_inner_operation(module_graph, is_single_page);
let result_vc = result_op.connect();
if !is_single_page {
result_vc.strongly_consistent().await?;
let result_vc = if !is_single_page {
let result_vc = result_op.resolve_strongly_consistent().await?;
let _issues = result_op.take_collectibles::<Box<dyn Issue>>();
}
*result_vc
} else {
result_op.connect()
};
Ok(result_vc)
}
50 changes: 36 additions & 14 deletions crates/next-api/src/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,23 +273,37 @@ impl ProjectContainer {
}
}

#[turbo_tasks::function(operation)]
fn project_fs_operation(project: ResolvedVc<Project>) -> Vc<DiskFileSystem> {
project.project_fs()
}

#[turbo_tasks::function(operation)]
fn output_fs_operation(project: ResolvedVc<Project>) -> Vc<DiskFileSystem> {
project.project_fs()
}

impl ProjectContainer {
#[tracing::instrument(level = "info", name = "initialize project", skip_all)]
pub async fn initialize(self: Vc<Self>, options: ProjectOptions) -> Result<()> {
pub async fn initialize(self: ResolvedVc<Self>, options: ProjectOptions) -> Result<()> {
let watch = options.watch;

self.await?.options_state.set(Some(options));

let project = self.project();
let project_fs = project.project_fs().strongly_consistent().await?;
let project = self.project().to_resolved().await?;
let project_fs = project_fs_operation(project)
.read_strongly_consistent()
.await?;
if watch.enable {
project_fs
.start_watching_with_invalidation_reason(watch.poll_interval)
.await?;
} else {
project_fs.invalidate_with_reason();
}
let output_fs = project.output_fs().strongly_consistent().await?;
let output_fs = output_fs_operation(project)
.read_strongly_consistent()
.await?;
output_fs.invalidate_with_reason();
Ok(())
}
Expand Down Expand Up @@ -355,13 +369,22 @@ impl ProjectContainer {
// TODO: Handle mode switch, should prevent mode being switched.
let watch = new_options.watch;

let project = self.project();
let prev_project_fs = project.project_fs().strongly_consistent().await?;
let prev_output_fs = project.output_fs().strongly_consistent().await?;
let project = self.project().to_resolved().await?;
let prev_project_fs = project_fs_operation(project)
.read_strongly_consistent()
.await?;
let prev_output_fs = output_fs_operation(project)
.read_strongly_consistent()
.await?;

this.options_state.set(Some(new_options));
let project_fs = project.project_fs().strongly_consistent().await?;
let output_fs = project.output_fs().strongly_consistent().await?;
let project = self.project().to_resolved().await?;
let project_fs = project_fs_operation(project)
.read_strongly_consistent()
.await?;
let output_fs = output_fs_operation(project)
.read_strongly_consistent()
.await?;

if !ReadRef::ptr_eq(&prev_project_fs, &project_fs) {
if watch.enable {
Expand Down Expand Up @@ -868,11 +891,10 @@ impl Project {
#[turbo_tasks::function]
pub async fn whole_app_module_graphs(self: ResolvedVc<Self>) -> Result<Vc<ModuleGraphs>> {
async move {
let operation = whole_app_module_graph_operation(self);
let module_graphs = operation.connect();
let _ = module_graphs.resolve_strongly_consistent().await?;
let _ = operation.take_issues_with_path().await?;
Ok(module_graphs)
let module_graphs_op = whole_app_module_graph_operation(self);
let module_graphs_vc = module_graphs_op.resolve_strongly_consistent().await?;
let _ = module_graphs_op.take_issues_with_path().await?;
Ok(*module_graphs_vc)
}
.instrument(tracing::info_span!("module graph for app"))
.await
Expand Down
Loading
Loading