Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions cpp/src/arrow/compute/exec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -854,11 +854,6 @@ class VectorExecutor : public KernelExecutorImpl<VectorKernel> {

protected:
Status ExecuteBatch(const ExecBatch& batch, ExecListener* listener) {
if (batch.length == 0) {
// Skip empty batches. This may only happen when not using
// ExecBatchIterator
return Status::OK();
}
Datum out;
if (output_descr_.shape == ValueDescr::ARRAY) {
// We preallocate (maybe) only for the output of processing the current
Expand Down
42 changes: 26 additions & 16 deletions cpp/src/arrow/compute/kernels/aggregate_quantile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,14 @@ struct SortQuantiler {
}

// prepare out array
int64_t out_length = options.q.size();
if (in_buffer.empty()) {
out_length = 0; // input is empty or only contains null and nan, return empty array
}
// out type depends on options
const bool is_datapoint = IsDataPoint(options);
const std::shared_ptr<DataType> out_type =
is_datapoint ? TypeTraits<InType>::type_singleton() : float64();
int64_t out_length = options.q.size();
if (in_buffer.empty()) {
return MakeArrayOfNull(out_type, out_length, ctx->memory_pool()).Value(out);
}
auto out_data = ArrayData::Make(out_type, out_length, 0);
out_data->buffers.resize(2, nullptr);

Expand Down Expand Up @@ -245,14 +245,14 @@ struct CountQuantiler {
}

// prepare out array
int64_t out_length = options.q.size();
if (in_length == 0) {
out_length = 0; // input is empty or only contains null, return empty array
}
// out type depends on options
const bool is_datapoint = IsDataPoint(options);
const std::shared_ptr<DataType> out_type =
is_datapoint ? TypeTraits<InType>::type_singleton() : float64();
int64_t out_length = options.q.size();
if (in_length == 0) {
return MakeArrayOfNull(out_type, out_length, ctx->memory_pool()).Value(out);
}
auto out_data = ArrayData::Make(out_type, out_length, 0);
out_data->buffers.resize(2, nullptr);

Expand Down Expand Up @@ -404,17 +404,27 @@ Status ScalarQuantile(KernelContext* ctx, const QuantileOptions& options,
const Scalar& scalar, Datum* out) {
using CType = typename T::c_type;
ArrayData* output = out->mutable_array();
if (!scalar.is_valid || options.min_count > 1) {
output->length = 0;
output->null_count = 0;
return Status::OK();
}
auto out_type = IsDataPoint(options) ? scalar.type : float64();
output->length = options.q.size();
output->null_count = 0;
auto out_type = IsDataPoint(options) ? scalar.type : float64();
ARROW_ASSIGN_OR_RAISE(
output->buffers[1],
ctx->Allocate(output->length * BitUtil::BytesForBits(GetBitWidth(*out_type))));

if (!scalar.is_valid || options.min_count > 1) {
output->null_count = output->length;
ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(output->length));
BitUtil::SetBitsTo(output->buffers[0]->mutable_data(), /*offset=*/0, output->length,
false);
if (IsDataPoint(options)) {
CType* out_buffer = output->template GetMutableValues<CType>(1);
std::fill(out_buffer, out_buffer + output->length, CType(0));
} else {
double* out_buffer = output->template GetMutableValues<double>(1);
std::fill(out_buffer, out_buffer + output->length, 0.0);
}
return Status::OK();
}
output->null_count = 0;
if (IsDataPoint(options)) {
CType* out_buffer = output->template GetMutableValues<CType>(1);
for (int64_t i = 0; i < output->length; i++) {
Expand Down Expand Up @@ -484,7 +494,7 @@ const FunctionDoc quantile_doc{
"If quantile lies between two data points, an interpolated value is\n"
"returned based on selected interpolation method.\n"
"Nulls and NaNs are ignored.\n"
"An empty array is returned if there is no valid data point."),
"An array of nulls is returned if there is no valid data point."),
{"array"},
"QuantileOptions"};

Expand Down
24 changes: 13 additions & 11 deletions cpp/src/arrow/compute/kernels/aggregate_tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,22 +85,24 @@ struct TDigestImpl : public ScalarAggregator {
}

Status Finalize(KernelContext* ctx, Datum* out) override {
const int64_t out_length =
(this->tdigest.is_empty() || !this->all_valid || this->count < options.min_count)
? 0
: options.q.size();
const int64_t out_length = options.q.size();
auto out_data = ArrayData::Make(float64(), out_length, 0);
out_data->buffers.resize(2, nullptr);

if (out_length > 0) {
ARROW_ASSIGN_OR_RAISE(out_data->buffers[1],
ctx->Allocate(out_length * sizeof(double)));
double* out_buffer = out_data->template GetMutableValues<double>(1);
ARROW_ASSIGN_OR_RAISE(out_data->buffers[1],
ctx->Allocate(out_length * sizeof(double)));
double* out_buffer = out_data->template GetMutableValues<double>(1);

if (this->tdigest.is_empty() || !this->all_valid || this->count < options.min_count) {
ARROW_ASSIGN_OR_RAISE(out_data->buffers[0], ctx->AllocateBitmap(out_length));
std::memset(out_data->buffers[0]->mutable_data(), 0x00,
out_data->buffers[0]->size());
std::fill(out_buffer, out_buffer + out_length, 0.0);
out_data->null_count = out_length;
} else {
for (int64_t i = 0; i < out_length; ++i) {
out_buffer[i] = this->tdigest.Quantile(this->options.q[i]);
}
}

*out = Datum(std::move(out_data));
return Status::OK();
}
Expand Down Expand Up @@ -161,7 +163,7 @@ const FunctionDoc tdigest_doc{
"Approximate quantiles of a numeric array with T-Digest algorithm",
("By default, 0.5 quantile (median) is returned.\n"
"Nulls and NaNs are ignored.\n"
"An empty array is returned if there is no valid data point."),
"An array of nulls is returned if there is no valid data point."),
{"array"},
"TDigestOptions"};

Expand Down
100 changes: 85 additions & 15 deletions cpp/src/arrow/compute/kernels/aggregate_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2871,7 +2871,8 @@ class TestPrimitiveQuantileKernel : public ::testing::Test {
ASSERT_OK_AND_ASSIGN(Datum out, Quantile(array, options));
auto out_array = out.make_array();
ValidateOutput(*out_array);
ASSERT_EQ(out.array()->length, 0);
ASSERT_EQ(out.array()->length, q.size());
ASSERT_EQ(out.array()->null_count, q.size());
}
}

Expand Down Expand Up @@ -2943,7 +2944,7 @@ TYPED_TEST(TestIntegerQuantileKernel, Basics) {
QuantileOptions keep_nulls_min_count(/*q=*/0.5, QuantileOptions::LINEAR,
/*skip_nulls=*/false, /*min_count=*/3);
auto not_empty = ResultWith(ArrayFromJSON(float64(), "[3.0]"));
auto empty = ResultWith(ArrayFromJSON(float64(), "[]"));
auto empty = ResultWith(ArrayFromJSON(float64(), "[null]"));
EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5]"), keep_nulls), not_empty);
EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5, null]"), keep_nulls), empty);
EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 5]"), keep_nulls), not_empty);
Expand Down Expand Up @@ -2974,7 +2975,7 @@ TYPED_TEST(TestIntegerQuantileKernel, Basics) {
EXPECT_THAT(Quantile(*MakeScalar(ty, 1), options),
ResultWith(ArrayFromJSON(expected_ty, "[1, 1, 1]")));
EXPECT_THAT(Quantile(MakeNullScalar(ty), options),
ResultWith(ArrayFromJSON(expected_ty, "[]")));
ResultWith(ArrayFromJSON(expected_ty, "[null, null, null]")));
}
}

