Skip to content

Commit

Permalink
ARROW-15332: [C++] Add new cases and fix issues in IPC read/write ben…
Browse files Browse the repository at this point in the history
…chmark

 * The benchmark named ReadFile is misleading since it is actually reading from an in-memory buffer and no OS "read" call is ever issued.  I've renamed it to ReadBuffer (and ReadCompressedBuffer)
 * Renamed ReadTempFile to ReadCachedFile and added a second case for ReadUncachedFile. The former reads a file in the OS's page cache and the latter forces a read to actually hit the disk.
 * The TempFile benchmarks were not actually writing the correct amount of data and were reporting unrealistically high rates as a result.
 * Added a "partial read" parameter which, when true, only reads 1/8 the columns in the file so we can see the impact of pushdown projection.
 * Slightly reduced the range of parameters to keep the benchmark time reasonable (8k columns wasn't telling us anything more than 4k columns).

NOTE: This PR will invalidate some previous results from arrow-ipc-read-write-benchmark, disrupting Conbench and other monitoring efforts. This is because those previous results were wrong.

It also likely invalidates even more arrow-ipc-read-write-benchmark results because we added a new parameter and renamed some of the benchmarks.

Closes #12150 from westonpace/feature/ARROW-15332--add-fix-ipc-read-write-benchmark

Authored-by: Weston Pace <weston.pace@gmail.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>
  • Loading branch information
westonpace committed Jan 14, 2022
1 parent 093fdad commit f585a47
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 42 deletions.
26 changes: 26 additions & 0 deletions cpp/src/arrow/io/test_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@
#include "arrow/io/memory.h"
#include "arrow/memory_pool.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/io_util.h"

namespace arrow {

using internal::IOErrorFromErrno;

namespace io {

void AssertFileContents(const std::string& path, const std::string& contents) {
Expand All @@ -48,6 +52,28 @@ void AssertFileContents(const std::string& path, const std::string& contents) {

bool FileExists(const std::string& path) { return std::ifstream(path.c_str()).good(); }

// Drop the given file's pages from the OS page cache so a subsequent read is
// forced to hit the disk.  Used by benchmarks measuring uncached file I/O.
//
// Returns Status::OK on success, an IOError if any syscall fails, and
// NotImplemented on platforms without posix_fadvise/POSIX_FADV_DONTNEED.
Status PurgeLocalFileFromOsCache(const std::string& path) {
#if defined(POSIX_FADV_DONTNEED)
  // Guard on the advice constant we actually use (not POSIX_FADV_WILLNEED).
  // O_RDONLY is sufficient for posix_fadvise and also works on files the
  // caller cannot write to.
  int fd = open(path.c_str(), O_RDONLY);
  if (fd < 0) {
    return IOErrorFromErrno(errno, "open on ", path,
                            " to clear from cache did not succeed.");
  }
  // posix_fadvise reports failure through its return value (it does not set
  // errno), so pass `err` straight through.
  int err = posix_fadvise(fd, 0, 0, POSIX_FADV_DONTNEED);
  if (err != 0) {
    // Close the descriptor before returning so it is not leaked on error.
    close(fd);
    return IOErrorFromErrno(err, "fadvise on ", path,
                            " to clear from cache did not succeed");
  }
  if (close(fd) != 0) {
    // Unlike posix_fadvise, close returns -1 on failure and sets errno.
    return IOErrorFromErrno(errno, "close on ", path,
                            " to clear from cache did not succeed");
  }
  return Status::OK();
#else
  return Status::NotImplemented("posix_fadvise is not implemented on this machine");
#endif
}

#if defined(_WIN32)
static void InvalidParamHandler(const wchar_t* expr, const wchar_t* func,
const wchar_t* source_file, unsigned int source_line,
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/io/test_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ void AssertFileContents(const std::string& path, const std::string& contents);

ARROW_TESTING_EXPORT bool FileExists(const std::string& path);

ARROW_TESTING_EXPORT Status PurgeLocalFileFromOsCache(const std::string& path);

ARROW_TESTING_EXPORT bool FileIsClosed(int fd);

ARROW_TESTING_EXPORT
Expand Down
191 changes: 149 additions & 42 deletions cpp/src/arrow/ipc/read_write_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
#include "arrow/buffer.h"
#include "arrow/io/file.h"
#include "arrow/io/memory.h"
#include "arrow/io/test_common.h"
#include "arrow/ipc/api.h"
#include "arrow/record_batch.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/type.h"
#include "arrow/util/io_util.h"

namespace arrow {

Expand All @@ -50,6 +52,27 @@ std::shared_ptr<RecordBatch> MakeRecordBatch(int64_t total_size, int64_t num_fie
return RecordBatch::Make(schema, length, arrays);
}

// Returns the column indices to load for a benchmark run.  For a partial
// read this is every 8th column (0, 8, 16, ...); for a full read it is an
// empty vector, which the IPC reader interprets as "load all columns".
std::vector<int> GetIncludedFields(int64_t num_fields, int64_t is_partial_read) {
  std::vector<int> indices;
  if (!is_partial_read) {
    return indices;
  }
  for (int idx = 0; idx < num_fields; idx += 8) {
    indices.push_back(idx);
  }
  return indices;
}

// Estimates the number of bytes one benchmark iteration processes.  An empty
// field selection means a full read (the whole file); otherwise only the
// selected fraction of the columns is counted.
int64_t BytesPerIteration(int64_t num_fields, int64_t is_partial_read,
                          int64_t total_size) {
  const auto included = GetIncludedFields(num_fields, is_partial_read);
  double selectivity = 1.0;
  if (!included.empty()) {
    selectivity = static_cast<double>(included.size()) / static_cast<double>(num_fields);
  }
  return static_cast<int64_t>(total_size * selectivity);
}

static void WriteRecordBatch(benchmark::State& state) { // NOLINT non-const reference
// 1MB
constexpr int64_t kTotalSize = 1 << 20;
Expand Down Expand Up @@ -177,7 +200,8 @@ static void DecodeStream(benchmark::State& state) { // NOLINT non-const referen
} \
ABORT_NOT_OK(writer->Close()); \
ABORT_NOT_OK(stream.Close()); \
}
} \
constexpr int64_t total_size = kBatchSize * kBatches;
#endif

#define GENERATE_DATA_IN_MEMORY() \
Expand All @@ -192,7 +216,8 @@ static void DecodeStream(benchmark::State& state) { // NOLINT non-const referen
ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch)); \
ABORT_NOT_OK(writer->Close()); \
ABORT_NOT_OK(stream.Close()); \
}
} \
constexpr int64_t total_size = kBatchSize * kBatches;

#define GENERATE_DATA_TEMP_FILE() \
constexpr int64_t kBatchSize = 1 << 20; /* 1 MB */ \
Expand All @@ -202,60 +227,142 @@ static void DecodeStream(benchmark::State& state) { // NOLINT non-const referen
{ \
auto record_batch = MakeRecordBatch(kBatchSize, state.range(0)); \
auto writer = *ipc::MakeFileWriter(sink, record_batch->schema(), options); \
ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch)); \
for (int64_t i = 0; i < kBatches; i++) { \
ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch)); \
} \
ABORT_NOT_OK(writer->Close()); \
ABORT_NOT_OK(sink->Close()); \
} \
constexpr int64_t total_size = kBatchSize * kBatches;

