From 757bba6f4829a8597509fe7556aa274a170d28d3 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Sat, 15 Jun 2024 23:46:48 +0530 Subject: [PATCH 01/16] remove bit and or xor from expr --- datafusion/expr/src/aggregate_function.rs | 20 ------------------- .../expr/src/type_coercion/aggregates.rs | 14 ------------- 2 files changed, 34 deletions(-) diff --git a/datafusion/expr/src/aggregate_function.rs b/datafusion/expr/src/aggregate_function.rs index 81562bf12476..be0f3887079f 100644 --- a/datafusion/expr/src/aggregate_function.rs +++ b/datafusion/expr/src/aggregate_function.rs @@ -51,12 +51,6 @@ pub enum AggregateFunction { ApproxPercentileContWithWeight, /// Grouping Grouping, - /// Bit And - BitAnd, - /// Bit Or - BitOr, - /// Bit Xor - BitXor, /// Bool And BoolAnd, /// Bool Or @@ -78,9 +72,6 @@ impl AggregateFunction { ApproxPercentileCont => "APPROX_PERCENTILE_CONT", ApproxPercentileContWithWeight => "APPROX_PERCENTILE_CONT_WITH_WEIGHT", Grouping => "GROUPING", - BitAnd => "BIT_AND", - BitOr => "BIT_OR", - BitXor => "BIT_XOR", BoolAnd => "BOOL_AND", BoolOr => "BOOL_OR", StringAgg => "STRING_AGG", @@ -100,9 +91,6 @@ impl FromStr for AggregateFunction { Ok(match name { // general "avg" => AggregateFunction::Avg, - "bit_and" => AggregateFunction::BitAnd, - "bit_or" => AggregateFunction::BitOr, - "bit_xor" => AggregateFunction::BitXor, "bool_and" => AggregateFunction::BoolAnd, "bool_or" => AggregateFunction::BoolOr, "max" => AggregateFunction::Max, @@ -155,9 +143,6 @@ impl AggregateFunction { // The coerced_data_types is same with input_types. Ok(coerced_data_types[0].clone()) } - AggregateFunction::BitAnd - | AggregateFunction::BitOr - | AggregateFunction::BitXor => Ok(coerced_data_types[0].clone()), AggregateFunction::BoolAnd | AggregateFunction::BoolOr => { Ok(DataType::Boolean) } @@ -214,11 +199,6 @@ impl AggregateFunction { .collect::>(); Signature::uniform(1, valid, Volatility::Immutable) } - AggregateFunction::BitAnd - | AggregateFunction::BitOr - | AggregateFunction::BitXor => { - Signature::uniform(1, INTEGERS.to_vec(), Volatility::Immutable) - } AggregateFunction::BoolAnd | AggregateFunction::BoolOr => { Signature::uniform(1, vec![DataType::Boolean], Volatility::Immutable) } diff --git a/datafusion/expr/src/type_coercion/aggregates.rs b/datafusion/expr/src/type_coercion/aggregates.rs index 6c9a71bab46a..a94c9ebd8f46 100644 --- a/datafusion/expr/src/type_coercion/aggregates.rs +++ b/datafusion/expr/src/type_coercion/aggregates.rs @@ -122,20 +122,6 @@ pub fn coerce_types( }; Ok(vec![v]) } - AggregateFunction::BitAnd - | AggregateFunction::BitOr - | AggregateFunction::BitXor => { - // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc - // smallint, int, bigint, real, double precision, decimal, or interval. - if !is_bit_and_or_xor_support_arg_type(&input_types[0]) { - return plan_err!( - "The function {:?} does not support inputs of type {:?}.", - agg_fun, - input_types[0] - ); - } - Ok(input_types.to_vec()) - } AggregateFunction::BoolAnd | AggregateFunction::BoolOr => { // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc // smallint, int, bigint, real, double precision, decimal, or interval. From eb0c87069e5b8d62300dbbc71a61ebbec6846fee Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Sun, 16 Jun 2024 00:03:39 +0530 Subject: [PATCH 02/16] remove bit and or xor from physical expr and proto --- .../physical-expr/src/aggregate/build_in.rs | 76 +------------------ datafusion/physical-expr/src/aggregate/mod.rs | 1 - datafusion/proto/proto/datafusion.proto | 6 +- datafusion/proto/src/generated/prost.rs | 9 +-- .../proto/src/logical_plan/from_proto.rs | 3 - .../proto/src/physical_plan/to_proto.rs | 13 +--- 6 files changed, 9 insertions(+), 99 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs b/datafusion/physical-expr/src/aggregate/build_in.rs index df87a2e261a1..ed85c2b44577 100644 --- a/datafusion/physical-expr/src/aggregate/build_in.rs +++ b/datafusion/physical-expr/src/aggregate/build_in.rs @@ -65,26 +65,6 @@ pub fn create_aggregate_expr( name, data_type, )), - (AggregateFunction::BitAnd, _) => Arc::new(expressions::BitAnd::new( - input_phy_exprs[0].clone(), - name, - data_type, - )), - (AggregateFunction::BitOr, _) => Arc::new(expressions::BitOr::new( - input_phy_exprs[0].clone(), - name, - data_type, - )), - (AggregateFunction::BitXor, false) => Arc::new(expressions::BitXor::new( - input_phy_exprs[0].clone(), - name, - data_type, - )), - (AggregateFunction::BitXor, true) => Arc::new(expressions::DistinctBitXor::new( - input_phy_exprs[0].clone(), - name, - data_type, - )), (AggregateFunction::BoolAnd, _) => Arc::new(expressions::BoolAnd::new( input_phy_exprs[0].clone(), name, @@ -234,7 +214,7 @@ mod tests { use super::*; use crate::expressions::{ - try_cast, ApproxPercentileCont, ArrayAgg, Avg, BitAnd, BitOr, BitXor, BoolAnd, + try_cast, ApproxPercentileCont, ArrayAgg, Avg, BoolAnd, BoolOr, DistinctArrayAgg, Max, Min, }; @@ -406,60 +386,6 @@ mod tests { Ok(()) } - #[test] - fn test_bit_and_or_xor_expr() -> Result<()> { - let funcs = vec![ - AggregateFunction::BitAnd, - AggregateFunction::BitOr, - AggregateFunction::BitXor, - ]; - let data_types = vec![DataType::UInt64, DataType::Int64]; - for fun in funcs { - for data_type in &data_types { - let input_schema = - Schema::new(vec![Field::new("c1", data_type.clone(), true)]); - let input_phy_exprs: Vec> = vec![Arc::new( - expressions::Column::new_with_schema("c1", &input_schema).unwrap(), - )]; - let result_agg_phy_exprs = create_physical_agg_expr_for_test( - &fun, - false, - &input_phy_exprs[0..1], - &input_schema, - "c1", - )?; - match fun { - AggregateFunction::BitAnd => { - assert!(result_agg_phy_exprs.as_any().is::()); - assert_eq!("c1", result_agg_phy_exprs.name()); - assert_eq!( - Field::new("c1", data_type.clone(), true), - result_agg_phy_exprs.field().unwrap() - ); - } - AggregateFunction::BitOr => { - assert!(result_agg_phy_exprs.as_any().is::()); - assert_eq!("c1", result_agg_phy_exprs.name()); - assert_eq!( - Field::new("c1", data_type.clone(), true), - result_agg_phy_exprs.field().unwrap() - ); - } - AggregateFunction::BitXor => { - assert!(result_agg_phy_exprs.as_any().is::()); - assert_eq!("c1", result_agg_phy_exprs.name()); - assert_eq!( - Field::new("c1", data_type.clone(), true), - result_agg_phy_exprs.field().unwrap() - ); - } - _ => {} - }; - } - } - Ok(()) - } - #[test] fn test_bool_and_or_expr() -> Result<()> { let funcs = vec![AggregateFunction::BoolAnd, AggregateFunction::BoolOr]; diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs index 9079a81e6241..6c4d71c281df 100644 --- a/datafusion/physical-expr/src/aggregate/mod.rs +++ b/datafusion/physical-expr/src/aggregate/mod.rs @@ -23,7 +23,6 @@ pub(crate) mod array_agg; pub(crate) mod array_agg_distinct; pub(crate) mod array_agg_ordered; pub(crate) mod average; -pub(crate) mod bit_and_or_xor; pub(crate) mod bool_and_or; pub(crate) mod correlation; pub(crate) mod covariance; diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 83223a04d023..7dd026cdb313 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -491,9 +491,9 @@ enum AggregateFunction { APPROX_PERCENTILE_CONT_WITH_WEIGHT = 16; GROUPING = 17; // MEDIAN = 18; - BIT_AND = 19; - BIT_OR = 20; - BIT_XOR = 21; + // BIT_AND = 19; + // BIT_OR = 20; + // BIT_XOR = 21; BOOL_AND = 22; BOOL_OR = 23; // REGR_SLOPE = 26; diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index fa0217e9ef4f..58b15cef2fd8 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1945,9 +1945,9 @@ pub enum AggregateFunction { ApproxPercentileContWithWeight = 16, Grouping = 17, /// MEDIAN = 18; - BitAnd = 19, - BitOr = 20, - BitXor = 21, + /// BitAnd = 19, + /// BitOr = 20, + /// BitXor = 21, BoolAnd = 22, BoolOr = 23, /// REGR_SLOPE = 26; @@ -1979,9 +1979,6 @@ impl AggregateFunction { "APPROX_PERCENTILE_CONT_WITH_WEIGHT" } AggregateFunction::Grouping => "GROUPING", - AggregateFunction::BitAnd => "BIT_AND", - AggregateFunction::BitOr => "BIT_OR", - AggregateFunction::BitXor => "BIT_XOR", AggregateFunction::BoolAnd => "BOOL_AND", AggregateFunction::BoolOr => "BOOL_OR", AggregateFunction::StringAgg => "STRING_AGG", diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index ed7b0129cc48..aed47d72748b 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -140,9 +140,6 @@ impl From for AggregateFunction { protobuf::AggregateFunction::Min => Self::Min, protobuf::AggregateFunction::Max => Self::Max, protobuf::AggregateFunction::Avg => Self::Avg, - protobuf::AggregateFunction::BitAnd => Self::BitAnd, - protobuf::AggregateFunction::BitOr => Self::BitOr, - protobuf::AggregateFunction::BitXor => Self::BitXor, protobuf::AggregateFunction::BoolAnd => Self::BoolAnd, protobuf::AggregateFunction::BoolOr => Self::BoolOr, protobuf::AggregateFunction::ArrayAgg => Self::ArrayAgg, diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index ef462ac94b9a..1d736a717d9a 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -24,8 +24,8 @@ use datafusion::physical_expr::window::{NthValueKind, SlidingAggregateWindowExpr use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr}; use datafusion::physical_plan::expressions::{ ApproxPercentileCont, ApproxPercentileContWithWeight, ArrayAgg, Avg, BinaryExpr, - BitAnd, BitOr, BitXor, BoolAnd, BoolOr, CaseExpr, CastExpr, Column, Correlation, - CumeDist, DistinctArrayAgg, DistinctBitXor, Grouping, InListExpr, IsNotNullExpr, + BoolAnd, BoolOr, CaseExpr, CastExpr, Column, Correlation, + CumeDist, DistinctArrayAgg, Grouping, InListExpr, IsNotNullExpr, IsNullExpr, Literal, Max, Min, NegativeExpr, NotExpr, NthValue, NthValueAgg, Ntile, OrderSensitiveArrayAgg, Rank, RankType, RowNumber, StringAgg, TryCastExpr, WindowShift, @@ -242,15 +242,6 @@ fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) -> Result { let inner = if aggr_expr.downcast_ref::().is_some() { protobuf::AggregateFunction::Grouping - } else if aggr_expr.downcast_ref::().is_some() { - protobuf::AggregateFunction::BitAnd - } else if aggr_expr.downcast_ref::().is_some() { - protobuf::AggregateFunction::BitOr - } else if aggr_expr.downcast_ref::().is_some() { - protobuf::AggregateFunction::BitXor - } else if aggr_expr.downcast_ref::().is_some() { - distinct = true; - protobuf::AggregateFunction::BitXor } else if aggr_expr.downcast_ref::().is_some() { protobuf::AggregateFunction::BoolAnd } else if aggr_expr.downcast_ref::().is_some() { From f5cc74d84ae2fcdb6dae582c6a75ffde66cf5385 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Sun, 16 Jun 2024 00:34:41 +0530 Subject: [PATCH 03/16] add proto regen changes --- datafusion/expr/src/type_coercion/aggregates.rs | 4 ---- datafusion/proto/src/generated/pbjson.rs | 9 --------- datafusion/proto/src/generated/prost.rs | 9 +++------ 3 files changed, 3 insertions(+), 19 deletions(-) diff --git a/datafusion/expr/src/type_coercion/aggregates.rs b/datafusion/expr/src/type_coercion/aggregates.rs index a94c9ebd8f46..7e4991c69c98 100644 --- a/datafusion/expr/src/type_coercion/aggregates.rs +++ b/datafusion/expr/src/type_coercion/aggregates.rs @@ -386,10 +386,6 @@ pub fn avg_sum_type(arg_type: &DataType) -> Result { } } -pub fn is_bit_and_or_xor_support_arg_type(arg_type: &DataType) -> bool { - NUMERICS.contains(arg_type) -} - pub fn is_bool_and_or_support_arg_type(arg_type: &DataType) -> bool { matches!(arg_type, DataType::Boolean) } diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index f298dd241abf..dd52355bbf0a 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -540,9 +540,6 @@ impl serde::Serialize for AggregateFunction { Self::ApproxPercentileCont => "APPROX_PERCENTILE_CONT", Self::ApproxPercentileContWithWeight => "APPROX_PERCENTILE_CONT_WITH_WEIGHT", Self::Grouping => "GROUPING", - Self::BitAnd => "BIT_AND", - Self::BitOr => "BIT_OR", - Self::BitXor => "BIT_XOR", Self::BoolAnd => "BOOL_AND", Self::BoolOr => "BOOL_OR", Self::StringAgg => "STRING_AGG", @@ -566,9 +563,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction { "APPROX_PERCENTILE_CONT", "APPROX_PERCENTILE_CONT_WITH_WEIGHT", "GROUPING", - "BIT_AND", - "BIT_OR", - "BIT_XOR", "BOOL_AND", "BOOL_OR", "STRING_AGG", @@ -621,9 +615,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction { "APPROX_PERCENTILE_CONT" => Ok(AggregateFunction::ApproxPercentileCont), "APPROX_PERCENTILE_CONT_WITH_WEIGHT" => Ok(AggregateFunction::ApproxPercentileContWithWeight), "GROUPING" => Ok(AggregateFunction::Grouping), - "BIT_AND" => Ok(AggregateFunction::BitAnd), - "BIT_OR" => Ok(AggregateFunction::BitOr), - "BIT_XOR" => Ok(AggregateFunction::BitXor), "BOOL_AND" => Ok(AggregateFunction::BoolAnd), "BOOL_OR" => Ok(AggregateFunction::BoolOr), "STRING_AGG" => Ok(AggregateFunction::StringAgg), diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 58b15cef2fd8..f068c53d193b 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1945,9 +1945,9 @@ pub enum AggregateFunction { ApproxPercentileContWithWeight = 16, Grouping = 17, /// MEDIAN = 18; - /// BitAnd = 19, - /// BitOr = 20, - /// BitXor = 21, + /// BIT_AND = 19; + /// BIT_OR = 20; + /// BIT_XOR = 21; BoolAnd = 22, BoolOr = 23, /// REGR_SLOPE = 26; @@ -1998,9 +1998,6 @@ impl AggregateFunction { Some(Self::ApproxPercentileContWithWeight) } "GROUPING" => Some(Self::Grouping), - "BIT_AND" => Some(Self::BitAnd), - "BIT_OR" => Some(Self::BitOr), - "BIT_XOR" => Some(Self::BitXor), "BOOL_AND" => Some(Self::BoolAnd), "BOOL_OR" => Some(Self::BoolOr), "STRING_AGG" => Some(Self::StringAgg), From 418f37bd7363c12db041ee032178eea25d889361 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Sun, 16 Jun 2024 04:05:05 +0530 Subject: [PATCH 04/16] impl BitAnd, BitOr, BitXor UADF --- .../functions-aggregate/src/bit_and_or_xor.rs | 502 +++++++++++++ datafusion/functions-aggregate/src/lib.rs | 7 + .../src/aggregate/bit_and_or_xor.rs | 695 ------------------ .../physical-expr/src/expressions/mod.rs | 1 - datafusion/proto/src/logical_plan/to_proto.rs | 6 - 5 files changed, 509 insertions(+), 702 deletions(-) create mode 100644 datafusion/functions-aggregate/src/bit_and_or_xor.rs delete mode 100644 datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs diff --git a/datafusion/functions-aggregate/src/bit_and_or_xor.rs b/datafusion/functions-aggregate/src/bit_and_or_xor.rs new file mode 100644 index 000000000000..a7a4455d1ed1 --- /dev/null +++ b/datafusion/functions-aggregate/src/bit_and_or_xor.rs @@ -0,0 +1,502 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Defines BitAnd, BitOr, and BitXor Aggregate accumulators + +use std::any::Any; +use std::collections::HashSet; +use ahash::RandomState; +use datafusion_common::cast::as_list_array; + +use arrow::array::{Array, ArrayRef, AsArray}; +use arrow::datatypes::{ArrowNativeType, ArrowNumericType}; +use arrow::datatypes::{ + DataType, Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type, +}; +use arrow_schema::Field; + +use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue}; +use datafusion_expr::{Accumulator, AggregateUDFImpl, Signature, Volatility}; +use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; +use datafusion_expr::type_coercion::aggregates::NUMERICS; +use datafusion_expr::utils::format_state_name; + +macro_rules! downcast_logical { + ($args:ident, $helper:ident) => { + match $args.data_type { + DataType::Int8 => $helper!(Int8Type, $args.data_type), + DataType::Int16 => $helper!(Int16Type, $args.data_type), + DataType::Int32 => $helper!(Int32Type, $args.data_type), + DataType::Int64 => $helper!(Int64Type, $args.data_type), + DataType::UInt8 => $helper!(UInt8Type, $args.data_type), + DataType::UInt16 => $helper!(UInt16Type, $args.data_type), + DataType::UInt32 => $helper!(UInt32Type, $args.data_type), + DataType::UInt64 => $helper!(UInt64Type, $args.data_type), + _ => { + not_impl_err!("Sum not supported for {}: {}", $args.name, $args.data_type) + } + } + }; +} + +make_udaf_expr_and_func!( + BitAnd, + bit_and, + expression, + "Returns the bit wise and of a group of values.", + bit_and_udaf +); + +make_udaf_expr_and_func!( + BitOr, + bit_or, + expression, + "Returns the bit wise or of a group of values.", + bit_or_udaf +); + +make_udaf_expr_and_func!( + BitXor, + bit_xor, + expression, + "Returns the bit wise xor of a group of values.", + bit_xor_udaf +); + +#[derive(Debug)] +pub struct BitAnd { + signature: Signature, +} + +impl BitAnd { + pub fn new() -> Self { + Self { + signature: Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable) + } + } +} + +impl Default for BitAnd { + fn default() -> Self { + Self::new() + } +} + +impl AggregateUDFImpl for BitAnd { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "bit_and" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + match &arg_types[0] { + DataType::Int8 | + DataType::Int16 | + DataType::Int32 | + DataType::Int64 | + DataType::UInt8 | + DataType::UInt16 | + DataType::UInt32 | + DataType::UInt64 => { + Ok(arg_types[0].clone()) + } + other => { + exec_err!("[return_type] SUM not supported for {}", other) + } + } + } + + fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { + macro_rules! helper { + ($t:ty, $dt:expr) => { + Ok(Box::new(BitAndAccumulator::<$t>::default())) + }; + } + downcast_logical!(acc_args, helper) + } +} + +struct BitAndAccumulator { + value: Option, +} + +impl std::fmt::Debug for BitAndAccumulator { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "BitAndAccumulator({})", T::DATA_TYPE) + } +} + +impl Default for BitAndAccumulator { + fn default() -> Self { + Self { value: None } + } +} + +impl Accumulator for BitAndAccumulator + where + T::Native: std::ops::BitAnd, +{ + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + if let Some(x) = arrow::compute::bit_and(values[0].as_primitive::()) { + let v = self.value.get_or_insert(x); + *v = *v & x; + } + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + self.update_batch(states) + } + + fn state(&mut self) -> Result> { + Ok(vec![self.evaluate()?]) + } + + fn evaluate(&mut self) -> Result { + ScalarValue::new_primitive::(self.value, &T::DATA_TYPE) + } + + fn size(&self) -> usize { + std::mem::size_of_val(self) + } +} + +#[derive(Debug)] +pub struct BitOr { + signature: Signature, +} + +impl BitOr { + pub fn new() -> Self { + Self { + signature: Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable) + } + } +} + +impl Default for BitOr { + fn default() -> Self { + Self::new() + } +} + +impl AggregateUDFImpl for BitOr { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "bit_or" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + match &arg_types[0] { + DataType::Int8 | + DataType::Int16 | + DataType::Int32 | + DataType::Int64 | + DataType::UInt8 | + DataType::UInt16 | + DataType::UInt32 | + DataType::UInt64 => { + Ok(arg_types[0].clone()) + } + other => { + exec_err!("[return_type] SUM not supported for {}", other) + } + } + } + + fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { + macro_rules! helper { + ($t:ty, $dt:expr) => { + Ok(Box::new(BitOrAccumulator::<$t>::default())) + }; + } + downcast_logical!(acc_args, helper) + } +} + +struct BitOrAccumulator { + value: Option, +} + +impl std::fmt::Debug for BitOrAccumulator { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "BitOrAccumulator({})", T::DATA_TYPE) + } +} + +impl Default for BitOrAccumulator { + fn default() -> Self { + Self { value: None } + } +} + +impl Accumulator for BitOrAccumulator + where + T::Native: std::ops::BitOr, +{ + fn state(&mut self) -> Result> { + Ok(vec![self.evaluate()?]) + } + + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + if let Some(x) = arrow::compute::bit_or(values[0].as_primitive::()) { + let v = self.value.get_or_insert(T::Native::usize_as(0)); + *v = *v | x; + } + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + self.update_batch(states) + } + + fn evaluate(&mut self) -> Result { + ScalarValue::new_primitive::(self.value, &T::DATA_TYPE) + } + + fn size(&self) -> usize { + std::mem::size_of_val(self) + } +} + +#[derive(Debug)] +pub struct BitXor { + signature: Signature, +} + +impl BitXor { + pub fn new() -> Self { + Self { + signature: Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable) + } + } +} + +impl Default for BitXor { + fn default() -> Self { + Self::new() + } +} + +impl AggregateUDFImpl for BitXor { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "bit_xor" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + match &arg_types[0] { + DataType::Int8 | + DataType::Int16 | + DataType::Int32 | + DataType::Int64 | + DataType::UInt8 | + DataType::UInt16 | + DataType::UInt32 | + DataType::UInt64 => { + Ok(arg_types[0].clone()) + } + other => { + exec_err!("[return_type] SUM not supported for {}", other) + } + } + } + + fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { + + if acc_args.is_distinct { + macro_rules! helper { + ($t:ty, $dt:expr) => { + Ok(Box::new(DistinctBitXorAccumulator::<$t>::default())) + }; + } + downcast_logical!(acc_args, helper) + } else { + macro_rules! helper { + ($t:ty, $dt:expr) => { + Ok(Box::new(BitXorAccumulator::<$t>::default())) + }; + } + downcast_logical!(acc_args, helper) + } + } + + fn state_fields(&self, args: StateFieldsArgs) -> Result> { + if args.is_distinct { + Ok(vec![Field::new_list( + format_state_name(args.name, "xor distinct"), + Field::new("item", args.return_type.clone(), true), + false, + )]) + } else { + Ok(vec![Field::new( + format_state_name(args.name, "xor"), + args.return_type.clone(), + true, + )]) + } + } + +} + +struct BitXorAccumulator { + value: Option, +} + +impl std::fmt::Debug for BitXorAccumulator { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "BitXorAccumulator({})", T::DATA_TYPE) + } +} + +impl Default for BitXorAccumulator { + fn default() -> Self { + Self { value: None } + } +} + +impl Accumulator for BitXorAccumulator + where + T::Native: std::ops::BitXor, +{ + fn state(&mut self) -> Result> { + Ok(vec![self.evaluate()?]) + } + + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + if let Some(x) = arrow::compute::bit_xor(values[0].as_primitive::()) { + let v = self.value.get_or_insert(T::Native::usize_as(0)); + *v = *v ^ x; + } + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + self.update_batch(states) + } + + fn evaluate(&mut self) -> Result { + ScalarValue::new_primitive::(self.value, &T::DATA_TYPE) + } + + fn size(&self) -> usize { + std::mem::size_of_val(self) + } +} + +struct DistinctBitXorAccumulator { + values: HashSet, +} + +impl std::fmt::Debug for DistinctBitXorAccumulator { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "DistinctBitXorAccumulator({})", T::DATA_TYPE) + } +} + +impl Default for DistinctBitXorAccumulator { + fn default() -> Self { + Self { + values: HashSet::default(), + } + } +} + +impl Accumulator for DistinctBitXorAccumulator + where + T::Native: std::ops::BitXor + std::hash::Hash + Eq, +{ + fn state(&mut self) -> Result> { + // 1. Stores aggregate state in `ScalarValue::List` + // 2. Constructs `ScalarValue::List` state from distinct numeric stored in hash set + let state_out = { + let values = self + .values + .iter() + .map(|x| ScalarValue::new_primitive::(Some(*x), &T::DATA_TYPE)) + .collect::>>()?; + + let arr = ScalarValue::new_list(&values, &T::DATA_TYPE); + vec![ScalarValue::List(arr)] + }; + Ok(state_out) + } + + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + if values.is_empty() { + return Ok(()); + } + + let array = values[0].as_primitive::(); + match array.nulls().filter(|x| x.null_count() > 0) { + Some(n) => { + for idx in n.valid_indices() { + self.values.insert(array.value(idx)); + } + } + None => array.values().iter().for_each(|x| { + self.values.insert(*x); + }), + } + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + if let Some(state) = states.first() { + let list_arr = as_list_array(state)?; + for arr in list_arr.iter().flatten() { + self.update_batch(&[arr])?; + } + } + Ok(()) + } + + fn evaluate(&mut self) -> Result { + let mut acc = T::Native::usize_as(0); + for distinct_value in self.values.iter() { + acc = acc ^ *distinct_value; + } + let v = (!self.values.is_empty()).then_some(acc); + ScalarValue::new_primitive::(v, &T::DATA_TYPE) + } + + fn size(&self) -> usize { + std::mem::size_of_val(self) + + self.values.capacity() * std::mem::size_of::() + } +} \ No newline at end of file diff --git a/datafusion/functions-aggregate/src/lib.rs b/datafusion/functions-aggregate/src/lib.rs index fabe15e416f4..5ebd7b3303eb 100644 --- a/datafusion/functions-aggregate/src/lib.rs +++ b/datafusion/functions-aggregate/src/lib.rs @@ -68,6 +68,7 @@ pub mod variance; pub mod approx_median; pub mod approx_percentile_cont; +pub mod bit_and_or_xor; use datafusion_common::Result; use datafusion_execution::FunctionRegistry; @@ -100,6 +101,9 @@ pub mod expr_fn { pub use super::sum::sum; pub use super::variance::var_pop; pub use super::variance::var_sample; + pub use super::bit_and_or_xor::bit_and; + pub use super::bit_and_or_xor::bit_or; + pub use super::bit_and_or_xor::bit_xor; } /// Returns all default aggregate functions @@ -127,6 +131,9 @@ pub fn all_default_aggregate_functions() -> Vec> { stddev::stddev_pop_udaf(), approx_median::approx_median_udaf(), approx_distinct::approx_distinct_udaf(), + bit_and_or_xor::bit_and_udaf(), + bit_and_or_xor::bit_or_udaf(), + bit_and_or_xor::bit_xor_udaf() ] } diff --git a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs b/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs deleted file mode 100644 index 3fa225c5e479..000000000000 --- a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs +++ /dev/null @@ -1,695 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Defines BitAnd, BitOr, and BitXor Aggregate accumulators - -use ahash::RandomState; -use datafusion_common::cast::as_list_array; -use std::any::Any; -use std::sync::Arc; - -use crate::{AggregateExpr, PhysicalExpr}; -use arrow::datatypes::DataType; -use arrow::{array::ArrayRef, datatypes::Field}; -use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue}; -use datafusion_expr::{Accumulator, GroupsAccumulator}; -use std::collections::HashSet; - -use crate::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator; -use crate::aggregate::utils::down_cast_any_ref; -use crate::expressions::format_state_name; -use arrow::array::Array; -use arrow::compute::{bit_and, bit_or, bit_xor}; -use arrow_array::cast::AsArray; -use arrow_array::{downcast_integer, ArrowNumericType}; -use arrow_buffer::ArrowNativeType; - -/// BIT_AND aggregate expression -#[derive(Debug, Clone)] -pub struct BitAnd { - name: String, - pub data_type: DataType, - expr: Arc, - nullable: bool, -} - -impl BitAnd { - /// Create a new BIT_AND aggregate function - pub fn new( - expr: Arc, - name: impl Into, - data_type: DataType, - ) -> Self { - Self { - name: name.into(), - expr, - data_type, - nullable: true, - } - } -} - -impl AggregateExpr for BitAnd { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - fn field(&self) -> Result { - Ok(Field::new( - &self.name, - self.data_type.clone(), - self.nullable, - )) - } - - fn create_accumulator(&self) -> Result> { - macro_rules! helper { - ($t:ty) => { - Ok(Box::>::default()) - }; - } - downcast_integer! { - &self.data_type => (helper), - _ => Err(DataFusionError::NotImplemented(format!( - "BitAndAccumulator not supported for {} with {}", - self.name(), - self.data_type - ))), - } - } - - fn state_fields(&self) -> Result> { - Ok(vec![Field::new( - format_state_name(&self.name, "bit_and"), - self.data_type.clone(), - self.nullable, - )]) - } - - fn expressions(&self) -> Vec> { - vec![self.expr.clone()] - } - - fn name(&self) -> &str { - &self.name - } - - fn groups_accumulator_supported(&self) -> bool { - true - } - - fn create_groups_accumulator(&self) -> Result> { - use std::ops::BitAndAssign; - - // Note the default value for BitAnd should be all set, i.e. `!0` - macro_rules! helper { - ($t:ty, $dt:expr) => { - Ok(Box::new( - PrimitiveGroupsAccumulator::<$t, _>::new($dt, |x, y| { - x.bitand_assign(y) - }) - .with_starting_value(!0), - )) - }; - } - - let data_type = &self.data_type; - downcast_integer! { - data_type => (helper, data_type), - _ => not_impl_err!( - "GroupsAccumulator not supported for {} with {}", - self.name(), - self.data_type - ), - } - } - - fn reverse_expr(&self) -> Option> { - Some(Arc::new(self.clone())) - } -} - -impl PartialEq for BitAnd { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| { - self.name == x.name - && self.data_type == x.data_type - && self.nullable == x.nullable - && self.expr.eq(&x.expr) - }) - .unwrap_or(false) - } -} - -struct BitAndAccumulator { - value: Option, -} - -impl std::fmt::Debug for BitAndAccumulator { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "BitAndAccumulator({})", T::DATA_TYPE) - } -} - -impl Default for BitAndAccumulator { - fn default() -> Self { - Self { value: None } - } -} - -impl Accumulator for BitAndAccumulator -where - T::Native: std::ops::BitAnd, -{ - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - if let Some(x) = bit_and(values[0].as_primitive::()) { - let v = self.value.get_or_insert(x); - *v = *v & x; - } - Ok(()) - } - - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - self.update_batch(states) - } - - fn state(&mut self) -> Result> { - Ok(vec![self.evaluate()?]) - } - - fn evaluate(&mut self) -> Result { - ScalarValue::new_primitive::(self.value, &T::DATA_TYPE) - } - - fn size(&self) -> usize { - std::mem::size_of_val(self) - } -} - -/// BIT_OR aggregate expression -#[derive(Debug, Clone)] -pub struct BitOr { - name: String, - pub data_type: DataType, - expr: Arc, - nullable: bool, -} - -impl BitOr { - /// Create a new BIT_OR aggregate function - pub fn new( - expr: Arc, - name: impl Into, - data_type: DataType, - ) -> Self { - Self { - name: name.into(), - expr, - data_type, - nullable: true, - } - } -} - -impl AggregateExpr for BitOr { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - fn field(&self) -> Result { - Ok(Field::new( - &self.name, - self.data_type.clone(), - self.nullable, - )) - } - - fn create_accumulator(&self) -> Result> { - macro_rules! helper { - ($t:ty) => { - Ok(Box::>::default()) - }; - } - downcast_integer! { - &self.data_type => (helper), - _ => Err(DataFusionError::NotImplemented(format!( - "BitOrAccumulator not supported for {} with {}", - self.name(), - self.data_type - ))), - } - } - - fn state_fields(&self) -> Result> { - Ok(vec![Field::new( - format_state_name(&self.name, "bit_or"), - self.data_type.clone(), - self.nullable, - )]) - } - - fn expressions(&self) -> Vec> { - vec![self.expr.clone()] - } - - fn name(&self) -> &str { - &self.name - } - - fn groups_accumulator_supported(&self) -> bool { - true - } - - fn create_groups_accumulator(&self) -> Result> { - use std::ops::BitOrAssign; - macro_rules! helper { - ($t:ty, $dt:expr) => { - Ok(Box::new(PrimitiveGroupsAccumulator::<$t, _>::new( - $dt, - |x, y| x.bitor_assign(y), - ))) - }; - } - - let data_type = &self.data_type; - downcast_integer! { - data_type => (helper, data_type), - _ => not_impl_err!( - "GroupsAccumulator not supported for {} with {}", - self.name(), - self.data_type - ), - } - } - - fn reverse_expr(&self) -> Option> { - Some(Arc::new(self.clone())) - } -} - -impl PartialEq for BitOr { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| { - self.name == x.name - && self.data_type == x.data_type - && self.nullable == x.nullable - && self.expr.eq(&x.expr) - }) - .unwrap_or(false) - } -} - -struct BitOrAccumulator { - value: Option, -} - -impl std::fmt::Debug for BitOrAccumulator { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "BitOrAccumulator({})", T::DATA_TYPE) - } -} - -impl Default for BitOrAccumulator { - fn default() -> Self { - Self { value: None } - } -} - -impl Accumulator for BitOrAccumulator -where - T::Native: std::ops::BitOr, -{ - fn state(&mut self) -> Result> { - Ok(vec![self.evaluate()?]) - } - - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - if let Some(x) = bit_or(values[0].as_primitive::()) { - let v = self.value.get_or_insert(T::Native::usize_as(0)); - *v = *v | x; - } - Ok(()) - } - - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - self.update_batch(states) - } - - fn evaluate(&mut self) -> Result { - ScalarValue::new_primitive::(self.value, &T::DATA_TYPE) - } - - fn size(&self) -> usize { - std::mem::size_of_val(self) - } -} - -/// BIT_XOR aggregate expression -#[derive(Debug, Clone)] -pub struct BitXor { - name: String, - pub data_type: DataType, - expr: Arc, - nullable: bool, -} - -impl BitXor { - /// Create a new BIT_XOR aggregate function - pub fn new( - expr: Arc, - name: impl Into, - data_type: DataType, - ) -> Self { - Self { - name: name.into(), - expr, - data_type, - nullable: true, - } - } -} - -impl AggregateExpr for BitXor { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - fn field(&self) -> Result { - Ok(Field::new( - &self.name, - self.data_type.clone(), - self.nullable, - )) - } - - fn create_accumulator(&self) -> Result> { - macro_rules! helper { - ($t:ty) => { - Ok(Box::>::default()) - }; - } - downcast_integer! { - &self.data_type => (helper), - _ => Err(DataFusionError::NotImplemented(format!( - "BitXor not supported for {} with {}", - self.name(), - self.data_type - ))), - } - } - - fn state_fields(&self) -> Result> { - Ok(vec![Field::new( - format_state_name(&self.name, "bit_xor"), - self.data_type.clone(), - self.nullable, - )]) - } - - fn expressions(&self) -> Vec> { - vec![self.expr.clone()] - } - - fn name(&self) -> &str { - &self.name - } - - fn groups_accumulator_supported(&self) -> bool { - true - } - - fn create_groups_accumulator(&self) -> Result> { - use std::ops::BitXorAssign; - macro_rules! helper { - ($t:ty, $dt:expr) => { - Ok(Box::new(PrimitiveGroupsAccumulator::<$t, _>::new( - $dt, - |x, y| x.bitxor_assign(y), - ))) - }; - } - - let data_type = &self.data_type; - downcast_integer! { - data_type => (helper, data_type), - _ => not_impl_err!( - "GroupsAccumulator not supported for {} with {}", - self.name(), - self.data_type - ), - } - } - - fn reverse_expr(&self) -> Option> { - Some(Arc::new(self.clone())) - } -} - -impl PartialEq for BitXor { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| { - self.name == x.name - && self.data_type == x.data_type - && self.nullable == x.nullable - && self.expr.eq(&x.expr) - }) - .unwrap_or(false) - } -} - -struct BitXorAccumulator { - value: Option, -} - -impl std::fmt::Debug for BitXorAccumulator { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "BitXorAccumulator({})", T::DATA_TYPE) - } -} - -impl Default for BitXorAccumulator { - fn default() -> Self { - Self { value: None } - } -} - -impl Accumulator for BitXorAccumulator -where - T::Native: std::ops::BitXor, -{ - fn state(&mut self) -> Result> { - Ok(vec![self.evaluate()?]) - } - - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - if let Some(x) = bit_xor(values[0].as_primitive::()) { - let v = self.value.get_or_insert(T::Native::usize_as(0)); - *v = *v ^ x; - } - Ok(()) - } - - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - self.update_batch(states) - } - - fn evaluate(&mut self) -> Result { - ScalarValue::new_primitive::(self.value, &T::DATA_TYPE) - } - - fn size(&self) -> usize { - std::mem::size_of_val(self) - } -} - -/// Expression for a BIT_XOR(DISTINCT) aggregation. -#[derive(Debug, Clone)] -pub struct DistinctBitXor { - name: String, - pub data_type: DataType, - expr: Arc, - nullable: bool, -} - -impl DistinctBitXor { - /// Create a new DistinctBitXor aggregate function - pub fn new( - expr: Arc, - name: impl Into, - data_type: DataType, - ) -> Self { - Self { - name: name.into(), - expr, - data_type, - nullable: true, - } - } -} - -impl AggregateExpr for DistinctBitXor { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - fn field(&self) -> Result { - Ok(Field::new( - &self.name, - self.data_type.clone(), - self.nullable, - )) - } - - fn create_accumulator(&self) -> Result> { - macro_rules! helper { - ($t:ty) => { - Ok(Box::>::default()) - }; - } - downcast_integer! { - &self.data_type => (helper), - _ => Err(DataFusionError::NotImplemented(format!( - "DistinctBitXorAccumulator not supported for {} with {}", - self.name(), - self.data_type - ))), - } - } - - fn state_fields(&self) -> Result> { - // State field is a List which stores items to rebuild hash set. - Ok(vec![Field::new_list( - format_state_name(&self.name, "bit_xor distinct"), - Field::new("item", self.data_type.clone(), true), - false, - )]) - } - - fn expressions(&self) -> Vec> { - vec![self.expr.clone()] - } - - fn name(&self) -> &str { - &self.name - } -} - -impl PartialEq for DistinctBitXor { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| { - self.name == x.name - && self.data_type == x.data_type - && self.nullable == x.nullable - && self.expr.eq(&x.expr) - }) - .unwrap_or(false) - } -} - -struct DistinctBitXorAccumulator { - values: HashSet, -} - -impl std::fmt::Debug for DistinctBitXorAccumulator { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "DistinctBitXorAccumulator({})", T::DATA_TYPE) - } -} - -impl Default for DistinctBitXorAccumulator { - fn default() -> Self { - Self { - values: HashSet::default(), - } - } -} - -impl Accumulator for DistinctBitXorAccumulator -where - T::Native: std::ops::BitXor + std::hash::Hash + Eq, -{ - fn state(&mut self) -> Result> { - // 1. Stores aggregate state in `ScalarValue::List` - // 2. Constructs `ScalarValue::List` state from distinct numeric stored in hash set - let state_out = { - let values = self - .values - .iter() - .map(|x| ScalarValue::new_primitive::(Some(*x), &T::DATA_TYPE)) - .collect::>>()?; - - let arr = ScalarValue::new_list(&values, &T::DATA_TYPE); - vec![ScalarValue::List(arr)] - }; - Ok(state_out) - } - - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - if values.is_empty() { - return Ok(()); - } - - let array = values[0].as_primitive::(); - match array.nulls().filter(|x| x.null_count() > 0) { - Some(n) => { - for idx in n.valid_indices() { - self.values.insert(array.value(idx)); - } - } - None => array.values().iter().for_each(|x| { - self.values.insert(*x); - }), - } - Ok(()) - } - - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - if let Some(state) = states.first() { - let list_arr = as_list_array(state)?; - for arr in list_arr.iter().flatten() { - self.update_batch(&[arr])?; - } - } - Ok(()) - } - - fn evaluate(&mut self) -> Result { - let mut acc = T::Native::usize_as(0); - for distinct_value in self.values.iter() { - acc = acc ^ *distinct_value; - } - let v = (!self.values.is_empty()).then_some(acc); - ScalarValue::new_primitive::(v, &T::DATA_TYPE) - } - - fn size(&self) -> usize { - std::mem::size_of_val(self) - + self.values.capacity() * std::mem::size_of::() - } -} diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index 592393f800d0..1db590511313 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -42,7 +42,6 @@ pub use crate::aggregate::array_agg_distinct::DistinctArrayAgg; pub use crate::aggregate::array_agg_ordered::OrderSensitiveArrayAgg; pub use crate::aggregate::average::Avg; pub use crate::aggregate::average::AvgAccumulator; -pub use crate::aggregate::bit_and_or_xor::{BitAnd, BitOr, BitXor, DistinctBitXor}; pub use crate::aggregate::bool_and_or::{BoolAnd, BoolOr}; pub use crate::aggregate::build_in::create_aggregate_expr; pub use crate::aggregate::correlation::Correlation; diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index 04f7b596fea8..0f23fd5f1648 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -111,9 +111,6 @@ impl From<&AggregateFunction> for protobuf::AggregateFunction { AggregateFunction::Min => Self::Min, AggregateFunction::Max => Self::Max, AggregateFunction::Avg => Self::Avg, - AggregateFunction::BitAnd => Self::BitAnd, - AggregateFunction::BitOr => Self::BitOr, - AggregateFunction::BitXor => Self::BitXor, AggregateFunction::BoolAnd => Self::BoolAnd, AggregateFunction::BoolOr => Self::BoolOr, AggregateFunction::ArrayAgg => Self::ArrayAgg, @@ -390,9 +387,6 @@ pub fn serialize_expr( AggregateFunction::ArrayAgg => protobuf::AggregateFunction::ArrayAgg, AggregateFunction::Min => protobuf::AggregateFunction::Min, AggregateFunction::Max => protobuf::AggregateFunction::Max, - AggregateFunction::BitAnd => protobuf::AggregateFunction::BitAnd, - AggregateFunction::BitOr => protobuf::AggregateFunction::BitOr, - AggregateFunction::BitXor => protobuf::AggregateFunction::BitXor, AggregateFunction::BoolAnd => protobuf::AggregateFunction::BoolAnd, AggregateFunction::BoolOr => protobuf::AggregateFunction::BoolOr, AggregateFunction::Avg => protobuf::AggregateFunction::Avg, From 9c3e9b77582a651c795cb1700dbee7dad30d531a Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Sun, 16 Jun 2024 04:21:42 +0530 Subject: [PATCH 05/16] add support for float --- .../functions-aggregate/src/bit_and_or_xor.rs | 170 ++++++++---------- 1 file changed, 73 insertions(+), 97 deletions(-) diff --git a/datafusion/functions-aggregate/src/bit_and_or_xor.rs b/datafusion/functions-aggregate/src/bit_and_or_xor.rs index a7a4455d1ed1..8a31e72ae93d 100644 --- a/datafusion/functions-aggregate/src/bit_and_or_xor.rs +++ b/datafusion/functions-aggregate/src/bit_and_or_xor.rs @@ -23,9 +23,9 @@ use ahash::RandomState; use datafusion_common::cast::as_list_array; use arrow::array::{Array, ArrayRef, AsArray}; -use arrow::datatypes::{ArrowNativeType, ArrowNumericType}; use arrow::datatypes::{ DataType, Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type, + ArrowNativeType, ArrowNumericType, Float32Type, Float64Type, }; use arrow_schema::Field; @@ -46,8 +46,10 @@ macro_rules! downcast_logical { DataType::UInt16 => $helper!(UInt16Type, $args.data_type), DataType::UInt32 => $helper!(UInt32Type, $args.data_type), DataType::UInt64 => $helper!(UInt64Type, $args.data_type), + DataType::Float32 => $helper!(Float32Type, $args.data_type) + DataType::Float64 => $helper!(Float64Type, $args.data_type) _ => { - not_impl_err!("Sum not supported for {}: {}", $args.name, $args.data_type) + not_impl_err!("not supported for {}: {}", $args.name, $args.data_type) } } }; @@ -57,7 +59,7 @@ make_udaf_expr_and_func!( BitAnd, bit_and, expression, - "Returns the bit wise and of a group of values.", + "Returns the bit wise AND of a group of values.", bit_and_udaf ); @@ -65,7 +67,7 @@ make_udaf_expr_and_func!( BitOr, bit_or, expression, - "Returns the bit wise or of a group of values.", + "Returns the bit wise OR of a group of values.", bit_or_udaf ); @@ -73,7 +75,7 @@ make_udaf_expr_and_func!( BitXor, bit_xor, expression, - "Returns the bit wise xor of a group of values.", + "Returns the bit wise XOR of a group of values.", bit_xor_udaf ); @@ -110,21 +112,11 @@ impl AggregateUDFImpl for BitAnd { } fn return_type(&self, arg_types: &[DataType]) -> Result { - match &arg_types[0] { - DataType::Int8 | - DataType::Int16 | - DataType::Int32 | - DataType::Int64 | - DataType::UInt8 | - DataType::UInt16 | - DataType::UInt32 | - DataType::UInt64 => { - Ok(arg_types[0].clone()) - } - other => { - exec_err!("[return_type] SUM not supported for {}", other) - } + let arg_type = &arg_types[0]; + if !is_bit_and_or_xor_support_arg_type(arg_type) { + return exec_err!("[return_type] AND not supported for {}", arg_type) } + return Ok(arg_type.clone()) } fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { @@ -165,14 +157,6 @@ impl Accumulator for BitAndAccumulator Ok(()) } - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - self.update_batch(states) - } - - fn state(&mut self) -> Result> { - Ok(vec![self.evaluate()?]) - } - fn evaluate(&mut self) -> Result { ScalarValue::new_primitive::(self.value, &T::DATA_TYPE) } @@ -180,6 +164,14 @@ impl Accumulator for BitAndAccumulator fn size(&self) -> usize { std::mem::size_of_val(self) } + + fn state(&mut self) -> Result> { + Ok(vec![self.evaluate()?]) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + self.update_batch(states) + } } #[derive(Debug)] @@ -215,21 +207,11 @@ impl AggregateUDFImpl for BitOr { } fn return_type(&self, arg_types: &[DataType]) -> Result { - match &arg_types[0] { - DataType::Int8 | - DataType::Int16 | - DataType::Int32 | - DataType::Int64 | - DataType::UInt8 | - DataType::UInt16 | - DataType::UInt32 | - DataType::UInt64 => { - Ok(arg_types[0].clone()) - } - other => { - exec_err!("[return_type] SUM not supported for {}", other) - } + let arg_type = &arg_types[0]; + if !is_bit_and_or_xor_support_arg_type(arg_type) { + return exec_err!("[return_type] OR not supported for {}", arg_type) } + return Ok(arg_type.clone()) } fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { @@ -262,10 +244,6 @@ impl Accumulator for BitOrAccumulator where T::Native: std::ops::BitOr, { - fn state(&mut self) -> Result> { - Ok(vec![self.evaluate()?]) - } - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { if let Some(x) = arrow::compute::bit_or(values[0].as_primitive::()) { let v = self.value.get_or_insert(T::Native::usize_as(0)); @@ -274,10 +252,6 @@ impl Accumulator for BitOrAccumulator Ok(()) } - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - self.update_batch(states) - } - fn evaluate(&mut self) -> Result { ScalarValue::new_primitive::(self.value, &T::DATA_TYPE) } @@ -285,6 +259,14 @@ impl Accumulator for BitOrAccumulator fn size(&self) -> usize { std::mem::size_of_val(self) } + + fn state(&mut self) -> Result> { + Ok(vec![self.evaluate()?]) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + self.update_batch(states) + } } #[derive(Debug)] @@ -320,21 +302,11 @@ impl AggregateUDFImpl for BitXor { } fn return_type(&self, arg_types: &[DataType]) -> Result { - match &arg_types[0] { - DataType::Int8 | - DataType::Int16 | - DataType::Int32 | - DataType::Int64 | - DataType::UInt8 | - DataType::UInt16 | - DataType::UInt32 | - DataType::UInt64 => { - Ok(arg_types[0].clone()) - } - other => { - exec_err!("[return_type] SUM not supported for {}", other) - } + let arg_type = &arg_types[0]; + if !is_bit_and_or_xor_support_arg_type(arg_type) { + return exec_err!("[return_type] XOR not supported for {}", arg_type) } + return Ok(arg_type.clone()) } fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { @@ -394,10 +366,6 @@ impl Accumulator for BitXorAccumulator where T::Native: std::ops::BitXor, { - fn state(&mut self) -> Result> { - Ok(vec![self.evaluate()?]) - } - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { if let Some(x) = arrow::compute::bit_xor(values[0].as_primitive::()) { let v = self.value.get_or_insert(T::Native::usize_as(0)); @@ -406,10 +374,6 @@ impl Accumulator for BitXorAccumulator Ok(()) } - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - self.update_batch(states) - } - fn evaluate(&mut self) -> Result { ScalarValue::new_primitive::(self.value, &T::DATA_TYPE) } @@ -417,6 +381,14 @@ impl Accumulator for BitXorAccumulator fn size(&self) -> usize { std::mem::size_of_val(self) } + + fn state(&mut self) -> Result> { + Ok(vec![self.evaluate()?]) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + self.update_batch(states) + } } struct DistinctBitXorAccumulator { @@ -441,22 +413,6 @@ impl Accumulator for DistinctBitXorAccumulator where T::Native: std::ops::BitXor + std::hash::Hash + Eq, { - fn state(&mut self) -> Result> { - // 1. Stores aggregate state in `ScalarValue::List` - // 2. Constructs `ScalarValue::List` state from distinct numeric stored in hash set - let state_out = { - let values = self - .values - .iter() - .map(|x| ScalarValue::new_primitive::(Some(*x), &T::DATA_TYPE)) - .collect::>>()?; - - let arr = ScalarValue::new_list(&values, &T::DATA_TYPE); - vec![ScalarValue::List(arr)] - }; - Ok(state_out) - } - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { if values.is_empty() { return Ok(()); @@ -476,16 +432,6 @@ impl Accumulator for DistinctBitXorAccumulator Ok(()) } - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - if let Some(state) = states.first() { - let list_arr = as_list_array(state)?; - for arr in list_arr.iter().flatten() { - self.update_batch(&[arr])?; - } - } - Ok(()) - } - fn evaluate(&mut self) -> Result { let mut acc = T::Native::usize_as(0); for distinct_value in self.values.iter() { @@ -499,4 +445,34 @@ impl Accumulator for DistinctBitXorAccumulator std::mem::size_of_val(self) + self.values.capacity() * std::mem::size_of::() } + + fn state(&mut self) -> Result> { + // 1. Stores aggregate state in `ScalarValue::List` + // 2. Constructs `ScalarValue::List` state from distinct numeric stored in hash set + let state_out = { + let values = self + .values + .iter() + .map(|x| ScalarValue::new_primitive::(Some(*x), &T::DATA_TYPE)) + .collect::>>()?; + + let arr = ScalarValue::new_list(&values, &T::DATA_TYPE); + vec![ScalarValue::List(arr)] + }; + Ok(state_out) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + if let Some(state) = states.first() { + let list_arr = as_list_array(state)?; + for arr in list_arr.iter().flatten() { + self.update_batch(&[arr])?; + } + } + Ok(()) + } +} + +fn is_bit_and_or_xor_support_arg_type(arg_type: &DataType) -> bool { + NUMERICS.contains(arg_type) } \ No newline at end of file From d481ccc57802988f2ba9e6863649b3b6e07af3b5 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Sun, 16 Jun 2024 04:31:59 +0530 Subject: [PATCH 06/16] removing support for float --- .../functions-aggregate/src/bit_and_or_xor.rs | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/datafusion/functions-aggregate/src/bit_and_or_xor.rs b/datafusion/functions-aggregate/src/bit_and_or_xor.rs index 8a31e72ae93d..f4e0398edd55 100644 --- a/datafusion/functions-aggregate/src/bit_and_or_xor.rs +++ b/datafusion/functions-aggregate/src/bit_and_or_xor.rs @@ -19,20 +19,20 @@ use std::any::Any; use std::collections::HashSet; -use ahash::RandomState; -use datafusion_common::cast::as_list_array; +use ahash::RandomState; use arrow::array::{Array, ArrayRef, AsArray}; use arrow::datatypes::{ - DataType, Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type, - ArrowNativeType, ArrowNumericType, Float32Type, Float64Type, + ArrowNativeType, ArrowNumericType, DataType, Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type, UInt32Type, + UInt64Type, UInt8Type, }; use arrow_schema::Field; use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue}; +use datafusion_common::cast::as_list_array; use datafusion_expr::{Accumulator, AggregateUDFImpl, Signature, Volatility}; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; -use datafusion_expr::type_coercion::aggregates::NUMERICS; +use datafusion_expr::type_coercion::aggregates::INTEGERS; use datafusion_expr::utils::format_state_name; macro_rules! downcast_logical { @@ -46,8 +46,6 @@ macro_rules! downcast_logical { DataType::UInt16 => $helper!(UInt16Type, $args.data_type), DataType::UInt32 => $helper!(UInt32Type, $args.data_type), DataType::UInt64 => $helper!(UInt64Type, $args.data_type), - DataType::Float32 => $helper!(Float32Type, $args.data_type) - DataType::Float64 => $helper!(Float64Type, $args.data_type) _ => { not_impl_err!("not supported for {}: {}", $args.name, $args.data_type) } @@ -87,7 +85,7 @@ pub struct BitAnd { impl BitAnd { pub fn new() -> Self { Self { - signature: Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable) + signature: Signature::uniform(1, INTEGERS.to_vec(), Volatility::Immutable) } } } @@ -182,7 +180,7 @@ pub struct BitOr { impl BitOr { pub fn new() -> Self { Self { - signature: Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable) + signature: Signature::uniform(1, INTEGERS.to_vec(), Volatility::Immutable) } } } @@ -277,7 +275,7 @@ pub struct BitXor { impl BitXor { pub fn new() -> Self { Self { - signature: Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable) + signature: Signature::uniform(1, INTEGERS.to_vec(), Volatility::Immutable) } } } @@ -474,5 +472,5 @@ impl Accumulator for DistinctBitXorAccumulator } fn is_bit_and_or_xor_support_arg_type(arg_type: &DataType) -> bool { - NUMERICS.contains(arg_type) + INTEGERS.contains(arg_type) } \ No newline at end of file From 0d25cc0caaa96489c752fe7d94142f623a849fa8 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Sun, 16 Jun 2024 16:34:26 +0530 Subject: [PATCH 07/16] refactor helper macros --- .../functions-aggregate/src/bit_and_or_xor.rs | 74 +++++++++---------- 1 file changed, 36 insertions(+), 38 deletions(-) diff --git a/datafusion/functions-aggregate/src/bit_and_or_xor.rs b/datafusion/functions-aggregate/src/bit_and_or_xor.rs index f4e0398edd55..d1e8bc188dfb 100644 --- a/datafusion/functions-aggregate/src/bit_and_or_xor.rs +++ b/datafusion/functions-aggregate/src/bit_and_or_xor.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Defines BitAnd, BitOr, and BitXor Aggregate accumulators +//! Defines `BitAnd`, `BitOr`, `BitXor` and `BitXor DISTINCT` aggregate accumulators use std::any::Any; use std::collections::HashSet; @@ -35,19 +35,38 @@ use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::type_coercion::aggregates::INTEGERS; use datafusion_expr::utils::format_state_name; -macro_rules! downcast_logical { - ($args:ident, $helper:ident) => { +#[derive()] +enum BitwiseOperatorType { + And, + Or, + Xor, + XorDistinct, +} + +macro_rules! accumulator_helper { + ($t:ty, $opr:expr) => { + match $opr { + BitwiseOperatorType::And => Ok(Box::new(BitAndAccumulator::<$t>::default())), + BitwiseOperatorType::Or => Ok(Box::>::default()), + BitwiseOperatorType::Xor => Ok(Box::new(BitXorAccumulator::<$t>::default())), + BitwiseOperatorType::XorDistinct => Ok(Box::new(DistinctBitXorAccumulator::<$t>::default())), + } + }; +} + +macro_rules! downcast_bitwise_accumulator { + ($args:ident, $opr:expr) => { match $args.data_type { - DataType::Int8 => $helper!(Int8Type, $args.data_type), - DataType::Int16 => $helper!(Int16Type, $args.data_type), - DataType::Int32 => $helper!(Int32Type, $args.data_type), - DataType::Int64 => $helper!(Int64Type, $args.data_type), - DataType::UInt8 => $helper!(UInt8Type, $args.data_type), - DataType::UInt16 => $helper!(UInt16Type, $args.data_type), - DataType::UInt32 => $helper!(UInt32Type, $args.data_type), - DataType::UInt64 => $helper!(UInt64Type, $args.data_type), + DataType::Int8 => accumulator_helper!(Int8Type, $opr), + DataType::Int16 => accumulator_helper!(Int16Type, $opr), + DataType::Int32 => accumulator_helper!(Int32Type, $opr), + DataType::Int64 => accumulator_helper!(Int64Type, $opr), + DataType::UInt8 => accumulator_helper!(UInt8Type, $opr), + DataType::UInt16 => accumulator_helper!(UInt16Type, $opr), + DataType::UInt32 => accumulator_helper!(UInt32Type, $opr), + DataType::UInt64 => accumulator_helper!(UInt64Type, $opr), _ => { - not_impl_err!("not supported for {}: {}", $args.name, $args.data_type) + not_impl_err!("{} not supported for {}: {}", stringify!($opr) ,$args.name, $args.data_type) } } }; @@ -114,16 +133,11 @@ impl AggregateUDFImpl for BitAnd { if !is_bit_and_or_xor_support_arg_type(arg_type) { return exec_err!("[return_type] AND not supported for {}", arg_type) } - return Ok(arg_type.clone()) + Ok(arg_type.clone()) } fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { - macro_rules! helper { - ($t:ty, $dt:expr) => { - Ok(Box::new(BitAndAccumulator::<$t>::default())) - }; - } - downcast_logical!(acc_args, helper) + downcast_bitwise_accumulator!(acc_args, BitwiseOperatorType::And) } } @@ -213,12 +227,7 @@ impl AggregateUDFImpl for BitOr { } fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { - macro_rules! helper { - ($t:ty, $dt:expr) => { - Ok(Box::new(BitOrAccumulator::<$t>::default())) - }; - } - downcast_logical!(acc_args, helper) + downcast_bitwise_accumulator!(acc_args, BitwiseOperatorType::Or) } } @@ -308,21 +317,10 @@ impl AggregateUDFImpl for BitXor { } fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { - if acc_args.is_distinct { - macro_rules! helper { - ($t:ty, $dt:expr) => { - Ok(Box::new(DistinctBitXorAccumulator::<$t>::default())) - }; - } - downcast_logical!(acc_args, helper) + downcast_bitwise_accumulator!(acc_args, BitwiseOperatorType::XorDistinct) } else { - macro_rules! helper { - ($t:ty, $dt:expr) => { - Ok(Box::new(BitXorAccumulator::<$t>::default())) - }; - } - downcast_logical!(acc_args, helper) + downcast_bitwise_accumulator!(acc_args, BitwiseOperatorType::Xor) } } From 5d339cb6840322142342fab75295b90bfde892bc Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Sun, 16 Jun 2024 16:43:46 +0530 Subject: [PATCH 08/16] clippy'fy --- .../functions-aggregate/src/bit_and_or_xor.rs | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/datafusion/functions-aggregate/src/bit_and_or_xor.rs b/datafusion/functions-aggregate/src/bit_and_or_xor.rs index d1e8bc188dfb..3db5c83b204c 100644 --- a/datafusion/functions-aggregate/src/bit_and_or_xor.rs +++ b/datafusion/functions-aggregate/src/bit_and_or_xor.rs @@ -43,17 +43,22 @@ enum BitwiseOperatorType { XorDistinct, } +/// `accumulator_helper` is a macro accepting ([ArrowPrimitiveType], [BitwiseOperatorType]) macro_rules! accumulator_helper { ($t:ty, $opr:expr) => { match $opr { - BitwiseOperatorType::And => Ok(Box::new(BitAndAccumulator::<$t>::default())), + BitwiseOperatorType::And => Ok(Box::>::default()), BitwiseOperatorType::Or => Ok(Box::>::default()), - BitwiseOperatorType::Xor => Ok(Box::new(BitXorAccumulator::<$t>::default())), - BitwiseOperatorType::XorDistinct => Ok(Box::new(DistinctBitXorAccumulator::<$t>::default())), + BitwiseOperatorType::Xor => Ok(Box::>::default()), + BitwiseOperatorType::XorDistinct => Ok(Box::>::default()), } }; } +/// AND, OR and XOR only supports a subset of numeric types, instead relying on type coercion +/// +/// `args` is [AccumulatorArgs] +/// `opr` is [BitwiseOperatorType] macro_rules! downcast_bitwise_accumulator { ($args:ident, $opr:expr) => { match $args.data_type { @@ -76,7 +81,7 @@ make_udaf_expr_and_func!( BitAnd, bit_and, expression, - "Returns the bit wise AND of a group of values.", + "Returns the bitwise AND of a group of values.", bit_and_udaf ); @@ -84,7 +89,7 @@ make_udaf_expr_and_func!( BitOr, bit_or, expression, - "Returns the bit wise OR of a group of values.", + "Returns the bitwise OR of a group of values.", bit_or_udaf ); @@ -92,7 +97,7 @@ make_udaf_expr_and_func!( BitXor, bit_xor, expression, - "Returns the bit wise XOR of a group of values.", + "Returns the bitwise XOR of a group of values.", bit_xor_udaf ); @@ -223,7 +228,7 @@ impl AggregateUDFImpl for BitOr { if !is_bit_and_or_xor_support_arg_type(arg_type) { return exec_err!("[return_type] OR not supported for {}", arg_type) } - return Ok(arg_type.clone()) + Ok(arg_type.clone()) } fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { @@ -313,7 +318,7 @@ impl AggregateUDFImpl for BitXor { if !is_bit_and_or_xor_support_arg_type(arg_type) { return exec_err!("[return_type] XOR not supported for {}", arg_type) } - return Ok(arg_type.clone()) + Ok(arg_type.clone()) } fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { From 74fc16cfbd56c60df0da767cdb47d28711ab64ca Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Sun, 16 Jun 2024 18:23:18 +0530 Subject: [PATCH 09/16] simplify Bitwise operation --- .../functions-aggregate/src/bit_and_or_xor.rs | 227 +++++------------- 1 file changed, 65 insertions(+), 162 deletions(-) diff --git a/datafusion/functions-aggregate/src/bit_and_or_xor.rs b/datafusion/functions-aggregate/src/bit_and_or_xor.rs index 3db5c83b204c..218e09f6cada 100644 --- a/datafusion/functions-aggregate/src/bit_and_or_xor.rs +++ b/datafusion/functions-aggregate/src/bit_and_or_xor.rs @@ -19,6 +19,7 @@ use std::any::Any; use std::collections::HashSet; +use std::fmt::{Display, Formatter}; use ahash::RandomState; use arrow::array::{Array, ArrayRef, AsArray}; @@ -35,22 +36,15 @@ use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::type_coercion::aggregates::INTEGERS; use datafusion_expr::utils::format_state_name; -#[derive()] -enum BitwiseOperatorType { - And, - Or, - Xor, - XorDistinct, -} -/// `accumulator_helper` is a macro accepting ([ArrowPrimitiveType], [BitwiseOperatorType]) +/// `accumulator_helper` is a macro accepting ([ArrowPrimitiveType], [BitwiseOperationType]) macro_rules! accumulator_helper { ($t:ty, $opr:expr) => { match $opr { - BitwiseOperatorType::And => Ok(Box::>::default()), - BitwiseOperatorType::Or => Ok(Box::>::default()), - BitwiseOperatorType::Xor => Ok(Box::>::default()), - BitwiseOperatorType::XorDistinct => Ok(Box::>::default()), + BitwiseOperationType::And => Ok(Box::>::default()), + BitwiseOperationType::Or => Ok(Box::>::default()), + BitwiseOperationType::Xor => Ok(Box::>::default()), + BitwiseOperationType::XorDistinct => Ok(Box::>::default()), } }; } @@ -58,7 +52,7 @@ macro_rules! accumulator_helper { /// AND, OR and XOR only supports a subset of numeric types, instead relying on type coercion /// /// `args` is [AccumulatorArgs] -/// `opr` is [BitwiseOperatorType] +/// `opr` is [BitwiseOperationType] macro_rules! downcast_bitwise_accumulator { ($args:ident, $opr:expr) => { match $args.data_type { @@ -77,56 +71,56 @@ macro_rules! downcast_bitwise_accumulator { }; } -make_udaf_expr_and_func!( - BitAnd, - bit_and, - expression, - "Returns the bitwise AND of a group of values.", - bit_and_udaf -); - -make_udaf_expr_and_func!( - BitOr, - bit_or, - expression, - "Returns the bitwise OR of a group of values.", - bit_or_udaf -); - -make_udaf_expr_and_func!( - BitXor, - bit_xor, - expression, - "Returns the bitwise XOR of a group of values.", - bit_xor_udaf -); + +macro_rules! make_bitwise_udaf_expr_and_func { + ($EXPR_FN:ident, $AGGREGATE_UDF_FN:ident, $OPR_TYPE:expr) => { + make_udaf_expr!($EXPR_FN, expr_y expr_x, concat!("Returns the bitwise", stringify!($OPR_TYPE), "of a group of values"), $AGGREGATE_UDF_FN); + create_func!($EXPR_FN, $AGGREGATE_UDF_FN, BitwiseOperation::new($OPR_TYPE, stringify!($EXPR_FN))); + } +} + +make_bitwise_udaf_expr_and_func!(bit_and, bit_and_udaf, BitwiseOperationType::And); +make_bitwise_udaf_expr_and_func!(bit_or, bit_or_udaf, BitwiseOperationType::Or); +make_bitwise_udaf_expr_and_func!(bit_xor, bit_xor_udaf, BitwiseOperationType::Xor); + +#[derive(Debug, Clone, Eq, PartialEq)] +enum BitwiseOperationType { + And, + Or, + Xor, + XorDistinct, +} + +impl Display for BitwiseOperationType { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} #[derive(Debug)] -pub struct BitAnd { +struct BitwiseOperation { signature: Signature, + operation: BitwiseOperationType, + func_name: &'static str, } -impl BitAnd { - pub fn new() -> Self { +impl BitwiseOperation { + pub fn new(operator: BitwiseOperationType, func_name: &'static str) -> Self { Self { - signature: Signature::uniform(1, INTEGERS.to_vec(), Volatility::Immutable) + operation: operator, + signature: Signature::uniform(1, INTEGERS.to_vec(), Volatility::Immutable), + func_name, } } } -impl Default for BitAnd { - fn default() -> Self { - Self::new() - } -} - -impl AggregateUDFImpl for BitAnd { +impl AggregateUDFImpl for BitwiseOperation { fn as_any(&self) -> &dyn Any { self } fn name(&self) -> &str { - "bit_and" + self.func_name } fn signature(&self) -> &Signature { @@ -136,13 +130,33 @@ impl AggregateUDFImpl for BitAnd { fn return_type(&self, arg_types: &[DataType]) -> Result { let arg_type = &arg_types[0]; if !is_bit_and_or_xor_support_arg_type(arg_type) { - return exec_err!("[return_type] AND not supported for {}", arg_type) + return exec_err!("[return_type] {} not supported for {}", self.operation.to_string() ,arg_type); } Ok(arg_type.clone()) } fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { - downcast_bitwise_accumulator!(acc_args, BitwiseOperatorType::And) + if acc_args.is_distinct && self.operation == BitwiseOperationType::Xor { + downcast_bitwise_accumulator!(acc_args, BitwiseOperationType::XorDistinct) + } else { + downcast_bitwise_accumulator!(acc_args, self.operation) + } + } + + fn state_fields(&self, args: StateFieldsArgs) -> Result> { + if self.operation == BitwiseOperationType::Xor && args.is_distinct { + Ok(vec![Field::new_list( + format_state_name(args.name, "xor distinct"), + Field::new("item", args.return_type.clone(), true), + false, + )]) + } else { + Ok(vec![Field::new( + format_state_name(args.name, self.operation.to_string().as_str()), + args.return_type.clone(), + true, + )]) + } } } @@ -191,51 +205,6 @@ impl Accumulator for BitAndAccumulator } } -#[derive(Debug)] -pub struct BitOr { - signature: Signature, -} - -impl BitOr { - pub fn new() -> Self { - Self { - signature: Signature::uniform(1, INTEGERS.to_vec(), Volatility::Immutable) - } - } -} - -impl Default for BitOr { - fn default() -> Self { - Self::new() - } -} - -impl AggregateUDFImpl for BitOr { - fn as_any(&self) -> &dyn Any { - self - } - - fn name(&self) -> &str { - "bit_or" - } - - fn signature(&self) -> &Signature { - &self.signature - } - - fn return_type(&self, arg_types: &[DataType]) -> Result { - let arg_type = &arg_types[0]; - if !is_bit_and_or_xor_support_arg_type(arg_type) { - return exec_err!("[return_type] OR not supported for {}", arg_type) - } - Ok(arg_type.clone()) - } - - fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { - downcast_bitwise_accumulator!(acc_args, BitwiseOperatorType::Or) - } -} - struct BitOrAccumulator { value: Option, } @@ -281,72 +250,6 @@ impl Accumulator for BitOrAccumulator } } -#[derive(Debug)] -pub struct BitXor { - signature: Signature, -} - -impl BitXor { - pub fn new() -> Self { - Self { - signature: Signature::uniform(1, INTEGERS.to_vec(), Volatility::Immutable) - } - } -} - -impl Default for BitXor { - fn default() -> Self { - Self::new() - } -} - -impl AggregateUDFImpl for BitXor { - fn as_any(&self) -> &dyn Any { - self - } - - fn name(&self) -> &str { - "bit_xor" - } - - fn signature(&self) -> &Signature { - &self.signature - } - - fn return_type(&self, arg_types: &[DataType]) -> Result { - let arg_type = &arg_types[0]; - if !is_bit_and_or_xor_support_arg_type(arg_type) { - return exec_err!("[return_type] XOR not supported for {}", arg_type) - } - Ok(arg_type.clone()) - } - - fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { - if acc_args.is_distinct { - downcast_bitwise_accumulator!(acc_args, BitwiseOperatorType::XorDistinct) - } else { - downcast_bitwise_accumulator!(acc_args, BitwiseOperatorType::Xor) - } - } - - fn state_fields(&self, args: StateFieldsArgs) -> Result> { - if args.is_distinct { - Ok(vec![Field::new_list( - format_state_name(args.name, "xor distinct"), - Field::new("item", args.return_type.clone(), true), - false, - )]) - } else { - Ok(vec![Field::new( - format_state_name(args.name, "xor"), - args.return_type.clone(), - true, - )]) - } - } - -} - struct BitXorAccumulator { value: Option, } @@ -412,7 +315,7 @@ impl Default for DistinctBitXorAccumulator { impl Accumulator for DistinctBitXorAccumulator where - T::Native: std::ops::BitXor + std::hash::Hash + Eq, + T::Native: std::ops::BitXor + std::hash::Hash + Eq, { fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { if values.is_empty() { From ae6879e474472d83551eb2975b108fd221c895d6 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Sun, 16 Jun 2024 21:50:39 +0530 Subject: [PATCH 10/16] add documentation --- datafusion/functions-aggregate/src/bit_and_or_xor.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/datafusion/functions-aggregate/src/bit_and_or_xor.rs b/datafusion/functions-aggregate/src/bit_and_or_xor.rs index 218e09f6cada..77f1b2f763da 100644 --- a/datafusion/functions-aggregate/src/bit_and_or_xor.rs +++ b/datafusion/functions-aggregate/src/bit_and_or_xor.rs @@ -49,7 +49,7 @@ macro_rules! accumulator_helper { }; } -/// AND, OR and XOR only supports a subset of numeric types, instead relying on type coercion +/// AND, OR and XOR only supports a subset of numeric types /// /// `args` is [AccumulatorArgs] /// `opr` is [BitwiseOperationType] @@ -71,7 +71,11 @@ macro_rules! downcast_bitwise_accumulator { }; } - +/// Simplifies the creation of User-Defined Aggregate Functions (UDAFs) for performing bitwise operations in a declarative manner. +/// +/// `EXPR_FN` identifier used to name the generated expression function. +/// `AGGREGATE_UDF_FN` is an identifier used to name the underlying UDAF function. +/// `OPR_TYPE` is an expression that evaluates to the type of bitwise operation to be performed. macro_rules! make_bitwise_udaf_expr_and_func { ($EXPR_FN:ident, $AGGREGATE_UDF_FN:ident, $OPR_TYPE:expr) => { make_udaf_expr!($EXPR_FN, expr_y expr_x, concat!("Returns the bitwise", stringify!($OPR_TYPE), "of a group of values"), $AGGREGATE_UDF_FN); @@ -83,11 +87,13 @@ make_bitwise_udaf_expr_and_func!(bit_and, bit_and_udaf, BitwiseOperationType::An make_bitwise_udaf_expr_and_func!(bit_or, bit_or_udaf, BitwiseOperationType::Or); make_bitwise_udaf_expr_and_func!(bit_xor, bit_xor_udaf, BitwiseOperationType::Xor); +/// The different types of bitwise operations that can be performed. #[derive(Debug, Clone, Eq, PartialEq)] enum BitwiseOperationType { And, Or, Xor, + /// `XorDistinct` is a variation of the bitwise XOR operation specifically for the scenario of BitXor DISTINCT XorDistinct, } @@ -97,9 +103,11 @@ impl Display for BitwiseOperationType { } } +/// [BitwiseOperation] struct encapsulates information about a bitwise operation. #[derive(Debug)] struct BitwiseOperation { signature: Signature, + /// `operation` indicates the type of bitwise operation to be performed. operation: BitwiseOperationType, func_name: &'static str, } From 41b64c93393355d54bf14b3d4a131f6611f01d36 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Sun, 16 Jun 2024 23:30:08 +0530 Subject: [PATCH 11/16] formatting --- .../functions-aggregate/src/bit_and_or_xor.rs | 52 +++++++++++-------- datafusion/functions-aggregate/src/lib.rs | 6 +-- .../physical-expr/src/aggregate/build_in.rs | 3 +- .../proto/src/physical_plan/to_proto.rs | 10 ++-- 4 files changed, 40 insertions(+), 31 deletions(-) diff --git a/datafusion/functions-aggregate/src/bit_and_or_xor.rs b/datafusion/functions-aggregate/src/bit_and_or_xor.rs index 77f1b2f763da..3fc60838b9c4 100644 --- a/datafusion/functions-aggregate/src/bit_and_or_xor.rs +++ b/datafusion/functions-aggregate/src/bit_and_or_xor.rs @@ -24,27 +24,28 @@ use std::fmt::{Display, Formatter}; use ahash::RandomState; use arrow::array::{Array, ArrayRef, AsArray}; use arrow::datatypes::{ - ArrowNativeType, ArrowNumericType, DataType, Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type, UInt32Type, - UInt64Type, UInt8Type, + ArrowNativeType, ArrowNumericType, ArrowPrimitiveType, DataType, Int16Type, + Int32Type, Int64Type, Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type, }; use arrow_schema::Field; -use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue}; use datafusion_common::cast::as_list_array; -use datafusion_expr::{Accumulator, AggregateUDFImpl, Signature, Volatility}; +use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue}; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::type_coercion::aggregates::INTEGERS; use datafusion_expr::utils::format_state_name; - +use datafusion_expr::{Accumulator, AggregateUDFImpl, Signature, Volatility}; /// `accumulator_helper` is a macro accepting ([ArrowPrimitiveType], [BitwiseOperationType]) macro_rules! accumulator_helper { ($t:ty, $opr:expr) => { match $opr { - BitwiseOperationType::And => Ok(Box::>::default()), - BitwiseOperationType::Or => Ok(Box::>::default()), - BitwiseOperationType::Xor => Ok(Box::>::default()), - BitwiseOperationType::XorDistinct => Ok(Box::>::default()), + BitwiseOperationType::And => Ok(Box::>::default()), + BitwiseOperationType::Or => Ok(Box::>::default()), + BitwiseOperationType::Xor => Ok(Box::>::default()), + BitwiseOperationType::XorDistinct => { + Ok(Box::>::default()) + } } }; } @@ -62,10 +63,15 @@ macro_rules! downcast_bitwise_accumulator { DataType::Int64 => accumulator_helper!(Int64Type, $opr), DataType::UInt8 => accumulator_helper!(UInt8Type, $opr), DataType::UInt16 => accumulator_helper!(UInt16Type, $opr), - DataType::UInt32 => accumulator_helper!(UInt32Type, $opr), + DataType::UInt32 => accumulator_helper!(UInt32Type, $opr), DataType::UInt64 => accumulator_helper!(UInt64Type, $opr), _ => { - not_impl_err!("{} not supported for {}: {}", stringify!($opr) ,$args.name, $args.data_type) + not_impl_err!( + "{} not supported for {}: {}", + stringify!($opr), + $args.name, + $args.data_type + ) } } }; @@ -138,7 +144,11 @@ impl AggregateUDFImpl for BitwiseOperation { fn return_type(&self, arg_types: &[DataType]) -> Result { let arg_type = &arg_types[0]; if !is_bit_and_or_xor_support_arg_type(arg_type) { - return exec_err!("[return_type] {} not supported for {}", self.operation.to_string() ,arg_type); + return exec_err!( + "[return_type] {} not supported for {}", + self.operation.to_string(), + arg_type + ); } Ok(arg_type.clone()) } @@ -185,8 +195,8 @@ impl Default for BitAndAccumulator { } impl Accumulator for BitAndAccumulator - where - T::Native: std::ops::BitAnd, +where + T::Native: std::ops::BitAnd, { fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { if let Some(x) = arrow::compute::bit_and(values[0].as_primitive::()) { @@ -230,8 +240,8 @@ impl Default for BitOrAccumulator { } impl Accumulator for BitOrAccumulator - where - T::Native: std::ops::BitOr, +where + T::Native: std::ops::BitOr, { fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { if let Some(x) = arrow::compute::bit_or(values[0].as_primitive::()) { @@ -275,8 +285,8 @@ impl Default for BitXorAccumulator { } impl Accumulator for BitXorAccumulator - where - T::Native: std::ops::BitXor, +where + T::Native: std::ops::BitXor, { fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { if let Some(x) = arrow::compute::bit_xor(values[0].as_primitive::()) { @@ -322,8 +332,8 @@ impl Default for DistinctBitXorAccumulator { } impl Accumulator for DistinctBitXorAccumulator - where - T::Native: std::ops::BitXor + std::hash::Hash + Eq, +where + T::Native: std::ops::BitXor + std::hash::Hash + Eq, { fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { if values.is_empty() { @@ -387,4 +397,4 @@ impl Accumulator for DistinctBitXorAccumulator fn is_bit_and_or_xor_support_arg_type(arg_type: &DataType) -> bool { INTEGERS.contains(arg_type) -} \ No newline at end of file +} diff --git a/datafusion/functions-aggregate/src/lib.rs b/datafusion/functions-aggregate/src/lib.rs index 8aaf04b802b3..990303bd1de3 100644 --- a/datafusion/functions-aggregate/src/lib.rs +++ b/datafusion/functions-aggregate/src/lib.rs @@ -85,6 +85,9 @@ pub mod expr_fn { pub use super::approx_median::approx_median; pub use super::approx_percentile_cont::approx_percentile_cont; pub use super::approx_percentile_cont_with_weight::approx_percentile_cont_with_weight; + pub use super::bit_and_or_xor::bit_and; + pub use super::bit_and_or_xor::bit_or; + pub use super::bit_and_or_xor::bit_xor; pub use super::count::count; pub use super::count::count_distinct; pub use super::covariance::covar_pop; @@ -106,9 +109,6 @@ pub mod expr_fn { pub use super::sum::sum; pub use super::variance::var_pop; pub use super::variance::var_sample; - pub use super::bit_and_or_xor::bit_and; - pub use super::bit_and_or_xor::bit_or; - pub use super::bit_and_or_xor::bit_xor; } /// Returns all default aggregate functions diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs b/datafusion/physical-expr/src/aggregate/build_in.rs index c75c624e76bb..6c01decdbf95 100644 --- a/datafusion/physical-expr/src/aggregate/build_in.rs +++ b/datafusion/physical-expr/src/aggregate/build_in.rs @@ -182,8 +182,7 @@ mod tests { use datafusion_expr::{type_coercion, Signature}; use crate::expressions::{ - try_cast, ArrayAgg, Avg, BoolAnd, BoolOr, - DistinctArrayAgg, Max, Min, + try_cast, ArrayAgg, Avg, BoolAnd, BoolOr, DistinctArrayAgg, Max, Min, }; use super::*; diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index d4a237132826..886179bf5627 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -23,11 +23,11 @@ use datafusion::datasource::file_format::parquet::ParquetSink; use datafusion::physical_expr::window::{NthValueKind, SlidingAggregateWindowExpr}; use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr}; use datafusion::physical_plan::expressions::{ - ArrayAgg, Avg, BinaryExpr, BoolAnd, BoolOr, CaseExpr, - CastExpr, Column, Correlation, CumeDist, DistinctArrayAgg, Grouping, - InListExpr, IsNotNullExpr, IsNullExpr, Literal, Max, Min, NegativeExpr, NotExpr, - NthValue, NthValueAgg, Ntile, OrderSensitiveArrayAgg, Rank, RankType, RowNumber, - StringAgg, TryCastExpr, WindowShift, + ArrayAgg, Avg, BinaryExpr, BoolAnd, BoolOr, CaseExpr, CastExpr, Column, Correlation, + CumeDist, DistinctArrayAgg, Grouping, InListExpr, IsNotNullExpr, IsNullExpr, Literal, + Max, Min, NegativeExpr, NotExpr, NthValue, NthValueAgg, Ntile, + OrderSensitiveArrayAgg, Rank, RankType, RowNumber, StringAgg, TryCastExpr, + WindowShift, }; use datafusion::physical_plan::udaf::AggregateFunctionExpr; use datafusion::physical_plan::windows::{BuiltInWindowExpr, PlainAggregateWindowExpr}; From 66c5b8a12136df3351934cc2ad234042ad5ebc1d Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Sun, 16 Jun 2024 23:33:38 +0530 Subject: [PATCH 12/16] fix lint issue --- datafusion/functions-aggregate/src/bit_and_or_xor.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/functions-aggregate/src/bit_and_or_xor.rs b/datafusion/functions-aggregate/src/bit_and_or_xor.rs index 3fc60838b9c4..8c21cbfcd0ec 100644 --- a/datafusion/functions-aggregate/src/bit_and_or_xor.rs +++ b/datafusion/functions-aggregate/src/bit_and_or_xor.rs @@ -24,8 +24,8 @@ use std::fmt::{Display, Formatter}; use ahash::RandomState; use arrow::array::{Array, ArrayRef, AsArray}; use arrow::datatypes::{ - ArrowNativeType, ArrowNumericType, ArrowPrimitiveType, DataType, Int16Type, - Int32Type, Int64Type, Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type, + ArrowNativeType, ArrowNumericType, DataType, Int16Type, Int32Type, Int64Type, + Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type, }; use arrow_schema::Field; @@ -36,7 +36,7 @@ use datafusion_expr::type_coercion::aggregates::INTEGERS; use datafusion_expr::utils::format_state_name; use datafusion_expr::{Accumulator, AggregateUDFImpl, Signature, Volatility}; -/// `accumulator_helper` is a macro accepting ([ArrowPrimitiveType], [BitwiseOperationType]) +/// `accumulator_helper` is a macro accepting (ArrowPrimitiveType, BitwiseOperationType) macro_rules! accumulator_helper { ($t:ty, $opr:expr) => { match $opr { From e453f7889c234086ea68ce547dc34b0fef9973d4 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Mon, 17 Jun 2024 11:05:10 +0530 Subject: [PATCH 13/16] remove XorDistinct --- .../functions-aggregate/src/bit_and_or_xor.rs | 52 ++++++++----------- 1 file changed, 23 insertions(+), 29 deletions(-) diff --git a/datafusion/functions-aggregate/src/bit_and_or_xor.rs b/datafusion/functions-aggregate/src/bit_and_or_xor.rs index 8c21cbfcd0ec..3c9e338bc026 100644 --- a/datafusion/functions-aggregate/src/bit_and_or_xor.rs +++ b/datafusion/functions-aggregate/src/bit_and_or_xor.rs @@ -29,22 +29,25 @@ use arrow::datatypes::{ }; use arrow_schema::Field; -use datafusion_common::cast::as_list_array; use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue}; +use datafusion_common::cast::as_list_array; +use datafusion_expr::{Accumulator, AggregateUDFImpl, Signature, Volatility}; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::type_coercion::aggregates::INTEGERS; use datafusion_expr::utils::format_state_name; -use datafusion_expr::{Accumulator, AggregateUDFImpl, Signature, Volatility}; -/// `accumulator_helper` is a macro accepting (ArrowPrimitiveType, BitwiseOperationType) +/// `accumulator_helper` is a macro accepting (ArrowPrimitiveType, BitwiseOperationType, bool) macro_rules! accumulator_helper { - ($t:ty, $opr:expr) => { + ($t:ty, $opr:expr, $is_distinct: expr) => { match $opr { BitwiseOperationType::And => Ok(Box::>::default()), BitwiseOperationType::Or => Ok(Box::>::default()), - BitwiseOperationType::Xor => Ok(Box::>::default()), - BitwiseOperationType::XorDistinct => { - Ok(Box::>::default()) + BitwiseOperationType::Xor => { + if $is_distinct { + Ok(Box::>::default()) + } else { + Ok(Box::>::default()) + } } } }; @@ -54,17 +57,18 @@ macro_rules! accumulator_helper { /// /// `args` is [AccumulatorArgs] /// `opr` is [BitwiseOperationType] +/// `is_distinct` is boolean value indicating whether the operation is distinct or not. macro_rules! downcast_bitwise_accumulator { - ($args:ident, $opr:expr) => { + ($args:ident, $opr:expr, $is_distinct: expr) => { match $args.data_type { - DataType::Int8 => accumulator_helper!(Int8Type, $opr), - DataType::Int16 => accumulator_helper!(Int16Type, $opr), - DataType::Int32 => accumulator_helper!(Int32Type, $opr), - DataType::Int64 => accumulator_helper!(Int64Type, $opr), - DataType::UInt8 => accumulator_helper!(UInt8Type, $opr), - DataType::UInt16 => accumulator_helper!(UInt16Type, $opr), - DataType::UInt32 => accumulator_helper!(UInt32Type, $opr), - DataType::UInt64 => accumulator_helper!(UInt64Type, $opr), + DataType::Int8 => accumulator_helper!(Int8Type, $opr, $is_distinct), + DataType::Int16 => accumulator_helper!(Int16Type, $opr, $is_distinct), + DataType::Int32 => accumulator_helper!(Int32Type, $opr, $is_distinct), + DataType::Int64 => accumulator_helper!(Int64Type, $opr, $is_distinct), + DataType::UInt8 => accumulator_helper!(UInt8Type, $opr, $is_distinct), + DataType::UInt16 => accumulator_helper!(UInt16Type, $opr, $is_distinct), + DataType::UInt32 => accumulator_helper!(UInt32Type, $opr, $is_distinct), + DataType::UInt64 => accumulator_helper!(UInt64Type, $opr, $is_distinct), _ => { not_impl_err!( "{} not supported for {}: {}", @@ -84,7 +88,7 @@ macro_rules! downcast_bitwise_accumulator { /// `OPR_TYPE` is an expression that evaluates to the type of bitwise operation to be performed. macro_rules! make_bitwise_udaf_expr_and_func { ($EXPR_FN:ident, $AGGREGATE_UDF_FN:ident, $OPR_TYPE:expr) => { - make_udaf_expr!($EXPR_FN, expr_y expr_x, concat!("Returns the bitwise", stringify!($OPR_TYPE), "of a group of values"), $AGGREGATE_UDF_FN); + make_udaf_expr!($EXPR_FN, expr_x, concat!("Returns the bitwise", stringify!($OPR_TYPE), "of a group of values"), $AGGREGATE_UDF_FN); create_func!($EXPR_FN, $AGGREGATE_UDF_FN, BitwiseOperation::new($OPR_TYPE, stringify!($EXPR_FN))); } } @@ -99,8 +103,6 @@ enum BitwiseOperationType { And, Or, Xor, - /// `XorDistinct` is a variation of the bitwise XOR operation specifically for the scenario of BitXor DISTINCT - XorDistinct, } impl Display for BitwiseOperationType { @@ -143,7 +145,7 @@ impl AggregateUDFImpl for BitwiseOperation { fn return_type(&self, arg_types: &[DataType]) -> Result { let arg_type = &arg_types[0]; - if !is_bit_and_or_xor_support_arg_type(arg_type) { + if !arg_type.is_integer() { return exec_err!( "[return_type] {} not supported for {}", self.operation.to_string(), @@ -154,11 +156,7 @@ impl AggregateUDFImpl for BitwiseOperation { } fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { - if acc_args.is_distinct && self.operation == BitwiseOperationType::Xor { - downcast_bitwise_accumulator!(acc_args, BitwiseOperationType::XorDistinct) - } else { - downcast_bitwise_accumulator!(acc_args, self.operation) - } + downcast_bitwise_accumulator!(acc_args, self.operation, acc_args.is_distinct) } fn state_fields(&self, args: StateFieldsArgs) -> Result> { @@ -394,7 +392,3 @@ where Ok(()) } } - -fn is_bit_and_or_xor_support_arg_type(arg_type: &DataType) -> bool { - INTEGERS.contains(arg_type) -} From a8f3c805d13c6aaa612892c1dd5da96abb241309 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Mon, 17 Jun 2024 11:09:27 +0530 Subject: [PATCH 14/16] update roundtrip_expr_api test --- datafusion/proto/tests/cases/roundtrip_logical_plan.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index a496e226855a..52696a106183 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -59,6 +59,7 @@ use datafusion_expr::{ TryCast, Volatility, WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, WindowUDF, WindowUDFImpl, }; +use datafusion_functions_aggregate::expr_fn::{bit_and, bit_or, bit_xor}; use datafusion_proto::bytes::{ logical_plan_from_bytes, logical_plan_from_bytes_with_extension_codec, logical_plan_to_bytes, logical_plan_to_bytes_with_extension_codec, @@ -665,6 +666,9 @@ async fn roundtrip_expr_api() -> Result<()> { approx_median(lit(2)), approx_percentile_cont(lit(2), lit(0.5)), approx_percentile_cont_with_weight(lit(2), lit(1), lit(0.5)), + bit_and(lit(2)), + bit_or(lit(2)), + bit_xor(lit(2)), ]; // ensure expressions created with the expr api can be round tripped From 356c5575560a74b1422587eb274d41018daa7d6b Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Mon, 17 Jun 2024 11:21:23 +0530 Subject: [PATCH 15/16] linting --- .../functions-aggregate/src/bit_and_or_xor.rs | 23 +++++++++++++++---- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/datafusion/functions-aggregate/src/bit_and_or_xor.rs b/datafusion/functions-aggregate/src/bit_and_or_xor.rs index 3c9e338bc026..99a170d51a14 100644 --- a/datafusion/functions-aggregate/src/bit_and_or_xor.rs +++ b/datafusion/functions-aggregate/src/bit_and_or_xor.rs @@ -29,12 +29,12 @@ use arrow::datatypes::{ }; use arrow_schema::Field; -use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue}; use datafusion_common::cast::as_list_array; -use datafusion_expr::{Accumulator, AggregateUDFImpl, Signature, Volatility}; +use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue}; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::type_coercion::aggregates::INTEGERS; use datafusion_expr::utils::format_state_name; +use datafusion_expr::{Accumulator, AggregateUDFImpl, Signature, Volatility}; /// `accumulator_helper` is a macro accepting (ArrowPrimitiveType, BitwiseOperationType, bool) macro_rules! accumulator_helper { @@ -88,9 +88,22 @@ macro_rules! downcast_bitwise_accumulator { /// `OPR_TYPE` is an expression that evaluates to the type of bitwise operation to be performed. macro_rules! make_bitwise_udaf_expr_and_func { ($EXPR_FN:ident, $AGGREGATE_UDF_FN:ident, $OPR_TYPE:expr) => { - make_udaf_expr!($EXPR_FN, expr_x, concat!("Returns the bitwise", stringify!($OPR_TYPE), "of a group of values"), $AGGREGATE_UDF_FN); - create_func!($EXPR_FN, $AGGREGATE_UDF_FN, BitwiseOperation::new($OPR_TYPE, stringify!($EXPR_FN))); - } + make_udaf_expr!( + $EXPR_FN, + expr_x, + concat!( + "Returns the bitwise", + stringify!($OPR_TYPE), + "of a group of values" + ), + $AGGREGATE_UDF_FN + ); + create_func!( + $EXPR_FN, + $AGGREGATE_UDF_FN, + BitwiseOperation::new($OPR_TYPE, stringify!($EXPR_FN)) + ); + }; } make_bitwise_udaf_expr_and_func!(bit_and, bit_and_udaf, BitwiseOperationType::And); From 782998207ea78163037f2c399b558eb3e166c25b Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Mon, 17 Jun 2024 15:48:52 +0530 Subject: [PATCH 16/16] support groups accumulator --- .../functions-aggregate/src/bit_and_or_xor.rs | 61 +++++++++++++++++-- 1 file changed, 56 insertions(+), 5 deletions(-) diff --git a/datafusion/functions-aggregate/src/bit_and_or_xor.rs b/datafusion/functions-aggregate/src/bit_and_or_xor.rs index 99a170d51a14..19e24f547d8a 100644 --- a/datafusion/functions-aggregate/src/bit_and_or_xor.rs +++ b/datafusion/functions-aggregate/src/bit_and_or_xor.rs @@ -22,7 +22,7 @@ use std::collections::HashSet; use std::fmt::{Display, Formatter}; use ahash::RandomState; -use arrow::array::{Array, ArrayRef, AsArray}; +use arrow::array::{downcast_integer, Array, ArrayRef, AsArray}; use arrow::datatypes::{ ArrowNativeType, ArrowNumericType, DataType, Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type, @@ -34,7 +34,31 @@ use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue}; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::type_coercion::aggregates::INTEGERS; use datafusion_expr::utils::format_state_name; -use datafusion_expr::{Accumulator, AggregateUDFImpl, Signature, Volatility}; +use datafusion_expr::{ + Accumulator, AggregateUDFImpl, GroupsAccumulator, ReversedUDAF, Signature, Volatility, +}; + +use datafusion_physical_expr_common::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator; +use std::ops::{BitAndAssign, BitOrAssign, BitXorAssign}; + +/// This macro helps create group accumulators based on bitwise operations typically used internally +/// and might not be necessary for users to call directly. +macro_rules! group_accumulator_helper { + ($t:ty, $dt:expr, $opr:expr) => { + match $opr { + BitwiseOperationType::And => Ok(Box::new( + PrimitiveGroupsAccumulator::<$t, _>::new($dt, |x, y| x.bitand_assign(y)) + .with_starting_value(!0), + )), + BitwiseOperationType::Or => Ok(Box::new( + PrimitiveGroupsAccumulator::<$t, _>::new($dt, |x, y| x.bitor_assign(y)), + )), + BitwiseOperationType::Xor => Ok(Box::new( + PrimitiveGroupsAccumulator::<$t, _>::new($dt, |x, y| x.bitxor_assign(y)), + )), + } + }; +} /// `accumulator_helper` is a macro accepting (ArrowPrimitiveType, BitwiseOperationType, bool) macro_rules! accumulator_helper { @@ -161,7 +185,7 @@ impl AggregateUDFImpl for BitwiseOperation { if !arg_type.is_integer() { return exec_err!( "[return_type] {} not supported for {}", - self.operation.to_string(), + self.name(), arg_type ); } @@ -175,18 +199,45 @@ impl AggregateUDFImpl for BitwiseOperation { fn state_fields(&self, args: StateFieldsArgs) -> Result> { if self.operation == BitwiseOperationType::Xor && args.is_distinct { Ok(vec![Field::new_list( - format_state_name(args.name, "xor distinct"), + format_state_name( + args.name, + format!("{} distinct", self.name()).as_str(), + ), Field::new("item", args.return_type.clone(), true), false, )]) } else { Ok(vec![Field::new( - format_state_name(args.name, self.operation.to_string().as_str()), + format_state_name(args.name, self.name()), args.return_type.clone(), true, )]) } } + + fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool { + true + } + + fn create_groups_accumulator( + &self, + args: AccumulatorArgs, + ) -> Result> { + let data_type = args.data_type; + let operation = &self.operation; + downcast_integer! { + data_type => (group_accumulator_helper, data_type, operation), + _ => not_impl_err!( + "GroupsAccumulator not supported for {} with {}", + self.name(), + data_type + ), + } + } + + fn reverse_expr(&self) -> ReversedUDAF { + ReversedUDAF::Identical + } } struct BitAndAccumulator {