Skip to content

Commit

Permalink
Add round trip tests for reading/writing parquet metadata (#6463)
Browse files Browse the repository at this point in the history
* Add round trip tests for reading/writing parquet metadata

* remove redundant tests

* Apply suggestions from code review

Co-authored-by: Matthijs Brobbel <m1brobbel@gmail.com>

* fix merge

---------

Co-authored-by: Matthijs Brobbel <m1brobbel@gmail.com>
  • Loading branch information
alamb and mbrobbel authored Oct 5, 2024
1 parent af10781 commit ac51632
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 295 deletions.
76 changes: 76 additions & 0 deletions parquet/src/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,82 @@ mod test {
);
}

#[test]
fn test_metadata_read_write_roundtrip() {
    let file_bytes = create_parquet_file();

    // Decode the footer metadata from the in-memory parquet file.
    let expected = ParquetMetaDataReader::new()
        .parse_and_finish(&file_bytes)
        .unwrap();

    // Serialize the metadata on its own and sanity-check that the
    // standalone encoding is not the same size as the whole file.
    let serialized = metadata_to_bytes(&expected);
    assert_ne!(
        serialized.len(),
        file_bytes.len(),
        "metadata is subset of parquet"
    );

    // Decoding the standalone bytes must yield identical metadata.
    let actual = ParquetMetaDataReader::new()
        .parse_and_finish(&serialized)
        .unwrap();
    assert_eq!(expected, actual);
}

#[test]
fn test_metadata_read_write_roundtrip_page_index() {
    let file_bytes = create_parquet_file();

    // Decode the footer metadata, additionally loading the page index
    // structures (which are stored elsewhere in the footer).
    let expected = ParquetMetaDataReader::new()
        .with_page_indexes(true)
        .parse_and_finish(&file_bytes)
        .unwrap();

    // Serialize the metadata and read it back, again with page indexes.
    let serialized = metadata_to_bytes(&expected);
    let actual = ParquetMetaDataReader::new()
        .with_page_indexes(true)
        .parse_and_finish(&serialized)
        .unwrap();

    // The absolute offsets of the index structures differ between the two
    // layouts, so blank them out before comparing.
    let expected = normalize_locations(expected);
    let actual = normalize_locations(actual);

    // Compare Debug renderings first for a readable failure message, then
    // the values themselves.
    assert_eq!(format!("{expected:#?}"), format!("{actual:#?}"));
    assert_eq!(expected, actual);
}

/// Returns `metadata` with every page index offset location set to `None`.
///
/// The offsets record where the index structures live relative to the rest
/// of the file, and therefore differ depending on how the structures were
/// stored; they must be cleared before two instances can be compared.
fn normalize_locations(metadata: ParquetMetaData) -> ParquetMetaData {
    let mut builder = metadata.into_builder();
    for row_group in builder.take_row_groups() {
        let mut group_builder = row_group.into_builder();
        for column in group_builder.take_columns() {
            // Rebuild each column chunk with all three index offsets unset.
            let normalized = column
                .into_builder()
                .set_offset_index_offset(None)
                .set_index_page_offset(None)
                .set_column_index_offset(None)
                .build()
                .unwrap();
            group_builder = group_builder.add_column_metadata(normalized);
        }
        builder = builder.add_row_group(group_builder.build().unwrap());
    }
    builder.build()
}

/// Write a parquet file into an in-memory buffer
fn create_parquet_file() -> Bytes {
let mut buf = vec![];
Expand Down
295 changes: 0 additions & 295 deletions parquet/src/file/metadata/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,298 +378,3 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
}
}
}

#[cfg(test)]
#[cfg(feature = "arrow")]
#[cfg(feature = "async")]
mod tests {
    use std::sync::Arc;

    use crate::file::metadata::{
        ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter,
        RowGroupMetaData,
    };
    use crate::file::properties::{EnabledStatistics, WriterProperties};
    use crate::file::reader::{FileReader, SerializedFileReader};
    use crate::{
        arrow::ArrowWriter,
        file::{page_index::index::Index, serialized_reader::ReadOptionsBuilder},
    };
    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    use arrow_schema::{DataType as ArrowDataType, Field, Schema};
    use bytes::{BufMut, Bytes, BytesMut};

    /// Metadata read back from a generated parquet file, along with the size
    /// of the file it was read from.
    struct TestMetadata {
        #[allow(dead_code)]
        file_size: usize,
        metadata: ParquetMetaData,
    }

    /// Returns `true` if `metadata` carries a column index in which some row
    /// group has a real (non-`Index::NONE`) index entry for every column.
    fn has_page_index(metadata: &ParquetMetaData) -> bool {
        match metadata.column_index() {
            Some(column_index) => column_index
                .iter()
                .any(|rg_idx| rg_idx.iter().all(|col_idx| !matches!(col_idx, Index::NONE))),
            None => false,
        }
    }

