Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(hash agg): split building change and applying change #8706

Merged
merged 1 commit into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 113 additions & 120 deletions src/stream/src/executor/aggregation/agg_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,80 +31,15 @@ use crate::common::table::state_table::StateTable;
use crate::executor::error::StreamExecutorResult;
use crate::executor::PkIndices;

/// Helpers that append aggregation output rows (and their ops) to the
/// column builders of the chunk under construction.
///
/// Every helper returns the number of ops it pushed onto `new_ops`.
mod changes_builder {
    use super::*;

    /// Push one `Op::Insert` and append every datum of `curr_outputs` to its
    /// corresponding builder. Always returns 1.
    pub(super) fn insert_new_outputs(
        curr_outputs: &OwnedRow,
        builders: &mut [ArrayBuilderImpl],
        new_ops: &mut Vec<Op>,
    ) -> usize {
        new_ops.push(Op::Insert);

        let columns = builders.iter_mut().zip_eq_fast(curr_outputs.iter());
        for (column, datum) in columns {
            trace!("insert datum: {:?}", datum);
            column.append_datum(datum);
        }

        1
    }

    /// Push one `Op::Delete` and append every datum of `prev_outputs` to its
    /// corresponding builder. Always returns 1.
    pub(super) fn delete_old_outputs(
        prev_outputs: &OwnedRow,
        builders: &mut [ArrayBuilderImpl],
        new_ops: &mut Vec<Op>,
    ) -> usize {
        new_ops.push(Op::Delete);

        let columns = builders.iter_mut().zip_eq_fast(prev_outputs.iter());
        for (column, datum) in columns {
            trace!("delete datum: {:?}", datum);
            column.append_datum(datum);
        }

        1
    }

    /// Emit an `UpdateDelete`/`UpdateInsert` pair when the outputs changed.
    ///
    /// Returns 0 (and touches nothing) when `prev_outputs == curr_outputs`;
    /// otherwise appends the old datum followed by the new datum to each
    /// builder and returns 2.
    pub(super) fn update_outputs(
        prev_outputs: &OwnedRow,
        curr_outputs: &OwnedRow,
        builders: &mut [ArrayBuilderImpl],
        new_ops: &mut Vec<Op>,
    ) -> usize {
        if prev_outputs != curr_outputs {
            new_ops.push(Op::UpdateDelete);
            new_ops.push(Op::UpdateInsert);

            let columns = itertools::multizip((
                builders.iter_mut(),
                prev_outputs.iter(),
                curr_outputs.iter(),
            ));
            for (column, before, after) in columns {
                trace!("update datum: prev = {:?}, curr = {:?}", before, after);
                column.append_datum(before);
                column.append_datum(after);
            }

            2
        } else {
            // Fast path for no change.
            0
        }
    }
}

pub trait Strategy {
fn build_changes(
/// Infer the change type of the aggregation result. This does not need to take ownership of
/// `prev_outputs` or `curr_outputs`.
fn infer_change_type(
prev_row_count: usize,
curr_row_count: usize,
prev_outputs: Option<&OwnedRow>,
curr_outputs: &OwnedRow,
builders: &mut [ArrayBuilderImpl],
new_ops: &mut Vec<Op>,
) -> usize;
) -> Option<AggChangeType>;
}

/// The strategy that always outputs the aggregation result, regardless of whether there are input rows.
Expand All @@ -114,14 +49,12 @@ pub struct AlwaysOutput;
pub struct OnlyOutputIfHasInput;

