-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Handle ordering of first last aggregation inside aggregator #8662
Changes from 6 commits
edcef77
da1cf71
202936d
05bdc81
416ac3a
06adf25
e208ebf
0ece593
298fcf0
64666df
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,7 @@ | |
use std::any::Any; | ||
use std::sync::Arc; | ||
|
||
use crate::aggregate::utils::{down_cast_any_ref, ordering_fields}; | ||
use crate::aggregate::utils::{down_cast_any_ref, get_sort_options, ordering_fields}; | ||
use crate::expressions::format_state_name; | ||
use crate::{ | ||
reverse_order_bys, AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr, | ||
|
@@ -29,7 +29,6 @@ use crate::{ | |
use arrow::array::{Array, ArrayRef, AsArray, BooleanArray}; | ||
use arrow::compute::{self, lexsort_to_indices, SortColumn}; | ||
use arrow::datatypes::{DataType, Field}; | ||
use arrow_schema::SortOptions; | ||
use datafusion_common::utils::{compare_rows, get_arrayref_at_indices, get_row_at_idx}; | ||
use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, ScalarValue}; | ||
use datafusion_expr::Accumulator; | ||
|
@@ -211,10 +210,47 @@ impl FirstValueAccumulator { | |
} | ||
|
||
// Updates state with the values in the given row. | ||
fn update_with_new_row(&mut self, row: &[ScalarValue]) { | ||
self.first = row[0].clone(); | ||
self.orderings = row[1..].to_vec(); | ||
self.is_set = true; | ||
fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> { | ||
let value = &row[0]; | ||
let orderings = &row[1..]; | ||
// Update when | ||
// - no entry in the state | ||
// - There is an earlier entry in according to requirements | ||
if !self.is_set | ||
|| compare_rows( | ||
&self.orderings, | ||
orderings, | ||
&get_sort_options(&self.ordering_req), | ||
)? | ||
.is_gt() | ||
{ | ||
self.first = value.clone(); | ||
self.orderings = orderings.to_vec(); | ||
self.is_set = true; | ||
} | ||
Ok(()) | ||
} | ||
|
||
fn get_first_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> { | ||
let value = &values[0]; | ||
let ordering_values = &values[1..]; | ||
assert_eq!(ordering_values.len(), self.ordering_req.len()); | ||
if self.ordering_req.is_empty() { | ||
return Ok((!value.is_empty()).then_some(0)); | ||
} | ||
let sort_columns = ordering_values | ||
.iter() | ||
.zip(self.ordering_req.iter()) | ||
.map(|(values, req)| SortColumn { | ||
values: values.clone(), | ||
options: Some(req.options), | ||
}) | ||
.collect::<Vec<_>>(); | ||
let indices = lexsort_to_indices(&sort_columns, Some(1))?; | ||
if !indices.is_empty() { | ||
return Ok(Some(indices.value(0) as usize)); | ||
} | ||
Ok(None) | ||
} | ||
} | ||
|
||
|
@@ -227,11 +263,9 @@ impl Accumulator for FirstValueAccumulator { | |
} | ||
|
||
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { | ||
// If we have seen first value, we shouldn't update it | ||
if !values[0].is_empty() && !self.is_set { | ||
let row = get_row_at_idx(values, 0)?; | ||
// Update with first value in the array. | ||
self.update_with_new_row(&row); | ||
if let Some(first_idx) = self.get_first_idx(values)? { | ||
let row = get_row_at_idx(values, first_idx)?; | ||
self.update_with_new_row(&row)?; | ||
} | ||
Ok(()) | ||
} | ||
|
@@ -265,7 +299,7 @@ impl Accumulator for FirstValueAccumulator { | |
// Update with first value in the state. Note that we should exclude the | ||
// is_set flag from the state. Otherwise, we will end up with a state | ||
// containing two is_set flags. | ||
self.update_with_new_row(&first_row[0..is_set_idx]); | ||
self.update_with_new_row(&first_row[0..is_set_idx])?; | ||
} | ||
} | ||
Ok(()) | ||
|
@@ -459,10 +493,52 @@ impl LastValueAccumulator { | |
} | ||
|
||
// Updates state with the values in the given row. | ||
fn update_with_new_row(&mut self, row: &[ScalarValue]) { | ||
self.last = row[0].clone(); | ||
self.orderings = row[1..].to_vec(); | ||
self.is_set = true; | ||
fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> { | ||
let value = &row[0]; | ||
let orderings = &row[1..]; | ||
// Update when | ||
// - no value in the state | ||
// - There is no specific requirement, but a new value (most recent entry in terms of execution) | ||
// - There is a more recent entry in terms of requirement | ||
if !self.is_set | ||
|| self.orderings.is_empty() | ||
|| compare_rows( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm sure you are aware but https://docs.rs/arrow-row/latest/arrow_row/ will be a much faster way to perform row-based comparisons than relying on ScalarValue There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Indeed, however, here we are checking just a single row (row that have lowest value). Hence I don't think it is worth to conversion here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Update: Row format may well be a good idea (not for this PR). I will wait until I have reviewed this code to offer a more informed opinion There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I re-reviewed and I agree that the |
||
&self.orderings, | ||
orderings, | ||
&get_sort_options(&self.ordering_req), | ||
)? | ||
.is_lt() | ||
{ | ||
self.last = value.clone(); | ||
self.orderings = orderings.to_vec(); | ||
self.is_set = true; | ||
} | ||
Ok(()) | ||
} | ||
|
||
fn get_last_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> { | ||
let value = &values[0]; | ||
let ordering_values = &values[1..]; | ||
assert_eq!(ordering_values.len(), self.ordering_req.len()); | ||
if self.ordering_req.is_empty() { | ||
return Ok((!value.is_empty()).then_some(value.len() - 1)); | ||
} | ||
let sort_columns = ordering_values | ||
.iter() | ||
.zip(self.ordering_req.iter()) | ||
.map(|(values, req)| { | ||
// Take reverse ordering requirement this would enable us to use fetch=1 for last value. | ||
SortColumn { | ||
values: values.clone(), | ||
options: Some(!req.options), | ||
} | ||
}) | ||
.collect::<Vec<_>>(); | ||
let indices = lexsort_to_indices(&sort_columns, Some(1))?; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If there is a min max alternative to this we can use that one also. However, as far as I know there is no util for this support. Maybe @tustvold can answer this, if he is familiar with. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not aware of a min/max kernel that returns the ordinal position of the min/max There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. BTW I had the same basic need (find the position of min/max so I could find a value in a corresponding column) while implementing our special There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you think you implementation is more efficient? If that is the case, maybe we can use that code instead? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think our implementation is (slightly) more efficient, but it is less general (only works for timestamp columns). You can see the basic idea here https://github.com/influxdata/influxdb/blob/main/query_functions/src/selectors.rs And the comparision is here: https://github.com/influxdata/influxdb/blob/acfef87659c9a8c4c49e4628264369569e04cad1/query_functions/src/selectors/internal.rs#L119-L127 I think we should stay with the |
||
if !indices.is_empty() { | ||
return Ok(Some(indices.value(0) as usize)); | ||
} | ||
Ok(None) | ||
} | ||
} | ||
|
||
|
@@ -475,10 +551,9 @@ impl Accumulator for LastValueAccumulator { | |
} | ||
|
||
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { | ||
if !values[0].is_empty() { | ||
let row = get_row_at_idx(values, values[0].len() - 1)?; | ||
// Update with last value in the array. | ||
self.update_with_new_row(&row); | ||
if let Some(last_idx) = self.get_last_idx(values)? { | ||
let row = get_row_at_idx(values, last_idx)?; | ||
self.update_with_new_row(&row)?; | ||
} | ||
Ok(()) | ||
} | ||
|
@@ -515,7 +590,7 @@ impl Accumulator for LastValueAccumulator { | |
// Update with last value in the state. Note that we should exclude the | ||
// is_set flag from the state. Otherwise, we will end up with a state | ||
// containing two is_set flags. | ||
self.update_with_new_row(&last_row[0..is_set_idx]); | ||
self.update_with_new_row(&last_row[0..is_set_idx])?; | ||
} | ||
} | ||
Ok(()) | ||
|
@@ -559,14 +634,6 @@ fn convert_to_sort_cols( | |
.collect::<Vec<_>>() | ||
} | ||
|
||
/// Selects the sort option attribute from all the given `PhysicalSortExpr`s. | ||
fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec<SortOptions> { | ||
ordering_req | ||
.iter() | ||
.map(|item| item.options) | ||
.collect::<Vec<_>>() | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use crate::aggregate::first_last::{FirstValueAccumulator, LastValueAccumulator}; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,7 +15,7 @@ | |
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
use crate::expressions::{FirstValue, LastValue, OrderSensitiveArrayAgg}; | ||
use crate::expressions::OrderSensitiveArrayAgg; | ||
use crate::{PhysicalExpr, PhysicalSortExpr}; | ||
use arrow::datatypes::Field; | ||
use datafusion_common::{not_impl_err, DataFusionError, Result}; | ||
|
@@ -134,10 +134,7 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> { | |
|
||
/// Checks whether the given aggregate expression is order-sensitive. | ||
/// For instance, a `SUM` aggregation doesn't depend on the order of its inputs. | ||
/// However, a `FirstValue` depends on the input ordering (if the order changes, | ||
/// the first value in the list would change). | ||
/// However, a `ARRAY_AGG` with `ORDER BY` depends on the input ordering. | ||
pub fn is_order_sensitive(aggr_expr: &Arc<dyn AggregateExpr>) -> bool { | ||
aggr_expr.as_any().is::<FirstValue>() | ||
|| aggr_expr.as_any().is::<LastValue>() | ||
|| aggr_expr.as_any().is::<OrderSensitiveArrayAgg>() | ||
aggr_expr.as_any().is::<OrderSensitiveArrayAgg>() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Eventually this would be a nice thing to move into the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like this idea 👍 |
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In theory, we may be able to use a
Option<ScalarValue>
instead ofScalarValue
andis_set
flag, but I don't think it matters for performance and this PR follows the existing implementation as well 👍