Skip to content

Commit b109201

Browse files
authored
Support empty projection in ParquetRecordBatchReader (#1560)
* Support empty projection in ParquetRecordBatchReader * Fix async reader * Fix RAT
1 parent 0192872 commit b109201

File tree

5 files changed

+122
-29
lines changed

5 files changed

+122
-29
lines changed

parquet/src/arrow/array_reader.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ mod builder;
5555
mod byte_array;
5656
mod byte_array_dictionary;
5757
mod dictionary_buffer;
58+
mod empty_array;
5859
mod offset_buffer;
5960

6061
#[cfg(test)]
@@ -97,6 +98,9 @@ pub trait RowGroupCollection {
9798
/// Get schema of parquet file.
9899
fn schema(&self) -> Result<SchemaDescPtr>;
99100

101+
/// Get the number of rows in this collection
102+
fn num_rows(&self) -> usize;
103+
100104
/// Returns an iterator over the column chunks for particular column
101105
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;
102106
}
@@ -106,6 +110,10 @@ impl RowGroupCollection for Arc<dyn FileReader> {
106110
Ok(self.metadata().file_metadata().schema_descr_ptr())
107111
}
108112

113+
fn num_rows(&self) -> usize {
114+
self.metadata().file_metadata().num_rows() as usize
115+
}
116+
109117
fn column_chunks(&self, column_index: usize) -> Result<Box<dyn PageIterator>> {
110118
let iterator = FilePageIterator::new(column_index, Arc::clone(self))?;
111119
Ok(Box::new(iterator))

parquet/src/arrow/array_reader/builder.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::sync::Arc;
2020

2121
use arrow::datatypes::{DataType as ArrowType, Field, IntervalUnit, Schema, SchemaRef};
2222

23+
use crate::arrow::array_reader::empty_array::make_empty_array_reader;
2324
use crate::arrow::array_reader::{
2425
make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader,
2526
ComplexObjectArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
@@ -37,7 +38,7 @@ use crate::data_type::{
3738
Int96Type,
3839
};
3940
use crate::errors::ParquetError::ArrowError;
40-
use crate::errors::{ParquetError, Result};
41+
use crate::errors::{Result};
4142
use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr};
4243
use crate::schema::visitor::TypeVisitor;
4344

@@ -64,10 +65,6 @@ where
6465
filtered_root_names.insert(root.name().to_string());
6566
}
6667

67-
if leaves.is_empty() {
68-
return Err(general_err!("Can't build array reader without columns!"));
69-
}
70-
7168
// Only pass root fields that take part in the projection
7269
// to avoid traversal of columns that are not read.
7370
// TODO: also prune unread parts of the tree in child structures
@@ -412,10 +409,10 @@ impl<'a> ArrayReaderBuilder {
412409
fn build_array_reader(&mut self) -> Result<Box<dyn ArrayReader>> {
413410
let context = ArrayReaderBuilderContext::default();
414411

415-
self.visit_struct(self.root_schema.clone(), &context)
416-
.and_then(|reader_opt| {
417-
reader_opt.ok_or_else(|| general_err!("Failed to build array reader!"))
418-
})
412+
match self.visit_struct(self.root_schema.clone(), &context)? {
413+
Some(reader) => Ok(reader),
414+
None => Ok(make_empty_array_reader(self.row_groups.num_rows())),
415+
}
419416
}
420417

421418
// Utility functions
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::arrow::array_reader::ArrayReader;
19+
use crate::errors::Result;
20+
use arrow::array::{ArrayDataBuilder, ArrayRef, StructArray};
21+
use arrow::datatypes::DataType as ArrowType;
22+
use std::any::Any;
23+
use std::sync::Arc;
24+
25+
/// Returns an [`ArrayReader`] that yields [`StructArray`] with no columns
26+
/// but with row counts that correspond to the amount of data in the file
27+
///
28+
/// This is useful for when projection eliminates all columns within a collection
29+
pub fn make_empty_array_reader(row_count: usize) -> Box<dyn ArrayReader> {
30+
Box::new(EmptyArrayReader::new(row_count))
31+
}
32+
33+
struct EmptyArrayReader {
34+
data_type: ArrowType,
35+
remaining_rows: usize,
36+
}
37+
38+
impl EmptyArrayReader {
39+
pub fn new(row_count: usize) -> Self {
40+
Self {
41+
data_type: ArrowType::Struct(vec![]),
42+
remaining_rows: row_count,
43+
}
44+
}
45+
}
46+
47+
impl ArrayReader for EmptyArrayReader {
48+
fn as_any(&self) -> &dyn Any {
49+
self
50+
}
51+
52+
fn get_data_type(&self) -> &ArrowType {
53+
&self.data_type
54+
}
55+
56+
fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
57+
let len = self.remaining_rows.min(batch_size);
58+
self.remaining_rows -= len;
59+
60+
let data = ArrayDataBuilder::new(self.data_type.clone())
61+
.len(len)
62+
.build()
63+
.unwrap();
64+
65+
Ok(Arc::new(StructArray::from(data)))
66+
}
67+
68+
fn get_def_levels(&self) -> Option<&[i16]> {
69+
None
70+
}
71+
72+
fn get_rep_levels(&self) -> Option<&[i16]> {
73+
None
74+
}
75+
}

