Skip to content

Commit

Permalink
Respect IGNORE NULLS flag in ARRAY_AGG (#260)
Browse files Browse the repository at this point in the history
* Fix size calculation in ArrayAggGroupsAccumulator

* Respect `ignore_nulls` flag in `DistinctArrayAgg`

* Respect `ignore_nulls` in `ArrayAggAccumulator`

* Respect `ignore_nulls` in `ArrayAggGroupsAccumulator`

Also add an `ArrayAgg` groups accumulator for boolean arrays
  • Loading branch information
joroKr21 authored Aug 26, 2024
1 parent 11ed341 commit 635f78e
Show file tree
Hide file tree
Showing 10 changed files with 454 additions and 246 deletions.
16 changes: 9 additions & 7 deletions datafusion-examples/examples/advanced_udaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,10 @@ impl GroupsAccumulator for GeometricMeanGroupsAccumulator {
values,
opt_filter,
total_num_groups,
true,
|group_index, new_value| {
let prod = &mut self.prods[group_index];
*prod = prod.mul_wrapping(new_value);

*prod = prod.mul_wrapping(new_value.unwrap());
self.counts[group_index] += 1;
},
);
Expand All @@ -318,8 +318,9 @@ impl GroupsAccumulator for GeometricMeanGroupsAccumulator {
partial_counts,
opt_filter,
total_num_groups,
true,
|group_index, partial_count| {
self.counts[group_index] += partial_count;
self.counts[group_index] += partial_count.unwrap();
},
);

Expand All @@ -330,9 +331,10 @@ impl GroupsAccumulator for GeometricMeanGroupsAccumulator {
partial_prods,
opt_filter,
total_num_groups,
|group_index, new_value: <Float64Type as ArrowPrimitiveType>::Native| {
true,
|group_index, new_value| {
let prod = &mut self.prods[group_index];
*prod = prod.mul_wrapping(new_value);
*prod = prod.mul_wrapping(new_value.unwrap());
},
);

Expand Down Expand Up @@ -394,8 +396,8 @@ impl GroupsAccumulator for GeometricMeanGroupsAccumulator {
}

fn size(&self) -> usize {
self.counts.capacity() * std::mem::size_of::<u32>()
+ self.prods.capacity() * std::mem::size_of::<Float64Type>()
self.counts.capacity() * size_of::<u32>()
+ self.prods.capacity() * size_of::<Float64Type>()
}
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1211,9 +1211,9 @@ mod tests {
.map(|i| i.to_string())
.collect();
let coll: Vec<_> = schema
.all_fields()
.flattened_fields()
.into_iter()
.map(|i| i.name().to_string())
.map(|f| f.name().to_string())
.collect();
assert_eq!(coll, order);

Expand Down
Loading

0 comments on commit 635f78e

Please sign in to comment.