
Commit

refactor executor kernel
ZhangHuiGui committed Jun 5, 2024
1 parent 19044ee commit bdcd19d
Showing 2 changed files with 87 additions and 42 deletions.
126 changes: 87 additions & 39 deletions cpp/src/arrow/compute/exec.cc
@@ -297,11 +297,23 @@ void ComputeDataPreallocate(const DataType& type,
case Type::MAP:
widths->emplace_back(32, /*added_length=*/1);
return;
case Type::LIST_VIEW: {
// add offsets and size
widths->emplace_back(32, /*added_length=*/1);
widths->emplace_back(32, /*added_length=*/1);
return;
}
case Type::LARGE_BINARY:
case Type::LARGE_STRING:
case Type::LARGE_LIST:
widths->emplace_back(64, /*added_length=*/1);
return;
case Type::LARGE_LIST_VIEW: {
// add offsets and size
widths->emplace_back(64, /*added_length=*/1);
widths->emplace_back(64, /*added_length=*/1);
return;
}
default:
break;
}
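
Note on the new list-view cases above: unlike LIST, which carries a single 32-bit offsets buffer, LIST_VIEW stores offsets and sizes in two separate buffers, so two preallocations are registered (the /*added_length=*/1 mirrors the existing list handling). A minimal standalone sketch, not Arrow API, of how those two buffers address the child values — element i spans values[offsets[i], offsets[i] + sizes[i]):

// Sketch only: plain C++ illustration of list-view addressing via the two
// preallocated buffers (offsets and sizes); names are illustrative, not Arrow's.
#include <cstdint>
#include <vector>

struct ListViewDemo {
  std::vector<int32_t> offsets;  // first preallocated 32-bit buffer
  std::vector<int32_t> sizes;    // second preallocated 32-bit buffer
  std::vector<int32_t> values;   // child array values

  // Values belonging to list element i.
  std::vector<int32_t> Element(int32_t i) const {
    return std::vector<int32_t>(values.begin() + offsets[i],
                                values.begin() + offsets[i] + sizes[i]);
  }
};
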
@@ -410,7 +422,7 @@ bool ExecSpanIterator::Next(ExecSpan* span) {
// The first time this is called, we populate the output span with any
// Scalar or Array arguments in the ExecValue struct, and then just
// increment array offsets below. If any arguments are ChunkedArray, then
// the internal ArraySpans will see their members updated during hte
// the internal ArraySpans will see their members updated during the
// iteration
span->values.resize(args_->size());
for (size_t i = 0; i < args_->size(); ++i) {
@@ -473,7 +485,7 @@ bool ExecSpanIterator::Next(ExecSpan* span) {
namespace {

struct NullGeneralization {
enum type { PERHAPS_NULL, ALL_VALID, ALL_NULL };
enum type { PERHAPS_NULL = 0, ALL_VALID = 1, ALL_NULL = 2, INVALID = 3 };

static type Get(const ExecValue& value) {
const auto dtype_id = value.type()->id();
@@ -498,15 +510,37 @@ struct NullGeneralization {
return PERHAPS_NULL;
}

static type Get(const ChunkedArray& chunk_array) {
if (chunk_array.num_chunks() == 0 ||
(chunk_array.null_count() == chunk_array.length())) {
return ALL_NULL;
}

type null_gen = INVALID;
int chunk_idx = 0;
while (chunk_idx < chunk_array.num_chunks()) {
ExecValue value;
const ArrayData* curr_chunk = chunk_array.chunk(chunk_idx)->data().get();
value.SetArray(*curr_chunk);
null_gen = static_cast<type>((null_gen & Get(value)));
chunk_idx++;
}
DCHECK(null_gen != type::INVALID);
return null_gen;
}

static type Get(const Datum& datum) {
if (datum.is_chunked_array()) {
return Get(*datum.chunked_array());
}

// Temporary workaround to help with ARROW-16756
ExecValue value;
if (datum.is_array()) {
value.SetArray(*datum.array());
} else if (datum.is_scalar()) {
value.SetScalar(datum.scalar().get());
} else {
// TODO(wesm): ChunkedArray, I think
return PERHAPS_NULL;
}
return Get(value);
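
The new ChunkedArray overload above folds per-chunk classifications with a bitwise AND, which is why the enum now carries explicit values: INVALID (0b11) is the identity seed, ALL_VALID (0b01) or ALL_NULL (0b10) survives only if every chunk agrees, and any disagreement collapses to PERHAPS_NULL (0b00). A standalone sketch of that meet:

// Sketch only: the AND-based "meet" used by NullGeneralization::Get(const ChunkedArray&).
#include <cassert>

enum NullGen { PERHAPS_NULL = 0, ALL_VALID = 1, ALL_NULL = 2, INVALID = 3 };

inline NullGen Meet(NullGen a, NullGen b) { return static_cast<NullGen>(a & b); }

int main() {
  assert(Meet(INVALID, ALL_VALID) == ALL_VALID);      // identity seed for the first chunk
  assert(Meet(ALL_VALID, ALL_VALID) == ALL_VALID);    // every chunk fully valid
  assert(Meet(ALL_NULL, ALL_NULL) == ALL_NULL);       // every chunk fully null
  assert(Meet(ALL_VALID, ALL_NULL) == PERHAPS_NULL);  // chunks disagree
  return 0;
}
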
@@ -738,12 +772,10 @@ class KernelExecutorImpl : public KernelExecutor {
}
for (size_t i = 0; i < data_preallocated_.size(); ++i) {
const auto& prealloc = data_preallocated_[i];
if (prealloc.bit_width >= 0) {
ARROW_ASSIGN_OR_RAISE(
out->buffers[i + 1],
AllocateDataBuffer(kernel_ctx_, length + prealloc.added_length,
prealloc.bit_width));
}
ARROW_ASSIGN_OR_RAISE(
out->buffers[i + 1],
AllocateDataBuffer(kernel_ctx_, length + prealloc.added_length,
prealloc.bit_width));
}
return out;
}
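
With the guard removed above, every registered preallocation is now allocated unconditionally. As a rough sketch of the sizing involved — an assumption about how AllocateDataBuffer sizes the buffer, not its exact implementation:

// Assumption/sketch: approximate bytes needed for a preallocated buffer of
// (length + added_length) slots at bit_width bits (bit_width == 1 means a bitmap).
#include <cstdint>

inline int64_t ApproxPreallocatedBytes(int64_t length, int64_t added_length, int bit_width) {
  const int64_t slots = length + added_length;
  if (bit_width == 1) {
    return (slots + 7) / 8;        // validity-style bitmap
  }
  return slots * (bit_width / 8);  // e.g. 4 bytes per 32-bit offset
}
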
@@ -796,7 +828,7 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
// kernels supporting preallocation, then we do so up front and then
// iterate over slices of that large array. Otherwise, we preallocate prior
// to processing each span emitted from the ExecSpanIterator
RETURN_NOT_OK(SetupPreallocation(span_iterator_.length(), batch.values));
RETURN_NOT_OK(SetupPreallocation(batch.values));

// ARROW-16756: Here we have to accommodate the distinct cases
//
@@ -928,7 +960,7 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
return Status::OK();
}

Status SetupPreallocation(int64_t total_length, const std::vector<Datum>& args) {
Status SetupPreallocation(const std::vector<Datum>& args) {
output_num_buffers_ = static_cast<int>(output_type_.type->layout().buffers.size());
auto out_type_id = output_type_.type->id();
// Default to no validity pre-allocation for following cases:
@@ -966,12 +998,6 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
data_preallocated_.size() == static_cast<size_t>(output_num_buffers_ - 1) &&
!is_nested(out_type_id) && !is_dictionary(out_type_id));

// TODO(wesm): why was this check ever here? Fixed width binary
// can be 0-width but anything else?
DCHECK(std::all_of(
data_preallocated_.begin(), data_preallocated_.end(),
[](const BufferPreallocation& prealloc) { return prealloc.bit_width >= 0; }));

// Contiguous preallocation only possible on non-nested types if all
// buffers are preallocated. Otherwise, we must go chunk-by-chunk.
//
@@ -1022,25 +1048,7 @@ Status CheckCanExecuteChunked(const VectorKernel* kernel) {
class VectorExecutor : public KernelExecutorImpl<VectorKernel> {
public:
Status Execute(const ExecBatch& batch, ExecListener* listener) override {
// Some vector kernels have a separate code path for handling
// chunked arrays (VectorKernel::exec_chunked) so we check if we
// have any chunked arrays. If we do and an exec_chunked function
// is defined then we call that.
bool have_chunked_arrays = false;
for (const Datum& arg : batch.values) {
if (arg.is_chunked_array()) have_chunked_arrays = true;
}

output_num_buffers_ = static_cast<int>(output_type_.type->layout().buffers.size());

// Decide if we need to preallocate memory for this kernel
validity_preallocated_ =
(kernel_->null_handling != NullHandling::COMPUTED_NO_PREALLOCATE &&
kernel_->null_handling != NullHandling::OUTPUT_NOT_NULL);
if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) {
data_preallocated_.clear();
ComputeDataPreallocate(*output_type_.type, &data_preallocated_);
}
RETURN_NOT_OK(SetupPreallocation(batch.values));

if (kernel_->can_execute_chunkwise) {
RETURN_NOT_OK(span_iterator_.Init(batch, exec_context()->exec_chunksize()));
@@ -1049,10 +1057,22 @@ class VectorExecutor : public KernelExecutorImpl<VectorKernel> {
RETURN_NOT_OK(Exec(span, listener));
}
} else {
// Kernel cannot execute chunkwise. If we have any chunked
// arrays, then VectorKernel::exec_chunked must be defined
// otherwise we raise an error
// Some vector kernels have a separate code path for handling
// chunked arrays (VectorKernel::exec_chunked), so we check if we
// have any chunked arrays. If we do and an exec_chunked function
// is defined then we call that.
bool have_chunked_arrays = false;
for (const Datum& arg : batch.values) {
if (arg.is_chunked_array()) {
have_chunked_arrays = true;
break;
}
}

if (have_chunked_arrays) {
// Kernel cannot execute chunkwise. If we have any chunked
// arrays, then VectorKernel::exec_chunked must be defined
// otherwise we raise an error
RETURN_NOT_OK(ExecChunked(batch, listener));
} else {
// No chunked arrays. We pack the args into an ExecSpan and
@@ -1123,6 +1143,34 @@
}
}

Status SetupPreallocation(const std::vector<Datum>& args) {
output_num_buffers_ = static_cast<int>(output_type_.type->layout().buffers.size());
auto out_type_id = output_type_.type->id();

// Decide if we need to preallocate memory for this kernel
validity_preallocated_ = false;
if (out_type_id != Type::NA) {
if (kernel_->null_handling == NullHandling::COMPUTED_PREALLOCATE) {
// Override the flag if kernel asks for pre-allocation
validity_preallocated_ = true;
} else if (kernel_->null_handling == NullHandling::INTERSECTION) {
bool elide_validity_bitmap = true;
for (const auto& arg : args) {
auto null_gen = NullGeneralization::Get(arg) == NullGeneralization::ALL_VALID;

// If not all valid, this becomes false
elide_validity_bitmap = elide_validity_bitmap && null_gen;
}
validity_preallocated_ = !elide_validity_bitmap;
}
}
if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) {
data_preallocated_.clear();
ComputeDataPreallocate(*output_type_.type, &data_preallocated_);
}
return Status::OK();
}

ExecSpanIterator span_iterator_;
std::vector<Datum> results_;
};
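
For context, the VectorExecutor path refactored above is what runs when a vector function receives a ChunkedArray argument; whether it takes the chunkwise loop or the exec_chunked branch depends on the kernel's flags. A minimal user-level sketch (hypothetical data; "unique" is just one vector function) that exercises this path through the public compute API:

// Sketch: calling a vector function on a ChunkedArray through arrow::compute.
#include <arrow/api.h>
#include <arrow/compute/api.h>
#include <iostream>

arrow::Status RunChunkedVectorExample() {
  // Build a two-chunk ChunkedArray (hypothetical values).
  arrow::Int32Builder builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 2}));
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> chunk1, builder.Finish());
  ARROW_RETURN_NOT_OK(builder.AppendValues({2, 3}));
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> chunk2, builder.Finish());
  auto chunked = std::make_shared<arrow::ChunkedArray>(arrow::ArrayVector{chunk1, chunk2});

  // The ChunkedArray Datum is dispatched to the vector kernel executor;
  // "unique" returns a single Array datum.
  ARROW_ASSIGN_OR_RAISE(arrow::Datum result,
                        arrow::compute::CallFunction("unique", {arrow::Datum(chunked)}));
  std::cout << result.make_array()->ToString() << std::endl;
  return arrow::Status::OK();
}

int main() {
  auto st = RunChunkedVectorExample();
  if (!st.ok()) {
    std::cerr << st.ToString() << std::endl;
    return 1;
  }
  return 0;
}
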
3 changes: 0 additions & 3 deletions cpp/src/arrow/compute/exec_internal.h
@@ -132,9 +132,6 @@ class ARROW_EXPORT KernelExecutor {
/// for all scanned batches in a dataset filter.
virtual Status Init(KernelContext*, KernelInitArgs) = 0;

// TODO(wesm): per ARROW-16819, adding ExecBatch variant so that a batch
// length can be passed in for scalar functions; will have to return and
// clean a bunch of things up
virtual Status Execute(const ExecBatch& batch, ExecListener* listener) = 0;

virtual Datum WrapResults(const std::vector<Datum>& args,
