diff --git a/parquet/examples/read_with_rowgroup.rs b/parquet/examples/read_with_rowgroup.rs index a4c5fdc04d36..e3c714c2807a 100644 --- a/parquet/examples/read_with_rowgroup.rs +++ b/parquet/examples/read_with_rowgroup.rs @@ -22,7 +22,7 @@ use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{ProjectionMask, parquet_to_arrow_field_levels}; use parquet::column::page::{PageIterator, PageReader}; use parquet::errors::{ParquetError, Result}; -use parquet::file::metadata::RowGroupMetaData; +use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData}; use parquet::file::reader::{ChunkReader, Length}; use parquet::file::serialized_reader::SerializedPageReader; use std::sync::Arc; @@ -35,10 +35,11 @@ async fn main() -> Result<()> { let mut file = File::open(&path).await.unwrap(); // The metadata could be cached in other places, this example only shows how to read - let metadata = file.get_metadata(None).await?; + let metadata = Arc::try_unwrap(file.get_metadata(None).await?).unwrap(); - for rg in metadata.row_groups() { - let mut rowgroup = InMemoryRowGroup::create(rg.clone(), ProjectionMask::all()); + for row_group_idx in 0..metadata.row_groups().len() { + let mut rowgroup = + InMemoryRowGroup::create(metadata.clone(), row_group_idx, ProjectionMask::all()); rowgroup.async_fetch_data(&mut file, None).await?; let reader = rowgroup.build_reader(1024, None)?; @@ -100,14 +101,15 @@ impl ChunkReader for ColumnChunkData { #[derive(Clone)] pub struct InMemoryRowGroup { - pub metadata: RowGroupMetaData, + metadata: ParquetMetaData, + row_group_idx: usize, mask: ProjectionMask, column_chunks: Vec>>, } impl RowGroups for InMemoryRowGroup { fn num_rows(&self) -> usize { - self.metadata.num_rows() as usize + self.row_group_metadata().num_rows() as usize } fn column_chunks(&self, i: usize) -> Result> { @@ -118,7 +120,7 @@ impl RowGroups for InMemoryRowGroup { Some(data) => { let page_reader: Box = Box::new(SerializedPageReader::new( data.clone(), - self.metadata.column(i), + self.row_group_metadata().column(i), self.num_rows(), None, )?); @@ -129,26 +131,44 @@ impl RowGroups for InMemoryRowGroup { } } } + + fn row_groups(&self) -> Box + '_> { + Box::new(std::iter::once(self.row_group_metadata())) + } + + fn metadata(&self) -> &ParquetMetaData { + &self.metadata + } } impl InMemoryRowGroup { - pub fn create(metadata: RowGroupMetaData, mask: ProjectionMask) -> Self { - let column_chunks = metadata.columns().iter().map(|_| None).collect::>(); + pub fn create(metadata: ParquetMetaData, row_group_idx: usize, mask: ProjectionMask) -> Self { + let column_chunks = metadata + .row_group(row_group_idx) + .columns() + .iter() + .map(|_| None) + .collect::>(); Self { metadata, + row_group_idx, mask, column_chunks, } } + pub fn row_group_metadata(&self) -> &RowGroupMetaData { + self.metadata.row_group(self.row_group_idx) + } + pub fn build_reader( &self, batch_size: usize, selection: Option, ) -> Result { let levels = parquet_to_arrow_field_levels( - &self.metadata.schema_descr_ptr(), + &self.row_group_metadata().schema_descr_ptr(), self.mask.clone(), None, )?; @@ -163,7 +183,7 @@ impl InMemoryRowGroup { _selection: Option<&RowSelection>, ) -> Result<()> { let mut vs = std::mem::take(&mut self.column_chunks); - for (leaf_idx, meta) in self.metadata.columns().iter().enumerate() { + for (leaf_idx, meta) in self.row_group_metadata().columns().iter().enumerate() { if self.mask.leaf_included(leaf_idx) { let (start, len) = meta.byte_range(); let data = reader.get_bytes(start..(start + len)).await?; diff 
--git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index 6107ba4f2575..82c8e77f6393 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -26,16 +26,18 @@ use crate::arrow::array_reader::cached_array_reader::CachedArrayReader; use crate::arrow::array_reader::empty_array::make_empty_array_reader; use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader; use crate::arrow::array_reader::row_group_cache::RowGroupCache; +use crate::arrow::array_reader::row_number::RowNumberReader; use crate::arrow::array_reader::{ ArrayReader, FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader, PrimitiveArrayReader, RowGroups, StructArrayReader, make_byte_array_dictionary_reader, make_byte_array_reader, }; use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; -use crate::arrow::schema::{ParquetField, ParquetFieldType}; +use crate::arrow::schema::{ParquetField, ParquetFieldType, VirtualColumnType}; use crate::basic::Type as PhysicalType; use crate::data_type::{BoolType, DoubleType, FloatType, Int32Type, Int64Type, Int96Type}; use crate::errors::{ParquetError, Result}; +use crate::file::metadata::ParquetMetaData; use crate::schema::types::{ColumnDescriptor, ColumnPath, Type}; /// Builder for [`CacheOptions`] @@ -89,6 +91,8 @@ pub struct ArrayReaderBuilder<'a> { row_groups: &'a dyn RowGroups, /// Optional cache options for the array reader cache_options: Option<&'a CacheOptions<'a>>, + /// Parquet metadata for computing virtual column values + parquet_metadata: Option<&'a ParquetMetaData>, /// metrics metrics: &'a ArrowReaderMetrics, } @@ -98,6 +102,7 @@ impl<'a> ArrayReaderBuilder<'a> { Self { row_groups, cache_options: None, + parquet_metadata: None, metrics, } } @@ -108,6 +113,12 @@ impl<'a> ArrayReaderBuilder<'a> { self } + /// Add parquet metadata to the builder for computing virtual column values + pub fn with_parquet_metadata(mut self, parquet_metadata: &'a ParquetMetaData) -> Self { + self.parquet_metadata = Some(parquet_metadata); + self + } + /// Create [`ArrayReader`] from parquet schema, projection mask, and parquet file reader. pub fn build_array_reader( &self, @@ -153,6 +164,13 @@ impl<'a> ArrayReaderBuilder<'a> { Ok(Some(reader)) } } + ParquetFieldType::Virtual(virtual_type) => { + // Virtual columns don't have data in the parquet file + // They need to be built by specialized readers + match virtual_type { + VirtualColumnType::RowNumber => Ok(Some(self.build_row_number_reader()?)), + } + } ParquetFieldType::Group { .. } => match &field.arrow_type { DataType::Map(_, _) => self.build_map_reader(field, mask), DataType::Struct(_) => self.build_struct_reader(field, mask), @@ -164,6 +182,18 @@ impl<'a> ArrayReaderBuilder<'a> { } } + fn build_row_number_reader(&self) -> Result> { + let parquet_metadata = self.parquet_metadata.ok_or_else(|| { + ParquetError::General( + "ParquetMetaData is required to read virtual row number columns.".to_string(), + ) + })?; + Ok(Box::new(RowNumberReader::try_new( + parquet_metadata, + self.row_groups.row_groups(), + )?)) + } + /// Build array reader for map type. 
fn build_map_reader( &self, @@ -439,6 +469,7 @@ impl<'a> ArrayReaderBuilder<'a> { mod tests { use super::*; use crate::arrow::schema::parquet_to_arrow_schema_and_fields; + use crate::arrow::schema::virtual_type::RowNumber; use crate::file::reader::{FileReader, SerializedFileReader}; use crate::util::test_common::file_util::get_test_file; use arrow::datatypes::Field; @@ -455,6 +486,7 @@ mod tests { file_metadata.schema_descr(), ProjectionMask::all(), file_metadata.key_value_metadata(), + &[], ) .unwrap(); @@ -472,4 +504,41 @@ mod tests { assert_eq!(array_reader.get_data_type(), &arrow_type); } + + #[test] + fn test_create_array_reader_with_row_numbers() { + let file = get_test_file("nulls.snappy.parquet"); + let file_reader: Arc = Arc::new(SerializedFileReader::new(file).unwrap()); + + let file_metadata = file_reader.metadata().file_metadata(); + let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [0]); + let row_number_field = Arc::new( + Field::new("row_number", DataType::Int64, false).with_extension_type(RowNumber), + ); + let (_, fields) = parquet_to_arrow_schema_and_fields( + file_metadata.schema_descr(), + ProjectionMask::all(), + file_metadata.key_value_metadata(), + std::slice::from_ref(&row_number_field), + ) + .unwrap(); + + let metrics = ArrowReaderMetrics::disabled(); + let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics) + .with_parquet_metadata(file_reader.metadata()) + .build_array_reader(fields.as_ref(), &mask) + .unwrap(); + + // Create arrow types + let arrow_type = DataType::Struct(Fields::from(vec![ + Field::new( + "b_struct", + DataType::Struct(vec![Field::new("b_c_int", DataType::Int32, true)].into()), + true, + ), + (*row_number_field).clone(), + ])); + + assert_eq!(array_reader.get_data_type(), &arrow_type); + } } diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs index 487c2bdd56cd..ff1b414c27bb 100644 --- a/parquet/src/arrow/array_reader/list_array.rs +++ b/parquet/src/arrow/array_reader/list_array.rs @@ -561,6 +561,7 @@ mod tests { schema, ProjectionMask::all(), file_metadata.key_value_metadata(), + &[], ) .unwrap(); diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs index b3595e58d695..54be89f23084 100644 --- a/parquet/src/arrow/array_reader/mod.rs +++ b/parquet/src/arrow/array_reader/mod.rs @@ -27,6 +27,7 @@ use crate::arrow::record_reader::GenericRecordReader; use crate::arrow::record_reader::buffer::ValuesBuffer; use crate::column::page::PageIterator; use crate::column::reader::decoder::ColumnValueDecoder; +use crate::file::metadata::ParquetMetaData; use crate::file::reader::{FilePageIterator, FileReader}; mod builder; @@ -42,12 +43,14 @@ mod map_array; mod null_array; mod primitive_array; mod row_group_cache; +mod row_number; mod struct_array; #[cfg(test)] mod test_util; // Note that this crate is public under the `experimental` feature flag. +use crate::file::metadata::RowGroupMetaData; pub use builder::{ArrayReaderBuilder, CacheOptions, CacheOptionsBuilder}; pub use byte_array::make_byte_array_reader; pub use byte_array_dictionary::make_byte_array_dictionary_reader; @@ -139,17 +142,35 @@ pub trait RowGroups { /// Returns a [`PageIterator`] for all pages in the specified column chunk /// across all row groups in this collection. fn column_chunks(&self, i: usize) -> Result>; + + /// Returns an iterator over the row groups in this collection + /// + /// Note this may not include all row groups in [`Self::metadata`]. 
+ fn row_groups(&self) -> Box + '_>; + + /// Returns the parquet metadata + fn metadata(&self) -> &ParquetMetaData; } impl RowGroups for Arc { fn num_rows(&self) -> usize { - self.metadata().file_metadata().num_rows() as usize + FileReader::metadata(self.as_ref()) + .file_metadata() + .num_rows() as usize } fn column_chunks(&self, column_index: usize) -> Result> { let iterator = FilePageIterator::new(column_index, Arc::clone(self))?; Ok(Box::new(iterator)) } + + fn row_groups(&self) -> Box + '_> { + Box::new(FileReader::metadata(self.as_ref()).row_groups().iter()) + } + + fn metadata(&self) -> &ParquetMetaData { + FileReader::metadata(self.as_ref()) + } } /// Uses `record_reader` to read up to `batch_size` records from `pages` diff --git a/parquet/src/arrow/array_reader/row_number.rs b/parquet/src/arrow/array_reader/row_number.rs new file mode 100644 index 000000000000..e8116b2936b8 --- /dev/null +++ b/parquet/src/arrow/array_reader/row_number.rs @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::arrow::array_reader::ArrayReader; +use crate::errors::{ParquetError, Result}; +use crate::file::metadata::{ParquetMetaData, RowGroupMetaData}; +use arrow_array::{ArrayRef, Int64Array}; +use arrow_schema::DataType; +use std::any::Any; +use std::collections::HashSet; +use std::sync::Arc; + +pub(crate) struct RowNumberReader { + buffered_row_numbers: Vec, + remaining_row_numbers: std::iter::Flatten>>, +} + +impl RowNumberReader { + pub(crate) fn try_new<'a>( + parquet_metadata: &'a ParquetMetaData, + row_groups: impl Iterator, + ) -> Result { + // Collect ordinals from the selected row groups + let selected_ordinals: HashSet = row_groups + .map(|rg| { + rg.ordinal().ok_or_else(|| { + ParquetError::General( + "Row group missing ordinal field, required to compute row numbers" + .to_string(), + ) + }) + }) + .collect::>()?; + + // Iterate through all row groups once, computing first_row_index and creating ranges + // This is O(M) where M is total row groups, much better than O(N * O) where N is selected + let mut first_row_index: i64 = 0; + let mut ranges = Vec::new(); + + for rg in parquet_metadata.row_groups() { + if let Some(ordinal) = rg.ordinal() { + if selected_ordinals.contains(&ordinal) { + ranges.push((ordinal, first_row_index..first_row_index + rg.num_rows())); + } + } + first_row_index += rg.num_rows(); + } + + // Sort ranges by ordinal to maintain original row group order + ranges.sort_by_key(|(ordinal, _)| *ordinal); + let ranges: Vec<_> = ranges.into_iter().map(|(_, range)| range).collect(); + + Ok(Self { + buffered_row_numbers: Vec::new(), + remaining_row_numbers: ranges.into_iter().flatten(), + }) + } +} + +impl ArrayReader for RowNumberReader { + fn read_records(&mut self, batch_size: usize) -> Result { + let starting_len = self.buffered_row_numbers.len(); + self.buffered_row_numbers + .extend((&mut self.remaining_row_numbers).take(batch_size)); + Ok(self.buffered_row_numbers.len() - starting_len) + } + + fn skip_records(&mut self, num_records: usize) -> Result { + // TODO: Use advance_by when it stabilizes to improve performance + Ok((&mut self.remaining_row_numbers).take(num_records).count()) + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn get_data_type(&self) -> &DataType { + &DataType::Int64 + } + + fn consume_batch(&mut self) -> Result { + Ok(Arc::new(Int64Array::from_iter( + self.buffered_row_numbers.drain(..), + ))) + } + + fn get_def_levels(&self) -> Option<&[i16]> { + None + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + None + } +} diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 6122b861f488..01866784f0ef 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -20,7 +20,7 @@ use arrow_array::Array; use arrow_array::cast::AsArray; use arrow_array::{RecordBatch, RecordBatchReader}; -use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef}; +use arrow_schema::{ArrowError, DataType as ArrowType, FieldRef, Schema, SchemaRef}; pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter}; pub use selection::{RowSelection, RowSelector}; use std::fmt::{Debug, Formatter}; @@ -28,8 +28,10 @@ use std::sync::Arc; pub use crate::arrow::array_reader::RowGroups; use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder}; -use crate::arrow::schema::{ParquetField, parquet_to_arrow_schema_and_fields}; -use crate::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels}; +use crate::arrow::schema::{ + ParquetField, 
parquet_to_arrow_schema_and_fields, virtual_type::is_virtual_column, +}; +use crate::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels_with_virtual}; use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash}; use crate::bloom_filter::{ SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset, @@ -40,6 +42,7 @@ use crate::encryption::decrypt::FileDecryptionProperties; use crate::errors::{ParquetError, Result}; use crate::file::metadata::{ PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader, + RowGroupMetaData, }; use crate::file::reader::{ChunkReader, SerializedPageReader}; use crate::schema::types::SchemaDescriptor; @@ -411,6 +414,8 @@ pub struct ArrowReaderOptions { /// If encryption is enabled, the file decryption properties can be provided #[cfg(feature = "encryption")] pub(crate) file_decryption_properties: Option>, + + virtual_columns: Vec, } impl ArrowReaderOptions { @@ -551,6 +556,73 @@ impl ArrowReaderOptions { } } + /// Include virtual columns in the output. + /// + /// Virtual columns are columns that are not part of the Parquet schema, but are added to the output by the reader such as row numbers. + /// + /// # Example + /// ``` + /// # use std::sync::Arc; + /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch}; + /// # use arrow_schema::{DataType, Field, Schema}; + /// # use parquet::arrow::{ArrowWriter, RowNumber}; + /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder}; + /// # use tempfile::tempfile; + /// # + /// # fn main() -> Result<(), Box> { + /// // Create a simple record batch with some data + /// let values = Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef; + /// let batch = RecordBatch::try_from_iter(vec![("value", values)])?; + /// + /// // Write the batch to a temporary parquet file + /// let file = tempfile()?; + /// let mut writer = ArrowWriter::try_new( + /// file.try_clone()?, + /// batch.schema(), + /// None + /// )?; + /// writer.write(&batch)?; + /// writer.close()?; + /// + /// // Create a virtual column for row numbers + /// let row_number_field = Arc::new(Field::new("row_number", DataType::Int64, false) + /// .with_extension_type(RowNumber)); + /// + /// // Configure options with virtual columns + /// let options = ArrowReaderOptions::new() + /// .with_virtual_columns(vec![row_number_field]); + /// + /// // Create a reader with the options + /// let mut reader = ParquetRecordBatchReaderBuilder::try_new_with_options( + /// file, + /// options + /// )? + /// .build()?; + /// + /// // Read the batch - it will include both the original column and the virtual row_number column + /// let result_batch = reader.next().unwrap()?; + /// assert_eq!(result_batch.num_columns(), 2); // "value" + "row_number" + /// assert_eq!(result_batch.num_rows(), 3); + /// # + /// # Ok(()) + /// # } + /// ``` + pub fn with_virtual_columns(self, virtual_columns: Vec) -> Self { + // Validate that all fields are virtual columns + for field in &virtual_columns { + if !is_virtual_column(field) { + panic!( + "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'", + field.name() + ); + } + } + Self { + virtual_columns, + ..self + } + } + /// Retrieve the currently set page index behavior. /// /// This can be set via [`with_page_index`][Self::with_page_index]. @@ -630,7 +702,11 @@ impl ArrowReaderMetadata { /// of the settings in `options`. 
See [`Self::load`] to load metadata including the page index if needed. pub fn try_new(metadata: Arc, options: ArrowReaderOptions) -> Result { match options.supplied_schema { - Some(supplied_schema) => Self::with_supplied_schema(metadata, supplied_schema.clone()), + Some(supplied_schema) => Self::with_supplied_schema( + metadata, + supplied_schema.clone(), + &options.virtual_columns, + ), None => { let kv_metadata = match options.skip_arrow_metadata { true => None, @@ -641,6 +717,7 @@ impl ArrowReaderMetadata { metadata.file_metadata().schema_descr(), ProjectionMask::all(), kv_metadata, + &options.virtual_columns, )?; Ok(Self { @@ -655,16 +732,18 @@ impl ArrowReaderMetadata { fn with_supplied_schema( metadata: Arc, supplied_schema: SchemaRef, + virtual_columns: &[FieldRef], ) -> Result { let parquet_schema = metadata.file_metadata().schema_descr(); - let field_levels = parquet_to_arrow_field_levels( + let field_levels = parquet_to_arrow_field_levels_with_virtual( parquet_schema, ProjectionMask::all(), Some(supplied_schema.fields()), + virtual_columns, )?; let fields = field_levels.fields; let inferred_len = fields.len(); - let supplied_len = supplied_schema.fields().len(); + let supplied_len = supplied_schema.fields().len() + virtual_columns.len(); // Ensure the supplied schema has the same number of columns as the parquet schema. // parquet_to_arrow_field_levels is expected to throw an error if the schemas have // different lengths, but we check here to be safe. @@ -946,6 +1025,7 @@ impl ParquetRecordBatchReaderBuilder { cache_projection.intersect(&projection); let array_reader = ArrayReaderBuilder::new(&reader, &metrics) + .with_parquet_metadata(&reader.metadata) .build_array_reader(fields.as_deref(), predicate.projection())?; plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?; @@ -953,6 +1033,7 @@ impl ParquetRecordBatchReaderBuilder { } let array_reader = ArrayReaderBuilder::new(&reader, &metrics) + .with_parquet_metadata(&reader.metadata) .build_array_reader(fields.as_deref(), &projection)?; let read_plan = plan_builder @@ -991,6 +1072,18 @@ impl RowGroups for ReaderRowGroups { row_groups: self.row_groups.clone().into_iter(), })) } + + fn row_groups(&self) -> Box + '_> { + Box::new( + self.row_groups + .iter() + .map(move |i| self.metadata.row_group(*i)), + ) + } + + fn metadata(&self) -> &ParquetMetaData { + self.metadata.as_ref() + } } struct ReaderPageIterator { @@ -1174,6 +1267,7 @@ impl ParquetRecordBatchReader { // note metrics are not supported in this API let metrics = ArrowReaderMetrics::disabled(); let array_reader = ArrayReaderBuilder::new(row_groups, &metrics) + .with_parquet_metadata(row_groups.metadata()) .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?; let read_plan = ReadPlanBuilder::new(batch_size) @@ -1210,7 +1304,7 @@ impl ParquetRecordBatchReader { } #[cfg(test)] -mod tests { +pub(crate) mod tests { use std::cmp::min; use std::collections::{HashMap, VecDeque}; use std::fmt::Formatter; @@ -1219,6 +1313,10 @@ mod tests { use std::path::PathBuf; use std::sync::Arc; + use rand::rngs::StdRng; + use rand::{Rng, RngCore, SeedableRng, random, rng}; + use tempfile::tempfile; + use arrow_array::builder::*; use arrow_array::cast::AsArray; use arrow_array::types::{ @@ -1236,14 +1334,13 @@ mod tests { use bytes::Bytes; use half::f16; use num_traits::PrimInt; - use rand::{Rng, RngCore, rng}; - use tempfile::tempfile; use crate::arrow::arrow_reader::{ - ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, 
ParquetRecordBatchReader, - ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector, + ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, + ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, + RowSelector, }; - use crate::arrow::schema::add_encoded_arrow_schema_to_metadata; + use crate::arrow::schema::{add_encoded_arrow_schema_to_metadata, virtual_type::RowNumber}; use crate::arrow::{ArrowWriter, ProjectionMask}; use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType}; use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE; @@ -3010,7 +3107,7 @@ mod tests { assert_eq!(end - total_read, batch.num_rows()); let a = converter(&expected_data[total_read..end]); - let b = Arc::clone(batch.column(0)); + let b = batch.column(0); assert_eq!(a.data_type(), b.data_type()); assert_eq!(a.to_data(), b.to_data()); @@ -5111,4 +5208,258 @@ mod tests { assert_eq!(out.num_rows(), 3); assert_eq!(out.num_columns(), 2); } + + #[test] + fn test_read_row_numbers() { + let file = write_parquet_from_iter(vec![( + "value", + Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef, + )]); + let supplied_fields = Fields::from(vec![Field::new("value", ArrowDataType::Int64, false)]); + + let row_number_field = Arc::new( + Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber), + ); + + let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields))); + let options = options.with_virtual_columns(vec![row_number_field.clone()]); + let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options( + file.try_clone().unwrap(), + options, + ) + .expect("reader builder with schema") + .build() + .expect("reader with schema"); + + let batch = arrow_reader.next().unwrap().unwrap(); + let schema = Arc::new(Schema::new(vec![ + Field::new("value", ArrowDataType::Int64, false), + (*row_number_field).clone(), + ])); + + assert_eq!(batch.schema(), schema); + assert_eq!(batch.num_columns(), 2); + assert_eq!(batch.num_rows(), 3); + assert_eq!( + batch + .column(0) + .as_primitive::() + .iter() + .collect::>(), + vec![Some(1), Some(2), Some(3)] + ); + assert_eq!( + batch + .column(1) + .as_primitive::() + .iter() + .collect::>(), + vec![Some(0), Some(1), Some(2)] + ); + } + + #[test] + fn test_read_only_row_numbers() { + let file = write_parquet_from_iter(vec![( + "value", + Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef, + )]); + let row_number_field = Arc::new( + Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber), + ); + let options = + ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field.clone()]); + let metadata = ArrowReaderMetadata::load(&file, options).unwrap(); + let num_columns = metadata + .metadata + .file_metadata() + .schema_descr() + .num_columns(); + + let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata) + .with_projection(ProjectionMask::none(num_columns)) + .build() + .expect("reader with schema"); + + let batch = arrow_reader.next().unwrap().unwrap(); + let schema = Arc::new(Schema::new(vec![row_number_field])); + + assert_eq!(batch.schema(), schema); + assert_eq!(batch.num_columns(), 1); + assert_eq!(batch.num_rows(), 3); + assert_eq!( + batch + .column(0) + .as_primitive::() + .iter() + .collect::>(), + vec![Some(0), Some(1), Some(2)] + ); + } + + #[test] + #[should_panic(expected = "is not a virtual column")] + fn 
test_with_virtual_columns_rejects_non_virtual_fields() { + // Try to pass a regular field (not a virtual column) to with_virtual_columns + let regular_field = Arc::new(Field::new("regular_column", ArrowDataType::Int64, false)); + let _options = ArrowReaderOptions::new().with_virtual_columns(vec![regular_field]); + } + + #[test] + fn test_row_numbers_with_multiple_row_groups() { + test_row_numbers_with_multiple_row_groups_helper( + false, + |path, selection, _row_filter, batch_size| { + let file = File::open(path).unwrap(); + let row_number_field = Arc::new( + Field::new("row_number", ArrowDataType::Int64, false) + .with_extension_type(RowNumber), + ); + let options = + ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field]); + let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options) + .unwrap() + .with_row_selection(selection) + .with_batch_size(batch_size) + .build() + .expect("Could not create reader"); + reader + .collect::, _>>() + .expect("Could not read") + }, + ); + } + + #[test] + fn test_row_numbers_with_multiple_row_groups_and_filter() { + test_row_numbers_with_multiple_row_groups_helper( + true, + |path, selection, row_filter, batch_size| { + let file = File::open(path).unwrap(); + let row_number_field = Arc::new( + Field::new("row_number", ArrowDataType::Int64, false) + .with_extension_type(RowNumber), + ); + let options = + ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field]); + let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options) + .unwrap() + .with_row_selection(selection) + .with_batch_size(batch_size) + .with_row_filter(row_filter.expect("No filter")) + .build() + .expect("Could not create reader"); + reader + .collect::, _>>() + .expect("Could not read") + }, + ); + } + + pub(crate) fn test_row_numbers_with_multiple_row_groups_helper( + use_filter: bool, + test_case: F, + ) where + F: FnOnce(PathBuf, RowSelection, Option, usize) -> Vec, + { + let seed: u64 = random(); + println!("test_row_numbers_with_multiple_row_groups seed: {}", seed); + let mut rng = StdRng::seed_from_u64(seed); + + use tempfile::TempDir; + let tempdir = TempDir::new().expect("Could not create temp dir"); + + let (bytes, metadata) = generate_file_with_row_numbers(&mut rng); + + let path = tempdir.path().join("test.parquet"); + std::fs::write(&path, bytes).expect("Could not write file"); + + let mut case = vec![]; + let mut remaining = metadata.file_metadata().num_rows(); + while remaining > 0 { + let row_count = rng.random_range(1..=remaining); + remaining -= row_count; + case.push(RowSelector { + row_count: row_count as usize, + skip: rng.random_bool(0.5), + }); + } + + let filter = use_filter.then(|| { + let filter = (0..metadata.file_metadata().num_rows()) + .map(|_| rng.random_bool(0.99)) + .collect::>(); + let mut filter_offset = 0; + RowFilter::new(vec![Box::new(ArrowPredicateFn::new( + ProjectionMask::all(), + move |b| { + let array = BooleanArray::from_iter( + filter + .iter() + .skip(filter_offset) + .take(b.num_rows()) + .map(|x| Some(*x)), + ); + filter_offset += b.num_rows(); + Ok(array) + }, + ))]) + }); + + let selection = RowSelection::from(case); + let batches = test_case(path, selection.clone(), filter, rng.random_range(1..4096)); + + if selection.skipped_row_count() == metadata.file_metadata().num_rows() as usize { + assert!(batches.into_iter().all(|batch| batch.num_rows() == 0)); + return; + } + let actual = concat_batches(batches.first().expect("No batches").schema_ref(), &batches) + 
.expect("Failed to concatenate"); + // assert_eq!(selection.row_count(), actual.num_rows()); + let values = actual + .column(0) + .as_primitive::() + .iter() + .collect::>(); + let row_numbers = actual + .column(1) + .as_primitive::() + .iter() + .collect::>(); + assert_eq!( + row_numbers + .into_iter() + .map(|number| number.map(|number| number + 1)) + .collect::>(), + values + ); + } + + fn generate_file_with_row_numbers(rng: &mut impl Rng) -> (Bytes, ParquetMetaData) { + let schema = Arc::new(Schema::new(Fields::from(vec![Field::new( + "value", + ArrowDataType::Int64, + false, + )]))); + + let mut buf = Vec::with_capacity(1024); + let mut writer = + ArrowWriter::try_new(&mut buf, schema.clone(), None).expect("Could not create writer"); + + let mut values = 1..=rng.random_range(1..4096); + while !values.is_empty() { + let batch_values = values + .by_ref() + .take(rng.random_range(1..4096)) + .collect::>(); + let array = Arc::new(Int64Array::from(batch_values)) as ArrayRef; + let batch = + RecordBatch::try_from_iter([("value", array)]).expect("Could not create batch"); + writer.write(&batch).expect("Could not write batch"); + writer.flush().expect("Could not flush"); + } + let metadata = writer.close().expect("Could not close writer"); + + (Bytes::from(buf), metadata) + } } diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index becc698abe2c..564456b42f57 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -497,9 +497,10 @@ impl ParquetRecordBatchStreamBuilder { // Ensure schema of ParquetRecordBatchStream respects projection, and does // not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches) + let projection_len = projection.mask.as_ref().map_or(usize::MAX, |m| m.len()); let projected_fields = schema .fields - .filter_leaves(|idx, _| projection.leaf_included(idx)); + .filter_leaves(|idx, _| idx < projection_len && projection.leaf_included(idx)); let projected_schema = Arc::new(Schema::new(projected_fields)); let decoder = ParquetPushDecoderBuilder { @@ -764,10 +765,12 @@ where #[cfg(test)] mod tests { use super::*; + use crate::arrow::arrow_reader::tests::test_row_numbers_with_multiple_row_groups_helper; use crate::arrow::arrow_reader::{ ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector, }; use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; + use crate::arrow::schema::virtual_type::RowNumber; use crate::arrow::{ArrowWriter, ProjectionMask}; use crate::file::metadata::ParquetMetaDataReader; use crate::file::properties::WriterProperties; @@ -2102,4 +2105,65 @@ mod tests { 92 ); } + + #[test] + fn test_row_numbers_with_multiple_row_groups() { + test_row_numbers_with_multiple_row_groups_helper( + false, + |path, selection, _row_filter, batch_size| { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("Could not create runtime"); + runtime.block_on(async move { + let file = tokio::fs::File::open(path).await.unwrap(); + let row_number_field = Arc::new( + Field::new("row_number", DataType::Int64, false) + .with_extension_type(RowNumber), + ); + let options = + ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field]); + let reader = ParquetRecordBatchStreamBuilder::new_with_options(file, options) + .await + .unwrap() + .with_row_selection(selection) + .with_batch_size(batch_size) + .build() + .expect("Could not create reader"); + 
reader.try_collect::>().await.unwrap() + }) + }, + ); + } + + #[test] + fn test_row_numbers_with_multiple_row_groups_and_filter() { + test_row_numbers_with_multiple_row_groups_helper( + true, + |path, selection, row_filter, batch_size| { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("Could not create runtime"); + runtime.block_on(async move { + let file = tokio::fs::File::open(path).await.unwrap(); + let row_number_field = Arc::new( + Field::new("row_number", DataType::Int64, false) + .with_extension_type(RowNumber), + ); + let options = + ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field]); + let reader = ParquetRecordBatchStreamBuilder::new_with_options(file, options) + .await + .unwrap() + .with_row_selection(selection) + .with_row_filter(row_filter.expect("No row filter")) + .with_batch_size(batch_size) + .build() + .expect("Could not create reader"); + reader.try_collect::>().await.unwrap() + }) + }, + ); + } } diff --git a/parquet/src/arrow/in_memory_row_group.rs b/parquet/src/arrow/in_memory_row_group.rs index 34e46cd34e91..0014bb391630 100644 --- a/parquet/src/arrow/in_memory_row_group.rs +++ b/parquet/src/arrow/in_memory_row_group.rs @@ -20,7 +20,7 @@ use crate::arrow::array_reader::RowGroups; use crate::arrow::arrow_reader::RowSelection; use crate::column::page::{PageIterator, PageReader}; use crate::errors::ParquetError; -use crate::file::metadata::ParquetMetaData; +use crate::file::metadata::{ParquetMetaData, RowGroupMetaData}; use crate::file::page_index::offset_index::OffsetIndexMetaData; use crate::file::reader::{ChunkReader, Length, SerializedPageReader}; use bytes::{Buf, Bytes}; @@ -226,6 +226,14 @@ impl RowGroups for InMemoryRowGroup<'_> { } } } + + fn row_groups(&self) -> Box + '_> { + Box::new(std::iter::once(self.metadata.row_group(self.row_group_idx))) + } + + fn metadata(&self) -> &ParquetMetaData { + self.metadata + } } /// An in-memory column chunk. 
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs index 34aac8b08aa0..244364d8d161 100644 --- a/parquet/src/arrow/mod.rs +++ b/parquet/src/arrow/mod.rs @@ -209,7 +209,8 @@ use arrow_schema::{FieldRef, Schema}; pub use self::schema::{ ArrowSchemaConverter, FieldLevels, add_encoded_arrow_schema_to_metadata, encode_arrow_schema, - parquet_to_arrow_field_levels, parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns, + parquet_to_arrow_field_levels, parquet_to_arrow_field_levels_with_virtual, + parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns, virtual_type::*, }; /// Schema metadata key used to store serialized Arrow schema @@ -264,7 +265,7 @@ pub struct ProjectionMask { /// A mask of `[true, false, true, false]` will result in a schema 2 /// elements long: /// * `fields[0]`: `a` - /// * `fields[1]`: `c` + /// * `fields[1]`: `c` /// /// A mask of `None` will result in a schema 4 elements long: /// * `fields[0]`: `a` diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index a0ced8aa8522..150093d4f327 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -425,6 +425,7 @@ impl RowGroupReaderBuilder { let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics) .with_cache_options(Some(&cache_options)) + .with_parquet_metadata(&self.metadata) .build_array_reader(self.fields.as_deref(), predicate.projection())?; plan_builder = @@ -573,7 +574,8 @@ impl RowGroupReaderBuilder { let plan = plan_builder.build(); // if we have any cached results, connect them up - let array_reader_builder = ArrayReaderBuilder::new(&row_group, &self.metrics); + let array_reader_builder = ArrayReaderBuilder::new(&row_group, &self.metrics) + .with_parquet_metadata(&self.metadata); let array_reader = if let Some(cache_info) = cache_info.as_ref() { let cache_options = cache_info.builder().consumer(); array_reader_builder diff --git a/parquet/src/arrow/schema/complex.rs b/parquet/src/arrow/schema/complex.rs index 9622f6270d9e..8b85cac479c1 100644 --- a/parquet/src/arrow/schema/complex.rs +++ b/parquet/src/arrow/schema/complex.rs @@ -20,12 +20,13 @@ use std::sync::Arc; use crate::arrow::schema::extension::try_add_extension_type; use crate::arrow::schema::primitive::convert_primitive; +use crate::arrow::schema::virtual_type::RowNumber; use crate::arrow::{PARQUET_FIELD_ID_META_KEY, ProjectionMask}; use crate::basic::{ConvertedType, Repetition}; use crate::errors::ParquetError; use crate::errors::Result; use crate::schema::types::{SchemaDescriptor, Type, TypePtr}; -use arrow_schema::{DataType, Field, Fields, SchemaBuilder}; +use arrow_schema::{DataType, Field, Fields, SchemaBuilder, extension::ExtensionType}; fn get_repetition(t: &Type) -> Repetition { let info = t.get_basic_info(); @@ -77,10 +78,18 @@ impl ParquetField { match &self.field_type { ParquetFieldType::Primitive { .. 
} => None, ParquetFieldType::Group { children } => Some(children), + ParquetFieldType::Virtual(_) => None, } } } +/// Types of virtual columns that can be computed at read time +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum VirtualColumnType { + /// Row number within the file + RowNumber, +} + #[derive(Debug, Clone)] pub enum ParquetFieldType { Primitive { @@ -92,6 +101,9 @@ pub enum ParquetFieldType { Group { children: Vec, }, + /// Virtual column that doesn't exist in the parquet file + /// but is computed at read time (e.g., row_number) + Virtual(VirtualColumnType), } /// Encodes the context of the parent of the field currently under consideration @@ -541,6 +553,55 @@ impl Visitor { } } +/// Converts a virtual Arrow [`Field`] to a [`ParquetField`] +/// +/// Virtual fields don't correspond to any data in the parquet file, +/// but are computed at read time (e.g., row_number) +/// +/// The levels are computed based on the parent context: +/// - If nullable: def_level = parent_def_level + 1 +/// - If required: def_level = parent_def_level +/// - rep_level = parent_rep_level (virtual fields are not repeated) +pub(super) fn convert_virtual_field( + arrow_field: &Field, + parent_rep_level: i16, + parent_def_level: i16, +) -> Result { + let nullable = arrow_field.is_nullable(); + let def_level = if nullable { + parent_def_level + 1 + } else { + parent_def_level + }; + + // Determine the virtual column type based on the extension type name + let extension_name = arrow_field.extension_type_name().ok_or_else(|| { + ParquetError::ArrowError(format!( + "virtual column field '{}' must have an extension type", + arrow_field.name() + )) + })?; + + let virtual_type = match extension_name { + RowNumber::NAME => VirtualColumnType::RowNumber, + _ => { + return Err(ParquetError::ArrowError(format!( + "unsupported virtual column type '{}' for field '{}'", + extension_name, + arrow_field.name() + ))); + } + }; + + Ok(ParquetField { + rep_level: parent_rep_level, + def_level, + nullable, + arrow_type: arrow_field.data_type().clone(), + field_type: ParquetFieldType::Virtual(virtual_type), + }) +} + /// Computes the Arrow [`Field`] for a child column /// /// The resulting Arrow [`Field`] will have the type dictated by the Parquet `field`, a name diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index 1d8d2e1fd369..b4a9ba7b7f1d 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -23,7 +23,7 @@ use std::collections::HashMap; use std::sync::Arc; use arrow_ipc::writer; -use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit}; +use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, TimeUnit}; use crate::basic::{ ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, Type as PhysicalType, @@ -35,6 +35,7 @@ use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type}; mod complex; mod extension; mod primitive; +pub mod virtual_type; use super::PARQUET_FIELD_ID_META_KEY; use crate::arrow::ProjectionMask; @@ -42,7 +43,7 @@ use crate::arrow::schema::extension::{ has_extension_type, logical_type_for_fixed_size_binary, logical_type_for_string, logical_type_for_struct, try_add_extension_type, }; -pub(crate) use complex::{ParquetField, ParquetFieldType}; +pub(crate) use complex::{ParquetField, ParquetFieldType, VirtualColumnType}; /// Convert Parquet schema to Arrow schema including optional metadata /// @@ -62,7 +63,7 @@ pub fn parquet_to_arrow_schema_by_columns( mask: ProjectionMask, key_value_metadata: Option<&Vec>, ) -> 
Result { - Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata)?.0) + Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata, &[])?.0) } /// Determines the Arrow Schema from a Parquet schema @@ -74,6 +75,7 @@ pub(crate) fn parquet_to_arrow_schema_and_fields( parquet_schema: &SchemaDescriptor, mask: ProjectionMask, key_value_metadata: Option<&Vec>, + virtual_columns: &[FieldRef], ) -> Result<(Schema, Option)> { let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default(); let maybe_schema = metadata @@ -89,7 +91,8 @@ pub(crate) fn parquet_to_arrow_schema_and_fields( } let hint = maybe_schema.as_ref().map(|s| s.fields()); - let field_levels = parquet_to_arrow_field_levels(parquet_schema, mask, hint)?; + let field_levels = + parquet_to_arrow_field_levels_with_virtual(parquet_schema, mask, hint, virtual_columns)?; let schema = Schema::new_with_metadata(field_levels.fields, metadata); Ok((schema, field_levels.levels)) } @@ -132,18 +135,123 @@ pub fn parquet_to_arrow_field_levels( mask: ProjectionMask, hint: Option<&Fields>, ) -> Result { - match complex::convert_schema(schema, mask, hint)? { - Some(field) => match &field.arrow_type { - DataType::Struct(fields) => Ok(FieldLevels { - fields: fields.clone(), - levels: Some(field), - }), - _ => unreachable!(), - }, - None => Ok(FieldLevels { - fields: Fields::empty(), - levels: None, + parquet_to_arrow_field_levels_with_virtual(schema, mask, hint, &[]) +} + +/// Convert a parquet [`SchemaDescriptor`] to [`FieldLevels`] with support for virtual columns +/// +/// Columns not included within [`ProjectionMask`] will be ignored. +/// +/// The optional `hint` parameter is the desired Arrow schema. See the +/// [`arrow`] module documentation for more information. +/// +/// [`arrow`]: crate::arrow +/// +/// # Arguments +/// * `schema` - The Parquet schema descriptor +/// * `mask` - Projection mask to select which columns to include +/// * `hint` - Optional hint for Arrow field types to use instead of defaults +/// * `virtual_columns` - Virtual columns to append to the schema (e.g., row numbers) +/// +/// # Notes: +/// Where a field type in `hint` is compatible with the corresponding parquet type in `schema`, it +/// will be used, otherwise the default arrow type for the given parquet column type will be used. +/// +/// Virtual columns are columns that don't exist in the Parquet file but are generated during reading. +/// They must have extension type names starting with "arrow.virtual.". +/// +/// This is to accommodate arrow types that cannot be round-tripped through parquet natively. +/// Depending on the parquet writer, this can lead to a mismatch between a file's parquet schema +/// and its embedded arrow schema. The parquet `schema` must be treated as authoritative in such +/// an event. See [#1663](https://github.com/apache/arrow-rs/issues/1663) for more information +/// +/// Note: this is a low-level API, most users will want to make use of the higher-level +/// [`parquet_to_arrow_schema`] for decoding metadata from a parquet file. +pub fn parquet_to_arrow_field_levels_with_virtual( + schema: &SchemaDescriptor, + mask: ProjectionMask, + hint: Option<&Fields>, + virtual_columns: &[FieldRef], +) -> Result { + // Validate that all fields are virtual columns + for field in virtual_columns { + if !virtual_type::is_virtual_column(field) { + return Err(ParquetError::General(format!( + "Field '{}' is not a virtual column. 
Virtual columns must have extension type names starting with 'arrow.virtual.'", + field.name() + ))); + } + } + + // Convert the regular schema first + let mut parquet_field = match complex::convert_schema(schema, mask, hint)? { + Some(field) => field, + None if virtual_columns.is_empty() => { + return Ok(FieldLevels { + fields: Fields::empty(), + levels: None, + }); + } + None => { + // No regular fields, but we have virtual columns - create empty root struct + ParquetField { + rep_level: 0, + def_level: 0, + nullable: false, + arrow_type: DataType::Struct(Fields::empty()), + field_type: ParquetFieldType::Group { + children: Vec::new(), + }, + } + } + }; + + // Append virtual columns if any + if !virtual_columns.is_empty() { + match &mut parquet_field.field_type { + ParquetFieldType::Group { children } => { + // Get the mutable fields from the struct type + let DataType::Struct(ref mut fields) = parquet_field.arrow_type else { + unreachable!("Root field must be a struct"); + }; + + // Convert to mutable Vec to append + let mut fields_vec: Vec = fields.iter().cloned().collect(); + + // Append each virtual column + for virtual_column in virtual_columns { + // Virtual columns can only be added at the root level + assert_eq!( + parquet_field.rep_level, 0, + "Virtual columns can only be added at rep level 0" + ); + assert_eq!( + parquet_field.def_level, 0, + "Virtual columns can only be added at def level 0" + ); + + fields_vec.push(virtual_column.clone()); + let virtual_parquet_field = complex::convert_virtual_field( + virtual_column, + parquet_field.rep_level, + parquet_field.def_level, + )?; + children.push(virtual_parquet_field); + } + + // Update the fields + parquet_field.arrow_type = DataType::Struct(Fields::from(fields_vec)); + } + _ => unreachable!("Root field must be a group"), + } + } + + match &parquet_field.arrow_type { + DataType::Struct(fields) => Ok(FieldLevels { + fields: fields.clone(), + levels: Some(parquet_field), }), + _ => unreachable!(), } } @@ -2249,4 +2357,32 @@ mod tests { Ok(()) } + + #[test] + fn test_parquet_to_arrow_field_levels_with_virtual_rejects_non_virtual() { + let message_type = " + message test_schema { + REQUIRED INT32 id; + } + "; + let parquet_schema = Arc::new(parse_message_type(message_type).unwrap()); + let descriptor = SchemaDescriptor::new(parquet_schema); + + // Try to pass a regular field (not a virtual column) + let regular_field = Arc::new(Field::new("regular_column", DataType::Int64, false)); + let result = parquet_to_arrow_field_levels_with_virtual( + &descriptor, + ProjectionMask::all(), + None, + &[regular_field], + ); + + assert!(result.is_err()); + assert!( + result + .unwrap_err() + .to_string() + .contains("is not a virtual column") + ); + } } diff --git a/parquet/src/arrow/schema/virtual_type.rs b/parquet/src/arrow/schema/virtual_type.rs new file mode 100644 index 000000000000..eca2aef08dca --- /dev/null +++ b/parquet/src/arrow/schema/virtual_type.rs @@ -0,0 +1,152 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! RowNumber +//! + +use arrow_schema::{ArrowError, DataType, Field, extension::ExtensionType}; + +/// Prefix for virtual column extension type names. +macro_rules! VIRTUAL_PREFIX { + () => { + "parquet.virtual." + }; +} + +/// The extension type for row numbers. +/// +/// Extension name: `parquet.virtual.row_number`. +/// +/// This virtual column has storage type `Int64` and uses empty string metadata. +#[derive(Debug, Default, Clone, Copy, PartialEq)] +pub struct RowNumber; + +impl ExtensionType for RowNumber { + const NAME: &'static str = concat!(VIRTUAL_PREFIX!(), "row_number"); + type Metadata = &'static str; + + fn metadata(&self) -> &Self::Metadata { + &"" + } + + fn serialize_metadata(&self) -> Option { + Some(String::default()) + } + + fn deserialize_metadata(metadata: Option<&str>) -> Result { + if metadata.is_some_and(str::is_empty) { + Ok("") + } else { + Err(ArrowError::InvalidArgumentError( + "Virtual column extension type expects an empty string as metadata".to_owned(), + )) + } + } + + fn supports_data_type(&self, data_type: &DataType) -> Result<(), ArrowError> { + match data_type { + DataType::Int64 => Ok(()), + data_type => Err(ArrowError::InvalidArgumentError(format!( + "Virtual column data type mismatch, expected Int64, found {data_type}" + ))), + } + } + + fn try_new(data_type: &DataType, _metadata: Self::Metadata) -> Result { + Self.supports_data_type(data_type).map(|_| Self) + } +} + +/// Returns `true` if the field is a virtual column. +/// +/// Virtual columns have extension type names starting with `parquet.virtual.`. 
+pub fn is_virtual_column(field: &Field) -> bool { + field + .extension_type_name() + .map(|name| name.starts_with(VIRTUAL_PREFIX!())) + .unwrap_or(false) +} + +#[cfg(test)] +mod tests { + use arrow_schema::{ + ArrowError, DataType, Field, + extension::{EXTENSION_TYPE_METADATA_KEY, EXTENSION_TYPE_NAME_KEY}, + }; + + use super::*; + + #[test] + fn valid() -> Result<(), ArrowError> { + let mut field = Field::new("", DataType::Int64, false); + field.try_with_extension_type(RowNumber)?; + field.try_extension_type::()?; + + Ok(()) + } + + #[test] + #[should_panic(expected = "Field extension type name missing")] + fn missing_name() { + let field = Field::new("", DataType::Int64, false).with_metadata( + [(EXTENSION_TYPE_METADATA_KEY.to_owned(), "".to_owned())] + .into_iter() + .collect(), + ); + field.extension_type::(); + } + + #[test] + #[should_panic(expected = "expected Int64, found Int32")] + fn invalid_type() { + Field::new("", DataType::Int32, false).with_extension_type(RowNumber); + } + + #[test] + #[should_panic(expected = "Virtual column extension type expects an empty string as metadata")] + fn missing_metadata() { + let field = Field::new("", DataType::Int64, false).with_metadata( + [( + EXTENSION_TYPE_NAME_KEY.to_owned(), + RowNumber::NAME.to_owned(), + )] + .into_iter() + .collect(), + ); + field.extension_type::(); + } + + #[test] + #[should_panic(expected = "Virtual column extension type expects an empty string as metadata")] + fn invalid_metadata() { + let field = Field::new("", DataType::Int64, false).with_metadata( + [ + ( + EXTENSION_TYPE_NAME_KEY.to_owned(), + RowNumber::NAME.to_owned(), + ), + ( + EXTENSION_TYPE_METADATA_KEY.to_owned(), + "non-empty".to_owned(), + ), + ] + .into_iter() + .collect(), + ); + field.extension_type::(); + } +} diff --git a/parquet/src/file/metadata/thrift/encryption.rs b/parquet/src/file/metadata/thrift/encryption.rs index 56c5a6a4b9da..3be9d4d6a369 100644 --- a/parquet/src/file/metadata/thrift/encryption.rs +++ b/parquet/src/file/metadata/thrift/encryption.rs @@ -17,6 +17,7 @@ //! 
Encryption support for Thrift serialization +use crate::file::metadata::thrift::OrdinalAssigner; use crate::{ encryption::decrypt::{FileDecryptionProperties, FileDecryptor}, errors::{ParquetError, Result}, @@ -294,10 +295,19 @@ pub(crate) fn parquet_metadata_with_encryption( file_decryptor = Some(file_decryptor_value); } - // decrypt column chunk info + // decrypt column chunk info and handle ordinal assignment + let mut assigner = OrdinalAssigner::new(); let row_groups = row_groups .into_iter() - .map(|rg| row_group_from_encrypted_thrift(rg, file_decryptor.as_ref())) + .enumerate() + .map(|(ordinal, rg)| { + let ordinal: i16 = ordinal.try_into().map_err(|_| { + ParquetError::General(format!("Row group ordinal {ordinal} exceeds i16 range")) + })?; + let rg = row_group_from_encrypted_thrift(rg, file_decryptor.as_ref())?; + let rg = assigner.ensure(ordinal, rg)?; + Ok(rg) + }) .collect::<Result<Vec<_>>>()?; let metadata = ParquetMetaDataBuilder::new(file_metadata) diff --git a/parquet/src/file/metadata/thrift/mod.rs b/parquet/src/file/metadata/thrift/mod.rs index 6fdb0e0c4bb1..979b399a1662 100644 --- a/parquet/src/file/metadata/thrift/mod.rs +++ b/parquet/src/file/metadata/thrift/mod.rs @@ -765,8 +765,17 @@ pub(crate) fn parquet_metadata_from_bytes( let schema_descr = schema_descr.as_ref().unwrap(); let list_ident = prot.read_list_begin()?; let mut rg_vec = Vec::with_capacity(list_ident.size as usize); - for _ in 0..list_ident.size { - rg_vec.push(read_row_group(&mut prot, schema_descr)?); + + // Read row groups and handle ordinal assignment + let mut assigner = OrdinalAssigner::new(); + for ordinal in 0..list_ident.size { + let ordinal: i16 = ordinal.try_into().map_err(|_| { + ParquetError::General(format!( + "Row group ordinal {ordinal} exceeds i16 max value", + )) + })?; + let rg = read_row_group(&mut prot, schema_descr)?; + rg_vec.push(assigner.ensure(ordinal, rg)?); } row_groups = Some(rg_vec); } @@ -858,6 +867,55 @@ pub(crate) fn parquet_metadata_from_bytes( Ok(ParquetMetaData::new(fmd, row_groups)) } +/// Assign [`RowGroupMetaData::ordinal`] if it is missing. +#[derive(Debug, Default)] +pub(crate) struct OrdinalAssigner { + first_has_ordinal: bool, +} + +impl OrdinalAssigner { + fn new() -> Self { + Default::default() + } + + /// Sets [`RowGroupMetaData::ordinal`] if it is missing. + /// + /// # Arguments + /// - actual_ordinal: The ordinal (index) of the row group being processed + /// in the file metadata. + /// - rg: The [`RowGroupMetaData`] to potentially modify. + /// + /// Ensures: + /// 1. If the first row group has an ordinal, all subsequent row groups must + /// also have ordinals. + /// 2. If the first row group does NOT have an ordinal, all subsequent row + /// groups must also not have ordinals. + fn ensure( + &mut self, + actual_ordinal: i16, + mut rg: RowGroupMetaData, + ) -> Result<RowGroupMetaData> { + let rg_has_ordinal = rg.ordinal.is_some(); + if actual_ordinal == 0 { + self.first_has_ordinal = rg_has_ordinal; + } + + // assign ordinal if missing and consistent with first row group + if !self.first_has_ordinal && !rg_has_ordinal { + rg.ordinal = Some(actual_ordinal); + } else if self.first_has_ordinal != rg_has_ordinal { + return Err(general_err!( + "Inconsistent ordinal assignment: first_has_ordinal is set to \ {} but row-group with actual ordinal {} has rg_has_ordinal set to {}", self.first_has_ordinal, + actual_ordinal, + rg_has_ordinal + )); + } + Ok(rg) + } +} + thrift_struct!( pub(crate) struct IndexPageHeader {} );
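// A minimal end-to-end sketch of the reader API added by this patch, mirroring the
// doc-comment example on `ArrowReaderOptions::with_virtual_columns`: write a one-column
// file, then read it back with a virtual `row_number` column appended. Assumes the
// `tempfile` crate is available (it is already used by the tests above) and that
// `RowNumber` is re-exported from `parquet::arrow` as in the `mod.rs` hunk.
use std::sync::Arc;
use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use arrow_schema::{DataType, Field};
use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
use parquet::arrow::{ArrowWriter, RowNumber};

fn read_with_row_numbers() -> Result<(), Box<dyn std::error::Error>> {
    // Write a small file with a single Int64 column.
    let values = Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef;
    let batch = RecordBatch::try_from_iter(vec![("value", values)])?;
    let file = tempfile::tempfile()?;
    let mut writer = ArrowWriter::try_new(file.try_clone()?, batch.schema(), None)?;
    writer.write(&batch)?;
    writer.close()?;

    // Ask the reader to append a virtual row-number column.
    let row_number_field = Arc::new(
        Field::new("row_number", DataType::Int64, false).with_extension_type(RowNumber),
    );
    let options = ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field]);
    let mut reader =
        ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?.build()?;

    // The emitted batch contains "value" plus the synthesized "row_number" column,
    // whose values are the absolute row positions within the file.
    let batch = reader.next().unwrap()?;
    assert_eq!(batch.num_columns(), 2);
    assert_eq!(batch.num_rows(), 3);
    Ok(())
}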