diff --git a/cpp/benchmark/benchmark_packed.cpp b/cpp/benchmark/benchmark_packed.cpp
index f252f78..b391ef9 100644
--- a/cpp/benchmark/benchmark_packed.cpp
+++ b/cpp/benchmark/benchmark_packed.cpp
@@ -22,7 +22,6 @@
 #include
 #include
 #include
-#include
 #include
 #include
 #include
@@ -32,8 +31,12 @@
 #include
 #include "filesystem/fs.h"
 
-namespace milvus_storage {
+#define SKIP_IF_NOT_OK(status, st)         \
+  if (!status.ok()) {                      \
+    st.SkipWithError(status.ToString());   \
+  }
 
+namespace milvus_storage {
 // Environment variables to configure the S3 test environment
 static const char* kEnvAccessKey = "ACCESS_KEY";
 static const char* kEnvSecretKey = "SECRET_KEY";
@@ -75,22 +78,23 @@ static void PackedRead(benchmark::State& st, arrow::fs::FileSystem* fs, const st
   // after writing, the column of large_str is in 0th file, and the last int64 columns are in 1st file
   std::vector<std::shared_ptr<arrow::Field>> fields = {
-      arrow::field("str", arrow::utf8()),
-      arrow::field("int32", arrow::int32()),
-      arrow::field("int64", arrow::int64()),
+      arrow::field("int", arrow::utf8()),
+      arrow::field("int64", arrow::int32()),
+      arrow::field("str", arrow::int64()),
   };
   auto schema = arrow::schema(fields);
 
   for (auto _ : st) {
     PackedRecordBatchReader pr(*fs, paths, schema, column_offsets, needed_columns, buffer_size);
-    auto r = pr.ToTable();
-    if (!r.ok()) {
-      st.SkipWithError(r.status().ToString());
-    }
-    auto table = r.ValueOrDie();
-    r = pr.Close();
-    if (!r.ok()) {
-      st.SkipWithError(r.status().ToString());
+    auto r = arrow::RecordBatch::MakeEmpty(schema);
+    SKIP_IF_NOT_OK(r.status(), st)
+    auto rb = r.ValueOrDie();
+    while (true) {
+      SKIP_IF_NOT_OK(pr.ReadNext(&rb), st);
+      if (rb == nullptr || rb->num_rows() == 0) {
+        SKIP_IF_NOT_OK(pr.Close(), st)
+        break;
+      }
     }
   }
 }
@@ -102,28 +106,28 @@ static void PackedWrite(benchmark::State& st, arrow::fs::FileSystem* fs, const s
   arrow::Int64Builder int64_builder;
   arrow::StringBuilder str_builder;
-  int_builder.AppendValues({1, 2, 3});
-  int64_builder.AppendValues({4, 5, 6});
-  str_builder.AppendValues({"foo", "bar", "baz"});
+  SKIP_IF_NOT_OK(int_builder.AppendValues({1, 2, 3}), st);
+  SKIP_IF_NOT_OK(int64_builder.AppendValues({4, 5, 6}), st);
+  SKIP_IF_NOT_OK(str_builder.AppendValues({std::string(1024, 'b'), std::string(1024, 'a'), std::string(1024, 'z')}),
+                 st);
 
   std::shared_ptr<arrow::Array> int_array;
   std::shared_ptr<arrow::Array> int64_array;
   std::shared_ptr<arrow::Array> str_array;
-  int_builder.Finish(&int_array);
-  int64_builder.Finish(&int64_array);
-  str_builder.Finish(&str_array);
+  SKIP_IF_NOT_OK(int_builder.Finish(&int_array), st);
+  SKIP_IF_NOT_OK(int64_builder.Finish(&int64_array), st);
+  SKIP_IF_NOT_OK(str_builder.Finish(&str_array), st);
 
   std::vector<std::shared_ptr<arrow::Array>> arrays = {int_array, int64_array, str_array};
   auto record_batch = arrow::RecordBatch::Make(schema, 3, arrays);
 
   for (auto _ : st) {
     PackedRecordBatchWriter writer(buffer_size, schema, *fs, path, *parquet::default_writer_properties());
-    for (int i = 0; i < 500000; ++i) {
+    for (int i = 0; i < 8 * 1024; ++i) {
       auto r = writer.Write(record_batch);
       if (!r.ok()) {
         st.SkipWithError(r.ToString());
-        std::cerr << "exit on iteration " << i << std::endl;
         break;
       }
     }
@@ -134,14 +138,15 @@ static void PackedWrite(benchmark::State& st, arrow::fs::FileSystem* fs, const s
   }
 }
 
+std::string PATH = "/tmp/bench/foo";
+
 BENCHMARK_DEFINE_F(S3Fixture, Write32MB)(benchmark::State& st) {
-  PackedWrite(st, fs_.get(), "/tmp/bench/foo", 12 * 1024 * 1024);
+  SKIP_IF_NOT_OK(fs_->CreateDir(PATH), st);
+  PackedWrite(st, fs_.get(), PATH, 22 * 1024 * 1024);
 }
 BENCHMARK_REGISTER_F(S3Fixture, Write32MB)->UseRealTime();
 
-BENCHMARK_DEFINE_F(S3Fixture, Read32MB)(benchmark::State& st) {
-  PackedRead(st, fs_.get(), "/tmp/bench/foo", 12 * 1024 * 1024);
-}
+BENCHMARK_DEFINE_F(S3Fixture, Read32MB)(benchmark::State& st) { PackedRead(st, fs_.get(), PATH, 22 * 1024 * 1024); }
 BENCHMARK_REGISTER_F(S3Fixture, Read32MB)->UseRealTime();
 
 }  // namespace milvus_storage
\ No newline at end of file
diff --git a/cpp/include/milvus-storage/packed/column_group_writer.h b/cpp/include/milvus-storage/packed/column_group_writer.h
index 478fefe..2794d7b 100644
--- a/cpp/include/milvus-storage/packed/column_group_writer.h
+++ b/cpp/include/milvus-storage/packed/column_group_writer.h
@@ -30,14 +30,14 @@ class ColumnGroupWriter {
                     std::shared_ptr<arrow::Schema> schema,
                     arrow::fs::FileSystem& fs,
                     const std::string& file_path,
-                    const std::vector<int> origin_column_indices);
+                    const std::vector<int>& origin_column_indices);
 
   ColumnGroupWriter(GroupId group_id,
                     std::shared_ptr<arrow::Schema> schema,
                     arrow::fs::FileSystem& fs,
                     const std::string& file_path,
                     const parquet::WriterProperties& props,
-                    const std::vector<int> origin_column_indices);
+                    const std::vector<int>& origin_column_indices);
 
   Status Init();
   Status Write(const std::shared_ptr<arrow::RecordBatch>& record);
diff --git a/cpp/src/format/parquet/file_writer.cpp b/cpp/src/format/parquet/file_writer.cpp
index 72e1898..670d228 100644
--- a/cpp/src/format/parquet/file_writer.cpp
+++ b/cpp/src/format/parquet/file_writer.cpp
@@ -37,7 +37,7 @@ ParquetFileWriter::ParquetFileWriter(std::shared_ptr<arrow::Schema> schema,
                                      arrow::fs::FileSystem& fs,
                                      const std::string& file_path,
                                      const parquet::WriterProperties& props)
-    : schema_(std::move(schema)), fs_(fs), file_path_(file_path), props_(props), count_(0) {}
+    : schema_(std::move(schema)), fs_(fs), file_path_(file_path), props_(props) {}
 
 Status ParquetFileWriter::Init() {
   auto coln = schema_->num_fields();
diff --git a/cpp/src/packed/column_group_writer.cpp b/cpp/src/packed/column_group_writer.cpp
index ff07321..609842a 100644
--- a/cpp/src/packed/column_group_writer.cpp
+++ b/cpp/src/packed/column_group_writer.cpp
@@ -16,6 +16,8 @@
 #include
 #include
 #include
+
+#include
 #include "common/log.h"
 #include "common/status.h"
 #include "format/parquet/file_writer.h"
@@ -27,9 +29,9 @@ ColumnGroupWriter::ColumnGroupWriter(GroupId group_id,
                                      std::shared_ptr<arrow::Schema> schema,
                                      arrow::fs::FileSystem& fs,
                                      const std::string& file_path,
-                                     const std::vector<int> origin_column_indices)
+                                     const std::vector<int>& origin_column_indices)
     : group_id_(group_id),
-      writer_(schema, fs, file_path),
+      writer_(std::move(schema), fs, file_path),
       column_group_(group_id, origin_column_indices),
       finished_(false) {}
 
@@ -38,9 +40,9 @@ ColumnGroupWriter::ColumnGroupWriter(GroupId group_id,
                                      arrow::fs::FileSystem& fs,
                                      const std::string& file_path,
                                      const parquet::WriterProperties& props,
-                                     const std::vector<int> origin_column_indices)
+                                     const std::vector<int>& origin_column_indices)
     : group_id_(group_id),
-      writer_(schema, fs, file_path, props),
+      writer_(std::move(schema), fs, file_path, props),
      column_group_(group_id, origin_column_indices),
       flushed_batches_(0),
       flushed_rows_(0),
@@ -73,8 +75,8 @@ Status ColumnGroupWriter::Flush() {
 
 Status ColumnGroupWriter::Close() {
   finished_ = true;
-  LOG_STORAGE_INFO_ << "Group " << group_id_ << " flushed " << flushed_batches_ << " batches and " << flushed_rows_
-                    << " rows in " << flushed_count_ << " flushes";
+  LOG_STORAGE_DEBUG_ << "Group " << group_id_ << " flushed " << flushed_batches_ << " batches and " << flushed_rows_
+                     << " rows in " << flushed_count_ << " flushes";
   return writer_.Close();
 }
 
diff --git a/cpp/src/packed/reader.cpp b/cpp/src/packed/reader.cpp
index 4b833f8..6619a5c 100644
--- a/cpp/src/packed/reader.cpp
+++ b/cpp/src/packed/reader.cpp
@@ -33,7 +33,7 @@ PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
                                                  const std::vector<ColumnOffset>& column_offsets,
                                                  const std::set<int>& needed_columns,
                                                  const int64_t buffer_size)
-    : schema_(std::move(schema)),
+    : schema_(schema),
       buffer_available_(buffer_size),
       memory_limit_(buffer_size),
       row_limit_(0),
@@ -74,9 +74,10 @@ arrow::Status PackedRecordBatchReader::advanceBuffer() {
   auto advance_row_group = [&](int i) -> int64_t {
     auto& reader = file_readers_[i];
     int rg = column_group_states_[i].row_group_offset + 1;
-    if (rg >= reader->parquet_reader()->metadata()->num_row_groups()) {
+    int num_row_groups = reader->parquet_reader()->metadata()->num_row_groups();
+    if (rg >= num_row_groups) {
       // No more row groups. It means we're done or there is an error.
-      LOG_STORAGE_DEBUG_ << "No more row groups in file " << i;
+      LOG_STORAGE_DEBUG_ << "No more row groups in file " << i << " total row groups " << num_row_groups;
       return -1;
     }
     int64_t rg_size = row_group_sizes_[i][rg];
@@ -109,8 +110,14 @@ arrow::Status PackedRecordBatchReader::advanceBuffer() {
       chunk_manager_->ResetChunkState(i);
     }
 
-  if (drained_index >= 0 && plan_buffer_size == 0) {
-    return arrow::Status::OK();
+  if (drained_index >= 0) {
+    if (plan_buffer_size == 0) {
+      // If there is nothing left to fill, we are done.
+      return arrow::Status::OK();
+    } else {
+      // Otherwise, the row counts do not match; something is wrong with the files.
+      return arrow::Status::Invalid("File broken at index " + std::to_string(drained_index));
+    }
   }
 
   // Fill in tables if we have enough buffer size
@@ -134,7 +141,7 @@ arrow::Status PackedRecordBatchReader::advanceBuffer() {
       continue;
     }
     read_count_++;
-    LOG_STORAGE_DEBUG_ << "File reader " << i << " read " << rgs_to_read[i].size() << " row groups";
+    LOG_STORAGE_DEBUG_ << "File reader " << i << " advanced to row group " << rgs_to_read[i].back();
     column_group_states_[i].read_times++;
     std::shared_ptr<arrow::Table> read_table = nullptr;
     RETURN_NOT_OK(file_readers_[i]->ReadRowGroups(rgs_to_read[i], &read_table));
@@ -163,9 +170,9 @@ arrow::Status PackedRecordBatchReader::ReadNext(std::shared_ptr
diff --git a/cpp/src/packed/splitter/size_based_splitter.cpp b/cpp/src/packed/splitter/size_based_splitter.cpp
@@ ... @@ std::vector<ColumnGroup> SizeBasedSplitter::SplitRecordBatches(
   for (GroupId group_id = 0; group_id < group_indices.size(); ++group_id) {
     auto batch = record->SelectColumns(group_indices[group_id]).ValueOrDie();
     if (column_groups.size() < group_indices.size()) {
-      column_groups.push_back(ColumnGroup(group_id, group_indices[group_id], batch));
+      column_groups.emplace_back(group_id, group_indices[group_id], batch);
     } else {
       column_groups[group_id].AddRecordBatch(batch);
     }
diff --git a/cpp/src/packed/writer.cpp b/cpp/src/packed/writer.cpp
index 993fbd8..a93de71 100644
--- a/cpp/src/packed/writer.cpp
+++ b/cpp/src/packed/writer.cpp
@@ -45,7 +45,7 @@ PackedRecordBatchWriter::PackedRecordBatchWriter(size_t memory_limit,
 Status PackedRecordBatchWriter::Write(const std::shared_ptr<arrow::RecordBatch>& record) {
   size_t next_batch_size = GetRecordBatchMemorySize(record);
   if (next_batch_size > memory_limit_) {
-    return Status::InvalidArgument("Provided record batch size exceeds memory limit");
+    LOG_STORAGE_WARNING_ << "Batch size " << next_batch_size << " exceeds memory limit " << memory_limit_;
   }
   if (!size_split_done_) {
     if (current_memory_usage_ + next_batch_size < memory_limit_ / 2 || buffered_batches_.empty()) {
@@ -81,7 +81,7 @@ Status PackedRecordBatchWriter::splitAndWriteFirstBuffer() {
   splitter_ = IndicesBasedSplitter(group_indices);
 
   // check memory usage limit
-  size_t min_memory_limit = groups.size() * MIN_BUFFER_SIZE_PER_FILE;
+  size_t min_memory_limit = groups.size() * ARROW_PART_UPLOAD_SIZE;
   if (memory_limit_ < min_memory_limit) {
     return Status::InvalidArgument("Please provide at least " + std::to_string(min_memory_limit / 1024 / 1024) +
                                    " MB of memory for packed writer.");