From 5e688da7c8d3bd51996d96bc37ba8fcaf3fd1f37 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Fri, 3 Feb 2023 20:31:45 +0100 Subject: [PATCH] add helper for walking a tree concurrently and deterministic (vercel/turbo#3619) Similar to our `try_join` helper this adds a `try_flat_map_recursive_join` helper, which allows to async expand a tree structure into all nodes. It will call the async mapper function concurrently to allow parallelism. It will handle circular and duplicate references and return all nodes in a determinstic way (breath-first). --- crates/next-core/src/manifest.rs | 43 +++++--- crates/next-core/src/next_font_google/mod.rs | 2 +- crates/turbo-tasks-fetch/src/lib.rs | 6 +- crates/turbo-tasks-fs/examples/hash_glob.rs | 2 +- crates/turbo-tasks/Cargo.toml | 1 + crates/turbo-tasks/src/join_iter_ext.rs | 102 +++++++++++++++++- crates/turbo-tasks/src/lib.rs | 2 +- crates/turbo-tasks/src/read_ref.rs | 10 +- crates/turbopack-core/src/chunk/mod.rs | 57 ++++++---- crates/turbopack-core/src/resolve/parse.rs | 16 ++- .../src/references/cjs.rs | 4 +- ...hot_emotion_emotion_input_index_f0bbb5.js} | 6 +- ...emotion_emotion_input_index_f0bbb5.js.map} | 0 13 files changed, 196 insertions(+), 55 deletions(-) rename crates/turbopack-tests/tests/snapshot/emotion/emotion/output/{crates_turbopack-tests_tests_snapshot_emotion_emotion_input_index_6545dc.js => crates_turbopack-tests_tests_snapshot_emotion_emotion_input_index_f0bbb5.js} (98%) rename crates/turbopack-tests/tests/snapshot/emotion/emotion/output/{crates_turbopack-tests_tests_snapshot_emotion_emotion_input_index_6545dc.js.map => crates_turbopack-tests_tests_snapshot_emotion_emotion_input_index_f0bbb5.js.map} (100%) diff --git a/crates/next-core/src/manifest.rs b/crates/next-core/src/manifest.rs index 25e793f69e0e3..19c5f51c38bc1 100644 --- a/crates/next-core/src/manifest.rs +++ b/crates/next-core/src/manifest.rs @@ -1,7 +1,6 @@ use anyhow::Result; -use indexmap::IndexSet; use mime::APPLICATION_JSON; -use turbo_tasks::primitives::StringsVc; +use turbo_tasks::{primitives::StringsVc, TryFlatMapRecursiveJoinIterExt, TryJoinIterExt}; use turbo_tasks_fs::File; use turbopack_core::asset::AssetContentVc; use turbopack_dev_server::source::{ @@ -24,31 +23,47 @@ impl DevManifestContentSourceVc { #[turbo_tasks::function] async fn find_routes(self) -> Result { let this = &*self.await?; - let mut queue = this.page_roots.clone(); - let mut routes = IndexSet::new(); - - while let Some(content_source) = queue.pop() { - queue.extend(content_source.get_children().await?.iter()); + async fn content_source_to_pathname( + content_source: ContentSourceVc, + ) -> Result> { // TODO This shouldn't use casts but an public api instead if let Some(api_source) = NodeApiContentSourceVc::resolve_from(content_source).await? { - routes.insert(format!("/{}", api_source.get_pathname().await?)); - - continue; + return Ok(Some(format!("/{}", api_source.get_pathname().await?))); } if let Some(page_source) = NodeRenderContentSourceVc::resolve_from(content_source).await? { - routes.insert(format!("/{}", page_source.get_pathname().await?)); - - continue; + return Ok(Some(format!("/{}", page_source.get_pathname().await?))); } + + Ok(None) } + async fn get_content_source_children( + content_source: ContentSourceVc, + ) -> Result> { + Ok(content_source.get_children().await?.clone_value()) + } + + let mut routes = this + .page_roots + .iter() + .copied() + .try_flat_map_recursive_join(get_content_source_children) + .await? + .into_iter() + .map(content_source_to_pathname) + .try_join() + .await? + .into_iter() + .flatten() + .collect::>(); + routes.sort(); - Ok(StringsVc::cell(routes.into_iter().collect())) + Ok(StringsVc::cell(routes)) } } diff --git a/crates/next-core/src/next_font_google/mod.rs b/crates/next-core/src/next_font_google/mod.rs index 1160b48f8ea02..6aef378d4923f 100644 --- a/crates/next-core/src/next_font_google/mod.rs +++ b/crates/next-core/src/next_font_google/mod.rs @@ -168,7 +168,7 @@ impl ImportMappingReplacement for NextFontGoogleCssModuleReplacer { Ok(r) => Some( update_stylesheet(r.await?.body.to_string(), options, scoped_font_family) .await? - .clone(), + .clone_value(), ), Err(err) => { // Inform the user of the failure to retreive the stylesheet, but don't diff --git a/crates/turbo-tasks-fetch/src/lib.rs b/crates/turbo-tasks-fetch/src/lib.rs index baa6cdfe74e07..4c7cf1d0c83b1 100644 --- a/crates/turbo-tasks-fetch/src/lib.rs +++ b/crates/turbo-tasks-fetch/src/lib.rs @@ -37,11 +37,11 @@ impl HttpResponseBodyVc { #[turbo_tasks::function] pub async fn fetch(url: StringVc, user_agent: OptionStringVc) -> Result { - let url = url.await?.clone(); + let url = &*url.await?; let user_agent = &*user_agent.await?; let client = reqwest::Client::new(); - let mut builder = client.get(&url); + let mut builder = client.get(url); if let Some(user_agent) = user_agent { builder = builder.header("User-Agent", user_agent); } @@ -59,7 +59,7 @@ pub async fn fetch(url: StringVc, user_agent: OptionStringVc) -> Result Ok(FetchResultVc::cell(Err(FetchError::from_reqwest_error( - &err, &url, + &err, url, ) .cell()))), } diff --git a/crates/turbo-tasks-fs/examples/hash_glob.rs b/crates/turbo-tasks-fs/examples/hash_glob.rs index fc9a16d2b9cee..8785e94ee79ee 100644 --- a/crates/turbo-tasks-fs/examples/hash_glob.rs +++ b/crates/turbo-tasks-fs/examples/hash_glob.rs @@ -74,7 +74,7 @@ async fn hash_glob_result(result: ReadGlobResultVc) -> Result { for (name, result) in result.inner.iter() { let hash = hash_glob_result(*result).await?; if !hash.is_empty() { - hashes.insert(name, hash.clone()); + hashes.insert(name, hash.clone_value()); } } if hashes.is_empty() { diff --git a/crates/turbo-tasks/Cargo.toml b/crates/turbo-tasks/Cargo.toml index 0912358b9aedf..cccf01b3391d2 100644 --- a/crates/turbo-tasks/Cargo.toml +++ b/crates/turbo-tasks/Cargo.toml @@ -33,6 +33,7 @@ regex = "1.6.0" serde = { version = "1.0.136", features = ["rc", "derive"] } serde_json = "1.0.85" serde_regex = "1.1.0" +stable_deref_trait = "1.2.0" thiserror = "1.0.31" tokio = { version = "1.21.2", features = ["full"] } turbo-tasks-hash = { path = "../turbo-tasks-hash" } diff --git a/crates/turbo-tasks/src/join_iter_ext.rs b/crates/turbo-tasks/src/join_iter_ext.rs index 4e7befb3f0fd6..31c3fc7fd1983 100644 --- a/crates/turbo-tasks/src/join_iter_ext.rs +++ b/crates/turbo-tasks/src/join_iter_ext.rs @@ -1,10 +1,18 @@ -use std::future::{Future, IntoFuture}; +use std::{ + future::{Future, IntoFuture}, + hash::Hash, + mem::take, + pin::Pin, + task::ready, +}; use anyhow::Result; use futures::{ future::{join_all, JoinAll}, - FutureExt, + stream::FuturesOrdered, + FutureExt, Stream, }; +use indexmap::IndexSet; /// Future for the [JoinIterExt::join] method. pub struct Join @@ -57,6 +65,53 @@ where } } +pub struct TryFlatMapRecursiveJoin +where + T: Hash + PartialEq + Eq + Clone, + C: Fn(T) -> F, + F: Future>, + CI: IntoIterator, +{ + set: IndexSet, + futures: FuturesOrdered, + get_children: C, +} + +impl Future for TryFlatMapRecursiveJoin +where + T: Hash + PartialEq + Eq + Clone, + C: Fn(T) -> F, + F: Future>, + CI: IntoIterator, +{ + type Output = Result>; + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let this = unsafe { self.get_unchecked_mut() }; + loop { + let futures = unsafe { Pin::new_unchecked(&mut this.futures) }; + if let Some(result) = ready!(futures.poll_next(cx)) { + match result { + Ok(children) => { + for item in children { + let (index, new) = this.set.insert_full(item); + if new { + this.futures + .push_back((this.get_children)(this.set[index].clone())); + } + } + } + Err(err) => return std::task::Poll::Ready(Err(err)), + } + } else { + return std::task::Poll::Ready(Ok(take(&mut this.set))); + } + } + } +} + pub trait JoinIterExt: Iterator where T: Unpin, @@ -80,6 +135,24 @@ where fn try_join(self) -> TryJoin; } +pub trait TryFlatMapRecursiveJoinIterExt: Iterator +where + T: Hash + PartialEq + Eq + Clone, + C: Fn(T) -> F, + F: Future>, + CI: IntoIterator, +{ + /// Applies the `get_children` function on each item in the iterator, and on + /// each item that is returned by `get_children`. Collects all items from + /// the iterator and all items returns by `get_children` into an index set. + /// The order of items is equal to a breadth-first traversal of the tree, + /// but `get_children` will execute concurrently. It will handle circular + /// references gracefully. Returns a future that resolve to a + /// [Result]. It will resolve to the first error that occur in + /// breadth-first order. + fn try_flat_map_recursive_join(self, get_children: C) -> TryFlatMapRecursiveJoin; +} + impl JoinIterExt for It where T: Unpin, @@ -107,3 +180,28 @@ where } } } + +impl TryFlatMapRecursiveJoinIterExt for It +where + T: Hash + PartialEq + Eq + Clone, + C: Fn(T) -> F, + F: Future>, + CI: IntoIterator, + It: Iterator, +{ + fn try_flat_map_recursive_join(self, get_children: C) -> TryFlatMapRecursiveJoin { + let mut set = IndexSet::new(); + let mut futures = FuturesOrdered::new(); + for item in self { + let (index, new) = set.insert_full(item); + if new { + futures.push_back(get_children(set[index].clone())); + } + } + TryFlatMapRecursiveJoin { + set, + futures, + get_children, + } + } +} diff --git a/crates/turbo-tasks/src/lib.rs b/crates/turbo-tasks/src/lib.rs index da011152d292f..d9342720451ae 100644 --- a/crates/turbo-tasks/src/lib.rs +++ b/crates/turbo-tasks/src/lib.rs @@ -70,7 +70,7 @@ pub use id::{ with_task_id_mapping, without_task_id_mapping, FunctionId, IdMapping, TaskId, TraitTypeId, ValueTypeId, }; -pub use join_iter_ext::{JoinIterExt, TryJoinIterExt}; +pub use join_iter_ext::{JoinIterExt, TryFlatMapRecursiveJoinIterExt, TryJoinIterExt}; pub use manager::{ dynamic_call, emit, get_invalidator, mark_stateful, run_once, spawn_blocking, spawn_thread, trait_call, turbo_tasks, Invalidator, StatsType, TaskIdProvider, TurboTasks, TurboTasksApi, diff --git a/crates/turbo-tasks/src/read_ref.rs b/crates/turbo-tasks/src/read_ref.rs index 41a56f77237bb..b60e454796180 100644 --- a/crates/turbo-tasks/src/read_ref.rs +++ b/crates/turbo-tasks/src/read_ref.rs @@ -22,9 +22,14 @@ use crate::{ /// Internally it stores a reference counted reference to a value on the heap. /// /// Invariant: T and U are binary identical (#[repr(transparent)]) -#[derive(Clone)] pub struct ReadRef(Arc, PhantomData>); +impl Clone for ReadRef { + fn clone(&self) -> Self { + Self(self.0.clone(), PhantomData) + } +} + impl std::ops::Deref for ReadRef { type Target = U; @@ -34,6 +39,9 @@ impl std::ops::Deref for ReadRef { } } +unsafe impl stable_deref_trait::StableDeref for ReadRef {} +unsafe impl stable_deref_trait::CloneStableDeref for ReadRef {} + impl Display for ReadRef { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { Display::fmt(&**self, f) diff --git a/crates/turbopack-core/src/chunk/mod.rs b/crates/turbopack-core/src/chunk/mod.rs index d4b2627950844..b4c40deaa203d 100644 --- a/crates/turbopack-core/src/chunk/mod.rs +++ b/crates/turbopack-core/src/chunk/mod.rs @@ -13,7 +13,7 @@ use turbo_tasks::{ debug::ValueDebugFormat, primitives::{BoolVc, StringVc}, trace::TraceRawVcs, - ValueToString, ValueToStringVc, + TryFlatMapRecursiveJoinIterExt, TryJoinIterExt, ValueToString, ValueToStringVc, }; use turbo_tasks_fs::FileSystemPathVc; use turbo_tasks_hash::DeterministicHash; @@ -124,27 +124,48 @@ impl ChunkGroupVc { /// All chunks should be loaded in parallel. #[turbo_tasks::function] pub async fn chunks(self) -> Result { - let mut chunks = IndexSet::new(); - - let mut queue = vec![self.await?.entry]; - while let Some(chunk) = queue.pop() { - let chunk = chunk.resolve().await?; - if chunks.insert(chunk) { - for r in chunk.references().await?.iter() { - if let Some(pc) = ParallelChunkReferenceVc::resolve_from(r).await? { - if *pc.is_loaded_in_parallel().await? { - let result = r.resolve_reference(); - for a in result.primary_assets().await?.iter() { - if let Some(chunk) = ChunkVc::resolve_from(a).await? { - queue.push(chunk); - } - } - } - } + async fn reference_to_chunks( + r: AssetReferenceVc, + ) -> Result + Send> { + let mut result = Vec::new(); + if let Some(pc) = ParallelChunkReferenceVc::resolve_from(r).await? { + if *pc.is_loaded_in_parallel().await? { + result = r + .resolve_reference() + .primary_assets() + .await? + .iter() + .map(|r| async move { Ok(ChunkVc::resolve_from(r).await?) }) + .try_join() + .await?; } } + Ok(result.into_iter().flatten()) } + // async fn get_chunk_children( + // chunk: ChunkVc, + // ) -> Result>>>>> { + async fn get_chunk_children( + chunk: ChunkVc, + ) -> Result + Send> { + Ok(chunk + .references() + .await? + .iter() + .copied() + .map(reference_to_chunks) + .try_join() + .await? + .into_iter() + .flatten()) + } + + let chunks = [self.await?.entry] + .into_iter() + .try_flat_map_recursive_join(get_chunk_children) + .await?; + let chunks = ChunksVc::cell(chunks.into_iter().collect()); let chunks = optimize(chunks, self); diff --git a/crates/turbopack-core/src/resolve/parse.rs b/crates/turbopack-core/src/resolve/parse.rs index 7060584d37b6f..6418f41a50856 100644 --- a/crates/turbopack-core/src/resolve/parse.rs +++ b/crates/turbopack-core/src/resolve/parse.rs @@ -314,15 +314,13 @@ impl ValueToString for Request { } => format!("uri \"{protocol}\" \"{remainder}\""), Request::Unknown { path } => format!("unknown {path}"), Request::Dynamic => "dynamic".to_string(), - Request::Alternatives { requests } => requests - .iter() - .map(|i| i.to_string()) - .try_join() - .await? - .into_iter() - .map(|r| r.clone()) - .collect::>() - .join(" or "), + Request::Alternatives { requests } => { + let vec = requests.iter().map(|i| i.to_string()).try_join().await?; + vec.iter() + .map(|r| r.as_str()) + .collect::>() + .join(" or ") + } })) } } diff --git a/crates/turbopack-ecmascript/src/references/cjs.rs b/crates/turbopack-ecmascript/src/references/cjs.rs index f6717ba4bc954..897f4e47e420d 100644 --- a/crates/turbopack-ecmascript/src/references/cjs.rs +++ b/crates/turbopack-ecmascript/src/references/cjs.rs @@ -113,7 +113,7 @@ impl CodeGenerateable for CjsRequireAssetReference { let path = &self.path.await?; if let PatternMapping::Invalid = &*pm { - let request_string = self.request.to_string().await?.clone(); + let request_string = self.request.to_string().await?; visitors.push(create_visitor!(path, visit_mut_expr(expr: &mut Expr) { // In Node.js, a require call that cannot be resolved will throw an error. *expr = throw_module_not_found_expr(&request_string); @@ -202,7 +202,7 @@ impl CodeGenerateable for CjsRequireResolveAssetReference { let path = &self.path.await?; if let PatternMapping::Invalid = &*pm { - let request_string = self.request.to_string().await?.clone(); + let request_string = self.request.to_string().await?; visitors.push(create_visitor!(path, visit_mut_expr(expr: &mut Expr) { // In Node.js, a require.resolve call that cannot be resolved will throw an error. *expr = throw_module_not_found_expr(&request_string); diff --git a/crates/turbopack-tests/tests/snapshot/emotion/emotion/output/crates_turbopack-tests_tests_snapshot_emotion_emotion_input_index_6545dc.js b/crates/turbopack-tests/tests/snapshot/emotion/emotion/output/crates_turbopack-tests_tests_snapshot_emotion_emotion_input_index_f0bbb5.js similarity index 98% rename from crates/turbopack-tests/tests/snapshot/emotion/emotion/output/crates_turbopack-tests_tests_snapshot_emotion_emotion_input_index_6545dc.js rename to crates/turbopack-tests/tests/snapshot/emotion/emotion/output/crates_turbopack-tests_tests_snapshot_emotion_emotion_input_index_f0bbb5.js index 37c777498018f..9f094626a8880 100644 --- a/crates/turbopack-tests/tests/snapshot/emotion/emotion/output/crates_turbopack-tests_tests_snapshot_emotion_emotion_input_index_6545dc.js +++ b/crates/turbopack-tests/tests/snapshot/emotion/emotion/output/crates_turbopack-tests_tests_snapshot_emotion_emotion_input_index_f0bbb5.js @@ -1,4 +1,4 @@ -(self.TURBOPACK = self.TURBOPACK || []).push(["output/crates_turbopack-tests_tests_snapshot_emotion_emotion_input_index_6545dc.js", { +(self.TURBOPACK = self.TURBOPACK || []).push(["output/crates_turbopack-tests_tests_snapshot_emotion_emotion_input_index_f0bbb5.js", { "[project]/crates/turbopack-tests/tests/snapshot/emotion/emotion/input/index.js (ecmascript)": (({ r: __turbopack_require__, x: __turbopack_external_require__, i: __turbopack_import__, s: __turbopack_esm__, v: __turbopack_export_value__, c: __turbopack_cache__, l: __turbopack_load__, j: __turbopack_cjs__, p: process, g: global, __dirname }) => (() => { @@ -29,7 +29,7 @@ console.log(StyledButton, ClassNameButton); })()), }, ({ loadedChunks, instantiateRuntimeModule }) => { - if(!(true && loadedChunks.has("output/63a02_@emotion_styled_index.js") && loadedChunks.has("output/63a02_@emotion_react_index.js") && loadedChunks.has("output/63a02_@emotion_react_jsx-dev-runtime.js") && loadedChunks.has("output/crates_turbopack-tests_tests_snapshot_emotion_emotion_input_index_8c70a7.js"))) return true; + if(!(true && loadedChunks.has("output/63a02_@emotion_react_jsx-dev-runtime.js") && loadedChunks.has("output/63a02_@emotion_react_index.js") && loadedChunks.has("output/63a02_@emotion_styled_index.js") && loadedChunks.has("output/crates_turbopack-tests_tests_snapshot_emotion_emotion_input_index_ccc906.js"))) return true; instantiateRuntimeModule("[project]/crates/turbopack-tests/tests/snapshot/emotion/emotion/input/index.js (ecmascript)"); }]); (() => { @@ -1079,4 +1079,4 @@ globalThis.TURBOPACK = { })(); -//# sourceMappingURL=crates_turbopack-tests_tests_snapshot_emotion_emotion_input_index_6545dc.js.map \ No newline at end of file +//# sourceMappingURL=crates_turbopack-tests_tests_snapshot_emotion_emotion_input_index_f0bbb5.js.map \ No newline at end of file diff --git a/crates/turbopack-tests/tests/snapshot/emotion/emotion/output/crates_turbopack-tests_tests_snapshot_emotion_emotion_input_index_6545dc.js.map b/crates/turbopack-tests/tests/snapshot/emotion/emotion/output/crates_turbopack-tests_tests_snapshot_emotion_emotion_input_index_f0bbb5.js.map similarity index 100% rename from crates/turbopack-tests/tests/snapshot/emotion/emotion/output/crates_turbopack-tests_tests_snapshot_emotion_emotion_input_index_6545dc.js.map rename to crates/turbopack-tests/tests/snapshot/emotion/emotion/output/crates_turbopack-tests_tests_snapshot_emotion_emotion_input_index_f0bbb5.js.map