This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support to read and write nested dictionaries to parquet #1175

Merged · 4 commits · Jul 23, 2022
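
For context: a "nested dictionary" is a dictionary-encoded array used inside a nested type, for example a list whose items are dictionary-encoded strings. The sketch below (illustration only, not part of this PR's diff) builds such a column with arrow2; constructor signatures such as ListArray::new and DictionaryArray::try_new are assumed to match the arrow2 version this PR targets and may differ in other releases.

use arrow2::array::{Array, DictionaryArray, ListArray, PrimitiveArray, Utf8Array};
use arrow2::datatypes::{DataType, Field, IntegerType};
use arrow2::error::Result;

// A list column whose items are dictionary-encoded strings: [["a", "b"], ["a", "c"]]
fn nested_dict_column() -> Result<Box<dyn Array>> {
    // the dictionary values and the keys that index into them
    let values = Utf8Array::<i32>::from_slice(["a", "b", "c"]);
    let keys = PrimitiveArray::<i32>::from_slice([0i32, 1, 0, 2]);
    let dict_type = DataType::Dictionary(IntegerType::Int32, Box::new(DataType::Utf8), false);
    let dict = DictionaryArray::try_new(dict_type.clone(), keys, values.boxed())?;

    // wrap the dictionary in a list: offsets [0, 2, 4] split the four keys into two lists
    let list_type = DataType::List(Box::new(Field::new("item", dict_type, true)));
    let list = ListArray::<i32>::new(list_type, vec![0i32, 2, 4].into(), dict.boxed(), None);
    Ok(list.boxed())
}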
10 changes: 5 additions & 5 deletions examples/parquet_write.rs
@@ -11,19 +11,19 @@ use arrow2::{
},
};

-fn write_batch(path: &str, schema: Schema, columns: Chunk<Box<dyn Array>>) -> Result<()> {
+fn write_chunk(path: &str, schema: Schema, chunk: Chunk<Box<dyn Array>>) -> Result<()> {
let options = WriteOptions {
write_statistics: true,
compression: CompressionOptions::Uncompressed,
version: Version::V2,
};

-let iter = vec![Ok(columns)];
+let iter = vec![Ok(chunk)];

let encodings = schema
.fields
.iter()
-.map(|f| transverse(&f.data_type, |_| Encoding::Plain))
+.map(|f| transverse(&f.data_type, |_| Encoding::RleDictionary))
.collect();

let row_groups = RowGroupIterator::try_new(iter.into_iter(), &schema, options, encodings)?;
@@ -52,7 +52,7 @@ fn main() -> Result<()> {
]);
let field = Field::new("c1", array.data_type().clone(), true);
let schema = Schema::from(vec![field]);
-let columns = Chunk::new(vec![array.boxed()]);
+let chunk = Chunk::new(vec![array.boxed()]);

-write_batch("test.parquet", schema, columns)
+write_chunk("test.parquet", schema, chunk)
}
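
Note that the example now asks for dictionary encoding on every leaf column by mapping Encoding::RleDictionary over each field with transverse. A hedged variation (not part of the diff, and assuming transverse passes each leaf's DataType to the closure, as in the arrow2 version this PR targets) would dictionary-encode only string leaves and keep the rest plain:

use arrow2::datatypes::{PhysicalType, Schema};
use arrow2::io::parquet::write::{transverse, Encoding};

// One Vec<Encoding> per field, one Encoding per leaf column inside that field.
fn encodings(schema: &Schema) -> Vec<Vec<Encoding>> {
    schema
        .fields
        .iter()
        .map(|f| {
            transverse(&f.data_type, |leaf| match leaf.to_physical_type() {
                PhysicalType::Utf8 | PhysicalType::LargeUtf8 => Encoding::RleDictionary,
                _ => Encoding::Plain,
            })
        })
        .collect()
}

The resulting Vec<Vec<Encoding>> can be passed to RowGroupIterator::try_new exactly as in write_chunk above.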
2 changes: 1 addition & 1 deletion src/array/primitive/mod.rs
@@ -69,7 +69,7 @@ fn check<T: NativeType>(

if data_type.to_physical_type() != PhysicalType::Primitive(T::PRIMITIVE) {
return Err(Error::oos(
"BooleanArray can only be initialized with a DataType whose physical type is Primitive",
"PrimitiveArray can only be initialized with a DataType whose physical type is Primitive",
));
}
Ok(())
102 changes: 95 additions & 7 deletions src/io/parquet/read/deserialize/binary/dictionary.rs
@@ -7,6 +7,7 @@ use crate::{
bitmap::MutableBitmap,
datatypes::{DataType, PhysicalType},
error::Result,
+io::parquet::read::deserialize::nested_utils::{InitNested, NestedArrayIter, NestedState},
};

use super::super::dictionary::*;
@@ -23,7 +24,6 @@
{
iter: I,
data_type: DataType,
-values_data_type: DataType,
values: Dict,
items: VecDeque<(Vec<K>, MutableBitmap)>,
chunk_size: Option<usize>,
@@ -37,14 +37,9 @@
I: DataPages,
{
pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>) -> Self {
-let values_data_type = match &data_type {
-DataType::Dictionary(_, values, _) => values.as_ref().clone(),
-_ => unreachable!(),
-};
Self {
iter,
data_type,
-values_data_type,
values: Dict::Empty,
items: VecDeque::new(),
chunk_size,
@@ -54,6 +49,11 @@
}

fn read_dict<O: Offset>(data_type: DataType, dict: &dyn DictPage) -> Box<dyn Array> {
+let data_type = match data_type {
+DataType::Dictionary(_, values, _) => *values,
+_ => data_type,
+};

let dict = dict.as_any().downcast_ref::<BinaryPageDict>().unwrap();
let offsets = dict
.offsets()
@@ -94,7 +94,74 @@
&mut self.values,
self.data_type.clone(),
self.chunk_size,
-|dict| read_dict::<O>(self.values_data_type.clone(), dict),
+|dict| read_dict::<O>(self.data_type.clone(), dict),
);
match maybe_state {
MaybeNext::Some(Ok(dict)) => Some(Ok(dict)),
MaybeNext::Some(Err(e)) => Some(Err(e)),
MaybeNext::None => None,
MaybeNext::More => self.next(),
}
}
}

#[derive(Debug)]
pub struct NestedDictIter<K, O, I>
where
I: DataPages,
O: Offset,
K: DictionaryKey,
{
iter: I,
init: Vec<InitNested>,
data_type: DataType,
values: Dict,
items: VecDeque<(NestedState, (Vec<K>, MutableBitmap))>,
chunk_size: Option<usize>,
phantom: std::marker::PhantomData<O>,
}

impl<K, O, I> NestedDictIter<K, O, I>
where
I: DataPages,
O: Offset,
K: DictionaryKey,
{
pub fn new(
iter: I,
init: Vec<InitNested>,
data_type: DataType,
chunk_size: Option<usize>,
) -> Self {
Self {
iter,
init,
data_type,
values: Dict::Empty,
items: VecDeque::new(),
chunk_size,
phantom: Default::default(),
}
}
}

impl<K, O, I> Iterator for NestedDictIter<K, O, I>
where
I: DataPages,
O: Offset,
K: DictionaryKey,
{
type Item = Result<(NestedState, DictionaryArray<K>)>;

fn next(&mut self) -> Option<Self::Item> {
let maybe_state = nested_next_dict(
&mut self.iter,
&mut self.items,
&self.init,
&mut self.values,
self.data_type.clone(),
self.chunk_size,
|dict| read_dict::<O>(self.data_type.clone(), dict),
);
match maybe_state {
MaybeNext::Some(Ok(dict)) => Some(Ok(dict)),
@@ -104,3 +171,24 @@
}
}
}

/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
pub fn iter_to_arrays_nested<'a, K, O, I>(
iter: I,
init: Vec<InitNested>,
data_type: DataType,
chunk_size: Option<usize>,
) -> NestedArrayIter<'a>
where
I: 'a + DataPages,
O: Offset,
K: DictionaryKey,
{
Box::new(
NestedDictIter::<K, O, I>::new(iter, init, data_type, chunk_size).map(|result| {
let (mut nested, array) = result?;
let _ = nested.nested.pop().unwrap(); // the primitive
Ok((nested, array.boxed()))
}),
)
}
39 changes: 3 additions & 36 deletions src/io/parquet/read/deserialize/binary/mod.rs
@@ -3,40 +3,7 @@ mod dictionary;
mod nested;
mod utils;

use crate::{
array::{Array, Offset},
datatypes::DataType,
};

use self::basic::TraitBinaryArray;
use self::nested::ArrayIterator;
use super::{
nested_utils::{InitNested, NestedArrayIter},
DataPages,
};

pub use self::nested::NestedIter;
pub use basic::Iter;
pub use dictionary::DictIter;

/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
pub fn iter_to_arrays_nested<'a, O, A, I>(
iter: I,
init: Vec<InitNested>,
data_type: DataType,
chunk_size: Option<usize>,
) -> NestedArrayIter<'a>
where
I: 'a + DataPages,
A: TraitBinaryArray<O>,
O: Offset,
{
Box::new(
ArrayIterator::<O, A, I>::new(iter, init, data_type, chunk_size).map(|x| {
x.map(|(mut nested, array)| {
let _ = nested.nested.pop().unwrap(); // the primitive
let values = Box::new(array) as Box<dyn Array>;
(nested, values)
})
}),
)
}
pub use dictionary::{iter_to_arrays_nested as iter_to_dict_arrays_nested, DictIter};
pub use nested::iter_to_arrays_nested;
29 changes: 26 additions & 3 deletions src/io/parquet/read/deserialize/binary/nested.rs
@@ -6,6 +6,7 @@ use parquet2::{
schema::Repetition,
};

+use crate::array::Array;
use crate::{
array::Offset, bitmap::MutableBitmap, datatypes::DataType, error::Result,
io::parquet::read::DataPages,
@@ -141,7 +142,7 @@ impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder<O> {
}
}

-pub struct ArrayIterator<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
+pub struct NestedIter<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
iter: I,
data_type: DataType,
init: Vec<InitNested>,
@@ -150,7 +151,7 @@ pub struct ArrayIterator<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
phantom_a: std::marker::PhantomData<A>,
}

-impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> ArrayIterator<O, A, I> {
+impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> NestedIter<O, A, I> {
pub fn new(
iter: I,
init: Vec<InitNested>,
@@ -168,7 +169,7 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> ArrayIterator<O, A, I> {
}
}

-impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iterator for ArrayIterator<O, A, I> {
+impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iterator for NestedIter<O, A, I> {
type Item = Result<(NestedState, A)>;

fn next(&mut self) -> Option<Self::Item> {
@@ -189,3 +190,25 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iterator for ArrayIterator
}
}
}

/// Converts [`DataPages`] to an [`Iterator`] of [`TraitBinaryArray`]
pub fn iter_to_arrays_nested<'a, O, A, I>(
iter: I,
init: Vec<InitNested>,
data_type: DataType,
chunk_size: Option<usize>,
) -> NestedArrayIter<'a>
where
I: 'a + DataPages,
A: TraitBinaryArray<O>,
O: Offset,
{
Box::new(
NestedIter::<O, A, I>::new(iter, init, data_type, chunk_size).map(|result| {
let (mut nested, array) = result?;
let _ = nested.nested.pop().unwrap(); // the primitive
let array = Box::new(array) as Box<dyn Array>;
Ok((nested, array))
}),
)
}
25 changes: 1 addition & 24 deletions src/io/parquet/read/deserialize/boolean/mod.rs
@@ -1,28 +1,5 @@
mod basic;
mod nested;

use self::nested::ArrayIterator;
use super::{
nested_utils::{InitNested, NestedArrayIter},
DataPages,
};

pub use self::basic::Iter;

/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
pub fn iter_to_arrays_nested<'a, I: 'a>(
iter: I,
init: Vec<InitNested>,
chunk_size: Option<usize>,
) -> NestedArrayIter<'a>
where
I: DataPages,
{
Box::new(ArrayIterator::new(iter, init, chunk_size).map(|x| {
x.map(|(mut nested, array)| {
let _ = nested.nested.pop().unwrap(); // the primitive
let values = array.boxed();
(nested, values)
})
}))
}
pub use nested::iter_to_arrays_nested;
22 changes: 19 additions & 3 deletions src/io/parquet/read/deserialize/boolean/nested.rs
@@ -101,14 +101,14 @@ impl<'a> NestedDecoder<'a> for BooleanDecoder {

/// An iterator adapter over [`DataPages`] assumed to be encoded as boolean arrays
#[derive(Debug)]
-pub struct ArrayIterator<I: DataPages> {
+pub struct NestedIter<I: DataPages> {
iter: I,
init: Vec<InitNested>,
items: VecDeque<(NestedState, (MutableBitmap, MutableBitmap))>,
chunk_size: Option<usize>,
}

-impl<I: DataPages> ArrayIterator<I> {
+impl<I: DataPages> NestedIter<I> {
pub fn new(iter: I, init: Vec<InitNested>, chunk_size: Option<usize>) -> Self {
Self {
iter,
@@ -123,7 +123,7 @@ fn finish(data_type: &DataType, values: MutableBitmap, validity: MutableBitmap)
BooleanArray::new(data_type.clone(), values.into(), validity.into())
}

-impl<I: DataPages> Iterator for ArrayIterator<I> {
+impl<I: DataPages> Iterator for NestedIter<I> {
type Item = Result<(NestedState, BooleanArray)>;

fn next(&mut self) -> Option<Self::Item> {
@@ -144,3 +144,19 @@ impl<I: DataPages> Iterator for ArrayIterator<I> {
}
}
}

/// Converts [`DataPages`] to an [`Iterator`] of [`BooleanArray`]
pub fn iter_to_arrays_nested<'a, I: 'a>(
iter: I,
init: Vec<InitNested>,
chunk_size: Option<usize>,
) -> NestedArrayIter<'a>
where
I: DataPages,
{
Box::new(NestedIter::new(iter, init, chunk_size).map(|result| {
let (mut nested, array) = result?;
let _ = nested.nested.pop().unwrap(); // the primitive
Ok((nested, array.boxed()))
}))
}
@@ -1,3 +1,5 @@
+mod nested;

use std::collections::VecDeque;

use parquet2::{
@@ -292,8 +294,7 @@ pub(super) fn next_dict<
MaybeNext::More
} else {
let (values, validity) = items.pop_front().unwrap();
-let keys =
-    PrimitiveArray::from_data(K::PRIMITIVE.into(), values.into(), validity.into());
+let keys = finish_key(values, validity);
MaybeNext::Some(DictionaryArray::try_new(data_type, keys, dict.unwrap()))
}
}
@@ -304,11 +305,12 @@ pub(super) fn next_dict<
debug_assert!(values.len() <= chunk_size.unwrap_or(usize::MAX));

let keys = finish_key(values, validity);

MaybeNext::Some(DictionaryArray::try_new(data_type, keys, dict.unwrap()))
} else {
MaybeNext::None
}
}
}
}

+pub use nested::next_dict as nested_next_dict;