Skip to content

Commit

Permalink
more aggregation operations
Browse files Browse the repository at this point in the history
  • Loading branch information
sokra committed Sep 4, 2024
1 parent 686c198 commit e126a75
Show file tree
Hide file tree
Showing 9 changed files with 556 additions and 102 deletions.
3 changes: 3 additions & 0 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,8 @@ impl Backend for TurboTasksBackend {
})
.collect::<Vec<_>>();

task.remove(&CachedDataItemKey::Dirty {});

done_event.notify(usize::MAX);
drop(task);

Expand Down Expand Up @@ -912,6 +914,7 @@ impl Backend for TurboTasksBackend {
let mut task = self.storage.access_mut(task_id);
task.add(CachedDataItem::new_scheduled(task_id));
task.add(CachedDataItem::AggregateRootType { value: root_type });
task.add(CachedDataItem::AggregationNumber { value: u32::MAX });
}
turbo_tasks.schedule(task_id);
task_id
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,166 @@
use std::{collections::HashMap, ops::Add};

use serde::{Deserialize, Serialize};
use turbo_tasks::TaskId;

use super::ExecuteContext;
use crate::data::{CachedDataItem, CachedDataItemKey};
use super::{ExecuteContext, TaskGuard};
use crate::{
data::{CachedDataItem, CachedDataItemKey},
get, get_many, update, update_count,
};

#[derive(Serialize, Deserialize, Clone)]
pub enum AggregationUpdateJob {
InnerHasNewFollower {
upper_ids: Vec<TaskId>,
new_follower_id: TaskId,
new_follower_data: (),
},
InnerHasNewFollowers {
upper_ids: Vec<TaskId>,
new_follower_ids: Vec<TaskId>,
},
InnerLostFollower {
upper_ids: Vec<TaskId>,
lost_follower_id: TaskId,
},
AggregatedDataUpdate {
upper_ids: Vec<TaskId>,
update: AggregatedDataUpdate,
},
DataUpdate {
task_id: TaskId,
update: AggregatedDataUpdate,
},
ScheduleWhenDirty {
task_ids: Vec<TaskId>,
},
}

#[derive(Default, Serialize, Deserialize, Clone)]
pub struct AggregatedDataUpdate {
unfinished: i32,
dirty_tasks_update: HashMap<TaskId, i32>,
// TODO collectibles
}

impl AggregatedDataUpdate {
fn from_task(task: &mut TaskGuard<'_>) -> Self {
let aggregation = get!(task, AggregationNumber);
if aggregation.is_some() {
let unfinished = get!(task, AggregatedUnfinishedTasks);
let dirty_tasks_update = task
.iter()
.filter_map(|(key, _)| match *key {
CachedDataItemKey::AggregatedDirtyTask { task } => Some((task, 1)),
_ => None,
})
.collect();
Self {
unfinished: unfinished.copied().unwrap_or(0) as i32,
dirty_tasks_update,
}
} else {
let dirty = get!(task, Dirty);
if dirty.is_some() {
Self::dirty_task(task.id())
} else {
Self::default()
}
}
}

fn apply(
&self,
task: &mut TaskGuard<'_>,
queue: &mut AggregationUpdateQueue,
) -> AggregatedDataUpdate {
let Self {
unfinished,
dirty_tasks_update,
} = self;
let mut result = Self::default();
if *unfinished != 0 {
update!(task, AggregatedUnfinishedTasks, |old: Option<u32>| {
let old = old.unwrap_or(0);
let new = (old as i32 + *unfinished) as u32;
if new == 0 {
result.unfinished = -1;
None
} else {
if old > 0 {
result.unfinished = 1;
}
Some(new)
}
});
}
if !dirty_tasks_update.is_empty() {
let mut task_to_schedule = Vec::new();
let root_type = get!(task, AggregateRootType).copied();
for (task_id, count) in dirty_tasks_update {
update!(
task,
AggregatedDirtyTask { task: *task_id },
|old: Option<u32>| {
let old = old.unwrap_or(0);
if old == 0 {

Check failure on line 106 in turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs

View workflow job for this annotation

GitHub Actions / rust check / build

this `if` statement can be collapsed
if root_type.is_some() {
task_to_schedule.push(*task_id);
}
}
let new = (old as i32 + *count) as u32;
if new == 0 {
result.dirty_tasks_update.insert(*task_id, -1);
None
} else {
if old > 0 {
result.dirty_tasks_update.insert(*task_id, 1);
}
Some(new)
}
}
);
}
if !task_to_schedule.is_empty() {
queue.push(AggregationUpdateJob::ScheduleWhenDirty {
task_ids: task_to_schedule,
})
}
}
result
}

fn is_empty(&self) -> bool {
let Self {
unfinished,
dirty_tasks_update,
} = self;
*unfinished == 0 && dirty_tasks_update.is_empty()
}

pub fn dirty_task(task_id: TaskId) -> Self {
Self {
unfinished: 1,
dirty_tasks_update: HashMap::from([(task_id, 1)]),
}
}
}

impl Add for AggregatedDataUpdate {
type Output = Self;

fn add(self, rhs: Self) -> Self::Output {
let mut dirty_tasks_update = self.dirty_tasks_update;
for (task, count) in rhs.dirty_tasks_update {
*dirty_tasks_update.entry(task).or_default() += count;
}
Self {
unfinished: self.unfinished + rhs.unfinished,
dirty_tasks_update,
}
}
}

