diff --git a/src/stream/src/executor/hash_agg.rs b/src/stream/src/executor/hash_agg.rs index 245921beb4060..73ea1aa0dcdcb 100644 --- a/src/stream/src/executor/hash_agg.rs +++ b/src/stream/src/executor/hash_agg.rs @@ -14,6 +14,7 @@ use std::collections::{HashMap, HashSet}; use std::marker::PhantomData; +use std::ptr::NonNull; use std::sync::Arc; use futures::{stream, StreamExt, TryStreamExt}; @@ -436,11 +437,17 @@ impl HashAggExecutor { // Calculate current outputs, concurrently. let futs = keys_in_batch.into_iter().map(|key| { - // Pop out the agg group temporarily. - let mut agg_group = vars - .agg_group_cache - .pop(&key) - .expect("changed group must have corresponding AggGroup"); + // Get agg group of the key. + let agg_group = { + let mut ptr: NonNull<_> = vars + .agg_group_cache + .get_mut(&key) + .expect("changed group must have corresponding AggGroup") + .into(); + // SAFETY: `key`s in `keys_in_batch` are unique by nature, because they're + // from `group_change_set` which is a set. + unsafe { ptr.as_mut() } + }; async { let curr_outputs = agg_group.get_outputs(&this.storages).await?; Ok::<_, StreamExecutorError>((key, agg_group, curr_outputs)) @@ -452,7 +459,7 @@ impl HashAggExecutor { .try_collect() .await?; - for (key, mut agg_group, curr_outputs) in outputs_in_batch { + for (key, agg_group, curr_outputs) in outputs_in_batch { let AggChangesInfo { n_appended_ops, result_row, @@ -477,9 +484,6 @@ impl HashAggExecutor { this.result_table.insert(result_row); } } - - // Put the agg group back into the agg group cache. - vars.agg_group_cache.put(key, agg_group); } let columns = builders