Skip to content

Commit

Permalink
skip transient jobs in AggregationUpdateQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
sokra committed Jan 29, 2025
1 parent b1b2778 commit 07a3840
Showing 1 changed file with 37 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{

use indexmap::map::Entry;
use rustc_hash::{FxHashMap, FxHashSet};
use serde::{Deserialize, Serialize};
use serde::{ser::SerializeSeq, Deserialize, Serialize, Serializer};
use smallvec::{smallvec, SmallVec};
#[cfg(any(
feature = "trace_aggregation_update",
Expand Down Expand Up @@ -169,17 +169,23 @@ pub enum AggregationUpdateJob {
collectible_type: turbo_tasks::TraitTypeId,
},
/// Increases the active counter of the task
#[serde(skip)]
IncreaseActiveCount { task: TaskId },
/// Increases the active counters of the tasks
#[serde(skip)]
IncreaseActiveCounts { task_ids: TaskIdVec },
/// Decreases the active counter of the task
#[serde(skip)]
DecreaseActiveCount { task: TaskId },
/// Decreases the active counters of the tasks
#[serde(skip)]
DecreaseActiveCounts { task_ids: TaskIdVec },
/// Balances the edges of the graph. This checks if the graph invariant is still met for this
/// edge and coverts a upper edge to a follower edge or vice versa. Balancing might triggers
/// more changes to the structure.
BalanceEdge { upper_id: TaskId, task_id: TaskId },
/// Does nothing. This is used to filter out transient jobs during serialization.
Noop,
}

impl AggregationUpdateJob {
Expand Down Expand Up @@ -561,9 +567,37 @@ impl PartialEq for FindAndScheduleJob {

impl Eq for FindAndScheduleJob {}

/// Serializes the jobs in the queue. This is used to filter out transient jobs during
/// serialization.
fn serialize_jobs<S: Serializer>(
jobs: &VecDeque<AggregationUpdateJobItem>,
serializer: S,
) -> Result<S::Ok, S::Error> {
let mut seq = serializer.serialize_seq(Some(jobs.len()))?;
for job in jobs {
match job.job {
AggregationUpdateJob::IncreaseActiveCount { .. }
| AggregationUpdateJob::IncreaseActiveCounts { .. }
| AggregationUpdateJob::DecreaseActiveCount { .. }
| AggregationUpdateJob::DecreaseActiveCounts { .. } => {
seq.serialize_element(&AggregationUpdateJobItem {
job: AggregationUpdateJob::Noop,
#[cfg(feature = "trace_aggregation_update")]
span: None,
})?;
}
_ => {
seq.serialize_element(job)?;
}
}
}
seq.end()
}

/// A queue for aggregation update jobs.
#[derive(Default, Serialize, Deserialize, Clone)]
pub struct AggregationUpdateQueue {
#[serde(serialize_with = "serialize_jobs")]
jobs: VecDeque<AggregationUpdateJobItem>,
number_updates: FxIndexMap<TaskId, AggregationNumberUpdate>,
done_number_updates: FxHashMap<TaskId, AggregationNumberUpdate>,
Expand Down Expand Up @@ -706,8 +740,9 @@ impl AggregationUpdateQueue {
/// Executes a single step of the queue. Returns true, when the queue is empty.
pub fn process(&mut self, ctx: &mut impl ExecuteContext) -> bool {
if let Some(job) = self.jobs.pop_front() {
let job = job.entered();
let job: AggregationUpdateJobGuard = job.entered();
match job.job {
AggregationUpdateJob::Noop => {}
AggregationUpdateJob::UpdateAggregationNumber { .. }
| AggregationUpdateJob::BalanceEdge { .. } => {
// These jobs are never pushed to the queue
Expand Down

0 comments on commit 07a3840

Please sign in to comment.