Skip to content

Commit

Permalink
Support arrow readers for strings with DELTA_BYTE_ARRAY encoding (apa…
Browse files Browse the repository at this point in the history
…che#709)

* Support arrow readers for strings with DELTA_BYTE_ARRAY encoding

* Review fixes

1. move slice init out of the loop,
2. add tests for nulls,
3. use `debug_assert` for programming error assertion.
  • Loading branch information
ilya-biryukov authored Aug 26, 2021
1 parent 9d6ca6b commit 5a12d97
Showing 1 changed file with 164 additions and 2 deletions.
166 changes: 164 additions & 2 deletions parquet/src/arrow/arrow_array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
use super::array_reader::ArrayReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::Encoding;
use crate::data_type::{ByteArray, ByteArrayType};
use crate::decoding::{Decoder, DeltaByteArrayDecoder};
use crate::errors::{ParquetError, Result};
use crate::{
column::page::{Page, PageIterator},
Expand Down Expand Up @@ -485,7 +487,10 @@ impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> {
// Encoding::RLE => Box::new(RleValueDecoder::new()),
// Encoding::DELTA_BINARY_PACKED => Box::new(DeltaBitPackDecoder::new()),
// Encoding::DELTA_LENGTH_BYTE_ARRAY => Box::new(DeltaLengthByteArrayDecoder::new()),
// Encoding::DELTA_BYTE_ARRAY => Box::new(DeltaByteArrayDecoder::new()),
Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayValueDecoder::new(
values_buffer,
num_values,
)?)),
e => return Err(nyi_err!("Encoding {} is not supported", e)),
}
}
Expand Down Expand Up @@ -1074,6 +1079,39 @@ impl ValueDecoder for VariableLenDictionaryDecoder {
}
}

/// `ValueDecoder` implementation for values stored with the `DELTA_BYTE_ARRAY`
/// encoding.
///
/// This is a thin wrapper that delegates the actual decoding to
/// `DeltaByteArrayDecoder<ByteArrayType>`.
pub(crate) struct DeltaByteArrayValueDecoder {
/// Underlying decoder; its input data is set once in `new`.
decoder: DeltaByteArrayDecoder<ByteArrayType>,
}

impl DeltaByteArrayValueDecoder {
pub fn new(data: ByteBufferPtr, num_values: usize) -> Result<Self> {
let mut decoder = DeltaByteArrayDecoder::new();
decoder.set_data(data, num_values)?;
Ok(Self { decoder })
}
}

impl ValueDecoder for DeltaByteArrayValueDecoder {
    /// Decodes up to `num_values` values, handing each one to `read_bytes`.
    ///
    /// The request is capped at the number of values the underlying decoder
    /// still has left. `read_bytes` is invoked once per decoded value with
    /// that value's bytes and a count of `1`.
    ///
    /// Returns the number of values that were decoded, or the first error
    /// reported by the underlying decoder.
    fn read_value_bytes(
        &mut self,
        num_values: usize,
        read_bytes: &mut dyn FnMut(&[u8], usize),
    ) -> Result<usize> {
        let to_read = num_values.min(self.decoder.values_left());
        // Single-element scratch buffer, reused for every decoded value.
        let mut slot = [ByteArray::new()];
        for _ in 0..to_read {
            let decoded = self.decoder.get(&mut slot)?;
            // The request was capped by `values_left()`, so anything other
            // than exactly one value here is a programming error.
            debug_assert_eq!(decoded, 1);

            read_bytes(slot[0].data(), 1);
        }
        Ok(to_read)
    }
}

use arrow::datatypes::ArrowPrimitiveType;

