Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add helper for walking a tree concurrently and deterministic #3619

Merged
merged 7 commits into from
Feb 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 29 additions & 14 deletions crates/next-core/src/manifest.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use anyhow::Result;
use indexmap::IndexSet;
use mime::APPLICATION_JSON;
use turbo_tasks::primitives::StringsVc;
use turbo_tasks::{primitives::StringsVc, TryFlatMapRecursiveJoinIterExt, TryJoinIterExt};
use turbo_tasks_fs::File;
use turbopack_core::asset::AssetContentVc;
use turbopack_dev_server::source::{
Expand All @@ -24,31 +23,47 @@ impl DevManifestContentSourceVc {
#[turbo_tasks::function]
async fn find_routes(self) -> Result<StringsVc> {
let this = &*self.await?;
let mut queue = this.page_roots.clone();
let mut routes = IndexSet::new();

while let Some(content_source) = queue.pop() {
queue.extend(content_source.get_children().await?.iter());

async fn content_source_to_pathname(
content_source: ContentSourceVc,
) -> Result<Option<String>> {
// TODO This shouldn't use casts but an public api instead
if let Some(api_source) = NodeApiContentSourceVc::resolve_from(content_source).await? {
routes.insert(format!("/{}", api_source.get_pathname().await?));

continue;
return Ok(Some(format!("/{}", api_source.get_pathname().await?)));
}

if let Some(page_source) =
NodeRenderContentSourceVc::resolve_from(content_source).await?
{
routes.insert(format!("/{}", page_source.get_pathname().await?));

continue;
return Ok(Some(format!("/{}", page_source.get_pathname().await?)));
}

Ok(None)
}

async fn get_content_source_children(
content_source: ContentSourceVc,
) -> Result<Vec<ContentSourceVc>> {
Ok(content_source.get_children().await?.clone_value())
}

let mut routes = this
.page_roots
.iter()
.copied()
.try_flat_map_recursive_join(get_content_source_children)
.await?
.into_iter()
.map(content_source_to_pathname)
.try_join()
.await?
.into_iter()
.flatten()
.collect::<Vec<_>>();

routes.sort();

Ok(StringsVc::cell(routes.into_iter().collect()))
Ok(StringsVc::cell(routes))
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/next-core/src/next_font_google/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl ImportMappingReplacement for NextFontGoogleCssModuleReplacer {
Ok(r) => Some(
update_stylesheet(r.await?.body.to_string(), options, scoped_font_family)
.await?
.clone(),
.clone_value(),
),
Err(err) => {
// Inform the user of the failure to retreive the stylesheet, but don't
Expand Down
6 changes: 3 additions & 3 deletions crates/turbo-tasks-fetch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ impl HttpResponseBodyVc {

#[turbo_tasks::function]
pub async fn fetch(url: StringVc, user_agent: OptionStringVc) -> Result<FetchResultVc> {
let url = url.await?.clone();
let url = &*url.await?;
let user_agent = &*user_agent.await?;
let client = reqwest::Client::new();

let mut builder = client.get(&url);
let mut builder = client.get(url);
if let Some(user_agent) = user_agent {
builder = builder.header("User-Agent", user_agent);
}
Expand All @@ -59,7 +59,7 @@ pub async fn fetch(url: StringVc, user_agent: OptionStringVc) -> Result<FetchRes
.cell())))
}
Err(err) => Ok(FetchResultVc::cell(Err(FetchError::from_reqwest_error(
&err, &url,
&err, url,
)
.cell()))),
}
Expand Down
2 changes: 1 addition & 1 deletion crates/turbo-tasks-fs/examples/hash_glob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ async fn hash_glob_result(result: ReadGlobResultVc) -> Result<StringVc> {
for (name, result) in result.inner.iter() {
let hash = hash_glob_result(*result).await?;
if !hash.is_empty() {
hashes.insert(name, hash.clone());
hashes.insert(name, hash.clone_value());
}
}
if hashes.is_empty() {
Expand Down
1 change: 1 addition & 0 deletions crates/turbo-tasks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ regex = "1.6.0"
serde = { version = "1.0.136", features = ["rc", "derive"] }
serde_json = "1.0.85"
serde_regex = "1.1.0"
stable_deref_trait = "1.2.0"
thiserror = "1.0.31"
tokio = { version = "1.21.2", features = ["full"] }
turbo-tasks-hash = { path = "../turbo-tasks-hash" }
Expand Down
102 changes: 100 additions & 2 deletions crates/turbo-tasks/src/join_iter_ext.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
use std::future::{Future, IntoFuture};
use std::{
future::{Future, IntoFuture},
hash::Hash,
mem::take,
pin::Pin,
task::ready,
};

use anyhow::Result;
use futures::{
future::{join_all, JoinAll},
FutureExt,
stream::FuturesOrdered,
FutureExt, Stream,
};
use indexmap::IndexSet;

/// Future for the [JoinIterExt::join] method.
pub struct Join<F>
Expand Down Expand Up @@ -57,6 +65,53 @@ where
}
}

pub struct TryFlatMapRecursiveJoin<T, C, F, CI>
where
T: Hash + PartialEq + Eq + Clone,
C: Fn(T) -> F,
F: Future<Output = Result<CI>>,
CI: IntoIterator<Item = T>,
{
set: IndexSet<T>,
futures: FuturesOrdered<F>,
get_children: C,
}

impl<T, C, F, CI> Future for TryFlatMapRecursiveJoin<T, C, F, CI>
where
T: Hash + PartialEq + Eq + Clone,
C: Fn(T) -> F,
F: Future<Output = Result<CI>>,
CI: IntoIterator<Item = T>,
{
type Output = Result<IndexSet<T>>;
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let this = unsafe { self.get_unchecked_mut() };
loop {
let futures = unsafe { Pin::new_unchecked(&mut this.futures) };
if let Some(result) = ready!(futures.poll_next(cx)) {
match result {
Ok(children) => {
for item in children {
let (index, new) = this.set.insert_full(item);
if new {
this.futures
.push_back((this.get_children)(this.set[index].clone()));
}
}
}
Err(err) => return std::task::Poll::Ready(Err(err)),
}
} else {
return std::task::Poll::Ready(Ok(take(&mut this.set)));
}
}
}
}

pub trait JoinIterExt<T, F>: Iterator
where
T: Unpin,
Expand All @@ -80,6 +135,24 @@ where
fn try_join(self) -> TryJoin<F>;
}

pub trait TryFlatMapRecursiveJoinIterExt<T, C, F, CI>: Iterator
where
T: Hash + PartialEq + Eq + Clone,
C: Fn(T) -> F,
F: Future<Output = Result<CI>>,
CI: IntoIterator<Item = T>,
{
/// Applies the `get_children` function on each item in the iterator, and on
/// each item that is returned by `get_children`. Collects all items from
/// the iterator and all items returns by `get_children` into an index set.
/// The order of items is equal to a breadth-first traversal of the tree,
/// but `get_children` will execute concurrently. It will handle circular
/// references gracefully. Returns a future that resolve to a
/// [Result<IndexSet>]. It will resolve to the first error that occur in
/// breadth-first order.
fn try_flat_map_recursive_join(self, get_children: C) -> TryFlatMapRecursiveJoin<T, C, F, CI>;
sokra marked this conversation as resolved.
Show resolved Hide resolved
}

impl<T, F, IF, It> JoinIterExt<T, F> for It
where
T: Unpin,
Expand Down Expand Up @@ -107,3 +180,28 @@ where
}
}
}

