Skip to content

Commit

Permalink
avoid large overallocates in sync reader (#2511)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ted-Jiang authored Aug 19, 2022
1 parent 68934f0 commit b8c5a64
Showing 1 changed file with 27 additions and 3 deletions.
30 changes: 27 additions & 3 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use arrow::record_batch::{RecordBatch, RecordBatchReader};
use arrow::{array::StructArray, error::ArrowError};

use crate::arrow::array_reader::{
build_array_reader, ArrayReader, FileReaderRowGroupCollection,
build_array_reader, ArrayReader, FileReaderRowGroupCollection, RowGroupCollection,
};
use crate::arrow::schema::parquet_to_arrow_schema;
use crate::arrow::schema::parquet_to_arrow_schema_by_columns;
Expand Down Expand Up @@ -282,6 +282,8 @@ impl ArrowReader for ParquetFileArrowReader {
let array_reader =
build_array_reader(Arc::new(self.get_schema()?), mask, &self.file_reader)?;

// Try to avoid allocating a large buffer
let batch_size = self.file_reader.num_rows().min(batch_size);
Ok(ParquetRecordBatchReader::new(
batch_size,
array_reader,
Expand Down Expand Up @@ -404,6 +406,10 @@ impl<T: ChunkReader + 'static> ArrowReaderBuilder<SyncReader<T>> {
let mut filter = self.filter;
let mut selection = self.selection;

// Try to avoid allocating a large buffer
let batch_size = self
.batch_size
.min(self.metadata.file_metadata().num_rows() as usize);
if let Some(filter) = filter.as_mut() {
for predicate in filter.predicates.iter_mut() {
if !selects_any(selection.as_ref()) {
Expand All @@ -415,7 +421,7 @@ impl<T: ChunkReader + 'static> ArrowReaderBuilder<SyncReader<T>> {
build_array_reader(Arc::clone(&self.schema), projection, &reader)?;

selection = Some(evaluate_predicate(
self.batch_size,
batch_size,
array_reader,
selection,
predicate.as_mut(),
Expand All @@ -431,7 +437,7 @@ impl<T: ChunkReader + 'static> ArrowReaderBuilder<SyncReader<T>> {
}

Ok(ParquetRecordBatchReader::new(
self.batch_size,
batch_size,
array_reader,
selection,
))
Expand Down Expand Up @@ -2152,4 +2158,22 @@ mod tests {
.unwrap()
}
}

#[test]
fn test_batch_size_overallocate() {
    // `alltypes_plain.parquet` holds only 8 rows — far fewer than the
    // requested batch size — so the builder should clamp the reader's
    // batch size down to the file's row count instead of allocating
    // buffers for 1024 rows.
    let data_dir = arrow::util::test_util::parquet_test_data();
    let file_path = format!("{}/alltypes_plain.parquet", data_dir);
    let parquet_file = File::open(&file_path).unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new(parquet_file).unwrap();
    // Capture the row count before the builder is consumed below.
    let total_rows = builder.metadata.file_metadata().num_rows();

    let reader = builder
        .with_batch_size(1024)
        .with_projection(ProjectionMask::all())
        .build()
        .unwrap();

    // Sanity check: the requested batch size really is larger than the file.
    assert_ne!(1024, total_rows);
    // The effective batch size must have been clamped to the row count.
    assert_eq!(reader.batch_size, total_rows as usize);
}
}

0 comments on commit b8c5a64

Please sign in to comment.