Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed May 24, 2022
1 parent 985dddb commit e773c10
Show file tree
Hide file tree
Showing 4 changed files with 675 additions and 10 deletions.
76 changes: 67 additions & 9 deletions src/io/parquet/write/levels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ use parquet2::write::Version;

use crate::{array::Offset, bitmap::Bitmap, error::Result};

/// Returns the number of slots described by a sequence of list lengths.
///
/// A list of length `0` still occupies one slot, so it contributes `1`;
/// any other list contributes its own length.
pub fn num_values_iter<I: Iterator<Item = usize>>(lengths: I) -> usize {
    lengths
        .map(|length| if length == 0 { 1 } else { length })
        .sum()
}

fn to_length<O: Offset>(
pub fn to_length<O: Offset>(
offsets: &[O],
) -> impl Iterator<Item = usize> + std::fmt::Debug + Clone + '_ {
offsets
Expand All @@ -21,23 +21,41 @@ pub fn num_values<O: Offset>(offsets: &[O]) -> usize {
num_values_iter(to_length(offsets))
}

/// One step of a single-level walk over a sequence of list lengths.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RepetitionRecord {
    /// The slot taken by a list of length zero.
    Empty,
    /// The first item of a non-empty list; carries that list's length.
    FirstItem(usize),
    /// Any item of a list after its first one.
    Item,
}

impl From<RepetitionRecord> for u32 {
    /// Converts a record into its numeric level: `1` for
    /// [`RepetitionRecord::Item`] and `0` for the two variants that start a
    /// record.
    #[inline]
    fn from(other: RepetitionRecord) -> Self {
        match other {
            RepetitionRecord::Item => 1,
            RepetitionRecord::Empty | RepetitionRecord::FirstItem(_) => 0,
        }
    }
}

/// Iterator adapter producing one [`RepetitionRecord`] per slot of a single list level.
/// The adapted iterator is assumed to be a sequence of list lengths.
///
/// For example, the lengths [0, 1, 2, 0] yield
/// [Empty, FirstItem(1), FirstItem(2), Item, Empty]:
/// * 0 -> Empty
/// * 1 -> FirstItem(1)
/// * 2 -> FirstItem(2), Item
/// * 0 -> Empty
#[derive(Debug)]
pub struct RepRecordIter<I: Iterator<Item = usize> + std::fmt::Debug + Clone> {
    // source of list lengths
    iter: I,
    // number of items of the current list that were already emitted
    // (despite the name, this counts up towards `length`)
    remaining: usize,
    // length of the list currently being emitted
    length: usize,
    // number of records still to be emitted; reported by `size_hint`
    total_size: usize,
}

impl<I: Iterator<Item = usize> + std::fmt::Debug + Clone> RepLevelsIter<I> {
impl<I: Iterator<Item = usize> + std::fmt::Debug + Clone> RepRecordIter<I> {
/// `iter` is expected to be a list of lengths.
pub fn new(iter: I) -> Self {
let total_size = num_values_iter(iter.clone());
Expand All @@ -51,29 +69,69 @@ impl<I: Iterator<Item = usize> + std::fmt::Debug + Clone> RepLevelsIter<I> {
}
}

impl<I: Iterator<Item = usize> + std::fmt::Debug + Clone> Iterator for RepLevelsIter<I> {
type Item = u32;
impl<I: Iterator<Item = usize> + std::fmt::Debug + Clone> Iterator for RepRecordIter<I> {
type Item = RepetitionRecord;

#[inline]
fn next(&mut self) -> Option<Self::Item> {
if self.remaining == self.length {
self.length = self.iter.next()?;
self.remaining = 0;
if self.length == 0 {
self.total_size -= 1;
return Some(0);
return Some(RepetitionRecord::Empty);
}
}
let old = self.remaining;
self.remaining += 1;
self.total_size -= 1;
Some((old >= 1) as u32)
Some(if old >= 1 {
RepetitionRecord::Item
} else {
RepetitionRecord::FirstItem(self.length)
})
}

fn size_hint(&self) -> (usize, Option<usize>) {
(self.total_size, Some(self.total_size))
}
}

/// Iterator adapter of dremel repetition levels for a single list level.
/// The adapted iterator is assumed to be a sequence of list lengths.
///
/// This is a thin wrapper over [`RepRecordIter`] that converts every
/// [`RepetitionRecord`] into a `u32`: 0 when the slot starts a record
/// (including an empty one) and 1 for every later item of the same list.
/// For example, the lengths [0, 1, 2, 0] yield [0, 0, 0, 1, 0]:
/// * 0 -> 0
/// * 1 -> 0
/// * 2 -> 0, 1
/// * 0 -> 0
#[derive(Debug)]
pub struct RepLevelsIter<I: Iterator<Item = usize> + std::fmt::Debug + Clone> {
// underlying iterator of records; each record is converted to `u32` on the way out
iter: RepRecordIter<I>,
}

impl<I: Iterator<Item = usize> + std::fmt::Debug + Clone> RepLevelsIter<I> {
    /// Builds the adapter from `iter`, which is expected to yield list lengths.
    pub fn new(iter: I) -> Self {
        let iter = RepRecordIter::new(iter);
        Self { iter }
    }
}

impl<I: Iterator<Item = usize> + std::fmt::Debug + Clone> Iterator for RepLevelsIter<I> {
    type Item = u32;

