From 9019013dba3c3a8276f9d70bccc2c0ff73f4e856 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Tue, 21 Jun 2022 16:11:20 +0000 Subject: [PATCH 1/4] Moved file --- .../parquet/read/deserialize/{dictionary.rs => dictionary/mod.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/io/parquet/read/deserialize/{dictionary.rs => dictionary/mod.rs} (100%) diff --git a/src/io/parquet/read/deserialize/dictionary.rs b/src/io/parquet/read/deserialize/dictionary/mod.rs similarity index 100% rename from src/io/parquet/read/deserialize/dictionary.rs rename to src/io/parquet/read/deserialize/dictionary/mod.rs From 9ad04ca48f025faca75647187c010fbc574f909f Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Wed, 22 Jun 2022 04:00:57 +0000 Subject: [PATCH 2/4] Simpler --- src/io/parquet/read/deserialize/binary/mod.rs | 37 +------------------ .../parquet/read/deserialize/binary/nested.rs | 29 +++++++++++++-- .../parquet/read/deserialize/boolean/mod.rs | 25 +------------ .../read/deserialize/boolean/nested.rs | 22 +++++++++-- .../parquet/read/deserialize/primitive/mod.rs | 33 +---------------- .../read/deserialize/primitive/nested.rs | 23 ++++++++++++ 6 files changed, 73 insertions(+), 96 deletions(-) diff --git a/src/io/parquet/read/deserialize/binary/mod.rs b/src/io/parquet/read/deserialize/binary/mod.rs index 6eee318772d..613b95e9ca9 100644 --- a/src/io/parquet/read/deserialize/binary/mod.rs +++ b/src/io/parquet/read/deserialize/binary/mod.rs @@ -3,40 +3,7 @@ mod dictionary; mod nested; mod utils; -use crate::{ - array::{Array, Offset}, - datatypes::DataType, -}; - -use self::basic::TraitBinaryArray; -use self::nested::ArrayIterator; -use super::{ - nested_utils::{InitNested, NestedArrayIter}, - DataPages, -}; - +pub use self::nested::NestedIter; pub use basic::Iter; pub use dictionary::DictIter; - -/// Converts [`DataPages`] to an [`Iterator`] of [`Array`] -pub fn iter_to_arrays_nested<'a, O, A, I>( - iter: I, - init: Vec, - data_type: DataType, - chunk_size: 
Option, -) -> NestedArrayIter<'a> -where - I: 'a + DataPages, - A: TraitBinaryArray, - O: Offset, -{ - Box::new( - ArrayIterator::::new(iter, init, data_type, chunk_size).map(|x| { - x.map(|(mut nested, array)| { - let _ = nested.nested.pop().unwrap(); // the primitive - let values = Box::new(array) as Box; - (nested, values) - }) - }), - ) -} +pub use nested::iter_to_arrays_nested; diff --git a/src/io/parquet/read/deserialize/binary/nested.rs b/src/io/parquet/read/deserialize/binary/nested.rs index f7b16fe03ef..b5703812940 100644 --- a/src/io/parquet/read/deserialize/binary/nested.rs +++ b/src/io/parquet/read/deserialize/binary/nested.rs @@ -6,6 +6,7 @@ use parquet2::{ schema::Repetition, }; +use crate::array::Array; use crate::{ array::Offset, bitmap::MutableBitmap, datatypes::DataType, error::Result, io::parquet::read::DataPages, @@ -141,7 +142,7 @@ impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder { } } -pub struct ArrayIterator, I: DataPages> { +pub struct NestedIter, I: DataPages> { iter: I, data_type: DataType, init: Vec, @@ -150,7 +151,7 @@ pub struct ArrayIterator, I: DataPages> { phantom_a: std::marker::PhantomData, } -impl, I: DataPages> ArrayIterator { +impl, I: DataPages> NestedIter { pub fn new( iter: I, init: Vec, @@ -168,7 +169,7 @@ impl, I: DataPages> ArrayIterator { } } -impl, I: DataPages> Iterator for ArrayIterator { +impl, I: DataPages> Iterator for NestedIter { type Item = Result<(NestedState, A)>; fn next(&mut self) -> Option { @@ -189,3 +190,25 @@ impl, I: DataPages> Iterator for ArrayIterator } } } + +/// Converts [`DataPages`] to an [`Iterator`] of [`TraitBinaryArray`] +pub fn iter_to_arrays_nested<'a, O, A, I>( + iter: I, + init: Vec, + data_type: DataType, + chunk_size: Option, +) -> NestedArrayIter<'a> +where + I: 'a + DataPages, + A: TraitBinaryArray, + O: Offset, +{ + Box::new( + NestedIter::::new(iter, init, data_type, chunk_size).map(|result| { + let (mut nested, array) = result?; + let _ = nested.nested.pop().unwrap(); // the 
primitive + let array = Box::new(array) as Box; + Ok((nested, array)) + }), + ) +} diff --git a/src/io/parquet/read/deserialize/boolean/mod.rs b/src/io/parquet/read/deserialize/boolean/mod.rs index dde0a14852a..01ca1fb1122 100644 --- a/src/io/parquet/read/deserialize/boolean/mod.rs +++ b/src/io/parquet/read/deserialize/boolean/mod.rs @@ -1,28 +1,5 @@ mod basic; mod nested; -use self::nested::ArrayIterator; -use super::{ - nested_utils::{InitNested, NestedArrayIter}, - DataPages, -}; - pub use self::basic::Iter; - -/// Converts [`DataPages`] to an [`Iterator`] of [`Array`] -pub fn iter_to_arrays_nested<'a, I: 'a>( - iter: I, - init: Vec, - chunk_size: Option, -) -> NestedArrayIter<'a> -where - I: DataPages, -{ - Box::new(ArrayIterator::new(iter, init, chunk_size).map(|x| { - x.map(|(mut nested, array)| { - let _ = nested.nested.pop().unwrap(); // the primitive - let values = array.boxed(); - (nested, values) - }) - })) -} +pub use nested::iter_to_arrays_nested; diff --git a/src/io/parquet/read/deserialize/boolean/nested.rs b/src/io/parquet/read/deserialize/boolean/nested.rs index 8c20694ed5c..f2b4ccd983f 100644 --- a/src/io/parquet/read/deserialize/boolean/nested.rs +++ b/src/io/parquet/read/deserialize/boolean/nested.rs @@ -101,14 +101,14 @@ impl<'a> NestedDecoder<'a> for BooleanDecoder { /// An iterator adapter over [`DataPages`] assumed to be encoded as boolean arrays #[derive(Debug)] -pub struct ArrayIterator { +pub struct NestedIter { iter: I, init: Vec, items: VecDeque<(NestedState, (MutableBitmap, MutableBitmap))>, chunk_size: Option, } -impl ArrayIterator { +impl NestedIter { pub fn new(iter: I, init: Vec, chunk_size: Option) -> Self { Self { iter, @@ -123,7 +123,7 @@ fn finish(data_type: &DataType, values: MutableBitmap, validity: MutableBitmap) BooleanArray::new(data_type.clone(), values.into(), validity.into()) } -impl Iterator for ArrayIterator { +impl Iterator for NestedIter { type Item = Result<(NestedState, BooleanArray)>; fn next(&mut self) -> Option 
{ @@ -144,3 +144,19 @@ impl Iterator for ArrayIterator { } } } + +/// Converts [`DataPages`] to an [`Iterator`] of [`BooleanArray`] +pub fn iter_to_arrays_nested<'a, I: 'a>( + iter: I, + init: Vec, + chunk_size: Option, +) -> NestedArrayIter<'a> +where + I: DataPages, +{ + Box::new(NestedIter::new(iter, init, chunk_size).map(|result| { + let (mut nested, array) = result?; + let _ = nested.nested.pop().unwrap(); // the primitive + Ok((nested, array.boxed())) + })) +} diff --git a/src/io/parquet/read/deserialize/primitive/mod.rs b/src/io/parquet/read/deserialize/primitive/mod.rs index e49cdb80ea5..b9f87520c8d 100644 --- a/src/io/parquet/read/deserialize/primitive/mod.rs +++ b/src/io/parquet/read/deserialize/primitive/mod.rs @@ -2,35 +2,6 @@ mod basic; mod dictionary; mod nested; -pub use dictionary::DictIter; - -use crate::datatypes::DataType; - -use super::{nested_utils::*, DataPages}; - pub use basic::Iter; -use nested::ArrayIterator; - -/// Converts [`DataPages`] to an [`Iterator`] of [`Array`] -pub fn iter_to_arrays_nested<'a, I, T, P, F>( - iter: I, - init: Vec, - data_type: DataType, - chunk_size: Option, - op: F, -) -> NestedArrayIter<'a> -where - I: 'a + DataPages, - T: crate::types::NativeType, - P: parquet2::types::NativeType, - F: 'a + Copy + Send + Sync + Fn(P) -> T, -{ - Box::new( - ArrayIterator::::new(iter, init, data_type, chunk_size, op).map(|x| { - x.map(|(mut nested, array)| { - let _ = nested.nested.pop().unwrap(); // the primitive - (nested, array.boxed()) - }) - }), - ) -} +pub use dictionary::DictIter; +pub use nested::iter_to_arrays_nested; diff --git a/src/io/parquet/read/deserialize/primitive/nested.rs b/src/io/parquet/read/deserialize/primitive/nested.rs index 587e1967adc..ce6d7a94eb5 100644 --- a/src/io/parquet/read/deserialize/primitive/nested.rs +++ b/src/io/parquet/read/deserialize/primitive/nested.rs @@ -229,3 +229,26 @@ where } } } + +/// Converts [`DataPages`] to an [`Iterator`] of [`Array`] +pub fn iter_to_arrays_nested<'a, I, T, P, 
F>( + iter: I, + init: Vec, + data_type: DataType, + chunk_size: Option, + op: F, +) -> NestedArrayIter<'a> +where + I: 'a + DataPages, + T: crate::types::NativeType, + P: parquet2::types::NativeType, + F: 'a + Copy + Send + Sync + Fn(P) -> T, +{ + Box::new( + ArrayIterator::::new(iter, init, data_type, chunk_size, op).map(|result| { + let (mut nested, array) = result?; + let _ = nested.nested.pop().unwrap(); // the primitive + Ok((nested, array.boxed())) + }), + ) +} From d6f3966b37117d9bef57d7fd98b3462a6d2bf291 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Wed, 22 Jun 2022 04:50:44 +0000 Subject: [PATCH 3/4] Initial take --- .../read/deserialize/dictionary/mod.rs | 8 +- .../read/deserialize/dictionary/nested.rs | 205 ++++++++++++++++++ src/io/parquet/read/deserialize/mod.rs | 126 ++++++++++- .../parquet/read/deserialize/nested_utils.rs | 2 +- .../read/deserialize/primitive/dictionary.rs | 108 ++++++++- .../parquet/read/deserialize/primitive/mod.rs | 2 +- tests/it/io/parquet/mod.rs | 42 +++- 7 files changed, 480 insertions(+), 13 deletions(-) create mode 100644 src/io/parquet/read/deserialize/dictionary/nested.rs diff --git a/src/io/parquet/read/deserialize/dictionary/mod.rs b/src/io/parquet/read/deserialize/dictionary/mod.rs index f89a5e06d5a..4171c1f8118 100644 --- a/src/io/parquet/read/deserialize/dictionary/mod.rs +++ b/src/io/parquet/read/deserialize/dictionary/mod.rs @@ -1,3 +1,5 @@ +mod nested; + use std::collections::VecDeque; use parquet2::{ @@ -292,8 +294,7 @@ pub(super) fn next_dict< MaybeNext::More } else { let (values, validity) = items.pop_front().unwrap(); - let keys = - PrimitiveArray::from_data(K::PRIMITIVE.into(), values.into(), validity.into()); + let keys = finish_key(values, validity); MaybeNext::Some(DictionaryArray::try_new(data_type, keys, dict.unwrap())) } } @@ -304,7 +305,6 @@ pub(super) fn next_dict< debug_assert!(values.len() <= chunk_size.unwrap_or(usize::MAX)); let keys = finish_key(values, validity); - 
MaybeNext::Some(DictionaryArray::try_new(data_type, keys, dict.unwrap())) } else { MaybeNext::None @@ -312,3 +312,5 @@ pub(super) fn next_dict< } } } + +pub use nested::next_dict as nested_next_dict; diff --git a/src/io/parquet/read/deserialize/dictionary/nested.rs b/src/io/parquet/read/deserialize/dictionary/nested.rs new file mode 100644 index 00000000000..23a8a5f5491 --- /dev/null +++ b/src/io/parquet/read/deserialize/dictionary/nested.rs @@ -0,0 +1,205 @@ +use std::collections::VecDeque; + +use parquet2::{ + encoding::{hybrid_rle::HybridRleDecoder, Encoding}, + page::{DataPage, DictPage}, + schema::Repetition, +}; + +use crate::datatypes::DataType; +use crate::{ + array::{Array, DictionaryArray, DictionaryKey}, + bitmap::MutableBitmap, + error::{Error, Result}, +}; + +use super::{ + super::super::DataPages, + super::nested_utils::*, + super::utils::{dict_indices_decoder, not_implemented, MaybeNext, PageState}, + finish_key, Dict, +}; + +// The state of a required DataPage with a boolean physical type +#[derive(Debug)] +pub struct Required<'a> { + values: HybridRleDecoder<'a>, + length: usize, +} + +impl<'a> Required<'a> { + fn try_new(page: &'a DataPage) -> Result { + let values = dict_indices_decoder(page)?; + let length = page.num_values(); + Ok(Self { values, length }) + } +} + +// The state of a `DataPage` of a `Dictionary` type +#[allow(clippy::large_enum_variant)] +#[derive(Debug)] +pub enum State<'a> { + Optional(HybridRleDecoder<'a>), + Required(Required<'a>), +} + +impl<'a> State<'a> { + pub fn len(&self) -> usize { + match self { + State::Optional(page) => page.len(), + State::Required(page) => page.length, + } + } +} + +impl<'a> PageState<'a> for State<'a> { + fn len(&self) -> usize { + self.len() + } +} + +#[derive(Debug)] +pub struct DictionaryDecoder +where + K: DictionaryKey, +{ + phantom_k: std::marker::PhantomData, +} + +impl Default for DictionaryDecoder +where + K: DictionaryKey, +{ + #[inline] + fn default() -> Self { + Self { + phantom_k: 
std::marker::PhantomData, + } + } +} + +impl<'a, K: DictionaryKey> NestedDecoder<'a> for DictionaryDecoder { + type State = State<'a>; + type DecodedState = (Vec, MutableBitmap); + + fn build_state(&self, page: &'a DataPage) -> Result { + let is_optional = + page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; + let is_filtered = page.selected_rows().is_some(); + + match (page.encoding(), is_optional, is_filtered) { + (Encoding::RleDictionary | Encoding::PlainDictionary, true, false) => { + dict_indices_decoder(page).map(State::Optional) + } + (Encoding::RleDictionary | Encoding::PlainDictionary, false, false) => { + Required::try_new(page).map(State::Required) + } + _ => Err(not_implemented(page)), + } + } + + fn with_capacity(&self, capacity: usize) -> Self::DecodedState { + ( + Vec::with_capacity(capacity), + MutableBitmap::with_capacity(capacity), + ) + } + + fn push_valid(&self, state: &mut Self::State, decoded: &mut Self::DecodedState) { + let (values, validity) = decoded; + match state { + State::Optional(page_values) => { + let key = page_values.next(); + // todo: convert unwrap to error + let key = match K::try_from(key.unwrap_or_default() as usize) { + Ok(key) => key, + Err(_) => todo!(), + }; + values.push(key); + validity.push(true); + } + State::Required(page_values) => { + let key = page_values.values.next(); + let key = match K::try_from(key.unwrap_or_default() as usize) { + Ok(key) => key, + Err(_) => todo!(), + }; + values.push(key); + } + } + } + + fn push_null(&self, decoded: &mut Self::DecodedState) { + let (values, validity) = decoded; + values.push(K::default()); + validity.push(false) + } +} + +pub fn next_dict<'a, K: DictionaryKey, I: DataPages, F: Fn(&dyn DictPage) -> Box>( + iter: &'a mut I, + items: &mut VecDeque<(NestedState, (Vec, MutableBitmap))>, + init: &[InitNested], + dict: &mut Dict, + data_type: DataType, + chunk_size: Option, + read_dict: F, +) -> MaybeNext)>> { + if items.len() > 1 { + let (nested, 
(values, validity)) = items.pop_front().unwrap(); + let keys = finish_key(values, validity); + let dict = DictionaryArray::try_new(data_type, keys, dict.unwrap()); + return MaybeNext::Some(dict.map(|dict| (nested, dict))); + } + match iter.next() { + Err(e) => MaybeNext::Some(Err(e.into())), + Ok(Some(page)) => { + // consume the dictionary page + match (&dict, page.dictionary_page()) { + (Dict::Empty, None) => { + return MaybeNext::Some(Err(Error::nyi( + "dictionary arrays from non-dict-encoded pages", + ))); + } + (Dict::Empty, Some(dict_page)) => { + *dict = Dict::Complete(read_dict(dict_page.as_ref())) + } + (Dict::Complete(_), _) => {} + }; + + let error = extend( + page, + init, + items, + &DictionaryDecoder::::default(), + chunk_size, + ); + match error { + Ok(_) => {} + Err(e) => return MaybeNext::Some(Err(e)), + }; + + if items.front().unwrap().0.len() < chunk_size.unwrap_or(usize::MAX) { + MaybeNext::More + } else { + let (nested, (values, validity)) = items.pop_front().unwrap(); + let keys = finish_key(values, validity); + let dict = DictionaryArray::try_new(data_type, keys, dict.unwrap()); + MaybeNext::Some(dict.map(|dict| (nested, dict))) + } + } + Ok(None) => { + if let Some((nested, (values, validity))) = items.pop_front() { + // we have a populated item and no more pages + // the only case where an item's length may be smaller than chunk_size + debug_assert!(values.len() <= chunk_size.unwrap_or(usize::MAX)); + + let keys = finish_key(values, validity); + let dict = DictionaryArray::try_new(data_type, keys, dict.unwrap()); + MaybeNext::Some(dict.map(|dict| (nested, dict))) + } else { + MaybeNext::None + } + } + } +} diff --git a/src/io/parquet/read/deserialize/mod.rs b/src/io/parquet/read/deserialize/mod.rs index 3d229555b79..2ee84b6ca7d 100644 --- a/src/io/parquet/read/deserialize/mod.rs +++ b/src/io/parquet/read/deserialize/mod.rs @@ -14,7 +14,9 @@ use parquet2::read::get_page_iterator as _get_page_iterator; use 
parquet2::schema::types::PrimitiveType; use crate::{ - array::{Array, BinaryArray, FixedSizeListArray, ListArray, MapArray, Utf8Array}, + array::{ + Array, BinaryArray, DictionaryKey, FixedSizeListArray, ListArray, MapArray, Utf8Array, + }, datatypes::{DataType, Field}, error::{Error, Result}, }; @@ -289,6 +291,14 @@ where } _ => match field.data_type().to_logical_type() { + DataType::Dictionary(key_type, _, _) => { + let type_ = types.pop().unwrap(); + let iter = columns.pop().unwrap(); + let data_type = field.data_type().clone(); + match_integer_type!(key_type, |$K| { + dict_read::<$K, _>(iter, init, type_, data_type, chunk_size) + })? + } DataType::List(inner) | DataType::LargeList(inner) | DataType::FixedSizeList(inner, _) => { @@ -401,3 +411,117 @@ where .map(|x| x.map(|x| x.1)), )) } + +fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>( + iter: I, + init: Vec, + _type_: &PrimitiveType, + data_type: DataType, + chunk_size: Option, +) -> Result> { + use DataType::*; + let values_data_type = if let Dictionary(_, v, _) = &data_type { + v.as_ref() + } else { + panic!() + }; + + Ok(match values_data_type.to_logical_type() { + UInt8 => primitive::iter_to_dict_arrays_nested::( + iter, + init, + data_type, + chunk_size, + |x: i32| x as u8, + ), + Float32 => primitive::iter_to_dict_arrays_nested::( + iter, + init, + data_type, + chunk_size, + |x: f32| x, + ), + Float64 => primitive::iter_to_dict_arrays_nested::( + iter, + init, + data_type, + chunk_size, + |x: f64| x, + ), + /* + UInt16 => dyn_iter(primitive::DictIter::::new( + iter, + data_type, + chunk_size, + |x: i32| x as u16, + )), + UInt32 => dyn_iter(primitive::DictIter::::new( + iter, + data_type, + chunk_size, + |x: i32| x as u32, + )), + Int8 => dyn_iter(primitive::DictIter::::new( + iter, + data_type, + chunk_size, + |x: i32| x as i8, + )), + Int16 => dyn_iter(primitive::DictIter::::new( + iter, + data_type, + chunk_size, + |x: i32| x as i16, + )), + Int32 | Date32 | Time32(_) | 
Interval(IntervalUnit::YearMonth) => dyn_iter( + primitive::DictIter::::new(iter, data_type, chunk_size, |x: i32| { + x as i32 + }), + ), + + Timestamp(time_unit, _) => { + let time_unit = *time_unit; + return timestamp_dict::( + iter, + physical_type, + logical_type, + data_type, + chunk_size, + time_unit, + ); + } + + Int64 | Date64 | Time64(_) | Duration(_) => dyn_iter( + primitive::DictIter::::new(iter, data_type, chunk_size, |x: i64| x), + ), + Float32 => dyn_iter(primitive::DictIter::::new( + iter, + data_type, + chunk_size, + |x: f32| x, + )), + Float64 => dyn_iter(primitive::DictIter::::new( + iter, + data_type, + chunk_size, + |x: f64| x, + )), + + Utf8 | Binary => dyn_iter(binary::DictIter::::new( + iter, data_type, chunk_size, + )), + LargeUtf8 | LargeBinary => dyn_iter(binary::DictIter::::new( + iter, data_type, chunk_size, + )), + FixedSizeBinary(_) => dyn_iter(fixed_size_binary::DictIter::::new( + iter, data_type, chunk_size, + )), + */ + other => { + return Err(Error::nyi(format!( + "Reading nested dictionaries of type {:?}", + other + ))) + } + }) +} diff --git a/src/io/parquet/read/deserialize/nested_utils.rs b/src/io/parquet/read/deserialize/nested_utils.rs index 5fafdceee2f..d7851f76e05 100644 --- a/src/io/parquet/read/deserialize/nested_utils.rs +++ b/src/io/parquet/read/deserialize/nested_utils.rs @@ -336,7 +336,7 @@ impl NestedState { /// Extends `items` by consuming `page`, first trying to complete the last `item` /// and extending it if more are needed -fn extend<'a, D: NestedDecoder<'a>>( +pub(super) fn extend<'a, D: NestedDecoder<'a>>( page: &'a DataPage, init: &[InitNested], items: &mut VecDeque<(NestedState, D::DecodedState)>, diff --git a/src/io/parquet/read/deserialize/primitive/dictionary.rs b/src/io/parquet/read/deserialize/primitive/dictionary.rs index 2b3f0ca5491..bc0a5f43e8f 100644 --- a/src/io/parquet/read/deserialize/primitive/dictionary.rs +++ b/src/io/parquet/read/deserialize/primitive/dictionary.rs @@ -13,11 +13,12 @@ use 
crate::{ types::NativeType, }; +use super::super::dictionary::nested_next_dict; use super::super::dictionary::*; +use super::super::nested_utils::{InitNested, NestedArrayIter, NestedState}; use super::super::utils::MaybeNext; use super::super::DataPages; -#[inline] fn read_dict(data_type: DataType, op: F, dict: &dyn DictPage) -> Box where T: NativeType, @@ -107,3 +108,108 @@ where } } } + +#[derive(Debug)] +pub struct NestedDictIter +where + I: DataPages, + T: NativeType, + K: DictionaryKey, + P: ParquetNativeType, + F: Fn(P) -> T, +{ + iter: I, + init: Vec, + data_type: DataType, + values: Dict, + items: VecDeque<(NestedState, (Vec, MutableBitmap))>, + chunk_size: Option, + op: F, + phantom: std::marker::PhantomData

, +} + +impl NestedDictIter +where + K: DictionaryKey, + I: DataPages, + T: NativeType, + + P: ParquetNativeType, + F: Copy + Fn(P) -> T, +{ + pub fn new( + iter: I, + init: Vec, + data_type: DataType, + chunk_size: Option, + op: F, + ) -> Self { + let data_type = match data_type { + DataType::Dictionary(_, values, _) => *values, + _ => data_type, + }; + Self { + iter, + init, + data_type, + values: Dict::Empty, + items: VecDeque::new(), + chunk_size, + op, + phantom: Default::default(), + } + } +} + +impl Iterator for NestedDictIter +where + I: DataPages, + T: NativeType, + K: DictionaryKey, + P: ParquetNativeType, + F: Copy + Fn(P) -> T, +{ + type Item = Result<(NestedState, DictionaryArray)>; + + fn next(&mut self) -> Option { + let maybe_state = nested_next_dict( + &mut self.iter, + &mut self.items, + &self.init, + &mut self.values, + self.data_type.clone(), + self.chunk_size, + |dict| read_dict::(self.data_type.clone(), self.op, dict), + ); + match maybe_state { + MaybeNext::Some(Ok(dict)) => Some(Ok(dict)), + MaybeNext::Some(Err(e)) => Some(Err(e)), + MaybeNext::None => None, + MaybeNext::More => self.next(), + } + } +} + +/// Converts [`DataPages`] to an [`Iterator`] of [`Array`] +pub fn iter_to_arrays_nested<'a, K, I, T, P, F>( + iter: I, + init: Vec, + data_type: DataType, + chunk_size: Option, + op: F, +) -> NestedArrayIter<'a> +where + I: 'a + DataPages, + K: DictionaryKey, + T: crate::types::NativeType, + P: parquet2::types::NativeType, + F: 'a + Copy + Send + Sync + Fn(P) -> T, +{ + Box::new( + NestedDictIter::::new(iter, init, data_type, chunk_size, op).map(|result| { + let (mut nested, array) = result?; + let _ = nested.nested.pop().unwrap(); // the primitive + Ok((nested, array.boxed())) + }), + ) +} diff --git a/src/io/parquet/read/deserialize/primitive/mod.rs b/src/io/parquet/read/deserialize/primitive/mod.rs index b9f87520c8d..e30d813d20a 100644 --- a/src/io/parquet/read/deserialize/primitive/mod.rs +++ 
b/src/io/parquet/read/deserialize/primitive/mod.rs @@ -3,5 +3,5 @@ mod dictionary; mod nested; pub use basic::Iter; -pub use dictionary::DictIter; +pub use dictionary::{iter_to_arrays_nested as iter_to_dict_arrays_nested, DictIter}; pub use nested::iter_to_arrays_nested; diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 346208e8951..2eab739a4ad 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -1121,12 +1121,14 @@ fn integration_write(schema: &Schema, chunks: &[Chunk>]) -> Resul let encodings = schema .fields .iter() - .map(|x| { - vec![if let DataType::Dictionary(..) = x.data_type() { - Encoding::RleDictionary - } else { - Encoding::Plain - }] + .map(|f| { + transverse(&f.data_type, |x| { + if let DataType::Dictionary(..) = x { + Encoding::RleDictionary + } else { + Encoding::Plain + } + }) }) .collect(); @@ -1429,3 +1431,31 @@ fn list_int_nullable() -> Result<()> { array.try_extend(data).unwrap(); list_array_generic(true, array.into()) } + +#[test] +fn nested_dict() -> Result<()> { + let indices = PrimitiveArray::from_values((0..3u64).map(|x| x % 2)); + let values = PrimitiveArray::from_slice([1.0f32, 3.0]); + let floats = DictionaryArray::try_from_keys(indices, values.boxed()).unwrap(); + let floats = ListArray::try_new( + DataType::List(Box::new(Field::new( + "item", + floats.data_type().clone(), + true, + ))), + vec![0i32, 0, 2, 3, 3].into(), + floats.boxed(), + Some([true, false, true, true].into()), + )?; + + let schema = Schema::from(vec![Field::new("floats", floats.data_type().clone(), true)]); + let batch = Chunk::try_new(vec![floats.boxed()])?; + + let r = integration_write(&schema, &[batch.clone()])?; + + let (new_schema, new_batches) = integration_read(&r)?; + + assert_eq!(new_schema, schema); + assert_eq!(new_batches, vec![batch]); + Ok(()) +} From eae92440a9f882ba3912fbad05ecc59fdf34fe92 Mon Sep 17 00:00:00 2001 From: "Jorge C. 
Leitao" Date: Fri, 22 Jul 2022 04:38:26 +0000 Subject: [PATCH 4/4] Simpler --- examples/parquet_write.rs | 10 +- src/array/primitive/mod.rs | 2 +- .../read/deserialize/binary/dictionary.rs | 102 ++++++++++- src/io/parquet/read/deserialize/binary/mod.rs | 2 +- .../fixed_size_binary/dictionary.rs | 95 +++++++++- .../read/deserialize/fixed_size_binary/mod.rs | 2 +- src/io/parquet/read/deserialize/mod.rs | 98 +++++----- .../read/deserialize/primitive/dictionary.rs | 16 +- src/io/parquet/write/binary/nested.rs | 6 +- src/io/parquet/write/boolean/nested.rs | 6 +- src/io/parquet/write/dictionary.rs | 170 +++++++++++------- src/io/parquet/write/mod.rs | 9 +- src/io/parquet/write/pages.rs | 2 +- src/io/parquet/write/primitive/nested.rs | 6 +- src/io/parquet/write/utf8/nested.rs | 6 +- tests/it/io/parquet/mod.rs | 32 +++- tests/it/io/parquet/read_indexes.rs | 4 +- 17 files changed, 397 insertions(+), 171 deletions(-) diff --git a/examples/parquet_write.rs b/examples/parquet_write.rs index fef11079ac9..5b9e4331bf9 100644 --- a/examples/parquet_write.rs +++ b/examples/parquet_write.rs @@ -11,19 +11,19 @@ use arrow2::{ }, }; -fn write_batch(path: &str, schema: Schema, columns: Chunk>) -> Result<()> { +fn write_chunk(path: &str, schema: Schema, chunk: Chunk>) -> Result<()> { let options = WriteOptions { write_statistics: true, compression: CompressionOptions::Uncompressed, version: Version::V2, }; - let iter = vec![Ok(columns)]; + let iter = vec![Ok(chunk)]; let encodings = schema .fields .iter() - .map(|f| transverse(&f.data_type, |_| Encoding::Plain)) + .map(|f| transverse(&f.data_type, |_| Encoding::RleDictionary)) .collect(); let row_groups = RowGroupIterator::try_new(iter.into_iter(), &schema, options, encodings)?; @@ -52,7 +52,7 @@ fn main() -> Result<()> { ]); let field = Field::new("c1", array.data_type().clone(), true); let schema = Schema::from(vec![field]); - let columns = Chunk::new(vec![array.boxed()]); + let chunk = Chunk::new(vec![array.boxed()]); - 
write_batch("test.parquet", schema, columns) + write_chunk("test.parquet", schema, chunk) } diff --git a/src/array/primitive/mod.rs b/src/array/primitive/mod.rs index 74604ed7e8b..e83c56bbffd 100644 --- a/src/array/primitive/mod.rs +++ b/src/array/primitive/mod.rs @@ -69,7 +69,7 @@ fn check( if data_type.to_physical_type() != PhysicalType::Primitive(T::PRIMITIVE) { return Err(Error::oos( - "BooleanArray can only be initialized with a DataType whose physical type is Primitive", + "PrimitiveArray can only be initialized with a DataType whose physical type is Primitive", )); } Ok(()) diff --git a/src/io/parquet/read/deserialize/binary/dictionary.rs b/src/io/parquet/read/deserialize/binary/dictionary.rs index bf656929e65..7095afdeaf2 100644 --- a/src/io/parquet/read/deserialize/binary/dictionary.rs +++ b/src/io/parquet/read/deserialize/binary/dictionary.rs @@ -7,6 +7,7 @@ use crate::{ bitmap::MutableBitmap, datatypes::{DataType, PhysicalType}, error::Result, + io::parquet::read::deserialize::nested_utils::{InitNested, NestedArrayIter, NestedState}, }; use super::super::dictionary::*; @@ -23,7 +24,6 @@ where { iter: I, data_type: DataType, - values_data_type: DataType, values: Dict, items: VecDeque<(Vec, MutableBitmap)>, chunk_size: Option, @@ -37,14 +37,9 @@ where I: DataPages, { pub fn new(iter: I, data_type: DataType, chunk_size: Option) -> Self { - let values_data_type = match &data_type { - DataType::Dictionary(_, values, _) => values.as_ref().clone(), - _ => unreachable!(), - }; Self { iter, data_type, - values_data_type, values: Dict::Empty, items: VecDeque::new(), chunk_size, @@ -54,6 +49,11 @@ where } fn read_dict(data_type: DataType, dict: &dyn DictPage) -> Box { + let data_type = match data_type { + DataType::Dictionary(_, values, _) => *values, + _ => data_type, + }; + let dict = dict.as_any().downcast_ref::().unwrap(); let offsets = dict .offsets() @@ -94,7 +94,74 @@ where &mut self.values, self.data_type.clone(), self.chunk_size, - |dict| 
read_dict::(self.values_data_type.clone(), dict), + |dict| read_dict::(self.data_type.clone(), dict), + ); + match maybe_state { + MaybeNext::Some(Ok(dict)) => Some(Ok(dict)), + MaybeNext::Some(Err(e)) => Some(Err(e)), + MaybeNext::None => None, + MaybeNext::More => self.next(), + } + } +} + +#[derive(Debug)] +pub struct NestedDictIter +where + I: DataPages, + O: Offset, + K: DictionaryKey, +{ + iter: I, + init: Vec, + data_type: DataType, + values: Dict, + items: VecDeque<(NestedState, (Vec, MutableBitmap))>, + chunk_size: Option, + phantom: std::marker::PhantomData, +} + +impl NestedDictIter +where + I: DataPages, + O: Offset, + K: DictionaryKey, +{ + pub fn new( + iter: I, + init: Vec, + data_type: DataType, + chunk_size: Option, + ) -> Self { + Self { + iter, + init, + data_type, + values: Dict::Empty, + items: VecDeque::new(), + chunk_size, + phantom: Default::default(), + } + } +} + +impl Iterator for NestedDictIter +where + I: DataPages, + O: Offset, + K: DictionaryKey, +{ + type Item = Result<(NestedState, DictionaryArray)>; + + fn next(&mut self) -> Option { + let maybe_state = nested_next_dict( + &mut self.iter, + &mut self.items, + &self.init, + &mut self.values, + self.data_type.clone(), + self.chunk_size, + |dict| read_dict::(self.data_type.clone(), dict), ); match maybe_state { MaybeNext::Some(Ok(dict)) => Some(Ok(dict)), @@ -104,3 +171,24 @@ where } } } + +/// Converts [`DataPages`] to an [`Iterator`] of [`Array`] +pub fn iter_to_arrays_nested<'a, K, O, I>( + iter: I, + init: Vec, + data_type: DataType, + chunk_size: Option, +) -> NestedArrayIter<'a> +where + I: 'a + DataPages, + O: Offset, + K: DictionaryKey, +{ + Box::new( + NestedDictIter::::new(iter, init, data_type, chunk_size).map(|result| { + let (mut nested, array) = result?; + let _ = nested.nested.pop().unwrap(); // the primitive + Ok((nested, array.boxed())) + }), + ) +} diff --git a/src/io/parquet/read/deserialize/binary/mod.rs b/src/io/parquet/read/deserialize/binary/mod.rs index 
613b95e9ca9..e17557c4b41 100644 --- a/src/io/parquet/read/deserialize/binary/mod.rs +++ b/src/io/parquet/read/deserialize/binary/mod.rs @@ -5,5 +5,5 @@ mod utils; pub use self::nested::NestedIter; pub use basic::Iter; -pub use dictionary::DictIter; +pub use dictionary::{iter_to_arrays_nested as iter_to_dict_arrays_nested, DictIter}; pub use nested::iter_to_arrays_nested; diff --git a/src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs b/src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs index 4d44ef4f724..2fec20fcd64 100644 --- a/src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs +++ b/src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs @@ -7,6 +7,7 @@ use crate::{ bitmap::MutableBitmap, datatypes::DataType, error::Result, + io::parquet::read::deserialize::nested_utils::{InitNested, NestedArrayIter, NestedState}, }; use super::super::dictionary::*; @@ -22,7 +23,6 @@ where { iter: I, data_type: DataType, - values_data_type: DataType, values: Dict, items: VecDeque<(Vec, MutableBitmap)>, chunk_size: Option, @@ -34,14 +34,9 @@ where I: DataPages, { pub fn new(iter: I, data_type: DataType, chunk_size: Option) -> Self { - let values_data_type = match &data_type { - DataType::Dictionary(_, values, _) => values.as_ref().clone(), - _ => unreachable!(), - }; Self { iter, data_type, - values_data_type, values: Dict::Empty, items: VecDeque::new(), chunk_size, @@ -50,6 +45,10 @@ where } fn read_dict(data_type: DataType, dict: &dyn DictPage) -> Box { + let data_type = match data_type { + DataType::Dictionary(_, values, _) => *values, + _ => data_type, + }; let dict = dict .as_any() .downcast_ref::() @@ -77,7 +76,7 @@ where &mut self.values, self.data_type.clone(), self.chunk_size, - |dict| read_dict(self.values_data_type.clone(), dict), + |dict| read_dict(self.data_type.clone(), dict), ); match maybe_state { MaybeNext::Some(Ok(dict)) => Some(Ok(dict)), @@ -87,3 +86,85 @@ where } } } + +#[derive(Debug)] +pub struct NestedDictIter 
+where + I: DataPages, + K: DictionaryKey, +{ + iter: I, + init: Vec, + data_type: DataType, + values: Dict, + items: VecDeque<(NestedState, (Vec, MutableBitmap))>, + chunk_size: Option, +} + +impl NestedDictIter +where + I: DataPages, + K: DictionaryKey, +{ + pub fn new( + iter: I, + init: Vec, + data_type: DataType, + chunk_size: Option, + ) -> Self { + Self { + iter, + init, + data_type, + values: Dict::Empty, + items: VecDeque::new(), + chunk_size, + } + } +} + +impl Iterator for NestedDictIter +where + I: DataPages, + K: DictionaryKey, +{ + type Item = Result<(NestedState, DictionaryArray)>; + + fn next(&mut self) -> Option { + let maybe_state = nested_next_dict( + &mut self.iter, + &mut self.items, + &self.init, + &mut self.values, + self.data_type.clone(), + self.chunk_size, + |dict| read_dict(self.data_type.clone(), dict), + ); + match maybe_state { + MaybeNext::Some(Ok(dict)) => Some(Ok(dict)), + MaybeNext::Some(Err(e)) => Some(Err(e)), + MaybeNext::None => None, + MaybeNext::More => self.next(), + } + } +} + +/// Converts [`DataPages`] to an [`Iterator`] of [`Array`] +pub fn iter_to_arrays_nested<'a, K, I>( + iter: I, + init: Vec, + data_type: DataType, + chunk_size: Option, +) -> NestedArrayIter<'a> +where + I: 'a + DataPages, + K: DictionaryKey, +{ + Box::new( + NestedDictIter::::new(iter, init, data_type, chunk_size).map(|result| { + let (mut nested, array) = result?; + let _ = nested.nested.pop().unwrap(); // the primitive + Ok((nested, array.boxed())) + }), + ) +} diff --git a/src/io/parquet/read/deserialize/fixed_size_binary/mod.rs b/src/io/parquet/read/deserialize/fixed_size_binary/mod.rs index 8173065d37c..55b57a519f6 100644 --- a/src/io/parquet/read/deserialize/fixed_size_binary/mod.rs +++ b/src/io/parquet/read/deserialize/fixed_size_binary/mod.rs @@ -3,4 +3,4 @@ mod dictionary; mod utils; pub use basic::Iter; -pub use dictionary::DictIter; +pub use dictionary::{iter_to_arrays_nested as iter_to_dict_arrays_nested, DictIter}; diff --git 
a/src/io/parquet/read/deserialize/mod.rs b/src/io/parquet/read/deserialize/mod.rs index 2ee84b6ca7d..ffe19ac8d9f 100644 --- a/src/io/parquet/read/deserialize/mod.rs +++ b/src/io/parquet/read/deserialize/mod.rs @@ -17,7 +17,7 @@ use crate::{ array::{ Array, BinaryArray, DictionaryKey, FixedSizeListArray, ListArray, MapArray, Utf8Array, }, - datatypes::{DataType, Field}, + datatypes::{DataType, Field, IntervalUnit}, error::{Error, Result}, }; @@ -289,9 +289,9 @@ where chunk_size, ) } - _ => match field.data_type().to_logical_type() { DataType::Dictionary(key_type, _, _) => { + init.push(InitNested::Primitive(field.is_nullable)); let type_ = types.pop().unwrap(); let iter = columns.pop().unwrap(); let data_type = field.data_type().clone(); @@ -434,50 +434,76 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>( chunk_size, |x: i32| x as u8, ), - Float32 => primitive::iter_to_dict_arrays_nested::( + UInt16 => primitive::iter_to_dict_arrays_nested::( iter, init, data_type, chunk_size, - |x: f32| x, + |x: i32| x as u16, ), - Float64 => primitive::iter_to_dict_arrays_nested::( + UInt32 => primitive::iter_to_dict_arrays_nested::( iter, init, data_type, chunk_size, - |x: f64| x, + |x: i32| x as u32, ), - /* - UInt16 => dyn_iter(primitive::DictIter::::new( + Int8 => primitive::iter_to_dict_arrays_nested::( iter, + init, data_type, chunk_size, - |x: i32| x as u16, - )), - UInt32 => dyn_iter(primitive::DictIter::::new( + |x: i32| x as i8, + ), + Int16 => primitive::iter_to_dict_arrays_nested::( iter, + init, data_type, chunk_size, - |x: i32| x as u32, - )), - Int8 => dyn_iter(primitive::DictIter::::new( + |x: i32| x as i16, + ), + Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => { + primitive::iter_to_dict_arrays_nested::( + iter, + init, + data_type, + chunk_size, + |x: i32| x, + ) + } + Int64 | Date64 | Time64(_) | Duration(_) => { + primitive::iter_to_dict_arrays_nested::( + iter, + init, + data_type, + chunk_size, + |x: i64| x as i32, + ) + } + Float32 
=> primitive::iter_to_dict_arrays_nested::( iter, + init, data_type, chunk_size, - |x: i32| x as i8, - )), - Int16 => dyn_iter(primitive::DictIter::::new( + |x: f32| x, + ), + Float64 => primitive::iter_to_dict_arrays_nested::( iter, + init, data_type, chunk_size, - |x: i32| x as i16, - )), - Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => dyn_iter( - primitive::DictIter::::new(iter, data_type, chunk_size, |x: i32| { - x as i32 - }), + |x: f64| x, ), + Utf8 | Binary => { + binary::iter_to_dict_arrays_nested::(iter, init, data_type, chunk_size) + } + LargeUtf8 | LargeBinary => { + binary::iter_to_dict_arrays_nested::(iter, init, data_type, chunk_size) + } + FixedSizeBinary(_) => { + fixed_size_binary::iter_to_dict_arrays_nested::(iter, init, data_type, chunk_size) + } + /* Timestamp(time_unit, _) => { let time_unit = *time_unit; @@ -490,32 +516,6 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>( time_unit, ); } - - Int64 | Date64 | Time64(_) | Duration(_) => dyn_iter( - primitive::DictIter::::new(iter, data_type, chunk_size, |x: i64| x), - ), - Float32 => dyn_iter(primitive::DictIter::::new( - iter, - data_type, - chunk_size, - |x: f32| x, - )), - Float64 => dyn_iter(primitive::DictIter::::new( - iter, - data_type, - chunk_size, - |x: f64| x, - )), - - Utf8 | Binary => dyn_iter(binary::DictIter::::new( - iter, data_type, chunk_size, - )), - LargeUtf8 | LargeBinary => dyn_iter(binary::DictIter::::new( - iter, data_type, chunk_size, - )), - FixedSizeBinary(_) => dyn_iter(fixed_size_binary::DictIter::::new( - iter, data_type, chunk_size, - )), */ other => { return Err(Error::nyi(format!( diff --git a/src/io/parquet/read/deserialize/primitive/dictionary.rs b/src/io/parquet/read/deserialize/primitive/dictionary.rs index bc0a5f43e8f..9db058ba6e8 100644 --- a/src/io/parquet/read/deserialize/primitive/dictionary.rs +++ b/src/io/parquet/read/deserialize/primitive/dictionary.rs @@ -25,6 +25,10 @@ where P: ParquetNativeType, F: Copy + Fn(P) -> T, { + 
let data_type = match data_type { + DataType::Dictionary(_, values, _) => *values, + _ => data_type, + }; let dict = dict .as_any() .downcast_ref::>() @@ -46,7 +50,6 @@ where { iter: I, data_type: DataType, - values_data_type: DataType, values: Dict, items: VecDeque<(Vec, MutableBitmap)>, chunk_size: Option, @@ -64,14 +67,9 @@ where F: Copy + Fn(P) -> T, { pub fn new(iter: I, data_type: DataType, chunk_size: Option, op: F) -> Self { - let values_data_type = match &data_type { - DataType::Dictionary(_, values, _) => *(values.clone()), - _ => unreachable!(), - }; Self { iter, data_type, - values_data_type, values: Dict::Empty, items: VecDeque::new(), chunk_size, @@ -98,7 +96,7 @@ where &mut self.values, self.data_type.clone(), self.chunk_size, - |dict| read_dict::(self.values_data_type.clone(), self.op, dict), + |dict| read_dict::(self.data_type.clone(), self.op, dict), ); match maybe_state { MaybeNext::Some(Ok(dict)) => Some(Ok(dict)), @@ -144,10 +142,6 @@ where chunk_size: Option, op: F, ) -> Self { - let data_type = match data_type { - DataType::Dictionary(_, values, _) => *values, - _ => data_type, - }; Self { iter, init, diff --git a/src/io/parquet/write/binary/nested.rs b/src/io/parquet/write/binary/nested.rs index 198e1d2156c..55f5d2ef247 100644 --- a/src/io/parquet/write/binary/nested.rs +++ b/src/io/parquet/write/binary/nested.rs @@ -14,7 +14,7 @@ pub fn array_to_page( array: &BinaryArray, options: WriteOptions, type_: PrimitiveType, - nested: Vec, + nested: &[Nested], ) -> Result where O: Offset, @@ -23,7 +23,7 @@ where let mut buffer = vec![]; let (repetition_levels_byte_length, definition_levels_byte_length) = - nested::write_rep_and_def(options.version, &nested, &mut buffer)?; + nested::write_rep_and_def(options.version, nested, &mut buffer)?; encode_plain(array, is_optional, &mut buffer); @@ -35,7 +35,7 @@ where utils::build_plain_page( buffer, - nested::num_values(&nested), + nested::num_values(nested), nested[0].len(), array.null_count(), 
repetition_levels_byte_length, diff --git a/src/io/parquet/write/boolean/nested.rs b/src/io/parquet/write/boolean/nested.rs index 4bd741ab52a..9d9e49100f6 100644 --- a/src/io/parquet/write/boolean/nested.rs +++ b/src/io/parquet/write/boolean/nested.rs @@ -14,13 +14,13 @@ pub fn array_to_page( array: &BooleanArray, options: WriteOptions, type_: PrimitiveType, - nested: Vec, + nested: &[Nested], ) -> Result { let is_optional = is_nullable(&type_.field_info); let mut buffer = vec![]; let (repetition_levels_byte_length, definition_levels_byte_length) = - nested::write_rep_and_def(options.version, &nested, &mut buffer)?; + nested::write_rep_and_def(options.version, nested, &mut buffer)?; encode_plain(array, is_optional, &mut buffer)?; @@ -32,7 +32,7 @@ pub fn array_to_page( utils::build_plain_page( buffer, - nested::num_values(&nested), + nested::num_values(nested), nested[0].len(), array.null_count(), repetition_levels_byte_length, diff --git a/src/io/parquet/write/dictionary.rs b/src/io/parquet/write/dictionary.rs index f6a9bcabbb6..befd8a5447c 100644 --- a/src/io/parquet/write/dictionary.rs +++ b/src/io/parquet/write/dictionary.rs @@ -6,8 +6,17 @@ use parquet2::{ write::DynIter, }; -use super::binary::build_statistics as binary_build_statistics; -use super::binary::encode_plain as binary_encode_plain; +use crate::io::parquet::write::utils; +use crate::{ + array::{Array, DictionaryArray, DictionaryKey}, + io::parquet::read::schema::is_nullable, +}; +use crate::{bitmap::Bitmap, datatypes::DataType}; +use crate::{ + bitmap::MutableBitmap, + error::{Error, Result}, +}; + use super::fixed_len_bytes::build_statistics as fixed_binary_build_statistics; use super::fixed_len_bytes::encode_plain as fixed_binary_encode_plain; use super::primitive::build_statistics as primitive_build_statistics; @@ -15,94 +24,118 @@ use super::primitive::encode_plain as primitive_encode_plain; use super::utf8::build_statistics as utf8_build_statistics; use super::utf8::encode_plain as 
utf8_encode_plain; use super::WriteOptions; -use crate::bitmap::Bitmap; -use crate::datatypes::DataType; -use crate::error::{Error, Result}; -use crate::io::parquet::write::utils; -use crate::{ - array::{Array, DictionaryArray, DictionaryKey}, - io::parquet::read::schema::is_nullable, -}; +use super::{binary::build_statistics as binary_build_statistics, Nested}; +use super::{binary::encode_plain as binary_encode_plain, nested}; -fn encode_keys( - array: &DictionaryArray, - type_: PrimitiveType, - statistics: ParquetStatistics, +fn serialize_def_levels_simple( + validity: Option<&Bitmap>, + length: usize, + is_optional: bool, options: WriteOptions, -) -> Result { - let validity = array.values().validity(); - let is_optional = is_nullable(&type_.field_info); - - let mut buffer = vec![]; - - let null_count = if let Some(validity) = validity { - let projected_validity = array - .keys_iter() - .map(|x| x.map(|x| validity.get_bit(x)).unwrap_or(false)); - let projected_val = Bitmap::from_trusted_len_iter(projected_validity); - - let null_count = projected_val.unset_bits(); - - utils::write_def_levels( - &mut buffer, - is_optional, - Some(&projected_val), - array.len(), - options.version, - )?; - null_count - } else { - utils::write_def_levels( - &mut buffer, - is_optional, - array.validity(), - array.len(), - options.version, - )?; - array.null_count() - }; - - let definition_levels_byte_length = buffer.len(); + buffer: &mut Vec, +) -> Result<()> { + utils::write_def_levels(buffer, is_optional, validity, length, options.version) +} - // encode indices - // compute the required number of bits +fn serialize_keys_values( + array: &DictionaryArray, + validity: Option<&Bitmap>, + buffer: &mut Vec, +) -> Result<()> { + let keys = array.keys_values_iter().map(|x| x as u32); if let Some(validity) = validity { - let keys = array.keys_iter().flatten().filter_map(|index| { - // discard indices whose values are null, since they are part of the def levels. 
- if validity.get_bit(index) { - Some(index as u32) - } else { - None - } - }); + // discard indices whose values are null. + let keys = keys + .zip(validity.iter()) + .filter_map(|(key, is_valid)| is_valid.then(|| key)); let num_bits = utils::get_bit_width(keys.clone().max().unwrap_or(0) as u64) as u8; - let keys = utils::ExactSizedIter::new(keys, array.len() - null_count); + let keys = utils::ExactSizedIter::new(keys, array.len() - validity.unset_bits()); // num_bits as a single byte buffer.push(num_bits); // followed by the encoded indices. - encode_u32(&mut buffer, keys, num_bits)?; + Ok(encode_u32(buffer, keys, num_bits)?) } else { - let keys = array.keys_iter().flatten().map(|x| x as u32); let num_bits = utils::get_bit_width(keys.clone().max().unwrap_or(0) as u64) as u8; - let keys = utils::ExactSizedIter::new(keys, array.len() - array.null_count()); - // num_bits as a single byte buffer.push(num_bits); // followed by the encoded indices. - encode_u32(&mut buffer, keys, num_bits)?; + Ok(encode_u32(buffer, keys, num_bits)?) 
} +} + +fn serialize_levels( + validity: Option<&Bitmap>, + length: usize, + type_: &PrimitiveType, + nested: &[Nested], + options: WriteOptions, + buffer: &mut Vec, +) -> Result<(usize, usize)> { + if nested.len() == 1 { + let is_optional = is_nullable(&type_.field_info); + serialize_def_levels_simple(validity, length, is_optional, options, buffer)?; + let definition_levels_byte_length = buffer.len(); + Ok((0, definition_levels_byte_length)) + } else { + nested::write_rep_and_def(options.version, nested, buffer) + } +} + +fn normalized_validity(array: &DictionaryArray) -> Option { + match (array.keys().validity(), array.values().validity()) { + (None, None) => None, + (None, rhs) => rhs.cloned(), + (lhs, None) => lhs.cloned(), + (Some(_), Some(rhs)) => { + let projected_validity = array + .keys_iter() + .map(|x| x.map(|x| rhs.get_bit(x)).unwrap_or(false)); + MutableBitmap::from_trusted_len_iter(projected_validity).into() + } + } +} + +fn serialize_keys( + array: &DictionaryArray, + type_: PrimitiveType, + nested: &[Nested], + statistics: ParquetStatistics, + options: WriteOptions, +) -> Result { + let mut buffer = vec![]; + + // parquet only accepts a single validity - we "&" the validities into a single one + // and ignore keys whose _value_ is null. + let validity = normalized_validity(array); + + let (repetition_levels_byte_length, definition_levels_byte_length) = serialize_levels( + validity.as_ref(), + array.len(), + &type_, + nested, + options, + &mut buffer, + )?; + + serialize_keys_values(array, validity.as_ref(), &mut buffer)?; + + let (num_values, num_rows) = if nested.len() == 1 { + (array.len(), array.len()) + } else { + (nested::num_values(nested), nested[0].len()) + }; utils::build_plain_page( buffer, - array.len(), - array.len(), + num_values, + num_rows, array.null_count(), - 0, + repetition_levels_byte_length, definition_levels_byte_length, Some(statistics), type_, @@ -127,6 +160,7 @@ macro_rules!
dyn_prim { pub fn array_to_pages( array: &DictionaryArray, type_: PrimitiveType, + nested: &[Nested], options: WriteOptions, encoding: Encoding, ) -> Result>> { @@ -200,7 +234,7 @@ pub fn array_to_pages( let dict_page = EncodedPage::Dict(dict_page); // write DataPage pointing to DictPage - let data_page = encode_keys(array, type_, statistics, options)?; + let data_page = serialize_keys(array, type_, nested, statistics, options)?; let iter = std::iter::once(Ok(dict_page)).chain(std::iter::once(Ok(data_page))); Ok(DynIter::new(Box::new(iter))) diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index 4fea3d9c4aa..9f16ea9fb01 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -95,7 +95,7 @@ pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool { pub fn array_to_pages( array: &dyn Array, type_: ParquetPrimitiveType, - nested: Vec, + nested: &[Nested], options: WriteOptions, encoding: Encoding, ) -> Result>> { @@ -110,7 +110,7 @@ pub fn array_to_pages( let right = array.slice(split_at, array.len() - split_at); Ok(DynIter::new( - array_to_pages(&*left, type_.clone(), nested.clone(), options, encoding)? + array_to_pages(&*left, type_.clone(), nested, options, encoding)? 
.chain(array_to_pages(&*right, type_, nested, options, encoding)?), )) } else { @@ -120,6 +120,7 @@ pub fn array_to_pages( dictionary::array_to_pages::<$T>( array.as_any().downcast_ref().unwrap(), type_, + nested, options, encoding, ) @@ -135,7 +136,7 @@ pub fn array_to_pages( pub fn array_to_page( array: &dyn Array, type_: ParquetPrimitiveType, - nested: Vec, + nested: &[Nested], options: WriteOptions, encoding: Encoding, ) -> Result { @@ -373,7 +374,7 @@ pub fn array_to_page_simple( fn array_to_page_nested( array: &dyn Array, type_: ParquetPrimitiveType, - nested: Vec, + nested: &[Nested], options: WriteOptions, _encoding: Encoding, ) -> Result { diff --git a/src/io/parquet/write/pages.rs b/src/io/parquet/write/pages.rs index 64f6da9a0f6..0e8dcf3d69d 100644 --- a/src/io/parquet/write/pages.rs +++ b/src/io/parquet/write/pages.rs @@ -215,7 +215,7 @@ pub fn array_to_columns + Send + Sync>( .zip(types.into_iter()) .zip(encoding.iter()) .map(|(((values, nested), type_), encoding)| { - array_to_pages(*values, type_, nested, options, *encoding) + array_to_pages(*values, type_, &nested, options, *encoding) }) .collect() } diff --git a/src/io/parquet/write/primitive/nested.rs b/src/io/parquet/write/primitive/nested.rs index 9bd13184ca4..ffbfde6554c 100644 --- a/src/io/parquet/write/primitive/nested.rs +++ b/src/io/parquet/write/primitive/nested.rs @@ -18,7 +18,7 @@ pub fn array_to_page( array: &PrimitiveArray, options: WriteOptions, type_: PrimitiveType, - nested: Vec, + nested: &[Nested], ) -> Result where T: ArrowNativeType, @@ -29,7 +29,7 @@ where let mut buffer = vec![]; let (repetition_levels_byte_length, definition_levels_byte_length) = - nested::write_rep_and_def(options.version, &nested, &mut buffer)?; + nested::write_rep_and_def(options.version, nested, &mut buffer)?; encode_plain(array, is_optional, &mut buffer); @@ -44,7 +44,7 @@ where utils::build_plain_page( buffer, - nested::num_values(&nested), + nested::num_values(nested), nested[0].len(), 
array.null_count(), repetition_levels_byte_length, diff --git a/src/io/parquet/write/utf8/nested.rs b/src/io/parquet/write/utf8/nested.rs index 7a6e0ee05be..42babd46cd7 100644 --- a/src/io/parquet/write/utf8/nested.rs +++ b/src/io/parquet/write/utf8/nested.rs @@ -14,7 +14,7 @@ pub fn array_to_page( array: &Utf8Array, options: WriteOptions, type_: PrimitiveType, - nested: Vec, + nested: &[Nested], ) -> Result where O: Offset, @@ -23,7 +23,7 @@ where let mut buffer = vec![]; let (repetition_levels_byte_length, definition_levels_byte_length) = - nested::write_rep_and_def(options.version, &nested, &mut buffer)?; + nested::write_rep_and_def(options.version, nested, &mut buffer)?; encode_plain(array, is_optional, &mut buffer); @@ -35,7 +35,7 @@ where utils::build_plain_page( buffer, - nested::num_values(&nested), + nested::num_values(nested), nested[0].len(), array.null_count(), repetition_levels_byte_length, diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 2eab739a4ad..9b701192143 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -1441,9 +1441,37 @@ fn nested_dict() -> Result<()> { DataType::List(Box::new(Field::new( "item", floats.data_type().clone(), - true, + false, ))), - vec![0i32, 0, 2, 3, 3].into(), + vec![0i32, 0, 0, 2, 3].into(), + floats.boxed(), + Some([true, false, true, true].into()), + )?; + + let schema = Schema::from(vec![Field::new("floats", floats.data_type().clone(), true)]); + let batch = Chunk::try_new(vec![floats.boxed()])?; + + let r = integration_write(&schema, &[batch.clone()])?; + + let (new_schema, new_batches) = integration_read(&r)?; + + assert_eq!(new_schema, schema); + assert_eq!(new_batches, vec![batch]); + Ok(()) +} + +#[test] +fn nested_dict_utf8() -> Result<()> { + let indices = PrimitiveArray::from_values((0..3u64).map(|x| x % 2)); + let values = Utf8Array::::from_slice(["a", "b"]); + let floats = DictionaryArray::try_from_keys(indices, values.boxed()).unwrap(); + let floats = 
ListArray::try_new( + DataType::List(Box::new(Field::new( + "item", + floats.data_type().clone(), + false, + ))), + vec![0i32, 0, 0, 2, 3].into(), floats.boxed(), Some([true, false, true, true].into()), )?; diff --git a/tests/it/io/parquet/read_indexes.rs b/tests/it/io/parquet/read_indexes.rs index ee91ba846a5..b5fb2ba9c19 100644 --- a/tests/it/io/parquet/read_indexes.rs +++ b/tests/it/io/parquet/read_indexes.rs @@ -41,7 +41,7 @@ fn pages( .descriptor .primitive_type .clone(), - vec![Nested::Primitive(None, true, array.len())], + &[Nested::Primitive(None, true, array.len())], options, Encoding::Plain, ) @@ -57,7 +57,7 @@ fn pages( .descriptor .primitive_type .clone(), - vec![Nested::Primitive(None, true, array.len())], + &[Nested::Primitive(None, true, array.len())], options, encoding, )