diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc
index 70093f30be6..7e49e761dd5 100644
--- a/cpp/src/arrow/compute/exec.cc
+++ b/cpp/src/arrow/compute/exec.cc
@@ -854,11 +854,6 @@ class VectorExecutor : public KernelExecutorImpl<VectorKernel> {
  protected:
   Status ExecuteBatch(const ExecBatch& batch, ExecListener* listener) {
-    if (batch.length == 0) {
-      // Skip empty batches. This may only happen when not using
-      // ExecBatchIterator
-      return Status::OK();
-    }
     Datum out;
     if (output_descr_.shape == ValueDescr::ARRAY) {
       // We preallocate (maybe) only for the output of processing the current
diff --git a/cpp/src/arrow/compute/kernels/aggregate_quantile.cc b/cpp/src/arrow/compute/kernels/aggregate_quantile.cc
index bfd97f813e5..62e375e6950 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_quantile.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_quantile.cc
@@ -104,14 +104,14 @@ struct SortQuantiler {
     }
 
     // prepare out array
-    int64_t out_length = options.q.size();
-    if (in_buffer.empty()) {
-      out_length = 0;  // input is empty or only contains null and nan, return empty array
-    }
     // out type depends on options
     const bool is_datapoint = IsDataPoint(options);
     const std::shared_ptr<DataType> out_type =
         is_datapoint ? TypeTraits::type_singleton() : float64();
+    int64_t out_length = options.q.size();
+    if (in_buffer.empty()) {
+      return MakeArrayOfNull(out_type, out_length, ctx->memory_pool()).Value(out);
+    }
     auto out_data = ArrayData::Make(out_type, out_length, 0);
     out_data->buffers.resize(2, nullptr);
@@ -245,14 +245,14 @@ struct CountQuantiler {
     }
 
     // prepare out array
-    int64_t out_length = options.q.size();
-    if (in_length == 0) {
-      out_length = 0;  // input is empty or only contains null, return empty array
-    }
     // out type depends on options
     const bool is_datapoint = IsDataPoint(options);
     const std::shared_ptr<DataType> out_type =
         is_datapoint ? TypeTraits::type_singleton() : float64();
+    int64_t out_length = options.q.size();
+    if (in_length == 0) {
+      return MakeArrayOfNull(out_type, out_length, ctx->memory_pool()).Value(out);
+    }
     auto out_data = ArrayData::Make(out_type, out_length, 0);
     out_data->buffers.resize(2, nullptr);
@@ -404,17 +404,27 @@ Status ScalarQuantile(KernelContext* ctx, const QuantileOptions& options,
                       const Scalar& scalar, Datum* out) {
   using CType = typename T::c_type;
   ArrayData* output = out->mutable_array();
-  if (!scalar.is_valid || options.min_count > 1) {
-    output->length = 0;
-    output->null_count = 0;
-    return Status::OK();
-  }
-  auto out_type = IsDataPoint(options) ? scalar.type : float64();
   output->length = options.q.size();
-  output->null_count = 0;
+  auto out_type = IsDataPoint(options) ? scalar.type : float64();
   ARROW_ASSIGN_OR_RAISE(
       output->buffers[1],
       ctx->Allocate(output->length * BitUtil::BytesForBits(GetBitWidth(*out_type))));
+
+  if (!scalar.is_valid || options.min_count > 1) {
+    output->null_count = output->length;
+    ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(output->length));
+    BitUtil::SetBitsTo(output->buffers[0]->mutable_data(), /*offset=*/0, output->length,
+                       false);
+    if (IsDataPoint(options)) {
+      CType* out_buffer = output->template GetMutableValues<CType>(1);
+      std::fill(out_buffer, out_buffer + output->length, CType(0));
+    } else {
+      double* out_buffer = output->template GetMutableValues<double>(1);
+      std::fill(out_buffer, out_buffer + output->length, 0.0);
+    }
+    return Status::OK();
+  }
+  output->null_count = 0;
   if (IsDataPoint(options)) {
     CType* out_buffer = output->template GetMutableValues<CType>(1);
     for (int64_t i = 0; i < output->length; i++) {
@@ -484,7 +494,7 @@ const FunctionDoc quantile_doc{
     "If quantile lies between two data points, an interpolated value is\n"
     "returned based on selected interpolation method.\n"
     "Nulls and NaNs are ignored.\n"
-     "An empty array is returned if there is no valid data point."),
+     "An array of nulls is returned if there is no valid data point."),
    {"array"},
    "QuantileOptions"};
diff --git a/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc b/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc
index 3b616c664a9..52ddb3674b4 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc
@@ -85,22 +85,24 @@ struct TDigestImpl : public ScalarAggregator {
   }
 
   Status Finalize(KernelContext* ctx, Datum* out) override {
-    const int64_t out_length =
-        (this->tdigest.is_empty() || !this->all_valid || this->count < options.min_count)
-            ? 0
-            : options.q.size();
+    const int64_t out_length = options.q.size();
     auto out_data = ArrayData::Make(float64(), out_length, 0);
     out_data->buffers.resize(2, nullptr);
-
-    if (out_length > 0) {
-      ARROW_ASSIGN_OR_RAISE(out_data->buffers[1],
-                            ctx->Allocate(out_length * sizeof(double)));
-      double* out_buffer = out_data->template GetMutableValues<double>(1);
+    ARROW_ASSIGN_OR_RAISE(out_data->buffers[1],
+                          ctx->Allocate(out_length * sizeof(double)));
+    double* out_buffer = out_data->template GetMutableValues<double>(1);
+
+    if (this->tdigest.is_empty() || !this->all_valid || this->count < options.min_count) {
+      ARROW_ASSIGN_OR_RAISE(out_data->buffers[0], ctx->AllocateBitmap(out_length));
+      std::memset(out_data->buffers[0]->mutable_data(), 0x00,
+                  out_data->buffers[0]->size());
+      std::fill(out_buffer, out_buffer + out_length, 0.0);
+      out_data->null_count = out_length;
+    } else {
       for (int64_t i = 0; i < out_length; ++i) {
         out_buffer[i] = this->tdigest.Quantile(this->options.q[i]);
       }
     }
-
     *out = Datum(std::move(out_data));
     return Status::OK();
   }
@@ -161,7 +163,7 @@ const FunctionDoc tdigest_doc{
     "Approximate quantiles of a numeric array with T-Digest algorithm",
     ("By default, 0.5 quantile (median) is returned.\n"
     "Nulls and NaNs are ignored.\n"
-     "An empty array is returned if there is no valid data point."),
+     "An array of nulls is returned if there is no valid data point."),
    {"array"},
    "TDigestOptions"};
diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc
index f3d470c42de..98bf156195f 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_test.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc
@@ -2871,7 +2871,8 @@ class TestPrimitiveQuantileKernel : public ::testing::Test {
       ASSERT_OK_AND_ASSIGN(Datum out, Quantile(array, options));
       auto out_array = out.make_array();
       ValidateOutput(*out_array);
-      ASSERT_EQ(out.array()->length, 0);
+      ASSERT_EQ(out.array()->length, q.size());
+      ASSERT_EQ(out.array()->null_count, q.size());
     }
   }
@@ -2943,7 +2944,7 @@ TYPED_TEST(TestIntegerQuantileKernel, Basics) {
   QuantileOptions keep_nulls_min_count(/*q=*/0.5, QuantileOptions::LINEAR,
                                        /*skip_nulls=*/false, /*min_count=*/3);
   auto not_empty = ResultWith(ArrayFromJSON(float64(), "[3.0]"));
-  auto empty = ResultWith(ArrayFromJSON(float64(), "[]"));
+  auto empty = ResultWith(ArrayFromJSON(float64(), "[null]"));
   EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5]"), keep_nulls), not_empty);
   EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5, null]"), keep_nulls), empty);
   EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 5]"), keep_nulls), not_empty);
@@ -2974,7 +2975,7 @@ TYPED_TEST(TestIntegerQuantileKernel, Basics) {
     EXPECT_THAT(Quantile(*MakeScalar(ty, 1), options),
                 ResultWith(ArrayFromJSON(expected_ty, "[1, 1, 1]")));
     EXPECT_THAT(Quantile(MakeNullScalar(ty), options),
-                ResultWith(ArrayFromJSON(expected_ty, "[]")));
+                ResultWith(ArrayFromJSON(expected_ty, "[null, null, null]")));
   }
 }
@@ -3019,7 +3020,7 @@ TYPED_TEST(TestFloatingQuantileKernel, Floats) {
   QuantileOptions keep_nulls_min_count(/*q=*/0.5, QuantileOptions::LINEAR,
                                        /*skip_nulls=*/false, /*min_count=*/3);
   auto not_empty = ResultWith(ArrayFromJSON(float64(), "[3.0]"));
-  auto empty = ResultWith(ArrayFromJSON(float64(), "[]"));
+  auto empty = ResultWith(ArrayFromJSON(float64(), "[null]"));
   EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5]"), keep_nulls), not_empty);
   EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5, null]"), keep_nulls), empty);
   EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 5]"), keep_nulls), not_empty);
@@ -3050,7 +3051,7 @@ TYPED_TEST(TestFloatingQuantileKernel, Floats) {
     EXPECT_THAT(Quantile(*MakeScalar(ty, 1), options),
                 ResultWith(ArrayFromJSON(expected_ty, "[1, 1, 1]")));
     EXPECT_THAT(Quantile(MakeNullScalar(ty), options),
-                ResultWith(ArrayFromJSON(expected_ty, "[]")));
+                ResultWith(ArrayFromJSON(expected_ty, "[null, null, null]")));
   }
 }
@@ -3312,6 +3313,73 @@ TEST_F(TestRandomFloatQuantileKernel, Sliced) {
 }
 #endif
 
+TEST(TestQuantileKernel, AllNullsOrNaNs) {
+  const std::vector<std::vector<std::string>> tests = {
+      {"[]"},
+      {"[null, null]", "[]", "[null]"},
+      {"[NaN]", "[NaN, NaN]", "[]"},
+      {"[null, NaN, null]"},
+      {"[NaN, NaN]", "[]", "[null]"},
+  };
+
+  for (const auto& json : tests) {
+    auto chunked = ChunkedArrayFromJSON(float64(), json);
+    ASSERT_OK_AND_ASSIGN(Datum out, Quantile(chunked, QuantileOptions()));
+    auto out_array = out.make_array();
+    ValidateOutput(*out_array);
+    AssertArraysEqual(*ArrayFromJSON(float64(), "[null]"), *out_array, /*verbose=*/true);
+  }
+}
+
+TEST(TestQuantileKernel, Scalar) {
+  for (const auto& ty : {float64(), int64(), uint64()}) {
+    QuantileOptions options(std::vector<double>{0.0, 0.5, 1.0});
+    EXPECT_THAT(Quantile(*MakeScalar(ty, 1), options),
+                ResultWith(ArrayFromJSON(float64(), "[1.0, 1.0, 1.0]")));
+    EXPECT_THAT(Quantile(MakeNullScalar(ty), options),
+                ResultWith(ArrayFromJSON(float64(), "[null, null, null]")));
+  }
+}
+
+TEST(TestQuantileKernel, Options) {
+  auto ty = float64();
+  QuantileOptions keep_nulls(/*q=*/0.5, QuantileOptions::LINEAR,
+                             /*skip_nulls=*/false, /*min_count=*/0);
+  QuantileOptions min_count(/*q=*/0.5, QuantileOptions::LINEAR,
+                            /*skip_nulls=*/true, /*min_count=*/3);
+  QuantileOptions keep_nulls_min_count(/*q=*/0.5, QuantileOptions::NEAREST,
+                                       /*skip_nulls=*/false, /*min_count=*/3);
+
+  EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1.0, 2.0, 3.0]"), keep_nulls),
+              ResultWith(ArrayFromJSON(ty, "[2.0]")));
+  EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1.0, 2.0, 3.0, null]"), keep_nulls),
+              ResultWith(ArrayFromJSON(ty, "[null]")));
+  EXPECT_THAT(Quantile(ScalarFromJSON(ty, "1.0"), keep_nulls),
+              ResultWith(ArrayFromJSON(ty, "[1.0]")));
+  EXPECT_THAT(Quantile(ScalarFromJSON(ty, "null"), keep_nulls),
+              ResultWith(ArrayFromJSON(ty, "[null]")));
+
+  EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1.0, 2.0, 3.0, null]"), min_count),
+              ResultWith(ArrayFromJSON(ty, "[2.0]")));
+  EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1.0, 2.0, null]"), min_count),
+              ResultWith(ArrayFromJSON(ty, "[null]")));
+  EXPECT_THAT(Quantile(ScalarFromJSON(ty, "1.0"), min_count),
+              ResultWith(ArrayFromJSON(ty, "[null]")));
+  EXPECT_THAT(Quantile(ScalarFromJSON(ty, "null"), min_count),
+              ResultWith(ArrayFromJSON(ty, "[null]")));
+
+  EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1.0, 2.0, 3.0]"), keep_nulls_min_count),
+              ResultWith(ArrayFromJSON(ty, "[2.0]")));
+  EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1.0, 2.0]"), keep_nulls_min_count),
+              ResultWith(ArrayFromJSON(ty, "[null]")));
+  EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1.0, 2.0, 3.0, null]"), keep_nulls_min_count),
+              ResultWith(ArrayFromJSON(ty, "[null]")));
+  EXPECT_THAT(Quantile(ScalarFromJSON(ty, "1.0"), keep_nulls_min_count),
+              ResultWith(ArrayFromJSON(ty, "[null]")));
+  EXPECT_THAT(Quantile(ScalarFromJSON(ty, "null"), keep_nulls_min_count),
+              ResultWith(ArrayFromJSON(ty, "[null]")));
+}
+
 TEST(TestTDigestKernel, AllNullsOrNaNs) {
   const std::vector<std::vector<std::string>> tests = {
       {"[]"},
@@ -3326,7 +3394,7 @@ TEST(TestTDigestKernel, AllNullsOrNaNs) {
     ASSERT_OK_AND_ASSIGN(Datum out, TDigest(chunked, TDigestOptions()));
    auto out_array = out.make_array();
    ValidateOutput(*out_array);
-    ASSERT_EQ(out.array()->length, 0);
+    AssertArraysEqual(*ArrayFromJSON(float64(), "[null]"), *out_array, /*verbose=*/true);
   }
 }
@@ -3335,6 +3403,8 @@ TEST(TestTDigestKernel, Scalar) {
     TDigestOptions options(std::vector<double>{0.0, 0.5, 1.0});
     EXPECT_THAT(TDigest(*MakeScalar(ty, 1), options),
                 ResultWith(ArrayFromJSON(float64(), "[1, 1, 1]")));
+    EXPECT_THAT(TDigest(MakeNullScalar(ty), options),
+                ResultWith(ArrayFromJSON(float64(), "[null, null, null]")));
   }
 }
@@ -3350,31 +3420,31 @@ TEST(TestTDigestKernel, Options) {
   EXPECT_THAT(TDigest(ArrayFromJSON(ty, "[1.0, 2.0, 3.0]"), keep_nulls),
               ResultWith(ArrayFromJSON(ty, "[2.0]")));
   EXPECT_THAT(TDigest(ArrayFromJSON(ty, "[1.0, 2.0, 3.0, null]"), keep_nulls),
-              ResultWith(ArrayFromJSON(ty, "[]")));
+              ResultWith(ArrayFromJSON(ty, "[null]")));
   EXPECT_THAT(TDigest(ScalarFromJSON(ty, "1.0"), keep_nulls),
               ResultWith(ArrayFromJSON(ty, "[1.0]")));
   EXPECT_THAT(TDigest(ScalarFromJSON(ty, "null"), keep_nulls),
-              ResultWith(ArrayFromJSON(ty, "[]")));
+              ResultWith(ArrayFromJSON(ty, "[null]")));
 
   EXPECT_THAT(TDigest(ArrayFromJSON(ty, "[1.0, 2.0, 3.0, null]"), min_count),
               ResultWith(ArrayFromJSON(ty, "[2.0]")));
   EXPECT_THAT(TDigest(ArrayFromJSON(ty, "[1.0, 2.0, null]"), min_count),
-              ResultWith(ArrayFromJSON(ty, "[]")));
+              ResultWith(ArrayFromJSON(ty, "[null]")));
   EXPECT_THAT(TDigest(ScalarFromJSON(ty, "1.0"), min_count),
-              ResultWith(ArrayFromJSON(ty, "[]")));
+              ResultWith(ArrayFromJSON(ty, "[null]")));
   EXPECT_THAT(TDigest(ScalarFromJSON(ty, "null"), min_count),
-              ResultWith(ArrayFromJSON(ty, "[]")));
+              ResultWith(ArrayFromJSON(ty, "[null]")));
 
   EXPECT_THAT(TDigest(ArrayFromJSON(ty, "[1.0, 2.0, 3.0]"), keep_nulls_min_count),
               ResultWith(ArrayFromJSON(ty, "[2.0]")));
   EXPECT_THAT(TDigest(ArrayFromJSON(ty, "[1.0, 2.0]"), keep_nulls_min_count),
-              ResultWith(ArrayFromJSON(ty, "[]")));
+              ResultWith(ArrayFromJSON(ty, "[null]")));
   EXPECT_THAT(TDigest(ArrayFromJSON(ty, "[1.0, 2.0, 3.0, null]"), keep_nulls_min_count),
-              ResultWith(ArrayFromJSON(ty, "[]")));
+              ResultWith(ArrayFromJSON(ty, "[null]")));
   EXPECT_THAT(TDigest(ScalarFromJSON(ty, "1.0"), keep_nulls_min_count),
-              ResultWith(ArrayFromJSON(ty, "[]")));
+              ResultWith(ArrayFromJSON(ty, "[null]")));
   EXPECT_THAT(TDigest(ScalarFromJSON(ty, "null"), keep_nulls_min_count),
-              ResultWith(ArrayFromJSON(ty, "[]")));
+              ResultWith(ArrayFromJSON(ty, "[null]")));
 }
 
 }  // namespace compute
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
index 2be424751d8..843d62911a7 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
@@ -1661,17 +1661,18 @@ struct GroupedTDigestImpl : public GroupedAggregator {
   }
 
   Result<Datum> Finalize() override {
+    const int64_t slot_length = options_.q.size();
+    const int64_t num_values = tdigests_.size() * slot_length;
     const int64_t* counts = counts_.data();
     std::shared_ptr<Buffer> null_bitmap;
-    ARROW_ASSIGN_OR_RAISE(
-        std::shared_ptr<Buffer> values,
-        AllocateBuffer(tdigests_.size() * options_.q.size() * sizeof(double), pool_));
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> values,
+                          AllocateBuffer(num_values * sizeof(double), pool_));
     int64_t null_count = 0;
-    const int64_t slot_length = options_.q.size();
 
     double* results = reinterpret_cast<double*>(values->mutable_data());
     for (int64_t i = 0; static_cast<size_t>(i) < tdigests_.size(); ++i) {
-      if (!tdigests_[i].is_empty() && counts[i] >= options_.min_count) {
+      if (!tdigests_[i].is_empty() && counts[i] >= options_.min_count &&
+          (options_.skip_nulls || BitUtil::GetBit(no_nulls_.data(), i))) {
         for (int64_t j = 0; j < slot_length; j++) {
           results[i * slot_length + j] = tdigests_[i].Quantile(options_.q[j]);
         }
@@ -1679,30 +1680,19 @@ struct GroupedTDigestImpl : public GroupedAggregator {
       }
 
       if (!null_bitmap) {
-        ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(tdigests_.size(), pool_));
-        BitUtil::SetBitsTo(null_bitmap->mutable_data(), 0, tdigests_.size(), true);
+        ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(num_values, pool_));
+        BitUtil::SetBitsTo(null_bitmap->mutable_data(), 0, num_values, true);
       }
-      null_count++;
-      BitUtil::SetBitTo(null_bitmap->mutable_data(), i, false);
+      null_count += slot_length;
+      BitUtil::SetBitsTo(null_bitmap->mutable_data(), i * slot_length, slot_length,
+                         false);
       std::fill(&results[i * slot_length], &results[(i + 1) * slot_length], 0.0);
     }
 
-    if (!options_.skip_nulls) {
-      null_count = kUnknownNullCount;
-      if (null_bitmap) {
-        arrow::internal::BitmapAnd(null_bitmap->data(), /*left_offset=*/0,
-                                   no_nulls_.data(), /*right_offset=*/0,
-                                   static_cast<int64_t>(tdigests_.size()),
-                                   /*out_offset=*/0, null_bitmap->mutable_data());
-      } else {
-        ARROW_ASSIGN_OR_RAISE(null_bitmap, no_nulls_.Finish());
-      }
-    }
-
-    auto child = ArrayData::Make(float64(), tdigests_.size() * options_.q.size(),
-                                 {nullptr, std::move(values)}, /*null_count=*/0);
-    return ArrayData::Make(out_type(), tdigests_.size(), {std::move(null_bitmap)},
-                           {std::move(child)}, null_count);
+    auto child = ArrayData::Make(float64(), num_values,
+                                 {std::move(null_bitmap), std::move(values)}, null_count);
+    return ArrayData::Make(out_type(), tdigests_.size(), {nullptr}, {std::move(child)},
+                           /*null_count=*/0);
   }
 
   std::shared_ptr<DataType> out_type() const override {
@@ -2642,7 +2632,7 @@ const FunctionDoc hash_tdigest_doc{
     "Calculate approximate quantiles of a numeric array with the T-Digest algorithm",
     ("By default, the 0.5 quantile (median) is returned.\n"
     "Nulls and NaNs are ignored.\n"
-     "A null array is returned if there are no valid data points."),
+     "An array of nulls is returned if there are no valid data points."),
    {"array", "group_id_array"},
    "TDigestOptions"};
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
index c8894e530ef..f90e71bf670 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
@@ -1144,11 +1144,11 @@ TEST(GroupBy, TDigest) {
                           field("key_0", int64()),
                       }),
                       R"([
-    [[1.0], [1.0, 3.0, 3.0], [1.0, 3.0, 3.0], null, null, null, 1],
-    [[0.0], [0.0, 0.0, 0.0], [0.0, 0.0, 0.0], [0.0], [0.0], [0.0], 2],
-    [null, null, null, null, null, null, 3],
-    [[1.0], [1.0, 1.0, 1.0], [1.0, 1.0, 1.0], null, [1.0], null, 4],
-    [[1.0], [1.0, 4.0, 4.0], [1.0, 4.0, 4.0], [1.0], null, null, null]
+    [[1.0], [1.0, 3.0, 3.0], [1.0, 3.0, 3.0], [null], [null], [null], 1],
+    [[0.0], [0.0, 0.0, 0.0], [0.0, 0.0, 0.0], [0.0], [0.0], [0.0], 2],
+    [[null], [null, null, null], [null, null, null], [null], [null], [null], 3],
+    [[1.0], [1.0, 1.0, 1.0], [1.0, 1.0, 1.0], [null], [1.0], [null], 4],
+    [[1.0], [1.0, 4.0, 4.0], [1.0, 4.0, 4.0], [1.0], [null], [null], null]
   ])"),
                   aggregated_and_grouped,
                   /*verbose=*/true);
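Illustrative usage note (not part of the patch): the sketch below is a minimal, hypothetical example of how the changed behaviour shows up through the public compute API. With this change, quantile of an input that has no valid data point yields an array of nulls whose length equals the number of requested quantiles, instead of an empty array. The builder setup, the q values and the main() wrapper are assumptions made for a self-contained example, not code taken from this diff.

// Sketch: quantile of an all-null input after this change (assumes an Arrow C++
// build with the compute module linked in).
#include <arrow/api.h>
#include <arrow/compute/api.h>

#include <iostream>
#include <vector>

arrow::Status RunExample() {
  // Build a float64 array containing only nulls (no valid data point).
  arrow::DoubleBuilder builder;
  ARROW_RETURN_NOT_OK(builder.AppendNull());
  ARROW_RETURN_NOT_OK(builder.AppendNull());
  std::shared_ptr<arrow::Array> input;
  ARROW_RETURN_NOT_OK(builder.Finish(&input));

  // Ask for three quantiles. Before this patch the kernel returned an empty
  // array; with it the result is [null, null, null] (length == q.size()).
  arrow::compute::QuantileOptions options(std::vector<double>{0.25, 0.5, 0.75});
  ARROW_ASSIGN_OR_RAISE(arrow::Datum out, arrow::compute::Quantile(input, options));
  std::cout << out.make_array()->ToString() << std::endl;
  return arrow::Status::OK();
}

int main() {
  arrow::Status st = RunExample();
  if (!st.ok()) {
    std::cerr << st.ToString() << std::endl;
    return 1;
  }
  return 0;
}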