Skip to content


clean up reader and writer (#130)
Browse files Browse the repository at this point in the history
issue: #127

Signed-off-by: shaoting-huang <>
  • Loading branch information
shaoting-huang authored Aug 8, 2024
1 parent e48b9c6 commit 7fa7dba
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 220 deletions.
95 changes: 65 additions & 30 deletions cpp/include/milvus-storage/packed/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,61 +18,96 @@
#include <arrow/filesystem/filesystem.h>
#include <arrow/record_batch.h>
#include <cstddef>
#include <set>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>
#include "storage/options.h"
#include <queue>

namespace milvus_storage {

// Identifies one column by the file it is stored in and its position there.
struct ColumnOffset {
  // Index of the file (and of the matching in-memory table) holding the column.
  int file_index;
  // Index of the column inside that file's table.
  int column_index;

  ColumnOffset(int file_index, int column_index) : file_index(file_index), column_index(column_index) {}
};

// Ordering used by ColumnOffsetMinHeap.
//
// std::priority_queue keeps the element that compares *greatest* on top, so the
// comparison has to be inverted (a > b) for top() to yield the entry with the
// smallest column_index. Using `<` here would silently produce a max-heap.
struct ColumnOffsetComparator {
  bool operator()(const ColumnOffset& a, const ColumnOffset& b) const { return a.column_index > b.column_index; }
};

// Min-heap of ColumnOffset entries keyed on column_index: top() is the smallest.
using ColumnOffsetMinHeap = std::priority_queue<ColumnOffset, std::vector<ColumnOffset>, ColumnOffsetComparator>;

// Per-file read progress tracked by the packed reader.
struct TableState {
  // Running total of rows accumulated via addRowOffset().
  int64_t row_offset;
  // Index of the most recently selected row group; the reader initialises it
  // to -1 and uses `row_group_offset + 1` as the next row group to read.
  int64_t row_group_offset;
  // Running total of bytes accumulated via addMemorySize().
  int64_t memory_size;

  TableState(int64_t row_offset, int64_t row_group_offset, int64_t memory_size)
      : row_offset(row_offset), row_group_offset(row_group_offset), memory_size(memory_size) {}

  // Adds `row_offset` rows to the running row total.
  void addRowOffset(int64_t row_offset) { this->row_offset += row_offset; }

  // Overwrites the current row group index.
  void setRowGroupOffset(int64_t row_group_offset) { this->row_group_offset = row_group_offset; }

  // Adds `memory_size` bytes to the running memory total.
  void addMemorySize(int64_t memory_size) { this->memory_size += memory_size; }

  // Clears the running memory total; row counters are left untouched.
  void resetMemorySize() { this->memory_size = 0; }
};

struct ChunkState {
int count;
int64_t offset;

ChunkState(int count, int64_t offset) : count(count), offset(offset) {}

void reset() {

void resetOffset() { this->offset = 0; }

void addOffset(int64_t offset) { this->offset += offset; }

void resetCount() { this->count = 0; }

void addCount(int count) { this->count += count; }

// Row count per batch used by default when reading through ::arrow::RecordBatchReader.
static constexpr int64_t DefaultBatchSize = 1024;
// Default buffer budget in bytes: 16 MiB (16 * 1024 * 1024).
static constexpr int64_t DefaultBufferSize = int64_t{16} << 20;

class PackedRecordBatchReader : public arrow::RecordBatchReader {
PackedRecordBatchReader(arrow::fs::FileSystem& fs,
std::vector<std::string>& paths,
std::shared_ptr<arrow::Schema> schema,
std::vector<std::pair<int, int>>& column_offsets,
std::vector<int>& needed_columns,
int64_t buffer_size = DefaultBufferSize);
const std::vector<std::string>& paths,
const std::shared_ptr<arrow::Schema> schema,
const std::vector<ColumnOffset>& column_offsets,
const std::vector<int>& needed_columns,
const int64_t buffer_size = DefaultBufferSize);

std::shared_ptr<arrow::Schema> schema() const override;

arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) override;

// Advance buffer to fill the expected buffer size
arrow::Status advance_buffer();
// Open file readers
arrow::Status openInternal();
arrow::Status advanceBuffer();

size_t buffer_size_;
size_t buffer_available_;
const std::shared_ptr<arrow::Schema> schema_;

// Files
arrow::fs::FileSystem& fs_;
std::vector<std::string>& paths_;
std::set<int> needed_path_indices_;
std::shared_ptr<arrow::Schema> schema_;
size_t buffer_available_;
std::vector<std::unique_ptr<parquet::arrow::FileReader>> file_readers_;
std::vector<std::pair<int, int>> needed_column_offsets_;
std::vector<int> needed_columns_;

// Internal table states
std::vector<ColumnOffset> needed_column_offsets_;
std::vector<std::shared_ptr<arrow::Table>> tables_;
std::vector<TableState> table_states_;
int64_t limit_;
std::vector<int64_t> row_offsets_;
std::vector<int> row_group_offsets_;
std::vector<int64_t> table_memory_sizes_;

// Internal chunking states
std::vector<int> chunk_numbers_;
std::vector<int64_t> chunk_offsets_;
std::vector<ChunkState> chunk_states_;
int64_t absolute_row_position_;
ColumnOffsetMinHeap sorted_offsets_;

} // namespace milvus_storage
162 changes: 63 additions & 99 deletions cpp/src/packed/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,77 +17,58 @@
#include <arrow/status.h>
#include <arrow/table.h>
#include <parquet/properties.h>
#include <iostream>
#include <queue>
#include <utility>
#include "common/arrow_util.h"
#include "test_util.h"

using parquet::arrow::FileReader;

namespace milvus_storage {

PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
std::vector<std::string>& paths,
std::shared_ptr<arrow::Schema> schema,
std::vector<std::pair<int, int>>& column_offsets,
std::vector<int>& needed_columns,
int64_t buffer_size)
: fs_(fs), paths_(paths), schema_(std::move(schema)), needed_columns_(needed_columns), buffer_size_(buffer_size) {
buffer_available_ = buffer_size_;

for (auto i : needed_columns) {
const std::vector<std::string>& paths,
const std::shared_ptr<arrow::Schema> schema,
const std::vector<ColumnOffset>& column_offsets,
const std::vector<int>& needed_columns,
const int64_t buffer_size)
: schema_(std::move(schema)), buffer_available_(buffer_size), limit_(0), absolute_row_position_(0) {
std::set<int> needed_path_indices;
for (int i : needed_columns) {

// Returns the schema held in schema_.
std::shared_ptr<arrow::Schema> PackedRecordBatchReader::schema() const {
  return schema_;
}

arrow::Status PackedRecordBatchReader::openInternal() {
// auto read_properties = parquet::default_arrow_reader_properties();
// read_properties.set_pre_buffer(true);
for (int i = 0; i < paths_.size(); i++) {
// auto res = MakeArrowFileReader(fs_, path, read_properties);
if (needed_path_indices_.find(i) == needed_path_indices_.end()) {
for (int i = 0; i < paths.size(); i++) {
if (needed_path_indices.find(i) == needed_path_indices.end()) {
auto res = MakeArrowFileReader(fs_, paths_[i]); // PreBuffer is turned on by default
if (!res.ok()) {
throw std::runtime_error(res.status().ToString());
// PreBuffer is turned on by default
auto result = MakeArrowFileReader(fs, paths[i]);
if (!result.ok()) {
throw std::runtime_error(result.status().ToString());
row_group_offsets_ = std::vector<int>(file_readers_.size(), -1);
row_offsets_ = std::vector<int64_t>(file_readers_.size(), 0);
table_memory_sizes_ = std::vector<int64_t>(file_readers_.size(), 0);
tables_ = std::vector<std::shared_ptr<arrow::Table>>(
paths_.size(), nullptr); // tables are referrenced by column_offsets, so it's size is of paths_'s size.
limit_ = 0;
absolute_row_position_ = 0;
chunk_numbers_ = std::vector<int>(needed_columns_.size(), 0);
chunk_offsets_ = std::vector<int64_t>(needed_columns_.size(), 0);
return arrow::Status::OK();

// Initialize table states and chunk states
table_states_.resize(file_readers_.size(), TableState(0, -1, 0));
// Tables are referenced by column_offsets, so the vector's size matches the size of paths.
tables_.resize(paths.size(), nullptr);
chunk_states_.resize(needed_column_offsets_.size(), ChunkState(0, 0));

arrow::Status PackedRecordBatchReader::advance_buffer() {
if (file_readers_.empty()) {
// Returns the schema held in schema_.
std::shared_ptr<arrow::Schema> PackedRecordBatchReader::schema() const {
  return schema_;
}

auto rgs_to_read = std::vector<std::vector<int>>(file_readers_.size(), std::vector<int>());
arrow::Status PackedRecordBatchReader::advanceBuffer() {
std::vector<std::vector<int>> rgs_to_read(file_readers_.size());
size_t plan_buffer_size = 0;

auto advance_row_group = [&](int i) {
auto& reader = file_readers_[i];
int rg = row_group_offsets_[i] + 1;
int rg = table_states_[i].row_group_offset + 1;
if (rg < reader->parquet_reader()->metadata()->num_row_groups()) {
int64_t rg_size = reader->parquet_reader()->metadata()->RowGroup(rg)->total_byte_size();
plan_buffer_size += rg_size;
table_memory_sizes_[i] += rg_size;
row_group_offsets_[i] = rg;
row_offsets_[i] += reader->parquet_reader()->metadata()->RowGroup(rg)->num_rows();
return rg;
// No more row groups. It means we're done or there is an error.
Expand All @@ -97,52 +78,47 @@ arrow::Status PackedRecordBatchReader::advance_buffer() {
// Fill in tables that have no rows available
int drained_index = -1;
for (int i = 0; i < file_readers_.size(); ++i) {
if (row_offsets_[i] > limit_) {
if (table_states_[i].row_offset > limit_) {
buffer_available_ += table_memory_sizes_[i]; // Release memory
table_memory_sizes_[i] = 0;
auto rg = advance_row_group(i);
buffer_available_ += table_states_[i].memory_size;
int rg = advance_row_group(i);
if (rg < 0) {
drained_index = i;
// TODO: reset chunk_numbers_
for (int j = 0; j < needed_column_offsets_.size(); ++j) {
if (needed_column_offsets_[j].first == i) {
chunk_numbers_[j] = 0;
chunk_offsets_[j] = 0;
if (needed_column_offsets_[j].file_index == i) {
if (drained_index >= 0) {
if (plan_buffer_size == 0) {
return arrow::Status::OK();
if (drained_index >= 0 && plan_buffer_size == 0) {
return arrow::Status::OK();

// Fill in tables if we have enough buffer size
// find the lowest offset table and advance it
auto second_comp = [](const std::pair<int, int>& a, const std::pair<int, int>& b) { return a.second < b.second; };
std::priority_queue<std::pair<int, int>, std::vector<std::pair<int, int>>, decltype(second_comp)> sorted_offsets(
for (int i = 0; i < row_offsets_.size(); ++i) {
sorted_offsets.emplace(i, row_offsets_[i]);
ColumnOffsetMinHeap sorted_offsets;
for (int i = 0; i < table_states_.size(); ++i) {
sorted_offsets.emplace(i, table_states_[i].row_offset);
while (true) {
auto lowest_offset =;
int i = lowest_offset.first;
int rg = row_group_offsets_[i] + 1;
auto& reader = file_readers_[i];
ColumnOffset lowest_offset =;
int file_index = lowest_offset.file_index;
int rg = table_states_[file_index].row_group_offset + 1;
auto& reader = file_readers_[file_index];
if (rg < reader->parquet_reader()->metadata()->num_row_groups()) {
int64_t size_in_plan = reader->parquet_reader()->metadata()->RowGroup(rg)->total_byte_size();
if (plan_buffer_size + size_in_plan < buffer_available_) {
int rg = advance_row_group(i);
int rg = advance_row_group(file_index);
if (rg < 0) {
sorted_offsets.emplace(i, row_offsets_[i]);
sorted_offsets.emplace(file_index, table_states_[file_index].row_offset);
Expand All @@ -151,25 +127,20 @@ arrow::Status PackedRecordBatchReader::advance_buffer() {

// Conduct read and update buffer size
for (int i = 0; i < file_readers_.size(); ++i) {
auto& reader = file_readers_[i];
RETURN_NOT_OK(reader->ReadRowGroups(rgs_to_read[i], &tables_[i]));
RETURN_NOT_OK(file_readers_[i]->ReadRowGroups(rgs_to_read[i], &tables_[i]));
buffer_available_ -= plan_buffer_size;
limit_ =;
limit_ =;

return arrow::Status::OK();

arrow::Status PackedRecordBatchReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* out) {
if (absolute_row_position_ >= limit_) {
arrow::Status st = advance_buffer();
if (!st.ok()) {
return st;
} else {
if (absolute_row_position_ >= limit_) {
*out = nullptr;
return arrow::Status::OK();
if (absolute_row_position_ >= limit_) {
*out = nullptr;
return arrow::Status::OK();

Expand All @@ -178,12 +149,11 @@ arrow::Status PackedRecordBatchReader::ReadNext(std::shared_ptr<arrow::RecordBat
std::vector<const arrow::Array*> chunks(needed_column_offsets_.size());

for (int i = 0; i < needed_column_offsets_.size(); ++i) {
auto offset = needed_column_offsets_[i];
auto table = tables_[offset.first];
auto column = table->column(offset.second);
int column_index = needed_column_offsets_[i].column_index;
auto column = tables_[needed_column_offsets_[i].file_index]->column(column_index);

auto chunk = column->chunk(chunk_numbers_[i]).get();
int64_t chunk_remaining = chunk->length() - chunk_offsets_[i];
auto chunk = column->chunk(chunk_states_[i].count).get();
int64_t chunk_remaining = chunk->length() - chunk_states_[i].offset;

if (chunk_remaining < chunksize) {
chunksize = chunk_remaining;
Expand All @@ -197,21 +167,15 @@ arrow::Status PackedRecordBatchReader::ReadNext(std::shared_ptr<arrow::RecordBat

for (int i = 0; i < needed_column_offsets_.size(); ++i) {
// Exhausted chunk
const arrow::Array* chunk = chunks[i];
const int64_t offset = chunk_offsets_[i];
auto chunk = chunks[i];
auto offset = chunk_states_[i].offset;
std::shared_ptr<arrow::ArrayData> slice_data;
if ((chunk->length() - offset) == chunksize) {
chunk_offsets_[i] = 0;
if (offset > 0) {
// Need to slice
slice_data = chunk->Slice(offset, chunksize)->data();
} else {
// No slice
slice_data = chunk->data();
if (chunk->length() - offset == chunksize) {
slice_data = (offset > 0) ? chunk->Slice(offset, chunksize)->data() : chunk->data();
} else {
chunk_offsets_[i] += chunksize;
slice_data = chunk->Slice(offset, chunksize)->data();
batch_data[i] = std::move(slice_data);
Expand Down

0 comments on commit 7fa7dba

Please sign in to comment.