parquet/src/arrow/arrow_reader.rs

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,18 @@
1919
2020
use std::sync::Arc;
2121

22+
use arrow::array::Array;
2223
use arrow::datatypes::{DataType as ArrowType, Schema, SchemaRef};
2324
use arrow::error::Result as ArrowResult;
2425
use arrow::record_batch::{RecordBatch, RecordBatchReader};
2526
use arrow::{array::StructArray, error::ArrowError};
2627

27-
use crate::arrow::array_reader::{build_array_reader, ArrayReader, StructArrayReader};
28+
use crate::arrow::array_reader::{build_array_reader, ArrayReader};
2829
use crate::arrow::schema::parquet_to_arrow_schema;
2930
use crate::arrow::schema::{
3031
parquet_to_arrow_schema_by_columns, parquet_to_arrow_schema_by_root_columns,
3132
};
32-
use crate::errors::{ParquetError, Result};
33+
use crate::errors::Result;
3334
use crate::file::metadata::{KeyValue, ParquetMetaData};
3435
use crate::file::reader::FileReader;
3536

@@ -234,20 +235,10 @@ impl Iterator for ParquetRecordBatchReader {
234235
"Struct array reader should return struct array".to_string(),
235236
)
236237
});
238+
237239
match struct_array {
238240
Err(err) => Some(Err(err)),
239-
Ok(e) => {
240-
match RecordBatch::try_new(self.schema.clone(), e.columns_ref()) {
241-
Err(err) => Some(Err(err)),
242-
Ok(record_batch) => {
243-
if record_batch.num_rows() > 0 {
244-
Some(Ok(record_batch))
245-
} else {
246-
None
247-
}
248-
}
249-
}
250-
}
241+
Ok(e) => (e.len() > 0).then(|| Ok(RecordBatch::from(e))),
251242
}
252243
}
253244
}
@@ -265,12 +256,6 @@ impl ParquetRecordBatchReader {
265256
batch_size: usize,
266257
array_reader: Box<dyn ArrayReader>,
267258
) -> Result<Self> {
268-
// Check that array reader is struct array reader
269-
array_reader
270-
.as_any()
271-
.downcast_ref::<StructArrayReader>()
272-
.ok_or_else(|| general_err!("The input must be struct array reader!"))?;
273-
274259
let schema = match array_reader.get_data_type() {
275260
ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
276261
_ => unreachable!("Struct array reader's data type is not struct!"),
@@ -1386,4 +1371,26 @@ mod tests {
13861371
schema_without_metadata.as_ref()
13871372
);
13881373
}
1374+
1375+
#[test]
1376+
fn test_empty_projection() {
1377+
let testdata = arrow::util::test_util::parquet_test_data();
1378+
let path = format!("{}/alltypes_plain.parquet", testdata);
1379+
let file = File::open(&path).unwrap();
1380+
let reader = SerializedFileReader::try_from(file).unwrap();
1381+
let expected_rows = reader.metadata().file_metadata().num_rows() as usize;
1382+
1383+
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
1384+
let batch_reader = arrow_reader.get_record_reader_by_columns([], 2).unwrap();
1385+
1386+
let mut total_rows = 0;
1387+
for maybe_batch in batch_reader {
1388+
let batch = maybe_batch.unwrap();
1389+
total_rows += batch.num_rows();
1390+
assert_eq!(batch.num_columns(), 0);
1391+
assert!(batch.num_rows() <= 2);
1392+
}
1393+
1394+
assert_eq!(total_rows, expected_rows);
1395+
}
13891396
}

parquet/src/arrow/async_reader.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,7 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> Stream
345345
input,
346346
InMemoryRowGroup {
347347
schema: metadata.file_metadata().schema_descr_ptr(),
348+
row_count: row_group_metadata.num_rows() as usize,
348349
column_chunks,
349350
},
350351
))
@@ -419,13 +420,18 @@ async fn read_footer<T: AsyncRead + AsyncSeek + Unpin>(
419420
struct InMemoryRowGroup {
420421
schema: SchemaDescPtr,
421422
column_chunks: Vec<Option<InMemoryColumnChunk>>,
423+
row_count: usize,
422424
}
423425

424426
impl RowGroupCollection for InMemoryRowGroup {
425427
fn schema(&self) -> Result<SchemaDescPtr> {
426428
Ok(self.schema.clone())
427429
}
428430

431+
fn num_rows(&self) -> usize {
432+
self.row_count
433+
}
434+
429435
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
430436
let page_reader = self.column_chunks[i].as_ref().unwrap().pages();
431437

0 commit comments

Comments
 (0)