Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
[NSE-857] Further optimizations of validity buffer split (#915)
Browse files Browse the repository at this point in the history
Further optimization of the validity buffer split: gather 8 bits at a time, then set the destination byte.
  • Loading branch information
FelixYBW authored May 11, 2022
1 parent a154e27 commit 3e97b3d
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 50 deletions.
148 changes: 98 additions & 50 deletions native-sql-engine/cpp/src/shuffle/splitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <gandiva/projector.h>
#include <gandiva/tree_expr_builder.h>
#include <immintrin.h>
#include <x86intrin.h>

#include <cstring>
#include <memory>
Expand Down Expand Up @@ -945,7 +946,7 @@ arrow::Status Splitter::DoSplit(const arrow::RecordBatch& rb) {
// check input_fixed_width_has_null_[col] is cheaper than GetNullCount()
// once input_fixed_width_has_null_ is set to true, we didn't reset it after spill
if (input_fixed_width_has_null_[col] == false &&
rb.column_data(col_idx)->GetNullCount() != 0) {
rb.column_data(col_idx)->GetNullCount() > 0) {
input_fixed_width_has_null_[col] = true;
}
}
Expand Down Expand Up @@ -1173,28 +1174,11 @@ arrow::Status Splitter::SplitFixedWidthValueBuffer(const arrow::RecordBatch& rb)
break;
#endif
case 1: // arrow::BooleanType::type_id:
std::copy(partition_buffer_idx_base_.begin(), partition_buffer_idx_base_.end(),
partition_buffer_dst_offset_.begin());
#ifdef PROCESSROW
// assume batch size = 32k; reducer# = 4K; row/reducer = 8
for (auto pid = 0; pid < num_partitions_; pid++) {
auto dst_pid_base =
reinterpret_cast<uint8_t*>(partition_buffer_dst_addr_[pid]); /*32k*/
auto r = reducer_offset_offset_[pid]; /*8k*/
auto size = reducer_offset_offset_[pid + 1];
for (r; r < size; r++) {
auto src_offset = reducer_offsets_[r]; /*16k*/
row_offset_type dst_offset = partition_buffer_dst_offset_[pid];
uint8_t dst = dst_addrs[pid][dst_offset >> 3];
dst ^=
(dst >> (dst_offset & 7) ^ src_addr[src_offset >> 3] >> (src_offset & 7))
<< (dst_offset & 7);
dst_addrs[pid][dst_offset >> 3] = dst;
_mm_prefetch(&(src_addr)[src_offset + 64], _MM_HINT_T0);
partition_buffer_dst_offset_[pid]++;
}
}
RETURN_NOT_OK(SplitBoolType(src_addr, dst_addrs));
#else
std::copy(partition_buffer_idx_base_.begin(), partition_buffer_idx_base_.end(),
partition_buffer_dst_offset_.begin());
for (auto row = 0; row < num_rows; ++row) {
auto pid = partition_id_[row];
row_offset_type dst_offset = partition_buffer_dst_offset_[pid];
Expand Down Expand Up @@ -1380,6 +1364,96 @@ arrow::Status Splitter::SplitFixedWidthValueBufferAVX(const arrow::RecordBatch&
return arrow::Status::OK();
}
#endif
// Scatters single-bit values (boolean values or validity bits) from the source
// bitmap `src_addr` into the per-partition destination bitmaps `dst_addrs`.
//
// For partition `pid`, the source row indices are
// reducer_offsets_[reducer_offset_offset_[pid] .. reducer_offset_offset_[pid + 1]),
// and bits are appended starting at bit position partition_buffer_idx_base_[pid]
// of dst_addrs[pid]. Partitions with no rows or a null destination are skipped.
//
// The work per partition is split into three phases:
//   1. head: fill the partially written first byte up to the next byte boundary;
//   2. body: build full destination bytes from 8 source rows at a time;
//   3. tail: build the last, partially filled byte.
//
// The head phase merges bits with AND, so it relies on the not-yet-written bits
// of the first destination byte already being 1. The tail phase establishes
// that invariant for the next call by starting from 0xff.
//
// @param src_addr  source bitmap, addressed by row index (bit `row & 7` of byte
//                  `row >> 3`).
// @param dst_addrs one destination bitmap per partition; entries may be nullptr.
// @return always arrow::Status::OK().
arrow::Status Splitter::SplitBoolType(const uint8_t* src_addr,
                                      const std::vector<uint8_t*>& dst_addrs) {
  // assume batch size = 32k; reducer# = 4K; row/reducer = 8
  for (auto pid = 0; pid < num_partitions_; pid++) {
    auto dstaddr = dst_addrs[pid];
    if (partition_id_cnt_[pid] > 0 && dstaddr != nullptr) {
      auto r = reducer_offset_offset_[pid]; /*8k*/
      auto size = reducer_offset_offset_[pid + 1];
      row_offset_type dst_offset = partition_buffer_idx_base_[pid];
      // Number of free bit slots left in the first (partially written) byte;
      // 0 when dst_offset is already byte aligned.
      row_offset_type dst_offset_in_byte = (8 - (dst_offset & 0x7)) & 0x7;
      row_offset_type dst_idx_byte = dst_offset_in_byte;
      uint8_t dst = dstaddr[dst_offset >> 3];

      // Warm the cache line the NEXT partition will start writing to. The
      // address must be taken from the next partition's own buffer.
      if (pid + 1 < num_partitions_ && dst_addrs[pid + 1] != nullptr) {
        _mm_prefetch(&dst_addrs[pid + 1][partition_buffer_idx_base_[pid + 1] >> 3],
                     _MM_HINT_T1);
      }

      // Phase 1 (head): fill the remaining slots of the first byte.
      for (; r < size && dst_idx_byte > 0; r++, dst_idx_byte--) {
        auto src_offset = reducer_offsets_[r]; /*16k*/
        uint8_t src = src_addr[src_offset >> 3];
        // Move the wanted bit to bit 0 and force all other bits to 1.
        src = src >> (src_offset & 7) | 0xfe;
        // Rotate the wanted bit into its destination slot; the 1s rotate around.
        src = __rolb(src, 8 - dst_idx_byte);
        dst = dst & src;  // only the destination slot can be cleared
      }
      dstaddr[dst_offset >> 3] = dst;
      if (r == size) {
        // Every row of this partition fitted into the first byte. Do not fall
        // through: the tail phase would store 0xff into the following byte,
        // which this partition does not own yet and which may lie past the end
        // of the buffer.
        continue;
      }
      dst_offset += dst_offset_in_byte;

      // Phase 2 (body): dst_offset is now 8-bit aligned; assemble whole bytes.
      // The bound is deliberately `r + 8 < size` (not <=) so that at least one
      // row is always left for the tail phase, keeping its store in range.
      for (; r + 8 < size; r += 8) {
        uint8_t src = 0;
        auto src_offset = reducer_offsets_[r]; /*16k*/
        src = src_addr[src_offset >> 3];
        _mm_prefetch(&(src_addr)[(src_offset >> 3) + 64], _MM_HINT_T0);
        dst = src >> (src_offset & 7) | 0xfe;  // row 0 -> bit 0, other bits 1

        src_offset = reducer_offsets_[r + 1]; /*16k*/
        src = src_addr[src_offset >> 3];
        dst &= src >> (src_offset & 7) << 1 | 0xfd;  // row 1 -> bit 1

        src_offset = reducer_offsets_[r + 2]; /*16k*/
        src = src_addr[src_offset >> 3];
        dst &= src >> (src_offset & 7) << 2 | 0xfb;  // row 2 -> bit 2

        src_offset = reducer_offsets_[r + 3]; /*16k*/
        src = src_addr[src_offset >> 3];
        dst &= src >> (src_offset & 7) << 3 | 0xf7;  // row 3 -> bit 3

        src_offset = reducer_offsets_[r + 4]; /*16k*/
        src = src_addr[src_offset >> 3];
        dst &= src >> (src_offset & 7) << 4 | 0xef;  // row 4 -> bit 4

        src_offset = reducer_offsets_[r + 5]; /*16k*/
        src = src_addr[src_offset >> 3];
        dst &= src >> (src_offset & 7) << 5 | 0xdf;  // row 5 -> bit 5

        src_offset = reducer_offsets_[r + 6]; /*16k*/
        src = src_addr[src_offset >> 3];
        dst &= src >> (src_offset & 7) << 6 | 0xbf;  // row 6 -> bit 6

        src_offset = reducer_offsets_[r + 7]; /*16k*/
        src = src_addr[src_offset >> 3];
        dst &= src >> (src_offset & 7) << 7 | 0x7f;  // row 7 -> bit 7

        dstaddr[dst_offset >> 3] = dst;
        dst_offset += 8;
        //_mm_prefetch(dstaddr + (dst_offset >> 3) + 64, _MM_HINT_T0);
      }

      // Phase 3 (tail): 1..8 rows remain. Start from 0xff so that the unused
      // high bits stay 1 for the head phase of the next call.
      dst = 0xff;
      dst_idx_byte = 0;
      for (; r < size; r++, dst_idx_byte++) {
        auto src_offset = reducer_offsets_[r]; /*16k*/
        uint8_t src = src_addr[src_offset >> 3];
        // Move the wanted bit to bit 0 and force all other bits to 1.
        src = src >> (src_offset & 7) | 0xfe;
        src = __rolb(src, dst_idx_byte);
        dst = dst & src;  // only the destination slot can be cleared
      }
      dstaddr[dst_offset >> 3] = dst;
    }
  }
  return arrow::Status::OK();
}

arrow::Status Splitter::SplitFixedWidthValidityBuffer(const arrow::RecordBatch& rb) {
const auto num_rows = rb.num_rows();
Expand All @@ -1404,37 +1478,11 @@ arrow::Status Splitter::SplitFixedWidthValidityBuffer(const arrow::RecordBatch&
}
}
auto src_addr = const_cast<uint8_t*>(rb.column_data(col_idx)->buffers[0]->data());
std::copy(partition_buffer_idx_base_.begin(), partition_buffer_idx_base_.end(),
partition_buffer_dst_offset_.begin());
#ifdef PROCESSROW
// assume batch size = 32k; reducer# = 4K; row/reducer = 8
for (auto pid = 0; pid < num_partitions_; pid++) {
auto dst_pid_base =
reinterpret_cast<uint8_t*>(partition_buffer_dst_addr_[pid]); /*32k*/
auto r = reducer_offset_offset_[pid]; /*8k*/
auto size = reducer_offset_offset_[pid + 1];
for (r; r < size; r++) {
auto src_offset = reducer_offsets_[r]; /*16k*/
row_offset_type dst_offset = partition_buffer_dst_offset_[pid];
uint8_t dst = dst_addrs[pid][dst_offset >> 3];
dst ^= (dst >> (dst_offset & 7) ^ src_addr[src_offset >> 3] >> (src_offset & 7))
<< (dst_offset & 7);
dst_addrs[pid][dst_offset >> 3] = dst;
_mm_prefetch(&(src_addr)[src_offset + 64], _MM_HINT_T0);
partition_buffer_dst_offset_[pid]++;
}
// set the last byte
if (partition_id_cnt_[pid] > 0 && dst_addrs[pid] != nullptr) {
auto lastoffset = partition_buffer_dst_offset_[pid];
uint8_t dst = dst_addrs[pid][lastoffset >> 3];
uint8_t msk = 0x1 << (lastoffset & 0x7);
msk = ~(msk - 1);
msk &= ((lastoffset & 7) == 0) - 1;
dst |= msk;
dst_addrs[pid][lastoffset >> 3] = dst;
}
}
RETURN_NOT_OK(SplitBoolType(src_addr, dst_addrs));
#else
std::copy(partition_buffer_idx_base_.begin(), partition_buffer_idx_base_.end(),
partition_buffer_dst_offset_.begin());
for (auto row = 0; row < num_rows; ++row) {
auto pid = partition_id_[row];
auto dst_offset = partition_buffer_dst_offset_[pid];
Expand Down
2 changes: 2 additions & 0 deletions native-sql-engine/cpp/src/shuffle/splitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ class Splitter {
#if defined(COLUMNAR_PLUGIN_USE_AVX512)
arrow::Status SplitFixedWidthValueBufferAVX(const arrow::RecordBatch& rb);
#endif
arrow::Status SplitBoolType(const uint8_t* src_addr,
const std::vector<uint8_t*>& dst_addrs);

arrow::Status SplitFixedWidthValidityBuffer(const arrow::RecordBatch& rb);

Expand Down

0 comments on commit 3e97b3d

Please sign in to comment.