Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve deserialize API #192

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 62 additions & 31 deletions examples/read_metadata.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,69 @@
use parquet2::bloom_filter;
use parquet2::error::Result;
use parquet2::error::Error;

// ANCHOR: deserialize
use parquet2::encoding::Encoding;
use parquet2::page::{split_buffer, DataPage, DictPage, Page};
use parquet2::deserialize::{FullDecoder, NativeDecoder, NativeValuesDecoder};
use parquet2::page::{DataPage, DictPage, Page};
use parquet2::schema::types::PhysicalType;
use parquet2::types::decode;

fn deserialize(page: &DataPage, dict: Option<&DictPage>) -> Result<()> {
// split the data buffer in repetition levels, definition levels and values
let (_rep_levels, _def_levels, _values_buffer) = split_buffer(page)?;

// decode and deserialize.
match (
page.descriptor.primitive_type.physical_type,
page.encoding(),
dict,
) {
(
PhysicalType::Int32,
Encoding::PlainDictionary | Encoding::RleDictionary,
Some(_dict_page),
) => {
// plain encoded page with a dictionary
// _dict_page can be downcasted based on the descriptor's physical type
todo!()
}
(PhysicalType::Int32, Encoding::Plain, None) => {
// plain encoded page
todo!()
}
/// A dictionary page after deserialization, tagged by the native type of its entries.
enum Dict {
/// Entries decoded as `i32`; produced by `deserialize_dict` for `PhysicalType::Int32`.
I32(Vec<i32>),
// NOTE(review): never constructed in this example; the leading underscore presumably only
// silences the dead-code warning until 64-bit dictionaries are supported — confirm.
_I64(Vec<i64>),
}

fn deserialize_dict_i32(page: &DictPage) -> Vec<i32> {
page.buffer
.chunks_exact(std::mem::size_of::<i32>())
.map(decode::<i32>)
.collect()
}

/// Deserializes a dictionary page into a [`Dict`] according to the column's physical type.
///
/// Only `PhysicalType::Int32` is implemented; any other physical type hits `todo!()`.
fn deserialize_dict(page: &DictPage, physical_type: PhysicalType) -> Result<Dict, Error> {
    if let PhysicalType::Int32 = physical_type {
        let entries = deserialize_dict_i32(page);
        return Ok(Dict::I32(entries));
    }
    todo!()
}

/// Deserializes the [`DataPage`] and optional [`Dict`] into `Vec<Option<i32>>` by handling
/// single
fn deserialize(page: &DataPage, dict: Option<&Dict>) -> Result<Vec<Option<i32>>, Error> {
match page.descriptor.primitive_type.physical_type {
PhysicalType::Int32 => {
let dict = dict.map(|dict| {
if let Dict::I32(dict) = dict {
dict
} else {
unreachable!()
}
});
let values = NativeValuesDecoder::<i32, Vec<i32>>::try_new(page, dict)?;
let decoder = FullDecoder::try_new(page, values)?;
let decoder = NativeDecoder::try_new(page, decoder)?;
// decoder is an enum comprising the different cases:
match decoder {
NativeDecoder::Full(values) => match values {
FullDecoder::Optional(_, _) => todo!("optional pages"),
FullDecoder::Required(values) => match values {
NativeValuesDecoder::Plain(values) => Ok(values.map(Some).collect()),
NativeValuesDecoder::Dictionary(dict) => dict
.indexes
.map(|x| x.map(|x| Some(dict.dict[x as usize])))
.collect(),
},
FullDecoder::Levels(_, _, _) => todo!("nested pages"),
},
NativeDecoder::Filtered(_) => todo!("Filtered page"),
}
}
PhysicalType::Int64 => todo!("int64"),
_ => todo!("Other physical types"),
}
}
// ANCHOR_END: deserialize

fn main() -> Result<()> {
fn main() -> Result<(), Error> {
// ANCHOR: metadata
use std::env;
let args: Vec<String> = env::args().collect();
Expand Down Expand Up @@ -119,6 +148,11 @@ fn main() -> Result<()> {
// ANCHOR: pages
use parquet2::read::get_page_iterator;
let pages = get_page_iterator(column_metadata, &mut reader, None, vec![], 1024 * 1024)?;
let type_ = column_metadata
.descriptor()
.descriptor
.primitive_type
.physical_type;
// ANCHOR_END: pages

// ANCHOR: decompress
Expand All @@ -131,10 +165,7 @@ fn main() -> Result<()> {
match page {
Page::Dict(page) => {
// the first page may be a dictionary page, which needs to be deserialized
// depending on your target in-memory format, you may want to deserialize
// the values differently...
// let page = deserialize_dict(&page)?;
dict = Some(page);
dict = Some(deserialize_dict(&page, type_)?);
}
Page::Data(page) => {
let _array = deserialize(&page, dict.as_ref())?;
Expand Down
219 changes: 184 additions & 35 deletions src/deserialize/binary.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@
use std::collections::VecDeque;

use crate::{
encoding::{hybrid_rle, plain_byte_array::BinaryIter},
encoding::{delta_length_byte_array, hybrid_rle, plain_byte_array::BinaryIter},
error::Error,
indexes::Interval,
page::{split_buffer, DataPage},
parquet_bridge::{Encoding, Repetition},
};

use super::utils;
use super::{
utils::{dict_indices_decoder, get_selected_rows},
values::Decoder,
};
use super::{values::ValuesDecoder, SliceFilteredIter};

#[derive(Debug)]
pub struct Dictionary<'a, P> {
pub indexes: hybrid_rle::HybridRleDecoder<'a>,
pub dict: P,
pub dict: &'a P,
}

impl<'a, P> Dictionary<'a, P> {
pub fn try_new(page: &'a DataPage, dict: P) -> Result<Self, Error> {
let indexes = utils::dict_indices_decoder(page)?;
pub fn try_new(page: &'a DataPage, dict: &'a P) -> Result<Self, Error> {
let indexes = dict_indices_decoder(page)?;

Ok(Self { indexes, dict })
}
Expand All @@ -26,47 +33,189 @@ impl<'a, P> Dictionary<'a, P> {
}
}

#[allow(clippy::large_enum_variant)]
pub enum BinaryPageState<'a, P> {
Optional(utils::DefLevelsDecoder<'a>, BinaryIter<'a>),
Required(BinaryIter<'a>),
RequiredDictionary(Dictionary<'a, P>),
OptionalDictionary(utils::DefLevelsDecoder<'a>, Dictionary<'a, P>),
/// Iterator state over the values of a page handled as `Encoding::DeltaLengthByteArray`:
/// the decoded lengths plus the bytes of the values they slice.
#[derive(Debug)]
pub struct Delta<'a> {
    /// Lengths (in bytes) of the values that have not been yielded yet.
    pub lengths: std::vec::IntoIter<usize>,
    /// Bytes of the values that have not been yielded yet.
    pub values: &'a [u8],
}

impl<'a> Delta<'a> {
    /// Builds a [`Delta`] from the values section of `page`.
    ///
    /// All lengths are decoded eagerly: the length decoder must be fully consumed before it
    /// can hand back the bytes holding the values.
    ///
    /// # Errors
    /// Errors if the page buffer cannot be split, the length decoder cannot be created, or
    /// any length fails to decode.
    pub fn try_new(page: &'a DataPage) -> Result<Self, Error> {
        let (_, _, buffer) = split_buffer(page)?;

        let mut decoder = delta_length_byte_array::Decoder::try_new(buffer)?;

        // drain the decoder through a mutable borrow so it can still be consumed afterwards
        let mut lengths = Vec::new();
        for length in &mut decoder {
            lengths.push(length? as usize);
        }

        Ok(Self {
            lengths: lengths.into_iter(),
            values: decoder.into_values(),
        })
    }

    /// Returns the number of values left to yield.
    pub fn len(&self) -> usize {
        self.lengths.len()
    }

    /// Returns whether there are no values left to yield.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.lengths.len() == 0
    }
}

impl<'a> Iterator for Delta<'a> {
type Item = Result<&'a [u8], Error>;

#[inline]
fn next(&mut self) -> Option<Self::Item> {
let length = self.lengths.next()?;
if length > self.values.len() {
return Some(Err(Error::oos(
"Delta contains a length larger than the values",
)));
}
let (item, remaining) = self.values.split_at(length);
self.values = remaining;
Some(Ok(item))
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.lengths.size_hint()
}
}

/// A [`Delta`] iterator wrapped in a [`SliceFilteredIter`] built from the page's selected rows.
#[derive(Debug)]
pub struct FilteredDelta<'a> {
    /// The filtered iterator of values.
    pub values: SliceFilteredIter<Delta<'a>>,
}

impl<'a> FilteredDelta<'a> {
    /// Builds the [`Delta`] decoder of `page` and filters it by the rows returned by
    /// `get_selected_rows(page)`.
    ///
    /// # Errors
    /// Errors whenever [`Delta::try_new`] does.
    pub fn try_new(page: &'a DataPage) -> Result<Self, Error> {
        let delta = Delta::try_new(page)?;
        let selected = get_selected_rows(page);

        Ok(Self {
            values: SliceFilteredIter::new(delta, selected),
        })
    }

    /// Returns the length of this [`FilteredDelta`], taken as the lower bound of the
    /// filtered iterator's size hint.
    pub fn len(&self) -> usize {
        let (lower, _) = self.values.size_hint();
        lower
    }

    /// Returns whether [`FilteredDelta::len`] is zero.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.values.size_hint().0 == 0
    }
}

/// Dictionary-encoded values restricted to a set of row intervals: the filtered stream of
/// dictionary indexes together with a borrowed dictionary of type `P`.
#[derive(Debug)]
pub struct FilteredDictionary<'a, P> {
/// Hybrid-RLE decoder of the dictionary indexes, wrapped in a [`SliceFilteredIter`].
pub indexes: SliceFilteredIter<hybrid_rle::HybridRleDecoder<'a>>,
/// The borrowed dictionary; its concrete type is chosen by the caller through `P`.
pub dict: &'a P,
}

impl<'a, P> BinaryPageState<'a, P> {
pub fn try_new(page: &'a DataPage, dict: Option<P>) -> Result<Self, Error> {
impl<'a, P> FilteredDictionary<'a, P> {
    /// Builds the decoder of dictionary indexes of `page`, filtered by the rows returned by
    /// `get_selected_rows(page)`, and pairs it with the borrowed `dict`.
    ///
    /// # Errors
    /// Errors if the decoder of dictionary indexes cannot be created from `page`.
    pub fn try_new(page: &'a DataPage, dict: &'a P) -> Result<Self, Error> {
        let decoder = dict_indices_decoder(page)?;
        let selected = get_selected_rows(page);

        Ok(Self {
            indexes: SliceFilteredIter::new(decoder, selected),
            dict,
        })
    }

    /// Returns the lower bound of the size hint of the filtered indexes.
    #[inline]
    pub fn len(&self) -> usize {
        let (lower, _) = self.indexes.size_hint();
        lower
    }

    /// Returns whether [`FilteredDictionary::len`] is zero.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.indexes.size_hint().0 == 0
    }
}

/// Decoder of the values of a binary data page, with one variant per supported encoding.
#[derive(Debug)]
pub enum BinaryValuesDecoder<'a, P> {
/// Values read through a [`BinaryIter`]; selected for `Encoding::Plain`.
Plain(BinaryIter<'a>),
/// Dictionary indexes plus the borrowed dictionary; selected for
/// `Encoding::PlainDictionary` / `Encoding::RleDictionary` when a dictionary is provided.
Dictionary(Dictionary<'a, P>),
/// Lengths plus value bytes; selected for `Encoding::DeltaLengthByteArray`.
Delta(Delta<'a>),
}

impl<'a, P> BinaryValuesDecoder<'a, P> {
pub fn try_new(page: &'a DataPage, dict: Option<&'a P>) -> Result<Self, Error> {
let is_optional =
page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;
let length = (!is_optional).then(|| page.num_values());

match (page.encoding(), dict, is_optional) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => {
Dictionary::try_new(page, dict).map(Self::RequiredDictionary)
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => {
Ok(Self::OptionalDictionary(
utils::DefLevelsDecoder::try_new(page)?,
Dictionary::try_new(page, dict)?,
))
match (page.encoding(), dict) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict)) => {
Dictionary::try_new(page, dict).map(Self::Dictionary)
}
(Encoding::Plain, _, true) => {
(Encoding::Plain, _) => {
let (_, _, values) = split_buffer(page)?;
Ok(Self::Plain(BinaryIter::new(values, length)))
}
(Encoding::DeltaLengthByteArray, _) => Delta::try_new(page).map(Self::Delta),
(other, _) => Err(Error::OutOfSpec(format!(
"Binary-encoded non-nested pages cannot be encoded as {other:?}"
))),
}
}

let validity = utils::DefLevelsDecoder::try_new(page)?;
let values = BinaryIter::new(values, None);
/// Returns the number of values this decoder reports as remaining: the lower bound of the
/// size hint for plain values, and the wrapped state's `len()` otherwise.
#[must_use]
pub fn len(&self) -> usize {
    match self {
        Self::Plain(values) => {
            let (lower, _) = values.size_hint();
            lower
        }
        Self::Dictionary(dictionary) => dictionary.len(),
        Self::Delta(delta) => delta.len(),
    }
}

Ok(Self::Optional(validity, values))
}
(Encoding::Plain, _, false) => {
let (_, _, values) = split_buffer(page)?;
let values = BinaryIter::new(values, Some(page.num_values()));
/// Returns whether `len()` is zero.
#[must_use]
pub fn is_empty(&self) -> bool {
    let remaining = self.len();
    remaining == 0
}
}

Ok(Self::Required(values))
impl<'a, P> ValuesDecoder for BinaryValuesDecoder<'a, P> {
fn len(&self) -> usize {
self.len()
}
}

/// Counterpart of [`BinaryValuesDecoder`] whose iterators are wrapped in a
/// [`SliceFilteredIter`]; it is built from a [`BinaryValuesDecoder`] and a
/// `VecDeque<Interval>` through its `From` implementation.
#[derive(Debug)]
pub enum BinaryFilteredValuesDecoder<'a, P> {
/// Filtered plain values.
Plain(SliceFilteredIter<BinaryIter<'a>>),
/// Filtered dictionary indexes plus the borrowed dictionary.
Dictionary(FilteredDictionary<'a, P>),
/// Filtered delta-length values.
Delta(SliceFilteredIter<Delta<'a>>),
}

impl<'a, P> From<(BinaryValuesDecoder<'a, P>, VecDeque<Interval>)>
for BinaryFilteredValuesDecoder<'a, P>
{
fn from((decoder, intervals): (BinaryValuesDecoder<'a, P>, VecDeque<Interval>)) -> Self {
match decoder {
BinaryValuesDecoder::Plain(values) => {
Self::Plain(SliceFilteredIter::new(values, intervals))
}
BinaryValuesDecoder::Dictionary(values) => Self::Dictionary(FilteredDictionary {
indexes: SliceFilteredIter::new(values.indexes, intervals),
dict: values.dict,
}),
BinaryValuesDecoder::Delta(values) => {
Self::Delta(SliceFilteredIter::new(values, intervals))
}
_ => Err(Error::FeatureNotSupported(format!(
"Viewing page for encoding {:?} for binary type",
page.encoding(),
))),
}
}
}

/// [`Decoder`] specialized for binary pages, using [`BinaryValuesDecoder`] and
/// [`BinaryFilteredValuesDecoder`] as its two value-decoder parameters.
// NOTE(review): presumably the first parameter serves unfiltered pages and the second
// filtered ones; `Decoder` is defined outside this view — confirm.
pub type BinaryDecoder<'a, P> =
Decoder<'a, BinaryValuesDecoder<'a, P>, BinaryFilteredValuesDecoder<'a, P>>;
Loading