#[derive(Default, Serialize, Deserialize, Clone)]
pub struct AggregationUpdateQueue {
jobs: Vec<AggregationUpdateJob>,
Expand All @@ -23,19 +171,44 @@ impl AggregationUpdateQueue {
Self { jobs: Vec::new() }
}

pub fn is_empty(&self) -> bool {
self.jobs.is_empty()
}

pub fn push(&mut self, job: AggregationUpdateJob) {
self.jobs.push(job);
}

pub fn process(&mut self, ctx: &ExecuteContext<'_>) -> bool {
if let Some(job) = self.jobs.pop() {
match job {
AggregationUpdateJob::InnerHasNewFollowers {
upper_ids,
mut new_follower_ids,
} => {
if let Some(new_follower_id) = new_follower_ids.pop() {
if new_follower_ids.is_empty() {
self.jobs.push(AggregationUpdateJob::InnerHasNewFollower {
upper_ids,
new_follower_id,
});
} else {
self.jobs.push(AggregationUpdateJob::InnerHasNewFollowers {
upper_ids: upper_ids.clone(),
new_follower_ids,
});
self.jobs.push(AggregationUpdateJob::InnerHasNewFollower {
upper_ids,
new_follower_id,
});
}
}
}
AggregationUpdateJob::InnerHasNewFollower {
mut upper_ids,
new_follower_id,
new_follower_data,
} => {
upper_ids.retain(|&upper_id| {
upper_ids.retain(|&_upper_id| {
// let mut upper = ctx.task(upper_id);
// TODO decide if it should be an inner or follower
// TODO for now: always inner
Expand All @@ -46,15 +219,12 @@ impl AggregationUpdateQueue {
// TODO return true for inner, false for follower
true
});
let children;
let children: Vec<TaskId>;
let data;
{
let mut follower = ctx.task(new_follower_id);
upper_ids.retain(|&upper_id| {
if follower.add(CachedDataItem::Upper {
task: upper_id,
value: (),
}) {
if update_count!(follower, Upper { task: upper_id }, 1) {
// It's a new upper
true
} else {
Expand All @@ -63,31 +233,79 @@ impl AggregationUpdateQueue {
}
});
if !upper_ids.is_empty() {
// TODO get data
data = ();
children = follower
.iter()
.filter_map(|(key, _)| match *key {
CachedDataItemKey::Child { task } => Some(task),
_ => None,
})
.collect::<Vec<_>>();
data = AggregatedDataUpdate::from_task(&mut follower);
children = get_many!(follower, Child { task } => task);
} else {
data = Default::default();
children = Default::default();
}
}
for upper_id in upper_ids.iter() {
// TODO add data to upper
// add data to upper
let mut upper = ctx.task(*upper_id);
let diff = data.apply(&mut upper, self);
if !diff.is_empty() {
let upper_ids = get_many!(upper, Upper { task } => task);
self.jobs.push(AggregationUpdateJob::AggregatedDataUpdate {
upper_ids,
update: diff,
})
}
}
for child_id in children {
let child = ctx.task(child_id);
// TODO get child data
self.jobs.push(AggregationUpdateJob::InnerHasNewFollower {
if !children.is_empty() {
self.jobs.push(AggregationUpdateJob::InnerHasNewFollowers {
upper_ids: upper_ids.clone(),
new_follower_id: child_id,
new_follower_data: (),
})
new_follower_ids: children,
});
}
}
AggregationUpdateJob::InnerLostFollower {
upper_ids,
lost_follower_id,
} => {
for upper_id in upper_ids {
let mut upper = ctx.task(upper_id);
upper.remove(&CachedDataItemKey::Upper {
task: lost_follower_id,
});
let diff = AggregatedDataUpdate::dirty_task(lost_follower_id);
let upper_ids = get_many!(upper, Upper { task } => task);
self.jobs.push(AggregationUpdateJob::AggregatedDataUpdate {
upper_ids,
update: diff,
});
}
}
AggregationUpdateJob::AggregatedDataUpdate { upper_ids, update } => {
for upper_id in upper_ids {
let mut upper = ctx.task(upper_id);
let diff = update.apply(&mut upper, self);
if !diff.is_empty() {
let upper_ids = get_many!(upper, Upper { task } => task);
self.jobs.push(AggregationUpdateJob::AggregatedDataUpdate {
upper_ids,
update: diff,
});
}
}
}
AggregationUpdateJob::DataUpdate { task_id, update } => {
let mut task = ctx.task(task_id);
let diff = update.apply(&mut task, self);
if !diff.is_empty() {
let upper_ids = get_many!(task, Upper { task } => task);
self.jobs.push(AggregationUpdateJob::AggregatedDataUpdate {
upper_ids,
update: diff,
});
}
}
AggregationUpdateJob::ScheduleWhenDirty { task_ids } => {
for task_id in task_ids {
let mut task = ctx.task(task_id);
if task.add(CachedDataItem::new_scheduled(task_id)) {
ctx.turbo_tasks.schedule(task_id);
}
}
}
}
Expand Down
Loading

0 comments on commit e126a75

Please sign in to comment.