Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhangHuiGui committed Mar 21, 2024
1 parent ab56353 commit d5c8ac2
Show file tree
Hide file tree
Showing 9 changed files with 32 additions and 50 deletions.
5 changes: 3 additions & 2 deletions cpp/src/arrow/compute/exec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ bool CheckIfAllScalar(const ExecBatch& batch) {
return false;
}
}
return batch.num_values() > 0 || batch.IsNull();
return batch.num_values() >= 0;
}

} // namespace
Expand Down Expand Up @@ -825,12 +825,13 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {

protected:
Status EmitResult(std::shared_ptr<ArrayData> out, ExecListener* listener) {
if (span_iterator_.have_all_scalars()) {
if (span_iterator_.have_all_scalars() && kernel_->is_pure) {
// ARROW-16757 We boxed scalar inputs as ArraySpan, so now we have to
// unbox the output as a scalar
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> scalar, MakeArray(out)->GetScalar(0));
return listener->OnResult(std::move(scalar));
} else {
// ARROW-40687 Ensure impure scalar function's result is array
return listener->OnResult(std::move(out));
}
}
Expand Down
6 changes: 0 additions & 6 deletions cpp/src/arrow/compute/exec.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,6 @@ struct ARROW_EXPORT ExecBatch {
return result;
}

/// \brief A check for the empty ExecBatch with no output length
/// and function's arguments.
bool IsNull() const { return length == 0 && num_values() == 0; }

std::string ToString() const;
};

Expand Down Expand Up @@ -424,8 +420,6 @@ struct ARROW_EXPORT ExecSpan {
return result;
}

bool IsNull() const { return length == 0 && num_values() == 0; }

int64_t length = 0;
std::vector<ExecValue> values;
};
Expand Down
29 changes: 10 additions & 19 deletions cpp/src/arrow/compute/exec_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1426,28 +1426,19 @@ TEST(Ordering, IsSuborderOf) {
CheckOrdering(unordered, {true, true, true, true, true, true});
}

