From af9d92d985728cccf45a3f5acff42a1c6d34f3c1 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 14 Dec 2023 10:18:30 -0500 Subject: [PATCH 1/2] Avoid copy in count / logical nulls --- .../physical-expr/src/aggregate/count.rs | 29 ++++++++++++++----- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs index 8e9ae5cea36b..c5cb1ef6786f 100644 --- a/datafusion/physical-expr/src/aggregate/count.rs +++ b/datafusion/physical-expr/src/aggregate/count.rs @@ -18,6 +18,7 @@ //! Defines physical expressions that can evaluated at runtime during query execution use std::any::Any; +use std::borrow::Cow; use std::fmt::Debug; use std::ops::BitAnd; use std::sync::Arc; @@ -31,7 +32,7 @@ use arrow::{array::ArrayRef, datatypes::Field}; use arrow_array::cast::AsArray; use arrow_array::types::Int64Type; use arrow_array::PrimitiveArray; -use arrow_buffer::BooleanBuffer; +use arrow_buffer::{BooleanBuffer, NullBuffer}; use datafusion_common::{downcast_value, ScalarValue}; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::Accumulator; @@ -117,13 +118,14 @@ impl GroupsAccumulator for CountGroupsAccumulator { ) -> Result<()> { assert_eq!(values.len(), 1, "single argument to update_batch"); let values = &values[0]; + let nulls = logical_nulls(values); // Add one to each group's counter for each non null, non // filtered value self.counts.resize(total_num_groups, 0); accumulate_indices( group_indices, - values.logical_nulls().as_ref(), + nulls.as_ref().map(|nulls| nulls.as_ref()), opt_filter, |group_index| { self.counts[group_index] += 1; @@ -192,24 +194,37 @@ impl GroupsAccumulator for CountGroupsAccumulator { } } +/// Returns a `NullBuffer` indicating which values are null +/// +/// Returns a [`Cow`] to avoid cloning `NullBuffers` for array types that +/// have this information already computed such as Primitive and StringArrays +fn logical_nulls(array: &dyn Array) -> Option> { + match array.data_type() { + // These types have computed null buffers, so need a call to logical nulls + // TODO remove when upstream is released + DataType::Null | DataType::Dictionary(_, _) => { + array.logical_nulls().map(Cow::Owned) + } + _ => array.nulls().map(Cow::Borrowed), + } +} + /// count null values for multiple columns /// for each row if one column value is null, then null_count + 1 fn null_count_for_multiple_cols(values: &[ArrayRef]) -> usize { if values.len() > 1 { let result_bool_buf: Option = values .iter() - .map(|a| a.logical_nulls()) + .map(|a| logical_nulls(a)) .fold(None, |acc, b| match (acc, b) { (Some(acc), Some(b)) => Some(acc.bitand(b.inner())), (Some(acc), None) => Some(acc), - (None, Some(b)) => Some(b.into_inner()), + (None, Some(b)) => Some(b.into_owned().into_inner()), _ => None, }); result_bool_buf.map_or(0, |b| values[0].len() - b.count_set_bits()) } else { - values[0] - .logical_nulls() - .map_or(0, |nulls| nulls.null_count()) + logical_nulls(&values[0]).map_or(0, |nulls| nulls.null_count()) } } From a9dfee45e7cdc526eb839b8456a0a6ecb2963083 Mon Sep 17 00:00:00 2001 From: Georgi Krastev Date: Thu, 14 Dec 2023 16:53:30 +0100 Subject: [PATCH 2/2] Update datafusion/physical-expr/src/aggregate/count.rs Co-authored-by: Andrew Lamb --- datafusion/physical-expr/src/aggregate/count.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs index c5cb1ef6786f..7c6a89dc8d64 100644 --- a/datafusion/physical-expr/src/aggregate/count.rs +++ b/datafusion/physical-expr/src/aggregate/count.rs @@ -202,6 +202,7 @@ fn logical_nulls(array: &dyn Array) -> Option> { match array.data_type() { // These types have computed null buffers, so need a call to logical nulls // TODO remove when upstream is released + // https://github.com/apache/arrow-rs/issues/5208 DataType::Null | DataType::Dictionary(_, _) => { array.logical_nulls().map(Cow::Owned) }