// Note: When working with real files we ensure each array is at least 4MB large
// This slows things down considerably but using smaller sized arrays will cause
// the I/O to bottleneck for partial reads which is not what we are trying to
// measure here (although this may be interesting to optimize someday)
#define GENERATE_DATA_REAL_FILE() \
constexpr int64_t kArraySize = (1 << 19) * sizeof(int64_t); /* 4 MB */ \
constexpr int64_t kBatches = 4; \
auto num_fields = state.range(0); \
auto options = ipc::IpcWriteOptions::Defaults(); \
ASSIGN_OR_ABORT(auto sink, io::FileOutputStream::Open("/tmp/benchmark.arrow")); \
{ \
auto batch_size = kArraySize * num_fields; \
auto record_batch = MakeRecordBatch(batch_size, num_fields); \
auto writer = *ipc::MakeFileWriter(sink, record_batch->schema(), options); \
for (int64_t i = 0; i < kBatches; i++) { \
ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch)); \
} \
ABORT_NOT_OK(writer->Close()); \
ABORT_NOT_OK(sink->Close()); \
} \
int64_t total_size = kArraySize * kBatches * num_fields;

#define PURGE_OR_SKIP(FILE) \
{ \
auto status = io::PurgeLocalFileFromOsCache("/tmp/benchmark.arrow"); \
if (!status.ok()) { \
std::string err = "Cannot purge local files from cache: " + status.ToString(); \
state.SkipWithError(err.c_str()); \
} \
}

