Skip to content

Commit

Permalink
add helper for walking a tree concurrently and deterministic (vercel/…
Browse files Browse the repository at this point in the history
…turborepo#3619)

Similar to our `try_join` helper this adds a
`try_flat_map_recursive_join` helper, which allows to async expand a
tree structure into all nodes. It will call the async mapper function
concurrently to allow parallelism. It will handle circular and duplicate
references and return all nodes in a determinstic way (breath-first).
  • Loading branch information
sokra authored Feb 3, 2023
1 parent f682e73 commit 5e688da
Show file tree
Hide file tree
Showing 13 changed files with 196 additions and 55 deletions.
43 changes: 29 additions & 14 deletions crates/next-core/src/manifest.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use anyhow::Result;
use indexmap::IndexSet;
use mime::APPLICATION_JSON;
use turbo_tasks::primitives::StringsVc;
use turbo_tasks::{primitives::StringsVc, TryFlatMapRecursiveJoinIterExt, TryJoinIterExt};
use turbo_tasks_fs::File;
use turbopack_core::asset::AssetContentVc;
use turbopack_dev_server::source::{
Expand All @@ -24,31 +23,47 @@ impl DevManifestContentSourceVc {
#[turbo_tasks::function]
async fn find_routes(self) -> Result<StringsVc> {
let this = &*self.await?;
let mut queue = this.page_roots.clone();
let mut routes = IndexSet::new();

while let Some(content_source) = queue.pop() {
queue.extend(content_source.get_children().await?.iter());

async fn content_source_to_pathname(
content_source: ContentSourceVc,
) -> Result<Option<String>> {
// TODO This shouldn't use casts but an public api instead
if let Some(api_source) = NodeApiContentSourceVc::resolve_from(content_source).await? {
routes.insert(format!("/{}", api_source.get_pathname().await?));

continue;
return Ok(Some(format!("/{}", api_source.get_pathname().await?)));
}

if let Some(page_source) =
NodeRenderContentSourceVc::resolve_from(content_source).await?
{
routes.insert(format!("/{}", page_source.get_pathname().await?));

continue;
return Ok(Some(format!("/{}", page_source.get_pathname().await?)));
}

Ok(None)
}

async fn get_content_source_children(
content_source: ContentSourceVc,
) -> Result<Vec<ContentSourceVc>> {
Ok(content_source.get_children().await?.clone_value())
}

let mut routes = this
.page_roots
.iter()
.copied()
.try_flat_map_recursive_join(get_content_source_children)
.await?
.into_iter()
.map(content_source_to_pathname)
.try_join()
.await?
.into_iter()
.flatten()
.collect::<Vec<_>>();

routes.sort();

Ok(StringsVc::cell(routes.into_iter().collect()))
Ok(StringsVc::cell(routes))
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/next-core/src/next_font_google/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl ImportMappingReplacement for NextFontGoogleCssModuleReplacer {
Ok(r) => Some(
update_stylesheet(r.await?.body.to_string(), options, scoped_font_family)
.await?
.clone(),
.clone_value(),
),
Err(err) => {
// Inform the user of the failure to retreive the stylesheet, but don't
Expand Down
6 changes: 3 additions & 3 deletions crates/turbo-tasks-fetch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ impl HttpResponseBodyVc {

#[turbo_tasks::function]
pub async fn fetch(url: StringVc, user_agent: OptionStringVc) -> Result<FetchResultVc> {
let url = url.await?.clone();
let url = &*url.await?;
let user_agent = &*user_agent.await?;
let client = reqwest::Client::new();

let mut builder = client.get(&url);
let mut builder = client.get(url);
if let Some(user_agent) = user_agent {
builder = builder.header("User-Agent", user_agent);
}
Expand All @@ -59,7 +59,7 @@ pub async fn fetch(url: StringVc, user_agent: OptionStringVc) -> Result<FetchRes
.cell())))
}
Err(err) => Ok(FetchResultVc::cell(Err(FetchError::from_reqwest_error(
&err, &url,
&err, url,
)
.cell()))),
}
Expand Down
2 changes: 1 addition & 1 deletion crates/turbo-tasks-fs/examples/hash_glob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ async fn hash_glob_result(result: ReadGlobResultVc) -> Result<StringVc> {
for (name, result) in result.inner.iter() {
let hash = hash_glob_result(*result).await?;
if !hash.is_empty() {
hashes.insert(name, hash.clone());
hashes.insert(name, hash.clone_value());
}
}
if hashes.is_empty() {
Expand Down
1 change: 1 addition & 0 deletions crates/turbo-tasks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ regex = "1.6.0"
serde = { version = "1.0.136", features = ["rc", "derive"] }
serde_json = "1.0.85"
serde_regex = "1.1.0"
stable_deref_trait = "1.2.0"
thiserror = "1.0.31"
tokio = { version = "1.21.2", features = ["full"] }
turbo-tasks-hash = { path = "../turbo-tasks-hash" }
Expand Down
102 changes: 100 additions & 2 deletions crates/turbo-tasks/src/join_iter_ext.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
use std::future::{Future, IntoFuture};
use std::{
future::{Future, IntoFuture},
hash::Hash,
mem::take,
pin::Pin,
task::ready,
};

use anyhow::Result;
use futures::{
future::{join_all, JoinAll},
FutureExt,
stream::FuturesOrdered,
FutureExt, Stream,
};
use indexmap::IndexSet;

/// Future for the [JoinIterExt::join] method.
pub struct Join<F>
Expand Down Expand Up @@ -57,6 +65,53 @@ where
}
}

pub struct TryFlatMapRecursiveJoin<T, C, F, CI>
where
T: Hash + PartialEq + Eq + Clone,
C: Fn(T) -> F,
F: Future<Output = Result<CI>>,
CI: IntoIterator<Item = T>,
{
set: IndexSet<T>,
futures: FuturesOrdered<F>,
get_children: C,
}

impl<T, C, F, CI> Future for TryFlatMapRecursiveJoin<T, C, F, CI>
where
T: Hash + PartialEq + Eq + Clone,
C: Fn(T) -> F,
F: Future<Output = Result<CI>>,
CI: IntoIterator<Item = T>,
{
type Output = Result<IndexSet<T>>;
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let this = unsafe { self.get_unchecked_mut() };
loop {
let futures = unsafe { Pin::new_unchecked(&mut this.futures) };
if let Some(result) = ready!(futures.poll_next(cx)) {
match result {
Ok(children) => {
for item in children {
let (index, new) = this.set.insert_full(item);
if new {
this.futures
.push_back((this.get_children)(this.set[index].clone()));
}
}
}
Err(err) => return std::task::Poll::Ready(Err(err)),
}
} else {
return std::task::Poll::Ready(Ok(take(&mut this.set)));
}
}
}
}

pub trait JoinIterExt<T, F>: Iterator
where
T: Unpin,
Expand All @@ -80,6 +135,24 @@ where
fn try_join(self) -> TryJoin<F>;
}

pub trait TryFlatMapRecursiveJoinIterExt<T, C, F, CI>: Iterator
where
T: Hash + PartialEq + Eq + Clone,
C: Fn(T) -> F,
F: Future<Output = Result<CI>>,
CI: IntoIterator<Item = T>,
{
/// Applies the `get_children` function on each item in the iterator, and on
/// each item that is returned by `get_children`. Collects all items from
/// the iterator and all items returns by `get_children` into an index set.
/// The order of items is equal to a breadth-first traversal of the tree,
/// but `get_children` will execute concurrently. It will handle circular
/// references gracefully. Returns a future that resolve to a
/// [Result<IndexSet>]. It will resolve to the first error that occur in
/// breadth-first order.
fn try_flat_map_recursive_join(self, get_children: C) -> TryFlatMapRecursiveJoin<T, C, F, CI>;
}

impl<T, F, IF, It> JoinIterExt<T, F> for It
where
T: Unpin,
Expand Down Expand Up @@ -107,3 +180,28 @@ where
}
}
}