Expand Down Expand Up @@ -3019,7 +3020,7 @@ TYPED_TEST(TestFloatingQuantileKernel, Floats) {
QuantileOptions keep_nulls_min_count(/*q=*/0.5, QuantileOptions::LINEAR,
/*skip_nulls=*/false, /*min_count=*/3);
auto not_empty = ResultWith(ArrayFromJSON(float64(), "[3.0]"));
auto empty = ResultWith(ArrayFromJSON(float64(), "[]"));
auto empty = ResultWith(ArrayFromJSON(float64(), "[null]"));
EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5]"), keep_nulls), not_empty);
EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 2, 4, 5, null]"), keep_nulls), empty);
EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1, 5]"), keep_nulls), not_empty);
Expand Down Expand Up @@ -3050,7 +3051,7 @@ TYPED_TEST(TestFloatingQuantileKernel, Floats) {
EXPECT_THAT(Quantile(*MakeScalar(ty, 1), options),
ResultWith(ArrayFromJSON(expected_ty, "[1, 1, 1]")));
EXPECT_THAT(Quantile(MakeNullScalar(ty), options),
ResultWith(ArrayFromJSON(expected_ty, "[]")));
ResultWith(ArrayFromJSON(expected_ty, "[null, null, null]")));
}
}

Expand Down Expand Up @@ -3312,6 +3313,73 @@ TEST_F(TestRandomFloatQuantileKernel, Sliced) {
}
#endif

