Skip to content

Commit

Permalink
[Turbopack] use double locking to balance edges (#70247)
Browse files Browse the repository at this point in the history
### What?

The previous approach was not completely correct and had a race
condition when data was propagated by the edge was being balanced. This
caused aggregated data to be incorrect. This caused some issues to stick
around while they shouldn't do that.

### How?

Now it locks both node at the same time to make sure to balance them in
a single lockstep.
  • Loading branch information
sokra authored Sep 19, 2024
1 parent b59fe88 commit 8e1a92b
Show file tree
Hide file tree
Showing 13 changed files with 306 additions and 593 deletions.
320 changes: 151 additions & 169 deletions turbopack/crates/turbo-tasks-memory/src/aggregation/balance_edge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,29 @@ use std::cmp::Ordering;

use super::{
balance_queue::BalanceQueue,
followers::{
add_follower_count, remove_follower_count, remove_positive_follower_count,
RemovePositveFollowerCountResult,
},
in_progress::is_in_progress,
in_progress::{is_in_progress, start_in_progress_all, start_in_progress_count},
increase::IncreaseReason,
increase_aggregation_number_internal,
uppers::{
add_upper_count, remove_positive_upper_count, remove_upper_count,
RemovePositiveUpperCountResult,
},
AggregationContext, AggregationNode,
notify_lost_follower::notify_lost_follower,
notify_new_follower::notify_new_follower,
util::{get_aggregated_add_change, get_aggregated_remove_change, get_followers_or_children},
AggregationContext, AggregationNode, PreparedInternalOperation, PreparedOperation, StackVec,
};

// Migrated followers to uppers or uppers to followers depending on the
// Migrate followers to uppers or uppers to followers depending on the
// aggregation numbers of the nodes involved in the edge. Might increase targets
// aggregation number if they are equal.
pub(super) fn balance_edge<C: AggregationContext>(
ctx: &C,
balance_queue: &mut BalanceQueue<C::NodeRef>,
upper_id: &C::NodeRef,
mut upper_aggregation_number: u32,
target_id: &C::NodeRef,
mut target_aggregation_number: u32,
) -> (u32, u32) {
// too many uppers on target
let mut extra_uppers = 0;
// too many followers on upper
let mut extra_followers = 0;
// The last info about uppers
let mut uppers_count: Option<isize> = None;
// The last info about followers
let mut followers_count = None;

) {
loop {
let (mut upper, mut target) = ctx.node_pair(upper_id, target_id);
let upper_aggregation_number = upper.aggregation_number();
let target_aggregation_number = target.aggregation_number();

let root = upper_aggregation_number == u32::MAX || target_aggregation_number == u32::MAX;
let order = if root {
Ordering::Greater
Expand All @@ -45,164 +33,158 @@ pub(super) fn balance_edge<C: AggregationContext>(
};
match order {
Ordering::Equal => {
// we probably want to increase the aggregation number of target
let upper = ctx.node(upper_id);
upper_aggregation_number = upper.aggregation_number();
drop(upper);
if upper_aggregation_number != u32::MAX
&& upper_aggregation_number == target_aggregation_number
{
let target = ctx.node(target_id);
target_aggregation_number = target.aggregation_number();
if upper_aggregation_number == target_aggregation_number {
// increase target aggregation number
increase_aggregation_number_internal(
ctx,
balance_queue,
target,
target_id,
target_aggregation_number + 1,
target_aggregation_number + 1,
IncreaseReason::EqualAggregationNumberOnBalance,
);
}
}
// increase target aggregation number
increase_aggregation_number_internal(
ctx,
balance_queue,
target,
target_id,
target_aggregation_number + 1,
target_aggregation_number + 1,
IncreaseReason::EqualAggregationNumberOnBalance,
);
}
Ordering::Less => {
// target should probably be a follower of upper
if uppers_count.map_or(false, |count| count <= 0) {
// We already removed all uppers, maybe too many
if is_in_progress(ctx, upper_id) {
drop(target);
let AggregationNode::Aggegating(aggregating) = &mut *upper else {
unreachable!();
};
aggregating
.enqueued_balancing
.push((upper_id.clone(), target_id.clone()));
drop(upper);
// Somebody else will balance this edge
break;
} else if extra_followers == 0 {
let upper = ctx.node(upper_id);
upper_aggregation_number = upper.aggregation_number();
if upper_aggregation_number < target_aggregation_number {
// target should be a follower of upper
// add some extra followers
let count = uppers_count.unwrap_or(1) as usize;
extra_followers += count;
followers_count = Some(add_follower_count(
ctx,
balance_queue,
upper,
upper_id,
target_id,
count,
true,
));
}
}

// target should be a follower of upper
let count = target
.uppers_mut()
.remove_all_positive_clonable_count(upper_id);
if count == 0 {
break;
}
let added = upper
.followers_mut()
.unwrap()
.add_clonable_count(target_id, count);

// target removed as upper
let remove_change = get_aggregated_remove_change(ctx, &target);
let followers = get_followers_or_children(ctx, &target);

let upper_uppers = if added {
// target added as follower
let uppers = upper.uppers().iter().cloned().collect::<StackVec<_>>();
start_in_progress_all(ctx, &uppers);
uppers
} else {
// we already have extra followers, remove some uppers to balance
let count = extra_followers + extra_uppers;
let target = ctx.node(target_id);
if is_in_progress(ctx, upper_id) {
drop(target);
let mut upper = ctx.node(upper_id);
if is_in_progress(ctx, upper_id) {
let AggregationNode::Aggegating(aggregating) = &mut *upper else {
unreachable!();
};
aggregating.enqueued_balancing.push((
upper_id.clone(),
upper_aggregation_number,
target_id.clone(),
target_aggregation_number,
));
drop(upper);
// Somebody else will balance this edge
return (upper_aggregation_number, target_aggregation_number);
}
} else {
let RemovePositiveUpperCountResult {
removed_count,
remaining_count,
} = remove_positive_upper_count(
ctx,
balance_queue,
target,
upper_id,
count,
);
decrease_numbers(removed_count, &mut extra_uppers, &mut extra_followers);
uppers_count = Some(remaining_count);
}
Default::default()
};

drop(target);

// target removed as upper
let remove_prepared =
remove_change.and_then(|remove_change| upper.apply_change(ctx, remove_change));
start_in_progress_count(ctx, upper_id, followers.len() as u32);
let prepared = followers
.into_iter()
.map(|child_id| {
upper.notify_lost_follower(ctx, balance_queue, upper_id, &child_id)
})
.collect::<StackVec<_>>();
drop(upper);

// target added as follower
for upper_id in upper_uppers {
notify_new_follower(
ctx,
balance_queue,
ctx.node(&upper_id),
&upper_id,
target_id,
false,
);
}

// target removed as upper
remove_prepared.apply(ctx);
prepared.apply(ctx, balance_queue);

break;
}
Ordering::Greater => {
// target should probably be an inner node of upper
if followers_count.map_or(false, |count| count <= 0) {
// We already removed all followers, maybe too many
if is_in_progress(ctx, upper_id) {
let AggregationNode::Aggegating(aggregating) = &mut *upper else {
unreachable!();
};
aggregating
.enqueued_balancing
.push((upper_id.clone(), target_id.clone()));
drop(upper);
// Somebody else will balance this edge
break;
}

// target should be a inner node of upper
let count = upper
.followers_mut()
.unwrap()
.remove_all_positive_clonable_count(target_id);
if count == 0 {
break;
} else if extra_uppers == 0 {
let target = ctx.node(target_id);
target_aggregation_number = target.aggregation_number();
if root || target_aggregation_number < upper_aggregation_number {
// target should be a inner node of upper
if is_in_progress(ctx, upper_id) {
drop(target);
let mut upper = ctx.node(upper_id);
if is_in_progress(ctx, upper_id) {
let AggregationNode::Aggegating(aggregating) = &mut *upper else {
unreachable!();
};
aggregating.enqueued_balancing.push((
upper_id.clone(),
upper_aggregation_number,
target_id.clone(),
target_aggregation_number,
));
drop(upper);
// Somebody else will balance this edge
return (upper_aggregation_number, target_aggregation_number);
}
} else {
// add some extra uppers
let count = followers_count.unwrap_or(1) as usize;
extra_uppers += count;
uppers_count = Some(
add_upper_count(
ctx,
balance_queue,
target,
target_id,
upper_id,
count,
true,
)
.new_count,
);
}
}
}
let added = target.uppers_mut().add_clonable_count(upper_id, count);

// target removed as follower
let uppers = upper.uppers().iter().cloned().collect::<StackVec<_>>();
start_in_progress_all(ctx, &uppers);

let (add_change, followers) = if added {
// target added as upper
let add_change = get_aggregated_add_change(ctx, &target);
let followers = get_followers_or_children(ctx, &target);
start_in_progress_count(ctx, upper_id, followers.len() as u32);
(add_change, followers)
} else {
// we already have extra uppers, try to remove some followers to balance
let count = extra_followers + extra_uppers;
let upper = ctx.node(upper_id);
let RemovePositveFollowerCountResult {
removed_count,
remaining_count,
} = remove_positive_follower_count(ctx, balance_queue, upper, target_id, count);
decrease_numbers(removed_count, &mut extra_followers, &mut extra_uppers);
followers_count = Some(remaining_count);
(None, Default::default())
};

drop(target);

// target added as upper
let add_prepared =
add_change.and_then(|add_change| upper.apply_change(ctx, add_change));
let prepared = followers
.into_iter()
.filter_map(|child_id| {
upper.notify_new_follower(ctx, balance_queue, upper_id, &child_id, false)
})
.collect::<StackVec<_>>();

drop(upper);

add_prepared.apply(ctx);
for prepared in prepared {
prepared.apply(ctx, balance_queue);
}

// target removed as follower
for upper_id in uppers {
notify_lost_follower(
ctx,
balance_queue,
ctx.node(&upper_id),
&upper_id,
target_id,
);
}

break;
}
}
}
if extra_followers > 0 {
let upper = ctx.node(upper_id);
remove_follower_count(ctx, balance_queue, upper, target_id, extra_followers);
}
if extra_uppers > 0 {
let target = ctx.node(target_id);
remove_upper_count(ctx, balance_queue, target, upper_id, extra_uppers);
}
(upper_aggregation_number, target_aggregation_number)
}

fn decrease_numbers(amount: usize, a: &mut usize, b: &mut usize) {
if *a >= amount {
*a -= amount;
} else {
*b -= amount - *a;
*a = 0;
}
}
Loading

0 comments on commit 8e1a92b

Please sign in to comment.