Skip to content

Commit e8d9b62

Browse files
authored
Improve performance 10%-100% in FIRST_VALUE / LAST_VALUE by not sort rows in FirstValueAccumulator (#14402)
* Switch to `LexicographicalComparator` * Take `ignore_nulls` out of filter
1 parent b80080e commit e8d9b62

File tree

3 files changed

+73
-56
lines changed

3 files changed

+73
-56
lines changed

datafusion/core/benches/aggregate_query_sql.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,39 @@ fn criterion_benchmark(c: &mut Criterion) {
173173
)
174174
})
175175
});
176+
177+
c.bench_function("first_last_many_columns", |b| {
178+
b.iter(|| {
179+
query(
180+
ctx.clone(),
181+
"SELECT first_value(u64_wide order by f64, u64_narrow, utf8),\
182+
last_value(u64_wide order by f64, u64_narrow, utf8) \
183+
FROM t GROUP BY u64_narrow",
184+
)
185+
})
186+
});
187+
188+
c.bench_function("first_last_ignore_nulls", |b| {
189+
b.iter(|| {
190+
query(
191+
ctx.clone(),
192+
"SELECT first_value(u64_wide ignore nulls order by f64, u64_narrow, utf8), \
193+
last_value(u64_wide ignore nulls order by f64, u64_narrow, utf8) \
194+
FROM t GROUP BY u64_narrow",
195+
)
196+
})
197+
});
198+
199+
c.bench_function("first_last_one_column", |b| {
200+
b.iter(|| {
201+
query(
202+
ctx.clone(),
203+
"SELECT first_value(u64_wide order by f64), \
204+
last_value(u64_wide order by f64) \
205+
FROM t GROUP BY u64_narrow",
206+
)
207+
})
208+
});
176209
}
177210

178211
criterion_group!(benches, criterion_benchmark);

datafusion/functions-aggregate/src/first_last.rs

Lines changed: 39 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::mem::size_of_val;
2323
use std::sync::Arc;
2424

2525
use arrow::array::{ArrayRef, AsArray, BooleanArray};
26-
use arrow::compute::{self, lexsort_to_indices, take_arrays, SortColumn};
26+
use arrow::compute::{self, LexicographicalComparator, SortColumn};
2727
use arrow::datatypes::{DataType, Field};
2828
use datafusion_common::utils::{compare_rows, get_row_at_idx};
2929
use datafusion_common::{
@@ -250,6 +250,7 @@ impl FirstValueAccumulator {
250250
return Ok((!value.is_empty()).then_some(0));
251251
}
252252
}
253+
253254
let sort_columns = ordering_values
254255
.iter()
255256
.zip(self.ordering_req.iter())
@@ -259,19 +260,17 @@ impl FirstValueAccumulator {
259260
})
260261
.collect::<Vec<_>>();
261262

262-
if self.ignore_nulls {
263-
let indices = lexsort_to_indices(&sort_columns, None)?;
264-
// If ignoring nulls, find the first non-null value.
265-
for index in indices.iter().flatten() {
266-
if !value.is_null(index as usize) {
267-
return Ok(Some(index as usize));
268-
}
269-
}
270-
Ok(None)
263+
let comparator = LexicographicalComparator::try_new(&sort_columns)?;
264+
265+
let min_index = if self.ignore_nulls {
266+
(0..value.len())
267+
.filter(|&index| !value.is_null(index))
268+
.min_by(|&a, &b| comparator.compare(a, b))
271269
} else {
272-
let indices = lexsort_to_indices(&sort_columns, Some(1))?;
273-
Ok((!indices.is_empty()).then_some(indices.value(0) as _))
274-
}
270+
(0..value.len()).min_by(|&a, &b| comparator.compare(a, b))
271+
};
272+
273+
Ok(min_index)
275274
}
276275
}
277276

