Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Nested
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Jan 30, 2022
1 parent ebe7261 commit d0b78b0
Show file tree
Hide file tree
Showing 11 changed files with 672 additions and 521 deletions.
49 changes: 19 additions & 30 deletions src/io/parquet/read/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
error::Result,
};

use super::super::utils::{extend_from_decoder, next, MaybeNext, OptionalPageValidity};
use super::super::utils::{extend_from_decoder, next, BinaryIter, MaybeNext, OptionalPageValidity};
use super::super::DataPages;
use super::{super::utils, utils::Binary};

Expand Down Expand Up @@ -57,33 +57,16 @@ fn read_delta_optional<O: Offset>(
}
*/

struct Optional<'a> {
values: utils::BinaryIter<'a>,
validity: OptionalPageValidity<'a>,
}

impl<'a> Optional<'a> {
fn new(page: &'a DataPage) -> Self {
let (_, validity_buffer, values_buffer, _) = utils::split_buffer(page, page.descriptor());

let values = utils::BinaryIter::new(values_buffer);

Self {
values,
validity: OptionalPageValidity::new(page),
}
}
}

struct Required<'a> {
pub values: utils::BinaryIter<'a>,
#[derive(Debug)]
pub(super) struct Required<'a> {
pub values: BinaryIter<'a>,
pub remaining: usize,
}

impl<'a> Required<'a> {
fn new(page: &'a DataPage) -> Self {
pub fn new(page: &'a DataPage) -> Self {
Self {
values: utils::BinaryIter::new(page.buffer()),
values: BinaryIter::new(page.buffer()),
remaining: page.num_values(),
}
}
Expand Down Expand Up @@ -149,7 +132,7 @@ impl<'a> OptionalDictionary<'a> {
}

enum State<'a> {
Optional(Optional<'a>),
Optional(OptionalPageValidity<'a>, BinaryIter<'a>),
Required(Required<'a>),
RequiredDictionary(RequiredDictionary<'a>),
OptionalDictionary(OptionalDictionary<'a>),
Expand All @@ -158,7 +141,7 @@ enum State<'a> {
impl<'a> utils::PageState<'a> for State<'a> {
fn len(&self) -> usize {
match self {
State::Optional(state) => state.validity.len(),
State::Optional(validity, _) => validity.len(),
State::Required(state) => state.remaining,
State::RequiredDictionary(state) => state.remaining,
State::OptionalDictionary(state) => state.validity.len(),
Expand Down Expand Up @@ -210,7 +193,13 @@ impl<'a, O: Offset> utils::Decoder<'a, &'a [u8], Binary<O>> for BinaryDecoder<O>
page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional;

match (page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::Plain, None, true) => Ok(State::Optional(Optional::new(page))),
(Encoding::Plain, None, true) => {
let (_, _, values, _) = utils::split_buffer(page, page.descriptor());

let values = BinaryIter::new(values);

Ok(State::Optional(OptionalPageValidity::new(page), values))
}
(Encoding::Plain, None, false) => Ok(State::Required(Required::new(page))),
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => {
Ok(State::RequiredDictionary(RequiredDictionary::new(
Expand Down Expand Up @@ -245,12 +234,12 @@ impl<'a, O: Offset> utils::Decoder<'a, &'a [u8], Binary<O>> for BinaryDecoder<O>
additional: usize,
) {
match state {
State::Optional(page) => extend_from_decoder(
State::Optional(page_validity, page_values) => extend_from_decoder(
validity,
&mut page.validity,
page_validity,
Some(additional),
values,
&mut page.values,
page_values,
),
State::Required(page) => {
page.remaining -= additional;
Expand All @@ -275,7 +264,7 @@ impl<'a, O: Offset> utils::Decoder<'a, &'a [u8], Binary<O>> for BinaryDecoder<O>
}
}

fn finish<O: Offset, A: TraitBinaryArray<O>>(
pub(super) fn finish<O: Offset, A: TraitBinaryArray<O>>(
data_type: &DataType,
values: Binary<O>,
validity: MutableBitmap,
Expand Down
27 changes: 25 additions & 2 deletions src/io/parquet/read/binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use crate::{
array::{Array, Offset},
datatypes::DataType,
datatypes::{DataType, Field},
error::Result,
};

Expand All @@ -15,7 +15,8 @@ pub use dictionary::iter_to_arrays as iter_to_dict_arrays;

use self::basic::TraitBinaryArray;

use super::DataPages;
use self::nested::ArrayIterator;
use super::{nested_utils::NestedState, DataPages};
use basic::BinaryArrayIterator;

/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
Expand All @@ -34,3 +35,25 @@ where
.map(|x| x.map(|x| Arc::new(x) as Arc<dyn Array>)),
)
}

/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
pub fn iter_to_arrays_nested<'a, O, A, I>(
iter: I,
field: Field,
data_type: DataType,
chunk_size: usize,
) -> Box<dyn Iterator<Item = Result<(NestedState, Arc<dyn Array>)>> + 'a>
where
I: 'a + DataPages,
A: TraitBinaryArray<O>,
O: Offset,
{
Box::new(
ArrayIterator::<O, A, I>::new(iter, field, data_type, chunk_size).map(|x| {
x.map(|(nested, array)| {
let values = Arc::new(array) as Arc<dyn Array>;
(nested, values)
})
}),
)
}
Loading

0 comments on commit d0b78b0

Please sign in to comment.