Implement IGNORE NULLS for FIRST_VALUE #9411
Ok((!indices.is_empty()).then_some(indices.value(0) as _))

if self.is_ignore_null {
    let indices = lexsort_to_indices(&sort_columns, Some(value.len()))?;
Instead, you can use
- let indices = lexsort_to_indices(&sort_columns, Some(value.len()))?;
+ let indices = lexsort_to_indices(&sort_columns, None)?;
to make it more explicit that we are sorting the whole array.
Fixed. Thanks!
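For readers unfamiliar with the limit argument, here is a minimal std-only sketch of the idea. `sort_to_indices` is a hypothetical stand-in written for illustration, not arrow's `lexsort_to_indices`; it only assumes that a limit of `None` means "sort everything" and `Some(n)` means "keep the first n sorted indices".

```rust
/// Hypothetical stand-in for a sort-to-indices routine with an optional limit.
/// `None` sorts the whole input; `Some(n)` keeps only the first `n` sorted indices.
fn sort_to_indices(values: &[i64], limit: Option<usize>) -> Vec<usize> {
    let mut indices: Vec<usize> = (0..values.len()).collect();
    // Stable sort: equal values keep their original relative order.
    indices.sort_by_key(|&i| values[i]);
    if let Some(n) = limit {
        indices.truncate(n);
    }
    indices
}
```

Under that assumption, passing `Some(values.len())` and passing `None` give the same result, so `None` states the intent more directly.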
)
}

pub fn with_ignore_null(
Instead of this API, maybe we can use a builder API as below:
pub fn with_ignore_null(mut self, ignore_null: bool) -> Self {
    self.ignore_null = ignore_null;
    self
}
However, this is purely stylistic. Feel free to proceed as you wish.
Changed. Thanks for the suggestion!
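A minimal sketch of how the builder-style setter chains at a call site. `FirstValueSketch` is a hypothetical, trimmed-down struct used only for illustration; the real `FirstValue` has more fields and a different constructor.

```rust
/// Hypothetical, trimmed-down stand-in for the expression struct in this PR.
#[derive(Debug, Default, PartialEq)]
struct FirstValueSketch {
    requirement_satisfied: bool,
    ignore_nulls: bool,
}

impl FirstValueSketch {
    /// Builder-style setter: takes `self` by value and returns it.
    fn with_requirement_satisfied(mut self, requirement_satisfied: bool) -> Self {
        self.requirement_satisfied = requirement_satisfied;
        self
    }

    /// Builder-style setter for the null-handling flag.
    fn with_ignore_nulls(mut self, ignore_nulls: bool) -> Self {
        self.ignore_nulls = ignore_nulls;
        self
    }
}
```

With this shape, options can be chained, e.g. `FirstValueSketch::default().with_requirement_satisfied(true).with_ignore_nulls(true)`.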
I left some minor comments. Other than these, this PR LGTM! Thanks @huaxingao for this PR.
@@ -213,6 +235,7 @@ struct FirstValueAccumulator {
    ordering_req: LexOrdering,
    // Stores whether incoming data already satisfies the ordering requirement.
    requirement_satisfied: bool,
    is_ignore_null: bool,
It might be good to add a comment here, like the ones on the fields above.
Added. Thanks
// If ignoring nulls, find the first non-null value.
for i in 0..value.len() {
    if !value.is_null(i) {
        return Ok((!value.is_empty()).then_some(i));
Once you enter this loop, I think value is not empty.
Fixed. Thanks
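To illustrate the point about the redundant emptiness check, here is a small std-only sketch. It models the array as a slice of `Option<i64>` (an assumption made for illustration; the PR operates on Arrow arrays): once a non-null element is found, the input is necessarily non-empty, so the index can be returned directly.

```rust
/// Returns the index of the first non-null element, or `None` if the slice
/// is empty or contains only nulls. No separate emptiness check is needed:
/// finding a non-null element already implies the slice is non-empty.
fn first_non_null_index(values: &[Option<i64>]) -> Option<usize> {
    values.iter().position(|v| v.is_some())
}
```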
// If ignoring nulls, find the first non-null value.
for index in indices.iter().flatten() {
    if !value.is_null(index as usize) {
        return Ok((!value.is_empty()).then_some(index as usize));
ditto
Fixed
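The ordered case can be sketched the same way. This is a std-only illustration under assumed types (a slice of `Option<i64>` values and a single `i64` ordering key per row), not the accumulator code from the PR: sort the row indices by the ordering key, then take the first index whose value is non-null.

```rust
/// Returns the index of the first non-null value when rows are visited in
/// ascending order of `order_keys`. Both slices must have the same length.
fn first_non_null_in_order(values: &[Option<i64>], order_keys: &[i64]) -> Option<usize> {
    assert_eq!(values.len(), order_keys.len());
    let mut indices: Vec<usize> = (0..values.len()).collect();
    // Stable sort of row indices by the ordering key.
    indices.sort_by_key(|&i| order_keys[i]);
    // The first sorted index holding a non-null value is the answer.
    indices.into_iter().find(|&i| values[i].is_some())
}
```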
Hmm, I saw you implemented it in the physical expression FirstValue. But did you add the ignore null parameter to the logical one and pass it through from logical to physical? I.e., does a query with first value + ignore null work now? Can you add an e2e test to sqllogictests?
@@ -44,6 +44,7 @@ pub struct FirstValue {
    expr: Arc<dyn PhysicalExpr>,
    ordering_req: LexOrdering,
    requirement_satisfied: bool,
    is_ignore_null: bool,
Maybe we can use ignore_nulls: bool; it is consistent with the SQL keywords and also with the window functions, where IGNORE NULLS is being implemented in parallel.
It would also be nice to have a comment in the struct about this field.
Fixed. Thanks
Thanks @huaxingao
Would it be possible to add tests to aggregate.slt?
It should start with the logical plan. Since #9221, null treatment is allowed for window functions: https://github.com/apache/arrow-datafusion/blob/main/datafusion/sql/src/expr/function.rs#L61 For aggregates I suppose we need to do the same.
9cc5ce3 to 366e7ef
3e84596 to b3f8952
Thank you for the contribution @huaxingao -- this looks good to me. I had some style suggestions but nothing that would prevent the PR from merging in my mind
@@ -246,6 +246,7 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
    args,
    filter,
    order_by,
    ..
As a style thing, I have found it helpful to not use a .. match-all, and instead explicitly ignore each field, like
- ..
+ null_treatment: _,
With this treatment, if/when someone adds a new field to AggregateFunction in the future, the compiler will tell them all the places that may need to be changed.
Fixed. Thanks for the suggestion!
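A small sketch of the pattern discussed here, using a hypothetical three-field struct (`AggregateFunctionSketch` is invented for illustration and is much smaller than the real `AggregateFunction`). Because every field is named in the pattern, adding a fourth field to the struct would make `describe` fail to compile until the pattern is updated, which is the behavior the suggestion is after.

```rust
/// Hypothetical, trimmed-down stand-in for an aggregate function expression.
struct AggregateFunctionSketch {
    name: String,
    distinct: bool,
    null_treatment: Option<String>,
}

fn describe(f: &AggregateFunctionSketch) -> String {
    // Every field is listed; `null_treatment: _` ignores the value explicitly.
    // With `..` instead, a newly added field would be skipped silently.
    let AggregateFunctionSketch {
        name,
        distinct,
        null_treatment: _,
    } = f;
    if *distinct {
        format!("{}(DISTINCT)", name)
    } else {
        format!("{}()", name)
    }
}
```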
@@ -44,6 +44,7 @@ pub struct FirstValue {
    expr: Arc<dyn PhysicalExpr>,
    ordering_req: LexOrdering,
    requirement_satisfied: bool,
    ignore_null: bool,
A very minor nit is that some parts of this PR use ignore_null and some parts use ignore_nulls (with an s). It might be nice to make them uniform.
I have changed all of them to ignore_nulls. Thanks for catching this!
@@ -672,7 +672,7 @@ pub fn to_substrait_agg_measure(
    ),
) -> Result<Measure> {
    match expr {
-       Expr::AggregateFunction(expr::AggregateFunction { func_def, args, distinct, filter, order_by }) => {
+       Expr::AggregateFunction(expr::AggregateFunction { func_def, args, distinct, filter, order_by, .. }) => {
same comment here about matching all vs explicit field matches
Fixed. Thanks!
@@ -544,6 +544,7 @@ pub struct AggregateFunction {
    pub filter: Option<Box<Expr>>,
    /// Optional ordering
    pub order_by: Option<Vec<Expr>>,
    pub null_treatment: Option<NullTreatment>,
Maybe add a comment, as the other fields have.
Added. Thanks
datafusion/expr/src/expr.rs
Outdated
    ..
}) => {
    fmt_function(f, func_def.name(), *distinct, args, true)?;
    if let Some(nt) = null_treatment {
        write!(f, "{}", nt)?;
- write!(f, "{}", nt)?;
+ write!(f, " {}", nt)?;
Changed. Thanks
datafusion/expr/src/expr.rs
Outdated
@@ -1823,6 +1832,9 @@ fn create_name(e: &Expr) -> Result<String> {
    if let Some(order_by) = order_by {
        info += &format!(" ORDER BY [{}]", expr_vec_fmt!(order_by));
    };
    if let Some(nt) = null_treatment {
        info += &format!("{}", nt);
- info += &format!("{}", nt);
+ info += &format!(" {}", nt);
Changed. Thanks
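A short sketch of why the leading space matters when the null treatment is appended to a name. `NullTreatmentSketch` and `display_name` are hypothetical names written for illustration, and the `IGNORE NULLS` / `RESPECT NULLS` output strings are assumed to mirror the SQL keywords; this is not the code in expr.rs.

```rust
use std::fmt;

/// Hypothetical stand-in for the null treatment option.
enum NullTreatmentSketch {
    IgnoreNulls,
    RespectNulls,
}

impl fmt::Display for NullTreatmentSketch {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            NullTreatmentSketch::IgnoreNulls => write!(f, "IGNORE NULLS"),
            NullTreatmentSketch::RespectNulls => write!(f, "RESPECT NULLS"),
        }
    }
}

/// Builds a display name; the null treatment is appended with a leading
/// space so it does not run into the closing parenthesis.
fn display_name(func: &str, arg: &str, null_treatment: Option<&NullTreatmentSketch>) -> String {
    let mut info = format!("{}({})", func, arg);
    if let Some(nt) = null_treatment {
        info += &format!(" {}", nt);
    }
    info
}
```

Without the space, the same call would produce `FIRST_VALUE(a)IGNORE NULLS`.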
}
}

pub fn ignore_null(mut self, ignore_null: bool) -> Self {
For a builder-style API, the method name should follow the common naming pattern (e.g., the existing with_requirement_satisfied):
- pub fn ignore_null(mut self, ignore_null: bool) -> Self {
+ pub fn with_ignore_null(mut self, ignore_null: bool) -> Self {
Changed. Thanks!
Looks good to me. I have a few minor comments. Thanks @huaxingao.
Test failed:
@@ -3214,7 +3214,7 @@ JOIN sales_global AS e
    ON s.currency = e.currency AND
    s.ts >= e.ts
GROUP BY s.sn, s.zip_code, s.country, s.ts, s.currency
- ORDER BY s.sn
+ ORDER BY s.sn, s.zip_code
The expected result is
0 GRC 0 2022-01-01T06:00:00 EUR 30
1 FRA 1 2022-01-01T08:00:00 EUR 50
1 TUR 2 2022-01-01T11:30:00 TRY 75
1 FRA 3 2022-01-02T12:00:00 EUR 200
0 GRC 4 2022-01-03T10:00:00 EUR 80
1 TUR 4 2022-01-03T10:00:00 TRY 100
but my test has
0 GRC 0 2022-01-01T06:00:00 EUR 30
1 FRA 1 2022-01-01T08:00:00 EUR 50
1 TUR 2 2022-01-01T11:30:00 TRY 75
1 FRA 3 2022-01-02T12:00:00 EUR 200
1 TUR 4 2022-01-03T10:00:00 TRY 100
0 GRC 4 2022-01-03T10:00:00 EUR 80
I think the above is also legal. I added one more column to ORDER BY to make the returned order deterministic.
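The tie can be reproduced outside SQL with a small sketch. It assumes the first and third columns of the result rows are zip_code and sn (an assumption read off the GROUP BY list, not stated in the test), and uses only the two rows that tie on sn = 4. Rust's `sort_by_key` is stable, so the input order of tied rows survives; a query engine gives no such guarantee, which is why either output is legal when ordering by sn alone.

```rust
/// (zip_code, country, sn): column meaning assumed from the GROUP BY list.
type Row = (i64, &'static str, i64);

/// Orders by sn only; rows with equal sn keep whatever order they arrived in.
fn order_by_sn(mut rows: Vec<Row>) -> Vec<Row> {
    rows.sort_by_key(|r| r.2);
    rows
}

/// Orders by (sn, zip_code); the extra key breaks the tie deterministically.
fn order_by_sn_zip(mut rows: Vec<Row>) -> Vec<Row> {
    rows.sort_by_key(|r| (r.2, r.0));
    rows
}
```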
Closing / reopening to trigger CI
Hi @huaxingao -- I am trying to clear the PR backlog to keep the code in this repo moving. I noticed this PR had conflicts due to #8891, so I took the liberty of merging up from main to resolve the conflicts (and hopefully get this PR ready to merge)
Thanks again for the contribution and I am sorry this one has not yet merged.
Thanks @alamb
🚀
Thank you all very much for helping me with this PR! @alamb @comphead @mustafasrepo @viirya
Which issue does this PR close?
Closes #.
Related #9055
Rationale for this change
Spark has ignore null in First_value, so we want to support this option too. The logic for ignore null in Last_value is similar. I will have a separate PR for ignore null in Last_value.
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?