Support Serde for ScalarUDF in Physical Expressions #9436

Merged
merged 14 commits into from
Mar 19, 2024

Conversation

yyy1000 (Contributor) commented Mar 3, 2024

Which issue does this PR close?

Closes #9428.

Rationale for this change

See the issue.

What changes are included in this PR?

To do

Are these changes tested?

Are there any user-facing changes?

github-actions bot added the physical-expr (Physical Expressions) label Mar 3, 2024

yyy1000 (Contributor Author) left a comment
Hi @thinkharderdev,
I've started this PR for physical-expr and I have a question.
I'd like to hear your thoughts!

} else if let Some(expr) = expr.downcast_ref::<LikeExpr>() {
} else {
let mut buf = Vec::new();
// let _ = codec.try_encode_udf(, &mut buf);
Contributor Author

I have a question: how can I get a ScalarUDF from a ScalarFunctionExpr?
Given its current structure, I think changing the fields of ScalarFunctionExpr may be one way to do it.

Contributor Author

I also tried using create_udf, but it doesn't seem to work.

Contributor

I'm assuming this is what you mean in your comment below, but I think we need to change

pub struct ScalarFunctionExpr {
    fun: ScalarFunctionImplementation,
    // ...
}

to

pub struct ScalarFunctionExpr {
    fun: ScalarFunctionDefinition,
    // ...
}

@alamb This would technically be a breaking change, since ScalarFunctionExpr has a public method:

pub fn fun(&self) -> &ScalarFunctionImplementation {
    &self.fun
}

We could technically do this in a backwards-compatible way, I think, by creating a corresponding ScalarFunctionImplementation from the ScalarFunctionDefinition, or we could just change the public API. It seems to me this would probably not be a very disruptive breaking change: ScalarFunctionImplementation is just an opaque function pointer, so it is unlikely to be used in application code in a way that couldn't easily be adapted if fun() started returning &ScalarFunctionDefinition instead.

Contributor Author

Thanks for your review, @thinkharderdev!
Yes, that's the change I had in mind. I also think some other fields in ScalarFunctionExpr, such as name and return_type, can be removed.

I think creating a ScalarFunctionImplementation from a ScalarFunctionDefinition is one option. Since BuiltinScalarFunction will be removed, and we can get the inner ScalarUDFImpl of a ScalarUDF, we can do something like:

let captured = inner.clone();
Arc::new(move |args: &[ColumnarValue]| captured.invoke(args))

I'd like to hear your and @alamb's opinions :)
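A minimal sketch of the adapter described above, assuming simplified stand-ins: ColumnarValue, ScalarUDFImpl, ScalarFunctionDefinition and ScalarFunctionImplementation are defined locally with reduced signatures (integer-only values, String errors) rather than imported from DataFusion, the BuiltIn variant is modeled as a bare function pointer, and to_implementation is a hypothetical helper name. It shows how either variant could be turned back into an opaque closure, which is what a backwards-compatible fun() accessor would need.

```rust
use std::sync::Arc;

/// Simplified stand-in for DataFusion's `ColumnarValue` (integers only).
#[derive(Debug, Clone, PartialEq)]
pub enum ColumnarValue {
    Scalar(i64),
    Array(Vec<i64>),
}

/// Stand-in for the `ScalarFunctionImplementation` alias: an opaque,
/// thread-safe function pointer (errors simplified to `String`).
pub type ScalarFunctionImplementation =
    Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue, String> + Send + Sync>;

/// Stand-in for the `ScalarUDFImpl` trait; only `invoke` is modeled.
pub trait ScalarUDFImpl: Send + Sync {
    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue, String>;
}

/// Stand-in for `ScalarFunctionDefinition`; the built-in variant is
/// modeled as a bare function pointer (assumption, for brevity).
pub enum ScalarFunctionDefinition {
    BuiltIn(fn(&[ColumnarValue]) -> Result<ColumnarValue, String>),
    UDF(Arc<dyn ScalarUDFImpl>),
}

/// Hypothetical helper: rebuild an opaque closure from a definition.
pub fn to_implementation(def: &ScalarFunctionDefinition) -> ScalarFunctionImplementation {
    match def {
        ScalarFunctionDefinition::BuiltIn(f) => {
            let f = *f;
            let imp: ScalarFunctionImplementation =
                Arc::new(move |args: &[ColumnarValue]| f(args));
            imp
        }
        ScalarFunctionDefinition::UDF(inner) => {
            // Clone the Arc so the closure owns its own handle to the UDF.
            let captured = Arc::clone(inner);
            let imp: ScalarFunctionImplementation =
                Arc::new(move |args: &[ColumnarValue]| captured.invoke(args));
            imp
        }
    }
}
```

The closure parameter is annotated explicitly so it is generic over the slice lifetime, which the coercion to the dyn Fn alias requires.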

};

 /// Physical expression of a scalar function
 pub struct ScalarFunctionExpr {
-    fun: ScalarFunctionImplementation,
+    fun: ScalarFunctionDefinition,
Contributor Author

I think the fun field here could be a ScalarFunctionDefinition, so that we can get the ScalarUDF from a ScalarFunctionExpr?
I also searched the code base and found that ScalarFunctionImplementation is only used for:

  1. BuiltinScalarFunction
  2. methods like create_udf, which I think will be deprecated in the future given the new ScalarUDFImpl approach.

So I think ScalarFunctionImplementation will also be removed some day. 🤔 WDYT?

Contributor

Yeah, this should be ScalarFunctionDefinition here. I don't think create_udf will be deprecated, but we have ScalarUdfLegacyWrapper, which wraps a ScalarFunctionImplementation and creates a ScalarUDFImpl out of it. So the new ScalarFunctionDefinition would cover anything that is currently represented as a ScalarFunctionImplementation.
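The wrapping direction mentioned here, adapting an existing ScalarFunctionImplementation into a ScalarUDFImpl, can be sketched as follows. This is not the actual ScalarUdfLegacyWrapper: LegacyUdf is a hypothetical stand-in, and the trait and value types are simplified local definitions (only name and invoke, String errors).

```rust
use std::sync::Arc;

/// Simplified stand-in for `ColumnarValue` (integer scalars only).
#[derive(Debug, Clone, PartialEq)]
pub enum ColumnarValue {
    Scalar(i64),
}

/// Stand-in for the opaque `ScalarFunctionImplementation` alias.
pub type ScalarFunctionImplementation =
    Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue, String> + Send + Sync>;

/// Stand-in for `ScalarUDFImpl`; only `name` and `invoke` are modeled.
pub trait ScalarUDFImpl: Send + Sync {
    fn name(&self) -> &str;
    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue, String>;
}

/// Hypothetical wrapper that adapts an opaque function pointer to the trait.
pub struct LegacyUdf {
    name: String,
    fun: ScalarFunctionImplementation,
}

impl LegacyUdf {
    pub fn new(name: impl Into<String>, fun: ScalarFunctionImplementation) -> Self {
        Self { name: name.into(), fun }
    }
}

impl ScalarUDFImpl for LegacyUdf {
    fn name(&self) -> &str {
        &self.name
    }

    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue, String> {
        // Delegate straight to the wrapped closure.
        (self.fun)(args)
    }
}
```

Because the wrapper only delegates, anything built from a plain function pointer could still travel as a UDF definition under this assumption.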

yyy1000 marked this pull request as draft March 3, 2024 03:42
yyy1000 marked this pull request as ready for review March 11, 2024 04:52

yyy1000 (Contributor Author) commented Mar 11, 2024

I think I need some feedback on #9436 (comment) and #9436 (comment) to continue this PR. 🤔

thinkharderdev (Contributor)

I think it is fine to replace ScalarFunctionImplementation with ScalarUDF and end up with something like:

pub struct ScalarFunctionExpr {
    fun: ScalarUDF,
    args: Vec<Arc<dyn PhysicalExpr>>,
    // Whether this function can be invoked with zero arguments
    supports_zero_argument: bool,
}

impl ScalarFunctionExpr {
  // Change this to return &ScalarUDF instead of &ScalarFunctionImplementation
  fn fun(&self) -> &ScalarUDF {
    &self.fun
  ]
}
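With the function stored as a definition, a serializer can get the UDF out of a ScalarFunctionExpr by downcasting and matching. A minimal sketch, assuming local stand-ins for PhysicalExpr (only as_any), ScalarUDF (only a name) and a one-variant BuiltinScalarFunction; udf_of is a hypothetical helper, and the field holds a ScalarFunctionDefinition rather than a ScalarUDF so that built-ins still fit.

```rust
use std::any::Any;
use std::sync::Arc;

/// Stand-in for the physical expression trait (only `as_any` is modeled).
pub trait PhysicalExpr: Send + Sync {
    fn as_any(&self) -> &dyn Any;
}

/// Stand-in built-in enum with a single member.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum BuiltinScalarFunction {
    Abs,
}

/// Stand-in for `ScalarUDF` (only the name is modeled).
#[derive(Debug, PartialEq)]
pub struct ScalarUDF {
    name: String,
}

impl ScalarUDF {
    pub fn new(name: &str) -> Self {
        Self { name: name.to_string() }
    }
    pub fn name(&self) -> &str {
        &self.name
    }
}

pub enum ScalarFunctionDefinition {
    BuiltIn(BuiltinScalarFunction),
    UDF(Arc<ScalarUDF>),
}

pub struct ScalarFunctionExpr {
    fun: ScalarFunctionDefinition,
    args: Vec<Arc<dyn PhysicalExpr>>,
}

impl ScalarFunctionExpr {
    pub fn new(fun: ScalarFunctionDefinition, args: Vec<Arc<dyn PhysicalExpr>>) -> Self {
        Self { fun, args }
    }
    /// The accessor returns the definition instead of a function pointer.
    pub fn fun(&self) -> &ScalarFunctionDefinition {
        &self.fun
    }
    pub fn args(&self) -> &[Arc<dyn PhysicalExpr>] {
        &self.args
    }
}

impl PhysicalExpr for ScalarFunctionExpr {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Hypothetical helper: the UDF behind `expr`, if it is a UDF call.
pub fn udf_of(expr: &dyn PhysicalExpr) -> Option<&ScalarUDF> {
    let func = expr.as_any().downcast_ref::<ScalarFunctionExpr>()?;
    match func.fun() {
        ScalarFunctionDefinition::UDF(udf) => Some(udf.as_ref()),
        ScalarFunctionDefinition::BuiltIn(_) => None,
    }
}
```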

yyy1000 (Contributor Author) commented Mar 11, 2024

One question: there are still BuiltinScalarFunctions, so should the field be a ScalarFunctionDefinition instead of a ScalarUDF?
After the porting is finished, we can switch to ScalarUDF.

thinkharderdev (Contributor)

Yeah, I think that's correct.

yyy1000 marked this pull request as draft March 12, 2024 17:33

yyy1000 (Contributor Author) commented Mar 12, 2024

I'm waiting for #9537 to be merged.
It reduces some of the effort caused by ExecutionProps when creating a physical scalar function from a built-in function.

github-actions bot added the core (Core DataFusion crate) label Mar 13, 2024
yyy1000 marked this pull request as ready for review March 13, 2024 21:03

yyy1000 (Contributor Author) left a comment

Hi @thinkharderdev,
I've finished the Serde support for PhysicalExpr, though with a bit of delay.
Could you review it when you are available?
Thanks!

-    Arc::new(|_: &[ColumnarValue]| unimplemented!("not implemented")),
+    ScalarFunctionDefinition::UDF(Arc::new(ScalarUDF::new_from_impl(
+        DummyUDF::new(),
+    ))),
Contributor Author

Since ScalarFunctionExpr now needs a ScalarFunctionDefinition here, I created one for each case. I'm not sure whether there's a better way to do it.

@@ -57,7 +58,7 @@ pub fn create_physical_expr(
     fun: &BuiltinScalarFunction,
     input_phy_exprs: &[Arc<dyn PhysicalExpr>],
     input_schema: &Schema,
-    execution_props: &ExecutionProps,
+    _execution_props: &ExecutionProps,
Contributor Author

The ExecutionProps parameter can be removed here, but I'd like to do that in a follow-up PR, because doing everything in one PR would make errors harder for me to track down.

Contributor

weird, seems like it never needed this param

Contributor Author

Yes, it doesn't need this parameter now, but previously BuiltinScalarFunction needed it to create the physical function for some date functions like to_date. Since those functions have been ported, it no longer needs this parameter.

let monotonicity = fun.monotonicity();

let fun_def = ScalarFunctionDefinition::BuiltIn(*fun);
Contributor Author

Here I just pass fun_def; create_physical_fun will be called when the PhysicalExpr is evaluated.

@@ -171,8 +171,17 @@ impl PhysicalExpr for ScalarFunctionExpr {
.collect::<Result<Vec<_>>>()?,
};

let fun_implementation = match self.fun {
ScalarFunctionDefinition::BuiltIn(ref fun) => create_physical_fun(fun)?,
Contributor Author

Here it calls create_physical_fun to get the ScalarFunctionImplementation.
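A minimal sketch of resolving the implementation lazily at evaluation time, mirroring the match in the hunk above. The types are simplified local stand-ins (integer-only ColumnarValue, a one-variant BuiltinScalarFunction, a create_physical_fun that only knows Abs), evaluate is a hypothetical free function standing in for PhysicalExpr::evaluate, and invoking the UDF directly in the other match arm is an assumption, since the excerpt does not show that arm.

```rust
use std::sync::Arc;

/// Simplified stand-in for `ColumnarValue` (integer scalars only).
#[derive(Debug, Clone, PartialEq)]
pub enum ColumnarValue {
    Scalar(i64),
}

/// Stand-in for the opaque `ScalarFunctionImplementation` alias.
pub type ScalarFunctionImplementation =
    Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue, String> + Send + Sync>;

/// Stand-in built-in enum with a single member.
#[derive(Debug, Clone, Copy)]
pub enum BuiltinScalarFunction {
    Abs,
}

/// Stand-in for `ScalarUDFImpl`; only `invoke` is modeled.
pub trait ScalarUDFImpl: Send + Sync {
    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue, String>;
}

pub enum ScalarFunctionDefinition {
    BuiltIn(BuiltinScalarFunction),
    UDF(Arc<dyn ScalarUDFImpl>),
}

/// Stand-in for `create_physical_fun`: maps a built-in to its kernel.
pub fn create_physical_fun(
    fun: &BuiltinScalarFunction,
) -> Result<ScalarFunctionImplementation, String> {
    match fun {
        BuiltinScalarFunction::Abs => {
            let imp: ScalarFunctionImplementation =
                Arc::new(|args: &[ColumnarValue]| match args {
                    [ColumnarValue::Scalar(v)] => Ok(ColumnarValue::Scalar(v.abs())),
                    _ => Err("abs expects exactly one argument".to_string()),
                });
            Ok(imp)
        }
    }
}

/// Hypothetical stand-in for `evaluate`: the plan stores only the
/// definition, and the implementation is resolved when the expression runs.
pub fn evaluate(
    def: &ScalarFunctionDefinition,
    args: &[ColumnarValue],
) -> Result<ColumnarValue, String> {
    let fun_implementation = match def {
        ScalarFunctionDefinition::BuiltIn(fun) => create_physical_fun(fun)?,
        // Assumed: a UDF needs no function pointer and is invoked directly.
        ScalarFunctionDefinition::UDF(udf) => return udf.invoke(args),
    };
    (fun_implementation)(args)
}
```

The trade-off is a kernel lookup per evaluation in exchange for a plan node that carries only serializable data.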

-let expr = parse_physical_expr(expr.as_ref(), registry, input_schema)?;
+let codec = DefaultPhysicalExtensionCodec {};
+let expr =
+    parse_physical_expr(expr.as_ref(), registry, input_schema, &codec)?;
Contributor Author

I created DefaultPhysicalExtensionCodec for cases that don't need an extension codec.

let udf = match &e.fun_definition {
    Some(buf) => codec.try_decode_udf(&e.name, buf)?,
    None => registry.udf(e.name.as_str())?,
};
Contributor Author

This is the decoding logic.
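The decoding rule above (use the codec when a payload is present, otherwise look the UDF up by name) can be sketched with local stand-ins. PhysicalScalarUdfNode, the HashMap-based Registry and decode_udf are hypothetical simplifications, and the default try_decode_udf returning an error is modeled on, not copied from, the real PhysicalExtensionCodec trait.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Stand-in for `ScalarUDF` (only the name is modeled).
#[derive(Debug, PartialEq)]
pub struct ScalarUDF {
    pub name: String,
}

/// Stand-in for the serialized node: a name plus an optional codec payload.
pub struct PhysicalScalarUdfNode {
    pub name: String,
    pub fun_definition: Option<Vec<u8>>,
}

/// Stand-in codec trait; by default custom payloads cannot be decoded.
pub trait PhysicalExtensionCodec {
    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>, String> {
        Err(format!("PhysicalExtensionCodec is not provided for scalar function {name}"))
    }
}

/// A codec for call sites that never carry custom payloads.
pub struct DefaultPhysicalExtensionCodec;
impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {}

/// Stand-in for a function registry keyed by name.
pub type Registry = HashMap<String, Arc<ScalarUDF>>;

/// Hypothetical decoder following the rule in the hunk above.
pub fn decode_udf(
    node: &PhysicalScalarUdfNode,
    registry: &Registry,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<ScalarUDF>, String> {
    match &node.fun_definition {
        // A payload means the UDF was encoded by a custom codec.
        Some(buf) => codec.try_decode_udf(&node.name, buf),
        // Otherwise the UDF must already be registered under its name.
        None => registry
            .get(&node.name)
            .cloned()
            .ok_or_else(|| format!("UDF {} not found in registry", node.name)),
    }
}
```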

value: Arc<dyn PhysicalExpr>,
codec: &dyn PhysicalExtensionCodec,
) -> Result<protobuf::PhysicalExprNode, DataFusionError> {
let expr = value.as_any();
Contributor Author

I applied the same approach used for LogicalExpr serialization here.
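A minimal sketch of the encode side and a round trip, assuming it follows the same pattern as the logical-plan serde: the codec writes into a buffer, and an empty buffer means the UDF is resolved by name when decoding. ScalarUDF here is a stand-in carrying one piece of configuration (exponent), and ExponentCodec and encode_udf are hypothetical names.

```rust
use std::sync::Arc;

/// Stand-in UDF: a name plus configuration that the name alone
/// cannot recover (hypothetical `exponent`).
#[derive(Debug, Clone, PartialEq)]
pub struct ScalarUDF {
    pub name: String,
    pub exponent: u32,
}

/// Stand-in for the serialized node.
#[derive(Debug, PartialEq)]
pub struct PhysicalScalarUdfNode {
    pub name: String,
    pub fun_definition: Option<Vec<u8>>,
}

/// Stand-in codec trait with default methods.
pub trait PhysicalExtensionCodec {
    /// Default: write nothing, so the UDF is resolved by name on decode.
    fn try_encode_udf(&self, _udf: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<(), String> {
        Ok(())
    }
    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>, String> {
        Err(format!("no codec for scalar function {name}"))
    }
}

/// Hypothetical encoder: only a non-empty buffer is stored.
pub fn encode_udf(
    udf: &ScalarUDF,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<PhysicalScalarUdfNode, String> {
    let mut buf = Vec::new();
    codec.try_encode_udf(udf, &mut buf)?;
    // An empty buffer means "nothing custom to store": leave the field unset.
    let fun_definition = if buf.is_empty() { None } else { Some(buf) };
    Ok(PhysicalScalarUdfNode { name: udf.name.clone(), fun_definition })
}

/// Hypothetical codec storing `exponent` as 4 little-endian bytes.
pub struct ExponentCodec;

impl PhysicalExtensionCodec for ExponentCodec {
    fn try_encode_udf(&self, udf: &ScalarUDF, buf: &mut Vec<u8>) -> Result<(), String> {
        buf.extend_from_slice(&udf.exponent.to_le_bytes());
        Ok(())
    }

    fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>, String> {
        if buf.len() != 4 {
            return Err(format!("unexpected payload length for {name}"));
        }
        let exponent = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]);
        Ok(Arc::new(ScalarUDF { name: name.to_string(), exponent }))
    }
}
```

Keeping an empty buffer as None means plans that only use registered UDFs would serialize exactly as before, under this assumption.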

@@ -665,6 +670,133 @@ fn roundtrip_scalar_udf() -> Result<()> {
roundtrip_test_with_context(Arc::new(project), ctx)
}

#[test]
Contributor Author

Here is the test case, mostly copied from the LogicalExpr Serde test.

yyy1000 (Contributor Author) commented Mar 13, 2024

I believe the CI failure is caused by a system glitch, so this PR is still ready for review.

yyy1000 requested a review from thinkharderdev March 19, 2024 00:35

thinkharderdev (Contributor) left a comment

Nice work! I had a few nitpicks, but otherwise I think this is ready.


datafusion/physical-expr/src/scalar_function.rs (outdated, resolved)
datafusion/proto/src/physical_plan/to_proto.rs (outdated, resolved)
datafusion/proto/tests/cases/roundtrip_physical_plan.rs (outdated, resolved)
thinkharderdev (Contributor)

Yep, I re-ran the job and it worked

yyy1000 and others added 2 commits March 19, 2024 10:03
Co-authored-by: Dan Harris <1327726+thinkharderdev@users.noreply.github.com>
Co-authored-by: Dan Harris <1327726+thinkharderdev@users.noreply.github.com>
yyy1000 (Contributor Author) commented Mar 19, 2024

Thanks for your review, @thinkharderdev!
I've addressed the comments, and I think it's good to go now :)

thinkharderdev (Contributor)

Thank you for your work on this!

thinkharderdev merged commit 0974759 into apache:main Mar 19, 2024
23 checks passed
yyy1000 deleted the serde-phsyical branch March 19, 2024 18:03
Labels: core (Core DataFusion crate), physical-expr (Physical Expressions)

Successfully merging this pull request may close these issues:
Support Serde for ScalarUDF in Physical Expressions

2 participants