Skip to content

Commit

Permalink
Support skip_values in ColumnValueDecoderImpl (#2089)
Browse files Browse the repository at this point in the history
* Add skip method to ParquetValueType

* Tests

* PR comments

* Update parquet/src/encodings/decoding.rs

Co-authored-by: Yang Jiang <jiangyang381@163.com>

* PR comments

Co-authored-by: Yang Jiang <jiangyang381@163.com>
  • Loading branch information
thinkharderdev and Ted-Jiang authored Jul 16, 2022
1 parent 13adaa7 commit c585544
Show file tree
Hide file tree
Showing 5 changed files with 752 additions and 10 deletions.
13 changes: 11 additions & 2 deletions parquet/src/column/reader/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,17 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
current_decoder.get(&mut out[range])
}

fn skip_values(&mut self, _num_values: usize) -> Result<usize> {
Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
fn skip_values(&mut self, num_values: usize) -> Result<usize> {
let encoding = self
.current_encoding
.expect("current_encoding should be set");

let current_decoder = self
.decoders
.get_mut(&encoding)
.unwrap_or_else(|| panic!("decoder for encoding {} should be set", encoding));

current_decoder.skip(num_values)
}
}

Expand Down
85 changes: 85 additions & 0 deletions parquet/src/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,8 @@ pub(crate) mod private {
decoder: &mut PlainDecoderDetails,
) -> Result<usize>;

fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize>;

/// Return the encoded size for a type
fn dict_encoding_size(&self) -> (usize, usize) {
(std::mem::size_of::<Self>(), 1)
Expand Down Expand Up @@ -690,6 +692,14 @@ pub(crate) mod private {
Ok(values_read)
}

fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
let bit_reader = decoder.bit_reader.as_mut().unwrap();
let num_values = std::cmp::min(num_values, decoder.num_values);
let values_read = bit_reader.skip(num_values, 1);
decoder.num_values -= values_read;
Ok(values_read)
}

#[inline]
fn as_i64(&self) -> Result<i64> {
Ok(*self as i64)
Expand Down Expand Up @@ -764,6 +774,23 @@ pub(crate) mod private {
Ok(num_values)
}

#[inline]
fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
let data = decoder.data.as_ref().expect("set_data should have been called");
let num_values = num_values.min(decoder.num_values);
let bytes_left = data.len() - decoder.start;
let bytes_to_skip = std::mem::size_of::<Self>() * num_values;

if bytes_left < bytes_to_skip {
return Err(eof_err!("Not enough bytes to skip"));
}

decoder.start += bytes_to_skip;
decoder.num_values -= num_values;

Ok(num_values)
}

#[inline]
fn as_i64(&$self) -> Result<i64> {
$as_i64
Expand Down Expand Up @@ -853,6 +880,24 @@ pub(crate) mod private {
Ok(num_values)
}

fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
let data = decoder
.data
.as_ref()
.expect("set_data should have been called");
let num_values = std::cmp::min(num_values, decoder.num_values);
let bytes_left = data.len() - decoder.start;
let bytes_to_skip = 12 * num_values;

if bytes_left < bytes_to_skip {
return Err(eof_err!("Not enough bytes to skip"));
}
decoder.start += bytes_to_skip;
decoder.num_values -= num_values;

Ok(num_values)
}

#[inline]
fn as_any(&self) -> &dyn std::any::Any {
self
Expand Down Expand Up @@ -936,6 +981,24 @@ pub(crate) mod private {
Ok(num_values)
}

fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
let data = decoder
.data
.as_mut()
.expect("set_data should have been called");
let num_values = num_values.min(decoder.num_values);

for _ in 0..num_values {
let len: usize =
read_num_bytes!(u32, 4, data.start_from(decoder.start).as_ref())
as usize;
decoder.start += std::mem::size_of::<u32>() + len;
}
decoder.num_values -= num_values;

Ok(num_values)
}

#[inline]
fn dict_encoding_size(&self) -> (usize, usize) {
(std::mem::size_of::<u32>(), self.len())
Expand Down Expand Up @@ -1005,6 +1068,28 @@ pub(crate) mod private {
Ok(num_values)
}

fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
assert!(decoder.type_length > 0);

let data = decoder
.data
.as_mut()
.expect("set_data should have been called");
let num_values = std::cmp::min(num_values, decoder.num_values);
for _ in 0..num_values {
let len = decoder.type_length as usize;

if data.len() < decoder.start + len {
return Err(eof_err!("Not enough bytes to skip"));
}

decoder.start += len;
}
decoder.num_values -= num_values;

Ok(num_values)
}

#[inline]
fn dict_encoding_size(&self) -> (usize, usize) {
(std::mem::size_of::<u32>(), self.len())
Expand Down
Loading

0 comments on commit c585544

Please sign in to comment.