Skip to content

Commit

Permalink
remove redundant tests
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Sep 26, 2024
1 parent 51516af commit 9045968
Showing 1 changed file with 0 additions and 297 deletions.
297 changes: 0 additions & 297 deletions parquet/src/file/metadata/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,300 +377,3 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
}
}
}

#[cfg(test)]
#[cfg(feature = "arrow")]
#[cfg(feature = "async")]
mod tests {
use std::sync::Arc;

use crate::file::metadata::{
ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter,
RowGroupMetaData,
};
use crate::file::properties::{EnabledStatistics, WriterProperties};
use crate::file::reader::{FileReader, SerializedFileReader};
use crate::{
arrow::ArrowWriter,
file::{page_index::index::Index, serialized_reader::ReadOptionsBuilder},
};
use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType as ArrowDataType, Field, Schema};
use bytes::{BufMut, Bytes, BytesMut};

/// Pairs a file's parsed [`ParquetMetaData`] with the total size of the file
/// it was read from, since some metadata-loading paths need the file length
/// to locate the footer.
struct TestMetadata {
    // Not read by every test in this module, hence the allow.
    #[allow(dead_code)]
    file_size: usize,
    metadata: ParquetMetaData,
}

/// Returns `true` if `metadata` carries a usable page index: at least one
/// row group whose columns all have a real (non-`NONE`) column index entry.
fn has_page_index(metadata: &ParquetMetaData) -> bool {
    metadata.column_index().map_or(false, |column_index| {
        column_index.iter().any(|row_group| {
            row_group
                .iter()
                .all(|column| !matches!(column, Index::NONE))
        })
    })
}

#[test]
fn test_roundtrip_parquet_metadata_without_page_index() {
    // We currently don't have an ad-hoc ParquetMetadata loader that can load page indexes so
    // we at least test round trip without them
    let metadata = get_test_metadata(false, false);
    assert!(!has_page_index(&metadata.metadata));

    let mut buf = BytesMut::new().writer();
    {
        let writer = ParquetMetaDataWriter::new(&mut buf, &metadata.metadata);
        writer.finish().unwrap();
    }

    let data = buf.into_inner().freeze();

    let decoded_metadata = ParquetMetaDataReader::new()
        .parse_and_finish(&data)
        .unwrap();
    // Fix: previously this re-checked the *input* metadata (already asserted
    // above); the decoded copy is what must also lack a page index.
    assert!(!has_page_index(&decoded_metadata));

    assert_eq!(metadata.metadata, decoded_metadata);
}

/// Writes a small single-column (`"a"`: nullable Int32) parquet file whose
/// pages exercise several null/value layouts, then reads its metadata back.
///
/// `write_page_index` selects page-level vs. chunk-level statistics when
/// writing; `read_page_index` controls whether the reader requests the page
/// index while parsing.
fn get_test_metadata(write_page_index: bool, read_page_index: bool) -> TestMetadata {
    let mut out = BytesMut::new().writer();
    let schema: Arc<Schema> = Arc::new(Schema::new(vec![Field::new(
        "a",
        ArrowDataType::Int32,
        true,
    )]));

    // Build row groups / pages that exercise different combinations of nulls
    // and values. The row group and page sizes are pinned to 4 and 2 below so
    // that these "groupings" hold.
    let values: ArrayRef = Arc::new(Int32Array::from(vec![
        // a row group that has all values
        Some(i32::MIN),
        Some(-1),
        Some(1),
        Some(i32::MAX),
        // a row group with a page of all nulls and a page of all values
        None,
        None,
        Some(2),
        Some(3),
        // a row group that has all null pages
        None,
        None,
        None,
        None,
        // a row group having 1 page with all values and 1 page with some nulls
        Some(4),
        Some(5),
        None,
        Some(6),
        // a row group having 1 page with all nulls and 1 page with some nulls
        None,
        None,
        Some(7),
        None,
        // a row group having all pages with some nulls
        None,
        Some(8),
        Some(9),
        None,
    ]));

    let batch = RecordBatch::try_from_iter(vec![("a", values)]).unwrap();

    let statistics_level = if write_page_index {
        EnabledStatistics::Page
    } else {
        EnabledStatistics::Chunk
    };

    // Tune the size of pages to the data above so we exercise code paths
    // where all items in a page are null, etc.
    let writer_props = WriterProperties::builder()
        .set_statistics_enabled(statistics_level)
        .set_max_row_group_size(4)
        .set_data_page_row_count_limit(2)
        .set_write_batch_size(2)
        .build();

    let mut writer = ArrowWriter::try_new(&mut out, schema, Some(writer_props)).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let data = out.into_inner().freeze();

    let options = if read_page_index {
        ReadOptionsBuilder::new().with_page_index().build()
    } else {
        ReadOptionsBuilder::new().build()
    };
    let reader = SerializedFileReader::new_with_options(data.clone(), options).unwrap();
    let metadata = reader.metadata().clone();
    TestMetadata {
        file_size: data.len(),
        metadata,
    }
}

