diff --git a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs index b9e4323744ec..7e0d800eb0f4 100644 --- a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs +++ b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs @@ -441,3 +441,41 @@ impl Accumulator for ApproxPercentileAccumulator { - std::mem::size_of_val(&self.return_type) } } + +#[cfg(test)] +mod tests { + use crate::aggregate::approx_percentile_cont::ApproxPercentileAccumulator; + use crate::aggregate::tdigest::TDigest; + use arrow_schema::DataType; + use datafusion_common::ScalarValue; + use datafusion_expr::Accumulator; + + #[test] + fn test_combine_approx_percentile_accumulator() { + let mut digests: Vec = Vec::new(); + + // one TDigest with 50_000 values from 1 to 1_000 + for _ in 1..=50 { + let t = TDigest::new(100); + let values: Vec<_> = (1..=1_000).map(f64::from).collect(); + let t = t.merge_unsorted_f64(values); + digests.push(t) + } + + let t1 = TDigest::merge_digests(&digests); + let t2 = TDigest::merge_digests(&digests); + + let mut accumulator = + ApproxPercentileAccumulator::new_with_max_size(0.5, DataType::Float64, 100); + + accumulator.merge_digests(&[t1]); + assert_eq!(accumulator.digest.count(), 50_000.0); + accumulator.merge_digests(&[t2]); + assert_eq!(accumulator.digest.count(), 100_000.0); + + assert_eq!( + accumulator.evaluate().unwrap(), + ScalarValue::Float64(Some(500.0)) + ); + } +}