diff --git a/parquet/benches/metadata.rs b/parquet/benches/metadata.rs index 04feb0d900b0..43b08e6b26a4 100644 --- a/parquet/benches/metadata.rs +++ b/parquet/benches/metadata.rs @@ -20,8 +20,8 @@ use std::sync::Arc; use parquet::basic::{Encoding, PageType, Type as PhysicalType}; use parquet::file::metadata::{ - ColumnChunkMetaData, FileMetaData, PageEncodingStats, ParquetMetaData, ParquetMetaDataReader, - ParquetMetaDataWriter, RowGroupMetaData, + ColumnChunkMetaData, FileMetaData, PageEncodingStats, ParquetMetaData, ParquetMetaDataOptions, + ParquetMetaDataReader, ParquetMetaDataWriter, RowGroupMetaData, }; use parquet::file::statistics::Statistics; use parquet::file::writer::TrackedWrite; @@ -164,12 +164,29 @@ fn criterion_benchmark(c: &mut Criterion) { }) }); + let schema = ParquetMetaDataReader::decode_schema(&meta_data).unwrap(); + let options = ParquetMetaDataOptions::new().with_schema(schema); + c.bench_function("decode metadata with schema", |b| { + b.iter(|| { + ParquetMetaDataReader::decode_metadata_with_options(&meta_data, Some(&options)) + .unwrap(); + }) + }); + let buf: Bytes = black_box(encoded_meta()).into(); c.bench_function("decode parquet metadata (wide)", |b| { b.iter(|| { ParquetMetaDataReader::decode_metadata(&buf).unwrap(); }) }); + + let schema = ParquetMetaDataReader::decode_schema(&buf).unwrap(); + let options = ParquetMetaDataOptions::new().with_schema(schema); + c.bench_function("decode metadata (wide) with schema", |b| { + b.iter(|| { + ParquetMetaDataReader::decode_metadata_with_options(&buf, Some(&options)).unwrap(); + }) + }); } criterion_group!(benches, criterion_benchmark); diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index e41515d613c9..0f0bcdb16139 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -38,7 +38,9 @@ use crate::column::page::{PageIterator, PageReader}; #[cfg(feature = "encryption")] use crate::encryption::decrypt::FileDecryptionProperties; use crate::errors::{ParquetError, Result}; -use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader}; +use crate::file::metadata::{ + PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader, +}; use crate::file::reader::{ChunkReader, SerializedPageReader}; use crate::schema::types::SchemaDescriptor; @@ -387,6 +389,8 @@ pub struct ArrowReaderOptions { supplied_schema: Option, /// Policy for reading offset and column indexes. pub(crate) page_index_policy: PageIndexPolicy, + /// Options to control reading of Parquet metadata + metadata_options: ParquetMetaDataOptions, /// If encryption is enabled, the file decryption properties can be provided #[cfg(feature = "encryption")] pub(crate) file_decryption_properties: Option>, @@ -504,6 +508,16 @@ impl ArrowReaderOptions { } } + /// Provide a Parquet schema to use when decoding the metadata. The schema in the Parquet + /// footer will be skipped. + /// + /// This can be used to avoid reparsing the schema from the file when it is + /// already known. + pub fn with_parquet_schema(mut self, schema: Arc) -> Self { + self.metadata_options.set_schema(schema); + self + } + /// Provide the file decryption properties to use when reading encrypted parquet files. /// /// If encryption is enabled and the file is encrypted, the `file_decryption_properties` must be provided. @@ -525,6 +539,11 @@ impl ArrowReaderOptions { self.page_index_policy != PageIndexPolicy::Skip } + /// Retrieve the currently set metadata decoding options. + pub fn metadata_options(&self) -> &ParquetMetaDataOptions { + &self.metadata_options + } + /// Retrieve the currently set file decryption properties. /// /// This can be set via @@ -571,8 +590,9 @@ impl ArrowReaderMetadata { /// `Self::metadata` is missing the page index, this function will attempt /// to load the page index by making an object store request. pub fn load(reader: &T, options: ArrowReaderOptions) -> Result { - let metadata = - ParquetMetaDataReader::new().with_page_index_policy(options.page_index_policy); + let metadata = ParquetMetaDataReader::new() + .with_page_index_policy(options.page_index_policy) + .with_metadata_options(Some(options.metadata_options.clone())); #[cfg(feature = "encryption")] let metadata = metadata.with_decryption_properties( options.file_decryption_properties.as_ref().map(Arc::clone), @@ -1221,6 +1241,22 @@ mod tests { assert_eq!(original_schema.fields(), reader.schema().fields()); } + #[test] + fn test_reuse_schema() { + let file = get_test_file("parquet/alltypes-java.parquet"); + + let builder = ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap(); + let expected = builder.metadata; + let schema = expected.file_metadata().schema_descr_ptr(); + + let arrow_options = ArrowReaderOptions::new().with_parquet_schema(schema.clone()); + let builder = + ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap(); + + // Verify that the metadata matches + assert_eq!(expected.as_ref(), builder.metadata.as_ref()); + } + #[test] fn test_arrow_reader_single_column() { let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet"); diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index c5badea7f32c..e1b096d36c3d 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -171,9 +171,12 @@ impl AsyncFileReader for T { options: Option<&'a ArrowReaderOptions>, ) -> BoxFuture<'a, Result>> { async move { - let metadata_reader = ParquetMetaDataReader::new().with_page_index_policy( - PageIndexPolicy::from(options.is_some_and(|o| o.page_index())), - ); + let metadata_opts = options.map(|o| o.metadata_options().clone()); + let metadata_reader = ParquetMetaDataReader::new() + .with_page_index_policy(PageIndexPolicy::from( + options.is_some_and(|o| o.page_index()), + )) + .with_metadata_options(metadata_opts); #[cfg(feature = "encryption")] let metadata_reader = metadata_reader.with_decryption_properties( diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs index 5ac21567a12d..efb3de0f22e8 100644 --- a/parquet/src/arrow/async_reader/store.rs +++ b/parquet/src/arrow/async_reader/store.rs @@ -199,7 +199,9 @@ impl AsyncFileReader for ParquetObjectReader { options: Option<&'a ArrowReaderOptions>, ) -> BoxFuture<'a, Result>> { Box::pin(async move { + let metadata_opts = options.map(|o| o.metadata_options().clone()); let mut metadata = ParquetMetaDataReader::new() + .with_metadata_options(metadata_opts) .with_column_index_policy(PageIndexPolicy::from(self.preload_column_index)) .with_offset_index_policy(PageIndexPolicy::from(self.preload_offset_index)) .with_prefetch_hint(self.metadata_size_hint); diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs index 1c8f3e9c69ba..45b69a66799f 100644 --- a/parquet/src/file/metadata/mod.rs +++ b/parquet/src/file/metadata/mod.rs @@ -88,6 +88,7 @@ //! ``` mod footer_tail; mod memory; +mod options; mod parser; mod push_decoder; pub(crate) mod reader; @@ -127,6 +128,7 @@ use crate::{ }; pub use footer_tail::FooterTail; +pub use options::ParquetMetaDataOptions; pub use push_decoder::ParquetMetaDataPushDecoder; pub use reader::{PageIndexPolicy, ParquetMetaDataReader}; use std::io::Write; diff --git a/parquet/src/file/metadata/options.rs b/parquet/src/file/metadata/options.rs new file mode 100644 index 000000000000..bbc5314d3ac7 --- /dev/null +++ b/parquet/src/file/metadata/options.rs @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Options used to control metadata parsing + +use crate::schema::types::SchemaDescPtr; + +/// Options that can be set to control what parts of the Parquet file footer +/// metadata will be decoded and made present in the [`ParquetMetaData`] returned +/// by [`ParquetMetaDataReader`] and [`ParquetMetaDataPushDecoder`]. +/// +/// [`ParquetMetaData`]: crate::file::metadata::ParquetMetaData +/// [`ParquetMetaDataReader`]: crate::file::metadata::ParquetMetaDataReader +/// [`ParquetMetaDataPushDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder +#[derive(Default, Debug, Clone)] +pub struct ParquetMetaDataOptions { + schema_descr: Option, +} + +impl ParquetMetaDataOptions { + /// Return a new default [`ParquetMetaDataOptions`]. + pub fn new() -> Self { + Default::default() + } + + /// Returns an optional [`SchemaDescPtr`] to use when decoding. If this is not `None` then + /// the schema in the footer will be skipped. + pub fn schema(&self) -> Option<&SchemaDescPtr> { + self.schema_descr.as_ref() + } + + /// Provide a schema to use when decoding the metadata. + pub fn set_schema(&mut self, val: SchemaDescPtr) { + self.schema_descr = Some(val); + } + + /// Provide a schema to use when decoding the metadata. Returns `Self` for chaining. + pub fn with_schema(mut self, val: SchemaDescPtr) -> Self { + self.schema_descr = Some(val); + self + } +} + +#[cfg(test)] +mod tests { + use bytes::Bytes; + + use crate::{ + DecodeResult, + file::metadata::{ParquetMetaDataOptions, ParquetMetaDataPushDecoder}, + util::test_common::file_util::get_test_file, + }; + use std::{io::Read, sync::Arc}; + + #[test] + fn test_provide_schema() { + let mut buf: Vec = Vec::new(); + get_test_file("alltypes_plain.parquet") + .read_to_end(&mut buf) + .unwrap(); + + let data = Bytes::from(buf); + let mut decoder = ParquetMetaDataPushDecoder::try_new(data.len() as u64).unwrap(); + decoder + .push_range(0..data.len() as u64, data.clone()) + .unwrap(); + + let expected = match decoder.try_decode().unwrap() { + DecodeResult::Data(m) => m, + _ => panic!("could not parse metadata"), + }; + let expected_schema = expected.file_metadata().schema_descr_ptr(); + + let mut options = ParquetMetaDataOptions::new(); + options.set_schema(expected_schema); + let options = Arc::new(options); + + let mut decoder = ParquetMetaDataPushDecoder::try_new(data.len() as u64) + .unwrap() + .with_metadata_options(Some(options)); + decoder.push_range(0..data.len() as u64, data).unwrap(); + let metadata = match decoder.try_decode().unwrap() { + DecodeResult::Data(m) => m, + _ => panic!("could not parse metadata"), + }; + + assert_eq!(expected, metadata); + // the schema pointers should be the same + assert!(Arc::ptr_eq( + &expected.file_metadata().schema_descr_ptr(), + &metadata.file_metadata().schema_descr_ptr() + )); + } +} diff --git a/parquet/src/file/metadata/parser.rs b/parquet/src/file/metadata/parser.rs index cb7c67e9bf19..9df6bcdd7185 100644 --- a/parquet/src/file/metadata/parser.rs +++ b/parquet/src/file/metadata/parser.rs @@ -22,7 +22,9 @@ use crate::errors::ParquetError; use crate::file::metadata::thrift::parquet_metadata_from_bytes; -use crate::file::metadata::{ColumnChunkMetaData, PageIndexPolicy, ParquetMetaData}; +use crate::file::metadata::{ + ColumnChunkMetaData, PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, +}; use crate::file::page_index::column_index::ColumnIndexMetaData; use crate::file::page_index::index_reader::{decode_column_index, decode_offset_index}; @@ -51,6 +53,8 @@ mod inner { pub(crate) struct MetadataParser { // the credentials and keys needed to decrypt metadata file_decryption_properties: Option>, + // metadata parsing options + metadata_options: Option>, } impl MetadataParser { @@ -66,6 +70,16 @@ mod inner { self } + pub(crate) fn with_metadata_options( + self, + options: Option>, + ) -> Self { + Self { + metadata_options: options, + ..self + } + } + pub(crate) fn decode_metadata( &self, buf: &[u8], @@ -76,9 +90,10 @@ mod inner { self.file_decryption_properties.as_ref(), encrypted_footer, buf, + self.metadata_options.as_deref(), ) } else { - decode_metadata(buf) + decode_metadata(buf, self.metadata_options.as_deref()) } } } @@ -144,15 +159,28 @@ mod inner { mod inner { use super::*; use crate::errors::Result; + use std::sync::Arc; /// parallel implementation when encryption feature is not enabled /// /// This has the same API as the encryption-enabled version #[derive(Debug, Default)] - pub(crate) struct MetadataParser; + pub(crate) struct MetadataParser { + // metadata parsing options + metadata_options: Option>, + } impl MetadataParser { pub(crate) fn new() -> Self { - MetadataParser + MetadataParser::default() + } + + pub(crate) fn with_metadata_options( + self, + options: Option>, + ) -> Self { + Self { + metadata_options: options, + } } pub(crate) fn decode_metadata( @@ -165,7 +193,7 @@ mod inner { "Parquet file has an encrypted footer but the encryption feature is disabled" )) } else { - decode_metadata(buf) + decode_metadata(buf, self.metadata_options.as_deref()) } } } @@ -198,8 +226,11 @@ mod inner { /// by the [Parquet Spec]. /// /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata -pub(crate) fn decode_metadata(buf: &[u8]) -> crate::errors::Result { - parquet_metadata_from_bytes(buf) +pub(crate) fn decode_metadata( + buf: &[u8], + options: Option<&ParquetMetaDataOptions>, +) -> crate::errors::Result { + parquet_metadata_from_bytes(buf, options) } /// Parses column index from the provided bytes and adds it to the metadata. diff --git a/parquet/src/file/metadata/push_decoder.rs b/parquet/src/file/metadata/push_decoder.rs index 3393007f6451..34b7fec2c0c5 100644 --- a/parquet/src/file/metadata/push_decoder.rs +++ b/parquet/src/file/metadata/push_decoder.rs @@ -21,11 +21,12 @@ use crate::encryption::decrypt::FileDecryptionProperties; use crate::errors::{ParquetError, Result}; use crate::file::FOOTER_SIZE; use crate::file::metadata::parser::{MetadataParser, parse_column_index, parse_offset_index}; -use crate::file::metadata::{FooterTail, PageIndexPolicy, ParquetMetaData}; +use crate::file::metadata::{FooterTail, PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions}; use crate::file::page_index::index_reader::acc_range; use crate::file::reader::ChunkReader; use bytes::Bytes; use std::ops::Range; +use std::sync::Arc; /// A push decoder for [`ParquetMetaData`]. /// @@ -299,6 +300,12 @@ impl ParquetMetaDataPushDecoder { self } + /// Set the options to use when decoding the Parquet metadata. + pub fn with_metadata_options(mut self, options: Option>) -> Self { + self.metadata_parser = self.metadata_parser.with_metadata_options(options); + self + } + #[cfg(feature = "encryption")] /// Provide decryption properties for decoding encrypted Parquet files pub(crate) fn with_file_decryption_properties( diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index 091895e65919..a18a5e68a9b5 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -20,9 +20,14 @@ use crate::encryption::decrypt::FileDecryptionProperties; use crate::errors::{ParquetError, Result}; use crate::file::FOOTER_SIZE; use crate::file::metadata::parser::decode_metadata; -use crate::file::metadata::{FooterTail, ParquetMetaData, ParquetMetaDataPushDecoder}; +use crate::file::metadata::thrift::parquet_schema_from_bytes; +use crate::file::metadata::{ + FooterTail, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataPushDecoder, +}; use crate::file::reader::ChunkReader; +use crate::schema::types::SchemaDescriptor; use bytes::Bytes; +use std::sync::Arc; use std::{io::Read, ops::Range}; use crate::DecodeResult; @@ -68,11 +73,12 @@ pub struct ParquetMetaDataReader { column_index: PageIndexPolicy, offset_index: PageIndexPolicy, prefetch_hint: Option, + metadata_options: Option>, // Size of the serialized thrift metadata plus the 8 byte footer. Only set if // `self.parse_metadata` is called. metadata_size: Option, #[cfg(feature = "encryption")] - file_decryption_properties: Option>, + file_decryption_properties: Option>, } /// Describes the policy for reading page indexes @@ -158,6 +164,12 @@ impl ParquetMetaDataReader { self } + /// Sets the [`ParquetMetaDataOptions`] to use when decoding + pub fn with_metadata_options(mut self, options: Option) -> Self { + self.metadata_options = options.map(Arc::new); + self + } + /// Provide a hint as to the number of bytes needed to fully parse the [`ParquetMetaData`]. /// Only used for the asynchronous [`Self::try_load()`] method. /// @@ -355,7 +367,8 @@ impl ParquetMetaDataReader { let push_decoder = ParquetMetaDataPushDecoder::try_new_with_metadata(file_size, metadata)? .with_offset_index_policy(self.offset_index) - .with_column_index_policy(self.column_index); + .with_column_index_policy(self.column_index) + .with_metadata_options(self.metadata_options.clone()); let mut push_decoder = self.prepare_push_decoder(push_decoder); // Get bounds needed for page indexes (if any are present in the file). @@ -501,7 +514,8 @@ impl ParquetMetaDataReader { let file_size = u64::MAX; let push_decoder = ParquetMetaDataPushDecoder::try_new_with_metadata(file_size, metadata)? .with_offset_index_policy(self.offset_index) - .with_column_index_policy(self.column_index); + .with_column_index_policy(self.column_index) + .with_metadata_options(self.metadata_options.clone()); let mut push_decoder = self.prepare_push_decoder(push_decoder); // Get bounds needed for page indexes (if any are present in the file). @@ -751,7 +765,8 @@ impl ParquetMetaDataReader { let push_decoder = ParquetMetaDataPushDecoder::try_new_with_footer_tail(file_size, footer_tail)? // NOTE: DO NOT enable page indexes here, they are handled separately - .with_page_index_policy(PageIndexPolicy::Skip); + .with_page_index_policy(PageIndexPolicy::Skip) + .with_metadata_options(self.metadata_options.clone()); let mut push_decoder = self.prepare_push_decoder(push_decoder); push_decoder.push_range(range, buf)?; @@ -795,7 +810,24 @@ impl ParquetMetaDataReader { /// /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata pub fn decode_metadata(buf: &[u8]) -> Result { - decode_metadata(buf) + decode_metadata(buf, None) + } + + /// Decodes [`ParquetMetaData`] from the provided bytes. + /// + /// Like [`Self::decode_metadata`] but this also accepts + /// metadata parsing options. + pub fn decode_metadata_with_options( + buf: &[u8], + options: Option<&ParquetMetaDataOptions>, + ) -> Result { + decode_metadata(buf, options) + } + + /// Decodes the schema from the Parquet footer in `buf`. Returned as + /// a [`SchemaDescriptor`]. + pub fn decode_schema(buf: &[u8]) -> Result> { + Ok(Arc::new(parquet_schema_from_bytes(buf)?)) } } diff --git a/parquet/src/file/metadata/thrift/encryption.rs b/parquet/src/file/metadata/thrift/encryption.rs index 9744f0f7a6b5..56c5a6a4b9da 100644 --- a/parquet/src/file/metadata/thrift/encryption.rs +++ b/parquet/src/file/metadata/thrift/encryption.rs @@ -23,7 +23,7 @@ use crate::{ file::{ column_crypto_metadata::ColumnCryptoMetaData, metadata::{ - HeapSize, ParquetMetaData, RowGroupMetaData, + HeapSize, ParquetMetaData, ParquetMetaDataOptions, RowGroupMetaData, thrift::{parquet_metadata_from_bytes, read_column_metadata, validate_column_metadata}, }, }, @@ -213,6 +213,7 @@ pub(crate) fn parquet_metadata_with_encryption( file_decryption_properties: Option<&Arc>, encrypted_footer: bool, buf: &[u8], + options: Option<&ParquetMetaDataOptions>, ) -> Result { use crate::file::metadata::ParquetMetaDataBuilder; @@ -262,7 +263,7 @@ pub(crate) fn parquet_metadata_with_encryption( } } - let parquet_meta = parquet_metadata_from_bytes(buf) + let parquet_meta = parquet_metadata_from_bytes(buf, options) .map_err(|e| general_err!("Could not parse metadata: {}", e))?; let ParquetMetaData { diff --git a/parquet/src/file/metadata/thrift/mod.rs b/parquet/src/file/metadata/thrift/mod.rs index 14774910961f..175a152839b4 100644 --- a/parquet/src/file/metadata/thrift/mod.rs +++ b/parquet/src/file/metadata/thrift/mod.rs @@ -43,8 +43,8 @@ use crate::{ file::{ metadata::{ ColumnChunkMetaData, ColumnChunkMetaDataBuilder, KeyValue, LevelHistogram, - PageEncodingStats, ParquetMetaData, RowGroupMetaData, RowGroupMetaDataBuilder, - SortingColumn, + PageEncodingStats, ParquetMetaData, ParquetMetaDataOptions, RowGroupMetaData, + RowGroupMetaDataBuilder, SortingColumn, }, statistics::ValueStatistics, }, @@ -669,9 +669,37 @@ fn read_row_group( Ok(row_group) } +/// Create a [`SchemaDescriptor`] from thrift input. The input buffer must contain a complete +/// Parquet footer. +pub(crate) fn parquet_schema_from_bytes(buf: &[u8]) -> Result { + let mut prot = ThriftSliceInputProtocol::new(buf); + + let mut last_field_id = 0i16; + loop { + let field_ident = prot.read_field_begin(last_field_id)?; + if field_ident.field_type == FieldType::Stop { + break; + } + match field_ident.id { + 2 => { + // read schema and convert to SchemaDescriptor for use when reading row groups + let val = read_thrift_vec::(&mut prot)?; + let val = parquet_schema_from_array(val)?; + return Ok(SchemaDescriptor::new(val)); + } + _ => prot.skip(field_ident.field_type)?, + } + last_field_id = field_ident.id; + } + Err(general_err!("Input does not contain a schema")) +} + /// Create [`ParquetMetaData`] from thrift input. Note that this only decodes the file metadata in /// the Parquet footer. Page indexes will need to be added later. -pub(crate) fn parquet_metadata_from_bytes(buf: &[u8]) -> Result { +pub(crate) fn parquet_metadata_from_bytes( + buf: &[u8], + options: Option<&ParquetMetaDataOptions>, +) -> Result { let mut prot = ThriftSliceInputProtocol::new(buf); // begin reading the file metadata @@ -689,6 +717,11 @@ pub(crate) fn parquet_metadata_from_bytes(buf: &[u8]) -> Result // this will need to be set before parsing row groups let mut schema_descr: Option> = None; + // see if we already have a schema. + if let Some(options) = options { + schema_descr = options.schema().cloned(); + } + // struct FileMetaData { // 1: required i32 version // 2: required list schema; @@ -711,10 +744,16 @@ pub(crate) fn parquet_metadata_from_bytes(buf: &[u8]) -> Result version = Some(i32::read_thrift(&mut prot)?); } 2 => { - // read schema and convert to SchemaDescriptor for use when reading row groups - let val = read_thrift_vec::(&mut prot)?; - let val = parquet_schema_from_array(val)?; - schema_descr = Some(Arc::new(SchemaDescriptor::new(val))); + // If schema was passed in, skip parsing it + if schema_descr.is_some() { + prot.skip(field_ident.field_type)?; + } else { + // read schema and convert to SchemaDescriptor for use when reading row groups + let val = + read_thrift_vec::(&mut prot)?; + let val = parquet_schema_from_array(val)?; + schema_descr = Some(Arc::new(SchemaDescriptor::new(val))); + } } 3 => { num_rows = Some(i64::read_thrift(&mut prot)?); diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 1b866a45cfa9..8326b14a36e9 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -38,7 +38,7 @@ use crate::parquet_thrift::ThriftSliceInputProtocol; use crate::parquet_thrift::{ReadThrift, ThriftReadInputProtocol}; use crate::record::Row; use crate::record::reader::RowIter; -use crate::schema::types::Type as SchemaType; +use crate::schema::types::{SchemaDescPtr, Type as SchemaType}; use bytes::Bytes; use std::collections::VecDeque; use std::{fs::File, io::Read, path::Path, sync::Arc}; @@ -110,6 +110,7 @@ pub struct ReadOptionsBuilder { predicates: Vec, enable_page_index: bool, props: Option, + metadata_options: ParquetMetaDataOptions, } impl ReadOptionsBuilder { @@ -152,6 +153,13 @@ impl ReadOptionsBuilder { self } + /// Provide a Parquet schema to use when decoding the metadata. The schema in the Parquet + /// footer will be skipped. + pub fn with_parquet_schema(mut self, schema: SchemaDescPtr) -> Self { + self.metadata_options.set_schema(schema); + self + } + /// Seal the builder and return the read options pub fn build(self) -> ReadOptions { let props = self @@ -161,18 +169,20 @@ impl ReadOptionsBuilder { predicates: self.predicates, enable_page_index: self.enable_page_index, props, + metadata_options: self.metadata_options, } } } /// A collection of options for reading a Parquet file. /// -/// Currently, only predicates on row group metadata are supported. +/// Predicates are currently only supported on row group metadata. /// All predicates will be chained using 'AND' to filter the row groups. pub struct ReadOptions { predicates: Vec, enable_page_index: bool, props: ReaderProperties, + metadata_options: ParquetMetaDataOptions, } impl SerializedFileReader { @@ -193,6 +203,7 @@ impl SerializedFileReader { #[allow(deprecated)] pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result { let mut metadata_builder = ParquetMetaDataReader::new() + .with_metadata_options(Some(options.metadata_options.clone())) .parse_and_finish(&chunk_reader)? .into_builder(); let mut predicates = options.predicates; @@ -2697,6 +2708,26 @@ mod tests { } } + #[test] + fn test_reuse_schema() { + let file = get_test_file("alltypes_plain.parquet"); + let file_reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap(); + let schema = file_reader.metadata().file_metadata().schema_descr_ptr(); + let expected = file_reader.metadata; + + let options = ReadOptionsBuilder::new() + .with_parquet_schema(schema) + .build(); + let file_reader = SerializedFileReader::new_with_options(file, options).unwrap(); + + assert_eq!(expected.as_ref(), file_reader.metadata.as_ref()); + // Should have used the same schema instance + assert!(Arc::ptr_eq( + &expected.file_metadata().schema_descr_ptr(), + &file_reader.metadata.file_metadata().schema_descr_ptr() + )); + } + #[test] fn test_read_unknown_logical_type() { let file = get_test_file("unknown-logical-type.parquet");