-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make FirstValue an UDAF, Change AggregateUDFImpl::accumulator
signature, support ORDER BY for UDAFs
#9874
Conversation
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
AggregateUDFImpl::accumulator
signature
AggregateUDFImpl::accumulator
signatureAggregateUDFImpl::accumulator
signature, support ORDER BY for UDAFs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❤️ -- looking very good @jayzhan211
} | ||
|
||
fn state_type(&self, _return_type: &DataType) -> Result<Vec<DataType>> { | ||
Ok(self.state_type.clone()) | ||
} | ||
} | ||
|
||
pub struct FirstValue { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see -- it seems like the issue is that the accumulator implementation requires PhysicalSortExpr.
To pull the code into its own crate maybe we could pull out the relevant pieces of datafusion-physical-expr
into datafusion-physical-core
or something (as a follow on PR)
@@ -710,6 +712,16 @@ pub fn create_udaf( | |||
)) | |||
} | |||
|
|||
/// Creates a new UDAF with a specific signature, state type and return type. | |||
/// The signature and state type must match the `Accumulator's implementation`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might make sense to add a comment explaining this is a temporary solution (i.e. that the idea is we'll pull the function out into its own crate, but for now we need to keep the physical implementation separate
datafusion/expr/src/expr_fn.rs
Outdated
f.debug_struct("FirstValue") | ||
.field("name", &self.name) | ||
.field("signature", &self.signature) | ||
.field("fun", &"<FUNC>") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.field("fun", &"<FUNC>") | |
.field("accumulator", &"<FUNC>") |
datafusion/expr/src/function.rs
Outdated
pub data_type: &'a DataType, // the return type of the function | ||
pub schema: &'a Schema, // the schema of the input arguments | ||
pub ignore_nulls: bool, // whether to ignore nulls |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since these field are pub
I think they show up in the rustdocs. Can you please make them with three ///
and above the field in question to show up in the docs?
pub schema: &'a Schema, // the schema of the input arguments | ||
pub ignore_nulls: bool, // whether to ignore nulls | ||
|
||
// ordering arguments |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you also please document how we would tell if there was no ORDER BY
specified? (is sort_exprs
empty?)
datafusion/expr/src/function.rs
Outdated
pub ignore_nulls: bool, // whether to ignore nulls | ||
|
||
// ordering arguments | ||
pub sort_exprs: &'a [Expr], // the expressions of `order by` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you made this Option<&'a [Expr]>
that would probably be easier for UDF implementors to check if ORDER BY was specified
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can have an empty vec to describe no ORDER BY
is given.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another approach is always to set ordering with another function with_ordering(sort_exprs: &'a [Expr])
.
datafusion/expr/src/udaf.rs
Outdated
/// | ||
/// `arg`: the type of the argument to this accumulator | ||
/// | ||
/// `sort_exprs`: contains a list of `Expr::SortExpr`s if the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think these comments are now out of date
datafusion/expr/src/udaf.rs
Outdated
&self, | ||
_name: &str, | ||
_value_type: DataType, | ||
_ordering_fields: Vec<Field>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What are ordering_fields
used for? I think they should be documented
datafusion/expr/src/udaf.rs
Outdated
|
||
/// Return the type used to serialize the [`Accumulator`]'s intermediate state. | ||
/// See [`Accumulator::state()`] for more details | ||
fn state_type(&self, return_type: &DataType) -> Result<Vec<DataType>>; | ||
|
||
/// Return the fields of the intermediate state. It is mutually exclusive with [`Self::state_type`]. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we are going to change the AggregateUDFImpl anyways, maybe we should just remove state_type
and always require state_fields
?
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
let accumulator: AccumulatorFactoryFunction = | ||
Arc::new(|_| Ok(Box::<AvgAccumulator>::default())); | ||
let my_avg = AggregateUDF::from(SimpleAggregateUDF::new_with_signature( | ||
"MY_AVG", | ||
Signature::uniform(1, vec![DataType::Float64], Volatility::Immutable), | ||
return_type, | ||
accumulator, | ||
state_type, | ||
vec![ | ||
Field::new("count", DataType::UInt64, true), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I actually think this is a nice change that the fields are now named rather than anonymous
I think @jacksonrnewhouse mentioned this might be helpful for his project as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @jayzhan211 -- I think this looks good to me
What are you thinking of next? To try and pull first_value into its own crate (to prove it is separable)?
Yes, I want to move it to its own crate and be able to register like scalar functions. And, move PhysicalSorting to |
@@ -85,13 +87,21 @@ impl AggregateUDFImpl for GeoMeanUdaf { | |||
/// is supported, DataFusion will use this row oriented | |||
/// accumulator when the aggregate function is used as a window function | |||
/// or when there are only aggregates (no GROUP BY columns) in the plan. | |||
fn accumulator(&self, _arg: &DataType) -> Result<Box<dyn Accumulator>> { | |||
fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking about the impact on this API for UDAF writers last night.
Specifically, about the many existing UDAFs that exist / will exist at the time this change gets released and on the first time people encounter / try to use this API. i think the args with datatypes is much easier to use (and has less mental gymnastics to use). Thus I am going to propose an easier / beginner API for this that will require fewer changes to existing UDAFs and will be easier to use for first timers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is what i came up with: #9920
Which issue does this PR close?
First step of #8708
Close #9249
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?