diff --git a/src/io/parquet/read/deserialize/binary/basic.rs b/src/io/parquet/read/deserialize/binary/basic.rs index 37440c9ba7f..545008ad119 100644 --- a/src/io/parquet/read/deserialize/binary/basic.rs +++ b/src/io/parquet/read/deserialize/binary/basic.rs @@ -3,7 +3,7 @@ use std::default::Default; use parquet2::{ deserialize::SliceFilteredIter, - encoding::{hybrid_rle, Encoding}, + encoding::{delta_length_byte_array, hybrid_rle, Encoding}, page::{split_buffer, DataPage, DictPage}, schema::Repetition, }; @@ -23,44 +23,6 @@ use super::super::utils::{ use super::super::Pages; use super::{super::utils, utils::*}; -/* -fn read_delta_optional( - validity_buffer: &[u8], - values_buffer: &[u8], - additional: usize, - values: &mut Binary, - validity: &mut MutableBitmap, -) { - let Binary { - offsets, - values, - last_offset, - } = values; - - // values_buffer: first 4 bytes are len, remaining is values - let mut values_iterator = delta_length_byte_array::Decoder::new(values_buffer); - let offsets_iterator = values_iterator.by_ref().map(|x| { - *last_offset += O::from_usize(x as usize).unwrap(); - *last_offset - }); - - let mut page_validity = OptionalPageValidity::new(validity_buffer, additional); - - // offsets: - extend_from_decoder( - validity, - &mut page_validity, - None, - offsets, - offsets_iterator, - ); - - // values: - let new_values = values_iterator.into_values(); - values.extend_from_slice(new_values); -} - */ - #[derive(Debug)] pub(super) struct Required<'a> { pub values: SizedBinaryIter<'a>, @@ -79,6 +41,52 @@ impl<'a> Required<'a> { } } +#[derive(Debug)] +pub(super) struct Delta<'a> { + pub lengths: std::vec::IntoIter, + pub values: &'a [u8], +} + +impl<'a> Delta<'a> { + pub fn try_new(page: &'a DataPage) -> Result { + let (_, _, values) = split_buffer(page)?; + + let mut lengths_iter = delta_length_byte_array::Decoder::new(values); + + #[allow(clippy::needless_collect)] // we need to consume it to get the values + let lengths = lengths_iter + .by_ref() + .map(|x| x as usize) + .collect::>(); + + let values = lengths_iter.into_values(); + Ok(Self { + lengths: lengths.into_iter(), + values, + }) + } + + pub fn len(&self) -> usize { + self.lengths.size_hint().0 + } +} + +impl<'a> Iterator for Delta<'a> { + type Item = &'a [u8]; + + #[inline] + fn next(&mut self) -> Option { + let length = self.lengths.next()?; + let (item, remaining) = self.values.split_at(length); + self.values = remaining; + Some(item) + } + + fn size_hint(&self) -> (usize, Option) { + self.lengths.size_hint() + } +} + #[derive(Debug)] pub(super) struct FilteredRequired<'a> { pub values: SliceFilteredIter>, @@ -99,6 +107,26 @@ impl<'a> FilteredRequired<'a> { } } +#[derive(Debug)] +pub(super) struct FilteredDelta<'a> { + pub values: SliceFilteredIter>, +} + +impl<'a> FilteredDelta<'a> { + pub fn try_new(page: &'a DataPage) -> Result { + let values = Delta::try_new(page)?; + + let rows = get_selected_rows(page); + let values = SliceFilteredIter::new(values, rows); + + Ok(Self { values }) + } + + pub fn len(&self) -> usize { + self.values.size_hint().0 + } +} + pub(super) type Dict = Vec>; #[derive(Debug)] @@ -167,7 +195,11 @@ enum State<'a> { Required(Required<'a>), RequiredDictionary(RequiredDictionary<'a>), OptionalDictionary(OptionalPageValidity<'a>, ValuesDictionary<'a>), + Delta(Delta<'a>), + OptionalDelta(OptionalPageValidity<'a>, Delta<'a>), FilteredRequired(FilteredRequired<'a>), + FilteredDelta(FilteredDelta<'a>), + FilteredOptionalDelta(FilteredOptionalPageValidity<'a>, Delta<'a>), FilteredOptional(FilteredOptionalPageValidity<'a>, BinaryIter<'a>), FilteredRequiredDictionary(FilteredRequiredDictionary<'a>), FilteredOptionalDictionary(FilteredOptionalPageValidity<'a>, ValuesDictionary<'a>), @@ -178,10 +210,14 @@ impl<'a> utils::PageState<'a> for State<'a> { match self { State::Optional(validity, _) => validity.len(), State::Required(state) => state.len(), + State::Delta(state) => state.len(), + State::OptionalDelta(state, _) => state.len(), State::RequiredDictionary(values) => values.len(), State::OptionalDictionary(optional, _) => optional.len(), State::FilteredRequired(state) => state.len(), State::FilteredOptional(validity, _) => validity.len(), + State::FilteredDelta(state) => state.len(), + State::FilteredOptionalDelta(state, _) => state.len(), State::FilteredRequiredDictionary(values) => values.len(), State::FilteredOptionalDictionary(optional, _) => optional.len(), } @@ -284,6 +320,20 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { BinaryIter::new(values), )) } + (Encoding::DeltaLengthByteArray, _, false, false) => { + Delta::try_new(page).map(State::Delta) + } + (Encoding::DeltaLengthByteArray, _, true, false) => Ok(State::OptionalDelta( + OptionalPageValidity::try_new(page)?, + Delta::try_new(page)?, + )), + (Encoding::DeltaLengthByteArray, _, false, true) => { + FilteredDelta::try_new(page).map(State::FilteredDelta) + } + (Encoding::DeltaLengthByteArray, _, true, true) => Ok(State::FilteredOptionalDelta( + FilteredOptionalPageValidity::try_new(page)?, + Delta::try_new(page)?, + )), _ => Err(utils::not_implemented(page)), } } @@ -315,11 +365,44 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { values.push(x) } } + State::Delta(page) => { + values.extend_lengths(page.lengths.by_ref().take(additional), &mut page.values); + } + State::OptionalDelta(page_validity, page_values) => { + let Binary { + offsets, + values: values_, + last_offset, + } = values; + + let offset = *last_offset; + extend_from_decoder( + validity, + page_validity, + Some(additional), + offsets, + page_values.lengths.by_ref().map(|x| { + *last_offset += O::from_usize(x).unwrap(); + *last_offset + }), + ); + + let length = *last_offset - offset; + + let (consumed, remaining) = page_values.values.split_at(length.to_usize()); + page_values.values = remaining; + values_.extend_from_slice(consumed); + } State::FilteredRequired(page) => { for x in page.values.by_ref().take(additional) { values.push(x) } } + State::FilteredDelta(page) => { + for x in page.values.by_ref().take(additional) { + values.push(x) + } + } State::OptionalDictionary(page_validity, page_values) => { let page_dict = &page_values.dict; let op = move |index: u32| page_dict[index as usize].as_ref(); @@ -348,6 +431,15 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { page_values.by_ref(), ); } + State::FilteredOptionalDelta(page_validity, page_values) => { + utils::extend_from_decoder( + validity, + page_validity, + Some(additional), + values, + page_values.by_ref(), + ); + } State::FilteredRequiredDictionary(page) => { let page_dict = &page.dict; let op = move |index: u32| page_dict[index as usize].as_ref(); diff --git a/src/io/parquet/read/deserialize/binary/utils.rs b/src/io/parquet/read/deserialize/binary/utils.rs index eac2b806643..2d0b5f38788 100644 --- a/src/io/parquet/read/deserialize/binary/utils.rs +++ b/src/io/parquet/read/deserialize/binary/utils.rs @@ -13,6 +13,17 @@ pub struct Binary { #[derive(Debug)] pub struct Offsets(pub Vec); +impl Offsets { + #[inline] + pub fn extend_lengths>(&mut self, lengths: I) { + let mut last_offset = *self.0.last().unwrap(); + self.0.extend(lengths.map(|length| { + last_offset += O::from_usize(length).unwrap(); + last_offset + })); + } +} + impl Pushable for Offsets { #[inline] fn len(&self) -> usize { @@ -63,6 +74,17 @@ impl Binary { pub fn len(&self) -> usize { self.offsets.len() } + + #[inline] + pub fn extend_lengths>(&mut self, lengths: I, values: &mut &[u8]) { + let current_offset = self.last_offset; + self.offsets.extend_lengths(lengths); + self.last_offset = *self.offsets.0.last().unwrap(); // guaranteed to have one + let length = self.last_offset.to_usize() - current_offset.to_usize(); + let (consumed, remaining) = values.split_at(length); + *values = remaining; + self.values.extend_from_slice(consumed); + } } impl<'a, O: Offset> Pushable<&'a [u8]> for Binary { diff --git a/src/io/parquet/write/binary/basic.rs b/src/io/parquet/write/binary/basic.rs index 58156c901e6..f8a8cdc47ae 100644 --- a/src/io/parquet/write/binary/basic.rs +++ b/src/io/parquet/write/binary/basic.rs @@ -138,6 +138,13 @@ pub(crate) fn encode_delta( delta_bitpacked::encode(lengths, buffer); } else { + println!( + "{:?}", + offsets + .windows(2) + .map(|w| (w[1] - w[0]).to_usize() as i64) + .collect::>() + ); let lengths = offsets.windows(2).map(|w| (w[1] - w[0]).to_usize() as i64); delta_bitpacked::encode(lengths, buffer); } diff --git a/tests/it/io/parquet/read_indexes.rs b/tests/it/io/parquet/read_indexes.rs index a9776fdb5a9..bff9346006c 100644 --- a/tests/it/io/parquet/read_indexes.rs +++ b/tests/it/io/parquet/read_indexes.rs @@ -167,6 +167,18 @@ fn indexed_required_utf8() -> Result<()> { read_with_indexes(pages(&[&array21, &array22], Encoding::Plain)?, expected) } +#[test] +fn indexed_required_utf8_delta() -> Result<()> { + let array21 = Utf8Array::::from_slice(["a", "b", "c"]); + let array22 = Utf8Array::::from_slice(["d", "e", "f"]); + let expected = Utf8Array::::from_slice(["e"]).boxed(); + + read_with_indexes( + pages(&[&array21, &array22], Encoding::DeltaLengthByteArray)?, + expected, + ) +} + #[test] fn indexed_required_i32() -> Result<()> { let array21 = Int32Array::from_slice([1, 2, 3]); @@ -194,6 +206,18 @@ fn indexed_optional_utf8() -> Result<()> { read_with_indexes(pages(&[&array21, &array22], Encoding::Plain)?, expected) } +#[test] +fn indexed_optional_utf8_delta() -> Result<()> { + let array21 = Utf8Array::::from([Some("a"), Some("b"), None]); + let array22 = Utf8Array::::from([None, Some("e"), Some("f")]); + let expected = Utf8Array::::from_slice(["e"]).boxed(); + + read_with_indexes( + pages(&[&array21, &array22], Encoding::DeltaLengthByteArray)?, + expected, + ) +} + #[test] fn indexed_required_fixed_len() -> Result<()> { let array21 = FixedSizeBinaryArray::from_slice([[127], [128], [129]]); diff --git a/tests/it/io/parquet/write.rs b/tests/it/io/parquet/write.rs index 343bd419f3b..f95b53331c5 100644 --- a/tests/it/io/parquet/write.rs +++ b/tests/it/io/parquet/write.rs @@ -339,7 +339,6 @@ fn list_large_binary_optional_v1() -> Result<()> { } #[test] -#[ignore] fn utf8_optional_v2_delta() -> Result<()> { round_trip( "string", @@ -350,6 +349,17 @@ fn utf8_optional_v2_delta() -> Result<()> { ) } +#[test] +fn utf8_required_v2_delta() -> Result<()> { + round_trip( + "string", + "required", + Version::V2, + CompressionOptions::Uncompressed, + vec![Encoding::DeltaLengthByteArray], + ) +} + #[test] fn i32_optional_v2_dict() -> Result<()> { round_trip(