From 6343467f10304c2c3e1f0899ecfc25330f57c2f1 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Tue, 6 Aug 2024 09:34:46 +0800 Subject: [PATCH] backup Signed-off-by: jayzhan211 --- Cargo.toml | 6 + datafusion/expr-common/Cargo.toml | 43 +++ .../{expr => expr-common}/src/accumulator.rs | 0 .../src/columnar_value.rs | 0 .../src/groups_accumulator.rs | 2 +- .../src/interval_arithmetic.rs | 12 +- datafusion/expr-common/src/lib.rs | 25 ++ datafusion/expr-common/src/operator.rs | 282 ++++++++++++++++++ .../{expr => expr-common}/src/signature.rs | 4 +- .../src/sort_properties.rs | 0 datafusion/expr-common/src/type_coercion.rs | 19 ++ .../src/type_coercion/aggregates.rs | 2 +- .../src/type_coercion/binary.rs | 4 +- .../expr-functions-aggregate/Cargo.toml | 50 ++++ .../src/aggregate.rs} | 260 ++++------------ .../src/lib.rs} | 7 +- datafusion/expr/Cargo.toml | 5 +- datafusion/expr/src/function.rs | 10 +- datafusion/expr/src/lib.rs | 27 +- datafusion/expr/src/operator.rs | 266 +---------------- .../expr/src/type_coercion/functions.rs | 2 +- datafusion/expr/src/type_coercion/mod.rs | 7 +- datafusion/expr/src/udaf.rs | 2 +- datafusion/expr/src/udf.rs | 2 +- datafusion/expr/src/utils.rs | 88 ++++-- .../functions-aggregate-common/Cargo.toml | 49 +++ .../src/aggregate.rs | 190 ++++++++++++ .../src/aggregate/count_distinct.rs} | 0 .../src/aggregate/count_distinct/bytes.rs | 6 +- .../src/aggregate/count_distinct/native.rs | 4 +- .../src/aggregate/groups_accumulator.rs} | 0 .../groups_accumulator/accumulate.rs | 2 +- .../aggregate/groups_accumulator/bool_op.rs | 2 +- .../aggregate/groups_accumulator/prim_op.rs | 2 +- .../functions-aggregate-common/src/lib.rs | 25 ++ .../src}/merge_arrays.rs | 0 .../functions-aggregate-common/src/order.rs | 30 ++ .../src}/stats.rs | 0 .../src}/tdigest.rs | 0 .../src}/utils.rs | 7 +- datafusion/functions-aggregate/Cargo.toml | 2 + .../functions-aggregate/benches/count.rs | 5 +- datafusion/functions-aggregate/benches/sum.rs | 5 +- .../src/approx_percentile_cont.rs | 61 +++- .../src/approx_percentile_cont_with_weight.rs | 11 +- .../functions-aggregate/src/array_agg.rs | 17 +- datafusion/functions-aggregate/src/average.rs | 4 +- .../functions-aggregate/src/bit_and_or_xor.rs | 2 +- .../functions-aggregate/src/bool_and_or.rs | 2 +- .../functions-aggregate/src/correlation.rs | 2 +- datafusion/functions-aggregate/src/count.rs | 12 +- .../functions-aggregate/src/covariance.rs | 2 +- .../functions-aggregate/src/first_last.rs | 26 +- datafusion/functions-aggregate/src/median.rs | 2 +- datafusion/functions-aggregate/src/min_max.rs | 2 +- .../functions-aggregate/src/nth_value.rs | 63 ++-- datafusion/functions-aggregate/src/stddev.rs | 17 +- .../functions-aggregate/src/string_agg.rs | 44 ++- datafusion/functions-aggregate/src/sum.rs | 4 +- .../functions-aggregate/src/variance.rs | 2 +- datafusion/physical-expr-common/Cargo.toml | 2 +- .../physical-expr-common/src/binary_map.rs | 2 +- datafusion/physical-expr-common/src/datum.rs | 3 +- datafusion/physical-expr-common/src/lib.rs | 2 - .../physical-expr-common/src/physical_expr.rs | 39 +-- .../physical-expr-common/src/sort_expr.rs | 32 +- datafusion/physical-expr-common/src/utils.rs | 43 +-- datafusion/physical-expr/Cargo.toml | 2 + datafusion/physical-expr/benches/case_when.rs | 3 +- datafusion/physical-expr/benches/is_null.rs | 2 +- .../src/aggregate/groups_accumulator/mod.rs | 4 +- datafusion/physical-expr/src/aggregate/mod.rs | 4 +- .../physical-expr/src/aggregate/stats.rs | 2 +- .../src/equivalence/properties.rs | 5 +- .../physical-expr/src/expressions/binary.rs | 2 +- .../physical-expr/src/expressions/case.rs | 6 +- .../src/expressions/cast.rs | 6 +- .../src/expressions/column.rs | 34 ++- .../src/expressions/literal.rs | 7 +- .../physical-expr/src/expressions/mod.rs | 9 +- datafusion/physical-expr/src/lib.rs | 2 +- datafusion/physical-expr/src/physical_expr.rs | 2 +- datafusion/physical-plan/Cargo.toml | 2 + .../physical-plan/src/aggregates/mod.rs | 16 +- datafusion/physical-plan/src/lib.rs | 2 +- datafusion/physical-plan/src/union.rs | 2 +- datafusion/physical-plan/src/windows/mod.rs | 6 +- 87 files changed, 1159 insertions(+), 809 deletions(-) create mode 100644 datafusion/expr-common/Cargo.toml rename datafusion/{expr => expr-common}/src/accumulator.rs (100%) rename datafusion/{expr => expr-common}/src/columnar_value.rs (100%) rename datafusion/{expr => expr-common}/src/groups_accumulator.rs (99%) rename datafusion/{expr => expr-common}/src/interval_arithmetic.rs (99%) create mode 100644 datafusion/expr-common/src/lib.rs create mode 100644 datafusion/expr-common/src/operator.rs rename datafusion/{expr => expr-common}/src/signature.rs (99%) rename datafusion/{expr => expr-common}/src/sort_properties.rs (100%) create mode 100644 datafusion/expr-common/src/type_coercion.rs rename datafusion/{expr => expr-common}/src/type_coercion/aggregates.rs (99%) rename datafusion/{expr => expr-common}/src/type_coercion/binary.rs (99%) create mode 100644 datafusion/expr-functions-aggregate/Cargo.toml rename datafusion/{physical-expr-common/src/aggregate/mod.rs => expr-functions-aggregate/src/aggregate.rs} (69%) rename datafusion/{physical-expr-common/src/expressions/mod.rs => expr-functions-aggregate/src/lib.rs} (86%) create mode 100644 datafusion/functions-aggregate-common/Cargo.toml create mode 100644 datafusion/functions-aggregate-common/src/aggregate.rs rename datafusion/{physical-expr-common/src/aggregate/count_distinct/mod.rs => functions-aggregate-common/src/aggregate/count_distinct.rs} (100%) rename datafusion/{physical-expr-common => functions-aggregate-common}/src/aggregate/count_distinct/bytes.rs (95%) rename datafusion/{physical-expr-common => functions-aggregate-common}/src/aggregate/count_distinct/native.rs (98%) rename datafusion/{physical-expr-common/src/aggregate/groups_accumulator/mod.rs => functions-aggregate-common/src/aggregate/groups_accumulator.rs} (100%) rename datafusion/{physical-expr-common => functions-aggregate-common}/src/aggregate/groups_accumulator/accumulate.rs (99%) rename datafusion/{physical-expr-common => functions-aggregate-common}/src/aggregate/groups_accumulator/bool_op.rs (98%) rename datafusion/{physical-expr-common => functions-aggregate-common}/src/aggregate/groups_accumulator/prim_op.rs (98%) create mode 100644 datafusion/functions-aggregate-common/src/lib.rs rename datafusion/{physical-expr-common/src/aggregate => functions-aggregate-common/src}/merge_arrays.rs (100%) create mode 100644 datafusion/functions-aggregate-common/src/order.rs rename datafusion/{physical-expr-common/src/aggregate => functions-aggregate-common/src}/stats.rs (100%) rename datafusion/{physical-expr-common/src/aggregate => functions-aggregate-common/src}/tdigest.rs (100%) rename datafusion/{physical-expr-common/src/aggregate => functions-aggregate-common/src}/utils.rs (98%) rename datafusion/{physical-expr-common => physical-expr}/src/expressions/cast.rs (99%) rename datafusion/{physical-expr-common => physical-expr}/src/expressions/column.rs (83%) rename datafusion/{physical-expr-common => physical-expr}/src/expressions/literal.rs (95%) diff --git a/Cargo.toml b/Cargo.toml index 90aff3f715cab..d0ac37a892dce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,8 +23,11 @@ members = [ "datafusion/catalog", "datafusion/core", "datafusion/expr", + "datafusion/expr-common", + "datafusion/expr-functions-aggregate", "datafusion/execution", "datafusion/functions-aggregate", + "datafusion/functions-aggregate-common", "datafusion/functions", "datafusion/functions-nested", "datafusion/optimizer", @@ -94,8 +97,11 @@ datafusion-common = { path = "datafusion/common", version = "40.0.0", default-fe datafusion-common-runtime = { path = "datafusion/common-runtime", version = "40.0.0" } datafusion-execution = { path = "datafusion/execution", version = "40.0.0" } datafusion-expr = { path = "datafusion/expr", version = "40.0.0" } +datafusion-expr-common = { path = "datafusion/expr-common", version = "40.0.0" } +datafusion-expr-functions-aggregate = { path = "datafusion/expr-functions-aggregate", version = "40.0.0" } datafusion-functions = { path = "datafusion/functions", version = "40.0.0" } datafusion-functions-aggregate = { path = "datafusion/functions-aggregate", version = "40.0.0" } +datafusion-functions-aggregate-common = { path = "datafusion/functions-aggregate-common", version = "40.0.0" } datafusion-functions-nested = { path = "datafusion/functions-nested", version = "40.0.0" } datafusion-optimizer = { path = "datafusion/optimizer", version = "40.0.0", default-features = false } datafusion-physical-expr = { path = "datafusion/physical-expr", version = "40.0.0", default-features = false } diff --git a/datafusion/expr-common/Cargo.toml b/datafusion/expr-common/Cargo.toml new file mode 100644 index 0000000000000..7e477efc4ebc1 --- /dev/null +++ b/datafusion/expr-common/Cargo.toml @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "datafusion-expr-common" +description = "Logical plan and expression representation for DataFusion query engine" +keywords = ["datafusion", "logical", "plan", "expressions"] +readme = "README.md" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +repository = { workspace = true } +license = { workspace = true } +authors = { workspace = true } +rust-version = { workspace = true } + +[lints] +workspace = true + +[lib] +name = "datafusion_expr_common" +path = "src/lib.rs" + +[features] + +[dependencies] +arrow = { workspace = true } +datafusion-common = { workspace = true } +paste = "^1.0" diff --git a/datafusion/expr/src/accumulator.rs b/datafusion/expr-common/src/accumulator.rs similarity index 100% rename from datafusion/expr/src/accumulator.rs rename to datafusion/expr-common/src/accumulator.rs diff --git a/datafusion/expr/src/columnar_value.rs b/datafusion/expr-common/src/columnar_value.rs similarity index 100% rename from datafusion/expr/src/columnar_value.rs rename to datafusion/expr-common/src/columnar_value.rs diff --git a/datafusion/expr/src/groups_accumulator.rs b/datafusion/expr-common/src/groups_accumulator.rs similarity index 99% rename from datafusion/expr/src/groups_accumulator.rs rename to datafusion/expr-common/src/groups_accumulator.rs index 886bd8443e4d3..055d731b114c1 100644 --- a/datafusion/expr/src/groups_accumulator.rs +++ b/datafusion/expr-common/src/groups_accumulator.rs @@ -17,7 +17,7 @@ //! Vectorized [`GroupsAccumulator`] -use arrow_array::{ArrayRef, BooleanArray}; +use arrow::array::{ArrayRef, BooleanArray}; use datafusion_common::{not_impl_err, Result}; /// Describes how many rows should be emitted during grouping. diff --git a/datafusion/expr/src/interval_arithmetic.rs b/datafusion/expr-common/src/interval_arithmetic.rs similarity index 99% rename from datafusion/expr/src/interval_arithmetic.rs rename to datafusion/expr-common/src/interval_arithmetic.rs index 553cdd8c87097..f7075a75f7ab1 100644 --- a/datafusion/expr/src/interval_arithmetic.rs +++ b/datafusion/expr-common/src/interval_arithmetic.rs @@ -18,8 +18,8 @@ //! Interval arithmetic library use crate::type_coercion::binary::get_result_type; -use crate::Operator; -use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano}; +use crate::operator::Operator; +use arrow::datatypes::{IntervalDayTime, IntervalMonthDayNano}; use std::borrow::Borrow; use std::fmt::{self, Display, Formatter}; use std::ops::{AddAssign, SubAssign}; @@ -120,12 +120,12 @@ macro_rules! value_transition { IntervalYearMonth(None) } IntervalDayTime(Some(value)) - if value == arrow_buffer::IntervalDayTime::$bound => + if value == arrow::datatypes::IntervalDayTime::$bound => { IntervalDayTime(None) } IntervalMonthDayNano(Some(value)) - if value == arrow_buffer::IntervalMonthDayNano::$bound => + if value == arrow::datatypes::IntervalMonthDayNano::$bound => { IntervalMonthDayNano(None) } @@ -1135,12 +1135,12 @@ fn next_value_helper(value: ScalarValue) -> ScalarValue { } IntervalDayTime(Some(val)) => IntervalDayTime(Some(increment_decrement::< INC, - arrow_buffer::IntervalDayTime, + arrow::datatypes::IntervalDayTime, >(val))), IntervalMonthDayNano(Some(val)) => { IntervalMonthDayNano(Some(increment_decrement::< INC, - arrow_buffer::IntervalMonthDayNano, + arrow::datatypes::IntervalMonthDayNano, >(val))) } _ => value, // Unbounded values return without change. diff --git a/datafusion/expr-common/src/lib.rs b/datafusion/expr-common/src/lib.rs new file mode 100644 index 0000000000000..e880a590646ae --- /dev/null +++ b/datafusion/expr-common/src/lib.rs @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod accumulator; +pub mod columnar_value; +pub mod groups_accumulator; +pub mod interval_arithmetic; +pub mod operator; +pub mod signature; +pub mod sort_properties; +pub mod type_coercion; \ No newline at end of file diff --git a/datafusion/expr-common/src/operator.rs b/datafusion/expr-common/src/operator.rs new file mode 100644 index 0000000000000..e013b6fafa22d --- /dev/null +++ b/datafusion/expr-common/src/operator.rs @@ -0,0 +1,282 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt; + +/// Operators applied to expressions +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Hash)] +pub enum Operator { + /// Expressions are equal + Eq, + /// Expressions are not equal + NotEq, + /// Left side is smaller than right side + Lt, + /// Left side is smaller or equal to right side + LtEq, + /// Left side is greater than right side + Gt, + /// Left side is greater or equal to right side + GtEq, + /// Addition + Plus, + /// Subtraction + Minus, + /// Multiplication operator, like `*` + Multiply, + /// Division operator, like `/` + Divide, + /// Remainder operator, like `%` + Modulo, + /// Logical AND, like `&&` + And, + /// Logical OR, like `||` + Or, + /// `IS DISTINCT FROM` (see [`distinct`]) + /// + /// [`distinct`]: arrow::compute::kernels::cmp::distinct + IsDistinctFrom, + /// `IS NOT DISTINCT FROM` (see [`not_distinct`]) + /// + /// [`not_distinct`]: arrow::compute::kernels::cmp::not_distinct + IsNotDistinctFrom, + /// Case sensitive regex match + RegexMatch, + /// Case insensitive regex match + RegexIMatch, + /// Case sensitive regex not match + RegexNotMatch, + /// Case insensitive regex not match + RegexNotIMatch, + /// Case sensitive pattern match + LikeMatch, + /// Case insensitive pattern match + ILikeMatch, + /// Case sensitive pattern not match + NotLikeMatch, + /// Case insensitive pattern not match + NotILikeMatch, + /// Bitwise and, like `&` + BitwiseAnd, + /// Bitwise or, like `|` + BitwiseOr, + /// Bitwise xor, such as `^` in MySQL or `#` in PostgreSQL + BitwiseXor, + /// Bitwise right, like `>>` + BitwiseShiftRight, + /// Bitwise left, like `<<` + BitwiseShiftLeft, + /// String concat + StringConcat, + /// At arrow, like `@>` + AtArrow, + /// Arrow at, like `<@` + ArrowAt, +} + +impl Operator { + /// If the operator can be negated, return the negated operator + /// otherwise return None + pub fn negate(&self) -> Option { + match self { + Operator::Eq => Some(Operator::NotEq), + Operator::NotEq => Some(Operator::Eq), + Operator::Lt => Some(Operator::GtEq), + Operator::LtEq => Some(Operator::Gt), + Operator::Gt => Some(Operator::LtEq), + Operator::GtEq => Some(Operator::Lt), + Operator::IsDistinctFrom => Some(Operator::IsNotDistinctFrom), + Operator::IsNotDistinctFrom => Some(Operator::IsDistinctFrom), + Operator::LikeMatch => Some(Operator::NotLikeMatch), + Operator::ILikeMatch => Some(Operator::NotILikeMatch), + Operator::NotLikeMatch => Some(Operator::LikeMatch), + Operator::NotILikeMatch => Some(Operator::ILikeMatch), + Operator::Plus + | Operator::Minus + | Operator::Multiply + | Operator::Divide + | Operator::Modulo + | Operator::And + | Operator::Or + | Operator::RegexMatch + | Operator::RegexIMatch + | Operator::RegexNotMatch + | Operator::RegexNotIMatch + | Operator::BitwiseAnd + | Operator::BitwiseOr + | Operator::BitwiseXor + | Operator::BitwiseShiftRight + | Operator::BitwiseShiftLeft + | Operator::StringConcat + | Operator::AtArrow + | Operator::ArrowAt => None, + } + } + + /// Return true if the operator is a numerical operator. + /// + /// For example, 'Binary(a, +, b)' would be a numerical expression. + /// PostgresSQL concept: + pub fn is_numerical_operators(&self) -> bool { + matches!( + self, + Operator::Plus + | Operator::Minus + | Operator::Multiply + | Operator::Divide + | Operator::Modulo + ) + } + + /// Return true if the operator is a comparison operator. + /// + /// For example, 'Binary(a, >, b)' would be a comparison expression. + pub fn is_comparison_operator(&self) -> bool { + matches!( + self, + Operator::Eq + | Operator::NotEq + | Operator::Lt + | Operator::LtEq + | Operator::Gt + | Operator::GtEq + | Operator::IsDistinctFrom + | Operator::IsNotDistinctFrom + | Operator::RegexMatch + | Operator::RegexIMatch + | Operator::RegexNotMatch + | Operator::RegexNotIMatch + ) + } + + /// Return true if the operator is a logic operator. + /// + /// For example, 'Binary(Binary(a, >, b), AND, Binary(a, <, b + 3))' would + /// be a logical expression. + pub fn is_logic_operator(&self) -> bool { + matches!(self, Operator::And | Operator::Or) + } + + /// Return the operator where swapping lhs and rhs wouldn't change the result. + /// + /// For example `Binary(50, >=, a)` could also be represented as `Binary(a, <=, 50)`. + pub fn swap(&self) -> Option { + match self { + Operator::Eq => Some(Operator::Eq), + Operator::NotEq => Some(Operator::NotEq), + Operator::Lt => Some(Operator::Gt), + Operator::LtEq => Some(Operator::GtEq), + Operator::Gt => Some(Operator::Lt), + Operator::GtEq => Some(Operator::LtEq), + Operator::AtArrow => Some(Operator::ArrowAt), + Operator::ArrowAt => Some(Operator::AtArrow), + Operator::IsDistinctFrom + | Operator::IsNotDistinctFrom + | Operator::Plus + | Operator::Minus + | Operator::Multiply + | Operator::Divide + | Operator::Modulo + | Operator::And + | Operator::Or + | Operator::RegexMatch + | Operator::RegexIMatch + | Operator::RegexNotMatch + | Operator::RegexNotIMatch + | Operator::LikeMatch + | Operator::ILikeMatch + | Operator::NotLikeMatch + | Operator::NotILikeMatch + | Operator::BitwiseAnd + | Operator::BitwiseOr + | Operator::BitwiseXor + | Operator::BitwiseShiftRight + | Operator::BitwiseShiftLeft + | Operator::StringConcat => None, + } + } + + /// Get the operator precedence + /// use as a reference + pub fn precedence(&self) -> u8 { + match self { + Operator::Or => 5, + Operator::And => 10, + Operator::Eq | Operator::NotEq | Operator::LtEq | Operator::GtEq => 15, + Operator::Lt | Operator::Gt => 20, + Operator::LikeMatch + | Operator::NotLikeMatch + | Operator::ILikeMatch + | Operator::NotILikeMatch => 25, + Operator::IsDistinctFrom + | Operator::IsNotDistinctFrom + | Operator::RegexMatch + | Operator::RegexNotMatch + | Operator::RegexIMatch + | Operator::RegexNotIMatch + | Operator::BitwiseAnd + | Operator::BitwiseOr + | Operator::BitwiseShiftLeft + | Operator::BitwiseShiftRight + | Operator::BitwiseXor + | Operator::StringConcat + | Operator::AtArrow + | Operator::ArrowAt => 30, + Operator::Plus | Operator::Minus => 40, + Operator::Multiply | Operator::Divide | Operator::Modulo => 45, + } + } +} + +impl fmt::Display for Operator { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let display = match &self { + Operator::Eq => "=", + Operator::NotEq => "!=", + Operator::Lt => "<", + Operator::LtEq => "<=", + Operator::Gt => ">", + Operator::GtEq => ">=", + Operator::Plus => "+", + Operator::Minus => "-", + Operator::Multiply => "*", + Operator::Divide => "/", + Operator::Modulo => "%", + Operator::And => "AND", + Operator::Or => "OR", + Operator::RegexMatch => "~", + Operator::RegexIMatch => "~*", + Operator::RegexNotMatch => "!~", + Operator::RegexNotIMatch => "!~*", + Operator::LikeMatch => "~~", + Operator::ILikeMatch => "~~*", + Operator::NotLikeMatch => "!~~", + Operator::NotILikeMatch => "!~~*", + Operator::IsDistinctFrom => "IS DISTINCT FROM", + Operator::IsNotDistinctFrom => "IS NOT DISTINCT FROM", + Operator::BitwiseAnd => "&", + Operator::BitwiseOr => "|", + Operator::BitwiseXor => "BIT_XOR", + Operator::BitwiseShiftRight => ">>", + Operator::BitwiseShiftLeft => "<<", + Operator::StringConcat => "||", + Operator::AtArrow => "@>", + Operator::ArrowAt => "<@", + }; + write!(f, "{display}") + } +} diff --git a/datafusion/expr/src/signature.rs b/datafusion/expr-common/src/signature.rs similarity index 99% rename from datafusion/expr/src/signature.rs rename to datafusion/expr-common/src/signature.rs index b1cec3bad774b..59a715cc9071f 100644 --- a/datafusion/expr/src/signature.rs +++ b/datafusion/expr-common/src/signature.rs @@ -170,7 +170,7 @@ impl std::fmt::Display for ArrayFunctionSignature { } impl TypeSignature { - pub(crate) fn to_string_repr(&self) -> Vec { + pub fn to_string_repr(&self) -> Vec { match self { TypeSignature::Variadic(types) => { vec![format!("{}, ..", Self::join_types(types, "/"))] @@ -207,7 +207,7 @@ impl TypeSignature { } /// Helper function to join types with specified delimiter. - pub(crate) fn join_types( + pub fn join_types( types: &[T], delimiter: &str, ) -> String { diff --git a/datafusion/expr/src/sort_properties.rs b/datafusion/expr-common/src/sort_properties.rs similarity index 100% rename from datafusion/expr/src/sort_properties.rs rename to datafusion/expr-common/src/sort_properties.rs diff --git a/datafusion/expr-common/src/type_coercion.rs b/datafusion/expr-common/src/type_coercion.rs new file mode 100644 index 0000000000000..6bb1a1a3317db --- /dev/null +++ b/datafusion/expr-common/src/type_coercion.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod aggregates; +pub mod binary; \ No newline at end of file diff --git a/datafusion/expr/src/type_coercion/aggregates.rs b/datafusion/expr-common/src/type_coercion/aggregates.rs similarity index 99% rename from datafusion/expr/src/type_coercion/aggregates.rs rename to datafusion/expr-common/src/type_coercion/aggregates.rs index e7e58bf84362e..40ee596eee05a 100644 --- a/datafusion/expr/src/type_coercion/aggregates.rs +++ b/datafusion/expr-common/src/type_coercion/aggregates.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::TypeSignature; +use crate::signature::TypeSignature; use arrow::datatypes::{ DataType, TimeUnit, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, DECIMAL256_MAX_PRECISION, DECIMAL256_MAX_SCALE, diff --git a/datafusion/expr/src/type_coercion/binary.rs b/datafusion/expr-common/src/type_coercion/binary.rs similarity index 99% rename from datafusion/expr/src/type_coercion/binary.rs rename to datafusion/expr-common/src/type_coercion/binary.rs index 17280289ed1b4..f59b8c085819a 100644 --- a/datafusion/expr/src/type_coercion/binary.rs +++ b/datafusion/expr-common/src/type_coercion/binary.rs @@ -20,7 +20,7 @@ use std::collections::HashSet; use std::sync::Arc; -use crate::Operator; +use crate::operator::Operator; use arrow::array::{new_empty_array, Array}; use arrow::compute::can_cast_types; @@ -569,7 +569,7 @@ fn string_temporal_coercion( } /// Coerce `lhs_type` and `rhs_type` to a common type where both are numeric -pub(crate) fn binary_numeric_coercion( +pub fn binary_numeric_coercion( lhs_type: &DataType, rhs_type: &DataType, ) -> Option { diff --git a/datafusion/expr-functions-aggregate/Cargo.toml b/datafusion/expr-functions-aggregate/Cargo.toml new file mode 100644 index 0000000000000..5e20ac651f58d --- /dev/null +++ b/datafusion/expr-functions-aggregate/Cargo.toml @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "datafusion-expr-functions-aggregate" +description = "Logical plan and expression representation for DataFusion query engine" +keywords = ["datafusion", "logical", "plan", "expressions"] +readme = "README.md" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +repository = { workspace = true } +license = { workspace = true } +authors = { workspace = true } +rust-version = { workspace = true } + +[lints] +workspace = true + +[lib] +name = "datafusion_expr_functions_aggregate" +path = "src/lib.rs" + +[features] + +[dependencies] +ahash = { workspace = true } +arrow = { workspace = true } +datafusion-common = { workspace = true } +datafusion-expr-common = { workspace = true } +datafusion-expr = { workspace = true } +datafusion-physical-expr-common = { workspace = true } +datafusion-functions-aggregate-common = { workspace = true } +rand = { workspace = true } +# strum = { version = "0.26.1", features = ["derive"] } +# strum_macros = "0.26.0" diff --git a/datafusion/physical-expr-common/src/aggregate/mod.rs b/datafusion/expr-functions-aggregate/src/aggregate.rs similarity index 69% rename from datafusion/physical-expr-common/src/aggregate/mod.rs rename to datafusion/expr-functions-aggregate/src/aggregate.rs index 665cdd708329f..7634a0a8dbc49 100644 --- a/datafusion/physical-expr-common/src/aggregate/mod.rs +++ b/datafusion/expr-functions-aggregate/src/aggregate.rs @@ -15,33 +15,27 @@ // specific language governing permissions and limitations // under the License. -use std::fmt::Debug; -use std::{any::Any, sync::Arc}; - use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; - -use datafusion_common::exec_err; use datafusion_common::{internal_err, not_impl_err, DFSchema, Result}; use datafusion_expr::function::StateFieldsArgs; -use datafusion_expr::type_coercion::aggregates::check_arg_count; use datafusion_expr::utils::AggregateOrderSensitivity; use datafusion_expr::ReversedUDAF; use datafusion_expr::{ - function::AccumulatorArgs, Accumulator, AggregateUDF, Expr, GroupsAccumulator, + function::AccumulatorArgs, Accumulator, AggregateUDF, Expr, }; -use crate::physical_expr::PhysicalExpr; -use crate::sort_expr::{LexOrdering, PhysicalSortExpr}; -use crate::utils::reverse_order_bys; +use datafusion_expr_common::type_coercion::aggregates::check_arg_count; +use datafusion_functions_aggregate_common::aggregate::AggregateExpr; -use self::utils::down_cast_any_ref; +use datafusion_expr_common::groups_accumulator::GroupsAccumulator; -pub mod count_distinct; -pub mod groups_accumulator; -pub mod merge_arrays; -pub mod stats; -pub mod tdigest; -pub mod utils; +use datafusion_functions_aggregate_common::utils::{self, down_cast_any_ref}; +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; +use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; +use datafusion_physical_expr_common::utils::reverse_order_bys; + +use std::fmt::Debug; +use std::{any::Any, sync::Arc}; /// Creates a physical expression of the UDAF, that includes all necessary type coercion. /// This function errors when `args`' can't be coerced to a valid argument type of the UDAF. @@ -73,7 +67,7 @@ pub fn create_aggregate_expr( ) -> Result> { let mut builder = AggregateExprBuilder::new(Arc::new(fun.clone()), input_phy_exprs.to_vec()); - builder = builder.sort_exprs(sort_exprs.to_vec()); + // builder = builder.sort_exprs(sort_exprs.to_vec()); builder = builder.order_by(ordering_req.to_vec()); builder = builder.logical_exprs(input_exprs.to_vec()); builder = builder.schema(Arc::new(schema.clone())); @@ -94,8 +88,8 @@ pub fn create_aggregate_expr( pub fn create_aggregate_expr_with_dfschema( fun: &AggregateUDF, input_phy_exprs: &[Arc], - input_exprs: &[Expr], - sort_exprs: &[Expr], + // input_exprs: &[Expr], + // sort_exprs: &[Expr], ordering_req: &[PhysicalSortExpr], dfschema: &DFSchema, name: impl Into, @@ -105,9 +99,9 @@ pub fn create_aggregate_expr_with_dfschema( ) -> Result> { let mut builder = AggregateExprBuilder::new(Arc::new(fun.clone()), input_phy_exprs.to_vec()); - builder = builder.sort_exprs(sort_exprs.to_vec()); + // builder = builder.sort_exprs(sort_exprs.to_vec()); builder = builder.order_by(ordering_req.to_vec()); - builder = builder.logical_exprs(input_exprs.to_vec()); + // builder = builder.logical_exprs(input_exprs.to_vec()); builder = builder.dfschema(dfschema.clone()); let schema: Schema = dfschema.into(); builder = builder.schema(Arc::new(schema)); @@ -217,12 +211,12 @@ impl AggregateExprBuilder { Ok(Arc::new(AggregateFunctionExpr { fun: Arc::unwrap_or_clone(fun), args, - logical_args, + // logical_args, data_type, name, schema: Arc::unwrap_or_clone(schema), dfschema, - sort_exprs, + // sort_exprs, ordering_req, ignore_nulls, ordering_fields, @@ -283,10 +277,10 @@ impl AggregateExprBuilder { } /// This method will be deprecated in - pub fn sort_exprs(mut self, sort_exprs: Vec) -> Self { - self.sort_exprs = sort_exprs; - self - } + // pub fn sort_exprs(mut self, sort_exprs: Vec) -> Self { + // self.sort_exprs = sort_exprs; + // self + // } /// This method will be deprecated in pub fn logical_exprs(mut self, logical_args: Vec) -> Self { @@ -295,171 +289,16 @@ impl AggregateExprBuilder { } } -/// An aggregate expression that: -/// * knows its resulting field -/// * knows how to create its accumulator -/// * knows its accumulator's state's field -/// * knows the expressions from whose its accumulator will receive values -/// -/// Any implementation of this trait also needs to implement the -/// `PartialEq` to allows comparing equality between the -/// trait objects. -pub trait AggregateExpr: Send + Sync + Debug + PartialEq { - /// Returns the aggregate expression as [`Any`] so that it can be - /// downcast to a specific implementation. - fn as_any(&self) -> &dyn Any; - - /// the field of the final result of this aggregation. - fn field(&self) -> Result; - - /// the accumulator used to accumulate values from the expressions. - /// the accumulator expects the same number of arguments as `expressions` and must - /// return states with the same description as `state_fields` - fn create_accumulator(&self) -> Result>; - - /// the fields that encapsulate the Accumulator's state - /// the number of fields here equals the number of states that the accumulator contains - fn state_fields(&self) -> Result>; - - /// expressions that are passed to the Accumulator. - /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many. - fn expressions(&self) -> Vec>; - - /// Order by requirements for the aggregate function - /// By default it is `None` (there is no requirement) - /// Order-sensitive aggregators, such as `FIRST_VALUE(x ORDER BY y)` should implement this - fn order_bys(&self) -> Option<&[PhysicalSortExpr]> { - None - } - - /// Indicates whether aggregator can produce the correct result with any - /// arbitrary input ordering. By default, we assume that aggregate expressions - /// are order insensitive. - fn order_sensitivity(&self) -> AggregateOrderSensitivity { - AggregateOrderSensitivity::Insensitive - } - - /// Sets the indicator whether ordering requirements of the aggregator is - /// satisfied by its input. If this is not the case, aggregators with order - /// sensitivity `AggregateOrderSensitivity::Beneficial` can still produce - /// the correct result with possibly more work internally. - /// - /// # Returns - /// - /// Returns `Ok(Some(updated_expr))` if the process completes successfully. - /// If the expression can benefit from existing input ordering, but does - /// not implement the method, returns an error. Order insensitive and hard - /// requirement aggregators return `Ok(None)`. - fn with_beneficial_ordering( - self: Arc, - _requirement_satisfied: bool, - ) -> Result>> { - if self.order_bys().is_some() && self.order_sensitivity().is_beneficial() { - return exec_err!( - "Should implement with satisfied for aggregator :{:?}", - self.name() - ); - } - Ok(None) - } - - /// Human readable name such as `"MIN(c2)"`. The default - /// implementation returns placeholder text. - fn name(&self) -> &str { - "AggregateExpr: default name" - } - - /// If the aggregate expression has a specialized - /// [`GroupsAccumulator`] implementation. If this returns true, - /// `[Self::create_groups_accumulator`] will be called. - fn groups_accumulator_supported(&self) -> bool { - false - } - - /// Return a specialized [`GroupsAccumulator`] that manages state - /// for all groups. - /// - /// For maximum performance, a [`GroupsAccumulator`] should be - /// implemented in addition to [`Accumulator`]. - fn create_groups_accumulator(&self) -> Result> { - not_impl_err!("GroupsAccumulator hasn't been implemented for {self:?} yet") - } - - /// Construct an expression that calculates the aggregate in reverse. - /// Typically the "reverse" expression is itself (e.g. SUM, COUNT). - /// For aggregates that do not support calculation in reverse, - /// returns None (which is the default value). - fn reverse_expr(&self) -> Option> { - None - } - - /// Creates accumulator implementation that supports retract - fn create_sliding_accumulator(&self) -> Result> { - not_impl_err!("Retractable Accumulator hasn't been implemented for {self:?} yet") - } - - /// Returns all expressions used in the [`AggregateExpr`]. - /// These expressions are (1)function arguments, (2) order by expressions. - fn all_expressions(&self) -> AggregatePhysicalExpressions { - let args = self.expressions(); - let order_bys = self.order_bys().unwrap_or(&[]); - let order_by_exprs = order_bys - .iter() - .map(|sort_expr| sort_expr.expr.clone()) - .collect::>(); - AggregatePhysicalExpressions { - args, - order_by_exprs, - } - } - - /// Rewrites [`AggregateExpr`], with new expressions given. The argument should be consistent - /// with the return value of the [`AggregateExpr::all_expressions`] method. - /// Returns `Some(Arc)` if re-write is supported, otherwise returns `None`. - /// TODO: This method only rewrites the [`PhysicalExpr`]s and does not handle [`Expr`]s. - /// This can cause silent bugs and should be fixed in the future (possibly with physical-to-logical - /// conversions). - fn with_new_expressions( - &self, - _args: Vec>, - _order_by_exprs: Vec>, - ) -> Option> { - None - } - - /// If this function is max, return (output_field, true) - /// if the function is min, return (output_field, false) - /// otherwise return None (the default) - /// - /// output_field is the name of the column produced by this aggregate - /// - /// Note: this is used to use special aggregate implementations in certain conditions - fn get_minmax_desc(&self) -> Option<(Field, bool)> { - None - } -} - -/// Stores the physical expressions used inside the `AggregateExpr`. -pub struct AggregatePhysicalExpressions { - /// Aggregate function arguments - pub args: Vec>, - /// Order by expressions - pub order_by_exprs: Vec>, -} - /// Physical aggregate expression of a UDAF. #[derive(Debug, Clone)] pub struct AggregateFunctionExpr { fun: AggregateUDF, args: Vec>, - logical_args: Vec, /// Output / return type of this aggregate data_type: DataType, name: String, schema: Schema, dfschema: DFSchema, - // The logical order by expressions - sort_exprs: Vec, // The physical order by expressions ordering_req: LexOrdering, // Whether to ignore null values @@ -525,12 +364,14 @@ impl AggregateExpr for AggregateFunctionExpr { schema: &self.schema, dfschema: &self.dfschema, ignore_nulls: self.ignore_nulls, - sort_exprs: &self.sort_exprs, + // sort_exprs: &self.sort_exprs, + ordering_req: &self.ordering_req, is_distinct: self.is_distinct, input_types: &self.input_types, - input_exprs: &self.logical_args, + // input_exprs: &self.logical_args, name: &self.name, is_reversed: self.is_reversed, + physical_exprs: &self.args, }; self.fun.accumulator(acc_args) @@ -542,12 +383,13 @@ impl AggregateExpr for AggregateFunctionExpr { schema: &self.schema, dfschema: &self.dfschema, ignore_nulls: self.ignore_nulls, - sort_exprs: &self.sort_exprs, + ordering_req: &self.ordering_req, is_distinct: self.is_distinct, input_types: &self.input_types, - input_exprs: &self.logical_args, + // input_exprs: &self.logical_args, name: &self.name, is_reversed: self.is_reversed, + physical_exprs: &self.args, }; let accumulator = self.fun.create_sliding_accumulator(args)?; @@ -614,12 +456,13 @@ impl AggregateExpr for AggregateFunctionExpr { schema: &self.schema, dfschema: &self.dfschema, ignore_nulls: self.ignore_nulls, - sort_exprs: &self.sort_exprs, + ordering_req: &self.ordering_req, is_distinct: self.is_distinct, input_types: &self.input_types, - input_exprs: &self.logical_args, + // input_exprs: &self.logical_args, name: &self.name, is_reversed: self.is_reversed, + physical_exprs: &self.args, }; self.fun.groups_accumulator_supported(args) } @@ -630,12 +473,12 @@ impl AggregateExpr for AggregateFunctionExpr { schema: &self.schema, dfschema: &self.dfschema, ignore_nulls: self.ignore_nulls, - sort_exprs: &self.sort_exprs, + ordering_req: &self.ordering_req, is_distinct: self.is_distinct, input_types: &self.input_types, - input_exprs: &self.logical_args, name: &self.name, is_reversed: self.is_reversed, + physical_exprs: &self.args, }; self.fun.create_groups_accumulator(args) } @@ -676,14 +519,15 @@ impl AggregateExpr for AggregateFunctionExpr { create_aggregate_expr_with_dfschema( &updated_fn, &self.args, - &self.logical_args, - &self.sort_exprs, + // &self.logical_args, + // &self.sort_exprs, &self.ordering_req, &self.dfschema, self.name(), self.ignore_nulls, self.is_distinct, self.is_reversed, + // physical_exprs: &self.args, ) .map(Some) } @@ -694,18 +538,18 @@ impl AggregateExpr for AggregateFunctionExpr { ReversedUDAF::Identical => Some(Arc::new(self.clone())), ReversedUDAF::Reversed(reverse_udf) => { let reverse_ordering_req = reverse_order_bys(&self.ordering_req); - let reverse_sort_exprs = self - .sort_exprs - .iter() - .map(|e| { - if let Expr::Sort(s) = e { - Expr::Sort(s.reverse()) - } else { - // Expects to receive `Expr::Sort`. - unreachable!() - } - }) - .collect::>(); + // let reverse_sort_exprs = self + // .sort_exprs + // .iter() + // .map(|e| { + // if let Expr::Sort(s) = e { + // Expr::Sort(s.reverse()) + // } else { + // // Expects to receive `Expr::Sort`. + // unreachable!() + // } + // }) + // .collect::>(); let mut name = self.name().to_string(); // If the function is changed, we need to reverse order_by clause as well // i.e. First(a order by b asc null first) -> Last(a order by b desc null last) @@ -717,8 +561,8 @@ impl AggregateExpr for AggregateFunctionExpr { let reverse_aggr = create_aggregate_expr_with_dfschema( &reverse_udf, &self.args, - &self.logical_args, - &reverse_sort_exprs, + // &self.logical_args, + // &reverse_sort_exprs, &reverse_ordering_req, &self.dfschema, name, @@ -786,4 +630,4 @@ fn replace_order_by_clause(order_by: &mut String) { fn replace_fn_name_clause(aggr_name: &mut String, fn_name_old: &str, fn_name_new: &str) { *aggr_name = aggr_name.replace(fn_name_old, fn_name_new); -} +} \ No newline at end of file diff --git a/datafusion/physical-expr-common/src/expressions/mod.rs b/datafusion/expr-functions-aggregate/src/lib.rs similarity index 86% rename from datafusion/physical-expr-common/src/expressions/mod.rs rename to datafusion/expr-functions-aggregate/src/lib.rs index dd534cc07d20e..1e14a97af3b66 100644 --- a/datafusion/physical-expr-common/src/expressions/mod.rs +++ b/datafusion/expr-functions-aggregate/src/lib.rs @@ -15,9 +15,6 @@ // specific language governing permissions and limitations // under the License. -mod cast; -pub mod column; -pub mod literal; +//! Technically, those aggregate functions releated things that has depedency on `expr` should be here -pub use cast::{cast, cast_with_options, CastExpr}; -pub use literal::{lit, Literal}; +pub mod aggregate; diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml index 1b6878b6f49e8..b5d34d9a3834f 100644 --- a/datafusion/expr/Cargo.toml +++ b/datafusion/expr/Cargo.toml @@ -43,7 +43,10 @@ arrow = { workspace = true } arrow-array = { workspace = true } arrow-buffer = { workspace = true } chrono = { workspace = true } -datafusion-common = { workspace = true, default-features = true } +datafusion-common = { workspace = true } +datafusion-expr-common = { workspace = true } +datafusion-functions-aggregate-common = { workspace = true } +datafusion-physical-expr-common = { workspace = true } paste = "^1.0" serde_json = { workspace = true } sqlparser = { workspace = true } diff --git a/datafusion/expr/src/function.rs b/datafusion/expr/src/function.rs index d8be2b4347323..ab784dde7a7cc 100644 --- a/datafusion/expr/src/function.rs +++ b/datafusion/expr/src/function.rs @@ -21,6 +21,8 @@ use crate::ColumnarValue; use crate::{Accumulator, Expr, PartitionEvaluator}; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::{DFSchema, Result}; +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; +use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use std::sync::Arc; #[derive(Debug, Clone, Copy)] @@ -79,7 +81,9 @@ pub struct AccumulatorArgs<'a> { /// ``` /// /// If no `ORDER BY` is specified, `sort_exprs`` will be empty. - pub sort_exprs: &'a [Expr], + // pub sort_exprs: &'a [Expr], + + pub ordering_req: &'a [PhysicalSortExpr], /// Whether the aggregation is running in reverse order pub is_reversed: bool, @@ -98,7 +102,9 @@ pub struct AccumulatorArgs<'a> { pub input_types: &'a [DataType], /// The logical expression of arguments the aggregate function takes. - pub input_exprs: &'a [Expr], + // pub input_exprs: &'a [Expr], + + pub physical_exprs: &'a [Arc], } /// [`StateFieldsArgs`] contains information about the fields that an diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index f5460918fa707..57e701c80dd53 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -27,13 +27,10 @@ //! //! The [expr_fn] module contains functions for creating expressions. -mod accumulator; mod built_in_window_function; -mod columnar_value; mod literal; mod operator; mod partition_evaluator; -mod signature; mod table_source; mod udaf; mod udf; @@ -46,13 +43,20 @@ pub mod expr_fn; pub mod expr_rewriter; pub mod expr_schema; pub mod function; -pub mod groups_accumulator; -pub mod interval_arithmetic; +pub mod groups_accumulator { + pub use datafusion_expr_common::groups_accumulator::*; +} + +pub mod interval_arithmetic { + pub use datafusion_expr_common::interval_arithmetic::*; +} pub mod logical_plan; pub mod planner; pub mod registry; pub mod simplify; -pub mod sort_properties; +pub mod sort_properties { + pub use datafusion_expr_common::sort_properties::*; +} pub mod test; pub mod tree_node; pub mod type_coercion; @@ -62,9 +66,10 @@ pub mod window_frame; pub mod window_function; pub mod window_state; -pub use accumulator::Accumulator; +pub use datafusion_expr_common::type_coercion::binary; +pub use datafusion_expr_common::accumulator::Accumulator; pub use built_in_window_function::BuiltInWindowFunction; -pub use columnar_value::ColumnarValue; +pub use datafusion_expr_common::columnar_value::ColumnarValue; pub use expr::{ Between, BinaryExpr, Case, Cast, Expr, GetFieldAccess, GroupingSet, Like, Sort as SortExpr, TryCast, WindowFunctionDefinition, @@ -75,12 +80,12 @@ pub use function::{ AccumulatorFactoryFunction, PartitionEvaluatorFactory, ReturnTypeFunction, ScalarFunctionImplementation, StateTypeFunction, }; -pub use groups_accumulator::{EmitTo, GroupsAccumulator}; +pub use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; pub use literal::{lit, lit_timestamp_nano, Literal, TimestampLiteral}; pub use logical_plan::*; -pub use operator::Operator; +pub use datafusion_expr_common::operator::Operator; pub use partition_evaluator::PartitionEvaluator; -pub use signature::{ +pub use datafusion_expr_common::signature::{ ArrayFunctionSignature, Signature, TypeSignature, Volatility, TIMEZONE_WILDCARD, }; pub use sqlparser; diff --git a/datafusion/expr/src/operator.rs b/datafusion/expr/src/operator.rs index 9bb8c48d6c71f..35d9ec7c53cb2 100644 --- a/datafusion/expr/src/operator.rs +++ b/datafusion/expr/src/operator.rs @@ -20,273 +20,9 @@ use crate::expr_fn::binary_expr; use crate::Expr; use crate::Like; -use std::fmt; use std::ops; use std::ops::Not; - -/// Operators applied to expressions -#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Hash)] -pub enum Operator { - /// Expressions are equal - Eq, - /// Expressions are not equal - NotEq, - /// Left side is smaller than right side - Lt, - /// Left side is smaller or equal to right side - LtEq, - /// Left side is greater than right side - Gt, - /// Left side is greater or equal to right side - GtEq, - /// Addition - Plus, - /// Subtraction - Minus, - /// Multiplication operator, like `*` - Multiply, - /// Division operator, like `/` - Divide, - /// Remainder operator, like `%` - Modulo, - /// Logical AND, like `&&` - And, - /// Logical OR, like `||` - Or, - /// `IS DISTINCT FROM` (see [`distinct`]) - /// - /// [`distinct`]: arrow::compute::kernels::cmp::distinct - IsDistinctFrom, - /// `IS NOT DISTINCT FROM` (see [`not_distinct`]) - /// - /// [`not_distinct`]: arrow::compute::kernels::cmp::not_distinct - IsNotDistinctFrom, - /// Case sensitive regex match - RegexMatch, - /// Case insensitive regex match - RegexIMatch, - /// Case sensitive regex not match - RegexNotMatch, - /// Case insensitive regex not match - RegexNotIMatch, - /// Case sensitive pattern match - LikeMatch, - /// Case insensitive pattern match - ILikeMatch, - /// Case sensitive pattern not match - NotLikeMatch, - /// Case insensitive pattern not match - NotILikeMatch, - /// Bitwise and, like `&` - BitwiseAnd, - /// Bitwise or, like `|` - BitwiseOr, - /// Bitwise xor, such as `^` in MySQL or `#` in PostgreSQL - BitwiseXor, - /// Bitwise right, like `>>` - BitwiseShiftRight, - /// Bitwise left, like `<<` - BitwiseShiftLeft, - /// String concat - StringConcat, - /// At arrow, like `@>` - AtArrow, - /// Arrow at, like `<@` - ArrowAt, -} - -impl Operator { - /// If the operator can be negated, return the negated operator - /// otherwise return None - pub fn negate(&self) -> Option { - match self { - Operator::Eq => Some(Operator::NotEq), - Operator::NotEq => Some(Operator::Eq), - Operator::Lt => Some(Operator::GtEq), - Operator::LtEq => Some(Operator::Gt), - Operator::Gt => Some(Operator::LtEq), - Operator::GtEq => Some(Operator::Lt), - Operator::IsDistinctFrom => Some(Operator::IsNotDistinctFrom), - Operator::IsNotDistinctFrom => Some(Operator::IsDistinctFrom), - Operator::LikeMatch => Some(Operator::NotLikeMatch), - Operator::ILikeMatch => Some(Operator::NotILikeMatch), - Operator::NotLikeMatch => Some(Operator::LikeMatch), - Operator::NotILikeMatch => Some(Operator::ILikeMatch), - Operator::Plus - | Operator::Minus - | Operator::Multiply - | Operator::Divide - | Operator::Modulo - | Operator::And - | Operator::Or - | Operator::RegexMatch - | Operator::RegexIMatch - | Operator::RegexNotMatch - | Operator::RegexNotIMatch - | Operator::BitwiseAnd - | Operator::BitwiseOr - | Operator::BitwiseXor - | Operator::BitwiseShiftRight - | Operator::BitwiseShiftLeft - | Operator::StringConcat - | Operator::AtArrow - | Operator::ArrowAt => None, - } - } - - /// Return true if the operator is a numerical operator. - /// - /// For example, 'Binary(a, +, b)' would be a numerical expression. - /// PostgresSQL concept: - pub fn is_numerical_operators(&self) -> bool { - matches!( - self, - Operator::Plus - | Operator::Minus - | Operator::Multiply - | Operator::Divide - | Operator::Modulo - ) - } - - /// Return true if the operator is a comparison operator. - /// - /// For example, 'Binary(a, >, b)' would be a comparison expression. - pub fn is_comparison_operator(&self) -> bool { - matches!( - self, - Operator::Eq - | Operator::NotEq - | Operator::Lt - | Operator::LtEq - | Operator::Gt - | Operator::GtEq - | Operator::IsDistinctFrom - | Operator::IsNotDistinctFrom - | Operator::RegexMatch - | Operator::RegexIMatch - | Operator::RegexNotMatch - | Operator::RegexNotIMatch - ) - } - - /// Return true if the operator is a logic operator. - /// - /// For example, 'Binary(Binary(a, >, b), AND, Binary(a, <, b + 3))' would - /// be a logical expression. - pub fn is_logic_operator(&self) -> bool { - matches!(self, Operator::And | Operator::Or) - } - - /// Return the operator where swapping lhs and rhs wouldn't change the result. - /// - /// For example `Binary(50, >=, a)` could also be represented as `Binary(a, <=, 50)`. - pub fn swap(&self) -> Option { - match self { - Operator::Eq => Some(Operator::Eq), - Operator::NotEq => Some(Operator::NotEq), - Operator::Lt => Some(Operator::Gt), - Operator::LtEq => Some(Operator::GtEq), - Operator::Gt => Some(Operator::Lt), - Operator::GtEq => Some(Operator::LtEq), - Operator::AtArrow => Some(Operator::ArrowAt), - Operator::ArrowAt => Some(Operator::AtArrow), - Operator::IsDistinctFrom - | Operator::IsNotDistinctFrom - | Operator::Plus - | Operator::Minus - | Operator::Multiply - | Operator::Divide - | Operator::Modulo - | Operator::And - | Operator::Or - | Operator::RegexMatch - | Operator::RegexIMatch - | Operator::RegexNotMatch - | Operator::RegexNotIMatch - | Operator::LikeMatch - | Operator::ILikeMatch - | Operator::NotLikeMatch - | Operator::NotILikeMatch - | Operator::BitwiseAnd - | Operator::BitwiseOr - | Operator::BitwiseXor - | Operator::BitwiseShiftRight - | Operator::BitwiseShiftLeft - | Operator::StringConcat => None, - } - } - - /// Get the operator precedence - /// use as a reference - pub fn precedence(&self) -> u8 { - match self { - Operator::Or => 5, - Operator::And => 10, - Operator::Eq | Operator::NotEq | Operator::LtEq | Operator::GtEq => 15, - Operator::Lt | Operator::Gt => 20, - Operator::LikeMatch - | Operator::NotLikeMatch - | Operator::ILikeMatch - | Operator::NotILikeMatch => 25, - Operator::IsDistinctFrom - | Operator::IsNotDistinctFrom - | Operator::RegexMatch - | Operator::RegexNotMatch - | Operator::RegexIMatch - | Operator::RegexNotIMatch - | Operator::BitwiseAnd - | Operator::BitwiseOr - | Operator::BitwiseShiftLeft - | Operator::BitwiseShiftRight - | Operator::BitwiseXor - | Operator::StringConcat - | Operator::AtArrow - | Operator::ArrowAt => 30, - Operator::Plus | Operator::Minus => 40, - Operator::Multiply | Operator::Divide | Operator::Modulo => 45, - } - } -} - -impl fmt::Display for Operator { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let display = match &self { - Operator::Eq => "=", - Operator::NotEq => "!=", - Operator::Lt => "<", - Operator::LtEq => "<=", - Operator::Gt => ">", - Operator::GtEq => ">=", - Operator::Plus => "+", - Operator::Minus => "-", - Operator::Multiply => "*", - Operator::Divide => "/", - Operator::Modulo => "%", - Operator::And => "AND", - Operator::Or => "OR", - Operator::RegexMatch => "~", - Operator::RegexIMatch => "~*", - Operator::RegexNotMatch => "!~", - Operator::RegexNotIMatch => "!~*", - Operator::LikeMatch => "~~", - Operator::ILikeMatch => "~~*", - Operator::NotLikeMatch => "!~~", - Operator::NotILikeMatch => "!~~*", - Operator::IsDistinctFrom => "IS DISTINCT FROM", - Operator::IsNotDistinctFrom => "IS NOT DISTINCT FROM", - Operator::BitwiseAnd => "&", - Operator::BitwiseOr => "|", - Operator::BitwiseXor => "BIT_XOR", - Operator::BitwiseShiftRight => ">>", - Operator::BitwiseShiftLeft => "<<", - Operator::StringConcat => "||", - Operator::AtArrow => "@>", - Operator::ArrowAt => "<@", - }; - write!(f, "{display}") - } -} +use datafusion_expr_common::operator::Operator; /// Support ` + ` fluent style impl ops::Add for Expr { diff --git a/datafusion/expr/src/type_coercion/functions.rs b/datafusion/expr/src/type_coercion/functions.rs index ef52a01e0598f..199447189caa7 100644 --- a/datafusion/expr/src/type_coercion/functions.rs +++ b/datafusion/expr/src/type_coercion/functions.rs @@ -17,7 +17,7 @@ use std::sync::Arc; -use crate::signature::{ +use datafusion_expr_common::signature::{ ArrayFunctionSignature, FIXED_SIZE_LIST_WILDCARD, TIMEZONE_WILDCARD, }; use crate::{AggregateUDF, ScalarUDF, Signature, TypeSignature}; diff --git a/datafusion/expr/src/type_coercion/mod.rs b/datafusion/expr/src/type_coercion/mod.rs index e0d1236aac2d6..3a5c65fb46ee4 100644 --- a/datafusion/expr/src/type_coercion/mod.rs +++ b/datafusion/expr/src/type_coercion/mod.rs @@ -31,11 +31,14 @@ //! i64. However, i64 -> i32 is never performed as there are i64 //! values which can not be represented by i32 values. -pub mod aggregates; -pub mod binary; +pub mod aggregates { + pub use datafusion_expr_common::type_coercion::aggregates::*; +} pub mod functions; pub mod other; +pub use datafusion_expr_common::type_coercion::binary; + use arrow::datatypes::DataType; /// Determine whether the given data type `dt` represents signed numeric values. pub fn is_signed_numeric(dt: &DataType) -> bool { diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs index 3a292b2b49bfb..3ae2af2762ce9 100644 --- a/datafusion/expr/src/udaf.rs +++ b/datafusion/expr/src/udaf.rs @@ -669,4 +669,4 @@ impl AggregateUDFImpl for AggregateUDFLegacyWrapper { fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { (self.accumulator)(acc_args) } -} +} \ No newline at end of file diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 5ba6e3007ead4..06a09dfbd37c4 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -27,7 +27,7 @@ use arrow::datatypes::DataType; use datafusion_common::{not_impl_err, ExprSchema, Result}; use crate::expr::create_name; -use crate::interval_arithmetic::Interval; +use datafusion_expr_common::interval_arithmetic::Interval; use crate::simplify::{ExprSimplifyResult, SimplifyInfo}; use crate::sort_properties::{ExprProperties, SortProperties}; use crate::{ diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 683a8e170ed4d..e0b1f27b709e3 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use crate::expr::{Alias, Sort, WindowFunction}; use crate::expr_rewriter::strip_outer_reference; -use crate::signature::{Signature, TypeSignature}; +use datafusion_expr_common::signature::{Signature, TypeSignature}; use crate::{ and, BinaryExpr, Expr, ExprSchemable, Filter, GroupingSet, LogicalPlan, Operator, }; @@ -40,6 +40,8 @@ use datafusion_common::{ use sqlparser::ast::{ExceptSelectItem, ExcludeSelectItem, WildcardAdditionalOptions}; +pub use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity; + /// The value to which `COUNT(*)` is expanded to in /// `COUNT()` expressions pub const COUNT_STAR_EXPANSION: ScalarValue = ScalarValue::Int64(Some(1)); @@ -1217,36 +1219,60 @@ pub fn format_state_name(name: &str, state_name: &str) -> String { format!("{name}[{state_name}]") } -/// Represents the sensitivity of an aggregate expression to ordering. -#[derive(Debug, PartialEq, Eq, Clone, Copy)] -pub enum AggregateOrderSensitivity { - /// Indicates that the aggregate expression is insensitive to ordering. - /// Ordering at the input is not important for the result of the aggregator. - Insensitive, - /// Indicates that the aggregate expression has a hard requirement on ordering. - /// The aggregator can not produce a correct result unless its ordering - /// requirement is satisfied. - HardRequirement, - /// Indicates that ordering is beneficial for the aggregate expression in terms - /// of evaluation efficiency. The aggregator can produce its result efficiently - /// when its required ordering is satisfied; however, it can still produce the - /// correct result (albeit less efficiently) when its required ordering is not met. - Beneficial, -} - -impl AggregateOrderSensitivity { - pub fn is_insensitive(&self) -> bool { - self.eq(&AggregateOrderSensitivity::Insensitive) - } - - pub fn is_beneficial(&self) -> bool { - self.eq(&AggregateOrderSensitivity::Beneficial) - } - - pub fn hard_requires(&self) -> bool { - self.eq(&AggregateOrderSensitivity::HardRequirement) - } -} +// /// Converts `datafusion_expr::Expr` into corresponding `Arc`. +// /// If conversion is not supported yet, returns Error. +// pub fn limited_convert_logical_expr_to_physical_expr_with_dfschema( +// expr: &Expr, +// dfschema: &DFSchema, +// ) -> Result> { +// match expr { +// Expr::Alias(Alias { expr, .. }) => Ok( +// limited_convert_logical_expr_to_physical_expr_with_dfschema(expr, dfschema)?, +// ), +// Expr::Column(col) => { +// let idx = dfschema.index_of_column(col)?; +// Ok(Arc::new(Column::new(&col.name, idx))) +// } +// Expr::Cast(cast_expr) => Ok(Arc::new(CastExpr::new( +// limited_convert_logical_expr_to_physical_expr_with_dfschema( +// cast_expr.expr.as_ref(), +// dfschema, +// )?, +// cast_expr.data_type.clone(), +// None, +// ))), +// Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))), +// _ => exec_err!( +// "Unsupported expression: {expr} for conversion to Arc" +// ), +// } +// } + +// /// Converts each [`Expr::Sort`] into a corresponding [`PhysicalSortExpr`]. +// /// Returns an error if the given logical expression is not a [`Expr::Sort`]. +// pub fn limited_convert_logical_sort_exprs_to_physical_with_dfschema( +// exprs: &[Expr], +// dfschema: &DFSchema, +// ) -> Result> { +// // Construct PhysicalSortExpr objects from Expr objects: +// let mut sort_exprs = vec![]; +// for expr in exprs { +// let Expr::Sort(sort) = expr else { +// return exec_err!("Expects to receive sort expression"); +// }; +// sort_exprs.push(PhysicalSortExpr::new( +// limited_convert_logical_expr_to_physical_expr_with_dfschema( +// sort.expr.as_ref(), +// dfschema, +// )?, +// SortOptions { +// descending: !sort.asc, +// nulls_first: sort.nulls_first, +// }, +// )) +// } +// Ok(sort_exprs) +// } #[cfg(test)] mod tests { diff --git a/datafusion/functions-aggregate-common/Cargo.toml b/datafusion/functions-aggregate-common/Cargo.toml new file mode 100644 index 0000000000000..2c1119b82a092 --- /dev/null +++ b/datafusion/functions-aggregate-common/Cargo.toml @@ -0,0 +1,49 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "datafusion-functions-aggregate-common" +description = "Common aggregate function packages for the DataFusion query engine" +keywords = ["datafusion", "logical", "plan", "expressions"] +readme = "README.md" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +repository = { workspace = true } +license = { workspace = true } +authors = { workspace = true } +rust-version = { workspace = true } + +[lints] +workspace = true + +[lib] +name = "datafusion_functions_aggregate_common" +path = "src/lib.rs" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +ahash = { workspace = true } +arrow = { workspace = true } +datafusion-common = { workspace = true } +datafusion-expr-common = { workspace = true } +# datafusion-expr = { workspace = true } +datafusion-physical-expr-common = { workspace = true } +rand = { workspace = true } +# strum = { version = "0.26.1", features = ["derive"] } +# strum_macros = "0.26.0" \ No newline at end of file diff --git a/datafusion/functions-aggregate-common/src/aggregate.rs b/datafusion/functions-aggregate-common/src/aggregate.rs new file mode 100644 index 0000000000000..37aded510741b --- /dev/null +++ b/datafusion/functions-aggregate-common/src/aggregate.rs @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::Debug; +use std::{any::Any, sync::Arc}; + +use arrow::datatypes::Field; + +use datafusion_common::exec_err; +use datafusion_common::{not_impl_err, Result}; +// use datafusion_expr::utils::AggregateOrderSensitivity; +use crate::order::AggregateOrderSensitivity; +use datafusion_expr_common::accumulator:: + Accumulator +; + +use datafusion_expr_common::groups_accumulator::GroupsAccumulator; + +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; +use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; + +pub mod count_distinct; +pub mod groups_accumulator; + +/// An aggregate expression that: +/// * knows its resulting field +/// * knows how to create its accumulator +/// * knows its accumulator's state's field +/// * knows the expressions from whose its accumulator will receive values +/// +/// Any implementation of this trait also needs to implement the +/// `PartialEq` to allows comparing equality between the +/// trait objects. +pub trait AggregateExpr: Send + Sync + Debug + PartialEq { + /// Returns the aggregate expression as [`Any`] so that it can be + /// downcast to a specific implementation. + fn as_any(&self) -> &dyn Any; + + /// the field of the final result of this aggregation. + fn field(&self) -> Result; + + /// the accumulator used to accumulate values from the expressions. + /// the accumulator expects the same number of arguments as `expressions` and must + /// return states with the same description as `state_fields` + fn create_accumulator(&self) -> Result>; + + /// the fields that encapsulate the Accumulator's state + /// the number of fields here equals the number of states that the accumulator contains + fn state_fields(&self) -> Result>; + + /// expressions that are passed to the Accumulator. + /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many. + fn expressions(&self) -> Vec>; + + /// Order by requirements for the aggregate function + /// By default it is `None` (there is no requirement) + /// Order-sensitive aggregators, such as `FIRST_VALUE(x ORDER BY y)` should implement this + fn order_bys(&self) -> Option<&[PhysicalSortExpr]> { + None + } + + /// Indicates whether aggregator can produce the correct result with any + /// arbitrary input ordering. By default, we assume that aggregate expressions + /// are order insensitive. + fn order_sensitivity(&self) -> AggregateOrderSensitivity { + AggregateOrderSensitivity::Insensitive + } + + /// Sets the indicator whether ordering requirements of the aggregator is + /// satisfied by its input. If this is not the case, aggregators with order + /// sensitivity `AggregateOrderSensitivity::Beneficial` can still produce + /// the correct result with possibly more work internally. + /// + /// # Returns + /// + /// Returns `Ok(Some(updated_expr))` if the process completes successfully. + /// If the expression can benefit from existing input ordering, but does + /// not implement the method, returns an error. Order insensitive and hard + /// requirement aggregators return `Ok(None)`. + fn with_beneficial_ordering( + self: Arc, + _requirement_satisfied: bool, + ) -> Result>> { + if self.order_bys().is_some() && self.order_sensitivity().is_beneficial() { + return exec_err!( + "Should implement with satisfied for aggregator :{:?}", + self.name() + ); + } + Ok(None) + } + + /// Human readable name such as `"MIN(c2)"`. The default + /// implementation returns placeholder text. + fn name(&self) -> &str { + "AggregateExpr: default name" + } + + /// If the aggregate expression has a specialized + /// [`GroupsAccumulator`] implementation. If this returns true, + /// `[Self::create_groups_accumulator`] will be called. + fn groups_accumulator_supported(&self) -> bool { + false + } + + /// Return a specialized [`GroupsAccumulator`] that manages state + /// for all groups. + /// + /// For maximum performance, a [`GroupsAccumulator`] should be + /// implemented in addition to [`Accumulator`]. + fn create_groups_accumulator(&self) -> Result> { + not_impl_err!("GroupsAccumulator hasn't been implemented for {self:?} yet") + } + + /// Construct an expression that calculates the aggregate in reverse. + /// Typically the "reverse" expression is itself (e.g. SUM, COUNT). + /// For aggregates that do not support calculation in reverse, + /// returns None (which is the default value). + fn reverse_expr(&self) -> Option> { + None + } + + /// Creates accumulator implementation that supports retract + fn create_sliding_accumulator(&self) -> Result> { + not_impl_err!("Retractable Accumulator hasn't been implemented for {self:?} yet") + } + + /// Returns all expressions used in the [`AggregateExpr`]. + /// These expressions are (1)function arguments, (2) order by expressions. + fn all_expressions(&self) -> AggregatePhysicalExpressions { + let args = self.expressions(); + let order_bys = self.order_bys().unwrap_or(&[]); + let order_by_exprs = order_bys + .iter() + .map(|sort_expr| sort_expr.expr.clone()) + .collect::>(); + AggregatePhysicalExpressions { + args, + order_by_exprs, + } + } + + /// Rewrites [`AggregateExpr`], with new expressions given. The argument should be consistent + /// with the return value of the [`AggregateExpr::all_expressions`] method. + /// Returns `Some(Arc)` if re-write is supported, otherwise returns `None`. + /// TODO: This method only rewrites the [`PhysicalExpr`]s and does not handle [`Expr`]s. + /// This can cause silent bugs and should be fixed in the future (possibly with physical-to-logical + /// conversions). + fn with_new_expressions( + &self, + _args: Vec>, + _order_by_exprs: Vec>, + ) -> Option> { + None + } + + /// If this function is max, return (output_field, true) + /// if the function is min, return (output_field, false) + /// otherwise return None (the default) + /// + /// output_field is the name of the column produced by this aggregate + /// + /// Note: this is used to use special aggregate implementations in certain conditions + fn get_minmax_desc(&self) -> Option<(Field, bool)> { + None + } +} + +/// Stores the physical expressions used inside the `AggregateExpr`. +pub struct AggregatePhysicalExpressions { + /// Aggregate function arguments + pub args: Vec>, + /// Order by expressions + pub order_by_exprs: Vec>, +} + diff --git a/datafusion/physical-expr-common/src/aggregate/count_distinct/mod.rs b/datafusion/functions-aggregate-common/src/aggregate/count_distinct.rs similarity index 100% rename from datafusion/physical-expr-common/src/aggregate/count_distinct/mod.rs rename to datafusion/functions-aggregate-common/src/aggregate/count_distinct.rs diff --git a/datafusion/physical-expr-common/src/aggregate/count_distinct/bytes.rs b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/bytes.rs similarity index 95% rename from datafusion/physical-expr-common/src/aggregate/count_distinct/bytes.rs rename to datafusion/functions-aggregate-common/src/aggregate/count_distinct/bytes.rs index 360d64ce01419..ee61128979e10 100644 --- a/datafusion/physical-expr-common/src/aggregate/count_distinct/bytes.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/bytes.rs @@ -17,13 +17,13 @@ //! [`BytesDistinctCountAccumulator`] for Utf8/LargeUtf8/Binary/LargeBinary values -use crate::binary_map::{ArrowBytesSet, OutputType}; -use crate::binary_view_map::ArrowBytesViewSet; use arrow::array::{ArrayRef, OffsetSizeTrait}; use datafusion_common::cast::as_list_array; use datafusion_common::utils::array_into_list_array_nullable; use datafusion_common::ScalarValue; -use datafusion_expr::Accumulator; +use datafusion_expr_common::accumulator::Accumulator; +use datafusion_physical_expr_common::binary_map::{ArrowBytesSet, OutputType}; +use datafusion_physical_expr_common::binary_view_map::ArrowBytesViewSet; use std::fmt::Debug; use std::sync::Arc; diff --git a/datafusion/physical-expr-common/src/aggregate/count_distinct/native.rs b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs similarity index 98% rename from datafusion/physical-expr-common/src/aggregate/count_distinct/native.rs rename to datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs index e525118b9a177..d128a8af58eef 100644 --- a/datafusion/physical-expr-common/src/aggregate/count_distinct/native.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs @@ -35,9 +35,9 @@ use datafusion_common::cast::{as_list_array, as_primitive_array}; use datafusion_common::utils::array_into_list_array_nullable; use datafusion_common::utils::memory::estimate_memory_size; use datafusion_common::ScalarValue; -use datafusion_expr::Accumulator; +use datafusion_expr_common::accumulator::Accumulator; -use crate::aggregate::utils::Hashable; +use crate::utils::Hashable; #[derive(Debug)] pub struct PrimitiveDistinctCountAccumulator diff --git a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/mod.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs similarity index 100% rename from datafusion/physical-expr-common/src/aggregate/groups_accumulator/mod.rs rename to datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs diff --git a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs similarity index 99% rename from datafusion/physical-expr-common/src/aggregate/groups_accumulator/accumulate.rs rename to datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs index 3fcd570f514e5..18e497c63eb02 100644 --- a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/accumulate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs @@ -23,7 +23,7 @@ use arrow::array::{Array, BooleanArray, BooleanBufferBuilder, PrimitiveArray}; use arrow::buffer::{BooleanBuffer, NullBuffer}; use arrow::datatypes::ArrowPrimitiveType; -use datafusion_expr::EmitTo; +use datafusion_expr_common::groups_accumulator::EmitTo; /// Track the accumulator null state per row: if any values for that /// group were null and if any values have been seen at all for that group. /// diff --git a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/bool_op.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs similarity index 98% rename from datafusion/physical-expr-common/src/aggregate/groups_accumulator/bool_op.rs rename to datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs index 8498d69dd333f..be2b5e48a8db9 100644 --- a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/bool_op.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use arrow::array::{ArrayRef, AsArray, BooleanArray, BooleanBufferBuilder}; use arrow::buffer::BooleanBuffer; use datafusion_common::Result; -use datafusion_expr::{EmitTo, GroupsAccumulator}; +use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; use super::accumulate::NullState; diff --git a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs similarity index 98% rename from datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs rename to datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs index 8d69646bd422a..77828a59c3e7b 100644 --- a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs @@ -23,7 +23,7 @@ use arrow::compute; use arrow::datatypes::ArrowPrimitiveType; use arrow::datatypes::DataType; use datafusion_common::{internal_datafusion_err, DataFusionError, Result}; -use datafusion_expr::{EmitTo, GroupsAccumulator}; +use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; use super::accumulate::NullState; diff --git a/datafusion/functions-aggregate-common/src/lib.rs b/datafusion/functions-aggregate-common/src/lib.rs new file mode 100644 index 0000000000000..43bc156f803e8 --- /dev/null +++ b/datafusion/functions-aggregate-common/src/lib.rs @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`AggregateUDF`]: User Defined Aggregate Functions + +pub mod aggregate; +pub mod merge_arrays; +pub mod order; +pub mod stats; +pub mod tdigest; +pub mod utils; diff --git a/datafusion/physical-expr-common/src/aggregate/merge_arrays.rs b/datafusion/functions-aggregate-common/src/merge_arrays.rs similarity index 100% rename from datafusion/physical-expr-common/src/aggregate/merge_arrays.rs rename to datafusion/functions-aggregate-common/src/merge_arrays.rs diff --git a/datafusion/functions-aggregate-common/src/order.rs b/datafusion/functions-aggregate-common/src/order.rs new file mode 100644 index 0000000000000..4d7e604687b88 --- /dev/null +++ b/datafusion/functions-aggregate-common/src/order.rs @@ -0,0 +1,30 @@ +/// Represents the sensitivity of an aggregate expression to ordering. +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum AggregateOrderSensitivity { + /// Indicates that the aggregate expression is insensitive to ordering. + /// Ordering at the input is not important for the result of the aggregator. + Insensitive, + /// Indicates that the aggregate expression has a hard requirement on ordering. + /// The aggregator can not produce a correct result unless its ordering + /// requirement is satisfied. + HardRequirement, + /// Indicates that ordering is beneficial for the aggregate expression in terms + /// of evaluation efficiency. The aggregator can produce its result efficiently + /// when its required ordering is satisfied; however, it can still produce the + /// correct result (albeit less efficiently) when its required ordering is not met. + Beneficial, +} + +impl AggregateOrderSensitivity { + pub fn is_insensitive(&self) -> bool { + self.eq(&AggregateOrderSensitivity::Insensitive) + } + + pub fn is_beneficial(&self) -> bool { + self.eq(&AggregateOrderSensitivity::Beneficial) + } + + pub fn hard_requires(&self) -> bool { + self.eq(&AggregateOrderSensitivity::HardRequirement) + } +} diff --git a/datafusion/physical-expr-common/src/aggregate/stats.rs b/datafusion/functions-aggregate-common/src/stats.rs similarity index 100% rename from datafusion/physical-expr-common/src/aggregate/stats.rs rename to datafusion/functions-aggregate-common/src/stats.rs diff --git a/datafusion/physical-expr-common/src/aggregate/tdigest.rs b/datafusion/functions-aggregate-common/src/tdigest.rs similarity index 100% rename from datafusion/physical-expr-common/src/aggregate/tdigest.rs rename to datafusion/functions-aggregate-common/src/tdigest.rs diff --git a/datafusion/physical-expr-common/src/aggregate/utils.rs b/datafusion/functions-aggregate-common/src/utils.rs similarity index 98% rename from datafusion/physical-expr-common/src/aggregate/utils.rs rename to datafusion/functions-aggregate-common/src/utils.rs index 9e380bd820ff4..7b8ce0397af83 100644 --- a/datafusion/physical-expr-common/src/aggregate/utils.rs +++ b/datafusion/functions-aggregate-common/src/utils.rs @@ -29,11 +29,10 @@ use arrow::{ }, }; use datafusion_common::{exec_err, DataFusionError, Result}; -use datafusion_expr::Accumulator; +use datafusion_expr_common::accumulator::Accumulator; +use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; -use crate::sort_expr::PhysicalSortExpr; - -use super::AggregateExpr; +use crate::aggregate::AggregateExpr; /// Downcast a `Box` or `Arc` /// and return the inner trait object as [`Any`] so diff --git a/datafusion/functions-aggregate/Cargo.toml b/datafusion/functions-aggregate/Cargo.toml index 4f2bd864832e3..9e7476e81d053 100644 --- a/datafusion/functions-aggregate/Cargo.toml +++ b/datafusion/functions-aggregate/Cargo.toml @@ -45,6 +45,8 @@ datafusion-common = { workspace = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } datafusion-physical-expr-common = { workspace = true } +datafusion-physical-expr = { workspace = true } +datafusion-functions-aggregate-common = { workspace = true } log = { workspace = true } paste = "1.0.14" sqlparser = { workspace = true } diff --git a/datafusion/functions-aggregate/benches/count.rs b/datafusion/functions-aggregate/benches/count.rs index 875112ca8d47d..fa1051e945b1f 100644 --- a/datafusion/functions-aggregate/benches/count.rs +++ b/datafusion/functions-aggregate/benches/count.rs @@ -23,6 +23,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; use datafusion_common::DFSchema; use datafusion_expr::{function::AccumulatorArgs, AggregateUDFImpl, GroupsAccumulator}; use datafusion_functions_aggregate::count::Count; +use datafusion_physical_expr::expressions::col; use std::sync::Arc; fn prepare_accumulator() -> Box { @@ -33,12 +34,12 @@ fn prepare_accumulator() -> Box { schema: &schema, dfschema: &df_schema, ignore_nulls: false, - sort_exprs: &[], + ordering_req: &[], is_reversed: false, name: "COUNT(f)", is_distinct: false, input_types: &[DataType::Int32], - input_exprs: &[datafusion_expr::col("f")], + physical_exprs: &[col("f", &schema).unwrap()], }; let count_fn = Count::new(); diff --git a/datafusion/functions-aggregate/benches/sum.rs b/datafusion/functions-aggregate/benches/sum.rs index dfaa93cdeff76..5eb42bfa46478 100644 --- a/datafusion/functions-aggregate/benches/sum.rs +++ b/datafusion/functions-aggregate/benches/sum.rs @@ -23,6 +23,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; use datafusion_common::DFSchema; use datafusion_expr::{function::AccumulatorArgs, AggregateUDFImpl, GroupsAccumulator}; use datafusion_functions_aggregate::sum::Sum; +use datafusion_physical_expr::expressions::col; use std::sync::Arc; fn prepare_accumulator(data_type: &DataType) -> Box { @@ -33,12 +34,12 @@ fn prepare_accumulator(data_type: &DataType) -> Box { schema: &schema, dfschema: &df_schema, ignore_nulls: false, - sort_exprs: &[], + ordering_req: &[], is_reversed: false, name: "SUM(f)", is_distinct: false, input_types: &[data_type.clone()], - input_exprs: &[datafusion_expr::col("f")], + physical_exprs: &[col("f", &schema).unwrap()] }; let sum_fn = Sum::new(); diff --git a/datafusion/functions-aggregate/src/approx_percentile_cont.rs b/datafusion/functions-aggregate/src/approx_percentile_cont.rs index af2a26fd05ece..3c0ff92f0f4e3 100644 --- a/datafusion/functions-aggregate/src/approx_percentile_cont.rs +++ b/datafusion/functions-aggregate/src/approx_percentile_cont.rs @@ -31,9 +31,9 @@ use arrow::{ use arrow_schema::{Field, Schema}; use datafusion_common::{ - downcast_value, internal_err, not_impl_err, plan_err, DFSchema, DataFusionError, - ScalarValue, + downcast_value, exec_err, internal_err, not_impl_err, plan_err, DFSchema, DataFusionError, ScalarValue, Result }; +use datafusion_expr::expr::Alias; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::type_coercion::aggregates::{INTEGERS, NUMERICS}; use datafusion_expr::utils::format_state_name; @@ -41,10 +41,11 @@ use datafusion_expr::{ Accumulator, AggregateUDFImpl, ColumnarValue, Expr, Signature, TypeSignature, Volatility, }; -use datafusion_physical_expr_common::aggregate::tdigest::{ +use datafusion_functions_aggregate_common::tdigest::{ TDigest, TryIntoF64, DEFAULT_MAX_SIZE, }; -use datafusion_physical_expr_common::utils::limited_convert_logical_expr_to_physical_expr_with_dfschema; +use datafusion_physical_expr::expressions::{CastExpr, Column, Literal}; +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; make_udaf_expr_and_func!( ApproxPercentileCont, @@ -98,9 +99,9 @@ impl ApproxPercentileCont { &self, args: AccumulatorArgs, ) -> datafusion_common::Result { - let percentile = validate_input_percentile_expr(&args.input_exprs[1])?; - let tdigest_max_size = if args.input_exprs.len() == 3 { - Some(validate_input_max_size_expr(&args.input_exprs[2])?) + let percentile = validate_input_percentile_expr(&args.physical_exprs[1])?; + let tdigest_max_size = if args.physical_exprs.len() == 3 { + Some(validate_input_max_size_expr(&args.physical_exprs[2])?) } else { None }; @@ -134,12 +135,44 @@ impl ApproxPercentileCont { } } -fn get_lit_value(expr: &Expr) -> datafusion_common::Result { + +/// Converts `datafusion_expr::Expr` into corresponding `Arc`. +/// If conversion is not supported yet, returns Error. +fn limited_convert_logical_expr_to_physical_expr_with_dfschema( + expr: &Expr, + dfschema: &DFSchema, +) -> Result> { + match expr { + Expr::Alias(Alias { expr, .. }) => Ok( + limited_convert_logical_expr_to_physical_expr_with_dfschema(expr, dfschema)?, + ), + Expr::Column(col) => { + let idx = dfschema.index_of_column(col)?; + Ok(Arc::new(Column::new(&col.name, idx))) + } + Expr::Cast(cast_expr) => Ok(Arc::new(CastExpr::new( + limited_convert_logical_expr_to_physical_expr_with_dfschema( + cast_expr.expr.as_ref(), + dfschema, + )?, + cast_expr.data_type.clone(), + None, + ))), + Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))), + _ => exec_err!( + "Unsupported expression: {expr} for conversion to Arc" + ), + } +} + + +fn get_lit_value(expr: &Arc) -> datafusion_common::Result { + // TODO: use real schema let empty_schema = Arc::new(Schema::empty()); let empty_batch = RecordBatch::new_empty(Arc::clone(&empty_schema)); - let dfschema = DFSchema::empty(); - let expr = - limited_convert_logical_expr_to_physical_expr_with_dfschema(expr, &dfschema)?; + // let dfschema = DFSchema::empty(); + // let expr = + // limited_convert_logical_expr_to_physical_expr_with_dfschema(expr, &dfschema)?; let result = expr.evaluate(&empty_batch)?; match result { ColumnarValue::Array(_) => Err(DataFusionError::Internal(format!( @@ -150,7 +183,7 @@ fn get_lit_value(expr: &Expr) -> datafusion_common::Result { } } -fn validate_input_percentile_expr(expr: &Expr) -> datafusion_common::Result { +fn validate_input_percentile_expr(expr: &Arc) -> datafusion_common::Result { let lit = get_lit_value(expr)?; let percentile = match &lit { ScalarValue::Float32(Some(q)) => *q as f64, @@ -170,7 +203,7 @@ fn validate_input_percentile_expr(expr: &Expr) -> datafusion_common::Result Ok(percentile) } -fn validate_input_max_size_expr(expr: &Expr) -> datafusion_common::Result { +fn validate_input_max_size_expr(expr: &Arc) -> datafusion_common::Result { let lit = get_lit_value(expr)?; let max_size = match &lit { ScalarValue::UInt8(Some(q)) => *q as usize, @@ -464,7 +497,7 @@ impl Accumulator for ApproxPercentileAccumulator { mod tests { use arrow_schema::DataType; - use datafusion_physical_expr_common::aggregate::tdigest::TDigest; + use datafusion_functions_aggregate_common::tdigest::TDigest; use crate::approx_percentile_cont::ApproxPercentileAccumulator; diff --git a/datafusion/functions-aggregate/src/approx_percentile_cont_with_weight.rs b/datafusion/functions-aggregate/src/approx_percentile_cont_with_weight.rs index 0dbea1fb1ff79..917da47ed669d 100644 --- a/datafusion/functions-aggregate/src/approx_percentile_cont_with_weight.rs +++ b/datafusion/functions-aggregate/src/approx_percentile_cont_with_weight.rs @@ -17,6 +17,7 @@ use std::any::Any; use std::fmt::{Debug, Formatter}; +use std::sync::Arc; use arrow::{ array::ArrayRef, @@ -29,7 +30,7 @@ use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::type_coercion::aggregates::NUMERICS; use datafusion_expr::Volatility::Immutable; use datafusion_expr::{Accumulator, AggregateUDFImpl, Signature, TypeSignature}; -use datafusion_physical_expr_common::aggregate::tdigest::{ +use datafusion_functions_aggregate_common::tdigest::{ Centroid, TDigest, DEFAULT_MAX_SIZE, }; @@ -123,16 +124,16 @@ impl AggregateUDFImpl for ApproxPercentileContWithWeight { ); } - if acc_args.input_exprs.len() != 3 { + if acc_args.physical_exprs.len() != 3 { return plan_err!( "approx_percentile_cont_with_weight requires three arguments: value, weight, percentile" ); } let sub_args = AccumulatorArgs { - input_exprs: &[ - acc_args.input_exprs[0].clone(), - acc_args.input_exprs[2].clone(), + physical_exprs: &[ + Arc::clone(&acc_args.physical_exprs[0]), + Arc::clone(&acc_args.physical_exprs[2]), ], ..acc_args }; diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index 36c9d6a0d7c81..b6de9d0c7eb1b 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -29,10 +29,10 @@ use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::utils::format_state_name; use datafusion_expr::AggregateUDFImpl; use datafusion_expr::{Accumulator, Signature, Volatility}; -use datafusion_physical_expr_common::aggregate::merge_arrays::merge_ordered_arrays; -use datafusion_physical_expr_common::aggregate::utils::ordering_fields; +use datafusion_functions_aggregate_common::merge_arrays::merge_ordered_arrays; +use datafusion_functions_aggregate_common::utils::ordering_fields; use datafusion_physical_expr_common::sort_expr::{ - limited_convert_logical_sort_exprs_to_physical_with_dfschema, LexOrdering, + LexOrdering, PhysicalSortExpr, }; use std::collections::{HashSet, VecDeque}; @@ -123,18 +123,13 @@ impl AggregateUDFImpl for ArrayAgg { )?)); } - if acc_args.sort_exprs.is_empty() { + if acc_args.ordering_req.is_empty() { return Ok(Box::new(ArrayAggAccumulator::try_new( &acc_args.input_types[0], )?)); } - let ordering_req = limited_convert_logical_sort_exprs_to_physical_with_dfschema( - acc_args.sort_exprs, - acc_args.dfschema, - )?; - - let ordering_dtypes = ordering_req + let ordering_dtypes = acc_args.ordering_req .iter() .map(|e| e.expr.data_type(acc_args.schema)) .collect::>>()?; @@ -142,7 +137,7 @@ impl AggregateUDFImpl for ArrayAgg { OrderSensitiveArrayAggAccumulator::try_new( &acc_args.input_types[0], &ordering_dtypes, - ordering_req, + acc_args.ordering_req.to_vec(), acc_args.is_reversed, ) .map(|acc| Box::new(acc) as _) diff --git a/datafusion/functions-aggregate/src/average.rs b/datafusion/functions-aggregate/src/average.rs index 228bce1979a38..259520c0dfa74 100644 --- a/datafusion/functions-aggregate/src/average.rs +++ b/datafusion/functions-aggregate/src/average.rs @@ -34,8 +34,8 @@ use datafusion_expr::Volatility::Immutable; use datafusion_expr::{ Accumulator, AggregateUDFImpl, EmitTo, GroupsAccumulator, ReversedUDAF, Signature, }; -use datafusion_physical_expr_common::aggregate::groups_accumulator::accumulate::NullState; -use datafusion_physical_expr_common::aggregate::utils::DecimalAverager; +use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::NullState; +use datafusion_functions_aggregate_common::utils::DecimalAverager; use log::debug; use std::any::Any; use std::fmt::Debug; diff --git a/datafusion/functions-aggregate/src/bit_and_or_xor.rs b/datafusion/functions-aggregate/src/bit_and_or_xor.rs index f6dd0bc20a831..b6b1862d281bb 100644 --- a/datafusion/functions-aggregate/src/bit_and_or_xor.rs +++ b/datafusion/functions-aggregate/src/bit_and_or_xor.rs @@ -38,7 +38,7 @@ use datafusion_expr::{ Accumulator, AggregateUDFImpl, GroupsAccumulator, ReversedUDAF, Signature, Volatility, }; -use datafusion_physical_expr_common::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator; +use datafusion_functions_aggregate_common::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator; use std::ops::{BitAndAssign, BitOrAssign, BitXorAssign}; /// This macro helps create group accumulators based on bitwise operations typically used internally diff --git a/datafusion/functions-aggregate/src/bool_and_or.rs b/datafusion/functions-aggregate/src/bool_and_or.rs index d0028672743ed..c9d3d1d8c6b45 100644 --- a/datafusion/functions-aggregate/src/bool_and_or.rs +++ b/datafusion/functions-aggregate/src/bool_and_or.rs @@ -35,7 +35,7 @@ use datafusion_expr::{ Accumulator, AggregateUDFImpl, GroupsAccumulator, ReversedUDAF, Signature, Volatility, }; -use datafusion_physical_expr_common::aggregate::groups_accumulator::bool_op::BooleanGroupsAccumulator; +use datafusion_functions_aggregate_common::aggregate::groups_accumulator::bool_op::BooleanGroupsAccumulator; // returns the new value after bool_and/bool_or with the new values, taking nullability into account macro_rules! typed_bool_and_or_batch { diff --git a/datafusion/functions-aggregate/src/correlation.rs b/datafusion/functions-aggregate/src/correlation.rs index c2d7a89081d66..88f01b06d2d9b 100644 --- a/datafusion/functions-aggregate/src/correlation.rs +++ b/datafusion/functions-aggregate/src/correlation.rs @@ -36,7 +36,7 @@ use datafusion_expr::{ utils::format_state_name, Accumulator, AggregateUDFImpl, Signature, Volatility, }; -use datafusion_physical_expr_common::aggregate::stats::StatsType; +use datafusion_functions_aggregate_common::stats::StatsType; make_udaf_expr_and_func!( Correlation, diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index aea05442536ee..f6296e1cd41b5 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -16,7 +16,7 @@ // under the License. use ahash::RandomState; -use datafusion_physical_expr_common::aggregate::count_distinct::BytesViewDistinctCountAccumulator; +use datafusion_functions_aggregate_common::aggregate::count_distinct::BytesViewDistinctCountAccumulator; use std::collections::HashSet; use std::ops::BitAnd; use std::{fmt::Debug, sync::Arc}; @@ -47,14 +47,14 @@ use datafusion_expr::{ EmitTo, GroupsAccumulator, Signature, Volatility, }; use datafusion_expr::{Expr, ReversedUDAF, TypeSignature}; -use datafusion_physical_expr_common::aggregate::groups_accumulator::accumulate::accumulate_indices; -use datafusion_physical_expr_common::{ +use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::accumulate_indices; +use datafusion_functions_aggregate_common::{ aggregate::count_distinct::{ BytesDistinctCountAccumulator, FloatDistinctCountAccumulator, PrimitiveDistinctCountAccumulator, }, - binary_map::OutputType, }; +use datafusion_physical_expr_common::binary_map::OutputType; make_udaf_expr_and_func!( Count, @@ -145,7 +145,7 @@ impl AggregateUDFImpl for Count { return Ok(Box::new(CountAccumulator::new())); } - if acc_args.input_exprs.len() > 1 { + if acc_args.physical_exprs.len() > 1 { return not_impl_err!("COUNT DISTINCT with multiple arguments"); } @@ -271,7 +271,7 @@ impl AggregateUDFImpl for Count { if args.is_distinct { return false; } - args.input_exprs.len() == 1 + args.physical_exprs.len() == 1 } fn create_groups_accumulator( diff --git a/datafusion/functions-aggregate/src/covariance.rs b/datafusion/functions-aggregate/src/covariance.rs index 6f03b256fd9f7..d0abb079ef155 100644 --- a/datafusion/functions-aggregate/src/covariance.rs +++ b/datafusion/functions-aggregate/src/covariance.rs @@ -35,7 +35,7 @@ use datafusion_expr::{ utils::format_state_name, Accumulator, AggregateUDFImpl, Signature, Volatility, }; -use datafusion_physical_expr_common::aggregate::stats::StatsType; +use datafusion_functions_aggregate_common::stats::StatsType; make_udaf_expr_and_func!( CovarianceSample, diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs index 587767b8e356a..91c23164e6b67 100644 --- a/datafusion/functions-aggregate/src/first_last.rs +++ b/datafusion/functions-aggregate/src/first_last.rs @@ -34,9 +34,9 @@ use datafusion_expr::{ Accumulator, AggregateUDFImpl, ArrayFunctionSignature, Expr, ExprFunctionExt, Signature, TypeSignature, Volatility, }; -use datafusion_physical_expr_common::aggregate::utils::get_sort_options; +use datafusion_functions_aggregate_common::utils::get_sort_options; use datafusion_physical_expr_common::sort_expr::{ - limited_convert_logical_sort_exprs_to_physical_with_dfschema, LexOrdering, + LexOrdering, PhysicalSortExpr, }; @@ -117,24 +117,19 @@ impl AggregateUDFImpl for FirstValue { } fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { - let ordering_req = limited_convert_logical_sort_exprs_to_physical_with_dfschema( - acc_args.sort_exprs, - acc_args.dfschema, - )?; - - let ordering_dtypes = ordering_req + let ordering_dtypes = acc_args.ordering_req .iter() .map(|e| e.expr.data_type(acc_args.schema)) .collect::>>()?; // When requirement is empty, or it is signalled by outside caller that // the ordering requirement is/will be satisfied. - let requirement_satisfied = ordering_req.is_empty() || self.requirement_satisfied; + let requirement_satisfied = acc_args.ordering_req.is_empty() || self.requirement_satisfied; FirstValueAccumulator::try_new( acc_args.data_type, &ordering_dtypes, - ordering_req, + acc_args.ordering_req.to_vec(), acc_args.ignore_nulls, ) .map(|acc| Box::new(acc.with_requirement_satisfied(requirement_satisfied)) as _) @@ -416,22 +411,17 @@ impl AggregateUDFImpl for LastValue { } fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { - let ordering_req = limited_convert_logical_sort_exprs_to_physical_with_dfschema( - acc_args.sort_exprs, - acc_args.dfschema, - )?; - - let ordering_dtypes = ordering_req + let ordering_dtypes = acc_args.ordering_req .iter() .map(|e| e.expr.data_type(acc_args.schema)) .collect::>>()?; - let requirement_satisfied = ordering_req.is_empty() || self.requirement_satisfied; + let requirement_satisfied = acc_args.ordering_req.is_empty() || self.requirement_satisfied; LastValueAccumulator::try_new( acc_args.data_type, &ordering_dtypes, - ordering_req, + acc_args.ordering_req.to_vec(), acc_args.ignore_nulls, ) .map(|acc| Box::new(acc.with_requirement_satisfied(requirement_satisfied)) as _) diff --git a/datafusion/functions-aggregate/src/median.rs b/datafusion/functions-aggregate/src/median.rs index febf1fcd2fefb..9a70591e18cf3 100644 --- a/datafusion/functions-aggregate/src/median.rs +++ b/datafusion/functions-aggregate/src/median.rs @@ -38,7 +38,7 @@ use datafusion_expr::{ function::AccumulatorArgs, utils::format_state_name, Accumulator, AggregateUDFImpl, Signature, Volatility, }; -use datafusion_physical_expr_common::aggregate::utils::Hashable; +use datafusion_functions_aggregate_common::utils::Hashable; make_udaf_expr_and_func!( Median, diff --git a/datafusion/functions-aggregate/src/min_max.rs b/datafusion/functions-aggregate/src/min_max.rs index 4d743983411dc..41513cf24c68e 100644 --- a/datafusion/functions-aggregate/src/min_max.rs +++ b/datafusion/functions-aggregate/src/min_max.rs @@ -49,7 +49,7 @@ use arrow::datatypes::{ }; use arrow_schema::IntervalUnit; use datafusion_common::{downcast_value, internal_err, DataFusionError, Result}; -use datafusion_physical_expr_common::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator; +use datafusion_functions_aggregate_common::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator; use std::fmt::Debug; use arrow::datatypes::i256; diff --git a/datafusion/functions-aggregate/src/nth_value.rs b/datafusion/functions-aggregate/src/nth_value.rs index dc7c6c86f2130..247e74a0833db 100644 --- a/datafusion/functions-aggregate/src/nth_value.rs +++ b/datafusion/functions-aggregate/src/nth_value.rs @@ -30,12 +30,13 @@ use datafusion_common::{exec_err, internal_err, not_impl_err, Result, ScalarValu use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::utils::format_state_name; use datafusion_expr::{ - Accumulator, AggregateUDFImpl, Expr, ReversedUDAF, Signature, Volatility, + Accumulator, AggregateUDFImpl, ReversedUDAF, Signature, Volatility, }; -use datafusion_physical_expr_common::aggregate::merge_arrays::merge_ordered_arrays; -use datafusion_physical_expr_common::aggregate::utils::ordering_fields; +use datafusion_functions_aggregate_common::merge_arrays::merge_ordered_arrays; +use datafusion_functions_aggregate_common::utils::ordering_fields; +use datafusion_physical_expr::expressions::Literal; use datafusion_physical_expr_common::sort_expr::{ - limited_convert_logical_sort_exprs_to_physical_with_dfschema, LexOrdering, + LexOrdering, PhysicalSortExpr, }; @@ -87,27 +88,51 @@ impl AggregateUDFImpl for NthValueAgg { } fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { - let n = match acc_args.input_exprs[1] { - Expr::Literal(ScalarValue::Int64(Some(value))) => { + // TODO: simplfiy this + let n = if let Some(lit) = acc_args.physical_exprs[1].as_any().downcast_ref::() { + if let ScalarValue::Int64(Some(value)) = lit.value() { if acc_args.is_reversed { - Ok(-value) + -*value } else { - Ok(value) + *value } } - _ => not_impl_err!( + else { + return not_impl_err!( "{} not supported for n: {}", self.name(), - &acc_args.input_exprs[1] - ), - }?; - - let ordering_req = limited_convert_logical_sort_exprs_to_physical_with_dfschema( - acc_args.sort_exprs, - acc_args.dfschema, - )?; + &acc_args.physical_exprs[1]) + } + } else { + return not_impl_err!( + "{} not supported for n: {}", + self.name(), + &acc_args.physical_exprs[1]) + }; - let ordering_dtypes = ordering_req + // let n = match acc_args.physical_exprs[1] { + + + // Expr::Literal(ScalarValue::Int64(Some(value))) => { + // if acc_args.is_reversed { + // Ok(-value) + // } else { + // Ok(value) + // } + // } + // _ => not_impl_err!( + // "{} not supported for n: {}", + // self.name(), + // &acc_args.physical_exprs[1] + // ), + // }?; + + // let ordering_req = limited_convert_logical_sort_exprs_to_physical_with_dfschema( + // acc_args.sort_exprs, + // acc_args.dfschema, + // )?; + + let ordering_dtypes = acc_args.ordering_req .iter() .map(|e| e.expr.data_type(acc_args.schema)) .collect::>>()?; @@ -116,7 +141,7 @@ impl AggregateUDFImpl for NthValueAgg { n, &acc_args.input_types[0], &ordering_dtypes, - ordering_req, + acc_args.ordering_req.to_vec(), ) .map(|acc| Box::new(acc) as _) } diff --git a/datafusion/functions-aggregate/src/stddev.rs b/datafusion/functions-aggregate/src/stddev.rs index df757ddc04226..a04e191d3f0e5 100644 --- a/datafusion/functions-aggregate/src/stddev.rs +++ b/datafusion/functions-aggregate/src/stddev.rs @@ -27,7 +27,7 @@ use datafusion_common::{plan_err, ScalarValue}; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::utils::format_state_name; use datafusion_expr::{Accumulator, AggregateUDFImpl, Signature, Volatility}; -use datafusion_physical_expr_common::aggregate::stats::StatsType; +use datafusion_functions_aggregate_common::stats::StatsType; use crate::variance::VarianceAccumulator; @@ -275,8 +275,8 @@ mod tests { use datafusion_common::DFSchema; use datafusion_expr::AggregateUDF; - use datafusion_physical_expr_common::aggregate::utils::get_accum_scalar_values_as_arrays; - use datafusion_physical_expr_common::expressions::column::col; + use datafusion_functions_aggregate_common::utils::get_accum_scalar_values_as_arrays; + use datafusion_physical_expr::expressions::col; use super::*; @@ -331,12 +331,13 @@ mod tests { schema, dfschema: &dfschema, ignore_nulls: false, - sort_exprs: &[], + ordering_req: &[], name: "a", is_distinct: false, is_reversed: false, input_types: &[DataType::Float64], - input_exprs: &[datafusion_expr::col("a")], + physical_exprs: &[col("a", schema)?], + // input_exprs: &[datafusion_expr::col("a")], }; let args2 = AccumulatorArgs { @@ -344,12 +345,14 @@ mod tests { schema, dfschema: &dfschema, ignore_nulls: false, - sort_exprs: &[], + // sort_exprs: &[], + ordering_req: &[], name: "a", is_distinct: false, is_reversed: false, input_types: &[DataType::Float64], - input_exprs: &[datafusion_expr::col("a")], + physical_exprs: &[col("a", schema)?], + // input_exprs: &[datafusion_expr::col("a")], }; let mut accum1 = agg1.accumulator(args1)?; diff --git a/datafusion/functions-aggregate/src/string_agg.rs b/datafusion/functions-aggregate/src/string_agg.rs index 5d91a52bc4c65..24f8aa2c745f5 100644 --- a/datafusion/functions-aggregate/src/string_agg.rs +++ b/datafusion/functions-aggregate/src/string_agg.rs @@ -24,8 +24,9 @@ use datafusion_common::Result; use datafusion_common::{not_impl_err, ScalarValue}; use datafusion_expr::function::AccumulatorArgs; use datafusion_expr::{ - Accumulator, AggregateUDFImpl, Expr, Signature, TypeSignature, Volatility, + Accumulator, AggregateUDFImpl, Signature, TypeSignature, Volatility, }; +use datafusion_physical_expr::expressions::Literal; use std::any::Any; make_udaf_expr_and_func!( @@ -82,21 +83,42 @@ impl AggregateUDFImpl for StringAgg { } fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { - match &acc_args.input_exprs[1] { - Expr::Literal(ScalarValue::Utf8(Some(delimiter))) - | Expr::Literal(ScalarValue::LargeUtf8(Some(delimiter))) => { - Ok(Box::new(StringAggAccumulator::new(delimiter))) + + if let Some(lit) = acc_args.physical_exprs[1].as_any().downcast_ref::() { + return match lit.value() { + ScalarValue::Utf8(Some(delimiter)) | + ScalarValue::LargeUtf8(Some(delimiter)) => { + Ok(Box::new(StringAggAccumulator::new(delimiter.as_str()))) } - Expr::Literal(ScalarValue::Utf8(None)) - | Expr::Literal(ScalarValue::LargeUtf8(None)) - | Expr::Literal(ScalarValue::Null) => { - Ok(Box::new(StringAggAccumulator::new(""))) + ScalarValue::Utf8(None) | + ScalarValue::LargeUtf8(None)| + ScalarValue::Null => { + Ok(Box::new(StringAggAccumulator::new(""))) } - _ => not_impl_err!( + e => not_impl_err!( "StringAgg not supported for delimiter {}", - &acc_args.input_exprs[1] + e ), + } } + + not_impl_err!("expect literal") + + // match &acc_args.physical_exprs[1] { + // Expr::Literal(ScalarValue::Utf8(Some(delimiter))) + // | Expr::Literal(ScalarValue::LargeUtf8(Some(delimiter))) => { + // Ok(Box::new(StringAggAccumulator::new(delimiter))) + // } + // Expr::Literal(ScalarValue::Utf8(None)) + // | Expr::Literal(ScalarValue::LargeUtf8(None)) + // | Expr::Literal(ScalarValue::Null) => { + // Ok(Box::new(StringAggAccumulator::new(""))) + // } + // _ => not_impl_err!( + // "StringAgg not supported for delimiter {}", + // &acc_args.input_exprs[1] + // ), + // } } } diff --git a/datafusion/functions-aggregate/src/sum.rs b/datafusion/functions-aggregate/src/sum.rs index 08e3908a58297..40df5bf386ecb 100644 --- a/datafusion/functions-aggregate/src/sum.rs +++ b/datafusion/functions-aggregate/src/sum.rs @@ -39,8 +39,8 @@ use datafusion_expr::utils::format_state_name; use datafusion_expr::{ Accumulator, AggregateUDFImpl, GroupsAccumulator, ReversedUDAF, Signature, Volatility, }; -use datafusion_physical_expr_common::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator; -use datafusion_physical_expr_common::aggregate::utils::Hashable; +use datafusion_functions_aggregate_common::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator; +use datafusion_functions_aggregate_common::utils::Hashable; make_udaf_expr_and_func!( Sum, diff --git a/datafusion/functions-aggregate/src/variance.rs b/datafusion/functions-aggregate/src/variance.rs index c772608cb376d..4c78a42ea494f 100644 --- a/datafusion/functions-aggregate/src/variance.rs +++ b/datafusion/functions-aggregate/src/variance.rs @@ -34,7 +34,7 @@ use datafusion_expr::{ utils::format_state_name, Accumulator, AggregateUDFImpl, Signature, Volatility, }; -use datafusion_physical_expr_common::aggregate::stats::StatsType; +use datafusion_functions_aggregate_common::stats::StatsType; make_udaf_expr_and_func!( VarianceSample, diff --git a/datafusion/physical-expr-common/Cargo.toml b/datafusion/physical-expr-common/Cargo.toml index 3ef2d53455339..45ccb08e52e91 100644 --- a/datafusion/physical-expr-common/Cargo.toml +++ b/datafusion/physical-expr-common/Cargo.toml @@ -39,6 +39,6 @@ path = "src/lib.rs" ahash = { workspace = true } arrow = { workspace = true } datafusion-common = { workspace = true, default-features = true } -datafusion-expr = { workspace = true } +datafusion-expr-common = { workspace = true } hashbrown = { workspace = true } rand = { workspace = true } diff --git a/datafusion/physical-expr-common/src/binary_map.rs b/datafusion/physical-expr-common/src/binary_map.rs index edf608a2054f3..d21bdb3434c45 100644 --- a/datafusion/physical-expr-common/src/binary_map.rs +++ b/datafusion/physical-expr-common/src/binary_map.rs @@ -60,7 +60,7 @@ impl ArrowBytesSet { /// Return the contents of this set and replace it with a new empty /// set with the same output type - pub(super) fn take(&mut self) -> Self { + pub fn take(&mut self) -> Self { Self(self.0.take()) } diff --git a/datafusion/physical-expr-common/src/datum.rs b/datafusion/physical-expr-common/src/datum.rs index d0ba5f113b6fa..96c08d0d3a5b5 100644 --- a/datafusion/physical-expr-common/src/datum.rs +++ b/datafusion/physical-expr-common/src/datum.rs @@ -22,7 +22,8 @@ use arrow::compute::SortOptions; use arrow::error::ArrowError; use datafusion_common::internal_err; use datafusion_common::{Result, ScalarValue}; -use datafusion_expr::{ColumnarValue, Operator}; +use datafusion_expr_common::columnar_value::ColumnarValue; +use datafusion_expr_common::operator::Operator; use std::sync::Arc; /// Applies a binary [`Datum`] kernel `f` to `lhs` and `rhs` diff --git a/datafusion/physical-expr-common/src/lib.rs b/datafusion/physical-expr-common/src/lib.rs index f03eedd4cf65c..c186d16aa1ea1 100644 --- a/datafusion/physical-expr-common/src/lib.rs +++ b/datafusion/physical-expr-common/src/lib.rs @@ -15,11 +15,9 @@ // specific language governing permissions and limitations // under the License. -pub mod aggregate; pub mod binary_map; pub mod binary_view_map; pub mod datum; -pub mod expressions; pub mod physical_expr; pub mod sort_expr; pub mod tree_node; diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index e62606a42e6fb..a9178c315fe42 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -20,18 +20,16 @@ use std::fmt::{Debug, Display}; use std::hash::{Hash, Hasher}; use std::sync::Arc; -use crate::expressions::column::Column; use crate::utils::scatter; use arrow::array::BooleanArray; use arrow::compute::filter_record_batch; -use arrow::datatypes::{DataType, Schema, SchemaRef}; +use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; -use datafusion_common::tree_node::{Transformed, TreeNode}; -use datafusion_common::{internal_err, not_impl_err, plan_err, Result}; -use datafusion_expr::interval_arithmetic::Interval; -use datafusion_expr::sort_properties::ExprProperties; -use datafusion_expr::ColumnarValue; +use datafusion_common::{internal_err, not_impl_err, Result}; +use datafusion_expr_common::interval_arithmetic::Interval; +use datafusion_expr_common::sort_properties::ExprProperties; +use datafusion_expr_common::columnar_value::ColumnarValue; /// See [create_physical_expr](https://docs.rs/datafusion/latest/datafusion/physical_expr/fn.create_physical_expr.html) /// for examples of creating `PhysicalExpr` from `Expr` @@ -193,33 +191,6 @@ pub fn with_new_children_if_necessary( } } -/// Rewrites an expression according to new schema; i.e. changes the columns it -/// refers to with the column at corresponding index in the new schema. Returns -/// an error if the given schema has fewer columns than the original schema. -/// Note that the resulting expression may not be valid if data types in the -/// new schema is incompatible with expression nodes. -pub fn with_new_schema( - expr: Arc, - schema: &SchemaRef, -) -> Result> { - Ok(expr - .transform_up(|expr| { - if let Some(col) = expr.as_any().downcast_ref::() { - let idx = col.index(); - let Some(field) = schema.fields().get(idx) else { - return plan_err!( - "New schema has fewer columns than original schema" - ); - }; - let new_col = Column::new(field.name(), idx); - Ok(Transformed::yes(Arc::new(new_col) as _)) - } else { - Ok(Transformed::no(expr)) - } - })? - .data) -} - pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any { if any.is::>() { any.downcast_ref::>() diff --git a/datafusion/physical-expr-common/src/sort_expr.rs b/datafusion/physical-expr-common/src/sort_expr.rs index 2b506b74216f4..b65cd3aa0c934 100644 --- a/datafusion/physical-expr-common/src/sort_expr.rs +++ b/datafusion/physical-expr-common/src/sort_expr.rs @@ -22,13 +22,13 @@ use std::hash::{Hash, Hasher}; use std::sync::Arc; use crate::physical_expr::PhysicalExpr; -use crate::utils::limited_convert_logical_expr_to_physical_expr_with_dfschema; use arrow::compute::kernels::sort::{SortColumn, SortOptions}; use arrow::datatypes::Schema; use arrow::record_batch::RecordBatch; -use datafusion_common::{exec_err, DFSchema, Result}; -use datafusion_expr::{ColumnarValue, Expr}; +use datafusion_common::Result; +use datafusion_expr_common::columnar_value::ColumnarValue; + /// Represents Sort operation for a column in a RecordBatch #[derive(Clone, Debug)] @@ -272,29 +272,3 @@ pub type LexRequirement = Vec; ///`LexRequirementRef` is an alias for the type &`[PhysicalSortRequirement]`, which /// represents a reference to a lexicographical ordering requirement. pub type LexRequirementRef<'a> = &'a [PhysicalSortRequirement]; - -/// Converts each [`Expr::Sort`] into a corresponding [`PhysicalSortExpr`]. -/// Returns an error if the given logical expression is not a [`Expr::Sort`]. -pub fn limited_convert_logical_sort_exprs_to_physical_with_dfschema( - exprs: &[Expr], - dfschema: &DFSchema, -) -> Result> { - // Construct PhysicalSortExpr objects from Expr objects: - let mut sort_exprs = vec![]; - for expr in exprs { - let Expr::Sort(sort) = expr else { - return exec_err!("Expects to receive sort expression"); - }; - sort_exprs.push(PhysicalSortExpr::new( - limited_convert_logical_expr_to_physical_expr_with_dfschema( - sort.expr.as_ref(), - dfschema, - )?, - SortOptions { - descending: !sort.asc, - nulls_first: sort.nulls_first, - }, - )) - } - Ok(sort_exprs) -} diff --git a/datafusion/physical-expr-common/src/utils.rs b/datafusion/physical-expr-common/src/utils.rs index 0978a906a5dc6..86cbb3ea8e056 100644 --- a/datafusion/physical-expr-common/src/utils.rs +++ b/datafusion/physical-expr-common/src/utils.rs @@ -20,14 +20,12 @@ use std::sync::Arc; use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData}; use arrow::compute::{and_kleene, is_not_null, SlicesIterator}; -use datafusion_common::{exec_err, DFSchema, Result}; -use datafusion_expr::expr::Alias; -use datafusion_expr::sort_properties::ExprProperties; -use datafusion_expr::Expr; - -use crate::expressions::column::Column; -use crate::expressions::literal::Literal; -use crate::expressions::CastExpr; +use datafusion_common::Result; +// use datafusion_expr::expr::Alias; +// use datafusion_expr::sort_properties::ExprProperties; +// use datafusion_expr::Expr; +use datafusion_expr_common::sort_properties::ExprProperties; + use crate::physical_expr::PhysicalExpr; use crate::sort_expr::PhysicalSortExpr; use crate::tree_node::ExprContext; @@ -108,35 +106,6 @@ pub fn reverse_order_bys(order_bys: &[PhysicalSortExpr]) -> Vec`. -/// If conversion is not supported yet, returns Error. -pub fn limited_convert_logical_expr_to_physical_expr_with_dfschema( - expr: &Expr, - dfschema: &DFSchema, -) -> Result> { - match expr { - Expr::Alias(Alias { expr, .. }) => Ok( - limited_convert_logical_expr_to_physical_expr_with_dfschema(expr, dfschema)?, - ), - Expr::Column(col) => { - let idx = dfschema.index_of_column(col)?; - Ok(Arc::new(Column::new(&col.name, idx))) - } - Expr::Cast(cast_expr) => Ok(Arc::new(CastExpr::new( - limited_convert_logical_expr_to_physical_expr_with_dfschema( - cast_expr.expr.as_ref(), - dfschema, - )?, - cast_expr.data_type.clone(), - None, - ))), - Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))), - _ => exec_err!( - "Unsupported expression: {expr} for conversion to Arc" - ), - } -} - #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index 8436b5279bd76..088cd5441789f 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -56,7 +56,9 @@ chrono = { workspace = true } datafusion-common = { workspace = true, default-features = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } +datafusion-expr-common = { workspace = true } datafusion-physical-expr-common = { workspace = true } +datafusion-functions-aggregate-common = { workspace = true } half = { workspace = true } hashbrown = { workspace = true } hex = { version = "0.4", optional = true } diff --git a/datafusion/physical-expr/benches/case_when.rs b/datafusion/physical-expr/benches/case_when.rs index 862edd9c1fac3..b863b79b6a5c1 100644 --- a/datafusion/physical-expr/benches/case_when.rs +++ b/datafusion/physical-expr/benches/case_when.rs @@ -23,8 +23,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; use datafusion_common::ScalarValue; use datafusion_expr::Operator; use datafusion_physical_expr::expressions::{BinaryExpr, CaseExpr}; -use datafusion_physical_expr_common::expressions::column::Column; -use datafusion_physical_expr_common::expressions::Literal; +use datafusion_physical_expr::expressions::{Column, Literal}; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use std::sync::Arc; diff --git a/datafusion/physical-expr/benches/is_null.rs b/datafusion/physical-expr/benches/is_null.rs index 3dad8e9b456a0..c76be547d8d1e 100644 --- a/datafusion/physical-expr/benches/is_null.rs +++ b/datafusion/physical-expr/benches/is_null.rs @@ -21,7 +21,7 @@ use arrow_array::builder::Int32Builder; use arrow_schema::DataType; use criterion::{black_box, criterion_group, criterion_main, Criterion}; use datafusion_physical_expr::expressions::{IsNotNullExpr, IsNullExpr}; -use datafusion_physical_expr_common::expressions::column::Column; +use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use std::sync::Arc; diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs index 3c0f3a28fedbc..d2ea90ec30070 100644 --- a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs +++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs @@ -21,7 +21,7 @@ pub use adapter::GroupsAccumulatorAdapter; // Backward compatibility #[allow(unused_imports)] pub(crate) mod accumulate { - pub use datafusion_physical_expr_common::aggregate::groups_accumulator::accumulate::NullState; + pub use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::NullState; } -pub use datafusion_physical_expr_common::aggregate::groups_accumulator::accumulate::NullState; +pub use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::NullState; diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs index 0760986a87c6d..12f496eaadec6 100644 --- a/datafusion/physical-expr/src/aggregate/mod.rs +++ b/datafusion/physical-expr/src/aggregate/mod.rs @@ -20,10 +20,10 @@ pub(crate) mod stats; pub mod moving_min_max; pub mod utils { - pub use datafusion_physical_expr_common::aggregate::utils::{ + pub use datafusion_functions_aggregate_common::utils::{ adjust_output_array, down_cast_any_ref, get_accum_scalar_values_as_arrays, get_sort_options, ordering_fields, DecimalAverager, Hashable, }; } -pub use datafusion_physical_expr_common::aggregate::AggregateExpr; +pub use datafusion_functions_aggregate_common::aggregate::AggregateExpr; diff --git a/datafusion/physical-expr/src/aggregate/stats.rs b/datafusion/physical-expr/src/aggregate/stats.rs index d9338f5a962f7..db1934cd1ad96 100644 --- a/datafusion/physical-expr/src/aggregate/stats.rs +++ b/datafusion/physical-expr/src/aggregate/stats.rs @@ -15,4 +15,4 @@ // specific language governing permissions and limitations // under the License. -pub use datafusion_physical_expr_common::aggregate::stats::StatsType; +pub use datafusion_functions_aggregate_common::stats::StatsType; diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index a6e9fba281676..5b8b69fd4cb37 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -24,7 +24,7 @@ use crate::equivalence::{ collapse_lex_req, EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping, }; -use crate::expressions::Literal; +use crate::expressions::{CastExpr, Column, Literal, with_new_schema}; use crate::{ physical_exprs_contains, ConstExpr, LexOrdering, LexOrderingRef, LexRequirement, LexRequirementRef, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr, @@ -36,9 +36,6 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{plan_err, JoinSide, JoinType, Result}; use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; -use datafusion_physical_expr_common::expressions::column::Column; -use datafusion_physical_expr_common::expressions::CastExpr; -use datafusion_physical_expr_common::physical_expr::with_new_schema; use datafusion_physical_expr_common::utils::ExprPropertiesNode; use indexmap::{IndexMap, IndexSet}; diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index c34dcdfb75988..73cbc2b698ad8 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -685,7 +685,7 @@ mod tests { use datafusion_common::plan_datafusion_err; use datafusion_expr::type_coercion::binary::get_input_types; - use datafusion_physical_expr_common::expressions::column::Column; + use crate::expressions::Column; /// Performs a binary operation, applying any type coercion necessary fn binary_op( diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs index b428d562bd1b7..988688f15f3a5 100644 --- a/datafusion/physical-expr/src/expressions/case.rs +++ b/datafusion/physical-expr/src/expressions/case.rs @@ -32,8 +32,8 @@ use datafusion_common::cast::as_boolean_array; use datafusion_common::{exec_err, internal_err, DataFusionError, Result, ScalarValue}; use datafusion_expr::ColumnarValue; -use datafusion_physical_expr_common::expressions::column::Column; -use datafusion_physical_expr_common::expressions::Literal; +use super::Column; +use super::Literal; use itertools::Itertools; type WhenThen = (Arc, Arc); @@ -558,7 +558,7 @@ mod tests { use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_expr::type_coercion::binary::comparison_coercion; use datafusion_expr::Operator; - use datafusion_physical_expr_common::expressions::Literal; + use super::Literal; #[test] fn case_with_expr() -> Result<()> { diff --git a/datafusion/physical-expr-common/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs similarity index 99% rename from datafusion/physical-expr-common/src/expressions/cast.rs rename to datafusion/physical-expr/src/expressions/cast.rs index dd6131ad65c3b..8317cbf5a99be 100644 --- a/datafusion/physical-expr-common/src/expressions/cast.rs +++ b/datafusion/physical-expr/src/expressions/cast.rs @@ -27,9 +27,9 @@ use arrow::datatypes::{DataType, DataType::*, Schema}; use arrow::record_batch::RecordBatch; use datafusion_common::format::DEFAULT_FORMAT_OPTIONS; use datafusion_common::{not_impl_err, Result}; -use datafusion_expr::interval_arithmetic::Interval; -use datafusion_expr::sort_properties::ExprProperties; -use datafusion_expr::ColumnarValue; +use datafusion_expr_common::interval_arithmetic::Interval; +use datafusion_expr_common::sort_properties::ExprProperties; +use datafusion_expr_common::columnar_value::ColumnarValue; const DEFAULT_CAST_OPTIONS: CastOptions<'static> = CastOptions { safe: false, diff --git a/datafusion/physical-expr-common/src/expressions/column.rs b/datafusion/physical-expr/src/expressions/column.rs similarity index 83% rename from datafusion/physical-expr-common/src/expressions/column.rs rename to datafusion/physical-expr/src/expressions/column.rs index 5397599ea2dcc..4888c6b9e148c 100644 --- a/datafusion/physical-expr-common/src/expressions/column.rs +++ b/datafusion/physical-expr/src/expressions/column.rs @@ -25,7 +25,9 @@ use arrow::{ datatypes::{DataType, Schema}, record_batch::RecordBatch, }; -use datafusion_common::{internal_err, Result}; +use arrow_schema::SchemaRef; +use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::{internal_err, plan_err, Result}; use datafusion_expr::ColumnarValue; use crate::physical_expr::{down_cast_any_ref, PhysicalExpr}; @@ -136,6 +138,36 @@ pub fn col(name: &str, schema: &Schema) -> Result> { Ok(Arc::new(Column::new_with_schema(name, schema)?)) } +// TODO: Move expressions out of physical-expr? + +/// Rewrites an expression according to new schema; i.e. changes the columns it +/// refers to with the column at corresponding index in the new schema. Returns +/// an error if the given schema has fewer columns than the original schema. +/// Note that the resulting expression may not be valid if data types in the +/// new schema is incompatible with expression nodes. +pub fn with_new_schema( + expr: Arc, + schema: &SchemaRef, +) -> Result> { + Ok(expr + .transform_up(|expr| { + if let Some(col) = expr.as_any().downcast_ref::() { + let idx = col.index(); + let Some(field) = schema.fields().get(idx) else { + return plan_err!( + "New schema has fewer columns than original schema" + ); + }; + let new_col = Column::new(field.name(), idx); + Ok(Transformed::yes(Arc::new(new_col) as _)) + } else { + Ok(Transformed::no(expr)) + } + })? + .data) +} + + #[cfg(test)] mod test { use super::Column; diff --git a/datafusion/physical-expr-common/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs similarity index 95% rename from datafusion/physical-expr-common/src/expressions/literal.rs rename to datafusion/physical-expr/src/expressions/literal.rs index b3cff1ef69baf..ed24e9028153e 100644 --- a/datafusion/physical-expr-common/src/expressions/literal.rs +++ b/datafusion/physical-expr/src/expressions/literal.rs @@ -28,9 +28,10 @@ use arrow::{ record_batch::RecordBatch, }; use datafusion_common::{Result, ScalarValue}; -use datafusion_expr::interval_arithmetic::Interval; -use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; -use datafusion_expr::{ColumnarValue, Expr}; +use datafusion_expr::Expr; +use datafusion_expr_common::columnar_value::ColumnarValue; +use datafusion_expr_common::interval_arithmetic::Interval; +use datafusion_expr_common::sort_properties::{ExprProperties, SortProperties}; /// Represents a literal value #[derive(Debug, PartialEq, Eq, Hash)] diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index cbb697b5f3041..0ff48ac1c230c 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -20,10 +20,13 @@ #[macro_use] mod binary; mod case; +mod cast; +mod column; mod in_list; mod is_not_null; mod is_null; mod like; +mod literal; mod negative; mod no_op; mod not; @@ -43,9 +46,9 @@ pub use crate::PhysicalSortExpr; pub use binary::{binary, BinaryExpr}; pub use case::{case, CaseExpr}; pub use datafusion_expr::utils::format_state_name; -pub use datafusion_physical_expr_common::expressions::column::{col, Column}; -pub use datafusion_physical_expr_common::expressions::literal::{lit, Literal}; -pub use datafusion_physical_expr_common::expressions::{cast, CastExpr}; +pub use column::{col, Column, with_new_schema}; +pub use literal::{lit, Literal}; +pub use cast::{cast, CastExpr}; pub use in_list::{in_list, InListExpr}; pub use is_not_null::{is_not_null, IsNotNullExpr}; pub use is_null::{is_null, IsNullExpr}; diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index 2e78119eba468..3d2ae2651bfc2 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -45,7 +45,7 @@ pub mod execution_props { pub use aggregate::groups_accumulator::{GroupsAccumulatorAdapter, NullState}; pub use analysis::{analyze, AnalysisContext, ExprBoundaries}; -pub use datafusion_physical_expr_common::aggregate::{ +pub use datafusion_functions_aggregate_common::aggregate::{ AggregateExpr, AggregatePhysicalExpressions, }; pub use equivalence::{calculate_union, ConstExpr, EquivalenceProperties}; diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs index c60a772b9ce26..c718e6b054ef3 100644 --- a/datafusion/physical-expr/src/physical_expr.rs +++ b/datafusion/physical-expr/src/physical_expr.rs @@ -17,7 +17,7 @@ use std::sync::Arc; -use datafusion_physical_expr_common::physical_expr::PhysicalExpr; +pub(crate) use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use itertools::izip; pub use datafusion_physical_expr_common::physical_expr::down_cast_any_ref; diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index d3f66bdea93d5..102c66ab99bdb 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -51,7 +51,9 @@ datafusion-common = { workspace = true, default-features = true } datafusion-common-runtime = { workspace = true, default-features = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } +datafusion-expr-functions-aggregate = { workspace = true } datafusion-functions-aggregate = { workspace = true } +datafusion-functions-aggregate-common = { workspace = true } datafusion-physical-expr = { workspace = true, default-features = true } datafusion-physical-expr-common = { workspace = true } futures = { workspace = true } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 8941418c12e1e..bbc139460074d 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1219,10 +1219,10 @@ mod tests { use datafusion_physical_expr::PhysicalSortExpr; use crate::common::collect; - use datafusion_physical_expr_common::aggregate::{ + use datafusion_expr_functions_aggregate::aggregate::{ create_aggregate_expr_with_dfschema, AggregateExprBuilder, }; - use datafusion_physical_expr_common::expressions::Literal; + use datafusion_physical_expr::expressions::Literal; use futures::{FutureExt, Stream}; // Generate a schema which consists of 5 columns (a, b, c, d, e) @@ -1995,11 +1995,9 @@ mod tests { let args = vec![col("b", schema)?]; let logical_args = vec![datafusion_expr::col("b")]; let func = datafusion_expr::AggregateUDF::new_from_impl(FirstValue::new()); - datafusion_physical_expr_common::aggregate::create_aggregate_expr_with_dfschema( + datafusion_expr_functions_aggregate::aggregate::create_aggregate_expr_with_dfschema( &func, &args, - &logical_args, - &sort_exprs, &ordering_req, dfschema, "FIRST_VALUE(b)", @@ -2030,8 +2028,6 @@ mod tests { create_aggregate_expr_with_dfschema( &func, &args, - &logical_args, - &sort_exprs, &ordering_req, dfschema, "LAST_VALUE(b)", @@ -2263,8 +2259,6 @@ mod tests { create_aggregate_expr_with_dfschema( &array_agg_udaf(), &[Arc::clone(col_a)], - &[], - &sort_exprs, &ordering_req, &test_df_schema, "array_agg", @@ -2423,8 +2417,6 @@ mod tests { vec![create_aggregate_expr_with_dfschema( &count_udaf(), &[col("val", &schema)?], - &[datafusion_expr::col("val")], - &[], &[], &df_schema, "COUNT(val)", @@ -2511,8 +2503,6 @@ mod tests { vec![create_aggregate_expr_with_dfschema( &count_udaf(), &[col("val", &schema)?], - &[datafusion_expr::col("val")], - &[], &[], &df_schema, "COUNT(val)", diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index eeecc017c2afa..4c11b8d1a3ddf 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -82,7 +82,7 @@ pub mod windows; pub mod work_table; pub mod udaf { - pub use datafusion_physical_expr_common::aggregate::{ + pub use datafusion_expr_functions_aggregate::aggregate::{ create_aggregate_expr, create_aggregate_expr_with_dfschema, AggregateFunctionExpr, }; } diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 9321fdb2cadf8..2712ae2fb635b 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -587,7 +587,7 @@ mod tests { use arrow_schema::{DataType, SortOptions}; use datafusion_common::ScalarValue; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; - use datafusion_physical_expr_common::expressions::column::col; + use datafusion_physical_expr::expressions::col; // Generate a schema which consists of 7 columns (a, b, c, d, e, f, g) fn create_test_schema() -> Result { diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 65cef28efc451..c079151ebf7be 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -43,7 +43,7 @@ use datafusion_physical_expr::{ AggregateExpr, ConstExpr, EquivalenceProperties, LexOrdering, PhysicalSortRequirement, }; -use datafusion_physical_expr_common::aggregate::AggregateExprBuilder; +use datafusion_expr_functions_aggregate::aggregate::AggregateExprBuilder; use itertools::Itertools; mod bounded_window_agg_exec; @@ -130,7 +130,7 @@ pub fn create_window_expr( .schema(Arc::new(input_schema.clone())) .name(name) .order_by(order_by.to_vec()) - .sort_exprs(sort_exprs) + // .sort_exprs(sort_exprs) .with_ignore_nulls(ignore_nulls) .build()?; window_expr_from_aggregate_expr( @@ -412,7 +412,7 @@ pub(crate) fn calc_requirements< let PhysicalSortExpr { expr, options } = element.borrow(); if !sort_reqs.iter().any(|e| e.expr.eq(expr)) { sort_reqs.push(PhysicalSortRequirement::new( - Arc::clone(expr), + Arc::clone(&expr), Some(*options), )); }