    /// Returns the numeric level of the next record of the wrapped iterator.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        let record = self.iter.next()?;
        Some(u32::from(record))
    }

    /// Forwards the size hint of the wrapped record iterator.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let records = &self.iter;
        records.size_hint()
    }
}

/// Iterator adapter of parquet / dremel definition levels
pub struct DefLevelsIter<L, II, I>
where
Expand Down
4 changes: 3 additions & 1 deletion src/io/parquet/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ mod dictionary;
mod file;
mod fixed_len_bytes;
mod levels;
mod nested;
mod pages;
mod primitive;
mod row_group;
mod schema;
Expand All @@ -29,7 +31,7 @@ pub use parquet2::{
fallible_streaming_iterator,
metadata::{Descriptor, KeyValue, SchemaDescriptor},
page::{CompressedDataPage, CompressedPage, EncodedPage},
schema::types::ParquetType,
schema::types::{FieldInfo, ParquetType, PhysicalType as ParquetPhysicalType},
write::{compress, Compressor, DynIter, DynStreamingIterator, RowGroupIter, Version},
FallibleStreamingIterator,
};
Expand Down
182 changes: 182 additions & 0 deletions src/io/parquet/write/nested.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
use super::levels::to_length;
use super::pages::Nested;

/// Helper trait combining `Iterator<Item = usize>` and `Debug`, so that
/// iterators of lengths with different concrete types can be stored behind
/// a single `Box<dyn DebugIter>`.
trait DebugIter: Iterator<Item = usize> + std::fmt::Debug {}

/// Every debuggable iterator of `usize` is a [`DebugIter`].
impl<T> DebugIter for T where T: Iterator<Item = usize> + std::fmt::Debug {}

/// Collects, outermost first, one iterator of list lengths per list-like
/// level of `nested`.
///
/// `List` and `LargeList` levels contribute the lengths derived from their
/// offsets; `Primitive` and `Struct` levels contribute nothing.
fn iter<'a>(nested: &'a [Nested]) -> Vec<Box<dyn DebugIter + 'a>> {
    let mut lengths: Vec<Box<dyn DebugIter + 'a>> = Vec::new();
    for level in nested {
        match level {
            Nested::List(info) => lengths.push(Box::new(to_length(info.offsets))),
            Nested::LargeList(info) => lengths.push(Box::new(to_length(info.offsets))),
            Nested::Primitive(_, _) | Nested::Struct(_, _) => {}
        }
    }
    lengths
}

/// Number of repetition levels that `nested` produces.
///
/// The innermost list level contributes one slot per item, with an empty
/// list counting as one slot; every outer list level only contributes its
/// empty lists, one slot each.
fn num_values(nested: &[Nested]) -> usize {
    let levels = iter(nested);
    let depth = levels.len();

    let mut count = 0;
    for (index, lengths) in levels.into_iter().enumerate() {
        // `index + 1 == depth` identifies the innermost level without
        // computing `depth - 1`
        count += if index + 1 == depth {
            lengths.map(|length| length.max(1)).sum::<usize>()
        } else {
            lengths.filter(|&length| length == 0).count()
        };
    }
    count
}