TEST(TestQuantileKernel, AllNullsOrNaNs) {
const std::vector<std::vector<std::string>> tests = {
{"[]"},
{"[null, null]", "[]", "[null]"},
{"[NaN]", "[NaN, NaN]", "[]"},
{"[null, NaN, null]"},
{"[NaN, NaN]", "[]", "[null]"},
};

for (const auto& json : tests) {
auto chunked = ChunkedArrayFromJSON(float64(), json);
ASSERT_OK_AND_ASSIGN(Datum out, Quantile(chunked, QuantileOptions()));
auto out_array = out.make_array();
ValidateOutput(*out_array);
AssertArraysEqual(*ArrayFromJSON(float64(), "[null]"), *out_array, /*verbose=*/true);
}
}

TEST(TestQuantileKernel, Scalar) {
for (const auto& ty : {float64(), int64(), uint64()}) {
QuantileOptions options(std::vector<double>{0.0, 0.5, 1.0});
EXPECT_THAT(Quantile(*MakeScalar(ty, 1), options),
ResultWith(ArrayFromJSON(float64(), "[1.0, 1.0, 1.0]")));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean to also test with a null scalar here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, added (also added to TDigest's tests)

EXPECT_THAT(Quantile(MakeNullScalar(ty), options),
ResultWith(ArrayFromJSON(float64(), "[null, null, null]")));
}
}

TEST(TestQuantileKernel, Options) {
auto ty = float64();
QuantileOptions keep_nulls(/*q=*/0.5, QuantileOptions::LINEAR,
/*skip_nulls=*/false, /*min_count=*/0);
QuantileOptions min_count(/*q=*/0.5, QuantileOptions::LINEAR,
/*skip_nulls=*/true, /*min_count=*/3);
QuantileOptions keep_nulls_min_count(/*q=*/0.5, QuantileOptions::NEAREST,
/*skip_nulls=*/false, /*min_count=*/3);

EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1.0, 2.0, 3.0]"), keep_nulls),
ResultWith(ArrayFromJSON(ty, "[2.0]")));
EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1.0, 2.0, 3.0, null]"), keep_nulls),
ResultWith(ArrayFromJSON(ty, "[null]")));
EXPECT_THAT(Quantile(ScalarFromJSON(ty, "1.0"), keep_nulls),
ResultWith(ArrayFromJSON(ty, "[1.0]")));
EXPECT_THAT(Quantile(ScalarFromJSON(ty, "null"), keep_nulls),
ResultWith(ArrayFromJSON(ty, "[null]")));

EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1.0, 2.0, 3.0, null]"), min_count),
ResultWith(ArrayFromJSON(ty, "[2.0]")));
EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1.0, 2.0, null]"), min_count),
ResultWith(ArrayFromJSON(ty, "[null]")));
EXPECT_THAT(Quantile(ScalarFromJSON(ty, "1.0"), min_count),
ResultWith(ArrayFromJSON(ty, "[null]")));
EXPECT_THAT(Quantile(ScalarFromJSON(ty, "null"), min_count),
ResultWith(ArrayFromJSON(ty, "[null]")));

EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1.0, 2.0, 3.0]"), keep_nulls_min_count),
ResultWith(ArrayFromJSON(ty, "[2.0]")));
EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1.0, 2.0]"), keep_nulls_min_count),
ResultWith(ArrayFromJSON(ty, "[null]")));
EXPECT_THAT(Quantile(ArrayFromJSON(ty, "[1.0, 2.0, 3.0, null]"), keep_nulls_min_count),
ResultWith(ArrayFromJSON(ty, "[null]")));
EXPECT_THAT(Quantile(ScalarFromJSON(ty, "1.0"), keep_nulls_min_count),
ResultWith(ArrayFromJSON(ty, "[null]")));
EXPECT_THAT(Quantile(ScalarFromJSON(ty, "null"), keep_nulls_min_count),
ResultWith(ArrayFromJSON(ty, "[null]")));
}

TEST(TestTDigestKernel, AllNullsOrNaNs) {
const std::vector<std::vector<std::string>> tests = {
{"[]"},
Expand All @@ -3326,7 +3394,7 @@ TEST(TestTDigestKernel, AllNullsOrNaNs) {
ASSERT_OK_AND_ASSIGN(Datum out, TDigest(chunked, TDigestOptions()));
auto out_array = out.make_array();
ValidateOutput(*out_array);
ASSERT_EQ(out.array()->length, 0);
AssertArraysEqual(*ArrayFromJSON(float64(), "[null]"), *out_array, /*verbose=*/true);
}
}

Expand All @@ -3335,6 +3403,8 @@ TEST(TestTDigestKernel, Scalar) {
TDigestOptions options(std::vector<double>{0.0, 0.5, 1.0});
EXPECT_THAT(TDigest(*MakeScalar(ty, 1), options),
ResultWith(ArrayFromJSON(float64(), "[1, 1, 1]")));
EXPECT_THAT(TDigest(MakeNullScalar(ty), options),
ResultWith(ArrayFromJSON(float64(), "[null, null, null]")));
}
}

Expand All @@ -3350,31 +3420,31 @@ TEST(TestTDigestKernel, Options) {
EXPECT_THAT(TDigest(ArrayFromJSON(ty, "[1.0, 2.0, 3.0]"), keep_nulls),
ResultWith(ArrayFromJSON(ty, "[2.0]")));
EXPECT_THAT(TDigest(ArrayFromJSON(ty, "[1.0, 2.0, 3.0, null]"), keep_nulls),
ResultWith(ArrayFromJSON(ty, "[]")));
ResultWith(ArrayFromJSON(ty, "[null]")));
EXPECT_THAT(TDigest(ScalarFromJSON(ty, "1.0"), keep_nulls),
ResultWith(ArrayFromJSON(ty, "[1.0]")));
EXPECT_THAT(TDigest(ScalarFromJSON(ty, "null"), keep_nulls),
ResultWith(ArrayFromJSON(ty, "[]")));
ResultWith(ArrayFromJSON(ty, "[null]")));

EXPECT_THAT(TDigest(ArrayFromJSON(ty, "[1.0, 2.0, 3.0, null]"), min_count),
ResultWith(ArrayFromJSON(ty, "[2.0]")));
EXPECT_THAT(TDigest(ArrayFromJSON(ty, "[1.0, 2.0, null]"), min_count),
ResultWith(ArrayFromJSON(ty, "[]")));
ResultWith(ArrayFromJSON(ty, "[null]")));
EXPECT_THAT(TDigest(ScalarFromJSON(ty, "1.0"), min_count),
ResultWith(ArrayFromJSON(ty, "[]")));
ResultWith(ArrayFromJSON(ty, "[null]")));
EXPECT_THAT(TDigest(ScalarFromJSON(ty, "null"), min_count),
ResultWith(ArrayFromJSON(ty, "[]")));
ResultWith(ArrayFromJSON(ty, "[null]")));

