This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit: Simpler
jorgecarleitao committed Jul 22, 2022
1 parent d6f3966 commit 0e08174
Showing 17 changed files with 357 additions and 150 deletions.
10 changes: 5 additions & 5 deletions examples/parquet_write.rs
@@ -11,19 +11,19 @@ use arrow2::{
     },
 };

-fn write_batch(path: &str, schema: Schema, columns: Chunk<Box<dyn Array>>) -> Result<()> {
+fn write_chunk(path: &str, schema: Schema, chunk: Chunk<Box<dyn Array>>) -> Result<()> {
     let options = WriteOptions {
         write_statistics: true,
         compression: CompressionOptions::Uncompressed,
         version: Version::V2,
     };

-    let iter = vec![Ok(columns)];
+    let iter = vec![Ok(chunk)];

     let encodings = schema
         .fields
         .iter()
-        .map(|f| transverse(&f.data_type, |_| Encoding::Plain))
+        .map(|f| transverse(&f.data_type, |_| Encoding::RleDictionary))
         .collect();

     let row_groups = RowGroupIterator::try_new(iter.into_iter(), &schema, options, encodings)?;
@@ -52,7 +52,7 @@ fn main() -> Result<()> {
     ]);
     let field = Field::new("c1", array.data_type().clone(), true);
     let schema = Schema::from(vec![field]);
-    let columns = Chunk::new(vec![array.boxed()]);
+    let chunk = Chunk::new(vec![array.boxed()]);

-    write_batch("test.parquet", schema, columns)
+    write_chunk("test.parquet", schema, chunk)
 }
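For context, a sketch of how the whole example reads after this change. The writer loop (FileWriter::try_new / write / end) and the values in main are assumed from the surrounding example file, which the diff does not show in full; they are illustrative, not a verbatim copy.

// A sketch only: assumes the arrow2 parquet FileWriter API as it existed at
// this point in the repository's history; the data in `main` is illustrative.
use std::fs::File;

use arrow2::{
    array::{Array, Int32Array},
    chunk::Chunk,
    datatypes::{Field, Schema},
    error::Result,
    io::parquet::write::{
        transverse, CompressionOptions, Encoding, FileWriter, RowGroupIterator, Version,
        WriteOptions,
    },
};

fn write_chunk(path: &str, schema: Schema, chunk: Chunk<Box<dyn Array>>) -> Result<()> {
    let options = WriteOptions {
        write_statistics: true,
        compression: CompressionOptions::Uncompressed,
        version: Version::V2,
    };

    // One row group containing the single chunk.
    let iter = vec![Ok(chunk)];

    // One encoding per leaf column; the commit switches the example from
    // plain to RLE-dictionary encoding.
    let encodings = schema
        .fields
        .iter()
        .map(|f| transverse(&f.data_type, |_| Encoding::RleDictionary))
        .collect();

    let row_groups = RowGroupIterator::try_new(iter.into_iter(), &schema, options, encodings)?;

    let file = File::create(path)?;
    let mut writer = FileWriter::try_new(file, schema, options)?;
    for group in row_groups {
        writer.write(group?)?;
    }
    let _size = writer.end(None)?;
    Ok(())
}

fn main() -> Result<()> {
    // Illustrative values; the real example file uses its own.
    let array = Int32Array::from(&[Some(0), Some(1), None, Some(3)]);
    let field = Field::new("c1", array.data_type().clone(), true);
    let schema = Schema::from(vec![field]);
    let chunk = Chunk::new(vec![array.boxed()]);

    write_chunk("test.parquet", schema, chunk)
}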
2 changes: 1 addition & 1 deletion src/array/primitive/mod.rs
@@ -69,7 +69,7 @@ fn check<T: NativeType>(

     if data_type.to_physical_type() != PhysicalType::Primitive(T::PRIMITIVE) {
         return Err(Error::oos(
-            "BooleanArray can only be initialized with a DataType whose physical type is Primitive",
+            "PrimitiveArray can only be initialized with a DataType whose physical type is Primitive",
         ));
     }
     Ok(())
89 changes: 89 additions & 0 deletions src/io/parquet/read/deserialize/binary/dictionary.rs
@@ -7,6 +7,7 @@ use crate::{
     bitmap::MutableBitmap,
     datatypes::{DataType, PhysicalType},
     error::Result,
+    io::parquet::read::deserialize::nested_utils::{InitNested, NestedArrayIter, NestedState},
 };

 use super::super::dictionary::*;
@@ -104,3 +105,91 @@
         }
     }
 }
+
+#[derive(Debug)]
+pub struct NestedDictIter<K, O, I>
+where
+    I: DataPages,
+    O: Offset,
+    K: DictionaryKey,
+{
+    iter: I,
+    init: Vec<InitNested>,
+    data_type: DataType,
+    values: Dict,
+    items: VecDeque<(NestedState, (Vec<K>, MutableBitmap))>,
+    chunk_size: Option<usize>,
+    phantom: std::marker::PhantomData<O>,
+}
+
+impl<K, O, I> NestedDictIter<K, O, I>
+where
+    I: DataPages,
+    O: Offset,
+    K: DictionaryKey,
+{
+    pub fn new(
+        iter: I,
+        init: Vec<InitNested>,
+        data_type: DataType,
+        chunk_size: Option<usize>,
+    ) -> Self {
+        Self {
+            iter,
+            init,
+            data_type,
+            values: Dict::Empty,
+            items: VecDeque::new(),
+            chunk_size,
+            phantom: Default::default(),
+        }
+    }
+}
+
+impl<K, O, I> Iterator for NestedDictIter<K, O, I>
+where
+    I: DataPages,
+    O: Offset,
+    K: DictionaryKey,
+{
+    type Item = Result<(NestedState, DictionaryArray<K>)>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let maybe_state = nested_next_dict(
+            &mut self.iter,
+            &mut self.items,
+            &self.init,
+            &mut self.values,
+            self.data_type.clone(),
+            self.chunk_size,
+            |dict| read_dict::<O>(self.data_type.clone(), dict),
+        );
+        match maybe_state {
+            MaybeNext::Some(Ok(dict)) => Some(Ok(dict)),
+            MaybeNext::Some(Err(e)) => Some(Err(e)),
+            MaybeNext::None => None,
+            MaybeNext::More => self.next(),
+        }
+    }
+}
+
+/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
+pub fn iter_to_arrays_nested<'a, K, O, I>(
+    iter: I,
+    init: Vec<InitNested>,
+    data_type: DataType,
+    chunk_size: Option<usize>,
+) -> NestedArrayIter<'a>
+where
+    I: 'a + DataPages,
+    O: Offset,
+    K: DictionaryKey,
+{
+    Box::new(
+        NestedDictIter::<K, O, I>::new(iter, init, data_type, chunk_size).map(|result| {
+            let (mut nested, array) = result?;
+            let _ = nested.nested.pop().unwrap(); // the primitive
+            Ok((nested, array.boxed()))
+        }),
+    )
+}
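The next implementation added above follows the crate's MaybeNext pattern: nested_next_dict either yields a finished item, reports an error, signals that the pages are exhausted, or asks to be polled again once more pages have been buffered, in which case the iterator simply calls itself. A minimal, self-contained model of that control flow, using hypothetical stand-in types rather than arrow2's own:

// Hypothetical stand-ins, for illustration of the control flow only.
enum MaybeNext<T> {
    Some(T), // a finished item is ready
    None,    // the underlying pages are exhausted
    More,    // more pages are needed before an item can be emitted
}

struct PageDriver {
    pages_left: usize,
    buffered: usize,
    per_item: usize,
}

impl PageDriver {
    // Stands in for `nested_next_dict`: buffer pages until a full item exists.
    fn poll(&mut self) -> MaybeNext<Result<usize, String>> {
        if self.buffered >= self.per_item {
            self.buffered -= self.per_item;
            return MaybeNext::Some(Ok(self.per_item));
        }
        if self.pages_left == 0 {
            return MaybeNext::None;
        }
        self.pages_left -= 1;
        self.buffered += 1;
        MaybeNext::More
    }
}

impl Iterator for PageDriver {
    type Item = Result<usize, String>;

    fn next(&mut self) -> Option<Self::Item> {
        match self.poll() {
            MaybeNext::Some(item) => Some(item),
            MaybeNext::None => None,
            // Same shape as the deserializer above: retry until Some or None.
            MaybeNext::More => self.next(),
        }
    }
}

fn main() {
    let driver = PageDriver { pages_left: 6, buffered: 0, per_item: 2 };
    let items: Vec<_> = driver.collect();
    assert_eq!(items.len(), 3); // 6 pages, 2 pages per emitted item
    println!("{items:?}");
}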
2 changes: 1 addition & 1 deletion src/io/parquet/read/deserialize/binary/mod.rs
@@ -5,5 +5,5 @@ mod utils;

 pub use self::nested::NestedIter;
 pub use basic::Iter;
-pub use dictionary::DictIter;
+pub use dictionary::{iter_to_arrays_nested as iter_to_dict_arrays_nested, DictIter};
 pub use nested::iter_to_arrays_nested;
83 changes: 83 additions & 0 deletions src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs
@@ -7,6 +7,7 @@ use crate::{
     bitmap::MutableBitmap,
     datatypes::DataType,
     error::Result,
+    io::parquet::read::deserialize::nested_utils::{InitNested, NestedArrayIter, NestedState},
 };

 use super::super::dictionary::*;
@@ -87,3 +88,85 @@
         }
     }
 }
+
+#[derive(Debug)]
+pub struct NestedDictIter<K, I>
+where
+    I: DataPages,
+    K: DictionaryKey,
+{
+    iter: I,
+    init: Vec<InitNested>,
+    data_type: DataType,
+    values: Dict,
+    items: VecDeque<(NestedState, (Vec<K>, MutableBitmap))>,
+    chunk_size: Option<usize>,
+}
+
+impl<K, I> NestedDictIter<K, I>
+where
+    I: DataPages,
+    K: DictionaryKey,
+{
+    pub fn new(
+        iter: I,
+        init: Vec<InitNested>,
+        data_type: DataType,
+        chunk_size: Option<usize>,
+    ) -> Self {
+        Self {
+            iter,
+            init,
+            data_type,
+            values: Dict::Empty,
+            items: VecDeque::new(),
+            chunk_size,
+        }
+    }
+}
+
+impl<K, I> Iterator for NestedDictIter<K, I>
+where
+    I: DataPages,
+    K: DictionaryKey,
+{
+    type Item = Result<(NestedState, DictionaryArray<K>)>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let maybe_state = nested_next_dict(
+            &mut self.iter,
+            &mut self.items,
+            &self.init,
+            &mut self.values,
+            self.data_type.clone(),
+            self.chunk_size,
+            |dict| read_dict(self.data_type.clone(), dict),
+        );
+        match maybe_state {
+            MaybeNext::Some(Ok(dict)) => Some(Ok(dict)),
+            MaybeNext::Some(Err(e)) => Some(Err(e)),
+            MaybeNext::None => None,
+            MaybeNext::More => self.next(),
+        }
+    }
+}
+
+/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
+pub fn iter_to_arrays_nested<'a, K, I>(
+    iter: I,
+    init: Vec<InitNested>,
+    data_type: DataType,
+    chunk_size: Option<usize>,
+) -> NestedArrayIter<'a>
+where
+    I: 'a + DataPages,
+    K: DictionaryKey,
+{
+    Box::new(
+        NestedDictIter::<K, I>::new(iter, init, data_type, chunk_size).map(|result| {
+            let (mut nested, array) = result?;
+            let _ = nested.nested.pop().unwrap(); // the primitive
+            Ok((nested, array.boxed()))
+        }),
+    )
+}
2 changes: 1 addition & 1 deletion src/io/parquet/read/deserialize/fixed_size_binary/mod.rs
@@ -3,4 +3,4 @@ mod dictionary;
 mod utils;

 pub use basic::Iter;
-pub use dictionary::DictIter;
+pub use dictionary::{iter_to_arrays_nested as iter_to_dict_arrays_nested, DictIter};
98 changes: 49 additions & 49 deletions src/io/parquet/read/deserialize/mod.rs
@@ -17,7 +17,7 @@ use crate::{
     array::{
         Array, BinaryArray, DictionaryKey, FixedSizeListArray, ListArray, MapArray, Utf8Array,
     },
-    datatypes::{DataType, Field},
+    datatypes::{DataType, Field, IntervalUnit},
     error::{Error, Result},
 };

@@ -289,9 +289,9 @@
                 chunk_size,
             )
         }

         _ => match field.data_type().to_logical_type() {
             DataType::Dictionary(key_type, _, _) => {
+                init.push(InitNested::Primitive(field.is_nullable));
                 let type_ = types.pop().unwrap();
                 let iter = columns.pop().unwrap();
                 let data_type = field.data_type().clone();
@@ -434,50 +434,76 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>(
             chunk_size,
             |x: i32| x as u8,
         ),
-        Float32 => primitive::iter_to_dict_arrays_nested::<K, _, _, _, _>(
-            iter,
-            init,
-            data_type,
-            chunk_size,
-            |x: f32| x,
-        ),
-        Float64 => primitive::iter_to_dict_arrays_nested::<K, _, _, _, _>(
-            iter,
-            init,
-            data_type,
-            chunk_size,
-            |x: f64| x,
-        ),
-        /*
-        UInt16 => dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
-            iter,
-            data_type,
-            chunk_size,
-            |x: i32| x as u16,
-        )),
-        UInt32 => dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
-            iter,
-            data_type,
-            chunk_size,
-            |x: i32| x as u32,
-        )),
-        Int8 => dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
-            iter,
-            data_type,
-            chunk_size,
-            |x: i32| x as i8,
-        )),
-        Int16 => dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
-            iter,
-            data_type,
-            chunk_size,
-            |x: i32| x as i16,
-        )),
-        Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => dyn_iter(
-            primitive::DictIter::<K, _, _, _, _>::new(iter, data_type, chunk_size, |x: i32| {
-                x as i32
-            }),
-        ),
+        UInt16 => primitive::iter_to_dict_arrays_nested::<K, _, _, _, _>(
+            iter,
+            init,
+            data_type,
+            chunk_size,
+            |x: i32| x as u16,
+        ),
+        UInt32 => primitive::iter_to_dict_arrays_nested::<K, _, _, _, _>(
+            iter,
+            init,
+            data_type,
+            chunk_size,
+            |x: i32| x as u32,
+        ),
+        Int8 => primitive::iter_to_dict_arrays_nested::<K, _, _, _, _>(
+            iter,
+            init,
+            data_type,
+            chunk_size,
+            |x: i32| x as i8,
+        ),
+        Int16 => primitive::iter_to_dict_arrays_nested::<K, _, _, _, _>(
+            iter,
+            init,
+            data_type,
+            chunk_size,
+            |x: i32| x as i16,
+        ),
+        Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => {
+            primitive::iter_to_dict_arrays_nested::<K, _, _, _, _>(
+                iter,
+                init,
+                data_type,
+                chunk_size,
+                |x: i32| x,
+            )
+        }
+        Int64 | Date64 | Time64(_) | Duration(_) => {
+            primitive::iter_to_dict_arrays_nested::<K, _, _, _, _>(
+                iter,
+                init,
+                data_type,
+                chunk_size,
+                |x: i64| x as i32,
+            )
+        }
+        Float32 => primitive::iter_to_dict_arrays_nested::<K, _, _, _, _>(
+            iter,
+            init,
+            data_type,
+            chunk_size,
+            |x: f32| x,
+        ),
+        Float64 => primitive::iter_to_dict_arrays_nested::<K, _, _, _, _>(
+            iter,
+            init,
+            data_type,
+            chunk_size,
+            |x: f64| x,
+        ),
+        Utf8 | Binary => {
+            binary::iter_to_dict_arrays_nested::<K, i32, _>(iter, init, data_type, chunk_size)
+        }
+        LargeUtf8 | LargeBinary => {
+            binary::iter_to_dict_arrays_nested::<K, i64, _>(iter, init, data_type, chunk_size)
+        }
+        FixedSizeBinary(_) => {
+            fixed_size_binary::iter_to_dict_arrays_nested::<K, _>(iter, init, data_type, chunk_size)
+        }
+        /*
         Timestamp(time_unit, _) => {
             let time_unit = *time_unit;
@@ -490,32 +516,6 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>(
                 time_unit,
             );
         }
-        Int64 | Date64 | Time64(_) | Duration(_) => dyn_iter(
-            primitive::DictIter::<K, _, _, _, _>::new(iter, data_type, chunk_size, |x: i64| x),
-        ),
-        Float32 => dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
-            iter,
-            data_type,
-            chunk_size,
-            |x: f32| x,
-        )),
-        Float64 => dyn_iter(primitive::DictIter::<K, _, _, _, _>::new(
-            iter,
-            data_type,
-            chunk_size,
-            |x: f64| x,
-        )),
-        Utf8 | Binary => dyn_iter(binary::DictIter::<K, i32, _>::new(
-            iter, data_type, chunk_size,
-        )),
-        LargeUtf8 | LargeBinary => dyn_iter(binary::DictIter::<K, i64, _>::new(
-            iter, data_type, chunk_size,
-        )),
-        FixedSizeBinary(_) => dyn_iter(fixed_size_binary::DictIter::<K, _>::new(
-            iter, data_type, chunk_size,
-        )),
         */
         other => {
             return Err(Error::nyi(format!(
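The init.push(InitNested::Primitive(...)) in the Dictionary branch above pairs with the nested.nested.pop() in the new iter_to_arrays_nested helpers: the caller records the innermost (primitive) nesting level before delegating to the dictionary reader, and the nested iterator strips that level off again so only the outer list/struct levels travel with the returned array. A simplified model of that bookkeeping, using hypothetical types rather than arrow2's own:

// Hypothetical stand-in for the crate's `InitNested`; illustration only.
#[derive(Debug, Clone, PartialEq)]
enum InitNested {
    Primitive(bool), // is_nullable
    List(bool),
}

// Models the nested dictionary reader: it consumes the innermost (primitive)
// level, mirroring `nested.nested.pop()` in `iter_to_arrays_nested`.
fn read_nested_dict(mut init: Vec<InitNested>) -> (Vec<InitNested>, &'static str) {
    let innermost = init.pop();
    assert_eq!(innermost, Some(InitNested::Primitive(true)));
    (init, "DictionaryArray<K>") // placeholder for the decoded array
}

fn main() {
    // e.g. a nullable List<Dictionary<K, Utf8>> column:
    let mut init = vec![InitNested::List(true)];
    // ...what the Dictionary branch shown above now does before delegating:
    init.push(InitNested::Primitive(true));
    let (outer_levels, array) = read_nested_dict(init);
    assert_eq!(outer_levels, vec![InitNested::List(true)]);
    println!("{outer_levels:?} -> {array}");
}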