
Add round trip tests for reading/writing parquet metadata #6463

Merged
143 changes: 143 additions & 0 deletions parquet/src/arrow/mod.rs
@@ -236,3 +236,146 @@ pub fn parquet_column<'a>(
        .find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?;
    Some((parquet_idx, field))
}

#[cfg(test)]
mod test {
    use crate::arrow::ArrowWriter;
    use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter};
    use crate::file::properties::{EnabledStatistics, WriterProperties};
    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    use bytes::Bytes;
    use std::sync::Arc;

    #[test]
    fn test_metadata_read_write_roundtrip() {
        let parquet_bytes = create_parquet_file();

        // read the metadata from the file
        let original_metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&parquet_bytes)
            .unwrap();

        // read metadata back from the serialized bytes and ensure it is the same
        let metadata_bytes = metadata_to_bytes(&original_metadata);
        assert_ne!(
            metadata_bytes.len(),
            parquet_bytes.len(),
            "metadata is subset of parquet"
        );

        let roundtrip_metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&metadata_bytes)
            .unwrap();

        assert_eq!(original_metadata, roundtrip_metadata);
    }

    #[test]
    fn test_metadata_read_write_roundtrip_page_index() {
        let parquet_bytes = create_parquet_file();

        // read the metadata from the file including the page index structures
        // (which are stored elsewhere in the footer)
        let original_metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .parse_and_finish(&parquet_bytes)
            .unwrap();

        // read metadata back from the serialized bytes and ensure it is the same
        let metadata_bytes = metadata_to_bytes(&original_metadata);
        let roundtrip_metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .parse_and_finish(&metadata_bytes)
            .unwrap();

        // Need to normalize the metadata first to remove offsets in data
        let original_metadata = normalize_locations(original_metadata);
        let roundtrip_metadata = normalize_locations(roundtrip_metadata);
        assert_eq!(
            format!("{original_metadata:#?}"),
            format!("{roundtrip_metadata:#?}")
        );
        assert_eq!(original_metadata, roundtrip_metadata);
    }

    #[test]
    // Reproducer for https://github.com/apache/arrow-rs/issues/6464 (this should eventually pass)
    #[should_panic(expected = "missing required field ColumnIndex.null_pages")]
    fn test_metadata_read_write_partial_offset() {
        let parquet_bytes = create_parquet_file();

        // read the metadata from the file WITHOUT the page index structures
        let original_metadata = ParquetMetaDataReader::new()
            .parse_and_finish(&parquet_bytes)
            .unwrap();

        // read metadata back from the serialized bytes requesting to read the offsets
        let metadata_bytes = metadata_to_bytes(&original_metadata);
        let roundtrip_metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true) // there are no page indexes in the metadata
            .parse_and_finish(&metadata_bytes)
            .unwrap();

        // Need to normalize the metadata first to remove offsets in data
        let original_metadata = normalize_locations(original_metadata);
        let roundtrip_metadata = normalize_locations(roundtrip_metadata);
        assert_eq!(
            format!("{original_metadata:#?}"),
            format!("{roundtrip_metadata:#?}")
        );
        assert_eq!(original_metadata, roundtrip_metadata);
    }

    // TODO: test reading parquet bytes from serialized metadata
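    //
    // A hypothetical sketch of that TODO (not code from this PR): decode the
    // data pages of the original file using metadata that was round-tripped
    // through `ParquetMetaDataWriter`. The `ArrowReaderMetadata` and
    // `new_with_metadata` wiring below is an assumption about how such a test
    // could be structured.
    #[allow(dead_code)]
    fn read_with_external_metadata(parquet_bytes: Bytes, metadata: ParquetMetaData) {
        use crate::arrow::arrow_reader::{
            ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
        };
        // wrap the already-decoded metadata so the reader skips parsing the footer
        let metadata =
            ArrowReaderMetadata::try_new(Arc::new(metadata), ArrowReaderOptions::new()).unwrap();
        let reader = ParquetRecordBatchReaderBuilder::new_with_metadata(parquet_bytes, metadata)
            .build()
            .unwrap();
        // every batch should decode using only the externally supplied metadata
        for batch in reader {
            batch.unwrap();
        }
    }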

    /// Writes a parquet file into an in-memory buffer
    fn create_parquet_file() -> Bytes {
        let mut buf = vec![];
        let data = vec![100, 200, 201, 300, 102, 33];
        let array: ArrayRef = Arc::new(Int32Array::from(data));
        let batch = RecordBatch::try_from_iter(vec![("id", array)]).unwrap();
        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .build();

        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
        drop(writer);

        Bytes::from(buf)
    }

    /// Serializes `ParquetMetaData` into a memory buffer, using `ParquetMetaDataWriter`
    fn metadata_to_bytes(metadata: &ParquetMetaData) -> Bytes {
        let mut buf = vec![];
        ParquetMetaDataWriter::new(&mut buf, metadata)
            .finish()
            .unwrap();
        Bytes::from(buf)
    }

    /// Sets the page index offset locations in the metadata to `None`
    ///
    /// This is because the offsets are used to find the relative location of the index
    /// structures, and thus differ depending on how the structures are stored.
    fn normalize_locations(metadata: ParquetMetaData) -> ParquetMetaData {
        let mut metadata_builder = metadata.into_builder();
        for rg in metadata_builder.take_row_groups() {
            let mut rg_builder = rg.into_builder();
            for col in rg_builder.take_columns() {
                rg_builder = rg_builder.add_column_metadata(
                    col.into_builder()
                        .set_offset_index_offset(None)
                        .set_index_page_offset(None)
                        .set_column_index_offset(None)
                        .build()
                        .unwrap(),
                );
            }
            let rg = rg_builder.build().unwrap();
            metadata_builder = metadata_builder.add_row_group(rg);
        }
        metadata_builder.build()
    }
}