Skip to content

Commit

Permalink
Use bytes in parquet (#1474) (#1683)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold authored May 11, 2022
1 parent 19f0ada commit b9a41f3
Show file tree
Hide file tree
Showing 11 changed files with 91 additions and 545 deletions.
1 change: 1 addition & 0 deletions parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ rust-version = "1.57"

[dependencies]
parquet-format = "4.0.0"
bytes = "1.1"
byteorder = "1"
thrift = "0.13"
snap = { version = "1.0", optional = true }
Expand Down
4 changes: 1 addition & 3 deletions parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1381,7 +1381,6 @@ mod tests {
#[test]
fn test_complex_array_reader_dict_enc_string() {
use crate::encodings::encoding::{DictEncoder, Encoder};
use crate::util::memory::MemTracker;
// Construct column schema
let message_type = "
message test_schema {
Expand Down Expand Up @@ -1412,9 +1411,8 @@ mod tests {
let mut all_values = Vec::with_capacity(num_pages * values_per_page);

for i in 0..num_pages {
let mem_tracker = Arc::new(MemTracker::new());
let mut dict_encoder =
DictEncoder::<ByteArrayType>::new(column_desc.clone(), mem_tracker);
DictEncoder::<ByteArrayType>::new(column_desc.clone());
// add data page
let mut values = Vec::with_capacity(values_per_page);

Expand Down
9 changes: 3 additions & 6 deletions parquet/src/arrow/array_reader/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::errors::Result;
use crate::schema::types::{
ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, Type,
};
use crate::util::memory::{ByteBufferPtr, MemTracker};
use crate::util::memory::ByteBufferPtr;

/// Returns a descriptor for a UTF-8 column
pub fn utf8_column() -> ColumnDescPtr {
Expand All @@ -49,18 +49,15 @@ pub fn utf8_column() -> ColumnDescPtr {
/// Encode `data` with the provided `encoding`
pub fn encode_byte_array(encoding: Encoding, data: &[ByteArray]) -> ByteBufferPtr {
let descriptor = utf8_column();
let mem_tracker = Arc::new(MemTracker::new());
let mut encoder =
get_encoder::<ByteArrayType>(descriptor, encoding, mem_tracker).unwrap();
let mut encoder = get_encoder::<ByteArrayType>(descriptor, encoding).unwrap();

encoder.put(data).unwrap();
encoder.flush_buffer().unwrap()
}

/// Returns the encoded dictionary and value data
pub fn encode_dictionary(data: &[ByteArray]) -> (ByteBufferPtr, ByteBufferPtr) {
let mut dict_encoder =
DictEncoder::<ByteArrayType>::new(utf8_column(), Arc::new(MemTracker::new()));
let mut dict_encoder = DictEncoder::<ByteArrayType>::new(utf8_column());

dict_encoder.put(data).unwrap();
let encoded_rle = dict_encoder.flush_buffer().unwrap();
Expand Down
16 changes: 8 additions & 8 deletions parquet/src/column/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,13 +338,13 @@ where
let mut offset = 0;

if max_rep_level > 0 {
let level_data = parse_v1_level(
let (bytes_read, level_data) = parse_v1_level(
max_rep_level,
num_values,
rep_level_encoding,
buf.start_from(offset),
)?;
offset = level_data.end();
offset += bytes_read;

let decoder =
R::new(max_rep_level, rep_level_encoding, level_data);
Expand All @@ -353,13 +353,13 @@ where
}

if max_def_level > 0 {
let level_data = parse_v1_level(
let (bytes_read, level_data) = parse_v1_level(
max_def_level,
num_values,
def_level_encoding,
buf.start_from(offset),
)?;
offset = level_data.end();
offset += bytes_read;

let decoder =
D::new(max_def_level, def_level_encoding, level_data);
Expand Down Expand Up @@ -460,20 +460,20 @@ fn parse_v1_level(
num_buffered_values: u32,
encoding: Encoding,
buf: ByteBufferPtr,
) -> Result<ByteBufferPtr> {
) -> Result<(usize, ByteBufferPtr)> {
match encoding {
Encoding::RLE => {
let i32_size = std::mem::size_of::<i32>();
let data_size = read_num_bytes!(i32, i32_size, buf.as_ref()) as usize;
Ok(buf.range(i32_size, data_size))
Ok((i32_size + data_size, buf.range(i32_size, data_size)))
}
Encoding::BIT_PACKED => {
let bit_width = crate::util::bit_util::log2(max_level as u64 + 1) as u8;
let num_bytes = ceil(
(num_buffered_values as usize * bit_width as usize) as i64,
8,
);
Ok(buf.range(0, num_bytes as usize))
) as usize;
Ok((num_bytes, buf.range(0, num_bytes)))
}
_ => Err(general_err!("invalid level encoding: {}", encoding)),
}
Expand Down
8 changes: 4 additions & 4 deletions parquet/src/column/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

//! Contains column writer API.
use std::{cmp, collections::VecDeque, convert::TryFrom, marker::PhantomData, sync::Arc};
use std::{cmp, collections::VecDeque, convert::TryFrom, marker::PhantomData};

use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
Expand All @@ -36,7 +36,7 @@ use crate::file::{
};
use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::FromBytes;
use crate::util::memory::{ByteBufferPtr, MemTracker};
use crate::util::memory::ByteBufferPtr;

/// Column writer for a Parquet type.
pub enum ColumnWriter {
Expand Down Expand Up @@ -213,7 +213,7 @@ impl<T: DataType> ColumnWriterImpl<T> {
let dict_encoder = if props.dictionary_enabled(descr.path())
&& has_dictionary_support(T::get_physical_type(), &props)
{
Some(DictEncoder::new(descr.clone(), Arc::new(MemTracker::new())))
Some(DictEncoder::new(descr.clone()))
} else {
None
};
Expand All @@ -227,7 +227,6 @@ impl<T: DataType> ColumnWriterImpl<T> {
props
.encoding(descr.path())
.unwrap_or_else(|| fallback_encoding(T::get_physical_type(), &props)),
Arc::new(MemTracker::new()),
)
.unwrap();

Expand Down Expand Up @@ -1135,6 +1134,7 @@ fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
#[cfg(test)]
mod tests {
use rand::distributions::uniform::SampleUniform;
use std::sync::Arc;

use crate::column::{
page::PageReader,
Expand Down
13 changes: 2 additions & 11 deletions parquet/src/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::column::writer::{ColumnWriter, ColumnWriterImpl};
use crate::errors::{ParquetError, Result};
use crate::util::{
bit_util::{from_ne_slice, FromBytes},
memory::{ByteBuffer, ByteBufferPtr},
memory::ByteBufferPtr,
};

/// Rust representation for logical type INT96, value is backed by an array of `u32`.
Expand Down Expand Up @@ -217,14 +217,6 @@ impl From<ByteBufferPtr> for ByteArray {
}
}

impl From<ByteBuffer> for ByteArray {
fn from(mut buf: ByteBuffer) -> ByteArray {
Self {
data: Some(buf.consume()),
}
}
}

impl PartialEq for ByteArray {
fn eq(&self, other: &ByteArray) -> bool {
match (&self.data, &other.data) {
Expand Down Expand Up @@ -1322,8 +1314,7 @@ mod tests {
ByteArray::from(ByteBufferPtr::new(vec![1u8, 2u8, 3u8, 4u8, 5u8])).data(),
&[1u8, 2u8, 3u8, 4u8, 5u8]
);
let mut buf = ByteBuffer::new();
buf.set_data(vec![6u8, 7u8, 8u8, 9u8, 10u8]);
let buf = vec![6u8, 7u8, 8u8, 9u8, 10u8];
assert_eq!(ByteArray::from(buf).data(), &[6u8, 7u8, 8u8, 9u8, 10u8]);
}

Expand Down
11 changes: 3 additions & 8 deletions parquet/src/encodings/decoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -936,11 +936,7 @@ mod tests {
use crate::schema::types::{
ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType,
};
use crate::util::{
bit_util::set_array_bit,
memory::{BufferPtr, MemTracker},
test_common::RandGen,
};
use crate::util::{bit_util::set_array_bit, test_common::RandGen};

#[test]
fn test_get_decoders() {
Expand Down Expand Up @@ -1389,7 +1385,7 @@ mod tests {

let length = data.len();

let ptr = BufferPtr::new(data);
let ptr = ByteBufferPtr::new(data);
let mut reader = BitReader::new(ptr.clone());
assert_eq!(reader.get_vlq_int().unwrap(), 256);
assert_eq!(reader.get_vlq_int().unwrap(), 4);
Expand Down Expand Up @@ -1472,8 +1468,7 @@ mod tests {

// Encode data
let mut encoder =
get_encoder::<T>(col_descr.clone(), encoding, Arc::new(MemTracker::new()))
.expect("get encoder");
get_encoder::<T>(col_descr.clone(), encoding).expect("get encoder");

for v in &data[..] {
encoder.put(&v[..]).expect("ok to encode");
Expand Down
Loading

0 comments on commit b9a41f3

Please sign in to comment.