#define READ_DATA_IN_MEMORY() auto input = std::make_shared<io::BufferReader>(buffer);
#define READ_DATA_TEMP_FILE() \
ASSIGN_OR_ABORT(auto input, io::ReadableFile::Open("/tmp/benchmark.arrow"));
// This will not be correct if your system mounts /tmp to RAM (using tmpfs
// or ramfs).
#define READ_DATA_REAL_FILE() \
PURGE_OR_SKIP("/tmp/benchmark.arrow"); \
ASSIGN_OR_ABORT(auto input, io::ReadableFile::Open("/tmp/benchmark.arrow"));

#define READ_DATA_MMAP_FILE() \
ASSIGN_OR_ABORT(auto input, io::MemoryMappedFile::Open("/tmp/benchmark.arrow", \
io::FileMode::type::READ));
#define READ_DATA_MMAP_REAL_FILE() \
PURGE_OR_SKIP("/tmp/benchmark.arrow"); \
ASSIGN_OR_ABORT(auto input, io::MemoryMappedFile::Open("/tmp/benchmark.arrow", \
io::FileMode::type::READ));

#define READ_SYNC(NAME, GENERATE, READ) \
static void NAME(benchmark::State& state) { \
GENERATE(); \
for (auto _ : state) { \
READ(); \
auto reader = *ipc::RecordBatchFileReader::Open(input.get(), \
ipc::IpcReadOptions::Defaults()); \
const int num_batches = reader->num_record_batches(); \
for (int i = 0; i < num_batches; ++i) { \
auto batch = *reader->ReadRecordBatch(i); \
} \
} \
state.SetBytesProcessed(int64_t(state.iterations()) * kBatchSize * kBatches); \
} \
BENCHMARK(NAME)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime();

