fix: enable benchmark (#151)
Signed-off-by: Ted Xu <ted.xu@zilliz.com>
tedxu authored Sep 30, 2024
1 parent 872debe commit 8b6ca71
Showing 7 changed files with 59 additions and 45 deletions.
55 changes: 30 additions & 25 deletions cpp/benchmark/benchmark_packed.cpp
@@ -22,7 +22,6 @@
 #include <packed/writer.h>
 #include <parquet/properties.h>
 #include <packed/reader.h>
-#include <iostream>
 #include <memory>
 #include <ratio>
 #include <arrow/type.h>
@@ -32,8 +31,12 @@
 #include <arrow/util/key_value_metadata.h>
 #include "filesystem/fs.h"
 
-namespace milvus_storage {
+#define SKIP_IF_NOT_OK(status, st)         \
+  if (!status.ok()) {                      \
+    st.SkipWithError(status.ToString());   \
+  }
+
+namespace milvus_storage {
 // Environment variables to configure the S3 test environment
 static const char* kEnvAccessKey = "ACCESS_KEY";
 static const char* kEnvSecretKey = "SECRET_KEY";
@@ -75,22 +78,23 @@ static void PackedRead(benchmark::State& st, arrow::fs::FileSystem* fs, const st
 
   // after writing, the column of large_str is in 0th file, and the last int64 columns are in 1st file
   std::vector<std::shared_ptr<arrow::Field>> fields = {
-      arrow::field("str", arrow::utf8()),
-      arrow::field("int32", arrow::int32()),
-      arrow::field("int64", arrow::int64()),
+      arrow::field("int", arrow::utf8()),
+      arrow::field("int64", arrow::int32()),
+      arrow::field("str", arrow::int64()),
   };
   auto schema = arrow::schema(fields);
 
   for (auto _ : st) {
     PackedRecordBatchReader pr(*fs, paths, schema, column_offsets, needed_columns, buffer_size);
-    auto r = pr.ToTable();
-    if (!r.ok()) {
-      st.SkipWithError(r.status().ToString());
-    }
-    auto table = r.ValueOrDie();
-    r = pr.Close();
-    if (!r.ok()) {
-      st.SkipWithError(r.status().ToString());
+    auto r = arrow::RecordBatch::MakeEmpty(schema);
+    SKIP_IF_NOT_OK(r.status(), st)
+    auto rb = r.ValueOrDie();
+    while (true) {
+      SKIP_IF_NOT_OK(pr.ReadNext(&rb), st);
+      if (rb == nullptr || rb->num_rows() == 0) {
+        SKIP_IF_NOT_OK(pr.Close(), st)
+        break;
+      }
     }
   }
 }
@@ -102,28 +106,28 @@ static void PackedWrite(benchmark::State& st, arrow::fs::FileSystem* fs, const s
   arrow::Int64Builder int64_builder;
   arrow::StringBuilder str_builder;
 
-  int_builder.AppendValues({1, 2, 3});
-  int64_builder.AppendValues({4, 5, 6});
-  str_builder.AppendValues({"foo", "bar", "baz"});
+  SKIP_IF_NOT_OK(int_builder.AppendValues({1, 2, 3}), st);
+  SKIP_IF_NOT_OK(int64_builder.AppendValues({4, 5, 6}), st);
+  SKIP_IF_NOT_OK(str_builder.AppendValues({std::string(1024, 'b'), std::string(1024, 'a'), std::string(1024, 'z')}),
+                 st);
 
   std::shared_ptr<arrow::Array> int_array;
   std::shared_ptr<arrow::Array> int64_array;
   std::shared_ptr<arrow::Array> str_array;
 
-  int_builder.Finish(&int_array);
-  int64_builder.Finish(&int64_array);
-  str_builder.Finish(&str_array);
+  SKIP_IF_NOT_OK(int_builder.Finish(&int_array), st);
+  SKIP_IF_NOT_OK(int64_builder.Finish(&int64_array), st);
+  SKIP_IF_NOT_OK(str_builder.Finish(&str_array), st);
 
   std::vector<std::shared_ptr<arrow::Array>> arrays = {int_array, int64_array, str_array};
   auto record_batch = arrow::RecordBatch::Make(schema, 3, arrays);
 
   for (auto _ : st) {
     PackedRecordBatchWriter writer(buffer_size, schema, *fs, path, *parquet::default_writer_properties());
-    for (int i = 0; i < 500000; ++i) {
+    for (int i = 0; i < 8 * 1024; ++i) {
       auto r = writer.Write(record_batch);
       if (!r.ok()) {
         st.SkipWithError(r.ToString());
-        std::cerr << "exit on iteration " << i << std::endl;
         break;
       }
     }
@@ -134,14 +138,15 @@ static void PackedWrite(benchmark::State& st, arrow::fs::FileSystem* fs, const s
   }
 }
 
+std::string PATH = "/tmp/bench/foo";
+
 BENCHMARK_DEFINE_F(S3Fixture, Write32MB)(benchmark::State& st) {
-  PackedWrite(st, fs_.get(), "/tmp/bench/foo", 12 * 1024 * 1024);
+  SKIP_IF_NOT_OK(fs_->CreateDir(PATH), st);
+  PackedWrite(st, fs_.get(), PATH, 22 * 1024 * 1024);
 }
 BENCHMARK_REGISTER_F(S3Fixture, Write32MB)->UseRealTime();
 
-BENCHMARK_DEFINE_F(S3Fixture, Read32MB)(benchmark::State& st) {
-  PackedRead(st, fs_.get(), "/tmp/bench/foo", 12 * 1024 * 1024);
-}
+BENCHMARK_DEFINE_F(S3Fixture, Read32MB)(benchmark::State& st) { PackedRead(st, fs_.get(), PATH, 22 * 1024 * 1024); }
 BENCHMARK_REGISTER_F(S3Fixture, Read32MB)->UseRealTime();
 
 } // namespace milvus_storage
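
Read on its own, the new read path in the benchmark boils down to the loop below once the SKIP_IF_NOT_OK macro and diff markers are stripped away. This is a minimal sketch, not part of the commit: DrainPackedReader is a hypothetical helper name, and it assumes PackedRecordBatchReader::ReadNext and Close both return an arrow::Status, as their use with SKIP_IF_NOT_OK above suggests.

#include <arrow/record_batch.h>
#include <arrow/status.h>
#include <packed/reader.h>
#include <memory>

// Hypothetical helper: drain a PackedRecordBatchReader the way the benchmark does,
// reading batches until an empty (or null) batch signals the end of the stream.
arrow::Status DrainPackedReader(milvus_storage::PackedRecordBatchReader& pr,
                                const std::shared_ptr<arrow::Schema>& schema) {
  auto seed = arrow::RecordBatch::MakeEmpty(schema);  // gives the loop variable an initial value
  ARROW_RETURN_NOT_OK(seed.status());
  std::shared_ptr<arrow::RecordBatch> rb = seed.ValueOrDie();
  while (true) {
    ARROW_RETURN_NOT_OK(pr.ReadNext(&rb));            // next packed batch
    if (rb == nullptr || rb->num_rows() == 0) {       // end of stream
      return pr.Close();
    }
    // The benchmark discards the batch; a real caller would consume rb here.
  }
}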
4 changes: 2 additions & 2 deletions cpp/include/milvus-storage/packed/column_group_writer.h
@@ -30,14 +30,14 @@ class ColumnGroupWriter {
                     std::shared_ptr<arrow::Schema> schema,
                     arrow::fs::FileSystem& fs,
                     const std::string& file_path,
-                    const std::vector<int> origin_column_indices);
+                    const std::vector<int>& origin_column_indices);
 
   ColumnGroupWriter(GroupId group_id,
                     std::shared_ptr<arrow::Schema> schema,
                     arrow::fs::FileSystem& fs,
                     const std::string& file_path,
                     const parquet::WriterProperties& props,
-                    const std::vector<int> origin_column_indices);
+                    const std::vector<int>& origin_column_indices);
 
   Status Init();
   Status Write(const std::shared_ptr<arrow::RecordBatch>& record);
2 changes: 1 addition & 1 deletion cpp/src/format/parquet/file_writer.cpp
@@ -37,7 +37,7 @@ ParquetFileWriter::ParquetFileWriter(std::shared_ptr<arrow::Schema> schema,
                                      arrow::fs::FileSystem& fs,
                                      const std::string& file_path,
                                      const parquet::WriterProperties& props)
-    : schema_(std::move(schema)), fs_(fs), file_path_(file_path), props_(props), count_(0) {}
+    : schema_(std::move(schema)), fs_(fs), file_path_(file_path), props_(props) {}
 
 Status ParquetFileWriter::Init() {
   auto coln = schema_->num_fields();
14 changes: 8 additions & 6 deletions cpp/src/packed/column_group_writer.cpp
@@ -16,6 +16,8 @@
 #include <arrow/record_batch.h>
 #include <arrow/status.h>
 #include <parquet/properties.h>
+
+#include <utility>
 #include "common/log.h"
 #include "common/status.h"
 #include "format/parquet/file_writer.h"
@@ -27,9 +29,9 @@ ColumnGroupWriter::ColumnGroupWriter(GroupId group_id,
                                      std::shared_ptr<arrow::Schema> schema,
                                      arrow::fs::FileSystem& fs,
                                      const std::string& file_path,
-                                     const std::vector<int> origin_column_indices)
+                                     const std::vector<int>& origin_column_indices)
     : group_id_(group_id),
-      writer_(schema, fs, file_path),
+      writer_(std::move(schema), fs, file_path),
       column_group_(group_id, origin_column_indices),
       finished_(false) {}
 
@@ -38,9 +40,9 @@ ColumnGroupWriter::ColumnGroupWriter(GroupId group_id,
                                      arrow::fs::FileSystem& fs,
                                      const std::string& file_path,
                                      const parquet::WriterProperties& props,
-                                     const std::vector<int> origin_column_indices)
+                                     const std::vector<int>& origin_column_indices)
     : group_id_(group_id),
-      writer_(schema, fs, file_path, props),
+      writer_(std::move(schema), fs, file_path, props),
       column_group_(group_id, origin_column_indices),
       flushed_batches_(0),
       flushed_rows_(0),
@@ -73,8 +75,8 @@ Status ColumnGroupWriter::Flush() {
 
 Status ColumnGroupWriter::Close() {
   finished_ = true;
-  LOG_STORAGE_INFO_ << "Group " << group_id_ << " flushed " << flushed_batches_ << " batches and " << flushed_rows_
-                    << " rows in " << flushed_count_ << " flushes";
+  LOG_STORAGE_DEBUG_ << "Group " << group_id_ << " flushed " << flushed_batches_ << " batches and " << flushed_rows_
+                     << " rows in " << flushed_count_ << " flushes";
   return writer_.Close();
 }
 
23 changes: 15 additions & 8 deletions cpp/src/packed/reader.cpp
@@ -33,7 +33,7 @@ PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
                                                  const std::vector<ColumnOffset>& column_offsets,
                                                  const std::set<int>& needed_columns,
                                                  const int64_t buffer_size)
-    : schema_(std::move(schema)),
+    : schema_(schema),
       buffer_available_(buffer_size),
       memory_limit_(buffer_size),
       row_limit_(0),
@@ -74,9 +74,10 @@ arrow::Status PackedRecordBatchReader::advanceBuffer() {
   auto advance_row_group = [&](int i) -> int64_t {
     auto& reader = file_readers_[i];
     int rg = column_group_states_[i].row_group_offset + 1;
-    if (rg >= reader->parquet_reader()->metadata()->num_row_groups()) {
+    int num_row_groups = reader->parquet_reader()->metadata()->num_row_groups();
+    if (rg >= num_row_groups) {
       // No more row groups. It means we're done or there is an error.
-      LOG_STORAGE_DEBUG_ << "No more row groups in file " << i;
+      LOG_STORAGE_DEBUG_ << "No more row groups in file " << i << " total row groups " << num_row_groups;
       return -1;
     }
     int64_t rg_size = row_group_sizes_[i][rg];
@@ -109,8 +110,14 @@ arrow::Status PackedRecordBatchReader::advanceBuffer() {
       chunk_manager_->ResetChunkState(i);
     }
 
-  if (drained_index >= 0 && plan_buffer_size == 0) {
-    return arrow::Status::OK();
+  if (drained_index >= 0) {
+    if (plan_buffer_size == 0) {
+      // If nothing to fill, it must be done
+      return arrow::Status::OK();
+    } else {
+      // Otherwise, the rows are not match, there is something wrong with the files.
+      return arrow::Status::Invalid("File broken at index " + std::to_string(drained_index));
+    }
   }
 
   // Fill in tables if we have enough buffer size
Expand All @@ -134,7 +141,7 @@ arrow::Status PackedRecordBatchReader::advanceBuffer() {
continue;
}
read_count_++;
LOG_STORAGE_DEBUG_ << "File reader " << i << " read " << rgs_to_read[i].size() << " row groups";
LOG_STORAGE_DEBUG_ << "File reader " << i << " advanced to row group " << rgs_to_read[i].back();
column_group_states_[i].read_times++;
std::shared_ptr<arrow::Table> read_table = nullptr;
RETURN_NOT_OK(file_readers_[i]->ReadRowGroups(rgs_to_read[i], &read_table));
@@ -163,9 +170,9 @@ arrow::Status PackedRecordBatchReader::ReadNext(std::shared_ptr<arrow::RecordBat
 }
 
 arrow::Status PackedRecordBatchReader::Close() {
-  LOG_STORAGE_INFO_ << "PackedRecordBatchReader::Close(), total read " << read_count_ << " times";
+  LOG_STORAGE_DEBUG_ << "PackedRecordBatchReader::Close(), total read " << read_count_ << " times";
   for (int i = 0; i < column_group_states_.size(); ++i) {
-    LOG_STORAGE_INFO_ << "File reader " << i << " read " << column_group_states_[i].read_times << " times";
+    LOG_STORAGE_DEBUG_ << "File reader " << i << " read " << column_group_states_[i].read_times << " times";
   }
   read_count_ = 0;
   column_group_states_.clear();
2 changes: 1 addition & 1 deletion cpp/src/packed/splitter/size_based_splitter.cpp
@@ -67,7 +67,7 @@ std::vector<ColumnGroup> SizeBasedSplitter::SplitRecordBatches(
   for (GroupId group_id = 0; group_id < group_indices.size(); ++group_id) {
     auto batch = record->SelectColumns(group_indices[group_id]).ValueOrDie();
     if (column_groups.size() < group_indices.size()) {
-      column_groups.push_back(ColumnGroup(group_id, group_indices[group_id], batch));
+      column_groups.emplace_back(group_id, group_indices[group_id], batch);
     } else {
       column_groups[group_id].AddRecordBatch(batch);
     }
4 changes: 2 additions & 2 deletions cpp/src/packed/writer.cpp
@@ -45,7 +45,7 @@ PackedRecordBatchWriter::PackedRecordBatchWriter(size_t memory_limit,
 Status PackedRecordBatchWriter::Write(const std::shared_ptr<arrow::RecordBatch>& record) {
   size_t next_batch_size = GetRecordBatchMemorySize(record);
   if (next_batch_size > memory_limit_) {
-    return Status::InvalidArgument("Provided record batch size exceeds memory limit");
+    LOG_STORAGE_WARNING_ << "Batch size " << next_batch_size << " exceeds memory limit " << memory_limit_;
   }
   if (!size_split_done_) {
     if (current_memory_usage_ + next_batch_size < memory_limit_ / 2 || buffered_batches_.empty()) {
@@ -81,7 +81,7 @@ Status PackedRecordBatchWriter::splitAndWriteFirstBuffer() {
   splitter_ = IndicesBasedSplitter(group_indices);
 
   // check memory usage limit
-  size_t min_memory_limit = groups.size() * MIN_BUFFER_SIZE_PER_FILE;
+  size_t min_memory_limit = groups.size() * ARROW_PART_UPLOAD_SIZE;
   if (memory_limit_ < min_memory_limit) {
     return Status::InvalidArgument("Please provide at least " + std::to_string(min_memory_limit / 1024 / 1024) +
                                    " MB of memory for packed writer.");
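
For context on the last hunk: the writer's minimum-memory check now scales with ARROW_PART_UPLOAD_SIZE instead of MIN_BUFFER_SIZE_PER_FILE, which reads as giving each column-group file at least one upload part's worth of buffer. A rough, self-contained illustration of the arithmetic follows; the 10 MB part size is purely a placeholder, not the real constant defined in milvus-storage.

#include <cstddef>
#include <cstdio>

int main() {
  const std::size_t part_upload_size = 10 * 1024 * 1024;  // placeholder for ARROW_PART_UPLOAD_SIZE
  const std::size_t num_column_groups = 2;                 // e.g. one string group and one numeric group
  const std::size_t min_memory_limit = num_column_groups * part_upload_size;
  // A PackedRecordBatchWriter configured with a memory limit below this value is rejected
  // with InvalidArgument, reporting the minimum in MB (20 MB in this example).
  std::printf("minimum packed-writer memory: %zu MB\n", min_memory_limit / 1024 / 1024);
  return 0;
}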
