Implement dictionary support for reading ByteView from parquet #5973

Merged
merged 2 commits into from
Jul 3, 2024
113 changes: 104 additions & 9 deletions parquet/src/arrow/array_reader/byte_view_array.rs
@@ -17,6 +17,7 @@

use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
use crate::arrow::buffer::view_buffer::ViewBuffer;
use crate::arrow::decoder::DictIndexDecoder;
use crate::arrow::record_reader::GenericRecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::{ConvertedType, Encoding};
@@ -25,6 +26,7 @@ use crate::column::reader::decoder::ColumnValueDecoder;
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use arrow_array::ArrayRef;
use arrow_data::ByteView;
use arrow_schema::DataType as ArrowType;
use bytes::Bytes;
use std::any::Any;
@@ -210,6 +212,7 @@ impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder {
/// A generic decoder from uncompressed parquet value data to [`ViewBuffer`]
pub enum ByteViewArrayDecoder {
Plain(ByteViewArrayDecoderPlain),
Dictionary(ByteViewArrayDecoderDictionary),
}

impl ByteViewArrayDecoder {
@@ -227,10 +230,14 @@ impl ByteViewArrayDecoder {
num_values,
validate_utf8,
)),
- Encoding::RLE_DICTIONARY
- | Encoding::PLAIN_DICTIONARY
- | Encoding::DELTA_LENGTH_BYTE_ARRAY
- | Encoding::DELTA_BYTE_ARRAY => unimplemented!("stay tuned!"),
+ Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
+ ByteViewArrayDecoder::Dictionary(ByteViewArrayDecoderDictionary::new(
+ data, num_levels, num_values,
+ ))
+ }
+ Encoding::DELTA_LENGTH_BYTE_ARRAY | Encoding::DELTA_BYTE_ARRAY => {
+ unimplemented!("stay tuned!")
+ }
_ => {
return Err(general_err!(
"unsupported encoding for byte array: {}",
@@ -247,17 +254,27 @@ impl ByteViewArrayDecoder {
&mut self,
out: &mut ViewBuffer,
len: usize,
- _dict: Option<&ViewBuffer>,
+ dict: Option<&ViewBuffer>,
) -> Result<usize> {
match self {
ByteViewArrayDecoder::Plain(d) => d.read(out, len),
ByteViewArrayDecoder::Dictionary(d) => {
let dict = dict
.ok_or_else(|| general_err!("dictionary required for dictionary encoding"))?;
d.read(out, dict, len)
}
}
}

/// Skip `len` values
- pub fn skip(&mut self, len: usize, _dict: Option<&ViewBuffer>) -> Result<usize> {
+ pub fn skip(&mut self, len: usize, dict: Option<&ViewBuffer>) -> Result<usize> {
match self {
ByteViewArrayDecoder::Plain(d) => d.skip(len),
ByteViewArrayDecoder::Dictionary(d) => {
let dict = dict
.ok_or_else(|| general_err!("dictionary required for dictionary encoding"))?;
d.skip(dict, len)
}
}
}
}
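
The dispatch above can be illustrated in isolation. The following is a minimal sketch, not the crate's code: `Decoder`, its payloads, and the `String` error type are hypothetical stand-ins for `ByteViewArrayDecoder`, `ViewBuffer`, and `ParquetError`. It shows the same pattern of turning a missing dictionary into an error with `ok_or_else` instead of panicking.

```rust
/// Hypothetical stand-in for the decoder enum in the diff.
enum Decoder {
    /// Values stored directly in the page.
    Plain(Vec<String>),
    /// Keys that index into a separately supplied dictionary.
    Dictionary(Vec<usize>),
}

/// Appends the decoded values to `out` and returns how many were read.
/// Dictionary pages return an error when no dictionary is supplied.
fn read(decoder: &Decoder, out: &mut Vec<String>, dict: Option<&[String]>) -> Result<usize, String> {
    match decoder {
        Decoder::Plain(values) => {
            out.extend(values.iter().cloned());
            Ok(values.len())
        }
        Decoder::Dictionary(keys) => {
            // Same shape as the diff: a missing dictionary becomes an error value.
            let dict = dict
                .ok_or_else(|| "dictionary required for dictionary encoding".to_string())?;
            for &k in keys {
                let value = dict
                    .get(k)
                    .ok_or_else(|| format!("invalid dictionary key {}", k))?;
                out.push(value.clone());
            }
            Ok(keys.len())
        }
    }
}
```

Calling `read` with a `Decoder::Dictionary` and `None` yields `Err(..)`, while a `Decoder::Plain` ignores the dictionary argument entirely.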
@@ -348,6 +365,82 @@ impl ByteViewArrayDecoderPlain {
}
}

pub struct ByteViewArrayDecoderDictionary {
decoder: DictIndexDecoder,
}

impl ByteViewArrayDecoderDictionary {
fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Self {
Self {
decoder: DictIndexDecoder::new(data, num_levels, num_values),
}
}

fn read(&mut self, output: &mut ViewBuffer, dict: &ViewBuffer, len: usize) -> Result<usize> {
if dict.is_empty() || len == 0 {
return Ok(0);
}

// Check if the last few buffers of `output` are the same as the `dict` buffers.
// This avoids creating new buffers when the same dictionary is used for multiple `read` calls.
Review comment (Contributor): That is a clever optimization.

let need_to_create_new_buffer = {
if output.buffers.len() >= dict.buffers.len() {
let offset = output.buffers.len() - dict.buffers.len();
output.buffers[offset..]
.iter()
.zip(dict.buffers.iter())
.any(|(a, b)| !a.ptr_eq(b))
} else {
true
}
};

// Calculate the offset of the dictionary buffers in the output buffers
// For example if the 2nd buffer in the dictionary is the 5th buffer in the output buffers,
// then the base_buffer_idx is 5 - 2 = 3
let base_buffer_idx = if need_to_create_new_buffer {
let old_len = output.buffers.len();
for b in dict.buffers.iter() {
output.buffers.push(b.clone());
}
old_len as u32
} else {
output.buffers.len() as u32 - dict.buffers.len() as u32
};

self.decoder.read(len, |keys| {
for k in keys {
Review comment (Contributor): Do we know for sure that the `k` values are valid dictionary indices? Since this comes from untrusted input, do we have to verify that each `k` is within the bounds of the dictionary?

Reply (Contributor Author): The input is untrusted, but I think there is very little we can do to make it safer. For example, if a view is maliciously crafted, we can easily run into memory issues from an overly large offset or an invalid buffer index.
That said, I agree that an early check is useful if it doesn't make the code significantly slower. I added the check here and will come back to this when I file the PR to optimize performance.
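
One way such an early check could look is sketched below. This is an illustration only, not the check that was added to the PR: `gather_views` is a hypothetical helper, views are modelled as raw `u128` values, and the `i32` key type is an assumption about what the index decoder yields. The whole batch of keys is validated before anything is appended, so the output is left untouched on error.

```rust
/// Hypothetical helper: resolves dictionary keys to views,
/// rejecting negative or out-of-range keys before appending anything.
fn gather_views(dict_views: &[u128], keys: &[i32], out: &mut Vec<u128>) -> Result<(), String> {
    // Validate the whole batch first so `out` is unchanged if any key is bad.
    if let Some(bad) = keys
        .iter()
        .find(|&&k| k < 0 || k as usize >= dict_views.len())
    {
        return Err(format!(
            "invalid key={} for dictionary of length {}",
            bad,
            dict_views.len()
        ));
    }
    // Every key is now known to be in bounds.
    out.extend(keys.iter().map(|&k| dict_views[k as usize]));
    Ok(())
}
```

Validating per batch rather than per key keeps the check out of the innermost loop, which is one possible answer to the performance concern raised in the thread.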

let view = unsafe { dict.views.get_unchecked(*k as usize) };
let len = *view as u32;
if len <= 12 {
// directly append the view if it is inlined
// Safety: the view is from the dictionary, so it is valid
unsafe {
output.append_raw_view_unchecked(view);
}
} else {
// correct the buffer index and append the view
let mut view = ByteView::from(*view);
view.buffer_index += base_buffer_idx;
// Safety: the view is from the dictionary,
// we corrected the index value to point it to output buffer, so it is valid
unsafe {
output.append_raw_view_unchecked(&view.into());
}
}
}
Ok(())
})
}

fn skip(&mut self, dict: &ViewBuffer, to_skip: usize) -> Result<usize> {
if dict.is_empty() {
return Ok(0);
}
self.decoder.skip(to_skip)
}
}
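
The buffer-reuse step in `read` can be sketched on its own. The code below is a simplified model under stated assumptions: `Block` (an `Arc<Vec<u8>>`) stands in for an Arrow buffer, `Arc::ptr_eq` stands in for the buffer pointer comparison, and `ensure_dict_blocks` is a hypothetical name. It returns the index at which the dictionary's blocks start inside the output, appending them only if they are not already the trailing blocks.

```rust
use std::sync::Arc;

/// Stand-in for an Arrow buffer: a cheaply clonable, reference-counted byte block.
type Block = Arc<Vec<u8>>;

/// Ensures the blocks of `dict` are the trailing blocks of `output` and
/// returns the index in `output` where they start (the base buffer index).
fn ensure_dict_blocks(output: &mut Vec<Block>, dict: &[Block]) -> u32 {
    // The dictionary is already present if the last `dict.len()` blocks of
    // `output` point to the same allocations, in the same order.
    let already_present = output.len() >= dict.len()
        && output[output.len() - dict.len()..]
            .iter()
            .zip(dict)
            .all(|(a, b)| Arc::ptr_eq(a, b));

    if already_present {
        (output.len() - dict.len()) as u32
    } else {
        let base = output.len() as u32;
        // Cloning an `Arc` only bumps a reference count; no bytes are copied.
        output.extend(dict.iter().cloned());
        base
    }
}
```

Calling the function twice with the same dictionary returns the same base index and leaves the output length unchanged on the second call. A different dictionary with identical bytes is still appended, because the comparison is by pointer, not by content.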

/// Check that `val` is a valid UTF-8 sequence
pub fn check_valid_utf8(val: &[u8]) -> Result<()> {
match std::str::from_utf8(val) {
@@ -386,8 +479,11 @@ mod tests {
.unwrap();

for (encoding, page) in pages {
- if encoding != Encoding::PLAIN {
- // skip non-plain encodings for now as they are not yet implemented
+ if encoding != Encoding::PLAIN
+ && encoding != Encoding::RLE_DICTIONARY
+ && encoding != Encoding::PLAIN_DICTIONARY
+ {
+ // skip unsupported encodings for now as they are not yet implemented
continue;
}
let mut output = ViewBuffer::default();
@@ -399,7 +495,6 @@ mod tests {
assert_eq!(decoder.read(&mut output, 4).unwrap(), 0);

assert_eq!(output.views.len(), 4);
- assert_eq!(output.buffers.len(), 4);

let valid = [false, false, true, true, false, true, true, false, false];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());
13 changes: 13 additions & 0 deletions parquet/src/arrow/buffer/view_buffer.rs
@@ -33,6 +33,10 @@ pub struct ViewBuffer {
}

impl ViewBuffer {
pub fn is_empty(&self) -> bool {
self.views.is_empty()
}

#[allow(unused)]
pub fn append_block(&mut self, block: Buffer) -> u32 {
let block_id = self.buffers.len() as u32;
@@ -56,6 +60,15 @@ impl ViewBuffer {
self.views.push(view);
}

/// Directly append a view to the view array.
/// This is used when we create a StringViewArray from a dictionary whose values are StringViewArray.
///
/// # Safety
/// The `view` must be a valid view as per the ByteView spec.
pub unsafe fn append_raw_view_unchecked(&mut self, view: &u128) {
self.views.push(*view);
}
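
To make "a valid view" concrete, here is a minimal sketch of the 16-byte view layout as a raw `u128`, based on the Arrow variable-size binary view layout: the low 32 bits hold the length; values of at most 12 bytes are stored inline in the remaining bits; longer values store a 4-byte prefix, a buffer index, and an offset. The helper names `view_len`, `make_view`, and `rebase_view` are hypothetical and are not part of `arrow_data`.

```rust
/// Length in bytes, stored in the low 32 bits of a view.
fn view_len(view: u128) -> u32 {
    view as u32
}

/// Packs a non-inlined view from its four 32-bit fields, lowest bits first:
/// length, prefix, buffer index, offset.
fn make_view(len: u32, prefix: u32, buffer_index: u32, offset: u32) -> u128 {
    (len as u128)
        | ((prefix as u128) << 32)
        | ((buffer_index as u128) << 64)
        | ((offset as u128) << 96)
}

/// Shifts the buffer index of a non-inlined view by `base`.
/// Inlined views (length <= 12) carry no buffer index and are returned unchanged.
/// Returns `None` if the new buffer index would overflow `u32`.
fn rebase_view(view: u128, base: u32) -> Option<u128> {
    if view_len(view) <= 12 {
        return Some(view);
    }
    let buffer_index = (view >> 64) as u32;
    let rebased = buffer_index.checked_add(base)?;
    // Clear bits 64..96, then write the adjusted buffer index back.
    let cleared = view & !(0xFFFF_FFFFu128 << 64);
    Some(cleared | ((rebased as u128) << 64))
}
```

This mirrors what the dictionary decoder does when it copies a view into the output: only the buffer index changes, while length, prefix, and offset are preserved.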

/// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer`
#[allow(unused)]
pub fn into_array(self, null_buffer: Option<Buffer>, data_type: &ArrowType) -> ArrayRef {