@@ -312,22 +311,19 @@ impl Accumulator for FirstValueAccumulator {
312311
// last index contains is_set flag.
313312
let is_set_idx = states.len() - 1;
314313
let flags = states[is_set_idx].as_boolean();
315-
let filtered_states = filter_states_according_to_is_set(states, flags)?;
314+
let filtered_states =
315+
filter_states_according_to_is_set(&states[0..is_set_idx], flags)?;
316316
// 1..is_set_idx range corresponds to ordering section
317-
let sort_cols = convert_to_sort_cols(
317+
let sort_columns = convert_to_sort_cols(
318318
&filtered_states[1..is_set_idx],
319319
self.ordering_req.as_ref(),
320320
);
321321

322-
let ordered_states = if sort_cols.is_empty() {
323-
// When no ordering is given, use the existing state as is:
324-
filtered_states
325-
} else {
326-
let indices = lexsort_to_indices(&sort_cols, None)?;
327-
take_arrays(&filtered_states, &indices, None)?
328-
};
329-
if !ordered_states[0].is_empty() {
330-
let first_row = get_row_at_idx(&ordered_states, 0)?;
322+
let comparator = LexicographicalComparator::try_new(&sort_columns)?;
323+
let min = (0..filtered_states[0].len()).min_by(|&a, &b| comparator.compare(a, b));
324+
325+
if let Some(first_idx) = min {
326+
let first_row = get_row_at_idx(&filtered_states, first_idx)?;
331327
// When collecting orderings, we exclude the is_set flag from the state.
332328
let first_ordering = &first_row[1..is_set_idx];
333329
let sort_options = get_sort_options(self.ordering_req.as_ref());
@@ -559,29 +555,22 @@ impl LastValueAccumulator {
559555
let sort_columns = ordering_values
560556
.iter()
561557
.zip(self.ordering_req.iter())
562-
.map(|(values, req)| {
563-
// Take the reverse ordering requirement. This enables us to
564-
// use "fetch = 1" to get the last value.
565-
SortColumn {
566-
values: Arc::clone(values),
567-
options: Some(!req.options),
568-
}
558+
.map(|(values, req)| SortColumn {
559+
values: Arc::clone(values),
560+
options: Some(req.options),
569561
})
570562
.collect::<Vec<_>>();
571563

572-
if self.ignore_nulls {
573-
let indices = lexsort_to_indices(&sort_columns, None)?;
574-
// If ignoring nulls, find the last non-null value.
575-
for index in indices.iter().flatten() {
576-
if !value.is_null(index as usize) {
577-
return Ok(Some(index as usize));
578-
}
579-
}
580-
Ok(None)
564+
let comparator = LexicographicalComparator::try_new(&sort_columns)?;
565+
let max_ind = if self.ignore_nulls {
566+
(0..value.len())
567+
.filter(|&index| !(value.is_null(index)))
568+
.max_by(|&a, &b| comparator.compare(a, b))
581569
} else {
582-
let indices = lexsort_to_indices(&sort_columns, Some(1))?;
583-
Ok((!indices.is_empty()).then_some(indices.value(0) as _))
584-
}
570+
(0..value.len()).max_by(|&a, &b| comparator.compare(a, b))
571+
};
572+
573+
Ok(max_ind)
585574
}
586575

587576
fn with_requirement_satisfied(mut self, requirement_satisfied: bool) -> Self {
@@ -627,24 +616,19 @@ impl Accumulator for LastValueAccumulator {
627616
// last index contains is_set flag.
628617
let is_set_idx = states.len() - 1;
629618
let flags = states[is_set_idx].as_boolean();
630-
let filtered_states = filter_states_according_to_is_set(states, flags)?;
619+
let filtered_states =
620+
filter_states_according_to_is_set(&states[0..is_set_idx], flags)?;
631621
// 1..is_set_idx range corresponds to ordering section
632-
let sort_cols = convert_to_sort_cols(
622+
let sort_columns = convert_to_sort_cols(
633623
&filtered_states[1..is_set_idx],
634624
self.ordering_req.as_ref(),
635625
);
636626

637-
let ordered_states = if sort_cols.is_empty() {
638-
// When no ordering is given, use existing state as is:
639-
filtered_states
640-
} else {
641-
let indices = lexsort_to_indices(&sort_cols, None)?;
642-
take_arrays(&filtered_states, &indices, None)?
643-
};
627+
let comparator = LexicographicalComparator::try_new(&sort_columns)?;
628+
let max = (0..filtered_states[0].len()).max_by(|&a, &b| comparator.compare(a, b));
644629

645-
if !ordered_states[0].is_empty() {
646-
let last_idx = ordered_states[0].len() - 1;
647-
let last_row = get_row_at_idx(&ordered_states, last_idx)?;
630+
if let Some(last_idx) = max {
631+
let last_row = get_row_at_idx(&filtered_states, last_idx)?;
648632
// When collecting orderings, we exclude the is_set flag from the state.
649633
let last_ordering = &last_row[1..is_set_idx];
650634
let sort_options = get_sort_options(self.ordering_req.as_ref());

datafusion/sqllogictest/test_files/group_by.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3003,7 +3003,7 @@ SELECT FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
30033003
LAST_VALUE(amount ORDER BY ts ASC) AS fv2
30043004
FROM sales_global
30053005
----
3006-
30 100
3006+
30 80
30073007

30083008
# Conversion in between FIRST_VALUE and LAST_VALUE to resolve
30093009
# contradictory requirements should work in multi partitions.

0 commit comments

Comments
 (0)