diff --git a/arrow-parquet-integration-testing/src/main.rs b/arrow-parquet-integration-testing/src/main.rs index d99c7888aa8..eef4acc9cb0 100644 --- a/arrow-parquet-integration-testing/src/main.rs +++ b/arrow-parquet-integration-testing/src/main.rs @@ -175,15 +175,15 @@ fn main() -> Result<()> { .fields .iter() .map(|x| match x.data_type() { - DataType::Dictionary(..) => Encoding::RleDictionary, + DataType::Dictionary(..) => vec![Encoding::RleDictionary], DataType::Utf8 | DataType::LargeUtf8 => { - if args.encoding_utf8 == EncodingScheme::Delta { + vec![if args.encoding_utf8 == EncodingScheme::Delta { Encoding::DeltaLengthByteArray } else { Encoding::Plain - } + }] } - _ => Encoding::Plain, + _ => vec![Encoding::Plain], }) .collect(); diff --git a/benches/write_parquet.rs b/benches/write_parquet.rs index 05309820dd7..5bd2848c2eb 100644 --- a/benches/write_parquet.rs +++ b/benches/write_parquet.rs @@ -25,7 +25,7 @@ fn write(array: &dyn Array, encoding: Encoding) -> Result<()> { vec![Ok(columns)].into_iter(), &schema, options, - vec![encoding], + vec![vec![encoding]], )?; let writer = vec![]; diff --git a/examples/parquet_write.rs b/examples/parquet_write.rs index f2134c78475..ad5487ebfa1 100644 --- a/examples/parquet_write.rs +++ b/examples/parquet_write.rs @@ -20,8 +20,12 @@ fn write_batch(path: &str, schema: Schema, columns: Chunk>) -> Re let iter = vec![Ok(columns)]; - let row_groups = - RowGroupIterator::try_new(iter.into_iter(), &schema, options, vec![Encoding::Plain])?; + let row_groups = RowGroupIterator::try_new( + iter.into_iter(), + &schema, + options, + vec![vec![Encoding::Plain]], + )?; // Create a new empty file let file = File::create(path)?; diff --git a/src/doc/lib.md b/src/doc/lib.md index efc89876e51..e27e0ff2c37 100644 --- a/src/doc/lib.md +++ b/src/doc/lib.md @@ -51,7 +51,7 @@ fn main() -> Result<()> { vec![Ok(chunk)].into_iter(), &schema, options, - vec![Encoding::Plain, Encoding::Plain], + vec![vec![Encoding::Plain], vec![Encoding::Plain]], )?; // anything implementing `std::io::Write` works diff --git a/src/io/parquet/write/binary/nested.rs b/src/io/parquet/write/binary/nested.rs index 04a387a895c..198e1d2156c 100644 --- a/src/io/parquet/write/binary/nested.rs +++ b/src/io/parquet/write/binary/nested.rs @@ -1,34 +1,29 @@ use parquet2::schema::types::PrimitiveType; use parquet2::{encoding::Encoding, page::DataPage}; -use super::super::{levels, utils, WriteOptions}; +use super::super::{nested, utils, WriteOptions}; use super::basic::{build_statistics, encode_plain}; use crate::io::parquet::read::schema::is_nullable; +use crate::io::parquet::write::Nested; use crate::{ array::{Array, BinaryArray, Offset}, error::Result, }; -pub fn array_to_page( +pub fn array_to_page( array: &BinaryArray, options: WriteOptions, type_: PrimitiveType, - nested: levels::NestedInfo, + nested: Vec, ) -> Result where - OO: Offset, O: Offset, { let is_optional = is_nullable(&type_.field_info); - let validity = array.validity(); - let mut buffer = vec![]; - levels::write_rep_levels(&mut buffer, &nested, options.version)?; - let repetition_levels_byte_length = buffer.len(); - - levels::write_def_levels(&mut buffer, &nested, validity, is_optional, options.version)?; - let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length; + let (repetition_levels_byte_length, definition_levels_byte_length) = + nested::write_rep_and_def(options.version, &nested, &mut buffer)?; encode_plain(array, is_optional, &mut buffer); @@ -40,8 +35,8 @@ where utils::build_plain_page( buffer, - 
levels::num_values(nested.offsets()), - nested.offsets().len().saturating_sub(1), + nested::num_values(&nested), + nested[0].len(), array.null_count(), repetition_levels_byte_length, definition_levels_byte_length, diff --git a/src/io/parquet/write/boolean/nested.rs b/src/io/parquet/write/boolean/nested.rs index 0ebe62cd11c..4bd741ab52a 100644 --- a/src/io/parquet/write/boolean/nested.rs +++ b/src/io/parquet/write/boolean/nested.rs @@ -1,33 +1,26 @@ use parquet2::schema::types::PrimitiveType; use parquet2::{encoding::Encoding, page::DataPage}; -use super::super::{levels, utils, WriteOptions}; +use super::super::{nested, utils, WriteOptions}; use super::basic::{build_statistics, encode_plain}; use crate::io::parquet::read::schema::is_nullable; +use crate::io::parquet::write::Nested; use crate::{ - array::{Array, BooleanArray, Offset}, + array::{Array, BooleanArray}, error::Result, }; -pub fn array_to_page( +pub fn array_to_page( array: &BooleanArray, options: WriteOptions, type_: PrimitiveType, - nested: levels::NestedInfo, -) -> Result -where - O: Offset, -{ + nested: Vec, +) -> Result { let is_optional = is_nullable(&type_.field_info); - let validity = array.validity(); - let mut buffer = vec![]; - levels::write_rep_levels(&mut buffer, &nested, options.version)?; - let repetition_levels_byte_length = buffer.len(); - - levels::write_def_levels(&mut buffer, &nested, validity, is_optional, options.version)?; - let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length; + let (repetition_levels_byte_length, definition_levels_byte_length) = + nested::write_rep_and_def(options.version, &nested, &mut buffer)?; encode_plain(array, is_optional, &mut buffer)?; @@ -39,8 +32,8 @@ where utils::build_plain_page( buffer, - levels::num_values(nested.offsets()), - nested.offsets().len().saturating_sub(1), + nested::num_values(&nested), + nested[0].len(), array.null_count(), repetition_levels_byte_length, definition_levels_byte_length, diff --git a/src/io/parquet/write/levels.rs b/src/io/parquet/write/levels.rs deleted file mode 100644 index 26e6a25cd7f..00000000000 --- a/src/io/parquet/write/levels.rs +++ /dev/null @@ -1,339 +0,0 @@ -use parquet2::encoding::hybrid_rle::encode_u32; -use parquet2::write::Version; - -use crate::{array::Offset, bitmap::Bitmap, error::Result}; - -pub fn num_values(offsets: &[O]) -> usize { - offsets - .windows(2) - .map(|w| { - let length = w[1].to_usize() - w[0].to_usize(); - if length == 0 { - 1 - } else { - length - } - }) - .sum() -} - -/// Iterator adapter of parquet / dremel repetition levels -#[derive(Debug)] -pub struct RepLevelsIter<'a, O: Offset> { - iter: std::slice::Windows<'a, O>, - remaining: usize, - length: usize, - total_size: usize, -} - -impl<'a, O: Offset> RepLevelsIter<'a, O> { - pub fn new(offsets: &'a [O]) -> Self { - let total_size = num_values(offsets); - - Self { - iter: offsets.windows(2), - remaining: 0, - length: 0, - total_size, - } - } -} - -impl Iterator for RepLevelsIter<'_, O> { - type Item = u32; - - fn next(&mut self) -> Option { - if self.remaining == self.length { - if let Some(w) = self.iter.next() { - let start = w[0].to_usize(); - let end = w[1].to_usize(); - self.length = end - start; - self.remaining = 0; - if self.length == 0 { - self.total_size -= 1; - return Some(0); - } - } else { - return None; - } - } - let old = self.remaining; - self.remaining += 1; - self.total_size -= 1; - Some((old >= 1) as u32) - } - - fn size_hint(&self) -> (usize, Option) { - (self.total_size, Some(self.total_size)) - } -} - -/// 
Iterator adapter of parquet / dremel definition levels -pub struct DefLevelsIter<'a, O: Offset, II: Iterator, I: Iterator> { - iter: std::iter::Zip, II>, - primitive_validity: I, - remaining: usize, - is_valid: u32, - total_size: usize, -} - -impl<'a, O: Offset, II: Iterator, I: Iterator> DefLevelsIter<'a, O, II, I> { - pub fn new(offsets: &'a [O], validity: II, primitive_validity: I) -> Self { - let total_size = num_values(offsets); - - let iter = offsets.windows(2).zip(validity); - - Self { - iter, - primitive_validity, - remaining: 0, - is_valid: 0, - total_size, - } - } -} - -impl, I: Iterator> Iterator - for DefLevelsIter<'_, O, II, I> -{ - type Item = u32; - - fn next(&mut self) -> Option { - if self.remaining == 0 { - let (w, is_valid) = self.iter.next()?; - let start = w[0].to_usize(); - let end = w[1].to_usize(); - self.remaining = end - start; - self.is_valid = is_valid + 1; - if self.remaining == 0 { - self.total_size -= 1; - return Some(self.is_valid - 1); - } - } - self.remaining -= 1; - self.total_size -= 1; - - let p_is_valid = self.primitive_validity.next().unwrap_or_default(); - Some(self.is_valid + p_is_valid) - } - - fn size_hint(&self) -> (usize, Option) { - (self.total_size, Some(self.total_size)) - } -} - -#[derive(Debug)] -pub struct NestedInfo<'a, O: Offset> { - is_optional: bool, - offsets: &'a [O], - validity: Option<&'a Bitmap>, -} - -impl<'a, O: Offset> NestedInfo<'a, O> { - pub fn new(offsets: &'a [O], validity: Option<&'a Bitmap>, is_optional: bool) -> Self { - Self { - is_optional, - offsets, - validity, - } - } - - pub fn offsets(&self) -> &'a [O] { - self.offsets - } -} - -fn write_levels_v1) -> Result<()>>( - buffer: &mut Vec, - encode: F, -) -> Result<()> { - buffer.extend_from_slice(&[0; 4]); - let start = buffer.len(); - - encode(buffer)?; - - let end = buffer.len(); - let length = end - start; - - // write the first 4 bytes as length - let length = (length as i32).to_le_bytes(); - (0..4).for_each(|i| buffer[start - 4 + i] = length[i]); - Ok(()) -} - -/// writes the rep levels to a `Vec`. -pub fn write_rep_levels( - buffer: &mut Vec, - nested: &NestedInfo, - version: Version, -) -> Result<()> { - let num_bits = 1; // todo: compute this - - match version { - Version::V1 => { - write_levels_v1(buffer, |buffer: &mut Vec| { - let levels = RepLevelsIter::new(nested.offsets); - encode_u32(buffer, levels, num_bits)?; - Ok(()) - })?; - } - Version::V2 => { - let levels = RepLevelsIter::new(nested.offsets); - - encode_u32(buffer, levels, num_bits)?; - } - } - - Ok(()) -} - -fn write_def_levels1>( - buffer: &mut Vec, - levels: I, - num_bits: u8, - version: Version, -) -> Result<()> { - match version { - Version::V1 => write_levels_v1(buffer, move |buffer: &mut Vec| { - encode_u32(buffer, levels, num_bits)?; - Ok(()) - }), - Version::V2 => Ok(encode_u32(buffer, levels, num_bits)?), - } -} - -/// writes the rep levels to a `Vec`. -pub fn write_def_levels( - buffer: &mut Vec, - nested: &NestedInfo, - validity: Option<&Bitmap>, - primitive_is_optional: bool, - version: Version, -) -> Result<()> { - let mut num_bits = 1 + nested.is_optional as u8 + primitive_is_optional as u8; - if num_bits == 3 { - // brute-force log2 - this needs to be generalized for e.g. list of list - num_bits = 2 - }; - - // this match ensures that irrespectively of the arrays' validities, we write def levels - // that are consistent with the declared parquet schema. 
- // see comments on some of the variants - match ( - nested.is_optional, - nested.validity.as_ref(), - primitive_is_optional, - validity.as_ref(), - ) { - // if the validity is optional and there is no validity in the array, we - // need to write 1 to mark the fields as valid - (true, None, true, None) => { - let nested_validity = std::iter::repeat(1); - let validity = std::iter::repeat(1); - let levels = DefLevelsIter::new(nested.offsets, nested_validity, validity); - write_def_levels1(buffer, levels, num_bits, version) - } - (true, Some(nested_validity), true, None) => { - let levels = DefLevelsIter::new( - nested.offsets, - nested_validity.iter().map(|x| x as u32), - std::iter::repeat(1), - ); - write_def_levels1(buffer, levels, num_bits, version) - } - (true, None, true, Some(validity)) => { - let levels = DefLevelsIter::new( - nested.offsets, - std::iter::repeat(1), - validity.iter().map(|x| x as u32), - ); - write_def_levels1(buffer, levels, num_bits, version) - } - (true, Some(nested_validity), true, Some(validity)) => { - let levels = DefLevelsIter::new( - nested.offsets, - nested_validity.iter().map(|x| x as u32), - validity.iter().map(|x| x as u32), - ); - write_def_levels1(buffer, levels, num_bits, version) - } - (true, None, false, _) => { - let nested_validity = std::iter::repeat(1); - let validity = std::iter::repeat(0); - let levels = DefLevelsIter::new(nested.offsets, nested_validity, validity); - write_def_levels1(buffer, levels, num_bits, version) - } - (true, Some(nested_validity), false, _) => { - let nested_validity = nested_validity.iter().map(|x| x as u32); - let validity = std::iter::repeat(0); - let levels = DefLevelsIter::new(nested.offsets, nested_validity, validity); - write_def_levels1(buffer, levels, num_bits, version) - } - (false, _, true, None) => { - let nested_validity = std::iter::repeat(0); - let validity = std::iter::repeat(1); - let levels = DefLevelsIter::new(nested.offsets, nested_validity, validity); - write_def_levels1(buffer, levels, num_bits, version) - } - (false, _, true, Some(validity)) => { - let nested_validity = std::iter::repeat(0); - let validity = validity.iter().map(|x| x as u32); - let levels = DefLevelsIter::new(nested.offsets, nested_validity, validity); - write_def_levels1(buffer, levels, num_bits, version) - } - (false, _, false, _) => { - let nested_validity = std::iter::repeat(0); - let validity = std::iter::repeat(0); - let levels = DefLevelsIter::new(nested.offsets, nested_validity, validity); - write_def_levels1(buffer, levels, num_bits, version) - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_rep_levels() { - let offsets = [0, 2, 2, 5, 8, 8, 11, 11, 12].as_ref(); - let expected = vec![0u32, 1, 0, 0, 1, 1, 0, 1, 1, 0, 0, 1, 1, 0, 0]; - - let result = RepLevelsIter::new(offsets).collect::>(); - assert_eq!(result, expected) - } - - #[test] - fn test_def_levels() { - // [[0, 1], None, [2, None, 3], [4, 5, 6], [], [7, 8, 9], None, [10]] - let offsets = [0, 2, 2, 5, 8, 8, 11, 11, 12].as_ref(); - let validity = [true, false, true, true, true, true, false, true] - .into_iter() - .map(|x| x as u32); - let primitive_validity = [ - true, true, //[0, 1] - true, false, true, //[2, None, 3] - true, true, true, //[4, 5, 6] - true, true, true, //[7, 8, 9] - true, //[10] - ] - .into_iter() - .map(|x| x as u32); - let expected = vec![3u32, 3, 0, 3, 2, 3, 3, 3, 3, 1, 3, 3, 3, 0, 3]; - - let result = DefLevelsIter::new(offsets, validity, primitive_validity).collect::>(); - assert_eq!(result, expected) - } - - 
#[test] - fn test_def_levels1() { - // [[0, 1], [], [2, 0, 3], [4, 5, 6], [], [7, 8, 9], [], [10]] - let offsets = [0, 2, 2, 5, 8, 8, 11, 11, 12].as_ref(); - let validity = std::iter::repeat(0); - let primitive_validity = std::iter::repeat(0); - let expected = vec![1u32, 1, 0, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 0, 1]; - - let result = DefLevelsIter::new(offsets, validity, primitive_validity).collect::>(); - assert_eq!(result, expected) - } -} diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index 94fa584d5b7..9ec248ac973 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -4,7 +4,9 @@ mod boolean; mod dictionary; mod file; mod fixed_len_bytes; -mod levels; +//mod levels; +mod nested; +mod pages; mod primitive; mod row_group; mod schema; @@ -13,15 +15,11 @@ mod utf8; mod utils; use crate::array::*; -use crate::bitmap::Bitmap; use crate::datatypes::*; use crate::error::{ArrowError, Result}; -use crate::io::parquet::read::schema::is_nullable; -use crate::io::parquet::write::levels::NestedInfo; use crate::types::days_ms; use crate::types::NativeType; -use parquet2::page::DataPage; use parquet2::schema::types::PrimitiveType as ParquetPrimitiveType; pub use parquet2::{ compression::CompressionOptions, @@ -29,7 +27,7 @@ pub use parquet2::{ fallible_streaming_iterator, metadata::{Descriptor, KeyValue, SchemaDescriptor}, page::{CompressedDataPage, CompressedPage, EncodedPage}, - schema::types::ParquetType, + schema::types::{FieldInfo, ParquetType, PhysicalType as ParquetPhysicalType}, write::{compress, Compressor, DynIter, DynStreamingIterator, RowGroupIter, Version}, FallibleStreamingIterator, }; @@ -51,6 +49,9 @@ pub use row_group::{row_group_iter, RowGroupIterator}; pub use schema::to_parquet_type; pub use sink::FileSink; +pub use pages::array_to_columns; +pub use pages::Nested; + pub(self) fn decimal_length_from_precision(precision: usize) -> usize { // digits = floor(log_10(2^(8*n - 1) - 1)) // ceil(digits) = log10(2^(8*n - 1) - 1) @@ -91,7 +92,8 @@ pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool { /// Returns an iterator of [`EncodedPage`]. pub fn array_to_pages( array: &dyn Array, - type_: ParquetType, + type_: ParquetPrimitiveType, + nested: Vec, options: WriteOptions, encoding: Encoding, ) -> Result>> { @@ -106,8 +108,8 @@ pub fn array_to_pages( let right = array.slice(split_at, array.len() - split_at); Ok(DynIter::new( - array_to_pages(&*left, type_.clone(), options, encoding)? - .chain(array_to_pages(&*right, type_, options, encoding)?), + array_to_pages(&*left, type_.clone(), nested.clone(), options, encoding)? + .chain(array_to_pages(&*right, type_, nested, options, encoding)?), )) } else { match array.data_type() { @@ -115,33 +117,37 @@ pub fn array_to_pages( match_integer_type!(key_type, |$T| { dictionary::array_to_pages::<$T>( array.as_any().downcast_ref().unwrap(), - get_primitive(type_)?, + type_, options, encoding, ) }) } - _ => array_to_page(array, type_, options, encoding) + _ => array_to_page(array, type_, nested, options, encoding) .map(|page| DynIter::new(std::iter::once(Ok(page)))), } } } -fn get_primitive(type_: ParquetType) -> Result { - if let ParquetType::PrimitiveType(t) = type_ { - Ok(t) - } else { - Err(ArrowError::InvalidArgumentError(format!( - "The {:?} is not a primitive type but it is trying to describe a primitive array", - type_ - ))) +/// Converts an [`Array`] to a [`CompressedPage`] based on options, descriptor and `encoding`. 
+pub fn array_to_page( + array: &dyn Array, + type_: ParquetPrimitiveType, + nested: Vec, + options: WriteOptions, + encoding: Encoding, +) -> Result { + if nested.len() == 1 { + // special case where validity == def levels + return array_to_page_simple(array, type_, options, encoding); } + array_to_page_nested(array, type_, nested, options, encoding) } /// Converts an [`Array`] to a [`CompressedPage`] based on options, descriptor and `encoding`. -pub fn array_to_page( +pub fn array_to_page_simple( array: &dyn Array, - type_: ParquetType, + type_: ParquetPrimitiveType, options: WriteOptions, encoding: Encoding, ) -> Result { @@ -155,45 +161,44 @@ pub fn array_to_page( match data_type.to_logical_type() { DataType::Boolean => { - let type_ = get_primitive(type_)?; boolean::array_to_page(array.as_any().downcast_ref().unwrap(), options, type_) } // casts below MUST match the casts done at the metadata (field -> parquet type). DataType::UInt8 => primitive::array_to_page::( array.as_any().downcast_ref().unwrap(), options, - get_primitive(type_)?, + type_, ), DataType::UInt16 => primitive::array_to_page::( array.as_any().downcast_ref().unwrap(), options, - get_primitive(type_)?, + type_, ), DataType::UInt32 => primitive::array_to_page::( array.as_any().downcast_ref().unwrap(), options, - get_primitive(type_)?, + type_, ), DataType::UInt64 => primitive::array_to_page::( array.as_any().downcast_ref().unwrap(), options, - get_primitive(type_)?, + type_, ), DataType::Int8 => primitive::array_to_page::( array.as_any().downcast_ref().unwrap(), options, - get_primitive(type_)?, + type_, ), DataType::Int16 => primitive::array_to_page::( array.as_any().downcast_ref().unwrap(), options, - get_primitive(type_)?, + type_, ), DataType::Int32 | DataType::Date32 | DataType::Time32(_) => { primitive::array_to_page::( array.as_any().downcast_ref().unwrap(), options, - get_primitive(type_)?, + type_, ) } DataType::Int64 @@ -203,48 +208,48 @@ pub fn array_to_page( | DataType::Duration(_) => primitive::array_to_page::( array.as_any().downcast_ref().unwrap(), options, - get_primitive(type_)?, + type_, ), DataType::Float32 => primitive::array_to_page::( array.as_any().downcast_ref().unwrap(), options, - get_primitive(type_)?, + type_, ), DataType::Float64 => primitive::array_to_page::( array.as_any().downcast_ref().unwrap(), options, - get_primitive(type_)?, + type_, ), DataType::Utf8 => utf8::array_to_page::( array.as_any().downcast_ref().unwrap(), options, - get_primitive(type_)?, + type_, encoding, ), DataType::LargeUtf8 => utf8::array_to_page::( array.as_any().downcast_ref().unwrap(), options, - get_primitive(type_)?, + type_, encoding, ), DataType::Binary => binary::array_to_page::( array.as_any().downcast_ref().unwrap(), options, - get_primitive(type_)?, + type_, encoding, ), DataType::LargeBinary => binary::array_to_page::( array.as_any().downcast_ref().unwrap(), options, - get_primitive(type_)?, + type_, encoding, ), DataType::Null => { let array = Int32Array::new_null(DataType::Int32, array.len()); - primitive::array_to_page::(&array, options, get_primitive(type_)?) 
+ primitive::array_to_page::(&array, options, type_) } DataType::Interval(IntervalUnit::YearMonth) => { - let type_ = get_primitive(type_)?; + let type_ = type_; let array = array .as_any() .downcast_ref::>() @@ -268,7 +273,7 @@ pub fn array_to_page( fixed_len_bytes::array_to_page(&array, options, type_, statistics) } DataType::Interval(IntervalUnit::DayTime) => { - let type_ = get_primitive(type_)?; + let type_ = type_; let array = array .as_any() .downcast_ref::>() @@ -292,7 +297,7 @@ pub fn array_to_page( fixed_len_bytes::array_to_page(&array, options, type_, statistics) } DataType::FixedSizeBinary(_) => { - let type_ = get_primitive(type_)?; + let type_ = type_; let array = array.as_any().downcast_ref().unwrap(); let statistics = if options.write_statistics { Some(fixed_len_bytes::build_statistics(array, type_.clone())) @@ -303,7 +308,7 @@ pub fn array_to_page( fixed_len_bytes::array_to_page(array, options, type_, statistics) } DataType::Decimal(precision, _) => { - let type_ = get_primitive(type_)?; + let type_ = type_; let precision = *precision; let array = array .as_any() @@ -355,141 +360,91 @@ pub fn array_to_page( fixed_len_bytes::array_to_page(&array, options, type_, statistics) } } - DataType::FixedSizeList(_, _) | DataType::List(_) | DataType::LargeList(_) => { - nested_array_to_page(array, type_, options) - } other => Err(ArrowError::NotYetImplemented(format!( - "Writing parquet V1 pages for data type {:?}", + "Writing parquet pages for data type {:?}", other ))), } .map(EncodedPage::Data) } -macro_rules! dyn_nested_prim { - ($from:ty, $to:ty, $offset:ty, $values:expr, $nested:expr,$descriptor:expr, $options:expr) => {{ - let values = $values.as_any().downcast_ref().unwrap(); - - primitive::nested_array_to_page::<$from, $to, $offset>( - values, - $options, - $descriptor, - $nested, - ) - }}; -} - -fn list_array_to_page( - offsets: &[O], - validity: Option<&Bitmap>, - values: &dyn Array, - type_: ParquetType, +fn array_to_page_nested( + array: &dyn Array, + type_: ParquetPrimitiveType, + nested: Vec, options: WriteOptions, -) -> Result { + _encoding: Encoding, +) -> Result { use DataType::*; - - let is_optional = is_nullable(type_.get_field_info()); - - let type_ = if let ParquetType::GroupType { mut fields, .. } = type_ { - let inner = fields.pop().unwrap(); - if let ParquetType::GroupType { mut fields, .. } = inner { - get_primitive(fields.pop().unwrap())? 
- } else { - return Err(ArrowError::InvalidArgumentError(format!( - "The {:?} is not a valid inner type of a list but it is trying to describe a list array", - inner - ))); + match array.data_type().to_logical_type() { + Null => { + let array = Int32Array::new_null(DataType::Int32, array.len()); + primitive::nested_array_to_page::(&array, options, type_, nested) } - } else { - return Err(ArrowError::InvalidArgumentError(format!( - "The {:?} is not a group type but it is trying to describe a list array", - type_ - ))); - }; - - let nested = NestedInfo::new(offsets, validity, is_optional); - - match values.data_type() { Boolean => { - let values = values.as_any().downcast_ref().unwrap(); - boolean::nested_array_to_page::(values, options, type_, nested) - } - UInt8 => dyn_nested_prim!(u8, i32, O, values, nested, type_, options), - UInt16 => dyn_nested_prim!(u16, i32, O, values, nested, type_, options), - UInt32 => dyn_nested_prim!(u32, i32, O, values, nested, type_, options), - UInt64 => dyn_nested_prim!(u64, i64, O, values, nested, type_, options), - - Int8 => dyn_nested_prim!(i8, i32, O, values, nested, type_, options), - Int16 => dyn_nested_prim!(i16, i32, O, values, nested, type_, options), - Int32 | Date32 | Time32(_) => { - dyn_nested_prim!(i32, i32, O, values, nested, type_, options) - } - Int64 | Date64 | Time64(_) | Timestamp(_, _) | Duration(_) => { - dyn_nested_prim!(i64, i64, O, values, nested, type_, options) + let array = array.as_any().downcast_ref().unwrap(); + boolean::nested_array_to_page(array, options, type_, nested) } - - Float32 => dyn_nested_prim!(f32, f32, O, values, nested, type_, options), - Float64 => dyn_nested_prim!(f64, f64, O, values, nested, type_, options), - Utf8 => { - let values = values.as_any().downcast_ref().unwrap(); - utf8::nested_array_to_page::(values, options, type_, nested) + let array = array.as_any().downcast_ref().unwrap(); + utf8::nested_array_to_page::(array, options, type_, nested) } LargeUtf8 => { - let values = values.as_any().downcast_ref().unwrap(); - utf8::nested_array_to_page::(values, options, type_, nested) + let array = array.as_any().downcast_ref().unwrap(); + utf8::nested_array_to_page::(array, options, type_, nested) } Binary => { - let values = values.as_any().downcast_ref().unwrap(); - binary::nested_array_to_page::(values, options, type_, nested) + let array = array.as_any().downcast_ref().unwrap(); + binary::nested_array_to_page::(array, options, type_, nested) } LargeBinary => { - let values = values.as_any().downcast_ref().unwrap(); - binary::nested_array_to_page::(values, options, type_, nested) + let array = array.as_any().downcast_ref().unwrap(); + binary::nested_array_to_page::(array, options, type_, nested) } - _ => todo!(), - } -} - -fn nested_array_to_page( - array: &dyn Array, - type_: ParquetType, - options: WriteOptions, -) -> Result { - match array.data_type() { - DataType::List(_) => { - let array = array.as_any().downcast_ref::>().unwrap(); - list_array_to_page( - array.offsets(), - array.validity(), - array.values().as_ref(), - type_, - options, - ) + UInt8 => { + let array = array.as_any().downcast_ref().unwrap(); + primitive::nested_array_to_page::(array, options, type_, nested) } - DataType::LargeList(_) => { - let array = array.as_any().downcast_ref::>().unwrap(); - list_array_to_page( - array.offsets(), - array.validity(), - array.values().as_ref(), - type_, - options, - ) + UInt16 => { + let array = array.as_any().downcast_ref().unwrap(); + primitive::nested_array_to_page::(array, options, type_, nested) 
} - DataType::FixedSizeList(_, size) => { - let array = array.as_any().downcast_ref::().unwrap(); - let offsets = (0..=array.len()) - .map(|x| (*size * x) as i32) - .collect::>(); - list_array_to_page( - &offsets, - array.validity(), - array.values().as_ref(), - type_, - options, - ) + UInt32 => { + let array = array.as_any().downcast_ref().unwrap(); + primitive::nested_array_to_page::(array, options, type_, nested) + } + UInt64 => { + let array = array.as_any().downcast_ref().unwrap(); + primitive::nested_array_to_page::(array, options, type_, nested) } - _ => todo!(), + Int8 => { + let array = array.as_any().downcast_ref().unwrap(); + primitive::nested_array_to_page::(array, options, type_, nested) + } + Int16 => { + let array = array.as_any().downcast_ref().unwrap(); + primitive::nested_array_to_page::(array, options, type_, nested) + } + Int32 | Date32 | Time32(_) => { + let array = array.as_any().downcast_ref().unwrap(); + primitive::nested_array_to_page::(array, options, type_, nested) + } + Int64 | Date64 | Time64(_) | Timestamp(_, _) | Duration(_) => { + let array = array.as_any().downcast_ref().unwrap(); + primitive::nested_array_to_page::(array, options, type_, nested) + } + Float32 => { + let array = array.as_any().downcast_ref().unwrap(); + primitive::nested_array_to_page::(array, options, type_, nested) + } + Float64 => { + let array = array.as_any().downcast_ref().unwrap(); + primitive::nested_array_to_page::(array, options, type_, nested) + } + other => Err(ArrowError::NotYetImplemented(format!( + "Writing nested parquet pages for data type {:?}", + other + ))), } + .map(EncodedPage::Data) } diff --git a/src/io/parquet/write/nested/def.rs b/src/io/parquet/write/nested/def.rs new file mode 100644 index 00000000000..ea62c1c14dc --- /dev/null +++ b/src/io/parquet/write/nested/def.rs @@ -0,0 +1,342 @@ +use crate::{array::Offset, bitmap::Bitmap}; + +use super::super::pages::{ListNested, Nested}; +use super::rep::num_values; +use super::to_length; + +trait DebugIter: Iterator + std::fmt::Debug {} + +impl + std::fmt::Debug> DebugIter for A {} + +fn single_iter<'a>( + validity: Option<&'a Bitmap>, + is_optional: bool, + length: usize, +) -> Box { + match (is_optional, validity) { + (false, _) => { + Box::new(std::iter::repeat((0u32, 1usize)).take(length)) as Box + } + (true, None) => { + Box::new(std::iter::repeat((1u32, 1usize)).take(length)) as Box + } + (true, Some(validity)) => { + Box::new(validity.iter().map(|v| (v as u32, 1usize)).take(length)) as Box + } + } +} + +fn single_list_iter<'a, O: Offset>(nested: &ListNested<'a, O>) -> Box { + match (nested.is_optional, nested.validity) { + (false, _) => { + Box::new(std::iter::repeat(1u32).zip(to_length(nested.offsets))) as Box + } + (true, None) => { + Box::new(std::iter::repeat(2u32).zip(to_length(nested.offsets))) as Box + } + (true, Some(validity)) => Box::new( + validity + .iter() + // lists have 2 groups, so + // True => 2 + // False => 1 + .map(|x| (x as u32) + 1) + .zip(to_length(nested.offsets)), + ) as Box, + } +} + +fn iter<'a>(nested: &'a [Nested]) -> Vec> { + nested + .iter() + .map(|nested| match nested { + Nested::Primitive(validity, is_optional, length) => { + single_iter(*validity, *is_optional, *length) + } + Nested::List(nested) => single_list_iter(nested), + Nested::LargeList(nested) => single_list_iter(nested), + Nested::Struct(validity, is_optional, length) => { + single_iter(*validity, *is_optional, *length) + } + }) + .collect() +} + +/// Iterator adapter of parquet / dremel definition levels 
+#[derive(Debug)] +pub struct DefLevelsIter<'a> { + // iterators of validities and lengths. E.g. [[[None,b,c], None], None] -> [[(true, 2), (false, 0)], [(true, 3), (false, 0)], [(false, 1), (true, 1), (true, 1)]] + iter: Vec>, + primitive_validity: Box, + // vector containing the remaining number of values of each iterator. + // e.g. the iters [[2, 2], [3, 4, 1, 2]] after the first iteration will return [2, 3], + // and remaining will be [2, 3]. + // on the second iteration, it will be `[2, 2]` (since iterations consume the last items) + remaining: Vec, /* < remaining.len() == iter.len() */ + validity: Vec, + // cache of the first `remaining` that is non-zero. Examples: + // * `remaining = [2, 2] => current_level = 2` + // * `remaining = [2, 0] => current_level = 1` + // * `remaining = [0, 0] => current_level = 0` + current_level: usize, /* < iter.len() */ + // the total definition level at any given point during the iteration + total: u32, /* < iter.len() */ + // the total number of items that this iterator will return + remaining_values: usize, +} + +impl<'a> DefLevelsIter<'a> { + pub fn new(nested: &'a [Nested]) -> Self { + let remaining_values = num_values(nested); + + let primitive_validity = iter(&nested[nested.len() - 1..]).pop().unwrap(); + + let iter = iter(&nested[..nested.len() - 1]); + let remaining = std::iter::repeat(0).take(iter.len()).collect(); + let validity = std::iter::repeat(0).take(iter.len()).collect(); + + Self { + iter, + primitive_validity, + remaining, + validity, + total: 0, + current_level: 0, + remaining_values, + } + } +} + +impl<'a> Iterator for DefLevelsIter<'a> { + type Item = u32; + + fn next(&mut self) -> Option { + if *self.remaining.last().unwrap() > 0 { + *self.remaining.last_mut().unwrap() -= 1; + + let primitive = self.primitive_validity.next()?.0 as u32; + let r = Some(self.total + primitive); + + for level in 0..self.current_level - 1 { + let level = self.remaining.len() - level - 1; + if self.remaining[level] == 0 { + self.current_level -= 1; + self.total -= self.validity[level]; + self.remaining[level.saturating_sub(1)] -= 1; + } + } + if self.remaining[0] == 0 { + self.current_level -= 1; + self.total -= self.validity[0] as u32; + } + self.remaining_values -= 1; + return r; + } + + for ((iter, remaining), validity) in self + .iter + .iter_mut() + .zip(self.remaining.iter_mut()) + .zip(self.validity.iter_mut()) + .skip(self.current_level) + { + let (is_valid, length): (u32, usize) = iter.next()?; + *validity = is_valid; + if length == 0 { + self.remaining_values -= 1; + return Some(self.total + is_valid / 2); + } + *remaining = length; + self.current_level += 1; + self.total += is_valid; + } + self.next() + } + + fn size_hint(&self) -> (usize, Option) { + let length = self.remaining_values; + (length, Some(length)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn test(nested: Vec, expected: Vec) { + let mut iter = DefLevelsIter::new(&nested); + assert_eq!(iter.size_hint().0, expected.len()); + let result = iter.by_ref().collect::>(); + assert_eq!(result, expected); + assert_eq!(iter.size_hint().0, 0); + } + + #[test] + fn struct_optional() { + let b = Bitmap::from([ + true, false, true, true, false, true, false, false, true, true, + ]); + let nested = vec![ + Nested::Struct(None, true, 10), + Nested::Primitive(Some(&b), true, 10), + ]; + let expected = vec![2, 1, 2, 2, 1, 2, 1, 1, 2, 2]; + + test(nested, expected) + } + + #[test] + fn struct_optional_1() { + let b = Bitmap::from([ + true, false, true, true, false, true, false, false, 
true, true, + ]); + let nested = vec![ + Nested::Struct(None, true, 10), + Nested::Primitive(Some(&b), true, 10), + ]; + let expected = vec![2, 1, 2, 2, 1, 2, 1, 1, 2, 2]; + + test(nested, expected) + } + + #[test] + fn struct_optional_optional() { + let nested = vec![ + Nested::Struct(None, true, 10), + Nested::Primitive(None, true, 10), + ]; + let expected = vec![2, 2, 2, 2, 2, 2, 2, 2, 2, 2]; + + test(nested, expected) + } + + #[test] + fn l1_required_required() { + let nested = vec![ + // [[0, 1], [], [2, 0, 3], [4, 5, 6], [], [7, 8, 9], [], [10]] + Nested::List(ListNested:: { + is_optional: false, + offsets: &[0, 2, 2, 5, 8, 8, 11, 11, 12], + validity: None, + }), + Nested::Primitive(None, false, 12), + ]; + let expected = vec![1, 1, 0, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 0, 1]; + + test(nested, expected) + } + + #[test] + fn l1_optional_optional() { + // [[0, 1], None, [2, None, 3], [4, 5, 6], [], [7, 8, 9], None, [10]] + + let v0 = Bitmap::from([true, false, true, true, true, true, false, true]); + let v1 = Bitmap::from([ + true, true, //[0, 1] + true, false, true, //[2, None, 3] + true, true, true, //[4, 5, 6] + true, true, true, //[7, 8, 9] + true, //[10] + ]); + let nested = vec![ + Nested::List(ListNested:: { + is_optional: true, + offsets: &[0, 2, 2, 5, 8, 8, 11, 11, 12], + validity: Some(&v0), + }), + Nested::Primitive(Some(&v1), true, 12), + ]; + let expected = vec![3u32, 3, 0, 3, 2, 3, 3, 3, 3, 1, 3, 3, 3, 0, 3]; + + test(nested, expected) + } + + #[test] + fn l2_required_required_required() { + let nested = vec![ + Nested::List(ListNested:: { + is_optional: false, + offsets: &[0, 2, 4], + validity: None, + }), + Nested::List(ListNested:: { + is_optional: false, + offsets: &[0, 3, 7, 8, 10], + validity: None, + }), + Nested::Primitive(None, false, 12), + ]; + let expected = vec![2, 2, 2, 2, 2, 2, 2, 2, 2, 2]; + + test(nested, expected) + } + + #[test] + fn l2_optional_required_required() { + let a = Bitmap::from([true, false, true, true]); + // e.g. [[[1,2,3], [4,5,6,7]], None, [[8], [], [9, 10]]] + let nested = vec![ + Nested::List(ListNested:: { + is_optional: true, + offsets: &[0, 2, 2, 2, 5], + validity: Some(&a), + }), + Nested::List(ListNested:: { + is_optional: false, + offsets: &[0, 3, 7, 8, 8, 10], + validity: None, + }), + Nested::Primitive(None, false, 12), + ]; + let expected = vec![3, 3, 3, 3, 3, 3, 3, 0, 1, 3, 2, 3, 3]; + + test(nested, expected) + } + + #[test] + fn l2_optional_optional_required() { + let a = Bitmap::from([true, false, true]); + let b = Bitmap::from([true, true, true, true, false]); + // e.g. [[[1,2,3], [4,5,6,7]], None, [[8], [], None]] + let nested = vec![ + Nested::List(ListNested:: { + is_optional: true, + offsets: &[0, 2, 2, 5], + validity: Some(&a), + }), + Nested::List(ListNested:: { + is_optional: true, + offsets: &[0, 3, 7, 8, 8, 8], + validity: Some(&b), + }), + Nested::Primitive(None, false, 12), + ]; + let expected = vec![4, 4, 4, 4, 4, 4, 4, 0, 4, 3, 2]; + + test(nested, expected) + } + + #[test] + fn l2_optional_optional_optional() { + let a = Bitmap::from([true, false, true]); + let b = Bitmap::from([true, true, true, false]); + let c = Bitmap::from([true, true, true, true, false, true, true, true]); + // e.g. 
[[[1,2,3], [4,None,6,7]], None, [[8], None]] + let nested = vec![ + Nested::List(ListNested:: { + is_optional: true, + offsets: &[0, 2, 2, 4], + validity: Some(&a), + }), + Nested::List(ListNested:: { + is_optional: true, + offsets: &[0, 3, 7, 8, 8], + validity: Some(&b), + }), + Nested::Primitive(Some(&c), true, 12), + ]; + let expected = vec![5, 5, 5, 5, 4, 5, 5, 0, 5, 2]; + + test(nested, expected) + } +} diff --git a/src/io/parquet/write/nested/mod.rs b/src/io/parquet/write/nested/mod.rs new file mode 100644 index 00000000000..39a3f4eddf5 --- /dev/null +++ b/src/io/parquet/write/nested/mod.rs @@ -0,0 +1,122 @@ +mod def; +mod rep; + +use parquet2::{encoding::hybrid_rle::encode_u32, read::levels::get_bit_width, write::Version}; + +use crate::{array::Offset, error::Result}; + +use super::Nested; + +pub use rep::num_values; + +fn write_levels_v1) -> Result<()>>( + buffer: &mut Vec, + encode: F, +) -> Result<()> { + buffer.extend_from_slice(&[0; 4]); + let start = buffer.len(); + + encode(buffer)?; + + let end = buffer.len(); + let length = end - start; + + // write the first 4 bytes as length + let length = (length as i32).to_le_bytes(); + (0..4).for_each(|i| buffer[start - 4 + i] = length[i]); + Ok(()) +} + +/// writes the rep levels to a `Vec`. +fn write_rep_levels(buffer: &mut Vec, nested: &[Nested], version: Version) -> Result<()> { + let max_level = max_rep_level(nested) as i16; + if max_level == 0 { + return Ok(()); + } + let num_bits = get_bit_width(max_level) as u8; + + let levels = rep::RepLevelsIter::new(nested); + + let mut buffer1 = vec![]; + encode_u32(&mut buffer1, rep::RepLevelsIter::new(nested), num_bits).unwrap(); + + match version { + Version::V1 => { + write_levels_v1(buffer, |buffer: &mut Vec| { + encode_u32(buffer, levels, num_bits)?; + Ok(()) + })?; + } + Version::V2 => { + encode_u32(buffer, levels, num_bits)?; + } + } + + Ok(()) +} + +/// writes the rep levels to a `Vec`. 
+fn write_def_levels(buffer: &mut Vec, nested: &[Nested], version: Version) -> Result<()> { + let max_level = max_def_level(nested) as i16; + if max_level == 0 { + return Ok(()); + } + let num_bits = get_bit_width(max_level) as u8; + + let levels = def::DefLevelsIter::new(nested); + + let mut buffer1 = vec![]; + encode_u32(&mut buffer1, def::DefLevelsIter::new(nested), num_bits).unwrap(); + + match version { + Version::V1 => write_levels_v1(buffer, move |buffer: &mut Vec| { + encode_u32(buffer, levels, num_bits)?; + Ok(()) + }), + Version::V2 => Ok(encode_u32(buffer, levels, num_bits)?), + } +} + +fn max_def_level(nested: &[Nested]) -> usize { + nested + .iter() + .map(|nested| match nested { + Nested::Primitive(_, is_optional, _) => *is_optional as usize, + Nested::List(nested) => 1 + (nested.is_optional as usize), + Nested::LargeList(nested) => 1 + (nested.is_optional as usize), + Nested::Struct(_, is_optional, _) => *is_optional as usize, + }) + .sum() +} + +fn max_rep_level(nested: &[Nested]) -> usize { + nested + .iter() + .map(|nested| match nested { + Nested::LargeList(_) | Nested::List(_) => 1, + Nested::Primitive(_, _, _) | Nested::Struct(_, _, _) => 0, + }) + .sum() +} + +fn to_length( + offsets: &[O], +) -> impl Iterator + std::fmt::Debug + Clone + '_ { + offsets + .windows(2) + .map(|w| w[1].to_usize() - w[0].to_usize()) +} + +pub fn write_rep_and_def( + page_version: Version, + nested: &[Nested], + buffer: &mut Vec, +) -> Result<(usize, usize)> { + write_rep_levels(buffer, nested, page_version)?; + let repetition_levels_byte_length = buffer.len(); + + write_def_levels(buffer, nested, page_version)?; + let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length; + + Ok((repetition_levels_byte_length, definition_levels_byte_length)) +} diff --git a/src/io/parquet/write/nested/rep.rs b/src/io/parquet/write/nested/rep.rs new file mode 100644 index 00000000000..4281a6301e1 --- /dev/null +++ b/src/io/parquet/write/nested/rep.rs @@ -0,0 +1,205 @@ +use super::super::pages::Nested; +use super::to_length; + +trait DebugIter: Iterator + std::fmt::Debug {} + +impl + std::fmt::Debug> DebugIter for A {} + +fn iter<'a>(nested: &'a [Nested]) -> Vec> { + nested + .iter() + .filter_map(|nested| match nested { + Nested::Primitive(_, _, _) => None, + Nested::List(nested) => Some(Box::new(to_length(nested.offsets)) as Box), + Nested::LargeList(nested) => { + Some(Box::new(to_length(nested.offsets)) as Box) + } + Nested::Struct(_, _, length) => { + Some(Box::new(std::iter::repeat(0usize).take(*length)) as Box) + } + }) + .collect() +} + +pub fn num_values(nested: &[Nested]) -> usize { + let iterators = iter(nested); + let depth = iterators.len(); + + iterators + .into_iter() + .enumerate() + .map(|(index, lengths)| { + if index == depth - 1 { + lengths + .map(|length| if length == 0 { 1 } else { length }) + .sum::() + } else { + lengths + .map(|length| if length == 0 { 1 } else { 0 }) + .sum::() + } + }) + .sum() +} + +/// Iterator adapter of parquet / dremel repetition levels +#[derive(Debug)] +pub struct RepLevelsIter<'a> { + // iterators of lengths. E.g. [[[a,b,c], [d,e,f,g]], [[h], [i,j]]] -> [[2, 2], [3, 4, 1, 2]] + iter: Vec>, + // vector containing the remaining number of values of each iterator. + // e.g. the iters [[2, 2], [3, 4, 1, 2]] after the first iteration will return [2, 3], + // and remaining will be [2, 3]. 
+ // on the second iteration, it will be `[2, 2]` (since iterations consume the last items) + remaining: Vec, /* < remaining.len() == iter.len() */ + // cache of the first `remaining` that is non-zero. Examples: + // * `remaining = [2, 2] => current_level = 2` + // * `remaining = [2, 0] => current_level = 1` + // * `remaining = [0, 0] => current_level = 0` + current_level: usize, /* < iter.len() */ + // the number to discount due to being the first element of the iterators. + total: usize, /* < iter.len() */ + + // the total number of items that this iterator will return + remaining_values: usize, +} + +impl<'a> RepLevelsIter<'a> { + pub fn new(nested: &'a [Nested]) -> Self { + let remaining_values = num_values(nested); + + let iter = iter(nested); + let remaining = std::iter::repeat(0).take(iter.len()).collect(); + + Self { + iter, + remaining, + total: 0, + current_level: 0, + remaining_values, + } + } +} + +impl<'a> Iterator for RepLevelsIter<'a> { + type Item = u32; + + fn next(&mut self) -> Option { + if *self.remaining.last().unwrap() > 0 { + *self.remaining.last_mut().unwrap() -= 1; + + let total = self.total; + self.total = 0; + let r = Some((self.current_level - total) as u32); + + for level in 0..self.current_level - 1 { + let level = self.remaining.len() - level - 1; + if self.remaining[level] == 0 { + self.current_level -= 1; + self.remaining[level.saturating_sub(1)] -= 1; + } + } + if self.remaining[0] == 0 { + self.current_level -= 1; + } + self.remaining_values -= 1; + return r; + } + + self.total = 0; + for (iter, remaining) in self + .iter + .iter_mut() + .zip(self.remaining.iter_mut()) + .skip(self.current_level) + { + let length: usize = iter.next()?; + if length == 0 { + self.remaining_values -= 1; + return Some(self.current_level as u32); + } + *remaining = length; + self.current_level += 1; + self.total += 1; + } + self.next() + } + + fn size_hint(&self) -> (usize, Option) { + let length = self.remaining_values; + (length, Some(length)) + } +} + +#[cfg(test)] +mod tests { + use super::super::super::pages::ListNested; + + use super::*; + + fn test(nested: Vec, expected: Vec) { + let mut iter = RepLevelsIter::new(&nested); + assert_eq!(iter.size_hint().0, expected.len()); + let result = iter.by_ref().collect::>(); + assert_eq!(result, expected); + assert_eq!(iter.size_hint().0, 0); + } + + #[test] + fn struct_required() { + let nested = vec![ + Nested::Struct(None, false, 10), + Nested::Primitive(None, true, 10), + ]; + let expected = vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0]; + + test(nested, expected) + } + + #[test] + fn struct_optional() { + let nested = vec![ + Nested::Struct(None, true, 10), + Nested::Primitive(None, true, 10), + ]; + let expected = vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0]; + + test(nested, expected) + } + + #[test] + fn l1() { + let nested = vec![ + Nested::List(ListNested:: { + is_optional: false, + offsets: &[0, 2, 2, 5, 8, 8, 11, 11, 12], + validity: None, + }), + Nested::Primitive(None, false, 12), + ]; + + let expected = vec![0u32, 1, 0, 0, 1, 1, 0, 1, 1, 0, 0, 1, 1, 0, 0]; + + test(nested, expected) + } + + #[test] + fn l2() { + let nested = vec![ + Nested::List(ListNested:: { + is_optional: false, + offsets: &[0, 2, 2, 4], + validity: None, + }), + Nested::List(ListNested:: { + is_optional: false, + offsets: &[0, 3, 7, 8, 10], + validity: None, + }), + Nested::Primitive(None, false, 10), + ]; + let expected = vec![0, 2, 2, 1, 2, 2, 2, 0, 0, 1, 2]; + + test(nested, expected) + } +} diff --git a/src/io/parquet/write/pages.rs 
b/src/io/parquet/write/pages.rs new file mode 100644 index 00000000000..3c38d657e39 --- /dev/null +++ b/src/io/parquet/write/pages.rs @@ -0,0 +1,509 @@ +use parquet2::schema::types::{ParquetType, PrimitiveType as ParquetPrimitiveType}; +use parquet2::{page::EncodedPage, write::DynIter}; + +use crate::array::{ListArray, Offset, StructArray}; +use crate::bitmap::Bitmap; +use crate::datatypes::PhysicalType; +use crate::io::parquet::read::schema::is_nullable; +use crate::{ + array::Array, + error::{ArrowError, Result}, +}; + +use super::{array_to_pages, Encoding, WriteOptions}; + +#[derive(Debug, Clone, PartialEq)] +pub struct ListNested<'a, O: Offset> { + pub is_optional: bool, + pub offsets: &'a [O], + pub validity: Option<&'a Bitmap>, +} + +impl<'a, O: Offset> ListNested<'a, O> { + pub fn new(offsets: &'a [O], validity: Option<&'a Bitmap>, is_optional: bool) -> Self { + Self { + is_optional, + offsets, + validity, + } + } +} + +/// Descriptor of nested information of a field +#[derive(Debug, Clone, PartialEq)] +pub enum Nested<'a> { + /// a primitive (leaf or parquet column) + Primitive(Option<&'a Bitmap>, bool, usize), + /// a list + List(ListNested<'a, i32>), + /// a list + LargeList(ListNested<'a, i64>), + /// a struct + Struct(Option<&'a Bitmap>, bool, usize), +} + +impl Nested<'_> { + /// Returns the length (number of rows) of the element + pub fn len(&self) -> usize { + match self { + Nested::Primitive(_, _, length) => *length, + Nested::List(nested) => nested.offsets.len() - 1, + Nested::LargeList(nested) => nested.offsets.len() - 1, + Nested::Struct(_, _, len) => *len, + } + } +} + +/// Constructs the necessary `Vec>` to write the rep and def levels of `array` to parquet +pub fn to_nested<'a>(array: &'a dyn Array, type_: &ParquetType) -> Result>>> { + let mut nested = vec![]; + + to_nested_recursive(array, type_, &mut nested, vec![])?; + Ok(nested) +} + +fn to_nested_recursive<'a>( + array: &'a dyn Array, + type_: &ParquetType, + nested: &mut Vec>>, + mut parents: Vec>, +) -> Result<()> { + let is_optional = is_nullable(type_.get_field_info()); + + use PhysicalType::*; + match array.data_type().to_physical_type() { + Struct => { + let array = array.as_any().downcast_ref::().unwrap(); + let fields = if let ParquetType::GroupType { fields, .. } = type_ { + fields + } else { + return Err(ArrowError::InvalidArgumentError( + "Parquet type must be a group for a struct array".to_string(), + )); + }; + + parents.push(Nested::Struct(array.validity(), is_optional, array.len())); + + for (type_, array) in fields.iter().zip(array.values()) { + to_nested_recursive(array.as_ref(), type_, nested, parents.clone())?; + } + } + List => { + let array = array.as_any().downcast_ref::>().unwrap(); + let type_ = if let ParquetType::GroupType { fields, .. } = type_ { + if let ParquetType::GroupType { fields, .. } = &fields[0] { + &fields[0] + } else { + return Err(ArrowError::InvalidArgumentError( + "Parquet type must be a group for a list array".to_string(), + )); + } + } else { + return Err(ArrowError::InvalidArgumentError( + "Parquet type must be a group for a list array".to_string(), + )); + }; + + parents.push(Nested::List(ListNested::new( + array.offsets(), + array.validity(), + is_optional, + ))); + to_nested_recursive(array.values().as_ref(), type_, nested, parents)?; + } + LargeList => { + let array = array.as_any().downcast_ref::>().unwrap(); + let type_ = if let ParquetType::GroupType { fields, .. } = type_ { + if let ParquetType::GroupType { fields, .. 
} = &fields[0] { + &fields[0] + } else { + return Err(ArrowError::InvalidArgumentError( + "Parquet type must be a group for a list array".to_string(), + )); + } + } else { + return Err(ArrowError::InvalidArgumentError( + "Parquet type must be a group for a list array".to_string(), + )); + }; + + parents.push(Nested::LargeList(ListNested::new( + array.offsets(), + array.validity(), + is_optional, + ))); + to_nested_recursive(array.values().as_ref(), type_, nested, parents)?; + } + _ => { + parents.push(Nested::Primitive( + array.validity(), + is_optional, + array.len(), + )); + nested.push(parents) + } + } + Ok(()) +} + +fn to_leafs(array: &dyn Array) -> Vec<&dyn Array> { + let mut leafs = vec![]; + to_leafs_recursive(array, &mut leafs); + leafs +} + +fn to_leafs_recursive<'a>(array: &'a dyn Array, leafs: &mut Vec<&'a dyn Array>) { + use PhysicalType::*; + match array.data_type().to_physical_type() { + Struct => { + let array = array.as_any().downcast_ref::().unwrap(); + array + .values() + .iter() + .for_each(|a| to_leafs_recursive(a.as_ref(), leafs)); + } + List => { + let array = array.as_any().downcast_ref::>().unwrap(); + to_leafs_recursive(array.values().as_ref(), leafs); + } + LargeList => { + let array = array.as_any().downcast_ref::>().unwrap(); + to_leafs_recursive(array.values().as_ref(), leafs); + } + Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8 + | LargeUtf8 | Dictionary(_) => leafs.push(array), + _ => todo!(), + } +} + +fn to_parquet_leafs(type_: ParquetType) -> Vec { + let mut leafs = vec![]; + to_parquet_leafs_recursive(type_, &mut leafs); + leafs +} + +fn to_parquet_leafs_recursive(type_: ParquetType, leafs: &mut Vec) { + match type_ { + ParquetType::PrimitiveType(primitive) => leafs.push(primitive), + ParquetType::GroupType { fields, .. 
} => { + fields + .into_iter() + .for_each(|type_| to_parquet_leafs_recursive(type_, leafs)); + } + } +} + +/// Returns a vector of iterators of [`EncodedPage`], one per leaf column in the array +pub fn array_to_columns + 'static + Send + Sync>( + array: A, + type_: ParquetType, + options: WriteOptions, + encoding: Vec, +) -> Result>>> { + let array = array.as_ref(); + let nested = to_nested(array, &type_)?; + + let types = to_parquet_leafs(type_); + + let values = to_leafs(array); + + assert_eq!(encoding.len(), types.len()); + + values + .iter() + .zip(nested.into_iter()) + .zip(types.into_iter()) + .zip(encoding.into_iter()) + .map(|(((values, nested), type_), encoding)| { + array_to_pages(*values, type_, nested, options, encoding) + }) + .collect() +} + +#[cfg(test)] +mod tests { + use parquet2::schema::Repetition; + + use super::*; + + use crate::array::*; + use crate::bitmap::Bitmap; + use crate::datatypes::*; + + use super::super::{FieldInfo, ParquetPhysicalType, ParquetPrimitiveType}; + + #[test] + fn test_struct() { + use std::sync::Arc; + let boolean = + Arc::new(BooleanArray::from_slice(&[false, false, true, true])) as Arc; + let int = Arc::new(Int32Array::from_slice(&[42, 28, 19, 31])) as Arc; + + let fields = vec![ + Field::new("b", DataType::Boolean, false), + Field::new("c", DataType::Int32, false), + ]; + + let array = StructArray::from_data( + DataType::Struct(fields), + vec![boolean.clone(), int.clone()], + Some(Bitmap::from([true, true, false, true])), + ); + + let type_ = ParquetType::GroupType { + field_info: FieldInfo { + name: "a".to_string(), + repetition: Repetition::Optional, + id: None, + }, + logical_type: None, + converted_type: None, + fields: vec![ + ParquetType::PrimitiveType(ParquetPrimitiveType { + field_info: FieldInfo { + name: "b".to_string(), + repetition: Repetition::Required, + id: None, + }, + logical_type: None, + converted_type: None, + physical_type: ParquetPhysicalType::Boolean, + }), + ParquetType::PrimitiveType(ParquetPrimitiveType { + field_info: FieldInfo { + name: "c".to_string(), + repetition: Repetition::Required, + id: None, + }, + logical_type: None, + converted_type: None, + physical_type: ParquetPhysicalType::Int32, + }), + ], + }; + let a = to_nested(&array, &type_).unwrap(); + + assert_eq!( + a, + vec![ + vec![ + Nested::Struct(Some(&Bitmap::from([true, true, false, true])), true, 4), + Nested::Primitive(None, false, 4), + ], + vec![ + Nested::Struct(Some(&Bitmap::from([true, true, false, true])), true, 4), + Nested::Primitive(None, false, 4), + ], + ] + ); + } + + #[test] + fn test_struct_struct() { + use std::sync::Arc; + let boolean = + Arc::new(BooleanArray::from_slice(&[false, false, true, true])) as Arc; + let int = Arc::new(Int32Array::from_slice(&[42, 28, 19, 31])) as Arc; + + let fields = vec![ + Field::new("b", DataType::Boolean, false), + Field::new("c", DataType::Int32, false), + ]; + + let array = StructArray::from_data( + DataType::Struct(fields), + vec![boolean.clone(), int.clone()], + Some(Bitmap::from([true, true, false, true])), + ); + + let fields = vec![ + Field::new("b", array.data_type().clone(), true), + Field::new("c", array.data_type().clone(), true), + ]; + + let array = StructArray::from_data( + DataType::Struct(fields), + vec![Arc::new(array.clone()), Arc::new(array)], + None, + ); + + let type_ = ParquetType::GroupType { + field_info: FieldInfo { + name: "a".to_string(), + repetition: Repetition::Optional, + id: None, + }, + logical_type: None, + converted_type: None, + fields: vec![ + 
ParquetType::PrimitiveType(ParquetPrimitiveType { + field_info: FieldInfo { + name: "b".to_string(), + repetition: Repetition::Required, + id: None, + }, + logical_type: None, + converted_type: None, + physical_type: ParquetPhysicalType::Boolean, + }), + ParquetType::PrimitiveType(ParquetPrimitiveType { + field_info: FieldInfo { + name: "c".to_string(), + repetition: Repetition::Required, + id: None, + }, + logical_type: None, + converted_type: None, + physical_type: ParquetPhysicalType::Int32, + }), + ], + }; + + let type_ = ParquetType::GroupType { + field_info: FieldInfo { + name: "a".to_string(), + repetition: Repetition::Required, + id: None, + }, + logical_type: None, + converted_type: None, + fields: vec![type_.clone(), type_], + }; + + let a = to_nested(&array, &type_).unwrap(); + + assert_eq!( + a, + vec![ + // a.b.b + vec![ + Nested::Struct(None, false, 4), + Nested::Struct(Some(&Bitmap::from([true, true, false, true])), true, 4), + Nested::Primitive(None, false, 4), + ], + // a.b.c + vec![ + Nested::Struct(None, false, 4), + Nested::Struct(Some(&Bitmap::from([true, true, false, true])), true, 4), + Nested::Primitive(None, false, 4), + ], + // a.c.b + vec![ + Nested::Struct(None, false, 4), + Nested::Struct(Some(&Bitmap::from([true, true, false, true])), true, 4), + Nested::Primitive(None, false, 4), + ], + // a.c.c + vec![ + Nested::Struct(None, false, 4), + Nested::Struct(Some(&Bitmap::from([true, true, false, true])), true, 4), + Nested::Primitive(None, false, 4), + ], + ] + ); + } + + #[test] + fn test_list_struct() { + use std::sync::Arc; + let boolean = + Arc::new(BooleanArray::from_slice(&[false, false, true, true])) as Arc; + let int = Arc::new(Int32Array::from_slice(&[42, 28, 19, 31])) as Arc; + + let fields = vec![ + Field::new("b", DataType::Boolean, false), + Field::new("c", DataType::Int32, false), + ]; + + let array = StructArray::from_data( + DataType::Struct(fields), + vec![boolean.clone(), int.clone()], + Some(Bitmap::from([true, true, false, true])), + ); + + let array = ListArray::new( + DataType::List(Box::new(Field::new("l", array.data_type().clone(), true))), + vec![0i32, 2, 4].into(), + Arc::new(array), + None, + ); + + let type_ = ParquetType::GroupType { + field_info: FieldInfo { + name: "a".to_string(), + repetition: Repetition::Optional, + id: None, + }, + logical_type: None, + converted_type: None, + fields: vec![ + ParquetType::PrimitiveType(ParquetPrimitiveType { + field_info: FieldInfo { + name: "b".to_string(), + repetition: Repetition::Required, + id: None, + }, + logical_type: None, + converted_type: None, + physical_type: ParquetPhysicalType::Boolean, + }), + ParquetType::PrimitiveType(ParquetPrimitiveType { + field_info: FieldInfo { + name: "c".to_string(), + repetition: Repetition::Required, + id: None, + }, + logical_type: None, + converted_type: None, + physical_type: ParquetPhysicalType::Int32, + }), + ], + }; + + let type_ = ParquetType::GroupType { + field_info: FieldInfo { + name: "l".to_string(), + repetition: Repetition::Required, + id: None, + }, + logical_type: None, + converted_type: None, + fields: vec![ParquetType::GroupType { + field_info: FieldInfo { + name: "list".to_string(), + repetition: Repetition::Repeated, + id: None, + }, + logical_type: None, + converted_type: None, + fields: vec![type_], + }], + }; + + let a = to_nested(&array, &type_).unwrap(); + + assert_eq!( + a, + vec![ + vec![ + Nested::List(ListNested:: { + is_optional: false, + offsets: &[0, 2, 4], + validity: None, + }), + 
Nested::Struct(Some(&Bitmap::from([true, true, false, true])), true, 4), + Nested::Primitive(None, false, 4), + ], + vec![ + Nested::List(ListNested:: { + is_optional: false, + offsets: &[0, 2, 4], + validity: None, + }), + Nested::Struct(Some(&Bitmap::from([true, true, false, true])), true, 4), + Nested::Primitive(None, false, 4), + ], + ] + ); + } +} diff --git a/src/io/parquet/write/primitive/nested.rs b/src/io/parquet/write/primitive/nested.rs index 19871fa3f41..9bd13184ca4 100644 --- a/src/io/parquet/write/primitive/nested.rs +++ b/src/io/parquet/write/primitive/nested.rs @@ -2,39 +2,34 @@ use parquet2::schema::types::PrimitiveType; use parquet2::statistics::serialize_statistics; use parquet2::{encoding::Encoding, page::DataPage, types::NativeType}; -use super::super::levels; +use super::super::nested; use super::super::utils; use super::super::WriteOptions; use super::basic::{build_statistics, encode_plain}; use crate::io::parquet::read::schema::is_nullable; +use crate::io::parquet::write::Nested; use crate::{ - array::{Array, Offset, PrimitiveArray}, + array::{Array, PrimitiveArray}, error::Result, types::NativeType as ArrowNativeType, }; -pub fn array_to_page( +pub fn array_to_page( array: &PrimitiveArray, options: WriteOptions, type_: PrimitiveType, - nested: levels::NestedInfo, + nested: Vec, ) -> Result where T: ArrowNativeType, R: NativeType, T: num_traits::AsPrimitive, - O: Offset, { let is_optional = is_nullable(&type_.field_info); - let validity = array.validity(); - let mut buffer = vec![]; - levels::write_rep_levels(&mut buffer, &nested, options.version)?; - let repetition_levels_byte_length = buffer.len(); - - levels::write_def_levels(&mut buffer, &nested, validity, is_optional, options.version)?; - let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length; + let (repetition_levels_byte_length, definition_levels_byte_length) = + nested::write_rep_and_def(options.version, &nested, &mut buffer)?; encode_plain(array, is_optional, &mut buffer); @@ -49,8 +44,8 @@ where utils::build_plain_page( buffer, - levels::num_values(nested.offsets()), - nested.offsets().len().saturating_sub(1), + nested::num_values(&nested), + nested[0].len(), array.null_count(), repetition_levels_byte_length, definition_levels_byte_length, diff --git a/src/io/parquet/write/row_group.rs b/src/io/parquet/write/row_group.rs index 06c57562319..d9e25c6c14f 100644 --- a/src/io/parquet/write/row_group.rs +++ b/src/io/parquet/write/row_group.rs @@ -1,3 +1,4 @@ +use parquet2::error::Error as ParquetError; use parquet2::schema::types::ParquetType; use parquet2::write::Compressor; use parquet2::FallibleStreamingIterator; @@ -10,7 +11,7 @@ use crate::{ }; use super::{ - array_to_pages, to_parquet_schema, DynIter, DynStreamingIterator, Encoding, RowGroupIter, + array_to_columns, to_parquet_schema, DynIter, DynStreamingIterator, Encoding, RowGroupIter, SchemaDescriptor, WriteOptions, }; @@ -18,7 +19,7 @@ use super::{ /// write to parquet pub fn row_group_iter + 'static + Send + Sync>( chunk: Chunk, - encodings: Vec, + encodings: Vec>, fields: Vec, options: WriteOptions, ) -> RowGroupIter<'static, ArrowError> { @@ -28,14 +29,24 @@ pub fn row_group_iter + 'static + Send + Sync>( .into_iter() .zip(fields.into_iter()) .zip(encodings.into_iter()) - .map(move |((array, type_), encoding)| { - array_to_pages(array.as_ref(), type_, options, encoding).map(move |pages| { - let encoded_pages = DynIter::new(pages.map(|x| Ok(x?))); - let compressed_pages = - Compressor::new(encoded_pages, 
options.compression, vec![]) + .flat_map(move |((array, type_), encoding)| { + let encoded_columns = array_to_columns(array, type_, options, encoding).unwrap(); + encoded_columns + .into_iter() + .map(|encoded_pages| { + let pages = encoded_pages; + + let pages = DynIter::new( + pages + .into_iter() + .map(|x| x.map_err(|e| ParquetError::General(e.to_string()))), + ); + + let compressed_pages = Compressor::new(pages, options.compression, vec![]) .map_err(ArrowError::from); - DynStreamingIterator::new(compressed_pages) - }) + Ok(DynStreamingIterator::new(compressed_pages)) + }) + .collect::>() }), ) } @@ -47,7 +58,7 @@ pub struct RowGroupIterator + 'static, I: Iterator, + encodings: Vec>, } impl + 'static, I: Iterator>>> RowGroupIterator { @@ -56,10 +67,8 @@ impl + 'static, I: Iterator>>> RowGro iter: I, schema: &Schema, options: WriteOptions, - encodings: Vec, + encodings: Vec>, ) -> Result { - assert_eq!(schema.fields.len(), encodings.len()); - let parquet_schema = to_parquet_schema(schema)?; Ok(Self { diff --git a/src/io/parquet/write/sink.rs b/src/io/parquet/write/sink.rs index 123be726e40..de4fa11da26 100644 --- a/src/io/parquet/write/sink.rs +++ b/src/io/parquet/write/sink.rs @@ -30,7 +30,7 @@ use super::{Encoding, SchemaDescriptor, WriteOptions}; /// let schema = Schema::from(vec![ /// Field::new("values", DataType::Int32, true), /// ]); -/// let encoding = vec![Encoding::Plain]; +/// let encoding = vec![vec![Encoding::Plain]]; /// let options = WriteOptions { /// write_statistics: true, /// compression: CompressionOptions::Uncompressed, @@ -59,7 +59,7 @@ pub struct FileSink<'a, W: AsyncWrite + Send + Unpin> { writer: Option>, task: Option>, ArrowError>>>, options: WriteOptions, - encoding: Vec, + encoding: Vec>, schema: Schema, parquet_schema: SchemaDescriptor, /// Key-value metadata that will be written to the file on close. 
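Note: with this change every writer entry point (`row_group_iter`, `RowGroupIterator::try_new`, `FileSink::try_new`) takes one `Vec<Encoding>` per arrow field instead of a single `Encoding`, with one entry per parquet leaf column of that field. Below is a minimal sketch, not part of the patch, of deriving that shape from a `Schema`; the helper name `default_encodings` is hypothetical and only one level of struct nesting is handled.

    use arrow2::datatypes::{DataType, Schema};
    use arrow2::io::parquet::write::Encoding;

    // Hypothetical helper: build the `Vec<Vec<Encoding>>` shape now expected by
    // the writers: one inner Vec per arrow field, one Encoding per parquet leaf.
    fn default_encodings(schema: &Schema) -> Vec<Vec<Encoding>> {
        schema
            .fields
            .iter()
            .map(|field| match field.data_type() {
                // a flat struct produces one leaf column per child field
                DataType::Struct(children) => {
                    children.iter().map(|_| Encoding::Plain).collect()
                }
                DataType::Dictionary(..) => vec![Encoding::RleDictionary],
                _ => vec![Encoding::Plain],
            })
            .collect()
    }

For flat schemas this degenerates to the `vec![vec![Encoding::Plain]]` shape used in the `FileSink` doc example above.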
@@ -77,7 +77,7 @@ where pub fn try_new( writer: W, schema: Schema, - encoding: Vec, + encoding: Vec>, options: WriteOptions, ) -> Result { let parquet_schema = crate::io::parquet::write::to_parquet_schema(&schema)?; diff --git a/src/io/parquet/write/utf8/nested.rs b/src/io/parquet/write/utf8/nested.rs index 7637978d272..7a6e0ee05be 100644 --- a/src/io/parquet/write/utf8/nested.rs +++ b/src/io/parquet/write/utf8/nested.rs @@ -1,34 +1,29 @@ use parquet2::schema::types::PrimitiveType; use parquet2::{encoding::Encoding, page::DataPage}; -use super::super::{levels, utils, WriteOptions}; +use super::super::{nested, utils, WriteOptions}; use super::basic::{build_statistics, encode_plain}; use crate::io::parquet::read::schema::is_nullable; +use crate::io::parquet::write::Nested; use crate::{ array::{Array, Offset, Utf8Array}, error::Result, }; -pub fn array_to_page( +pub fn array_to_page( array: &Utf8Array, options: WriteOptions, type_: PrimitiveType, - nested: levels::NestedInfo, + nested: Vec, ) -> Result where - OO: Offset, O: Offset, { let is_optional = is_nullable(&type_.field_info); - let validity = array.validity(); - let mut buffer = vec![]; - levels::write_rep_levels(&mut buffer, &nested, options.version)?; - let repetition_levels_byte_length = buffer.len(); - - levels::write_def_levels(&mut buffer, &nested, validity, is_optional, options.version)?; - let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length; + let (repetition_levels_byte_length, definition_levels_byte_length) = + nested::write_rep_and_def(options.version, &nested, &mut buffer)?; encode_plain(array, is_optional, &mut buffer); @@ -40,8 +35,8 @@ where utils::build_plain_page( buffer, - levels::num_values(nested.offsets()), - nested.offsets().len().saturating_sub(1), + nested::num_values(&nested), + nested[0].len(), array.null_count(), repetition_levels_byte_length, definition_levels_byte_length, diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index e7786a10ec6..2da19374ae4 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -800,11 +800,11 @@ fn integration_write(schema: &Schema, batches: &[Chunk>]) -> Resu .fields .iter() .map(|x| { - if let DataType::Dictionary(..) = x.data_type() { + vec![if let DataType::Dictionary(..) 
= x.data_type() { Encoding::RleDictionary } else { Encoding::Plain - } + }] }) .collect(); diff --git a/tests/it/io/parquet/read_indexes.rs b/tests/it/io/parquet/read_indexes.rs index 8dac67ae03e..9053b956a1e 100644 --- a/tests/it/io/parquet/read_indexes.rs +++ b/tests/it/io/parquet/read_indexes.rs @@ -38,7 +38,11 @@ fn pages( .map(|array| { array_to_page( &array, - parquet_schema.fields()[0].clone(), + parquet_schema.columns()[0] + .descriptor + .primitive_type + .clone(), + vec![Nested::Primitive(None, true, array.len())], options, Encoding::Plain, ) @@ -50,7 +54,11 @@ fn pages( .flat_map(|array| { array_to_pages( *array, - parquet_schema.fields()[1].clone(), + parquet_schema.columns()[1] + .descriptor + .primitive_type + .clone(), + vec![Nested::Primitive(None, true, array.len())], options, encoding, ) diff --git a/tests/it/io/parquet/write.rs b/tests/it/io/parquet/write.rs index e412ed593ae..2c41aac5fa2 100644 --- a/tests/it/io/parquet/write.rs +++ b/tests/it/io/parquet/write.rs @@ -7,31 +7,30 @@ use super::*; fn round_trip( column: &str, - nullable: bool, - nested: bool, + file: &str, version: Version, compression: CompressionOptions, - encoding: Encoding, + encodings: Vec, ) -> Result<()> { - let (array, statistics) = if nested { - ( + let (array, statistics) = match file { + "nested" => ( pyarrow_nested_nullable(column), pyarrow_nested_nullable_statistics(column), - ) - } else if nullable { - ( + ), + "nullable" => ( pyarrow_nullable(column), pyarrow_nullable_statistics(column), - ) - } else { - ( + ), + "required" => ( pyarrow_required(column), pyarrow_required_statistics(column), - ) + ), + "struct" => (pyarrow_struct(column), pyarrow_struct_statistics(column)), + _ => unreachable!(), }; let array: Arc = array.into(); - let field = Field::new("a1", array.data_type().clone(), nullable); + let field = Field::new("a1", array.data_type().clone(), true); let schema = Schema::from(vec![field]); let options = WriteOptions { @@ -42,7 +41,8 @@ fn round_trip( let iter = vec![Chunk::try_new(vec![array.clone()])]; - let row_groups = RowGroupIterator::try_new(iter.into_iter(), &schema, options, vec![encoding])?; + let row_groups = + RowGroupIterator::try_new(iter.into_iter(), &schema, options, vec![encodings])?; let writer = Cursor::new(vec![]); let mut writer = FileWriter::try_new(writer, schema, options)?; @@ -65,11 +65,10 @@ fn round_trip( fn int64_optional_v1() -> Result<()> { round_trip( "int64", - true, - false, + "nullable", Version::V1, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -77,11 +76,10 @@ fn int64_optional_v1() -> Result<()> { fn int64_required_v1() -> Result<()> { round_trip( "int64", - false, - false, + "required", Version::V1, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -89,11 +87,10 @@ fn int64_required_v1() -> Result<()> { fn int64_optional_v2() -> Result<()> { round_trip( "int64", - true, - false, + "nullable", Version::V2, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -102,11 +99,10 @@ fn int64_optional_v2() -> Result<()> { fn int64_optional_v2_compressed() -> Result<()> { round_trip( "int64", - true, - false, + "nullable", Version::V2, CompressionOptions::Snappy, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -114,11 +110,10 @@ fn int64_optional_v2_compressed() -> Result<()> { fn utf8_optional_v1() -> Result<()> { round_trip( "string", - true, - false, + "nullable", Version::V1, CompressionOptions::Uncompressed, - Encoding::Plain, + 
vec![Encoding::Plain], ) } @@ -126,11 +121,10 @@ fn utf8_optional_v1() -> Result<()> { fn utf8_required_v1() -> Result<()> { round_trip( "string", - false, - false, + "required", Version::V1, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -138,11 +132,10 @@ fn utf8_required_v1() -> Result<()> { fn utf8_optional_v2() -> Result<()> { round_trip( "string", - true, - false, + "nullable", Version::V2, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -150,11 +143,10 @@ fn utf8_optional_v2() -> Result<()> { fn utf8_required_v2() -> Result<()> { round_trip( "string", - false, - false, + "required", Version::V2, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -163,11 +155,10 @@ fn utf8_required_v2() -> Result<()> { fn utf8_optional_v2_compressed() -> Result<()> { round_trip( "string", - true, - false, + "nullable", Version::V2, CompressionOptions::Snappy, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -176,11 +167,10 @@ fn utf8_optional_v2_compressed() -> Result<()> { fn utf8_required_v2_compressed() -> Result<()> { round_trip( "string", - false, - false, + "required", Version::V2, CompressionOptions::Snappy, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -188,11 +178,10 @@ fn utf8_required_v2_compressed() -> Result<()> { fn bool_optional_v1() -> Result<()> { round_trip( "bool", - true, - false, + "nullable", Version::V1, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -200,11 +189,10 @@ fn bool_optional_v1() -> Result<()> { fn bool_required_v1() -> Result<()> { round_trip( "bool", - false, - false, + "required", Version::V1, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -212,11 +200,10 @@ fn bool_required_v1() -> Result<()> { fn bool_optional_v2_uncompressed() -> Result<()> { round_trip( "bool", - true, - false, + "nullable", Version::V2, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -224,11 +211,10 @@ fn bool_optional_v2_uncompressed() -> Result<()> { fn bool_required_v2_uncompressed() -> Result<()> { round_trip( "bool", - false, - false, + "required", Version::V2, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -237,11 +223,10 @@ fn bool_required_v2_uncompressed() -> Result<()> { fn bool_required_v2_compressed() -> Result<()> { round_trip( "bool", - false, - false, + "required", Version::V2, CompressionOptions::Snappy, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -249,11 +234,10 @@ fn bool_required_v2_compressed() -> Result<()> { fn list_int64_optional_v2() -> Result<()> { round_trip( "list_int64", - true, - true, + "nested", Version::V2, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -261,11 +245,10 @@ fn list_int64_optional_v2() -> Result<()> { fn list_int64_optional_v1() -> Result<()> { round_trip( "list_int64", - true, - true, + "nested", Version::V1, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -273,11 +256,10 @@ fn list_int64_optional_v1() -> Result<()> { fn list_int64_required_required_v1() -> Result<()> { round_trip( "list_int64_required_required", - false, - true, + "nested", Version::V1, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -285,11 +267,10 @@ fn list_int64_required_required_v1() -> Result<()> { fn list_int64_required_required_v2() -> Result<()> { round_trip( "list_int64_required_required", 
- false, - true, + "nested", Version::V2, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -297,11 +278,10 @@ fn list_int64_required_required_v2() -> Result<()> { fn list_bool_optional_v2() -> Result<()> { round_trip( "list_bool", - true, - true, + "nested", Version::V2, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -309,11 +289,10 @@ fn list_bool_optional_v2() -> Result<()> { fn list_bool_optional_v1() -> Result<()> { round_trip( "list_bool", - true, - true, + "nested", Version::V1, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -321,11 +300,10 @@ fn list_bool_optional_v1() -> Result<()> { fn list_utf8_optional_v2() -> Result<()> { round_trip( "list_utf8", - true, - true, + "nested", Version::V2, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -333,11 +311,10 @@ fn list_utf8_optional_v2() -> Result<()> { fn list_utf8_optional_v1() -> Result<()> { round_trip( "list_utf8", - true, - true, + "nested", Version::V1, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -345,11 +322,10 @@ fn list_utf8_optional_v1() -> Result<()> { fn list_large_binary_optional_v2() -> Result<()> { round_trip( "list_large_binary", - true, - true, + "nested", Version::V2, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -357,11 +333,10 @@ fn list_large_binary_optional_v2() -> Result<()> { fn list_large_binary_optional_v1() -> Result<()> { round_trip( "list_large_binary", - true, - true, + "nested", Version::V1, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -370,11 +345,10 @@ fn list_large_binary_optional_v1() -> Result<()> { fn utf8_optional_v2_delta() -> Result<()> { round_trip( "string", - true, - false, + "nullable", Version::V2, CompressionOptions::Uncompressed, - Encoding::DeltaLengthByteArray, + vec![Encoding::DeltaLengthByteArray], ) } @@ -382,11 +356,10 @@ fn utf8_optional_v2_delta() -> Result<()> { fn i32_optional_v2_dict() -> Result<()> { round_trip( "int32_dict", - true, - false, + "nullable", Version::V2, CompressionOptions::Uncompressed, - Encoding::RleDictionary, + vec![Encoding::RleDictionary], ) } @@ -395,11 +368,10 @@ fn i32_optional_v2_dict() -> Result<()> { fn i32_optional_v2_dict_compressed() -> Result<()> { round_trip( "int32_dict", - true, - false, + "nullable", Version::V2, CompressionOptions::Snappy, - Encoding::RleDictionary, + vec![Encoding::RleDictionary], ) } @@ -408,11 +380,10 @@ fn i32_optional_v2_dict_compressed() -> Result<()> { fn decimal_9_optional_v1() -> Result<()> { round_trip( "decimal_9", - true, - false, + "nullable", Version::V1, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -420,11 +391,10 @@ fn decimal_9_optional_v1() -> Result<()> { fn decimal_9_required_v1() -> Result<()> { round_trip( "decimal_9", - false, - false, + "required", Version::V1, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -432,11 +402,10 @@ fn decimal_9_required_v1() -> Result<()> { fn decimal_18_optional_v1() -> Result<()> { round_trip( "decimal_18", - true, - false, + "nullable", Version::V1, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -444,11 +413,10 @@ fn decimal_18_optional_v1() -> Result<()> { fn decimal_18_required_v1() -> Result<()> { round_trip( "decimal_18", - false, - false, + "required", Version::V1, 
CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -456,11 +424,10 @@ fn decimal_18_required_v1() -> Result<()> { fn decimal_26_optional_v1() -> Result<()> { round_trip( "decimal_26", - true, - false, + "nullable", Version::V1, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -468,11 +435,10 @@ fn decimal_26_optional_v1() -> Result<()> { fn decimal_26_required_v1() -> Result<()> { round_trip( "decimal_26", - false, - false, + "required", Version::V1, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -480,11 +446,10 @@ fn decimal_26_required_v1() -> Result<()> { fn decimal_9_optional_v2() -> Result<()> { round_trip( "decimal_9", - true, - false, + "nullable", Version::V2, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -492,11 +457,10 @@ fn decimal_9_optional_v2() -> Result<()> { fn decimal_9_required_v2() -> Result<()> { round_trip( "decimal_9", - false, - false, + "required", Version::V2, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -504,11 +468,10 @@ fn decimal_9_required_v2() -> Result<()> { fn decimal_18_optional_v2() -> Result<()> { round_trip( "decimal_18", - true, - false, + "nullable", Version::V2, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -516,11 +479,10 @@ fn decimal_18_optional_v2() -> Result<()> { fn decimal_18_required_v2() -> Result<()> { round_trip( "decimal_18", - false, - false, + "required", Version::V2, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -528,11 +490,10 @@ fn decimal_18_required_v2() -> Result<()> { fn decimal_26_optional_v2() -> Result<()> { round_trip( "decimal_26", - true, - false, + "nullable", Version::V2, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], ) } @@ -540,10 +501,31 @@ fn decimal_26_optional_v2() -> Result<()> { fn decimal_26_required_v2() -> Result<()> { round_trip( "decimal_26", - false, - false, + "required", Version::V2, CompressionOptions::Uncompressed, - Encoding::Plain, + vec![Encoding::Plain], + ) +} + +#[test] +fn struct_v1() -> Result<()> { + round_trip( + "struct", + "struct", + Version::V1, + CompressionOptions::Uncompressed, + vec![Encoding::Plain, Encoding::Plain], + ) +} + +#[test] +fn struct_v2() -> Result<()> { + round_trip( + "struct", + "struct", + Version::V2, + CompressionOptions::Uncompressed, + vec![Encoding::Plain, Encoding::Plain], ) } diff --git a/tests/it/io/parquet/write_async.rs b/tests/it/io/parquet/write_async.rs index 4ed1f38557a..158f8c01e08 100644 --- a/tests/it/io/parquet/write_async.rs +++ b/tests/it/io/parquet/write_async.rs @@ -30,7 +30,7 @@ async fn test_parquet_async_roundtrip() { Field::new("a1", DataType::Int32, true), Field::new("a2", DataType::Float32, true), ]); - let encoding = vec![Encoding::Plain, Encoding::Plain]; + let encoding = vec![vec![Encoding::Plain], vec![Encoding::Plain]]; let options = WriteOptions { write_statistics: true, compression: CompressionOptions::Uncompressed,
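The new `struct_v1`/`struct_v2` tests above exercise the case where a single field fans out into several parquet leaves: the struct column's one inner vector carries two encodings. A minimal sketch of building row groups for such a column with the updated API follows; it is not part of the patch, the function name `struct_row_groups` and the data are illustrative, and it assumes the same re-exports used by the tests above.

    use std::sync::Arc;
    use arrow2::array::{Array, BooleanArray, Int32Array, StructArray};
    use arrow2::chunk::Chunk;
    use arrow2::datatypes::{DataType, Field, Schema};
    use arrow2::error::Result;
    use arrow2::io::parquet::write::{
        CompressionOptions, Encoding, RowGroupIterator, Version, WriteOptions,
    };

    // Sketch only: a struct<b: bool, c: i32> column has two parquet leaves,
    // so its single field entry carries two encodings.
    fn struct_row_groups() -> Result<()> {
        let children = vec![
            Field::new("b", DataType::Boolean, false),
            Field::new("c", DataType::Int32, false),
        ];
        let array = StructArray::from_data(
            DataType::Struct(children),
            vec![
                Arc::new(BooleanArray::from_slice(&[false, true])) as Arc<dyn Array>,
                Arc::new(Int32Array::from_slice(&[1, 2])) as Arc<dyn Array>,
            ],
            None,
        );

        let schema = Schema::from(vec![Field::new("a", array.data_type().clone(), true)]);
        let options = WriteOptions {
            write_statistics: true,
            compression: CompressionOptions::Uncompressed,
            version: Version::V2,
        };

        let chunks = vec![Chunk::try_new(vec![Arc::new(array) as Arc<dyn Array>])];
        // one inner Vec for the single field "a"; two encodings for its two leaves
        let _row_groups = RowGroupIterator::try_new(
            chunks.into_iter(),
            &schema,
            options,
            vec![vec![Encoding::Plain, Encoding::Plain]],
        )?;
        // the row groups are then written with `FileWriter`, as in the
        // `round_trip` helper above
        Ok(())
    }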