impl<T, C, F, CI, It> TryFlatMapRecursiveJoinIterExt<T, C, F, CI> for It
where
T: Hash + PartialEq + Eq + Clone,
C: Fn(T) -> F,
F: Future<Output = Result<CI>>,
CI: IntoIterator<Item = T>,
It: Iterator<Item = T>,
{
fn try_flat_map_recursive_join(self, get_children: C) -> TryFlatMapRecursiveJoin<T, C, F, CI> {
let mut set = IndexSet::new();
let mut futures = FuturesOrdered::new();
for item in self {
let (index, new) = set.insert_full(item);
if new {
futures.push_back(get_children(set[index].clone()));
}
}
TryFlatMapRecursiveJoin {
set,
futures,
get_children,
}
}
}
2 changes: 1 addition & 1 deletion crates/turbo-tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub use id::{
with_task_id_mapping, without_task_id_mapping, FunctionId, IdMapping, TaskId, TraitTypeId,
ValueTypeId,
};
pub use join_iter_ext::{JoinIterExt, TryJoinIterExt};
pub use join_iter_ext::{JoinIterExt, TryFlatMapRecursiveJoinIterExt, TryJoinIterExt};
pub use manager::{
dynamic_call, emit, get_invalidator, mark_stateful, run_once, spawn_blocking, spawn_thread,
trait_call, turbo_tasks, Invalidator, StatsType, TaskIdProvider, TurboTasks, TurboTasksApi,
Expand Down
10 changes: 9 additions & 1 deletion crates/turbo-tasks/src/read_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@ use crate::{
/// Internally it stores a reference counted reference to a value on the heap.
///
/// Invariant: T and U are binary identical (#[repr(transparent)])
#[derive(Clone)]
pub struct ReadRef<T, U = T>(Arc<T>, PhantomData<Arc<U>>);

impl<T, U> Clone for ReadRef<T, U> {
fn clone(&self) -> Self {
Self(self.0.clone(), PhantomData)
}
}

impl<T, U> std::ops::Deref for ReadRef<T, U> {
type Target = U;

Expand All @@ -34,6 +39,9 @@ impl<T, U> std::ops::Deref for ReadRef<T, U> {
}
}

unsafe impl<T, U> stable_deref_trait::StableDeref for ReadRef<T, U> {}
unsafe impl<T, U> stable_deref_trait::CloneStableDeref for ReadRef<T, U> {}

impl<T, U: Display> Display for ReadRef<T, U> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Display::fmt(&**self, f)
Expand Down
57 changes: 39 additions & 18 deletions crates/turbopack-core/src/chunk/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use turbo_tasks::{
debug::ValueDebugFormat,
primitives::{BoolVc, StringVc},
trace::TraceRawVcs,
ValueToString, ValueToStringVc,
TryFlatMapRecursiveJoinIterExt, TryJoinIterExt, ValueToString, ValueToStringVc,
};
use turbo_tasks_fs::FileSystemPathVc;
use turbo_tasks_hash::DeterministicHash;
Expand Down Expand Up @@ -124,27 +124,48 @@ impl ChunkGroupVc {
/// All chunks should be loaded in parallel.
#[turbo_tasks::function]
pub async fn chunks(self) -> Result<ChunksVc> {
let mut chunks = IndexSet::new();

let mut queue = vec![self.await?.entry];
while let Some(chunk) = queue.pop() {
let chunk = chunk.resolve().await?;
if chunks.insert(chunk) {
for r in chunk.references().await?.iter() {
if let Some(pc) = ParallelChunkReferenceVc::resolve_from(r).await? {
if *pc.is_loaded_in_parallel().await? {
let result = r.resolve_reference();
for a in result.primary_assets().await?.iter() {
if let Some(chunk) = ChunkVc::resolve_from(a).await? {
queue.push(chunk);
}
}
}
}
async fn reference_to_chunks(
r: AssetReferenceVc,
) -> Result<impl Iterator<Item = ChunkVc> + Send> {
let mut result = Vec::new();
if let Some(pc) = ParallelChunkReferenceVc::resolve_from(r).await? {
if *pc.is_loaded_in_parallel().await? {
result = r
.resolve_reference()
.primary_assets()
.await?
.iter()
.map(|r| async move { Ok(ChunkVc::resolve_from(r).await?) })
.try_join()
.await?;
}
}
Ok(result.into_iter().flatten())
}

// async fn get_chunk_children(
// chunk: ChunkVc,
// ) -> Result<Flatten<IntoIter<Flatten<IntoIter<Option<ChunkVc>>>>>> {
async fn get_chunk_children(
chunk: ChunkVc,
) -> Result<impl Iterator<Item = ChunkVc> + Send> {
Ok(chunk
.references()
.await?
.iter()
.copied()
.map(reference_to_chunks)
.try_join()
.await?
.into_iter()
.flatten())
}

let chunks = [self.await?.entry]
.into_iter()
.try_flat_map_recursive_join(get_chunk_children)
.await?;

let chunks = ChunksVc::cell(chunks.into_iter().collect());
let chunks = optimize(chunks, self);

Expand Down
16 changes: 7 additions & 9 deletions crates/turbopack-core/src/resolve/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,15 +314,13 @@ impl ValueToString for Request {
} => format!("uri \"{protocol}\" \"{remainder}\""),
Request::Unknown { path } => format!("unknown {path}"),
Request::Dynamic => "dynamic".to_string(),
Request::Alternatives { requests } => requests
.iter()
.map(|i| i.to_string())
.try_join()
.await?
.into_iter()
.map(|r| r.clone())
.collect::<Vec<_>>()
.join(" or "),
Request::Alternatives { requests } => {
let vec = requests.iter().map(|i| i.to_string()).try_join().await?;
vec.iter()
.map(|r| r.as_str())
.collect::<Vec<_>>()
.join(" or ")
}
}))
}
}
4 changes: 2 additions & 2 deletions crates/turbopack-ecmascript/src/references/cjs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl CodeGenerateable for CjsRequireAssetReference {

let path = &self.path.await?;
if let PatternMapping::Invalid = &*pm {
let request_string = self.request.to_string().await?.clone();
let request_string = self.request.to_string().await?;
visitors.push(create_visitor!(path, visit_mut_expr(expr: &mut Expr) {
// In Node.js, a require call that cannot be resolved will throw an error.
*expr = throw_module_not_found_expr(&request_string);
Expand Down Expand Up @@ -202,7 +202,7 @@ impl CodeGenerateable for CjsRequireResolveAssetReference {

let path = &self.path.await?;
if let PatternMapping::Invalid = &*pm {
let request_string = self.request.to_string().await?.clone();
let request_string = self.request.to_string().await?;
visitors.push(create_visitor!(path, visit_mut_expr(expr: &mut Expr) {
// In Node.js, a require.resolve call that cannot be resolved will throw an error.
*expr = throw_module_not_found_expr(&request_string);
Expand Down
Loading

0 comments on commit 5e688da

Please sign in to comment.