Skip to content

Commit

Permalink
setup aggregation number before connecting children
Browse files Browse the repository at this point in the history
  • Loading branch information
sokra committed Feb 3, 2025
1 parent f1f8518 commit fdaad90
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 70 deletions.
96 changes: 74 additions & 22 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use auto_hash_map::{AutoMap, AutoSet};
use dashmap::DashMap;
use parking_lot::{Condvar, Mutex};
use rustc_hash::FxHasher;
use smallvec::smallvec;
use smallvec::{smallvec, SmallVec};
use tokio::time::{Duration, Instant};
use turbo_tasks::{
backend::{
Expand All @@ -42,10 +42,10 @@ use crate::backend::operation::TaskDirtyCause;
use crate::{
backend::{
operation::{
connect_children, get_aggregation_number, is_root_node, AggregatedDataUpdate,
AggregationUpdateJob, AggregationUpdateQueue, CleanupOldEdgesOperation,
ConnectChildOperation, ExecuteContext, ExecuteContextImpl, Operation, OutdatedEdge,
TaskGuard,
connect_children, get_aggregation_number, is_root_node, prepare_new_children,
AggregatedDataUpdate, AggregationUpdateJob, AggregationUpdateQueue,
CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
Operation, OutdatedEdge, TaskGuard,
},
persisted_storage_log::PersistedStorageLog,
storage::{get, get_many, get_mut, get_mut_or_insert_with, iter_many, remove, Storage},
Expand Down Expand Up @@ -1208,23 +1208,22 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
let mut removed_data = Vec::new();
let mut old_edges = Vec::new();

// Connect children
{
for old_child in iter_many!(task, Child { task } => task) {
if !new_children.remove(&old_child) {
old_edges.push(OutdatedEdge::Child(old_child));
}
}
// Prepare all new children
prepare_new_children(task_id, &mut task, &new_children, &mut queue);

let has_active_count =
get!(task, Activeness).map_or(false, |activeness| activeness.active_counter > 0);
connect_children(
task_id,
&mut task,
new_children,
&mut queue,
has_active_count,
);
// Filter actual new children
let mut kept_children = SmallVec::new();
for old_child in iter_many!(task, Child { task } => task) {
if !new_children.remove(&old_child) {
old_edges.push(OutdatedEdge::Child(old_child));
} else {
kept_children.push(old_child);
}
}
if !kept_children.is_empty() {
queue.push(AggregationUpdateJob::DecreaseActiveCounts {
task_ids: kept_children,
});
}

// Remove no longer existing cells and notify in progress cells
Expand Down Expand Up @@ -1288,7 +1287,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
drop(task);

{
let _span = tracing::trace_span!("CleanupOldEdgesOperation").entered();
let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
// Remove outdated edges first, before removing in_progress+dirty flag.
// We need to make sure all outdated edges are removed before the task can potentially
// be scheduled and executed again
Expand All @@ -1299,6 +1298,59 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
// suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and
// would be executed again.

let mut task = ctx.task(task_id, TaskDataCategory::All);
let Some(in_progress) = get!(task, InProgress) else {
panic!("Task execution completed, but task is not in progress: {task:#?}");
};
let InProgressState::InProgress(box InProgressStateInner { stale, .. }) = in_progress
else {
panic!("Task execution completed, but task is not in progress: {task:#?}");
};

// If the task is stale, reschedule it
if *stale {
let Some(InProgressState::InProgress(box InProgressStateInner {
done_event,
new_children,
..
})) = remove!(task, InProgress)
else {
unreachable!();
};
task.add_new(CachedDataItem::InProgress {
value: InProgressState::Scheduled { done_event },
});

// All `new_children` are currently hold active with an active count and we need to undo
// that.
AggregationUpdateQueue::run(
AggregationUpdateJob::DecreaseActiveCounts {
task_ids: new_children.into_iter().collect(),
},
&mut ctx,
);
return true;
}

let mut queue = AggregationUpdateQueue::new();

let has_active_count =
get!(task, Activeness).map_or(false, |activeness| activeness.active_counter > 0);
connect_children(
task_id,
&mut task,
new_children,
&mut queue,
has_active_count,
);

drop(task);

{
let _span = tracing::trace_span!("connect new children").entered();
queue.execute(&mut ctx);
}

let mut task = ctx.task(task_id, TaskDataCategory::All);
let Some(in_progress) = remove!(task, InProgress) else {
panic!("Task execution completed, but task is not in progress: {task:#?}");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,15 @@
use std::{cmp::max, num::NonZeroU32};

use rustc_hash::FxHashSet;
use smallvec::SmallVec;
use turbo_tasks::TaskId;

use crate::{
backend::{
get,
operation::{
aggregation_update::InnerOfUppersHasNewFollowersJob, get_uppers, is_aggregating_node,
is_root_node, AggregationUpdateJob, AggregationUpdateQueue, TaskGuard,
},
backend::operation::{
aggregation_update::InnerOfUppersHasNewFollowersJob, get_aggregation_number, get_uppers,
is_aggregating_node, AggregationUpdateJob, AggregationUpdateQueue, TaskGuard,
},
data::CachedDataItem,
};

const AGGREGATION_NUMBER_BUFFER_SPACE: u32 = 3;

pub fn connect_children(
parent_task_id: TaskId,
parent_task: &mut impl TaskGuard,
Expand All @@ -27,45 +20,8 @@ pub fn connect_children(
if new_children.is_empty() {
return;
}
let children_count = new_children.len();

// Compute future parent aggregation number based on the number of children
let current_parent_aggregation = get!(parent_task, AggregationNumber)
.copied()
.unwrap_or_default();
let (parent_aggregation, future_parent_aggregation) =
if is_root_node(current_parent_aggregation.base) {
(u32::MAX, u32::MAX)
} else {
let target_distance = children_count.ilog2() * 2;
if target_distance > current_parent_aggregation.distance {
queue.push(AggregationUpdateJob::UpdateAggregationNumber {
task_id: parent_task_id,
base_aggregation_number: 0,
distance: NonZeroU32::new(target_distance),
})
}
(
current_parent_aggregation.effective,
current_parent_aggregation
.base
.saturating_add(max(target_distance, current_parent_aggregation.distance)),
)
};

// When the parent is a leaf node, we need to increase the aggregation number of the children to
// be counting from the parent's aggregation number.
if !is_aggregating_node(future_parent_aggregation) {
let child_base_aggregation_number =
future_parent_aggregation + AGGREGATION_NUMBER_BUFFER_SPACE;
for &new_child in new_children.iter() {
queue.push(AggregationUpdateJob::UpdateAggregationNumber {
task_id: new_child,
base_aggregation_number: child_base_aggregation_number,
distance: None,
});
}
};
let parent_aggregation = get_aggregation_number(parent_task);

for &new_child in new_children.iter() {
parent_task.add_new(CachedDataItem::Child {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod cleanup_old_edges;
mod connect_child;
mod connect_children;
mod invalidate;
mod prepare_new_children;
mod update_cell;
mod update_collectible;
mod update_output;
Expand Down Expand Up @@ -752,6 +753,7 @@ pub use self::{
},
cleanup_old_edges::OutdatedEdge,
connect_children::connect_children,
prepare_new_children::prepare_new_children,
update_cell::UpdateCellOperation,
update_collectible::UpdateCollectibleOperation,
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use std::{cmp::max, num::NonZeroU32};

use rustc_hash::FxHashSet;
use turbo_tasks::TaskId;

use crate::backend::{
get,
operation::{
is_aggregating_node, is_root_node, AggregationUpdateJob, AggregationUpdateQueue, TaskGuard,
},
};

const AGGREGATION_NUMBER_BUFFER_SPACE: u32 = 3;

pub fn prepare_new_children(
parent_task_id: TaskId,
parent_task: &mut impl TaskGuard,
new_children: &FxHashSet<TaskId>,
queue: &mut AggregationUpdateQueue,
) {
if new_children.is_empty() {
return;
}
let children_count = new_children.len();

// Compute future parent aggregation number based on the number of children
let current_parent_aggregation = get!(parent_task, AggregationNumber)
.copied()
.unwrap_or_default();
let future_parent_aggregation = if is_root_node(current_parent_aggregation.base) {
u32::MAX
} else {
let target_distance = children_count.ilog2() * 2;
if target_distance > current_parent_aggregation.distance {
queue.push(AggregationUpdateJob::UpdateAggregationNumber {
task_id: parent_task_id,
base_aggregation_number: 0,
distance: NonZeroU32::new(target_distance),
})
}
current_parent_aggregation
.base
.saturating_add(max(target_distance, current_parent_aggregation.distance))
};

// When the parent is a leaf node, we need to increase the aggregation number of the children to
// be counting from the parent's aggregation number.
if !is_aggregating_node(future_parent_aggregation) {
let child_base_aggregation_number =
future_parent_aggregation + AGGREGATION_NUMBER_BUFFER_SPACE;
for &new_child in new_children.iter() {
queue.push(AggregationUpdateJob::UpdateAggregationNumber {
task_id: new_child,
base_aggregation_number: child_base_aggregation_number,
distance: None,
});
}
};
}

0 comments on commit fdaad90

Please sign in to comment.