-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support compute return types from argument values (not just their DataTypes) #8985
Conversation
Thanks @yyy1000 👍 , could you check the output schema in your test like main...JasonLi-cn:arrow-datafusion:feature/refactor_udf |
@Weijun-H Thanks for your review! |
datafusion/expr/src/udf.rs
Outdated
fn return_type_from_exprs( | ||
&self, | ||
arg_exprs: &[Expr], | ||
schema: &DFSchema, | ||
) -> Result<DataType> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fn return_type_from_exprs( | |
&self, | |
arg_exprs: &[Expr], | |
schema: &DFSchema, | |
) -> Result<DataType> { | |
fn return_type_from_exprs<S: ExprSchema>( | |
&self, | |
arg_exprs: &[Expr], | |
schema: S, | |
) -> Result<DataType> { |
@Weijun-H If I'd like to make a change like this, there's an error 🥲
for a trait to be "object safe" it needs to allow building a vtable to allow the call to be resolvable dynamically;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am unfamiliar with this part. It seems you need to extend ScalarUDFImpl
to allow ExprSchema
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe here changing return_type to return_type_from_exprs would be a good idea. But a problem
is here the schema has type S: ExprSchema while return_type_from_exprs can't accept a generic type parameter S because it's in a trait. 🤔 What do you think?
Maybe you can use a trait object like &dyn ExprSchema
🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alamb Sorry for the delay.
I think the only problem that blocks me is the argument type of the function. I had some issue when passing schema
as a param.
I can also search for some ways to do it.🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool -- thanks -- I'll try and find some time to play around with it but it may be a while
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@yyy1000 the following changes worked for me. If anyone else have a better solution please suggest it. I am still a newbie in rust but trying to help to learn.
expr_schema.rs
return_types_udf.rs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Those look like great changes to me @brayanjuls -- 👌
What do you think @yyy1000 ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for help! @alamb @brayanjuls
However, I have tried this method before and it will not work.
I will show you the reason on my side. :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
&self.signature | ||
} | ||
fn return_type(&self, _: &[DataType]) -> Result<DataType> { | ||
Ok(DataType::Int32) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is pretty confusing I think -- as it seems inconsistent with the return_type_from_exprs
} | ||
|
||
#[tokio::main] | ||
async fn main() -> Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this example is missing actually using the function in a query / dataframe. As @Weijun-H pointed out the logic added to ScalarUDFImpl
doesn't seem to be connected anywhere else 🤔
What I think the example needs to do is someething like
- Create a ScalarUDF
- register the function with a
SessionContext
- Run a query that uses that function (ideally both with SQL and dataframe APIs)
So for example, a good example function might be a function that takes a string argument select my_cast(<arg>, 'string')
that converts the argument based on the value of the string
Then for example run queries like
select my_cast(c1, 'i32'), arrow_typeof(my_cast(c1, 'i32')); -- returns value and DataType::Int32
select my_cast(c1, 'i64'), arrow_typeof(my_cast(c1, 'i64')); -- returns value and DataType::Int64
Does that make sense?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I realized that it missing using the function.
I tried to make changes in https://github.com/apache/arrow-datafusion/blob/4d02cc0114908d4f805b2323f20751b1f6d9c2f4/datafusion/expr/src/expr_schema.rs#L106-L108
to replace return_type
with return_type_from_exprs
. But passing schema
as a param is a problem cause the type of schema
is a generic type S
. 🤔
What is the status of this PR -- are you waiting for more feedback or help @yyy1000 ? |
datafusion/expr/src/expr_schema.rs
Outdated
@@ -136,7 +136,7 @@ impl ExprSchemable for Expr { | |||
fun.return_type(&arg_data_types) | |||
} | |||
ScalarFunctionDefinition::UDF(fun) => { | |||
Ok(fun.return_type(&arg_data_types)?) | |||
fun.return_type_from_exprs(args, schema) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alamb @brayanjuls . It's here that it gets the return type of ScalarUDF
, and for UDF, it should be changed to this.
However, this can't work because schema
needs to be Sized
. Here is the error from my complier.
@@ -249,6 +261,22 @@ pub trait ScalarUDFImpl: Debug + Send + Sync { | |||
/// the arguments | |||
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType>; | |||
|
|||
/// What [`DataType`] will be returned by this function, given the types of | |||
/// the expr arguments | |||
fn return_type_from_exprs( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we need to use the ExprSchema
here, but there's issues going from <S: ExprSchema>
to &dyn ExprSchema
inside the trait, what if we moved up the default implementation out of the trait and into ScalarUDF
?
we could change the trait impl to something like this
pub trait ScalarUDFImpl: Debug + Send + Sync {
/// What [`DataType`] will be returned by this function, given the types of
/// the expr arguments
fn return_type_from_exprs(
&self,
arg_exprs: &[Expr],
schema: &dyn ExprSchema,
) -> Option<Result<DataType>> {
// The default implementation returns None
// so that people don't have to implement `return_type_from_exprs` if they dont want to
None
}
}
then change the ScalarUDF
impl to this
impl ScalarUDF
/// The datatype this function returns given the input argument input types.
/// This function is used when the input arguments are [`Expr`]s.
/// See [`ScalarUDFImpl::return_type_from_exprs`] for more details.
pub fn return_type_from_exprs<S: ExprSchema>(
&self,
args: &[Expr],
schema: &S,
) -> Result<DataType> {
// If the implementation provides a return_type_from_exprs, use it
if let Some(return_type) = self.inner.return_type_from_exprs(args, schema) {
return_type
// Otherwise, use the return_type function
} else {
let arg_types = args
.iter()
.map(|arg| arg.get_type(schema))
.collect::<Result<Vec<_>>>()?;
self.return_type(&arg_types)
}
}
}
this way we don't need to constrain the ExprSchemable
functions to ?Sized
, and we can update the get_type
function to use the return_type_from_exprs
without any compile time errors.
ScalarFunctionDefinition::UDF(fun) => {
Ok(fun.return_type_from_exprs(&args, schema)?)
}
and it still makes return_type_from_exprs
an opt-in method.
It does make it very slightly less ergonomic as end users now need to wrap their body in an Option
, but overall i think it's a decent compromise.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This works well on my side. Thanks!
Another question for me is, how can user implement return_type_from_exprs
when using schema
. For example, arg_exprs.get(take_idx).unwrap().get_type(schema)
will lead an error
the size for values of type
dyn ExprSchema
cannot be known at compilation time
the traitSized
is not implemented fordyn ExprSchema
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm yeah @yyy1000 you still run into the same error then.
I'm wondering if it'd be easiest to just change the type signature on ExprSchemable
to be generic over the trait instead of the functions.
pub trait ExprSchemable<S: ExprSchema> {
/// given a schema, return the type of the expr
fn get_type(&self, schema: &S) -> Result<DataType>;
/// given a schema, return the nullability of the expr
fn nullable(&self, input_schema: &S) -> Result<bool>;
/// given a schema, return the expr's optional metadata
fn metadata(&self, schema: &S) -> Result<HashMap<String, String>>;
/// convert to a field with respect to a schema
fn to_field(&self, input_schema: &DFSchema) -> Result<DFField>;
/// cast to a type with respect to a schema
fn cast_to(self, cast_to_type: &DataType, schema: &S) -> Result<Expr>;
}
impl ExprSchemable<DFSchema> for Expr {
//...
}
then the trait can just go back to the original implementation you had using &DFSchema
fn return_type_from_exprs(
&self,
arg_exprs: &[Expr],
schema: &DFSchema,
) -> Result<DataType> {
let arg_types = arg_exprs
.iter()
.map(|e| e.get_type(schema))
.collect::<Result<Vec<_>>>()?;
self.return_type(&arg_types)
}
I tried this locally and was able to get things to compile locally, and was able to implement a udf using the trait.
It does make it a little less flexible as it's expecting a DFSchema
, but i think thats ok?
I think the only other approach would be to make ScalarUDFImpl
dynamic over <S: ExprSchema>
, but I feel like that's much less ideal than just using a concrete type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your help! @universalmind303
It looks good and I think I can try it to see. :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update is I changed the signature to take &dyn ExprSchema
which seems to have worked just fine
datafusion/physical-expr/src/udf.rs
Outdated
@@ -38,7 +38,7 @@ pub fn create_physical_expr( | |||
fun.name(), | |||
fun.fun(), | |||
input_phy_exprs.to_vec(), | |||
fun.return_type(&input_exprs_types)?, | |||
fun.return_type_from_exprs(&input_phy_exprs, input_schema)?, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @universalmind303 's method helped a lot and it's getting closer!
I think maybe the last issue is here how can we convert input_phy_exprs
of type &[Arc] to &[Expr]. 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It isn't possible in general to go from physical expressions back to Exprs.
I recommend adding a return_type
argument to this function and thus require the caller to pass it in.
I believe the caller of udf::create_physical_expr
has this information:
/// Create a physical expression of the UDF.
///
/// Note that the return type must match the type reported by fun
pub fn create_physical_expr(
fun: &ScalarUDF,
input_phy_exprs: &[Arc<dyn PhysicalExpr>],
return_type: DataType,
) -> Result<Arc<dyn PhysicalExpr>> {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for this PR @yyy1000 and for the suggestions @universalmind303 and @Weijun-H
I think this is a pretty important strategic feature, so I want to help. Here is a PR with changes to this one that I think solves the last little bit: yyy1000#1
Let me know what you think
datafusion/physical-expr/src/udf.rs
Outdated
@@ -38,7 +38,7 @@ pub fn create_physical_expr( | |||
fun.name(), | |||
fun.fun(), | |||
input_phy_exprs.to_vec(), | |||
fun.return_type(&input_exprs_types)?, | |||
fun.return_type_from_exprs(&input_phy_exprs, input_schema)?, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It isn't possible in general to go from physical expressions back to Exprs.
I recommend adding a return_type
argument to this function and thus require the caller to pass it in.
I believe the caller of udf::create_physical_expr
has this information:
/// Create a physical expression of the UDF.
///
/// Note that the return type must match the type reported by fun
pub fn create_physical_expr(
fun: &ScalarUDF,
input_phy_exprs: &[Arc<dyn PhysicalExpr>],
return_type: DataType,
) -> Result<Arc<dyn PhysicalExpr>> {
* Improve return_type_from_args * Rework example * Update datafusion/core/tests/user_defined/user_defined_scalar_functions.rs --------- Co-authored-by: Junhao Liu <junhaoliu2023@gmail.com>
Co-authored-by: Alex Huang <huangweijun1001@gmail.com>
I am going to try and fix the CI checks here |
datafusion/expr/src/expr_schema.rs
Outdated
} | ||
|
||
impl ExprSchemable for Expr { | ||
impl ExprSchemable<DFSchema> for Expr { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this means the only way to use Expr::get_type is now if you have a DFSchema with requires bringing in the entire core crate
I think the reason this trait was the way it was orginally was to allow things like dask-sql to use just the logical expr parts.
I am looking into this more carefully
@@ -494,6 +495,127 @@ async fn test_user_defined_functions_zero_argument() -> Result<()> { | |||
Ok(()) | |||
} | |||
|
|||
#[derive(Debug)] | |||
struct TakeUDF { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is an example of the feature working
@@ -37,19 +37,19 @@ use std::sync::Arc; | |||
/// trait to allow expr to typable with respect to a schema | |||
pub trait ExprSchemable { | |||
/// given a schema, return the type of the expr | |||
fn get_type<S: ExprSchema>(&self, schema: &S) -> Result<DataType>; | |||
fn get_type(&self, schema: &dyn ExprSchema) -> Result<DataType>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had to change the traits to use dyn
dispatch rather than generics so that the UDF could use the same object (and e.g. not have to create its own copy of these methods for Expr)
I expect this to have 0 performance impact, but I will run the planning benchmarks to be sure if this acceptable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I ran
cargo bench --bench sql_planner
And the results looked good ( within the noise threshold / reported 1% slower which I don't attribute to this change)
@@ -249,6 +261,22 @@ pub trait ScalarUDFImpl: Debug + Send + Sync { | |||
/// the arguments | |||
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType>; | |||
|
|||
/// What [`DataType`] will be returned by this function, given the types of | |||
/// the expr arguments | |||
fn return_type_from_exprs( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update is I changed the signature to take &dyn ExprSchema
which seems to have worked just fine
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this PR is looking ready to go. However, given I contributed a bunch of code I think it would be good for another committer to give it a review prior to merging.
Thanks again @yyy1000 @Weijun-H and @universalmind303
@Jefffrey do you possibly have time to review this PR? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems reasonable to me 👍
Just some minor doc comments
/// What [`DataType`] will be returned by this function, given the | ||
/// arguments? | ||
/// | ||
/// Note most UDFs should implement [`Self::return_type`] and not this | ||
/// function. The output type for most functions only depends on the types | ||
/// of their inputs (e.g. `sqrt(f32)` is always `f32`). | ||
/// | ||
/// By default, this function calls [`Self::return_type`] with the | ||
/// types of each argument. | ||
/// | ||
/// This method can be overridden for functions that return different | ||
/// *types* based on the *values* of their arguments. | ||
/// | ||
/// For example, the following two function calls get the same argument | ||
/// types (something and a `Utf8` string) but return different types based | ||
/// on the value of the second argument: | ||
/// | ||
/// * `arrow_cast(x, 'Int16')` --> `Int16` | ||
/// * `arrow_cast(x, 'Float32')` --> `Float32` | ||
/// | ||
/// # Notes: | ||
/// | ||
/// This function must consistently return the same type for the same | ||
/// logical input even if the input is simplified (e.g. it must return the same | ||
/// value for `('foo' | 'bar')` as it does for ('foobar'). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe add some documentation about what would happen if a user tries to implement both return_type()
and return_type_from_exprs()
? (Which takes priority, etc.)
And what the suggested implementation for return_type()
be if they choose to implement return_type_from_exprs()
instead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Excellent idea - added in 653577f
datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
Outdated
Show resolved
Hide resolved
datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
Outdated
Show resolved
Hide resolved
datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
Outdated
Show resolved
Hide resolved
Co-authored-by: Jeffrey Vo <jeffrey.vo.australia@gmail.com>
…type_from_exprs` are called
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @yyy1000 and @universalmind303 @Weijun-H , @Dandandan and @Jefffrey for the great reviews.
I am just running a quick sql planning benchmark and then I plan to merge this PR in
cc @brayanjuls
I don't actually think this is an API change so removing the label |
Which issue does this PR close?
Closes #8624
Rationale for this change
This PR will extend ScalarFunctionImpl, so that when users want to create a
ScalarUDF
by implementingScalarUDFImpl
, they can implement functionreturn_type_from_exprs
intrait ScalarUDFImpl
. It also provides a default implementation that wil callreturn_type
.What changes are included in this PR?
ScalarUDFImpl
by adding a newfn
and default implementationAre these changes tested?
Yes, by adding an example and I tested it.