diff --git a/src/array/struct_/mod.rs b/src/array/struct_/mod.rs index bf1ee6cf72b..be59347c3bb 100644 --- a/src/array/struct_/mod.rs +++ b/src/array/struct_/mod.rs @@ -74,6 +74,13 @@ impl StructArray { let fields = Self::get_fields(&data_type); assert!(!fields.is_empty()); assert_eq!(fields.len(), values.len()); + assert!( + fields + .iter() + .map(|f| f.data_type()) + .eq(values.iter().map(|a| a.data_type())), + "The fields' datatypes must equal the values datatypes" + ); assert!(values.iter().all(|x| x.len() == values[0].len())); if let Some(ref validity) = validity { assert_eq!(values[0].len(), validity.len()); diff --git a/src/io/parquet/read/binary/mod.rs b/src/io/parquet/read/binary/mod.rs index 6ed55565fcb..421328d66b2 100644 --- a/src/io/parquet/read/binary/mod.rs +++ b/src/io/parquet/read/binary/mod.rs @@ -9,13 +9,16 @@ use std::sync::Arc; use crate::{ array::{Array, Offset}, - datatypes::{DataType, Field}, + datatypes::DataType, }; use self::basic::TraitBinaryArray; use self::nested::ArrayIterator; use super::ArrayIter; -use super::{nested_utils::NestedArrayIter, DataPages}; +use super::{ + nested_utils::{InitNested, NestedArrayIter}, + DataPages, +}; use basic::BinaryArrayIterator; /// Converts [`DataPages`] to an [`Iterator`] of [`Array`] @@ -34,7 +37,7 @@ where /// Converts [`DataPages`] to an [`Iterator`] of [`Array`] pub fn iter_to_arrays_nested<'a, O, A, I>( iter: I, - field: Field, + init: InitNested, data_type: DataType, chunk_size: usize, ) -> NestedArrayIter<'a> @@ -44,8 +47,9 @@ where O: Offset, { Box::new( - ArrayIterator::::new(iter, field, data_type, chunk_size).map(|x| { - x.map(|(nested, array)| { + ArrayIterator::::new(iter, init, data_type, chunk_size).map(|x| { + x.map(|(mut nested, array)| { + let _ = nested.nested.pop().unwrap(); // the primitive let values = Arc::new(array) as Arc; (nested, values) }) diff --git a/src/io/parquet/read/binary/nested.rs b/src/io/parquet/read/binary/nested.rs index ffe810c55b5..abb73503143 100644 --- a/src/io/parquet/read/binary/nested.rs +++ b/src/io/parquet/read/binary/nested.rs @@ -5,7 +5,7 @@ use parquet2::{encoding::Encoding, page::DataPage, schema::Repetition}; use crate::{ array::Offset, bitmap::MutableBitmap, - datatypes::{DataType, Field}, + datatypes::DataType, error::Result, io::parquet::read::{utils::MaybeNext, DataPages}, }; @@ -99,7 +99,7 @@ impl<'a, O: Offset> utils::Decoder<'a, &'a [u8], Binary> for BinaryDecoder pub struct ArrayIterator, I: DataPages> { iter: I, data_type: DataType, - field: Field, + init: InitNested, items: VecDeque<(Binary, MutableBitmap)>, nested: VecDeque, chunk_size: usize, @@ -107,11 +107,11 @@ pub struct ArrayIterator, I: DataPages> { } impl, I: DataPages> ArrayIterator { - pub fn new(iter: I, field: Field, data_type: DataType, chunk_size: usize) -> Self { + pub fn new(iter: I, init: InitNested, data_type: DataType, chunk_size: usize) -> Self { Self { iter, data_type, - field, + init, items: VecDeque::new(), nested: VecDeque::new(), chunk_size, @@ -128,7 +128,7 @@ impl, I: DataPages> Iterator for ArrayIterator &mut self.iter, &mut self.items, &mut self.nested, - &self.field, + &self.init, self.chunk_size, &BinaryDecoder::::default(), ); diff --git a/src/io/parquet/read/boolean/mod.rs b/src/io/parquet/read/boolean/mod.rs index dc8e386b9bc..42bd4a8d2b9 100644 --- a/src/io/parquet/read/boolean/mod.rs +++ b/src/io/parquet/read/boolean/mod.rs @@ -3,15 +3,15 @@ mod nested; use std::sync::Arc; -use crate::{ - array::Array, - datatypes::{DataType, Field}, -}; +use crate::{array::Array, datatypes::DataType}; use self::basic::BooleanArrayIterator; use self::nested::ArrayIterator; use super::ArrayIter; -use super::{nested_utils::NestedArrayIter, DataPages}; +use super::{ + nested_utils::{InitNested, NestedArrayIter}, + DataPages, +}; /// Converts [`DataPages`] to an [`Iterator`] of [`Array`] pub fn iter_to_arrays<'a, I: 'a>(iter: I, data_type: DataType, chunk_size: usize) -> ArrayIter<'a> @@ -27,14 +27,15 @@ where /// Converts [`DataPages`] to an [`Iterator`] of [`Array`] pub fn iter_to_arrays_nested<'a, I: 'a>( iter: I, - field: Field, + init: InitNested, chunk_size: usize, ) -> NestedArrayIter<'a> where I: DataPages, { - Box::new(ArrayIterator::new(iter, field, chunk_size).map(|x| { - x.map(|(nested, array)| { + Box::new(ArrayIterator::new(iter, init, chunk_size).map(|x| { + x.map(|(mut nested, array)| { + let _ = nested.nested.pop().unwrap(); // the primitive let values = Arc::new(array) as Arc; (nested, values) }) diff --git a/src/io/parquet/read/boolean/nested.rs b/src/io/parquet/read/boolean/nested.rs index 73b70c337c6..ade7e82719d 100644 --- a/src/io/parquet/read/boolean/nested.rs +++ b/src/io/parquet/read/boolean/nested.rs @@ -5,7 +5,7 @@ use parquet2::{encoding::Encoding, page::DataPage, schema::Repetition}; use crate::{ array::BooleanArray, bitmap::{utils::BitmapIter, MutableBitmap}, - datatypes::{DataType, Field}, + datatypes::DataType, error::Result, }; @@ -117,7 +117,7 @@ impl<'a> Decoder<'a, bool, MutableBitmap> for BooleanDecoder { #[derive(Debug)] pub struct ArrayIterator { iter: I, - field: Field, + init: InitNested, // invariant: items.len() == nested.len() items: VecDeque<(MutableBitmap, MutableBitmap)>, nested: VecDeque, @@ -125,10 +125,10 @@ pub struct ArrayIterator { } impl ArrayIterator { - pub fn new(iter: I, field: Field, chunk_size: usize) -> Self { + pub fn new(iter: I, init: InitNested, chunk_size: usize) -> Self { Self { iter, - field, + init, items: VecDeque::new(), nested: VecDeque::new(), chunk_size, @@ -148,7 +148,7 @@ impl Iterator for ArrayIterator { &mut self.iter, &mut self.items, &mut self.nested, - &self.field, + &self.init, self.chunk_size, &BooleanDecoder::default(), ); diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index cb706234536..d0270cfb59b 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -27,10 +27,10 @@ pub use parquet2::{ }; use crate::{ - array::{Array, BinaryArray, DictionaryKey, PrimitiveArray, StructArray, Utf8Array}, + array::{Array, BinaryArray, DictionaryKey, ListArray, PrimitiveArray, StructArray, Utf8Array}, datatypes::{DataType, Field, IntervalUnit, TimeUnit}, error::{ArrowError, Result}, - io::parquet::read::{nested_utils::create_list, primitive::read_item}, + io::parquet::read::primitive::read_item, }; mod binary; @@ -51,6 +51,8 @@ pub use row_group::*; pub(crate) use schema::is_type_nullable; pub use schema::{get_schema, FileMetaData}; +use self::nested_utils::{InitNested, NestedArrayIter, NestedState}; + pub trait DataPages: FallibleStreamingIterator + Send + Sync { @@ -429,10 +431,10 @@ fn page_iter_to_arrays<'a, I: 'a + DataPages>( dict_read::<$K, _>(pages, type_, field.data_type, chunk_size) }), - LargeList(inner) | List(inner) => { + /*LargeList(inner) | List(inner) => { let data_type = inner.data_type.clone(); page_iter_to_arrays_nested(pages, type_, field, data_type, chunk_size) - } + }*/ other => Err(ArrowError::NotYetImplemented(format!( "Reading {:?} from parquet still not implemented", other @@ -440,218 +442,229 @@ fn page_iter_to_arrays<'a, I: 'a + DataPages>( } } -fn page_iter_to_arrays_nested<'a, I: 'a + DataPages>( - pages: I, - type_: &ParquetType, - field: Field, +fn create_list( data_type: DataType, - chunk_size: usize, -) -> Result> { - use DataType::*; - let iter = match data_type { - Boolean => boolean::iter_to_arrays_nested(pages, field.clone(), chunk_size), - - UInt8 => primitive::iter_to_arrays_nested( - pages, - field.clone(), - data_type, - chunk_size, - read_item, - |x: i32| x as u8, - ), - UInt16 => primitive::iter_to_arrays_nested( - pages, - field.clone(), - data_type, - chunk_size, - read_item, - |x: i32| x as u16, - ), - UInt32 => primitive::iter_to_arrays_nested( - pages, - field.clone(), - data_type, - chunk_size, - read_item, - |x: i32| x as u32, - ), - Int8 => primitive::iter_to_arrays_nested( - pages, - field.clone(), - data_type, - chunk_size, - read_item, - |x: i32| x as i8, - ), - Int16 => primitive::iter_to_arrays_nested( - pages, - field.clone(), - data_type, - chunk_size, - read_item, - |x: i32| x as i16, - ), - Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => { - primitive::iter_to_arrays_nested( - pages, - field.clone(), + nested: &mut NestedState, + values: Arc, +) -> Result> { + Ok(match data_type { + DataType::List(_) => { + let (offsets, validity) = nested.nested.pop().unwrap().inner(); + + let offsets = offsets.iter().map(|x| *x as i32).collect::>(); + Arc::new(ListArray::::from_data( data_type, - chunk_size, - read_item, - |x: i32| x, - ) + offsets.into(), + values, + validity, + )) } - Int64 => primitive::iter_to_arrays_nested( - pages, - field.clone(), - data_type, - chunk_size, - read_item, - |x: i64| x, - ), - - Timestamp(TimeUnit::Nanosecond, None) => match type_ { - ParquetType::PrimitiveType { - physical_type, - logical_type, - .. - } => match (physical_type, logical_type) { - (PhysicalType::Int96, _) => primitive::iter_to_arrays_nested( - pages, - field.clone(), - DataType::Timestamp(TimeUnit::Nanosecond, None), - chunk_size, - read_item, - int96_to_i64_ns, - ), - (_, Some(LogicalType::TIMESTAMP(TimestampType { unit, .. }))) => match unit { - ParquetTimeUnit::MILLIS(_) => primitive::iter_to_arrays_nested( - pages, - field.clone(), - data_type, - chunk_size, - read_item, - |x: i64| x * 1_000_000, - ), - ParquetTimeUnit::MICROS(_) => primitive::iter_to_arrays_nested( - pages, - field.clone(), - data_type, - chunk_size, - read_item, - |x: i64| x * 1_000, - ), - ParquetTimeUnit::NANOS(_) => primitive::iter_to_arrays_nested( - pages, - field.clone(), - data_type, - chunk_size, - read_item, - |x: i64| x, - ), - }, - _ => primitive::iter_to_arrays_nested( - pages, - field.clone(), - data_type, - chunk_size, - read_item, - |x: i64| x, - ), - }, - _ => unreachable!(), - }, - - Binary => binary::iter_to_arrays_nested::, _>( - pages, - field.clone(), - data_type, - chunk_size, - ), - LargeBinary => binary::iter_to_arrays_nested::, _>( - pages, - field.clone(), - data_type, - chunk_size, - ), - Utf8 => binary::iter_to_arrays_nested::, _>( - pages, - field.clone(), - data_type, - chunk_size, - ), - LargeUtf8 => binary::iter_to_arrays_nested::, _>( - pages, - field.clone(), - data_type, - chunk_size, - ), - _ => todo!(), - }; - - let iter = iter.map(move |x| { - let (mut nested, array) = x?; - let _ = nested.nested.pop().unwrap(); // the primitive - create_list(field.data_type().clone(), &mut nested, array) - }); + DataType::LargeList(_) => { + let (offsets, validity) = nested.nested.pop().unwrap().inner(); - Ok(Box::new(iter)) + Arc::new(ListArray::::from_data( + data_type, offsets, values, validity, + )) + } + _ => { + return Err(ArrowError::NotYetImplemented(format!( + "Read nested datatype {:?}", + data_type + ))) + } + }) } struct StructIterator<'a> { - iters: Vec>, + iters: Vec>, fields: Vec, } impl<'a> StructIterator<'a> { - pub fn new(iters: Vec>, fields: Vec) -> Self { + pub fn new(iters: Vec>, fields: Vec) -> Self { assert_eq!(iters.len(), fields.len()); Self { iters, fields } } } impl<'a> Iterator for StructIterator<'a> { - type Item = Result>; + type Item = Result<(NestedState, Arc)>; fn next(&mut self) -> Option { let values = self .iters .iter_mut() .map(|iter| iter.next()) - .collect::>>(); + .collect::>(); if values.iter().any(|x| x.is_none()) { return None; } let values = values .into_iter() - .map(|x| x.unwrap()) + .map(|x| x.unwrap().map(|x| x.1)) .collect::>>(); match values { - Ok(values) => Some(Ok(Arc::new(StructArray::from_data( - DataType::Struct(self.fields.clone()), - values, - None, - )))), + Ok(values) => Some(Ok(( + NestedState::new(vec![]), // todo + Arc::new(StructArray::from_data( + DataType::Struct(self.fields.clone()), + values, + None, + )), + ))), Err(e) => Some(Err(e)), } } } -fn get_fields(field: &Field) -> Vec { +fn columns_to_iter_recursive<'a, I: 'a>( + mut columns: Vec, + mut types: Vec<&ParquetType>, + field: Field, + mut init: Vec, + chunk_size: usize, +) -> Result> +where + I: DataPages, +{ + use DataType::*; + if init.len() == 1 && init[0].is_primitive() { + return Ok(Box::new( + page_iter_to_arrays( + columns.pop().unwrap(), + types.pop().unwrap(), + field, + chunk_size, + )? + .map(|x| Ok((NestedState::new(vec![]), x?))), + )); + } + + Ok(match field.data_type().to_logical_type() { + Boolean => { + types.pop(); + boolean::iter_to_arrays_nested(columns.pop().unwrap(), init.pop().unwrap(), chunk_size) + } + Int16 => { + types.pop(); + primitive::iter_to_arrays_nested( + columns.pop().unwrap(), + init.pop().unwrap(), + field.data_type().clone(), + chunk_size, + read_item, + |x: i32| x as i16, + ) + } + Int64 => { + types.pop(); + primitive::iter_to_arrays_nested( + columns.pop().unwrap(), + init.pop().unwrap(), + field.data_type().clone(), + chunk_size, + read_item, + |x: i64| x, + ) + } + Utf8 => { + types.pop(); + binary::iter_to_arrays_nested::, _>( + columns.pop().unwrap(), + init.pop().unwrap(), + field.data_type().clone(), + chunk_size, + ) + } + LargeBinary => { + types.pop(); + binary::iter_to_arrays_nested::, _>( + columns.pop().unwrap(), + init.pop().unwrap(), + field.data_type().clone(), + chunk_size, + ) + } + List(inner) => { + let iter = columns_to_iter_recursive( + vec![columns.pop().unwrap()], + types, + inner.as_ref().clone(), + init, + chunk_size, + )?; + let iter = iter.map(move |x| { + let (mut nested, array) = x?; + let array = create_list(field.data_type().clone(), &mut nested, array)?; + Ok((nested, array)) + }); + Box::new(iter) as _ + } + Struct(fields) => { + let columns = fields + .iter() + .rev() + .map(|f| { + columns_to_iter_recursive( + vec![columns.pop().unwrap()], + vec![types.pop().unwrap()], + f.clone(), + vec![init.pop().unwrap()], + chunk_size, + ) + }) + .collect::>>()?; + let columns = columns.into_iter().rev().collect(); + Box::new(StructIterator::new(columns, fields.clone())) + } + _ => todo!(), + }) +} + +// [Struct, List, Bool] +// => [Struct(Int), Struct(Utf8), List(Int), Bool] +// [Struct, Utf8>, List, Bool] +// => [Struct(Struct(Int)), Struct(Utf8), List(Int), Bool] +// [List>] +// => [List(Struct(Int)), List(Struct(Bool))] +// [Struct, Utf8>] +// => [Struct(Int), Struct(Bool)] +// => [Struct(Struct(Int)), Struct(Struct(Bool)), Struct(Utf8)] + +fn field_to_init(field: &Field) -> Vec { use crate::datatypes::PhysicalType::*; match field.data_type.to_physical_type() { Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8 - | Dictionary(_) | LargeUtf8 | List | FixedSizeList | LargeList => { - vec![field.clone()] + | Dictionary(_) | LargeUtf8 => vec![InitNested::Primitive(field.is_nullable)], + List | FixedSizeList | LargeList => { + let a = field.data_type().to_logical_type(); + let inner = if let DataType::List(inner) = a { + field_to_init(inner) + } else if let DataType::LargeList(inner) = a { + field_to_init(inner) + } else if let DataType::FixedSizeList(inner, _) = a { + field_to_init(inner) + } else { + unreachable!() + }; + inner + .into_iter() + .map(|x| InitNested::List(Box::new(x), field.is_nullable)) + .collect() } Struct => { - if let DataType::Struct(fields) = field.data_type.to_logical_type() { - fields.clone() + let inner = if let DataType::Struct(fields) = field.data_type.to_logical_type() { + fields.iter().rev().map(field_to_init).collect::>() } else { unreachable!() - } + }; + inner + .into_iter() + .flatten() + .map(|x| InitNested::Struct(Box::new(x), field.is_nullable)) + .collect() } _ => todo!(), } @@ -661,27 +674,17 @@ fn get_fields(field: &Field) -> Vec { pub fn column_iter_to_arrays<'a, I: 'static>( columns: Vec, types: Vec<&ParquetType>, - field: &Field, + field: Field, chunk_size: usize, ) -> Result> where I: DataPages, { - // get fields - let fields = get_fields(field); - - let mut iters = columns - .into_iter() - .zip(types.into_iter()) - .zip(fields.clone().into_iter()) - .map(|((pages, type_), field)| page_iter_to_arrays(pages, type_, field, chunk_size)) - .collect::>>()?; - - Ok(if fields.len() > 1 { - Box::new(StructIterator::new(iters, fields)) - } else { - iters.pop().unwrap() - }) + let init = field_to_init(&field); + + Ok(Box::new( + columns_to_iter_recursive(columns, types, field, init, chunk_size)?.map(|x| x.map(|x| x.1)), + )) } pub type ArrayIter<'a> = Box>> + Send + Sync + 'a>; diff --git a/src/io/parquet/read/nested_utils.rs b/src/io/parquet/read/nested_utils.rs index d4ce5417c0c..f4a5b9ed2bb 100644 --- a/src/io/parquet/read/nested_utils.rs +++ b/src/io/parquet/read/nested_utils.rs @@ -5,11 +5,10 @@ use parquet2::{ }; use crate::{ - array::{Array, ListArray}, + array::Array, bitmap::{Bitmap, MutableBitmap}, buffer::Buffer, - datatypes::{DataType, Field}, - error::{ArrowError, Result}, + error::Result, }; use super::{ @@ -215,83 +214,53 @@ pub(super) fn read_optional_values( } } -fn init_nested_recursive(field: &Field, capacity: usize, container: &mut Vec>) { - let is_nullable = field.is_nullable; +#[derive(Debug, Clone)] +pub enum InitNested { + Primitive(bool), + List(Box, bool), + Struct(Box, bool), +} + +impl InitNested { + pub fn is_primitive(&self) -> bool { + matches!(self, Self::Primitive(_)) + } +} - use crate::datatypes::PhysicalType::*; - match field.data_type().to_physical_type() { - Null | Boolean | Primitive(_) | FixedSizeBinary | Binary | LargeBinary | Utf8 - | LargeUtf8 | Dictionary(_) => { - container.push(Box::new(NestedPrimitive::new(is_nullable)) as Box) +fn init_nested_recursive(init: &InitNested, capacity: usize, container: &mut Vec>) { + match init { + InitNested::Primitive(is_nullable) => { + container.push(Box::new(NestedPrimitive::new(*is_nullable)) as Box) } - List | LargeList | FixedSizeList => { - if is_nullable { - container.push(Box::new(NestedOptional::with_capacity(capacity)) as Box) + InitNested::List(inner, is_nullable) => { + container.push(if *is_nullable { + Box::new(NestedOptional::with_capacity(capacity)) as Box } else { - container.push(Box::new(NestedValid::with_capacity(capacity)) as Box) - } - match field.data_type().to_logical_type() { - DataType::List(ref inner) - | DataType::LargeList(ref inner) - | DataType::FixedSizeList(ref inner, _) => { - init_nested_recursive(inner.as_ref(), capacity, container) - } - _ => unreachable!(), - }; + Box::new(NestedValid::with_capacity(capacity)) as Box + }); + init_nested_recursive(inner, capacity, container) } - Struct => { - container.push(Box::new(NestedPrimitive::new(is_nullable)) as Box); - if let DataType::Struct(fields) = field.data_type().to_logical_type() { - fields - .iter() - .for_each(|field| init_nested_recursive(field, capacity, container)); + InitNested::Struct(inner, is_nullable) => { + if *is_nullable { + container.push(Box::new(NestedOptional::with_capacity(capacity)) as Box) } else { - unreachable!() + container.push(Box::new(NestedValid::with_capacity(capacity)) as Box) } + init_nested_recursive(inner, capacity, container) } - _ => todo!(), } } -fn init_nested(field: &Field, capacity: usize) -> NestedState { +fn init_nested(init: &InitNested, capacity: usize) -> NestedState { let mut container = vec![]; - init_nested_recursive(field, capacity, &mut container); + init_nested_recursive(init, capacity, &mut container); + println!("{:?}", container); NestedState::new(container) } -pub fn create_list( - data_type: DataType, - nested: &mut NestedState, - values: Arc, -) -> Result> { - Ok(match data_type { - DataType::List(_) => { - let (offsets, validity) = nested.nested.pop().unwrap().inner(); - - let offsets = Buffer::::from_trusted_len_iter(offsets.iter().map(|x| *x as i32)); - Arc::new(ListArray::::from_data( - data_type, offsets, values, validity, - )) - } - DataType::LargeList(_) => { - let (offsets, validity) = nested.nested.pop().unwrap().inner(); - - Arc::new(ListArray::::from_data( - data_type, offsets, values, validity, - )) - } - _ => { - return Err(ArrowError::NotYetImplemented(format!( - "Read nested datatype {:?}", - data_type - ))) - } - }) -} - pub struct NestedPage<'a> { repetitions: HybridRleDecoder<'a>, - max_rep_level: u32, + _max_rep_level: u32, definitions: HybridRleDecoder<'a>, max_def_level: u32, } @@ -309,7 +278,7 @@ impl<'a> NestedPage<'a> { get_bit_width(max_rep_level), page.num_values(), ), - max_rep_level: max_rep_level as u32, + _max_rep_level: max_rep_level as u32, definitions: HybridRleDecoder::new( def_levels, get_bit_width(max_def_level), @@ -410,7 +379,7 @@ pub(super) fn extend_from_new_page<'a, T: Decoder<'a, C, P>, C: Default, P: Push pub fn extend_offsets1<'a>( page: &mut NestedPage<'a>, state: Option, - field: &Field, + init: &InitNested, items: &mut VecDeque, chunk_size: usize, ) -> Result> { @@ -423,7 +392,7 @@ pub fn extend_offsets1<'a>( nested } else { // there is no state => initialize it - init_nested(field, chunk_size) + init_nested(init, chunk_size) }; let remaining = chunk_size - nested.len(); @@ -440,7 +409,7 @@ pub fn extend_offsets1<'a>( } while page.len() > 0 { - let mut nested = init_nested(field, chunk_size); + let mut nested = init_nested(init, chunk_size); extend_offsets2(page, &mut nested, chunk_size); items.push_back(nested) } @@ -451,8 +420,6 @@ pub fn extend_offsets1<'a>( fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, additional: usize) { let mut values_count = vec![0; nested.depth()]; - let mut prev_def: u32 = 0; - let mut is_first = true; let mut def_threshold = page.max_def_level; let thres = nested @@ -466,7 +433,7 @@ fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, addi }) .collect::>(); - let max_rep = page.max_rep_level; + let rate = if page.max_def_level == 1 { 1 } else { 2 }; let mut iter = page.repetitions.by_ref().zip(page.definitions.by_ref()); @@ -478,25 +445,17 @@ fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, addi rows += 1 } - let mut closures = max_rep - rep; - if prev_def <= 1 { - closures = 1; - }; - if is_first { - // close on first run to ensure offsets start with 0. - closures = max_rep; - is_first = false; - } + let closures = rep + 1 + (def / rate); nested .nested .iter_mut() - .zip(values_count.iter()) .enumerate() + .zip(values_count.iter()) .skip(rep as usize) - .take((rep + closures) as usize) - .for_each(|(depth, (nested, length))| { - let is_null = (def - rep) as usize == depth && depth == rep as usize; + .take(closures as usize) + .for_each(|((depth, nested), length)| { + let is_null = def - rep == depth as u32; nested.push(*length, !is_null); }); @@ -513,7 +472,6 @@ fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, addi *values += 1; } }); - prev_def = def; } // close validities @@ -565,7 +523,7 @@ pub(super) fn next<'a, I, C, P, D>( iter: &'a mut I, items: &mut VecDeque<(P, MutableBitmap)>, nested_items: &mut VecDeque, - field: &Field, + init: &InitNested, chunk_size: usize, decoder: &D, ) -> MaybeNext> @@ -579,7 +537,6 @@ where if items.len() > 1 { let nested = nested_items.pop_back().unwrap(); let (values, validity) = items.pop_back().unwrap(); - //let array = BooleanArray::from_data(DataType::Boolean, values.into(), validity.into()); return MaybeNext::Some(Ok((nested, values, validity))); } match (nested_items.pop_back(), items.pop_back(), iter.next()) { @@ -594,7 +551,7 @@ where // read next chunk from `nested_page` and get number of values to read let maybe_nested = - extend_offsets1(&mut nested_page, state, field, nested_items, chunk_size); + extend_offsets1(&mut nested_page, state, init, nested_items, chunk_size); let nested = match maybe_nested { Ok(nested) => nested, Err(e) => return MaybeNext::Some(Err(e)), diff --git a/src/io/parquet/read/primitive/mod.rs b/src/io/parquet/read/primitive/mod.rs index 6604c6005ac..e0ca42144fc 100644 --- a/src/io/parquet/read/primitive/mod.rs +++ b/src/io/parquet/read/primitive/mod.rs @@ -8,10 +8,7 @@ pub use utils::read_item; use std::sync::Arc; -use crate::{ - array::Array, - datatypes::{DataType, Field}, -}; +use crate::{array::Array, datatypes::DataType}; use super::ArrayIter; use super::{nested_utils::*, DataPages}; @@ -43,7 +40,7 @@ where /// Converts [`DataPages`] to an [`Iterator`] of [`Array`] pub fn iter_to_arrays_nested<'a, I, T, P, G, F>( iter: I, - field: Field, + init: InitNested, data_type: DataType, chunk_size: usize, op1: G, @@ -57,13 +54,12 @@ where F: 'a + Copy + Send + Sync + Fn(P) -> T, { Box::new( - ArrayIterator::::new(iter, field, data_type, chunk_size, op1, op2).map( - |x| { - x.map(|(nested, array)| { - let values = Arc::new(array) as Arc; - (nested, values) - }) - }, - ), + ArrayIterator::::new(iter, init, data_type, chunk_size, op1, op2).map(|x| { + x.map(|(mut nested, array)| { + let _ = nested.nested.pop().unwrap(); // the primitive + let values = Arc::new(array) as Arc; + (nested, values) + }) + }), ) } diff --git a/src/io/parquet/read/primitive/nested.rs b/src/io/parquet/read/primitive/nested.rs index 442e55a18c7..96f65959d91 100644 --- a/src/io/parquet/read/primitive/nested.rs +++ b/src/io/parquet/read/primitive/nested.rs @@ -5,12 +5,8 @@ use parquet2::{ }; use crate::{ - array::PrimitiveArray, - bitmap::MutableBitmap, - datatypes::{DataType, Field}, - error::Result, - io::parquet::read::utils::MaybeNext, - types::NativeType, + array::PrimitiveArray, bitmap::MutableBitmap, datatypes::DataType, error::Result, + io::parquet::read::utils::MaybeNext, types::NativeType, }; use super::super::nested_utils::*; @@ -174,7 +170,7 @@ where F: Copy + Fn(P) -> T, { iter: I, - field: Field, + init: InitNested, data_type: DataType, // invariant: items.len() == nested.len() items: VecDeque<(Vec, MutableBitmap)>, @@ -194,7 +190,7 @@ where { pub fn new( iter: I, - field: Field, + init: InitNested, data_type: DataType, chunk_size: usize, op1: G, @@ -202,7 +198,7 @@ where ) -> Self { Self { iter, - field, + init, data_type, items: VecDeque::new(), nested: VecDeque::new(), @@ -228,7 +224,7 @@ where &mut self.iter, &mut self.items, &mut self.nested, - &self.field, + &self.init, self.chunk_size, &self.decoder, ); diff --git a/src/io/parquet/read/row_group.rs b/src/io/parquet/read/row_group.rs index 272d1f08f5e..68afbc79229 100644 --- a/src/io/parquet/read/row_group.rs +++ b/src/io/parquet/read/row_group.rs @@ -78,15 +78,15 @@ pub fn read_columns<'a, R: Read + Seek>( // reads all the necessary columns for all fields from the row group // This operation is IO-bounded `O(C)` where C is the number of columns in the row group - let columns = fields + let field_columns = fields .iter() .map(|field| _read_columns(reader, row_group.columns(), &field.name)) .collect::>>()?; - columns + field_columns .into_iter() .map(|columns| { - let (pages, types): (Vec<_>, Vec<_>) = columns + let (columns, types): (Vec<_>, Vec<_>) = columns .into_iter() .map(|(column_meta, chunk)| { let pages = PageIterator::new( @@ -103,10 +103,10 @@ pub fn read_columns<'a, R: Read + Seek>( ) }) .unzip(); - (pages, types) + (columns, types) }) .zip(fields.into_iter()) - .map(|((columns, types), field)| column_iter_to_arrays(columns, types, &field, chunk_size)) + .map(|((columns, types), field)| column_iter_to_arrays(columns, types, field, chunk_size)) .collect() }