@@ -4106,6 +4106,7 @@ class TestRandomQuantileKernel : public TestPrimitiveQuantileKernel<ArrowType> {
41064106 VerifyTDigest (chunked, quantiles);
41074107 VerifyTDigestMapQuantile (chunked, quantiles);
41084108 VerifyTDigestMapReduceQuantile (chunked, quantiles);
4109+ VerifyTDigestMapReduceIncrementalQuantile (chunked, quantiles);
41094110 }
41104111
41114112 void CheckTDigestsSliced (const std::vector<int >& chunk_sizes, int64_t num_quantiles) {
@@ -4124,6 +4125,7 @@ class TestRandomQuantileKernel : public TestPrimitiveQuantileKernel<ArrowType> {
41244125 VerifyTDigest (chunked->Slice (os[0 ], os[1 ]), quantiles);
41254126 VerifyTDigestMapQuantile (chunked->Slice (os[0 ], os[1 ]), quantiles);
41264127 VerifyTDigestMapReduceQuantile (chunked->Slice (os[0 ], os[1 ]), quantiles);
4128+ VerifyTDigestMapReduceIncrementalQuantile (chunked->Slice (os[0 ], os[1 ]), quantiles);
41274129 }
41284130 }
41294131
@@ -4193,8 +4195,51 @@ class TestRandomQuantileKernel : public TestPrimitiveQuantileKernel<ArrowType> {
41934195 map_chunks.push_back (std::move (map_chunk));
41944196 }
41954197 auto map_chunked = std::make_shared<ChunkedArray>(std::move (map_chunks));
4198+ ASSERT_OK_AND_ASSIGN (Datum reduced, TDigestReduce (map_chunked));
41964199 TDigestQuantileOptions options (quantiles);
4197- ASSERT_OK_AND_ASSIGN (Datum out, TDigestQuantile (map_chunked, options));
4200+
4201+ ASSERT_OK_AND_ASSIGN (Datum out_alternative, TDigestQuantile (map_chunked, options));
4202+ ASSERT_OK_AND_ASSIGN (Datum out, TDigestQuantile (reduced, options));
4203+ ASSERT_EQ (out, out_alternative);
4204+ const auto & out_array = out.make_array ();
4205+ ValidateOutput (*out_array);
4206+ ASSERT_EQ (out_array->length (), quantiles.size ());
4207+ ASSERT_EQ (out_array->null_count (), 0 );
4208+ AssertTypeEqual (out_array->type (), float64 ());
4209+
4210+ // linear interpolated exact quantile as reference
4211+ std::vector<std::vector<Datum>> exact =
4212+ NaiveQuantile (*chunked, quantiles, {QuantileOptions::LINEAR});
4213+ const double * approx = out_array->data ()->GetValues <double >(1 );
4214+ for (size_t i = 0 ; i < quantiles.size (); ++i) {
4215+ const auto & exact_scalar = checked_pointer_cast<DoubleScalar>(exact[i][0 ].scalar ());
4216+ const double tolerance = std::fabs (exact_scalar->value ) * 0.05 ;
4217+ EXPECT_NEAR (approx[i], exact_scalar->value , tolerance) << quantiles[i];
4218+ }
4219+ }
4220+
4221+ void VerifyTDigestMapReduceIncrementalQuantile (
4222+ const std::shared_ptr<ChunkedArray>& chunked, std::vector<double >& quantiles) {
4223+ Datum out;
4224+ TDigestQuantileOptions options (quantiles);
4225+ std::shared_ptr<Array> incremental_centroids;
4226+ for (const auto & chunk : chunked->chunks ()) {
4227+ ASSERT_OK_AND_ASSIGN (Datum centroids, TDigestMap (chunk));
4228+ ASSERT_OK_AND_ASSIGN (auto map_chunk, MakeArrayFromScalar (*centroids.scalar (), 1 ));
4229+ if (incremental_centroids) {
4230+ auto map_chunked =
4231+ std::make_shared<ChunkedArray>(ArrayVector{incremental_centroids, map_chunk});
4232+ ASSERT_OK_AND_ASSIGN (Datum reduced, TDigestReduce (map_chunked));
4233+ ASSERT_OK_AND_ASSIGN (incremental_centroids,
4234+ MakeArrayFromScalar (*reduced.scalar (), 1 ));
4235+ } else {
4236+ incremental_centroids = map_chunk;
4237+ }
4238+
4239+ ASSERT_OK_AND_ASSIGN (
4240+ out, TDigestQuantile (incremental_centroids, options)); // incremental quantile
4241+ }
4242+
41984243 const auto & out_array = out.make_array ();
41994244 ValidateOutput (*out_array);
42004245 ASSERT_EQ (out_array->length (), quantiles.size ());
0 commit comments