impl<T, C, F, CI, It> TryFlatMapRecursiveJoinIterExt<T, C, F, CI> for It
where
T: Hash + PartialEq + Eq + Clone,
C: Fn(T) -> F,
F: Future<Output = Result<CI>>,
CI: IntoIterator<Item = T>,
It: Iterator<Item = T>,
{
fn try_flat_map_recursive_join(self, get_children: C) -> TryFlatMapRecursiveJoin<T, C, F, CI> {
let mut set = IndexSet::new();
let mut futures = FuturesOrdered::new();
for item in self {
let (index, new) = set.insert_full(item);
if new {
futures.push_back(get_children(set[index].clone()));
}
}
TryFlatMapRecursiveJoin {
set,
futures,
get_children,
}
}
}
2 changes: 1 addition & 1 deletion crates/turbo-tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub use id::{
with_task_id_mapping, without_task_id_mapping, FunctionId, IdMapping, TaskId, TraitTypeId,
ValueTypeId,
};
pub use join_iter_ext::{JoinIterExt, TryJoinIterExt};
pub use join_iter_ext::{JoinIterExt, TryFlatMapRecursiveJoinIterExt, TryJoinIterExt};
pub use manager::{
dynamic_call, emit, get_invalidator, mark_stateful, run_once, spawn_blocking, spawn_thread,
trait_call, turbo_tasks, Invalidator, StatsType, TaskIdProvider, TurboTasks, TurboTasksApi,
Expand Down
10 changes: 9 additions & 1 deletion crates/turbo-tasks/src/read_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@ use crate::{
/// Internally it stores a reference counted reference to a value on the heap.
///
/// Invariant: T and U are binary identical (#[repr(transparent)])
#[derive(Clone)]
pub struct ReadRef<T, U = T>(Arc<T>, PhantomData<Arc<U>>);

impl<T, U> Clone for ReadRef<T, U> {
fn clone(&self) -> Self {
Self(self.0.clone(), PhantomData)
}
}

impl<T, U> std::ops::Deref for ReadRef<T, U> {
type Target = U;

Expand All @@ -34,6 +39,9 @@ impl<T, U> std::ops::Deref for ReadRef<T, U> {
}
}

unsafe impl<T, U> stable_deref_trait::StableDeref for ReadRef<T, U> {}
unsafe impl<T, U> stable_deref_trait::CloneStableDeref for ReadRef<T, U> {}

impl<T, U: Display> Display for ReadRef<T, U> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Display::fmt(&**self, f)
Expand Down
57 changes: 39 additions & 18 deletions crates/turbopack-core/src/chunk/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use turbo_tasks::{
debug::ValueDebugFormat,
primitives::{BoolVc, StringVc},
trace::TraceRawVcs,
ValueToString, ValueToStringVc,
TryFlatMapRecursiveJoinIterExt, TryJoinIterExt, ValueToString, ValueToStringVc,
};
use turbo_tasks_fs::FileSystemPathVc;
use turbo_tasks_hash::DeterministicHash;
Expand Down Expand Up @@ -124,27 +124,48 @@ impl ChunkGroupVc {
/// All chunks should be loaded in parallel.
#[turbo_tasks::function]
pub async fn chunks(self) -> Result<ChunksVc> {
let mut chunks = IndexSet::new();

let mut queue = vec![self.await?.entry];
while let Some(chunk) = queue.pop() {
let chunk = chunk.resolve().await?;
if chunks.insert(chunk) {
for r in chunk.references().await?.iter() {
if let Some(pc) = ParallelChunkReferenceVc::resolve_from(r).await? {
if *pc.is_loaded_in_parallel().await? {
let result = r.resolve_reference();
for a in result.primary_assets().await?.iter() {
if let Some(chunk) = ChunkVc::resolve_from(a).await? {
queue.push(chunk);
}
}
}
}
async fn reference_to_chunks(
r: AssetReferenceVc,
) -> Result<impl Iterator<Item = ChunkVc> + Send> {
let mut result = Vec::new();
if let Some(pc) = ParallelChunkReferenceVc::resolve_from(r).await? {
if *pc.is_loaded_in_parallel().await? {
result = r
.resolve_reference()
.primary_assets()
.await?
.iter()
.map(|r| async move { Ok(ChunkVc::resolve_from(r).await?) })
.try_join()
.await?;
}
}
Ok(result.into_iter().flatten())
}

// async fn get_chunk_children(
// chunk: ChunkVc,
// ) -> Result<Flatten<IntoIter<Flatten<IntoIter<Option<ChunkVc>>>>>> {
async fn get_chunk_children(
chunk: ChunkVc,
) -> Result<impl Iterator<Item = ChunkVc> + Send> {
Ok(chunk
.references()
.await?
.iter()
.copied()
.map(reference_to_chunks)
.try_join()
.await?
.into_iter()
.flatten())
}

let chunks = [self.await?.entry]
.into_iter()
.try_flat_map_recursive_join(get_chunk_children)
.await?;

let chunks = ChunksVc::cell(chunks.into_iter().collect());
let chunks = optimize(chunks, self);

Expand Down
16 changes: 7 additions & 9 deletions crates/turbopack-core/src/resolve/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,15 +314,13 @@ impl ValueToString for Request {
} => format!("uri \"{protocol}\" \"{remainder}\""),
Request::Unknown { path } => format!("unknown {path}"),
Request::Dynamic => "dynamic".to_string(),
Request::Alternatives { requests } => requests
.iter()
.map(|i| i.to_string())
.try_join()
.await?
.into_iter()
.map(|r| r.clone())
.collect::<Vec<_>>()
.join(" or "),
Request::Alternatives { requests } => {
let vec = requests.iter().map(|i| i.to_string()).try_join().await?;
vec.iter()
.map(|r| r.as_str())
.collect::<Vec<_>>()
.join(" or ")
}
}))
}
}
Loading