Is your feature request related to a problem or challenge?
When `GroupedHashAggregateStream` finishes processing input and transitions to emitting accumulated groups, it calls `emit(EmitTo::All)`, which materializes all groups at once. For high-cardinality aggregations (>500k groups) or complex grouping keys (Strings, Lists), this becomes a CPU-intensive blocking operation that stalls the async runtime for hundreds of milliseconds to seconds.
This "long poll" prevents other tasks on the same thread from running, causing latency spikes and system "hiccups." We've observed `AggregateExec` stalling the runtime for >1s when processing queries with ~10M groups.
Prior attempts to fix this
In #18906, we introduced a `DrainingGroups` state to emit groups incrementally. In #19562, we implemented `EmitTo::Next(n)` for true incremental emission at the `GroupColumn` level. This works correctly but revealed a ~15x performance regression on high-cardinality queries.
Root cause
The current `GroupColumn` implementations store values in a contiguous `Vec<T>`. When emitting the first `n` elements via `take_n()`, all remaining elements must be shifted:
```rust
fn take_n(&mut self, n: usize) -> ArrayRef {
    // Draining the front of a contiguous Vec shifts every remaining
    // element left: O(remaining), not O(n)!
    let first_n: Vec<_> = self.group_values.drain(0..n).collect();
    // ... build the output array from first_n ...
}
```

Profiling showed the cost distributed across `Vec::from_iter` (allocations), `MaybeNullBufferBuilder::take_n` (copying), and `_platform_memmove` (shifting).
Conclusion: Incremental emission is the right approach, but requires chunked storage to be efficient.
Describe the solution you'd like
This epic tracks two complementary changes:
1. Chunked Storage for GroupColumn
Replace the contiguous `Vec<T>` with a chunked structure so that `take_n()` is O(n) instead of O(remaining):
```
Current: Vec [v0, v1, v2, ... vN]        → take_n(3) shifts all remaining = O(remaining)
Chunked: [Chunk0] → [Chunk1] → [Chunk2]  → take_n(3) advances head        = O(n)
```
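A minimal sketch of the idea, assuming a chunk-list layout (`ChunkedVec`, `CHUNK_CAPACITY`, and the method bodies here are illustrative, not the proposed API):

```rust
use std::collections::VecDeque;

/// Illustrative chunk size; a real implementation would tune this.
const CHUNK_CAPACITY: usize = 4096;

/// Hypothetical chunked storage: values live in fixed-size chunks, so
/// removing the first n values touches only the chunks that hold them.
struct ChunkedVec<T> {
    chunks: VecDeque<Vec<T>>,
}

impl<T> ChunkedVec<T> {
    fn new() -> Self {
        Self { chunks: VecDeque::new() }
    }

    fn push(&mut self, value: T) {
        match self.chunks.back_mut() {
            Some(chunk) if chunk.len() < CHUNK_CAPACITY => chunk.push(value),
            _ => {
                let mut chunk = Vec::with_capacity(CHUNK_CAPACITY);
                chunk.push(value);
                self.chunks.push_back(chunk);
            }
        }
    }

    fn len(&self) -> usize {
        self.chunks.iter().map(Vec::len).sum()
    }

    /// Remove and return the first `n` values. Full chunks are popped off
    /// the front, copying only the n returned values; at most one partially
    /// consumed chunk is drained, so the cost is O(n), not O(remaining).
    fn take_n(&mut self, mut n: usize) -> Vec<T> {
        let mut out = Vec::with_capacity(n);
        while n > 0 {
            let Some(front) = self.chunks.front_mut() else { break };
            if front.len() <= n {
                n -= front.len();
                out.append(&mut self.chunks.pop_front().unwrap());
            } else {
                out.extend(front.drain(0..n)); // shifts one chunk at most
                n = 0;
            }
        }
        out
    }
}
```

Tracking a head offset into the front chunk instead of draining it would avoid even that single-chunk shift; either way, the tail chunks are never touched.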
2. Incremental Emission in HashAggregate
Update `GroupedHashAggregateStream` to call `emit(EmitTo::Next(batch_size))` iteratively instead of `emit(EmitTo::All)`, yielding back to the runtime between emissions.
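A minimal, self-contained sketch of that loop (`GroupState` and `emit_next` are illustrative stand-ins; the real change lives in `GroupedHashAggregateStream`'s `ProducingOutput` state and emits Arrow `RecordBatch`es):

```rust
use std::collections::VecDeque;

// Hypothetical stand-in for the aggregation state: the real code emits
// RecordBatches from GroupValues, not Vec<u64> from a VecDeque.
struct GroupState {
    groups: VecDeque<u64>,
}

impl GroupState {
    /// Emit at most `n` groups, analogous to emit(EmitTo::Next(n)).
    fn emit_next(&mut self, n: usize) -> Vec<u64> {
        let take = n.min(self.groups.len());
        self.groups.drain(0..take).collect()
    }
}

fn main() {
    let mut state = GroupState {
        groups: (0..1_000_000u64).collect(),
    };
    let batch_size = 8192;

    // Instead of one emit(EmitTo::All) that materializes every group,
    // emit batch_size groups per iteration. In the async stream, each
    // iteration corresponds to one poll, so the runtime can schedule
    // other tasks between batches instead of blocking for the full emit.
    while !state.groups.is_empty() {
        let batch = state.emit_next(batch_size);
        debug_assert!(batch.len() <= batch_size);
        // ... hand `batch` to the output stream here ...
    }
}
```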
Implementation phases:
1. Infrastructure
   - Add `ChunkedVec<T>` data structure with `push`, `len`, `get`, `take_n`, `iter`, and `size` methods (see the usage sketch after this list)
   - Add `ChunkedNullBufferBuilder` for chunked null bitmap storage with `take_n` support
2. GroupColumn Migrations
   - Migrate `PrimitiveGroupValueBuilder` to use `ChunkedVec` for group values storage
   - Migrate `BooleanGroupValueBuilder` to use chunked storage
   - Migrate `ByteGroupValueBuilder` to use chunked offsets and buffer
   - Migrate `ByteViewGroupValueBuilder` to use chunked views storage
3. Emission Path
   - Optimize `emit()` index adjustment for `EmitTo::First(n)` with chunked storage
   - Update `GroupedHashAggregateStream` to use `emit(EmitTo::Next(batch_size))` iteratively in the `ProducingOutput` state
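To make the intended phase 1 behavior concrete, a usage sketch against the hypothetical `ChunkedVec` from above (same caveats apply):

```rust
fn main() {
    let mut values = ChunkedVec::new();
    for v in 0..10_000u64 {
        values.push(v);
    }

    // With CHUNK_CAPACITY = 4096, taking the first 4096 values pops exactly
    // one full chunk; the remaining chunks stay in place, with no memmove
    // of the tail.
    let first = values.take_n(4096);
    assert_eq!(first.len(), 4096);
    assert_eq!(values.len(), 10_000 - 4096);
}
```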
Describe alternatives you've considered
Increasing `target_partitions` reduces the number of groups per partition and could mitigate the issue. This can work, but it doesn't solve the fundamental problem and has other trade-offs (e.g., working on smaller partitions can slow down other operators).
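For reference, a sketch of raising the knob via `SessionConfig` (assuming DataFusion's current `with_target_partitions` builder; this mitigates, not fixes, the stall):

```rust
use datafusion::prelude::*;

fn main() {
    // More partitions means fewer groups per partition, so each
    // partition's EmitTo::All stall is shorter, but not eliminated.
    let config = SessionConfig::new().with_target_partitions(64);
    let _ctx = SessionContext::new_with_config(config);
}
```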
Additional context
Related issues & experiments