Skip to content

Commit

Permalink
refactor(batch,agg): merge Aggregator::update* and refactor `SortAg…
Browse files Browse the repository at this point in the history
…gExecutor` (risingwavelabs#3794)

* feed Aggregator at most one group at a time

* impl new Aggregator functions for existing aggregators

* impl `nth` for ArrayImplIterator to speed up `iter.skip(n)`

* fix clippy warnings

* add unittest for ArrayImplIterator::nth

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
stdrc and mergify[bot] authored Jul 13, 2022
1 parent 3f03b2e commit b16c25a
Show file tree
Hide file tree
Showing 9 changed files with 285 additions and 585 deletions.
2 changes: 1 addition & 1 deletion src/batch/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ impl<K: HashKey + Send + Sync> HashAggExecutor<K> {
// TODO: currently not a vectorized implementation
states
.iter_mut()
.for_each(|state| state.update_with_row(&chunk, row_id).unwrap());
.for_each(|state| state.update_single(&chunk, row_id).unwrap());
}
}

Expand Down
121 changes: 67 additions & 54 deletions src/batch/src/executor/sort_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,36 +137,32 @@ impl SortAggExecutor {
.map(|(grouper, array)| grouper.detect_groups(array))
.collect::<Result<Vec<EqGroups>>>()?;

let mut groups = EqGroups::intersect(&groups);
loop {
let limit = {
if left_capacity >= groups.len() {
left_capacity -= groups.len();
0
} else {
let old = left_capacity;
left_capacity = 0;
old
}
};
groups.set_limit(limit);

SortAggExecutor::build_sorted_groups(
&mut self.sorted_groupers,
&group_columns,
&mut group_builders,
&groups,
)?;

SortAggExecutor::build_agg_states(
&mut self.agg_states,
&child_chunk,
&mut agg_builders,
&groups,
)?;
let groups = EqGroups::intersect(&groups);

let mut start_row_idx = 0;
for i in groups.indices {
let end_row_idx = i;
if start_row_idx < end_row_idx {
Self::update_sorted_groupers(
&mut self.sorted_groupers,
&group_columns,
start_row_idx,
end_row_idx,
)?;
Self::update_agg_states(
&mut self.agg_states,
&child_chunk,
start_row_idx,
end_row_idx,
)?;
}
Self::output_sorted_groupers(&mut self.sorted_groupers, &mut group_builders)?;
Self::output_agg_states(&mut self.agg_states, &mut agg_builders)?;
start_row_idx = end_row_idx;

left_capacity -= 1;
if left_capacity == 0 {
// yield output chunk
// output chunk reaches its limit size, yield it
let columns = group_builders
.into_iter()
.chain(agg_builders)
Expand All @@ -182,24 +178,27 @@ impl SortAggExecutor {

left_capacity = self.output_size_limit;
}

groups.advance_offset();
if groups.is_empty() {
break;
}
}
let row_cnt = child_chunk.cardinality();
if start_row_idx < row_cnt {
Self::update_sorted_groupers(
&mut self.sorted_groupers,
&group_columns,
start_row_idx,
row_cnt,
)?;
Self::update_agg_states(
&mut self.agg_states,
&child_chunk,
start_row_idx,
row_cnt,
)?;
}
}

assert!(left_capacity > 0);
// process the last group
self.sorted_groupers
.iter()
.zip_eq(&mut group_builders)
.try_for_each(|(grouper, builder)| grouper.output(builder))?;
self.agg_states
.iter()
.zip_eq(&mut agg_builders)
.try_for_each(|(state, builder)| state.output(builder))?;
Self::output_sorted_groupers(&mut self.sorted_groupers, &mut group_builders)?;
Self::output_agg_states(&mut self.agg_states, &mut agg_builders)?;

let columns = group_builders
.into_iter()
Expand All @@ -212,33 +211,47 @@ impl SortAggExecutor {
yield output;
}

fn build_sorted_groups(
fn update_sorted_groupers(
sorted_groupers: &mut [BoxedSortedGrouper],
group_columns: &[ArrayRef],
group_builders: &mut [ArrayBuilderImpl],
groups: &EqGroups,
start_row_idx: usize,
end_row_idx: usize,
) -> Result<()> {
sorted_groupers
.iter_mut()
.zip_eq(group_columns)
.zip_eq(group_builders)
.try_for_each(|((grouper, column), builder)| {
grouper.update_and_output_with_sorted_groups(column, builder, groups)
})
.try_for_each(|(grouper, column)| grouper.update(column, start_row_idx, end_row_idx))
}

fn build_agg_states(
fn update_agg_states(
agg_states: &mut [BoxedAggState],
child_chunk: &DataChunk,
start_row_idx: usize,
end_row_idx: usize,
) -> Result<()> {
agg_states
.iter_mut()
.try_for_each(|state| state.update_multi(child_chunk, start_row_idx, end_row_idx))
}

fn output_sorted_groupers(
sorted_groupers: &mut [BoxedSortedGrouper],
group_builders: &mut [ArrayBuilderImpl],
) -> Result<()> {
sorted_groupers
.iter_mut()
.zip_eq(group_builders)
.try_for_each(|(grouper, builder)| grouper.output_and_reset(builder))
}

fn output_agg_states(
agg_states: &mut [BoxedAggState],
agg_builders: &mut [ArrayBuilderImpl],
groups: &EqGroups,
) -> Result<()> {
agg_states
.iter_mut()
.zip_eq(agg_builders)
.try_for_each(|(state, builder)| {
state.update_and_output_with_sorted_groups(child_chunk, builder, groups)
})
.try_for_each(|(state, builder)| state.output_and_reset(builder))
}

fn create_builders(
Expand Down
28 changes: 20 additions & 8 deletions src/common/src/array/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ impl<'a> Iterator for ArrayImplIterator<'a> {
}
}

fn nth(&mut self, n: usize) -> Option<Self::Item> {
if self.pos + n >= self.data.len() {
None
} else {
let item = self.data.value_at(self.pos + n);
self.pos += n + 1;
Some(item)
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
let size = self.data.len() - self.pos;
(size, Some(size))
Expand All @@ -97,24 +107,26 @@ mod tests {
#[test]
fn [<test_trusted_len_for_ $suffix_name _array>]() {
use crate::array::$builder;
let mut builder = $builder::new(3);
for _ in 0..3 {
let mut builder = $builder::new(5);
for _ in 0..5 {
builder.append_null().unwrap();
}
let array = builder.finish().unwrap();
let mut iter = array.iter();

assert_eq!(iter.size_hint(), (3, Some(3))); iter.next();
assert_eq!(iter.size_hint(), (2, Some(2))); iter.next();
assert_eq!(iter.size_hint(), (1, Some(1))); iter.next();
assert_eq!(iter.size_hint(), (5, Some(5))); iter.next();
assert_eq!(iter.size_hint(), (4, Some(4))); iter.next();
assert_eq!(iter.size_hint(), (3, Some(3))); iter.nth(0);
assert_eq!(iter.size_hint(), (2, Some(2))); iter.nth(1);
assert_eq!(iter.size_hint(), (0, Some(0)));

let array_impl = ArrayImpl::from(array);
let mut iter = array_impl.iter();

assert_eq!(iter.size_hint(), (3, Some(3))); iter.next();
assert_eq!(iter.size_hint(), (2, Some(2))); iter.next();
assert_eq!(iter.size_hint(), (1, Some(1))); iter.next();
assert_eq!(iter.size_hint(), (5, Some(5))); iter.next();
assert_eq!(iter.size_hint(), (4, Some(4))); iter.next();
assert_eq!(iter.size_hint(), (3, Some(3))); iter.nth(0);
assert_eq!(iter.size_hint(), (2, Some(2))); iter.nth(1);
assert_eq!(iter.size_hint(), (0, Some(0)));
}
}
Expand Down
35 changes: 13 additions & 22 deletions src/expr/src/vector_op/agg/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,36 +23,27 @@ use crate::vector_op::agg::count_star::CountStar;
use crate::vector_op::agg::functions::*;
use crate::vector_op::agg::general_agg::*;
use crate::vector_op::agg::general_distinct_agg::*;
use crate::vector_op::agg::general_sorted_grouper::EqGroups;

/// An `Aggregator` supports `update` data and `output` result.
pub trait Aggregator: Send + 'static {
fn return_type(&self) -> DataType;

/// `update` the aggregator with a row with type checked at runtime.
fn update_with_row(&mut self, input: &DataChunk, row_id: usize) -> Result<()>;
/// `update` the aggregator with `Array` with input with type checked at runtime.
///
/// This may be deprecated as it consumes whole array without sort or hash group info.
fn update(&mut self, input: &DataChunk) -> Result<()>;
/// `update_single` update the aggregator with a single row with type checked at runtime.
fn update_single(&mut self, input: &DataChunk, row_id: usize) -> Result<()>;

/// `output` the aggregator to `ArrayBuilder` with input with type checked at runtime.
fn output(&self, builder: &mut ArrayBuilderImpl) -> Result<()>;

/// `update_and_output_with_sorted_groups` supersede `update` when grouping with the sort
/// aggregate algorithm.
///
/// Rather than updating with the whole `input` array all at once, it updates with each
/// subslice of the `input` array according to the `EqGroups`. Finished groups are outputted
/// to `builder` immediately along the way. After this call, the internal state is about
/// the last group which may continue in the next chunk. It can be obtained with `output` when
/// there are no more upstream data.
fn update_and_output_with_sorted_groups(
/// `update_multi` update the aggregator with multiple rows with type checked at runtime.
fn update_multi(
&mut self,
input: &DataChunk,
builder: &mut ArrayBuilderImpl,
groups: &EqGroups,
start_row_id: usize,
end_row_id: usize,
) -> Result<()>;

/// `output` the aggregator to `ArrayBuilder` with input with type checked at runtime.
fn output(&self, builder: &mut ArrayBuilderImpl) -> Result<()>;

/// `output_and_reset` output the aggregator to `ArrayBuilder` and reset the internal state.
fn output_and_reset(&mut self, builder: &mut ArrayBuilderImpl) -> Result<()>;
}

pub type BoxedAggState = Box<dyn Aggregator>;
Expand Down Expand Up @@ -120,7 +111,7 @@ impl AggStateFactory {
self.distinct,
)
} else {
Ok(Box::new(CountStar::new(self.return_type.clone(), 0)))
Ok(Box::new(CountStar::new(self.return_type.clone())))
}
}

Expand Down
Loading

0 comments on commit b16c25a

Please sign in to comment.