Changes from all commits (50 commits)
f93d36e
Add support for file row numbers in Parquet readers
jkylling Mar 16, 2025
e485c0b
Add Apache license header to row_number.rs
jkylling Mar 26, 2025
2a62009
Run cargo format
jkylling Mar 26, 2025
fb5126f
Change with_row_number_column to take impl Into<String>
jkylling Mar 27, 2025
5350728
Change Option<String> -> Option<&str> in build_array_reader
jkylling Mar 27, 2025
188f350
Replace ParquetError::RowGroupMetaDataMissingRowNumber with General
jkylling Mar 27, 2025
37a9d83
Split test_create_array_reader test into two
jkylling Mar 27, 2025
41e38fe
first_row_number -> first_row_index
jkylling Mar 28, 2025
1a1e6b6
Simplify RowNumberReader with iterators
jkylling May 11, 2025
bcad87f
Merge remote-tracking branch 'origin/main' into feature/parquet-reade…
vustef Oct 23, 2025
89c1fd1
add parquet-testing change from the merge
vustef Oct 23, 2025
b0d53d0
Fix test_arrow_reader_all_columns
vustef Oct 23, 2025
094ae81
Fix first_row_number
vustef Oct 23, 2025
a5858df
Rename to first_row_index consistently, remove Option.
vustef Oct 23, 2025
5e7d9a1
revert parquet-testing update
vustef Oct 23, 2025
54c22c6
Fix baselines in file::metadata::tests::test_memory_size
vustef Oct 23, 2025
f05d470
Fix encryption metadata and async tests. Those features and default f…
vustef Oct 23, 2025
11e4f39
RowNumber extension type
vustef Oct 24, 2025
d02c977
using supplied_schema works
vustef Oct 24, 2025
6fecc17
Don't modify parsing of parquet schema, virtual columns can only be a…
vustef Oct 24, 2025
1414421
Reworked with_virtual_columns in options
vustef Oct 27, 2025
07eb467
switch to ref to slice; cleanup with_row_number_columns; async tests …
vustef Oct 27, 2025
af0e0f9
Bring back optionality to first_row_index, for future consideration w…
vustef Oct 27, 2025
8bccd22
Reexport
vustef Oct 27, 2025
65679ba
reexport all within virtual_type
vustef Oct 27, 2025
968d461
pub mod virtual_type skipping experimental schema
vustef Oct 27, 2025
6144967
Switch back to `virtual_type::*` for now; fix warnings on cargo test
vustef Oct 27, 2025
3af3ad7
Fix `projected_fields` assertion in async reader
vustef Oct 27, 2025
fad0ea1
common virtual column struct
vustef Nov 6, 2025
ca6c7a6
assert that column is virtual
vustef Nov 6, 2025
da9245d
don't change pub API
vustef Nov 6, 2025
031c6d5
complex_schema rename
vustef Nov 6, 2025
079a78d
passing docstring test
vustef Nov 6, 2025
f2a4f45
Pass parquet metadata to array reader builder
vustef Nov 6, 2025
3933d8e
Add virtual fields outside of the visitor
vustef Nov 7, 2025
e5449e1
use parquet.virtual instead of arrow.virtual
vustef Nov 7, 2025
a2c55dc
more struct based approach to virtual type reuse
vustef Nov 7, 2025
688ce7b
Switch to directly implementing ExtensionType for RowNumber, no commo…
vustef Nov 7, 2025
8e7f668
Use FieldRef
vustef Nov 7, 2025
3aeced1
row number virtual_prefix sharing
vustef Nov 7, 2025
31679f1
RowNumber instead of RowNumber::default()
vustef Nov 7, 2025
83a20c6
Default ordinals
vustef Nov 7, 2025
f22e9f5
Merge branch 'main' of github.com:apache/arrow-rs into feature/parque…
vustef Nov 10, 2025
3e3b90f
merge fixes
vustef Nov 10, 2025
5db9113
Fix example
vustef Nov 10, 2025
b00373b
cargo fmt
vustef Nov 10, 2025
6651017
fix infinite loop
vustef Nov 10, 2025
5ff1cc9
cargo fmt -p parquet ...
vustef Nov 10, 2025
8da925c
Fix clippy too
vustef Nov 10, 2025
40db3d6
fix doctest too
vustef Nov 10, 2025
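Taken together, the commits sketch the intended user-facing API: a RowNumber extension type, an ArrowReaderOptions::with_virtual_columns builder method, and row numbers materialized as a virtual Int64 column. Below is a hypothetical end-to-end sketch assembled from the commit messages and the tests in this diff; the exact with_virtual_columns signature and the virtual_type module path are assumptions taken from the commit log, not confirmed API.

use std::fs::File;
use std::sync::Arc;

use arrow_schema::{DataType, Field};
use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
use parquet::arrow::schema::virtual_type::RowNumber; // re-exported per the commits above (assumed path)

fn main() -> parquet::errors::Result<()> {
    let file = File::open("data.parquet").unwrap();

    // A virtual column is declared as an ordinary Arrow field carrying the
    // RowNumber extension type; it has no backing data in the parquet file.
    let row_number_field = Arc::new(
        Field::new("row_number", DataType::Int64, false).with_extension_type(RowNumber),
    );

    // Assumed signature: the commit log says with_virtual_columns takes a
    // reference to a slice of fields.
    let options =
        ArrowReaderOptions::new().with_virtual_columns(std::slice::from_ref(&row_number_field));

    let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?.build()?;
    for batch in reader {
        // Each batch would carry an extra Int64 "row_number" column holding
        // the absolute position of every row within the file.
        println!("{} rows", batch?.num_rows());
    }
    Ok(())
}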
42 changes: 31 additions & 11 deletions parquet/examples/read_with_rowgroup.rs
Contributor author comment: I'm yet to merge latest main, which has push decoder changes...

@@ -22,7 +22,7 @@ use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ProjectionMask, parquet_to_arrow_field_levels};
use parquet::column::page::{PageIterator, PageReader};
use parquet::errors::{ParquetError, Result};
use parquet::file::metadata::RowGroupMetaData;
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::reader::{ChunkReader, Length};
use parquet::file::serialized_reader::SerializedPageReader;
use std::sync::Arc;
@@ -35,10 +35,11 @@ async fn main() -> Result<()> {
let mut file = File::open(&path).await.unwrap();

// The metadata could be cached in other places, this example only shows how to read
let metadata = file.get_metadata(None).await?;
let metadata = Arc::try_unwrap(file.get_metadata(None).await?).unwrap();

for rg in metadata.row_groups() {
let mut rowgroup = InMemoryRowGroup::create(rg.clone(), ProjectionMask::all());
for row_group_idx in 0..metadata.row_groups().len() {
let mut rowgroup =
InMemoryRowGroup::create(metadata.clone(), row_group_idx, ProjectionMask::all());
rowgroup.async_fetch_data(&mut file, None).await?;
let reader = rowgroup.build_reader(1024, None)?;

@@ -100,14 +101,15 @@ impl ChunkReader for ColumnChunkData {

#[derive(Clone)]
pub struct InMemoryRowGroup {
pub metadata: RowGroupMetaData,
metadata: ParquetMetaData,
row_group_idx: usize,
mask: ProjectionMask,
column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
}

impl RowGroups for InMemoryRowGroup {
fn num_rows(&self) -> usize {
self.metadata.num_rows() as usize
self.row_group_metadata().num_rows() as usize
}

fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
@@ -118,7 +120,7 @@ impl RowGroups for InMemoryRowGroup {
Some(data) => {
let page_reader: Box<dyn PageReader> = Box::new(SerializedPageReader::new(
data.clone(),
self.metadata.column(i),
self.row_group_metadata().column(i),
self.num_rows(),
None,
)?);
@@ -129,26 +131,44 @@ }
}
}
}

fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
Box::new(std::iter::once(self.row_group_metadata()))
}

fn metadata(&self) -> &ParquetMetaData {
&self.metadata
}
}

impl InMemoryRowGroup {
pub fn create(metadata: RowGroupMetaData, mask: ProjectionMask) -> Self {
let column_chunks = metadata.columns().iter().map(|_| None).collect::<Vec<_>>();
pub fn create(metadata: ParquetMetaData, row_group_idx: usize, mask: ProjectionMask) -> Self {
let column_chunks = metadata
.row_group(row_group_idx)
.columns()
.iter()
.map(|_| None)
.collect::<Vec<_>>();

Self {
metadata,
row_group_idx,
mask,
column_chunks,
}
}

pub fn row_group_metadata(&self) -> &RowGroupMetaData {
self.metadata.row_group(self.row_group_idx)
}

pub fn build_reader(
&self,
batch_size: usize,
selection: Option<RowSelection>,
) -> Result<ParquetRecordBatchReader> {
let levels = parquet_to_arrow_field_levels(
&self.metadata.schema_descr_ptr(),
&self.row_group_metadata().schema_descr_ptr(),
self.mask.clone(),
None,
)?;
@@ -163,7 +183,7 @@
_selection: Option<&RowSelection>,
) -> Result<()> {
let mut vs = std::mem::take(&mut self.column_chunks);
for (leaf_idx, meta) in self.metadata.columns().iter().enumerate() {
for (leaf_idx, meta) in self.row_group_metadata().columns().iter().enumerate() {
if self.mask.leaf_included(leaf_idx) {
let (start, len) = meta.byte_range();
let data = reader.get_bytes(start..(start + len)).await?;
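The shape change in this example is deliberate: a row group alone no longer carries enough information once virtual columns exist, because a row number depends on where the group sits in the whole file. Threading the full ParquetMetaData through InMemoryRowGroup lets it hand file-level metadata to the reader builder introduced later in this diff. A minimal sketch of that hand-off, reusing names from this diff and assuming fields and mask are in scope as in the example's build_reader:

// Sketch only: rowgroup implements RowGroups, so the builder can pull both
// page data and file-level metadata from it.
let metrics = ArrowReaderMetrics::disabled();
let array_reader = ArrayReaderBuilder::new(&rowgroup, &metrics)
    .with_parquet_metadata(rowgroup.metadata())
    .build_array_reader(fields.as_ref(), &mask)?;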
74 changes: 71 additions & 3 deletions parquet/src/arrow/array_reader/builder.rs
@@ -15,9 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::{Arc, Mutex};

use arrow_schema::{DataType, Fields, SchemaBuilder};
use std::sync::{Arc, Mutex};

use crate::arrow::ProjectionMask;
use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader;
@@ -26,16 +25,18 @@ use crate::arrow::array_reader::cached_array_reader::CachedArrayReader;
use crate::arrow::array_reader::empty_array::make_empty_array_reader;
use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
use crate::arrow::array_reader::row_group_cache::RowGroupCache;
use crate::arrow::array_reader::row_number::RowNumberReader;
use crate::arrow::array_reader::{
ArrayReader, FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
PrimitiveArrayReader, RowGroups, StructArrayReader, make_byte_array_dictionary_reader,
make_byte_array_reader,
};
use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use crate::arrow::schema::{ParquetField, ParquetFieldType};
use crate::arrow::schema::{ParquetField, ParquetFieldType, VirtualColumnType};
use crate::basic::Type as PhysicalType;
use crate::data_type::{BoolType, DoubleType, FloatType, Int32Type, Int64Type, Int96Type};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::ParquetMetaData;
use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};

/// Builder for [`CacheOptions`]
@@ -89,6 +90,8 @@ pub struct ArrayReaderBuilder<'a> {
row_groups: &'a dyn RowGroups,
/// Optional cache options for the array reader
cache_options: Option<&'a CacheOptions<'a>>,
/// Parquet metadata for computing virtual column values
parquet_metadata: Option<&'a ParquetMetaData>,
/// metrics
metrics: &'a ArrowReaderMetrics,
}
@@ -98,6 +101,7 @@ impl<'a> ArrayReaderBuilder<'a> {
Self {
row_groups,
cache_options: None,
parquet_metadata: None,
metrics,
}
}
@@ -108,6 +112,12 @@ impl<'a> ArrayReaderBuilder<'a> {
self
}

/// Add parquet metadata to the builder for computing virtual column values
pub fn with_parquet_metadata(mut self, parquet_metadata: &'a ParquetMetaData) -> Self {
self.parquet_metadata = Some(parquet_metadata);
self
}

/// Create [`ArrayReader`] from parquet schema, projection mask, and parquet file reader.
pub fn build_array_reader(
&self,
@@ -153,6 +163,13 @@ impl<'a> ArrayReaderBuilder<'a> {
Ok(Some(reader))
}
}
ParquetFieldType::Virtual(virtual_type) => {
// Virtual columns don't have data in the parquet file
// They need to be built by specialized readers
match virtual_type {
VirtualColumnType::RowNumber => Ok(Some(self.build_row_number_reader()?)),
}
}
ParquetFieldType::Group { .. } => match &field.arrow_type {
DataType::Map(_, _) => self.build_map_reader(field, mask),
DataType::Struct(_) => self.build_struct_reader(field, mask),
@@ -164,6 +181,18 @@ }
}
}

fn build_row_number_reader(&self) -> Result<Box<dyn ArrayReader>> {
let parquet_metadata = self.parquet_metadata.ok_or_else(|| {
ParquetError::General(
"ParquetMetaData is required to read virtual row number columns.".to_string(),
)
})?;
Ok(Box::new(RowNumberReader::try_new(
parquet_metadata,
self.row_groups.row_groups(),
)?))
}

/// Build array reader for map type.
fn build_map_reader(
&self,
@@ -439,6 +468,7 @@ impl<'a> ArrayReaderBuilder<'a> {
mod tests {
use super::*;
use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
use crate::arrow::schema::virtual_type::RowNumber;
use crate::file::reader::{FileReader, SerializedFileReader};
use crate::util::test_common::file_util::get_test_file;
use arrow::datatypes::Field;
@@ -455,6 +485,7 @@ mod tests {
file_metadata.schema_descr(),
ProjectionMask::all(),
file_metadata.key_value_metadata(),
&[],
)
.unwrap();

@@ -472,4 +503,41 @@

assert_eq!(array_reader.get_data_type(), &arrow_type);
}

#[test]
fn test_create_array_reader_with_row_numbers() {
let file = get_test_file("nulls.snappy.parquet");
let file_reader: Arc<dyn FileReader> = Arc::new(SerializedFileReader::new(file).unwrap());

let file_metadata = file_reader.metadata().file_metadata();
let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [0]);
let row_number_field = Arc::new(
Field::new("row_number", DataType::Int64, false).with_extension_type(RowNumber),
);
let (_, fields) = parquet_to_arrow_schema_and_fields(
file_metadata.schema_descr(),
ProjectionMask::all(),
file_metadata.key_value_metadata(),
std::slice::from_ref(&row_number_field),
)
.unwrap();

let metrics = ArrowReaderMetrics::disabled();
let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
.with_parquet_metadata(file_reader.metadata())
.build_array_reader(fields.as_ref(), &mask)
.unwrap();

// Create arrow types
let arrow_type = DataType::Struct(Fields::from(vec![
Field::new(
"b_struct",
DataType::Struct(vec![Field::new("b_c_int", DataType::Int32, true)].into()),
true,
),
(*row_number_field).clone(),
]));

assert_eq!(array_reader.get_data_type(), &arrow_type);
}
}
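The RowNumberReader wired up by build_row_number_reader is not shown in this diff; conceptually it materializes, per selected row group, the half-open range of file offsets starting at that group's first row index. A minimal sketch of the idea, assuming the first_row_index() accessor the commit history describes (renamed from first_row_number, and optional); this is not the PR's actual implementation:

// Hypothetical sketch of the core iterator behind a row-number reader.
fn file_row_numbers<'a>(
    row_groups: impl Iterator<Item = &'a RowGroupMetaData> + 'a,
) -> impl Iterator<Item = i64> + 'a {
    row_groups.flat_map(|rg| {
        // Assumed accessor returning Option<i64>; None would mean the
        // metadata cannot support row numbers for this group.
        let first = rg
            .first_row_index()
            .expect("row numbers require first_row_index in the metadata");
        // Row numbers for this group are a contiguous run of file offsets.
        first..first + rg.num_rows()
    })
}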
1 change: 1 addition & 0 deletions parquet/src/arrow/array_reader/list_array.rs
@@ -561,6 +561,7 @@ mod tests {
schema,
ProjectionMask::all(),
file_metadata.key_value_metadata(),
&[],
)
.unwrap();

22 changes: 20 additions & 2 deletions parquet/src/arrow/array_reader/mod.rs
@@ -27,6 +27,7 @@ use crate::arrow::record_reader::GenericRecordReader;
use crate::arrow::record_reader::buffer::ValuesBuffer;
use crate::column::page::PageIterator;
use crate::column::reader::decoder::ColumnValueDecoder;
use crate::file::metadata::ParquetMetaData;
use crate::file::reader::{FilePageIterator, FileReader};

mod builder;
@@ -42,12 +43,13 @@ mod map_array;
mod null_array;
mod primitive_array;
mod row_group_cache;
mod row_number;
mod struct_array;

#[cfg(test)]
mod test_util;

// Note that this crate is public under the `experimental` feature flag.
Contributor author comment: shouldn't remove this comment

use crate::file::metadata::RowGroupMetaData;
pub use builder::{ArrayReaderBuilder, CacheOptions, CacheOptionsBuilder};
pub use byte_array::make_byte_array_reader;
pub use byte_array_dictionary::make_byte_array_dictionary_reader;
@@ -139,17 +141,33 @@ pub trait RowGroups {
/// Returns a [`PageIterator`] for all pages in the specified column chunk
/// across all row groups in this collection.
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;

/// Returns an iterator over the row groups in this collection
fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_>;

/// Returns the parquet metadata
fn metadata(&self) -> &ParquetMetaData;
}

impl RowGroups for Arc<dyn FileReader> {
fn num_rows(&self) -> usize {
self.metadata().file_metadata().num_rows() as usize
FileReader::metadata(self.as_ref())
.file_metadata()
.num_rows() as usize
}

fn column_chunks(&self, column_index: usize) -> Result<Box<dyn PageIterator>> {
let iterator = FilePageIterator::new(column_index, Arc::clone(self))?;
Ok(Box::new(iterator))
}

fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
Box::new(FileReader::metadata(self.as_ref()).row_groups().iter())
}

fn metadata(&self) -> &ParquetMetaData {
FileReader::metadata(self.as_ref())
}
}

/// Uses `record_reader` to read up to `batch_size` records from `pages`
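For downstream implementors, the widened RowGroups trait means any custom source must now expose both a row-group iterator and the file-level metadata. A minimal sketch of a wrapper over a full ParquetMetaData, using only the names already imported in this module; the column_chunks body is stubbed, since page plumbing is source-specific:

struct MetadataOnlyRowGroups {
    metadata: Arc<ParquetMetaData>,
}

impl RowGroups for MetadataOnlyRowGroups {
    fn num_rows(&self) -> usize {
        self.metadata.file_metadata().num_rows() as usize
    }

    fn column_chunks(&self, _i: usize) -> Result<Box<dyn PageIterator>> {
        // A real source would return page readers here; this sketch only
        // demonstrates the two methods added by this PR.
        Err(ParquetError::General("no page data in this sketch".into()))
    }

    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
        // Delegate straight to the file metadata's row group list.
        Box::new(self.metadata.row_groups().iter())
    }

    fn metadata(&self) -> &ParquetMetaData {
        &self.metadata
    }
}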