diff --git a/native-sql-engine/cpp/src/benchmarks/shuffle_split_benchmark.cc b/native-sql-engine/cpp/src/benchmarks/shuffle_split_benchmark.cc
index d2bffe36a..106d7dba8 100644
--- a/native-sql-engine/cpp/src/benchmarks/shuffle_split_benchmark.cc
+++ b/native-sql-engine/cpp/src/benchmarks/shuffle_split_benchmark.cc
@@ -24,12 +24,25 @@
 #include
 //#include
 #include
+#include <execinfo.h>
 #include
 #include
 #include
 #include
+#include <sys/mman.h>
 #include
 
+// Dump the current call stack; used while debugging to trace where
+// allocations come from.
+void print_trace(void) {
+  char** strings;
+  size_t i, size;
+  enum Constexpr { MAX_SIZE = 1024 };
+  void* array[MAX_SIZE];
+  size = backtrace(array, MAX_SIZE);
+  strings = backtrace_symbols(array, size);
+  for (i = 0; i < size; i++) printf("    %s\n", strings[i]);
+  puts("");
+  free(strings);
+}
 
 #include "codegen/code_generator.h"
 #include "codegen/code_generator_factory.h"
@@ -38,9 +51,112 @@
 namespace sparkcolumnarplugin {
 namespace shuffle {
 
+#define ALIGNMENT (2048 * 1024)
+
 const int batch_buffer_size = 32768;
 const int split_buffer_size = 8192;
 
+// Wraps the default pool and tracks allocation statistics; the commented-out
+// prints plus print_trace() were used to locate the call site of each
+// allocation while debugging.
+class MyMemoryPool : public arrow::MemoryPool {
+ public:
+  explicit MyMemoryPool() {}
+
+  Status Allocate(int64_t size, uint8_t** out) override {
+    RETURN_NOT_OK(pool_->Allocate(size, out));
+    stats_.UpdateAllocatedBytes(size);
+    // std::cout << "Allocate: size = " << size << " addr = " << std::hex <<
+    // (uint64_t)*out << std::dec << std::endl;
+    // print_trace();
+    return arrow::Status::OK();
+  }
+
+  Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override {
+    auto old_ptr = *ptr;
+    RETURN_NOT_OK(pool_->Reallocate(old_size, new_size, ptr));
+    stats_.UpdateAllocatedBytes(new_size - old_size);
+    // std::cout << "Reallocate: old_size = " << old_size << " old_ptr = " << std::hex <<
+    // (uint64_t)old_ptr << std::dec << " new_size = " << new_size << " addr = " <<
+    // std::hex << (uint64_t)*ptr << std::dec << std::endl;
+    // print_trace();
+    return arrow::Status::OK();
+  }
+
+  void Free(uint8_t* buffer, int64_t size) override {
+    pool_->Free(buffer, size);
+    stats_.UpdateAllocatedBytes(-size);
+    // std::cout << "Free: size = " << size << " addr = " << std::hex << (uint64_t)buffer
+    // << std::dec << std::endl;
+    // print_trace();
+  }
+
+  int64_t bytes_allocated() const override { return stats_.bytes_allocated(); }
+
+  int64_t max_memory() const override { return pool_->max_memory(); }
+
+  std::string backend_name() const override { return pool_->backend_name(); }
+
+ private:
+  MemoryPool* pool_ = arrow::default_memory_pool();
+  arrow::internal::MemoryPoolStats stats_;
+};
+
+#define ENABLELARGEPAGE
+
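+// LargePageMemoryPool routes every allocation of 2MB or more through an
+// aligned allocation plus madvise(MADV_HUGEPAGE), so the kernel can back the
+// buffer with transparent huge pages and cut TLB pressure in the split loop;
+// smaller allocations fall through to the default pool unchanged. Note that
+// AlignAllocate/AlignReallocate and Free with an alignment argument are not
+// stock arrow::MemoryPool APIs; they come from the patched Arrow build this
+// repo depends on.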
+class LargePageMemoryPool : public MemoryPool {
+ public:
+  explicit LargePageMemoryPool() {}
+
+  ~LargePageMemoryPool() override = default;
+
+  Status Allocate(int64_t size, uint8_t** out) override {
+#ifdef ENABLELARGEPAGE
+    if (size < 2 * 1024 * 1024) {
+      return pool_->Allocate(size, out);
+    } else {
+      Status st = pool_->AlignAllocate(size, out, ALIGNMENT);
+      madvise(*out, size, /*MADV_HUGEPAGE */ 14);
+      // std::cout << "Allocate: size = " << size << " addr = " << std::hex
+      //           << (uint64_t)*out << " end = " << std::hex << (uint64_t)(*out + size)
+      //           << std::dec << std::endl;
+      return st;
+    }
+#else
+    return pool_->Allocate(size, out);
+#endif
+  }
+
+  Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override {
+#ifdef ENABLELARGEPAGE
+    if (new_size < 2 * 1024 * 1024) {
+      return pool_->Reallocate(old_size, new_size, ptr);
+    } else {
+      Status st = pool_->AlignReallocate(old_size, new_size, ptr, ALIGNMENT);
+      // madvise(*ptr, new_size, /*MADV_HUGEPAGE */ 14);
+      return st;
+    }
+#else
+    return pool_->Reallocate(old_size, new_size, ptr);
+#endif
+  }
+
+  void Free(uint8_t* buffer, int64_t size) override {
+#ifdef ENABLELARGEPAGE
+    if (size < 2 * 1024 * 1024) {
+      pool_->Free(buffer, size);
+    } else {
+      pool_->Free(buffer, size, ALIGNMENT);
+    }
+#else
+    pool_->Free(buffer, size);
+#endif
+  }
+
+  int64_t bytes_allocated() const override { return pool_->bytes_allocated(); }
+
+  int64_t max_memory() const override { return pool_->max_memory(); }
+
+  std::string backend_name() const override { return "LargePageMemoryPool"; }
+
+ private:
+  MemoryPool* pool_ = arrow::default_memory_pool();
+};
+
 class BenchmarkShuffleSplit {
  public:
   BenchmarkShuffleSplit(std::string file_name) { GetRecordBatchReader(file_name); }
@@ -89,6 +205,8 @@ class BenchmarkShuffleSplit {
     SetCPU(state.thread_index());
 
     arrow::Compression::type compression_type = (arrow::Compression::type)state.range(1);
+    std::shared_ptr<LargePageMemoryPool> pool = std::make_shared<LargePageMemoryPool>();
+
     const int num_partitions = state.range(0);
 
     auto options = SplitOptions::Defaults();
@@ -98,6 +216,7 @@ class BenchmarkShuffleSplit {
     options.offheap_per_task = 128 * 1024 * 1024 * 1024L;
     options.prefer_spill = true;
     options.write_schema = false;
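+    // Route all of the splitter's allocations through the huge-page pool.
+    // The pool is a local and must outlive the splitter, which is why
+    // splitter.reset() is called at the end of the run below, before the
+    // pool goes out of scope.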
+    options.memory_pool = pool.get();
 
     std::shared_ptr<Splitter> splitter;
     int64_t elapse_read = 0;
@@ -166,6 +285,7 @@ class BenchmarkShuffleSplit {
         splitter->TotalWriteTime();
     state.counters["split_time"] = benchmark::Counter(
         split_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
+    splitter.reset();
   }
 
  protected:
@@ -201,26 +321,27 @@ class BenchmarkShuffleSplit_CacheScan_Benchmark : public BenchmarkShuffleSplit {
                  const int num_partitions, SplitOptions options, benchmark::State& state) {
     std::vector<int> local_column_indices;
     local_column_indices.push_back(0);
-    local_column_indices.push_back(1);
-    local_column_indices.push_back(2);
-    local_column_indices.push_back(4);
-    local_column_indices.push_back(5);
-    local_column_indices.push_back(6);
-    local_column_indices.push_back(7);
+    /* local_column_indices.push_back(0);
+    local_column_indices.push_back(1);
+    local_column_indices.push_back(2);
+    local_column_indices.push_back(4);
+    local_column_indices.push_back(5);
+    local_column_indices.push_back(6);
+    local_column_indices.push_back(7);*/
 
     std::shared_ptr<arrow::Schema> local_schema;
     local_schema = std::make_shared<arrow::Schema>(*schema.get());
 
-    ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(15));
-    ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(14));
-    ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(13));
-    ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(12));
-    ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(11));
-    ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(10));
-    ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(9));
-    ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(8));
-    ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(3));
-
+    /* ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(15));
+    ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(14));
+    ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(13));
+    ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(12));
+    ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(11));
+    ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(10));
+    ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(9));
+    ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(8));
+    ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(3));
+    */
     if (state.thread_index() == 0) std::cout << local_schema->ToString() << std::endl;
 
     ARROW_ASSIGN_OR_THROW(splitter,
@@ -251,11 +372,13 @@ class BenchmarkShuffleSplit_CacheScan_Benchmark : public BenchmarkShuffleSplit {
     std::cout << "batches = " << num_batches << " rows = " << num_rows << std::endl;
 
     for (auto _ : state) {
-      for_each(
-          batches.begin(), batches.end(),
-          [&splitter, &split_time](std::shared_ptr<arrow::RecordBatch>& record_batch) {
-            TIME_NANO_OR_THROW(split_time, splitter->Split(*record_batch));
-          });
+      for_each(batches.begin(), batches.end(),
+               [&splitter, &split_time,
+                &options](std::shared_ptr<arrow::RecordBatch>& record_batch) {
+                 TIME_NANO_OR_THROW(split_time, splitter->Split(*record_batch));
+               });
+      // std::cout << " split done memory allocated = " <<
+      // options.memory_pool->bytes_allocated() << std::endl;
     }
 
     TIME_NANO_OR_THROW(split_time, splitter->Stop());
@@ -374,31 +497,30 @@ int main(int argc, char** argv) {
       ->MeasureProcessCPUTime()
       ->Unit(benchmark::kSecond);
 
-  /*  sparkcolumnarplugin::shuffle::BenchmarkShuffleSplit_IterateScan_Benchmark
-  bck(datafile);
-
-  benchmark::RegisterBenchmark("BenchmarkShuffleSplit::IterateScan", bck)
-      ->Iterations(1)
-      ->Args({96*2, arrow::Compression::FASTPFOR})
-      ->Args({96*4, arrow::Compression::FASTPFOR})
-      ->Args({96*8, arrow::Compression::FASTPFOR})
-      ->Args({96*16, arrow::Compression::FASTPFOR})
-      ->Args({96*32, arrow::Compression::FASTPFOR})
-      ->Threads(24)
-      ->Unit(benchmark::kSecond);
-
-  benchmark::RegisterBenchmark("BenchmarkShuffleSplit::IterateScan", bck)
-      ->Iterations(1)
-      ->Args({4096, arrow::Compression::FASTPFOR})
-      ->Threads(1)
-      ->Threads(2)
-      ->Threads(4)
-      ->Threads(8)
-      ->Threads(16)
-      ->Threads(24)
-      ->Unit(benchmark::kSecond);
+  /* sparkcolumnarplugin::shuffle::BenchmarkShuffleSplit_IterateScan_Benchmark
+  bck(datafile);
+
+  benchmark::RegisterBenchmark("BenchmarkShuffleSplit::IterateScan", bck)
+      ->Iterations(1)
+      ->Args({96*2, arrow::Compression::FASTPFOR})
+      ->Args({96*4, arrow::Compression::FASTPFOR})
+      ->Args({96*8, arrow::Compression::FASTPFOR})
+      ->Args({96*16, arrow::Compression::FASTPFOR})
+      ->Args({96*32, arrow::Compression::FASTPFOR})
+      ->Threads(24)
+      ->Unit(benchmark::kSecond);
+
+  benchmark::RegisterBenchmark("BenchmarkShuffleSplit::IterateScan", bck)
+      ->Iterations(1)
+      ->Args({4096, arrow::Compression::FASTPFOR})
+      ->Threads(1)
+      ->Threads(2)
+      ->Threads(4)
+      ->Threads(8)
+      ->Threads(16)
+      ->Threads(24)
+      ->Unit(benchmark::kSecond);
   */
-
   benchmark::Initialize(&argc, argv);
   benchmark::RunSpecifiedBenchmarks();
   benchmark::Shutdown();
diff --git a/native-sql-engine/cpp/src/shuffle/splitter.cc b/native-sql-engine/cpp/src/shuffle/splitter.cc
index 7839c4ce4..5213d607a 100644
--- a/native-sql-engine/cpp/src/shuffle/splitter.cc
+++ b/native-sql-engine/cpp/src/shuffle/splitter.cc
@@ -47,6 +47,11 @@ namespace sparkcolumnarplugin {
 namespace shuffle {
 using arrow::internal::checked_cast;
 
+#ifndef SPLIT_BUFFER_SIZE
+// by default, allocate 8MB blocks so each one sits on a few 2MB huge pages
+#define SPLIT_BUFFER_SIZE (8 * 1024 * 1024)
+#endif
+
 template <typename T>
 std::string __m128i_toString(const __m128i var) {
   std::stringstream sstr;
@@ -298,6 +303,11 @@ arrow::Result<std::shared_ptr<Splitter>> Splitter::Make(
 }
 
 arrow::Status Splitter::Init() {
+  // the partition number should be less than 64k
+  ARROW_CHECK_LE(num_partitions_, 64 * 1024);
+  // the split record batch size should be less than 32k rows
+  ARROW_CHECK_LE(options_.buffer_size, 32 * 1024);
+
   const auto& fields = schema_->fields();
   ARROW_ASSIGN_OR_RAISE(column_type_id_, ToSplitterTypeId(schema_->fields()));
@@ -345,7 +355,6 @@ arrow::Status Splitter::Init() {
 
   auto num_fixed_width = fixed_width_array_idx_.size();
   partition_fixed_width_validity_addrs_.resize(num_fixed_width);
-  column_has_null_.resize(num_fixed_width, false);
   partition_fixed_width_value_addrs_.resize(num_fixed_width);
   partition_fixed_width_buffers_.resize(num_fixed_width);
   binary_array_empirical_size_.resize(binary_array_idx_.size());
@@ -401,6 +410,32 @@ arrow::Status Splitter::Init() {
       tiny_bach_write_options_.codec,
       arrow::util::Codec::CreateInt32(arrow::Compression::UNCOMPRESSED));
 
+  // Allocate the initial combine buffer for the split reducer
+  ARROW_ASSIGN_OR_RAISE(combine_buffer_,
+                        arrow::AllocateResizableBuffer(0, options_.memory_pool));
+  RETURN_NOT_OK(combine_buffer_->Resize(0, /*shrink_to_fit=*/false));
+
+  return arrow::Status::OK();
+}
+
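+// Bump-allocate small buffers out of a shared SPLIT_BUFFER_SIZE block instead
+// of hitting the memory pool once per partition and column. Slices taken with
+// SliceMutableBuffer keep a reference to the block, so a filled block stays
+// alive until every slice has been released and only then goes back to the
+// pool.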
+arrow::Status Splitter::AllocateBufferFromPool(std::shared_ptr<arrow::Buffer>& buffer,
+                                               uint32_t size) {
+  // if size is already larger than the block size, allocate it directly
+  // make the size 64-byte aligned
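+  // Branchless round-up: when size is already a multiple of 64 the mask
+  // ((remainder == 0) - 1) is 0 and nothing is added; otherwise the mask is
+  // all ones and (64 - remainder) pads size to the next 64-byte boundary.
+  // E.g. size = 100 -> remainder = 36 -> size becomes 128.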
+  auto remainder = size & 0x3f;
+  size += (64 - remainder) & ((remainder == 0) - 1);
+  if (size > SPLIT_BUFFER_SIZE) {
+    ARROW_ASSIGN_OR_RAISE(buffer,
+                          arrow::AllocateResizableBuffer(size, options_.memory_pool));
+    return arrow::Status::OK();
+  } else if (combine_buffer_->capacity() - combine_buffer_->size() < size) {
+    // the current block is exhausted; grab a fresh one from the pool
+    ARROW_ASSIGN_OR_RAISE(combine_buffer_, arrow::AllocateResizableBuffer(
+                                               SPLIT_BUFFER_SIZE, options_.memory_pool));
+    RETURN_NOT_OK(combine_buffer_->Resize(0, /*shrink_to_fit=*/false));
+  }
+  buffer = arrow::SliceMutableBuffer(combine_buffer_, combine_buffer_->size(), size);
+
+  RETURN_NOT_OK(combine_buffer_->Resize(combine_buffer_->size() + size,
+                                        /*shrink_to_fit=*/false));
   return arrow::Status::OK();
 }
 
@@ -473,11 +508,15 @@ arrow::Status Splitter::Stop() {
       partition_lengths_[pid] = 0;
     }
   }
+  this->combine_buffer_.reset();
+  this->schema_payload_.reset();
+  partition_fixed_width_buffers_.clear();
 
   // close data file output Stream
   RETURN_NOT_OK(data_file_os_->Close());
 
   EVAL_END("write", options_.thread_id, options_.task_attempt_id)
+
   return arrow::Status::OK();
 }
 
 int64_t batch_nbytes(const arrow::RecordBatch& batch) {
@@ -574,37 +613,33 @@ arrow::Status Splitter::CacheRecordBatch(int32_t partition_id, bool reset_buffer
         break;
       }
       default: {
-        auto& buffers = partition_fixed_width_buffers_[fixed_width_idx][partition_id];
+        auto buffers = partition_fixed_width_buffers_[fixed_width_idx][partition_id];
         if (buffers[0] != nullptr) {
-          buffers[0]->Resize((num_rows >> 3) + 1, /*shrink_to_fit =*/false);
+          buffers[0] =
+              arrow::SliceBuffer(buffers[0], 0, arrow::BitUtil::BytesForBits(num_rows));
         }
         if (buffers[1] != nullptr) {
           if (column_type_id_[i]->id() == arrow::BooleanType::type_id)
-            buffers[1]->Resize((num_rows >> 3) + 1, /*shrink_to_fit =*/false);
+            buffers[1] = arrow::SliceBuffer(buffers[1], 0,
+                                            arrow::BitUtil::BytesForBits(num_rows));
           else
-            buffers[1]->Resize(
-                num_rows * (arrow::bit_width(column_type_id_[i]->id()) >> 3),
-                /*shrink_to_fit =*/false);
+            buffers[1] = arrow::SliceBuffer(
+                buffers[1], 0,
+                num_rows * (arrow::bit_width(column_type_id_[i]->id()) >> 3));
         }
+        arrays[i] = arrow::MakeArray(arrow::ArrayData::Make(
+            schema_->field(i)->type(), num_rows, {buffers[0], buffers[1]}));
         if (reset_buffers) {
-          arrays[i] = arrow::MakeArray(
-              arrow::ArrayData::Make(schema_->field(i)->type(), num_rows,
-                                     {std::move(buffers[0]), std::move(buffers[1])}));
-          buffers = {nullptr, nullptr};
           partition_fixed_width_validity_addrs_[fixed_width_idx][partition_id] = nullptr;
           partition_fixed_width_value_addrs_[fixed_width_idx][partition_id] = nullptr;
-        } else {
-          arrays[i] = arrow::MakeArray(arrow::ArrayData::Make(
-              schema_->field(i)->type(), num_rows, {buffers[0], buffers[1]}));
         }
         fixed_width_idx++;
         break;
       }
     }
   }
-
   auto batch = arrow::RecordBatch::Make(schema_, num_rows, std::move(arrays));
   int64_t raw_size = batch_nbytes(batch);
@@ -642,12 +677,14 @@ arrow::Status Splitter::AllocatePartitionBuffers(int32_t partition_id, int32_t n
   auto binary_idx = 0;
   auto large_binary_idx = 0;
   auto list_idx = 0;
+  auto total_size = 0;
 
   std::vector<std::shared_ptr<arrow::BinaryBuilder>> new_binary_builders;
   std::vector<std::shared_ptr<arrow::LargeBinaryBuilder>> new_large_binary_builders;
   std::vector<std::shared_ptr<arrow::ArrayBuilder>> new_list_builders;
-  std::vector<std::shared_ptr<arrow::ResizableBuffer>> new_value_buffers;
-  std::vector<std::shared_ptr<arrow::ResizableBuffer>> new_validity_buffers;
+  std::vector<std::shared_ptr<arrow::Buffer>> new_value_buffers;
+  std::vector<std::shared_ptr<arrow::Buffer>> new_validity_buffers;
+
   for (auto i = 0; i < num_fields; ++i) {
     switch (column_type_id_[i]->id()) {
       case arrow::BinaryType::type_id:
@@ -688,25 +725,24 @@ arrow::Status Splitter::AllocatePartitionBuffers(int32_t partition_id, int32_t n
       case arrow::NullType::type_id:
         break;
       default: {
-        std::shared_ptr<arrow::ResizableBuffer> value_buffer;
+        std::shared_ptr<arrow::Buffer> value_buffer;
         if (column_type_id_[i]->id() == arrow::BooleanType::type_id) {
-          ARROW_ASSIGN_OR_RAISE(value_buffer, arrow::AllocateResizableBuffer(
-                                                  arrow::BitUtil::BytesForBits(new_size),
-                                                  options_.memory_pool));
+          auto status = AllocateBufferFromPool(value_buffer,
+                                               arrow::BitUtil::BytesForBits(new_size));
+          ARROW_RETURN_NOT_OK(status);
         } else {
-          ARROW_ASSIGN_OR_RAISE(
-              value_buffer,
-              arrow::AllocateResizableBuffer(
-                  new_size * (arrow::bit_width(column_type_id_[i]->id()) / 8),
-                  options_.memory_pool));
+          auto status = AllocateBufferFromPool(
+              value_buffer, new_size * (arrow::bit_width(column_type_id_[i]->id()) >> 3));
+          ARROW_RETURN_NOT_OK(status);
         }
         new_value_buffers.push_back(std::move(value_buffer));
         if (input_fixed_width_has_null_[fixed_width_idx]) {
-          std::shared_ptr<arrow::ResizableBuffer> validity_buffer;
-          ARROW_ASSIGN_OR_RAISE(
-              validity_buffer,
-              arrow::AllocateResizableBuffer(arrow::BitUtil::BytesForBits(new_size),
-                                             options_.memory_pool));
+          std::shared_ptr<arrow::Buffer> validity_buffer;
+          auto status = AllocateBufferFromPool(validity_buffer,
+                                               arrow::BitUtil::BytesForBits(new_size));
+          ARROW_RETURN_NOT_OK(status);
+          // initialize all bits to 1 (valid) once allocated
+          memset(validity_buffer->mutable_data(), 0xff, validity_buffer->capacity());
           new_validity_buffers.push_back(std::move(validity_buffer));
         } else {
           new_validity_buffers.push_back(nullptr);
@@ -746,10 +782,10 @@ arrow::Status Splitter::AllocatePartitionBuffers(int32_t partition_id, int32_t n
         break;
       default:
         partition_fixed_width_value_addrs_[fixed_width_idx][partition_id] =
-            const_cast<uint8_t*>(new_value_buffers[fixed_width_idx]->data());
+            new_value_buffers[fixed_width_idx]->mutable_data();
         if (input_fixed_width_has_null_[fixed_width_idx]) {
           partition_fixed_width_validity_addrs_[fixed_width_idx][partition_id] =
-              const_cast<uint8_t*>(new_validity_buffers[fixed_width_idx]->data());
+              new_validity_buffers[fixed_width_idx]->mutable_data();
         } else {
           partition_fixed_width_validity_addrs_[fixed_width_idx][partition_id] = nullptr;
         }
@@ -813,6 +849,18 @@ arrow::Status Splitter::SpillPartition(int32_t partition_id) {
         std::make_shared<PartitionWriter>(this, partition_id);
   }
   TIME_NANO_OR_RAISE(total_spill_time_, partition_writer_[partition_id]->Spill());
+
+  // reset the validity buffers after spill
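+  // After a spill the partition buffers are reused from row offset 0, and the
+  // split path only ever clears bits for null rows. Refill the validity bytes
+  // with 1s so rows written after the spill start from "all valid" again,
+  // matching the state right after allocation.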
+  std::for_each(
+      partition_fixed_width_buffers_.begin(), partition_fixed_width_buffers_.end(),
+      [partition_id](std::vector<std::array<std::shared_ptr<arrow::Buffer>, 2>>& bufs) {
+        if (bufs[partition_id][0] != nullptr) {
+          // set every bit back to 1 (valid)
+          auto addr = bufs[partition_id][0]->mutable_data();
+          memset(addr, 0xff, bufs[partition_id][0]->capacity());
+        }
+      });
+
   return arrow::Status::OK();
 }
 
@@ -840,6 +888,9 @@ arrow::Result<int32_t> Splitter::SpillLargestPartition(int64_t* size) {
 }
 
 arrow::Status Splitter::DoSplit(const arrow::RecordBatch& rb) {
+  // buffer is allocated for less than 64K rows
+  // ARROW_CHECK_LE(rb.num_rows(), 64 * 1024);
+
 #ifdef PROCESSROW
 
   reducer_offsets_.resize(rb.num_rows());
@@ -857,7 +908,7 @@ arrow::Status Splitter::DoSplit(const arrow::RecordBatch& rb) {
   }
   std::transform(reducer_offset_offset_.begin(), std::prev(reducer_offset_offset_.end()),
                  partition_id_cnt_.begin(), reducer_offset_offset_.begin(),
-                 [](uint16_t x, int16_t y) { return x - y; });
+                 [](row_offset_type x, row_offset_type y) { return x - y; });
 
 #endif
   // for the first input record batch, scan binary arrays and large binary
@@ -889,7 +940,10 @@ arrow::Status Splitter::DoSplit(const arrow::RecordBatch& rb) {
   for (auto col = 0; col < fixed_width_array_idx_.size(); ++col) {
     auto col_idx = fixed_width_array_idx_[col];
     size_per_row += arrow::bit_width(column_type_id_[col_idx]->id()) / 8;
+    // checking input_fixed_width_has_null_[col] is cheaper than GetNullCount();
+    // once input_fixed_width_has_null_ is set to true it stays true, even
+    // across spills
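+    // The flag is deliberately sticky: a stale "true" only costs an extra
+    // validity buffer for later batches, while resetting it would require
+    // rescanning every column after each spill.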
+    if (!input_fixed_width_has_null_[col] &&
+        rb.column_data(col_idx)->GetNullCount() != 0) {
       input_fixed_width_has_null_[col] = true;
     }
   }
@@ -904,7 +958,7 @@ arrow::Status Splitter::DoSplit(const arrow::RecordBatch& rb) {
   for (auto pid = 0; pid < num_partitions_; ++pid) {
     if (partition_id_cnt_[pid] > 0) {
       // make sure the size to be allocated is larger than the size to be filled
-      auto new_size = std::max((uint16_t)prealloc_row_cnt, partition_id_cnt_[pid]);
+      auto new_size = std::max((row_offset_type)prealloc_row_cnt, partition_id_cnt_[pid]);
       if (partition_buffer_size_[pid] == 0) {
         // allocate buffer if it's not yet allocated
         RETURN_NOT_OK(AllocatePartitionBuffers(pid, new_size));
@@ -952,13 +1006,14 @@ arrow::Status Splitter::DoSplit(const arrow::RecordBatch& rb) {
   for (auto pid = 0; pid < num_partitions_; ++pid) {
     partition_buffer_idx_base_[pid] += partition_id_cnt_[pid];
   }
+
   return arrow::Status::OK();
 }
 
 arrow::Status Splitter::SplitFixedWidthValueBuffer(const arrow::RecordBatch& rb) {
   const auto num_rows = rb.num_rows();
   int64_t row;
-  std::vector<uint16_t> partition_buffer_idx_offset;
+  std::vector<row_offset_type> partition_buffer_idx_offset;
 
   for (auto col = 0; col < fixed_width_array_idx_.size(); ++col) {
     const auto& dst_addrs = partition_fixed_width_value_addrs_[col];
@@ -973,7 +1028,7 @@ arrow::Status Splitter::SplitFixedWidthValueBuffer(const arrow::RecordBatch& rb)
     std::transform(partition_buffer_idx_offset_.begin(),                                \
                    partition_buffer_idx_offset_.end(), partition_buffer_idx_base_.begin(), \
                    partition_buffer_idx_offset_.begin(),                                \
-                   [](uint8_t* x, int16_t y) { return x + y * sizeof(_CTYPE); });       \
+                   [](uint8_t* x, row_offset_type y) { return x + y * sizeof(_CTYPE); }); \
    for (auto pid = 0; pid < num_partitions_; pid++) {                                   \
      auto dst_pid_base =                                                                \
          reinterpret_cast<_CTYPE*>(partition_buffer_idx_offset_[pid]); /*32k*/          \
@@ -992,7 +1047,7 @@ arrow::Status Splitter::SplitFixedWidthValueBuffer(const arrow::RecordBatch& rb)
     std::transform(partition_buffer_idx_offset_.begin(),                                \
                    partition_buffer_idx_offset_.end(), partition_buffer_idx_base_.begin(), \
                    partition_buffer_idx_offset_.begin(),                                \
-                   [](uint8_t* x, int16_t y) { return x + y * sizeof(_CTYPE); });       \
+                   [](uint8_t* x, row_offset_type y) { return x + y * sizeof(_CTYPE); }); \
    for (row = 0; row < num_rows; ++row) {                                               \
      auto pid = partition_id_[row];                                                     \
      auto dst_pid_base = reinterpret_cast<_CTYPE*>(partition_buffer_idx_offset_[pid]);  \
@@ -1013,7 +1068,7 @@ arrow::Status Splitter::SplitFixedWidthValueBuffer(const arrow::RecordBatch& rb)
       std::transform(
           partition_buffer_idx_offset_.begin(), partition_buffer_idx_offset_.end(),
           partition_buffer_idx_base_.begin(), partition_buffer_idx_offset_.begin(),
-          [](uint8_t* x, int16_t y) { return x + y * sizeof(uint64_t); });
+          [](uint8_t* x, row_offset_type y) { return x + y * sizeof(uint64_t); });
       for (auto pid = 0; pid < num_partitions_; pid++) {
         auto dst_pid_base =
             reinterpret_cast<uint64_t*>(partition_buffer_idx_offset_[pid]); /*32k*/
@@ -1082,7 +1137,7 @@ arrow::Status Splitter::SplitFixedWidthValueBuffer(const arrow::RecordBatch& rb)
       std::transform(
           partition_buffer_idx_offset_.begin(), partition_buffer_idx_offset_.end(),
           partition_buffer_idx_base_.begin(), partition_buffer_idx_offset_.begin(),
-          [](uint8_t* x, int16_t y) { return x + y * 16; });
+          [](uint8_t* x, row_offset_type y) { return x + y * 16; });
       for (auto pid = 0; pid < num_partitions_; pid++) {
        auto dst_pid_base =
            reinterpret_cast<__m128i_u*>(partition_buffer_idx_offset_[pid]); /*32k*/
@@ -1103,7 +1158,7 @@ arrow::Status Splitter::SplitFixedWidthValueBuffer(const arrow::RecordBatch& rb)
       std::transform(
           partition_buffer_idx_offset_.begin(), partition_buffer_idx_offset_.end(),
           partition_buffer_idx_base_.begin(), partition_buffer_idx_offset_.begin(),
-          [](uint8_t* x, int16_t y) { return x + y * 16; });
+          [](uint8_t* x, row_offset_type y) { return x + y * 16; });
       for (auto row = 0; row < num_rows; ++row) {
         auto pid = partition_id_[row];
         reinterpret_cast<__m128i_u*>(partition_buffer_idx_offset_[pid])[0] =
@@ -1122,7 +1177,7 @@ arrow::Status Splitter::SplitFixedWidthValueBuffer(const arrow::RecordBatch& rb)
                 partition_buffer_idx_offset.begin());
       for (auto row = 0; row < num_rows; ++row) {
         auto pid = partition_id_[row];
-        uint16_t dst_offset = partition_buffer_idx_offset[pid];
+        row_offset_type dst_offset = partition_buffer_idx_offset[pid];
         dst_addrs[pid][dst_offset >> 3] ^=
             (dst_addrs[pid][dst_offset >> 3] >> (dst_offset & 7) ^
              src_addr[row >> 3] >> (row & 7))
@@ -1307,41 +1362,27 @@ arrow::Status Splitter::SplitFixedWidthValueBufferAVX(const arrow::RecordBatch&
 
 arrow::Status Splitter::SplitFixedWidthValidityBuffer(const arrow::RecordBatch& rb) {
   const auto num_rows = rb.num_rows();
-  std::vector<uint16_t> partition_buffer_idx_offset;
+  std::vector<row_offset_type> partition_buffer_idx_offset;
 
   for (auto col = 0; col < fixed_width_array_idx_.size(); ++col) {
     auto col_idx = fixed_width_array_idx_[col];
     auto& dst_addrs = partition_fixed_width_validity_addrs_[col];
-    if (rb.column_data(col_idx)->GetNullCount() == 0 &&
-        column_has_null_[col_idx] == true) {
-      // if the input record batch doesn't have null, set validity to True
-      // column_has_null_ is used to skip the partition_id_cnt_[pid] and dst_addrs[pid]
-      // access
-      for (auto pid = 0; pid < num_partitions_; ++pid) {
-        if (partition_id_cnt_[pid] > 0 && dst_addrs[pid] != nullptr) {
-          arrow::BitUtil::SetBitsTo(dst_addrs[pid], partition_buffer_idx_base_[pid],
-                                    partition_id_cnt_[pid], true);
-        }
-      }
-    } else if (rb.column_data(col_idx)->GetNullCount() > 0) {
+    if (rb.column_data(col_idx)->GetNullCount() > 0) {
       // there is a null count
-      column_has_null_[col_idx] = true;
       for (auto pid = 0; pid < num_partitions_; ++pid) {
         if (partition_id_cnt_[pid] > 0 && dst_addrs[pid] == nullptr) {
           // init the bitmap if it's null; initialize the buffer as all valid
           auto new_size =
-              std::max(partition_id_cnt_[pid], (uint16_t)options_.buffer_size);
-          ARROW_ASSIGN_OR_RAISE(
-              auto validity_buffer,
-              arrow::AllocateResizableBuffer(arrow::BitUtil::BytesForBits(new_size),
-                                             options_.memory_pool));
+              std::max(partition_id_cnt_[pid], (row_offset_type)options_.buffer_size);
+          std::shared_ptr<arrow::Buffer> validity_buffer;
+          auto status = AllocateBufferFromPool(validity_buffer,
+                                               arrow::BitUtil::BytesForBits(new_size));
+          ARROW_RETURN_NOT_OK(status);
           dst_addrs[pid] = const_cast<uint8_t*>(validity_buffer->data());
-          arrow::BitUtil::SetBitsTo(dst_addrs[pid], 0, partition_buffer_idx_base_[pid],
-                                    true);
+          memset(validity_buffer->mutable_data(), 0xff, validity_buffer->capacity());
           partition_fixed_width_buffers_[col][pid][0] = std::move(validity_buffer);
         }
       }
-
       auto src_addr = const_cast<uint8_t*>(rb.column_data(col_idx)->buffers[0]->data());
       partition_buffer_idx_offset.resize(partition_buffer_idx_base_.size());
       std::copy(partition_buffer_idx_base_.begin(), partition_buffer_idx_base_.end(),
@@ -1355,6 +1396,18 @@ arrow::Status Splitter::SplitFixedWidthValidityBuffer(const arrow::RecordBatch&
                << (dst_offset & 7);
         partition_buffer_idx_offset[pid]++;
       }
+      // the last row's write may flip the following bits of its byte to 0;
+      // reinitialize them to 1
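+      // The bit-copy above writes a whole byte at dst_offset >> 3, so bits
+      // past the final row inside that byte can be clobbered even though no
+      // row was stored there. The mask below selects exactly the bits at
+      // positions >= (lastoffset & 7) within the byte and ORs them back to 1;
+      // when lastoffset is byte-aligned nothing needs fixing, and the
+      // ((lastoffset & 7) == 0) - 1 term zeroes out the whole mask.
+      // E.g. lastoffset = 11: msk = ~(0x08 - 1) = 0xF8, so bits 3..7 of the
+      // second byte are forced back to 1.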
+      for (auto pid = 0; pid < num_partitions_; pid++) {
+        if (partition_id_cnt_[pid] > 0 && dst_addrs[pid] != nullptr) {
+          auto lastoffset = partition_buffer_idx_base_[pid] + partition_id_cnt_[pid];
+          uint8_t dst = dst_addrs[pid][lastoffset >> 3];
+          uint8_t msk = 0x1 << (lastoffset & 0x7);
+          msk = ~(msk - 1);
+          msk &= ((lastoffset & 7) == 0) - 1;
+          dst |= msk;
+          dst_addrs[pid][lastoffset >> 3] = dst;
+        }
+      }
     }
   }
   return arrow::Status::OK();
diff --git a/native-sql-engine/cpp/src/shuffle/splitter.h b/native-sql-engine/cpp/src/shuffle/splitter.h
index 2fb4bb3d4..ab71446f9 100644
--- a/native-sql-engine/cpp/src/shuffle/splitter.h
+++ b/native-sql-engine/cpp/src/shuffle/splitter.h
@@ -48,6 +48,8 @@ class Splitter {
 
   virtual const std::shared_ptr<arrow::Schema>& input_schema() const { return schema_; }
 
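+  // Row offsets within a reducer's partition buffer. A 16-bit offset would
+  // overflow at exactly 64k rows (the limit Init() enforces with
+  // ARROW_CHECK_LE), so offsets are kept in 32 bits.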
+  typedef uint32_t row_offset_type;
+
   /**
    * Split input record batch into partition buffers according to the computed
    * partition id. The largest partition buffer will be spilled if memory
@@ -138,6 +140,9 @@ class Splitter {
 
   arrow::Status SplitListArray(const arrow::RecordBatch& rb);
 
+  arrow::Status AllocateBufferFromPool(std::shared_ptr<arrow::Buffer>& buffer,
+                                       uint32_t size);
+
   template <typename T, typename ArrayType = typename arrow::TypeTraits<T>::ArrayType,
             typename BuilderType = typename arrow::TypeTraits<T>::BuilderType>
   arrow::Status AppendBinary(
@@ -174,8 +179,8 @@ class Splitter {
 
   // partid
   std::vector<int32_t> partition_buffer_size_;
-  // partid
-  std::vector<int32_t> partition_buffer_idx_base_;
+  // partid; value is the reducer batch's current offset; output rb rownum < 64k
+  std::vector<row_offset_type> partition_buffer_idx_base_;
   // partid
   // temp array to hold the destination pointer
   std::vector<uint8_t*> partition_buffer_idx_offset_;
 
   std::vector<std::shared_ptr<PartitionWriter>> partition_writer_;
@@ -183,12 +188,11 @@ class Splitter {
   // col partid
   std::vector<std::vector<uint8_t*>> partition_fixed_width_validity_addrs_;
-  // cache if the column has null so far for any reducer. To bypass the reducer check
-  std::vector<bool> column_has_null_;
+  // col partid
   std::vector<std::vector<uint8_t*>> partition_fixed_width_value_addrs_;
   // col partid
-  std::vector<std::vector<std::array<std::shared_ptr<arrow::ResizableBuffer>, 2>>>
+  std::vector<std::vector<std::array<std::shared_ptr<arrow::Buffer>, 2>>>
       partition_fixed_width_buffers_;
   // col partid
   std::vector<std::vector<std::shared_ptr<arrow::BinaryBuilder>>>
       partition_binary_builders_;
@@ -198,6 +202,11 @@ class Splitter {
   std::vector<std::vector<std::shared_ptr<arrow::LargeBinaryBuilder>>>
       partition_large_binary_builders_;
   std::vector<std::vector<std::shared_ptr<arrow::ArrayBuilder>>>
       partition_list_builders_;  // col partid
+
+  // slice each reducer column's buffer out of a shared block; this way the
+  // allocations can be combined into large pages
+  std::shared_ptr<arrow::ResizableBuffer> combine_buffer_;
+
   // partid
   std::vector<std::vector<std::shared_ptr<arrow::ipc::IpcPayload>>>
       partition_cached_recordbatch_;
@@ -224,14 +233,15 @@ class Splitter {
   std::vector<bool> input_fixed_width_has_null_;
 
   // updated for each input record batch
-  // col
+  // col; value is the partition number, part_num < 64k
   std::vector<int32_t> partition_id_;
-  // [num_rows]
-  std::vector<uint16_t> reducer_offsets_;
-  // [num_partitions]
-  std::vector<uint16_t> reducer_offset_offset_;
-  // col
-  std::vector<uint16_t> partition_id_cnt_;
+  // [num_rows]; value is the offset in the input record batch; input rb rownum < 64k
+  std::vector<row_offset_type> reducer_offsets_;
+  // [num_partitions]; value is the offset of the row in the record batch;
+  // input rb rownum < 64k
+  std::vector<row_offset_type> reducer_offset_offset_;
+  // col; value is the reducer's row number for each input record batch;
+  // output rb rownum < 64k
+  std::vector<row_offset_type> partition_id_cnt_;
 
   int32_t num_partitions_;
   std::shared_ptr<arrow::Schema> schema_;
diff --git a/native-sql-engine/cpp/src/tests/shuffle_split_test.cc b/native-sql-engine/cpp/src/tests/shuffle_split_test.cc
index 1f12742cd..fa03be61d 100644
--- a/native-sql-engine/cpp/src/tests/shuffle_split_test.cc
+++ b/native-sql-engine/cpp/src/tests/shuffle_split_test.cc
@@ -22,9 +22,21 @@
 #include
 #include
 #include
+#include <execinfo.h>
 #include
 #include
 
+// Dump the current call stack; used while debugging to trace where
+// allocations come from.
+void print_trace(void) {
+  char** strings;
+  size_t i, size;
+  enum Constexpr { MAX_SIZE = 1024 };
+  void* array[MAX_SIZE];
+  size = backtrace(array, MAX_SIZE);
+  strings = backtrace_symbols(array, size);
+  for (i = 0; i < size; i++) printf("    %s\n", strings[i]);
+  puts("");
+  free(strings);
+}
 
 #include "shuffle/splitter.h"
 #include "tests/test_utils.h"
@@ -42,6 +54,9 @@ class MyMemoryPool : public arrow::MemoryPool {
     }
     RETURN_NOT_OK(pool_->Allocate(size, out));
     stats_.UpdateAllocatedBytes(size);
+    // std::cout << "Allocate: size = " << size << " addr = " << std::hex <<
+    // (uint64_t)*out << std::dec << std::endl;
+    // print_trace();
     return arrow::Status::OK();
   }
 
@@ -49,14 +64,22 @@ class MyMemoryPool : public arrow::MemoryPool {
     if (new_size > capacity_) {
       return Status::OutOfMemory("malloc of size ", new_size, " failed");
     }
+    auto old_ptr = *ptr;
     RETURN_NOT_OK(pool_->Reallocate(old_size, new_size, ptr));
     stats_.UpdateAllocatedBytes(new_size - old_size);
+    // std::cout << "Reallocate: old_size = " << old_size << " old_ptr = " << std::hex <<
+    // (uint64_t)old_ptr << std::dec << " new_size = " << new_size << " addr = " <<
+    // std::hex << (uint64_t)*ptr << std::dec << std::endl;
+    // print_trace();
     return arrow::Status::OK();
   }
 
   void Free(uint8_t* buffer, int64_t size) override {
     pool_->Free(buffer, size);
     stats_.UpdateAllocatedBytes(-size);
+    // std::cout << "Free: size = " << size << " addr = " << std::hex << (uint64_t)buffer
+    // << std::dec << std::endl;
+    // print_trace();
   }
 
   int64_t bytes_allocated() const override { return stats_.bytes_allocated(); }
@@ -287,6 +310,31 @@ TEST_F(SplitterTest, TestRoundRobinSplitter) {
   }
 }
 
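+// Regression test for buffer accounting: every byte the splitter takes from
+// the pool (partition buffers, combine-buffer blocks, cached batches) must be
+// returned by the time Stop() finishes, and destroying the splitter must not
+// release anything twice.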
+TEST_F(SplitterTest, TestSplitterMemoryLeak) {
+  std::shared_ptr<MyMemoryPool> pool = std::make_shared<MyMemoryPool>(9 * 1024 * 1024);
+
+  int32_t num_partitions = 2;
+  split_options_.buffer_size = 4;
+  split_options_.memory_pool = pool.get();
+  split_options_.write_schema = false;
+
+  ARROW_ASSIGN_OR_THROW(splitter_,
+                        Splitter::Make("rr", schema_, num_partitions, split_options_));
+
+  ASSERT_NOT_OK(splitter_->Split(*input_batch_1_));
+  ASSERT_NOT_OK(splitter_->Split(*input_batch_2_));
+  ASSERT_NOT_OK(splitter_->Split(*input_batch_1_));
+
+  ASSERT_NOT_OK(splitter_->Stop());
+
+  ASSERT_TRUE(pool->bytes_allocated() == 0);
+  splitter_.reset();
+  ASSERT_TRUE(pool->bytes_allocated() == 0);
+
+  split_options_.memory_pool = arrow::default_memory_pool();
+}
+
 TEST_F(SplitterTest, TestHashSplitter) {
   int32_t num_partitions = 2;
   split_options_.buffer_size = 4;
@@ -431,12 +479,13 @@ TEST_F(SplitterTest, TestSpillFailWithOutOfMemory) {
 }
 
 TEST_F(SplitterTest, TestSpillLargestPartition) {
-  std::shared_ptr<MyMemoryPool> pool = std::make_shared<MyMemoryPool>(4000000);
+  std::shared_ptr<MyMemoryPool> pool = std::make_shared<MyMemoryPool>(9 * 1024 * 1024);
   //  pool = std::make_shared(pool.get());
 
   int32_t num_partitions = 2;
   split_options_.buffer_size = 4;
-  split_options_.memory_pool = pool.get();
+  // split_options_.memory_pool = pool.get();
   split_options_.compression_type = arrow::Compression::UNCOMPRESSED;
   ARROW_ASSIGN_OR_THROW(splitter_,
                         Splitter::Make("rr", schema_, num_partitions, split_options_));