/// Temporary function so we can test loading metadata with page indexes
/// while we haven't fully figured out how to load it cleanly
/// Temporary function so we can test loading metadata with page indexes
/// while we haven't fully figured out how to load it cleanly.
///
/// `data` holds only the trailing (metadata/footer) bytes of a file that is
/// `file_size` bytes long overall; the `MaskedBytes` adapter below rebases
/// whole-file offsets onto `data`.
async fn load_metadata_from_bytes(file_size: usize, data: Bytes) -> ParquetMetaData {
    use crate::arrow::async_reader::{MetadataFetch, MetadataLoader};
    use crate::errors::Result as ParquetResult;
    use futures::future::BoxFuture;
    use futures::FutureExt;
    use std::ops::Range;

    /// Adapt a `Bytes` to a `MetadataFetch` implementation.
    struct AsyncBytes {
        data: Bytes,
    }

    impl AsyncBytes {
        fn new(data: Bytes) -> Self {
            Self { data }
        }
    }

    impl MetadataFetch for AsyncBytes {
        fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
            async move { Ok(self.data.slice(range.start..range.end)) }.boxed()
        }
    }

    /// A `MetadataFetch` implementation that reads from a subset of the full data
    /// while accepting ranges that address the full data.
    struct MaskedBytes {
        inner: Box<dyn MetadataFetch + Send>,
        inner_range: Range<usize>,
    }

    impl MaskedBytes {
        fn new(inner: Box<dyn MetadataFetch + Send>, inner_range: Range<usize>) -> Self {
            Self { inner, inner_range }
        }
    }

    impl MetadataFetch for &mut MaskedBytes {
        fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
            let inner_range = self.inner_range.clone();
            // Requests must fall inside the window that `inner` actually holds.
            // (Fix: removed leftover debug `println!`s of the two ranges.)
            assert!(inner_range.start <= range.start && inner_range.end >= range.end);
            // Rebase the file-absolute range onto the start of `inner`'s buffer.
            let range =
                range.start - self.inner_range.start..range.end - self.inner_range.start;
            self.inner.fetch(range)
        }
    }

    let metadata_length = data.len();
    let mut reader = MaskedBytes::new(
        Box::new(AsyncBytes::new(data)),
        file_size - metadata_length..file_size,
    );
    // First pass: load the footer / thrift metadata.
    let metadata = MetadataLoader::load(&mut reader, file_size, None)
        .await
        .unwrap();
    let loaded_metadata = metadata.finish();
    // Second pass: load both column and offset indexes.
    let mut metadata = MetadataLoader::new(&mut reader, loaded_metadata);
    metadata.load_page_index(true, true).await.unwrap();
    metadata.finish()
}

