Skip to content

Commit

Permalink
refactor(hash agg): directly use AggGroup instead of Box<AggGroup> (risingwavelabs#8745)
Browse files Browse the repository at this point in the history

Signed-off-by: Richard Chien <stdrc@outlook.com>
  • Loading branch information
stdrc authored Mar 24, 2023
1 parent 5f41727 commit 5992017
Showing 1 changed file with 16 additions and 19 deletions.
35 changes: 16 additions & 19 deletions src/stream/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ use super::{
use crate::cache::{cache_may_stale, new_with_hasher, ExecutorCache};
use crate::common::table::state_table::StateTable;
use crate::error::StreamResult;
use crate::executor::aggregation::{generate_agg_schema, AggCall, AggGroup};
use crate::executor::aggregation::{generate_agg_schema, AggCall, AggGroup as GenericAggGroup};
use crate::executor::error::StreamExecutorError;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{BoxedMessageStream, Message};
use crate::task::AtomicU64Ref;

type BoxedAggGroup<S> = Box<AggGroup<S, OnlyOutputIfHasInput>>;
type AggGroupCache<K, S> = ExecutorCache<K, BoxedAggGroup<S>, PrecomputedBuildHasher>;
type AggGroup<S> = GenericAggGroup<S, OnlyOutputIfHasInput>;
type AggGroupCache<K, S> = ExecutorCache<K, AggGroup<S>, PrecomputedBuildHasher>;

/// [`HashAggExecutor`] could process large amounts of data using a state backend. It works as
/// follows:
Expand Down Expand Up @@ -255,19 +255,17 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
Some(async {
// Create `AggGroup` for the current group if not exists. This will
// fetch previous agg result from the result table.
let agg_group = Box::new(
AggGroup::create(
Some(key.deserialize(group_key_types)?),
&this.agg_calls,
&this.storages,
&this.result_table,
&this.input_pk_indices,
this.row_count_index,
this.extreme_cache_size,
&this.input_schema,
)
.await?,
);
let agg_group = AggGroup::create(
Some(key.deserialize(group_key_types)?),
&this.agg_calls,
&this.storages,
&this.result_table,
&this.input_pk_indices,
this.row_count_index,
this.extreme_cache_size,
&this.input_schema,
)
.await?;
Ok::<_, StreamExecutorError>((key.clone(), agg_group))
})
}
Expand Down Expand Up @@ -345,7 +343,7 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {

// Apply chunk to each of the state (per agg_call), for each group.
for (key, visibility) in group_visibilities {
let agg_group = vars.agg_group_cache.get_mut(&key).unwrap().as_mut();
let agg_group = vars.agg_group_cache.get_mut(&key).unwrap();
let visibilities = call_visibilities
.iter()
.map(Option::as_ref)
Expand Down Expand Up @@ -421,8 +419,7 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
let agg_group = vars
.agg_group_cache
.get_mut(key)
.expect("changed group must have corresponding AggGroup")
.as_mut();
.expect("changed group must have corresponding AggGroup");
agg_group.flush_state_if_needed(&mut this.storages).await?;
}

Expand Down

0 comments on commit 5992017

Please sign in to comment.