From b787dfb4e323712d8c764e36a9dbe8513ef6a48f Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Wed, 21 Feb 2024 20:53:35 +0800 Subject: [PATCH 01/26] first draft Signed-off-by: jayzhan211 --- .../user_defined_scalar_functions.rs | 85 ++++++++++++++++++- datafusion/expr/src/lib.rs | 2 +- datafusion/expr/src/udf.rs | 17 ++++ datafusion/physical-expr/src/planner.rs | 62 ++++++++------ 4 files changed, 137 insertions(+), 29 deletions(-) diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index 0546ef59b1d8..df4094d1f50f 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -16,7 +16,10 @@ // under the License. use arrow::compute::kernels::numeric::add; -use arrow_array::{Array, ArrayRef, Float64Array, Int32Array, RecordBatch, UInt8Array}; +use arrow_array::{ + Array, ArrayRef, Float32Array, Float64Array, Int32Array, RecordBatch, + UInt8Array, +}; use arrow_schema::DataType::Float64; use arrow_schema::{DataType, Field, Schema}; use datafusion::prelude::*; @@ -26,12 +29,15 @@ use datafusion_common::{ assert_batches_eq, assert_batches_sorted_eq, cast::as_int32_array, not_impl_err, plan_err, ExprSchema, Result, ScalarValue, }; +use datafusion_common::{DFField, DFSchema}; use datafusion_expr::{ create_udaf, create_udf, Accumulator, ColumnarValue, ExprSchemable, - LogicalPlanBuilder, ScalarUDF, ScalarUDFImpl, Signature, Volatility, + LogicalPlanBuilder, ScalarUDF, ScalarUDFImpl, Signature, Simplified, Volatility, }; + use rand::{thread_rng, Rng}; use std::any::Any; +use std::collections::HashMap; use std::iter; use std::sync::Arc; @@ -514,6 +520,81 @@ async fn deregister_udf() -> Result<()> { Ok(()) } +#[derive(Debug)] +struct CastToI64UDF { + signature: Signature, +} + +impl CastToI64UDF { + fn new() -> Self { + Self { + signature: Signature::any(1, Volatility::Immutable), + } + } +} + +impl ScalarUDFImpl for CastToI64UDF { + fn as_any(&self) -> &dyn Any { + self + } + fn name(&self) -> &str { + "cast_to_i64" + } + fn signature(&self) -> &Signature { + &self.signature + } + fn return_type(&self, _args: &[DataType]) -> Result { + Ok(DataType::Int64) + } + // Wrap with Expr::Cast() to Int64 + fn simplify(&self, args: Vec) -> Result { + let dfs = DFSchema::new_with_metadata( + vec![DFField::new(Some("t"), "x", DataType::Float32, true)], + HashMap::default(), + )?; + let e = args[0].clone(); + let casted_expr = e.cast_to(&DataType::Int64, &dfs)?; + Ok(Simplified::Rewritten(casted_expr)) + } + fn invoke(&self, args: &[ColumnarValue]) -> Result { + Ok(args.get(0).unwrap().clone()) + } +} + +#[tokio::test] +async fn test_user_defined_functions_cast_to_i64() -> Result<()> { + let ctx = SessionContext::new(); + + let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Float32, false)])); + + let batch = RecordBatch::try_new( + schema, + vec![Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0]))], + )?; + + ctx.register_batch("t", batch)?; + + let cast_to_i64_udf = ScalarUDF::from(CastToI64UDF::new()); + ctx.register_udf(cast_to_i64_udf); + + let result = plan_and_collect(&ctx, "SELECT cast_to_i64(x) FROM t").await?; + + assert_batches_eq!( + &[ + "+------------------+", + "| cast_to_i64(t.x) |", + "+------------------+", + "| 1 |", + "| 2 |", + "| 3 |", + "+------------------+" + ], + &result + ); + + Ok(()) +} + #[derive(Debug)] struct TakeUDF { signature: Signature, diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index 8c73ae5ae709..0b6d8aad26c2 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -80,7 +80,7 @@ pub use signature::{ }; pub use table_source::{TableProviderFilterPushDown, TableSource, TableType}; pub use udaf::{AggregateUDF, AggregateUDFImpl}; -pub use udf::{ScalarUDF, ScalarUDFImpl}; +pub use udf::{ScalarUDF, ScalarUDFImpl, Simplified}; pub use udwf::{WindowUDF, WindowUDFImpl}; pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits}; diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 59e5a7772e02..2779708418fe 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -30,6 +30,15 @@ use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; +/// Was the expression simplified? +pub enum Simplified { + /// The function call was simplified to an entirely new Expr + Rewritten(Expr), + /// the function call could not be simplified, and the arguments + /// are return unmodified + Original(Vec), +} + /// Logical representation of a Scalar User Defined Function. /// /// A scalar function produces a single row output for each row of input. This @@ -161,6 +170,10 @@ impl ScalarUDF { self.inner.return_type_from_exprs(args, schema) } + pub fn simplify(&self, args: Vec) -> Result { + self.inner.simplify(args) + } + /// Invoke the function on `args`, returning the appropriate result. /// /// See [`ScalarUDFImpl::invoke`] for more details. @@ -338,6 +351,10 @@ pub trait ScalarUDFImpl: Debug + Send + Sync { fn monotonicity(&self) -> Result> { Ok(None) } + + fn simplify(&self, args: Vec) -> Result { + Ok(Simplified::Original(args)) + } } /// ScalarUDF that adds an alias to the underlying function. It is better to diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index bf279518d31d..48d1de7638a4 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -257,34 +257,44 @@ pub fn create_physical_expr( ))) } - Expr::ScalarFunction(ScalarFunction { func_def, args }) => { - let physical_args = args - .iter() - .map(|e| create_physical_expr(e, input_dfschema, execution_props)) - .collect::>>()?; - match func_def { - ScalarFunctionDefinition::BuiltIn(fun) => { - functions::create_physical_expr( - fun, - &physical_args, - input_schema, - execution_props, - ) - } - ScalarFunctionDefinition::UDF(fun) => { - let return_type = fun.return_type_from_exprs(args, input_dfschema)?; + Expr::ScalarFunction(ScalarFunction { func_def, args }) => match func_def { + ScalarFunctionDefinition::BuiltIn(fun) => { + let physical_args = args + .iter() + .map(|e| create_physical_expr(e, input_dfschema, execution_props)) + .collect::>>()?; - udf::create_physical_expr( - fun.clone().as_ref(), - &physical_args, - return_type, - ) - } - ScalarFunctionDefinition::Name(_) => { - internal_err!("Function `Expr` with name should be resolved.") - } + functions::create_physical_expr( + fun, + &physical_args, + input_schema, + execution_props, + ) } - } + ScalarFunctionDefinition::UDF(fun) => { + let args = match fun.simplify(args.to_owned())? { + datafusion_expr::Simplified::Original(args) => args, + datafusion_expr::Simplified::Rewritten(expr) => vec![expr], + }; + + let physical_args = args + .iter() + .map(|e| create_physical_expr(e, input_dfschema, execution_props)) + .collect::>>()?; + + let return_type = + fun.return_type_from_exprs(args.as_slice(), input_dfschema)?; + + udf::create_physical_expr( + fun.clone().as_ref(), + &physical_args, + return_type, + ) + } + ScalarFunctionDefinition::Name(_) => { + internal_err!("Function `Expr` with name should be resolved.") + } + }, Expr::Between(Between { expr, negated, From 72592750e183c3a71936f73c27a9c2ea50c1e790 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Wed, 21 Feb 2024 21:00:59 +0800 Subject: [PATCH 02/26] clippy Signed-off-by: jayzhan211 --- .../core/tests/user_defined/user_defined_scalar_functions.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index df4094d1f50f..6d62a954340d 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -17,8 +17,7 @@ use arrow::compute::kernels::numeric::add; use arrow_array::{ - Array, ArrayRef, Float32Array, Float64Array, Int32Array, RecordBatch, - UInt8Array, + Array, ArrayRef, Float32Array, Float64Array, Int32Array, RecordBatch, UInt8Array, }; use arrow_schema::DataType::Float64; use arrow_schema::{DataType, Field, Schema}; @@ -557,7 +556,7 @@ impl ScalarUDFImpl for CastToI64UDF { Ok(Simplified::Rewritten(casted_expr)) } fn invoke(&self, args: &[ColumnarValue]) -> Result { - Ok(args.get(0).unwrap().clone()) + Ok(args.first().unwrap().clone()) } } From 4d98121aa16cb4697480280d3474918ee6e8a48d Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Wed, 21 Feb 2024 23:07:25 +0800 Subject: [PATCH 03/26] add comments Signed-off-by: jayzhan211 --- .../core/tests/user_defined/user_defined_scalar_functions.rs | 2 ++ datafusion/expr/src/udf.rs | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index 6d62a954340d..bf615cc9c7be 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -555,7 +555,9 @@ impl ScalarUDFImpl for CastToI64UDF { let casted_expr = e.cast_to(&DataType::Int64, &dfs)?; Ok(Simplified::Rewritten(casted_expr)) } + // Casting should be done in `simplify`, so we just return the first argument fn invoke(&self, args: &[ColumnarValue]) -> Result { + assert_eq!(args.len(), 1); Ok(args.first().unwrap().clone()) } } diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 2779708418fe..a9b598438b5f 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -170,6 +170,9 @@ impl ScalarUDF { self.inner.return_type_from_exprs(args, schema) } + /// Do the function rewrite + /// + /// See [`ScalarUDFImpl::simplify`] for more details. pub fn simplify(&self, args: Vec) -> Result { self.inner.simplify(args) } @@ -352,6 +355,7 @@ pub trait ScalarUDFImpl: Debug + Send + Sync { Ok(None) } + // Do the function rewrite fn simplify(&self, args: Vec) -> Result { Ok(Simplified::Original(args)) } From bacc9667a2aa32f88d2e1f921070b3289c7b8f7d Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Fri, 23 Feb 2024 19:58:28 +0800 Subject: [PATCH 04/26] move to optimize rule Signed-off-by: jayzhan211 --- .../user_defined_scalar_functions.rs | 4 +- datafusion/expr/src/udf.rs | 8 +-- datafusion/optimizer/src/analyzer/mod.rs | 3 +- .../optimizer/src/analyzer/rewrite_expr.rs | 65 +++++++++++++++++++ datafusion/physical-expr/src/planner.rs | 63 ++++++++---------- 5 files changed, 100 insertions(+), 43 deletions(-) diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index bf615cc9c7be..1a425eae5c9f 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -546,12 +546,12 @@ impl ScalarUDFImpl for CastToI64UDF { Ok(DataType::Int64) } // Wrap with Expr::Cast() to Int64 - fn simplify(&self, args: Vec) -> Result { + fn simplify(&self, args: &[Expr]) -> Result { let dfs = DFSchema::new_with_metadata( vec![DFField::new(Some("t"), "x", DataType::Float32, true)], HashMap::default(), )?; - let e = args[0].clone(); + let e = args[0].to_owned(); let casted_expr = e.cast_to(&DataType::Int64, &dfs)?; Ok(Simplified::Rewritten(casted_expr)) } diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index a9b598438b5f..abca0ebb038b 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -36,7 +36,7 @@ pub enum Simplified { Rewritten(Expr), /// the function call could not be simplified, and the arguments /// are return unmodified - Original(Vec), + Original, } /// Logical representation of a Scalar User Defined Function. @@ -173,7 +173,7 @@ impl ScalarUDF { /// Do the function rewrite /// /// See [`ScalarUDFImpl::simplify`] for more details. - pub fn simplify(&self, args: Vec) -> Result { + pub fn simplify(&self, args: &[Expr]) -> Result { self.inner.simplify(args) } @@ -356,8 +356,8 @@ pub trait ScalarUDFImpl: Debug + Send + Sync { } // Do the function rewrite - fn simplify(&self, args: Vec) -> Result { - Ok(Simplified::Original(args)) + fn simplify(&self, _args: &[Expr]) -> Result { + Ok(Simplified::Original) } } diff --git a/datafusion/optimizer/src/analyzer/mod.rs b/datafusion/optimizer/src/analyzer/mod.rs index 4c480017fc3a..d2b4917c389b 100644 --- a/datafusion/optimizer/src/analyzer/mod.rs +++ b/datafusion/optimizer/src/analyzer/mod.rs @@ -38,7 +38,7 @@ use datafusion_expr::{Expr, LogicalPlan}; use log::debug; use std::sync::Arc; -use self::rewrite_expr::OperatorToFunction; +use self::rewrite_expr::{FunctionSimplifiction, OperatorToFunction}; /// [`AnalyzerRule`]s transform [`LogicalPlan`]s in some way to make /// the plan valid prior to the rest of the DataFusion optimization process. @@ -78,6 +78,7 @@ impl Analyzer { // OperatorToFunction should be run before TypeCoercion, since it rewrite based on the argument types (List or Scalar), // and TypeCoercion may cast the argument types from Scalar to List. Arc::new(OperatorToFunction::new()), + Arc::new(FunctionSimplifiction::new()), Arc::new(TypeCoercion::new()), Arc::new(CountWildcardRule::new()), ]; diff --git a/datafusion/optimizer/src/analyzer/rewrite_expr.rs b/datafusion/optimizer/src/analyzer/rewrite_expr.rs index eedfc40a7f80..a7f5c223beb9 100644 --- a/datafusion/optimizer/src/analyzer/rewrite_expr.rs +++ b/datafusion/optimizer/src/analyzer/rewrite_expr.rs @@ -31,6 +31,7 @@ use datafusion_expr::utils::merge_schema; use datafusion_expr::BuiltinScalarFunction; use datafusion_expr::Operator; use datafusion_expr::ScalarFunctionDefinition; +use datafusion_expr::Simplified; use datafusion_expr::{BinaryExpr, Expr, LogicalPlan}; use super::AnalyzerRule; @@ -319,3 +320,67 @@ fn rewrite_array_concat_operator_to_func_for_column( _ => Ok(None), } } + +#[derive(Default)] +pub struct FunctionSimplifiction {} + +impl FunctionSimplifiction { + pub fn new() -> Self { + Self {} + } +} + +impl AnalyzerRule for FunctionSimplifiction { + fn name(&self) -> &str { + "function_simplification" + } + + fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result { + function_simplication_analyze_internal(&plan) + } +} + +fn function_simplication_analyze_internal(plan: &LogicalPlan) -> Result { + // optimize child plans first + let new_inputs = plan + .inputs() + .iter() + .map(|p| function_simplication_analyze_internal(p)) + .collect::>>()?; + + let mut expr_rewrite = FunctionSimplifictionRewriter {}; + + let new_expr = plan + .expressions() + .into_iter() + .map(|expr| { + // ensure names don't change: + // https://github.com/apache/arrow-datafusion/issues/3555 + rewrite_preserving_name(expr, &mut expr_rewrite) + }) + .collect::>>()?; + + plan.with_new_exprs(new_expr, new_inputs) +} + +pub(crate) struct FunctionSimplifictionRewriter {} + +impl TreeNodeRewriter for FunctionSimplifictionRewriter { + type N = Expr; + + fn mutate(&mut self, expr: Expr) -> Result { + if let Expr::ScalarFunction(ScalarFunction { + func_def: ScalarFunctionDefinition::UDF(udf), + args, + }) = &expr + { + let simplified_expr = udf.simplify(args)?; + match simplified_expr { + Simplified::Original => Ok(expr), + Simplified::Rewritten(expr) => Ok(expr), + } + } else { + Ok(expr) + } + } +} diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index 48d1de7638a4..f8a889b825f3 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -257,44 +257,35 @@ pub fn create_physical_expr( ))) } - Expr::ScalarFunction(ScalarFunction { func_def, args }) => match func_def { - ScalarFunctionDefinition::BuiltIn(fun) => { - let physical_args = args - .iter() - .map(|e| create_physical_expr(e, input_dfschema, execution_props)) - .collect::>>()?; - - functions::create_physical_expr( - fun, - &physical_args, - input_schema, - execution_props, - ) - } - ScalarFunctionDefinition::UDF(fun) => { - let args = match fun.simplify(args.to_owned())? { - datafusion_expr::Simplified::Original(args) => args, - datafusion_expr::Simplified::Rewritten(expr) => vec![expr], - }; - - let physical_args = args - .iter() - .map(|e| create_physical_expr(e, input_dfschema, execution_props)) - .collect::>>()?; - - let return_type = - fun.return_type_from_exprs(args.as_slice(), input_dfschema)?; + Expr::ScalarFunction(ScalarFunction { func_def, args }) => { + let physical_args = args + .iter() + .map(|e| create_physical_expr(e, input_dfschema, execution_props)) + .collect::>>()?; + match func_def { + ScalarFunctionDefinition::BuiltIn(fun) => { + functions::create_physical_expr( + fun, + &physical_args, + input_schema, + execution_props, + ) + } + ScalarFunctionDefinition::UDF(fun) => { + let return_type = + fun.return_type_from_exprs(args.as_slice(), input_dfschema)?; - udf::create_physical_expr( - fun.clone().as_ref(), - &physical_args, - return_type, - ) - } - ScalarFunctionDefinition::Name(_) => { - internal_err!("Function `Expr` with name should be resolved.") + udf::create_physical_expr( + fun.clone().as_ref(), + &physical_args, + return_type, + ) + } + ScalarFunctionDefinition::Name(_) => { + internal_err!("Function `Expr` with name should be resolved.") + } } - }, + } Expr::Between(Between { expr, negated, From 3199bcad9079bd36fa3cea67388e856b49011843 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Fri, 23 Feb 2024 20:01:10 +0800 Subject: [PATCH 05/26] cleanup Signed-off-by: jayzhan211 --- datafusion/physical-expr/src/planner.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index f8a889b825f3..bf279518d31d 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -272,8 +272,7 @@ pub fn create_physical_expr( ) } ScalarFunctionDefinition::UDF(fun) => { - let return_type = - fun.return_type_from_exprs(args.as_slice(), input_dfschema)?; + let return_type = fun.return_type_from_exprs(args, input_dfschema)?; udf::create_physical_expr( fun.clone().as_ref(), From 63648bee1aa727735ed5c2b2a67eedb07a358091 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Fri, 23 Feb 2024 20:40:29 +0800 Subject: [PATCH 06/26] fix explain test Signed-off-by: jayzhan211 --- datafusion/sqllogictest/test_files/explain.slt | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 4002164cc918..83c02c302b67 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -181,6 +181,7 @@ Projection: simple_explain_test.a, simple_explain_test.b, simple_explain_test.c --TableScan: simple_explain_test logical_plan after inline_table_scan SAME TEXT AS ABOVE logical_plan after operator_to_function SAME TEXT AS ABOVE +logical_plan after function_simplification SAME TEXT AS ABOVE logical_plan after type_coercion SAME TEXT AS ABOVE logical_plan after count_wildcard_rule SAME TEXT AS ABOVE analyzed_logical_plan SAME TEXT AS ABOVE From 83fc9d8f84ffda8d32d6c7d016e37c3c333bc1c8 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Tue, 27 Feb 2024 23:15:57 +0800 Subject: [PATCH 07/26] move to simplifier Signed-off-by: jayzhan211 --- datafusion/optimizer/src/analyzer/mod.rs | 3 +- .../optimizer/src/analyzer/rewrite_expr.rs | 67 +------------------ .../simplify_expressions/expr_simplifier.rs | 12 +++- .../function_simplifier.rs | 53 +++++++++++++++ .../optimizer/src/simplify_expressions/mod.rs | 1 + 5 files changed, 67 insertions(+), 69 deletions(-) create mode 100644 datafusion/optimizer/src/simplify_expressions/function_simplifier.rs diff --git a/datafusion/optimizer/src/analyzer/mod.rs b/datafusion/optimizer/src/analyzer/mod.rs index d2b4917c389b..4c480017fc3a 100644 --- a/datafusion/optimizer/src/analyzer/mod.rs +++ b/datafusion/optimizer/src/analyzer/mod.rs @@ -38,7 +38,7 @@ use datafusion_expr::{Expr, LogicalPlan}; use log::debug; use std::sync::Arc; -use self::rewrite_expr::{FunctionSimplifiction, OperatorToFunction}; +use self::rewrite_expr::OperatorToFunction; /// [`AnalyzerRule`]s transform [`LogicalPlan`]s in some way to make /// the plan valid prior to the rest of the DataFusion optimization process. @@ -78,7 +78,6 @@ impl Analyzer { // OperatorToFunction should be run before TypeCoercion, since it rewrite based on the argument types (List or Scalar), // and TypeCoercion may cast the argument types from Scalar to List. Arc::new(OperatorToFunction::new()), - Arc::new(FunctionSimplifiction::new()), Arc::new(TypeCoercion::new()), Arc::new(CountWildcardRule::new()), ]; diff --git a/datafusion/optimizer/src/analyzer/rewrite_expr.rs b/datafusion/optimizer/src/analyzer/rewrite_expr.rs index a7f5c223beb9..7a0402c91571 100644 --- a/datafusion/optimizer/src/analyzer/rewrite_expr.rs +++ b/datafusion/optimizer/src/analyzer/rewrite_expr.rs @@ -31,7 +31,6 @@ use datafusion_expr::utils::merge_schema; use datafusion_expr::BuiltinScalarFunction; use datafusion_expr::Operator; use datafusion_expr::ScalarFunctionDefinition; -use datafusion_expr::Simplified; use datafusion_expr::{BinaryExpr, Expr, LogicalPlan}; use super::AnalyzerRule; @@ -319,68 +318,4 @@ fn rewrite_array_concat_operator_to_func_for_column( } _ => Ok(None), } -} - -#[derive(Default)] -pub struct FunctionSimplifiction {} - -impl FunctionSimplifiction { - pub fn new() -> Self { - Self {} - } -} - -impl AnalyzerRule for FunctionSimplifiction { - fn name(&self) -> &str { - "function_simplification" - } - - fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result { - function_simplication_analyze_internal(&plan) - } -} - -fn function_simplication_analyze_internal(plan: &LogicalPlan) -> Result { - // optimize child plans first - let new_inputs = plan - .inputs() - .iter() - .map(|p| function_simplication_analyze_internal(p)) - .collect::>>()?; - - let mut expr_rewrite = FunctionSimplifictionRewriter {}; - - let new_expr = plan - .expressions() - .into_iter() - .map(|expr| { - // ensure names don't change: - // https://github.com/apache/arrow-datafusion/issues/3555 - rewrite_preserving_name(expr, &mut expr_rewrite) - }) - .collect::>>()?; - - plan.with_new_exprs(new_expr, new_inputs) -} - -pub(crate) struct FunctionSimplifictionRewriter {} - -impl TreeNodeRewriter for FunctionSimplifictionRewriter { - type N = Expr; - - fn mutate(&mut self, expr: Expr) -> Result { - if let Expr::ScalarFunction(ScalarFunction { - func_def: ScalarFunctionDefinition::UDF(udf), - args, - }) = &expr - { - let simplified_expr = udf.simplify(args)?; - match simplified_expr { - Simplified::Original => Ok(expr), - Simplified::Rewritten(expr) => Ok(expr), - } - } else { - Ok(expr) - } - } -} +} \ No newline at end of file diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index 3f65c68bc45b..6ef4d8b27c59 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -19,6 +19,14 @@ use std::ops::Not; +use super::function_simplifier::FunctionSimplifier; +use super::inlist_simplifier::{InListSimplifier, ShortenInListSimplifier}; +use super::utils::*; +use crate::analyzer::type_coercion::TypeCoercionRewriter; +use crate::simplify_expressions::guarantees::GuaranteeRewriter; +use crate::simplify_expressions::regex::simplify_regex_expr; +use crate::simplify_expressions::SimplifyInfo; + use arrow::{ array::{new_null_array, AsArray}, datatypes::{DataType, Field, Schema}, @@ -137,6 +145,7 @@ impl ExprSimplifier { let mut shorten_in_list_simplifier = ShortenInListSimplifier::new(); let mut inlist_simplifier = InListSimplifier::new(); let mut guarantee_rewriter = GuaranteeRewriter::new(&self.guarantees); + let mut function_simplifier = FunctionSimplifier::new(); let expr = if self.canonicalize { expr.rewrite(&mut Canonicalizer::new())? @@ -148,7 +157,8 @@ impl ExprSimplifier { // (evaluating constants can enable new simplifications and // simplifications can enable new constant evaluation) // https://github.com/apache/arrow-datafusion/issues/1160 - expr.rewrite(&mut const_evaluator)? + expr.rewrite(&mut function_simplifier)? + .rewrite(&mut const_evaluator)? .rewrite(&mut simplifier)? .rewrite(&mut inlist_simplifier)? .rewrite(&mut shorten_in_list_simplifier)? diff --git a/datafusion/optimizer/src/simplify_expressions/function_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/function_simplifier.rs new file mode 100644 index 000000000000..251662690f5a --- /dev/null +++ b/datafusion/optimizer/src/simplify_expressions/function_simplifier.rs @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module implements a rule that do function simplification. + +use datafusion_common::tree_node::TreeNodeRewriter; +use datafusion_common::Result; +use datafusion_expr::{expr::ScalarFunction, Expr, ScalarFunctionDefinition}; +use datafusion_expr::Simplified; + + +#[derive(Default)] +pub(super) struct FunctionSimplifier {} + +impl FunctionSimplifier { + pub(super) fn new() -> Self { + Self {} + } +} + +impl TreeNodeRewriter for FunctionSimplifier { + type N = Expr; + + fn mutate(&mut self, expr: Expr) -> Result { + if let Expr::ScalarFunction(ScalarFunction { + func_def: ScalarFunctionDefinition::UDF(udf), + args, + }) = &expr + { + let simplified_expr = udf.simplify(args)?; + match simplified_expr { + Simplified::Original => Ok(expr), + Simplified::Rewritten(expr) => Ok(expr), + } + } else { + Ok(expr) + } + } +} diff --git a/datafusion/optimizer/src/simplify_expressions/mod.rs b/datafusion/optimizer/src/simplify_expressions/mod.rs index a03dd767e911..6334783ca8da 100644 --- a/datafusion/optimizer/src/simplify_expressions/mod.rs +++ b/datafusion/optimizer/src/simplify_expressions/mod.rs @@ -19,6 +19,7 @@ pub mod context; pub mod expr_simplifier; mod guarantees; mod inlist_simplifier; +mod function_simplifier; mod regex; pub mod simplify_exprs; mod utils; From c73126addda31fafc658c4fbadbd03b9bd2f57f7 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Wed, 28 Feb 2024 08:37:11 +0800 Subject: [PATCH 08/26] pass with schema Signed-off-by: jayzhan211 --- datafusion/core/tests/simplification.rs | 28 +++++++++++++------ .../user_defined_scalar_functions.rs | 11 ++------ datafusion/expr/src/udf.rs | 12 ++++---- .../optimizer/src/analyzer/rewrite_expr.rs | 2 +- .../src/simplify_expressions/context.rs | 7 +++++ .../simplify_expressions/expr_simplifier.rs | 2 +- .../function_simplifier.rs | 25 +++++++++++------ .../optimizer/src/simplify_expressions/mod.rs | 2 +- 8 files changed, 55 insertions(+), 34 deletions(-) diff --git a/datafusion/core/tests/simplification.rs b/datafusion/core/tests/simplification.rs index 5fe64ca5bf04..1a0d922a0fdb 100644 --- a/datafusion/core/tests/simplification.rs +++ b/datafusion/core/tests/simplification.rs @@ -21,7 +21,6 @@ use arrow::datatypes::{DataType, Field, Schema}; use arrow_array::{ArrayRef, Int32Array}; use chrono::{DateTime, TimeZone, Utc}; use datafusion::common::DFSchema; -use datafusion::{error::Result, execution::context::ExecutionProps, prelude::*}; use datafusion_common::cast::as_int32_array; use datafusion_common::ScalarValue; use datafusion_expr::expr::ScalarFunction; @@ -34,6 +33,10 @@ use datafusion_optimizer::simplify_expressions::{ }; use datafusion_optimizer::{OptimizerContext, OptimizerRule}; use std::sync::Arc; +use datafusion::{error::Result, execution::context::ExecutionProps, prelude::*}; +use datafusion_common::{DFSchemaRef, ToDFSchema}; +use datafusion_expr::{Expr, ExprSchemable}; +use datafusion_optimizer::simplify_expressions::{ExprSimplifier, SimplifyInfo}; /// In order to simplify expressions, DataFusion must have information /// about the expressions. @@ -42,7 +45,7 @@ use std::sync::Arc; /// objects or from some other implementation struct MyInfo { /// The input schema - schema: DFSchema, + schema: DFSchemaRef, /// Execution specific details needed for constant evaluation such /// as the current time for `now()` and [VariableProviders] @@ -51,11 +54,14 @@ struct MyInfo { impl SimplifyInfo for MyInfo { fn is_boolean_type(&self, expr: &Expr) -> Result { - Ok(matches!(expr.get_type(&self.schema)?, DataType::Boolean)) + Ok(matches!( + expr.get_type(self.schema.as_ref())?, + DataType::Boolean + )) } fn nullable(&self, expr: &Expr) -> Result { - expr.nullable(&self.schema) + expr.nullable(self.schema.as_ref()) } fn execution_props(&self) -> &ExecutionProps { @@ -63,12 +69,16 @@ impl SimplifyInfo for MyInfo { } fn get_data_type(&self, expr: &Expr) -> Result { - expr.get_type(&self.schema) + expr.get_type(self.schema.as_ref()) + } + + fn schema(&self) -> Option { + Some(self.schema.clone()) } } -impl From for MyInfo { - fn from(schema: DFSchema) -> Self { +impl From for MyInfo { + fn from(schema: DFSchemaRef) -> Self { Self { schema, execution_props: ExecutionProps::new(), @@ -81,13 +91,13 @@ impl From for MyInfo { /// a: Int32 (possibly with nulls) /// b: Int32 /// s: Utf8 -fn schema() -> DFSchema { +fn schema() -> DFSchemaRef { Schema::new(vec![ Field::new("a", DataType::Int32, true), Field::new("b", DataType::Int32, false), Field::new("s", DataType::Utf8, false), ]) - .try_into() + .to_dfschema_ref() .unwrap() } diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index 1a425eae5c9f..5a9705549cc2 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -24,11 +24,11 @@ use arrow_schema::{DataType, Field, Schema}; use datafusion::prelude::*; use datafusion::{execution::registry::FunctionRegistry, test_util}; use datafusion_common::cast::as_float64_array; +use datafusion_common::DFSchemaRef; use datafusion_common::{ assert_batches_eq, assert_batches_sorted_eq, cast::as_int32_array, not_impl_err, plan_err, ExprSchema, Result, ScalarValue, }; -use datafusion_common::{DFField, DFSchema}; use datafusion_expr::{ create_udaf, create_udf, Accumulator, ColumnarValue, ExprSchemable, LogicalPlanBuilder, ScalarUDF, ScalarUDFImpl, Signature, Simplified, Volatility, @@ -36,7 +36,6 @@ use datafusion_expr::{ use rand::{thread_rng, Rng}; use std::any::Any; -use std::collections::HashMap; use std::iter; use std::sync::Arc; @@ -546,13 +545,9 @@ impl ScalarUDFImpl for CastToI64UDF { Ok(DataType::Int64) } // Wrap with Expr::Cast() to Int64 - fn simplify(&self, args: &[Expr]) -> Result { - let dfs = DFSchema::new_with_metadata( - vec![DFField::new(Some("t"), "x", DataType::Float32, true)], - HashMap::default(), - )?; + fn simplify(&self, args: &[Expr], schema: DFSchemaRef) -> Result { let e = args[0].to_owned(); - let casted_expr = e.cast_to(&DataType::Int64, &dfs)?; + let casted_expr = e.cast_to(&DataType::Int64, schema.as_ref())?; Ok(Simplified::Rewritten(casted_expr)) } // Casting should be done in `simplify`, so we just return the first argument diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index abca0ebb038b..934f825de703 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -23,7 +23,7 @@ use crate::{ ScalarFunctionImplementation, Signature, }; use arrow::datatypes::DataType; -use datafusion_common::{ExprSchema, Result}; +use datafusion_common::{DFSchemaRef, ExprSchema, Result}; use std::any::Any; use std::fmt; use std::fmt::Debug; @@ -173,8 +173,8 @@ impl ScalarUDF { /// Do the function rewrite /// /// See [`ScalarUDFImpl::simplify`] for more details. - pub fn simplify(&self, args: &[Expr]) -> Result { - self.inner.simplify(args) + pub fn simplify(&self, args: &[Expr], schema: DFSchemaRef) -> Result { + self.inner.simplify(args, schema) } /// Invoke the function on `args`, returning the appropriate result. @@ -355,8 +355,10 @@ pub trait ScalarUDFImpl: Debug + Send + Sync { Ok(None) } - // Do the function rewrite - fn simplify(&self, _args: &[Expr]) -> Result { + // Do the function rewrite. + // 'args': The arguments of the function + // 'schema': The schema of the function + fn simplify(&self, _args: &[Expr], _schema: DFSchemaRef) -> Result { Ok(Simplified::Original) } } diff --git a/datafusion/optimizer/src/analyzer/rewrite_expr.rs b/datafusion/optimizer/src/analyzer/rewrite_expr.rs index 7a0402c91571..eedfc40a7f80 100644 --- a/datafusion/optimizer/src/analyzer/rewrite_expr.rs +++ b/datafusion/optimizer/src/analyzer/rewrite_expr.rs @@ -318,4 +318,4 @@ fn rewrite_array_concat_operator_to_func_for_column( } _ => Ok(None), } -} \ No newline at end of file +} diff --git a/datafusion/optimizer/src/simplify_expressions/context.rs b/datafusion/optimizer/src/simplify_expressions/context.rs index 34f3908c7e42..fcabc4237e4d 100644 --- a/datafusion/optimizer/src/simplify_expressions/context.rs +++ b/datafusion/optimizer/src/simplify_expressions/context.rs @@ -41,6 +41,9 @@ pub trait SimplifyInfo { /// Returns data type of this expr needed for determining optimized int type of a value fn get_data_type(&self, expr: &Expr) -> Result; + + /// Return the schema for function simplifier + fn schema(&self) -> Option; } /// Provides simplification information based on DFSchema and @@ -97,6 +100,10 @@ impl<'a> SimplifyContext<'a> { } impl<'a> SimplifyInfo for SimplifyContext<'a> { + fn schema(&self) -> Option { + self.schema.clone() + } + /// returns true if this Expr has boolean type fn is_boolean_type(&self, expr: &Expr) -> Result { for schema in &self.schema { diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index 6ef4d8b27c59..e39dc93d41c5 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -145,7 +145,7 @@ impl ExprSimplifier { let mut shorten_in_list_simplifier = ShortenInListSimplifier::new(); let mut inlist_simplifier = InListSimplifier::new(); let mut guarantee_rewriter = GuaranteeRewriter::new(&self.guarantees); - let mut function_simplifier = FunctionSimplifier::new(); + let mut function_simplifier = FunctionSimplifier::new(&self.info); let expr = if self.canonicalize { expr.rewrite(&mut Canonicalizer::new())? diff --git a/datafusion/optimizer/src/simplify_expressions/function_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/function_simplifier.rs index 251662690f5a..25b33f1632e4 100644 --- a/datafusion/optimizer/src/simplify_expressions/function_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/function_simplifier.rs @@ -18,21 +18,23 @@ //! This module implements a rule that do function simplification. use datafusion_common::tree_node::TreeNodeRewriter; -use datafusion_common::Result; -use datafusion_expr::{expr::ScalarFunction, Expr, ScalarFunctionDefinition}; +use datafusion_common::{DFSchema, Result}; use datafusion_expr::Simplified; +use datafusion_expr::{expr::ScalarFunction, Expr, ScalarFunctionDefinition}; +use super::SimplifyInfo; -#[derive(Default)] -pub(super) struct FunctionSimplifier {} +pub(super) struct FunctionSimplifier<'a, S> { + info: &'a S, +} -impl FunctionSimplifier { - pub(super) fn new() -> Self { - Self {} +impl<'a, S> FunctionSimplifier<'a, S> { + pub(super) fn new(info: &'a S) -> Self { + Self { info } } } -impl TreeNodeRewriter for FunctionSimplifier { +impl<'a, S: SimplifyInfo> TreeNodeRewriter for FunctionSimplifier<'a, S> { type N = Expr; fn mutate(&mut self, expr: Expr) -> Result { @@ -41,7 +43,12 @@ impl TreeNodeRewriter for FunctionSimplifier { args, }) = &expr { - let simplified_expr = udf.simplify(args)?; + let schema = self + .info + .schema() + .unwrap_or_else(|| DFSchema::empty().into()); + + let simplified_expr = udf.simplify(args, schema)?; match simplified_expr { Simplified::Original => Ok(expr), Simplified::Rewritten(expr) => Ok(expr), diff --git a/datafusion/optimizer/src/simplify_expressions/mod.rs b/datafusion/optimizer/src/simplify_expressions/mod.rs index 6334783ca8da..8bab9615a4e7 100644 --- a/datafusion/optimizer/src/simplify_expressions/mod.rs +++ b/datafusion/optimizer/src/simplify_expressions/mod.rs @@ -17,9 +17,9 @@ pub mod context; pub mod expr_simplifier; +mod function_simplifier; mod guarantees; mod inlist_simplifier; -mod function_simplifier; mod regex; pub mod simplify_exprs; mod utils; From dd99362523b9577a0e870f028bd2539d084388f0 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Wed, 28 Feb 2024 08:51:01 +0800 Subject: [PATCH 09/26] fix explain Signed-off-by: jayzhan211 --- datafusion/sqllogictest/test_files/explain.slt | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 83c02c302b67..4002164cc918 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -181,7 +181,6 @@ Projection: simple_explain_test.a, simple_explain_test.b, simple_explain_test.c --TableScan: simple_explain_test logical_plan after inline_table_scan SAME TEXT AS ABOVE logical_plan after operator_to_function SAME TEXT AS ABOVE -logical_plan after function_simplification SAME TEXT AS ABOVE logical_plan after type_coercion SAME TEXT AS ABOVE logical_plan after count_wildcard_rule SAME TEXT AS ABOVE analyzed_logical_plan SAME TEXT AS ABOVE From 0b66ed32eff663dc0ae1884f401ec6a2a8fef120 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Wed, 28 Feb 2024 19:18:46 +0800 Subject: [PATCH 10/26] fix doc Signed-off-by: jayzhan211 --- .../optimizer/src/simplify_expressions/expr_simplifier.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index e39dc93d41c5..5e968ae6b19e 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -103,6 +103,8 @@ impl ExprSimplifier { /// use datafusion_common::Result; /// use datafusion_physical_expr::execution_props::ExecutionProps; /// use datafusion_optimizer::simplify_expressions::{ExprSimplifier, SimplifyInfo}; + /// use datafusion_common::DFSchema; + /// use std::sync::Arc; /// /// /// Simple implementation that provides `Simplifier` the information it needs /// /// See SimplifyContext for a structure that does this. @@ -124,6 +126,7 @@ impl ExprSimplifier { /// fn get_data_type(&self, expr: &Expr) -> Result { /// Ok(DataType::Int32) /// } + /// fn schema(&self) -> Option> { None } /// } /// /// // Create the simplifier From 079827407381218c70d17f7e01e99eec7d5f9de7 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sat, 2 Mar 2024 09:55:32 +0800 Subject: [PATCH 11/26] move to expr Signed-off-by: jayzhan211 --- datafusion-cli/Cargo.lock | 1 + datafusion-examples/examples/expr_api.rs | 5 ++- datafusion-examples/examples/simple_udtf.rs | 5 ++- .../core/src/datasource/listing/helpers.rs | 2 +- .../datasource/physical_plan/parquet/mod.rs | 7 ++-- .../physical_plan/parquet/row_filter.rs | 2 +- .../physical_plan/parquet/row_groups.rs | 2 +- datafusion/core/src/execution/context/mod.rs | 4 +- .../core/src/physical_optimizer/pruning.rs | 2 +- datafusion/core/src/test_util/parquet.rs | 5 ++- datafusion/core/src/variable/mod.rs | 2 +- datafusion/core/tests/dataframe/mod.rs | 2 +- datafusion/core/tests/parquet/page_pruning.rs | 2 +- datafusion/core/tests/simplification.rs | 3 +- .../src/execution_props.rs | 0 datafusion/expr/src/lib.rs | 3 ++ .../context.rs => expr/src/simplify.rs} | 9 +++-- datafusion/expr/src/udf.rs | 1 + .../src/var_provider.rs | 0 datafusion/optimizer/src/decorrelate.rs | 7 ++-- .../simplify_expressions/expr_simplifier.rs | 38 ++++++++++++------- .../function_simplifier.rs | 3 +- .../optimizer/src/simplify_expressions/mod.rs | 2 - .../simplify_expressions/simplify_exprs.rs | 5 ++- .../src/simplify_expressions/utils.rs | 6 +-- .../physical-expr/src/equivalence/ordering.rs | 2 +- .../src/equivalence/projection.rs | 2 +- .../src/equivalence/properties.rs | 2 +- datafusion/physical-expr/src/functions.rs | 4 +- datafusion/physical-expr/src/lib.rs | 2 - datafusion/physical-expr/src/physical_expr.rs | 4 +- datafusion/physical-expr/src/planner.rs | 9 ++--- .../physical-expr/src/utils/guarantee.rs | 2 +- datafusion/wasmtest/src/lib.rs | 5 ++- 34 files changed, 86 insertions(+), 64 deletions(-) rename datafusion/{physical-expr => expr}/src/execution_props.rs (100%) rename datafusion/{optimizer/src/simplify_expressions/context.rs => expr/src/simplify.rs} (94%) rename datafusion/{physical-expr => expr}/src/var_provider.rs (100%) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 2379a30ce10f..280984b3006e 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1219,6 +1219,7 @@ dependencies = [ "ahash", "arrow", "arrow-array", + "chrono", "datafusion-common", "paste", "sqlparser", diff --git a/datafusion-examples/examples/expr_api.rs b/datafusion-examples/examples/expr_api.rs index 9739b44aafa0..5f9f3106e14d 100644 --- a/datafusion-examples/examples/expr_api.rs +++ b/datafusion-examples/examples/expr_api.rs @@ -24,15 +24,16 @@ use arrow::record_batch::RecordBatch; use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use datafusion::common::{DFField, DFSchema}; use datafusion::error::Result; -use datafusion::optimizer::simplify_expressions::{ExprSimplifier, SimplifyContext}; -use datafusion::physical_expr::execution_props::ExecutionProps; +use datafusion::optimizer::simplify_expressions::ExprSimplifier; use datafusion::physical_expr::{ analyze, create_physical_expr, AnalysisContext, ExprBoundaries, PhysicalExpr, }; use datafusion::prelude::*; use datafusion_common::{ScalarValue, ToDFSchema}; +use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::expr::BinaryExpr; use datafusion_expr::interval_arithmetic::Interval; +use datafusion_expr::simplify::SimplifyContext; use datafusion_expr::{ColumnarValue, ExprSchemable, Operator}; /// This example demonstrates the DataFusion [`Expr`] API. diff --git a/datafusion-examples/examples/simple_udtf.rs b/datafusion-examples/examples/simple_udtf.rs index 09341fbf47fa..5652bc7872da 100644 --- a/datafusion-examples/examples/simple_udtf.rs +++ b/datafusion-examples/examples/simple_udtf.rs @@ -27,9 +27,10 @@ use datafusion::execution::context::{ExecutionProps, SessionState}; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; -use datafusion_common::{plan_err, ScalarValue}; +use datafusion_common::{plan_err, DataFusionError, ScalarValue}; +use datafusion_expr::simplify::SimplifyContext; use datafusion_expr::{Expr, TableType}; -use datafusion_optimizer::simplify_expressions::{ExprSimplifier, SimplifyContext}; +use datafusion_optimizer::simplify_expressions::ExprSimplifier; use std::fs::File; use std::io::Seek; use std::path::Path; diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index 077356b716b0..6040c400140e 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -39,9 +39,9 @@ use crate::datasource::listing::ListingTableUrl; use crate::execution::context::SessionState; use datafusion_common::tree_node::{TreeNode, VisitRecursion}; use datafusion_common::{internal_err, Column, DFField, DFSchema, DataFusionError}; +use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::{Expr, ScalarFunctionDefinition, Volatility}; use datafusion_physical_expr::create_physical_expr; -use datafusion_physical_expr::execution_props::ExecutionProps; use object_store::path::Path; use object_store::{ObjectMeta, ObjectStore}; diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 12b62fd68068..f41c326fa405 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -804,11 +804,12 @@ mod tests { use arrow::record_batch::RecordBatch; use arrow_schema::Fields; use datafusion_common::{assert_contains, FileType, GetExt, ScalarValue, ToDFSchema}; - use datafusion_expr::{col, lit, when, Expr}; - use datafusion_physical_expr::create_physical_expr; - use datafusion_physical_expr::execution_props::ExecutionProps; use chrono::{TimeZone, Utc}; + use datafusion_common::{FileType, GetExt, ScalarValue}; + use datafusion_expr::execution_props::ExecutionProps; + use datafusion_expr::{col, lit, when, Expr}; + use datafusion_physical_expr::create_physical_expr; use futures::StreamExt; use object_store::local::LocalFileSystem; use object_store::path::Path; diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs index 3c40509a86d2..2762cacad874 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs @@ -394,9 +394,9 @@ mod test { use super::*; use arrow::datatypes::Field; use datafusion_common::ToDFSchema; + use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::{cast, col, lit, Expr}; use datafusion_physical_expr::create_physical_expr; - use datafusion_physical_expr::execution_props::ExecutionProps; use parquet::arrow::parquet_to_arrow_schema; use parquet::file::reader::{FileReader, SerializedFileReader}; use rand::prelude::*; diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index fa9523a76380..ef2eb775e037 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -346,8 +346,8 @@ mod tests { use arrow::datatypes::Schema; use arrow::datatypes::{DataType, Field}; use datafusion_common::{Result, ToDFSchema}; + use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::{cast, col, lit, Expr}; - use datafusion_physical_expr::execution_props::ExecutionProps; use datafusion_physical_expr::{create_physical_expr, PhysicalExpr}; use parquet::arrow::arrow_to_parquet_schema; use parquet::arrow::async_reader::ParquetObjectReader; diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 3aa4edfe3adc..926b430c2c15 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -43,12 +43,12 @@ use datafusion_common::{ tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion}, }; use datafusion_execution::registry::SerializerRegistry; +pub use datafusion_expr::execution_props::ExecutionProps; +use datafusion_expr::var_provider::is_system_variables; use datafusion_expr::{ logical_plan::{DdlStatement, Statement}, Expr, StringifiedPlan, UserDefinedLogicalNode, WindowUDF, }; -pub use datafusion_physical_expr::execution_props::ExecutionProps; -use datafusion_physical_expr::var_provider::is_system_variables; use parking_lot::RwLock; use std::collections::hash_map::Entry; use std::string::String; diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index 37f705d8a82f..37f0f553b336 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -1338,10 +1338,10 @@ mod tests { datatypes::{DataType, TimeUnit}, }; use datafusion_common::{ScalarValue, ToDFSchema}; + use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::expr::InList; use datafusion_expr::{cast, is_null, try_cast, Expr}; use datafusion_physical_expr::create_physical_expr; - use datafusion_physical_expr::execution_props::ExecutionProps; use std::collections::HashMap; use std::ops::{Not, Rem}; diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs index 1047c3dd4e48..6d0711610b5a 100644 --- a/datafusion/core/src/test_util/parquet.rs +++ b/datafusion/core/src/test_util/parquet.rs @@ -28,9 +28,10 @@ use crate::datasource::listing::{ListingTableUrl, PartitionedFile}; use crate::datasource::object_store::ObjectStoreUrl; use crate::datasource::physical_plan::{FileScanConfig, ParquetExec}; use crate::error::Result; -use crate::optimizer::simplify_expressions::{ExprSimplifier, SimplifyContext}; +use crate::logical_expr::execution_props::ExecutionProps; +use crate::logical_expr::simplify::SimplifyContext; +use crate::optimizer::simplify_expressions::ExprSimplifier; use crate::physical_expr::create_physical_expr; -use crate::physical_expr::execution_props::ExecutionProps; use crate::physical_plan::filter::FilterExec; use crate::physical_plan::metrics::MetricsSet; use crate::physical_plan::ExecutionPlan; diff --git a/datafusion/core/src/variable/mod.rs b/datafusion/core/src/variable/mod.rs index 5ef165313ccf..475f7570a8ee 100644 --- a/datafusion/core/src/variable/mod.rs +++ b/datafusion/core/src/variable/mod.rs @@ -17,4 +17,4 @@ //! Variable provider for `@name` and `@@name` style runtime values. -pub use datafusion_physical_expr::var_provider::{VarProvider, VarType}; +pub use datafusion_expr::var_provider::{VarProvider, VarType}; diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index abe5fd29182e..305a7e69fdb2 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -48,13 +48,13 @@ use datafusion_common::{assert_contains, DataFusionError, ScalarValue, UnnestOpt use datafusion_execution::config::SessionConfig; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_expr::expr::{GroupingSet, Sort}; +use datafusion_expr::var_provider::{VarProvider, VarType}; use datafusion_expr::{ array_agg, avg, cast, col, count, exists, expr, in_subquery, lit, max, out_ref_col, placeholder, scalar_subquery, sum, when, wildcard, AggregateFunction, Expr, ExprSchemable, WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, }; -use datafusion_physical_expr::var_provider::{VarProvider, VarType}; #[tokio::test] async fn test_count_wildcard_on_sort() -> Result<()> { diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index d182986ebbdc..6ee4247eea36 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -28,9 +28,9 @@ use datafusion::physical_plan::metrics::MetricValue; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; use datafusion_common::{ScalarValue, Statistics, ToDFSchema}; +use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::{col, lit, Expr}; use datafusion_physical_expr::create_physical_expr; -use datafusion_physical_expr::execution_props::ExecutionProps; use futures::StreamExt; use object_store::path::Path; diff --git a/datafusion/core/tests/simplification.rs b/datafusion/core/tests/simplification.rs index 1a0d922a0fdb..d89edd591e8e 100644 --- a/datafusion/core/tests/simplification.rs +++ b/datafusion/core/tests/simplification.rs @@ -35,8 +35,9 @@ use datafusion_optimizer::{OptimizerContext, OptimizerRule}; use std::sync::Arc; use datafusion::{error::Result, execution::context::ExecutionProps, prelude::*}; use datafusion_common::{DFSchemaRef, ToDFSchema}; +use datafusion_expr::simplify::SimplifyInfo; use datafusion_expr::{Expr, ExprSchemable}; -use datafusion_optimizer::simplify_expressions::{ExprSimplifier, SimplifyInfo}; +use datafusion_optimizer::simplify_expressions::ExprSimplifier; /// In order to simplify expressions, DataFusion must have information /// about the expressions. diff --git a/datafusion/physical-expr/src/execution_props.rs b/datafusion/expr/src/execution_props.rs similarity index 100% rename from datafusion/physical-expr/src/execution_props.rs rename to datafusion/expr/src/execution_props.rs diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index 0b6d8aad26c2..e1ea51dc0324 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -40,6 +40,7 @@ mod udwf; pub mod aggregate_function; pub mod conditional_expressions; +pub mod execution_props; pub mod expr; pub mod expr_fn; pub mod expr_rewriter; @@ -49,9 +50,11 @@ pub mod function; pub mod groups_accumulator; pub mod interval_arithmetic; pub mod logical_plan; +pub mod simplify; pub mod tree_node; pub mod type_coercion; pub mod utils; +pub mod var_provider; pub mod window_frame; pub mod window_state; diff --git a/datafusion/optimizer/src/simplify_expressions/context.rs b/datafusion/expr/src/simplify.rs similarity index 94% rename from datafusion/optimizer/src/simplify_expressions/context.rs rename to datafusion/expr/src/simplify.rs index fcabc4237e4d..89b1623aa606 100644 --- a/datafusion/optimizer/src/simplify_expressions/context.rs +++ b/datafusion/expr/src/simplify.rs @@ -19,8 +19,8 @@ use arrow::datatypes::DataType; use datafusion_common::{DFSchemaRef, DataFusionError, Result}; -use datafusion_expr::{Expr, ExprSchemable}; -use datafusion_physical_expr::execution_props::ExecutionProps; + +use crate::{execution_props::ExecutionProps, Expr, ExprSchemable}; #[allow(rustdoc::private_intra_doc_links)] /// The information necessary to apply algebraic simplification to an @@ -54,8 +54,9 @@ pub trait SimplifyInfo { /// use arrow::datatypes::{Schema, Field, DataType}; /// use datafusion_expr::{col, lit}; /// use datafusion_common::{DataFusionError, ToDFSchema}; -/// use datafusion_physical_expr::execution_props::ExecutionProps; -/// use datafusion_optimizer::simplify_expressions::{SimplifyContext, ExprSimplifier}; +/// use datafusion_expr::execution_props::ExecutionProps; +/// use datafusion_expr::simplify::SimplifyContext; +/// use datafusion_optimizer::simplify_expressions::ExprSimplifier; /// /// // Create the schema /// let schema = Schema::new(vec![ diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 934f825de703..a6782dcb5855 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -30,6 +30,7 @@ use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; +// TODO(In this PR): Move to simplify.rs /// Was the expression simplified? pub enum Simplified { /// The function call was simplified to an entirely new Expr diff --git a/datafusion/physical-expr/src/var_provider.rs b/datafusion/expr/src/var_provider.rs similarity index 100% rename from datafusion/physical-expr/src/var_provider.rs rename to datafusion/expr/src/var_provider.rs diff --git a/datafusion/optimizer/src/decorrelate.rs b/datafusion/optimizer/src/decorrelate.rs index 0f4b39d9eee3..b49e092ce20c 100644 --- a/datafusion/optimizer/src/decorrelate.rs +++ b/datafusion/optimizer/src/decorrelate.rs @@ -15,17 +15,18 @@ // specific language governing permissions and limitations // under the License. -use crate::simplify_expressions::{ExprSimplifier, SimplifyContext}; +use crate::simplify_expressions::ExprSimplifier; use crate::utils::collect_subquery_cols; use datafusion_common::tree_node::{ RewriteRecursion, Transformed, TreeNode, TreeNodeRewriter, }; use datafusion_common::{plan_err, Result}; -use datafusion_common::{Column, DFSchemaRef, ScalarValue}; +use datafusion_common::{Column, DFSchemaRef, DataFusionError, ScalarValue}; +use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::expr::{AggregateFunctionDefinition, Alias}; +use datafusion_expr::simplify::SimplifyContext; use datafusion_expr::utils::{conjunction, find_join_exprs, split_conjunction}; use datafusion_expr::{expr, EmptyRelation, Expr, LogicalPlan, LogicalPlanBuilder}; -use datafusion_physical_expr::execution_props::ExecutionProps; use std::collections::{BTreeSet, HashMap}; use std::ops::Deref; diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index 5e968ae6b19e..7dca224e1446 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -25,7 +25,6 @@ use super::utils::*; use crate::analyzer::type_coercion::TypeCoercionRewriter; use crate::simplify_expressions::guarantees::GuaranteeRewriter; use crate::simplify_expressions::regex::simplify_regex_expr; -use crate::simplify_expressions::SimplifyInfo; use arrow::{ array::{new_null_array, AsArray}, @@ -40,12 +39,14 @@ use datafusion_common::{ use datafusion_common::{ internal_err, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, }; +use datafusion_expr::execution_props::ExecutionProps; +use datafusion_expr::simplify::SimplifyInfo; use datafusion_expr::{ and, lit, or, BinaryExpr, BuiltinScalarFunction, Case, ColumnarValue, Expr, Like, ScalarFunctionDefinition, Volatility, }; use datafusion_expr::{expr::ScalarFunction, interval_arithmetic::NullableInterval}; -use datafusion_physical_expr::{create_physical_expr, execution_props::ExecutionProps}; +use datafusion_physical_expr::create_physical_expr; use crate::analyzer::type_coercion::TypeCoercionRewriter; use crate::simplify_expressions::guarantees::GuaranteeRewriter; @@ -101,8 +102,9 @@ impl ExprSimplifier { /// use arrow::datatypes::DataType; /// use datafusion_expr::{col, lit, Expr}; /// use datafusion_common::Result; - /// use datafusion_physical_expr::execution_props::ExecutionProps; - /// use datafusion_optimizer::simplify_expressions::{ExprSimplifier, SimplifyInfo}; + /// use datafusion_expr::execution_props::ExecutionProps; + /// use datafusion_expr::simplify::SimplifyContext; + /// use datafusion_optimizer::simplify_expressions::ExprSimplifier; /// use datafusion_common::DFSchema; /// use std::sync::Arc; /// @@ -202,9 +204,9 @@ impl ExprSimplifier { /// use datafusion_expr::{col, lit, Expr}; /// use datafusion_expr::interval_arithmetic::{Interval, NullableInterval}; /// use datafusion_common::{Result, ScalarValue, ToDFSchema}; - /// use datafusion_physical_expr::execution_props::ExecutionProps; - /// use datafusion_optimizer::simplify_expressions::{ - /// ExprSimplifier, SimplifyContext}; + /// use datafusion_expr::execution_props::ExecutionProps; + /// use datafusion_expr::simplify::SimplifyContext; + /// use datafusion_optimizer::simplify_expressions::ExprSimplifier; /// /// let schema = Schema::new(vec![ /// Field::new("x", DataType::Int64, false), @@ -261,9 +263,9 @@ impl ExprSimplifier { /// use datafusion_expr::{col, lit, Expr}; /// use datafusion_expr::interval_arithmetic::{Interval, NullableInterval}; /// use datafusion_common::{Result, ScalarValue, ToDFSchema}; - /// use datafusion_physical_expr::execution_props::ExecutionProps; - /// use datafusion_optimizer::simplify_expressions::{ - /// ExprSimplifier, SimplifyContext}; + /// use datafusion_expr::execution_props::ExecutionProps; + /// use datafusion_expr::simplify::SimplifyContext; + /// use datafusion_optimizer::simplify_expressions::ExprSimplifier; /// /// let schema = Schema::new(vec![ /// Field::new("a", DataType::Int64, false), @@ -1350,10 +1352,20 @@ mod tests { sync::Arc, }; - use arrow::datatypes::{DataType, Field, Schema}; - use datafusion_common::{assert_contains, DFField, ToDFSchema}; + use super::*; + use crate::simplify_expressions::utils::for_test::{ + cast_to_int64_expr, now_expr, to_timestamp_expr, + }; + use crate::test::test_table_scan_with_name; + + use arrow::{ + array::{ArrayRef, Int32Array}, + datatypes::{DataType, Field, Schema}, + }; + use datafusion_common::{assert_contains, cast::as_int32_array, DFField, ToDFSchema}; + use datafusion_expr::execution_props::ExecutionProps; + use datafusion_expr::simplify::SimplifyContext; use datafusion_expr::{interval_arithmetic::Interval, *}; - use datafusion_physical_expr::execution_props::ExecutionProps; use crate::simplify_expressions::SimplifyContext; use crate::test::test_table_scan_with_name; diff --git a/datafusion/optimizer/src/simplify_expressions/function_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/function_simplifier.rs index 25b33f1632e4..6ff598035a75 100644 --- a/datafusion/optimizer/src/simplify_expressions/function_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/function_simplifier.rs @@ -19,11 +19,10 @@ use datafusion_common::tree_node::TreeNodeRewriter; use datafusion_common::{DFSchema, Result}; +use datafusion_expr::simplify::SimplifyInfo; use datafusion_expr::Simplified; use datafusion_expr::{expr::ScalarFunction, Expr, ScalarFunctionDefinition}; -use super::SimplifyInfo; - pub(super) struct FunctionSimplifier<'a, S> { info: &'a S, } diff --git a/datafusion/optimizer/src/simplify_expressions/mod.rs b/datafusion/optimizer/src/simplify_expressions/mod.rs index 8bab9615a4e7..a105088db647 100644 --- a/datafusion/optimizer/src/simplify_expressions/mod.rs +++ b/datafusion/optimizer/src/simplify_expressions/mod.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -pub mod context; pub mod expr_simplifier; mod function_simplifier; mod guarantees; @@ -24,6 +23,5 @@ mod regex; pub mod simplify_exprs; mod utils; -pub use context::*; pub use expr_simplifier::*; pub use simplify_exprs::*; diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs index ddb754a919bd..bb4dabfebaf7 100644 --- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs +++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs @@ -19,10 +19,13 @@ use std::sync::Arc; +use super::ExprSimplifier; +use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::{DFSchema, DFSchemaRef, Result}; +use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::logical_plan::LogicalPlan; +use datafusion_expr::simplify::SimplifyContext; use datafusion_expr::utils::merge_schema; -use datafusion_physical_expr::execution_props::ExecutionProps; use crate::{OptimizerConfig, OptimizerRule}; diff --git a/datafusion/optimizer/src/simplify_expressions/utils.rs b/datafusion/optimizer/src/simplify_expressions/utils.rs index 4d3b123bace0..75e32268b293 100644 --- a/datafusion/optimizer/src/simplify_expressions/utils.rs +++ b/datafusion/optimizer/src/simplify_expressions/utils.rs @@ -17,11 +17,11 @@ //! Utility functions for expression simplification -use crate::simplify_expressions::SimplifyInfo; -use datafusion_common::{internal_err, Result, ScalarValue}; use datafusion_expr::expr::ScalarFunction; +use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue}; +use datafusion_expr::simplify::SimplifyInfo; use datafusion_expr::{ - expr::{Between, BinaryExpr, InList}, + expr::{Between, BinaryExpr, InList, ScalarFunction}, expr_fn::{and, bitwise_and, bitwise_or, concat_ws, or}, lit, BuiltinScalarFunction, Expr, Like, Operator, ScalarFunctionDefinition, }; diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs b/datafusion/physical-expr/src/equivalence/ordering.rs index 1a414592ce4c..50861ad34ab4 100644 --- a/datafusion/physical-expr/src/equivalence/ordering.rs +++ b/datafusion/physical-expr/src/equivalence/ordering.rs @@ -228,7 +228,6 @@ mod tests { use crate::equivalence::{ EquivalenceClass, EquivalenceGroup, OrderingEquivalenceClass, }; - use crate::execution_props::ExecutionProps; use crate::expressions::Column; use crate::expressions::{col, BinaryExpr}; use crate::functions::create_physical_expr; @@ -236,6 +235,7 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use arrow_schema::SortOptions; use datafusion_common::Result; + use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::{BuiltinScalarFunction, Operator}; use itertools::Itertools; use std::sync::Arc; diff --git a/datafusion/physical-expr/src/equivalence/projection.rs b/datafusion/physical-expr/src/equivalence/projection.rs index 0f92b2c2f431..55ac6632800b 100644 --- a/datafusion/physical-expr/src/equivalence/projection.rs +++ b/datafusion/physical-expr/src/equivalence/projection.rs @@ -115,13 +115,13 @@ mod tests { output_schema, }; use crate::equivalence::EquivalenceProperties; - use crate::execution_props::ExecutionProps; use crate::expressions::{col, BinaryExpr, Literal}; use crate::functions::create_physical_expr; use crate::PhysicalSortExpr; use arrow::datatypes::{DataType, Field, Schema}; use arrow_schema::{SortOptions, TimeUnit}; use datafusion_common::{Result, ScalarValue}; + use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::{BuiltinScalarFunction, Operator}; use itertools::Itertools; use std::sync::Arc; diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 5a9a4f64876d..43a20c6141f5 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -1293,13 +1293,13 @@ mod tests { create_random_schema, create_test_params, create_test_schema, generate_table_for_eq_properties, is_table_same_after_sort, output_schema, }; - use crate::execution_props::ExecutionProps; use crate::expressions::{col, BinaryExpr, Column}; use crate::functions::create_physical_expr; use crate::PhysicalSortExpr; use arrow::datatypes::{DataType, Field, Schema}; use arrow_schema::{Fields, SortOptions, TimeUnit}; use datafusion_common::Result; + use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::{BuiltinScalarFunction, Operator}; use itertools::Itertools; diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index 56ad92082d9f..7a03b7f1933d 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -30,7 +30,6 @@ //! an argument i32 is passed to a function that supports f64, the //! argument is automatically is coerced to f64. -use crate::execution_props::ExecutionProps; use crate::sort_properties::SortProperties; use crate::{ array_expressions, conditional_expressions, datetime_expressions, math_expressions, @@ -42,7 +41,8 @@ use arrow::{ datatypes::{DataType, Int32Type, Int64Type, Schema}, }; use arrow_array::Array; -use datafusion_common::{exec_err, Result, ScalarValue}; +use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue}; +use datafusion_expr::execution_props::ExecutionProps; pub use datafusion_expr::FuncMonotonicity; use datafusion_expr::{ type_coercion::functions::data_types, BuiltinScalarFunction, ColumnarValue, diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index 41d36d8bcbed..df2852beaf42 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -24,7 +24,6 @@ pub mod conditional_expressions; pub mod crypto_expressions; pub mod datetime_expressions; pub mod equivalence; -pub mod execution_props; pub mod expressions; pub mod functions; pub mod intervals; @@ -44,7 +43,6 @@ pub mod udf; #[cfg(feature = "unicode_expressions")] pub mod unicode_expressions; pub mod utils; -pub mod var_provider; pub mod window; pub use aggregate::groups_accumulator::{GroupsAccumulatorAdapter, NullState}; diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs index 567054e2b59e..9aa952f29b65 100644 --- a/datafusion/physical-expr/src/physical_expr.rs +++ b/datafusion/physical-expr/src/physical_expr.rs @@ -54,7 +54,7 @@ use itertools::izip; /// # use datafusion_common::DFSchema; /// # use datafusion_expr::{Expr, col, lit}; /// # use datafusion_physical_expr::create_physical_expr; -/// # use datafusion_physical_expr::execution_props::ExecutionProps; +/// # use datafusion_expr::execution_props::ExecutionProps; /// // For a logical expression `a = 1`, we can create a physical expression /// let expr = col("a").eq(lit(1)); /// // To create a PhysicalExpr we need 1. a schema @@ -74,7 +74,7 @@ use itertools::izip; /// # use datafusion_common::{assert_batches_eq, DFSchema}; /// # use datafusion_expr::{Expr, col, lit, ColumnarValue}; /// # use datafusion_physical_expr::create_physical_expr; -/// # use datafusion_physical_expr::execution_props::ExecutionProps; +/// # use datafusion_expr::execution_props::ExecutionProps; /// # let expr = col("a").eq(lit(1)); /// # let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); /// # let df_schema = DFSchema::try_from(schema.clone()).unwrap(); diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index bf279518d31d..858dbd30c124 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -16,19 +16,18 @@ // under the License. use crate::expressions::GetFieldAccessExpr; -use crate::var_provider::is_system_variables; use crate::{ - execution_props::ExecutionProps, expressions::{self, binary, like, Column, GetIndexedFieldExpr, Literal}, - functions, udf, - var_provider::VarType, - PhysicalExpr, + functions, udf, PhysicalExpr, }; use arrow::datatypes::Schema; use datafusion_common::{ exec_err, internal_err, not_impl_err, plan_err, DFSchema, Result, ScalarValue, }; +use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::expr::{Alias, Cast, InList, ScalarFunction}; +use datafusion_expr::var_provider::is_system_variables; +use datafusion_expr::var_provider::VarType; use datafusion_expr::{ binary_expr, Between, BinaryExpr, Expr, GetFieldAccess, GetIndexedField, Like, Operator, ScalarFunctionDefinition, TryCast, diff --git a/datafusion/physical-expr/src/utils/guarantee.rs b/datafusion/physical-expr/src/utils/guarantee.rs index 26ee95f4793c..ab756b28428e 100644 --- a/datafusion/physical-expr/src/utils/guarantee.rs +++ b/datafusion/physical-expr/src/utils/guarantee.rs @@ -421,9 +421,9 @@ impl<'a> ColOpLit<'a> { mod test { use super::*; use crate::create_physical_expr; - use crate::execution_props::ExecutionProps; use arrow_schema::{DataType, Field, Schema, SchemaRef}; use datafusion_common::ToDFSchema; + use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::expr_fn::*; use datafusion_expr::{lit, Expr}; use itertools::Itertools; diff --git a/datafusion/wasmtest/src/lib.rs b/datafusion/wasmtest/src/lib.rs index 86e29420e8e6..a74cce72ac64 100644 --- a/datafusion/wasmtest/src/lib.rs +++ b/datafusion/wasmtest/src/lib.rs @@ -17,9 +17,10 @@ extern crate wasm_bindgen; use datafusion_common::{DFSchema, ScalarValue}; +use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::lit; -use datafusion_optimizer::simplify_expressions::{ExprSimplifier, SimplifyContext}; -use datafusion_physical_expr::execution_props::ExecutionProps; +use datafusion_expr::simplify::SimplifyContext; +use datafusion_optimizer::simplify_expressions::ExprSimplifier; use datafusion_sql::sqlparser::dialect::GenericDialect; use datafusion_sql::sqlparser::parser::Parser; use std::sync::Arc; From 5fdf177a891cc31442e4326bb21d97cc7d3e2d07 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sat, 2 Mar 2024 10:15:29 +0800 Subject: [PATCH 12/26] change simplify signature Signed-off-by: jayzhan211 --- .../user_defined_scalar_functions.rs | 9 ++++++--- datafusion/expr/src/lib.rs | 2 +- datafusion/expr/src/simplify.rs | 9 +++++++++ datafusion/expr/src/udf.rs | 19 +++++-------------- .../function_simplifier.rs | 11 +++-------- 5 files changed, 24 insertions(+), 26 deletions(-) diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index 5a9705549cc2..15b2c1476d94 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -24,14 +24,16 @@ use arrow_schema::{DataType, Field, Schema}; use datafusion::prelude::*; use datafusion::{execution::registry::FunctionRegistry, test_util}; use datafusion_common::cast::as_float64_array; -use datafusion_common::DFSchemaRef; +use datafusion_common::DFSchema; use datafusion_common::{ assert_batches_eq, assert_batches_sorted_eq, cast::as_int32_array, not_impl_err, plan_err, ExprSchema, Result, ScalarValue, }; +use datafusion_expr::simplify::Simplified; +use datafusion_expr::simplify::SimplifyInfo; use datafusion_expr::{ create_udaf, create_udf, Accumulator, ColumnarValue, ExprSchemable, - LogicalPlanBuilder, ScalarUDF, ScalarUDFImpl, Signature, Simplified, Volatility, + LogicalPlanBuilder, ScalarUDF, ScalarUDFImpl, Signature, Volatility, }; use rand::{thread_rng, Rng}; @@ -545,8 +547,9 @@ impl ScalarUDFImpl for CastToI64UDF { Ok(DataType::Int64) } // Wrap with Expr::Cast() to Int64 - fn simplify(&self, args: &[Expr], schema: DFSchemaRef) -> Result { + fn simplify(&self, args: &[Expr], info: &dyn SimplifyInfo) -> Result { let e = args[0].to_owned(); + let schema = info.schema().unwrap_or_else(|| DFSchema::empty().into()); let casted_expr = e.cast_to(&DataType::Int64, schema.as_ref())?; Ok(Simplified::Rewritten(casted_expr)) } diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index e1ea51dc0324..a297f2dc7886 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -83,7 +83,7 @@ pub use signature::{ }; pub use table_source::{TableProviderFilterPushDown, TableSource, TableType}; pub use udaf::{AggregateUDF, AggregateUDFImpl}; -pub use udf::{ScalarUDF, ScalarUDFImpl, Simplified}; +pub use udf::{ScalarUDF, ScalarUDFImpl}; pub use udwf::{WindowUDF, WindowUDFImpl}; pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits}; diff --git a/datafusion/expr/src/simplify.rs b/datafusion/expr/src/simplify.rs index 89b1623aa606..3c34f9d74a5b 100644 --- a/datafusion/expr/src/simplify.rs +++ b/datafusion/expr/src/simplify.rs @@ -140,3 +140,12 @@ impl<'a> SimplifyInfo for SimplifyContext<'a> { self.props } } + +/// Was the expression simplified? +pub enum Simplified { + /// The function call was simplified to an entirely new Expr + Rewritten(Expr), + /// the function call could not be simplified, and the arguments + /// are return unmodified + Original, +} diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index a6782dcb5855..9302ad9f821f 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -17,29 +17,20 @@ //! [`ScalarUDF`]: Scalar User Defined Functions +use crate::simplify::{Simplified, SimplifyInfo}; use crate::ExprSchemable; use crate::{ ColumnarValue, Expr, FuncMonotonicity, ReturnTypeFunction, ScalarFunctionImplementation, Signature, }; use arrow::datatypes::DataType; -use datafusion_common::{DFSchemaRef, ExprSchema, Result}; +use datafusion_common::{ExprSchema, Result}; use std::any::Any; use std::fmt; use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; -// TODO(In this PR): Move to simplify.rs -/// Was the expression simplified? -pub enum Simplified { - /// The function call was simplified to an entirely new Expr - Rewritten(Expr), - /// the function call could not be simplified, and the arguments - /// are return unmodified - Original, -} - /// Logical representation of a Scalar User Defined Function. /// /// A scalar function produces a single row output for each row of input. This @@ -174,8 +165,8 @@ impl ScalarUDF { /// Do the function rewrite /// /// See [`ScalarUDFImpl::simplify`] for more details. - pub fn simplify(&self, args: &[Expr], schema: DFSchemaRef) -> Result { - self.inner.simplify(args, schema) + pub fn simplify(&self, args: &[Expr], info: &dyn SimplifyInfo) -> Result { + self.inner.simplify(args, info) } /// Invoke the function on `args`, returning the appropriate result. @@ -359,7 +350,7 @@ pub trait ScalarUDFImpl: Debug + Send + Sync { // Do the function rewrite. // 'args': The arguments of the function // 'schema': The schema of the function - fn simplify(&self, _args: &[Expr], _schema: DFSchemaRef) -> Result { + fn simplify(&self, _args: &[Expr], _info: &dyn SimplifyInfo) -> Result { Ok(Simplified::Original) } } diff --git a/datafusion/optimizer/src/simplify_expressions/function_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/function_simplifier.rs index 6ff598035a75..dc8de3b48dab 100644 --- a/datafusion/optimizer/src/simplify_expressions/function_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/function_simplifier.rs @@ -18,9 +18,9 @@ //! This module implements a rule that do function simplification. use datafusion_common::tree_node::TreeNodeRewriter; -use datafusion_common::{DFSchema, Result}; +use datafusion_common::Result; +use datafusion_expr::simplify::Simplified; use datafusion_expr::simplify::SimplifyInfo; -use datafusion_expr::Simplified; use datafusion_expr::{expr::ScalarFunction, Expr, ScalarFunctionDefinition}; pub(super) struct FunctionSimplifier<'a, S> { @@ -42,12 +42,7 @@ impl<'a, S: SimplifyInfo> TreeNodeRewriter for FunctionSimplifier<'a, S> { args, }) = &expr { - let schema = self - .info - .schema() - .unwrap_or_else(|| DFSchema::empty().into()); - - let simplified_expr = udf.simplify(args, schema)?; + let simplified_expr = udf.simplify(args, self.info)?; match simplified_expr { Simplified::Original => Ok(expr), Simplified::Rewritten(expr) => Ok(expr), From ab66a194f08736e9a25ac264a3a405fb383c4ab6 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sat, 2 Mar 2024 11:08:34 +0800 Subject: [PATCH 13/26] cleanup Signed-off-by: jayzhan211 --- datafusion-examples/examples/simple_udtf.rs | 2 +- .../datasource/physical_plan/parquet/mod.rs | 6 ++--- datafusion/core/tests/simplification.rs | 13 ++++------- datafusion/expr/Cargo.toml | 1 + datafusion/optimizer/src/decorrelate.rs | 2 +- .../simplify_expressions/expr_simplifier.rs | 23 +++---------------- .../simplify_expressions/simplify_exprs.rs | 4 +--- .../src/simplify_expressions/utils.rs | 3 +-- datafusion/physical-expr/src/functions.rs | 2 +- 9 files changed, 16 insertions(+), 40 deletions(-) diff --git a/datafusion-examples/examples/simple_udtf.rs b/datafusion-examples/examples/simple_udtf.rs index 5652bc7872da..c68c21fab169 100644 --- a/datafusion-examples/examples/simple_udtf.rs +++ b/datafusion-examples/examples/simple_udtf.rs @@ -27,7 +27,7 @@ use datafusion::execution::context::{ExecutionProps, SessionState}; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; -use datafusion_common::{plan_err, DataFusionError, ScalarValue}; +use datafusion_common::{plan_err, ScalarValue}; use datafusion_expr::simplify::SimplifyContext; use datafusion_expr::{Expr, TableType}; use datafusion_optimizer::simplify_expressions::ExprSimplifier; diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index f41c326fa405..2f3b151e7763 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -800,16 +800,16 @@ mod tests { ArrayRef, Date64Array, Int32Array, Int64Array, Int8Array, StringArray, StructArray, }; + use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder}; use arrow::record_batch::RecordBatch; use arrow_schema::Fields; use datafusion_common::{assert_contains, FileType, GetExt, ScalarValue, ToDFSchema}; - - use chrono::{TimeZone, Utc}; - use datafusion_common::{FileType, GetExt, ScalarValue}; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::{col, lit, when, Expr}; use datafusion_physical_expr::create_physical_expr; + + use chrono::{TimeZone, Utc}; use futures::StreamExt; use object_store::local::LocalFileSystem; use object_store::path::Path; diff --git a/datafusion/core/tests/simplification.rs b/datafusion/core/tests/simplification.rs index d89edd591e8e..c3e5ea7c4269 100644 --- a/datafusion/core/tests/simplification.rs +++ b/datafusion/core/tests/simplification.rs @@ -20,24 +20,19 @@ use arrow::datatypes::{DataType, Field, Schema}; use arrow_array::{ArrayRef, Int32Array}; use chrono::{DateTime, TimeZone, Utc}; -use datafusion::common::DFSchema; +use datafusion::{error::Result, execution::context::ExecutionProps, prelude::*}; use datafusion_common::cast::as_int32_array; use datafusion_common::ScalarValue; +use datafusion_common::{DFSchemaRef, ToDFSchema}; use datafusion_expr::expr::ScalarFunction; +use datafusion_expr::simplify::SimplifyInfo; use datafusion_expr::{ expr, table_scan, BuiltinScalarFunction, Cast, ColumnarValue, Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder, ScalarUDF, Volatility, }; -use datafusion_optimizer::simplify_expressions::{ - ExprSimplifier, SimplifyExpressions, SimplifyInfo, -}; +use datafusion_optimizer::simplify_expressions::{ExprSimplifier, SimplifyExpressions}; use datafusion_optimizer::{OptimizerContext, OptimizerRule}; use std::sync::Arc; -use datafusion::{error::Result, execution::context::ExecutionProps, prelude::*}; -use datafusion_common::{DFSchemaRef, ToDFSchema}; -use datafusion_expr::simplify::SimplifyInfo; -use datafusion_expr::{Expr, ExprSchemable}; -use datafusion_optimizer::simplify_expressions::ExprSimplifier; /// In order to simplify expressions, DataFusion must have information /// about the expressions. diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml index 6e430943cf5c..621a320230f2 100644 --- a/datafusion/expr/Cargo.toml +++ b/datafusion/expr/Cargo.toml @@ -40,6 +40,7 @@ ahash = { version = "0.8", default-features = false, features = [ ] } arrow = { workspace = true } arrow-array = { workspace = true } +chrono = { workspace = true } datafusion-common = { workspace = true, default-features = true } paste = "^1.0" sqlparser = { workspace = true } diff --git a/datafusion/optimizer/src/decorrelate.rs b/datafusion/optimizer/src/decorrelate.rs index b49e092ce20c..cea8d6eb1364 100644 --- a/datafusion/optimizer/src/decorrelate.rs +++ b/datafusion/optimizer/src/decorrelate.rs @@ -21,7 +21,7 @@ use datafusion_common::tree_node::{ RewriteRecursion, Transformed, TreeNode, TreeNodeRewriter, }; use datafusion_common::{plan_err, Result}; -use datafusion_common::{Column, DFSchemaRef, DataFusionError, ScalarValue}; +use datafusion_common::{Column, DFSchemaRef, ScalarValue}; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::expr::{AggregateFunctionDefinition, Alias}; use datafusion_expr::simplify::SimplifyContext; diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index 7dca224e1446..6077803b293a 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -19,13 +19,6 @@ use std::ops::Not; -use super::function_simplifier::FunctionSimplifier; -use super::inlist_simplifier::{InListSimplifier, ShortenInListSimplifier}; -use super::utils::*; -use crate::analyzer::type_coercion::TypeCoercionRewriter; -use crate::simplify_expressions::guarantees::GuaranteeRewriter; -use crate::simplify_expressions::regex::simplify_regex_expr; - use arrow::{ array::{new_null_array, AsArray}, datatypes::{DataType, Field, Schema}, @@ -51,8 +44,8 @@ use datafusion_physical_expr::create_physical_expr; use crate::analyzer::type_coercion::TypeCoercionRewriter; use crate::simplify_expressions::guarantees::GuaranteeRewriter; use crate::simplify_expressions::regex::simplify_regex_expr; -use crate::simplify_expressions::SimplifyInfo; +use super::function_simplifier::FunctionSimplifier; use super::inlist_simplifier::{InListSimplifier, ShortenInListSimplifier}; use super::utils::*; @@ -1352,22 +1345,12 @@ mod tests { sync::Arc, }; - use super::*; - use crate::simplify_expressions::utils::for_test::{ - cast_to_int64_expr, now_expr, to_timestamp_expr, - }; - use crate::test::test_table_scan_with_name; - - use arrow::{ - array::{ArrayRef, Int32Array}, - datatypes::{DataType, Field, Schema}, - }; - use datafusion_common::{assert_contains, cast::as_int32_array, DFField, ToDFSchema}; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common::{assert_contains, DFField, ToDFSchema}; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::simplify::SimplifyContext; use datafusion_expr::{interval_arithmetic::Interval, *}; - use crate::simplify_expressions::SimplifyContext; use crate::test::test_table_scan_with_name; use super::*; diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs index bb4dabfebaf7..00d60d0a80dc 100644 --- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs +++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs @@ -19,8 +19,6 @@ use std::sync::Arc; -use super::ExprSimplifier; -use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::{DFSchema, DFSchemaRef, Result}; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::logical_plan::LogicalPlan; @@ -29,7 +27,7 @@ use datafusion_expr::utils::merge_schema; use crate::{OptimizerConfig, OptimizerRule}; -use super::{ExprSimplifier, SimplifyContext}; +use super::ExprSimplifier; /// Optimizer Pass that simplifies [`LogicalPlan`]s by rewriting /// [`Expr`]`s evaluating constants and applying algebraic diff --git a/datafusion/optimizer/src/simplify_expressions/utils.rs b/datafusion/optimizer/src/simplify_expressions/utils.rs index 75e32268b293..8952d5d79856 100644 --- a/datafusion/optimizer/src/simplify_expressions/utils.rs +++ b/datafusion/optimizer/src/simplify_expressions/utils.rs @@ -17,8 +17,7 @@ //! Utility functions for expression simplification -use datafusion_expr::expr::ScalarFunction; -use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue}; +use datafusion_common::{internal_err, Result, ScalarValue}; use datafusion_expr::simplify::SimplifyInfo; use datafusion_expr::{ expr::{Between, BinaryExpr, InList, ScalarFunction}, diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index 7a03b7f1933d..9aee375bcd4e 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -41,7 +41,7 @@ use arrow::{ datatypes::{DataType, Int32Type, Int64Type, Schema}, }; use arrow_array::Array; -use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue}; +use datafusion_common::{exec_err, Result, ScalarValue}; use datafusion_expr::execution_props::ExecutionProps; pub use datafusion_expr::FuncMonotonicity; use datafusion_expr::{ From 7c7b65408c1f55d3b5188b4fbdab877b4cc622c0 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sat, 2 Mar 2024 11:22:20 +0800 Subject: [PATCH 14/26] cleanup Signed-off-by: jayzhan211 --- datafusion/expr/src/execution_props.rs | 1 + .../optimizer/src/simplify_expressions/expr_simplifier.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/expr/src/execution_props.rs b/datafusion/expr/src/execution_props.rs index 8fdbbb7c5452..7e5dd03af32c 100644 --- a/datafusion/expr/src/execution_props.rs +++ b/datafusion/expr/src/execution_props.rs @@ -32,6 +32,7 @@ use std::sync::Arc; /// done so during predicate pruning and expression simplification /// /// [`LogicalPlan`]: datafusion_expr::LogicalPlan +#[allow(rustdoc::broken_intra_doc_links)] #[derive(Clone, Debug)] pub struct ExecutionProps { pub query_execution_start_time: DateTime, diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index 6077803b293a..91b7bf20d7f5 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -67,7 +67,7 @@ impl ExprSimplifier { /// instance of [`SimplifyContext`]. See /// [`simplify`](Self::simplify) for an example. /// - /// [`SimplifyContext`]: crate::simplify_expressions::context::SimplifyContext + /// [`SimplifyContext`]: datafusion_expr::simplify::SimplifyContext pub fn new(info: S) -> Self { Self { info, From cbefb3c58afebfe026704b26659af7352d722426 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sat, 2 Mar 2024 13:17:25 +0800 Subject: [PATCH 15/26] fix doc Signed-off-by: jayzhan211 --- datafusion/expr/src/simplify.rs | 34 +----------------- .../simplify_expressions/expr_simplifier.rs | 35 +++++++++++++++++++ 2 files changed, 36 insertions(+), 33 deletions(-) diff --git a/datafusion/expr/src/simplify.rs b/datafusion/expr/src/simplify.rs index 3c34f9d74a5b..9c6a5eb3f80e 100644 --- a/datafusion/expr/src/simplify.rs +++ b/datafusion/expr/src/simplify.rs @@ -46,39 +46,7 @@ pub trait SimplifyInfo { fn schema(&self) -> Option; } -/// Provides simplification information based on DFSchema and -/// [`ExecutionProps`]. This is the default implementation used by DataFusion -/// -/// For example: -/// ``` -/// use arrow::datatypes::{Schema, Field, DataType}; -/// use datafusion_expr::{col, lit}; -/// use datafusion_common::{DataFusionError, ToDFSchema}; -/// use datafusion_expr::execution_props::ExecutionProps; -/// use datafusion_expr::simplify::SimplifyContext; -/// use datafusion_optimizer::simplify_expressions::ExprSimplifier; -/// -/// // Create the schema -/// let schema = Schema::new(vec![ -/// Field::new("i", DataType::Int64, false), -/// ]) -/// .to_dfschema_ref().unwrap(); -/// -/// // Create the simplifier -/// let props = ExecutionProps::new(); -/// let context = SimplifyContext::new(&props) -/// .with_schema(schema); -/// let simplifier = ExprSimplifier::new(context); -/// -/// // Use the simplifier -/// -/// // b < 2 or (1 > 3) -/// let expr = col("b").lt(lit(2)).or(lit(1).gt(lit(3))); -/// -/// // b < 2 -/// let simplified = simplifier.simplify(expr).unwrap(); -/// assert_eq!(simplified, col("b").lt(lit(2))); -/// ``` +/// See [ExprSimplifier] for an example of how to use this. pub struct SimplifyContext<'a> { schema: Option, props: &'a ExecutionProps, diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index 91b7bf20d7f5..999f3740d67b 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -50,6 +50,40 @@ use super::inlist_simplifier::{InListSimplifier, ShortenInListSimplifier}; use super::utils::*; /// This structure handles API for expression simplification +/// +/// Provides simplification information based on DFSchema and +/// [`ExecutionProps`]. This is the default implementation used by DataFusion +/// +/// For example: +/// ``` +/// use arrow::datatypes::{Schema, Field, DataType}; +/// use datafusion_expr::{col, lit}; +/// use datafusion_common::{DataFusionError, ToDFSchema}; +/// use datafusion_expr::execution_props::ExecutionProps; +/// use datafusion_expr::simplify::SimplifyContext; +/// use datafusion_optimizer::simplify_expressions::ExprSimplifier; +/// +/// // Create the schema +/// let schema = Schema::new(vec![ +/// Field::new("i", DataType::Int64, false), +/// ]) +/// .to_dfschema_ref().unwrap(); +/// +/// // Create the simplifier +/// let props = ExecutionProps::new(); +/// let context = SimplifyContext::new(&props) +/// .with_schema(schema); +/// let simplifier = ExprSimplifier::new(context); +/// +/// // Use the simplifier +/// +/// // b < 2 or (1 > 3) +/// let expr = col("b").lt(lit(2)).or(lit(1).gt(lit(3))); +/// +/// // b < 2 +/// let simplified = simplifier.simplify(expr).unwrap(); +/// assert_eq!(simplified, col("b").lt(lit(2))); +/// ``` pub struct ExprSimplifier { info: S, /// Guarantees about the values of columns. This is provided by the user @@ -97,6 +131,7 @@ impl ExprSimplifier { /// use datafusion_common::Result; /// use datafusion_expr::execution_props::ExecutionProps; /// use datafusion_expr::simplify::SimplifyContext; + /// use datafusion_expr::simplify::SimplifyInfo; /// use datafusion_optimizer::simplify_expressions::ExprSimplifier; /// use datafusion_common::DFSchema; /// use std::sync::Arc; From 77182518dccfc5bb03fbd3e74f7b2f272bd01085 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sat, 2 Mar 2024 14:06:15 +0800 Subject: [PATCH 16/26] fix doc Signed-off-by: jayzhan211 --- datafusion/expr/src/simplify.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/expr/src/simplify.rs b/datafusion/expr/src/simplify.rs index 9c6a5eb3f80e..01bdad3931fc 100644 --- a/datafusion/expr/src/simplify.rs +++ b/datafusion/expr/src/simplify.rs @@ -46,7 +46,7 @@ pub trait SimplifyInfo { fn schema(&self) -> Option; } -/// See [ExprSimplifier] for an example of how to use this. +/// See `ExprSimplifier` for an example of how to use this. pub struct SimplifyContext<'a> { schema: Option, props: &'a ExecutionProps, From 5cdae92af6e9bc3a31618ef1180f1fabf3c81cc6 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 2 Mar 2024 14:18:41 -0500 Subject: [PATCH 17/26] Update datafusion/expr/src/udf.rs --- datafusion/expr/src/udf.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 9302ad9f821f..9df9b39334b1 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -347,7 +347,15 @@ pub trait ScalarUDFImpl: Debug + Send + Sync { Ok(None) } - // Do the function rewrite. + /// Optionally apply per-UDF simplification / rewrite rules + /// + /// This can be used to apply function specific simplification rules + /// during optimization (e.g. `arrow_cast` --> `Expr::Cast`). + /// + /// Note that since DataFusion handles simplifying arguments + /// as well as "constant folding" (replacing a function call with constant + /// arguments such as `my_add(1,2) --> 3` ) there is no need to implement such + /// optimizations for UDFs. // 'args': The arguments of the function // 'schema': The schema of the function fn simplify(&self, _args: &[Expr], _info: &dyn SimplifyInfo) -> Result { From 4886ba5c871b98bd1ae46d9553bf5c59e359cd23 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 2 Mar 2024 14:51:50 -0500 Subject: [PATCH 18/26] Add backwards compatibile uses, inline FunctionSimplifier, rename to ExprSimplifyResult --- .../user_defined_scalar_functions.rs | 10 ++-- datafusion/expr/src/simplify.rs | 4 +- datafusion/expr/src/udf.rs | 20 ++++--- .../simplify_expressions/expr_simplifier.rs | 26 +++++---- .../function_simplifier.rs | 54 ------------------- .../optimizer/src/simplify_expressions/mod.rs | 4 +- datafusion/physical-expr/src/lib.rs | 5 ++ 7 files changed, 48 insertions(+), 75 deletions(-) delete mode 100644 datafusion/optimizer/src/simplify_expressions/function_simplifier.rs diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index 15b2c1476d94..61186a491457 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -29,7 +29,7 @@ use datafusion_common::{ assert_batches_eq, assert_batches_sorted_eq, cast::as_int32_array, not_impl_err, plan_err, ExprSchema, Result, ScalarValue, }; -use datafusion_expr::simplify::Simplified; +use datafusion_expr::simplify::ExprSimplifyResult; use datafusion_expr::simplify::SimplifyInfo; use datafusion_expr::{ create_udaf, create_udf, Accumulator, ColumnarValue, ExprSchemable, @@ -547,11 +547,15 @@ impl ScalarUDFImpl for CastToI64UDF { Ok(DataType::Int64) } // Wrap with Expr::Cast() to Int64 - fn simplify(&self, args: &[Expr], info: &dyn SimplifyInfo) -> Result { + fn simplify( + &self, + args: &[Expr], + info: &dyn SimplifyInfo, + ) -> Result { let e = args[0].to_owned(); let schema = info.schema().unwrap_or_else(|| DFSchema::empty().into()); let casted_expr = e.cast_to(&DataType::Int64, schema.as_ref())?; - Ok(Simplified::Rewritten(casted_expr)) + Ok(ExprSimplifyResult::Simplified(casted_expr)) } // Casting should be done in `simplify`, so we just return the first argument fn invoke(&self, args: &[ColumnarValue]) -> Result { diff --git a/datafusion/expr/src/simplify.rs b/datafusion/expr/src/simplify.rs index 01bdad3931fc..f531e98d589b 100644 --- a/datafusion/expr/src/simplify.rs +++ b/datafusion/expr/src/simplify.rs @@ -110,9 +110,9 @@ impl<'a> SimplifyInfo for SimplifyContext<'a> { } /// Was the expression simplified? -pub enum Simplified { +pub enum ExprSimplifyResult { /// The function call was simplified to an entirely new Expr - Rewritten(Expr), + Simplified(Expr), /// the function call could not be simplified, and the arguments /// are return unmodified Original, diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 9df9b39334b1..44b2f5a1e1a4 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -17,7 +17,7 @@ //! [`ScalarUDF`]: Scalar User Defined Functions -use crate::simplify::{Simplified, SimplifyInfo}; +use crate::simplify::{ExprSimplifyResult, SimplifyInfo}; use crate::ExprSchemable; use crate::{ ColumnarValue, Expr, FuncMonotonicity, ReturnTypeFunction, @@ -165,7 +165,11 @@ impl ScalarUDF { /// Do the function rewrite /// /// See [`ScalarUDFImpl::simplify`] for more details. - pub fn simplify(&self, args: &[Expr], info: &dyn SimplifyInfo) -> Result { + pub fn simplify( + &self, + args: &[Expr], + info: &dyn SimplifyInfo, + ) -> Result { self.inner.simplify(args, info) } @@ -349,17 +353,21 @@ pub trait ScalarUDFImpl: Debug + Send + Sync { /// Optionally apply per-UDF simplification / rewrite rules /// - /// This can be used to apply function specific simplification rules + /// This can be used to apply function specific simplification rules /// during optimization (e.g. `arrow_cast` --> `Expr::Cast`). /// - /// Note that since DataFusion handles simplifying arguments + /// Note that since DataFusion handles simplifying arguments /// as well as "constant folding" (replacing a function call with constant /// arguments such as `my_add(1,2) --> 3` ) there is no need to implement such /// optimizations for UDFs. // 'args': The arguments of the function // 'schema': The schema of the function - fn simplify(&self, _args: &[Expr], _info: &dyn SimplifyInfo) -> Result { - Ok(Simplified::Original) + fn simplify( + &self, + _args: &[Expr], + _info: &dyn SimplifyInfo, + ) -> Result { + Ok(ExprSimplifyResult::Original) } } diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index 999f3740d67b..d47be161b779 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -32,20 +32,19 @@ use datafusion_common::{ use datafusion_common::{ internal_err, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, }; -use datafusion_expr::execution_props::ExecutionProps; -use datafusion_expr::simplify::SimplifyInfo; +use datafusion_expr::simplify::ExprSimplifyResult; use datafusion_expr::{ and, lit, or, BinaryExpr, BuiltinScalarFunction, Case, ColumnarValue, Expr, Like, ScalarFunctionDefinition, Volatility, }; use datafusion_expr::{expr::ScalarFunction, interval_arithmetic::NullableInterval}; -use datafusion_physical_expr::create_physical_expr; +use datafusion_physical_expr::{create_physical_expr, execution_props::ExecutionProps}; use crate::analyzer::type_coercion::TypeCoercionRewriter; use crate::simplify_expressions::guarantees::GuaranteeRewriter; use crate::simplify_expressions::regex::simplify_regex_expr; +use crate::simplify_expressions::SimplifyInfo; -use super::function_simplifier::FunctionSimplifier; use super::inlist_simplifier::{InListSimplifier, ShortenInListSimplifier}; use super::utils::*; @@ -178,7 +177,6 @@ impl ExprSimplifier { let mut shorten_in_list_simplifier = ShortenInListSimplifier::new(); let mut inlist_simplifier = InListSimplifier::new(); let mut guarantee_rewriter = GuaranteeRewriter::new(&self.guarantees); - let mut function_simplifier = FunctionSimplifier::new(&self.info); let expr = if self.canonicalize { expr.rewrite(&mut Canonicalizer::new())? @@ -190,8 +188,7 @@ impl ExprSimplifier { // (evaluating constants can enable new simplifications and // simplifications can enable new constant evaluation) // https://github.com/apache/arrow-datafusion/issues/1160 - expr.rewrite(&mut function_simplifier)? - .rewrite(&mut const_evaluator)? + expr.rewrite(&mut const_evaluator)? .rewrite(&mut simplifier)? .rewrite(&mut inlist_simplifier)? .rewrite(&mut shorten_in_list_simplifier)? @@ -1276,6 +1273,17 @@ impl<'a, S: SimplifyInfo> TreeNodeRewriter for Simplifier<'a, S> { out_expr.rewrite(self)? } + Expr::ScalarFunction(ScalarFunction { + func_def: ScalarFunctionDefinition::UDF(udf), + args, + }) => match udf.simplify(&args, info)? { + ExprSimplifyResult::Original => Expr::ScalarFunction(ScalarFunction { + func_def: ScalarFunctionDefinition::UDF(udf), + args, + }), + ExprSimplifyResult::Simplified(expr) => expr, + }, + // log Expr::ScalarFunction(ScalarFunction { func_def: ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::Log), @@ -1382,10 +1390,10 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::{assert_contains, DFField, ToDFSchema}; - use datafusion_expr::execution_props::ExecutionProps; - use datafusion_expr::simplify::SimplifyContext; use datafusion_expr::{interval_arithmetic::Interval, *}; + use datafusion_physical_expr::execution_props::ExecutionProps; + use crate::simplify_expressions::SimplifyContext; use crate::test::test_table_scan_with_name; use super::*; diff --git a/datafusion/optimizer/src/simplify_expressions/function_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/function_simplifier.rs deleted file mode 100644 index dc8de3b48dab..000000000000 --- a/datafusion/optimizer/src/simplify_expressions/function_simplifier.rs +++ /dev/null @@ -1,54 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! This module implements a rule that do function simplification. - -use datafusion_common::tree_node::TreeNodeRewriter; -use datafusion_common::Result; -use datafusion_expr::simplify::Simplified; -use datafusion_expr::simplify::SimplifyInfo; -use datafusion_expr::{expr::ScalarFunction, Expr, ScalarFunctionDefinition}; - -pub(super) struct FunctionSimplifier<'a, S> { - info: &'a S, -} - -impl<'a, S> FunctionSimplifier<'a, S> { - pub(super) fn new(info: &'a S) -> Self { - Self { info } - } -} - -impl<'a, S: SimplifyInfo> TreeNodeRewriter for FunctionSimplifier<'a, S> { - type N = Expr; - - fn mutate(&mut self, expr: Expr) -> Result { - if let Expr::ScalarFunction(ScalarFunction { - func_def: ScalarFunctionDefinition::UDF(udf), - args, - }) = &expr - { - let simplified_expr = udf.simplify(args, self.info)?; - match simplified_expr { - Simplified::Original => Ok(expr), - Simplified::Rewritten(expr) => Ok(expr), - } - } else { - Ok(expr) - } - } -} diff --git a/datafusion/optimizer/src/simplify_expressions/mod.rs b/datafusion/optimizer/src/simplify_expressions/mod.rs index a105088db647..5244f9a5af88 100644 --- a/datafusion/optimizer/src/simplify_expressions/mod.rs +++ b/datafusion/optimizer/src/simplify_expressions/mod.rs @@ -16,12 +16,14 @@ // under the License. pub mod expr_simplifier; -mod function_simplifier; mod guarantees; mod inlist_simplifier; mod regex; pub mod simplify_exprs; mod utils; +// backwards compatibility +pub use datafusion_expr::simplify::{SimplifyContext, SimplifyInfo}; + pub use expr_simplifier::*; pub use simplify_exprs::*; diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index df2852beaf42..35dd21b96d48 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -45,6 +45,11 @@ pub mod unicode_expressions; pub mod utils; pub mod window; +// backwards compatibility +pub mod execution_props { + pub use datafusion_expr::execution_props::ExecutionProps; +} + pub use aggregate::groups_accumulator::{GroupsAccumulatorAdapter, NullState}; pub use aggregate::AggregateExpr; pub use analysis::{analyze, AnalysisContext, ExprBoundaries}; From ed6a04bc2194ea282f6be606e59bd5b0f449272d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 2 Mar 2024 15:22:09 -0500 Subject: [PATCH 19/26] Remove DFSchema from SimplifyInfo --- datafusion/core/tests/simplification.rs | 4 ---- .../user_defined_scalar_functions.rs | 17 ++++++++++++---- datafusion/expr/src/simplify.rs | 11 ++-------- datafusion/expr/src/udf.rs | 20 ++++++++++--------- 4 files changed, 26 insertions(+), 26 deletions(-) diff --git a/datafusion/core/tests/simplification.rs b/datafusion/core/tests/simplification.rs index c3e5ea7c4269..41457df02cfc 100644 --- a/datafusion/core/tests/simplification.rs +++ b/datafusion/core/tests/simplification.rs @@ -67,10 +67,6 @@ impl SimplifyInfo for MyInfo { fn get_data_type(&self, expr: &Expr) -> Result { expr.get_type(self.schema.as_ref()) } - - fn schema(&self) -> Option { - Some(self.schema.clone()) - } } impl From for MyInfo { diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index 61186a491457..eaee1a55cfa4 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -24,7 +24,6 @@ use arrow_schema::{DataType, Field, Schema}; use datafusion::prelude::*; use datafusion::{execution::registry::FunctionRegistry, test_util}; use datafusion_common::cast::as_float64_array; -use datafusion_common::DFSchema; use datafusion_common::{ assert_batches_eq, assert_batches_sorted_eq, cast::as_int32_array, not_impl_err, plan_err, ExprSchema, Result, ScalarValue, @@ -553,9 +552,19 @@ impl ScalarUDFImpl for CastToI64UDF { info: &dyn SimplifyInfo, ) -> Result { let e = args[0].to_owned(); - let schema = info.schema().unwrap_or_else(|| DFSchema::empty().into()); - let casted_expr = e.cast_to(&DataType::Int64, schema.as_ref())?; - Ok(ExprSimplifyResult::Simplified(casted_expr)) + // TODO cast_to requires an ExprSchema but simplify gets a SimplifyInfo + // so we have to replicate some of the casting logic here. + let source_type = info.get_data_type(&e)?; + if source_type == DataType::Int64 { + Ok(ExprSimplifyResult::Original) + } else { + Ok(ExprSimplifyResult::Simplified(Expr::Cast( + datafusion_expr::Cast { + expr: Box::new(e), + data_type: DataType::Int64, + }, + ))) + } } // Casting should be done in `simplify`, so we just return the first argument fn invoke(&self, args: &[ColumnarValue]) -> Result { diff --git a/datafusion/expr/src/simplify.rs b/datafusion/expr/src/simplify.rs index f531e98d589b..e8f987a55bea 100644 --- a/datafusion/expr/src/simplify.rs +++ b/datafusion/expr/src/simplify.rs @@ -22,8 +22,7 @@ use datafusion_common::{DFSchemaRef, DataFusionError, Result}; use crate::{execution_props::ExecutionProps, Expr, ExprSchemable}; -#[allow(rustdoc::private_intra_doc_links)] -/// The information necessary to apply algebraic simplification to an +/// Provides the information necessary to apply algebraic simplification to an /// [Expr]. See [SimplifyContext] for one concrete implementation. /// /// This trait exists so that other systems can plug schema @@ -41,12 +40,10 @@ pub trait SimplifyInfo { /// Returns data type of this expr needed for determining optimized int type of a value fn get_data_type(&self, expr: &Expr) -> Result; - - /// Return the schema for function simplifier - fn schema(&self) -> Option; } /// See `ExprSimplifier` for an example of how to use this. +#[derive(Debug, Clone)] pub struct SimplifyContext<'a> { schema: Option, props: &'a ExecutionProps, @@ -69,10 +66,6 @@ impl<'a> SimplifyContext<'a> { } impl<'a> SimplifyInfo for SimplifyContext<'a> { - fn schema(&self) -> Option { - self.schema.clone() - } - /// returns true if this Expr has boolean type fn is_boolean_type(&self, expr: &Expr) -> Result { for schema in &self.schema { diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 44b2f5a1e1a4..ab36d95fd71d 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -351,17 +351,19 @@ pub trait ScalarUDFImpl: Debug + Send + Sync { Ok(None) } - /// Optionally apply per-UDF simplification / rewrite rules + /// Optionally apply per-UDF simplification / rewrite rules. /// /// This can be used to apply function specific simplification rules - /// during optimization (e.g. `arrow_cast` --> `Expr::Cast`). - /// - /// Note that since DataFusion handles simplifying arguments - /// as well as "constant folding" (replacing a function call with constant - /// arguments such as `my_add(1,2) --> 3` ) there is no need to implement such - /// optimizations for UDFs. - // 'args': The arguments of the function - // 'schema': The schema of the function + /// during optimization (e.g. `arrow_cast` --> `Expr::Cast`). + /// + /// Note that DataFusion handles simplifying arguments and "constant + /// folding" (replacing a function call with constant arguments such as + /// `my_add(1,2) --> 3` ). Thus, there is no need to implement such + /// optimizations manually for specific UDFs. + /// + /// # Arguments + /// * 'args': The arguments of the function + /// * 'schema': The schema of the function fn simplify( &self, _args: &[Expr], From f6848d8edeaba1ab23d10156ca08815e42ca674f Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 2 Mar 2024 15:30:43 -0500 Subject: [PATCH 20/26] Avoid requiring argument copies --- .../user_defined_scalar_functions.rs | 15 +++++++++------ datafusion/expr/src/simplify.rs | 4 ++-- datafusion/expr/src/udf.rs | 16 +++++++++++----- .../src/simplify_expressions/expr_simplifier.rs | 12 +++++++----- 4 files changed, 29 insertions(+), 18 deletions(-) diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index eaee1a55cfa4..ad32151ab141 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -548,16 +548,19 @@ impl ScalarUDFImpl for CastToI64UDF { // Wrap with Expr::Cast() to Int64 fn simplify( &self, - args: &[Expr], + mut args: Vec, info: &dyn SimplifyInfo, ) -> Result { - let e = args[0].to_owned(); - // TODO cast_to requires an ExprSchema but simplify gets a SimplifyInfo - // so we have to replicate some of the casting logic here. - let source_type = info.get_data_type(&e)?; + // Note that Expr::cast_to requires an ExprSchema but simplify gets a + // SimplifyInfo so we have to replicate some of the casting logic here. + let source_type = info.get_data_type(&args[0])?; if source_type == DataType::Int64 { - Ok(ExprSimplifyResult::Original) + Ok(ExprSimplifyResult::Original(args)) } else { + // DataFusion should have ensured the function is called with just a + // single argument + assert_eq!(args.len(), 1); + let e = args.pop().unwrap(); Ok(ExprSimplifyResult::Simplified(Expr::Cast( datafusion_expr::Cast { expr: Box::new(e), diff --git a/datafusion/expr/src/simplify.rs b/datafusion/expr/src/simplify.rs index e8f987a55bea..bd57c9e8bc13 100644 --- a/datafusion/expr/src/simplify.rs +++ b/datafusion/expr/src/simplify.rs @@ -107,6 +107,6 @@ pub enum ExprSimplifyResult { /// The function call was simplified to an entirely new Expr Simplified(Expr), /// the function call could not be simplified, and the arguments - /// are return unmodified - Original, + /// are return unmodified. + Original(Vec), } diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index ab36d95fd71d..5ad420b2f382 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -167,7 +167,7 @@ impl ScalarUDF { /// See [`ScalarUDFImpl::simplify`] for more details. pub fn simplify( &self, - args: &[Expr], + args: Vec, info: &dyn SimplifyInfo, ) -> Result { self.inner.simplify(args, info) @@ -353,8 +353,9 @@ pub trait ScalarUDFImpl: Debug + Send + Sync { /// Optionally apply per-UDF simplification / rewrite rules. /// - /// This can be used to apply function specific simplification rules - /// during optimization (e.g. `arrow_cast` --> `Expr::Cast`). + /// This can be used to apply function specific simplification rules during + /// optimization (e.g. `arrow_cast` --> `Expr::Cast`). The default + /// implementation does nothing. /// /// Note that DataFusion handles simplifying arguments and "constant /// folding" (replacing a function call with constant arguments such as @@ -364,12 +365,17 @@ pub trait ScalarUDFImpl: Debug + Send + Sync { /// # Arguments /// * 'args': The arguments of the function /// * 'schema': The schema of the function + /// + /// # Returns + /// [`ExprSimplifyResult`] indicating the result of the simplification NOTE + /// if the function cannot be simplified, the arguments *MUST* be returned + /// unmodified fn simplify( &self, - _args: &[Expr], + args: Vec, _info: &dyn SimplifyInfo, ) -> Result { - Ok(ExprSimplifyResult::Original) + Ok(ExprSimplifyResult::Original(args)) } } diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index d47be161b779..14b1b2e9de45 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -1276,11 +1276,13 @@ impl<'a, S: SimplifyInfo> TreeNodeRewriter for Simplifier<'a, S> { Expr::ScalarFunction(ScalarFunction { func_def: ScalarFunctionDefinition::UDF(udf), args, - }) => match udf.simplify(&args, info)? { - ExprSimplifyResult::Original => Expr::ScalarFunction(ScalarFunction { - func_def: ScalarFunctionDefinition::UDF(udf), - args, - }), + }) => match udf.simplify(args, info)? { + ExprSimplifyResult::Original(args) => { + Expr::ScalarFunction(ScalarFunction { + func_def: ScalarFunctionDefinition::UDF(udf), + args, + }) + } ExprSimplifyResult::Simplified(expr) => expr, }, From 18f8371d2bd19b60bae60843292119c5e04d5f75 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 2 Mar 2024 15:39:14 -0500 Subject: [PATCH 21/26] Improve docs --- datafusion/expr/src/execution_props.rs | 1 - datafusion/expr/src/simplify.rs | 8 +++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/datafusion/expr/src/execution_props.rs b/datafusion/expr/src/execution_props.rs index 7e5dd03af32c..8fdbbb7c5452 100644 --- a/datafusion/expr/src/execution_props.rs +++ b/datafusion/expr/src/execution_props.rs @@ -32,7 +32,6 @@ use std::sync::Arc; /// done so during predicate pruning and expression simplification /// /// [`LogicalPlan`]: datafusion_expr::LogicalPlan -#[allow(rustdoc::broken_intra_doc_links)] #[derive(Clone, Debug)] pub struct ExecutionProps { pub query_execution_start_time: DateTime, diff --git a/datafusion/expr/src/simplify.rs b/datafusion/expr/src/simplify.rs index bd57c9e8bc13..536a01fa8571 100644 --- a/datafusion/expr/src/simplify.rs +++ b/datafusion/expr/src/simplify.rs @@ -42,7 +42,13 @@ pub trait SimplifyInfo { fn get_data_type(&self, expr: &Expr) -> Result; } -/// See `ExprSimplifier` for an example of how to use this. +/// Provides simplification information based on DFSchema and +/// [`ExecutionProps`]. This is the default implementation used by DataFusion +/// +/// # Example +/// See the `simplify_demo` in the [`expr_api` example] +/// +/// [`expr_api` example]: https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/expr_api.rs #[derive(Debug, Clone)] pub struct SimplifyContext<'a> { schema: Option, From bfb54a06c8724b4c4dc2a9936e92e8aa8f73c430 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 2 Mar 2024 15:40:17 -0500 Subject: [PATCH 22/26] fix link --- datafusion/expr/src/execution_props.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/datafusion/expr/src/execution_props.rs b/datafusion/expr/src/execution_props.rs index 8fdbbb7c5452..58e5f3150a98 100644 --- a/datafusion/expr/src/execution_props.rs +++ b/datafusion/expr/src/execution_props.rs @@ -24,14 +24,12 @@ use std::sync::Arc; /// Holds per-query execution properties and data (such as statement /// starting timestamps). /// -/// An [`ExecutionProps`] is created each time a [`LogicalPlan`] is +/// An [`ExecutionProps`] is created each time a `LogicalPlan` is /// prepared for execution (optimized). If the same plan is optimized /// multiple times, a new `ExecutionProps` is created each time. /// /// It is important that this structure be cheap to create as it is /// done so during predicate pruning and expression simplification -/// -/// [`LogicalPlan`]: datafusion_expr::LogicalPlan #[derive(Clone, Debug)] pub struct ExecutionProps { pub query_execution_start_time: DateTime, From 33aa7ff72b4eaea1c46624457e3b46eb0a373362 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 2 Mar 2024 15:43:30 -0500 Subject: [PATCH 23/26] fix doc test --- datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index 14b1b2e9de45..6914d91228ec 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -155,7 +155,6 @@ impl ExprSimplifier { /// fn get_data_type(&self, expr: &Expr) -> Result { /// Ok(DataType::Int32) /// } - /// fn schema(&self) -> Option> { None } /// } /// /// // Create the simplifier From 24adcbf10c39ddccba0e71a52cf9e751800a5cde Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 3 Mar 2024 02:09:21 -0500 Subject: [PATCH 24/26] Update datafusion/physical-expr/src/lib.rs --- datafusion/physical-expr/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index 35dd21b96d48..7e99ac885188 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -48,6 +48,7 @@ pub mod window; // backwards compatibility pub mod execution_props { pub use datafusion_expr::execution_props::ExecutionProps; + pub use datafusion_expr::var_provider::{VarProvider, VarType}; } pub use aggregate::groups_accumulator::{GroupsAccumulatorAdapter, NullState}; From fea82cba90d12aa6ff2d64ff2983a6473c8bdc38 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 4 Mar 2024 17:47:46 -0500 Subject: [PATCH 25/26] Change example simplify to always simplify its argument --- .../user_defined_scalar_functions.rs | 41 +++++++++++-------- 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index ad32151ab141..a29ae9d13935 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -545,34 +545,39 @@ impl ScalarUDFImpl for CastToI64UDF { fn return_type(&self, _args: &[DataType]) -> Result { Ok(DataType::Int64) } - // Wrap with Expr::Cast() to Int64 + + // Demonstrate simplifying a UDF fn simplify( &self, mut args: Vec, info: &dyn SimplifyInfo, ) -> Result { + // DataFusion should have ensured the function is called with just a + // single argument + assert_eq!(args.len(), 1); + let arg = args.pop().unwrap(); + // Note that Expr::cast_to requires an ExprSchema but simplify gets a // SimplifyInfo so we have to replicate some of the casting logic here. - let source_type = info.get_data_type(&args[0])?; - if source_type == DataType::Int64 { - Ok(ExprSimplifyResult::Original(args)) + + let source_type = info.get_data_type(&arg)?; + let new_expr = if source_type == DataType::Int64 { + // the argument's data type is already the correct type + arg } else { - // DataFusion should have ensured the function is called with just a - // single argument - assert_eq!(args.len(), 1); - let e = args.pop().unwrap(); - Ok(ExprSimplifyResult::Simplified(Expr::Cast( - datafusion_expr::Cast { - expr: Box::new(e), - data_type: DataType::Int64, - }, - ))) - } + // need to use an actual cast to get the correct type + Expr::Cast(datafusion_expr::Cast { + expr: Box::new(arg), + data_type: DataType::Int64, + }) + }; + // return the newly written argument to DataFusion + Ok(ExprSimplifyResult::Simplified(new_expr)) } + // Casting should be done in `simplify`, so we just return the first argument - fn invoke(&self, args: &[ColumnarValue]) -> Result { - assert_eq!(args.len(), 1); - Ok(args.first().unwrap().clone()) + fn invoke(&self, _args: &[ColumnarValue]) -> Result { + unimplemented!("Function should not be evaluated") } } From 4e9eb70c8b1d286141218f151b4b066fd9952073 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 5 Mar 2024 16:28:06 -0500 Subject: [PATCH 26/26] Clarify comment --- .../core/tests/user_defined/user_defined_scalar_functions.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index a29ae9d13935..982fb0464ed5 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -575,9 +575,8 @@ impl ScalarUDFImpl for CastToI64UDF { Ok(ExprSimplifyResult::Simplified(new_expr)) } - // Casting should be done in `simplify`, so we just return the first argument fn invoke(&self, _args: &[ColumnarValue]) -> Result { - unimplemented!("Function should not be evaluated") + unimplemented!("Function should have been simplified prior to evaluation") } }