Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Re-added support for reading nested parquet lists (#881)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Mar 5, 2022
1 parent 9d4342c commit f71124b
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 93 deletions.
20 changes: 20 additions & 0 deletions src/io/parquet/read/deserialize/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,26 @@ where
|x: i64| x,
)
}
Float32 => {
types.pop();
primitive::iter_to_arrays_nested(
columns.pop().unwrap(),
init.pop().unwrap(),
field.data_type().clone(),
chunk_size,
|x: f32| x,
)
}
Float64 => {
types.pop();
primitive::iter_to_arrays_nested(
columns.pop().unwrap(),
init.pop().unwrap(),
field.data_type().clone(),
chunk_size,
|x: f64| x,
)
}
Utf8 => {
types.pop();
binary::iter_to_arrays_nested::<i32, Utf8Array<i32>, _>(
Expand Down
182 changes: 91 additions & 91 deletions src/io/parquet/read/deserialize/nested_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,15 @@ use super::utils::{split_buffer, Decoder, MaybeNext, Pushable};
pub trait Nested: std::fmt::Debug + Send + Sync {
fn inner(&mut self) -> (Buffer<i64>, Option<Bitmap>);

fn last_offset(&self) -> i64;

fn push(&mut self, length: i64, is_valid: bool);

fn offsets(&mut self) -> &[i64];

fn close(&mut self, length: i64);

fn is_nullable(&self) -> bool;

/// number of rows
fn len(&self) -> usize;

fn len1(&self) -> usize;

/// number of values associated to the primitive type this nested tracks
fn num_values(&self) -> usize;
}
Expand All @@ -57,11 +51,6 @@ impl Nested for NestedPrimitive {
(Default::default(), Default::default())
}

#[inline]
fn last_offset(&self) -> i64 {
0
}

fn is_nullable(&self) -> bool {
self.is_nullable
}
Expand All @@ -70,20 +59,12 @@ impl Nested for NestedPrimitive {
self.length += 1
}

fn offsets(&mut self) -> &[i64] {
&[]
}

fn close(&mut self, _length: i64) {}

fn len(&self) -> usize {
self.length
}

fn len1(&self) -> usize {
self.length
}

fn num_values(&self) -> usize {
self.length
}
Expand All @@ -102,11 +83,6 @@ impl Nested for NestedOptional {
(offsets.into(), validity.into())
}

#[inline]
fn last_offset(&self) -> i64 {
*self.offsets.last().unwrap()
}

fn is_nullable(&self) -> bool {
true
}
Expand All @@ -116,19 +92,11 @@ impl Nested for NestedOptional {
self.validity.push(is_valid);
}

fn offsets(&mut self) -> &[i64] {
&self.offsets
}

fn close(&mut self, length: i64) {
self.offsets.push(length)
}

fn len(&self) -> usize {
self.offsets.len().saturating_sub(1)
}

fn len1(&self) -> usize {
self.offsets.len()
}

Expand Down Expand Up @@ -160,19 +128,10 @@ impl Nested for NestedValid {
false
}

#[inline]
fn last_offset(&self) -> i64 {
*self.offsets.last().unwrap()
}

fn push(&mut self, value: i64, _is_valid: bool) {
self.offsets.push(value);
}

fn offsets(&mut self) -> &[i64] {
&self.offsets
}

fn close(&mut self, length: i64) {
self.offsets.push(length)
}
Expand All @@ -181,10 +140,6 @@ impl Nested for NestedValid {
self.offsets.len().saturating_sub(1)
}

fn len1(&self) -> usize {
self.offsets.len()
}

fn num_values(&self) -> usize {
self.offsets.last().copied().unwrap_or(0) as usize
}
Expand All @@ -197,6 +152,78 @@ impl NestedValid {
}
}

#[derive(Debug, Default)]
pub struct NestedStructValid {
length: usize,
}

impl NestedStructValid {
pub fn new() -> Self {
Self { length: 0 }
}
}

impl Nested for NestedStructValid {
fn inner(&mut self) -> (Buffer<i64>, Option<Bitmap>) {
(Default::default(), None)
}

fn is_nullable(&self) -> bool {
false
}

fn push(&mut self, _value: i64, _is_valid: bool) {
self.length += 1;
}

fn close(&mut self, _length: i64) {}

fn len(&self) -> usize {
self.length
}

fn num_values(&self) -> usize {
self.length
}
}

#[derive(Debug, Default)]
pub struct NestedStruct {
validity: MutableBitmap,
}

impl NestedStruct {
pub fn with_capacity(capacity: usize) -> Self {
Self {
validity: MutableBitmap::with_capacity(capacity),
}
}
}

impl Nested for NestedStruct {
fn inner(&mut self) -> (Buffer<i64>, Option<Bitmap>) {
(Default::default(), None)
}

fn is_nullable(&self) -> bool {
false
}

fn push(&mut self, _value: i64, is_valid: bool) {
self.validity.push(is_valid)
}

fn close(&mut self, _length: i64) {}

fn len(&self) -> usize {
self.validity.len()
}

fn num_values(&self) -> usize {
self.validity.len()
}
}

pub(super) fn read_optional_values<D, C, G, P>(
def_levels: D,
max_def: u32,
Expand Down Expand Up @@ -254,9 +281,9 @@ fn init_nested_recursive(init: &InitNested, capacity: usize, container: &mut Vec
}
InitNested::Struct(inner, is_nullable) => {
if *is_nullable {
container.push(Box::new(NestedOptional::with_capacity(capacity)) as Box<dyn Nested>)
container.push(Box::new(NestedStruct::with_capacity(capacity)) as Box<dyn Nested>)
} else {
container.push(Box::new(NestedValid::with_capacity(capacity)) as Box<dyn Nested>)
container.push(Box::new(NestedStructValid::new()) as Box<dyn Nested>)
}
init_nested_recursive(inner, capacity, container)
}
Expand All @@ -273,7 +300,6 @@ pub struct NestedPage<'a> {
repetitions: HybridRleDecoder<'a>,
_max_rep_level: u32,
definitions: HybridRleDecoder<'a>,
max_def_level: u32,
}

impl<'a> NestedPage<'a> {
Expand All @@ -295,7 +321,6 @@ impl<'a> NestedPage<'a> {
get_bit_width(max_def_level),
page.num_values(),
),
max_def_level: max_def_level as u32,
}
}

