-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make FirstValue an UDAF, Change AggregateUDFImpl::accumulator
signature, support ORDER BY for UDAFs
#9874
Make FirstValue an UDAF, Change AggregateUDFImpl::accumulator
signature, support ORDER BY for UDAFs
#9874
Changes from 45 commits
b94f70f
c743d13
3a7e965
4917f56
c9e8641
3a5f0d1
a3ea00a
f349f21
6fcdaac
c3512a6
092d46e
8592e6b
0f8fc24
3185f9f
3ecc772
faadc63
7e33910
cfffcbf
6aaa15c
b74b7d2
263e6cb
0a77e4f
7b26377
4bfd91d
7f54141
6339535
33ae6ee
b4eb865
d8ab6c5
a3bff42
53465fd
cc21496
b62544f
ddfabad
4b809b0
2534727
17378dd
dd1c4ba
5d5d310
23f20f9
b2ba8c3
75aa2fe
5b9625f
d5c3f6f
dc9549a
7ce3d41
49b4a76
d70cce5
29c4018
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -69,11 +69,14 @@ use datafusion_common::{ | |
OwnedTableReference, SchemaReference, | ||
}; | ||
use datafusion_execution::registry::SerializerRegistry; | ||
use datafusion_expr::type_coercion::aggregates::NUMERICS; | ||
use datafusion_expr::{create_first_value, Signature, Volatility}; | ||
use datafusion_expr::{ | ||
logical_plan::{DdlStatement, Statement}, | ||
var_provider::is_system_variables, | ||
Expr, StringifiedPlan, UserDefinedLogicalNode, WindowUDF, | ||
}; | ||
use datafusion_physical_expr::create_first_value_accumulator; | ||
use datafusion_sql::{ | ||
parser::{CopyToSource, CopyToStatement, DFParser}, | ||
planner::{object_name_to_table_reference, ContextProvider, ParserOptions, SqlToRel}, | ||
|
@@ -82,6 +85,7 @@ use datafusion_sql::{ | |
|
||
use async_trait::async_trait; | ||
use chrono::{DateTime, Utc}; | ||
use log::debug; | ||
use parking_lot::RwLock; | ||
use sqlparser::dialect::dialect_from_str; | ||
use url::Url; | ||
|
@@ -1457,6 +1461,22 @@ impl SessionState { | |
datafusion_functions_array::register_all(&mut new_self) | ||
.expect("can not register array expressions"); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We also panic if register fails here. |
||
|
||
let first_value = create_first_value( | ||
"FIRST_VALUE", | ||
Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable), | ||
Arc::new(create_first_value_accumulator), | ||
); | ||
|
||
match new_self.register_udaf(Arc::new(first_value)) { | ||
Ok(Some(existing_udaf)) => { | ||
debug!("Overwrite existing UDF: {}", existing_udaf.name()); | ||
jayzhan211 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
Ok(None) => {} | ||
Err(err) => { | ||
panic!("Failed to register UDF: {}", err); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It is a large change to change this function to
jayzhan211 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
new_self | ||
} | ||
/// Returns new [`SessionState`] using the provided | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -21,16 +21,18 @@ use crate::expr::{ | |||||
AggregateFunction, BinaryExpr, Cast, Exists, GroupingSet, InList, InSubquery, | ||||||
Placeholder, ScalarFunction, TryCast, | ||||||
}; | ||||||
use crate::function::PartitionEvaluatorFactory; | ||||||
use crate::function::{ | ||||||
AccumulatorArgs, AccumulatorFactoryFunction, PartitionEvaluatorFactory, | ||||||
}; | ||||||
use crate::udaf::format_state_name; | ||||||
use crate::{ | ||||||
aggregate_function, built_in_function, conditional_expressions::CaseBuilder, | ||||||
logical_plan::Subquery, AccumulatorFactoryFunction, AggregateUDF, | ||||||
BuiltinScalarFunction, Expr, LogicalPlan, Operator, ScalarFunctionImplementation, | ||||||
ScalarUDF, Signature, Volatility, | ||||||
logical_plan::Subquery, AggregateUDF, BuiltinScalarFunction, Expr, LogicalPlan, | ||||||
Operator, ScalarFunctionImplementation, ScalarUDF, Signature, Volatility, | ||||||
}; | ||||||
use crate::{AggregateUDFImpl, ColumnarValue, ScalarUDFImpl, WindowUDF, WindowUDFImpl}; | ||||||
use arrow::datatypes::DataType; | ||||||
use datafusion_common::{Column, Result}; | ||||||
use arrow::datatypes::{DataType, Field}; | ||||||
use datafusion_common::{internal_err, Column, Result}; | ||||||
use std::any::Any; | ||||||
use std::fmt::Debug; | ||||||
use std::ops::Not; | ||||||
|
@@ -719,6 +721,16 @@ pub fn create_udaf( | |||||
)) | ||||||
} | ||||||
|
||||||
/// Builds an [`AggregateUDF`] that wraps a [`FirstValue`] created from the given | ||||||
/// `name`, `signature` and `accumulator` factory. | ||||||
/// | ||||||
/// No return type or state type is passed in: [`FirstValue`] reports its first | ||||||
/// argument's type as the return type and describes its state via `state_fields`. | ||||||
/// The `accumulator` factory is called with the [`AccumulatorArgs`] handed to | ||||||
/// `FirstValue::accumulator`, so it must accept that argument. | ||||||
/// | ||||||
/// NOTE(review): per the review discussion on this change, taking the accumulator | ||||||
/// factory as a parameter looks like a temporary arrangement until the function is | ||||||
/// moved into its own crate -- confirm before relying on this signature long term. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It might make sense to add a comment explaining this is a temporary solution (i.e. that the idea is we'll pull the function out into its own crate, but for now we need to keep the physical implementation separate |
||||||
pub fn create_first_value( | ||||||
name: &str, | ||||||
signature: Signature, | ||||||
accumulator: AccumulatorFactoryFunction, | ||||||
) -> AggregateUDF { | ||||||
AggregateUDF::from(FirstValue::new(name, signature, accumulator)) | ||||||
} | ||||||
|
||||||
/// Implements [`AggregateUDFImpl`] for functions that have a single signature and | ||||||
/// return type. | ||||||
pub struct SimpleAggregateUDF { | ||||||
|
@@ -796,15 +808,94 @@ impl AggregateUDFImpl for SimpleAggregateUDF { | |||||
Ok(self.return_type.clone()) | ||||||
} | ||||||
|
||||||
fn accumulator(&self, arg: &DataType) -> Result<Box<dyn crate::Accumulator>> { | ||||||
(self.accumulator)(arg) | ||||||
fn accumulator( | ||||||
&self, | ||||||
acc_args: AccumulatorArgs, | ||||||
) -> Result<Box<dyn crate::Accumulator>> { | ||||||
(self.accumulator)(acc_args) | ||||||
} | ||||||
|
||||||
fn state_type(&self, _return_type: &DataType) -> Result<Vec<DataType>> { | ||||||
Ok(self.state_type.clone()) | ||||||
} | ||||||
} | ||||||
|
||||||
/// An [`AggregateUDFImpl`] whose name, signature and accumulator factory are all | ||||||
/// supplied by the caller through [`FirstValue::new`]. | ||||||
pub struct FirstValue { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Very nice! I think we should put this code in a new crate (maybe There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had tried moving it to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see -- it seems like the issue is that the accumulator implementation requires PhysicalSortExpr. To pull the code into its own crate maybe we could pull out the relevant pieces of |
||||||
// Name reported by `AggregateUDFImpl::name`. | ||||||
name: String, | ||||||
// Signature reported by `AggregateUDFImpl::signature`. | ||||||
signature: Signature, | ||||||
// Factory invoked by `AggregateUDFImpl::accumulator` to build the accumulator. | ||||||
accumulator: AccumulatorFactoryFunction, | ||||||
} | ||||||
|
||||||
// Hand-written `Debug`: prints `name` and `signature`, and a fixed placeholder | ||||||
// string in place of the accumulator factory. | ||||||
impl Debug for FirstValue { | ||||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { | ||||||
f.debug_struct("FirstValue") | ||||||
.field("name", &self.name) | ||||||
.field("signature", &self.signature) | ||||||
// The factory itself is not printed; the literal "<FUNC>" stands in for it. | ||||||
// NOTE(review): the label "fun" differs from the struct field name | ||||||
// `accumulator` -- confirm which label is intended. | ||||||
.field("fun", &"<FUNC>") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
.finish() | ||||||
} | ||||||
} | ||||||
|
||||||
impl FirstValue { | ||||||
/// Creates a `FirstValue` from its three parts. | ||||||
/// | ||||||
/// `name` accepts anything convertible into a `String`; `signature` and | ||||||
/// `accumulator` are stored unchanged. | ||||||
pub fn new( | ||||||
name: impl Into<String>, | ||||||
signature: Signature, | ||||||
accumulator: AccumulatorFactoryFunction, | ||||||
) -> Self { | ||||||
// Convert once so the struct owns its name. | ||||||
let name = name.into(); | ||||||
Self { | ||||||
name, | ||||||
signature, | ||||||
accumulator, | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
impl AggregateUDFImpl for FirstValue { | ||||||
/// Returns `self` as `&dyn Any`. | ||||||
fn as_any(&self) -> &dyn Any { | ||||||
self | ||||||
} | ||||||
|
||||||
/// Returns the name given at construction. | ||||||
fn name(&self) -> &str { | ||||||
&self.name | ||||||
} | ||||||
|
||||||
/// Returns the signature given at construction. | ||||||
fn signature(&self) -> &Signature { | ||||||
&self.signature | ||||||
} | ||||||
|
||||||
/// Returns a clone of the first argument's type. | ||||||
/// | ||||||
/// # Panics | ||||||
/// | ||||||
/// Indexes `arg_types[0]` directly, so an empty `arg_types` slice panics. | ||||||
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> { | ||||||
Ok(arg_types[0].clone()) | ||||||
} | ||||||
|
||||||
/// Forwards `acc_args` to the stored accumulator factory and returns its result. | ||||||
fn accumulator( | ||||||
&self, | ||||||
acc_args: AccumulatorArgs, | ||||||
) -> Result<Box<dyn crate::Accumulator>> { | ||||||
(self.accumulator)(acc_args) | ||||||
} | ||||||
|
||||||
/// Not supported: evaluates `internal_err!` with the message | ||||||
/// "FirstValue does not have a state type". The state layout is described by | ||||||
/// `state_fields` instead. | ||||||
fn state_type(&self, _return_type: &DataType) -> Result<Vec<DataType>> { | ||||||
internal_err!("FirstValue does not have a state type") | ||||||
} | ||||||
|
||||||
/// Builds the state fields in this order: | ||||||
/// | ||||||
/// 1. a field of `value_type` named by `format_state_name(name, "first_value")`, | ||||||
/// 2. every field in `ordering_fields`, in the order given, | ||||||
/// 3. a `Boolean` field named `"is_set"`. | ||||||
/// | ||||||
/// The first and last fields are created with `true` as the third argument of | ||||||
/// `Field::new` (presumably the nullable flag -- confirm against arrow's API). | ||||||
fn state_fields( | ||||||
&self, | ||||||
name: &str, | ||||||
value_type: DataType, | ||||||
ordering_fields: Vec<Field>, | ||||||
) -> Result<Vec<Field>> { | ||||||
let mut fields = vec![Field::new( | ||||||
format_state_name(name, "first_value"), | ||||||
value_type, | ||||||
true, | ||||||
)]; | ||||||
fields.extend(ordering_fields); | ||||||
fields.push(Field::new("is_set", DataType::Boolean, true)); | ||||||
Ok(fields) | ||||||
} | ||||||
} | ||||||
|
||||||
/// Creates a new UDWF with a specific signature, state type and return type. | ||||||
/// | ||||||
/// The signature and state type must match the [`PartitionEvaluator`]'s implementation. | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking about the impact on this API for UDAF writers last night.
Specifically, I was thinking about the many UDAFs that exist (or will exist) at the time this change gets released, and about the first time people encounter and try to use this API. I think the args with datatypes are much easier to use (and require less mental gymnastics). Thus I am going to propose an easier, beginner-friendly API for this that will require fewer changes to existing UDAFs and will be easier to use for first-timers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is what i came up with: #9920