/// Asserts that two column chunks agree on every field expected to survive a
/// metadata write/read round trip (offsets into the page index are excluded,
/// as they legitimately differ between the original and rewritten file).
fn check_columns_are_equivalent(left: &ColumnChunkMetaData, right: &ColumnChunkMetaData) {
    // One assert_eq! per accessor, generated to avoid repeating the pattern.
    macro_rules! check_eq {
        ($($accessor:ident),+ $(,)?) => {
            $(assert_eq!(left.$accessor(), right.$accessor());)+
        };
    }
    check_eq!(
        column_descr,
        encodings,
        num_values,
        compressed_size,
        data_page_offset,
        statistics,
        offset_index_length,
        column_index_length,
        unencoded_byte_array_data_bytes,
    );
}

/// Asserts that two row groups agree on every round-trip-stable field, and
/// compares their columns pairwise via `check_columns_are_equivalent`.
fn check_row_groups_are_equivalent(left: &RowGroupMetaData, right: &RowGroupMetaData) {
    assert_eq!(left.num_rows(), right.num_rows());
    assert_eq!(left.file_offset(), right.file_offset());
    assert_eq!(left.total_byte_size(), right.total_byte_size());
    assert_eq!(left.schema_descr(), right.schema_descr());
    assert_eq!(left.num_columns(), right.num_columns());
    for (left_column, right_column) in left.columns().iter().zip(right.columns()) {
        check_columns_are_equivalent(left_column, right_column);
    }
}

/// Round-trips metadata that includes a page index: writes it with
/// `ParquetMetaDataWriter`, reloads it via the async loader, and compares
/// the invariant parts plus the expected page layout.
#[tokio::test]
async fn test_encode_parquet_metadata_with_page_index() {
    // Create a ParquetMetadata with page index information
    let metadata = get_test_metadata(true, true);
    assert!(has_page_index(&metadata.metadata));

    // Serialize just the metadata (no data pages) into an in-memory buffer.
    let mut buf = BytesMut::new().writer();
    {
        let writer = ParquetMetaDataWriter::new(&mut buf, &metadata.metadata);
        writer.finish().unwrap();
    }

    let data = buf.into_inner().freeze();

    let decoded_metadata = load_metadata_from_bytes(data.len(), data).await;

    // Because the page index offsets will differ, compare invariant parts of the metadata
    assert_eq!(
        metadata.metadata.file_metadata(),
        decoded_metadata.file_metadata()
    );
    assert_eq!(
        metadata.metadata.column_index(),
        decoded_metadata.column_index()
    );
    assert_eq!(
        metadata.metadata.offset_index(),
        decoded_metadata.offset_index()
    );
    assert_eq!(
        metadata.metadata.num_row_groups(),
        decoded_metadata.num_row_groups()
    );

    // check that the mins and maxes are what we expect for each page
    // also indirectly checking that the pages were written out as we expected them to be laid out
    // (if they're not, or something gets refactored in the future that breaks that assumption,
    // this test may have to drop down to a lower level and create metadata directly instead of relying on
    // writing an entire file)
    let column_indexes = metadata.metadata.column_index().unwrap();
    assert_eq!(column_indexes.len(), 6);
    // make sure each row group has 2 pages by checking the first column
    // page counts for each column for each row group, should all be the same and there should be
    // 12 pages in total across 6 row groups / 1 column
    let mut page_counts = vec![];
    for row_group in column_indexes {
        for column in row_group {
            match column {
                Index::INT32(column_index) => {
                    page_counts.push(column_index.indexes.len());
                }
                // get_test_metadata writes a single Int32 column, so any
                // other index type means the file layout changed
                _ => panic!("unexpected column index type"),
            }
        }
    }
    assert_eq!(page_counts, vec![2; 6]);

    // Finally, compare each row group field-by-field (helpers skip the
    // offset fields that legitimately differ after rewriting).
    metadata
        .metadata
        .row_groups()
        .iter()
        .zip(decoded_metadata.row_groups().iter())
        .for_each(|(left, right)| {
            check_row_groups_are_equivalent(left, right);
        });
}
}

0 comments on commit 9045968

Please sign in to comment.