Expose page-level arrow reader API (#4298) #4307

Merged 4 commits on May 31, 2023
Changes from 2 commits
20 changes: 10 additions & 10 deletions parquet/src/arrow/array_reader/builder.rs
@@ -24,7 +24,7 @@ use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_
use crate::arrow::array_reader::{
make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader,
FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
-    PrimitiveArrayReader, RowGroupCollection, StructArrayReader,
+    PrimitiveArrayReader, RowGroups, StructArrayReader,
};
use crate::arrow::schema::{ParquetField, ParquetFieldType};
use crate::arrow::ProjectionMask;
@@ -39,7 +39,7 @@ use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
pub fn build_array_reader(
field: Option<&ParquetField>,
mask: &ProjectionMask,
-    row_groups: &dyn RowGroupCollection,
+    row_groups: &dyn RowGroups,
) -> Result<Box<dyn ArrayReader>> {
let reader = field
.and_then(|field| build_reader(field, mask, row_groups).transpose())
@@ -52,7 +52,7 @@ pub fn build_array_reader(
fn build_reader(
field: &ParquetField,
mask: &ProjectionMask,
-    row_groups: &dyn RowGroupCollection,
+    row_groups: &dyn RowGroups,
) -> Result<Option<Box<dyn ArrayReader>>> {
match field.field_type {
ParquetFieldType::Primitive { .. } => {
@@ -75,7 +75,7 @@ fn build_reader(
fn build_map_reader(
field: &ParquetField,
mask: &ProjectionMask,
-    row_groups: &dyn RowGroupCollection,
+    row_groups: &dyn RowGroups,
) -> Result<Option<Box<dyn ArrayReader>>> {
let children = field.children().unwrap();
assert_eq!(children.len(), 2);
@@ -127,7 +127,7 @@ fn build_list_reader(
field: &ParquetField,
mask: &ProjectionMask,
is_large: bool,
-    row_groups: &dyn RowGroupCollection,
+    row_groups: &dyn RowGroups,
) -> Result<Option<Box<dyn ArrayReader>>> {
let children = field.children().unwrap();
assert_eq!(children.len(), 1);
@@ -173,7 +173,7 @@ fn build_list_reader(
fn build_fixed_size_list_reader(
field: &ParquetField,
mask: &ProjectionMask,
-    row_groups: &dyn RowGroupCollection,
+    row_groups: &dyn RowGroups,
) -> Result<Option<Box<dyn ArrayReader>>> {
let children = field.children().unwrap();
assert_eq!(children.len(), 1);
@@ -210,7 +210,7 @@ fn build_fixed_size_list_reader(
fn build_primitive_reader(
field: &ParquetField,
mask: &ProjectionMask,
-    row_groups: &dyn RowGroupCollection,
+    row_groups: &dyn RowGroups,
) -> Result<Option<Box<dyn ArrayReader>>> {
let (col_idx, primitive_type) = match &field.field_type {
ParquetFieldType::Primitive {
@@ -301,7 +301,7 @@ fn build_primitive_reader(
fn build_struct_reader(
field: &ParquetField,
mask: &ProjectionMask,
-    row_groups: &dyn RowGroupCollection,
+    row_groups: &dyn RowGroups,
) -> Result<Option<Box<dyn ArrayReader>>> {
let arrow_fields = match &field.arrow_type {
DataType::Struct(children) => children,
@@ -338,7 +338,7 @@ fn build_struct_reader(
#[cfg(test)]
mod tests {
use super::*;
-    use crate::arrow::schema::parquet_to_array_schema_and_fields;
+    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
use crate::file::reader::{FileReader, SerializedFileReader};
use crate::util::test_common::file_util::get_test_file;
use arrow::datatypes::Field;
@@ -352,7 +352,7 @@ mod tests {

let file_metadata = file_reader.metadata().file_metadata();
let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [0]);
-        let (_, fields) = parquet_to_array_schema_and_fields(
+        let (_, fields) = parquet_to_arrow_schema_and_fields(
file_metadata.schema_descr(),
ProjectionMask::all(),
file_metadata.key_value_metadata(),
4 changes: 2 additions & 2 deletions parquet/src/arrow/array_reader/list_array.rs
@@ -250,7 +250,7 @@ mod tests {
use crate::arrow::array_reader::build_array_reader;
use crate::arrow::array_reader::list_array::ListArrayReader;
use crate::arrow::array_reader::test_util::InMemoryArrayReader;
-    use crate::arrow::schema::parquet_to_array_schema_and_fields;
+    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
use crate::arrow::{parquet_to_arrow_schema, ArrowWriter, ProjectionMask};
use crate::file::properties::WriterProperties;
use crate::file::reader::{FileReader, SerializedFileReader};
@@ -566,7 +566,7 @@ mod tests {
let file_metadata = file_reader.metadata().file_metadata();
let schema = file_metadata.schema_descr();
let mask = ProjectionMask::leaves(schema, vec![0]);
-        let (_, fields) = parquet_to_array_schema_and_fields(
+        let (_, fields) = parquet_to_arrow_schema_and_fields(
schema,
ProjectionMask::all(),
file_metadata.key_value_metadata(),
26 changes: 7 additions & 19 deletions parquet/src/arrow/array_reader/mod.rs
@@ -28,7 +28,6 @@ use crate::arrow::record_reader::GenericRecordReader;
use crate::column::page::PageIterator;
use crate::column::reader::decoder::ColumnValueDecoder;
use crate::file::reader::{FilePageIterator, FileReader};
-use crate::schema::types::SchemaDescPtr;

mod builder;
mod byte_array;
@@ -100,22 +99,15 @@ pub trait ArrayReader: Send {
}

/// A collection of row groups
-pub trait RowGroupCollection {
-    /// Get schema of parquet file.
-    fn schema(&self) -> SchemaDescPtr;
-
+pub trait RowGroups {
/// Get the number of rows in this collection
fn num_rows(&self) -> usize;

-    /// Returns an iterator over the column chunks for particular column
+    /// Returns a [`PageIterator`] for the column chunks with the given leaf column index
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;
}

-impl RowGroupCollection for Arc<dyn FileReader> {
-    fn schema(&self) -> SchemaDescPtr {
-        self.metadata().file_metadata().schema_descr_ptr()
-    }
-
+impl RowGroups for Arc<dyn FileReader> {
fn num_rows(&self) -> usize {
self.metadata().file_metadata().num_rows() as usize
}
@@ -126,26 +118,22 @@ impl RowGroupCollection for Arc<dyn FileReader> {
}
}

-pub(crate) struct FileReaderRowGroupCollection {
+pub(crate) struct FileReaderRowGroups {
/// The underlying file reader
reader: Arc<dyn FileReader>,
/// Optional list of row group indices to scan
row_groups: Option<Vec<usize>>,
}

-impl FileReaderRowGroupCollection {
-    /// Creates a new [`RowGroupCollection`] from a `FileReader` and an optional
+impl FileReaderRowGroups {
+    /// Creates a new [`RowGroups`] from a `FileReader` and an optional
/// list of row group indexes to scan
pub fn new(reader: Arc<dyn FileReader>, row_groups: Option<Vec<usize>>) -> Self {
Self { reader, row_groups }
}
}

-impl RowGroupCollection for FileReaderRowGroupCollection {
-    fn schema(&self) -> SchemaDescPtr {
-        self.reader.metadata().file_metadata().schema_descr_ptr()
-    }
-
+impl RowGroups for FileReaderRowGroups {
fn num_rows(&self) -> usize {
match &self.row_groups {
None => self.reader.metadata().file_metadata().num_rows() as usize,
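With the schema accessor gone, `RowGroups` reduces to a row count plus a page source per leaf column. A minimal sketch of driving the trait through the blanket impl above; the file name is illustrative and error plumbing is simplified:

```rust
use std::sync::Arc;

use parquet::arrow::arrow_reader::RowGroups;
use parquet::file::reader::{FileReader, SerializedFileReader};

// Works against any RowGroups source, not just files
fn describe(row_groups: &dyn RowGroups) -> parquet::errors::Result<()> {
    println!("{} rows", row_groups.num_rows());
    let _pages = row_groups.column_chunks(0)?; // PageIterator for leaf column 0
    Ok(())
}

fn main() -> parquet::errors::Result<()> {
    let file = std::fs::File::open("data.parquet")?;
    let reader: Arc<dyn FileReader> = Arc::new(SerializedFileReader::new(file)?);
    describe(&reader) // &Arc<dyn FileReader> coerces to &dyn RowGroups
}
```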
39 changes: 30 additions & 9 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -26,12 +26,9 @@ use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
use arrow_select::filter::prep_null_mask_filter;

-use crate::arrow::array_reader::{
-    build_array_reader, ArrayReader, FileReaderRowGroupCollection, RowGroupCollection,
-};
-use crate::arrow::schema::parquet_to_array_schema_and_fields;
-use crate::arrow::schema::ParquetField;
-use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::{build_array_reader, ArrayReader, FileReaderRowGroups};
+use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
+use crate::arrow::{FieldLevels, ProjectionMask};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::ParquetMetaData;
use crate::file::reader::{ChunkReader, SerializedFileReader};
@@ -41,6 +38,7 @@ use crate::schema::types::SchemaDescriptor;
mod filter;
mod selection;

+pub use crate::arrow::array_reader::RowGroups;
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelector};

@@ -87,7 +85,7 @@ impl<T> ArrowReaderBuilder<T> {
false => metadata.file_metadata().key_value_metadata(),
};

-        let (schema, fields) = parquet_to_array_schema_and_fields(
+        let (schema, fields) = parquet_to_arrow_schema_and_fields(
metadata.file_metadata().schema_descr(),
ProjectionMask::all(),
kv_metadata,
@@ -269,8 +267,7 @@ impl<T: ChunkReader + 'static> ArrowReaderBuilder<SyncReader<T>> {
///
/// Note: this will eagerly evaluate any `RowFilter` before returning
pub fn build(self) -> Result<ParquetRecordBatchReader> {
-        let reader =
-            FileReaderRowGroupCollection::new(Arc::new(self.input.0), self.row_groups);
+        let reader = FileReaderRowGroups::new(Arc::new(self.input.0), self.row_groups);

let mut filter = self.filter;
let mut selection = self.selection;
@@ -420,6 +417,30 @@ impl ParquetRecordBatchReader {
.build()
}

+    /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`]
+    ///
+    /// Note: this is a low-level interface see [`ParquetRecordBatchReader::try_new`] for a
+    /// higher-level interface for reading parquet data from a file
+    pub fn try_new_with_row_groups(
[Comment from Contributor Author] This lets you construct a ParquetRecordBatchReader from an arbitrary source of pages

+        levels: &FieldLevels,
+        row_groups: &dyn RowGroups,
+        batch_size: usize,
+        selection: Option<RowSelection>,
+    ) -> Result<Self> {
[Comment from Contributor Author] We don't need to include a ProjectionMask as this can be passed to parquet_to_arrow_field_levels instead

+        let array_reader = build_array_reader(
+            levels.levels.as_ref(),
+            &ProjectionMask::all(),
+            row_groups,
+        )?;
+
+        Ok(Self {
+            batch_size,
+            array_reader,
+            schema: Arc::new(Schema::new(levels.fields.clone())),
+            selection: selection.map(|s| s.trim().into()),
+        })
+    }

/// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
/// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`
/// all rows will be returned
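Putting the pieces together, a hedged sketch of the new page-level path: compute `FieldLevels` up front, then feed any `RowGroups` impl to `try_new_with_row_groups`. The path and batch size are illustrative, and a `SerializedFileReader` stands in for the "arbitrary source of pages":

```rust
use std::sync::Arc;

use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
use parquet::arrow::{parquet_to_arrow_field_levels, ProjectionMask};
use parquet::file::reader::{FileReader, SerializedFileReader};

fn read(path: &str) -> parquet::errors::Result<()> {
    let file = std::fs::File::open(path)?;
    let reader: Arc<dyn FileReader> = Arc::new(SerializedFileReader::new(file)?);

    // Column projection is applied here, not in try_new_with_row_groups
    let levels = parquet_to_arrow_field_levels(
        reader.metadata().file_metadata().schema_descr(),
        ProjectionMask::all(),
        None, // no embedded arrow schema hint
    )?;

    let batches =
        ParquetRecordBatchReader::try_new_with_row_groups(&levels, &reader, 1024, None)?;
    for batch in batches {
        println!("read {} rows", batch?.num_rows());
    }
    Ok(())
}
```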
11 changes: 8 additions & 3 deletions parquet/src/arrow/arrow_reader/selection.rs
@@ -173,9 +173,14 @@ impl RowSelection {
}
}

-    /// Given an offset index, return the offset ranges for all data pages selected by `self`
-    #[cfg(any(test, feature = "async"))]
-    pub(crate) fn scan_ranges(
+    /// Given an offset index, return the byte ranges for all data pages selected by `self`
+    ///
+    /// This is useful for determining what byte ranges to fetch from underlying storage
+    ///
+    /// Note: this method does not make any effort to combine consecutive ranges, nor coalesce
[Comment from Contributor] 👍

+    /// ranges that are close together. This is instead delegated to the IO subsystem to optimise,
+    /// e.g. [`ObjectStore::get_ranges`](object_store::ObjectStore::get_ranges)
+    pub fn scan_ranges(
&self,
page_locations: &[crate::format::PageLocation],
) -> Vec<Range<usize>> {
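A hedged sketch of the now-public method with fabricated page locations; in practice these come from a file's offset index, and range coalescing is left to the IO layer as the doc comment says:

```rust
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::format::PageLocation;

fn main() {
    // Two fabricated 1 KiB pages: rows 0..100 and rows 100 onwards
    let page_locations = vec![
        PageLocation { offset: 0, compressed_page_size: 1024, first_row_index: 0 },
        PageLocation { offset: 1024, compressed_page_size: 1024, first_row_index: 100 },
    ];

    // Skip the first page entirely, then select 50 rows from the second
    let selection = RowSelection::from(vec![RowSelector::skip(100), RowSelector::select(50)]);

    // Only the second page should be touched, i.e. roughly [1024..2048];
    // pass these to e.g. ObjectStore::get_ranges to fetch and coalesce
    let ranges = selection.scan_ranges(&page_locations);
    println!("{ranges:?}");
}
```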
14 changes: 5 additions & 9 deletions parquet/src/arrow/async_reader/mod.rs
@@ -94,12 +94,11 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;

-use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
+use crate::arrow::array_reader::{build_array_reader, RowGroups};
use crate::arrow::arrow_reader::{
apply_range, evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderOptions,
ParquetRecordBatchReader, RowFilter, RowSelection,
};
-use crate::arrow::schema::ParquetField;
use crate::arrow::ProjectionMask;

use crate::column::page::{PageIterator, PageReader};
@@ -120,6 +119,7 @@ pub use metadata::*;
#[cfg(feature = "object_store")]
mod store;

+use crate::arrow::schema::ParquetField;
#[cfg(feature = "object_store")]
pub use store::*;

@@ -648,11 +648,7 @@ impl<'a> InMemoryRowGroup<'a> {
}
}

-impl<'a> RowGroupCollection for InMemoryRowGroup<'a> {
-    fn schema(&self) -> SchemaDescPtr {
-        self.metadata.schema_descr_ptr()
-    }
-
+impl<'a> RowGroups for InMemoryRowGroup<'a> {
fn num_rows(&self) -> usize {
self.row_count
}
@@ -768,7 +764,7 @@ mod tests {
use crate::arrow::arrow_reader::{
ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
};
-    use crate::arrow::schema::parquet_to_array_schema_and_fields;
+    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
use crate::arrow::ArrowWriter;
use crate::file::footer::parse_metadata;
use crate::file::page_index::index_reader;
@@ -1413,7 +1409,7 @@ mod tests {
};

let requests = async_reader.requests.clone();
-        let (_, fields) = parquet_to_array_schema_and_fields(
+        let (_, fields) = parquet_to_arrow_schema_and_fields(
metadata.file_metadata().schema_descr(),
ProjectionMask::all(),
None,
3 changes: 2 additions & 1 deletion parquet/src/arrow/mod.rs
@@ -123,7 +123,8 @@ pub use self::async_writer::AsyncArrowWriter;
use crate::schema::types::SchemaDescriptor;

pub use self::schema::{
-    arrow_to_parquet_schema, parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns,
+    arrow_to_parquet_schema, parquet_to_arrow_field_levels, parquet_to_arrow_schema,
+    parquet_to_arrow_schema_by_columns, FieldLevels,
};

/// Schema metadata key used to store serialized Arrow IPC schema
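The newly exported helpers make the schema-to-levels conversion usable outside the crate. A small sketch pairing a projection with `parquet_to_arrow_field_levels`; the leaf indices are illustrative:

```rust
use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
use parquet::schema::types::SchemaDescriptor;

fn projected_levels(schema: &SchemaDescriptor) -> parquet::errors::Result<FieldLevels> {
    // Project the first two leaf columns only
    let mask = ProjectionMask::leaves(schema, [0, 1]);
    // The resulting FieldLevels can be handed to try_new_with_row_groups
    parquet_to_arrow_field_levels(schema, mask, None)
}
```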
10 changes: 6 additions & 4 deletions parquet/src/arrow/schema/complex.rs
@@ -24,7 +24,7 @@ use crate::basic::{ConvertedType, Repetition};
use crate::errors::ParquetError;
use crate::errors::Result;
use crate::schema::types::{SchemaDescriptor, Type, TypePtr};
-use arrow_schema::{DataType, Field, Schema, SchemaBuilder};
+use arrow_schema::{DataType, Field, Fields, SchemaBuilder};

fn get_repetition(t: &Type) -> Repetition {
let info = t.get_basic_info();
@@ -35,6 +35,7 @@ fn get_repetition(t: &Type) -> Repetition {
}

/// Representation of a parquet file, in terms of arrow schema elements
[Comment from Contributor] When reading this PR I see this comment and wonder "is it really a parquet file" or is it more like a "parquet field"? Or a "possibly nested field" 🤔

[Comment from Contributor Author] Reworded, I agree the use of file here is very misleading
+#[derive(Debug, Clone)]
pub struct ParquetField {
/// The level which represents an insertion into the current list
/// i.e. guaranteed to be > 0 for a list type
@@ -82,6 +83,7 @@ impl ParquetField {
}
}

+#[derive(Debug, Clone)]
pub enum ParquetFieldType {
Primitive {
/// The index of the column in parquet
@@ -554,13 +556,13 @@ fn convert_field(

/// Computes the [`ParquetField`] for the provided [`SchemaDescriptor`] with `leaf_columns` listing
/// the indexes of leaf columns to project, and `embedded_arrow_schema` the optional
-/// [`Schema`] embedded in the parquet metadata
+/// [`Fields`] embedded in the parquet metadata
///
/// Note: This does not support out of order column projection
pub fn convert_schema(
schema: &SchemaDescriptor,
mask: ProjectionMask,
-    embedded_arrow_schema: Option<&Schema>,
+    embedded_arrow_schema: Option<&Fields>,
) -> Result<Option<ParquetField>> {
let mut visitor = Visitor {
next_col_idx: 0,
@@ -570,7 +572,7 @@ pub fn convert_schema(
let context = VisitorContext {
rep_level: 0,
def_level: 0,
-        data_type: embedded_arrow_schema.map(|s| DataType::Struct(s.fields().clone())),
+        data_type: embedded_arrow_schema.map(|fields| DataType::Struct(fields.clone())),
};

visitor.dispatch(&schema.root_schema_ptr(), context)