21 changes: 19 additions & 2 deletions parquet/benches/metadata.rs
@@ -20,8 +20,8 @@ use std::sync::Arc;

use parquet::basic::{Encoding, PageType, Type as PhysicalType};
use parquet::file::metadata::{
-    ColumnChunkMetaData, FileMetaData, PageEncodingStats, ParquetMetaData, ParquetMetaDataReader,
-    ParquetMetaDataWriter, RowGroupMetaData,
+    ColumnChunkMetaData, FileMetaData, PageEncodingStats, ParquetMetaData, ParquetMetaDataOptions,
+    ParquetMetaDataReader, ParquetMetaDataWriter, RowGroupMetaData,
};
use parquet::file::statistics::Statistics;
use parquet::file::writer::TrackedWrite;
@@ -164,12 +164,29 @@ fn criterion_benchmark(c: &mut Criterion) {
})
});

+    let schema = ParquetMetaDataReader::decode_schema(&meta_data).unwrap();
+    let options = ParquetMetaDataOptions::new().with_schema(schema);
+    c.bench_function("decode metadata with schema", |b| {
+        b.iter(|| {
+            ParquetMetaDataReader::decode_metadata_with_options(&meta_data, Some(&options))
+                .unwrap();
+        })
+    });

let buf: Bytes = black_box(encoded_meta()).into();
c.bench_function("decode parquet metadata (wide)", |b| {
b.iter(|| {
ParquetMetaDataReader::decode_metadata(&buf).unwrap();
})
});

+    let schema = ParquetMetaDataReader::decode_schema(&buf).unwrap();
+    let options = ParquetMetaDataOptions::new().with_schema(schema);
+    c.bench_function("decode metadata (wide) with schema", |b| {
+        b.iter(|| {
+            ParquetMetaDataReader::decode_metadata_with_options(&buf, Some(&options)).unwrap();
+        })
+    });
}

criterion_group!(benches, criterion_benchmark);
42 changes: 39 additions & 3 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -38,7 +38,9 @@ use crate::column::page::{PageIterator, PageReader};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
+use crate::file::metadata::{
+    PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
+};
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;

@@ -387,6 +389,8 @@ pub struct ArrowReaderOptions {
supplied_schema: Option<SchemaRef>,
/// Policy for reading offset and column indexes.
pub(crate) page_index_policy: PageIndexPolicy,
+    /// Options to control reading of Parquet metadata
+    metadata_options: ParquetMetaDataOptions,

Contributor: I reviewed the ArrowReaderOptions and ArrowReaderMetadata structures and their use, and I agree this is the appropriate structure to add the metadata parsing options to.

Do you think it eventually makes sense to move the other fields from ArrowReaderOptions to ParquetMetaDataOptions (e.g. supplied_schema)?

Contributor Author: I was thinking perhaps page_index_policy could move, but the other things in ArrowReaderOptions are more Arrow-specific than Parquet-specific. Moving them might get confusing.
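A minimal sketch of the intended flow through the Arrow reader, assuming the `with_parquet_schema` builder added in this PR (file names are hypothetical; errors are boxed for brevity):

```rust
use std::fs::File;

use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Parse the first file normally and keep its schema descriptor.
    let first = File::open("part-0.parquet")?; // hypothetical path
    let builder = ParquetRecordBatchReaderBuilder::try_new(first)?;
    let schema = builder.metadata().file_metadata().schema_descr_ptr();

    // For later files known to share the schema, supply it up front so the
    // footer's schema element is skipped during metadata decoding.
    let options = ArrowReaderOptions::new().with_parquet_schema(schema);
    let next = File::open("part-1.parquet")?; // hypothetical path
    let mut reader =
        ParquetRecordBatchReaderBuilder::try_new_with_options(next, options)?.build()?;

    // Read batches as usual.
    while let Some(batch) = reader.next().transpose()? {
        println!("read {} rows", batch.num_rows());
    }
    Ok(())
}
```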
/// If encryption is enabled, the file decryption properties can be provided
#[cfg(feature = "encryption")]
pub(crate) file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
@@ -504,6 +508,16 @@ impl ArrowReaderOptions {
}
}

+    /// Provide a Parquet schema to use when decoding the metadata. If a schema is
+    /// provided, decoding of the schema in the Parquet footer will be skipped.
+    ///
+    /// This can be used to avoid reparsing the schema from the file when it is
+    /// already known.
+    pub fn with_parquet_schema(mut self, schema: Arc<SchemaDescriptor>) -> Self {
+        self.metadata_options.set_schema(schema);
+        self
+    }

/// Provide the file decryption properties to use when reading encrypted parquet files.
///
/// If encryption is enabled and the file is encrypted, the `file_decryption_properties` must be provided.
@@ -525,6 +539,11 @@
self.page_index_policy != PageIndexPolicy::Skip
}

+    /// Retrieve the currently set metadata decoding options.
+    pub fn metadata_options(&self) -> &ParquetMetaDataOptions {
+        &self.metadata_options
+    }

/// Retrieve the currently set file decryption properties.
///
/// This can be set via
@@ -571,8 +590,9 @@ impl ArrowReaderMetadata {
/// `Self::metadata` is missing the page index, this function will attempt
/// to load the page index by making an object store request.
pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
-        let metadata =
-            ParquetMetaDataReader::new().with_page_index_policy(options.page_index_policy);
+        let metadata = ParquetMetaDataReader::new()
+            .with_page_index_policy(options.page_index_policy)
+            .with_metadata_options(Some(options.metadata_options.clone()));
#[cfg(feature = "encryption")]
let metadata = metadata.with_decryption_properties(
options.file_decryption_properties.as_ref().map(Arc::clone),
@@ -1221,6 +1241,22 @@ mod tests {
assert_eq!(original_schema.fields(), reader.schema().fields());
}

+    #[test]
+    fn test_reuse_schema() {
+        let file = get_test_file("parquet/alltypes-java.parquet");
+
+        let builder = ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
+        let expected = builder.metadata;
+        let schema = expected.file_metadata().schema_descr_ptr();
+
+        let arrow_options = ArrowReaderOptions::new().with_parquet_schema(schema.clone());
+        let builder =
+            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();
+
+        // Verify that the metadata matches
+        assert_eq!(expected.as_ref(), builder.metadata.as_ref());
+    }

#[test]
fn test_arrow_reader_single_column() {
let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
9 changes: 6 additions & 3 deletions parquet/src/arrow/async_reader/mod.rs
@@ -171,9 +171,12 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
async move {
-            let metadata_reader = ParquetMetaDataReader::new().with_page_index_policy(
-                PageIndexPolicy::from(options.is_some_and(|o| o.page_index())),
-            );
+            let metadata_opts = options.map(|o| o.metadata_options().clone());
+            let metadata_reader = ParquetMetaDataReader::new()
+                .with_page_index_policy(PageIndexPolicy::from(
+                    options.is_some_and(|o| o.page_index()),
+                ))
+                .with_metadata_options(metadata_opts);

#[cfg(feature = "encryption")]
let metadata_reader = metadata_reader.with_decryption_properties(
2 changes: 2 additions & 0 deletions parquet/src/arrow/async_reader/store.rs
@@ -199,7 +199,9 @@ impl AsyncFileReader for ParquetObjectReader {
options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
+            let metadata_opts = options.map(|o| o.metadata_options().clone());
             let mut metadata = ParquetMetaDataReader::new()
+                .with_metadata_options(metadata_opts)
.with_column_index_policy(PageIndexPolicy::from(self.preload_column_index))
.with_offset_index_policy(PageIndexPolicy::from(self.preload_offset_index))
.with_prefetch_hint(self.metadata_size_hint);
Expand Down
2 changes: 2 additions & 0 deletions parquet/src/file/metadata/mod.rs
@@ -88,6 +88,7 @@
//! ```
mod footer_tail;
mod memory;
+mod options;
mod parser;
mod push_decoder;
pub(crate) mod reader;
@@ -127,6 +128,7 @@ use crate::{
};

pub use footer_tail::FooterTail;
+pub use options::ParquetMetaDataOptions;
pub use push_decoder::ParquetMetaDataPushDecoder;
pub use reader::{PageIndexPolicy, ParquetMetaDataReader};
use std::io::Write;
108 changes: 108 additions & 0 deletions parquet/src/file/metadata/options.rs
@@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Options used to control metadata parsing
use crate::schema::types::SchemaDescPtr;

/// Options that can be set to control what parts of the Parquet file footer
/// metadata will be decoded and made present in the [`ParquetMetaData`] returned
/// by [`ParquetMetaDataReader`] and [`ParquetMetaDataPushDecoder`].
///
/// [`ParquetMetaData`]: crate::file::metadata::ParquetMetaData
/// [`ParquetMetaDataReader`]: crate::file::metadata::ParquetMetaDataReader
/// [`ParquetMetaDataPushDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder
#[derive(Default, Debug, Clone)]
pub struct ParquetMetaDataOptions {
schema_descr: Option<SchemaDescPtr>,
Member: Does this mean (1) a user-provided schema, or (2) that only the (min, max, etc.) columns in schema_descr will be decoded?

Contributor Author: It's (1). Say you have a large number of files that share the same schema: there's no need to decode the schema again for each one. Just grab the schema from the first file and use it for all the others.

Contributor: Here is a ticket that explains the use case a bit more.
}
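A minimal sketch of the pattern described in the thread above, assuming the `decode_schema` and `decode_metadata_with_options` APIs added in this PR (`footers` is a hypothetical slice holding the raw footer metadata bytes of files known to share one schema):

```rust
use bytes::Bytes;
use parquet::errors::Result;
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader};

/// Decode footer metadata for many files that share a schema, parsing the
/// schema element only once. Assumes `footers` is non-empty.
fn decode_all(footers: &[Bytes]) -> Result<Vec<ParquetMetaData>> {
    // Decode the schema from the first footer only...
    let schema = ParquetMetaDataReader::decode_schema(&footers[0])?;
    // ...then reuse it so every decode skips the footer's schema element.
    let options = ParquetMetaDataOptions::new().with_schema(schema);
    footers
        .iter()
        .map(|buf| ParquetMetaDataReader::decode_metadata_with_options(buf, Some(&options)))
        .collect()
}
```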

impl ParquetMetaDataOptions {
/// Return a new default [`ParquetMetaDataOptions`].
pub fn new() -> Self {
Default::default()
}

/// Returns an optional [`SchemaDescPtr`] to use when decoding. If this is not `None` then
/// the schema in the footer will be skipped.
pub fn schema(&self) -> Option<&SchemaDescPtr> {
self.schema_descr.as_ref()
}

/// Provide a schema to use when decoding the metadata.
pub fn set_schema(&mut self, val: SchemaDescPtr) {
self.schema_descr = Some(val);
}

/// Provide a schema to use when decoding the metadata. Returns `Self` for chaining.
pub fn with_schema(mut self, val: SchemaDescPtr) -> Self {
self.schema_descr = Some(val);
self
}
}

#[cfg(test)]
mod tests {
use bytes::Bytes;

use crate::{
DecodeResult,
file::metadata::{ParquetMetaDataOptions, ParquetMetaDataPushDecoder},
util::test_common::file_util::get_test_file,
};
use std::{io::Read, sync::Arc};

#[test]
fn test_provide_schema() {
let mut buf: Vec<u8> = Vec::new();
get_test_file("alltypes_plain.parquet")
.read_to_end(&mut buf)
.unwrap();

let data = Bytes::from(buf);
let mut decoder = ParquetMetaDataPushDecoder::try_new(data.len() as u64).unwrap();
decoder
.push_range(0..data.len() as u64, data.clone())
.unwrap();

let expected = match decoder.try_decode().unwrap() {
DecodeResult::Data(m) => m,
_ => panic!("could not parse metadata"),
};
let expected_schema = expected.file_metadata().schema_descr_ptr();

let mut options = ParquetMetaDataOptions::new();
options.set_schema(expected_schema);
let options = Arc::new(options);

let mut decoder = ParquetMetaDataPushDecoder::try_new(data.len() as u64)
.unwrap()
.with_metadata_options(Some(options));
decoder.push_range(0..data.len() as u64, data).unwrap();
let metadata = match decoder.try_decode().unwrap() {
DecodeResult::Data(m) => m,
_ => panic!("could not parse metadata"),
};

assert_eq!(expected, metadata);
// the schema pointers should be the same
assert!(Arc::ptr_eq(
&expected.file_metadata().schema_descr_ptr(),
&metadata.file_metadata().schema_descr_ptr()
));
}
}
45 changes: 38 additions & 7 deletions parquet/src/file/metadata/parser.rs
@@ -22,7 +22,9 @@

use crate::errors::ParquetError;
use crate::file::metadata::thrift::parquet_metadata_from_bytes;
-use crate::file::metadata::{ColumnChunkMetaData, PageIndexPolicy, ParquetMetaData};
+use crate::file::metadata::{
+    ColumnChunkMetaData, PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions,
+};

use crate::file::page_index::column_index::ColumnIndexMetaData;
use crate::file::page_index::index_reader::{decode_column_index, decode_offset_index};
@@ -51,6 +53,8 @@ mod inner {
pub(crate) struct MetadataParser {
// the credentials and keys needed to decrypt metadata
file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
+        // metadata parsing options
+        metadata_options: Option<Arc<ParquetMetaDataOptions>>,
}

impl MetadataParser {
@@ -66,6 +70,16 @@
self
}

+        pub(crate) fn with_metadata_options(
+            self,
+            options: Option<Arc<ParquetMetaDataOptions>>,
+        ) -> Self {
+            Self {
+                metadata_options: options,
+                ..self
+            }
+        }

pub(crate) fn decode_metadata(
&self,
buf: &[u8],
@@ -76,9 +90,10 @@
self.file_decryption_properties.as_ref(),
encrypted_footer,
buf,
+                self.metadata_options.as_deref(),
)
} else {
-            decode_metadata(buf)
+            decode_metadata(buf, self.metadata_options.as_deref())
}
}
}
@@ -144,15 +159,28 @@
mod inner {
use super::*;
use crate::errors::Result;
+    use std::sync::Arc;
/// parallel implementation when encryption feature is not enabled
///
/// This has the same API as the encryption-enabled version
#[derive(Debug, Default)]
-    pub(crate) struct MetadataParser;
+    pub(crate) struct MetadataParser {
+        // metadata parsing options
+        metadata_options: Option<Arc<ParquetMetaDataOptions>>,
+    }

impl MetadataParser {
pub(crate) fn new() -> Self {
-            MetadataParser
+            MetadataParser::default()
}

+        pub(crate) fn with_metadata_options(
+            self,
+            options: Option<Arc<ParquetMetaDataOptions>>,
+        ) -> Self {
+            Self {
+                metadata_options: options,
+            }
+        }

pub(crate) fn decode_metadata(
@@ -165,7 +193,7 @@
"Parquet file has an encrypted footer but the encryption feature is disabled"
))
} else {
-            decode_metadata(buf)
+            decode_metadata(buf, self.metadata_options.as_deref())
}
}
}
@@ -198,8 +226,11 @@
/// by the [Parquet Spec].
///
/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
-pub(crate) fn decode_metadata(buf: &[u8]) -> crate::errors::Result<ParquetMetaData> {
-    parquet_metadata_from_bytes(buf)
+pub(crate) fn decode_metadata(
+    buf: &[u8],
+    options: Option<&ParquetMetaDataOptions>,
+) -> crate::errors::Result<ParquetMetaData> {
+    parquet_metadata_from_bytes(buf, options)
}

/// Parses column index from the provided bytes and adds it to the metadata.