From f03bd832d22c2e29954af5ad54319d65c5241114 Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Mon, 29 May 2023 22:12:06 +0100
Subject: [PATCH 1/3] Expose page-level arrow reader API (#4298)

---
 parquet/src/arrow/array_reader/builder.rs    | 20 ++++----
 parquet/src/arrow/array_reader/list_array.rs |  4 +-
 parquet/src/arrow/array_reader/mod.rs        | 26 +++-------
 parquet/src/arrow/arrow_reader/mod.rs        | 39 ++++++++++----
 parquet/src/arrow/async_reader/mod.rs        | 14 ++---
 parquet/src/arrow/mod.rs                     |  3 +-
 parquet/src/arrow/schema/complex.rs          | 10 ++--
 parquet/src/arrow/schema/mod.rs              | 54 ++++++++++++++++----
 8 files changed, 107 insertions(+), 63 deletions(-)

diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index 5e0d05e8953c..bb3f403358ee 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -24,7 +24,7 @@ use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_
 use crate::arrow::array_reader::{
     make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader,
     FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
-    PrimitiveArrayReader, RowGroupCollection, StructArrayReader,
+    PrimitiveArrayReader, RowGroups, StructArrayReader,
 };
 use crate::arrow::schema::{ParquetField, ParquetFieldType};
 use crate::arrow::ProjectionMask;
@@ -39,7 +39,7 @@ use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
 pub fn build_array_reader(
     field: Option<&ParquetField>,
     mask: &ProjectionMask,
-    row_groups: &dyn RowGroupCollection,
+    row_groups: &dyn RowGroups,
 ) -> Result<Box<dyn ArrayReader>> {
     let reader = field
         .and_then(|field| build_reader(field, mask, row_groups).transpose())
@@ -52,7 +52,7 @@ pub fn build_array_reader(
 fn build_reader(
     field: &ParquetField,
     mask: &ProjectionMask,
-    row_groups: &dyn RowGroupCollection,
+    row_groups: &dyn RowGroups,
 ) -> Result<Option<Box<dyn ArrayReader>>> {
     match field.field_type {
         ParquetFieldType::Primitive { .. } => {
@@ -75,7 +75,7 @@ fn build_reader(
 fn build_map_reader(
     field: &ParquetField,
     mask: &ProjectionMask,
-    row_groups: &dyn RowGroupCollection,
+    row_groups: &dyn RowGroups,
 ) -> Result<Option<Box<dyn ArrayReader>>> {
     let children = field.children().unwrap();
     assert_eq!(children.len(), 2);
@@ -127,7 +127,7 @@ fn build_list_reader(
     field: &ParquetField,
     mask: &ProjectionMask,
     is_large: bool,
-    row_groups: &dyn RowGroupCollection,
+    row_groups: &dyn RowGroups,
 ) -> Result<Option<Box<dyn ArrayReader>>> {
     let children = field.children().unwrap();
     assert_eq!(children.len(), 1);
@@ -173,7 +173,7 @@ fn build_list_reader(
 fn build_fixed_size_list_reader(
     field: &ParquetField,
     mask: &ProjectionMask,
-    row_groups: &dyn RowGroupCollection,
+    row_groups: &dyn RowGroups,
 ) -> Result<Option<Box<dyn ArrayReader>>> {
     let children = field.children().unwrap();
     assert_eq!(children.len(), 1);
@@ -210,7 +210,7 @@ fn build_fixed_size_list_reader(
 fn build_primitive_reader(
     field: &ParquetField,
     mask: &ProjectionMask,
-    row_groups: &dyn RowGroupCollection,
+    row_groups: &dyn RowGroups,
 ) -> Result<Option<Box<dyn ArrayReader>>> {
     let (col_idx, primitive_type) = match &field.field_type {
         ParquetFieldType::Primitive {
@@ -301,7 +301,7 @@ fn build_primitive_reader(
 fn build_struct_reader(
     field: &ParquetField,
     mask: &ProjectionMask,
-    row_groups: &dyn RowGroupCollection,
+    row_groups: &dyn RowGroups,
 ) -> Result<Option<Box<dyn ArrayReader>>> {
     let arrow_fields = match &field.arrow_type {
         DataType::Struct(children) => children,
@@ -338,7 +338,7 @@ fn build_struct_reader(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::arrow::schema::parquet_to_array_schema_and_fields;
+    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
     use crate::file::reader::{FileReader, SerializedFileReader};
     use crate::util::test_common::file_util::get_test_file;
     use arrow::datatypes::Field;
@@ -352,7 +352,7 @@ mod tests {
         let file_metadata = file_reader.metadata().file_metadata();
         let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [0]);
 
-        let (_, fields) = parquet_to_array_schema_and_fields(
+        let (_, fields) = parquet_to_arrow_schema_and_fields(
             file_metadata.schema_descr(),
             ProjectionMask::all(),
             file_metadata.key_value_metadata(),
diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs
index 932034417c81..7c66c5c23112 100644
--- a/parquet/src/arrow/array_reader/list_array.rs
+++ b/parquet/src/arrow/array_reader/list_array.rs
@@ -250,7 +250,7 @@ mod tests {
     use crate::arrow::array_reader::build_array_reader;
     use crate::arrow::array_reader::list_array::ListArrayReader;
     use crate::arrow::array_reader::test_util::InMemoryArrayReader;
-    use crate::arrow::schema::parquet_to_array_schema_and_fields;
+    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
     use crate::arrow::{parquet_to_arrow_schema, ArrowWriter, ProjectionMask};
     use crate::file::properties::WriterProperties;
     use crate::file::reader::{FileReader, SerializedFileReader};
@@ -566,7 +566,7 @@
         let file_metadata = file_reader.metadata().file_metadata();
         let schema = file_metadata.schema_descr();
         let mask = ProjectionMask::leaves(schema, vec![0]);
-        let (_, fields) = parquet_to_array_schema_and_fields(
+        let (_, fields) = parquet_to_arrow_schema_and_fields(
             schema,
             ProjectionMask::all(),
             file_metadata.key_value_metadata(),
diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs
index 823084b43207..1e781fb73ce5 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -28,7 +28,6 @@ use crate::arrow::record_reader::GenericRecordReader;
 use crate::column::page::PageIterator;
 use crate::column::reader::decoder::ColumnValueDecoder;
 use crate::file::reader::{FilePageIterator, FileReader};
-use crate::schema::types::SchemaDescPtr;
 
 mod builder;
 mod byte_array;
@@ -100,22 +99,15 @@ pub trait ArrayReader: Send {
 }
 
 /// A collection of row groups
-pub trait RowGroupCollection {
-    /// Get schema of parquet file.
-    fn schema(&self) -> SchemaDescPtr;
-
+pub trait RowGroups {
     /// Get the number of rows in this collection
     fn num_rows(&self) -> usize;
 
-    /// Returns an iterator over the column chunks for particular column
+    /// Returns a [`PageIterator`] for the column chunks with the given leaf column index
     fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;
 }
 
-impl RowGroupCollection for Arc<dyn FileReader> {
-    fn schema(&self) -> SchemaDescPtr {
-        self.metadata().file_metadata().schema_descr_ptr()
-    }
-
+impl RowGroups for Arc<dyn FileReader> {
     fn num_rows(&self) -> usize {
         self.metadata().file_metadata().num_rows() as usize
     }
@@ -126,26 +118,22 @@ impl RowGroupCollection for Arc<dyn FileReader> {
     }
 }
 
-pub(crate) struct FileReaderRowGroupCollection {
+pub(crate) struct FileReaderRowGroups {
     /// The underlying file reader
     reader: Arc<dyn FileReader>,
     /// Optional list of row group indices to scan
     row_groups: Option<Vec<usize>>,
 }
 
-impl FileReaderRowGroupCollection {
-    /// Creates a new [`RowGroupCollection`] from a `FileReader` and an optional
+impl FileReaderRowGroups {
+    /// Creates a new [`RowGroups`] from a `FileReader` and an optional
     /// list of row group indexes to scan
     pub fn new(reader: Arc<dyn FileReader>, row_groups: Option<Vec<usize>>) -> Self {
         Self { reader, row_groups }
     }
 }
 
-impl RowGroupCollection for FileReaderRowGroupCollection {
-    fn schema(&self) -> SchemaDescPtr {
-        self.reader.metadata().file_metadata().schema_descr_ptr()
-    }
-
+impl RowGroups for FileReaderRowGroups {
     fn num_rows(&self) -> usize {
         match &self.row_groups {
             None => self.reader.metadata().file_metadata().num_rows() as usize,
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 4b14a54c531b..af2179e01bfe 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -26,12 +26,9 @@ use arrow_array::{RecordBatch, RecordBatchReader};
 use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
 use arrow_select::filter::prep_null_mask_filter;
 
-use crate::arrow::array_reader::{
-    build_array_reader, ArrayReader, FileReaderRowGroupCollection, RowGroupCollection,
-};
-use crate::arrow::schema::parquet_to_array_schema_and_fields;
-use crate::arrow::schema::ParquetField;
-use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::{build_array_reader, ArrayReader, FileReaderRowGroups};
+use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
+use crate::arrow::{FieldLevels, ProjectionMask};
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::ParquetMetaData;
 use crate::file::reader::{ChunkReader, SerializedFileReader};
@@ -41,6 +38,7 @@ use crate::schema::types::SchemaDescriptor;
 mod filter;
 mod selection;
 
+pub use crate::arrow::array_reader::RowGroups;
 pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
 pub use selection::{RowSelection, RowSelector};
 
@@ -87,7 +85,7 @@ impl<T> ArrowReaderBuilder<T> {
             false => metadata.file_metadata().key_value_metadata(),
         };
 
-        let (schema, fields) = parquet_to_array_schema_and_fields(
+        let (schema, fields) = parquet_to_arrow_schema_and_fields(
             metadata.file_metadata().schema_descr(),
             ProjectionMask::all(),
             kv_metadata,
@@ -269,8 +267,7 @@ impl<T: ChunkReader + 'static> ArrowReaderBuilder<SyncReader<T>> {
     ///
     /// Note: this will eagerly evaluate any `RowFilter` before returning
     pub fn build(self) -> Result<ParquetRecordBatchReader> {
-        let reader =
-            FileReaderRowGroupCollection::new(Arc::new(self.input.0), self.row_groups);
+        let reader = FileReaderRowGroups::new(Arc::new(self.input.0), self.row_groups);
 
         let mut filter = self.filter;
         let mut selection = self.selection;
@@ -420,6 +417,30 @@ impl ParquetRecordBatchReader {
             .build()
     }
 
+    /// Create a new [`ParquetRecordBatchReader`] from the provided [`RowGroups`]
+    ///
+    /// Note: this is a low-level interface; see [`ParquetRecordBatchReader::try_new`] for a
+    /// higher-level interface for reading parquet data from a file
+    pub fn try_new_with_row_groups(
+        levels: &FieldLevels,
+        row_groups: &dyn RowGroups,
+        batch_size: usize,
+        selection: Option<RowSelection>,
+    ) -> Result<Self> {
+        let array_reader = build_array_reader(
+            levels.levels.as_ref(),
+            &ProjectionMask::all(),
+            row_groups,
+        )?;
+
+        Ok(Self {
+            batch_size,
+            array_reader,
+            schema: Arc::new(Schema::new(levels.fields.clone())),
+            selection: selection.map(|s| s.trim().into()),
+        })
+    }
+
     /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
     /// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`
     /// all rows will be returned
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index fb81a2b5d966..f9fedf86f6e0 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -94,12 +94,11 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
 use arrow_array::RecordBatch;
 use arrow_schema::SchemaRef;
 
-use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
+use crate::arrow::array_reader::{build_array_reader, RowGroups};
 use crate::arrow::arrow_reader::{
     apply_range, evaluate_predicate, selects_any, ArrowReaderBuilder,
     ArrowReaderOptions, ParquetRecordBatchReader, RowFilter, RowSelection,
 };
-use crate::arrow::schema::ParquetField;
 use crate::arrow::ProjectionMask;
 use crate::column::page::{PageIterator, PageReader};
@@ -120,6 +119,7 @@ pub use metadata::*;
 #[cfg(feature = "object_store")]
 mod store;
 
+use crate::arrow::schema::ParquetField;
 #[cfg(feature = "object_store")]
 pub use store::*;
@@ -648,11 +648,7 @@ impl<'a> InMemoryRowGroup<'a> {
     }
 }
 
-impl<'a> RowGroupCollection for InMemoryRowGroup<'a> {
-    fn schema(&self) -> SchemaDescPtr {
-        self.metadata.schema_descr_ptr()
-    }
-
+impl<'a> RowGroups for InMemoryRowGroup<'a> {
     fn num_rows(&self) -> usize {
         self.row_count
     }
@@ -768,7 +764,7 @@ mod tests {
     use crate::arrow::arrow_reader::{
         ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
     };
-    use crate::arrow::schema::parquet_to_array_schema_and_fields;
+    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
     use crate::arrow::ArrowWriter;
     use crate::file::footer::parse_metadata;
     use crate::file::page_index::index_reader;
@@ -1413,7 +1409,7 @@
         };
         let requests = async_reader.requests.clone();
 
-        let (_, fields) = parquet_to_array_schema_and_fields(
+        let (_, fields) = parquet_to_arrow_schema_and_fields(
             metadata.file_metadata().schema_descr(),
             ProjectionMask::all(),
             None,
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index e5211ec23931..aad4925c7c70 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -123,7 +123,8 @@ pub use self::async_writer::AsyncArrowWriter;
 use crate::schema::types::SchemaDescriptor;
 
 pub use self::schema::{
-    arrow_to_parquet_schema, parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns,
+    arrow_to_parquet_schema, parquet_to_arrow_field_levels, parquet_to_arrow_schema,
+    parquet_to_arrow_schema_by_columns, FieldLevels,
 };
 
 /// Schema metadata key used to store serialized Arrow IPC schema
diff --git a/parquet/src/arrow/schema/complex.rs b/parquet/src/arrow/schema/complex.rs
index c1699aafcfe8..fab6e7f96944 100644
--- a/parquet/src/arrow/schema/complex.rs
+++ b/parquet/src/arrow/schema/complex.rs
@@ -24,7 +24,7 @@ use crate::basic::{ConvertedType, Repetition};
 use crate::errors::ParquetError;
 use crate::errors::Result;
 use crate::schema::types::{SchemaDescriptor, Type, TypePtr};
-use arrow_schema::{DataType, Field, Schema, SchemaBuilder};
+use arrow_schema::{DataType, Field, Fields, SchemaBuilder};
 
 fn get_repetition(t: &Type) -> Repetition {
     let info = t.get_basic_info();
@@ -35,6 +35,7 @@ fn get_repetition(t: &Type) -> Repetition {
     }
 }
 
 /// Representation of a parquet file, in terms of arrow schema elements
+#[derive(Debug, Clone)]
 pub struct ParquetField {
     /// The level which represents an insertion into the current list
     /// i.e. guaranteed to be > 0 for a list type
@@ -82,6 +83,7 @@ impl ParquetField {
     }
 }
 
+#[derive(Debug, Clone)]
 pub enum ParquetFieldType {
     Primitive {
         /// The index of the column in parquet
@@ -554,13 +556,13 @@ fn convert_field(
 
 /// Computes the [`ParquetField`] for the provided [`SchemaDescriptor`] with `leaf_columns` listing
 /// the indexes of leaf columns to project, and `embedded_arrow_schema` the optional
-/// [`Schema`] embedded in the parquet metadata
+/// [`Fields`] embedded in the parquet metadata
 ///
 /// Note: This does not support out of order column projection
 pub fn convert_schema(
     schema: &SchemaDescriptor,
     mask: ProjectionMask,
-    embedded_arrow_schema: Option<&Schema>,
+    embedded_arrow_schema: Option<&Fields>,
 ) -> Result<Option<ParquetField>> {
     let mut visitor = Visitor {
         next_col_idx: 0,
@@ -570,7 +572,7 @@ pub fn convert_schema(
     let context = VisitorContext {
         rep_level: 0,
         def_level: 0,
-        data_type: embedded_arrow_schema.map(|s| DataType::Struct(s.fields().clone())),
+        data_type: embedded_arrow_schema.map(|fields| DataType::Struct(fields.clone())),
     };
 
     visitor.dispatch(&schema.root_schema_ptr(), context)
diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs
index 399dcba9e981..0305c8362aea 100644
--- a/parquet/src/arrow/schema/mod.rs
+++ b/parquet/src/arrow/schema/mod.rs
@@ -45,7 +45,8 @@ mod primitive;
 use crate::arrow::ProjectionMask;
 pub(crate) use complex::{ParquetField, ParquetFieldType};
 
-/// Convert Parquet schema to Arrow schema including optional metadata.
+/// Convert Parquet schema to Arrow schema including optional metadata
+///
 /// Attempts to decode any existing Arrow schema metadata, falling back
 /// to converting the Parquet schema column-wise
 pub fn parquet_to_arrow_schema(
@@ -66,11 +67,11 @@ pub fn parquet_to_arrow_schema_by_columns(
     mask: ProjectionMask,
     key_value_metadata: Option<&Vec<KeyValue>>,
 ) -> Result<Schema> {
-    Ok(parquet_to_array_schema_and_fields(parquet_schema, mask, key_value_metadata)?.0)
+    Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata)?.0)
 }
 
 /// Extracts the arrow metadata
-pub(crate) fn parquet_to_array_schema_and_fields(
+pub(crate) fn parquet_to_arrow_schema_and_fields(
     parquet_schema: &SchemaDescriptor,
     mask: ProjectionMask,
     key_value_metadata: Option<&Vec<KeyValue>>,
@@ -88,15 +89,50 @@ pub(crate) fn parquet_to_array_schema_and_fields(
         });
     }
 
-    match complex::convert_schema(parquet_schema, mask, maybe_schema.as_ref())? {
+    let hint = maybe_schema.as_ref().map(|s| s.fields());
+    let field_levels = parquet_to_arrow_field_levels(parquet_schema, mask, hint)?;
+    let schema = Schema::new_with_metadata(field_levels.fields, metadata);
+    Ok((schema, field_levels.levels))
+}
+
+/// Stores the parquet level information for a set of arrow [`Fields`]
+#[derive(Debug, Clone)]
+pub struct FieldLevels {
+    pub(crate) fields: Fields,
+    pub(crate) levels: Option<ParquetField>,
+}
+
+/// Convert a parquet [`SchemaDescriptor`] to its corresponding arrow representation
+///
+/// Columns not included within [`ProjectionMask`] will be ignored.
+///
+/// Where a field type in `hint` is compatible with the corresponding parquet type in `schema`, it
+/// will be used, otherwise the default arrow type for the given parquet column type will be used.
+///
+/// This is to accommodate arrow types that cannot be round-tripped through parquet natively.
+/// Depending on the parquet writer, this can lead to a mismatch between a file's parquet schema
+/// and its embedded arrow schema. The parquet `schema` must be treated as authoritative in such
+/// an event. See [#1663](https://github.com/apache/arrow-rs/issues/1663) for more information
+///
+/// Note: this is a low-level API, most users will want to make use of the higher-level
+/// [`parquet_to_arrow_schema`] for decoding metadata from a parquet file.
+pub fn parquet_to_arrow_field_levels(
+    schema: &SchemaDescriptor,
+    mask: ProjectionMask,
+    hint: Option<&Fields>,
+) -> Result<FieldLevels> {
+    match complex::convert_schema(schema, mask, hint)? {
         Some(field) => match &field.arrow_type {
-            DataType::Struct(fields) => Ok((
-                Schema::new_with_metadata(fields.clone(), metadata),
-                Some(field),
-            )),
+            DataType::Struct(fields) => Ok(FieldLevels {
+                fields: fields.clone(),
+                levels: Some(field),
+            }),
             _ => unreachable!(),
         },
-        None => Ok((Schema::new_with_metadata(Fields::empty(), metadata), None)),
+        None => Ok(FieldLevels {
+            fields: Fields::empty(),
+            levels: None,
+        }),
     }
 }
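
A minimal sketch of how the pieces introduced above fit together, assuming the patched crate: `parquet_to_arrow_field_levels` computes the `FieldLevels` for a file's schema, and `ParquetRecordBatchReader::try_new_with_row_groups` then decodes batches from any `RowGroups` implementation, here the `Arc<dyn FileReader>` impl this patch retains for serialized files. The file name and batch size below are placeholders.

use std::fs::File;
use std::sync::Arc;

use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
use parquet::arrow::{parquet_to_arrow_field_levels, ProjectionMask};
use parquet::errors::Result;
use parquet::file::reader::{FileReader, SerializedFileReader};

fn read_via_page_level_api() -> Result<()> {
    // Arc<dyn FileReader> implements the new RowGroups trait
    let reader: Arc<dyn FileReader> =
        Arc::new(SerializedFileReader::new(File::open("data.parquet")?)?);

    // Compute the arrow fields and associated level information for all columns
    let levels = parquet_to_arrow_field_levels(
        reader.metadata().file_metadata().schema_descr(),
        ProjectionMask::all(),
        None, // no embedded arrow schema hint
    )?;

    // Decode record batches directly from the RowGroups implementation
    let batches =
        ParquetRecordBatchReader::try_new_with_row_groups(&levels, &reader, 1024, None)?;
    for batch in batches {
        println!("decoded {} rows", batch?.num_rows());
    }
    Ok(())
}

Passing `None` as the hint falls back to the default arrow types for each parquet column; per the doc comment above, a caller holding an embedded arrow schema can instead pass `Some(schema.fields())` to preserve types that do not round-trip through parquet natively.
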
From ad80d20e80c7e38f1df99123709fb46c4980f27f Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Tue, 30 May 2023 11:21:37 +0100
Subject: [PATCH 2/3] Make scan_ranges public

---
 parquet/src/arrow/arrow_reader/selection.rs | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
index 76f950620688..a558f893c43e 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -173,9 +173,14 @@ impl RowSelection {
         }
     }
 
-    /// Given an offset index, return the offset ranges for all data pages selected by `self`
-    #[cfg(any(test, feature = "async"))]
-    pub(crate) fn scan_ranges(
+    /// Given an offset index, return the byte ranges for all data pages selected by `self`
+    ///
+    /// This is useful for determining what byte ranges to fetch from underlying storage
+    ///
+    /// Note: this method does not make any effort to combine consecutive ranges, nor coalesce
+    /// ranges that are close together. This is instead delegated to the IO subsystem to optimise,
+    /// e.g. [`ObjectStore::get_ranges`](object_store::ObjectStore::get_ranges)
+    pub fn scan_ranges(
         &self,
         page_locations: &[crate::format::PageLocation],
     ) -> Vec<Range<usize>> {

From de965b9905ef387e60e81f3ee4abc7ba0ffdb568 Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Wed, 31 May 2023 11:02:16 +0100
Subject: [PATCH 3/3] Review feedback

---
 parquet/src/arrow/schema/complex.rs |  2 +-
 parquet/src/arrow/schema/mod.rs     | 10 ++++++++--
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/parquet/src/arrow/schema/complex.rs b/parquet/src/arrow/schema/complex.rs
index fab6e7f96944..0d19875d97de 100644
--- a/parquet/src/arrow/schema/complex.rs
+++ b/parquet/src/arrow/schema/complex.rs
@@ -34,7 +34,7 @@ fn get_repetition(t: &Type) -> Repetition {
     }
 }
 
-/// Representation of a parquet file, in terms of arrow schema elements
+/// Representation of a parquet schema element, in terms of arrow schema elements
 #[derive(Debug, Clone)]
 pub struct ParquetField {
     /// The level which represents an insertion into the current list
diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs
index 0305c8362aea..48c7838eede3 100644
--- a/parquet/src/arrow/schema/mod.rs
+++ b/parquet/src/arrow/schema/mod.rs
@@ -95,14 +95,20 @@ pub(crate) fn parquet_to_arrow_schema_and_fields(
     Ok((schema, field_levels.levels))
 }
 
-/// Stores the parquet level information for a set of arrow [`Fields`]
+/// Schema information necessary to decode a parquet file as arrow [`Fields`]
+///
+/// In particular this stores the dremel-level information necessary to correctly
+/// interpret the encoded definition and repetition levels
+///
+/// Note: this is an opaque container intended to be used with lower-level APIs
+/// within this crate
 #[derive(Debug, Clone)]
 pub struct FieldLevels {
     pub(crate) fields: Fields,
     pub(crate) levels: Option<ParquetField>,
 }
 
-/// Convert a parquet [`SchemaDescriptor`] to its corresponding arrow representation
+/// Convert a parquet [`SchemaDescriptor`] to [`FieldLevels`]
 ///
 /// Columns not included within [`ProjectionMask`] will be ignored.
 ///
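
To illustrate the now-public `RowSelection::scan_ranges` from the second patch: given the page locations from a column chunk's offset index, it yields the byte ranges the selection actually touches, leaving any coalescing to the IO layer. A minimal sketch, assuming the offset index has already been read; the skip and select counts are arbitrary.

use std::ops::Range;

use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::format::PageLocation;

/// Byte ranges needed to decode rows 50..150 of a column chunk
fn ranges_to_fetch(page_locations: &[PageLocation]) -> Vec<Range<usize>> {
    let selection = RowSelection::from(vec![
        RowSelector::skip(50),
        RowSelector::select(100),
    ]);
    // Ranges are returned page by page and are not coalesced; combining
    // adjacent or nearby ranges is left to the IO subsystem, e.g.
    // ObjectStore::get_ranges
    selection.scan_ranges(page_locations)
}
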