Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
[NSE-855] allocate large block of memory for all reducer #881 (#894)
Browse files Browse the repository at this point in the history
* merge master and branch shuffle_opt_fillbyreducer. To submit PR to upstream
Implemented fill by reducer

* format code

* Allocate large block of memory then slice to each buffer

* wip, rebase to master

* to rebase to master

* return to original

* added memory leak check in test

* Done

* disable alignment allocation in benchmark since arrow doesn't support it

* optimized validity buffer assignment: initialize the validity buffer to all-true once allocated, and skip the initialization during split
fix validity buffer bug

* fix out of memory test

* fix setbitsto bug
remove nullcnt

* add shuffle test

* remove unused variables

* allocate validity buffer from pool

* fix bug
set validity buffer after allocation
fix bug in handling of the last bits after processing the validity buffer

* Add arrow check for batch size and part number
use uint32 as row number size

* format code

* fix format

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

Co-authored-by: Yuan Zhou <yuan.zhou@intel.com>
  • Loading branch information
FelixYBW and zhouyuan authored May 9, 2022
1 parent f8e51f4 commit 3b4ce7e
Show file tree
Hide file tree
Showing 4 changed files with 358 additions and 124 deletions.
212 changes: 167 additions & 45 deletions native-sql-engine/cpp/src/benchmarks/shuffle_split_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,25 @@
#include <arrow/util/io_util.h>
//#include <gtest/gtest.h>
#include <benchmark/benchmark.h>
#include <execinfo.h>
#include <parquet/arrow/reader.h>
#include <parquet/file_reader.h>
#include <sched.h>
#include <shuffle/splitter.h>
#include <sys/mman.h>

#include <chrono>
// Print the current call stack to stdout (best-effort, debugging aid).
// Symbols come from glibc backtrace_symbols(); names may be mangled and
// typically need -rdynamic at link time to resolve.
void print_trace(void) {
  enum Constexpr { MAX_SIZE = 1024 };  // max stack frames captured
  void* array[MAX_SIZE];
  size_t size = backtrace(array, MAX_SIZE);
  // backtrace_symbols() mallocs its result and may return NULL on failure;
  // the original code dereferenced it unconditionally.
  char** strings = backtrace_symbols(array, size);
  if (strings == NULL) return;
  for (size_t i = 0; i < size; i++) printf(" %s\n", strings[i]);
  puts("");
  free(strings);  // single free releases the whole symbol table
}

#include "codegen/code_generator.h"
#include "codegen/code_generator_factory.h"
Expand All @@ -38,9 +51,112 @@
namespace sparkcolumnarplugin {
namespace shuffle {

// Byte alignment for large-page allocations (2 MiB). Parenthesized so the
// macro expands safely inside larger arithmetic expressions
// (e.g. `x / ALIGNMENT` would otherwise become `x / 2048 * 1024`).
#define ALIGNMENT (2048 * 1024)

// Buffer sizes (presumably row counts — confirm against splitter options)
// used by the benchmark setup below.
const int batch_buffer_size = 32768;
const int split_buffer_size = 8192;

// Arrow memory pool wrapper that delegates to the default pool while
// tracking allocation statistics locally: bytes_allocated() reflects only
// allocations made through this wrapper, which makes it useful for leak
// checks in tests. The commented-out prints are left for ad-hoc debugging.
class MyMemoryPool : public arrow::MemoryPool {
 public:
  MyMemoryPool() = default;

  // Allocate `size` bytes from the underlying pool and record them.
  Status Allocate(int64_t size, uint8_t** out) override {
    RETURN_NOT_OK(pool_->Allocate(size, out));
    stats_.UpdateAllocatedBytes(size);
    // std::cout << "Allocate: size = " << size << " addr = " << std::hex <<
    // (uint64_t)*out << std::dec << std::endl; print_trace();
    return arrow::Status::OK();
  }

  // Resize an existing allocation; only the size delta is added to the
  // stats. (The unused `old_ptr` local that captured *ptr for the debug
  // print below has been removed to silence -Wunused-variable.)
  Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override {
    RETURN_NOT_OK(pool_->Reallocate(old_size, new_size, ptr));
    stats_.UpdateAllocatedBytes(new_size - old_size);
    // auto old_ptr = *ptr;  // capture before Reallocate if re-enabling:
    // std::cout << "Reallocate: old_size = " << old_size << " old_ptr = " << std::hex <<
    // (uint64_t)old_ptr << std::dec << " new_size = " << new_size << " addr = " <<
    // std::hex << (uint64_t)*ptr << std::dec << std::endl; print_trace();
    return arrow::Status::OK();
  }

