Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Bumping parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Jun 25, 2022
1 parent 88f05bb commit c47041d
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 155 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ futures = { version = "0.3", optional = true }
ahash = { version = "0.7", optional = true }

# parquet support
parquet2 = { version = "0.13.1", optional = true, default_features = false }
#parquet2 = { version = "0.13.1", optional = true, default_features = false }
parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", branch = "fix_panic", optional = true, default_features = false }

# avro support
avro-schema = { version = "0.2", optional = true }
Expand Down
56 changes: 29 additions & 27 deletions src/io/parquet/read/deserialize/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::default::Default;
use parquet2::{
deserialize::SliceFilteredIter,
encoding::{hybrid_rle, Encoding},
page::{BinaryPageDict, DataPage},
page::{split_buffer, BinaryPageDict, DataPage},
schema::Repetition,
};

Expand Down Expand Up @@ -67,11 +67,11 @@ pub(super) struct Required<'a> {
}

impl<'a> Required<'a> {
pub fn new(page: &'a DataPage) -> Self {
let (_, _, values) = utils::split_buffer(page);
pub fn try_new(page: &'a DataPage) -> Result<Self> {
let (_, _, values) = split_buffer(page)?;
let values = SizedBinaryIter::new(values, page.num_values());

Self { values }
Ok(Self { values })
}

pub fn len(&self) -> usize {
Expand Down Expand Up @@ -106,10 +106,10 @@ pub(super) struct RequiredDictionary<'a> {
}

impl<'a> RequiredDictionary<'a> {
pub fn new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Self {
let values = utils::dict_indices_decoder(page);
pub fn try_new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Result<Self> {
let values = utils::dict_indices_decoder(page)?;

Self { dict, values }
Ok(Self { dict, values })
}

#[inline]
Expand All @@ -125,13 +125,13 @@ pub(super) struct FilteredRequiredDictionary<'a> {
}

impl<'a> FilteredRequiredDictionary<'a> {
pub fn new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Self {
let values = utils::dict_indices_decoder(page);
pub fn try_new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Result<Self> {
let values = utils::dict_indices_decoder(page)?;

let rows = get_selected_rows(page);
let values = SliceFilteredIter::new(values, rows);

Self { values, dict }
Ok(Self { values, dict })
}

#[inline]
Expand All @@ -147,10 +147,10 @@ pub(super) struct ValuesDictionary<'a> {
}

impl<'a> ValuesDictionary<'a> {
pub fn new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Self {
let values = utils::dict_indices_decoder(page);
pub fn try_new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Result<Self> {
let values = utils::dict_indices_decoder(page)?;

Self { dict, values }
Ok(Self { dict, values })
}

#[inline]
Expand Down Expand Up @@ -246,50 +246,52 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
is_filtered,
) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => {
Ok(State::RequiredDictionary(RequiredDictionary::new(
Ok(State::RequiredDictionary(RequiredDictionary::try_new(
page,
dict.as_any().downcast_ref().unwrap(),
)))
)?))
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => {
let dict = dict.as_any().downcast_ref().unwrap();

Ok(State::OptionalDictionary(
OptionalPageValidity::new(page),
ValuesDictionary::new(page, dict),
OptionalPageValidity::try_new(page)?,
ValuesDictionary::try_new(page, dict)?,
))
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, true) => {
let dict = dict.as_any().downcast_ref().unwrap();

Ok(State::FilteredRequiredDictionary(
FilteredRequiredDictionary::new(page, dict),
))
FilteredRequiredDictionary::try_new(page, dict)
.map(State::FilteredRequiredDictionary)
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, true) => {
let dict = dict.as_any().downcast_ref().unwrap();

Ok(State::FilteredOptionalDictionary(
FilteredOptionalPageValidity::new(page),
ValuesDictionary::new(page, dict),
FilteredOptionalPageValidity::try_new(page)?,
ValuesDictionary::try_new(page, dict)?,
))
}
(Encoding::Plain, _, true, false) => {
let (_, _, values) = utils::split_buffer(page);
let (_, _, values) = split_buffer(page)?;

let values = BinaryIter::new(values);

Ok(State::Optional(OptionalPageValidity::new(page), values))
Ok(State::Optional(
OptionalPageValidity::try_new(page)?,
values,
))
}
(Encoding::Plain, _, false, false) => Ok(State::Required(Required::new(page))),
(Encoding::Plain, _, false, false) => Ok(State::Required(Required::try_new(page)?)),
(Encoding::Plain, _, false, true) => {
Ok(State::FilteredRequired(FilteredRequired::new(page)))
}
(Encoding::Plain, _, true, true) => {
let (_, _, values) = utils::split_buffer(page);
let (_, _, values) = split_buffer(page)?;

Ok(State::FilteredOptional(
FilteredOptionalPageValidity::new(page),
FilteredOptionalPageValidity::try_new(page)?,
BinaryIter::new(values),
))
}
Expand Down
20 changes: 13 additions & 7 deletions src/io/parquet/read/deserialize/binary/nested.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use std::collections::VecDeque;

use parquet2::{encoding::Encoding, page::DataPage, schema::Repetition};
use parquet2::{
encoding::Encoding,
page::{split_buffer, DataPage},
schema::Repetition,
};

use crate::{
array::Offset, bitmap::MutableBitmap, datatypes::DataType, error::Result,
Expand Down Expand Up @@ -58,23 +62,25 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => {
let dict = dict.as_any().downcast_ref().unwrap();
Ok(State::RequiredDictionary(ValuesDictionary::new(page, dict)))
Ok(State::RequiredDictionary(ValuesDictionary::try_new(
page, dict,
)?))
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => {
let dict = dict.as_any().downcast_ref().unwrap();
Ok(State::OptionalDictionary(
Optional::new(page),
ValuesDictionary::new(page, dict),
Optional::try_new(page)?,
ValuesDictionary::try_new(page, dict)?,
))
}
(Encoding::Plain, _, true, false) => {
let (_, _, values) = utils::split_buffer(page);
let (_, _, values) = split_buffer(page)?;

let values = BinaryIter::new(values);

Ok(State::Optional(Optional::new(page), values))
Ok(State::Optional(Optional::try_new(page)?, values))
}
(Encoding::Plain, _, false, false) => Ok(State::Required(Required::new(page))),
(Encoding::Plain, _, false, false) => Ok(State::Required(Required::try_new(page)?)),
_ => Err(utils::not_implemented(page)),
}
}
Expand Down
29 changes: 16 additions & 13 deletions src/io/parquet/read/deserialize/boolean/basic.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::collections::VecDeque;

use parquet2::{
deserialize::SliceFilteredIter, encoding::Encoding, page::DataPage, schema::Repetition,
deserialize::SliceFilteredIter,
encoding::Encoding,
page::{split_buffer, DataPage},
schema::Repetition,
};

use crate::{
Expand All @@ -13,7 +16,7 @@ use crate::{

use super::super::utils;
use super::super::utils::{
extend_from_decoder, get_selected_rows, next, split_buffer, DecodedState, Decoder,
extend_from_decoder, get_selected_rows, next, DecodedState, Decoder,
FilteredOptionalPageValidity, MaybeNext, OptionalPageValidity,
};
use super::super::DataPages;
Expand All @@ -22,10 +25,10 @@ use super::super::DataPages;
struct Values<'a>(BitmapIter<'a>);

impl<'a> Values<'a> {
pub fn new(page: &'a DataPage) -> Self {
let (_, _, values) = split_buffer(page);
pub fn try_new(page: &'a DataPage) -> Result<Self> {
let (_, _, values) = split_buffer(page)?;

Self(BitmapIter::new(values, 0, values.len() * 8))
Ok(Self(BitmapIter::new(values, 0, values.len() * 8)))
}
}

Expand Down Expand Up @@ -54,15 +57,15 @@ struct FilteredRequired<'a> {
}

impl<'a> FilteredRequired<'a> {
pub fn new(page: &'a DataPage) -> Self {
let (_, _, values) = utils::split_buffer(page);
pub fn try_new(page: &'a DataPage) -> Result<Self> {
let (_, _, values) = split_buffer(page)?;
// todo: replace this by an iterator over slices, for faster deserialization
let values = BitmapIter::new(values, 0, page.num_values());

let rows = get_selected_rows(page);
let values = SliceFilteredIter::new(values, rows);

Self { values }
Ok(Self { values })
}

#[inline]
Expand Down Expand Up @@ -117,16 +120,16 @@ impl<'a> Decoder<'a> for BooleanDecoder {

match (page.encoding(), is_optional, is_filtered) {
(Encoding::Plain, true, false) => Ok(State::Optional(
OptionalPageValidity::new(page),
Values::new(page),
OptionalPageValidity::try_new(page)?,
Values::try_new(page)?,
)),
(Encoding::Plain, false, false) => Ok(State::Required(Required::new(page))),
(Encoding::Plain, true, true) => Ok(State::FilteredOptional(
FilteredOptionalPageValidity::new(page),
Values::new(page),
FilteredOptionalPageValidity::try_new(page)?,
Values::try_new(page)?,
)),
(Encoding::Plain, false, true) => {
Ok(State::FilteredRequired(FilteredRequired::new(page)))
Ok(State::FilteredRequired(FilteredRequired::try_new(page)?))
}
_ => Err(utils::not_implemented(page)),
}
Expand Down
20 changes: 12 additions & 8 deletions src/io/parquet/read/deserialize/boolean/nested.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use std::collections::VecDeque;

use parquet2::{encoding::Encoding, page::DataPage, schema::Repetition};
use parquet2::{
encoding::Encoding,
page::{split_buffer, DataPage},
schema::Repetition,
};

use crate::{
array::BooleanArray,
Expand All @@ -24,13 +28,13 @@ struct Required<'a> {
}

impl<'a> Required<'a> {
pub fn new(page: &'a DataPage) -> Self {
let (_, _, values) = utils::split_buffer(page);
Self {
pub fn try_new(page: &'a DataPage) -> Result<Self> {
let (_, _, values) = split_buffer(page)?;
Ok(Self {
values,
offset: 0,
length: page.num_values(),
}
})
}
}

Expand Down Expand Up @@ -71,12 +75,12 @@ impl<'a> Decoder<'a> for BooleanDecoder {

match (page.encoding(), is_optional, is_filtered) {
(Encoding::Plain, true, false) => {
let (_, _, values) = utils::split_buffer(page);
let (_, _, values) = split_buffer(page)?;
let values = BitmapIter::new(values, 0, values.len() * 8);

Ok(State::Optional(Optional::new(page), values))
Ok(State::Optional(Optional::try_new(page)?, values))
}
(Encoding::Plain, false, false) => Ok(State::Required(Required::new(page))),
(Encoding::Plain, false, false) => Ok(State::Required(Required::try_new(page)?)),
_ => Err(utils::not_implemented(page)),
}
}
Expand Down
32 changes: 16 additions & 16 deletions src/io/parquet/read/deserialize/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ pub struct Required<'a> {
}

impl<'a> Required<'a> {
fn new(page: &'a DataPage) -> Self {
let values = dict_indices_decoder(page);
Self { values }
fn try_new(page: &'a DataPage) -> Result<Self> {
let values = dict_indices_decoder(page)?;
Ok(Self { values })
}
}

Expand All @@ -48,13 +48,13 @@ pub struct FilteredRequired<'a> {
}

impl<'a> FilteredRequired<'a> {
fn new(page: &'a DataPage) -> Self {
let values = dict_indices_decoder(page);
fn try_new(page: &'a DataPage) -> Result<Self> {
let values = dict_indices_decoder(page)?;

let rows = get_selected_rows(page);
let values = SliceFilteredIter::new(values, rows);

Self { values }
Ok(Self { values })
}
}

Expand All @@ -65,13 +65,13 @@ pub struct Optional<'a> {
}

impl<'a> Optional<'a> {
fn new(page: &'a DataPage) -> Self {
let values = dict_indices_decoder(page);
fn try_new(page: &'a DataPage) -> Result<Self> {
let values = dict_indices_decoder(page)?;

Self {
Ok(Self {
values,
validity: OptionalPageValidity::new(page),
}
validity: OptionalPageValidity::try_new(page)?,
})
}
}

Expand Down Expand Up @@ -120,18 +120,18 @@ where

match (page.encoding(), is_optional, is_filtered) {
(Encoding::PlainDictionary | Encoding::RleDictionary, false, false) => {
Ok(State::Required(Required::new(page)))
Required::try_new(page).map(State::Required)
}
(Encoding::PlainDictionary | Encoding::RleDictionary, true, false) => {
Ok(State::Optional(Optional::new(page)))
Optional::try_new(page).map(State::Optional)
}
(Encoding::PlainDictionary | Encoding::RleDictionary, false, true) => {
Ok(State::FilteredRequired(FilteredRequired::new(page)))
FilteredRequired::try_new(page).map(State::FilteredRequired)
}
(Encoding::PlainDictionary | Encoding::RleDictionary, true, true) => {
Ok(State::FilteredOptional(
FilteredOptionalPageValidity::new(page),
dict_indices_decoder(page),
FilteredOptionalPageValidity::try_new(page)?,
dict_indices_decoder(page)?,
))
}
_ => Err(utils::not_implemented(page)),
Expand Down
Loading

0 comments on commit c47041d

Please sign in to comment.