From 8f75c0865f7e2a4e4f9dd3a8faf9155f53ba277f Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Tue, 10 Dec 2019 16:03:36 +0800 Subject: [PATCH] [C++] adding probe kernel return an array of row idx if there are matches based on quick hacks of isin kernel Signed-off-by: Yuan Zhou --- cpp/src/arrow/CMakeLists.txt | 1 + cpp/src/arrow/compute/kernels/probe.cc | 324 +++++++++++++++++++++++++ cpp/src/arrow/compute/kernels/probe.h | 49 ++++ 3 files changed, 374 insertions(+) create mode 100644 cpp/src/arrow/compute/kernels/probe.cc create mode 100644 cpp/src/arrow/compute/kernels/probe.h diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 4d53bdb600033..66ed07965bb28 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -280,6 +280,7 @@ if(ARROW_COMPUTE) compute/kernels/add.cc compute/kernels/take.cc compute/kernels/isin.cc + compute/kernels/probe.cc compute/kernels/util_internal.cc compute/operations/cast.cc compute/operations/literal.cc) diff --git a/cpp/src/arrow/compute/kernels/probe.cc b/cpp/src/arrow/compute/kernels/probe.cc new file mode 100644 index 0000000000000..c497ec7c7b4c3 --- /dev/null +++ b/cpp/src/arrow/compute/kernels/probe.cc @@ -0,0 +1,324 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/kernels/probe.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "arrow/array.h" +#include "arrow/array/dict_internal.h" +#include "arrow/buffer.h" +#include "arrow/builder.h" +#include "arrow/compute/context.h" +#include "arrow/compute/kernel.h" +#include "arrow/compute/kernels/util_internal.h" +#include "arrow/memory_pool.h" +#include "arrow/type.h" +#include "arrow/type_traits.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/hashing.h" +#include "arrow/util/logging.h" +#include "arrow/util/macros.h" +#include "arrow/util/string_view.h" +#include "arrow/visitor_inline.h" + +namespace arrow { + +using internal::checked_cast; +using internal::DictionaryTraits; +using internal::HashTraits; + +namespace compute { + +class PorbeKernelImpl : public UnaryKernel { + virtual Status Compute(FunctionContext* ctx, const Datum& left, Datum* out) = 0; + + public: + // \brief Check if value in both arrays or not and returns boolean values/null + Status Call(FunctionContext* ctx, const Datum& left, Datum* out) override { + DCHECK_EQ(Datum::ARRAY, left.kind()); + RETURN_NOT_OK(Compute(ctx, left, out)); + return Status::OK(); + } + + std::shared_ptr out_type() const override { return boolean(); } + + virtual Status ConstructRight(FunctionContext* ctx, const Datum& right) = 0; +}; + +// ---------------------------------------------------------------------- +// Using a visitor create a memo_table_ for the right array +// TODO: Implement for small lists + +template +struct MemoTableRight { + Status VisitNull() { return Status::OK(); } + + Status VisitValue(const Scalar& value) { + memo_table_->GetOrInsert(value); + return Status::OK(); + } + + Status Reset(MemoryPool* pool) { + memo_table_.reset(new MemoTable(pool, 0)); + return Status::OK(); + } + + Status Append(FunctionContext* ctx, const Datum& right) { + const ArrayData& right_data = *right.array(); + right_null_count += right_data.GetNullCount(); + return ArrayDataVisitor::Visit(right_data, this); + } + + using MemoTable = typename HashTraits::MemoTableType; + std::unique_ptr memo_table_; + int64_t right_null_count{}; +}; + +// ---------------------------------------------------------------------- + +template +class PorbeKernel : public PorbeKernelImpl { + public: + PorbeKernel(const std::shared_ptr& type, MemoryPool* pool) + : type_(type), pool_(pool) {} + + // \brief if left array has a null return true + Status VisitNull() { + writer->Set(); + writer->Next(); + return Status::OK(); + } + + // \brief Iterate over the left array using another visitor. + // In VisitValue, use the memo_table_ (for right array) and check if value + // in left array is in the memo_table_. Return true if condition satisfied, + // else false. + Status VisitValue(const Scalar& value) { + if (memo_table_->Get(value) != -1) { + indices_builder_.Append(memo_table_->Get(value)); + std::cout << "xxxx found\n"; + //writer->Set(); + } else { + indices_builder_.Append(-1); + //writer->Clear(); + } + //writer->Next(); + return Status::OK(); + } + + Status Compute(FunctionContext* ctx, const Datum& left, Datum* out) override { + const ArrayData& left_data = *left.array(); + + output = out->array(); + output->type = boolean(); + + //writer = std::make_shared( + // output.get()->buffers[1]->mutable_data(), output.get()->offset, left_data.length); + + RETURN_NOT_OK(ArrayDataVisitor::Visit(left_data, this)); + //writer->Finish(); + + RETURN_NOT_OK(indices_builder_.FinishInternal(&output)); + out->value = std::move(output); + + // if right null count is zero and left null count is not zero, propagate nulls + if (right_null_count_ == 0 && left_data.GetNullCount() != 0) { + RETURN_NOT_OK(detail::PropagateNulls(ctx, left_data, output.get())); + } + return Status::OK(); + } + + Status ConstructRight(FunctionContext* ctx, const Datum& right) override { + MemoTableRight func; + RETURN_NOT_OK(func.Reset(pool_)); + + if (right.kind() == Datum::ARRAY) { + RETURN_NOT_OK(func.Append(ctx, right)); + } else if (right.kind() == Datum::CHUNKED_ARRAY) { + const ChunkedArray& right_array = *right.chunked_array(); + for (int i = 0; i < right_array.num_chunks(); i++) { + RETURN_NOT_OK(func.Append(ctx, right_array.chunk(i))); + } + } else { + return Status::Invalid("Input Datum was not array-like"); + } + + memo_table_ = std::move(func.memo_table_); + right_null_count_ = func.right_null_count; + return Status::OK(); + } + + protected: + using MemoTable = typename HashTraits::MemoTableType; + std::unique_ptr memo_table_; + std::shared_ptr type_; + MemoryPool* pool_; + + private: + // \brief Additional member "right_null_count" is used to check if + // null count in right is not 0 + int64_t right_null_count_{}; + std::shared_ptr writer; + std::shared_ptr output; + Int32Builder indices_builder_; +}; + +// ---------------------------------------------------------------------- +// (NullType has a separate implementation) + +class NullPorbeKernel : public PorbeKernelImpl { + public: + NullPorbeKernel(const std::shared_ptr& type, MemoryPool* pool) {} + + // \brief When array is NullType, based on the null count for the arrays, + // return true, else propagate to all nulls + Status Compute(FunctionContext* ctx, const Datum& left, Datum* out) override { + const ArrayData& left_data = *left.array(); + left_null_count = left_data.GetNullCount(); + + output = out->array(); + output->type = boolean(); + + writer = std::make_shared( + output.get()->buffers[1]->mutable_data(), output.get()->offset, left_data.length); + + if (left_null_count != 0 && right_null_count == 0) { + RETURN_NOT_OK(detail::PropagateNulls(ctx, left_data, output.get())); + } else { + for (int64_t i = 0; i < left_data.length; ++i) { + writer->Set(); + writer->Next(); + } + writer->Finish(); + } + return Status::OK(); + } + + Status ConstructRight(FunctionContext* ctx, const Datum& right) override { + if (right.kind() == Datum::ARRAY) { + const ArrayData& right_data = *right.array(); + right_null_count = right_data.GetNullCount(); + } else if (right.kind() == Datum::CHUNKED_ARRAY) { + const ChunkedArray& right_array = *right.chunked_array(); + for (int i = 0; i < right_array.num_chunks(); i++) { + right_null_count += right_array.chunk(i)->null_count(); + } + } else { + return Status::Invalid("Input Datum was not array-like"); + } + return Status::OK(); + } + + private: + int64_t left_null_count{}; + int64_t right_null_count{}; + std::shared_ptr writer; + std::shared_ptr output; +}; + +// ---------------------------------------------------------------------- +// Kernel wrapper for generic hash table kernels + +template +struct PorbeKernelTraits {}; + +template +struct PorbeKernelTraits> { + using PorbeKernelImpl = NullPorbeKernel; +}; + +template +struct PorbeKernelTraits> { + using PorbeKernelImpl = PorbeKernel; +}; + +template +struct PorbeKernelTraits> { + using PorbeKernelImpl = PorbeKernel; +}; + +Status GetPorbeKernel(FunctionContext* ctx, const std::shared_ptr& type, + const Datum& right, std::unique_ptr* out) { + std::unique_ptr kernel; + +#define ISIN_CASE(InType) \ + case InType::type_id: \ + kernel.reset(new typename PorbeKernelTraits::PorbeKernelImpl( \ + type, ctx->memory_pool())); \ + break + + switch (type->id()) { + ISIN_CASE(NullType); + ISIN_CASE(BooleanType); + ISIN_CASE(UInt8Type); + ISIN_CASE(Int8Type); + ISIN_CASE(UInt16Type); + ISIN_CASE(Int16Type); + ISIN_CASE(UInt32Type); + ISIN_CASE(Int32Type); + ISIN_CASE(UInt64Type); + ISIN_CASE(Int64Type); + ISIN_CASE(FloatType); + ISIN_CASE(DoubleType); + ISIN_CASE(Date32Type); + ISIN_CASE(Date64Type); + ISIN_CASE(Time32Type); + ISIN_CASE(Time64Type); + ISIN_CASE(TimestampType); + ISIN_CASE(BinaryType); + ISIN_CASE(StringType); + ISIN_CASE(FixedSizeBinaryType); + ISIN_CASE(Decimal128Type); + default: + break; + } +#undef ISIN_CASE + + if (!kernel) { + return Status::NotImplemented("IsIn is not implemented for ", type->ToString()); + } + RETURN_NOT_OK(kernel->ConstructRight(ctx, right)); + *out = std::move(kernel); + return Status::OK(); +} + +Status Probe(FunctionContext* ctx, const Datum& left, const Datum& right, Datum* out) { + DCHECK(left.type()->Equals(right.type())); + std::vector outputs; + std::unique_ptr lkernel; + + RETURN_NOT_OK(GetPorbeKernel(ctx, left.type(), right, &lkernel)); + detail::PrimitiveAllocatingUnaryKernel kernel(lkernel.get()); + RETURN_NOT_OK(detail::InvokeUnaryArrayKernel(ctx, &kernel, left, &outputs)); + + *out = detail::WrapDatumsLike(left, outputs); + return Status::OK(); +} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/probe.h b/cpp/src/arrow/compute/kernels/probe.h new file mode 100644 index 0000000000000..e26be260f1b7c --- /dev/null +++ b/cpp/src/arrow/compute/kernels/probe.h @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "arrow/array.h" +#include "arrow/compute/context.h" +#include "arrow/compute/kernel.h" +#include "arrow/status.h" +#include "arrow/type.h" +#include "arrow/util/visibility.h" + +namespace arrow { +namespace compute { + +/// \brief IsIn returns boolean values if the value +/// is in both left and right arrays. +/// +/// If null occurs in left, if null count in right is not 0, +/// it returns true, else returns null. +/// +/// \param[in] context the FunctionContext +/// \param[in] left array-like input +/// \param[in] right array-like input +/// \param[out] out resulting datum +/// +/// \since 1.0.0 +/// \note API not yet finalized +ARROW_EXPORT +Status Probe(FunctionContext* context, const Datum& left, const Datum& right, Datum* out); + +} // namespace compute +} // namespace arrow