impl Strategy for AlwaysOutput {
fn build_changes(
fn infer_change_type(
prev_row_count: usize,
curr_row_count: usize,
prev_outputs: Option<&OwnedRow>,
curr_outputs: &OwnedRow,
builders: &mut [ArrayBuilderImpl],
new_ops: &mut Vec<Op>,
) -> usize {
) -> Option<AggChangeType> {
match prev_outputs {
None => {
// First time to build changes, assert to ensure correctness.
Expand All @@ -130,46 +63,48 @@ impl Strategy for AlwaysOutput {
assert_eq!(prev_row_count, 0);

// Generate output no matter whether current row count is 0 or not.
changes_builder::insert_new_outputs(curr_outputs, builders, new_ops)
Some(AggChangeType::Insert)
}
Some(prev_outputs) => {
if prev_row_count == 0 && curr_row_count == 0 {
// No rows exist.
return 0;
if prev_row_count == 0 && curr_row_count == 0 || prev_outputs == curr_outputs {
// No rows exist, or output is not changed.
None
} else {
Some(AggChangeType::Update)
}
changes_builder::update_outputs(prev_outputs, curr_outputs, builders, new_ops)
}
}
}
}

impl Strategy for OnlyOutputIfHasInput {
fn build_changes(
fn infer_change_type(
prev_row_count: usize,
curr_row_count: usize,
prev_outputs: Option<&OwnedRow>,
curr_outputs: &OwnedRow,
builders: &mut [ArrayBuilderImpl],
new_ops: &mut Vec<Op>,
) -> usize {
) -> Option<AggChangeType> {
match (prev_row_count, curr_row_count) {
(0, 0) => {
// No rows of current group exist.
0
None
}
(0, _) => {
// Insert new output row for this newly emerged group.
changes_builder::insert_new_outputs(curr_outputs, builders, new_ops)
Some(AggChangeType::Insert)
}
(_, 0) => {
// Delete old output row for this newly disappeared group.
let prev_outputs = prev_outputs.expect("must exist previous outputs");
changes_builder::delete_old_outputs(prev_outputs, builders, new_ops)
Some(AggChangeType::Delete)
}
(_, _) => {
// Update output row.
let prev_outputs = prev_outputs.expect("must exist previous outputs");
changes_builder::update_outputs(prev_outputs, curr_outputs, builders, new_ops)
if prev_outputs.expect("must exist previous outputs") == curr_outputs {
// No output change.
None
} else {
Some(AggChangeType::Update)
}
}
}
}
Expand All @@ -178,7 +113,7 @@ impl Strategy for OnlyOutputIfHasInput {
/// [`AggGroup`] manages agg states of all agg calls for one `group_key`.
pub struct AggGroup<S: StateStore, Strtg: Strategy> {
/// Group key.
group_key: Option<OwnedRow>, // TODO(rc): we can remove this
group_key: Option<OwnedRow>,

/// Current managed states for all [`AggCall`]s.
states: Vec<AggState<S>>,
Expand All @@ -201,14 +136,25 @@ impl<S: StateStore, Strtg: Strategy> Debug for AggGroup<S, Strtg> {
}
}

/// Information about the changes built by `AggState::build_changes`.
pub struct AggChangesInfo {
/// The number of rows and corresponding ops in the changes.
pub n_appended_ops: usize,
/// The result row containing group key prefix. To be inserted into result table.
pub result_row: OwnedRow,
/// The previous outputs of all agg calls recorded in the `AggState`.
pub prev_outputs: Option<OwnedRow>,
/// Type of aggregation change.
///
/// This is the payload-free counterpart of [`AggChange`]: it only says *which*
/// kind of change should be emitted, so it can be computed from borrowed
/// outputs without cloning any row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AggChangeType {
    /// A new output row should be emitted and no old row is retracted.
    Insert,
    /// The previously emitted output row should be retracted.
    Delete,
    /// The previously emitted output row should be replaced by a new one.
    Update,
}

/// Aggregation change. The result rows include group key prefix.
pub enum AggChange {
    /// A result row is emitted for the first time.
    Insert {
        /// Group key followed by the current agg outputs.
        new_row: OwnedRow,
    },
    /// The previously emitted result row is retracted.
    Delete {
        /// Group key followed by the previous agg outputs.
        old_row: OwnedRow,
    },
    /// The previously emitted result row is replaced.
    Update {
        /// Group key followed by the previous agg outputs.
        old_row: OwnedRow,
        /// Group key followed by the current agg outputs.
        new_row: OwnedRow,
    },
}

impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
Expand Down Expand Up @@ -318,8 +264,10 @@ impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
self.states.iter_mut().for_each(|state| state.reset());
}

/// Get the outputs of all managed agg states.
/// Get the outputs of all managed agg states, without group key prefix.
/// May need to read/sync from the state table if the state is not cached in memory.
/// This method is idempotent, i.e. it can be called multiple times and the outputs are
/// guaranteed to be the same.
pub async fn get_outputs(
&mut self,
storages: &[AggStateStorage<S>],
Expand Down Expand Up @@ -349,15 +297,9 @@ impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
.map(OwnedRow::new)
}

/// Build changes into `builders` and `new_ops`, according to previous and current agg outputs.
/// Returns [`AggChangesInfo`] contains information about changes built.
/// The saved previous outputs will be updated to the latest outputs after building changes.
pub fn build_changes(
&mut self,
curr_outputs: OwnedRow,
builders: &mut [ArrayBuilderImpl],
new_ops: &mut Vec<Op>,
) -> AggChangesInfo {
/// Build aggregation result change, according to previous and current agg outputs.
/// The saved previous outputs will be updated to the latest outputs after this method.
pub fn build_change(&mut self, curr_outputs: OwnedRow) -> Option<AggChange> {
let prev_row_count = self.prev_row_count();
let curr_row_count = curr_outputs[self.row_count_index]
.as_ref()
Expand All @@ -370,27 +312,78 @@ impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
curr_row_count
);

let n_appended_ops = Strtg::build_changes(
let change_type = Strtg::infer_change_type(
prev_row_count,
curr_row_count,
self.prev_outputs.as_ref(),
&curr_outputs,
builders,
new_ops,
);

let result_row = self.group_key().chain(&curr_outputs).into_owned_row();
// Split `AggChangeType` and `AggChange` to avoid unnecessary cloning.
change_type.map(|change_type| match change_type {
AggChangeType::Insert => {
let new_row = self.group_key().chain(&curr_outputs).into_owned_row();
self.prev_outputs = Some(curr_outputs);
AggChange::Insert { new_row }
}
AggChangeType::Delete => {
let prev_outputs = self.prev_outputs.take();
let old_row = self.group_key().chain(prev_outputs).into_owned_row();
AggChange::Delete { old_row }
}
AggChangeType::Update => {
let new_row = self.group_key().chain(&curr_outputs).into_owned_row();
let prev_outputs = self.prev_outputs.replace(curr_outputs);
let old_row = self.group_key().chain(prev_outputs).into_owned_row();
AggChange::Update { old_row, new_row }
}
})
}

let prev_outputs = if n_appended_ops == 0 {
self.prev_outputs.clone()
} else {
std::mem::replace(&mut self.prev_outputs, Some(curr_outputs))
};
/// Append `change` to the output chunk under construction.
///
/// Pushes the op(s) matching the change kind onto `ops` (`Insert`, `Delete`,
/// or the `UpdateDelete`/`UpdateInsert` pair) and appends the row data
/// column by column to `builders`. For an update, each builder receives the
/// old datum first and then the new one.
pub fn apply_change_to_builders(
    &self,
    change: &AggChange,
    builders: &mut [ArrayBuilderImpl],
    ops: &mut Vec<Op>,
) {
    match change {
        AggChange::Insert { new_row } => {
            trace!("insert row: {:?}", new_row);
            ops.push(Op::Insert);
            let columns = builders.iter_mut().zip_eq_fast(new_row.iter());
            for (column, datum) in columns {
                column.append_datum(datum);
            }
        }
        AggChange::Delete { old_row } => {
            trace!("delete row: {:?}", old_row);
            ops.push(Op::Delete);
            let columns = builders.iter_mut().zip_eq_fast(old_row.iter());
            for (column, datum) in columns {
                column.append_datum(datum);
            }
        }
        AggChange::Update { old_row, new_row } => {
            trace!("update row: prev = {:?}, curr = {:?}", old_row, new_row);
            ops.extend([Op::UpdateDelete, Op::UpdateInsert]);
            let columns =
                itertools::multizip((builders.iter_mut(), old_row.iter(), new_row.iter()));
            for (column, before, after) in columns {
                column.append_datum(before);
                column.append_datum(after);
            }
        }
    }
}

AggChangesInfo {
n_appended_ops,
result_row,
prev_outputs,
/// Mirror `change` into `result_table`.
///
/// Dispatches to `insert`, `delete` or `update` on the table according to
/// the change kind, passing the row(s) carried by the change. Committing the
/// table is left to the caller.
pub fn apply_change_to_result_table(
    &self,
    change: &AggChange,
    result_table: &mut StateTable<S>,
) {
    match change {
        AggChange::Update { old_row, new_row } => {
            result_table.update(old_row, new_row);
        }
        AggChange::Delete { old_row } => {
            result_table.delete(old_row);
        }
        AggChange::Insert { new_row } => {
            result_table.insert(new_row);
        }
    }
}
}
29 changes: 8 additions & 21 deletions src/stream/src/executor/global_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@ use futures::StreamExt;
use futures_async_stream::try_stream;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::row::RowExt;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_storage::StateStore;

use super::agg_common::AggExecutorArgs;
use super::aggregation::{
agg_call_filter_res, iter_table_storage, AggChangesInfo, AggStateStorage, AlwaysOutput,
DistinctDeduplicater,
agg_call_filter_res, iter_table_storage, AggStateStorage, AlwaysOutput, DistinctDeduplicater,
};
use super::*;
use crate::common::table::state_table::StateTable;
Expand Down Expand Up @@ -251,29 +249,18 @@ impl<S: StateStore> GlobalSimpleAggExecutor<S> {
let mut new_ops = Vec::with_capacity(2);
// Retrieve modified states and put the changes into the builders.
let curr_outputs = vars.agg_group.get_outputs(&this.storages).await?;
let AggChangesInfo {
result_row,
prev_outputs,
n_appended_ops,
} = vars
.agg_group
.build_changes(curr_outputs, &mut builders, &mut new_ops);

if n_appended_ops == 0 {
if let Some(change) = vars.agg_group.build_change(curr_outputs) {
vars.agg_group
.apply_change_to_builders(&change, &mut builders, &mut new_ops);
vars.agg_group
.apply_change_to_result_table(&change, &mut this.result_table);
this.result_table.commit(epoch).await?;
} else {
// Agg result is not changed.
this.result_table.commit_no_data_expected(epoch);
return Ok(None);
}

// Update the result table with latest agg outputs.
if let Some(prev_outputs) = prev_outputs {
let old_row = vars.agg_group.group_key().chain(prev_outputs);
this.result_table.update(old_row, result_row);
} else {
this.result_table.insert(result_row);
}
this.result_table.commit(epoch).await?;

let columns = builders
.into_iter()
.map(|builder| builder.finish().into())
Expand Down
Loading