Skip to content

Commit

Permalink
refactor(hash agg): split building change and applying change (rising…
Browse files Browse the repository at this point in the history
…wavelabs#8706)

Signed-off-by: Richard Chien <stdrc@outlook.com>
  • Loading branch information
stdrc authored Mar 22, 2023
1 parent b474059 commit 9e774bb
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 169 deletions.
233 changes: 113 additions & 120 deletions src/stream/src/executor/aggregation/agg_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,80 +31,15 @@ use crate::common::table::state_table::StateTable;
use crate::executor::error::StreamExecutorResult;
use crate::executor::PkIndices;

mod changes_builder {
use super::*;

pub(super) fn insert_new_outputs(
curr_outputs: &OwnedRow,
builders: &mut [ArrayBuilderImpl],
new_ops: &mut Vec<Op>,
) -> usize {
new_ops.push(Op::Insert);

for (builder, new_value) in builders.iter_mut().zip_eq_fast(curr_outputs.iter()) {
trace!("insert datum: {:?}", new_value);
builder.append_datum(new_value);
}

1
}

pub(super) fn delete_old_outputs(
prev_outputs: &OwnedRow,
builders: &mut [ArrayBuilderImpl],
new_ops: &mut Vec<Op>,
) -> usize {
new_ops.push(Op::Delete);

for (builder, old_value) in builders.iter_mut().zip_eq_fast(prev_outputs.iter()) {
trace!("delete datum: {:?}", old_value);
builder.append_datum(old_value);
}

1
}

pub(super) fn update_outputs(
prev_outputs: &OwnedRow,
curr_outputs: &OwnedRow,
builders: &mut [ArrayBuilderImpl],
new_ops: &mut Vec<Op>,
) -> usize {
if prev_outputs == curr_outputs {
// Fast path for no change.
return 0;
}

new_ops.push(Op::UpdateDelete);
new_ops.push(Op::UpdateInsert);

for (builder, old_value, new_value) in itertools::multizip((
builders.iter_mut(),
prev_outputs.iter(),
curr_outputs.iter(),
)) {
trace!(
"update datum: prev = {:?}, curr = {:?}",
old_value,
new_value
);
builder.append_datum(old_value);
builder.append_datum(new_value);
}

2
}
}

pub trait Strategy {
fn build_changes(
/// Infer the change type of the aggregation result. Don't need to take the ownership of
/// `prev_outputs` and `curr_outputs`.
fn infer_change_type(
prev_row_count: usize,
curr_row_count: usize,
prev_outputs: Option<&OwnedRow>,
curr_outputs: &OwnedRow,
builders: &mut [ArrayBuilderImpl],
new_ops: &mut Vec<Op>,
) -> usize;
) -> Option<AggChangeType>;
}

/// The strategy that always outputs the aggregation result no matter there're input rows or not.
Expand All @@ -114,14 +49,12 @@ pub struct AlwaysOutput;
pub struct OnlyOutputIfHasInput;

impl Strategy for AlwaysOutput {
fn build_changes(
fn infer_change_type(
prev_row_count: usize,
curr_row_count: usize,
prev_outputs: Option<&OwnedRow>,
curr_outputs: &OwnedRow,
builders: &mut [ArrayBuilderImpl],
new_ops: &mut Vec<Op>,
) -> usize {
) -> Option<AggChangeType> {
match prev_outputs {
None => {
// First time to build changes, assert to ensure correctness.
Expand All @@ -130,46 +63,48 @@ impl Strategy for AlwaysOutput {
assert_eq!(prev_row_count, 0);

// Generate output no matter whether current row count is 0 or not.
changes_builder::insert_new_outputs(curr_outputs, builders, new_ops)
Some(AggChangeType::Insert)
}
Some(prev_outputs) => {
if prev_row_count == 0 && curr_row_count == 0 {
// No rows exist.
return 0;
if prev_row_count == 0 && curr_row_count == 0 || prev_outputs == curr_outputs {
// No rows exist, or output is not changed.
None
} else {
Some(AggChangeType::Update)
}
changes_builder::update_outputs(prev_outputs, curr_outputs, builders, new_ops)
}
}
}
}

impl Strategy for OnlyOutputIfHasInput {
fn build_changes(
fn infer_change_type(
prev_row_count: usize,
curr_row_count: usize,
prev_outputs: Option<&OwnedRow>,
curr_outputs: &OwnedRow,
builders: &mut [ArrayBuilderImpl],
new_ops: &mut Vec<Op>,
) -> usize {
) -> Option<AggChangeType> {
match (prev_row_count, curr_row_count) {
(0, 0) => {
// No rows of current group exist.
0
None
}
(0, _) => {
// Insert new output row for this newly emerged group.
changes_builder::insert_new_outputs(curr_outputs, builders, new_ops)
Some(AggChangeType::Insert)
}
(_, 0) => {
// Delete old output row for this newly disappeared group.
let prev_outputs = prev_outputs.expect("must exist previous outputs");
changes_builder::delete_old_outputs(prev_outputs, builders, new_ops)
Some(AggChangeType::Delete)
}
(_, _) => {
// Update output row.
let prev_outputs = prev_outputs.expect("must exist previous outputs");
changes_builder::update_outputs(prev_outputs, curr_outputs, builders, new_ops)
if prev_outputs.expect("must exist previous outputs") == curr_outputs {
// No output change.
None
} else {
Some(AggChangeType::Update)
}
}
}
}
Expand All @@ -178,7 +113,7 @@ impl Strategy for OnlyOutputIfHasInput {
/// [`AggGroup`] manages agg states of all agg calls for one `group_key`.
pub struct AggGroup<S: StateStore, Strtg: Strategy> {
/// Group key.
group_key: Option<OwnedRow>, // TODO(rc): we can remove this
group_key: Option<OwnedRow>,

/// Current managed states for all [`AggCall`]s.
states: Vec<AggState<S>>,
Expand All @@ -201,14 +136,25 @@ impl<S: StateStore, Strtg: Strategy> Debug for AggGroup<S, Strtg> {
}
}

/// Information about the changes built by `AggState::build_changes`.
pub struct AggChangesInfo {
/// The number of rows and corresponding ops in the changes.
pub n_appended_ops: usize,
/// The result row containing group key prefix. To be inserted into result table.
pub result_row: OwnedRow,
/// The previous outputs of all agg calls recorded in the `AggState`.
pub prev_outputs: Option<OwnedRow>,
/// Type of aggregation change.
pub enum AggChangeType {
Insert,
Delete,
Update,
}

/// Aggregation change. The result rows include group key prefix.
pub enum AggChange {
Insert {
new_row: OwnedRow,
},
Delete {
old_row: OwnedRow,
},
Update {
old_row: OwnedRow,
new_row: OwnedRow,
},
}

impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
Expand Down Expand Up @@ -318,8 +264,10 @@ impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
self.states.iter_mut().for_each(|state| state.reset());
}

/// Get the outputs of all managed agg states.
/// Get the outputs of all managed agg states, without group key prefix.
/// Possibly need to read/sync from state table if the state not cached in memory.
/// This method is idempotent, i.e. it can be called multiple times and the outputs are
/// guaranteed to be the same.
pub async fn get_outputs(
&mut self,
storages: &[AggStateStorage<S>],
Expand Down Expand Up @@ -349,15 +297,9 @@ impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
.map(OwnedRow::new)
}

/// Build changes into `builders` and `new_ops`, according to previous and current agg outputs.
/// Returns [`AggChangesInfo`] contains information about changes built.
/// The saved previous outputs will be updated to the latest outputs after building changes.
pub fn build_changes(
&mut self,
curr_outputs: OwnedRow,
builders: &mut [ArrayBuilderImpl],
new_ops: &mut Vec<Op>,
) -> AggChangesInfo {
/// Build aggregation result change, according to previous and current agg outputs.
/// The saved previous outputs will be updated to the latest outputs after this method.
pub fn build_change(&mut self, curr_outputs: OwnedRow) -> Option<AggChange> {
let prev_row_count = self.prev_row_count();
let curr_row_count = curr_outputs[self.row_count_index]
.as_ref()
Expand All @@ -370,27 +312,78 @@ impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
curr_row_count
);

let n_appended_ops = Strtg::build_changes(
let change_type = Strtg::infer_change_type(
prev_row_count,
curr_row_count,
self.prev_outputs.as_ref(),
&curr_outputs,
builders,
new_ops,
);

let result_row = self.group_key().chain(&curr_outputs).into_owned_row();
// Split `AggChangeType` and `AggChange` to avoid unnecessary cloning.
change_type.map(|change_type| match change_type {
AggChangeType::Insert => {
let new_row = self.group_key().chain(&curr_outputs).into_owned_row();
self.prev_outputs = Some(curr_outputs);
AggChange::Insert { new_row }
}
AggChangeType::Delete => {
let prev_outputs = self.prev_outputs.take();
let old_row = self.group_key().chain(prev_outputs).into_owned_row();
AggChange::Delete { old_row }
}
AggChangeType::Update => {
let new_row = self.group_key().chain(&curr_outputs).into_owned_row();
let prev_outputs = self.prev_outputs.replace(curr_outputs);
let old_row = self.group_key().chain(prev_outputs).into_owned_row();
AggChange::Update { old_row, new_row }
}
})
}

let prev_outputs = if n_appended_ops == 0 {
self.prev_outputs.clone()
} else {
std::mem::replace(&mut self.prev_outputs, Some(curr_outputs))
};
pub fn apply_change_to_builders(
&self,
change: &AggChange,
builders: &mut [ArrayBuilderImpl],
ops: &mut Vec<Op>,
) {
match change {
AggChange::Insert { new_row } => {
trace!("insert row: {:?}", new_row);
ops.push(Op::Insert);
for (builder, new_value) in builders.iter_mut().zip_eq_fast(new_row.iter()) {
builder.append_datum(new_value);
}
}
AggChange::Delete { old_row } => {
trace!("delete row: {:?}", old_row);
ops.push(Op::Delete);
for (builder, old_value) in builders.iter_mut().zip_eq_fast(old_row.iter()) {
builder.append_datum(old_value);
}
}
AggChange::Update { old_row, new_row } => {
trace!("update row: prev = {:?}, curr = {:?}", old_row, new_row);
ops.push(Op::UpdateDelete);
ops.push(Op::UpdateInsert);
for (builder, old_value, new_value) in
itertools::multizip((builders.iter_mut(), old_row.iter(), new_row.iter()))
{
builder.append_datum(old_value);
builder.append_datum(new_value);
}
}
}
}

AggChangesInfo {
n_appended_ops,
result_row,
prev_outputs,
pub fn apply_change_to_result_table(
&self,
change: &AggChange,
result_table: &mut StateTable<S>,
) {
match change {
AggChange::Insert { new_row } => result_table.insert(new_row),
AggChange::Delete { old_row } => result_table.delete(old_row),
AggChange::Update { old_row, new_row } => result_table.update(old_row, new_row),
}
}
}
29 changes: 8 additions & 21 deletions src/stream/src/executor/global_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@ use futures::StreamExt;
use futures_async_stream::try_stream;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::row::RowExt;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_storage::StateStore;

use super::agg_common::AggExecutorArgs;
use super::aggregation::{
agg_call_filter_res, iter_table_storage, AggChangesInfo, AggStateStorage, AlwaysOutput,
DistinctDeduplicater,
agg_call_filter_res, iter_table_storage, AggStateStorage, AlwaysOutput, DistinctDeduplicater,
};
use super::*;
use crate::common::table::state_table::StateTable;
Expand Down Expand Up @@ -251,29 +249,18 @@ impl<S: StateStore> GlobalSimpleAggExecutor<S> {
let mut new_ops = Vec::with_capacity(2);
// Retrieve modified states and put the changes into the builders.
let curr_outputs = vars.agg_group.get_outputs(&this.storages).await?;
let AggChangesInfo {
result_row,
prev_outputs,
n_appended_ops,
} = vars
.agg_group
.build_changes(curr_outputs, &mut builders, &mut new_ops);

if n_appended_ops == 0 {
if let Some(change) = vars.agg_group.build_change(curr_outputs) {
vars.agg_group
.apply_change_to_builders(&change, &mut builders, &mut new_ops);
vars.agg_group
.apply_change_to_result_table(&change, &mut this.result_table);
this.result_table.commit(epoch).await?;
} else {
// Agg result is not changed.
this.result_table.commit_no_data_expected(epoch);
return Ok(None);
}

// Update the result table with latest agg outputs.
if let Some(prev_outputs) = prev_outputs {
let old_row = vars.agg_group.group_key().chain(prev_outputs);
this.result_table.update(old_row, result_row);
} else {
this.result_table.insert(result_row);
}
this.result_table.commit(epoch).await?;

let columns = builders
.into_iter()
.map(|builder| builder.finish().into())
Expand Down
Loading

0 comments on commit 9e774bb

Please sign in to comment.