[Turbopack] enable new backend #69669

Draft · wants to merge 8 commits into canary
11 changes: 11 additions & 0 deletions packages/next/src/build/index.ts
@@ -1676,11 +1676,20 @@ export default async function build(
   let shutdownPromise = Promise.resolve()
   if (!isGenerateMode) {
     if (turboNextBuild) {
+      const start = Date.now()
       const {
         duration: compilerDuration,
         shutdownPromise: p,
         ...rest
       } = await turbopackBuild()
+      const duration = Date.now() - start
+      const msg =
+        duration > 2000
+          ? `Turbopack build: ${Math.round(duration / 10) / 100}s`
+          : `Turbopack build: ${Math.round(duration)}ms`
+      console.log(`\x1b[1;91;103m ${' '.repeat(msg.length)} \x1b[0m`)
+      console.log(`\x1b[1;91;103m ${msg} \x1b[0m`)
+      console.log(`\x1b[1;91;103m ${' '.repeat(msg.length)} \x1b[0m`)
       shutdownPromise = p
       traceMemoryUsage('Finished build', nextBuildSpan)
@@ -1705,6 +1714,7 @@
           buildStage: 'compile-server',
         })

+        console.time('webpack build')
         const serverBuildPromise = webpackBuild(useBuildWorker, [
           'server',
         ]).then((res) => {
@@ -1772,6 +1782,7 @@
           durationInSeconds += res.duration
           traceMemoryUsage('Finished client compilation', nextBuildSpan)
         })
+        console.timeEnd('webpack build')

       Log.event('Compiled successfully')

3 changes: 3 additions & 0 deletions packages/next/src/server/dev/turbopack-utils.ts
@@ -1164,6 +1164,9 @@ export function isPersistentCachingEnabled(
   config: NextConfigComplete
 ): boolean {
   const unstableValue = config.experimental.turbo?.unstablePersistentCaching
+  if (unstableValue === undefined) {
+    return true
+  }
   if (typeof unstableValue === 'number' && unstableValue > 1) {
     throw new Error(
       'Persistent caching in this version of Turbopack is not as stable as expected. Upgrade to a newer version of Turbopack to use this feature with the expected stability.'
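Note the semantics of this addition: an unset `unstablePersistentCaching` now means persistent caching is enabled by default, rather than falling through to the checks below; those checks only run when a value is explicitly configured.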
7 changes: 5 additions & 2 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
@@ -795,8 +795,11 @@ impl TurboTasksBackendInner {
     }

     fn get_task_description(&self, task: TaskId) -> std::string::String {
-        let task_type = self.lookup_task_type(task).expect("Task not found");
-        task_type.to_string()
+        if let Some(task_type) = self.lookup_task_type(task) {
+            task_type.to_string()
+        } else {
+            format!("{task:?} transient")
+        }
     }

     fn try_get_function_id(&self, task_id: TaskId) -> Option<FunctionId> {
turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs
@@ -2,6 +2,7 @@ use std::{cmp::max, collections::VecDeque, num::NonZeroU32};

 use serde::{Deserialize, Serialize};
 use smallvec::SmallVec;
+use tracing::Span;
 use turbo_tasks::{SessionId, TaskId};

 use crate::{
@@ -316,7 +317,8 @@ impl AggregatedDataUpdate

 #[derive(Default, Serialize, Deserialize, Clone)]
 pub struct AggregationUpdateQueue {
-    jobs: VecDeque<AggregationUpdateJob>,
+    #[serde(skip)]
+    jobs: VecDeque<(Span, AggregationUpdateJob)>,
 }

 impl AggregationUpdateQueue {
@@ -331,11 +333,12 @@
     }

     pub fn push(&mut self, job: AggregationUpdateJob) {
-        self.jobs.push_back(job);
+        self.jobs.push_back((Span::current(), job));
     }

     pub fn extend(&mut self, jobs: impl IntoIterator<Item = AggregationUpdateJob>) {
-        self.jobs.extend(jobs);
+        self.jobs
+            .extend(jobs.into_iter().map(|job| (Span::current(), job)));
     }

     pub fn run(job: AggregationUpdateJob, ctx: &mut ExecuteContext<'_>) {
@@ -345,7 +348,9 @@
     }

     pub fn process(&mut self, ctx: &mut ExecuteContext<'_>) -> bool {
-        if let Some(job) = self.jobs.pop_front() {
+        if let Some((span, job)) = self.jobs.pop_front() {
+            let span = span.entered();
+            let span = &*span;
             match job {
                 AggregationUpdateJob::UpdateAggregationNumber {
                     task_id,
@@ -366,50 +371,56 @@
                     if upper_ids.len() > new_follower_ids.len() {
                         if let Some(new_follower_id) = new_follower_ids.pop() {
                             if new_follower_ids.is_empty() {
-                                self.jobs.push_front(
+                                self.jobs.push_front((
+                                    span.clone(),
                                     AggregationUpdateJob::InnerOfUppersHasNewFollower {
                                         upper_ids,
                                         new_follower_id,
                                     },
-                                );
+                                ));
                             } else {
-                                self.jobs.push_front(
+                                self.jobs.push_front((
+                                    span.clone(),
                                     AggregationUpdateJob::InnerOfUppersHasNewFollowers {
                                         upper_ids: upper_ids.clone(),
                                         new_follower_ids,
                                     },
-                                );
-                                self.jobs.push_front(
+                                ));
+                                self.jobs.push_front((
+                                    span.clone(),
                                     AggregationUpdateJob::InnerOfUppersHasNewFollower {
                                         upper_ids,
                                         new_follower_id,
                                     },
-                                );
+                                ));
                             }
                         }
                     } else {
                         #[allow(clippy::collapsible_if, reason = "readablility")]
                         if let Some(upper_id) = upper_ids.pop() {
                             if upper_ids.is_empty() {
-                                self.jobs.push_front(
+                                self.jobs.push_front((
+                                    span.clone(),
                                     AggregationUpdateJob::InnerOfUpperHasNewFollowers {
                                         upper_id,
                                         new_follower_ids,
                                     },
-                                );
+                                ));
                             } else {
-                                self.jobs.push_front(
+                                self.jobs.push_front((
+                                    span.clone(),
                                     AggregationUpdateJob::InnerOfUppersHasNewFollowers {
                                         upper_ids,
                                         new_follower_ids: new_follower_ids.clone(),
                                     },
-                                );
-                                self.jobs.push_front(
+                                ));
+                                self.jobs.push_front((
+                                    span.clone(),
                                     AggregationUpdateJob::InnerOfUpperHasNewFollowers {
                                         upper_id,
                                         new_follower_ids,
                                     },
-                                );
+                                ));
                             }
                         }
                     }
@@ -432,22 +443,28 @@
                 } => {
                     if let Some(lost_follower_id) = lost_follower_ids.pop() {
                         if lost_follower_ids.is_empty() {
-                            self.jobs
-                                .push_front(AggregationUpdateJob::InnerLostFollower {
+                            self.jobs.push_front((
+                                span.clone(),
+                                AggregationUpdateJob::InnerLostFollower {
                                     upper_ids,
                                     lost_follower_id,
-                                });
+                                },
+                            ));
                         } else {
-                            self.jobs
-                                .push_front(AggregationUpdateJob::InnerLostFollowers {
+                            self.jobs.push_front((
+                                span.clone(),
+                                AggregationUpdateJob::InnerLostFollowers {
                                     upper_ids: upper_ids.clone(),
                                     lost_follower_ids,
-                                });
-                            self.jobs
-                                .push_front(AggregationUpdateJob::InnerLostFollower {
+                                },
+                            ));
+                            self.jobs.push_front((
+                                span.clone(),
+                                AggregationUpdateJob::InnerLostFollower {
                                     upper_ids,
                                     lost_follower_id,
-                                });
+                                },
+                            ));
                         }
                     }
                 }
@@ -625,7 +642,14 @@
         update: AggregatedDataUpdate,
     ) {
         for upper_id in upper_ids {
+            let desc = ctx.backend.get_task_description(upper_id);
             let mut upper = ctx.task(upper_id, TaskDataCategory::Meta);
+            let _span = tracing::trace_span!(
+                "aggregated_data_update",
+                task = desc,
+                n = get_aggregation_number(&upper),
+            )
+            .entered();
             let diff = update.apply(&mut upper, ctx.session_id(), self);
             if !diff.is_empty() {
                 let upper_ids = get_uppers(&upper);
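The queue change above threads tracing context through deferred work: each queued `AggregationUpdateJob` is stored together with the `Span` that was current when it was enqueued, and `process` re-enters that span before running the job, so trace events emitted during processing are attributed to the operation that scheduled it. A minimal, self-contained sketch of the pattern (the `Job` and `Queue` types and the `tracing-subscriber` setup are illustrative stand-ins, not the backend's actual API):

```rust
use std::collections::VecDeque;

use tracing::Span;

// Illustrative stand-in for AggregationUpdateJob.
struct Job(u32);

#[derive(Default)]
struct Queue {
    // Each job remembers the span that was current when it was enqueued.
    jobs: VecDeque<(Span, Job)>,
}

impl Queue {
    fn push(&mut self, job: Job) {
        // Capture the enqueuing context.
        self.jobs.push_back((Span::current(), job));
    }

    fn process_one(&mut self) -> bool {
        if let Some((span, Job(n))) = self.jobs.pop_front() {
            // Re-enter the originating span; events below are attributed to it.
            let _guard = span.entered();
            tracing::trace!(n, "processing deferred job");
            true
        } else {
            false
        }
    }
}

fn main() {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::TRACE)
        .init();

    let mut queue = Queue::default();
    {
        let _enqueue = tracing::info_span!("enqueue_phase").entered();
        queue.push(Job(1)); // captured under "enqueue_phase"
    }
    while queue.process_one() {} // still reported under "enqueue_phase"
}
```

The `#[serde(skip)]` added to `jobs` fits this design: `Span` is not serializable, so a persisted queue deserializes as empty.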
turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs
@@ -125,6 +125,11 @@ pub fn make_task_dirty_internal(
         }
         _ => unreachable!(),
     };
+    let _span = tracing::trace_span!(
+        "invalidate",
+        task = ctx.backend.get_task_description(task_id)
+    )
+    .entered();
     let aggregated_update = dirty_container.update_with_dirty_state(&DirtyState {
         clean_in_session: None,
     });
13 changes: 13 additions & 0 deletions turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs
@@ -135,6 +135,19 @@ impl<'a> ExecuteContext<'a> {
         task_id: TaskId,
         category: TaskDataCategory,
     ) -> Vec<CachedDataItem> {
+        let _span = match category {
+            TaskDataCategory::Data => tracing::trace_span!(
+                "restore task data",
+                task = self.backend.get_task_description(task_id)
+            ),
+            TaskDataCategory::Meta => tracing::trace_span!(
+                "restore task meta",
+                task = self.backend.get_task_description(task_id)
+            ),
+            _ => unreachable!(),
+        }
+        .entered();
+
         // Safety: `transaction` is a valid transaction from `self.backend.backing_storage`.
         unsafe {
             self.backend
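The guard returned by `.entered()` stays active until it is dropped at the end of the scope, which is what makes the category-dependent span above cover the whole restore call. A small sketch of that select-then-enter pattern (the `Category` enum and `restore` function are hypothetical stand-ins):

```rust
use tracing::trace_span;

enum Category {
    Data,
    Meta,
}

fn restore(task: u64, category: Category) {
    // Pick the span by category, then enter it once; the `_span` guard
    // keeps it active until the end of the function.
    let _span = match category {
        Category::Data => trace_span!("restore task data", task),
        Category::Meta => trace_span!("restore task meta", task),
    }
    .entered();

    // ... the actual restore work would run inside the span here ...
}
```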
2 changes: 1 addition & 1 deletion turbopack/crates/turbo-tasks/Cargo.toml
@@ -9,7 +9,7 @@ edition = "2021"
 bench = false

 [features]
-default = []
+default = ["hanging_detection"]
 tokio_tracing = ["tokio/tracing"]
 hanging_detection = []

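This makes the previously opt-in `hanging_detection` feature part of the default feature set for `turbo-tasks`, presumably as a debugging aid while the new backend is validated in this draft.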
21 changes: 14 additions & 7 deletions turbopack/crates/turbopack-node/src/evaluate.rs
@@ -12,6 +12,7 @@ use indexmap::indexmap;
 use parking_lot::Mutex;
 use serde::{de::DeserializeOwned, Deserialize, Serialize};
 use serde_json::Value as JsonValue;
+use tracing::{trace_span, Instrument};
 use turbo_tasks::{
     duration_span, mark_finished, prevent_gc, util::SharedError, Completion, RawVc, TaskInput,
     TryJoinIterExt, Value, Vc,
@@ -308,6 +309,7 @@
     })
 }

+#[tracing::instrument(skip_all)]
 pub async fn compute(
     evaluate_context: impl EvaluateContext,
     sender: Vc<JavaScriptStreamSender>,
@@ -324,9 +326,9 @@

         // Read this strongly consistent, since we don't want to run inconsistent
         // node.js code.
-        let pool = pool.strongly_consistent().await?;
+        let pool = pool.strongly_consistent().instrument(tracing::trace_span!("pool")).await?;

-        let args = evaluate_context.args().iter().try_join().await?;
+        let args = evaluate_context.args().iter().try_join().instrument(tracing::trace_span!("args")).await?;
         // Assume this is a one-off operation, so we can kill the process
         // TODO use a better way to decide that.
         let kill = !evaluate_context.keep_alive();
@@ -348,14 +350,15 @@
             },
             PoolErrorHandler,
         )
+        .instrument(trace_span!("evaluate"))
         .await
         .map_err(|(e, _)| e)?;

         // The evaluation sent an initial intermediate value without completing. We'll
         // need to spawn a new thread to continually pull data out of the process,
         // and ferry that along.
         loop {
-            let output = pull_operation(&mut operation, &pool, &evaluate_context, &mut state).await?;
+            let output = pull_operation(&mut operation, &pool, &evaluate_context, &mut state).instrument(trace_span!("pull")).await?;

             match output {
                 LoopResult::Continue(data) => {
@@ -376,11 +379,14 @@
             }
         }

-        evaluate_context.finish(state, &pool).await?;
+        async move {
+            evaluate_context.finish(state, &pool).await?;

-        if kill {
-            operation.wait_or_kill().await?;
-        }
+            if kill {
+                operation.wait_or_kill().await?;
+            }
+            anyhow::Ok(())
+        }.instrument(tracing::trace_span!("finish")).await?;
     };

     let mut sender = (sender.get)();
Expand All @@ -399,6 +405,7 @@ pub async fn compute(

/// Repeatedly pulls from the NodeJsOperation until we receive a
/// value/error/end.
#[tracing::instrument(skip_all)]
async fn pull_operation<T: EvaluateContext>(
operation: &mut NodeJsOperation,
pool: &NodeJsPool,
Expand Down
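All of the `evaluate.rs` changes use the same primitive: `Instrument::instrument` attaches a span to a future so that every poll of that future (and everything awaited inside it) runs within the span, and an ad-hoc `async move { ... }` block groups `finish` and the optional kill under a single span. A runnable sketch of the idea; `tokio`, `anyhow`, and `tracing-subscriber` are assumed here purely for the example and are not part of the PR:

```rust
use tracing::{trace_span, Instrument};

async fn step(label: &'static str) {
    // Inherits whatever span the enclosing future was instrumented with.
    tracing::trace!(label, "working");
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::TRACE)
        .init();

    // Each awaited sub-step gets its own span, mirroring the PR's
    // "pool" / "args" / "evaluate" / "pull" / "finish" spans.
    step("pool").instrument(trace_span!("pool")).await;
    step("args").instrument(trace_span!("args")).await;

    // An ad-hoc async block can be instrumented the same way; returning
    // `anyhow::Ok(())` gives the block a concrete result type for `?`.
    async {
        step("finish").await;
        anyhow::Ok(())
    }
    .instrument(trace_span!("finish"))
    .await?;

    Ok(())
}
```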