diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 6d99c912fd9d..8ab78efe0cb7 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -121,6 +121,12 @@ sysinfo = ["dep:sysinfo"] # Verify 32-bit CRC checksum when decoding parquet pages crc = ["dep:crc32fast"] + +[[example]] +name = "external_metadata" +required-features = ["arrow", "async"] +path = "./examples/external_metadata.rs" + [[example]] name = "read_parquet" required-features = ["arrow"] diff --git a/parquet/examples/external_metadata.rs b/parquet/examples/external_metadata.rs new file mode 100644 index 000000000000..5ca322ee2890 --- /dev/null +++ b/parquet/examples/external_metadata.rs @@ -0,0 +1,236 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray}; +use arrow_cast::pretty::pretty_format_batches; +use futures::TryStreamExt; +use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; +use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder}; +use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter}; +use parquet::file::properties::{EnabledStatistics, WriterProperties}; +use std::fs::File; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use tempfile::TempDir; + +/// This example demonstrates advanced usage of the Parquet metadata APIs. +/// +/// It is sometimes desired to copy the metadata for parquet files stored on +/// remote object storage (e.g. S3) to a local file or an in-memory cache, use a +/// query engine like DataFusion to analyze the metadata to determine which file +/// to read, and then read any matching files with a single subsequent object +/// store request. +/// +/// # Example Overview +/// +/// 1. Reads the metadata of a Parquet file using [`ParquetMetaDataReader`] +/// +/// 2. Removes column statistics from the metadata (to make it smaller) +/// +/// 3. Stores the metadata in a separate file using [`ParquetMetaDataWriter`] +/// +/// 4. Reads the metadata from the separate file and uses that to read the +/// Parquet file, thus avoiding a second IO to read metadata or reparsing +/// the footer. +/// +#[tokio::main(flavor = "current_thread")] +async fn main() -> parquet::errors::Result<()> { + let tempdir = TempDir::new().unwrap(); + let parquet_path = create_parquet_file(&tempdir); + let metadata_path = tempdir.path().join("thrift_metadata.dat"); + + // In this example, we use a tokio file to mimic an async remote data source + let mut remote_parquet_file = tokio::fs::File::open(&parquet_path).await?; + + let metadata = get_metadata_from_remote_parquet_file(&mut remote_parquet_file).await; + println!( + "Metadata from 'remote' Parquet file into memory: {} bytes", + metadata.memory_size() + ); + + // now slim down the metadata and write it to a "local" file + let metadata = prepare_metadata(metadata); + write_metadata_to_local_file(metadata, &metadata_path); + + // now read the metadata from the local file and use it to read the "remote" Parquet file + let metadata = read_metadata_from_local_file(&metadata_path); + println!("Read metadata from file"); + + let batches = read_remote_parquet_file_with_metadata(remote_parquet_file, metadata).await; + + // display the results + let batches_string = pretty_format_batches(&batches).unwrap().to_string(); + let batches_lines: Vec<_> = batches_string.split('\n').collect(); + + assert_eq!( + batches_lines, + [ + "+-----+-------------+", + "| id | description |", + "+-----+-------------+", + "| 100 | oranges |", + "| 200 | apples |", + "| 201 | grapefruit |", + "| 300 | bannanas |", + "| 102 | grapes |", + "| 33 | pears |", + "+-----+-------------+", + ], + "actual output:\n\n{batches_lines:#?}" + ); + + Ok(()) +} + +/// Reads the metadata from a "remote" parquet file +/// +/// Note that this function models reading from a remote file source using a +/// tokio file. In a real application, you would implement [`MetadataFetch`] for +/// your own remote source. +/// +/// [`MetadataFetch`]: parquet::arrow::async_reader::MetadataFetch +async fn get_metadata_from_remote_parquet_file( + remote_file: &mut tokio::fs::File, +) -> ParquetMetaData { + // the remote source must know the total file size (e.g. from an object store LIST operation) + let file_size = remote_file.metadata().await.unwrap().len(); + + // tell the reader to read the page index + ParquetMetaDataReader::new() + .with_page_indexes(true) + .load_and_finish(remote_file, file_size as usize) + .await + .unwrap() +} + +/// modifies the metadata to reduce its size +fn prepare_metadata(metadata: ParquetMetaData) -> ParquetMetaData { + let orig_size = metadata.memory_size(); + + let mut builder = metadata.into_builder(); + + // remove column statistics to reduce the size of the metadata by converting + // the various structures into their respective builders and modifying them + // as needed. + for row_group in builder.take_row_groups() { + let mut row_group_builder = row_group.into_builder(); + for column in row_group_builder.take_columns() { + let column = column.into_builder().clear_statistics().build().unwrap(); + row_group_builder = row_group_builder.add_column_metadata(column); + } + let row_group = row_group_builder.build().unwrap(); + builder = builder.add_row_group(row_group); + } + let metadata = builder.build(); + + // verifiy that the size has indeed been reduced + let new_size = metadata.memory_size(); + assert!(new_size < orig_size, "metadata size did not decrease"); + println!("Reduced metadata size from {} to {}", orig_size, new_size); + metadata +} + +/// writes the metadata to a file +/// +/// The data is stored using the same thrift format as the Parquet file metadata +fn write_metadata_to_local_file(metadata: ParquetMetaData, file: impl AsRef) { + let file = File::create(file).unwrap(); + ParquetMetaDataWriter::new(file, &metadata) + .finish() + .unwrap() +} + +/// Reads the metadata from a file +/// +/// This function reads the format written by `write_metadata_to_file` +fn read_metadata_from_local_file(file: impl AsRef) -> ParquetMetaData { + let file = File::open(file).unwrap(); + ParquetMetaDataReader::new() + .with_page_indexes(true) + .parse_and_finish(&file) + .unwrap() +} + +/// Reads the "remote" Parquet file using the metadata +/// +/// This shows how to read the Parquet file using previously read metadata +/// instead of the metadata in the Parquet file itself. This avoids an IO / +/// having to fetch and decode the metadata from the Parquet file before +/// beginning to read it. +/// +/// Note that this function models reading from a remote file source using a +/// tokio file. In a real application, you would implement [`AsyncFileReader`] +/// for your own remote source. +/// +/// In this example, we simply buffer the results in memory as Arrow record +/// batches but a real application would likely process the batches as they are +/// read. +/// +/// [`AsyncFileReader`]: parquet::arrow::async_reader::AsyncFileReader +async fn read_remote_parquet_file_with_metadata( + remote_file: tokio::fs::File, + metadata: ParquetMetaData, +) -> Vec { + let options = ArrowReaderOptions::new() + // tell the reader to read the page index + .with_page_index(true); + // create a reader with pre-existing metadata + let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap(); + let reader = + ParquetRecordBatchStreamBuilder::new_with_metadata(remote_file, arrow_reader_metadata) + .build() + .unwrap(); + + reader.try_collect::>().await.unwrap() +} + +/// Make a new parquet file in the temporary directory, and returns the path +fn create_parquet_file(tmpdir: &TempDir) -> PathBuf { + let path = tmpdir.path().join("example.parquet"); + let new_file = File::create(&path).unwrap(); + + let batch = RecordBatch::try_from_iter(vec![ + ( + "id", + Arc::new(Int32Array::from(vec![100, 200, 201, 300, 102, 33])) as ArrayRef, + ), + ( + "description", + Arc::new(StringArray::from(vec![ + "oranges", + "apples", + "grapefruit", + "bannanas", + "grapes", + "pears", + ])), + ), + ]) + .unwrap(); + + let props = WriterProperties::builder() + // ensure we write the page index level statistics + .set_statistics_enabled(EnabledStatistics::Page) + .build(); + + let mut writer = ArrowWriter::try_new(new_file, batch.schema(), Some(props)).unwrap(); + + writer.write(&batch).unwrap(); + writer.finish().unwrap(); + + path +} diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs index fa56894f239c..32b985710023 100644 --- a/parquet/src/file/metadata/mod.rs +++ b/parquet/src/file/metadata/mod.rs @@ -36,30 +36,27 @@ //! //! # APIs for working with Parquet Metadata //! -//! The Parquet readers and writers in this crate read and write +//! The Parquet readers and writers in this crate handle reading and writing //! metadata into parquet files. To work with metadata directly, -//! the following APIs are available. +//! the following APIs are available: //! -//! Reading: -//! * Read from bytes to `ParquetMetaData`: [`decode_footer`] -//! and [`decode_metadata`] -//! * Read from an `async` source to `ParquetMetaData`: [`MetadataLoader`] -//! * Read from bytes or from an async source to `ParquetMetaData`: [`ParquetMetaDataReader`] +//! * [`ParquetMetaDataReader`] for reading +//! * [`ParquetMetaDataWriter`] for writing. //! -//! [`MetadataLoader`]: https://docs.rs/parquet/latest/parquet/arrow/async_reader/struct.MetadataLoader.html -//! [`decode_footer`]: crate::file::footer::decode_footer -//! [`decode_metadata`]: crate::file::footer::decode_metadata +//! [`ParquetMetaDataReader`]: https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataReader.html +//! [`ParquetMetaDataWriter`]: https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataWriter.html //! -//! Writing: -//! * Write `ParquetMetaData` to bytes in memory: [`ParquetMetaDataWriter`] -//! * Writes `ParquetMetaData` to an async target: Not yet supported +//! # Examples //! +//! Please see [`external_metadata.rs`] +//! +//! [`external_metadata.rs`]: https://github.com/apache/arrow-rs/tree/master/parquet/examples/external_metadata.rs //! //! # Metadata Encodings and Structures //! //! There are three different encodings of Parquet Metadata in this crate: //! -//! 1. `bytes`:encoded with the Thrift TCompactProtocol as defined in +//! 1. `bytes`:encoded with the Thrift `TCompactProtocol` as defined in //! [parquet.thrift] //! //! 2. [`format`]: Rust structures automatically generated by the thrift compiler