Expand Down Expand Up @@ -323,12 +348,7 @@ impl NestedState {

/// The number of values associated with the primitive type
pub fn num_values(&self) -> usize {
self.nested[0].num_values()
}

pub fn depth(&self) -> usize {
// outermost is the number of rows
self.nested.len()
self.nested.last().unwrap().num_values()
}
}

Expand Down Expand Up @@ -430,13 +450,14 @@ pub fn extend_offsets1<'a>(
}

fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, additional: usize) {
let max_depth = nested.depth() - 1;
let mut values_count = vec![0; max_depth + 1];

let is_optional = nested.nested.last().unwrap().is_nullable();
let max_def = page.max_def_level;
let nested = &mut nested.nested;
let mut values_count = vec![0; nested.len()];

let rate = if max_def == 1 { 1 } else { 2 };
let mut cum_sum = vec![0u32; nested.len() + 1];
for (i, nest) in nested.iter().enumerate() {
let delta = if nest.is_nullable() { 2 } else { 1 };
cum_sum[i + 1] = cum_sum[i] + delta;
}

let mut iter = page.repetitions.by_ref().zip(page.definitions.by_ref());

Expand All @@ -448,42 +469,21 @@ fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, addi
rows += 1
}

let closures = rep + 1 + (def / rate);

nested
.nested
.iter_mut()
.enumerate()
.zip(values_count.iter())
.skip(rep as usize)
.take(max_depth as usize - rep as usize)
.take(closures as usize)
.for_each(|((depth, nested), length)| {
let is_null = def - rep == depth as u32;
nested.push(*length, !is_null);
});

// add to the primitive
if (is_optional && def >= max_def - 1) || (!is_optional && def == max_def) {
let is_valid = def == max_def;
let length = values_count.last_mut().unwrap();
nested.nested.last_mut().unwrap().push(*length, is_valid);
*length += 1;
for (depth, (nest, length)) in nested.iter_mut().zip(values_count.iter()).enumerate() {
if depth as u32 >= rep && def >= cum_sum[depth] {
let is_valid = nest.is_nullable() && def as u32 != cum_sum[depth];
nest.push(*length, is_valid)
}
}

values_count
.iter_mut()
.rev()
.skip(1)
.zip(nested.nested.iter().rev())
.for_each(|(length, nested)| {
*length = nested.len1() as i64;
});
for (depth, nest) in nested.iter().enumerate().skip(1) {
values_count[depth - 1] = nest.len() as i64
}
values_count[nested.len() - 1] = nested[nested.len() - 1].len() as i64
}

// close validities
nested
.nested
.iter_mut()
.zip(values_count.iter())
.for_each(|(nested, length)| {
Expand Down
1 change: 1 addition & 0 deletions tests/it/io/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ pub fn pyarrow_nested_nullable(column: usize) -> Box<dyn Array> {
))
}
7 => {
// [[0, 1]], None, [[2, None], [3]], [[4, 5], [6]], [], [[7], None, [9]], [[], [None], None], [[10]]
let data = [
Some(vec![Some(vec![Some(0), Some(1)])]),
None,
Expand Down
2 changes: 0 additions & 2 deletions tests/it/io/parquet/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,11 @@ fn v2_nested_nested() -> Result<()> {
}

#[test]
#[ignore] // todo
fn v2_nested_nested_required() -> Result<()> {
test_pyarrow_integration(8, 2, "nested", false, false, None)
}

#[test]
#[ignore] // todo
fn v2_nested_nested_required_required() -> Result<()> {
test_pyarrow_integration(9, 2, "nested", false, false, None)
}
Expand Down

0 comments on commit f71124b

Please sign in to comment.