Detect missing page indexes while reading Parquet metadata (#6507)
* detect missing page indexes on read

* lint

* fix overlap test

* add some clarifying comments per review suggestion
etseidl authored Oct 3, 2024
1 parent b9cf3c5 commit c039762
Showing 2 changed files with 88 additions and 6 deletions.
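The gist of the fix: metadata serialized with `ParquetMetaDataWriter` can still carry page index offsets recorded for the original file even though the serialized buffer contains no page index data, so `ParquetMetaDataReader` now checks that the requested page index byte range does not overlap the region occupied by the footer metadata. A minimal standalone sketch of that check (the helper name and signature are illustrative, not the crate's API):

use std::ops::Range;

// Mirrors the overlap check added to `ParquetMetaDataReader` in the diff
// below; `check_overlap` is a hypothetical helper, not part of the crate.
fn check_overlap(
    page_index_range: &Range<usize>,
    file_size: usize,
    metadata_size: usize,
) -> Result<(), String> {
    // The footer metadata occupies the last `metadata_size` bytes of the file.
    let metadata_range = file_size.saturating_sub(metadata_size)..file_size;
    if page_index_range.end > metadata_range.start {
        return Err(format!(
            "Parquet file too small. Page index range {:?} overlaps with file metadata {:?}",
            page_index_range, metadata_range
        ));
    }
    Ok(())
}

fn main() {
    // Numbers from the reproducer test below: a 341-byte metadata-only buffer
    // whose stale page index offsets point at bytes 82..115.
    assert!(check_overlap(&(82..115), 341, 341).is_err());
    // In a hypothetical 1000-byte file with the same 341 bytes of footer
    // metadata (at 659..1000), page indexes at 82..115 precede it and pass.
    assert!(check_overlap(&(82..115), 1000, 341).is_ok());
}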
60 changes: 60 additions & 0 deletions parquet/src/arrow/mod.rs
@@ -236,3 +236,63 @@ pub fn parquet_column<'a>(
.find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?;
Some((parquet_idx, field))
}

#[cfg(test)]
mod test {
use crate::arrow::ArrowWriter;
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter};
use crate::file::properties::{EnabledStatistics, WriterProperties};
use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use bytes::Bytes;
use std::sync::Arc;

#[test]
// Reproducer for https://github.com/apache/arrow-rs/issues/6464
fn test_metadata_read_write_partial_offset() {
let parquet_bytes = create_parquet_file();

// read the metadata from the file WITHOUT the page index structures
let original_metadata = ParquetMetaDataReader::new()
.parse_and_finish(&parquet_bytes)
.unwrap();

// this should error because the page indexes are not present, but their offsets are still specified
let metadata_bytes = metadata_to_bytes(&original_metadata);
let err = ParquetMetaDataReader::new()
.with_page_indexes(true) // there are no page indexes in the metadata
.parse_and_finish(&metadata_bytes)
.err()
.unwrap();
assert_eq!(
err.to_string(),
"EOF: Parquet file too small. Page index range 82..115 overlaps with file metadata 0..341"
);
}
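
// Editor's note (not part of this commit): with the reader's defaults, which
// do not request page indexes, the same serialized metadata parses fine:
//
//     let _metadata = ParquetMetaDataReader::new()
//         .parse_and_finish(&metadata_bytes)
//         .unwrap();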

/// Write a parquet file into an in-memory buffer
fn create_parquet_file() -> Bytes {
let mut buf = vec![];
let data = vec![100, 200, 201, 300, 102, 33];
let array: ArrayRef = Arc::new(Int32Array::from(data));
let batch = RecordBatch::try_from_iter(vec![("id", array)]).unwrap();
let props = WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::Page)
.build();

let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props)).unwrap();
writer.write(&batch).unwrap();
writer.finish().unwrap();
drop(writer);

Bytes::from(buf)
}

/// Serializes `ParquetMetaData` into a memory buffer, using `ParquetMetaDataWriter`
fn metadata_to_bytes(metadata: &ParquetMetaData) -> Bytes {
let mut buf = vec![];
ParquetMetaDataWriter::new(&mut buf, metadata)
.finish()
.unwrap();
Bytes::from(buf)
}
}
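For context on where the stale offsets come from: the thrift `ColumnChunk` records `offset_index_offset`/`offset_index_length` (and the column index equivalents) for the file the metadata was originally written to, and serialized metadata carries those values along even though the new buffer holds only the footer metadata. A small model of the mismatch, where the struct is a simplified stand-in for the real thrift types and the numbers come from the reproducer test above:

// Simplified stand-in for the page index location fields of the thrift
// `ColumnChunk` struct.
struct ColumnChunkLike {
    offset_index_offset: Option<i64>,
    offset_index_length: Option<i32>,
}

fn main() {
    // The original file kept page index data at bytes 82..115
    // (offset 82, length 33), per the expected error string above.
    let cc = ColumnChunkLike {
        offset_index_offset: Some(82),
        offset_index_length: Some(33),
    };
    // Length of the metadata-only buffer produced by `ParquetMetaDataWriter`.
    let metadata_only_len: usize = 341;

    let start = cc.offset_index_offset.unwrap() as usize;
    let end = start + cc.offset_index_length.unwrap() as usize;
    // 82..115 lies inside 0..341, so a plain bounds check passes and, before
    // this commit, the reader would decode footer bytes as an offset index.
    assert!(end <= metadata_only_len);
}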
34 changes: 28 additions & 6 deletions parquet/src/file/metadata/reader.rs
@@ -58,6 +58,9 @@ pub struct ParquetMetaDataReader {
column_index: bool,
offset_index: bool,
prefetch_hint: Option<usize>,
// Size of the serialized thrift metadata plus the 8-byte footer. Only set if
// `self.parse_metadata` is called.
metadata_size: Option<usize>,
}

impl ParquetMetaDataReader {
@@ -195,7 +198,7 @@ impl ParquetMetaDataReader {
/// let metadata = reader.finish().unwrap();
/// ```
pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: usize) -> Result<()> {
- self.metadata = match Self::parse_metadata(reader) {
+ self.metadata = match self.parse_metadata(reader) {
Ok(metadata) => Some(metadata),
// FIXME: throughout this module ParquetError::IndexOutOfBound is used to indicate the
// need for more data. This is not its intended use. The plan is to add a NeedMoreData
@@ -282,6 +285,19 @@ impl ParquetMetaDataReader {
}
}

// Perform an extra sanity check to make sure `range` and the footer metadata don't
// overlap.
if let Some(metadata_size) = self.metadata_size {
let metadata_range = file_size.saturating_sub(metadata_size)..file_size;
if range.end > metadata_range.start {
return Err(eof_err!(
"Parquet file too small. Page index range {:?} overlaps with file metadata {:?}",
range,
metadata_range
));
}
}
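// Editor's note (not in the commit): worked example from the reproducer in
// parquet/src/arrow/mod.rs. A 341-byte metadata-only buffer gives a
// `metadata_range` of 0..341 (the `saturating_sub` guards underflow when the
// file is nothing but footer metadata), so a stale page index range of
// 82..115 overlaps it and the error above fires.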

let bytes_needed = range.end - range.start;
let bytes = reader.get_bytes((range.start - file_range.start) as u64, bytes_needed)?;
let offset = range.start;
@@ -455,8 +471,9 @@ impl ParquetMetaDataReader {
range
}

- // one-shot parse of footer
- fn parse_metadata<R: ChunkReader>(chunk_reader: &R) -> Result<ParquetMetaData> {
+ // One-shot parse of footer.
+ // Side effect: this will set `self.metadata_size`
+ fn parse_metadata<R: ChunkReader>(&mut self, chunk_reader: &R) -> Result<ParquetMetaData> {
// check file is large enough to hold footer
let file_size = chunk_reader.len();
if file_size < (FOOTER_SIZE as u64) {
@@ -473,6 +490,7 @@

let metadata_len = Self::decode_footer(&footer)?;
let footer_metadata_len = FOOTER_SIZE + metadata_len;
self.metadata_size = Some(footer_metadata_len);

if footer_metadata_len > file_size as usize {
return Err(ParquetError::IndexOutOfBound(
@@ -654,14 +672,16 @@ mod tests {
#[test]
fn test_parse_metadata_size_smaller_than_footer() {
let test_file = tempfile::tempfile().unwrap();
- let err = ParquetMetaDataReader::parse_metadata(&test_file).unwrap_err();
+ let err = ParquetMetaDataReader::new()
+     .parse_metadata(&test_file)
+     .unwrap_err();
assert!(matches!(err, ParquetError::IndexOutOfBound(8, _)));
}

#[test]
fn test_parse_metadata_corrupt_footer() {
let data = Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
- let reader_result = ParquetMetaDataReader::parse_metadata(&data);
+ let reader_result = ParquetMetaDataReader::new().parse_metadata(&data);
assert_eq!(
reader_result.unwrap_err().to_string(),
"Parquet error: Invalid Parquet file. Corrupt footer"
@@ -671,7 +691,9 @@
#[test]
fn test_parse_metadata_invalid_start() {
let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']);
- let err = ParquetMetaDataReader::parse_metadata(&test_file).unwrap_err();
+ let err = ParquetMetaDataReader::new()
+     .parse_metadata(&test_file)
+     .unwrap_err();
assert!(matches!(err, ParquetError::IndexOutOfBound(263, _)));
}
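
// Editor's sketch (not part of this commit): the footer arithmetic these
// tests exercise. The last 8 bytes of a Parquet file are a 4-byte
// little-endian metadata length followed by the "PAR1" magic, so for the
// bytes above:
//
//     let footer = [255u8, 0, 0, 0, b'P', b'A', b'R', b'1'];
//     let metadata_len = u32::from_le_bytes([footer[0], footer[1], footer[2], footer[3]]);
//     assert_eq!(metadata_len + 8, 263); // matches IndexOutOfBound(263, _)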
