This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-857] Further optimizations of validity buffer split #915

Merged (4 commits) on May 11, 2022
148 changes: 98 additions & 50 deletions native-sql-engine/cpp/src/shuffle/splitter.cc
@@ -26,6 +26,7 @@
#include <gandiva/projector.h>
#include <gandiva/tree_expr_builder.h>
#include <immintrin.h>
#include <x86intrin.h>

#include <cstring>
#include <memory>
@@ -945,7 +946,7 @@ arrow::Status Splitter::DoSplit(const arrow::RecordBatch& rb) {
// checking input_fixed_width_has_null_[col] is cheaper than GetNullCount()
// once input_fixed_width_has_null_ is set to true, it is not reset after spill
if (input_fixed_width_has_null_[col] == false &&
rb.column_data(col_idx)->GetNullCount() != 0) {
rb.column_data(col_idx)->GetNullCount() > 0) {
input_fixed_width_has_null_[col] = true;
}
}
@@ -1173,28 +1174,11 @@ arrow::Status Splitter::SplitFixedWidthValueBuffer(const arrow::RecordBatch& rb)
break;
#endif
case 1: // arrow::BooleanType::type_id:
std::copy(partition_buffer_idx_base_.begin(), partition_buffer_idx_base_.end(),
partition_buffer_dst_offset_.begin());
#ifdef PROCESSROW
// assume batch size = 32k; reducer# = 4K; row/reducer = 8
for (auto pid = 0; pid < num_partitions_; pid++) {
auto dst_pid_base =
reinterpret_cast<uint8_t*>(partition_buffer_dst_addr_[pid]); /*32k*/
auto r = reducer_offset_offset_[pid]; /*8k*/
auto size = reducer_offset_offset_[pid + 1];
for (r; r < size; r++) {
auto src_offset = reducer_offsets_[r]; /*16k*/
row_offset_type dst_offset = partition_buffer_dst_offset_[pid];
uint8_t dst = dst_addrs[pid][dst_offset >> 3];
dst ^=
(dst >> (dst_offset & 7) ^ src_addr[src_offset >> 3] >> (src_offset & 7))
<< (dst_offset & 7);
dst_addrs[pid][dst_offset >> 3] = dst;
_mm_prefetch(&(src_addr)[src_offset + 64], _MM_HINT_T0);
partition_buffer_dst_offset_[pid]++;
}
}
RETURN_NOT_OK(SplitBoolType(src_addr, dst_addrs));
#else
std::copy(partition_buffer_idx_base_.begin(), partition_buffer_idx_base_.end(),
partition_buffer_dst_offset_.begin());
for (auto row = 0; row < num_rows; ++row) {
auto pid = partition_id_[row];
row_offset_type dst_offset = partition_buffer_dst_offset_[pid];
@@ -1380,6 +1364,96 @@ arrow::Status Splitter::SplitFixedWidthValueBufferAVX(const arrow::RecordBatch&
return arrow::Status::OK();
}
#endif
arrow::Status Splitter::SplitBoolType(const uint8_t* src_addr,
const std::vector<uint8_t*>& dst_addrs) {
// assume batch size = 32k; reducer# = 4K; row/reducer = 8
for (auto pid = 0; pid < num_partitions_; pid++) {
// set the last byte
auto dstaddr = dst_addrs[pid];
if (partition_id_cnt_[pid] > 0 && dstaddr != nullptr) {
auto r = reducer_offset_offset_[pid]; /*8k*/
auto size = reducer_offset_offset_[pid + 1];
row_offset_type dst_offset = partition_buffer_idx_base_[pid];
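// number of bits still free in the current (partially filled) destination byte;
// 0 when dst_offset is already byte-aligned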
row_offset_type dst_offset_in_byte = (8 - (dst_offset & 0x7)) & 0x7;
row_offset_type dst_idx_byte = dst_offset_in_byte;
uint8_t dst = dstaddr[dst_offset >> 3];
if (pid + 1 < num_partitions_) {
_mm_prefetch(&dstaddr[partition_buffer_idx_base_[pid + 1] >> 3], _MM_HINT_T1);
}
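// unaligned head: fill the partly used byte one row at a time; each operand is
// all ones except at the target bit, so the AND leaves previously written bits untouched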
for (r; r < size && dst_idx_byte > 0; r++, dst_idx_byte--) {
auto src_offset = reducer_offsets_[r]; /*16k*/
uint8_t src = src_addr[src_offset >> 3];
src =
src >> (src_offset & 7) | 0xfe; // get the bit in bit 0, other bits set to 1
src = __rolb(src, 8 - dst_idx_byte);
dst = dst & src; // only take the useful bit.
}
dstaddr[dst_offset >> 3] = dst;
dst_offset += dst_offset_in_byte;
// now dst_offset is 8 aligned
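// aligned body: gather 8 source bits per iteration; lane i shifts its bit to
// position i and ORs in a mask with only bit i cleared, so each AND can only
// affect its own position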
for (r; r + 8 < size; r += 8) {
uint8_t src = 0;
auto src_offset = reducer_offsets_[r]; /*16k*/
src = src_addr[src_offset >> 3];
_mm_prefetch(&(src_addr)[(src_offset >> 3) + 64], _MM_HINT_T0);
dst =
src >> (src_offset & 7) | 0xfe; // get the bit in bit 0, other bits set to 1

src_offset = reducer_offsets_[r + 1]; /*16k*/
src = src_addr[src_offset >> 3];
dst &= src >> (src_offset & 7) << 1 |
0xfd; // get the bit in bit 0, other bits set to 1

src_offset = reducer_offsets_[r + 2]; /*16k*/
src = src_addr[src_offset >> 3];
dst &= src >> (src_offset & 7) << 2 |
0xfb; // get the bit in bit 0, other bits set to 1

src_offset = reducer_offsets_[r + 3]; /*16k*/
src = src_addr[src_offset >> 3];
dst &= src >> (src_offset & 7) << 3 |
0xf7; // get the bit in bit 0, other bits set to 1

src_offset = reducer_offsets_[r + 4]; /*16k*/
src = src_addr[src_offset >> 3];
dst &= src >> (src_offset & 7) << 4 |
0xef; // get the bit in bit 0, other bits set to 1

src_offset = reducer_offsets_[r + 5]; /*16k*/
src = src_addr[src_offset >> 3];
dst &= src >> (src_offset & 7) << 5 |
0xdf; // get the bit in bit 0, other bits set to 1

src_offset = reducer_offsets_[r + 6]; /*16k*/
src = src_addr[src_offset >> 3];
dst &= src >> (src_offset & 7) << 6 |
0xbf; // get the bit in bit 0, other bits set to 1

src_offset = reducer_offsets_[r + 7]; /*16k*/
src = src_addr[src_offset >> 3];
dst &= src >> (src_offset & 7) << 7 |
0x7f; // get the bit in bit 0, other bits set to 1

dstaddr[dst_offset >> 3] = dst;
dst_offset += 8;
//_mm_prefetch(dstaddr + (dst_offset >> 3) + 64, _MM_HINT_T0);
}
// last byte, set it to 0xff is ok
dst = 0xff;
dst_idx_byte = 0;
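// tail: pack the remaining (< 8) rows into the low bits of a fresh byte;
// the unused high bits stay 1 as padding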
for (r; r < size; r++, dst_idx_byte++) {
auto src_offset = reducer_offsets_[r]; /*16k*/
uint8_t src = src_addr[src_offset >> 3];
src =
src >> (src_offset & 7) | 0xfe; // get the bit in bit 0, other bits set to 1
src = __rolb(src, dst_idx_byte);
dst = dst & src; // only take the useful bit.
}
dstaddr[dst_offset >> 3] = dst;
}
}
return arrow::Status::OK();
}

arrow::Status Splitter::SplitFixedWidthValidityBuffer(const arrow::RecordBatch& rb) {
const auto num_rows = rb.num_rows();
@@ -1404,37 +1478,11 @@ arrow::Status Splitter::SplitFixedWidthValidityBuffer(const arrow::RecordBatch&
}
}
auto src_addr = const_cast<uint8_t*>(rb.column_data(col_idx)->buffers[0]->data());
std::copy(partition_buffer_idx_base_.begin(), partition_buffer_idx_base_.end(),
partition_buffer_dst_offset_.begin());
#ifdef PROCESSROW
// assume batch size = 32k; reducer# = 4K; row/reducer = 8
for (auto pid = 0; pid < num_partitions_; pid++) {
auto dst_pid_base =
reinterpret_cast<uint8_t*>(partition_buffer_dst_addr_[pid]); /*32k*/
auto r = reducer_offset_offset_[pid]; /*8k*/
auto size = reducer_offset_offset_[pid + 1];
for (r; r < size; r++) {
auto src_offset = reducer_offsets_[r]; /*16k*/
row_offset_type dst_offset = partition_buffer_dst_offset_[pid];
uint8_t dst = dst_addrs[pid][dst_offset >> 3];
dst ^= (dst >> (dst_offset & 7) ^ src_addr[src_offset >> 3] >> (src_offset & 7))
<< (dst_offset & 7);
dst_addrs[pid][dst_offset >> 3] = dst;
_mm_prefetch(&(src_addr)[src_offset + 64], _MM_HINT_T0);
partition_buffer_dst_offset_[pid]++;
}
// set the last byte
if (partition_id_cnt_[pid] > 0 && dst_addrs[pid] != nullptr) {
auto lastoffset = partition_buffer_dst_offset_[pid];
uint8_t dst = dst_addrs[pid][lastoffset >> 3];
uint8_t msk = 0x1 << (lastoffset & 0x7);
msk = ~(msk - 1);
msk &= ((lastoffset & 7) == 0) - 1;
dst |= msk;
dst_addrs[pid][lastoffset >> 3] = dst;
}
}
RETURN_NOT_OK(SplitBoolType(src_addr, dst_addrs));
#else
std::copy(partition_buffer_idx_base_.begin(), partition_buffer_idx_base_.end(),
partition_buffer_dst_offset_.begin());
for (auto row = 0; row < num_rows; ++row) {
auto pid = partition_id_[row];
auto dst_offset = partition_buffer_dst_offset_[pid];
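
Note: the heart of this patch is the aligned inner loop of SplitBoolType, which gathers eight validity bits addressed by arbitrary row offsets into a single destination byte, one AND per bit, instead of a read-modify-write per row. Below is a minimal standalone sketch of that gathering step; Pack8 and the sample data are illustrative only and are not part of the patch.

// Illustrative sketch (assumed names, not code from this PR): gather the
// validity bits of eight arbitrary rows into one byte, the way the aligned
// loop in SplitBoolType does, using "isolate bit, raise every other bit to 1,
// AND into the destination".
#include <cstdint>
#include <cstdio>

// Pack the bits at rows offsets[0..7] of the bitmap `src`; bit i of the result
// is the validity bit of row offsets[i].
static uint8_t Pack8(const uint8_t* src, const uint32_t* offsets) {
  uint8_t dst = 0xff;  // start all ones; each lane can only clear its own bit
  for (int i = 0; i < 8; ++i) {
    uint8_t byte = src[offsets[i] >> 3];
    // Bring the wanted bit to position 0, shift it to position i, then set
    // every other bit to 1 so the AND leaves the rest of `dst` unchanged.
    uint8_t lane = static_cast<uint8_t>((byte >> (offsets[i] & 7)) << i) |
                   static_cast<uint8_t>(~(1u << i));
    dst &= lane;
  }
  return dst;
}

int main() {
  // 16 rows of validity, LSB first: rows 0, 2, 5, 9 and 12 are null.
  const uint8_t validity[2] = {0xDA, 0xED};  // 0b11011010, 0b11101101
  const uint32_t offsets[8] = {0, 2, 4, 5, 7, 9, 12, 15};  // rows of one partition
  std::printf("packed = 0x%02x\n",
              static_cast<unsigned>(Pack8(validity, offsets)));  // prints 0x94
  return 0;
}

Starting from an all-ones byte and clearing at most one bit per lane is what lets the eight lanes be combined with plain ANDs, with no interaction between positions.
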
2 changes: 2 additions & 0 deletions native-sql-engine/cpp/src/shuffle/splitter.h
@@ -131,6 +131,8 @@ class Splitter {
#if defined(COLUMNAR_PLUGIN_USE_AVX512)
arrow::Status SplitFixedWidthValueBufferAVX(const arrow::RecordBatch& rb);
#endif
arrow::Status SplitBoolType(const uint8_t* src_addr,
const std::vector<uint8_t*>& dst_addrs);

arrow::Status SplitFixedWidthValidityBuffer(const arrow::RecordBatch& rb);

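
The subtler part of SplitBoolType is the unaligned head: when a partition's write position is not byte-aligned, each isolated source bit is rotated to its target position (the patch uses the x86 __rolb intrinsic) and ANDed into the partially filled byte. A portable sketch of that single-bit merge follows; RotL8 and MergeHeadBit are assumed names, not code from this PR.

#include <cstdint>
#include <cstdio>

// Portable stand-in for the __rolb intrinsic: rotate an 8-bit value left by n.
static inline uint8_t RotL8(uint8_t v, unsigned n) {
  n &= 7;
  return static_cast<uint8_t>((v << n) | (v >> ((8 - n) & 7)));
}

// Merge bit `src_bit` of `src_byte` into bit `pos` of `dst`. The operand is all
// ones except at `pos`, so the AND leaves every other bit of `dst` untouched.
static uint8_t MergeHeadBit(uint8_t dst, uint8_t src_byte, unsigned src_bit,
                            unsigned pos) {
  uint8_t s = static_cast<uint8_t>(src_byte >> src_bit) | 0xfe;  // bit 0 = wanted bit, rest 1
  return dst & RotL8(s, pos);  // clears bit `pos` iff the source bit is 0
}

int main() {
  uint8_t dst = 0xfd;  // bits 0..2 hold earlier rows (1,0,1); bits 3..7 are padding (1)
  dst = MergeHeadBit(dst, /*src_byte=*/0x00, /*src_bit=*/6, /*pos=*/3);  // null row
  dst = MergeHeadBit(dst, /*src_byte=*/0xff, /*src_bit=*/1, /*pos=*/4);  // valid row
  std::printf("dst = 0x%02x\n",
              static_cast<unsigned>(dst));  // prints 0xf5: bit 3 cleared, bit 4 kept
  return 0;
}

Because the merge can only clear its target bit, the scheme relies on unfilled destination bits being pre-set to 1, which the tail step of SplitBoolType ("set it to 0xff is ok") maintains.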