-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support ORDER BY in AggregateUDF #9249
Conversation
Thank you @jayzhan211 -- I plan to review this tomorrow |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you so much for working on this @jayzhan211 . I had some comments about the API. Sorry for the delay in reviews
@@ -90,8 +90,15 @@ pub fn create_window_expr( | |||
)) | |||
} | |||
WindowFunctionDefinition::AggregateUDF(fun) => { | |||
let aggregate = | |||
udaf::create_aggregate_expr(fun.as_ref(), args, input_schema, name)?; | |||
// TODO: Ordering not supported for Window UDFs yet |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably file a ticket for this as well -- I can do so as part of this PR's review
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @jayzhan211 -- this pr is looking close. I still think the API needs a little tweaking as I mentioned in the comments
I apologize for the delay in review and feedback and I feel back about the back and forth. I am still catching up from last week. If you don't have a chance I hope to find some time to help push this PR through myself later this week
&self, | ||
_arg: &DataType, | ||
_sort_exprs: Vec<Expr>, | ||
_schmea: Option<Schema>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_schmea: Option<Schema>, | |
_schema: Option<Schema>, |
@@ -255,15 +261,20 @@ pub trait AggregateUDFImpl: Debug + Send + Sync { | |||
|
|||
/// Return a new [`Accumulator`] that aggregates values for a specific | |||
/// group during query execution. | |||
fn accumulator(&self, arg: &DataType) -> Result<Box<dyn Accumulator>>; | |||
fn accumulator( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this API:
- Doesn't need an owned Schema (passing in a
Schema
requires cloning which I think we can avoid - I am not sure why it needs an
Option
(why is it not always passed in) - Should have the documentation updated to explain what sort_exprs are and what schema is
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Option is because Schema is only used for ordering, so if we don't care about ordering we can just pass None.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or maybe we should return the schema in case we need it in the future not only for ordering? 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unless there is some reason not to always pass in schema
I think we should always pass it in to make for an easier to use API. For example, if it is an Option
the user would have to call unwrap()
or check / error when they got sort exprs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @jayzhan211 -- I had a play around with the APIs and the code in this PR and I came up with this: jayzhan211#1
I see the challenge now that the create_accumulator
function is invoked by the physical plan when there are no more Exprs / Schemas.
I think the next thing to do is figure out how ARRAY_AGG or some other built in aggregate function handles ordering and see if we can emulate the same thing
@@ -255,15 +261,20 @@ pub trait AggregateUDFImpl: Debug + Send + Sync { | |||
|
|||
/// Return a new [`Accumulator`] that aggregates values for a specific | |||
/// group during query execution. | |||
fn accumulator(&self, arg: &DataType) -> Result<Box<dyn Accumulator>>; | |||
fn accumulator( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unless there is some reason not to always pass in schema
I think we should always pass it in to make for an easier to use API. For example, if it is an Option
the user would have to call unwrap()
or check / error when they got sort exprs
datafusion/expr/src/udaf.rs
Outdated
/// group during query execution. sort_exprs is a list of ordering expressions, | ||
/// and schema is used while ordering. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/// group during query execution. sort_exprs is a list of ordering expressions, | |
/// and schema is used while ordering. | |
/// group during query execution. | |
/// | |
/// `arg`: the type of the argument to this accumulator | |
/// | |
/// `sort_exprs`: contains a list of `Expr::SortExpr`s if the | |
/// aggregate is called with an explicit `ORDER BY`. For example, | |
/// `ARRAY_AGG(x ORDER BY y ASC)`. In this case, `sort_exprs` would contain `[y ASC]` | |
/// | |
/// `schema` is the input schema to the aggregate |
datafusion/expr/src/udaf.rs
Outdated
@@ -276,6 +288,16 @@ pub trait AggregateUDFImpl: Debug + Send + Sync { | |||
fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> { | |||
not_impl_err!("GroupsAccumulator hasn't been implemented for {self:?} yet") | |||
} | |||
|
|||
/// Return the ordering expressions for the accumulator | |||
fn sort_exprs(&self) -> Vec<Expr> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand how the sort_exprs could be supplied by the UDAF. I would expect them to be provided to the UDAF based on what was in the query
Edit: I think we can just pass the logical ordering expr and not move the conversion downstream. I think the problem here is that we need logical expr (ordering and schema) for accumulator. But the arguments pass into I'm thinking about changing the function here to let us done the converting of logical to physical lately, so we can keep the logical expr for accumulator. Other builtin accumulator don't have the problem because they are all defined in physical-expr unlike udaf. @alamb Does the move of physical expression conversion make senses? |
0dbfbad
to
80eae25
Compare
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
This comment was marked as outdated.
This comment was marked as outdated.
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
data_type, | ||
&ordering_dtypes, | ||
ordering_req, | ||
false, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ignore_nulls
or other arguments are given in the accumulator provided by the user.
@@ -517,7 +517,10 @@ impl AsExecutionPlan for PhysicalPlanNode { | |||
} | |||
AggregateFunction::UserDefinedAggrFunction(udaf_name) => { | |||
let agg_udf = registry.udaf(udaf_name)?; | |||
udaf::create_aggregate_expr(agg_udf.as_ref(), &input_phy_expr, &physical_schema, name) | |||
// TODO: `order by` is not supported for UDAF yet | |||
let sort_exprs = &[]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to convert Vec<PhysicalSortExprNode>
to Vec<Expr>
, not sure if we should include it in this PR or not.
@alamb I think it is ready for review |
Thanks @jayzhan211 -- I will check it out, hopefully tomorrow |
I plan to change |
Which issue does this PR close?
Closes #8984.
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?