[C++][Parquet] Iterating over Parquet RecordBatchReader uses memory equivalent to whole file size #46935

Description

@adamreeve

When reading a Parquet file with FileReader::GetRecordBatchReader and default options, memory usage increases while iterating over batches until it reaches roughly the size of all the data in the file.

This may be expected behaviour, but it was quite surprising to me, so I want to open this issue to discuss whether it can be improved.

I have my test code in a branch on my fork: main...adamreeve:arrow:mem_use_test

Writing a test file with 100 row groups of 40 MB each (10 float columns × 1,000,000 rows × 4 bytes per value per row group; 4 GB total):

// Approximate includes and using-directives; the full test is in the linked branch.
#include <gtest/gtest.h>

#include "arrow/io/file.h"
#include "arrow/testing/util.h"  // ::arrow::random_real
#include "parquet/api/writer.h"
#include "parquet/exception.h"

using namespace parquet;  // NOLINT
using parquet::schema::NodePtr;
using parquet::schema::PrimitiveNode;

TEST(TestStreamFile, WriteFile) {
  const std::string file_path = "/tmp/stream_test.parquet";
  constexpr int64_t num_row_groups = 100;
  constexpr int64_t rows_per_row_group = 1000000;
  constexpr int64_t num_columns = 10;

  PARQUET_ASSIGN_OR_THROW(
      const std::shared_ptr<::arrow::io::FileOutputStream> file,
      ::arrow::io::FileOutputStream::Open(file_path));

  WriterProperties::Builder writer_properties_builder;
  auto writer_properties = writer_properties_builder.build();

  // Schema: num_columns required float columns named x0, x1, ...
  std::vector<NodePtr> fields;
  for (auto col_idx = 0; col_idx < num_columns; ++col_idx) {
    fields.push_back(PrimitiveNode::Make(
        "x" + std::to_string(col_idx), Repetition::REQUIRED, Type::FLOAT));
  }
  auto schema = std::dynamic_pointer_cast<schema::GroupNode>(
      schema::GroupNode::Make("root", Repetition::REQUIRED, fields));
  std::unique_ptr<ParquetFileWriter> writer =
      ParquetFileWriter::Open(file, schema, writer_properties, nullptr);

  // Write each row group column by column, refilling the buffer with
  // deterministic random data (seeded per column) before each write.
  std::vector<float> buffer(rows_per_row_group);
  for (auto row_group_idx = 0; row_group_idx < num_row_groups; ++row_group_idx) {
    auto row_group = writer->AppendRowGroup();
    for (auto col_idx = 0; col_idx < num_columns; ++col_idx) {
      ::arrow::random_real(rows_per_row_group, row_group_idx * num_columns + col_idx,
                           -1.0, 1.0, &buffer);
      auto column_writer = row_group->NextColumn();
      auto& float_column_writer = dynamic_cast<FloatWriter&>(*column_writer);
      float_column_writer.WriteBatch(rows_per_row_group, nullptr, nullptr,
                                     buffer.data());
    }
    row_group->Close();
  }

  writer->Close();
}

Reading the file:

// Approximate includes and using-directives; the full test is in the linked branch.
#include <algorithm>
#include <iostream>

#include <gtest/gtest.h>

#include "arrow/io/file.h"
#include "arrow/record_batch.h"
#include "arrow/util/io_util.h"  // ::arrow::internal::GetCurrentRSS
#include "parquet/arrow/reader.h"
#include "parquet/exception.h"
#include "parquet/properties.h"

using namespace parquet;         // NOLINT
using namespace parquet::arrow;  // FileReader, FileReaderBuilder

TEST(TestStreamFile, ReadFile) {
  const std::string file_path = "/tmp/stream_test.parquet";
  PARQUET_ASSIGN_OR_THROW(
      std::shared_ptr<::arrow::io::ReadableFile> input_file,
      ::arrow::io::ReadableFile::Open(file_path, ::arrow::default_memory_pool()));

  ReaderProperties reader_properties;
  ArrowReaderProperties arrow_reader_properties;
  // arrow_reader_properties.set_pre_buffer(false);

  FileReaderBuilder builder;
  PARQUET_THROW_NOT_OK(builder.Open(input_file, reader_properties));
  builder.properties(arrow_reader_properties);

  int batchesRead = 0;
  int64_t maxRss = 0;
  {
    std::unique_ptr<FileReader> reader;
    PARQUET_THROW_NOT_OK(builder.Build(&reader));

    PARQUET_ASSIGN_OR_THROW(
        std::shared_ptr<::arrow::RecordBatchReader> batch_reader,
        reader->GetRecordBatchReader());

    // Iterate over all batches, recording the process RSS after each one.
    while (true) {
      std::shared_ptr<::arrow::RecordBatch> batch;
      PARQUET_THROW_NOT_OK(batch_reader->ReadNext(&batch));
      if (batch == nullptr) {
        break;
      }
      int64_t rss = ::arrow::internal::GetCurrentRSS();
      std::cout << "Batch " << batchesRead << ", RSS = "
                << (rss / (double)(1024 * 1024)) << " MB" << std::endl;
      maxRss = std::max(maxRss, rss);
      batchesRead++;
    }
  }

  std::cout << "Read " << batchesRead << " batches" << std::endl;
  std::cout << "Max RSS = " << (maxRss / (double)(1024 * 1024)) << " MB" << std::endl;
}

When running the read test, the output looks like:

Batch 0, RSS = 1151.82 MB
Batch 1, RSS = 1151.82 MB
...
Batch 318, RSS = 1151.82 MB
Batch 319, RSS = 1151.82 MB
Batch 320, RSS = 2175.82 MB
Batch 321, RSS = 2175.82 MB
...
Batch 669, RSS = 2175.82 MB
Batch 670, RSS = 2175.82 MB
Batch 671, RSS = 3199.82 MB
Batch 672, RSS = 3199.82 MB
...
Batch 1005, RSS = 3199.82 MB
Batch 1006, RSS = 3199.82 MB
Batch 1007, RSS = 4223.82 MB
Batch 1008, RSS = 4223.82 MB
...
Batch 1340, RSS = 4223.82 MB
Batch 1341, RSS = 4223.82 MB
Batch 1342, RSS = 5247.82 MB
Batch 1343, RSS = 5247.82 MB
...
Batch 1524, RSS = 5247.82 MB
Batch 1525, RSS = 5247.82 MB
Read 1526 batches
Max RSS = 5247.82 MB

From some experimenting, I found that disabling pre-buffering (uncommenting the set_pre_buffer(false) line in the test above) greatly reduces memory use:

Batch 0, RSS = 1079.81 MB
Batch 1, RSS = 1079.8 MB
...
Batch 1524, RSS = 1079.8 MB
Batch 1525, RSS = 1079.8 MB
Read 1526 batches
Max RSS = 1079.81 MB
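
For reference, pre-buffering is controlled through ArrowReaderProperties. A short sketch of the two configurations compared above, plus a lazy-cache variant from arrow/io/caching.h that I have not tested:

// With default properties, pre-buffering is enabled and coalesced ranges
// covering whole row groups are cached up front.
ArrowReaderProperties arrow_reader_properties;

// The configuration that reduced memory use: read column chunks on demand.
arrow_reader_properties.set_pre_buffer(false);

// Untested alternative: keep pre-buffering but coalesce ranges lazily, so
// reads are only issued when a range is first requested (arrow/io/caching.h).
arrow_reader_properties.set_pre_buffer(true);
arrow_reader_properties.set_cache_options(::arrow::io::CacheOptions::LazyDefaults());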

This memory use still looked a bit high to me, but the max RSS reported by /usr/bin/time -v was a lot lower: about 94 MB without pre-buffering, compared to about 4.7 GB with pre-buffering.
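
One way to separate allocator retention from live Arrow allocations would be to also log the default memory pool's statistics next to the RSS inside the read loop; a minimal sketch, assuming all reads go through the default pool:

int64_t rss = ::arrow::internal::GetCurrentRSS();
auto* pool = ::arrow::default_memory_pool();
std::cout << "RSS = " << (rss / (double)(1024 * 1024)) << " MB"
          << ", pool bytes_allocated = " << pool->bytes_allocated()
          << ", pool max_memory = " << pool->max_memory()
          << " (" << pool->backend_name() << ")" << std::endl;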

From looking at the code, I can see there is a cache of futures of buffers in ReadRangeCache::Impl.

Unless I'm missing something, once a buffer is stored in this cache it is never removed, which would explain the memory usage behaviour. Should buffers be evicted from this cache once they've been read, to reduce memory usage?
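
To illustrate the kind of eviction I have in mind, here is a hypothetical standalone sketch; this is not Arrow's actual ReadRangeCache API, and EvictingRangeCache, Insert, and Take are all made up for illustration:

#include <cstdint>
#include <map>
#include <memory>
#include <tuple>
#include <utility>
#include <vector>

// Hypothetical evict-on-read cache keyed by (offset, length) ranges.
struct ReadRange {
  int64_t offset;
  int64_t length;
  bool operator<(const ReadRange& other) const {
    return std::tie(offset, length) < std::tie(other.offset, other.length);
  }
};

class EvictingRangeCache {
 public:
  void Insert(ReadRange range, std::shared_ptr<std::vector<uint8_t>> buffer) {
    entries_[range] = std::move(buffer);
  }

  // Hand the buffer to the caller and drop the cache's own reference, so the
  // memory can be reclaimed once the caller releases it.
  std::shared_ptr<std::vector<uint8_t>> Take(const ReadRange& range) {
    auto it = entries_.find(range);
    if (it == entries_.end()) {
      return nullptr;  // not cached; a real reader would fall back to an IO call
    }
    auto buffer = std::move(it->second);
    entries_.erase(it);  // the eviction step the current cache doesn't perform
    return buffer;
  }

 private:
  std::map<ReadRange, std::shared_ptr<std::vector<uint8_t>>> entries_;
};

One complication is that a single cached entry can be the coalesced result of several smaller requested ranges, so a real implementation would probably need to keep an entry alive until all of its sub-ranges have been consumed.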

Component(s)

C++, Parquet
