diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs index efb1857be9b8f..4cf2b5f87d96b 100644 --- a/rust/datafusion/src/datasource/parquet.rs +++ b/rust/datafusion/src/datasource/parquet.rs @@ -42,7 +42,7 @@ pub struct ParquetTable { impl ParquetTable { pub fn try_new(filename: &str) -> Result { let file = File::open(filename)?; - let parquet_file = ParquetFile::open(file, None)?; + let parquet_file = ParquetFile::open(file, None, 0)?; let schema = parquet_file.projection_schema.clone(); Ok(Self { filename: filename.to_string(), @@ -59,10 +59,10 @@ impl Table for ParquetTable { fn scan( &self, projection: &Option>, - _batch_size: usize, + batch_size: usize, ) -> Result> { let file = File::open(self.filename.clone())?; - let parquet_file = ParquetFile::open(file, projection.clone())?; + let parquet_file = ParquetFile::open(file, projection.clone(), batch_size)?; Ok(vec![Arc::new(Mutex::new(parquet_file))]) } } @@ -172,7 +172,11 @@ where } impl ParquetFile { - pub fn open(file: File, projection: Option>) -> Result { + pub fn open( + file: File, + projection: Option>, + batch_size: usize, + ) -> Result { let reader = SerializedFileReader::new(file)?; let metadata = reader.metadata(); @@ -215,7 +219,7 @@ impl ParquetFile { row_group_index: 0, projection_schema: projected_schema, projection, - batch_size: 64 * 1024, + batch_size, current_row_group: None, column_readers: vec![], }) @@ -225,6 +229,7 @@ impl ParquetFile { if self.row_group_index < self.reader.num_row_groups() { let reader = self.reader.get_row_group(self.row_group_index)?; + self.column_readers.clear(); self.column_readers = Vec::with_capacity(self.projection.len()); for i in 0..self.projection.len() {