diff --git a/parquet_integration/bench_compute.py b/parquet_integration/bench_compute.py index 3fd3e331676..b52ecd82116 100644 --- a/parquet_integration/bench_compute.py +++ b/parquet_integration/bench_compute.py @@ -26,7 +26,7 @@ def get_f32(size, null_density): def bench_add_f32_pyarrow(log2_size, null_density): - size = 2 ** log2_size + size = 2**log2_size values, validity = get_f32(size, null_density) array1 = pa.array(values, pa.float32(), mask=validity) @@ -50,7 +50,7 @@ def f(): def bench_add_f32_numpy(log2_size): - size = 2 ** log2_size + size = 2**log2_size array1, _ = get_f32(size, 0) array2, _ = get_f32(size, 0) @@ -71,7 +71,7 @@ def f(): def _bench_unary_f32_pyarrow(log2_size, null_density, name, op): - size = 2 ** log2_size + size = 2**log2_size values, validity = get_f32(size, null_density) array = pa.array(values, pa.float32(), mask=validity) @@ -100,7 +100,7 @@ def bench_min_f32_pyarrow(log2_size, null_density): def _bench_unary_f32_numpy(log2_size, name, op): - size = 2 ** log2_size + size = 2**log2_size values, _ = get_f32(size, 0) @@ -128,7 +128,7 @@ def bench_min_f32_numpy(log2_size): def bench_sort_f32_pyarrow(log2_size, null_density): - size = 2 ** log2_size + size = 2**log2_size values, validity = get_f32(size, null_density) array = pa.array(values, pa.float32(), mask=validity) @@ -151,7 +151,7 @@ def f(): def bench_sort_f32_numpy(log2_size): null_density = 0 - size = 2 ** log2_size + size = 2**log2_size array, _ = get_f32(size, null_density) @@ -171,7 +171,7 @@ def f(): def bench_filter_f32_pyarrow(log2_size, null_density): - size = 2 ** log2_size + size = 2**log2_size values, validity = get_f32(size, null_density) _, mask = get_f32(size, 0.9) @@ -198,7 +198,7 @@ def f(): def bench_filter_f32_numpy(log2_size): null_density = 0 - size = 2 ** log2_size + size = 2**log2_size array, _ = get_f32(size, null_density) _, mask = get_f32(size, 0.1) diff --git a/parquet_integration/write_parquet.py b/parquet_integration/write_parquet.py index 7f296aed3f4..3fa56c35795 100644 --- a/parquet_integration/write_parquet.py +++ b/parquet_integration/write_parquet.py @@ -223,6 +223,10 @@ def case_struct() -> Tuple[dict, pa.Schema, str]: ] ), ), + pa.field( + "struct_nullable", + pa.struct(struct_fields), + ), ] ) @@ -230,6 +234,12 @@ def case_struct() -> Tuple[dict, pa.Schema, str]: [pa.array(string), pa.array(boolean)], fields=struct_fields, ) + struct_nullable = pa.StructArray.from_arrays( + [pa.array(string), pa.array(boolean)], + fields=struct_fields, + mask=pa.array([False, False, True, False, False, False, False, False, False, False]), + ) + return ( { "struct": struct, @@ -237,6 +247,7 @@ def case_struct() -> Tuple[dict, pa.Schema, str]: [struct, pa.array(boolean)], names=["f1", "f2"], ), + "struct_nullable": struct_nullable, }, schema, f"struct_nullable_10.parquet", @@ -307,16 +318,16 @@ def write_pyarrow( base_path = f"{base_path}/{compression}" if multiple_pages: - data_page_size = 2 ** 10 # i.e. a small number to ensure multiple pages + data_page_size = 2**10 # i.e. a small number to ensure multiple pages else: - data_page_size = 2 ** 40 # i.e. a large number to ensure a single page + data_page_size = 2**40 # i.e. 
a large number to ensure a single page t = pa.table(data, schema=schema) os.makedirs(base_path, exist_ok=True) pa.parquet.write_table( t, f"{base_path}/{path}", - row_group_size=2 ** 40, + row_group_size=2**40, use_dictionary=use_dictionary, compression=compression, write_statistics=True, @@ -325,7 +336,14 @@ def write_pyarrow( ) -for case in [case_basic_nullable, case_basic_required, case_nested, case_struct, case_nested_edge, case_map]: +for case in [ + case_basic_nullable, + case_basic_required, + case_nested, + case_struct, + case_nested_edge, + case_map, +]: for version in [1, 2]: for use_dict in [True, False]: for compression in ["lz4", None, "snappy"]: @@ -351,14 +369,14 @@ def case_benches_required(size): # for read benchmarks for i in range(10, 22, 2): # two pages (dict) - write_pyarrow(case_benches(2 ** i), 1, True, False, None) + write_pyarrow(case_benches(2**i), 1, True, False, None) # single page - write_pyarrow(case_benches(2 ** i), 1, False, False, None) + write_pyarrow(case_benches(2**i), 1, False, False, None) # single page required - write_pyarrow(case_benches_required(2 ** i), 1, False, False, None) + write_pyarrow(case_benches_required(2**i), 1, False, False, None) # multiple pages - write_pyarrow(case_benches(2 ** i), 1, False, True, None) + write_pyarrow(case_benches(2**i), 1, False, True, None) # multiple compressed pages - write_pyarrow(case_benches(2 ** i), 1, False, True, "snappy") + write_pyarrow(case_benches(2**i), 1, False, True, "snappy") # single compressed page - write_pyarrow(case_benches(2 ** i), 1, False, False, "snappy") + write_pyarrow(case_benches(2**i), 1, False, False, "snappy") diff --git a/src/io/parquet/read/deserialize/binary/nested.rs b/src/io/parquet/read/deserialize/binary/nested.rs index 0d43c4a1cee..f7b16fe03ef 100644 --- a/src/io/parquet/read/deserialize/binary/nested.rs +++ b/src/io/parquet/read/deserialize/binary/nested.rs @@ -17,25 +17,24 @@ use super::basic::ValuesDictionary; use super::utils::*; use super::{ super::utils, - basic::{finish, Required, TraitBinaryArray}, + basic::{finish, TraitBinaryArray}, }; -#[allow(clippy::large_enum_variant)] #[derive(Debug)] enum State<'a> { - Optional(Optional<'a>, BinaryIter<'a>), - Required(Required<'a>), + Optional(BinaryIter<'a>), + Required(BinaryIter<'a>), RequiredDictionary(ValuesDictionary<'a>), - OptionalDictionary(Optional<'a>, ValuesDictionary<'a>), + OptionalDictionary(ValuesDictionary<'a>), } impl<'a> utils::PageState<'a> for State<'a> { fn len(&self) -> usize { match self { - State::Optional(validity, _) => validity.len(), - State::Required(state) => state.len(), + State::Optional(validity) => validity.size_hint().0, + State::Required(state) => state.size_hint().0, State::RequiredDictionary(required) => required.len(), - State::OptionalDictionary(optional, _) => optional.len(), + State::OptionalDictionary(optional) => optional.len(), } } } @@ -45,7 +44,7 @@ struct BinaryDecoder { phantom_o: std::marker::PhantomData, } -impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { +impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder { type State = State<'a>; type DecodedState = (Binary, MutableBitmap); @@ -62,25 +61,26 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { ) { (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => { let dict = dict.as_any().downcast_ref().unwrap(); - Ok(State::RequiredDictionary(ValuesDictionary::try_new( - page, dict, - )?)) + ValuesDictionary::try_new(page, dict).map(State::RequiredDictionary) } 
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => { let dict = dict.as_any().downcast_ref().unwrap(); - Ok(State::OptionalDictionary( - Optional::try_new(page)?, - ValuesDictionary::try_new(page, dict)?, - )) + ValuesDictionary::try_new(page, dict).map(State::OptionalDictionary) } (Encoding::Plain, _, true, false) => { let (_, _, values) = split_buffer(page)?; let values = BinaryIter::new(values); - Ok(State::Optional(Optional::try_new(page)?, values)) + Ok(State::Optional(values)) + } + (Encoding::Plain, _, false, false) => { + let (_, _, values) = split_buffer(page)?; + + let values = BinaryIter::new(values); + + Ok(State::Required(values)) } - (Encoding::Plain, _, false, false) => Ok(State::Required(Required::try_new(page)?)), _ => Err(utils::not_implemented(page)), } } @@ -92,24 +92,17 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { ) } - fn extend_from_state( - &self, - state: &mut Self::State, - decoded: &mut Self::DecodedState, - additional: usize, - ) { + fn push_valid(&self, state: &mut Self::State, decoded: &mut Self::DecodedState) { let (values, validity) = decoded; match state { - State::Optional(page_validity, page_values) => { - let items = page_validity.by_ref().take(additional); - let items = Zip::new(items, page_values.by_ref()); - - read_optional_values(items, values, validity) + State::Optional(page) => { + let value = page.next().unwrap_or_default(); + values.push(value); + validity.push(true); } State::Required(page) => { - for x in page.values.by_ref().take(additional) { - values.push(x) - } + let value = page.next().unwrap_or_default(); + values.push(value); } State::RequiredDictionary(page) => { let dict_values = page.dict.values(); @@ -121,13 +114,12 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { let dict_offset_ip1 = dict_offsets[index + 1] as usize; &dict_values[dict_offset_i..dict_offset_ip1] }; - for x in page.values.by_ref().map(op).take(additional) { - values.push(x) - } + let item = page.values.next().map(op).unwrap_or_default(); + values.push(item); } - State::OptionalDictionary(page_validity, page_values) => { - let dict_values = page_values.dict.values(); - let dict_offsets = page_values.dict.offsets(); + State::OptionalDictionary(page) => { + let dict_values = page.dict.values(); + let dict_offsets = page.dict.offsets(); let op = move |index: u32| { let index = index as usize; @@ -135,22 +127,25 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { let dict_offset_ip1 = dict_offsets[index + 1] as usize; &dict_values[dict_offset_i..dict_offset_ip1] }; - - let items = page_validity.by_ref().take(additional); - let items = Zip::new(items, page_values.values.by_ref().map(op)); - - read_optional_values(items, values, validity) + let item = page.values.next().map(op).unwrap_or_default(); + values.push(item); + validity.push(true); } } } + + fn push_null(&self, decoded: &mut Self::DecodedState) { + let (values, validity) = decoded; + values.push(&[]); + validity.push(false); + } } pub struct ArrayIterator, I: DataPages> { iter: I, data_type: DataType, init: Vec, - items: VecDeque<(Binary, MutableBitmap)>, - nested: VecDeque, + items: VecDeque<(NestedState, (Binary, MutableBitmap))>, chunk_size: Option, phantom_a: std::marker::PhantomData, } @@ -167,7 +162,6 @@ impl, I: DataPages> ArrayIterator { data_type, init, items: VecDeque::new(), - nested: VecDeque::new(), chunk_size, phantom_a: Default::default(), } @@ -181,7 +175,6 @@ impl, I: DataPages> Iterator for ArrayIterator let maybe_state = 
next( &mut self.iter, &mut self.items, - &mut self.nested, &self.init, self.chunk_size, &BinaryDecoder::::default(), diff --git a/src/io/parquet/read/deserialize/boolean/nested.rs b/src/io/parquet/read/deserialize/boolean/nested.rs index 163f23a9f30..8c20694ed5c 100644 --- a/src/io/parquet/read/deserialize/boolean/nested.rs +++ b/src/io/parquet/read/deserialize/boolean/nested.rs @@ -15,42 +15,22 @@ use crate::{ use super::super::nested_utils::*; use super::super::utils; -use super::super::utils::{Decoder, MaybeNext}; +use super::super::utils::MaybeNext; use super::super::DataPages; -// The state of a required DataPage with a boolean physical type -#[derive(Debug)] -struct Required<'a> { - values: &'a [u8], - // invariant: offset <= length; - offset: usize, - length: usize, -} - -impl<'a> Required<'a> { - pub fn try_new(page: &'a DataPage) -> Result { - let (_, _, values) = split_buffer(page)?; - Ok(Self { - values, - offset: 0, - length: page.num_values(), - }) - } -} - // The state of a `DataPage` of `Boolean` parquet boolean type #[allow(clippy::large_enum_variant)] #[derive(Debug)] enum State<'a> { - Optional(Optional<'a>, BitmapIter<'a>), - Required(Required<'a>), + Optional(BitmapIter<'a>), + Required(BitmapIter<'a>), } impl<'a> State<'a> { pub fn len(&self) -> usize { match self { - State::Optional(optional, _) => optional.len(), - State::Required(page) => page.length - page.offset, + State::Optional(iter) => iter.size_hint().0, + State::Required(iter) => iter.size_hint().0, } } } @@ -64,7 +44,7 @@ impl<'a> utils::PageState<'a> for State<'a> { #[derive(Default)] struct BooleanDecoder {} -impl<'a> Decoder<'a> for BooleanDecoder { +impl<'a> NestedDecoder<'a> for BooleanDecoder { type State = State<'a>; type DecodedState = (MutableBitmap, MutableBitmap); @@ -78,9 +58,14 @@ impl<'a> Decoder<'a> for BooleanDecoder { let (_, _, values) = split_buffer(page)?; let values = BitmapIter::new(values, 0, values.len() * 8); - Ok(State::Optional(Optional::try_new(page)?, values)) + Ok(State::Optional(values)) + } + (Encoding::Plain, false, false) => { + let (_, _, values) = split_buffer(page)?; + let values = BitmapIter::new(values, 0, values.len() * 8); + + Ok(State::Required(values)) } - (Encoding::Plain, false, false) => Ok(State::Required(Required::try_new(page)?)), _ => Err(utils::not_implemented(page)), } } @@ -92,26 +77,26 @@ impl<'a> Decoder<'a> for BooleanDecoder { ) } - fn extend_from_state( - &self, - state: &mut State, - decoded: &mut Self::DecodedState, - additional: usize, - ) { + fn push_valid(&self, state: &mut State, decoded: &mut Self::DecodedState) { let (values, validity) = decoded; match state { - State::Optional(page_validity, page_values) => { - let items = page_validity.by_ref().take(additional); - let items = Zip::new(items, page_values.by_ref()); - - read_optional_values(items, values, validity) + State::Optional(page_values) => { + let value = page_values.next().unwrap_or_default(); + values.push(value); + validity.push(true); } - State::Required(page) => { - values.extend_from_slice(page.values, page.offset, additional); - page.offset += additional; + State::Required(page_values) => { + let value = page_values.next().unwrap_or_default(); + values.push(value); } } } + + fn push_null(&self, decoded: &mut Self::DecodedState) { + let (values, validity) = decoded; + values.push(false); + validity.push(false); + } } /// An iterator adapter over [`DataPages`] assumed to be encoded as boolean arrays @@ -119,9 +104,7 @@ impl<'a> Decoder<'a> for BooleanDecoder { pub struct 
ArrayIterator { iter: I, init: Vec, - // invariant: items.len() == nested.len() - items: VecDeque<(MutableBitmap, MutableBitmap)>, - nested: VecDeque, + items: VecDeque<(NestedState, (MutableBitmap, MutableBitmap))>, chunk_size: Option, } @@ -131,7 +114,6 @@ impl ArrayIterator { iter, init, items: VecDeque::new(), - nested: VecDeque::new(), chunk_size, } } @@ -148,7 +130,6 @@ impl Iterator for ArrayIterator { let maybe_state = next( &mut self.iter, &mut self.items, - &mut self.nested, &self.init, self.chunk_size, &BooleanDecoder::default(), diff --git a/src/io/parquet/read/deserialize/nested_utils.rs b/src/io/parquet/read/deserialize/nested_utils.rs index 637cdf89197..e369815d5c2 100644 --- a/src/io/parquet/read/deserialize/nested_utils.rs +++ b/src/io/parquet/read/deserialize/nested_utils.rs @@ -8,9 +8,9 @@ use parquet2::{ use crate::{array::Array, bitmap::MutableBitmap, error::Result}; -use super::super::DataPages; pub use super::utils::Zip; -use super::utils::{DecodedState, Decoder, MaybeNext, Pushable}; +use super::utils::{DecodedState, MaybeNext}; +use super::{super::DataPages, utils::PageState}; /// trait describing deserialized repetition and definition levels pub trait Nested: std::fmt::Debug + Send + Sync { @@ -24,6 +24,9 @@ pub trait Nested: std::fmt::Debug + Send + Sync { false } + // Whether the Arrow container requires all items to be filled. + fn is_required(&self) -> bool; + /// number of rows fn len(&self) -> usize; @@ -55,6 +58,10 @@ impl Nested for NestedPrimitive { self.is_nullable } + fn is_required(&self) -> bool { + false + } + fn push(&mut self, _value: i64, _is_valid: bool) { self.length += 1 } @@ -89,6 +96,11 @@ impl Nested for NestedOptional { true } + fn is_required(&self) -> bool { + // it may be for FixedSizeList + false + } + fn push(&mut self, value: i64, is_valid: bool) { self.offsets.push(value); self.validity.push(is_valid); @@ -130,6 +142,11 @@ impl Nested for NestedValid { true } + fn is_required(&self) -> bool { + // it may be for FixedSizeList + false + } + fn push(&mut self, value: i64, _is_valid: bool) { self.offsets.push(value); } @@ -170,6 +187,10 @@ impl Nested for NestedStructValid { false } + fn is_required(&self) -> bool { + true + } + fn push(&mut self, _value: i64, _is_valid: bool) { self.length += 1; } @@ -205,6 +226,10 @@ impl Nested for NestedStruct { true } + fn is_required(&self) -> bool { + true + } + fn push(&mut self, _value: i64, is_valid: bool) { self.validity.push(is_valid) } @@ -218,21 +243,18 @@ impl Nested for NestedStruct { } } -pub(super) fn read_optional_values(items: D, values: &mut P, validity: &mut MutableBitmap) -where - D: Iterator>, - C: Default, - P: Pushable, -{ - for item in items { - if let Some(item) = item { - values.push(item); - validity.push(true); - } else { - values.push_null(); - validity.push(false); - } - } +/// A decoder that knows how to map `State` -> Array +pub(super) trait NestedDecoder<'a> { + type State: PageState<'a>; + type DecodedState: DecodedState<'a>; + + fn build_state(&self, page: &'a DataPage) -> Result; + + /// Initializes a new state + fn with_capacity(&self, capacity: usize) -> Self::DecodedState; + + fn push_valid(&self, state: &mut Self::State, decoded: &mut Self::DecodedState); + fn push_null(&self, decoded: &mut Self::DecodedState); } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -310,99 +332,78 @@ impl NestedState { // outermost is the number of rows self.nested[0].len() } - - /// The number of values associated with the primitive type - pub fn num_values(&self) -> usize { - 
self.nested.last().unwrap().num_values() - } } -pub(super) fn extend_from_new_page<'a, T: Decoder<'a>>( - mut page: T::State, - items: &mut VecDeque, - nested: &VecDeque, - decoder: &T, -) { - let needed = nested.back().unwrap().num_values(); - - let mut decoded = if let Some(decoded) = items.pop_back() { - // there is a already a state => it must be incomplete... - debug_assert!( - decoded.len() < needed, - "the temp page is expected to be incomplete ({} < {})", - decoded.len(), - needed - ); - decoded - } else { - // there is no state => initialize it - decoder.with_capacity(needed) - }; - - let remaining = needed - decoded.len(); - - // extend the current state - decoder.extend_from_state(&mut page, &mut decoded, remaining); - - // the number of values required is always fulfilled because - // dremel assigns one (rep, def) to each value and we request - // items that complete a row - assert_eq!(decoded.len(), needed); - - items.push_back(decoded); - - for nest in nested.iter().skip(1) { - let num_values = nest.num_values(); - let mut decoded = decoder.with_capacity(num_values); - decoder.extend_from_state(&mut page, &mut decoded, num_values); - items.push_back(decoded); - } -} - -/// Extends `state` by consuming `page`, optionally extending `items` if `page` -/// has less items than `chunk_size` -pub fn extend_offsets1<'a>( - page: &mut NestedPage<'a>, +/// Extends `items` by consuming `page`, first trying to complete the last `item` +/// and extending it if more are needed +fn extend<'a, D: NestedDecoder<'a>>( + page: &'a DataPage, init: &[InitNested], - items: &mut VecDeque, + items: &mut VecDeque<(NestedState, D::DecodedState)>, + decoder: &D, chunk_size: Option, -) { +) -> Result<()> { + let mut values_page = decoder.build_state(page)?; + let mut page = NestedPage::try_new(page)?; + let capacity = chunk_size.unwrap_or(0); let chunk_size = chunk_size.unwrap_or(usize::MAX); - let mut nested = if let Some(nested) = items.pop_back() { + let (mut nested, mut decoded) = if let Some((nested, decoded)) = items.pop_back() { // there is a already a state => it must be incomplete... 
debug_assert!( nested.len() < chunk_size, "the temp array is expected to be incomplete" ); - nested + (nested, decoded) } else { // there is no state => initialize it - init_nested(init, capacity) + (init_nested(init, capacity), decoder.with_capacity(0)) }; let remaining = chunk_size - nested.len(); // extend the current state - extend_offsets2(page, &mut nested, remaining); - items.push_back(nested); + extend_offsets2( + &mut page, + &mut values_page, + &mut nested.nested, + &mut decoded, + decoder, + remaining, + ); + items.push_back((nested, decoded)); while page.len() > 0 { let mut nested = init_nested(init, capacity); - extend_offsets2(page, &mut nested, chunk_size); - items.push_back(nested); + let mut decoded = decoder.with_capacity(0); + extend_offsets2( + &mut page, + &mut values_page, + &mut nested.nested, + &mut decoded, + decoder, + chunk_size, + ); + items.push_back((nested, decoded)); } + Ok(()) } -fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, additional: usize) { - let nested = &mut nested.nested; +fn extend_offsets2<'a, D: NestedDecoder<'a>>( + page: &mut NestedPage<'a>, + values_state: &mut D::State, + nested: &mut [Box], + decoded: &mut D::DecodedState, + decoder: &D, + additional: usize, +) { let mut values_count = vec![0; nested.len()]; for (depth, nest) in nested.iter().enumerate().skip(1) { values_count[depth - 1] = nest.len() as i64 } - values_count[nested.len() - 1] = nested[nested.len() - 1].len() as i64; + *values_count.last_mut().unwrap() = nested.last().unwrap().len() as i64; let mut cum_sum = vec![0u32; nested.len() + 1]; for (i, nest) in nested.iter().enumerate() { @@ -410,24 +411,41 @@ fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, addi cum_sum[i + 1] = cum_sum[i] + delta; } + let mut is_required = vec![false; nested.len()]; + for (depth, nest) in nested.iter().enumerate().take(nested.len() - 1) { + is_required[depth + 1] = nest.is_required() && nest.is_nullable() + } + + let max_depth = nested.len() - 1; + let mut rows = 0; while let Some((rep, def)) = page.iter.next() { if rep == 0 { rows += 1; } - for (depth, (nest, length)) in nested.iter_mut().zip(values_count.iter()).enumerate() { - if depth as u32 >= rep && def >= cum_sum[depth] { + for (depth, (nest, &is_required)) in nested.iter_mut().zip(is_required.iter()).enumerate() { + let right_level = depth as u32 >= rep && def >= cum_sum[depth]; + if is_required || right_level { let is_valid = nest.is_nullable() && def != cum_sum[depth]; - nest.push(*length, is_valid) + let length = values_count[depth]; + nest.push(length, is_valid); + if depth > 0 { + values_count[depth - 1] = nest.len() as i64; + }; + + if depth == max_depth { + // the leaf / primitive + let is_valid = (def != cum_sum[depth]) || !nest.is_nullable(); + if right_level && is_valid { + decoder.push_valid(values_state, decoded); + } else { + decoder.push_null(decoded); + } + } } } - for (depth, nest) in nested.iter().enumerate().skip(1) { - values_count[depth - 1] = nest.len() as i64 - } - values_count[nested.len() - 1] = nested[nested.len() - 1].len() as i64; - let next_rep = page.iter.peek().map(|x| x.0).unwrap_or(0); if next_rep == 0 && rows == additional.saturating_add(1) { @@ -436,73 +454,27 @@ fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, addi } } -#[derive(Debug)] -pub struct Optional<'a> { - iter: HybridRleDecoder<'a>, - max_def: u32, -} - -impl<'a> Iterator for Optional<'a> { - type Item = bool; - - #[inline] - fn next(&mut self) -> Option { - 
self.iter.next().and_then(|def| { - if def == self.max_def { - Some(true) - } else if def == self.max_def - 1 { - Some(false) - } else { - self.next() - } - }) - } -} - -impl<'a> Optional<'a> { - pub fn try_new(page: &'a DataPage) -> Result { - let (_, def_levels, _) = split_buffer(page)?; - - let max_def = page.descriptor.max_def_level; - - Ok(Self { - iter: HybridRleDecoder::new(def_levels, get_bit_width(max_def), page.num_values()), - max_def: max_def as u32, - }) - } - - #[inline] - pub fn len(&self) -> usize { - unreachable!(); - } -} - #[inline] pub(super) fn next<'a, I, D>( iter: &'a mut I, - items: &mut VecDeque, - nested_items: &mut VecDeque, + items: &mut VecDeque<(NestedState, D::DecodedState)>, init: &[InitNested], chunk_size: Option, decoder: &D, ) -> MaybeNext> where I: DataPages, - D: Decoder<'a>, + D: NestedDecoder<'a>, { // front[a1, a2, a3, ...]back if items.len() > 1 { - let nested = nested_items.pop_front().unwrap(); - let decoded = items.pop_front().unwrap(); + let (nested, decoded) = items.pop_front().unwrap(); return MaybeNext::Some(Ok((nested, decoded))); } match iter.next() { Err(e) => MaybeNext::Some(Err(e.into())), Ok(None) => { - if let Some(nested) = nested_items.pop_front() { - // we have a populated item and no more pages - // the only case where an item's length may be smaller than chunk_size - let decoded = items.pop_front().unwrap(); + if let Some((nested, decoded)) = items.pop_front() { MaybeNext::Some(Ok((nested, decoded))) } else { MaybeNext::None @@ -510,27 +482,16 @@ where } Ok(Some(page)) => { // there is a new page => consume the page from the start - let nested_page = NestedPage::try_new(page); - let mut nested_page = match nested_page { - Ok(page) => page, + let error = extend(page, init, items, decoder, chunk_size); + match error { + Ok(_) => {} Err(e) => return MaybeNext::Some(Err(e)), }; - extend_offsets1(&mut nested_page, init, nested_items, chunk_size); - - let maybe_page = decoder.build_state(page); - let page = match maybe_page { - Ok(page) => page, - Err(e) => return MaybeNext::Some(Err(e)), - }; - - extend_from_new_page(page, items, nested_items, decoder); - - if nested_items.front().unwrap().len() < chunk_size.unwrap_or(0) { + if items.front().unwrap().0.len() < chunk_size.unwrap_or(0) { MaybeNext::More } else { - let nested = nested_items.pop_front().unwrap(); - let decoded = items.pop_front().unwrap(); + let (nested, decoded) = items.pop_front().unwrap(); MaybeNext::Some(Ok((nested, decoded))) } } diff --git a/src/io/parquet/read/deserialize/primitive/basic.rs b/src/io/parquet/read/deserialize/primitive/basic.rs index eddf95872d4..1a3d1afef96 100644 --- a/src/io/parquet/read/deserialize/primitive/basic.rs +++ b/src/io/parquet/read/deserialize/primitive/basic.rs @@ -148,7 +148,7 @@ where } } -impl<'a, T> utils::DecodedState<'a> for (Vec, MutableBitmap) { +impl<'a, T: std::fmt::Debug> utils::DecodedState<'a> for (Vec, MutableBitmap) { fn len(&self) -> usize { self.0.len() } diff --git a/src/io/parquet/read/deserialize/primitive/nested.rs b/src/io/parquet/read/deserialize/primitive/nested.rs index 4a2192e6237..587e1967adc 100644 --- a/src/io/parquet/read/deserialize/primitive/nested.rs +++ b/src/io/parquet/read/deserialize/primitive/nested.rs @@ -22,10 +22,10 @@ enum State<'a, P> where P: ParquetNativeType, { - Optional(Optional<'a>, Values<'a>), + Optional(Values<'a>), Required(Values<'a>), RequiredDictionary(ValuesDictionary<'a, P>), - OptionalDictionary(Optional<'a>, ValuesDictionary<'a, P>), + OptionalDictionary(ValuesDictionary<'a, 
P>), } impl<'a, P> utils::PageState<'a> for State<'a, P> @@ -34,10 +34,10 @@ where { fn len(&self) -> usize { match self { - State::Optional(optional, _) => optional.len(), - State::Required(required) => required.len(), - State::RequiredDictionary(required) => required.len(), - State::OptionalDictionary(optional, _) => optional.len(), + State::Optional(values) => values.len(), + State::Required(values) => values.len(), + State::RequiredDictionary(values) => values.len(), + State::OptionalDictionary(values) => values.len(), } } } @@ -70,7 +70,7 @@ where } } -impl<'a, T, P, F> utils::Decoder<'a> for PrimitiveDecoder +impl<'a, T, P, F> NestedDecoder<'a> for PrimitiveDecoder where T: NativeType, P: ParquetNativeType, @@ -96,20 +96,15 @@ where } (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => { let dict = dict.as_any().downcast_ref().unwrap(); - Ok(State::OptionalDictionary( - Optional::try_new(page)?, - ValuesDictionary::try_new(page, dict)?, - )) + ValuesDictionary::try_new(page, dict).map(State::OptionalDictionary) } - (Encoding::Plain, _, true, false) => Ok(State::Optional( - Optional::try_new(page)?, - Values::try_new::
<P>(page)?, - )), - (Encoding::Plain, _, false, false) => Ok(State::Required(Values::try_new::<P>(page)?)), + (Encoding::Plain, _, true, false) => Values::try_new::<P>(page).map(State::Optional), + (Encoding::Plain, _, false, false) => Values::try_new::<P>
(page).map(State::Required), _ => Err(utils::not_implemented(page)), } } + /// Initializes a new state fn with_capacity(&self, capacity: usize) -> Self::DecodedState { ( Vec::::with_capacity(capacity), @@ -117,43 +112,41 @@ where ) } - fn extend_from_state( - &self, - state: &mut Self::State, - decoded: &mut Self::DecodedState, - additional: usize, - ) { + fn push_valid(&self, state: &mut Self::State, decoded: &mut Self::DecodedState) { let (values, validity) = decoded; match state { - State::Optional(page_validity, page_values) => { - let items = page_validity.by_ref().take(additional); - let items = Zip::new(items, page_values.values.by_ref().map(decode).map(self.op)); - - read_optional_values(items, values, validity) + State::Optional(page_values) => { + let value = page_values.values.by_ref().next().map(decode).map(self.op); + // convert unwrap to error + values.push(value.unwrap_or_default()); + validity.push(true); } - State::Required(page) => { - values.extend( - page.values - .by_ref() - .map(decode) - .map(self.op) - .take(additional), - ); + State::Required(page_values) => { + let value = page_values.values.by_ref().next().map(decode).map(self.op); + // convert unwrap to error + values.push(value.unwrap_or_default()); } State::RequiredDictionary(page) => { let op1 = |index: u32| page.dict[index as usize]; - values.extend(page.values.by_ref().map(op1).map(self.op).take(additional)); - } - State::OptionalDictionary(page_validity, page_values) => { - let op1 = |index: u32| page_values.dict[index as usize]; + let value = page.values.next().map(op1).map(self.op); - let items = page_validity.by_ref().take(additional); - let items = Zip::new(items, page_values.values.by_ref().map(op1).map(self.op)); + values.push(value.unwrap_or_default()); + } + State::OptionalDictionary(page) => { + let op1 = |index: u32| page.dict[index as usize]; + let value = page.values.next().map(op1).map(self.op); - read_optional_values(items, values, validity) + values.push(value.unwrap_or_default()); + validity.push(true); } } } + + fn push_null(&self, decoded: &mut Self::DecodedState) { + let (values, validity) = decoded; + values.push(T::default()); + validity.push(false) + } } fn finish( @@ -177,9 +170,7 @@ where iter: I, init: Vec, data_type: DataType, - // invariant: items.len() == nested.len() - items: VecDeque<(Vec, MutableBitmap)>, - nested: VecDeque, + items: VecDeque<(NestedState, (Vec, MutableBitmap))>, chunk_size: Option, decoder: PrimitiveDecoder, } @@ -204,7 +195,6 @@ where init, data_type, items: VecDeque::new(), - nested: VecDeque::new(), chunk_size, decoder: PrimitiveDecoder::new(op), } @@ -225,7 +215,6 @@ where let maybe_state = next( &mut self.iter, &mut self.items, - &mut self.nested, &self.init, self.chunk_size, &self.decoder, diff --git a/src/io/parquet/read/deserialize/utils.rs b/src/io/parquet/read/deserialize/utils.rs index b29bc48ba4c..4c022850130 100644 --- a/src/io/parquet/read/deserialize/utils.rs +++ b/src/io/parquet/read/deserialize/utils.rs @@ -343,7 +343,8 @@ pub(super) trait PageState<'a>: std::fmt::Debug { } /// The state of a partially deserialized page -pub(super) trait DecodedState<'a> { +pub(super) trait DecodedState<'a>: std::fmt::Debug { + // the number of values that this decoder already consumed fn len(&self) -> usize; } diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 4f3ddd2b6a3..df138d1686c 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -706,36 +706,44 @@ pub fn pyarrow_struct(column: &str) -> Box { Some(true), 
]; let boolean = BooleanArray::from(boolean).boxed(); + + let string = [ + Some("Hello"), + None, + Some("aa"), + Some(""), + None, + Some("abc"), + None, + None, + Some("def"), + Some("aaa"), + ]; + let string = Utf8Array::::from(string).boxed(); + let fields = vec![ Field::new("f1", DataType::Utf8, true), Field::new("f2", DataType::Boolean, true), ]; match column { - "struct" => { - let string = [ - Some("Hello"), - None, - Some("aa"), - Some(""), - None, - Some("abc"), - None, - None, - Some("def"), - Some("aaa"), - ]; - let values = vec![Utf8Array::::from(string).boxed(), boolean]; - StructArray::from_data(DataType::Struct(fields), values, None).boxed() + "struct" => StructArray::new(DataType::Struct(fields), vec![string, boolean], None).boxed(), + "struct_nullable" => { + let values = vec![string, boolean]; + StructArray::new( + DataType::Struct(fields), + values, + Some([true, true, false, true, true, true, true, true, true, true].into()), + ) + .boxed() } "struct_struct" => { let struct_ = pyarrow_struct("struct"); - let values = vec![struct_, boolean]; - Box::new(StructArray::from_data( + Box::new(StructArray::new( DataType::Struct(vec![ Field::new("f1", DataType::Struct(fields), true), Field::new("f2", DataType::Boolean, true), ]), - values, + vec![struct_, boolean], None, )) } @@ -756,7 +764,7 @@ pub fn pyarrow_struct_statistics(column: &str) -> Statistics { let names = vec!["f1".to_string(), "f2".to_string()]; match column { - "struct" => Statistics { + "struct" | "struct_nullable" => Statistics { distinct_count: Count::Struct(new_struct( vec![ Box::new(UInt64Array::from([None])), @@ -787,10 +795,64 @@ pub fn pyarrow_struct_statistics(column: &str) -> Statistics { )), }, "struct_struct" => Statistics { - distinct_count: Count::Single(UInt64Array::from([None])), - null_count: Count::Single(UInt64Array::from([Some(1)])), - min_value: Box::new(BooleanArray::from_slice([false])), - max_value: Box::new(BooleanArray::from_slice([true])), + distinct_count: Count::Struct(new_struct( + vec![ + new_struct( + vec![ + Box::new(UInt64Array::from([None])), + Box::new(UInt64Array::from([None])), + ], + names.clone(), + ) + .boxed(), + UInt64Array::from([None]).boxed(), + ], + names.clone(), + )), + null_count: Count::Struct(new_struct( + vec![ + new_struct( + vec![ + Box::new(UInt64Array::from([Some(4)])), + Box::new(UInt64Array::from([Some(4)])), + ], + names.clone(), + ) + .boxed(), + UInt64Array::from([Some(4)]).boxed(), + ], + names.clone(), + )), + min_value: new_struct( + vec![ + new_struct( + vec![ + Utf8Array::::from_slice([""]).boxed(), + BooleanArray::from_slice([false]).boxed(), + ], + names.clone(), + ) + .boxed(), + BooleanArray::from_slice([false]).boxed(), + ], + names.clone(), + ) + .boxed(), + max_value: new_struct( + vec![ + new_struct( + vec![ + Utf8Array::::from_slice(["def"]).boxed(), + BooleanArray::from_slice([true]).boxed(), + ], + names.clone(), + ) + .boxed(), + BooleanArray::from_slice([true]).boxed(), + ], + names, + ) + .boxed(), }, _ => todo!(), } diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index 8ab783dbab3..675ec95c6bb 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -65,6 +65,7 @@ fn test_pyarrow_integration( "list_bool", "list_nested_inner_required_required_i64", "list_nested_inner_required_i64", + "struct_nullable", // it counts null struct items as nulls // pyarrow reports an incorrect min/max for MapArray "map", "map_nullable", @@ -447,16 +448,20 @@ fn v2_decimal_26_required_dict() -> Result<()> { } 
 #[test]
-fn v1_struct_optional() -> Result<()> {
+fn v1_struct_required_optional() -> Result<()> {
     test_pyarrow_integration("struct", 1, "struct", false, false, None)
 }
 
 #[test]
-#[ignore]
-fn v1_struct_struct_optional() -> Result<()> {
+fn v1_struct_struct() -> Result<()> {
     test_pyarrow_integration("struct_struct", 1, "struct", false, false, None)
 }
 
+#[test]
+fn v1_struct_optional_optional() -> Result<()> {
+    test_pyarrow_integration("struct_nullable", 1, "struct", false, false, None)
+}
+
 #[test]
 fn v1_nested_edge_1() -> Result<()> {
     test_pyarrow_integration("simple", 1, "nested_edge", false, false, None)
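
Note on the core change: the diff replaces the bulk `extend_from_state(.., additional)` copy with a per-slot `push_valid`/`push_null` contract (`NestedDecoder`), so the rep/def-level walker in `extend_offsets2` can interleave nulls coming from any nesting depth (including null structs, via `is_required`) with leaf values. The following is a minimal, self-contained Rust sketch of that contract; `PushDecoder`, `ToyDecoder`, and the level walk in `main` are illustrative stand-ins, not the crate's actual types.

    /// Simplified stand-in for `NestedDecoder`: instead of copying
    /// `additional` items in bulk, the caller asks for one slot at a time.
    trait PushDecoder {
        type State;
        type Decoded;
        fn push_valid(&self, state: &mut Self::State, decoded: &mut Self::Decoded);
        fn push_null(&self, decoded: &mut Self::Decoded);
    }

    /// A toy decoder over plain i64 values; decoded = (values, validity).
    struct ToyDecoder;

    impl PushDecoder for ToyDecoder {
        type State = std::vec::IntoIter<i64>;
        type Decoded = (Vec<i64>, Vec<bool>);

        fn push_valid(&self, state: &mut Self::State, decoded: &mut Self::Decoded) {
            // a value is consumed from the page only when the slot is valid
            decoded.0.push(state.next().unwrap_or_default());
            decoded.1.push(true);
        }

        fn push_null(&self, decoded: &mut Self::Decoded) {
            // nulls occupy a slot but consume nothing from the page
            decoded.0.push(0);
            decoded.1.push(false);
        }
    }

    fn main() {
        // one definition level per leaf slot; def == max_def means "defined",
        // mirroring how extend_offsets2 chooses between push_valid and push_null
        let def_levels = [1u32, 0, 1, 1, 0];
        let max_def = 1;

        let decoder = ToyDecoder;
        // only the three valid values are physically stored in the page
        let mut state = vec![10i64, 20, 30].into_iter();
        let mut decoded = (Vec::new(), Vec::new());

        for def in def_levels {
            if def == max_def {
                decoder.push_valid(&mut state, &mut decoded);
            } else {
                decoder.push_null(&mut decoded);
            }
        }
        assert_eq!(decoded.0, vec![10, 0, 20, 30, 0]);
        assert_eq!(decoded.1, vec![true, false, true, true, false]);
    }

The point of the design is that a null introduced at an outer level (e.g. a null struct, as in the new `struct_nullable` fixture) still occupies one leaf slot but consumes nothing from the page's value stream, which is exactly what `push_null` encodes.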