pub struct PrimitiveArrayConverter<T: ArrowPrimitiveType> {
Expand Down Expand Up @@ -1163,9 +1201,16 @@ impl ArrayConverter for StringArrayConverter {
#[cfg(test)]
mod tests {
use super::*;
use crate::arrow::{ArrowReader, ParquetFileArrowReader};
use crate::basic::ConvertedType;
use crate::column::page::Page;
use crate::column::writer::ColumnWriter;
use crate::data_type::ByteArray;
use crate::data_type::ByteArrayType;
use crate::file::properties::WriterProperties;
use crate::file::reader::SerializedFileReader;
use crate::file::serialized_reader::SliceableCursor;
use crate::file::writer::{FileWriter, SerializedFileWriter, TryClone};
use crate::schema::parser::parse_message_type;
use crate::schema::types::SchemaDescriptor;
use crate::util::test_common::page_util::{
Expand All @@ -1177,7 +1222,8 @@ mod tests {
use arrow::array::{PrimitiveArray, StringArray};
use arrow::datatypes::Int32Type as ArrowInt32;
use rand::{distributions::uniform::SampleUniform, thread_rng, Rng};
use std::sync::Arc;
use std::io::{Cursor, Seek, SeekFrom, Write};
use std::sync::{Arc, Mutex};

/// Iterator for testing reading empty columns
struct EmptyPageIterator {
Expand Down Expand Up @@ -1559,4 +1605,120 @@ mod tests {
array_reader.get_rep_levels()
);
}

/// In-memory sink that parquet data can be written into. Intended only for
/// use in tests.
///
/// The buffer lives behind an `Arc<Mutex<..>>`, so every clone of a
/// `VecWriter` reads and writes the same underlying bytes.
#[derive(Clone)]
struct VecWriter {
/// Shared, lock-protected cursor over the bytes written so far.
data: Arc<Mutex<Cursor<Vec<u8>>>>,
}

impl VecWriter {
    /// Creates a writer backed by an empty in-memory buffer.
    pub fn new() -> VecWriter {
        let cursor = Cursor::new(Vec::new());
        VecWriter {
            data: Arc::new(Mutex::new(cursor)),
        }
    }

    /// Consumes the writer and returns all bytes written to it.
    ///
    /// # Panics
    ///
    /// Panics if another clone of this writer is still alive, or if the
    /// mutex guarding the buffer is poisoned.
    pub fn consume(self) -> Vec<u8> {
        // Step 1: reclaim sole ownership of the mutex from the `Arc`.
        let mutex = Arc::try_unwrap(self.data).unwrap();
        // Step 2: take the cursor out of the mutex.
        let cursor = mutex.into_inner().unwrap();
        // Step 3: take the byte vector out of the cursor.
        cursor.into_inner()
    }
}

impl TryClone for VecWriter {
fn try_clone(&self) -> std::io::Result<Self> {
Ok(self.clone())
}
}

impl Seek for VecWriter {
    /// Locks the shared cursor and seeks it to `pos`.
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        let mut cursor = self.data.lock().unwrap();
        cursor.seek(pos)
    }

    /// Locks the shared cursor and reports its current position.
    fn stream_position(&mut self) -> std::io::Result<u64> {
        let mut cursor = self.data.lock().unwrap();
        cursor.stream_position()
    }
}

impl Write for VecWriter {
    /// Locks the shared cursor and writes `buf` at its current position.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let mut cursor = self.data.lock().unwrap();
        cursor.write(buf)
    }

    /// Locks the shared cursor and flushes it.
    fn flush(&mut self) -> std::io::Result<()> {
        let mut cursor = self.data.lock().unwrap();
        cursor.flush()
    }
}

/// Round-trips a string column containing nulls through a parquet file that
/// is written with the `DELTA_BYTE_ARRAY` encoding and read back via the
/// arrow reader.
#[test]
fn test_string_delta_byte_array() {
    use crate::basic;
    use crate::schema::types::Type;

    let sink = VecWriter::new();

    // Schema: a group with a single UTF8 byte-array column named "c".
    let string_column = Type::primitive_type_builder("c", basic::Type::BYTE_ARRAY)
        .with_converted_type(ConvertedType::UTF8)
        .build()
        .unwrap();
    let schema = Arc::new(
        Type::group_type_builder("string_test")
            .with_fields(&mut vec![Arc::new(string_column)])
            .build()
            .unwrap(),
    );

    // Disable dictionary and use the fallback encoding.
    let props = WriterProperties::builder()
        .set_dictionary_enabled(false)
        .set_encoding(Encoding::DELTA_BYTE_ARRAY)
        .build();
    let props = Arc::new(props);

    // Write a few strings, interleaved with nulls via the definition levels.
    let mut file_writer = SerializedFileWriter::new(sink.clone(), schema, props).unwrap();
    let mut row_group = file_writer.next_row_group().unwrap();
    let mut column_writer = row_group.next_column().unwrap().unwrap();
    if let ColumnWriter::ByteArrayColumnWriter(typed) = &mut column_writer {
        typed
            .write_batch(
                &[ByteArray::from("foo"), ByteArray::from("bar")],
                Some(&[0, 1, 0, 0, 1, 0]),
                Some(&[0, 0, 0, 0, 0, 0]),
            )
            .unwrap();
    } else {
        panic!("unexpected column");
    }
    row_group.close_column(column_writer).unwrap();
    file_writer.close_row_group(row_group).unwrap();
    file_writer.close().unwrap();
    // Drop the writer so that `sink` is the last handle to the buffer and
    // `consume` can take ownership of the bytes.
    drop(file_writer);

    // Check we can read them back.
    let written = sink.consume();
    let file_reader =
        SerializedFileReader::new(SliceableCursor::new(Arc::new(written))).unwrap();
    let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));

    let mut batches = arrow_reader
        .get_record_reader_by_columns([0], 1024)
        .unwrap();
    let batch = batches.next().unwrap().unwrap();
    assert_eq!(batch.columns().len(), 1);

    let strings = batch
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let actual = strings.into_iter().collect::<Vec<_>>();
    let expected = vec![None, Some("foo"), None, None, Some("bar"), None];
    assert_eq!(actual, expected);
}
}

0 comments on commit 5a12d97

Please sign in to comment.