EXPECT_THAT(TDigest(ArrayFromJSON(ty, "[1.0, 2.0, 3.0]"), keep_nulls_min_count),
ResultWith(ArrayFromJSON(ty, "[2.0]")));
EXPECT_THAT(TDigest(ArrayFromJSON(ty, "[1.0, 2.0]"), keep_nulls_min_count),
ResultWith(ArrayFromJSON(ty, "[]")));
ResultWith(ArrayFromJSON(ty, "[null]")));
EXPECT_THAT(TDigest(ArrayFromJSON(ty, "[1.0, 2.0, 3.0, null]"), keep_nulls_min_count),
ResultWith(ArrayFromJSON(ty, "[]")));
ResultWith(ArrayFromJSON(ty, "[null]")));
EXPECT_THAT(TDigest(ScalarFromJSON(ty, "1.0"), keep_nulls_min_count),
ResultWith(ArrayFromJSON(ty, "[]")));
ResultWith(ArrayFromJSON(ty, "[null]")));
EXPECT_THAT(TDigest(ScalarFromJSON(ty, "null"), keep_nulls_min_count),
ResultWith(ArrayFromJSON(ty, "[]")));
ResultWith(ArrayFromJSON(ty, "[null]")));
}

} // namespace compute
Expand Down
42 changes: 16 additions & 26 deletions cpp/src/arrow/compute/kernels/hash_aggregate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1661,48 +1661,38 @@ struct GroupedTDigestImpl : public GroupedAggregator {
}

Result<Datum> Finalize() override {
const int64_t slot_length = options_.q.size();
const int64_t num_values = tdigests_.size() * slot_length;
const int64_t* counts = counts_.data();
std::shared_ptr<Buffer> null_bitmap;
ARROW_ASSIGN_OR_RAISE(
std::shared_ptr<Buffer> values,
AllocateBuffer(tdigests_.size() * options_.q.size() * sizeof(double), pool_));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> values,
AllocateBuffer(num_values * sizeof(double), pool_));
int64_t null_count = 0;
const int64_t slot_length = options_.q.size();

double* results = reinterpret_cast<double*>(values->mutable_data());
for (int64_t i = 0; static_cast<size_t>(i) < tdigests_.size(); ++i) {
if (!tdigests_[i].is_empty() && counts[i] >= options_.min_count) {
if (!tdigests_[i].is_empty() && counts[i] >= options_.min_count &&
(options_.skip_nulls || BitUtil::GetBit(no_nulls_.data(), i))) {
for (int64_t j = 0; j < slot_length; j++) {
results[i * slot_length + j] = tdigests_[i].Quantile(options_.q[j]);
}
continue;
}

if (!null_bitmap) {
ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(tdigests_.size(), pool_));
BitUtil::SetBitsTo(null_bitmap->mutable_data(), 0, tdigests_.size(), true);
ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(num_values, pool_));
BitUtil::SetBitsTo(null_bitmap->mutable_data(), 0, num_values, true);
}
null_count++;
BitUtil::SetBitTo(null_bitmap->mutable_data(), i, false);
null_count += slot_length;
BitUtil::SetBitsTo(null_bitmap->mutable_data(), i * slot_length, slot_length,
false);
std::fill(&results[i * slot_length], &results[(i + 1) * slot_length], 0.0);
}

if (!options_.skip_nulls) {
null_count = kUnknownNullCount;
if (null_bitmap) {
arrow::internal::BitmapAnd(null_bitmap->data(), /*left_offset=*/0,
no_nulls_.data(), /*right_offset=*/0,
static_cast<int64_t>(tdigests_.size()),
/*out_offset=*/0, null_bitmap->mutable_data());
} else {
ARROW_ASSIGN_OR_RAISE(null_bitmap, no_nulls_.Finish());
}
}

auto child = ArrayData::Make(float64(), tdigests_.size() * options_.q.size(),
{nullptr, std::move(values)}, /*null_count=*/0);
return ArrayData::Make(out_type(), tdigests_.size(), {std::move(null_bitmap)},
{std::move(child)}, null_count);
auto child = ArrayData::Make(float64(), num_values,
{std::move(null_bitmap), std::move(values)}, null_count);
return ArrayData::Make(out_type(), tdigests_.size(), {nullptr}, {std::move(child)},
/*null_count=*/0);
}

std::shared_ptr<DataType> out_type() const override {
Expand Down Expand Up @@ -2642,7 +2632,7 @@ const FunctionDoc hash_tdigest_doc{
"Calculate approximate quantiles of a numeric array with the T-Digest algorithm",
("By default, the 0.5 quantile (median) is returned.\n"
"Nulls and NaNs are ignored.\n"
"A null array is returned if there are no valid data points."),
"A array of nulls is returned if there are no valid data points."),
{"array", "group_id_array"},
"TDigestOptions"};

Expand Down
Loading