3 changes: 2 additions & 1 deletion datafusion/ffi/src/udaf/mod.rs
@@ -717,14 +717,15 @@ mod tests {
let foreign_udaf = create_test_foreign_udaf(Sum::new())?;

let schema = Schema::new(vec![Field::new("a", DataType::Float64, true)]);
// Note: SUM(DISTINCT) only supports Int64 for now
let acc_args = AccumulatorArgs {
return_field: Field::new("f", DataType::Float64, true).into(),
schema: &schema,
ignore_nulls: true,
order_bys: &[PhysicalSortExpr::new_default(col("a", &schema)?)],
is_reversed: false,
name: "round_trip",
is_distinct: true,
is_distinct: false,
exprs: &[col("a", &schema)?],
};

127 changes: 121 additions & 6 deletions datafusion/functions-aggregate/src/sum.rs
@@ -34,7 +34,7 @@ use arrow::datatypes::{
};
use arrow::{array::ArrayRef, datatypes::Field};
use datafusion_common::{
exec_err, not_impl_err, utils::take_function_args, Result, ScalarValue,
exec_err, not_impl_err, utils::take_function_args, HashMap, Result, ScalarValue,
};
use datafusion_expr::function::AccumulatorArgs;
use datafusion_expr::function::StateFieldsArgs;
@@ -243,12 +243,23 @@ impl AggregateUDFImpl for Sum {
&self,
args: AccumulatorArgs,
) -> Result<Box<dyn Accumulator>> {
macro_rules! helper {
($t:ty, $dt:expr) => {
Ok(Box::new(SlidingSumAccumulator::<$t>::new($dt.clone())))
};
if args.is_distinct {
// distinct path: use our sliding‐window distinct‐sum
macro_rules! helper_distinct {
($t:ty, $dt:expr) => {
Ok(Box::new(SlidingDistinctSumAccumulator::try_new(&$dt)?))
};
}
downcast_sum!(args, helper_distinct)
} else {
// non‐distinct path: existing sliding sum
macro_rules! helper {
($t:ty, $dt:expr) => {
Ok(Box::new(SlidingSumAccumulator::<$t>::new($dt.clone())))
};
}
downcast_sum!(args, helper)
}
downcast_sum!(args, helper)
}

fn reverse_expr(&self) -> ReversedUDAF {
@@ -477,3 +488,107 @@ impl<T: ArrowPrimitiveType> Accumulator for DistinctSumAccumulator<T> {
size_of_val(self) + self.values.capacity() * size_of::<T::Native>()
}
}

/// A sliding‐window accumulator for `SUM(DISTINCT)` over Int64 columns.
/// Maintains a running sum so that `evaluate()` is O(1).
#[derive(Debug)]
pub struct SlidingDistinctSumAccumulator {
/// Map each distinct value → its current count in the window
counts: HashMap<i64, usize, RandomState>,
/// Running sum of all distinct keys currently in the window
sum: i64,
/// Data type (must be Int64)
data_type: DataType,
}

impl SlidingDistinctSumAccumulator {
/// Create a new accumulator; only `DataType::Int64` is supported.
pub fn try_new(data_type: &DataType) -> Result<Self> {
// TODO support other numeric types
if *data_type != DataType::Int64 {
return exec_err!("SlidingDistinctSumAccumulator only supports Int64");
}
Ok(Self {
counts: HashMap::default(),
sum: 0,
data_type: data_type.clone(),
})
}
}

impl Accumulator for SlidingDistinctSumAccumulator {
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let arr = values[0].as_primitive::<Int64Type>();
for &v in arr.values() {
let cnt = self.counts.entry(v).or_insert(0);
if *cnt == 0 {
// first occurrence in window
self.sum = self.sum.wrapping_add(v);
}
*cnt += 1;
}
Ok(())
}

fn evaluate(&mut self) -> Result<ScalarValue> {
// O(1) wrap of running sum
Ok(ScalarValue::Int64(Some(self.sum)))
}

fn size(&self) -> usize {
size_of_val(self)
}

fn state(&mut self) -> Result<Vec<ScalarValue>> {
// Serialize distinct keys for cross-partition merge if needed
let keys = self
.counts
.keys()
.cloned()
.map(Some)
.map(ScalarValue::Int64)
.collect::<Vec<_>>();
Ok(vec![ScalarValue::List(ScalarValue::new_list_nullable(
&keys,
&self.data_type,
))])
}

fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
// Merge distinct keys from other partitions
let list_arr = states[0].as_list::<i32>();
for maybe_inner in list_arr.iter().flatten() {
for idx in 0..maybe_inner.len() {
if let ScalarValue::Int64(Some(v)) =
ScalarValue::try_from_array(&*maybe_inner, idx)?
{
let cnt = self.counts.entry(v).or_insert(0);
if *cnt == 0 {
self.sum = self.sum.wrapping_add(v);
}
*cnt += 1;
}
}
}
Ok(())
}

fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let arr = values[0].as_primitive::<Int64Type>();
for &v in arr.values() {
if let Some(cnt) = self.counts.get_mut(&v) {
*cnt -= 1;
if *cnt == 0 {
// last copy leaving window
self.sum = self.sum.wrapping_sub(v);
self.counts.remove(&v);
}
}
}
Ok(())
}

fn supports_retract_batch(&self) -> bool {
true
}
}
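
For illustration only (not part of the PR): a minimal sketch of how the new accumulator is expected to behave as a window frame slides, using the Accumulator methods shown above. The import path for SlidingDistinctSumAccumulator is an assumption; the struct is declared pub in datafusion/functions-aggregate/src/sum.rs.

// Hypothetical standalone example; the import path for
// SlidingDistinctSumAccumulator is an assumption.
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::DataType;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::Accumulator;
use datafusion_functions_aggregate::sum::SlidingDistinctSumAccumulator;

fn main() -> Result<()> {
    let mut acc = SlidingDistinctSumAccumulator::try_new(&DataType::Int64)?;

    // Frame holds [1, 1, 2]: the distinct sum is 1 + 2 = 3
    let batch: ArrayRef = Arc::new(Int64Array::from(vec![1_i64, 1, 2]));
    acc.update_batch(&[batch])?;
    assert_eq!(acc.evaluate()?, ScalarValue::Int64(Some(3)));

    // One copy of `1` slides out of the frame; `1` is still present once,
    // so the distinct sum is unchanged
    let out: ArrayRef = Arc::new(Int64Array::from(vec![1_i64]));
    acc.retract_batch(&[out])?;
    assert_eq!(acc.evaluate()?, ScalarValue::Int64(Some(3)));

    // The last copy of `1` slides out; only `2` remains
    let out: ArrayRef = Arc::new(Int64Array::from(vec![1_i64]));
    acc.retract_batch(&[out])?;
    assert_eq!(acc.evaluate()?, ScalarValue::Int64(Some(2)));

    Ok(())
}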
85 changes: 75 additions & 10 deletions datafusion/sqllogictest/test_files/window.slt
@@ -5715,17 +5715,82 @@ EXPLAIN SELECT
RANGE BETWEEN INTERVAL '2 minutes' PRECEDING AND CURRENT ROW
) AS distinct_count
FROM table_test_distinct_count
ODER BY k, time;
ORDER BY k, time;
Contributor Author:
Minor: this fixes the typo here.

Contributor:
Context: This came from #16887 and @zhuqi-lucas spotted my mistake in #16888.

Contributor Author:
Thank you @crepererum for the quick review!

----
logical_plan
01)Projection: oder.k, oder.time, count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW AS normal_count, count(DISTINCT oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW AS distinct_count
02)--WindowAggr: windowExpr=[[count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW AS count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW, count(DISTINCT oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW AS count(DISTINCT oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW]]
03)----SubqueryAlias: oder
01)Sort: table_test_distinct_count.k ASC NULLS LAST, table_test_distinct_count.time ASC NULLS LAST
02)--Projection: table_test_distinct_count.k, table_test_distinct_count.time, count(table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW AS normal_count, count(DISTINCT table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW AS distinct_count
03)----WindowAggr: windowExpr=[[count(table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW AS count(table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW, count(DISTINCT table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW AS count(DISTINCT table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW]]
04)------TableScan: table_test_distinct_count projection=[k, v, time]
physical_plan
01)ProjectionExec: expr=[k@0 as k, time@2 as time, count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW@3 as normal_count, count(DISTINCT oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW@4 as distinct_count]
02)--BoundedWindowAggExec: wdw=[count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW: Field { name: "count(oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW, count(DISTINCT oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW: Field { name: "count(DISTINCT oder.v) PARTITION BY [oder.k] ORDER BY [oder.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW], mode=[Sorted]
03)----SortExec: expr=[k@0 ASC NULLS LAST, time@2 ASC NULLS LAST], preserve_partitioning=[true]
04)------CoalesceBatchesExec: target_batch_size=1
05)--------RepartitionExec: partitioning=Hash([k@0], 2), input_partitions=2
06)----------DataSourceExec: partitions=2, partition_sizes=[5, 4]
01)SortPreservingMergeExec: [k@0 ASC NULLS LAST, time@1 ASC NULLS LAST]
02)--ProjectionExec: expr=[k@0 as k, time@2 as time, count(table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW@3 as normal_count, count(DISTINCT table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW@4 as distinct_count]
03)----BoundedWindowAggExec: wdw=[count(table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW: Field { name: "count(table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW, count(DISTINCT table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW: Field { name: "count(DISTINCT table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW], mode=[Sorted]
04)------SortExec: expr=[k@0 ASC NULLS LAST, time@2 ASC NULLS LAST], preserve_partitioning=[true]
05)--------CoalesceBatchesExec: target_batch_size=1
06)----------RepartitionExec: partitioning=Hash([k@0], 2), input_partitions=2
07)------------DataSourceExec: partitions=2, partition_sizes=[5, 4]


# Add testing for distinct sum
Contributor Author:
This is the corresponding SLT test for this PR.

query TPII
SELECT
k,
time,
SUM(v) OVER (
PARTITION BY k
ORDER BY time
RANGE BETWEEN INTERVAL '2 minutes' PRECEDING AND CURRENT ROW
) AS sum_v,
SUM(DISTINCT v) OVER (
PARTITION BY k
ORDER BY time
RANGE BETWEEN INTERVAL '2 minutes' PRECEDING AND CURRENT ROW
) AS sum_distinct_v
FROM table_test_distinct_count
ORDER BY k, time;
----
a 1970-01-01T00:01:00Z 1 1
a 1970-01-01T00:02:00Z 2 1
a 1970-01-01T00:03:00Z 5 3
a 1970-01-01T00:03:00Z 5 3
a 1970-01-01T00:04:00Z 5 3
b 1970-01-01T00:01:00Z 3 3
b 1970-01-01T00:02:00Z 6 3
b 1970-01-01T00:03:00Z 14 7
b 1970-01-01T00:03:00Z 14 7



query TT
EXPLAIN SELECT
k,
time,
SUM(v) OVER (
PARTITION BY k
ORDER BY time
RANGE BETWEEN INTERVAL '2 minutes' PRECEDING AND CURRENT ROW
) AS sum_v,
SUM(DISTINCT v) OVER (
PARTITION BY k
ORDER BY time
RANGE BETWEEN INTERVAL '2 minutes' PRECEDING AND CURRENT ROW
) AS sum_distinct_v
FROM table_test_distinct_count
ORDER BY k, time;
----
logical_plan
01)Sort: table_test_distinct_count.k ASC NULLS LAST, table_test_distinct_count.time ASC NULLS LAST
02)--Projection: table_test_distinct_count.k, table_test_distinct_count.time, sum(table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW AS sum_v, sum(DISTINCT table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW AS sum_distinct_v
03)----WindowAggr: windowExpr=[[sum(__common_expr_1) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW AS sum(table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW, sum(DISTINCT __common_expr_1) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW AS sum(DISTINCT table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW]]
04)------Projection: CAST(table_test_distinct_count.v AS Int64) AS __common_expr_1, table_test_distinct_count.k, table_test_distinct_count.time
05)--------TableScan: table_test_distinct_count projection=[k, v, time]
physical_plan
01)SortPreservingMergeExec: [k@0 ASC NULLS LAST, time@1 ASC NULLS LAST]
02)--ProjectionExec: expr=[k@1 as k, time@2 as time, sum(table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW@3 as sum_v, sum(DISTINCT table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW@4 as sum_distinct_v]
03)----BoundedWindowAggExec: wdw=[sum(table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW: Field { name: "sum(table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW, sum(DISTINCT table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW: Field { name: "sum(DISTINCT table_test_distinct_count.v) PARTITION BY [table_test_distinct_count.k] ORDER BY [table_test_distinct_count.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW], mode=[Sorted]
04)------SortExec: expr=[k@1 ASC NULLS LAST, time@2 ASC NULLS LAST], preserve_partitioning=[true]
05)--------CoalesceBatchesExec: target_batch_size=1
06)----------RepartitionExec: partitioning=Hash([k@1], 2), input_partitions=2
07)------------ProjectionExec: expr=[CAST(v@1 AS Int64) as __common_expr_1, k@0 as k, time@2 as time]
08)--------------DataSourceExec: partitions=2, partition_sizes=[5, 4]