Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle ordering of first last aggregation inside aggregator #8662

Merged
merged 10 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion-cli/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ pub struct ParquetMetadataFunc {}

impl TableFunctionImpl for ParquetMetadataFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
let filename = match exprs.get(0) {
let filename = match exprs.first() {
Some(Expr::Literal(ScalarValue::Utf8(Some(s)))) => s, // single quote: parquet_metadata('x.parquet')
Some(Expr::Column(Column { name, .. })) => name, // double quote: parquet_metadata("x.parquet")
_ => {
Expand Down
1 change: 0 additions & 1 deletion datafusion/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,6 @@ macro_rules! arrow_err {

// To avoid compiler error when using macro in the same crate:
// macros from the current crate cannot be referred to by absolute paths
pub use exec_err as _exec_err;
pub use internal_datafusion_err as _internal_datafusion_err;
pub use internal_err as _internal_err;
pub use not_impl_err as _not_impl_err;
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,10 @@ fn update_join_on(
proj_right_exprs: &[(Column, String)],
hash_join_on: &[(Column, Column)],
) -> Option<Vec<(Column, Column)>> {
// TODO: Clippy wants the "map" call removed, but doing so generates
// a compilation error. Remove the clippy directive once this
// issue is fixed.
#[allow(clippy::map_identity)]
let (left_idx, right_idx): (Vec<_>, Vec<_>) = hash_join_on
.iter()
.map(|(left, right)| (left, right))
Expand Down
4 changes: 4 additions & 0 deletions datafusion/optimizer/src/simplify_expressions/guarantees.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ impl<'a> GuaranteeRewriter<'a> {
guarantees: impl IntoIterator<Item = &'a (Expr, NullableInterval)>,
) -> Self {
Self {
// TODO: Clippy wants the "map" call removed, but doing so generates
// a compilation error. Remove the clippy directive once this
// issue is fixed.
#[allow(clippy::map_identity)]
guarantees: guarantees.into_iter().map(|(k, v)| (k, v)).collect(),
}
}
Expand Down
131 changes: 98 additions & 33 deletions datafusion/physical-expr/src/aggregate/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use std::any::Any;
use std::sync::Arc;

use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
use crate::aggregate::utils::{down_cast_any_ref, get_sort_options, ordering_fields};
use crate::expressions::format_state_name;
use crate::{
reverse_order_bys, AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr,
Expand All @@ -29,9 +29,10 @@ use crate::{
use arrow::array::{Array, ArrayRef, AsArray, BooleanArray};
use arrow::compute::{self, lexsort_to_indices, SortColumn};
use arrow::datatypes::{DataType, Field};
use arrow_schema::SortOptions;
use datafusion_common::utils::{compare_rows, get_arrayref_at_indices, get_row_at_idx};
use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, ScalarValue};
use datafusion_common::{
arrow_datafusion_err, internal_err, DataFusionError, Result, ScalarValue,
};
use datafusion_expr::Accumulator;

/// FIRST_VALUE aggregate expression
Expand Down Expand Up @@ -211,10 +212,45 @@ impl FirstValueAccumulator {
}

// Updates state with the values in the given row.
fn update_with_new_row(&mut self, row: &[ScalarValue]) {
self.first = row[0].clone();
self.orderings = row[1..].to_vec();
self.is_set = true;
fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> {
let [value, orderings @ ..] = row else {
return internal_err!("Empty row in FIRST_VALUE");
};
// Update when there is no entry in the state, or we have an "earlier"
// entry according to sort requirements.
if !self.is_set
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory, we may be able to use a Option<ScalarValue> instead of ScalarValue and is_set flag, but I don't think it matters for performance and this PR follows the existing implementation as well 👍

|| compare_rows(
&self.orderings,
orderings,
&get_sort_options(&self.ordering_req),
)?
.is_gt()
{
self.first = value.clone();
self.orderings = orderings.to_vec();
self.is_set = true;
}
Ok(())
}

fn get_first_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> {
let [value, ordering_values @ ..] = values else {
return internal_err!("Empty row in FIRST_VALUE");
};
if self.ordering_req.is_empty() {
// Get first entry according to receive order (0th index)
return Ok((!value.is_empty()).then_some(0));
}
let sort_columns = ordering_values
.iter()
.zip(self.ordering_req.iter())
.map(|(values, req)| SortColumn {
values: values.clone(),
options: Some(req.options),
})
.collect::<Vec<_>>();
let indices = lexsort_to_indices(&sort_columns, Some(1))?;
Ok((!indices.is_empty()).then_some(indices.value(0) as _))
}
}

Expand All @@ -227,11 +263,9 @@ impl Accumulator for FirstValueAccumulator {
}

fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
// If we have seen first value, we shouldn't update it
if !values[0].is_empty() && !self.is_set {
let row = get_row_at_idx(values, 0)?;
// Update with first value in the array.
self.update_with_new_row(&row);
if let Some(first_idx) = self.get_first_idx(values)? {
let row = get_row_at_idx(values, first_idx)?;
self.update_with_new_row(&row)?;
}
Ok(())
}
Expand Down Expand Up @@ -265,7 +299,7 @@ impl Accumulator for FirstValueAccumulator {
// Update with first value in the state. Note that we should exclude the
// is_set flag from the state. Otherwise, we will end up with a state
// containing two is_set flags.
self.update_with_new_row(&first_row[0..is_set_idx]);
self.update_with_new_row(&first_row[0..is_set_idx])?;
}
}
Ok(())
Expand Down Expand Up @@ -459,10 +493,50 @@ impl LastValueAccumulator {
}

// Updates state with the values in the given row.
fn update_with_new_row(&mut self, row: &[ScalarValue]) {
self.last = row[0].clone();
self.orderings = row[1..].to_vec();
self.is_set = true;
fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> {
let [value, orderings @ ..] = row else {
return internal_err!("Empty row in LAST_VALUE");
};
// Update when there is no entry in the state, or we have a "later"
// entry (either according to sort requirements or the order of execution).
if !self.is_set
|| self.orderings.is_empty()
|| compare_rows(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sure you are aware but https://docs.rs/arrow-row/latest/arrow_row/ will be a much faster way to perform row-based comparisons than relying on ScalarValue

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, however, here we are checking just a single row (row that have lowest value). Hence I don't think it is worth to conversion here.

Copy link
Contributor

@alamb alamb Dec 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that since it is a single column max comparison this is probably fine (and no worse than the current implementation). If we need to optimize performance we could probably implement specialized implementations (like FirstValue<ArrowPrimitiveType> and skip the copying entirely.

That is likely a premature optimization at this point

Update: Row format may well be a good idea (not for this PR). I will wait until I have reviewed this code to offer a more informed opinion

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I re-reviewed and I agree that the RowFormat is not needed here (and in fact it may actually be slower) because, as @mustafasrepo points out, this code uses ScalarValue to compare a single row per batch (it finds the largest/smallest row per batch using lexsort_to_indices). We would have to benchmark to be sure.

&self.orderings,
orderings,
&get_sort_options(&self.ordering_req),
)?
.is_lt()
{
self.last = value.clone();
self.orderings = orderings.to_vec();
self.is_set = true;
}
Ok(())
}

fn get_last_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> {
let [value, ordering_values @ ..] = values else {
return internal_err!("Empty row in LAST_VALUE");
};
if self.ordering_req.is_empty() {
// Get last entry according to the order of data:
return Ok((!value.is_empty()).then_some(value.len() - 1));
}
let sort_columns = ordering_values
.iter()
.zip(self.ordering_req.iter())
.map(|(values, req)| {
// Take the reverse ordering requirement. This enables us to
// use "fetch = 1" to get the last value.
SortColumn {
values: values.clone(),
options: Some(!req.options),
}
})
.collect::<Vec<_>>();
let indices = lexsort_to_indices(&sort_columns, Some(1))?;
Copy link
Contributor Author

@mustafasrepo mustafasrepo Dec 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is a min max alternative to this we can use that one also. However, as far as I know there is no util for this support. Maybe @tustvold can answer this, if he is familiar with.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not aware of a min/max kernel that returns the ordinal position of the min/max

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW I had the same basic need (find the position of min/max so I could find a value in a corresponding column) while implementing our special selector_first, selector_last, etc functions in InfluxDB 3.0 (I also had to code them specially)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think you implementation is more efficient? If that is the case, maybe we can use that code instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think our implementation is (slightly) more efficient, but it is less general (only works for timestamp columns). You can see the basic idea here

https://github.com/influxdata/influxdb/blob/main/query_functions/src/selectors.rs

And the comparision is here: https://github.com/influxdata/influxdb/blob/acfef87659c9a8c4c49e4628264369569e04cad1/query_functions/src/selectors/internal.rs#L119-L127

I think we should stay with the ScalarValue implementation unless we find some query where this calculation is taking most of the time

Ok((!indices.is_empty()).then_some(indices.value(0) as _))
}
}

Expand All @@ -475,10 +549,9 @@ impl Accumulator for LastValueAccumulator {
}

fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
if !values[0].is_empty() {
let row = get_row_at_idx(values, values[0].len() - 1)?;
// Update with last value in the array.
self.update_with_new_row(&row);
if let Some(last_idx) = self.get_last_idx(values)? {
let row = get_row_at_idx(values, last_idx)?;
self.update_with_new_row(&row)?;
}
Ok(())
}
Expand Down Expand Up @@ -515,7 +588,7 @@ impl Accumulator for LastValueAccumulator {
// Update with last value in the state. Note that we should exclude the
// is_set flag from the state. Otherwise, we will end up with a state
// containing two is_set flags.
self.update_with_new_row(&last_row[0..is_set_idx]);
self.update_with_new_row(&last_row[0..is_set_idx])?;
}
}
Ok(())
Expand Down Expand Up @@ -559,26 +632,18 @@ fn convert_to_sort_cols(
.collect::<Vec<_>>()
}

/// Selects the sort option attribute from all the given `PhysicalSortExpr`s.
fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec<SortOptions> {
ordering_req
.iter()
.map(|item| item.options)
.collect::<Vec<_>>()
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use crate::aggregate::first_last::{FirstValueAccumulator, LastValueAccumulator};

use arrow::compute::concat;
use arrow_array::{ArrayRef, Int64Array};
use arrow_schema::DataType;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::Accumulator;

use arrow::compute::concat;
use std::sync::Arc;

#[test]
fn test_first_last_value_value() -> Result<()> {
let mut first_accumulator =
Expand Down
30 changes: 15 additions & 15 deletions datafusion/physical-expr/src/aggregate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,20 @@
// specific language governing permissions and limitations
// under the License.

use crate::expressions::{FirstValue, LastValue, OrderSensitiveArrayAgg};
use crate::{PhysicalExpr, PhysicalSortExpr};
use arrow::datatypes::Field;
use datafusion_common::{not_impl_err, DataFusionError, Result};
use datafusion_expr::Accumulator;
use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;

use self::groups_accumulator::GroupsAccumulator;
use crate::expressions::OrderSensitiveArrayAgg;
use crate::{PhysicalExpr, PhysicalSortExpr};

use arrow::datatypes::Field;
use datafusion_common::{not_impl_err, DataFusionError, Result};
use datafusion_expr::Accumulator;

mod hyperloglog;
mod tdigest;

pub(crate) mod approx_distinct;
pub(crate) mod approx_median;
Expand All @@ -46,19 +50,18 @@ pub(crate) mod median;
pub(crate) mod string_agg;
#[macro_use]
pub(crate) mod min_max;
pub mod build_in;
pub(crate) mod groups_accumulator;
mod hyperloglog;
pub mod moving_min_max;
pub(crate) mod regr;
pub(crate) mod stats;
pub(crate) mod stddev;
pub(crate) mod sum;
pub(crate) mod sum_distinct;
mod tdigest;
pub mod utils;
pub(crate) mod variance;

pub mod build_in;
pub mod moving_min_max;
pub mod utils;

/// An aggregate expression that:
/// * knows its resulting field
/// * knows how to create its accumulator
Expand Down Expand Up @@ -134,10 +137,7 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {

/// Checks whether the given aggregate expression is order-sensitive.
/// For instance, a `SUM` aggregation doesn't depend on the order of its inputs.
/// However, a `FirstValue` depends on the input ordering (if the order changes,
/// the first value in the list would change).
/// However, an `ARRAY_AGG` with `ORDER BY` depends on the input ordering.
pub fn is_order_sensitive(aggr_expr: &Arc<dyn AggregateExpr>) -> bool {
aggr_expr.as_any().is::<FirstValue>()
|| aggr_expr.as_any().is::<LastValue>()
|| aggr_expr.as_any().is::<OrderSensitiveArrayAgg>()
aggr_expr.as_any().is::<OrderSensitiveArrayAgg>()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eventually this would be a nice thing to move into the AggregateExpr trait directly so we could override it and avoid special casing built in functions. Not for this PR though :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this idea 👍

}
18 changes: 12 additions & 6 deletions datafusion/physical-expr/src/aggregate/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,21 @@

//! Utilities used in aggregates

use std::any::Any;
use std::sync::Arc;

use crate::{AggregateExpr, PhysicalSortExpr};
use arrow::array::ArrayRef;

use arrow::array::{ArrayRef, ArrowNativeTypeOp};
use arrow_array::cast::AsArray;
use arrow_array::types::{
Decimal128Type, DecimalType, TimestampMicrosecondType, TimestampMillisecondType,
TimestampNanosecondType, TimestampSecondType,
};
use arrow_array::ArrowNativeTypeOp;
use arrow_buffer::ArrowNativeType;
use arrow_schema::{DataType, Field};
use arrow_schema::{DataType, Field, SortOptions};
use datafusion_common::{exec_err, DataFusionError, Result};
use datafusion_expr::Accumulator;
use std::any::Any;
use std::sync::Arc;

/// Convert scalar values from an accumulator into arrays.
pub fn get_accum_scalar_values_as_arrays(
Expand All @@ -40,7 +41,7 @@ pub fn get_accum_scalar_values_as_arrays(
.state()?
.iter()
.map(|s| s.to_array_of_size(1))
.collect::<Result<Vec<_>>>()
.collect()
}

/// Computes averages for `Decimal128`/`Decimal256` values, checking for overflow
Expand Down Expand Up @@ -205,3 +206,8 @@ pub(crate) fn ordering_fields(
})
.collect()
}

/// Selects the sort option attribute from all the given `PhysicalSortExpr`s.
pub fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec<SortOptions> {
ordering_req.iter().map(|item| item.options).collect()
}
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2444,7 +2444,7 @@ pub fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
let last_offset: OffsetSize = offsets.last().copied().unwrap();
offsets.push(last_offset + OffsetSize::usize_as(rows.len()));
let arrays = converter.convert_rows(rows)?;
let array = match arrays.get(0) {
let array = match arrays.first() {
Some(array) => array.clone(),
None => {
return internal_err!("array_distinct: failed to get array from rows")
Expand Down
Loading