/// Iterator adapter of parquet / dremel repetition levels over an arbitrary
/// number of nested list levels.
///
/// Each returned value is the number of list levels that were already open
/// when the corresponding slot was reached: 0 for the first slot of a new
/// outermost list.
#[derive(Debug)]
pub struct RepLevelsIter<'a> {
// iterators of lengths, one per list level, outermost first.
// E.g. [[[a,b,c], [d,e,f,g]], [[h], [i,j]]] -> [[2, 2], [3, 4, 1, 2]]
iter: Vec<Box<dyn DebugIter + 'a>>,
// per level, the number of children of the currently open list that are not yet finished.
// e.g. with the iters [[2, 2], [3, 4, 1, 2]] the first call opens both levels with the
// lengths [2, 3]; after that call returns its value `remaining` is `[2, 2]`,
// and after the second call it is `[2, 1]` (each returned value consumes one innermost item).
// a finished inner list decrements the counter of its parent.
remaining: Vec<usize>, /* remaining.len() == iter.len() */
// number of leading levels that currently have an open list (non-zero `remaining`). Examples:
// * `remaining = [2, 2] => current_level = 2`
// * `remaining = [2, 0] => current_level = 1`
// * `remaining = [0, 0] => current_level = 0`
current_level: usize, /* <= iter.len() */
// number of levels opened by the current call to `next`; it is subtracted from
// `current_level` so that the first value of newly opened lists reports the shallower level.
total: usize, /* <= iter.len() */

// the number of items that this iterator still has to return (reported by `size_hint`)
remaining_values: usize,
}

impl<'a> RepLevelsIter<'a> {
    /// Creates the iterator of repetition levels for `nested`.
    ///
    /// All levels start closed: no pending items and a current level of zero.
    pub fn new(nested: &'a [Nested]) -> Self {
        let remaining_values = num_values(nested);
        let iter = iter(nested);
        let remaining = vec![0; iter.len()];

        Self {
            iter,
            remaining,
            current_level: 0,
            total: 0,
            remaining_values,
        }
    }
}

impl<'a> Iterator for RepLevelsIter<'a> {
type Item = u32;

fn next(&mut self) -> Option<Self::Item> {
if *self.remaining.last().unwrap() > 0 {
*self.remaining.last_mut().unwrap() -= 1;

let total = self.total;
self.total = 0;
let r = Some((self.current_level - total) as u32);

for level in 0..self.current_level - 1 {
let level = self.remaining.len() - level - 1;
if self.remaining[level] == 0 {
self.current_level -= 1;
self.remaining[level.saturating_sub(1)] -= 1;
}
}
if self.remaining[0] == 0 {
self.current_level -= 1;
}
self.remaining_values -= 1;
return r;
}

self.total = 0;
for (iter, remaining) in self
.iter
.iter_mut()
.zip(self.remaining.iter_mut())
.skip(self.current_level)
{
let length: usize = iter.next()?;
if length == 0 {
self.remaining_values -= 1;
return Some(self.current_level as u32);
}
*remaining = length;
self.current_level += 1;
self.total += 1;
}
self.next()
}

fn size_hint(&self) -> (usize, Option<usize>) {
let length = self.remaining_values;
(length, Some(length))
}
}

// Unit tests for the nested repetition-level iterator.
#[cfg(test)]
mod tests {
use crate::io::parquet::write::levels::NestedInfo;

use super::*;

// Two list levels without empty lists:
// outer lengths [2, 2], inner lengths [3, 4, 1, 2] => 10 values.
#[test]
fn test_rep_levels() {
let nested = vec![
Nested::List(NestedInfo::<i32> {
is_optional: false,
offsets: &[0, 2, 4],
validity: None,
}),
Nested::List(NestedInfo::<i32> {
is_optional: false,
offsets: &[0, 3, 7, 8, 10],
validity: None,
}),
Nested::Primitive(None, false),
];
let mut iter = RepLevelsIter::new(&nested);
assert_eq!(iter.size_hint().0, 10);
let result = iter.by_ref().collect::<Vec<_>>();
// 0 starts an outer list, 1 starts a further inner list, 2 continues an inner list
assert_eq!(result, vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]);
assert_eq!(iter.size_hint().0, 0);
}

// Same inner level, but the outer level has an empty list in the middle:
// outer lengths [2, 0, 2] => one extra value (11 in total).
#[test]
fn test_rep_levels_with_zero() {
let nested = vec![
Nested::List(NestedInfo::<i32> {
is_optional: false,
offsets: &[0, 2, 2, 4],
validity: None,
}),
Nested::List(NestedInfo::<i32> {
is_optional: false,
offsets: &[0, 3, 7, 8, 10],
validity: None,
}),
Nested::Primitive(None, false),
];
let mut iter = RepLevelsIter::new(&nested);
assert_eq!(iter.size_hint().0, 11);
let result = iter.by_ref().collect::<Vec<_>>();
// the empty outer list shows up as the additional 0 at index 7
assert_eq!(result, vec![0, 2, 2, 1, 2, 2, 2, 0, 0, 1, 2]);
assert_eq!(iter.size_hint().0, 0);
}
}
Loading

0 comments on commit e773c10

Please sign in to comment.