  // Release a buffer back to the underlying pool and deduct it from stats.
  void Free(uint8_t* buffer, int64_t size) override {
    pool_->Free(buffer, size);
    stats_.UpdateAllocatedBytes(-size);
    // std::cout << "Free: size = " << size << " addr = " << std::hex << (uint64_t)buffer
    // << std::dec << std::endl; print_trace();
  }

  // Bytes currently outstanding *through this wrapper* (local stats).
  int64_t bytes_allocated() const override { return stats_.bytes_allocated(); }

  // High-water mark of the underlying pool (not just this wrapper's share).
  int64_t max_memory() const override { return pool_->max_memory(); }

  std::string backend_name() const override { return pool_->backend_name(); }

 private:
  MemoryPool* pool_ = arrow::default_memory_pool();
  arrow::internal::MemoryPoolStats stats_;
};

#define ENABLELARGEPAGE

// Memory pool that serves allocations of 2 MiB or more from 2 MiB-aligned
// memory and advises the kernel to back them with transparent huge pages
// (MADV_HUGEPAGE). Smaller requests fall through to the default Arrow pool.
// The large-page path is compile-time toggled via ENABLELARGEPAGE.
// NOTE: AlignAllocate/aligned Free are extensions of this project's patched
// Arrow, not upstream MemoryPool API.
class LargePageMemoryPool : public MemoryPool {
 public:
  explicit LargePageMemoryPool() {}

  ~LargePageMemoryPool() override = default;

  Status Allocate(int64_t size, uint8_t** out) override {
#ifdef ENABLELARGEPAGE
    if (size < 2 * 1024 * 1024) {
      return pool_->Allocate(size, out);
    } else {
      Status st = pool_->AlignAllocate(size, out, ALIGNMENT);
      // 14 == MADV_HUGEPAGE, spelled numerically to avoid depending on newer
      // kernel headers; the advice is best-effort and its result is ignored.
      madvise(*out, size, /*MADV_HUGEPAGE */ 14);
      //std::cout << "Allocate: size = " << size << " addr = " << std::hex << (uint64_t)*out
      //          << " end = " << std::hex << (uint64_t)(*out+size) << std::dec << std::endl;
      return st;
    }
#else
    return pool_->Allocate(size, out);
#endif
  }

  // All reallocations go through the plain pool. The aligned-reallocate
  // branch that used to sit (unreachably) behind an early return here has
  // been removed: it was dead code, and per the change history aligned
  // reallocation is deliberately disabled for the benchmark because the
  // bundled arrow does not support it — TODO confirm before re-enabling.
  Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override {
    return pool_->Reallocate(old_size, new_size, ptr);
  }

  // Free must mirror Allocate: large blocks were aligned-allocated and need
  // the alignment-aware Free overload.
  void Free(uint8_t* buffer, int64_t size) override {
#ifdef ENABLELARGEPAGE
    if (size < 2 * 1024 * 1024) {
      pool_->Free(buffer, size);
    } else {
      pool_->Free(buffer, size, ALIGNMENT);
    }
#else
    pool_->Free(buffer, size);
#endif
  }

  // Delegated straight to the underlying default pool.
  int64_t bytes_allocated() const override { return pool_->bytes_allocated(); }

  int64_t max_memory() const override { return pool_->max_memory(); }

  std::string backend_name() const override { return "LargePageMemoryPool"; }

 private:
  MemoryPool* pool_ = arrow::default_memory_pool();
};

class BenchmarkShuffleSplit {
public:
BenchmarkShuffleSplit(std::string file_name) { GetRecordBatchReader(file_name); }
Expand Down Expand Up @@ -89,6 +205,8 @@ class BenchmarkShuffleSplit {
SetCPU(state.thread_index());
arrow::Compression::type compression_type = (arrow::Compression::type)state.range(1);

std::shared_ptr<arrow::MemoryPool> pool = std::make_shared<LargePageMemoryPool>();

const int num_partitions = state.range(0);

auto options = SplitOptions::Defaults();
Expand All @@ -98,6 +216,7 @@ class BenchmarkShuffleSplit {
options.offheap_per_task = 128 * 1024 * 1024 * 1024L;
options.prefer_spill = true;
options.write_schema = false;
options.memory_pool = pool.get();

std::shared_ptr<Splitter> splitter;
int64_t elapse_read = 0;
Expand Down Expand Up @@ -166,6 +285,7 @@ class BenchmarkShuffleSplit {
splitter->TotalWriteTime();
state.counters["split_time"] = benchmark::Counter(
split_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
splitter.reset();
}

protected:
Expand Down Expand Up @@ -201,26 +321,27 @@ class BenchmarkShuffleSplit_CacheScan_Benchmark : public BenchmarkShuffleSplit {
const int num_partitions, SplitOptions options, benchmark::State& state) {
std::vector<int> local_column_indices;
local_column_indices.push_back(0);
local_column_indices.push_back(1);
local_column_indices.push_back(2);
local_column_indices.push_back(4);
local_column_indices.push_back(5);
local_column_indices.push_back(6);
local_column_indices.push_back(7);
/* local_column_indices.push_back(0);
local_column_indices.push_back(1);
local_column_indices.push_back(2);
local_column_indices.push_back(4);
local_column_indices.push_back(5);
local_column_indices.push_back(6);
local_column_indices.push_back(7);*/

std::shared_ptr<arrow::Schema> local_schema;
local_schema = std::make_shared<arrow::Schema>(*schema.get());

ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(15));
ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(14));
ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(13));
ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(12));
ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(11));
ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(10));
ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(9));
ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(8));
ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(3));

/* ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(15));
ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(14));
ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(13));
ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(12));
ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(11));
ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(10));
ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(9));
ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(8));
ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(3));
*/
if (state.thread_index() == 0) std::cout << local_schema->ToString() << std::endl;

ARROW_ASSIGN_OR_THROW(splitter,
Expand Down Expand Up @@ -251,11 +372,13 @@ class BenchmarkShuffleSplit_CacheScan_Benchmark : public BenchmarkShuffleSplit {
std::cout << "batches = " << num_batches << " rows = " << num_rows << std::endl;

for (auto _ : state) {
for_each(
batches.begin(), batches.end(),
[&splitter, &split_time](std::shared_ptr<arrow::RecordBatch>& record_batch) {
TIME_NANO_OR_THROW(split_time, splitter->Split(*record_batch));
});
for_each(batches.begin(), batches.end(),
[&splitter, &split_time,
&options](std::shared_ptr<arrow::RecordBatch>& record_batch) {
TIME_NANO_OR_THROW(split_time, splitter->Split(*record_batch));
});
// std::cout << " split done memory allocated = " <<
// options.memory_pool->bytes_allocated() << std::endl;
}

TIME_NANO_OR_THROW(split_time, splitter->Stop());
Expand Down Expand Up @@ -374,31 +497,30 @@ int main(int argc, char** argv) {
->MeasureProcessCPUTime()
->Unit(benchmark::kSecond);

/* sparkcolumnarplugin::shuffle::BenchmarkShuffleSplit_IterateScan_Benchmark
bck(datafile);
benchmark::RegisterBenchmark("BenchmarkShuffleSplit::IterateScan", bck)
->Iterations(1)
->Args({96*2, arrow::Compression::FASTPFOR})
->Args({96*4, arrow::Compression::FASTPFOR})
->Args({96*8, arrow::Compression::FASTPFOR})
->Args({96*16, arrow::Compression::FASTPFOR})
->Args({96*32, arrow::Compression::FASTPFOR})
->Threads(24)
->Unit(benchmark::kSecond);
benchmark::RegisterBenchmark("BenchmarkShuffleSplit::IterateScan", bck)
->Iterations(1)
->Args({4096, arrow::Compression::FASTPFOR})
->Threads(1)
->Threads(2)
->Threads(4)
->Threads(8)
->Threads(16)
->Threads(24)
->Unit(benchmark::kSecond);
/* sparkcolumnarplugin::shuffle::BenchmarkShuffleSplit_IterateScan_Benchmark
bck(datafile);
benchmark::RegisterBenchmark("BenchmarkShuffleSplit::IterateScan", bck)
->Iterations(1)
->Args({96*2, arrow::Compression::FASTPFOR})
->Args({96*4, arrow::Compression::FASTPFOR})
->Args({96*8, arrow::Compression::FASTPFOR})
->Args({96*16, arrow::Compression::FASTPFOR})
->Args({96*32, arrow::Compression::FASTPFOR})
->Threads(24)
->Unit(benchmark::kSecond);
benchmark::RegisterBenchmark("BenchmarkShuffleSplit::IterateScan", bck)
->Iterations(1)
->Args({4096, arrow::Compression::FASTPFOR})
->Threads(1)
->Threads(2)
->Threads(4)
->Threads(8)
->Threads(16)
->Threads(24)
->Unit(benchmark::kSecond);
*/

benchmark::Initialize(&argc, argv);
benchmark::RunSpecifiedBenchmarks();
benchmark::Shutdown();
Expand Down
Loading

0 comments on commit 3b4ce7e

Please sign in to comment.