diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index 05ea319abf363..0f13f043d0969 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -336,7 +336,7 @@ bool CheckIfAllScalar(const ExecBatch& batch) { return false; } } - return batch.num_values() > 0 || batch.IsNull(); + return batch.num_values() >= 0; } } // namespace @@ -825,12 +825,13 @@ class ScalarExecutor : public KernelExecutorImpl { protected: Status EmitResult(std::shared_ptr out, ExecListener* listener) { - if (span_iterator_.have_all_scalars()) { + if (span_iterator_.have_all_scalars() && kernel_->is_pure) { // ARROW-16757 We boxed scalar inputs as ArraySpan, so now we have to // unbox the output as a scalar ARROW_ASSIGN_OR_RAISE(std::shared_ptr scalar, MakeArray(out)->GetScalar(0)); return listener->OnResult(std::move(scalar)); } else { + // ARROW-40687 Ensure impure scalar function's result is array return listener->OnResult(std::move(out)); } } diff --git a/cpp/src/arrow/compute/exec.h b/cpp/src/arrow/compute/exec.h index 0b82ed2212e8d..3fbefe4a1ab7b 100644 --- a/cpp/src/arrow/compute/exec.h +++ b/cpp/src/arrow/compute/exec.h @@ -257,10 +257,6 @@ struct ARROW_EXPORT ExecBatch { return result; } - /// \brief A check for the empty ExecBatch with no output length - /// and function's arguments. - bool IsNull() const { return length == 0 && num_values() == 0; } - std::string ToString() const; }; @@ -424,8 +420,6 @@ struct ARROW_EXPORT ExecSpan { return result; } - bool IsNull() const { return length == 0 && num_values() == 0; } - int64_t length = 0; std::vector values; }; diff --git a/cpp/src/arrow/compute/exec_test.cc b/cpp/src/arrow/compute/exec_test.cc index 7e45b112f1f17..908035b1ba655 100644 --- a/cpp/src/arrow/compute/exec_test.cc +++ b/cpp/src/arrow/compute/exec_test.cc @@ -1426,28 +1426,19 @@ TEST(Ordering, IsSuborderOf) { CheckOrdering(unordered, {true, true, true, true, true, true}); } -static Status RegisterMyScalarPureFunction() { - const std::string name = "my_scalar_function"; - auto func = std::make_shared(name, Arity::Nullary(), - FunctionDoc::Empty(), nullptr); - auto func_exec = [](KernelContext* /*ctx*/, const ExecSpan& /*batch*/, - ExecResult* out) -> Status { - auto scalar = MakeScalar("I am a scalar"); - ARROW_ASSIGN_OR_RAISE(auto arr_res, MakeArrayFromScalar(*scalar, 1)); - out->value = std::move(arr_res->data()); +TEST(CallFunction, ScalarFunctionWithZeroLengthBatch) { + // Ensure a scalar function called with a zero length input batch + // does not get skipped if it is called with zero arguments + auto func = std::make_shared("my_scalar_function", Arity::Nullary(), + FunctionDoc::Empty()); + + auto func_exec = [](KernelContext*, const ExecSpan&, ExecResult* out) { + out->value = ArrayFromJSON(utf8(), R"(["I am a scalar"])")->data(); return Status::OK(); }; - ScalarKernel kernel({}, utf8(), func_exec); - ARROW_RETURN_NOT_OK(func->AddKernel(kernel)); - - auto registry = GetFunctionRegistry(); - ARROW_RETURN_NOT_OK(registry->AddFunction(std::move(func))); - return Status::OK(); -} - -TEST(CallFunction, ScalarPureFunction) { - ASSERT_OK(RegisterMyScalarPureFunction()); + EXPECT_THAT(func->AddKernel({}, utf8(), func_exec), Ok()); + EXPECT_THAT(GetFunctionRegistry()->AddFunction(std::move(func)), Ok()); EXPECT_THAT(CallFunction("my_scalar_function", ExecBatch({})), ResultWith(MakeScalar("I am a scalar"))); diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc index e1a2e8c5d8879..28a443b9bda2d 100644 --- a/cpp/src/arrow/compute/function.cc +++ b/cpp/src/arrow/compute/function.cc @@ -426,6 +426,7 @@ Status ScalarFunction::AddKernel(ScalarKernel kernel) { if (arity_.is_varargs && !kernel.signature->is_varargs()) { return Status::Invalid("Function accepts varargs but kernel signature does not"); } + kernel.is_pure = is_pure(); kernels_.emplace_back(std::move(kernel)); return Status::OK(); } diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h index 1adb3e96c97c8..6a4bec331aaf3 100644 --- a/cpp/src/arrow/compute/kernel.h +++ b/cpp/src/arrow/compute/kernel.h @@ -566,6 +566,11 @@ struct ARROW_EXPORT ScalarKernel : public Kernel { // bitmaps is a reasonable default NullHandling::type null_handling = NullHandling::INTERSECTION; MemAllocation::type mem_allocation = MemAllocation::PREALLOCATE; + + // Scalar functions' pure property for return a reasonable value when + // the function's output length cannot be determined. + // This mark can be fetched from ScalarFunction and set in AddKernel. + bool is_pure = true; }; // ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/compute/kernels/scalar_compare.cc b/cpp/src/arrow/compute/kernels/scalar_compare.cc index 6aada8f8f1026..aecd423372792 100644 --- a/cpp/src/arrow/compute/kernels/scalar_compare.cc +++ b/cpp/src/arrow/compute/kernels/scalar_compare.cc @@ -705,11 +705,11 @@ struct BinaryScalarMinMax { template struct FixedSizeBinaryScalarMinMax { static Status Exec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { - if (batch.IsNull()) { - std::shared_ptr result = MakeNullScalar(out->type()->GetSharedPtr()); - ARROW_ASSIGN_OR_RAISE(std::shared_ptr arr_result, - MakeArrayFromScalar(*result, 1)); - out->value = std::move(arr_result->data()); + if (batch.num_values() == 0) { + ARROW_ASSIGN_OR_RAISE( + std::shared_ptr arr, + MakeArrayOfNull(out->type()->GetSharedPtr(), 1, ctx->memory_pool())); + out->value = std::move(arr->data()); return Status::OK(); } diff --git a/cpp/src/arrow/compute/kernels/scalar_nested.cc b/cpp/src/arrow/compute/kernels/scalar_nested.cc index 9a5b8d0ca039f..456ab525d251c 100644 --- a/cpp/src/arrow/compute/kernels/scalar_nested.cc +++ b/cpp/src/arrow/compute/kernels/scalar_nested.cc @@ -571,12 +571,11 @@ Result MakeStructResolve(KernelContext* ctx, } Status MakeStructExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { - if (batch.IsNull()) { - ScalarVector value; - std::shared_ptr result = std::make_shared(value, struct_({})); - ARROW_ASSIGN_OR_RAISE(std::shared_ptr arr_result, - MakeArrayFromScalar(*result, 1)); - out->value = std::move(arr_result->data()); + if (batch.num_values() == 0) { + ARROW_ASSIGN_OR_RAISE( + std::shared_ptr arr, + MakeArrayOfNull(out->type()->GetSharedPtr(), 1, ctx->memory_pool())); + out->value = std::move(arr->data()); return Status::OK(); } diff --git a/cpp/src/arrow/compute/kernels/scalar_nested_test.cc b/cpp/src/arrow/compute/kernels/scalar_nested_test.cc index 1412d907df1eb..78d51aee9ea07 100644 --- a/cpp/src/arrow/compute/kernels/scalar_nested_test.cc +++ b/cpp/src/arrow/compute/kernels/scalar_nested_test.cc @@ -843,12 +843,11 @@ TEST(MakeStruct, Scalar) { // Three field names but one input value EXPECT_THAT(MakeStructor({str}, {"i", "f", "s"}), Raises(StatusCode::Invalid)); - // ARROW-16757: No input values yields empty struct array of length 1 - ScalarVector value; - auto empty_scalar = std::make_shared(value, struct_({})); + // ARROW-40687: No input values yields empty scalar + auto empty_expect = MakeNullScalar(struct_({})); ASSERT_OK_AND_ASSIGN(Datum empty_actual, CallFunction("make_struct", std::vector({}))); - AssertDatumsEqual(Datum(empty_scalar), empty_actual); + AssertDatumsEqual(Datum(empty_actual), empty_expect); } TEST(MakeStruct, Array) { diff --git a/cpp/src/arrow/compute/kernels/scalar_random.cc b/cpp/src/arrow/compute/kernels/scalar_random.cc index e3c5f5a439c4d..517cf068673c5 100644 --- a/cpp/src/arrow/compute/kernels/scalar_random.cc +++ b/cpp/src/arrow/compute/kernels/scalar_random.cc @@ -57,14 +57,6 @@ random::pcg64_oneseq MakeSeedGenerator() { } Status ExecRandom(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { - if (batch.IsNull()) { - std::shared_ptr result = MakeNullScalar(out->type()->GetSharedPtr()); - ARROW_ASSIGN_OR_RAISE(std::shared_ptr arr_result, - MakeArrayFromScalar(*result, 1)); - out->array_span_mutable()->FillFromScalar(*result); - return Status::OK(); - } - static random::pcg64_oneseq seed_gen = MakeSeedGenerator(); static std::mutex seed_gen_mutex;