Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
[NSE-943] Optimize data conversion for String/Binary type in Row2Columnar (#969)
Browse files Browse the repository at this point in the history

* [NSE-943] Optimize String/Binary Type for Row2Columnar

* Fix TPCDS queries

* Add __AVX512BW__ Check
  • Loading branch information
zhixingheyi-tian authored Jun 28, 2022
1 parent c4d4a65 commit 1affa5b
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 45 deletions.
113 changes: 68 additions & 45 deletions native-sql-engine/cpp/src/operators/row_to_columnar_converter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

#include "operators/row_to_columnar_converter.h"

#include <immintrin.h>
#include <x86intrin.h>

#include <algorithm>
#include <iostream>

Expand Down Expand Up @@ -46,7 +49,7 @@ arrow::Status CreateArrayData(std::shared_ptr<arrow::Schema> schema, int64_t num
int32_t columnar_id, int64_t fieldOffset,
std::vector<int64_t>& offsets, uint8_t* memory_address_,
std::shared_ptr<arrow::Array>* array,
arrow::MemoryPool* pool) {
arrow::MemoryPool* pool, bool support_avx512) {
auto field = schema->field(columnar_id);
auto type = field->type();

Expand Down Expand Up @@ -281,58 +284,76 @@ arrow::Status CreateArrayData(std::shared_ptr<arrow::Schema> schema, int64_t num
*array = MakeArray(std::make_shared<arrow::ArrayData>(std::move(out_data)));
break;
}
case arrow::BinaryType::type_id: {
std::unique_ptr<arrow::TypeTraits<arrow::BinaryType>::BuilderType> builder_;
std::unique_ptr<arrow::ArrayBuilder> array_builder;
arrow::MakeBuilder(pool, arrow::TypeTraits<arrow::BinaryType>::type_singleton(),
&array_builder);
builder_.reset(arrow::internal::checked_cast<
arrow::TypeTraits<arrow::BinaryType>::BuilderType*>(
array_builder.release()));

using offset_type = typename arrow::BinaryType::offset_type;
for (int64_t position = 0; position < num_rows; position++) {
bool is_null = IsNull(memory_address_ + offsets[position], columnar_id);
if (is_null) {
RETURN_NOT_OK(builder_->AppendNull());
} else {
int64_t offsetAndSize;
memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset,
sizeof(int64_t));
offset_type length = int32_t(offsetAndSize);
int32_t wordoffset = int32_t(offsetAndSize >> 32);
RETURN_NOT_OK(
builder_->Append(memory_address_ + offsets[position] + wordoffset, length));
}
}
auto status = builder_->Finish(array);
break;
}
case arrow::BinaryType::type_id:
case arrow::StringType::type_id: {
std::unique_ptr<arrow::TypeTraits<arrow::StringType>::BuilderType> builder_;
std::unique_ptr<arrow::ArrayBuilder> array_builder;
arrow::MakeBuilder(pool, arrow::TypeTraits<arrow::StringType>::type_singleton(),
&array_builder);
builder_.reset(arrow::internal::checked_cast<
arrow::TypeTraits<arrow::StringType>::BuilderType*>(
array_builder.release()));

arrow::ArrayData out_data;
out_data.length = num_rows;
out_data.buffers.resize(3);
out_data.type = field->type();
using offset_type = typename arrow::StringType::offset_type;
ARROW_ASSIGN_OR_RAISE(out_data.buffers[0], AllocateBitmap(num_rows, pool));
ARROW_ASSIGN_OR_RAISE(out_data.buffers[1],
AllocateBuffer(sizeof(offset_type) * (num_rows + 1), pool));
ARROW_ASSIGN_OR_RAISE(out_data.buffers[2],
AllocateResizableBuffer(20 * num_rows, pool));
auto validity_buffer = out_data.buffers[0]->mutable_data();
// initialize all true once allocated
memset(validity_buffer, 0xff, out_data.buffers[0]->capacity());
auto array_offset = out_data.GetMutableValues<offset_type>(1);
auto array_data = out_data.buffers[2]->mutable_data();
int64_t null_count = 0;

array_offset[0] = 0;
for (int64_t position = 0; position < num_rows; position++) {
bool is_null = IsNull(memory_address_ + offsets[position], columnar_id);
if (is_null) {
RETURN_NOT_OK(builder_->AppendNull());
arrow::BitUtil::SetBitTo(validity_buffer, position, false);
array_offset[position + 1] = array_offset[position];
null_count++;
} else {
int64_t offsetAndSize;
memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset,
sizeof(int64_t));
int64_t offsetAndSize =
*(int64_t*)(memory_address_ + offsets[position] + fieldOffset);
offset_type length = int32_t(offsetAndSize);
int32_t wordoffset = int32_t(offsetAndSize >> 32);
RETURN_NOT_OK(
builder_->Append(memory_address_ + offsets[position] + wordoffset, length));
auto value_offset = array_offset[position + 1] =
array_offset[position] + length;
uint64_t capacity = out_data.buffers[2]->capacity();

if (ARROW_PREDICT_FALSE(value_offset >= capacity)) {
// value buffer is too small: reserve a larger one and refresh the data pointer
// grow capacity by max(capacity / 2, length), i.e. to at least 1.5x
capacity = capacity + std::max((capacity >> 1), (uint64_t)length);
auto value_buffer =
std::static_pointer_cast<arrow::ResizableBuffer>(out_data.buffers[2]);
value_buffer->Reserve(capacity);
array_data = value_buffer->mutable_data();
}

auto dst_value_base = array_data + array_offset[position];
auto value_src_ptr = memory_address_ + offsets[position] + wordoffset;
#ifdef __AVX512BW__
if (ARROW_PREDICT_TRUE(support_avx512)) {
// write the variable value
uint32_t k;
for (k = 0; k + 32 < length; k += 32) {
__m256i v = _mm256_loadu_si256((const __m256i*)(value_src_ptr + k));
_mm256_storeu_si256((__m256i*)(dst_value_base + k), v);
}
auto mask = (1L << (length - k)) - 1;
__m256i v = _mm256_maskz_loadu_epi8(mask, value_src_ptr + k);
_mm256_mask_storeu_epi8(dst_value_base + k, mask, v);
} else
#endif
{
memcpy(dst_value_base, value_src_ptr, length);
}
}
}
auto status = builder_->Finish(array);
out_data.null_count = null_count;
if (null_count == 0) {
out_data.buffers[0] == nullptr;
}
*array = MakeArray(std::make_shared<arrow::ArrayData>(std::move(out_data)));
break;
}
case arrow::Decimal128Type::type_id: {
Expand Down Expand Up @@ -967,6 +988,7 @@ arrow::Status CreateArrayData(std::shared_ptr<arrow::Schema> schema, int64_t num
}

arrow::Status RowToColumnarConverter::Init(std::shared_ptr<arrow::RecordBatch>* batch) {
support_avx512_ = __builtin_cpu_supports("avx512bw");
int64_t nullBitsetWidthInBytes = CalculateBitSetWidthInBytes(num_cols_);
for (auto i = 0; i < num_rows_; i++) {
offsets_.push_back(0);
Expand All @@ -982,12 +1004,13 @@ arrow::Status RowToColumnarConverter::Init(std::shared_ptr<arrow::RecordBatch>*
std::shared_ptr<arrow::Array> array_data;
int64_t field_offset = GetFieldOffset(nullBitsetWidthInBytes, i);
RETURN_NOT_OK(CreateArrayData(schema_, num_rows_, i, field_offset, offsets_,
memory_address_, &array_data, m_pool_));
memory_address_, &array_data, m_pool_,
support_avx512_));
arrays.push_back(array_data);
}
*batch = arrow::RecordBatch::Make(schema_, num_rows_, arrays);
return arrow::Status::OK();
}

} // namespace rowtocolumnar
} // namespace sparkcolumnarplugin
} // namespace sparkcolumnarplugin
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class RowToColumnarConverter {
arrow::Status Init(std::shared_ptr<arrow::RecordBatch>* batch);

protected:
// Whether the CPU supports AVX-512BW instructions; set in Init().
bool support_avx512_;
std::shared_ptr<arrow::Schema> schema_;
int64_t num_cols_;
int64_t num_rows_;
Expand Down

0 comments on commit 1affa5b

Please sign in to comment.