From 1e73d1bce3fd85c88f8e16809f1cb94312750aaf Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Mon, 15 Jul 2024 17:40:39 +0800 Subject: [PATCH] Refine files --- cpp/src/arrow/CMakeLists.txt | 2 - .../arrow/compute/kernels/vector_scatter.cc | 307 ++++- .../vector_scatter_by_mask_internal.cc | 1132 ----------------- .../kernels/vector_scatter_by_mask_internal.h | 37 - .../kernels/vector_scatter_internal.cc | 888 ------------- .../compute/kernels/vector_scatter_internal.h | 71 -- 6 files changed, 292 insertions(+), 2145 deletions(-) delete mode 100644 cpp/src/arrow/compute/kernels/vector_scatter_by_mask_internal.cc delete mode 100644 cpp/src/arrow/compute/kernels/vector_scatter_by_mask_internal.h delete mode 100644 cpp/src/arrow/compute/kernels/vector_scatter_internal.cc delete mode 100644 cpp/src/arrow/compute/kernels/vector_scatter_internal.h diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 61617c9fdf001..b34db61691e18 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -735,8 +735,6 @@ set(ARROW_COMPUTE_SRCS compute/kernels/util_internal.cc compute/kernels/vector_hash.cc compute/kernels/vector_scatter.cc - compute/kernels/vector_scatter_by_mask_internal.cc - compute/kernels/vector_scatter_internal.cc compute/kernels/vector_selection.cc compute/kernels/vector_selection_filter_internal.cc compute/kernels/vector_selection_internal.cc diff --git a/cpp/src/arrow/compute/kernels/vector_scatter.cc b/cpp/src/arrow/compute/kernels/vector_scatter.cc index 46a72d83ed15e..a2840196629fe 100644 --- a/cpp/src/arrow/compute/kernels/vector_scatter.cc +++ b/cpp/src/arrow/compute/kernels/vector_scatter.cc @@ -32,7 +32,6 @@ #include "arrow/compute/api_vector.h" #include "arrow/compute/kernels/common_internal.h" #include "arrow/compute/kernels/util_internal.h" -#include "arrow/compute/kernels/vector_scatter_by_mask_internal.h" #include "arrow/extension_type.h" #include "arrow/record_batch.h" #include "arrow/result.h" @@ -61,30 +60,308 @@ namespace internal { namespace { -// ---------------------------------------------------------------------- +struct ScatterKernelData { + InputType value_type; + InputType selection_type; + ArrayKernelExec exec; +}; -const FunctionDoc array_scatter_by_mask_doc( +const FunctionDoc array_scatter_doc( "Scatter with a boolean positional mask", ("The values of the input `array` will be placed into the output at positions where " "the `positional_mask` is non-zero. The rest positions of the output will be " "populated by `null`s.\n"), {"array", "positional_mask"}); +// ---------------------------------------------------------------------- +// Optimized and streamlined scatter for primitive types + +Status PrimitiveScatterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { + const ArraySpan& values = batch[0].array; + const ArraySpan& filter = batch[1].array; + const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED; + + int64_t output_length = GetFilterOutputSize(filter, null_selection); + + ArrayData* out_arr = out->array_data().get(); + + const bool filter_null_count_is_zero = + is_ree_filter ? filter.child_data[1].null_count == 0 : filter.null_count == 0; + + // The output precomputed null count is unknown except in the narrow + // condition that all the values are non-null and the filter will not cause + // any new nulls to be created. + if (values.null_count == 0 && + (null_selection == FilterOptions::DROP || filter_null_count_is_zero)) { + out_arr->null_count = 0; + } else { + out_arr->null_count = kUnknownNullCount; + } + + // When neither the values nor filter is known to have any nulls, we will + // elect the optimized non-null path where there is no need to populate a + // validity bitmap. + const bool allocate_validity = values.null_count != 0 || !filter_null_count_is_zero; + + DCHECK(util::IsFixedWidthLike(values)); + const int64_t bit_width = util::FixedWidthInBits(*values.type); + RETURN_NOT_OK(util::internal::PreallocateFixedWidthArrayData( + ctx, output_length, /*source=*/values, allocate_validity, out_arr)); + + switch (bit_width) { + case 1: + PrimitiveFilterImpl<1, /*kIsBoolean=*/true>(values, filter, null_selection, out_arr) + .Exec(); + break; + case 8: + PrimitiveFilterImpl<1>(values, filter, null_selection, out_arr).Exec(); + break; + case 16: + PrimitiveFilterImpl<2>(values, filter, null_selection, out_arr).Exec(); + break; + case 32: + PrimitiveFilterImpl<4>(values, filter, null_selection, out_arr).Exec(); + break; + case 64: + PrimitiveFilterImpl<8>(values, filter, null_selection, out_arr).Exec(); + break; + case 128: + // For INTERVAL_MONTH_DAY_NANO, DECIMAL128 + PrimitiveFilterImpl<16>(values, filter, null_selection, out_arr).Exec(); + break; + case 256: + // For DECIMAL256 + PrimitiveFilterImpl<32>(values, filter, null_selection, out_arr).Exec(); + break; + default: + // Non-specializing on byte width + PrimitiveFilterImpl<-1>(values, filter, null_selection, out_arr).Exec(); + break; + } + return Status::OK(); +} + +// ---------------------------------------------------------------------- +// Implement Scatter metafunction + +Result> ScatterRecordBatch(const RecordBatch& batch, + const Datum& filter, + const FunctionOptions* options, + ExecContext* ctx) { + if (batch.num_rows() != filter.length()) { + return Status::Invalid("Filter inputs must all be the same length"); + } + + // Fetch filter + const auto& filter_opts = *static_cast(options); + ArrayData filter_array; + switch (filter.kind()) { + case Datum::ARRAY: + filter_array = *filter.array(); + break; + case Datum::CHUNKED_ARRAY: { + ARROW_ASSIGN_OR_RAISE(auto combined, Concatenate(filter.chunked_array()->chunks())); + filter_array = *combined->data(); + break; + } + default: + return Status::TypeError("Filter should be array-like"); + } + + // Convert filter to selection vector/indices and use Take + ARROW_ASSIGN_OR_RAISE(std::shared_ptr indices, + GetTakeIndices(filter_array, filter_opts.null_selection_behavior, + ctx->memory_pool())); + std::vector> columns(batch.num_columns()); + for (int i = 0; i < batch.num_columns(); ++i) { + ARROW_ASSIGN_OR_RAISE(Datum out, Take(batch.column(i)->data(), Datum(indices), + TakeOptions::NoBoundsCheck(), ctx)); + columns[i] = out.make_array(); + } + return RecordBatch::Make(batch.schema(), indices->length, std::move(columns)); +} + +Result> ScatterTable(const Table& table, const Datum& filter, + const FunctionOptions* options, + ExecContext* ctx) { + if (table.num_rows() != filter.length()) { + return Status::Invalid("Filter inputs must all be the same length"); + } + if (table.num_rows() == 0) { + return Table::Make(table.schema(), table.columns(), 0); + } + + // Last input element will be the filter array + const int num_columns = table.num_columns(); + std::vector inputs(num_columns + 1); + + // Fetch table columns + for (int i = 0; i < num_columns; ++i) { + inputs[i] = table.column(i)->chunks(); + } + // Fetch filter + const auto& filter_opts = *static_cast(options); + switch (filter.kind()) { + case Datum::ARRAY: + inputs.back().push_back(filter.make_array()); + break; + case Datum::CHUNKED_ARRAY: + inputs.back() = filter.chunked_array()->chunks(); + break; + default: + return Status::TypeError("Filter should be array-like"); + } + + // Rechunk inputs to allow consistent iteration over their respective chunks + inputs = arrow::internal::RechunkArraysConsistently(inputs); + + // Instead of filtering each column with the boolean filter + // (which would be slow if the table has a large number of columns: ARROW-10569), + // convert each filter chunk to indices, and take() the column. + const int64_t num_chunks = static_cast(inputs.back().size()); + std::vector out_columns(num_columns); + int64_t out_num_rows = 0; + + for (int64_t i = 0; i < num_chunks; ++i) { + const ArrayData& filter_chunk = *inputs.back()[i]->data(); + ARROW_ASSIGN_OR_RAISE( + const auto indices, + GetTakeIndices(filter_chunk, filter_opts.null_selection_behavior, + ctx->memory_pool())); + + if (indices->length > 0) { + // Take from all input columns + Datum indices_datum{std::move(indices)}; + for (int col = 0; col < num_columns; ++col) { + const auto& column_chunk = inputs[col][i]; + ARROW_ASSIGN_OR_RAISE(Datum out, Take(column_chunk, indices_datum, + TakeOptions::NoBoundsCheck(), ctx)); + out_columns[col].push_back(std::move(out).make_array()); + } + out_num_rows += indices->length; + } + } + + ChunkedArrayVector out_chunks(num_columns); + for (int i = 0; i < num_columns; ++i) { + out_chunks[i] = std::make_shared(std::move(out_columns[i]), + table.column(i)->type()); + } + return Table::Make(table.schema(), std::move(out_chunks), out_num_rows); +} + +const FunctionDoc scatter_doc( + "Scatter with a boolean selection filter", + ("The output is populated with values from the input at positions\n" + "where the selection filter is non-zero. Nulls in the selection filter\n" + "are handled based on FilterOptions."), + {"input", "selection_filter"}, "FilterOptions"); + +class ScatterMetaFunction : public MetaFunction { + public: + ScatterMetaFunction() + : MetaFunction("scatter", Arity::Binary(), scatter_doc, NULLPTR) {} + + Result ExecuteImpl(const std::vector& args, + const FunctionOptions* options, + ExecContext* ctx) const override { + if (args[1].kind() != Datum::ARRAY && args[1].kind() != Datum::CHUNKED_ARRAY) { + return Status::TypeError("Filter should be array-like"); + } + + const auto& filter_type = *args[1].type(); + const bool filter_is_plain_bool = filter_type.id() == Type::BOOL; + const bool filter_is_ree_bool = + filter_type.id() == Type::RUN_END_ENCODED && + checked_cast(filter_type).value_type()->id() == + Type::BOOL; + if (!filter_is_plain_bool && !filter_is_ree_bool) { + return Status::NotImplemented("Filter argument must be boolean type"); + } + + if (args[0].kind() == Datum::RECORD_BATCH) { + ARROW_ASSIGN_OR_RAISE( + std::shared_ptr out_batch, + ScatterRecordBatch(*args[0].record_batch(), args[1], options, ctx)); + return Datum(out_batch); + } else if (args[0].kind() == Datum::TABLE) { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr out_table, + ScatterTable(*args[0].table(), args[1], options, ctx)); + return Datum(out_table); + } else { + return CallFunction("array_scatter", args, options, ctx); + } + } +}; + +// ---------------------------------------------------------------------- + } // namespace void RegisterVectorScatter(FunctionRegistry* registry) { - // Scatter by mask kernels - std::vector scatter_by_mask_kernels; - PopulateScatterByMaskKernels(&scatter_by_mask_kernels); - - VectorKernel scatter_by_mask_base; - scatter_by_mask_base.can_execute_chunkwise = false; - scatter_by_mask_base.output_chunked = false; - RegisterScatterFunction("array_scatter_by_mask", array_scatter_by_mask_doc, - scatter_by_mask_base, std::move(scatter_by_mask_kernels), - NULLPTR, registry); - - DCHECK_OK(registry->AddFunction(MakeScatterByMaskMetaFunction())); + // array_scatter + { + auto plain_mask = InputType(Type::BOOL); + // auto ree_mask = InputType(match::RunEndEncoded(Type::BOOL)); + std::vector array_scatter_kernels{ + // * x Boolean + {InputType(match::Primitive()), plain_mask, PrimitiveScatterExec}, + // {InputType(match::BinaryLike()), plain_filter, BinaryFilterExec}, + // {InputType(match::LargeBinaryLike()), plain_filter, BinaryFilterExec}, + // {InputType(null()), plain_filter, NullFilterExec}, + // {InputType(Type::FIXED_SIZE_BINARY), plain_filter, PrimitiveFilterExec}, + // {InputType(Type::DECIMAL128), plain_filter, PrimitiveFilterExec}, + // {InputType(Type::DECIMAL256), plain_filter, PrimitiveFilterExec}, + // {InputType(Type::DICTIONARY), plain_filter, DictionaryFilterExec}, + // {InputType(Type::EXTENSION), plain_filter, ExtensionFilterExec}, + // {InputType(Type::LIST), plain_filter, ListFilterExec}, + // {InputType(Type::LARGE_LIST), plain_filter, LargeListFilterExec}, + // {InputType(Type::LIST_VIEW), plain_filter, ListViewFilterExec}, + // {InputType(Type::LARGE_LIST_VIEW), plain_filter, LargeListViewFilterExec}, + // {InputType(Type::FIXED_SIZE_LIST), plain_filter, FSLFilterExec}, + // {InputType(Type::DENSE_UNION), plain_filter, DenseUnionFilterExec}, + // {InputType(Type::SPARSE_UNION), plain_filter, SparseUnionFilterExec}, + // {InputType(Type::STRUCT), plain_filter, StructFilterExec}, + // {InputType(Type::MAP), plain_filter, MapFilterExec}, + + // * x REE(Boolean) + // {InputType(match::Primitive()), ree_filter, PrimitiveFilterExec}, + // {InputType(match::BinaryLike()), ree_filter, BinaryFilterExec}, + // {InputType(match::LargeBinaryLike()), ree_filter, BinaryFilterExec}, + // {InputType(null()), ree_filter, NullFilterExec}, + // {InputType(Type::FIXED_SIZE_BINARY), ree_filter, PrimitiveFilterExec}, + // {InputType(Type::DECIMAL128), ree_filter, PrimitiveFilterExec}, + // {InputType(Type::DECIMAL256), ree_filter, PrimitiveFilterExec}, + // {InputType(Type::DICTIONARY), ree_filter, DictionaryFilterExec}, + // {InputType(Type::EXTENSION), ree_filter, ExtensionFilterExec}, + // {InputType(Type::LIST), ree_filter, ListFilterExec}, + // {InputType(Type::LARGE_LIST), ree_filter, LargeListFilterExec}, + // {InputType(Type::LIST_VIEW), ree_filter, ListViewFilterExec}, + // {InputType(Type::LARGE_LIST_VIEW), ree_filter, LargeListViewFilterExec}, + // {InputType(Type::FIXED_SIZE_LIST), ree_filter, FSLFilterExec}, + // {InputType(Type::DENSE_UNION), ree_filter, DenseUnionFilterExec}, + // {InputType(Type::SPARSE_UNION), ree_filter, SparseUnionFilterExec}, + // {InputType(Type::STRUCT), ree_filter, StructFilterExec}, + // {InputType(Type::MAP), ree_filter, MapFilterExec}, + }; + + VectorKernel kernal_base; + kernal_base.can_execute_chunkwise = false; + kernal_base.output_chunked = false; + auto array_scatter_func = std::make_shared( + "array_scatter", Arity::Binary(), std::move(array_scatter_doc), NULLPTR); + for (auto&& kernel_data : array_scatter_kernels) { + kernal_base.signature = KernelSignature::Make( + {std::move(kernel_data.value_type), std::move(kernel_data.selection_type)}, + OutputType(FirstType)); + kernal_base.exec = kernel_data.exec; + DCHECK_OK(array_scatter_func->AddKernel(kernal_base)); + } + DCHECK_OK(registry->AddFunction(std::move(array_scatter_func))); + } + + // scatter metafunction. + DCHECK_OK(registry->AddFunction(std::make_shared())); } } // namespace internal diff --git a/cpp/src/arrow/compute/kernels/vector_scatter_by_mask_internal.cc b/cpp/src/arrow/compute/kernels/vector_scatter_by_mask_internal.cc deleted file mode 100644 index 86da8c1f889e6..0000000000000 --- a/cpp/src/arrow/compute/kernels/vector_scatter_by_mask_internal.cc +++ /dev/null @@ -1,1132 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include -#include -#include -#include -#include -#include - -#include "arrow/array/concatenate.h" -#include "arrow/array/data.h" -#include "arrow/buffer_builder.h" -#include "arrow/chunked_array.h" -#include "arrow/compute/api_vector.h" -#include "arrow/compute/exec.h" -#include "arrow/compute/kernel.h" -#include "arrow/compute/kernels/codegen_internal.h" -#include "arrow/compute/kernels/vector_selection_filter_internal.h" -#include "arrow/compute/kernels/vector_selection_internal.h" -#include "arrow/datum.h" -#include "arrow/extension_type.h" -#include "arrow/record_batch.h" -#include "arrow/table.h" -#include "arrow/type.h" -#include "arrow/util/bit_block_counter.h" -#include "arrow/util/bit_run_reader.h" -#include "arrow/util/bit_util.h" -#include "arrow/util/bitmap_ops.h" -#include "arrow/util/fixed_width_internal.h" - -namespace arrow { - -using internal::BinaryBitBlockCounter; -using internal::BitBlockCount; -using internal::BitBlockCounter; -using internal::CopyBitmap; -using internal::CountSetBits; -using internal::OptionalBitBlockCounter; - -namespace compute::internal { - -namespace { - -using FilterState = OptionsWrapper; - -int64_t GetBitmapFilterOutputSize(const ArraySpan& filter, - FilterOptions::NullSelectionBehavior null_selection) { - int64_t output_size = 0; - - if (filter.MayHaveNulls()) { - const uint8_t* filter_is_valid = filter.buffers[0].data; - BinaryBitBlockCounter bit_counter(filter.buffers[1].data, filter.offset, - filter_is_valid, filter.offset, filter.length); - int64_t position = 0; - if (null_selection == FilterOptions::EMIT_NULL) { - while (position < filter.length) { - BitBlockCount block = bit_counter.NextOrNotWord(); - output_size += block.popcount; - position += block.length; - } - } else { - while (position < filter.length) { - BitBlockCount block = bit_counter.NextAndWord(); - output_size += block.popcount; - position += block.length; - } - } - } else { - // The filter has no nulls, so we can use CountSetBits - output_size = CountSetBits(filter.buffers[1].data, filter.offset, filter.length); - } - return output_size; -} - -int64_t GetREEFilterOutputSize(const ArraySpan& filter, - FilterOptions::NullSelectionBehavior null_selection) { - const auto& ree_type = checked_cast(*filter.type); - DCHECK_EQ(ree_type.value_type()->id(), Type::BOOL); - int64_t output_size = 0; - VisitPlainxREEFilterOutputSegments( - filter, /*filter_may_have_nulls=*/true, null_selection, - [&output_size](int64_t, int64_t segment_length, bool) { - output_size += segment_length; - return true; - }); - return output_size; -} - -} // namespace - -int64_t GetFilterOutputSize(const ArraySpan& filter, - FilterOptions::NullSelectionBehavior null_selection) { - if (filter.type->id() == Type::BOOL) { - return GetBitmapFilterOutputSize(filter, null_selection); - } - DCHECK_EQ(filter.type->id(), Type::RUN_END_ENCODED); - return GetREEFilterOutputSize(filter, null_selection); -} - -namespace { - -// ---------------------------------------------------------------------- -// Optimized and streamlined filter for primitive types - -// Use either BitBlockCounter or BinaryBitBlockCounter to quickly scan filter a -// word at a time for the DROP selection type. -class DropNullCounter { - public: - // validity bitmap may be null - DropNullCounter(const uint8_t* validity, const uint8_t* data, int64_t offset, - int64_t length) - : data_counter_(data, offset, length), - data_and_validity_counter_(data, offset, validity, offset, length), - has_validity_(validity != nullptr) {} - - BitBlockCount NextBlock() { - if (has_validity_) { - // filter is true AND not null - return data_and_validity_counter_.NextAndWord(); - } else { - return data_counter_.NextWord(); - } - } - - private: - // For when just data is present, but no validity bitmap - BitBlockCounter data_counter_; - - // For when both validity bitmap and data are present - BinaryBitBlockCounter data_and_validity_counter_; - const bool has_validity_; -}; - -/// \brief The Filter implementation for primitive (fixed-width) types does not -/// use the logical Arrow type but rather the physical C type. This way we only -/// generate one take function for each byte width. -/// -/// We use compile-time specialization for two variations: -/// - operating on boolean data (using kIsBoolean = true) -/// - operating on fixed-width data of arbitrary width (using kByteWidth = -1), -/// with the actual width only known at runtime -template -class PrimitiveFilterImpl { - public: - PrimitiveFilterImpl(const ArraySpan& values, const ArraySpan& filter, - FilterOptions::NullSelectionBehavior null_selection, - ArrayData* out_arr) - : byte_width_(util::FixedWidthInBytes(*values.type)), - values_is_valid_(values.buffers[0].data), - // No offset applied for boolean because it's a bitmap - values_data_(kIsBoolean ? values.buffers[1].data - : util::OffsetPointerOfFixedByteWidthValues(values)), - values_null_count_(values.null_count), - values_offset_(values.offset), - values_length_(values.length), - filter_(filter), - null_selection_(null_selection) { - if constexpr (kByteWidth >= 0 && !kIsBoolean) { - DCHECK_EQ(kByteWidth, byte_width_); - } - - DCHECK_EQ(out_arr->offset, 0); - if (out_arr->buffers[0] != nullptr) { - // May be unallocated if neither filter nor values contain nulls - out_is_valid_ = out_arr->buffers[0]->mutable_data(); - } - out_data_ = util::MutableFixedWidthValuesPointer(out_arr); - out_length_ = out_arr->length; - out_position_ = 0; - } - - void ExecREEFilter() { - if (filter_.child_data[1].null_count == 0 && values_null_count_ == 0) { - DCHECK(!out_is_valid_); - // Fastest: no nulls in either filter or values - return VisitPlainxREEFilterOutputSegments( - filter_, /*filter_may_have_nulls=*/false, null_selection_, - [&](int64_t position, int64_t segment_length, bool filter_valid) { - // Fastest path: all values in range are included and not null - WriteValueSegment(position, segment_length); - DCHECK(filter_valid); - return true; - }); - } - if (values_is_valid_) { - DCHECK(out_is_valid_); - // Slower path: values can be null, so the validity bitmap should be copied - return VisitPlainxREEFilterOutputSegments( - filter_, /*filter_may_have_nulls=*/true, null_selection_, - [&](int64_t position, int64_t segment_length, bool filter_valid) { - if (filter_valid) { - CopyBitmap(values_is_valid_, values_offset_ + position, segment_length, - out_is_valid_, out_position_); - WriteValueSegment(position, segment_length); - } else { - bit_util::SetBitsTo(out_is_valid_, out_position_, segment_length, false); - WriteNullSegment(segment_length); - } - return true; - }); - } - // Faster path: only write to out_is_valid_ if filter contains nulls and - // null_selection is EMIT_NULL - if (out_is_valid_) { - // Set all to valid, so only if nulls are produced by EMIT_NULL, we need - // to set out_is_valid[i] to false. - bit_util::SetBitsTo(out_is_valid_, 0, out_length_, true); - } - return VisitPlainxREEFilterOutputSegments( - filter_, /*filter_may_have_nulls=*/true, null_selection_, - [&](int64_t position, int64_t segment_length, bool filter_valid) { - if (filter_valid) { - WriteValueSegment(position, segment_length); - } else { - bit_util::SetBitsTo(out_is_valid_, out_position_, segment_length, false); - WriteNullSegment(segment_length); - } - return true; - }); - } - - void Exec() { - if (filter_.type->id() == Type::RUN_END_ENCODED) { - return ExecREEFilter(); - } - const auto* filter_is_valid = filter_.buffers[0].data; - const auto* filter_data = filter_.buffers[1].data; - const auto filter_offset = filter_.offset; - if (filter_.null_count == 0 && values_null_count_ == 0) { - // Fast filter when values and filter are not null - ::arrow::internal::VisitSetBitRunsVoid( - filter_data, filter_.offset, values_length_, - [&](int64_t position, int64_t length) { WriteValueSegment(position, length); }); - return; - } - - // Bit counters used for both null_selection behaviors - DropNullCounter drop_null_counter(filter_is_valid, filter_data, filter_offset, - values_length_); - OptionalBitBlockCounter data_counter(values_is_valid_, values_offset_, - values_length_); - OptionalBitBlockCounter filter_valid_counter(filter_is_valid, filter_offset, - values_length_); - - auto WriteNotNull = [&](int64_t index) { - bit_util::SetBit(out_is_valid_, out_position_); - // Increments out_position_ - WriteValue(index); - }; - - auto WriteMaybeNull = [&](int64_t index) { - bit_util::SetBitTo(out_is_valid_, out_position_, - bit_util::GetBit(values_is_valid_, values_offset_ + index)); - // Increments out_position_ - WriteValue(index); - }; - - int64_t in_position = 0; - while (in_position < values_length_) { - BitBlockCount filter_block = drop_null_counter.NextBlock(); - BitBlockCount filter_valid_block = filter_valid_counter.NextWord(); - BitBlockCount data_block = data_counter.NextWord(); - if (filter_block.AllSet() && data_block.AllSet()) { - // Fastest path: all values in block are included and not null - bit_util::SetBitsTo(out_is_valid_, out_position_, filter_block.length, true); - WriteValueSegment(in_position, filter_block.length); - in_position += filter_block.length; - } else if (filter_block.AllSet()) { - // Faster: all values are selected, but some values are null - // Batch copy bits from values validity bitmap to output validity bitmap - CopyBitmap(values_is_valid_, values_offset_ + in_position, filter_block.length, - out_is_valid_, out_position_); - WriteValueSegment(in_position, filter_block.length); - in_position += filter_block.length; - } else if (filter_block.NoneSet() && null_selection_ == FilterOptions::DROP) { - // For this exceedingly common case in low-selectivity filters we can - // skip further analysis of the data and move on to the next block. - in_position += filter_block.length; - } else { - // Some filter values are false or null - if (data_block.AllSet()) { - // No values are null - if (filter_valid_block.AllSet()) { - // Filter is non-null but some values are false - for (int64_t i = 0; i < filter_block.length; ++i) { - if (bit_util::GetBit(filter_data, filter_offset + in_position)) { - WriteNotNull(in_position); - } - ++in_position; - } - } else if (null_selection_ == FilterOptions::DROP) { - // If any values are selected, they ARE NOT null - for (int64_t i = 0; i < filter_block.length; ++i) { - if (bit_util::GetBit(filter_is_valid, filter_offset + in_position) && - bit_util::GetBit(filter_data, filter_offset + in_position)) { - WriteNotNull(in_position); - } - ++in_position; - } - } else { // null_selection == FilterOptions::EMIT_NULL - // Data values in this block are not null - for (int64_t i = 0; i < filter_block.length; ++i) { - const bool is_valid = - bit_util::GetBit(filter_is_valid, filter_offset + in_position); - if (is_valid && - bit_util::GetBit(filter_data, filter_offset + in_position)) { - // Filter slot is non-null and set - WriteNotNull(in_position); - } else if (!is_valid) { - // Filter slot is null, so we have a null in the output - bit_util::ClearBit(out_is_valid_, out_position_); - WriteNull(); - } - ++in_position; - } - } - } else { // !data_block.AllSet() - // Some values are null - if (filter_valid_block.AllSet()) { - // Filter is non-null but some values are false - for (int64_t i = 0; i < filter_block.length; ++i) { - if (bit_util::GetBit(filter_data, filter_offset + in_position)) { - WriteMaybeNull(in_position); - } - ++in_position; - } - } else if (null_selection_ == FilterOptions::DROP) { - // If any values are selected, they ARE NOT null - for (int64_t i = 0; i < filter_block.length; ++i) { - if (bit_util::GetBit(filter_is_valid, filter_offset + in_position) && - bit_util::GetBit(filter_data, filter_offset + in_position)) { - WriteMaybeNull(in_position); - } - ++in_position; - } - } else { // null_selection == FilterOptions::EMIT_NULL - // Data values in this block are not null - for (int64_t i = 0; i < filter_block.length; ++i) { - const bool is_valid = - bit_util::GetBit(filter_is_valid, filter_offset + in_position); - if (is_valid && - bit_util::GetBit(filter_data, filter_offset + in_position)) { - // Filter slot is non-null and set - WriteMaybeNull(in_position); - } else if (!is_valid) { - // Filter slot is null, so we have a null in the output - bit_util::ClearBit(out_is_valid_, out_position_); - WriteNull(); - } - ++in_position; - } - } - } - } // !filter_block.AllSet() - } // while(in_position < values_length_) - } - - // Write the next out_position given the selected in_position for the input - // data and advance out_position - void WriteValue(int64_t in_position) { - if constexpr (kIsBoolean) { - bit_util::SetBitTo(out_data_, out_position_, - bit_util::GetBit(values_data_, values_offset_ + in_position)); - } else { - memcpy(out_data_ + out_position_ * byte_width(), - values_data_ + in_position * byte_width(), byte_width()); - } - ++out_position_; - } - - void WriteValueSegment(int64_t in_start, int64_t length) { - if constexpr (kIsBoolean) { - CopyBitmap(values_data_, values_offset_ + in_start, length, out_data_, - out_position_); - } else { - memcpy(out_data_ + out_position_ * byte_width(), - values_data_ + in_start * byte_width(), length * byte_width()); - } - out_position_ += length; - } - - void WriteNull() { - if constexpr (kIsBoolean) { - // Zero the bit - bit_util::ClearBit(out_data_, out_position_); - } else { - // Zero the memory - memset(out_data_ + out_position_ * byte_width(), 0, byte_width()); - } - ++out_position_; - } - - void WriteNullSegment(int64_t length) { - if constexpr (kIsBoolean) { - // Zero the bits - bit_util::SetBitsTo(out_data_, out_position_, length, false); - } else { - // Zero the memory - memset(out_data_ + out_position_ * byte_width(), 0, length * byte_width()); - } - out_position_ += length; - } - - constexpr int64_t byte_width() const { - if constexpr (kByteWidth >= 0) { - return kByteWidth; - } else { - return byte_width_; - } - } - - private: - int64_t byte_width_; - const uint8_t* values_is_valid_; - const uint8_t* values_data_; - int64_t values_null_count_; - int64_t values_offset_; - int64_t values_length_; - const ArraySpan& filter_; - FilterOptions::NullSelectionBehavior null_selection_; - uint8_t* out_is_valid_ = NULLPTR; - uint8_t* out_data_; - int64_t out_length_; - int64_t out_position_; -}; - -} // namespace - -Status PrimitiveFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { - const ArraySpan& values = batch[0].array; - const ArraySpan& filter = batch[1].array; - const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED; - FilterOptions::NullSelectionBehavior null_selection = - FilterState::Get(ctx).null_selection_behavior; - - int64_t output_length = GetFilterOutputSize(filter, null_selection); - - ArrayData* out_arr = out->array_data().get(); - - const bool filter_null_count_is_zero = - is_ree_filter ? filter.child_data[1].null_count == 0 : filter.null_count == 0; - - // The output precomputed null count is unknown except in the narrow - // condition that all the values are non-null and the filter will not cause - // any new nulls to be created. - if (values.null_count == 0 && - (null_selection == FilterOptions::DROP || filter_null_count_is_zero)) { - out_arr->null_count = 0; - } else { - out_arr->null_count = kUnknownNullCount; - } - - // When neither the values nor filter is known to have any nulls, we will - // elect the optimized non-null path where there is no need to populate a - // validity bitmap. - const bool allocate_validity = values.null_count != 0 || !filter_null_count_is_zero; - - DCHECK(util::IsFixedWidthLike(values)); - const int64_t bit_width = util::FixedWidthInBits(*values.type); - RETURN_NOT_OK(util::internal::PreallocateFixedWidthArrayData( - ctx, output_length, /*source=*/values, allocate_validity, out_arr)); - - switch (bit_width) { - case 1: - PrimitiveFilterImpl<1, /*kIsBoolean=*/true>(values, filter, null_selection, out_arr) - .Exec(); - break; - case 8: - PrimitiveFilterImpl<1>(values, filter, null_selection, out_arr).Exec(); - break; - case 16: - PrimitiveFilterImpl<2>(values, filter, null_selection, out_arr).Exec(); - break; - case 32: - PrimitiveFilterImpl<4>(values, filter, null_selection, out_arr).Exec(); - break; - case 64: - PrimitiveFilterImpl<8>(values, filter, null_selection, out_arr).Exec(); - break; - case 128: - // For INTERVAL_MONTH_DAY_NANO, DECIMAL128 - PrimitiveFilterImpl<16>(values, filter, null_selection, out_arr).Exec(); - break; - case 256: - // For DECIMAL256 - PrimitiveFilterImpl<32>(values, filter, null_selection, out_arr).Exec(); - break; - default: - // Non-specializing on byte width - PrimitiveFilterImpl<-1>(values, filter, null_selection, out_arr).Exec(); - break; - } - return Status::OK(); -} - -namespace { - -// ---------------------------------------------------------------------- -// Optimized filter for base binary types (32-bit and 64-bit) - -#define BINARY_FILTER_SETUP_COMMON() \ - const auto raw_offsets = values.GetValues(1); \ - const uint8_t* raw_data = values.buffers[2].data; \ - \ - TypedBufferBuilder offset_builder(ctx->memory_pool()); \ - TypedBufferBuilder data_builder(ctx->memory_pool()); \ - RETURN_NOT_OK(offset_builder.Reserve(output_length + 1)); \ - \ - /* Presize the data builder with a rough estimate */ \ - if (values.length > 0) { \ - const double mean_value_length = (raw_offsets[values.length] - raw_offsets[0]) / \ - static_cast(values.length); \ - RETURN_NOT_OK( \ - data_builder.Reserve(static_cast(mean_value_length * output_length))); \ - } \ - int64_t space_available = data_builder.capacity(); \ - offset_type offset = 0; - -#define APPEND_RAW_DATA(DATA, NBYTES) \ - if (ARROW_PREDICT_FALSE(NBYTES > space_available)) { \ - RETURN_NOT_OK(data_builder.Reserve(NBYTES)); \ - space_available = data_builder.capacity() - data_builder.length(); \ - } \ - data_builder.UnsafeAppend(DATA, NBYTES); \ - space_available -= NBYTES - -#define APPEND_SINGLE_VALUE() \ - do { \ - offset_type val_size = raw_offsets[in_position + 1] - raw_offsets[in_position]; \ - APPEND_RAW_DATA(raw_data + raw_offsets[in_position], val_size); \ - offset += val_size; \ - } while (0) - -// Optimized binary filter for the case where neither values nor filter have -// nulls -template -Status BinaryFilterNonNullImpl(KernelContext* ctx, const ArraySpan& values, - const ArraySpan& filter, int64_t output_length, - FilterOptions::NullSelectionBehavior null_selection, - ArrayData* out) { - using offset_type = typename ArrowType::offset_type; - const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED; - - BINARY_FILTER_SETUP_COMMON(); - - auto emit_segment = [&](int64_t position, int64_t length) { - // Bulk-append raw data - const offset_type run_data_bytes = - (raw_offsets[position + length] - raw_offsets[position]); - APPEND_RAW_DATA(raw_data + raw_offsets[position], run_data_bytes); - // Append offsets - for (int64_t i = 0; i < length; ++i) { - offset_builder.UnsafeAppend(offset); - offset += raw_offsets[i + position + 1] - raw_offsets[i + position]; - } - return Status::OK(); - }; - if (is_ree_filter) { - Status status; - VisitPlainxREEFilterOutputSegments( - filter, /*filter_may_have_nulls=*/false, null_selection, - [&status, emit_segment = std::move(emit_segment)]( - int64_t position, int64_t segment_length, bool filter_valid) { - DCHECK(filter_valid); - status = emit_segment(position, segment_length); - return status.ok(); - }); - RETURN_NOT_OK(std::move(status)); - } else { - const auto filter_data = filter.buffers[1].data; - RETURN_NOT_OK(arrow::internal::VisitSetBitRuns( - filter_data, filter.offset, filter.length, std::move(emit_segment))); - } - - offset_builder.UnsafeAppend(offset); - out->length = output_length; - RETURN_NOT_OK(offset_builder.Finish(&out->buffers[1])); - return data_builder.Finish(&out->buffers[2]); -} - -template -Status BinaryFilterImpl(KernelContext* ctx, const ArraySpan& values, - const ArraySpan& filter, int64_t output_length, - FilterOptions::NullSelectionBehavior null_selection, - ArrayData* out) { - using offset_type = typename ArrowType::offset_type; - - const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED; - - BINARY_FILTER_SETUP_COMMON(); - - const uint8_t* values_is_valid = values.buffers[0].data; - const int64_t values_offset = values.offset; - - const int64_t out_offset = out->offset; - uint8_t* out_is_valid = out->buffers[0]->mutable_data(); - // Zero bits and then only have to set valid values to true - bit_util::SetBitsTo(out_is_valid, out_offset, output_length, false); - - int64_t in_position = 0; - int64_t out_position = 0; - if (is_ree_filter) { - auto emit_segment = [&](int64_t position, int64_t segment_length, bool filter_valid) { - in_position = position; - if (filter_valid) { - // Filter values are all true and not null - // Some of the values in the block may be null - for (int64_t i = 0; i < segment_length; ++i, ++in_position, ++out_position) { - offset_builder.UnsafeAppend(offset); - if (bit_util::GetBit(values_is_valid, values_offset + in_position)) { - bit_util::SetBit(out_is_valid, out_offset + out_position); - APPEND_SINGLE_VALUE(); - } - } - } else { - offset_builder.UnsafeAppend(segment_length, offset); - out_position += segment_length; - } - return Status::OK(); - }; - Status status; - VisitPlainxREEFilterOutputSegments( - filter, /*filter_may_have_nulls=*/true, null_selection, - [&status, emit_segment = std::move(emit_segment)]( - int64_t position, int64_t segment_length, bool filter_valid) { - status = emit_segment(position, segment_length, filter_valid); - return status.ok(); - }); - RETURN_NOT_OK(std::move(status)); - } else { - const auto filter_data = filter.buffers[1].data; - const uint8_t* filter_is_valid = filter.buffers[0].data; - const int64_t filter_offset = filter.offset; - - // We use 3 block counters for fast scanning of the filter - // - // * values_valid_counter: for values null/not-null - // * filter_valid_counter: for filter null/not-null - // * filter_counter: for filter true/false - OptionalBitBlockCounter values_valid_counter(values_is_valid, values_offset, - values.length); - OptionalBitBlockCounter filter_valid_counter(filter_is_valid, filter_offset, - filter.length); - BitBlockCounter filter_counter(filter_data, filter_offset, filter.length); - - while (in_position < filter.length) { - BitBlockCount filter_valid_block = filter_valid_counter.NextWord(); - BitBlockCount values_valid_block = values_valid_counter.NextWord(); - BitBlockCount filter_block = filter_counter.NextWord(); - if (filter_block.NoneSet() && null_selection == FilterOptions::DROP) { - // For this exceedingly common case in low-selectivity filters we can - // skip further analysis of the data and move on to the next block. - in_position += filter_block.length; - } else if (filter_valid_block.AllSet()) { - // Simpler path: no filter values are null - if (filter_block.AllSet()) { - // Fastest path: filter values are all true and not null - if (values_valid_block.AllSet()) { - // The values aren't null either - bit_util::SetBitsTo(out_is_valid, out_offset + out_position, - filter_block.length, true); - - // Bulk-append raw data - offset_type block_data_bytes = - (raw_offsets[in_position + filter_block.length] - - raw_offsets[in_position]); - APPEND_RAW_DATA(raw_data + raw_offsets[in_position], block_data_bytes); - // Append offsets - for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) { - offset_builder.UnsafeAppend(offset); - offset += raw_offsets[in_position + 1] - raw_offsets[in_position]; - } - out_position += filter_block.length; - } else { - // Some of the values in this block are null - for (int64_t i = 0; i < filter_block.length; - ++i, ++in_position, ++out_position) { - offset_builder.UnsafeAppend(offset); - if (bit_util::GetBit(values_is_valid, values_offset + in_position)) { - bit_util::SetBit(out_is_valid, out_offset + out_position); - APPEND_SINGLE_VALUE(); - } - } - } - } else { // !filter_block.AllSet() - // Some of the filter values are false, but all not null - if (values_valid_block.AllSet()) { - // All the values are not-null, so we can skip null checking for - // them - for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) { - if (bit_util::GetBit(filter_data, filter_offset + in_position)) { - offset_builder.UnsafeAppend(offset); - bit_util::SetBit(out_is_valid, out_offset + out_position++); - APPEND_SINGLE_VALUE(); - } - } - } else { - // Some of the values in the block are null, so we have to check - // each one - for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) { - if (bit_util::GetBit(filter_data, filter_offset + in_position)) { - offset_builder.UnsafeAppend(offset); - if (bit_util::GetBit(values_is_valid, values_offset + in_position)) { - bit_util::SetBit(out_is_valid, out_offset + out_position); - APPEND_SINGLE_VALUE(); - } - ++out_position; - } - } - } - } - } else { // !filter_valid_block.AllSet() - // Some of the filter values are null, so we have to handle the DROP - // versus EMIT_NULL null selection behavior. - if (null_selection == FilterOptions::DROP) { - // Filter null values are treated as false. - if (values_valid_block.AllSet()) { - for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) { - if (bit_util::GetBit(filter_is_valid, filter_offset + in_position) && - bit_util::GetBit(filter_data, filter_offset + in_position)) { - offset_builder.UnsafeAppend(offset); - bit_util::SetBit(out_is_valid, out_offset + out_position++); - APPEND_SINGLE_VALUE(); - } - } - } else { - for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) { - if (bit_util::GetBit(filter_is_valid, filter_offset + in_position) && - bit_util::GetBit(filter_data, filter_offset + in_position)) { - offset_builder.UnsafeAppend(offset); - if (bit_util::GetBit(values_is_valid, values_offset + in_position)) { - bit_util::SetBit(out_is_valid, out_offset + out_position); - APPEND_SINGLE_VALUE(); - } - ++out_position; - } - } - } - } else { - // EMIT_NULL - - // Filter null values are appended to output as null whether the - // value in the corresponding slot is valid or not - if (values_valid_block.AllSet()) { - for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) { - const bool filter_not_null = - bit_util::GetBit(filter_is_valid, filter_offset + in_position); - if (filter_not_null && - bit_util::GetBit(filter_data, filter_offset + in_position)) { - offset_builder.UnsafeAppend(offset); - bit_util::SetBit(out_is_valid, out_offset + out_position++); - APPEND_SINGLE_VALUE(); - } else if (!filter_not_null) { - offset_builder.UnsafeAppend(offset); - ++out_position; - } - } - } else { - for (int64_t i = 0; i < filter_block.length; ++i, ++in_position) { - const bool filter_not_null = - bit_util::GetBit(filter_is_valid, filter_offset + in_position); - if (filter_not_null && - bit_util::GetBit(filter_data, filter_offset + in_position)) { - offset_builder.UnsafeAppend(offset); - if (bit_util::GetBit(values_is_valid, values_offset + in_position)) { - bit_util::SetBit(out_is_valid, out_offset + out_position); - APPEND_SINGLE_VALUE(); - } - ++out_position; - } else if (!filter_not_null) { - offset_builder.UnsafeAppend(offset); - ++out_position; - } - } - } - } - } - } - } - offset_builder.UnsafeAppend(offset); - out->length = output_length; - RETURN_NOT_OK(offset_builder.Finish(&out->buffers[1])); - return data_builder.Finish(&out->buffers[2]); -} - -#undef BINARY_FILTER_SETUP_COMMON -#undef APPEND_RAW_DATA -#undef APPEND_SINGLE_VALUE - -Status BinaryFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { - FilterOptions::NullSelectionBehavior null_selection = - FilterState::Get(ctx).null_selection_behavior; - - const ArraySpan& values = batch[0].array; - const ArraySpan& filter = batch[1].array; - const bool is_ree_filter = filter.type->id() == Type::RUN_END_ENCODED; - int64_t output_length = GetFilterOutputSize(filter, null_selection); - - ArrayData* out_arr = out->array_data().get(); - - const bool filter_null_count_is_zero = - is_ree_filter ? filter.child_data[1].null_count == 0 : filter.null_count == 0; - - // The output precomputed null count is unknown except in the narrow - // condition that all the values are non-null and the filter will not cause - // any new nulls to be created. - if (values.null_count == 0 && - (null_selection == FilterOptions::DROP || filter_null_count_is_zero)) { - out_arr->null_count = 0; - } else { - out_arr->null_count = kUnknownNullCount; - } - Type::type type_id = values.type->id(); - if (values.null_count == 0 && filter_null_count_is_zero) { - // Faster no-nulls case - if (is_binary_like(type_id)) { - RETURN_NOT_OK(BinaryFilterNonNullImpl( - ctx, values, filter, output_length, null_selection, out_arr)); - } else if (is_large_binary_like(type_id)) { - RETURN_NOT_OK(BinaryFilterNonNullImpl( - ctx, values, filter, output_length, null_selection, out_arr)); - } else { - DCHECK(false); - } - } else { - // Output may have nulls - RETURN_NOT_OK(ctx->AllocateBitmap(output_length).Value(&out_arr->buffers[0])); - if (is_binary_like(type_id)) { - RETURN_NOT_OK(BinaryFilterImpl(ctx, values, filter, output_length, - null_selection, out_arr)); - } else if (is_large_binary_like(type_id)) { - RETURN_NOT_OK(BinaryFilterImpl(ctx, values, filter, output_length, - null_selection, out_arr)); - } else { - DCHECK(false); - } - } - - return Status::OK(); -} - -// ---------------------------------------------------------------------- -// Null filter - -Status NullFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { - int64_t output_length = - GetFilterOutputSize(batch[1].array, FilterState::Get(ctx).null_selection_behavior); - out->value = std::make_shared(output_length)->data(); - return Status::OK(); -} - -// ---------------------------------------------------------------------- -// Dictionary filter - -Status DictionaryFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { - DictionaryArray dict_values(batch[0].array.ToArrayData()); - Datum result; - RETURN_NOT_OK(Filter(Datum(dict_values.indices()), batch[1].array.ToArrayData(), - FilterState::Get(ctx), ctx->exec_context()) - .Value(&result)); - DictionaryArray filtered_values(dict_values.type(), result.make_array(), - dict_values.dictionary()); - out->value = filtered_values.data(); - return Status::OK(); -} - -// ---------------------------------------------------------------------- -// Extension filter - -Status ExtensionFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { - ExtensionArray ext_values(batch[0].array.ToArrayData()); - Datum result; - RETURN_NOT_OK(Filter(Datum(ext_values.storage()), batch[1].array.ToArrayData(), - FilterState::Get(ctx), ctx->exec_context()) - .Value(&result)); - ExtensionArray filtered_values(ext_values.type(), result.make_array()); - out->value = filtered_values.data(); - return Status::OK(); -} - -// Transform filter to selection indices and then use Take. -Status FilterWithTakeExec(const ArrayKernelExec& take_exec, KernelContext* ctx, - const ExecSpan& batch, ExecResult* out) { - std::shared_ptr indices; - RETURN_NOT_OK(GetTakeIndices(batch[1].array, - FilterState::Get(ctx).null_selection_behavior, - ctx->memory_pool()) - .Value(&indices)); - KernelContext take_ctx(*ctx); - TakeState state{TakeOptions::NoBoundsCheck()}; - take_ctx.SetState(&state); - ExecSpan take_batch({batch[0], ArraySpan(*indices)}, batch.length); - return take_exec(&take_ctx, take_batch, out); -} - -// Due to the special treatment with their Take kernels, we filter Struct and SparseUnion -// arrays by transforming filter to selection indices and call Take. -Status StructFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { - return FilterWithTakeExec(StructTakeExec, ctx, batch, out); -} - -Status SparseUnionFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { - return FilterWithTakeExec(SparseUnionTakeExec, ctx, batch, out); -} - -// ---------------------------------------------------------------------- -// Implement Filter metafunction - -Result> FilterRecordBatch(const RecordBatch& batch, - const Datum& filter, - const FunctionOptions* options, - ExecContext* ctx) { - if (batch.num_rows() != filter.length()) { - return Status::Invalid("Filter inputs must all be the same length"); - } - - // Fetch filter - const auto& filter_opts = *static_cast(options); - ArrayData filter_array; - switch (filter.kind()) { - case Datum::ARRAY: - filter_array = *filter.array(); - break; - case Datum::CHUNKED_ARRAY: { - ARROW_ASSIGN_OR_RAISE(auto combined, Concatenate(filter.chunked_array()->chunks())); - filter_array = *combined->data(); - break; - } - default: - return Status::TypeError("Filter should be array-like"); - } - - // Convert filter to selection vector/indices and use Take - ARROW_ASSIGN_OR_RAISE(std::shared_ptr indices, - GetTakeIndices(filter_array, filter_opts.null_selection_behavior, - ctx->memory_pool())); - std::vector> columns(batch.num_columns()); - for (int i = 0; i < batch.num_columns(); ++i) { - ARROW_ASSIGN_OR_RAISE(Datum out, Take(batch.column(i)->data(), Datum(indices), - TakeOptions::NoBoundsCheck(), ctx)); - columns[i] = out.make_array(); - } - return RecordBatch::Make(batch.schema(), indices->length, std::move(columns)); -} - -Result> FilterTable(const Table& table, const Datum& filter, - const FunctionOptions* options, - ExecContext* ctx) { - if (table.num_rows() != filter.length()) { - return Status::Invalid("Filter inputs must all be the same length"); - } - if (table.num_rows() == 0) { - return Table::Make(table.schema(), table.columns(), 0); - } - - // Last input element will be the filter array - const int num_columns = table.num_columns(); - std::vector inputs(num_columns + 1); - - // Fetch table columns - for (int i = 0; i < num_columns; ++i) { - inputs[i] = table.column(i)->chunks(); - } - // Fetch filter - const auto& filter_opts = *static_cast(options); - switch (filter.kind()) { - case Datum::ARRAY: - inputs.back().push_back(filter.make_array()); - break; - case Datum::CHUNKED_ARRAY: - inputs.back() = filter.chunked_array()->chunks(); - break; - default: - return Status::TypeError("Filter should be array-like"); - } - - // Rechunk inputs to allow consistent iteration over their respective chunks - inputs = arrow::internal::RechunkArraysConsistently(inputs); - - // Instead of filtering each column with the boolean filter - // (which would be slow if the table has a large number of columns: ARROW-10569), - // convert each filter chunk to indices, and take() the column. - const int64_t num_chunks = static_cast(inputs.back().size()); - std::vector out_columns(num_columns); - int64_t out_num_rows = 0; - - for (int64_t i = 0; i < num_chunks; ++i) { - const ArrayData& filter_chunk = *inputs.back()[i]->data(); - ARROW_ASSIGN_OR_RAISE( - const auto indices, - GetTakeIndices(filter_chunk, filter_opts.null_selection_behavior, - ctx->memory_pool())); - - if (indices->length > 0) { - // Take from all input columns - Datum indices_datum{std::move(indices)}; - for (int col = 0; col < num_columns; ++col) { - const auto& column_chunk = inputs[col][i]; - ARROW_ASSIGN_OR_RAISE(Datum out, Take(column_chunk, indices_datum, - TakeOptions::NoBoundsCheck(), ctx)); - out_columns[col].push_back(std::move(out).make_array()); - } - out_num_rows += indices->length; - } - } - - ChunkedArrayVector out_chunks(num_columns); - for (int i = 0; i < num_columns; ++i) { - out_chunks[i] = std::make_shared(std::move(out_columns[i]), - table.column(i)->type()); - } - return Table::Make(table.schema(), std::move(out_chunks), out_num_rows); -} - -const FunctionDoc filter_doc( - "Filter with a boolean selection filter", - ("The output is populated with values from the input at positions\n" - "where the selection filter is non-zero. Nulls in the selection filter\n" - "are handled based on FilterOptions."), - {"input", "selection_filter"}, "FilterOptions"); - -class FilterMetaFunction : public MetaFunction { - public: - FilterMetaFunction() - : MetaFunction("filter", Arity::Binary(), filter_doc, GetDefaultFilterOptions()) {} - - Result ExecuteImpl(const std::vector& args, - const FunctionOptions* options, - ExecContext* ctx) const override { - if (args[1].kind() != Datum::ARRAY && args[1].kind() != Datum::CHUNKED_ARRAY) { - return Status::TypeError("Filter should be array-like"); - } - - const auto& filter_type = *args[1].type(); - const bool filter_is_plain_bool = filter_type.id() == Type::BOOL; - const bool filter_is_ree_bool = - filter_type.id() == Type::RUN_END_ENCODED && - checked_cast(filter_type).value_type()->id() == - Type::BOOL; - if (!filter_is_plain_bool && !filter_is_ree_bool) { - return Status::NotImplemented("Filter argument must be boolean type"); - } - - if (args[0].kind() == Datum::RECORD_BATCH) { - ARROW_ASSIGN_OR_RAISE( - std::shared_ptr out_batch, - FilterRecordBatch(*args[0].record_batch(), args[1], options, ctx)); - return Datum(out_batch); - } else if (args[0].kind() == Datum::TABLE) { - ARROW_ASSIGN_OR_RAISE(std::shared_ptr
out_table, - FilterTable(*args[0].table(), args[1], options, ctx)); - return Datum(out_table); - } else { - return CallFunction("array_filter", args, options, ctx); - } - } -}; - -// ---------------------------------------------------------------------- - -} // namespace - -// const FilterOptions* GetDefaultFilterOptions() { -// static const auto kDefaultFilterOptions = FilterOptions::Defaults(); -// return &kDefaultFilterOptions; -// } - -std::unique_ptr MakeScatterByMaskMetaFunction() { - return std::make_unique(); -} - -void PopulateScatterByMaskKernels(std::vector* out) { - auto plain_filter = InputType(Type::BOOL); - auto ree_filter = InputType(match::RunEndEncoded(Type::BOOL)); - - *out = { - // * x Boolean - {InputType(match::Primitive()), plain_filter, PrimitiveFilterExec}, - {InputType(match::BinaryLike()), plain_filter, BinaryFilterExec}, - {InputType(match::LargeBinaryLike()), plain_filter, BinaryFilterExec}, - {InputType(null()), plain_filter, NullFilterExec}, - {InputType(Type::FIXED_SIZE_BINARY), plain_filter, PrimitiveFilterExec}, - {InputType(Type::DECIMAL128), plain_filter, PrimitiveFilterExec}, - {InputType(Type::DECIMAL256), plain_filter, PrimitiveFilterExec}, - {InputType(Type::DICTIONARY), plain_filter, DictionaryFilterExec}, - {InputType(Type::EXTENSION), plain_filter, ExtensionFilterExec}, - {InputType(Type::LIST), plain_filter, ListFilterExec}, - {InputType(Type::LARGE_LIST), plain_filter, LargeListFilterExec}, - {InputType(Type::FIXED_SIZE_LIST), plain_filter, FSLFilterExec}, - {InputType(Type::DENSE_UNION), plain_filter, DenseUnionFilterExec}, - {InputType(Type::SPARSE_UNION), plain_filter, SparseUnionFilterExec}, - {InputType(Type::STRUCT), plain_filter, StructFilterExec}, - {InputType(Type::MAP), plain_filter, MapFilterExec}, - - // * x REE(Boolean) - {InputType(match::Primitive()), ree_filter, PrimitiveFilterExec}, - {InputType(match::BinaryLike()), ree_filter, BinaryFilterExec}, - {InputType(match::LargeBinaryLike()), ree_filter, BinaryFilterExec}, - {InputType(null()), ree_filter, NullFilterExec}, - {InputType(Type::FIXED_SIZE_BINARY), ree_filter, PrimitiveFilterExec}, - {InputType(Type::DECIMAL128), ree_filter, PrimitiveFilterExec}, - {InputType(Type::DECIMAL256), ree_filter, PrimitiveFilterExec}, - {InputType(Type::DICTIONARY), ree_filter, DictionaryFilterExec}, - {InputType(Type::EXTENSION), ree_filter, ExtensionFilterExec}, - {InputType(Type::LIST), ree_filter, ListFilterExec}, - {InputType(Type::LARGE_LIST), ree_filter, LargeListFilterExec}, - {InputType(Type::FIXED_SIZE_LIST), ree_filter, FSLFilterExec}, - {InputType(Type::DENSE_UNION), ree_filter, DenseUnionFilterExec}, - {InputType(Type::SPARSE_UNION), ree_filter, SparseUnionFilterExec}, - {InputType(Type::STRUCT), ree_filter, StructFilterExec}, - {InputType(Type::MAP), ree_filter, MapFilterExec}, - }; -} - -} // namespace compute::internal - -} // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/vector_scatter_by_mask_internal.h b/cpp/src/arrow/compute/kernels/vector_scatter_by_mask_internal.h deleted file mode 100644 index ec1e3f849a7e0..0000000000000 --- a/cpp/src/arrow/compute/kernels/vector_scatter_by_mask_internal.h +++ /dev/null @@ -1,37 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include - -#include "arrow/array/data.h" -#include "arrow/compute/api_vector.h" -#include "arrow/compute/kernels/vector_scatter_internal.h" - -namespace arrow { -namespace compute { -namespace internal { - -std::unique_ptr MakeScatterByMaskMetaFunction(); - -void PopulateScatterByMaskKernels(std::vector* out); - -} // namespace internal -} // namespace compute -} // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/vector_scatter_internal.cc b/cpp/src/arrow/compute/kernels/vector_scatter_internal.cc deleted file mode 100644 index 2e95132c8735e..0000000000000 --- a/cpp/src/arrow/compute/kernels/vector_scatter_internal.cc +++ /dev/null @@ -1,888 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include -#include -#include -#include -#include - -#include "arrow/array/array_binary.h" -#include "arrow/array/array_nested.h" -#include "arrow/array/builder_primitive.h" -#include "arrow/buffer_builder.h" -#include "arrow/chunked_array.h" -#include "arrow/compute/api_vector.h" -#include "arrow/compute/function.h" -#include "arrow/compute/kernel.h" -#include "arrow/compute/kernels/codegen_internal.h" -#include "arrow/compute/kernels/vector_scatter_internal.h" -#include "arrow/compute/registry.h" -#include "arrow/type.h" -#include "arrow/type_traits.h" -#include "arrow/util/bit_block_counter.h" -#include "arrow/util/bit_run_reader.h" -#include "arrow/util/bit_util.h" -#include "arrow/util/fixed_width_internal.h" -#include "arrow/util/int_util.h" -#include "arrow/util/logging.h" -#include "arrow/util/ree_util.h" - -namespace arrow { - -using internal::CheckIndexBounds; - -namespace compute::internal { - -void RegisterScatterFunction(const std::string& name, FunctionDoc doc, - VectorKernel base_kernel, - std::vector&& kernels, - const FunctionOptions* default_options, - FunctionRegistry* registry) { - auto func = std::make_shared(name, Arity::Binary(), std::move(doc), - default_options); - for (auto&& kernel_data : kernels) { - base_kernel.signature = KernelSignature::Make( - {std::move(kernel_data.value_type), std::move(kernel_data.selection_type)}, - OutputType(FirstType)); - base_kernel.exec = kernel_data.exec; - DCHECK_OK(func->AddKernel(base_kernel)); - } - kernels.clear(); - DCHECK_OK(registry->AddFunction(std::move(func))); -} - -// namespace { - -// /// \brief Iterate over a REE filter, emitting ranges of a plain values array that -// /// would pass the filter. -// /// -// /// Differently from REExREE, and REExPlain filtering, PlainxREE filtering -// /// does not produce a REE output, but rather a plain output array. As such it's -// /// much simpler. -// /// -// /// \param filter_may_have_nulls Only pass false if you know the filter has no nulls. -// template -// void VisitPlainxREEFilterOutputSegmentsImpl( -// const ArraySpan& filter, bool filter_may_have_nulls, -// FilterOptions::NullSelectionBehavior null_selection, -// const EmitREEFilterSegment& emit_segment) { -// using FilterRunEndCType = typename FilterRunEndType::c_type; -// const ArraySpan& filter_values = arrow::ree_util::ValuesArray(filter); -// const int64_t filter_values_offset = filter_values.offset; -// const uint8_t* filter_is_valid = filter_values.buffers[0].data; -// const uint8_t* filter_selection = filter_values.buffers[1].data; -// filter_may_have_nulls = filter_may_have_nulls && filter_is_valid != nullptr && -// filter_values.null_count != 0; - -// const arrow::ree_util::RunEndEncodedArraySpan filter_span(filter); -// auto it = filter_span.begin(); -// if (filter_may_have_nulls) { -// if (null_selection == FilterOptions::EMIT_NULL) { -// while (!it.is_end(filter_span)) { -// const int64_t i = filter_values_offset + it.index_into_array(); -// const bool valid = bit_util::GetBit(filter_is_valid, i); -// const bool emit = !valid || bit_util::GetBit(filter_selection, i); -// if (ARROW_PREDICT_FALSE( -// emit && !emit_segment(it.logical_position(), it.run_length(), valid))) { -// break; -// } -// ++it; -// } -// } else { // DROP nulls -// while (!it.is_end(filter_span)) { -// const int64_t i = filter_values_offset + it.index_into_array(); -// const bool emit = -// bit_util::GetBit(filter_is_valid, i) && bit_util::GetBit(filter_selection, i); -// if (ARROW_PREDICT_FALSE( -// emit && !emit_segment(it.logical_position(), it.run_length(), true))) { -// break; -// } -// ++it; -// } -// } -// } else { -// while (!it.is_end(filter_span)) { -// const int64_t i = filter_values_offset + it.index_into_array(); -// const bool emit = bit_util::GetBit(filter_selection, i); -// if (ARROW_PREDICT_FALSE( -// emit && !emit_segment(it.logical_position(), it.run_length(), true))) { -// break; -// } -// ++it; -// } -// } -// } - -// } // namespace - -// void VisitPlainxREEFilterOutputSegments( -// const ArraySpan& filter, bool filter_may_have_nulls, -// FilterOptions::NullSelectionBehavior null_selection, -// const EmitREEFilterSegment& emit_segment) { -// if (filter.length == 0) { -// return; -// } -// const auto& ree_type = checked_cast(*filter.type); -// switch (ree_type.run_end_type()->id()) { -// case Type::INT16: -// return VisitPlainxREEFilterOutputSegmentsImpl( -// filter, filter_may_have_nulls, null_selection, emit_segment); -// case Type::INT32: -// return VisitPlainxREEFilterOutputSegmentsImpl( -// filter, filter_may_have_nulls, null_selection, emit_segment); -// default: -// DCHECK(ree_type.run_end_type()->id() == Type::INT64); -// return VisitPlainxREEFilterOutputSegmentsImpl( -// filter, filter_may_have_nulls, null_selection, emit_segment); -// } -// } - -// namespace { - -// // ---------------------------------------------------------------------- -// // Implement take for other data types where there is less performance -// // sensitivity by visiting the selected indices. - -// // Use CRTP to dispatch to type-specific processing of take indices for each -// // unsigned integer type. -// template -// struct Selection { -// using ValuesArrayType = typename TypeTraits::ArrayType; - -// // Forwards the generic value visitors to the VisitFilter template -// struct FilterAdapter { -// static constexpr bool is_take = false; - -// Impl* impl; -// explicit FilterAdapter(Impl* impl) : impl(impl) {} -// template -// Status Generate(ValidVisitor&& visit_valid, NullVisitor&& visit_null) { -// return impl->VisitFilter(std::forward(visit_valid), -// std::forward(visit_null)); -// } -// }; - -// // Forwards the generic value visitors to the take index visitor template -// template -// struct TakeAdapter { -// static constexpr bool is_take = true; - -// Impl* impl; -// explicit TakeAdapter(Impl* impl) : impl(impl) {} -// template -// Status Generate(ValidVisitor&& visit_valid, NullVisitor&& visit_null) { -// return impl->template VisitTake(std::forward(visit_valid), -// std::forward(visit_null)); -// } -// }; - -// KernelContext* ctx; -// const ArraySpan& values; -// const ArraySpan& selection; -// int64_t output_length; -// ArrayData* out; -// TypedBufferBuilder validity_builder; - -// Selection(KernelContext* ctx, const ExecSpan& batch, int64_t output_length, -// ExecResult* out) -// : ctx(ctx), -// values(batch[0].array), -// selection(batch[1].array), -// output_length(output_length), -// out(out->array_data().get()), -// validity_builder(ctx->memory_pool()) {} - -// virtual ~Selection() = default; - -// Status FinishCommon() { -// out->buffers.resize(values.num_buffers()); -// out->length = validity_builder.length(); -// out->null_count = validity_builder.false_count(); -// return validity_builder.Finish(&out->buffers[0]); -// } - -// template -// Status VisitTake(ValidVisitor&& visit_valid, NullVisitor&& visit_null) { -// const auto indices_values = selection.GetValues(1); -// const uint8_t* is_valid = selection.buffers[0].data; -// arrow::internal::OptionalBitIndexer indices_is_valid(is_valid, selection.offset); -// arrow::internal::OptionalBitIndexer values_is_valid(values.buffers[0].data, -// values.offset); - -// const bool values_have_nulls = values.MayHaveNulls(); -// arrow::internal::OptionalBitBlockCounter bit_counter(is_valid, selection.offset, -// selection.length); -// int64_t position = 0; -// while (position < selection.length) { -// BitBlockCount block = bit_counter.NextBlock(); -// const bool indices_have_nulls = block.popcount < block.length; -// if (!indices_have_nulls && !values_have_nulls) { -// // Fastest path, neither indices nor values have nulls -// validity_builder.UnsafeAppend(block.length, true); -// for (int64_t i = 0; i < block.length; ++i) { -// RETURN_NOT_OK(visit_valid(indices_values[position++])); -// } -// } else if (block.popcount > 0) { -// // Since we have to branch on whether the indices are null or not, we -// // combine the "non-null indices block but some values null" and -// // "some-null indices block but values non-null" into a single loop. -// for (int64_t i = 0; i < block.length; ++i) { -// if ((!indices_have_nulls || indices_is_valid[position]) && -// values_is_valid[indices_values[position]]) { -// validity_builder.UnsafeAppend(true); -// RETURN_NOT_OK(visit_valid(indices_values[position])); -// } else { -// validity_builder.UnsafeAppend(false); -// RETURN_NOT_OK(visit_null()); -// } -// ++position; -// } -// } else { -// // The whole block is null -// validity_builder.UnsafeAppend(block.length, false); -// for (int64_t i = 0; i < block.length; ++i) { -// RETURN_NOT_OK(visit_null()); -// } -// position += block.length; -// } -// } -// return Status::OK(); -// } - -// // We use the NullVisitor both for "selected" nulls as well as "emitted" -// // nulls coming from the filter when using FilterOptions::EMIT_NULL -// template -// Status VisitFilter(ValidVisitor&& visit_valid, NullVisitor&& visit_null) { -// const bool is_ree_filter = selection.type->id() == Type::RUN_END_ENCODED; -// const auto null_selection = FilterState::Get(ctx).null_selection_behavior; - -// arrow::internal::OptionalBitIndexer values_is_valid(values.buffers[0].data, -// values.offset); - -// auto AppendNotNull = [&](int64_t index) -> Status { -// validity_builder.UnsafeAppend(true); -// return visit_valid(index); -// }; - -// auto AppendNull = [&]() -> Status { -// validity_builder.UnsafeAppend(false); -// return visit_null(); -// }; - -// auto AppendMaybeNull = [&](int64_t index) -> Status { -// if (values_is_valid[index]) { -// return AppendNotNull(index); -// } else { -// return AppendNull(); -// } -// }; - -// if (is_ree_filter) { -// Status status; -// VisitPlainxREEFilterOutputSegments( -// selection, /*filter_may_have_nulls=*/true, null_selection, -// [&](int64_t position, int64_t segment_length, bool filter_valid) { -// if (filter_valid) { -// for (int64_t i = 0; i < segment_length; ++i) { -// status = AppendMaybeNull(position + i); -// } -// } else { -// for (int64_t i = 0; i < segment_length; ++i) { -// status = AppendNull(); -// } -// } -// return status.ok(); -// }); -// return status; -// } - -// const uint8_t* filter_data = selection.buffers[1].data; -// const uint8_t* filter_is_valid = selection.buffers[0].data; -// const int64_t filter_offset = selection.offset; -// // We use 3 block counters for fast scanning of the filter -// // -// // * values_valid_counter: for values null/not-null -// // * filter_valid_counter: for filter null/not-null -// // * filter_counter: for filter true/false -// arrow::internal::OptionalBitBlockCounter values_valid_counter( -// values.buffers[0].data, values.offset, values.length); -// arrow::internal::OptionalBitBlockCounter filter_valid_counter( -// filter_is_valid, filter_offset, selection.length); -// arrow::internal::BitBlockCounter filter_counter(filter_data, filter_offset, -// selection.length); - -// int64_t in_position = 0; -// while (in_position < selection.length) { -// arrow::internal::BitBlockCount filter_valid_block = filter_valid_counter.NextWord(); -// arrow::internal::BitBlockCount values_valid_block = values_valid_counter.NextWord(); -// arrow::internal::BitBlockCount filter_block = filter_counter.NextWord(); -// if (filter_block.NoneSet() && null_selection == FilterOptions::DROP) { -// // For this exceedingly common case in low-selectivity filters we can -// // skip further analysis of the data and move on to the next block. -// in_position += filter_block.length; -// } else if (filter_valid_block.AllSet()) { -// // Simpler path: no filter values are null -// if (filter_block.AllSet()) { -// // Fastest path: filter values are all true and not null -// if (values_valid_block.AllSet()) { -// // The values aren't null either -// validity_builder.UnsafeAppend(filter_block.length, true); -// for (int64_t i = 0; i < filter_block.length; ++i) { -// RETURN_NOT_OK(visit_valid(in_position++)); -// } -// } else { -// // Some of the values in this block are null -// for (int64_t i = 0; i < filter_block.length; ++i) { -// RETURN_NOT_OK(AppendMaybeNull(in_position++)); -// } -// } -// } else { // !filter_block.AllSet() -// // Some of the filter values are false, but all not null -// if (values_valid_block.AllSet()) { -// // All the values are not-null, so we can skip null checking for -// // them -// for (int64_t i = 0; i < filter_block.length; ++i) { -// if (bit_util::GetBit(filter_data, filter_offset + in_position)) { -// RETURN_NOT_OK(AppendNotNull(in_position)); -// } -// ++in_position; -// } -// } else { -// // Some of the values in the block are null, so we have to check -// // each one -// for (int64_t i = 0; i < filter_block.length; ++i) { -// if (bit_util::GetBit(filter_data, filter_offset + in_position)) { -// RETURN_NOT_OK(AppendMaybeNull(in_position)); -// } -// ++in_position; -// } -// } -// } -// } else { // !filter_valid_block.AllSet() -// // Some of the filter values are null, so we have to handle the DROP -// // versus EMIT_NULL null selection behavior. -// if (null_selection == FilterOptions::DROP) { -// // Filter null values are treated as false. -// for (int64_t i = 0; i < filter_block.length; ++i) { -// if (bit_util::GetBit(filter_is_valid, filter_offset + in_position) && -// bit_util::GetBit(filter_data, filter_offset + in_position)) { -// RETURN_NOT_OK(AppendMaybeNull(in_position)); -// } -// ++in_position; -// } -// } else { -// // Filter null values are appended to output as null whether the -// // value in the corresponding slot is valid or not -// for (int64_t i = 0; i < filter_block.length; ++i) { -// const bool filter_not_null = -// bit_util::GetBit(filter_is_valid, filter_offset + in_position); -// if (filter_not_null && -// bit_util::GetBit(filter_data, filter_offset + in_position)) { -// RETURN_NOT_OK(AppendMaybeNull(in_position)); -// } else if (!filter_not_null) { -// // EMIT_NULL case -// RETURN_NOT_OK(AppendNull()); -// } -// ++in_position; -// } -// } -// } -// } -// return Status::OK(); -// } - -// virtual Status Init() { return Status::OK(); } - -// // Implementation specific finish logic -// virtual Status Finish() = 0; - -// Status ExecTake() { -// RETURN_NOT_OK(this->validity_builder.Reserve(output_length)); -// RETURN_NOT_OK(Init()); -// int index_width = this->selection.type->byte_width(); - -// // CTRP dispatch here -// switch (index_width) { -// case 1: { -// Status s = -// static_cast(this)->template GenerateOutput>(); -// RETURN_NOT_OK(s); -// } break; -// case 2: { -// Status s = -// static_cast(this)->template GenerateOutput>(); -// RETURN_NOT_OK(s); -// } break; -// case 4: { -// Status s = -// static_cast(this)->template GenerateOutput>(); -// RETURN_NOT_OK(s); -// } break; -// case 8: { -// Status s = -// static_cast(this)->template GenerateOutput>(); -// RETURN_NOT_OK(s); -// } break; -// default: -// DCHECK(false) << "Invalid index width"; -// break; -// } -// RETURN_NOT_OK(this->FinishCommon()); -// return Finish(); -// } - -// Status ExecFilter() { -// RETURN_NOT_OK(this->validity_builder.Reserve(output_length)); -// RETURN_NOT_OK(Init()); -// // CRTP dispatch -// Status s = static_cast(this)->template GenerateOutput(); -// RETURN_NOT_OK(s); -// RETURN_NOT_OK(this->FinishCommon()); -// return Finish(); -// } -// }; - -// #define LIFT_BASE_MEMBERS() \ -// using ValuesArrayType = typename Base::ValuesArrayType; \ -// using Base::ctx; \ -// using Base::values; \ -// using Base::selection; \ -// using Base::output_length; \ -// using Base::out; \ -// using Base::validity_builder - -// inline Status VisitNoop() { return Status::OK(); } - -// // A selection implementation for 32-bit and 64-bit variable binary -// // types. Common generated kernels are shared between Binary/String and -// // LargeBinary/LargeString -// template -// struct VarBinarySelectionImpl : public Selection, Type> { -// using offset_type = typename Type::offset_type; - -// using Base = Selection, Type>; -// LIFT_BASE_MEMBERS(); - -// TypedBufferBuilder offset_builder; -// TypedBufferBuilder data_builder; - -// static constexpr int64_t kOffsetLimit = std::numeric_limits::max() - 1; - -// VarBinarySelectionImpl(KernelContext* ctx, const ExecSpan& batch, int64_t output_length, -// ExecResult* out) -// : Base(ctx, batch, output_length, out), -// offset_builder(ctx->memory_pool()), -// data_builder(ctx->memory_pool()) {} - -// template -// Status GenerateOutput() { -// const auto raw_offsets = this->values.template GetValues(1); -// const uint8_t* raw_data = this->values.buffers[2].data; - -// // Presize the data builder with a rough estimate of the required data size -// if (this->values.length > 0) { -// int64_t data_length = raw_offsets[this->values.length] - raw_offsets[0]; -// const double mean_value_length = -// data_length / static_cast(this->values.length); - -// // TODO: See if possible to reduce output_length for take/filter cases -// // where there are nulls in the selection array -// RETURN_NOT_OK( -// data_builder.Reserve(static_cast(mean_value_length * output_length))); -// } -// int64_t space_available = data_builder.capacity(); - -// offset_type offset = 0; -// Adapter adapter(this); -// RETURN_NOT_OK(adapter.Generate( -// [&](int64_t index) { -// offset_builder.UnsafeAppend(offset); -// offset_type val_offset = raw_offsets[index]; -// offset_type val_size = raw_offsets[index + 1] - val_offset; - -// // Use static property to prune this code from the filter path in -// // optimized builds -// if (Adapter::is_take && -// ARROW_PREDICT_FALSE(static_cast(offset) + -// static_cast(val_size)) > kOffsetLimit) { -// return Status::Invalid("Take operation overflowed binary array capacity"); -// } -// offset += val_size; -// if (ARROW_PREDICT_FALSE(val_size > space_available)) { -// RETURN_NOT_OK(data_builder.Reserve(val_size)); -// space_available = data_builder.capacity() - data_builder.length(); -// } -// data_builder.UnsafeAppend(raw_data + val_offset, val_size); -// space_available -= val_size; -// return Status::OK(); -// }, -// [&]() { -// offset_builder.UnsafeAppend(offset); -// return Status::OK(); -// })); -// offset_builder.UnsafeAppend(offset); -// return Status::OK(); -// } - -// Status Init() override { return offset_builder.Reserve(output_length + 1); } - -// Status Finish() override { -// RETURN_NOT_OK(offset_builder.Finish(&out->buffers[1])); -// return data_builder.Finish(&out->buffers[2]); -// } -// }; - -// template -// struct ListSelectionImpl : public Selection, Type> { -// using offset_type = typename Type::offset_type; - -// using Base = Selection, Type>; -// LIFT_BASE_MEMBERS(); - -// TypedBufferBuilder offset_builder; -// typename TypeTraits::OffsetBuilderType child_index_builder; - -// ListSelectionImpl(KernelContext* ctx, const ExecSpan& batch, int64_t output_length, -// ExecResult* out) -// : Base(ctx, batch, output_length, out), -// offset_builder(ctx->memory_pool()), -// child_index_builder(ctx->memory_pool()) {} - -// template -// Status GenerateOutput() { -// ValuesArrayType typed_values(this->values.ToArrayData()); - -// // TODO presize child_index_builder with a similar heuristic as VarBinarySelectionImpl - -// offset_type offset = 0; -// Adapter adapter(this); -// RETURN_NOT_OK(adapter.Generate( -// [&](int64_t index) { -// offset_builder.UnsafeAppend(offset); -// offset_type value_offset = typed_values.value_offset(index); -// offset_type value_length = typed_values.value_length(index); -// offset += value_length; -// RETURN_NOT_OK(child_index_builder.Reserve(value_length)); -// for (offset_type j = value_offset; j < value_offset + value_length; ++j) { -// child_index_builder.UnsafeAppend(j); -// } -// return Status::OK(); -// }, -// [&]() { -// offset_builder.UnsafeAppend(offset); -// return Status::OK(); -// })); -// offset_builder.UnsafeAppend(offset); -// return Status::OK(); -// } - -// Status Init() override { -// RETURN_NOT_OK(offset_builder.Reserve(output_length + 1)); -// return Status::OK(); -// } - -// Status Finish() override { -// std::shared_ptr child_indices; -// RETURN_NOT_OK(child_index_builder.Finish(&child_indices)); - -// ValuesArrayType typed_values(this->values.ToArrayData()); - -// // No need to boundscheck the child values indices -// ARROW_ASSIGN_OR_RAISE(std::shared_ptr taken_child, -// Take(*typed_values.values(), *child_indices, -// TakeOptions::NoBoundsCheck(), ctx->exec_context())); -// RETURN_NOT_OK(offset_builder.Finish(&out->buffers[1])); -// out->child_data = {taken_child->data()}; -// return Status::OK(); -// } -// }; - -// struct DenseUnionSelectionImpl -// : public Selection { -// using Base = Selection; -// LIFT_BASE_MEMBERS(); - -// TypedBufferBuilder value_offset_buffer_builder_; -// TypedBufferBuilder child_id_buffer_builder_; -// std::vector type_codes_; -// std::vector child_indices_builders_; - -// DenseUnionSelectionImpl(KernelContext* ctx, const ExecSpan& batch, -// int64_t output_length, ExecResult* out) -// : Base(ctx, batch, output_length, out), -// value_offset_buffer_builder_(ctx->memory_pool()), -// child_id_buffer_builder_(ctx->memory_pool()), -// type_codes_(checked_cast(*this->values.type).type_codes()), -// child_indices_builders_(type_codes_.size()) { -// for (auto& child_indices_builder : child_indices_builders_) { -// child_indices_builder = Int32Builder(ctx->memory_pool()); -// } -// } - -// template -// Status GenerateOutput() { -// DenseUnionArray typed_values(this->values.ToArrayData()); -// Adapter adapter(this); -// RETURN_NOT_OK(adapter.Generate( -// [&](int64_t index) { -// int8_t child_id = typed_values.child_id(index); -// child_id_buffer_builder_.UnsafeAppend(type_codes_[child_id]); -// int32_t value_offset = typed_values.value_offset(index); -// value_offset_buffer_builder_.UnsafeAppend( -// static_cast(child_indices_builders_[child_id].length())); -// RETURN_NOT_OK(child_indices_builders_[child_id].Reserve(1)); -// child_indices_builders_[child_id].UnsafeAppend(value_offset); -// return Status::OK(); -// }, -// [&]() { -// int8_t child_id = 0; -// child_id_buffer_builder_.UnsafeAppend(type_codes_[child_id]); -// value_offset_buffer_builder_.UnsafeAppend( -// static_cast(child_indices_builders_[child_id].length())); -// RETURN_NOT_OK(child_indices_builders_[child_id].Reserve(1)); -// child_indices_builders_[child_id].UnsafeAppendNull(); -// return Status::OK(); -// })); -// return Status::OK(); -// } - -// Status Init() override { -// RETURN_NOT_OK(child_id_buffer_builder_.Reserve(output_length)); -// RETURN_NOT_OK(value_offset_buffer_builder_.Reserve(output_length)); -// return Status::OK(); -// } - -// Status Finish() override { -// ARROW_ASSIGN_OR_RAISE(auto child_ids_buffer, child_id_buffer_builder_.Finish()); -// ARROW_ASSIGN_OR_RAISE(auto value_offsets_buffer, -// value_offset_buffer_builder_.Finish()); -// DenseUnionArray typed_values(this->values.ToArrayData()); -// auto num_fields = typed_values.num_fields(); -// auto num_rows = child_ids_buffer->size(); -// BufferVector buffers{nullptr, std::move(child_ids_buffer), -// std::move(value_offsets_buffer)}; -// *out = ArrayData(typed_values.type(), num_rows, std::move(buffers), 0); -// for (auto i = 0; i < num_fields; i++) { -// ARROW_ASSIGN_OR_RAISE(auto child_indices_array, -// child_indices_builders_[i].Finish()); -// ARROW_ASSIGN_OR_RAISE(std::shared_ptr child_array, -// Take(*typed_values.field(i), *child_indices_array)); -// out->child_data.push_back(child_array->data()); -// } -// return Status::OK(); -// } -// }; - -// // We need a slightly different approach for SparseUnion. For Take, we can -// // invoke Take on each child's data with boundschecking disabled. For -// // Filter on the other hand, if we naively call Filter on each child, then the -// // filter output length will have to be redundantly computed. Thus, for Filter -// // we instead convert the filter to selection indices and then invoke take. - -// // SparseUnion selection implementation. ONLY used for Take -// struct SparseUnionSelectionImpl -// : public Selection { -// using Base = Selection; -// LIFT_BASE_MEMBERS(); - -// TypedBufferBuilder child_id_buffer_builder_; -// const int8_t type_code_for_null_; - -// SparseUnionSelectionImpl(KernelContext* ctx, const ExecSpan& batch, -// int64_t output_length, ExecResult* out) -// : Base(ctx, batch, output_length, out), -// child_id_buffer_builder_(ctx->memory_pool()), -// type_code_for_null_( -// checked_cast(*this->values.type).type_codes()[0]) {} - -// template -// Status GenerateOutput() { -// SparseUnionArray typed_values(this->values.ToArrayData()); -// Adapter adapter(this); -// RETURN_NOT_OK(adapter.Generate( -// [&](int64_t index) { -// child_id_buffer_builder_.UnsafeAppend(typed_values.type_code(index)); -// return Status::OK(); -// }, -// [&]() { -// child_id_buffer_builder_.UnsafeAppend(type_code_for_null_); -// return Status::OK(); -// })); -// return Status::OK(); -// } - -// Status Init() override { -// RETURN_NOT_OK(child_id_buffer_builder_.Reserve(output_length)); -// return Status::OK(); -// } - -// Status Finish() override { -// ARROW_ASSIGN_OR_RAISE(auto child_ids_buffer, child_id_buffer_builder_.Finish()); -// SparseUnionArray typed_values(this->values.ToArrayData()); -// auto num_fields = typed_values.num_fields(); -// auto num_rows = child_ids_buffer->size(); -// BufferVector buffers{nullptr, std::move(child_ids_buffer)}; -// *out = ArrayData(typed_values.type(), num_rows, std::move(buffers), 0); -// out->child_data.reserve(num_fields); -// for (auto i = 0; i < num_fields; i++) { -// ARROW_ASSIGN_OR_RAISE(auto child_datum, -// Take(*typed_values.field(i), *this->selection.ToArrayData())); -// out->child_data.emplace_back(std::move(child_datum).array()); -// } -// return Status::OK(); -// } -// }; - -// struct FSLSelectionImpl : public Selection { -// Int64Builder child_index_builder; - -// using Base = Selection; -// LIFT_BASE_MEMBERS(); - -// FSLSelectionImpl(KernelContext* ctx, const ExecSpan& batch, int64_t output_length, -// ExecResult* out) -// : Base(ctx, batch, output_length, out), child_index_builder(ctx->memory_pool()) {} - -// template -// Status GenerateOutput() { -// ValuesArrayType typed_values(this->values.ToArrayData()); -// const int32_t list_size = typed_values.list_type()->list_size(); -// const int64_t base_offset = typed_values.offset(); - -// // We must take list_size elements even for null elements of -// // indices. -// RETURN_NOT_OK(child_index_builder.Reserve(output_length * list_size)); - -// Adapter adapter(this); -// return adapter.Generate( -// [&](int64_t index) { -// int64_t offset = (base_offset + index) * list_size; -// for (int64_t j = offset; j < offset + list_size; ++j) { -// child_index_builder.UnsafeAppend(j); -// } -// return Status::OK(); -// }, -// [&]() { return child_index_builder.AppendNulls(list_size); }); -// } - -// Status Finish() override { -// std::shared_ptr child_indices; -// RETURN_NOT_OK(child_index_builder.Finish(&child_indices)); - -// ValuesArrayType typed_values(this->values.ToArrayData()); - -// // No need to boundscheck the child values indices -// ARROW_ASSIGN_OR_RAISE(std::shared_ptr taken_child, -// Take(*typed_values.values(), *child_indices, -// TakeOptions::NoBoundsCheck(), ctx->exec_context())); -// out->child_data = {taken_child->data()}; -// return Status::OK(); -// } -// }; - -// // ---------------------------------------------------------------------- -// // Struct selection implementations - -// // We need a slightly different approach for StructType. For Take, we can -// // invoke Take on each struct field's data with boundschecking disabled. For -// // Filter on the other hand, if we naively call Filter on each field, then the -// // filter output length will have to be redundantly computed. Thus, for Filter -// // we instead convert the filter to selection indices and then invoke take. - -// // Struct selection implementation. ONLY used for Take -// struct StructSelectionImpl : public Selection { -// using Base = Selection; -// LIFT_BASE_MEMBERS(); -// using Base::Base; - -// template -// Status GenerateOutput() { -// StructArray typed_values(this->values.ToArrayData()); -// Adapter adapter(this); -// // There's nothing to do for Struct except to generate the validity bitmap -// return adapter.Generate([&](int64_t index) { return Status::OK(); }, -// /*visit_null=*/VisitNoop); -// } - -// Status Finish() override { -// StructArray typed_values(this->values.ToArrayData()); - -// // Select from children without boundschecking -// out->child_data.resize(this->values.type->num_fields()); -// for (int field_index = 0; field_index < this->values.type->num_fields(); -// ++field_index) { -// ARROW_ASSIGN_OR_RAISE(Datum taken_field, -// Take(Datum(typed_values.field(field_index)), -// Datum(this->selection.ToArrayData()), -// TakeOptions::NoBoundsCheck(), ctx->exec_context())); -// out->child_data[field_index] = taken_field.array(); -// } -// return Status::OK(); -// } -// }; - -// #undef LIFT_BASE_MEMBERS - -// // ---------------------------------------------------------------------- - -template -Status ScatterByMaskExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { - int64_t output_length = - GetFilterOutputSize(batch[1].array, FilterState::Get(ctx).null_selection_behavior); - Impl kernel(ctx, batch, output_length, out); - return kernel.ExecFilter(); -} - -// } // namespace - -Status ListFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { - return FilterExec>(ctx, batch, out); -} - -Status LargeListFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { - return FilterExec>(ctx, batch, out); -} - -Status FSLFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { - const ArraySpan& values = batch[0].array; - - // If a FixedSizeList wraps a fixed-width type we can, in some cases, use - // PrimitiveFilterExec for a fixed-size list array. - if (util::IsFixedWidthLike(values, - /*force_null_count=*/true, - /*exclude_bool_and_dictionary=*/true)) { - const auto byte_width = util::FixedWidthInBytes(*values.type); - // 0 is a valid byte width for FixedSizeList, but PrimitiveFilterExec - // might not handle it correctly. - if (byte_width > 0) { - return PrimitiveFilterExec(ctx, batch, out); - } - } - return FilterExec(ctx, batch, out); -} - -Status DenseUnionFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { - return FilterExec(ctx, batch, out); -} - -Status MapFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { - return FilterExec>(ctx, batch, out); -} - -} // namespace compute::internal -} // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/vector_scatter_internal.h b/cpp/src/arrow/compute/kernels/vector_scatter_internal.h deleted file mode 100644 index d7493997159da..0000000000000 --- a/cpp/src/arrow/compute/kernels/vector_scatter_internal.h +++ /dev/null @@ -1,71 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include -#include - -#include "arrow/array/data.h" -#include "arrow/compute/api_vector.h" -#include "arrow/compute/exec.h" -#include "arrow/compute/function.h" -#include "arrow/compute/kernel.h" -#include "arrow/compute/kernels/codegen_internal.h" - -namespace arrow::compute::internal { - -struct ScatterKernelData { - InputType value_type; - InputType selection_type; - ArrayKernelExec exec; -}; - -void RegisterScatterFunction(const std::string& name, FunctionDoc doc, - VectorKernel base_kernel, - std::vector&& kernels, - const FunctionOptions* default_options, - FunctionRegistry* registry); - -/// \brief Callback type for VisitPlainxREEFilterOutputSegments. -/// -/// position is the logical position in the values array relative to its offset. -/// -/// segment_length is the number of elements that should be emitted. -/// -/// filter_valid is true if the filter run value is non-NULL. This value can -/// only be false if null_selection is NullSelectionBehavior::EMIT_NULL. For -/// NullSelectionBehavior::DROP, NULL values from the filter are simply skipped. -/// -/// Return true if iteration should continue, false if iteration should stop. -// using EmitREEFilterSegment = -// std::function; - -// void VisitPlainxREEFilterOutputSegments( -// const ArraySpan& filter, bool filter_may_have_nulls, -// FilterOptions::NullSelectionBehavior null_selection, -// const EmitREEFilterSegment& emit_segment); - -Status PrimitiveScatterByMaskExec(KernelContext*, const ExecSpan&, ExecResult*); -Status ListScatterByMaskExec(KernelContext*, const ExecSpan&, ExecResult*); -Status LargeListScatterByMaskExec(KernelContext*, const ExecSpan&, ExecResult*); -Status FSLScatterByMaskExec(KernelContext*, const ExecSpan&, ExecResult*); -Status DenseUnionScatterByMaskExec(KernelContext*, const ExecSpan&, ExecResult*); -Status MapScatterByMaskExec(KernelContext*, const ExecSpan&, ExecResult*); - -} // namespace arrow::compute::internal