Skip to content

Commit

Permalink
Generalized
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Aug 26, 2022
1 parent 03a4fd8 commit d8350ea
Show file tree
Hide file tree
Showing 11 changed files with 296 additions and 527 deletions.
22 changes: 12 additions & 10 deletions examples/read_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use parquet2::bloom_filter;
use parquet2::error::Error;

// ANCHOR: deserialize
use parquet2::deserialize::{NativePage, NativePageValues, NominalNativePage};
use parquet2::deserialize::{FullDecoder, NativeDecoder, NativeValuesDecoder};
use parquet2::page::{DataPage, DictPage, Page};
use parquet2::schema::types::PhysicalType;
use parquet2::types::decode;
Expand Down Expand Up @@ -38,21 +38,23 @@ fn deserialize(page: &DataPage, dict: Option<&Dict>) -> Result<Vec<Option<i32>>,
unreachable!()
}
});
let page = NativePage::<i32, Vec<i32>>::try_new(page, dict)?;
let values = NativeValuesDecoder::<i32, Vec<i32>>::try_new(page, dict)?;
let decoder = FullDecoder::try_new(page, values)?;
let decoder = NativeDecoder::try_new(page, decoder)?;
// page is an enum comprising of the different possible encodings:
match page {
NativePage::Nominal(values) => match values {
NominalNativePage::Optional(_, _) => todo!("optional pages"),
NominalNativePage::Required(values) => match values {
NativePageValues::Plain(values) => Ok(values.map(Some).collect()),
NativePageValues::Dictionary(dict) => dict
match decoder {
NativeDecoder::Full(values) => match values {
FullDecoder::Optional(_, _) => todo!("optional pages"),
FullDecoder::Required(values) => match values {
NativeValuesDecoder::Plain(values) => Ok(values.map(Some).collect()),
NativeValuesDecoder::Dictionary(dict) => dict
.indexes
.map(|x| x.map(|x| Some(dict.dict[x as usize])))
.collect(),
},
NominalNativePage::Levels(_, _, _) => todo!("nested pages"),
FullDecoder::Levels(_, _, _) => todo!("nested pages"),
},
NativePage::Filtered(_) => todo!("Filtered page"),
NativeDecoder::Filtered(_) => todo!("Filtered page"),
}
}
PhysicalType::Int64 => todo!("int64"),
Expand Down
134 changes: 20 additions & 114 deletions src/deserialize/binary.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,18 @@
use std::collections::VecDeque;

use crate::{
encoding::{
delta_length_byte_array,
hybrid_rle::{self, HybridRleDecoder},
plain_byte_array::BinaryIter,
},
encoding::{delta_length_byte_array, hybrid_rle, plain_byte_array::BinaryIter},
error::Error,
indexes::Interval,
page::{split_buffer, DataPage},
parquet_bridge::{Encoding, Repetition},
read::levels::get_bit_width,
};

use super::SliceFilteredIter;
use super::{
utils::{
dict_indices_decoder, get_selected_rows, FilteredOptionalPageValidity, OptionalPageValidity,
},
FilteredHybridRleDecoderIter,
utils::{dict_indices_decoder, get_selected_rows},
values::Decoder,
};
use super::{values::ValuesDecoder, SliceFilteredIter};

#[derive(Debug)]
pub struct Dictionary<'a, P> {
Expand Down Expand Up @@ -150,13 +143,13 @@ impl<'a, P> FilteredDictionary<'a, P> {
}

#[derive(Debug)]
pub enum BinaryPageValues<'a, P> {
pub enum BinaryValuesDecoder<'a, P> {
Plain(BinaryIter<'a>),
Dictionary(Dictionary<'a, P>),
Delta(Delta<'a>),
}

impl<'a, P> BinaryPageValues<'a, P> {
impl<'a, P> BinaryValuesDecoder<'a, P> {
pub fn try_new(page: &'a DataPage, dict: Option<&'a P>) -> Result<Self, Error> {
let is_optional =
page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;
Expand Down Expand Up @@ -192,124 +185,37 @@ impl<'a, P> BinaryPageValues<'a, P> {
}
}

#[derive(Debug)]
pub enum NominalBinaryPage<'a, P> {
Optional(OptionalPageValidity<'a>, BinaryPageValues<'a, P>),
Required(BinaryPageValues<'a, P>),
Levels(HybridRleDecoder<'a>, u32, BinaryPageValues<'a, P>),
}

impl<'a, P> NominalBinaryPage<'a, P> {
pub fn try_new(page: &'a DataPage, dict: Option<&'a P>) -> Result<Self, Error> {
let values = BinaryPageValues::try_new(page, dict)?;

if page.descriptor.max_def_level > 1 {
let (_, def_levels, _) = split_buffer(page)?;
let max = page.descriptor.max_def_level as u32;
let validity = HybridRleDecoder::try_new(
def_levels,
get_bit_width(max as i16),
page.num_values(),
)?;
return Ok(Self::Levels(validity, max, values));
}

let is_optional =
page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;
Ok(if is_optional {
Self::Optional(OptionalPageValidity::try_new(page)?, values)
} else {
Self::Required(values)
})
}

pub fn len(&self) -> usize {
match self {
Self::Optional(validity, _) => validity.len(),
Self::Required(state) => state.len(),
Self::Levels(state, _, _) => state.len(),
}
}

#[must_use]
pub fn is_empty(&self) -> bool {
self.len() == 0
impl<'a, P> ValuesDecoder for BinaryValuesDecoder<'a, P> {
fn len(&self) -> usize {
self.len()
}
}

#[derive(Debug)]
pub enum FilteredBinaryPageValues<'a, P> {
pub enum FilteredBinaryValuesDecoder<'a, P> {
Plain(SliceFilteredIter<BinaryIter<'a>>),
Dictionary(FilteredDictionary<'a, P>),
Delta(SliceFilteredIter<Delta<'a>>),
}

impl<'a, P> FilteredBinaryPageValues<'a, P> {
pub fn new(page: BinaryPageValues<'a, P>, intervals: VecDeque<Interval>) -> Self {
match page {
BinaryPageValues::Plain(values) => {
impl<'a, P> From<(BinaryValuesDecoder<'a, P>, VecDeque<Interval>)>
for FilteredBinaryValuesDecoder<'a, P>
{
fn from((decoder, intervals): (BinaryValuesDecoder<'a, P>, VecDeque<Interval>)) -> Self {
match decoder {
BinaryValuesDecoder::Plain(values) => {
Self::Plain(SliceFilteredIter::new(values, intervals))
}
BinaryPageValues::Dictionary(values) => Self::Dictionary(FilteredDictionary {
BinaryValuesDecoder::Dictionary(values) => Self::Dictionary(FilteredDictionary {
indexes: SliceFilteredIter::new(values.indexes, intervals),
dict: values.dict,
}),
BinaryPageValues::Delta(values) => {
BinaryValuesDecoder::Delta(values) => {
Self::Delta(SliceFilteredIter::new(values, intervals))
}
}
}
}

#[derive(Debug)]
pub enum FilteredBinaryPage<'a, P> {
Optional(FilteredOptionalPageValidity<'a>, BinaryPageValues<'a, P>),
Required(FilteredBinaryPageValues<'a, P>),
// todo: levels
}

impl<'a, P> FilteredBinaryPage<'a, P> {
pub fn try_new(
page: NominalBinaryPage<'a, P>,
intervals: VecDeque<Interval>,
) -> Result<Self, Error> {
Ok(match page {
NominalBinaryPage::Optional(iter, values) => Self::Optional(
FilteredOptionalPageValidity::new(FilteredHybridRleDecoderIter::new(
iter.iter, intervals,
)),
values,
),
NominalBinaryPage::Required(values) => {
Self::Required(FilteredBinaryPageValues::new(values, intervals))
}
NominalBinaryPage::Levels(_, _, _) => {
return Err(Error::FeatureNotSupported("Filtered levels".to_string()))
}
})
}
}

/// The deserialization state of a [`DataPage`] of a parquet binary type
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum BinaryPage<'a, P> {
Nominal(NominalBinaryPage<'a, P>),
Filtered(FilteredBinaryPage<'a, P>),
}

impl<'a, P> BinaryPage<'a, P> {
/// Tries to create [`BinaryPage`]
/// # Error
/// Errors iff the page is not a `BinaryPage`
pub fn try_new(page: &'a DataPage, dict: Option<&'a P>) -> Result<Self, Error> {
let native_page = NominalBinaryPage::try_new(page, dict)?;

if let Some(selected_rows) = page.selected_rows() {
FilteredBinaryPage::try_new(native_page, selected_rows.iter().copied().collect())
.map(Self::Filtered)
} else {
Ok(Self::Nominal(native_page))
}
}
}
pub type BinaryDecoder<'a, P> =
Decoder<'a, BinaryValuesDecoder<'a, P>, FilteredBinaryValuesDecoder<'a, P>>;
Loading

0 comments on commit d8350ea

Please sign in to comment.