#define READ_ASYNC(NAME, GENERATE, READ) \
static void NAME##Async(benchmark::State& state) { \
GENERATE(); \
for (auto _ : state) { \
READ(); \
auto reader = *ipc::RecordBatchFileReader::Open(input.get(), \
ipc::IpcReadOptions::Defaults()); \
ASSIGN_OR_ABORT(auto generator, reader->GetRecordBatchGenerator()); \
const int num_batches = reader->num_record_batches(); \
for (int i = 0; i < num_batches; ++i) { \
auto batch = *generator().result(); \
} \
} \
state.SetBytesProcessed(int64_t(state.iterations()) * kBatchSize * kBatches); \
} \
BENCHMARK(NAME##Async)->RangeMultiplier(4)->Range(1, 1 << 13)->UseRealTime();
#define READ_SYNC(NAME, GENERATE, READ) \
static void NAME(benchmark::State& state) { \
GENERATE(); \
for (auto _ : state) { \
READ(); \
ipc::IpcReadOptions options; \
options.included_fields = GetIncludedFields(state.range(0), state.range(1)); \
auto reader = *ipc::RecordBatchFileReader::Open(input.get(), options); \
const int num_batches = reader->num_record_batches(); \
for (int i = 0; i < num_batches; ++i) { \
auto batch = *reader->ReadRecordBatch(i); \
} \
} \
int64_t bytes_per_iter = \
BytesPerIteration(state.range(0), state.range(1), total_size); \
state.SetBytesProcessed(int64_t(state.iterations()) * bytes_per_iter); \
} \
BENCHMARK(NAME)

#define READ_ASYNC(NAME, GENERATE, READ) \
static void NAME##Async(benchmark::State& state) { \
GENERATE(); \
for (auto _ : state) { \
READ(); \
ipc::IpcReadOptions options; \
options.included_fields = GetIncludedFields(state.range(0), state.range(1)); \
auto reader = *ipc::RecordBatchFileReader::Open(input.get(), options); \
ASSIGN_OR_ABORT(auto generator, reader->GetRecordBatchGenerator()); \
const int num_batches = reader->num_record_batches(); \
for (int i = 0; i < num_batches; ++i) { \
auto batch = *generator().result(); \
} \
} \
int64_t bytes_per_iter = \
BytesPerIteration(state.range(0), state.range(1), total_size); \
state.SetBytesProcessed(int64_t(state.iterations()) * bytes_per_iter); \
} \
BENCHMARK(NAME##Async)

const std::vector<std::string> kArgNames = {"num_cols", "is_partial"};

#define READ_BENCHMARK(NAME, GENERATE, READ) \
READ_SYNC(NAME, GENERATE, READ); \
READ_ASYNC(NAME, GENERATE, READ);

READ_BENCHMARK(ReadFile, GENERATE_DATA_IN_MEMORY, READ_DATA_IN_MEMORY);
READ_BENCHMARK(ReadTempFile, GENERATE_DATA_TEMP_FILE, READ_DATA_TEMP_FILE);
READ_BENCHMARK(ReadMmapFile, GENERATE_DATA_TEMP_FILE, READ_DATA_MMAP_FILE);
READ_SYNC(NAME, GENERATE, READ) \
->RangeMultiplier(8) \
->Ranges({{1, 1 << 12}, {0, 1}}) \
->ArgNames(kArgNames) \
->UseRealTime(); \
READ_ASYNC(NAME, GENERATE, READ) \
->RangeMultiplier(8) \
->ArgNames(kArgNames) \
->Ranges({{1, 1 << 12}, {0, 1}}) \
->UseRealTime();

READ_BENCHMARK(ReadBuffer, GENERATE_DATA_IN_MEMORY, READ_DATA_IN_MEMORY);
READ_BENCHMARK(ReadCachedFile, GENERATE_DATA_TEMP_FILE, READ_DATA_TEMP_FILE);
// We use READ_SYNC/READ_ASYNC directly here so we can reduce the parameter
// space as real files get quite large
READ_SYNC(ReadUncachedFile, GENERATE_DATA_REAL_FILE, READ_DATA_REAL_FILE)
->RangeMultiplier(8)
->ArgNames(kArgNames)
->Ranges({{1, 1 << 6}, {0, 1}})
->UseRealTime();
READ_ASYNC(ReadUncachedFile, GENERATE_DATA_REAL_FILE, READ_DATA_REAL_FILE)
->RangeMultiplier(8)
->ArgNames(kArgNames)
->Ranges({{1, 1 << 6}, {0, 1}})
->UseRealTime();
READ_BENCHMARK(ReadMmapCachedFile, GENERATE_DATA_TEMP_FILE, READ_DATA_MMAP_FILE);
READ_SYNC(ReadMmapUncachedFile, GENERATE_DATA_REAL_FILE, READ_DATA_MMAP_REAL_FILE)
->RangeMultiplier(8)
->ArgNames(kArgNames)
->Ranges({{1, 1 << 6}, {0, 1}})
->UseRealTime();
READ_ASYNC(ReadMmapUncachedFile, GENERATE_DATA_REAL_FILE, READ_DATA_MMAP_REAL_FILE)
->RangeMultiplier(8)
->ArgNames(kArgNames)
->Ranges({{1, 1 << 6}, {0, 1}})
->UseRealTime();
#ifdef ARROW_WITH_ZSTD
READ_BENCHMARK(ReadCompressedFile, GENERATE_COMPRESSED_DATA_IN_MEMORY,
READ_BENCHMARK(ReadCompressedBuffer, GENERATE_COMPRESSED_DATA_IN_MEMORY,
READ_DATA_IN_MEMORY);
#endif

Expand Down

0 comments on commit f585a47

Please sign in to comment.