diff --git a/src/io/parquet/read/deserialize/mod.rs b/src/io/parquet/read/deserialize/mod.rs index a6a2b48dc15..0c522366f2d 100644 --- a/src/io/parquet/read/deserialize/mod.rs +++ b/src/io/parquet/read/deserialize/mod.rs @@ -160,6 +160,26 @@ where |x: i64| x, ) } + Float32 => { + types.pop(); + primitive::iter_to_arrays_nested( + columns.pop().unwrap(), + init.pop().unwrap(), + field.data_type().clone(), + chunk_size, + |x: f32| x, + ) + } + Float64 => { + types.pop(); + primitive::iter_to_arrays_nested( + columns.pop().unwrap(), + init.pop().unwrap(), + field.data_type().clone(), + chunk_size, + |x: f64| x, + ) + } Utf8 => { types.pop(); binary::iter_to_arrays_nested::, _>( diff --git a/src/io/parquet/read/deserialize/nested_utils.rs b/src/io/parquet/read/deserialize/nested_utils.rs index 178ababd464..8c9b0eeca4f 100644 --- a/src/io/parquet/read/deserialize/nested_utils.rs +++ b/src/io/parquet/read/deserialize/nested_utils.rs @@ -18,12 +18,8 @@ use super::utils::{split_buffer, Decoder, MaybeNext, Pushable}; pub trait Nested: std::fmt::Debug + Send + Sync { fn inner(&mut self) -> (Buffer, Option); - fn last_offset(&self) -> i64; - fn push(&mut self, length: i64, is_valid: bool); - fn offsets(&mut self) -> &[i64]; - fn close(&mut self, length: i64); fn is_nullable(&self) -> bool; @@ -31,8 +27,6 @@ pub trait Nested: std::fmt::Debug + Send + Sync { /// number of rows fn len(&self) -> usize; - fn len1(&self) -> usize; - /// number of values associated to the primitive type this nested tracks fn num_values(&self) -> usize; } @@ -57,11 +51,6 @@ impl Nested for NestedPrimitive { (Default::default(), Default::default()) } - #[inline] - fn last_offset(&self) -> i64 { - 0 - } - fn is_nullable(&self) -> bool { self.is_nullable } @@ -70,20 +59,12 @@ impl Nested for NestedPrimitive { self.length += 1 } - fn offsets(&mut self) -> &[i64] { - &[] - } - fn close(&mut self, _length: i64) {} fn len(&self) -> usize { self.length } - fn len1(&self) -> usize { - self.length - } - fn num_values(&self) -> usize { self.length } @@ -102,11 +83,6 @@ impl Nested for NestedOptional { (offsets.into(), validity.into()) } - #[inline] - fn last_offset(&self) -> i64 { - *self.offsets.last().unwrap() - } - fn is_nullable(&self) -> bool { true } @@ -116,19 +92,11 @@ impl Nested for NestedOptional { self.validity.push(is_valid); } - fn offsets(&mut self) -> &[i64] { - &self.offsets - } - fn close(&mut self, length: i64) { self.offsets.push(length) } fn len(&self) -> usize { - self.offsets.len().saturating_sub(1) - } - - fn len1(&self) -> usize { self.offsets.len() } @@ -160,19 +128,10 @@ impl Nested for NestedValid { false } - #[inline] - fn last_offset(&self) -> i64 { - *self.offsets.last().unwrap() - } - fn push(&mut self, value: i64, _is_valid: bool) { self.offsets.push(value); } - fn offsets(&mut self) -> &[i64] { - &self.offsets - } - fn close(&mut self, length: i64) { self.offsets.push(length) } @@ -181,10 +140,6 @@ impl Nested for NestedValid { self.offsets.len().saturating_sub(1) } - fn len1(&self) -> usize { - self.offsets.len() - } - fn num_values(&self) -> usize { self.offsets.last().copied().unwrap_or(0) as usize } @@ -197,6 +152,78 @@ impl NestedValid { } } +#[derive(Debug, Default)] +pub struct NestedStructValid { + length: usize, +} + +impl NestedStructValid { + pub fn new() -> Self { + Self { length: 0 } + } +} + +impl Nested for NestedStructValid { + fn inner(&mut self) -> (Buffer, Option) { + (Default::default(), None) + } + + fn is_nullable(&self) -> bool { + false + } + + fn push(&mut self, _value: i64, _is_valid: bool) { + self.length += 1; + } + + fn close(&mut self, _length: i64) {} + + fn len(&self) -> usize { + self.length + } + + fn num_values(&self) -> usize { + self.length + } +} + +#[derive(Debug, Default)] +pub struct NestedStruct { + validity: MutableBitmap, +} + +impl NestedStruct { + pub fn with_capacity(capacity: usize) -> Self { + Self { + validity: MutableBitmap::with_capacity(capacity), + } + } +} + +impl Nested for NestedStruct { + fn inner(&mut self) -> (Buffer, Option) { + (Default::default(), None) + } + + fn is_nullable(&self) -> bool { + false + } + + fn push(&mut self, _value: i64, is_valid: bool) { + self.validity.push(is_valid) + } + + fn close(&mut self, _length: i64) {} + + fn len(&self) -> usize { + self.validity.len() + } + + fn num_values(&self) -> usize { + self.validity.len() + } +} + pub(super) fn read_optional_values( def_levels: D, max_def: u32, @@ -254,9 +281,9 @@ fn init_nested_recursive(init: &InitNested, capacity: usize, container: &mut Vec } InitNested::Struct(inner, is_nullable) => { if *is_nullable { - container.push(Box::new(NestedOptional::with_capacity(capacity)) as Box) + container.push(Box::new(NestedStruct::with_capacity(capacity)) as Box) } else { - container.push(Box::new(NestedValid::with_capacity(capacity)) as Box) + container.push(Box::new(NestedStructValid::new()) as Box) } init_nested_recursive(inner, capacity, container) } @@ -273,7 +300,6 @@ pub struct NestedPage<'a> { repetitions: HybridRleDecoder<'a>, _max_rep_level: u32, definitions: HybridRleDecoder<'a>, - max_def_level: u32, } impl<'a> NestedPage<'a> { @@ -295,7 +321,6 @@ impl<'a> NestedPage<'a> { get_bit_width(max_def_level), page.num_values(), ), - max_def_level: max_def_level as u32, } } @@ -323,12 +348,7 @@ impl NestedState { /// The number of values associated with the primitive type pub fn num_values(&self) -> usize { - self.nested[0].num_values() - } - - pub fn depth(&self) -> usize { - // outermost is the number of rows - self.nested.len() + self.nested.last().unwrap().num_values() } } @@ -430,13 +450,14 @@ pub fn extend_offsets1<'a>( } fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, additional: usize) { - let max_depth = nested.depth() - 1; - let mut values_count = vec![0; max_depth + 1]; - - let is_optional = nested.nested.last().unwrap().is_nullable(); - let max_def = page.max_def_level; + let nested = &mut nested.nested; + let mut values_count = vec![0; nested.len()]; - let rate = if max_def == 1 { 1 } else { 2 }; + let mut cum_sum = vec![0u32; nested.len() + 1]; + for (i, nest) in nested.iter().enumerate() { + let delta = if nest.is_nullable() { 2 } else { 1 }; + cum_sum[i + 1] = cum_sum[i] + delta; + } let mut iter = page.repetitions.by_ref().zip(page.definitions.by_ref()); @@ -448,42 +469,21 @@ fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, addi rows += 1 } - let closures = rep + 1 + (def / rate); - - nested - .nested - .iter_mut() - .enumerate() - .zip(values_count.iter()) - .skip(rep as usize) - .take(max_depth as usize - rep as usize) - .take(closures as usize) - .for_each(|((depth, nested), length)| { - let is_null = def - rep == depth as u32; - nested.push(*length, !is_null); - }); - - // add to the primitive - if (is_optional && def >= max_def - 1) || (!is_optional && def == max_def) { - let is_valid = def == max_def; - let length = values_count.last_mut().unwrap(); - nested.nested.last_mut().unwrap().push(*length, is_valid); - *length += 1; + for (depth, (nest, length)) in nested.iter_mut().zip(values_count.iter()).enumerate() { + if depth as u32 >= rep && def >= cum_sum[depth] { + let is_valid = nest.is_nullable() && def as u32 != cum_sum[depth]; + nest.push(*length, is_valid) + } } - values_count - .iter_mut() - .rev() - .skip(1) - .zip(nested.nested.iter().rev()) - .for_each(|(length, nested)| { - *length = nested.len1() as i64; - }); + for (depth, nest) in nested.iter().enumerate().skip(1) { + values_count[depth - 1] = nest.len() as i64 + } + values_count[nested.len() - 1] = nested[nested.len() - 1].len() as i64 } // close validities nested - .nested .iter_mut() .zip(values_count.iter()) .for_each(|(nested, length)| { diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 0528b85d846..0fa82f69eb8 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -174,6 +174,7 @@ pub fn pyarrow_nested_nullable(column: usize) -> Box { )) } 7 => { + // [[0, 1]], None, [[2, None], [3]], [[4, 5], [6]], [], [[7], None, [9]], [[], [None], None], [[10]] let data = [ Some(vec![Some(vec![Some(0), Some(1)])]), None, diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index 90d61f0a97c..10a383fa590 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -259,13 +259,11 @@ fn v2_nested_nested() -> Result<()> { } #[test] -#[ignore] // todo fn v2_nested_nested_required() -> Result<()> { test_pyarrow_integration(8, 2, "nested", false, false, None) } #[test] -#[ignore] // todo fn v2_nested_nested_required_required() -> Result<()> { test_pyarrow_integration(9, 2, "nested", false, false, None) }