
Commit

Assume Pages Delimit Records When Offset Index Loaded (#4921) (#4943)
* Assume records not split across pages (#4921)

* More test

* Add PageReader::at_record_boundary

* Fix flush partial
tustvold authored Oct 17, 2023
1 parent ab87abd commit d4d11fe
Showing 6 changed files with 129 additions and 7 deletions.
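The core assumption behind the first commit bullet is easiest to see in terms of repetition levels: for a repeated (list) column, a repetition level of 0 marks the start of a new record, so a page that begins with a non-zero repetition level continues a record started on the previous page. The offset index records per-page first row indexes, which only line up with record boundaries if no record is split across pages. The sketch below is purely illustrative and not part of the commit; count_records and the level values are invented for the example.

// Count how many records *start* in a page of repetition levels
fn count_records(rep_levels: &[i16]) -> usize {
    rep_levels.iter().filter(|&&r| r == 0).count()
}

fn main() {
    // Here a writer has split a record: the last record starting in page 1 spills into
    // page 2, so page 2 begins with a non-zero repetition level. Row counts derived from
    // the offset index would then no longer line up with record boundaries.
    let page_1 = [0i16, 1, 1, 0, 1, 0, 1];
    let page_2 = [1i16, 0, 1];
    assert_eq!(count_records(&page_1), 3); // records 0, 1 and 2 start here
    assert_eq!(count_records(&page_2), 1); // only record 3 starts here; record 2 merely continues
}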
2 changes: 1 addition & 1 deletion parquet/src/arrow/array_reader/mod.rs
@@ -152,7 +152,7 @@ where
Ok(records_read)
}

- /// Uses `record_reader` to skip up to `batch_size` records from`pages`
+ /// Uses `record_reader` to skip up to `batch_size` records from `pages`
///
/// Returns the number of records skipped, which can be less than `batch_size` if
/// pages is exhausted
96 changes: 94 additions & 2 deletions parquet/src/arrow/async_reader/mod.rs
@@ -878,12 +878,17 @@ mod tests {
use crate::file::properties::WriterProperties;
use arrow::compute::kernels::cmp::eq;
use arrow::error::Result as ArrowResult;
+ use arrow_array::builder::{ListBuilder, StringBuilder};
use arrow_array::cast::AsArray;
use arrow_array::types::Int32Type;
- use arrow_array::{Array, ArrayRef, Int32Array, Int8Array, Scalar, StringArray};
- use futures::TryStreamExt;
+ use arrow_array::{
+     Array, ArrayRef, Int32Array, Int8Array, Scalar, StringArray, UInt64Array,
+ };
+ use arrow_schema::{DataType, Field, Schema};
+ use futures::{StreamExt, TryStreamExt};
use rand::{thread_rng, Rng};
use std::sync::Mutex;
+ use tempfile::tempfile;

#[derive(Clone)]
struct TestReader {
@@ -1677,4 +1682,91 @@ mod tests {
assert!(sbbf.check(&"Hello"));
assert!(!sbbf.check(&"Hello_Not_Exists"));
}

#[tokio::test]
async fn test_nested_skip() {
let schema = Arc::new(Schema::new(vec![
Field::new("col_1", DataType::UInt64, false),
Field::new_list("col_2", Field::new("item", DataType::Utf8, true), true),
]));

// Default writer properties
let props = WriterProperties::builder()
.set_data_page_row_count_limit(256)
.set_write_batch_size(256)
.set_max_row_group_size(1024);

// Write data
let mut file = tempfile().unwrap();
let mut writer =
ArrowWriter::try_new(&mut file, schema.clone(), Some(props.build())).unwrap();

let mut builder = ListBuilder::new(StringBuilder::new());
for id in 0..1024 {
match id % 3 {
0 => builder
.append_value([Some("val_1".to_string()), Some(format!("id_{id}"))]),
1 => builder.append_value([Some(format!("id_{id}"))]),
_ => builder.append_null(),
}
}
let refs = vec![
Arc::new(UInt64Array::from_iter_values(0..1024)) as ArrayRef,
Arc::new(builder.finish()) as ArrayRef,
];

let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();

let selections = [
RowSelection::from(vec![
RowSelector::skip(313),
RowSelector::select(1),
RowSelector::skip(709),
RowSelector::select(1),
]),
RowSelection::from(vec![
RowSelector::skip(255),
RowSelector::select(1),
RowSelector::skip(767),
RowSelector::select(1),
]),
RowSelection::from(vec![
RowSelector::select(255),
RowSelector::skip(1),
RowSelector::select(767),
RowSelector::skip(1),
]),
RowSelection::from(vec![
RowSelector::skip(254),
RowSelector::select(1),
RowSelector::select(1),
RowSelector::skip(767),
RowSelector::select(1),
]),
];

for selection in selections {
let expected = selection.row_count();
// Read data
let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
tokio::fs::File::from_std(file.try_clone().unwrap()),
ArrowReaderOptions::new().with_page_index(true),
)
.await
.unwrap();

reader = reader.with_row_selection(selection);

let mut stream = reader.build().unwrap();

let mut total_rows = 0;
while let Some(rb) = stream.next().await {
let rb = rb.unwrap();
total_rows += rb.num_rows();
}
assert_eq!(total_rows, expected);
}
}
}
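The selector values above are chosen to land just before, exactly on, and just inside the 256-row page boundaries requested by set_data_page_row_count_limit(256). A back-of-envelope check of that arithmetic, assuming the writer really does cut a page every 256 rows (the limit is a request, not a hard guarantee, especially for the nested column):

fn main() {
    let rows_per_page = 256;
    // First (or isolated) selected rows exercised by the selections above
    for row in [313, 255, 254, 1023] {
        println!("row {row} is offset {} within page {}", row % rows_per_page, row / rows_per_page);
    }
    // e.g. selecting row 313 after skipping 313 rows starts decoding at offset 57 of page 1,
    // which is only correct if page 1 itself begins on a record boundary.
}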
14 changes: 14 additions & 0 deletions parquet/src/column/page.rs
@@ -320,6 +320,20 @@ pub trait PageReader: Iterator<Item = Result<Page>> + Send {
/// Skips reading the next page, returns an error if no
/// column index information
fn skip_next_page(&mut self) -> Result<()>;

/// Returns `true` if the next page can be assumed to contain the start of a new record
///
/// Prior to parquet V2 the specification was ambiguous as to whether a single record
/// could be split across multiple pages, and prior to [(#4327)] the Rust writer would do
/// this in certain situations. However, correctly interpreting the offset index relies on
/// this assumption holding [(#4943)], and so this mechanism is provided for a [`PageReader`]
/// to signal this to the calling context
///
/// [(#4327)]: https://github.com/apache/arrow-rs/pull/4327
/// [(#4943)]: https://github.com/apache/arrow-rs/pull/4943
fn at_record_boundary(&mut self) -> Result<bool> {
Ok(self.peek_next_page()?.is_none())
}
}

/// API for writing pages in a column chunk.
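To make the connection to the offset index concrete: the offset index stores the row index at which each data page starts, so whole pages can be skipped when their entire row range falls inside a skip. That bookkeeping is only sound when pages delimit records, which is exactly what at_record_boundary lets a PageReader assert. The helper below, pages_to_skip, is a hypothetical illustration and not an API of the parquet crate:

// Hypothetical helper: given each page's first row index (as recorded by the offset index)
// and the chunk's total row count, determine how many leading pages can be skipped outright
// for a `skip_rows` prefix. This arithmetic is only sound when every page starts at a
// record boundary.
fn pages_to_skip(first_row_index: &[usize], total_rows: usize, skip_rows: usize) -> usize {
    first_row_index
        .iter()
        .enumerate()
        .take_while(|&(i, _)| {
            // rows covered by page i end where page i + 1 begins (or at the chunk's end)
            let page_end = first_row_index.get(i + 1).copied().unwrap_or(total_rows);
            page_end <= skip_rows
        })
        .count()
}

fn main() {
    // Four pages of 256 rows each in a 1024-row chunk
    let first_row_index = [0usize, 256, 512, 768];
    assert_eq!(pages_to_skip(&first_row_index, 1024, 313), 1); // page 0 skipped, page 1 entered mid-way
    assert_eq!(pages_to_skip(&first_row_index, 1024, 512), 2); // pages 0 and 1 skipped entirely
}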
8 changes: 4 additions & 4 deletions parquet/src/column/reader.rs
@@ -269,7 +269,7 @@ where
// Reached end of page, which implies records_read < remaining_records
// as otherwise would have stopped reading before reaching the end
assert!(records_read < remaining_records); // Sanity check
- records_read += 1;
+ records_read += reader.flush_partial() as usize;
}
(records_read, levels_read)
}
@@ -380,7 +380,7 @@ where
// Reached end of page, which implies records_read < remaining_records
// as otherwise would have stopped reading before reaching the end
assert!(records_read < remaining_records); // Sanity check
- records_read += 1;
+ records_read += decoder.flush_partial() as usize;
}

(records_read, levels_read)
@@ -491,7 +491,7 @@ where
offset += bytes_read;

self.has_record_delimiter =
- self.page_reader.peek_next_page()?.is_none();
+ self.page_reader.at_record_boundary()?;

self.rep_level_decoder
.as_mut()
@@ -548,7 +548,7 @@ where
// across multiple pages, however, the parquet writer
// used to do this so we preserve backwards compatibility
self.has_record_delimiter =
- self.page_reader.peek_next_page()?.is_none();
+ self.page_reader.at_record_boundary()?;

self.rep_level_decoder.as_mut().unwrap().set_data(
Encoding::RLE,
7 changes: 7 additions & 0 deletions parquet/src/column/reader/decoder.rs
@@ -102,6 +102,9 @@ pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
num_records: usize,
num_levels: usize,
) -> Result<(usize, usize)>;

/// Flush any partially read or skipped record
fn flush_partial(&mut self) -> bool;
}

pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
@@ -519,6 +522,10 @@ impl RepetitionLevelDecoder for RepetitionLevelDecoderImpl {
}
Ok((total_records_read, total_levels_read))
}

fn flush_partial(&mut self) -> bool {
std::mem::take(&mut self.has_partial)
}
}

#[cfg(test)]
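flush_partial exists because a record that is still open when a page ends must be counted exactly once. Previously the column reader unconditionally added one record at the end of a page; with records guaranteed not to span pages, the trailing record should be counted only if one was actually in progress, and never counted again afterwards. The toy decoder below models that bookkeeping; ToyRepDecoder and its methods are invented for the example and are not the crate's implementation:

struct ToyRepDecoder {
    levels: Vec<i16>,
    pos: usize,
    has_partial: bool,
}

impl ToyRepDecoder {
    // Read at most `max_levels` repetition levels, returning how many *complete* records
    // were observed (a record completes when the next record's leading 0 is seen).
    fn read(&mut self, max_levels: usize) -> usize {
        let mut records = 0;
        let mut consumed = 0;
        while consumed < max_levels && self.pos < self.levels.len() {
            if self.levels[self.pos] == 0 {
                if self.has_partial {
                    records += 1; // the previous record is now known to be complete
                }
                self.has_partial = true; // a new record is in progress
            }
            self.pos += 1;
            consumed += 1;
        }
        records
    }

    // Mirrors the spirit of RepetitionLevelDecoder::flush_partial: report (and clear) a
    // trailing partially-read record so the caller counts it exactly once at a page boundary.
    fn flush_partial(&mut self) -> bool {
        std::mem::take(&mut self.has_partial)
    }
}

fn main() {
    // One page worth of repetition levels: three records, the last still "open" at the end.
    let mut decoder = ToyRepDecoder { levels: vec![0, 1, 1, 0, 1, 0], pos: 0, has_partial: false };
    let mut records = decoder.read(4); // consumes [0, 1, 1, 0]
    records += decoder.read(4); // consumes [1, 0] and hits the end of the page
    assert_eq!(records, 2);
    // The page is known to end on a record boundary, so the open record counts once...
    records += decoder.flush_partial() as usize;
    assert_eq!(records, 3);
    // ...and a second flush reports nothing, so it is never counted twice.
    assert!(!decoder.flush_partial());
}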
9 changes: 9 additions & 0 deletions parquet/src/file/serialized_reader.rs
@@ -770,6 +770,15 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
}
}
}

fn at_record_boundary(&mut self) -> Result<bool> {
match &mut self.state {
SerializedPageReaderState::Values { .. } => {
Ok(self.peek_next_page()?.is_none())
}
SerializedPageReaderState::Pages { .. } => Ok(true),
}
}
}

#[cfg(test)]
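The Pages state above is the one used when the offset index has been loaded, which from the Arrow reader's point of view is controlled by ArrowReaderOptions::with_page_index. A minimal synchronous usage sketch, assuming a file named data.parquet exists:

use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
use std::fs::File;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.parquet")?;
    // Loading the page index makes the offset index available to the page readers
    let options = ArrowReaderOptions::new().with_page_index(true);
    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?;
    for batch in builder.build()? {
        println!("{} rows", batch?.num_rows());
    }
    Ok(())
}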