From 1e5845616ac5bec7c0d5dfd690a7e2ddf0dc60cc Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 26 Sep 2024 10:26:48 -0400
Subject: [PATCH 1/4] Add round trip tests for reading/writing parquet metadata

---
 parquet/src/arrow/mod.rs | 143 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 143 insertions(+)

diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index afe7ed1ebec9..2dc2a5ed755a 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -236,3 +236,146 @@ pub fn parquet_column<'a>(
         .find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?;
     Some((parquet_idx, field))
 }
+
+#[cfg(test)]
+mod test {
+    use crate::arrow::ArrowWriter;
+    use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter};
+    use crate::file::properties::{EnabledStatistics, WriterProperties};
+    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+    use bytes::Bytes;
+    use std::sync::Arc;
+
+    #[test]
+    fn test_metadata_read_write_roundtrip() {
+        let parquet_bytes = create_parquet_file();
+
+        // read the metadata from the file
+        let original_metadata = ParquetMetaDataReader::new()
+            .parse_and_finish(&parquet_bytes)
+            .unwrap();
+
+        // read metadata back from the serialized bytes and ensure it is the same
+        let metadata_bytes = metadata_to_bytes(&original_metadata);
+        assert_ne!(
+            metadata_bytes.len(),
+            parquet_bytes.len(),
+            "metadata is subset of parquet"
+        );
+
+        let roundtrip_metadata = ParquetMetaDataReader::new()
+            .parse_and_finish(&metadata_bytes)
+            .unwrap();
+
+        assert_eq!(original_metadata, roundtrip_metadata);
+    }
+
+    #[test]
+    fn test_metadata_read_write_roundtrip_page_index() {
+        let parquet_bytes = create_parquet_file();
+
+        // read the metadata from the file including the page index structures 
+        // (which are stored elsewhere in the footer)
+        let original_metadata = ParquetMetaDataReader::new()
+            .with_page_indexes(true)
+            .parse_and_finish(&parquet_bytes)
+            .unwrap();
+
+        // read metadata back from the serialized bytes and ensure it is the same
+        let metadata_bytes = metadata_to_bytes(&original_metadata);
+        let roundtrip_metadata = ParquetMetaDataReader::new()
+            .with_page_indexes(true)
+            .parse_and_finish(&metadata_bytes)
+            .unwrap();
+
+        // Need to normalize the metadata first to remove offsets in data
+        let original_metadata = normalize_locations(original_metadata);
+        let roundtrip_metadata = normalize_locations(roundtrip_metadata);
+        assert_eq!(
+            format!("{original_metadata:#?}"),
+            format!("{roundtrip_metadata:#?}")
+        );
+        assert_eq!(original_metadata, roundtrip_metadata);
+    }
+
+    #[test]
+    // Reproducer for https://github.com/apache/arrow-rs/issues/6464 (this should eventually pass)
+    #[should_panic(expected = "missing required field ColumnIndex.null_pages")]
+    fn test_metadata_read_write_partial_offset() {
+        let parquet_bytes = create_parquet_file();
+
+        // read the metadata from the file WITHOUT the page index structures
+        let original_metadata = ParquetMetaDataReader::new()
+            .parse_and_finish(&parquet_bytes)
+            .unwrap();
+
+        // read metadata back from the serialized bytes requesting to read the offsets
+        let metadata_bytes = metadata_to_bytes(&original_metadata);
+        let roundtrip_metadata = ParquetMetaDataReader::new()
+            .with_page_indexes(true) // there are no page indexes in the metadata
+            .parse_and_finish(&metadata_bytes)
+            .unwrap();
+
+        // Need to normalize the metadata first to remove offsets in data
+        let original_metadata = normalize_locations(original_metadata);
+        let roundtrip_metadata = normalize_locations(roundtrip_metadata);
+        assert_eq!(
+            format!("{original_metadata:#?}"),
+            format!("{roundtrip_metadata:#?}")
+        );
+        assert_eq!(original_metadata, roundtrip_metadata);
+    }
+
+    // TODO: test reading parquet bytes from serialized metadata
+
+    /// Write a parquet filed into an in memory buffer
+    fn create_parquet_file() -> Bytes {
+        let mut buf = vec![];
+        let data = vec![100, 200, 201, 300, 102, 33];
+        let array: ArrayRef = Arc::new(Int32Array::from(data));
+        let batch = RecordBatch::try_from_iter(vec![("id", array)]).unwrap();
+        let props = WriterProperties::builder()
+            .set_statistics_enabled(EnabledStatistics::Page)
+            .build();
+
+        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props)).unwrap();
+        writer.write(&batch).unwrap();
+        writer.finish().unwrap();
+        drop(writer);
+
+        Bytes::from(buf)
+    }
+
+    /// Serializes `ParquetMetaData` into a memory buffer, using `ParquetMetadataWriter
+    fn metadata_to_bytes(metadata: &ParquetMetaData) -> Bytes {
+        let mut buf = vec![];
+        ParquetMetaDataWriter::new(&mut buf, metadata)
+            .finish()
+            .unwrap();
+        Bytes::from(buf)
+    }
+
+    /// Sets the page index offset locations in the metadata None
+    ///
+    /// This is because the offsets are used to find the relative location of the index
+    /// structures, and thus differ depending on how the structures are stored.
+    fn normalize_locations(metadata: ParquetMetaData) -> ParquetMetaData {
+        let mut metadata_builder = metadata.into_builder();
+        for rg in metadata_builder.take_row_groups() {
+            let mut rg_builder = rg.into_builder();
+            for col in rg_builder.take_columns() {
+                rg_builder = rg_builder.add_column_metadata(
+                    col.into_builder()
+                        .set_offset_index_offset(None)
+                        .set_index_page_offset(None)
+                        .set_column_index_offset(None)
+                        .build()
+                        .unwrap(),
+                );
+            }
+            let rg = rg_builder.build().unwrap();
+            metadata_builder = metadata_builder.add_row_group(rg);
+        }
+        metadata_builder.build()
+    }
+}

From 4f6785f07cb324e076456d71457c9d4f992ec22a Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 26 Sep 2024 15:56:16 -0400
Subject: [PATCH 2/4] remove redundant tests

---
 parquet/src/file/metadata/writer.rs | 295 ----------------------------
 1 file changed, 295 deletions(-)

diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs
index 69a939e00f55..87f1fdebd91e 100644
--- a/parquet/src/file/metadata/writer.rs
+++ b/parquet/src/file/metadata/writer.rs
@@ -378,298 +378,3 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
         }
     }
 }
-
-#[cfg(test)]
-#[cfg(feature = "arrow")]
-#[cfg(feature = "async")]
-mod tests {
-    use std::sync::Arc;
-
-    use crate::file::metadata::{
-        ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter,
-        RowGroupMetaData,
-    };
-    use crate::file::properties::{EnabledStatistics, WriterProperties};
-    use crate::file::reader::{FileReader, SerializedFileReader};
-    use crate::{
-        arrow::ArrowWriter,
-        file::{page_index::index::Index, serialized_reader::ReadOptionsBuilder},
-    };
-    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
-    use arrow_schema::{DataType as ArrowDataType, Field, Schema};
-    use bytes::{BufMut, Bytes, BytesMut};
-
-    struct TestMetadata {
-        #[allow(dead_code)]
-        file_size: usize,
-        metadata: ParquetMetaData,
-    }
-
-    fn has_page_index(metadata: &ParquetMetaData) -> bool {
-        match metadata.column_index() {
-            Some(column_index) => column_index
-                .iter()
-                .any(|rg_idx| rg_idx.iter().all(|col_idx| !matches!(col_idx, Index::NONE))),
-            None => false,
-        }
-    }
-
-    #[test]
-    fn test_roundtrip_parquet_metadata_without_page_index() {
-        // We currently don't have an ad-hoc ParquetMetadata loader that can load page indexes so
-        // we at least test round trip without them
-        let metadata = get_test_metadata(false, false);
-        assert!(!has_page_index(&metadata.metadata));
-
-        let mut buf = BytesMut::new().writer();
-        {
-            let writer = ParquetMetaDataWriter::new(&mut buf, &metadata.metadata);
-            writer.finish().unwrap();
-        }
-
-        let data = buf.into_inner().freeze();
-
-        let decoded_metadata = ParquetMetaDataReader::new()
-            .parse_and_finish(&data)
-            .unwrap();
-        assert!(!has_page_index(&metadata.metadata));
-
-        assert_eq!(metadata.metadata, decoded_metadata);
-    }
-
-    fn get_test_metadata(write_page_index: bool, read_page_index: bool) -> TestMetadata {
-        let mut buf = BytesMut::new().writer();
-        let schema: Arc<Schema> = Arc::new(Schema::new(vec![Field::new(
-            "a",
-            ArrowDataType::Int32,
-            true,
-        )]));
-
-        // build row groups / pages that exercise different combinations of nulls and values
-        // note that below we set the row group and page sizes to 4 and 2 respectively
-        // so that these "groupings" make sense
-        let a: ArrayRef = Arc::new(Int32Array::from(vec![
-            // a row group that has all values
-            Some(i32::MIN),
-            Some(-1),
-            Some(1),
-            Some(i32::MAX),
-            // a row group with a page of all nulls and a page of all values
-            None,
-            None,
-            Some(2),
-            Some(3),
-            // a row group that has all null pages
-            None,
-            None,
-            None,
-            None,
-            // a row group having 1 page with all values and 1 page with some nulls
-            Some(4),
-            Some(5),
-            None,
-            Some(6),
-            // a row group having 1 page with all nulls and 1 page with some nulls
-            None,
-            None,
-            Some(7),
-            None,
-            // a row group having all pages with some nulls
-            None,
-            Some(8),
-            Some(9),
-            None,
-        ]));
-
-        let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
-
-        let writer_props_builder = match write_page_index {
-            true => WriterProperties::builder().set_statistics_enabled(EnabledStatistics::Page),
-            false => WriterProperties::builder().set_statistics_enabled(EnabledStatistics::Chunk),
-        };
-
-        // tune the size or pages to the data above
-        // to make sure we exercise code paths where all items in a page are null, etc.
-        let writer_props = writer_props_builder
-            .set_max_row_group_size(4)
-            .set_data_page_row_count_limit(2)
-            .set_write_batch_size(2)
-            .build();
-
-        let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(writer_props)).unwrap();
-        writer.write(&batch).unwrap();
-        writer.close().unwrap();
-
-        let data = buf.into_inner().freeze();
-
-        let reader_opts = match read_page_index {
-            true => ReadOptionsBuilder::new().with_page_index().build(),
-            false => ReadOptionsBuilder::new().build(),
-        };
-        let reader = SerializedFileReader::new_with_options(data.clone(), reader_opts).unwrap();
-        let metadata = reader.metadata().clone();
-        TestMetadata {
-            file_size: data.len(),
-            metadata,
-        }
-    }
-
-    /// Temporary function so we can test loading metadata with page indexes
-    /// while we haven't fully figured out how to load it cleanly
-    async fn load_metadata_from_bytes(file_size: usize, data: Bytes) -> ParquetMetaData {
-        use crate::arrow::async_reader::MetadataFetch;
-        use crate::errors::Result as ParquetResult;
-        use futures::future::BoxFuture;
-        use futures::FutureExt;
-        use std::ops::Range;
-
-        /// Adapt a `Bytes` to a `MetadataFetch` implementation.
-        struct AsyncBytes {
-            data: Bytes,
-        }
-
-        impl AsyncBytes {
-            fn new(data: Bytes) -> Self {
-                Self { data }
-            }
-        }
-
-        impl MetadataFetch for AsyncBytes {
-            fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
-                async move { Ok(self.data.slice(range.start..range.end)) }.boxed()
-            }
-        }
-
-        /// A `MetadataFetch` implementation that reads from a subset of the full data
-        /// while accepting ranges that address the full data.
-        struct MaskedBytes {
-            inner: Box<dyn MetadataFetch>,
-            inner_range: Range<usize>,
-        }
-
-        impl MaskedBytes {
-            fn new(inner: Box<dyn MetadataFetch>, inner_range: Range<usize>) -> Self {
-                Self { inner, inner_range }
-            }
-        }
-
-        impl MetadataFetch for &mut MaskedBytes {
-            fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
-                let inner_range = self.inner_range.clone();
-                println!("inner_range: {:?}", inner_range);
-                println!("range: {:?}", range);
-                assert!(inner_range.start <= range.start && inner_range.end >= range.end);
-                let range =
-                    range.start - self.inner_range.start..range.end - self.inner_range.start;
-                self.inner.fetch(range)
-            }
-        }
-
-        let metadata_length = data.len();
-        let mut reader = MaskedBytes::new(
-            Box::new(AsyncBytes::new(data)),
-            file_size - metadata_length..file_size,
-        );
-        ParquetMetaDataReader::new()
-            .with_page_indexes(true)
-            .load_and_finish(&mut reader, file_size)
-            .await
-            .unwrap()
-    }
-
-    fn check_columns_are_equivalent(left: &ColumnChunkMetaData, right: &ColumnChunkMetaData) {
-        assert_eq!(left.column_descr(), right.column_descr());
-        assert_eq!(left.encodings(), right.encodings());
-        assert_eq!(left.num_values(), right.num_values());
-        assert_eq!(left.compressed_size(), right.compressed_size());
-        assert_eq!(left.data_page_offset(), right.data_page_offset());
-        assert_eq!(left.statistics(), right.statistics());
-        assert_eq!(left.offset_index_length(), right.offset_index_length());
-        assert_eq!(left.column_index_length(), right.column_index_length());
-        assert_eq!(
-            left.unencoded_byte_array_data_bytes(),
-            right.unencoded_byte_array_data_bytes()
-        );
-    }
-
-    fn check_row_groups_are_equivalent(left: &RowGroupMetaData, right: &RowGroupMetaData) {
-        assert_eq!(left.num_rows(), right.num_rows());
-        assert_eq!(left.file_offset(), right.file_offset());
-        assert_eq!(left.total_byte_size(), right.total_byte_size());
-        assert_eq!(left.schema_descr(), right.schema_descr());
-        assert_eq!(left.num_columns(), right.num_columns());
-        left.columns()
-            .iter()
-            .zip(right.columns().iter())
-            .for_each(|(lc, rc)| {
-                check_columns_are_equivalent(lc, rc);
-            });
-    }
-
-    #[tokio::test]
-    async fn test_encode_parquet_metadata_with_page_index() {
-        // Create a ParquetMetadata with page index information
-        let metadata = get_test_metadata(true, true);
-        assert!(has_page_index(&metadata.metadata));
-
-        let mut buf = BytesMut::new().writer();
-        {
-            let writer = ParquetMetaDataWriter::new(&mut buf, &metadata.metadata);
-            writer.finish().unwrap();
-        }
-
-        let data = buf.into_inner().freeze();
-
-        let decoded_metadata = load_metadata_from_bytes(data.len(), data).await;
-
-        // Because the page index offsets will differ, compare invariant parts of the metadata
-        assert_eq!(
-            metadata.metadata.file_metadata(),
-            decoded_metadata.file_metadata()
-        );
-        assert_eq!(
-            metadata.metadata.column_index(),
-            decoded_metadata.column_index()
-        );
-        assert_eq!(
-            metadata.metadata.offset_index(),
-            decoded_metadata.offset_index()
-        );
-        assert_eq!(
-            metadata.metadata.num_row_groups(),
-            decoded_metadata.num_row_groups()
-        );
-
-        // check that the mins and maxes are what we expect for each page
-        // also indirectly checking that the pages were written out as we expected them to be laid out
-        // (if they're not, or something gets refactored in the future that breaks that assumption,
-        // this test may have to drop down to a lower level and create metadata directly instead of relying on
-        // writing an entire file)
-        let column_indexes = metadata.metadata.column_index().unwrap();
-        assert_eq!(column_indexes.len(), 6);
-        // make sure each row group has 2 pages by checking the first column
-        // page counts for each column for each row group, should all be the same and there should be
-        // 12 pages in total across 6 row groups / 1 column
-        let mut page_counts = vec![];
-        for row_group in column_indexes {
-            for column in row_group {
-                match column {
-                    Index::INT32(column_index) => {
-                        page_counts.push(column_index.indexes.len());
-                    }
-                    _ => panic!("unexpected column index type"),
-                }
-            }
-        }
-        assert_eq!(page_counts, vec![2; 6]);
-
-        metadata
-            .metadata
-            .row_groups()
-            .iter()
-            .zip(decoded_metadata.row_groups().iter())
-            .for_each(|(left, right)| {
-                check_row_groups_are_equivalent(left, right);
-            });
-    }
-}

From b0ec8d0b31bb94f26ab6b95f8930a2058fea52bf Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 4 Oct 2024 18:38:16 -0400
Subject: [PATCH 3/4] Apply suggestions from code review

Co-authored-by: Matthijs Brobbel
---
 parquet/src/arrow/mod.rs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index 2dc2a5ed755a..b3e3487fc4be 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -274,7 +274,7 @@ mod test {
     fn test_metadata_read_write_roundtrip_page_index() {
         let parquet_bytes = create_parquet_file();
 
-        // read the metadata from the file including the page index structures 
+        // read the metadata from the file including the page index structures
         // (which are stored elsewhere in the footer)
         let original_metadata = ParquetMetaDataReader::new()
             .with_page_indexes(true)
@@ -346,7 +346,7 @@ mod test {
         Bytes::from(buf)
     }
 
-    /// Serializes `ParquetMetaData` into a memory buffer, using `ParquetMetadataWriter
+    /// Serializes `ParquetMetaData` into a memory buffer, using `ParquetMetadataWriter`
     fn metadata_to_bytes(metadata: &ParquetMetaData) -> Bytes {
         let mut buf = vec![];
         ParquetMetaDataWriter::new(&mut buf, metadata)
@@ -355,7 +355,7 @@ mod test {
         Bytes::from(buf)
     }
 
-    /// Sets the page index offset locations in the metadata None
+    /// Sets the page index offset locations in the metadata to `None`
     ///
     /// This is because the offsets are used to find the relative location of the index
     /// structures, and thus differ depending on how the structures are stored.
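For orientation, the round trip exercised by the tests added in PATCH 1/4 reduces to the short sketch below. This is a condensed illustration, not part of any patch: it reuses only the reader and writer calls that appear in the diffs above, assumes those types are reachable from outside the crate via `parquet::file::metadata` and `parquet::errors`, and elides the page index handling that the dedicated tests cover.

    use bytes::Bytes;
    use parquet::errors::Result;
    use parquet::file::metadata::{ParquetMetaDataReader, ParquetMetaDataWriter};

    /// Parse the footer metadata of an in-memory Parquet file, re-serialize just the
    /// metadata (no data pages), and check that parsing it back yields an equal value.
    fn roundtrip_metadata(parquet_bytes: &Bytes) -> Result<()> {
        // decode only the footer metadata from the complete file
        let metadata = ParquetMetaDataReader::new().parse_and_finish(parquet_bytes)?;

        // write the metadata on its own into a separate buffer
        let mut buf = vec![];
        ParquetMetaDataWriter::new(&mut buf, &metadata).finish()?;

        // parsing the standalone metadata bytes recovers the same structure
        let roundtrip = ParquetMetaDataReader::new().parse_and_finish(&Bytes::from(buf))?;
        assert_eq!(metadata, roundtrip);
        Ok(())
    }
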
From cda9f34941d738e6fba62ffce83e0d94999c641a Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 4 Oct 2024 18:45:28 -0400
Subject: [PATCH 4/4] fix merge

---
 parquet/src/arrow/mod.rs | 23 +++++++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index 19907bbbf053..2d09cd19203f 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -246,6 +246,29 @@ mod test {
     use bytes::Bytes;
     use std::sync::Arc;
 
+    #[test]
+    // Reproducer for https://github.com/apache/arrow-rs/issues/6464
+    fn test_metadata_read_write_partial_offset() {
+        let parquet_bytes = create_parquet_file();
+
+        // read the metadata from the file WITHOUT the page index structures
+        let original_metadata = ParquetMetaDataReader::new()
+            .parse_and_finish(&parquet_bytes)
+            .unwrap();
+
+        // this should error because the page indexes are not present, but have offsets specified
+        let metadata_bytes = metadata_to_bytes(&original_metadata);
+        let err = ParquetMetaDataReader::new()
+            .with_page_indexes(true) // there are no page indexes in the metadata
+            .parse_and_finish(&metadata_bytes)
+            .err()
+            .unwrap();
+        assert_eq!(
+            err.to_string(),
+            "EOF: Parquet file too small. Page index range 82..115 overlaps with file metadata 0..341"
+        );
+    }
+
     #[test]
     fn test_metadata_read_write_roundtrip() {
         let parquet_bytes = create_parquet_file();