static Status RegisterMyScalarPureFunction() {
const std::string name = "my_scalar_function";
auto func = std::make_shared<ScalarFunction>(name, Arity::Nullary(),
FunctionDoc::Empty(), nullptr);
auto func_exec = [](KernelContext* /*ctx*/, const ExecSpan& /*batch*/,
ExecResult* out) -> Status {
auto scalar = MakeScalar("I am a scalar");
ARROW_ASSIGN_OR_RAISE(auto arr_res, MakeArrayFromScalar(*scalar, 1));
out->value = std::move(arr_res->data());
TEST(CallFunction, ScalarFunctionWithZeroLengthBatch) {
// Ensure a scalar function called with a zero length input batch
// does not get skipped if it is called with zero arguments
auto func = std::make_shared<ScalarFunction>("my_scalar_function", Arity::Nullary(),
FunctionDoc::Empty());

auto func_exec = [](KernelContext*, const ExecSpan&, ExecResult* out) {
out->value = ArrayFromJSON(utf8(), R"(["I am a scalar"])")->data();
return Status::OK();
};

ScalarKernel kernel({}, utf8(), func_exec);
ARROW_RETURN_NOT_OK(func->AddKernel(kernel));

auto registry = GetFunctionRegistry();
ARROW_RETURN_NOT_OK(registry->AddFunction(std::move(func)));
return Status::OK();
}

TEST(CallFunction, ScalarPureFunction) {
ASSERT_OK(RegisterMyScalarPureFunction());
EXPECT_THAT(func->AddKernel({}, utf8(), func_exec), Ok());
EXPECT_THAT(GetFunctionRegistry()->AddFunction(std::move(func)), Ok());

EXPECT_THAT(CallFunction("my_scalar_function", ExecBatch({})),
ResultWith(MakeScalar("I am a scalar")));
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/compute/function.cc
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ Status ScalarFunction::AddKernel(ScalarKernel kernel) {
if (arity_.is_varargs && !kernel.signature->is_varargs()) {
return Status::Invalid("Function accepts varargs but kernel signature does not");
}
kernel.is_pure = is_pure();
kernels_.emplace_back(std::move(kernel));
return Status::OK();
}
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/compute/kernel.h
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,11 @@ struct ARROW_EXPORT ScalarKernel : public Kernel {
// bitmaps is a reasonable default
NullHandling::type null_handling = NullHandling::INTERSECTION;
MemAllocation::type mem_allocation = MemAllocation::PREALLOCATE;

// Scalar functions' pure property for return a reasonable value when
// the function's output length cannot be determined.
// This mark can be fetched from ScalarFunction and set in AddKernel.
bool is_pure = true;
};

// ----------------------------------------------------------------------
Expand Down
10 changes: 5 additions & 5 deletions cpp/src/arrow/compute/kernels/scalar_compare.cc
Original file line number Diff line number Diff line change
Expand Up @@ -705,11 +705,11 @@ struct BinaryScalarMinMax {
template <typename Op>
struct FixedSizeBinaryScalarMinMax {
static Status Exec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
if (batch.IsNull()) {
std::shared_ptr<Scalar> result = MakeNullScalar(out->type()->GetSharedPtr());
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> arr_result,
MakeArrayFromScalar(*result, 1));
out->value = std::move(arr_result->data());
if (batch.num_values() == 0) {
ARROW_ASSIGN_OR_RAISE(
std::shared_ptr<Array> arr,
MakeArrayOfNull(out->type()->GetSharedPtr(), 1, ctx->memory_pool()));
out->value = std::move(arr->data());
return Status::OK();
}

Expand Down
11 changes: 5 additions & 6 deletions cpp/src/arrow/compute/kernels/scalar_nested.cc
Original file line number Diff line number Diff line change
Expand Up @@ -571,12 +571,11 @@ Result<TypeHolder> MakeStructResolve(KernelContext* ctx,
}

Status MakeStructExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
if (batch.IsNull()) {
ScalarVector value;
std::shared_ptr<Scalar> result = std::make_shared<StructScalar>(value, struct_({}));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> arr_result,
MakeArrayFromScalar(*result, 1));
out->value = std::move(arr_result->data());
if (batch.num_values() == 0) {
ARROW_ASSIGN_OR_RAISE(
std::shared_ptr<Array> arr,
MakeArrayOfNull(out->type()->GetSharedPtr(), 1, ctx->memory_pool()));
out->value = std::move(arr->data());
return Status::OK();
}

Expand Down
7 changes: 3 additions & 4 deletions cpp/src/arrow/compute/kernels/scalar_nested_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -843,12 +843,11 @@ TEST(MakeStruct, Scalar) {
// Three field names but one input value
EXPECT_THAT(MakeStructor({str}, {"i", "f", "s"}), Raises(StatusCode::Invalid));

// ARROW-16757: No input values yields empty struct array of length 1
ScalarVector value;
auto empty_scalar = std::make_shared<StructScalar>(value, struct_({}));
// ARROW-40687: No input values yields empty scalar
auto empty_expect = MakeNullScalar(struct_({}));
ASSERT_OK_AND_ASSIGN(Datum empty_actual,
CallFunction("make_struct", std::vector<Datum>({})));
AssertDatumsEqual(Datum(empty_scalar), empty_actual);
AssertDatumsEqual(Datum(empty_actual), empty_expect);
}

TEST(MakeStruct, Array) {
Expand Down
8 changes: 0 additions & 8 deletions cpp/src/arrow/compute/kernels/scalar_random.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,6 @@ random::pcg64_oneseq MakeSeedGenerator() {
}

Status ExecRandom(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
if (batch.IsNull()) {
std::shared_ptr<Scalar> result = MakeNullScalar(out->type()->GetSharedPtr());
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> arr_result,
MakeArrayFromScalar(*result, 1));
out->array_span_mutable()->FillFromScalar(*result);
return Status::OK();
}

static random::pcg64_oneseq seed_gen = MakeSeedGenerator();
static std::mutex seed_gen_mutex;

Expand Down

0 comments on commit d5c8ac2

Please sign in to comment.