diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc
index f2e4578383122..8acd53f76e4a3 100644
--- a/cpp/src/arrow/compute/exec.cc
+++ b/cpp/src/arrow/compute/exec.cc
@@ -297,11 +297,23 @@ void ComputeDataPreallocate(const DataType& type,
     case Type::MAP:
       widths->emplace_back(32, /*added_length=*/1);
       return;
+    case Type::LIST_VIEW: {
+      // add offsets and size
+      widths->emplace_back(32, /*added_length=*/1);
+      widths->emplace_back(32, /*added_length=*/1);
+      return;
+    }
     case Type::LARGE_BINARY:
     case Type::LARGE_STRING:
     case Type::LARGE_LIST:
       widths->emplace_back(64, /*added_length=*/1);
       return;
+    case Type::LARGE_LIST_VIEW: {
+      // add offsets and size
+      widths->emplace_back(64, /*added_length=*/1);
+      widths->emplace_back(64, /*added_length=*/1);
+      return;
+    }
     default:
       break;
   }
@@ -410,7 +422,7 @@ bool ExecSpanIterator::Next(ExecSpan* span) {
     // The first time this is called, we populate the output span with any
     // Scalar or Array arguments in the ExecValue struct, and then just
     // increment array offsets below. If any arguments are ChunkedArray, then
-    // the internal ArraySpans will see their members updated during hte
+    // the internal ArraySpans will see their members updated during the
     // iteration
     span->values.resize(args_->size());
     for (size_t i = 0; i < args_->size(); ++i) {
@@ -473,7 +485,7 @@ bool ExecSpanIterator::Next(ExecSpan* span) {
 namespace {

 struct NullGeneralization {
-  enum type { PERHAPS_NULL, ALL_VALID, ALL_NULL };
+  enum type { PERHAPS_NULL = 0, ALL_VALID = 1, ALL_NULL = 2, INVALID = 3 };

   static type Get(const ExecValue& value) {
     const auto dtype_id = value.type()->id();
@@ -498,7 +510,30 @@ struct NullGeneralization {
     return PERHAPS_NULL;
   }

+  static type Get(const ChunkedArray& chunk_array) {
+    if (chunk_array.num_chunks() == 0 ||
+        (chunk_array.null_count() == chunk_array.length())) {
+      return ALL_NULL;
+    }
+
+    type null_gen = INVALID;
+    int chunk_idx = 0;
+    while (chunk_idx < chunk_array.num_chunks()) {
+      ExecValue value;
+      const ArrayData* curr_chunk = chunk_array.chunk(chunk_idx)->data().get();
+      value.SetArray(*curr_chunk);
+      null_gen = static_cast<type>((null_gen & Get(value)));
+      chunk_idx++;
+    }
+    DCHECK(null_gen != type::INVALID);
+    return null_gen;
+  }
+
   static type Get(const Datum& datum) {
+    if (datum.is_chunked_array()) {
+      return Get(*datum.chunked_array());
+    }
+
     // Temporary workaround to help with ARROW-16756
     ExecValue value;
     if (datum.is_array()) {
@@ -506,7 +541,6 @@ struct NullGeneralization {
     } else if (datum.is_scalar()) {
       value.SetScalar(datum.scalar().get());
     } else {
-      // TODO(wesm): ChunkedArray, I think
       return PERHAPS_NULL;
     }
     return Get(value);
@@ -738,12 +772,10 @@ class KernelExecutorImpl : public KernelExecutor {
     }
     for (size_t i = 0; i < data_preallocated_.size(); ++i) {
       const auto& prealloc = data_preallocated_[i];
-      if (prealloc.bit_width >= 0) {
-        ARROW_ASSIGN_OR_RAISE(
-            out->buffers[i + 1],
-            AllocateDataBuffer(kernel_ctx_, length + prealloc.added_length,
-                               prealloc.bit_width));
-      }
+      ARROW_ASSIGN_OR_RAISE(
+          out->buffers[i + 1],
+          AllocateDataBuffer(kernel_ctx_, length + prealloc.added_length,
+                             prealloc.bit_width));
     }
     return out;
   }
@@ -796,7 +828,7 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
     // kernels supporting preallocation, then we do so up front and then
     // iterate over slices of that large array. Otherwise, we preallocate prior
     // to processing each span emitted from the ExecSpanIterator
-    RETURN_NOT_OK(SetupPreallocation(span_iterator_.length(), batch.values));
+    RETURN_NOT_OK(SetupPreallocation(batch.values));

     // ARROW-16756: Here we have to accommodate the distinct cases
     //
@@ -928,7 +960,7 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
     return Status::OK();
   }

-  Status SetupPreallocation(int64_t total_length, const std::vector<Datum>& args) {
+  Status SetupPreallocation(const std::vector<Datum>& args) {
     output_num_buffers_ = static_cast<int>(output_type_.type->layout().buffers.size());
     auto out_type_id = output_type_.type->id();
     // Default to no validity pre-allocation for following cases:
@@ -966,12 +998,6 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
            data_preallocated_.size() == static_cast<size_t>(output_num_buffers_ - 1) &&
            !is_nested(out_type_id) && !is_dictionary(out_type_id));

-    // TODO(wesm): why was this check ever here? Fixed width binary
-    // can be 0-width but anything else?
-    DCHECK(std::all_of(
-        data_preallocated_.begin(), data_preallocated_.end(),
-        [](const BufferPreallocation& prealloc) { return prealloc.bit_width >= 0; }));
-
     // Contiguous preallocation only possible on non-nested types if all
     // buffers are preallocated. Otherwise, we must go chunk-by-chunk.
     //
@@ -1022,25 +1048,7 @@ Status CheckCanExecuteChunked(const VectorKernel* kernel) {
 class VectorExecutor : public KernelExecutorImpl<VectorKernel> {
  public:
   Status Execute(const ExecBatch& batch, ExecListener* listener) override {
-    // Some vector kernels have a separate code path for handling
-    // chunked arrays (VectorKernel::exec_chunked) so we check if we
-    // have any chunked arrays. If we do and an exec_chunked function
-    // is defined then we call that.
-    bool have_chunked_arrays = false;
-    for (const Datum& arg : batch.values) {
-      if (arg.is_chunked_array()) have_chunked_arrays = true;
-    }
-
-    output_num_buffers_ = static_cast<int>(output_type_.type->layout().buffers.size());
-
-    // Decide if we need to preallocate memory for this kernel
-    validity_preallocated_ =
-        (kernel_->null_handling != NullHandling::COMPUTED_NO_PREALLOCATE &&
-         kernel_->null_handling != NullHandling::OUTPUT_NOT_NULL);
-    if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) {
-      data_preallocated_.clear();
-      ComputeDataPreallocate(*output_type_.type, &data_preallocated_);
-    }
+    RETURN_NOT_OK(SetupPreallocation(batch.values));

     if (kernel_->can_execute_chunkwise) {
       RETURN_NOT_OK(span_iterator_.Init(batch, exec_context()->exec_chunksize()));
@@ -1049,10 +1057,22 @@ class VectorExecutor : public KernelExecutorImpl<VectorKernel> {
         RETURN_NOT_OK(Exec(span, listener));
       }
     } else {
-      // Kernel cannot execute chunkwise. If we have any chunked
-      // arrays, then VectorKernel::exec_chunked must be defined
-      // otherwise we raise an error
-      if (have_chunked_arrays) {
+      // Some vector kernels have a separate code path for handling
+      // chunked arrays (VectorKernel::exec_chunked), so we check if we
+      // have any chunked arrays. If we do and an exec_chunked function
+      // is defined then we call that.
+      bool have_chunked_arrays = false;
+      for (const Datum& arg : batch.values) {
+        if (arg.is_chunked_array()) {
+          have_chunked_arrays = true;
+          break;
+        }
+      }
+
+      if (have_chunked_arrays) {
+        // Kernel cannot execute chunkwise. If we have any chunked
+        // arrays, then VectorKernel::exec_chunked must be defined
+        // otherwise we raise an error
         RETURN_NOT_OK(ExecChunked(batch, listener));
       } else {
         // No chunked arrays. We pack the args into an ExecSpan and
@@ -1123,6 +1143,34 @@ class VectorExecutor : public KernelExecutorImpl<VectorKernel> {
     }
   }

+  Status SetupPreallocation(const std::vector<Datum>& args) {
+    output_num_buffers_ = static_cast<int>(output_type_.type->layout().buffers.size());
+    auto out_type_id = output_type_.type->id();
+
+    // Decide if we need to preallocate memory for this kernel
+    validity_preallocated_ = false;
+    if (out_type_id != Type::NA) {
+      if (kernel_->null_handling == NullHandling::COMPUTED_PREALLOCATE) {
+        // Override the flag if kernel asks for pre-allocation
+        validity_preallocated_ = true;
+      } else if (kernel_->null_handling == NullHandling::INTERSECTION) {
+        bool elide_validity_bitmap = true;
+        for (const auto& arg : args) {
+          auto null_gen = NullGeneralization::Get(arg) == NullGeneralization::ALL_VALID;
+
+          // If not all valid, this becomes false
+          elide_validity_bitmap = elide_validity_bitmap && null_gen;
+        }
+        validity_preallocated_ = !elide_validity_bitmap;
+      }
+    }
+    if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) {
+      data_preallocated_.clear();
+      ComputeDataPreallocate(*output_type_.type, &data_preallocated_);
+    }
+    return Status::OK();
+  }
+
   ExecSpanIterator span_iterator_;
   std::vector<Datum> results_;
 };
diff --git a/cpp/src/arrow/compute/exec_internal.h b/cpp/src/arrow/compute/exec_internal.h
index 7e4f364a9288e..f84722314763f 100644
--- a/cpp/src/arrow/compute/exec_internal.h
+++ b/cpp/src/arrow/compute/exec_internal.h
@@ -132,9 +132,6 @@ class ARROW_EXPORT KernelExecutor {
   /// for all scanned batches in a dataset filter.
   virtual Status Init(KernelContext*, KernelInitArgs) = 0;

-  // TODO(wesm): per ARROW-16819, adding ExecBatch variant so that a batch
-  // length can be passed in for scalar functions; will have to return and
-  // clean a bunch of things up
   virtual Status Execute(const ExecBatch& batch, ExecListener* listener) = 0;

   virtual Datum WrapResults(const std::vector<Datum>& args,
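Note (not part of the patch): the explicit values given to the NullGeneralization enum are what make the bitwise AND in Get(const ChunkedArray&) a valid way to fold per-chunk results. A minimal standalone sketch of that property, using hypothetical names rather than the Arrow internals:

    // INVALID (0b11) is the identity for AND; ALL_VALID (0b01) and ALL_NULL (0b10)
    // share no bits, so any disagreement between chunks collapses to PERHAPS_NULL (0b00).
    #include <cassert>

    enum NullGen { PERHAPS_NULL = 0, ALL_VALID = 1, ALL_NULL = 2, INVALID = 3 };

    NullGen Combine(NullGen acc, NullGen chunk) {
      return static_cast<NullGen>(acc & chunk);
    }

    int main() {
      assert(Combine(INVALID, ALL_VALID) == ALL_VALID);      // first chunk seeds the result
      assert(Combine(ALL_VALID, ALL_VALID) == ALL_VALID);    // agreement is preserved
      assert(Combine(ALL_VALID, ALL_NULL) == PERHAPS_NULL);  // disagreement degrades
      assert(Combine(PERHAPS_NULL, ALL_NULL) == PERHAPS_NULL);
      return 0;
    }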
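Note (not part of the patch): the new VectorExecutor::SetupPreallocation mirrors the ScalarExecutor rule for NullHandling::INTERSECTION: the output validity bitmap is elided only when every argument (including every chunk of a chunked argument) is known to be all-valid. A small standalone sketch of that decision, with hypothetical names:

    #include <vector>

    enum NullGen { PERHAPS_NULL = 0, ALL_VALID = 1, ALL_NULL = 2 };

    // Preallocate an output validity bitmap unless all inputs are all-valid,
    // in which case the intersection of validity bitmaps is trivially all-valid.
    bool NeedValidityPrealloc(const std::vector<NullGen>& args) {
      for (NullGen g : args) {
        if (g != ALL_VALID) return true;  // any possibly-null input forces a bitmap
      }
      return false;
    }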