[Turbopack] cleanup and documentation (#73496)
### What?

* avoid flagging "thread" span as incomplete
* improve error message
* add comments
* fixup print_stats
* fix lmdb feature
sokra authored Dec 10, 2024
1 parent 74671c1 commit 778ee4f
Showing 4 changed files with 69 additions and 9 deletions.
2 changes: 1 addition & 1 deletion crates/next-core/src/util.rs
@@ -915,7 +915,7 @@ pub async fn load_next_js_templateon<T: DeserializeOwned>(
let content = &*file_path.read().await?;

let FileContent::Content(file) = content else {
bail!("Expected file content at {}", path);
bail!("Expected file content at {}", file_path.to_string().await?);
};

let result: T = parse_json_rope_with_source_context(file.content())?;
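To make the intent of the one-line change above concrete, here is a minimal, self-contained sketch of the pattern — bailing with the fully resolved path rather than the raw argument — using `anyhow` and a hypothetical stand-in enum and path (not the real turbo-tasks `FileContent` type):

```rust
use anyhow::{bail, Result};

// Hypothetical stand-in for the file-content enum used above; not the real
// turbo-tasks `FileContent` type.
enum FileContent {
    Content(String),
    NotFound,
}

fn load(resolved_path: &str, content: FileContent) -> Result<String> {
    let FileContent::Content(file) = content else {
        // Formatting the *resolved* path (instead of the caller-supplied
        // argument) makes the error point at the file that was actually read.
        bail!("Expected file content at {}", resolved_path);
    };
    Ok(file)
}

fn main() {
    let err = load("/repo/templates/app-page.js", FileContent::NotFound).unwrap_err();
    println!("{err}");
}
```

The benefit is purely diagnostic: the error now names the file that was actually read.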
2 changes: 1 addition & 1 deletion turbopack/crates/turbo-persistence/src/db.rs
@@ -295,7 +295,7 @@ impl TurboPersistence {
.into_iter()
.map(|seq| self.open_sst(seq))
.collect::<Result<Vec<StaticSortedFile>>>()?;
#[cfg(feature = "stats")]
#[cfg(feature = "print_stats")]
{
for sst in sst_files.iter() {
let crate::static_sorted_file::StaticSortedFileRange {
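For context on the `print_stats` rename above, a small sketch (the feature name and strings here are illustrative) of how Cargo feature gating behaves: a `#[cfg(feature = "...")]` block whose feature name is never declared or enabled is silently compiled out, which is why pointing the gate at the declared feature matters.

```rust
// Illustrative only: shows how a Cargo feature gate behaves. Whether the
// feature is enabled depends on Cargo.toml and the build flags.
fn report_stats() {
    #[cfg(feature = "print_stats")]
    {
        // Only compiled when the crate is built with `--features print_stats`.
        println!("collected SST statistics");
    }

    #[cfg(not(feature = "print_stats"))]
    {
        // A gate that never matches an enabled feature compiles to this branch
        // instead, so the stats code would simply never run.
        println!("statistics disabled at compile time");
    }
}

fn main() {
    report_stats();
}
```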
@@ -35,14 +35,17 @@ use crate::{
pub const LEAF_NUMBER: u32 = 16;
const MAX_COUNT_BEFORE_YIELD: usize = 1000;

/// Returns true when a node is aggregating its children and a partial subgraph.
pub fn is_aggregating_node(aggregation_number: u32) -> bool {
aggregation_number >= LEAF_NUMBER
}

/// Returns true when a node is aggregating the whole subgraph.
pub fn is_root_node(aggregation_number: u32) -> bool {
aggregation_number == u32::MAX
}
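To make the threshold semantics of the two predicates above concrete, here is a small self-contained sketch (mirroring the constant and functions shown here; the real task types are omitted):

```rust
// Self-contained sketch mirroring the two predicates above.
const LEAF_NUMBER: u32 = 16;

fn is_aggregating_node(aggregation_number: u32) -> bool {
    aggregation_number >= LEAF_NUMBER
}

fn is_root_node(aggregation_number: u32) -> bool {
    aggregation_number == u32::MAX
}

fn main() {
    // Below the threshold: a plain leaf task that aggregates nothing itself.
    assert!(!is_aggregating_node(3));
    // At or above the threshold: the task also aggregates a partial subgraph.
    assert!(is_aggregating_node(LEAF_NUMBER));
    // u32::MAX marks a root aggregation that covers the whole subgraph.
    assert!(is_root_node(u32::MAX) && is_aggregating_node(u32::MAX));
}
```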

/// Returns a list of tasks that are considered as "following" the task.
fn get_followers_with_aggregation_number(
task: &impl TaskGuard,
aggregation_number: u32,
@@ -54,71 +57,89 @@ fn get_followers_with_aggregation_number(
}
}

/// Returns a list of tasks that are considered as "following" the task. The current task is not
/// aggregating over the follower tasks and they should be aggregated by all upper tasks.
fn get_followers(task: &impl TaskGuard) -> Vec<TaskId> {
get_followers_with_aggregation_number(task, get_aggregation_number(task))
}

/// Returns a list of tasks that are considered as "upper" tasks of the task. The upper tasks are
/// aggregating over the task.
pub fn get_uppers(task: &impl TaskGuard) -> Vec<TaskId> {
get_many!(task, Upper { task } count if *count > 0 => *task)
}

/// Returns an iterator of tasks that are considered as "upper" tasks of the task. See `get_uppers`.
fn iter_uppers<'a>(task: &'a (impl TaskGuard + 'a)) -> impl Iterator<Item = TaskId> + 'a {
iter_many!(task, Upper { task } count if *count > 0 => *task)
}

/// Returns the aggregation number of the task.
pub fn get_aggregation_number(task: &impl TaskGuard) -> u32 {
get!(task, AggregationNumber)
.map(|a| a.effective)
.unwrap_or_default()
}

/// A job in the job queue for updating the aggregated graph.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum AggregationUpdateJob {
/// Update the aggregation number of a task. This might require balancing to update
/// "upper" and "follower" edges.
UpdateAggregationNumber {
task_id: TaskId,
base_aggregation_number: u32,
distance: Option<NonZeroU32>,
},
/// Notifies an upper task that one of its inner tasks has a new follower.
InnerOfUpperHasNewFollower {
upper_id: TaskId,
new_follower_id: TaskId,
},
/// Notifies multiple upper tasks that one of their inner tasks has a new follower.
InnerOfUppersHasNewFollower {
upper_ids: Vec<TaskId>,
new_follower_id: TaskId,
},
/// Notifies an upper task that one of its inner tasks has new followers.
InnerOfUpperHasNewFollowers {
upper_id: TaskId,
new_follower_ids: Vec<TaskId>,
},
/// Notifies multiple upper tasks that one of their inner tasks has new followers.
InnerOfUppersHasNewFollowers {
upper_ids: Vec<TaskId>,
new_follower_ids: Vec<TaskId>,
},
/// Notifies multiple upper tasks that one of their inner tasks has lost a follower.
InnerOfUppersLostFollower {
upper_ids: Vec<TaskId>,
lost_follower_id: TaskId,
},
/// Notifies multiple upper tasks that one of their inner tasks has lost followers.
InnerOfUppersLostFollowers {
upper_ids: Vec<TaskId>,
lost_follower_ids: Vec<TaskId>,
},
/// Notifies an upper task that one of its inner tasks has lost followers.
InnerOfUpperLostFollowers {
upper_id: TaskId,
lost_follower_ids: Vec<TaskId>,
},
/// Notifies an upper task about changed data from an inner task.
AggregatedDataUpdate {
upper_ids: Vec<TaskId>,
update: AggregatedDataUpdate,
},
/// Invalidates tasks that are dependent on a collectible type.
InvalidateDueToCollectiblesChange {
task_ids: SmallVec<[TaskId; 4]>,
collectible_type: TraitTypeId,
},
- BalanceEdge {
- upper_id: TaskId,
- task_id: TaskId,
- },
+ /// Balances the edges of the graph. This checks if the graph invariant is still met for this
+ /// edge and converts an upper edge to a follower edge or vice versa. Balancing might trigger
+ /// more changes to the structure.
+ BalanceEdge { upper_id: TaskId, task_id: TaskId },
}
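The `…Uppers…Followers` variants above batch several upper ids and follower ids into a single queued item; a quick toy illustration (plain arithmetic, not the real queue or `TaskId` type) of the fan-out one batched job avoids:

```rust
// Toy arithmetic only; the real queue stores TaskIds and deduplicates jobs.
fn main() {
    let upper_ids = vec![1u32, 2, 3, 4];
    let new_follower_ids = vec![10u32, 11, 12];

    // Without the batched variants, every (upper, follower) pair would need
    // its own queue entry.
    let unbatched_jobs = upper_ids.len() * new_follower_ids.len();

    // A single InnerOfUppersHasNewFollowers-style job carries both lists.
    let batched_jobs = 1;

    assert_eq!(unbatched_jobs, 12);
    assert!(batched_jobs < unbatched_jobs);
}
```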

impl AggregationUpdateJob {
@@ -138,13 +159,18 @@ impl AggregationUpdateJob {
}
}

/// Aggregated data update.
#[derive(Default, Serialize, Deserialize, Clone, Debug)]
pub struct AggregatedDataUpdate {
/// One of the inner tasks has changed its dirty state or aggregated dirty state.
dirty_container_update: Option<(TaskId, DirtyContainerCount)>,
/// One of the inner tasks has changed its collectibles count or aggregated collectibles count.
collectibles_update: Vec<(CollectibleRef, i32)>,
}

impl AggregatedDataUpdate {
/// Derives an `AggregatedDataUpdate` from a task. This is used when a task is connected to an
/// upper task.
fn from_task(task: &mut impl TaskGuard) -> Self {
let aggregation = get_aggregation_number(task);
let mut dirty_container_count = Default::default();
@@ -187,6 +213,7 @@ impl AggregatedDataUpdate {
result
}

/// Inverts the update. This is used when the task is removed from an upper task.
fn invert(mut self) -> Self {
let Self {
dirty_container_update,
@@ -201,6 +228,8 @@ impl AggregatedDataUpdate {
self
}

/// Applies the update to the task. It may return an aggregated update that should be applied to
/// upper tasks.
fn apply(
&self,
task: &mut impl TaskGuard,
@@ -312,6 +341,7 @@ impl AggregatedDataUpdate {
result
}

/// Returns true when the update is empty, i.e. when it is a no-op.
fn is_empty(&self) -> bool {
let Self {
dirty_container_update,
@@ -320,24 +350,28 @@ impl AggregatedDataUpdate {
dirty_container_update.is_none() && collectibles_update.is_empty()
}

/// Creates a new empty update.
pub fn new() -> Self {
Self {
dirty_container_update: None,
collectibles_update: Vec::new(),
}
}

/// Adds a dirty container update to the update.
pub fn dirty_container_update(mut self, task_id: TaskId, count: DirtyContainerCount) -> Self {
self.dirty_container_update = Some((task_id, count));
self
}

/// Adds a collectibles update to the update.
pub fn collectibles_update(mut self, collectibles_update: Vec<(CollectibleRef, i32)>) -> Self {
self.collectibles_update = collectibles_update;
self
}
}

/// An aggregation number update job that is enqueued.
#[derive(Serialize, Deserialize, Clone)]
struct AggregationNumberUpdate {
base_aggregation_number: u32,
@@ -347,6 +381,7 @@ struct AggregationNumberUpdate {
span: Option<Span>,
}

/// An aggregation update job that is enqueued. See `AggregationUpdateJob`.
#[derive(Serialize, Deserialize, Clone)]
struct AggregationUpdateJobItem {
job: AggregationUpdateJob,
@@ -379,6 +414,7 @@ struct AggregationUpdateJobGuard {
_guard: Option<tracing::span::EnteredSpan>,
}

/// A balancing job that is enqueued. See `balance_edge`.
#[derive(Serialize, Deserialize, Clone)]
struct BalanceJob {
upper_id: TaskId,
@@ -414,6 +450,7 @@ impl PartialEq for BalanceJob {

impl Eq for BalanceJob {}

/// An optimization job that is enqueued. See `optimize_task`.
#[derive(Serialize, Deserialize, Clone)]
struct OptimizeJob {
task_id: TaskId,
@@ -479,6 +516,7 @@ impl PartialEq for FindAndScheduleJob {

impl Eq for FindAndScheduleJob {}

/// A queue for aggregation update jobs.
#[derive(Default, Serialize, Deserialize, Clone)]
pub struct AggregationUpdateQueue {
jobs: VecDeque<AggregationUpdateJobItem>,
@@ -491,6 +529,7 @@ pub struct AggregationUpdateQueue {
}

impl AggregationUpdateQueue {
/// Creates a new empty queue.
pub fn new() -> Self {
Self {
jobs: VecDeque::with_capacity(8),
@@ -503,10 +542,12 @@ impl AggregationUpdateQueue {
}
}

/// Returns true when the queue is empty.
pub fn is_empty(&self) -> bool {
self.jobs.is_empty()
}

/// Pushes a job to the queue.
pub fn push(&mut self, job: AggregationUpdateJob) {
match job {
AggregationUpdateJob::UpdateAggregationNumber {
@@ -565,19 +606,22 @@ impl AggregationUpdateQueue {
}
}

/// Extends the queue with multiple jobs.
pub fn extend(&mut self, jobs: impl IntoIterator<Item = AggregationUpdateJob>) {
for job in jobs {
self.push(job);
}
}

/// Pushes a job to find and schedule dirty tasks.
pub fn push_find_and_schedule_dirty(&mut self, task_id: TaskId) {
if !self.done_find_and_schedule.contains(&task_id) {
self.find_and_schedule
.insert_back(FindAndScheduleJob::new(task_id));
}
}

/// Extends the queue with multiple jobs to find and schedule dirty tasks.
pub fn extend_find_and_schedule_dirty(&mut self, task_ids: impl IntoIterator<Item = TaskId>) {
self.find_and_schedule.extend(
task_ids
@@ -587,17 +631,21 @@ impl AggregationUpdateQueue {
);
}

- pub fn push_optimize_task(&mut self, task_id: TaskId) {
+ /// Pushes a job to optimize a task.
+ fn push_optimize_task(&mut self, task_id: TaskId) {
self.optimize_queue.insert_back(OptimizeJob::new(task_id));
}

/// Runs the job and all dependent jobs until it's done. It can persist the operation, so code
/// following the call might not be executed when the operation is persisted.
pub fn run(job: AggregationUpdateJob, ctx: &mut impl ExecuteContext) {
debug_assert!(ctx.should_track_children());
let mut queue = Self::new();
queue.push(job);
queue.execute(ctx);
}

/// Executes a single step of the queue. Returns true when the queue is empty.
pub fn process(&mut self, ctx: &mut impl ExecuteContext) -> bool {
if let Some(job) = self.jobs.pop_front() {
let job = job.entered();
@@ -813,6 +861,11 @@ impl AggregationUpdateQueue {
}
}

/// Balances the edge between two tasks. This checks if the graph invariant is still met for
/// this edge and converts an upper edge to a follower edge or vice versa. Balancing might
/// trigger more changes to the structure.
///
/// It locks both tasks simultaneously to atomically change the edges.
fn balance_edge(&mut self, ctx: &mut impl ExecuteContext, upper_id: TaskId, task_id: TaskId) {
#[cfg(feature = "trace_aggregation_update")]
let _span = trace_span!("process balance edge").entered();
@@ -948,6 +1001,9 @@ impl AggregationUpdateQueue {
}
}
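As a rough model of the balancing rule described in the comment above — the exact comparison and bookkeeping in Turbopack are more involved, so the rule below is an assumption for illustration only — converting between the two edge kinds based on relative aggregation numbers might look like:

```rust
/// Toy edge kinds; the real backend stores these as counted entries on tasks.
#[derive(Debug, PartialEq)]
enum Edge {
    /// The upper task aggregates the inner task.
    Upper,
    /// The upper task only "follows" the inner task; aggregation happens higher up.
    Follower,
}

/// Assumed rule for illustration: keep an upper edge only while the upper task
/// has a strictly higher aggregation number than the inner task.
fn balanced_edge(upper_aggregation_number: u32, task_aggregation_number: u32) -> Edge {
    if upper_aggregation_number > task_aggregation_number {
        Edge::Upper
    } else {
        Edge::Follower
    }
}

fn main() {
    // A root-like upper (u32::MAX) keeps its upper edge to any inner task.
    assert_eq!(balanced_edge(u32::MAX, 16), Edge::Upper);
    // Equal aggregation numbers fall back to a follower edge.
    assert_eq!(balanced_edge(16, 16), Edge::Follower);
}
```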

/// Schedules the task if it's dirty.
///
/// For aggregating nodes that are
fn find_and_schedule_dirty(&mut self, task_id: TaskId, ctx: &mut impl ExecuteContext) {
#[cfg(feature = "trace_find_and_schedule")]
let _span = trace_span!(
@@ -1593,6 +1649,10 @@ impl AggregationUpdateQueue {
}
}

/// Checks a task for optimization. Optimization ensures that the aggregation number is bigger
/// than the number of upper edges. Increasing the aggregation number reduces the number of upper
/// edges, as it places the task in a bigger aggregation group. We want to avoid having too many
/// upper edges as this amplifies the updates needed when changes to that task occur.
fn optimize_task(&mut self, ctx: &mut impl ExecuteContext<'_>, task_id: TaskId) {
#[cfg(feature = "trace_aggregation_update")]
let _span = trace_span!("optimize").entered();
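A toy sketch of the optimization idea documented above (the threshold and policy below are assumptions for illustration, not Turbopack's actual heuristic in `optimize_task`): raise the aggregation number once a task accumulates more upper edges than its aggregation number allows, so updates stop fanning out to that many aggregated nodes.

```rust
// Assumed heuristic for illustration only; the real policy differs in detail.
fn needs_optimization(upper_edge_count: usize, aggregation_number: u32) -> bool {
    upper_edge_count > aggregation_number as usize
}

fn main() {
    // 40 uppers with an aggregation number of 16: every change to this task
    // would fan out to 40 aggregated nodes, so raising the number is worthwhile.
    assert!(needs_optimization(40, 16));
    // Few uppers: leave the aggregation number alone.
    assert!(!needs_optimization(4, 16));
}
```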
4 changes: 2 additions & 2 deletions turbopack/crates/turbopack-trace-server/src/span_ref.rs
@@ -441,8 +441,8 @@ impl<'a> SpanRef<'a> {
.and_modify(|_, v| v.push(span.index()))
.or_insert_with(|| (value.to_string(), vec![span.index()]));
}
- if !span.is_complete() {
- let name = "incomplete";
+ if !span.is_complete() && !span.time_data().ignore_self_time {
+ let name = "incomplete_span";
index
.raw_entry_mut()
.from_key(name)
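The condition change above implements the "avoid flagging 'thread' span as incomplete" bullet from the commit message. A toy model of the filter (hypothetical types, not the real `SpanRef` API):

```rust
// Hypothetical types standing in for the trace-server span data.
struct Span {
    complete: bool,
    ignore_self_time: bool,
}

fn should_index_as_incomplete(span: &Span) -> bool {
    // Mirrors the new condition: spans that ignore their own self time
    // (e.g. synthetic "thread" spans) are never flagged, even without an end event.
    !span.complete && !span.ignore_self_time
}

fn main() {
    let thread_span = Span { complete: false, ignore_self_time: true };
    let user_span = Span { complete: false, ignore_self_time: false };
    assert!(!should_index_as_incomplete(&thread_span));
    assert!(should_index_as_incomplete(&user_span));
}
```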