    #[test]
    fn test_roundtrip_parquet_metadata_without_page_index() {
        // We currently don't have an ad-hoc ParquetMetadata loader that can load page indexes so
        // we at least test round trip without them
        let metadata = get_test_metadata(false, false);
        assert!(!has_page_index(&metadata.metadata));

        // Serialize the metadata into an in-memory buffer.
        let mut buf = BytesMut::new().writer();
        {
            let writer = ParquetMetaDataWriter::new(&mut buf, &metadata.metadata);
            writer.finish().unwrap();
        }

        let data = buf.into_inner().freeze();

        let decoded_metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&data)
            .unwrap();
        // Check the *decoded* metadata here (the original was already checked
        // above): the round trip must not have introduced a page index.
        assert!(!has_page_index(&decoded_metadata));

        assert_eq!(metadata.metadata, decoded_metadata);
    }

    /// Writes a small parquet file in memory and reads its metadata back.
    ///
    /// `write_page_index` controls whether page-level statistics are written;
    /// `read_page_index` controls whether the page index is loaded when the
    /// file is read back.
    fn get_test_metadata(write_page_index: bool, read_page_index: bool) -> TestMetadata {
        let mut buf = BytesMut::new().writer();
        let schema: Arc<Schema> = Arc::new(Schema::new(vec![Field::new(
            "a",
            ArrowDataType::Int32,
            true,
        )]));

        // build row groups / pages that exercise different combinations of nulls and values
        // note that below we set the row group and page sizes to 4 and 2 respectively
        // so that these "groupings" make sense
        let a: ArrayRef = Arc::new(Int32Array::from(vec![
            // a row group that has all values
            Some(i32::MIN),
            Some(-1),
            Some(1),
            Some(i32::MAX),
            // a row group with a page of all nulls and a page of all values
            None,
            None,
            Some(2),
            Some(3),
            // a row group that has all null pages
            None,
            None,
            None,
            None,
            // a row group having 1 page with all values and 1 page with some nulls
            Some(4),
            Some(5),
            None,
            Some(6),
            // a row group having 1 page with all nulls and 1 page with some nulls
            None,
            None,
            Some(7),
            None,
            // a row group having all pages with some nulls
            None,
            Some(8),
            Some(9),
            None,
        ]));

        let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();

        // Page-level statistics are required for the page index to be written.
        let writer_props_builder = match write_page_index {
            true => WriterProperties::builder().set_statistics_enabled(EnabledStatistics::Page),
            false => WriterProperties::builder().set_statistics_enabled(EnabledStatistics::Chunk),
        };

        // tune the size or pages to the data above
        // to make sure we exercise code paths where all items in a page are null, etc.
        let writer_props = writer_props_builder
            .set_max_row_group_size(4)
            .set_data_page_row_count_limit(2)
            .set_write_batch_size(2)
            .build();

        let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(writer_props)).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let data = buf.into_inner().freeze();

        let reader_opts = match read_page_index {
            true => ReadOptionsBuilder::new().with_page_index().build(),
            false => ReadOptionsBuilder::new().build(),
        };
        let reader = SerializedFileReader::new_with_options(data.clone(), reader_opts).unwrap();
        let metadata = reader.metadata().clone();
        TestMetadata {
            file_size: data.len(),
            metadata,
        }
    }

    /// Temporary function so we can test loading metadata with page indexes
    /// while we haven't fully figured out how to load it cleanly
    ///
    /// `data` holds only the footer (the last `data.len()` bytes of a file of
    /// `file_size` bytes); the fetch adapter below translates absolute file
    /// ranges into offsets within `data`.
    async fn load_metadata_from_bytes(file_size: usize, data: Bytes) -> ParquetMetaData {
        use crate::arrow::async_reader::MetadataFetch;
        use crate::errors::Result as ParquetResult;
        use futures::future::BoxFuture;
        use futures::FutureExt;
        use std::ops::Range;

        /// Adapt a `Bytes` to a `MetadataFetch` implementation.
        struct AsyncBytes {
            data: Bytes,
        }

        impl AsyncBytes {
            fn new(data: Bytes) -> Self {
                Self { data }
            }
        }

        impl MetadataFetch for AsyncBytes {
            fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
                async move { Ok(self.data.slice(range.start..range.end)) }.boxed()
            }
        }

        /// A `MetadataFetch` implementation that reads from a subset of the full data
        /// while accepting ranges that address the full data.
        struct MaskedBytes {
            inner: Box<dyn MetadataFetch + Send>,
            inner_range: Range<usize>,
        }

        impl MaskedBytes {
            fn new(inner: Box<dyn MetadataFetch + Send>, inner_range: Range<usize>) -> Self {
                Self { inner, inner_range }
            }
        }

        impl MetadataFetch for &mut MaskedBytes {
            fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
                let inner_range = self.inner_range.clone();
                // Requested ranges must fall entirely within the masked window.
                assert!(inner_range.start <= range.start && inner_range.end >= range.end);
                // Rebase the absolute file range onto the inner buffer.
                let range =
                    range.start - self.inner_range.start..range.end - self.inner_range.start;
                self.inner.fetch(range)
            }
        }

        let metadata_length = data.len();
        let mut reader = MaskedBytes::new(
            Box::new(AsyncBytes::new(data)),
            file_size - metadata_length..file_size,
        );
        ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .load_and_finish(&mut reader, file_size)
            .await
            .unwrap()
    }

    /// Asserts that two column chunks agree on every field that is invariant
    /// under re-encoding (i.e. everything except absolute file offsets).
    fn check_columns_are_equivalent(left: &ColumnChunkMetaData, right: &ColumnChunkMetaData) {
        assert_eq!(left.column_descr(), right.column_descr());
        assert_eq!(left.encodings(), right.encodings());
        assert_eq!(left.num_values(), right.num_values());
        assert_eq!(left.compressed_size(), right.compressed_size());
        assert_eq!(left.data_page_offset(), right.data_page_offset());
        assert_eq!(left.statistics(), right.statistics());
        assert_eq!(left.offset_index_length(), right.offset_index_length());
        assert_eq!(left.column_index_length(), right.column_index_length());
        assert_eq!(
            left.unencoded_byte_array_data_bytes(),
            right.unencoded_byte_array_data_bytes()
        );
    }

    /// Asserts that two row groups agree on all offset-invariant fields,
    /// including column-by-column equivalence.
    fn check_row_groups_are_equivalent(left: &RowGroupMetaData, right: &RowGroupMetaData) {
        assert_eq!(left.num_rows(), right.num_rows());
        assert_eq!(left.file_offset(), right.file_offset());
        assert_eq!(left.total_byte_size(), right.total_byte_size());
        assert_eq!(left.schema_descr(), right.schema_descr());
        assert_eq!(left.num_columns(), right.num_columns());
        left.columns()
            .iter()
            .zip(right.columns().iter())
            .for_each(|(lc, rc)| {
                check_columns_are_equivalent(lc, rc);
            });
    }

    #[tokio::test]
    async fn test_encode_parquet_metadata_with_page_index() {
        // Create a ParquetMetadata with page index information
        let metadata = get_test_metadata(true, true);
        assert!(has_page_index(&metadata.metadata));

        let mut buf = BytesMut::new().writer();
        {
            let writer = ParquetMetaDataWriter::new(&mut buf, &metadata.metadata);
            writer.finish().unwrap();
        }

        let data = buf.into_inner().freeze();

        let decoded_metadata = load_metadata_from_bytes(data.len(), data).await;

        // Because the page index offsets will differ, compare invariant parts of the metadata
        assert_eq!(
            metadata.metadata.file_metadata(),
            decoded_metadata.file_metadata()
        );
        assert_eq!(
            metadata.metadata.column_index(),
            decoded_metadata.column_index()
        );
        assert_eq!(
            metadata.metadata.offset_index(),
            decoded_metadata.offset_index()
        );
        assert_eq!(
            metadata.metadata.num_row_groups(),
            decoded_metadata.num_row_groups()
        );

        // check that the mins and maxes are what we expect for each page
        // also indirectly checking that the pages were written out as we expected them to be laid out
        // (if they're not, or something gets refactored in the future that breaks that assumption,
        // this test may have to drop down to a lower level and create metadata directly instead of relying on
        // writing an entire file)
        let column_indexes = metadata.metadata.column_index().unwrap();
        assert_eq!(column_indexes.len(), 6);
        // make sure each row group has 2 pages by checking the first column
        // page counts for each column for each row group, should all be the same and there should be
        // 12 pages in total across 6 row groups / 1 column
        let mut page_counts = vec![];
        for row_group in column_indexes {
            for column in row_group {
                match column {
                    Index::INT32(column_index) => {
                        page_counts.push(column_index.indexes.len());
                    }
                    _ => panic!("unexpected column index type"),
                }
            }
        }
        assert_eq!(page_counts, vec![2; 6]);

        metadata
            .metadata
            .row_groups()
            .iter()
            .zip(decoded_metadata.row_groups().iter())
            .for_each(|(left, right)| {
                check_row_groups_are_equivalent(left, right);
            });
    }
}

0 comments on commit ac51632

Please sign in to comment.