From eabfe68ce48b0a34363d5ea69c38d3cc7acc7da2 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Tue, 23 Apr 2024 17:09:28 +0800 Subject: [PATCH 01/11] Avoid create null array for empty args Signed-off-by: jayzhan211 --- datafusion/functions/src/math/pi.rs | 7 +++--- datafusion/functions/src/math/random.rs | 23 ++++++++----------- datafusion/functions/src/string/uuid.rs | 7 +++--- .../physical-expr/src/scalar_function.rs | 20 ++++------------ 4 files changed, 23 insertions(+), 34 deletions(-) diff --git a/datafusion/functions/src/math/pi.rs b/datafusion/functions/src/math/pi.rs index 0801e797511b..2ece5fe165a9 100644 --- a/datafusion/functions/src/math/pi.rs +++ b/datafusion/functions/src/math/pi.rs @@ -63,9 +63,10 @@ impl ScalarUDFImpl for PiFunc { } fn invoke(&self, args: &[ColumnarValue]) -> Result { - if !matches!(&args[0], ColumnarValue::Array(_)) { - return exec_err!("Expect pi function to take no param"); - } + if !args.is_empty() { + return exec_err!("Expect {} function to take no param", self.name()); + }; + let array = Float64Array::from_value(std::f64::consts::PI, 1); Ok(ColumnarValue::Array(Arc::new(array))) } diff --git a/datafusion/functions/src/math/random.rs b/datafusion/functions/src/math/random.rs index 2c1ad4136702..9c5243b8bebd 100644 --- a/datafusion/functions/src/math/random.rs +++ b/datafusion/functions/src/math/random.rs @@ -65,22 +65,19 @@ impl ScalarUDFImpl for RandomFunc { } fn invoke(&self, args: &[ColumnarValue]) -> Result { - random(args) + let len = if args.is_empty() { + 1 + } else { + return exec_err!("Expect {} function to take no param", self.name()); + }; + + let mut rng = thread_rng(); + let values = iter::repeat_with(|| rng.gen_range(0.0..1.0)).take(len); + let array = Float64Array::from_iter_values(values); + Ok(ColumnarValue::Array(Arc::new(array))) } } -/// Random SQL function -fn random(args: &[ColumnarValue]) -> Result { - let len: usize = match &args[0] { - ColumnarValue::Array(array) => array.len(), - _ => return exec_err!("Expect random function to take no param"), - }; - let mut rng = thread_rng(); - let values = iter::repeat_with(|| rng.gen_range(0.0..1.0)).take(len); - let array = Float64Array::from_iter_values(values); - Ok(ColumnarValue::Array(Arc::new(array))) -} - #[cfg(test)] mod test { use std::sync::Arc; diff --git a/datafusion/functions/src/string/uuid.rs b/datafusion/functions/src/string/uuid.rs index c68871d42e9f..61948b4bcc39 100644 --- a/datafusion/functions/src/string/uuid.rs +++ b/datafusion/functions/src/string/uuid.rs @@ -61,9 +61,10 @@ impl ScalarUDFImpl for UuidFunc { /// Prints random (v4) uuid values per row /// uuid() = 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11' fn invoke(&self, args: &[ColumnarValue]) -> Result { - let len: usize = match &args[0] { - ColumnarValue::Array(array) => array.len(), - _ => return exec_err!("Expect uuid function to take no param"), + let len = if args.is_empty() { + 1 + } else { + return exec_err!("Expect {} function to take no param", self.name()); }; let values = iter::repeat_with(|| Uuid::new_v4().to_string()).take(len); diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index 9ae9f3dee3e7..7acdf322c341 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -142,21 +142,11 @@ impl PhysicalExpr for ScalarFunctionExpr { } fn evaluate(&self, batch: &RecordBatch) -> Result { - // evaluate the arguments, if there are no arguments we'll instead pass in a null array - // indicating the batch size (as a convention) - let inputs = match self.args.is_empty() { - // If the function supports zero argument, we pass in a null array indicating the batch size. - // This is for user-defined functions. - // MakeArray support zero argument but has the different behavior from the array with one null. - true if self.supports_zero_argument && self.name != "make_array" => { - vec![ColumnarValue::create_null_array(batch.num_rows())] - } - _ => self - .args - .iter() - .map(|e| e.evaluate(batch)) - .collect::>>()?, - }; + let inputs = self + .args + .iter() + .map(|e| e.evaluate(batch)) + .collect::>>()?; // evaluate the function match self.fun { From 7b529d905ed98aac10ad41aa4e24eb34eb815aaa Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Tue, 23 Apr 2024 17:21:07 +0800 Subject: [PATCH 02/11] fix test Signed-off-by: jayzhan211 --- datafusion/functions/src/math/random.rs | 34 ++++++++++++------------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/datafusion/functions/src/math/random.rs b/datafusion/functions/src/math/random.rs index 9c5243b8bebd..457b3d1194ad 100644 --- a/datafusion/functions/src/math/random.rs +++ b/datafusion/functions/src/math/random.rs @@ -65,34 +65,32 @@ impl ScalarUDFImpl for RandomFunc { } fn invoke(&self, args: &[ColumnarValue]) -> Result { - let len = if args.is_empty() { - 1 - } else { - return exec_err!("Expect {} function to take no param", self.name()); - }; - - let mut rng = thread_rng(); - let values = iter::repeat_with(|| rng.gen_range(0.0..1.0)).take(len); - let array = Float64Array::from_iter_values(values); - Ok(ColumnarValue::Array(Arc::new(array))) + random(args) } } +fn random(args: &[ColumnarValue]) -> Result { + let len = if args.is_empty() { + 1 + } else { + return exec_err!("Expect random function to take no param"); + }; + + let mut rng = thread_rng(); + let values = iter::repeat_with(|| rng.gen_range(0.0..1.0)).take(len); + let array = Float64Array::from_iter_values(values); + Ok(ColumnarValue::Array(Arc::new(array))) +} + #[cfg(test)] mod test { - use std::sync::Arc; - - use arrow::array::NullArray; - use datafusion_common::cast::as_float64_array; - use datafusion_expr::ColumnarValue; - use crate::math::random::random; + use super::*; #[test] fn test_random_expression() { - let args = vec![ColumnarValue::Array(Arc::new(NullArray::new(1)))]; - let array = random(&args) + let array = random(&[]) .expect("failed to initialize function random") .into_array(1) .expect("Failed to convert to array"); From 03ec8b521d1d7029f3b5bafcf18cafc5d2fc278e Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Tue, 23 Apr 2024 17:31:52 +0800 Subject: [PATCH 03/11] fix test Signed-off-by: jayzhan211 --- .../user_defined/user_defined_scalar_functions.rs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index e31a1081621a..1c975409e6cb 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -434,15 +434,10 @@ impl ScalarUDFImpl for RandomUDF { } fn invoke(&self, args: &[ColumnarValue]) -> Result { - let len: usize = match &args[0] { - // This udf is always invoked with zero argument so its argument - // is a null array indicating the batch size. - ColumnarValue::Array(array) if array.data_type().is_null() => array.len(), - _ => { - return Err(datafusion::error::DataFusionError::Internal( - "Invalid argument type".to_string(), - )) - } + let len = if args.is_empty() { + 1 + } else { + return internal_err!("Invalid argument type"); }; let mut rng = thread_rng(); let values = iter::repeat_with(|| rng.gen_range(0.1..1.0)).take(len); From 36e685e961b6a1f1ea284e5cebb0a51b17076de7 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Tue, 23 Apr 2024 20:22:36 +0800 Subject: [PATCH 04/11] return scalar instead of array Signed-off-by: jayzhan211 --- .../user_defined_scalar_functions.rs | 133 +++--------------- datafusion/expr/src/udf.rs | 5 - datafusion/functions/src/math/pi.rs | 15 +- datafusion/functions/src/math/random.rs | 44 +----- datafusion/functions/src/string/uuid.rs | 19 +-- datafusion/sqllogictest/test_files/test1.slt | 0 6 files changed, 32 insertions(+), 184 deletions(-) create mode 100644 datafusion/sqllogictest/test_files/test1.slt diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index 1c975409e6cb..32d4539668bb 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -17,9 +17,8 @@ use arrow::compute::kernels::numeric::add; use arrow_array::{ - Array, ArrayRef, Float32Array, Float64Array, Int32Array, RecordBatch, UInt8Array, + ArrayRef, Float32Array, Float64Array, Int32Array, RecordBatch }; -use arrow_schema::DataType::Float64; use arrow_schema::{DataType, Field, Schema}; use datafusion::execution::context::{FunctionFactory, RegisterFunction, SessionState}; use datafusion::prelude::*; @@ -36,9 +35,7 @@ use datafusion_expr::{ create_udaf, create_udf, Accumulator, ColumnarValue, CreateFunction, ExprSchemable, LogicalPlanBuilder, ScalarUDF, ScalarUDFImpl, Signature, Volatility, }; -use rand::{thread_rng, Rng}; use std::any::Any; -use std::iter; use std::sync::Arc; /// test that casting happens on udfs. @@ -403,118 +400,6 @@ async fn test_user_defined_functions_with_alias() -> Result<()> { Ok(()) } -#[derive(Debug)] -pub struct RandomUDF { - signature: Signature, -} - -impl RandomUDF { - pub fn new() -> Self { - Self { - signature: Signature::any(0, Volatility::Volatile), - } - } -} - -impl ScalarUDFImpl for RandomUDF { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn name(&self) -> &str { - "random_udf" - } - - fn signature(&self) -> &Signature { - &self.signature - } - - fn return_type(&self, _arg_types: &[DataType]) -> Result { - Ok(Float64) - } - - fn invoke(&self, args: &[ColumnarValue]) -> Result { - let len = if args.is_empty() { - 1 - } else { - return internal_err!("Invalid argument type"); - }; - let mut rng = thread_rng(); - let values = iter::repeat_with(|| rng.gen_range(0.1..1.0)).take(len); - let array = Float64Array::from_iter_values(values); - Ok(ColumnarValue::Array(Arc::new(array))) - } -} - -/// Ensure that a user defined function with zero argument will be invoked -/// with a null array indicating the batch size. -#[tokio::test] -async fn test_user_defined_functions_zero_argument() -> Result<()> { - let ctx = SessionContext::new(); - - let schema = Arc::new(Schema::new(vec![Field::new( - "index", - DataType::UInt8, - false, - )])); - - let batch = RecordBatch::try_new( - schema, - vec![Arc::new(UInt8Array::from_iter_values([1, 2, 3]))], - )?; - - ctx.register_batch("data_table", batch)?; - - let random_normal_udf = ScalarUDF::from(RandomUDF::new()); - ctx.register_udf(random_normal_udf); - - let result = plan_and_collect( - &ctx, - "SELECT random_udf() AS random_udf, random() AS native_random FROM data_table", - ) - .await?; - - assert_eq!(result.len(), 1); - let batch = &result[0]; - let random_udf = batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - let native_random = batch - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - - assert_eq!(random_udf.len(), native_random.len()); - - let mut previous = -1.0; - for i in 0..random_udf.len() { - assert!(random_udf.value(i) >= 0.0 && random_udf.value(i) < 1.0); - assert!(random_udf.value(i) != previous); - previous = random_udf.value(i); - } - - Ok(()) -} - -#[tokio::test] -async fn deregister_udf() -> Result<()> { - let random_normal_udf = ScalarUDF::from(RandomUDF::new()); - let ctx = SessionContext::new(); - - ctx.register_udf(random_normal_udf.clone()); - - assert!(ctx.udfs().contains("random_udf")); - - ctx.deregister_udf("random_udf"); - - assert!(!ctx.udfs().contains("random_udf")); - - Ok(()) -} - #[derive(Debug)] struct CastToI64UDF { signature: Signature, @@ -610,6 +495,22 @@ async fn test_user_defined_functions_cast_to_i64() -> Result<()> { Ok(()) } +#[tokio::test] +async fn deregister_udf() -> Result<()> { + let cast2i64 = ScalarUDF::from(CastToI64UDF::new()); + let ctx = SessionContext::new(); + + ctx.register_udf(cast2i64.clone()); + + assert!(ctx.udfs().contains("cast_to_i64")); + + ctx.deregister_udf("cast_to_i64"); + + assert!(!ctx.udfs().contains("cast_to_i64")); + + Ok(()) +} + #[derive(Debug)] struct TakeUDF { signature: Signature, diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 4557fe60a447..6f05ec1a5300 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -322,11 +322,6 @@ pub trait ScalarUDFImpl: Debug + Send + Sync { /// The function will be invoked passed with the slice of [`ColumnarValue`] /// (either scalar or array). /// - /// # Zero Argument Functions - /// If the function has zero parameters (e.g. `now()`) it will be passed a - /// single element slice which is a a null array to indicate the batch's row - /// count (so the function can know the resulting array size). - /// /// # Performance /// /// For the best performance, the implementations of `invoke` should handle diff --git a/datafusion/functions/src/math/pi.rs b/datafusion/functions/src/math/pi.rs index 2ece5fe165a9..c135a13f1256 100644 --- a/datafusion/functions/src/math/pi.rs +++ b/datafusion/functions/src/math/pi.rs @@ -16,13 +16,11 @@ // under the License. use std::any::Any; -use std::sync::Arc; -use arrow::array::Float64Array; use arrow::datatypes::DataType; use arrow::datatypes::DataType::Float64; -use datafusion_common::{exec_err, Result}; +use datafusion_common::{Result, ScalarValue}; use datafusion_expr::{ColumnarValue, FuncMonotonicity, Volatility}; use datafusion_expr::{ScalarUDFImpl, Signature}; @@ -62,13 +60,10 @@ impl ScalarUDFImpl for PiFunc { Ok(Float64) } - fn invoke(&self, args: &[ColumnarValue]) -> Result { - if !args.is_empty() { - return exec_err!("Expect {} function to take no param", self.name()); - }; - - let array = Float64Array::from_value(std::f64::consts::PI, 1); - Ok(ColumnarValue::Array(Arc::new(array))) + fn invoke(&self, _args: &[ColumnarValue]) -> Result { + Ok(ColumnarValue::Scalar(ScalarValue::Float64(Some( + std::f64::consts::PI, + )))) } fn monotonicity(&self) -> Result> { diff --git a/datafusion/functions/src/math/random.rs b/datafusion/functions/src/math/random.rs index 457b3d1194ad..885952778dea 100644 --- a/datafusion/functions/src/math/random.rs +++ b/datafusion/functions/src/math/random.rs @@ -16,15 +16,12 @@ // under the License. use std::any::Any; -use std::iter; -use std::sync::Arc; -use arrow::array::Float64Array; use arrow::datatypes::DataType; use arrow::datatypes::DataType::Float64; use rand::{thread_rng, Rng}; -use datafusion_common::{exec_err, Result}; +use datafusion_common::{Result, ScalarValue}; use datafusion_expr::ColumnarValue; use datafusion_expr::{ScalarUDFImpl, Signature, Volatility}; @@ -64,40 +61,9 @@ impl ScalarUDFImpl for RandomFunc { Ok(Float64) } - fn invoke(&self, args: &[ColumnarValue]) -> Result { - random(args) - } -} - -fn random(args: &[ColumnarValue]) -> Result { - let len = if args.is_empty() { - 1 - } else { - return exec_err!("Expect random function to take no param"); - }; - - let mut rng = thread_rng(); - let values = iter::repeat_with(|| rng.gen_range(0.0..1.0)).take(len); - let array = Float64Array::from_iter_values(values); - Ok(ColumnarValue::Array(Arc::new(array))) -} - -#[cfg(test)] -mod test { - use datafusion_common::cast::as_float64_array; - - use super::*; - - #[test] - fn test_random_expression() { - let array = random(&[]) - .expect("failed to initialize function random") - .into_array(1) - .expect("Failed to convert to array"); - let floats = - as_float64_array(&array).expect("failed to initialize function random"); - - assert_eq!(floats.len(), 1); - assert!(0.0 <= floats.value(0) && floats.value(0) < 1.0); + fn invoke(&self, _args: &[ColumnarValue]) -> Result { + let mut rng = thread_rng(); + let val = rng.gen_range(0.0..1.0); + Ok(ColumnarValue::Scalar(ScalarValue::Float64(Some(val)))) } } diff --git a/datafusion/functions/src/string/uuid.rs b/datafusion/functions/src/string/uuid.rs index 61948b4bcc39..cedddc5a3ee2 100644 --- a/datafusion/functions/src/string/uuid.rs +++ b/datafusion/functions/src/string/uuid.rs @@ -16,15 +16,12 @@ // under the License. use std::any::Any; -use std::iter; -use std::sync::Arc; -use arrow::array::GenericStringArray; use arrow::datatypes::DataType; use arrow::datatypes::DataType::Utf8; use uuid::Uuid; -use datafusion_common::{exec_err, Result}; +use datafusion_common::{Result, ScalarValue}; use datafusion_expr::{ColumnarValue, Volatility}; use datafusion_expr::{ScalarUDFImpl, Signature}; @@ -60,15 +57,9 @@ impl ScalarUDFImpl for UuidFunc { /// Prints random (v4) uuid values per row /// uuid() = 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11' - fn invoke(&self, args: &[ColumnarValue]) -> Result { - let len = if args.is_empty() { - 1 - } else { - return exec_err!("Expect {} function to take no param", self.name()); - }; - - let values = iter::repeat_with(|| Uuid::new_v4().to_string()).take(len); - let array = GenericStringArray::::from_iter_values(values); - Ok(ColumnarValue::Array(Arc::new(array))) + fn invoke(&self, _args: &[ColumnarValue]) -> Result { + Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some( + Uuid::new_v4().to_string(), + )))) } } diff --git a/datafusion/sqllogictest/test_files/test1.slt b/datafusion/sqllogictest/test_files/test1.slt new file mode 100644 index 000000000000..e69de29bb2d1 From 7b04c0ba6cc582d2aeedd87ad78666aaa638b982 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Tue, 23 Apr 2024 20:38:43 +0800 Subject: [PATCH 05/11] remove supports 0 args in scalarudf Signed-off-by: jayzhan211 --- datafusion/expr/src/type_coercion/functions.rs | 1 + datafusion/physical-expr/src/functions.rs | 1 - datafusion/physical-expr/src/scalar_function.rs | 6 ------ datafusion/physical-expr/src/udf.rs | 1 - datafusion/proto/src/physical_plan/from_proto.rs | 1 - 5 files changed, 1 insertion(+), 9 deletions(-) diff --git a/datafusion/expr/src/type_coercion/functions.rs b/datafusion/expr/src/type_coercion/functions.rs index 37eeb7d464b8..6700faf36566 100644 --- a/datafusion/expr/src/type_coercion/functions.rs +++ b/datafusion/expr/src/type_coercion/functions.rs @@ -52,6 +52,7 @@ pub fn data_types( ); } } + let valid_types = get_valid_types(&signature.type_signature, current_types)?; if valid_types diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index 875fe7ac3be1..b5b68dc2f32f 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -74,7 +74,6 @@ pub fn create_physical_expr( input_phy_exprs.to_vec(), return_type, fun.monotonicity()?, - fun.signature().type_signature.supports_zero_argument(), ))) } diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index 7acdf322c341..b8759d60a866 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -58,8 +58,6 @@ pub struct ScalarFunctionExpr { // and it specifies the effect of an increase or decrease in // the corresponding `arg` to the function value. monotonicity: Option, - // Whether this function can be invoked with zero arguments - supports_zero_argument: bool, } impl Debug for ScalarFunctionExpr { @@ -70,7 +68,6 @@ impl Debug for ScalarFunctionExpr { .field("args", &self.args) .field("return_type", &self.return_type) .field("monotonicity", &self.monotonicity) - .field("supports_zero_argument", &self.supports_zero_argument) .finish() } } @@ -83,7 +80,6 @@ impl ScalarFunctionExpr { args: Vec>, return_type: DataType, monotonicity: Option, - supports_zero_argument: bool, ) -> Self { Self { fun, @@ -91,7 +87,6 @@ impl ScalarFunctionExpr { args, return_type, monotonicity, - supports_zero_argument, } } @@ -173,7 +168,6 @@ impl PhysicalExpr for ScalarFunctionExpr { children, self.return_type().clone(), self.monotonicity.clone(), - self.supports_zero_argument, ))) } diff --git a/datafusion/physical-expr/src/udf.rs b/datafusion/physical-expr/src/udf.rs index 368dfdf92f45..aad78b7c2f90 100644 --- a/datafusion/physical-expr/src/udf.rs +++ b/datafusion/physical-expr/src/udf.rs @@ -57,7 +57,6 @@ pub fn create_physical_expr( input_phy_exprs.to_vec(), return_type, fun.monotonicity()?, - fun.signature().type_signature.supports_zero_argument(), ))) } diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 12a3288a76bb..cec7dc7c7f29 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -360,7 +360,6 @@ pub fn parse_physical_expr( args, convert_required!(e.return_type)?, None, - signature.type_signature.supports_zero_argument(), )) } ExprType::LikeExpr(like_expr) => Arc::new(LikeExpr::new( From 7c10382be60a9177dea9df0421df613cb8379f5d Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Tue, 23 Apr 2024 20:42:44 +0800 Subject: [PATCH 06/11] cleanup Signed-off-by: jayzhan211 --- datafusion/core/src/physical_optimizer/projection_pushdown.rs | 4 ---- .../core/tests/user_defined/user_defined_scalar_functions.rs | 4 +--- datafusion/proto/src/physical_plan/from_proto.rs | 1 - datafusion/proto/tests/cases/roundtrip_physical_plan.rs | 2 -- 4 files changed, 1 insertion(+), 10 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index 337c566e8f33..c71868031b76 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -1402,7 +1402,6 @@ mod tests { ], DataType::Int32, None, - false, )), Arc::new(CaseExpr::try_new( Some(Arc::new(Column::new("d", 2))), @@ -1471,7 +1470,6 @@ mod tests { ], DataType::Int32, None, - false, )), Arc::new(CaseExpr::try_new( Some(Arc::new(Column::new("d", 3))), @@ -1543,7 +1541,6 @@ mod tests { ], DataType::Int32, None, - false, )), Arc::new(CaseExpr::try_new( Some(Arc::new(Column::new("d", 2))), @@ -1612,7 +1609,6 @@ mod tests { ], DataType::Int32, None, - false, )), Arc::new(CaseExpr::try_new( Some(Arc::new(Column::new("d_new", 3))), diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index 32d4539668bb..9b659c0724d9 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -16,9 +16,7 @@ // under the License. use arrow::compute::kernels::numeric::add; -use arrow_array::{ - ArrayRef, Float32Array, Float64Array, Int32Array, RecordBatch -}; +use arrow_array::{ArrayRef, Float32Array, Float64Array, Int32Array, RecordBatch}; use arrow_schema::{DataType, Field, Schema}; use datafusion::execution::context::{FunctionFactory, RegisterFunction, SessionState}; use datafusion::prelude::*; diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index cec7dc7c7f29..e8508fd81476 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -349,7 +349,6 @@ pub fn parse_physical_expr( Some(buf) => codec.try_decode_udf(&e.name, buf)?, None => registry.udf(e.name.as_str())?, }; - let signature = udf.signature(); let scalar_fun_def = ScalarFunctionDefinition::UDF(udf.clone()); let args = parse_physical_exprs(&e.args, registry, input_schema, codec)?; diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 642860d6397b..5e446f93fea7 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -626,7 +626,6 @@ fn roundtrip_scalar_udf() -> Result<()> { vec![col("a", &schema)?], DataType::Int64, None, - false, ); let project = @@ -755,7 +754,6 @@ fn roundtrip_scalar_udf_extension_codec() -> Result<()> { vec![col("text", &schema)?], DataType::Int64, None, - false, )); let filter = Arc::new(FilterExec::try_new( From 864d197cdcc7ed0a37a63f197652f0dd5171cb49 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Tue, 23 Apr 2024 20:48:30 +0800 Subject: [PATCH 07/11] rm test1 Signed-off-by: jayzhan211 --- datafusion/sqllogictest/test_files/test1.slt | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 datafusion/sqllogictest/test_files/test1.slt diff --git a/datafusion/sqllogictest/test_files/test1.slt b/datafusion/sqllogictest/test_files/test1.slt deleted file mode 100644 index e69de29bb2d1..000000000000 From 5b51fb7d7c06cc9dc015d7b5ed80272307e06578 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Wed, 24 Apr 2024 09:29:50 +0800 Subject: [PATCH 08/11] invoke no args and support randomness Signed-off-by: jayzhan211 --- datafusion/expr/src/udf.rs | 34 +++++++++++++++++-- datafusion/functions/src/math/random.rs | 15 +++++--- datafusion/functions/src/string/uuid.rs | 16 ++++++--- .../physical-expr/src/scalar_function.rs | 8 ++++- 4 files changed, 61 insertions(+), 12 deletions(-) diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 6f05ec1a5300..d623d3541fc7 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -23,7 +23,7 @@ use crate::{ ScalarFunctionImplementation, Signature, }; use arrow::datatypes::DataType; -use datafusion_common::{ExprSchema, Result}; +use datafusion_common::{not_impl_err, ExprSchema, Result}; use std::any::Any; use std::fmt; use std::fmt::Debug; @@ -180,6 +180,13 @@ impl ScalarUDF { self.inner.invoke(args) } + /// Invoke the function without `args` but number of rows, returning the appropriate result. + /// + /// See [`ScalarUDFImpl::invoke_no_args`] for more details. + pub fn invoke_no_args(&self, number_rows: usize) -> Result { + self.inner.invoke_no_args(number_rows) + } + /// Returns a `ScalarFunctionImplementation` that can invoke the function /// during execution pub fn fun(&self) -> ScalarFunctionImplementation { @@ -198,6 +205,10 @@ impl ScalarUDF { pub fn short_circuits(&self) -> bool { self.inner.short_circuits() } + + pub fn support_randomness(&self) -> bool { + self.inner.support_randomness() + } } impl From for ScalarUDF @@ -330,7 +341,20 @@ pub trait ScalarUDFImpl: Debug + Send + Sync { /// /// [`ColumnarValue::values_to_arrays`] can be used to convert the arguments /// to arrays, which will likely be simpler code, but be slower. - fn invoke(&self, args: &[ColumnarValue]) -> Result; + fn invoke(&self, _args: &[ColumnarValue]) -> Result { + not_impl_err!( + "Function {} does not implement invoke but called", + self.name() + ) + } + + /// Invoke the function without `args` but number of rows, returning the appropriate result + fn invoke_no_args(&self, _number_rows: usize) -> Result { + not_impl_err!( + "Function {} does not implement invoke_no_args but called", + self.name() + ) + } /// Returns any aliases (alternate names) for this function. /// @@ -383,6 +407,12 @@ pub trait ScalarUDFImpl: Debug + Send + Sync { fn short_circuits(&self) -> bool { false } + + /// Returns true if the function supports randomness, This is useful for functions that need to generate + /// random values for each row. `invoke_no_args` can be called in this case. + fn support_randomness(&self) -> bool { + false + } } /// ScalarUDF that adds an alias to the underlying function. It is better to diff --git a/datafusion/functions/src/math/random.rs b/datafusion/functions/src/math/random.rs index 885952778dea..8a993541eee3 100644 --- a/datafusion/functions/src/math/random.rs +++ b/datafusion/functions/src/math/random.rs @@ -16,12 +16,14 @@ // under the License. use std::any::Any; +use std::sync::Arc; +use arrow::array::Float64Array; use arrow::datatypes::DataType; use arrow::datatypes::DataType::Float64; use rand::{thread_rng, Rng}; -use datafusion_common::{Result, ScalarValue}; +use datafusion_common::Result; use datafusion_expr::ColumnarValue; use datafusion_expr::{ScalarUDFImpl, Signature, Volatility}; @@ -61,9 +63,14 @@ impl ScalarUDFImpl for RandomFunc { Ok(Float64) } - fn invoke(&self, _args: &[ColumnarValue]) -> Result { + fn support_randomness(&self) -> bool { + true + } + + fn invoke_no_args(&self, num_rows: usize) -> Result { let mut rng = thread_rng(); - let val = rng.gen_range(0.0..1.0); - Ok(ColumnarValue::Scalar(ScalarValue::Float64(Some(val)))) + let values = std::iter::repeat_with(|| rng.gen_range(0.0..1.0)).take(num_rows); + let array = Float64Array::from_iter_values(values); + Ok(ColumnarValue::Array(Arc::new(array))) } } diff --git a/datafusion/functions/src/string/uuid.rs b/datafusion/functions/src/string/uuid.rs index cedddc5a3ee2..707ac8130fff 100644 --- a/datafusion/functions/src/string/uuid.rs +++ b/datafusion/functions/src/string/uuid.rs @@ -16,12 +16,14 @@ // under the License. use std::any::Any; +use std::sync::Arc; +use arrow::array::GenericStringArray; use arrow::datatypes::DataType; use arrow::datatypes::DataType::Utf8; use uuid::Uuid; -use datafusion_common::{Result, ScalarValue}; +use datafusion_common::Result; use datafusion_expr::{ColumnarValue, Volatility}; use datafusion_expr::{ScalarUDFImpl, Signature}; @@ -55,11 +57,15 @@ impl ScalarUDFImpl for UuidFunc { Ok(Utf8) } + fn support_randomness(&self) -> bool { + true + } + /// Prints random (v4) uuid values per row /// uuid() = 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11' - fn invoke(&self, _args: &[ColumnarValue]) -> Result { - Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some( - Uuid::new_v4().to_string(), - )))) + fn invoke_no_args(&self, num_rows: usize) -> Result { + let values = std::iter::repeat_with(|| Uuid::new_v4().to_string()).take(num_rows); + let array = GenericStringArray::::from_iter_values(values); + Ok(ColumnarValue::Array(Arc::new(array))) } } diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index b8759d60a866..a16a00103f9d 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -145,7 +145,13 @@ impl PhysicalExpr for ScalarFunctionExpr { // evaluate the function match self.fun { - ScalarFunctionDefinition::UDF(ref fun) => fun.invoke(&inputs), + ScalarFunctionDefinition::UDF(ref fun) => { + if fun.support_randomness() { + fun.invoke_no_args(batch.num_rows()) + } else { + fun.invoke(&inputs) + } + } ScalarFunctionDefinition::Name(_) => { internal_err!( "Name function must be resolved to one of the other variants prior to physical planning" From 88d2a3336cd128363c3b1192d11762e22d07564f Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Fri, 26 Apr 2024 08:03:14 +0800 Subject: [PATCH 09/11] rm randomness Signed-off-by: jayzhan211 --- datafusion/expr/src/udf.rs | 26 +++++++------------ datafusion/functions/src/math/pi.rs | 6 ++++- datafusion/functions/src/math/random.rs | 6 ++--- datafusion/functions/src/string/uuid.rs | 6 ++--- .../physical-expr/src/scalar_function.rs | 2 +- 5 files changed, 21 insertions(+), 25 deletions(-) diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index d623d3541fc7..c9c11a6bbfea 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -205,10 +205,6 @@ impl ScalarUDF { pub fn short_circuits(&self) -> bool { self.inner.short_circuits() } - - pub fn support_randomness(&self) -> bool { - self.inner.support_randomness() - } } impl From for ScalarUDF @@ -333,6 +329,10 @@ pub trait ScalarUDFImpl: Debug + Send + Sync { /// The function will be invoked passed with the slice of [`ColumnarValue`] /// (either scalar or array). /// + /// If the function does not take any arguments, please use [invoke_no_args] + /// instead and return [not_impl_err] for this function. + /// + /// /// # Performance /// /// For the best performance, the implementations of `invoke` should handle @@ -341,14 +341,12 @@ pub trait ScalarUDFImpl: Debug + Send + Sync { /// /// [`ColumnarValue::values_to_arrays`] can be used to convert the arguments /// to arrays, which will likely be simpler code, but be slower. - fn invoke(&self, _args: &[ColumnarValue]) -> Result { - not_impl_err!( - "Function {} does not implement invoke but called", - self.name() - ) - } + /// + /// [invoke_no_args]: ScalarUDFImpl::invoke_no_args + fn invoke(&self, _args: &[ColumnarValue]) -> Result; - /// Invoke the function without `args` but number of rows, returning the appropriate result + /// Invoke the function without `args`, instead the number of rows are provided, + /// returning the appropriate result. fn invoke_no_args(&self, _number_rows: usize) -> Result { not_impl_err!( "Function {} does not implement invoke_no_args but called", @@ -407,12 +405,6 @@ pub trait ScalarUDFImpl: Debug + Send + Sync { fn short_circuits(&self) -> bool { false } - - /// Returns true if the function supports randomness, This is useful for functions that need to generate - /// random values for each row. `invoke_no_args` can be called in this case. - fn support_randomness(&self) -> bool { - false - } } /// ScalarUDF that adds an alias to the underlying function. It is better to diff --git a/datafusion/functions/src/math/pi.rs b/datafusion/functions/src/math/pi.rs index c135a13f1256..f9403e411fe2 100644 --- a/datafusion/functions/src/math/pi.rs +++ b/datafusion/functions/src/math/pi.rs @@ -20,7 +20,7 @@ use std::any::Any; use arrow::datatypes::DataType; use arrow::datatypes::DataType::Float64; -use datafusion_common::{Result, ScalarValue}; +use datafusion_common::{not_impl_err, Result, ScalarValue}; use datafusion_expr::{ColumnarValue, FuncMonotonicity, Volatility}; use datafusion_expr::{ScalarUDFImpl, Signature}; @@ -61,6 +61,10 @@ impl ScalarUDFImpl for PiFunc { } fn invoke(&self, _args: &[ColumnarValue]) -> Result { + not_impl_err!("{} function does not accept arguments", self.name()) + } + + fn invoke_no_args(&self, _number_rows: usize) -> Result { Ok(ColumnarValue::Scalar(ScalarValue::Float64(Some( std::f64::consts::PI, )))) diff --git a/datafusion/functions/src/math/random.rs b/datafusion/functions/src/math/random.rs index 8a993541eee3..b5eece212a3b 100644 --- a/datafusion/functions/src/math/random.rs +++ b/datafusion/functions/src/math/random.rs @@ -23,7 +23,7 @@ use arrow::datatypes::DataType; use arrow::datatypes::DataType::Float64; use rand::{thread_rng, Rng}; -use datafusion_common::Result; +use datafusion_common::{not_impl_err, Result}; use datafusion_expr::ColumnarValue; use datafusion_expr::{ScalarUDFImpl, Signature, Volatility}; @@ -63,8 +63,8 @@ impl ScalarUDFImpl for RandomFunc { Ok(Float64) } - fn support_randomness(&self) -> bool { - true + fn invoke(&self, _args: &[ColumnarValue]) -> Result { + not_impl_err!("{} function does not accept arguments", self.name()) } fn invoke_no_args(&self, num_rows: usize) -> Result { diff --git a/datafusion/functions/src/string/uuid.rs b/datafusion/functions/src/string/uuid.rs index 707ac8130fff..9c97b4dd7413 100644 --- a/datafusion/functions/src/string/uuid.rs +++ b/datafusion/functions/src/string/uuid.rs @@ -23,7 +23,7 @@ use arrow::datatypes::DataType; use arrow::datatypes::DataType::Utf8; use uuid::Uuid; -use datafusion_common::Result; +use datafusion_common::{not_impl_err, Result}; use datafusion_expr::{ColumnarValue, Volatility}; use datafusion_expr::{ScalarUDFImpl, Signature}; @@ -57,8 +57,8 @@ impl ScalarUDFImpl for UuidFunc { Ok(Utf8) } - fn support_randomness(&self) -> bool { - true + fn invoke(&self, _args: &[ColumnarValue]) -> Result { + not_impl_err!("{} function does not accept arguments", self.name()) } /// Prints random (v4) uuid values per row diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index a16a00103f9d..3b360fc20c39 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -146,7 +146,7 @@ impl PhysicalExpr for ScalarFunctionExpr { // evaluate the function match self.fun { ScalarFunctionDefinition::UDF(ref fun) => { - if fun.support_randomness() { + if self.args.is_empty() { fun.invoke_no_args(batch.num_rows()) } else { fun.invoke(&inputs) From 7c8177650ca77c5da4c070d73528461276fa9ec7 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Fri, 26 Apr 2024 08:44:30 +0800 Subject: [PATCH 10/11] add func with no args Signed-off-by: jayzhan211 --- .../user_defined_scalar_functions.rs | 62 +++++++++++++++---- 1 file changed, 49 insertions(+), 13 deletions(-) diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index 9b659c0724d9..eb52b8dd0d47 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -163,6 +163,48 @@ async fn scalar_udf() -> Result<()> { Ok(()) } +struct Simple0ArgsScalarUDF { + name: String, + signature: Signature, + return_type: DataType, +} + +impl std::fmt::Debug for Simple0ArgsScalarUDF { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("ScalarUDF") + .field("name", &self.name) + .field("signature", &self.signature) + .field("fun", &"") + .finish() + } +} + +impl ScalarUDFImpl for Simple0ArgsScalarUDF { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + &self.name + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(self.return_type.clone()) + } + + fn invoke(&self, _args: &[ColumnarValue]) -> Result { + not_impl_err!("{} function does not accept arguments", self.name()) + } + + fn invoke_no_args(&self, _number_rows: usize) -> Result { + Ok(ColumnarValue::Scalar(ScalarValue::Int32(Some(100)))) + } +} + #[tokio::test] async fn scalar_udf_zero_params() -> Result<()> { let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); @@ -174,20 +216,14 @@ async fn scalar_udf_zero_params() -> Result<()> { let ctx = SessionContext::new(); ctx.register_batch("t", batch)?; - // create function just returns 100 regardless of inp - let myfunc = Arc::new(|_args: &[ColumnarValue]| { - Ok(ColumnarValue::Array( - Arc::new((0..1).map(|_| 100).collect::()) as ArrayRef, - )) - }); - ctx.register_udf(create_udf( - "get_100", - vec![], - Arc::new(DataType::Int32), - Volatility::Immutable, - myfunc, - )); + let get_100_udf = Simple0ArgsScalarUDF { + name: "get_100".to_string(), + signature: Signature::exact(vec![], Volatility::Immutable), + return_type: DataType::Int32, + }; + + ctx.register_udf(ScalarUDF::from(get_100_udf)); let result = plan_and_collect(&ctx, "select get_100() a from t").await?; let expected = [ From bd4c65b5a7adcaaa478a2c906248668572ac8bf0 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Fri, 26 Apr 2024 17:28:59 +0800 Subject: [PATCH 11/11] array Signed-off-by: jayzhan211 --- datafusion/functions-array/src/make_array.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/datafusion/functions-array/src/make_array.rs b/datafusion/functions-array/src/make_array.rs index 0439a736ee42..770276938f6b 100644 --- a/datafusion/functions-array/src/make_array.rs +++ b/datafusion/functions-array/src/make_array.rs @@ -104,6 +104,10 @@ impl ScalarUDFImpl for MakeArray { make_scalar_function(make_array_inner)(args) } + fn invoke_no_args(&self, _number_rows: usize) -> Result { + make_scalar_function(make_array_inner)(&[]) + } + fn aliases(&self) -> &[String] { &self.aliases }