Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support to read and write Parquet's delta-bitpacked (integer encoding) (#1226)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Aug 19, 2022
1 parent f75ce42 commit cfb0d58
Show file tree
Hide file tree
Showing 27 changed files with 665 additions and 171 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ futures = { version = "0.3", optional = true }
async-stream = { version = "0.3.2", optional = true }

# parquet support
parquet2 = { version = "0.15.0", optional = true, default_features = false, features = ["async"] }
parquet2 = { version = "0.16", optional = true, default_features = false, features = ["async"] }

# avro support
avro-schema = { version = "0.3", optional = true }
Expand Down
26 changes: 18 additions & 8 deletions arrow-parquet-integration-testing/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@ def get_file_path(file: str):


def _prepare(
file: str, version: str, compression: str, encoding_utf8: str, projection=None
file: str,
version: str,
compression: str,
encoding_utf8: str,
encoding_int: str,
projection=None,
):
write = f"{file}.parquet"

Expand All @@ -26,6 +31,8 @@ def _prepare(
version,
"--encoding-utf8",
encoding_utf8,
"--encoding-int",
encoding_int,
"--compression",
compression,
]
Expand All @@ -38,7 +45,7 @@ def _prepare(
return write


def _expected(file: str):
def _expected(file: str) -> pyarrow.Table:
return pyarrow.ipc.RecordBatchFileReader(get_file_path(file)).read_all()


Expand Down Expand Up @@ -75,16 +82,19 @@ def variations():
# "generated_custom_metadata",
]:
# pyarrow does not support decoding "delta"-encoded values.
# for encoding in ["plain", "delta"]:
for encoding in ["plain"]:
for encoding_int in ["plain", "delta"]:
if encoding_int == "delta" and file in {"generated_primitive", "generated_null"}:
# see https://issues.apache.org/jira/browse/ARROW-17465
continue

for compression in ["uncompressed", "zstd", "snappy"]:
yield (version, file, compression, encoding)
yield (version, file, compression, "plain", encoding_int)


if __name__ == "__main__":
for (version, file, compression, encoding_utf8) in variations():
for (version, file, compression, encoding_utf8, encoding_int) in variations():
expected = _expected(file)
path = _prepare(file, version, compression, encoding_utf8)
path = _prepare(file, version, compression, encoding_utf8, encoding_int)

table = pq.read_table(path)
os.remove(path)
Expand All @@ -95,4 +105,4 @@ def variations():
if str(c1.type) in ["month_interval", "day_time_interval"]:
# pyarrow does not support interval types from parquet
continue
assert c1 == c2
assert c1 == c2, (c1, c2)
56 changes: 34 additions & 22 deletions arrow-parquet-integration-testing/main_spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@
from main import _prepare, _expected


def test(file: str, version: str, column, compression: str, encoding: str):
def test(
file: str,
version: str,
column: str,
compression: str,
encoding: str,
):
"""
Tests that pyspark can read a parquet file written by arrow2.
Expand All @@ -16,13 +22,13 @@ def test(file: str, version: str, column, compression: str, encoding: str):
In pyspark: read (written) parquet to Python
assert that they are equal
"""
# write parquet
path = _prepare(file, version, compression, encoding, [column[1]])

# read IPC to Python
expected = _expected(file)
expected = next(c for i, c in enumerate(expected) if i == column[1])
expected = expected.combine_chunks().tolist()
column_index = next(i for i, c in enumerate(expected.column_names) if c == column)
expected = expected[column].combine_chunks().tolist()

# write parquet
path = _prepare(file, version, compression, encoding, encoding, [column_index])

# read parquet to Python
spark = pyspark.sql.SparkSession.builder.config(
Expand All @@ -31,28 +37,34 @@ def test(file: str, version: str, column, compression: str, encoding: str):
"false",
).getOrCreate()

result = spark.read.parquet(path).select(column[0]).collect()
result = [r[column[0]] for r in result]
result = spark.read.parquet(path).select(column).collect()
result = [r[column] for r in result]
os.remove(path)

# assert equality
assert expected == result


test("generated_primitive", "2", ("utf8_nullable", 24), "uncompressed", "delta")
test("generated_primitive", "2", ("utf8_nullable", 24), "snappy", "delta")
test("generated_null", "2", "f1", "uncompressed", "delta")

test("generated_primitive", "2", "utf8_nullable", "uncompressed", "delta")
test("generated_primitive", "2", "utf8_nullable", "snappy", "delta")
test("generated_primitive", "2", "int32_nullable", "uncompressed", "delta")
test("generated_primitive", "2", "int32_nullable", "snappy", "delta")
test("generated_primitive", "2", "int16_nullable", "uncompressed", "delta")
test("generated_primitive", "2", "int16_nullable", "snappy", "delta")

test("generated_dictionary", "1", ("dict0", 0), "uncompressed", "plain")
test("generated_dictionary", "1", ("dict0", 0), "snappy", "plain")
test("generated_dictionary", "2", ("dict0", 0), "uncompressed", "plain")
test("generated_dictionary", "2", ("dict0", 0), "snappy", "plain")
test("generated_dictionary", "1", "dict0", "uncompressed", "plain")
test("generated_dictionary", "1", "dict0", "snappy", "plain")
test("generated_dictionary", "2", "dict0", "uncompressed", "plain")
test("generated_dictionary", "2", "dict0", "snappy", "plain")

test("generated_dictionary", "1", ("dict1", 1), "uncompressed", "plain")
test("generated_dictionary", "1", ("dict1", 1), "snappy", "plain")
test("generated_dictionary", "2", ("dict1", 1), "uncompressed", "plain")
test("generated_dictionary", "2", ("dict1", 1), "snappy", "plain")
test("generated_dictionary", "1", "dict1", "uncompressed", "plain")
test("generated_dictionary", "1", "dict1", "snappy", "plain")
test("generated_dictionary", "2", "dict1", "uncompressed", "plain")
test("generated_dictionary", "2", "dict1", "snappy", "plain")

test("generated_dictionary", "1", ("dict2", 2), "uncompressed", "plain")
test("generated_dictionary", "1", ("dict2", 2), "snappy", "plain")
test("generated_dictionary", "2", ("dict2", 2), "uncompressed", "plain")
test("generated_dictionary", "2", ("dict2", 2), "snappy", "plain")
test("generated_dictionary", "1", "dict2", "uncompressed", "plain")
test("generated_dictionary", "1", "dict2", "snappy", "plain")
test("generated_dictionary", "2", "dict2", "uncompressed", "plain")
test("generated_dictionary", "2", "dict2", "snappy", "plain")
13 changes: 11 additions & 2 deletions arrow-parquet-integration-testing/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::fs::File;
use std::{io::Read};
use std::io::Read;

use arrow2::array::Array;
use arrow2::io::ipc::IpcField;
use arrow2::{
AHashMap,
chunk::Chunk,
datatypes::{DataType, Schema},
error::Result,
Expand All @@ -16,6 +15,7 @@ use arrow2::{
RowGroupIterator, Version as ParquetVersion, WriteOptions,
},
},
AHashMap,
};
use clap::Parser;
use flate2::read::GzDecoder;
Expand Down Expand Up @@ -110,6 +110,8 @@ struct Args {
projection: Option<String>,
#[clap(short, long, arg_enum, help = "encoding scheme for utf8", default_value_t = EncodingScheme::Plain)]
encoding_utf8: EncodingScheme,
#[clap(short('i'), long, arg_enum, help = "encoding scheme for int", default_value_t = EncodingScheme::Plain)]
encoding_int: EncodingScheme,
#[clap(short, long, arg_enum)]
compression: Compression,
}
Expand Down Expand Up @@ -178,6 +180,13 @@ fn main() -> Result<()> {
.map(|f| {
transverse(&f.data_type, |dt| match dt {
DataType::Dictionary(..) => Encoding::RleDictionary,
DataType::Int32 => {
if args.encoding_int == EncodingScheme::Delta {
Encoding::DeltaBinaryPacked
} else {
Encoding::Plain
}
}
DataType::Utf8 | DataType::LargeUtf8 => {
if args.encoding_utf8 == EncodingScheme::Delta {
Encoding::DeltaLengthByteArray
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ impl From<parquet2::error::Error> for Error {

impl From<Error> for parquet2::error::Error {
fn from(error: Error) -> Self {
parquet2::error::Error::General(error.to_string())
parquet2::error::Error::OutOfSpec(error.to_string())
}
}
37 changes: 24 additions & 13 deletions src/io/parquet/read/deserialize/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
bitmap::{Bitmap, MutableBitmap},
buffer::Buffer,
datatypes::DataType,
error::Result,
error::{Error, Result},
};

use super::super::utils::{
Expand Down Expand Up @@ -51,13 +51,13 @@ impl<'a> Delta<'a> {
pub fn try_new(page: &'a DataPage) -> Result<Self> {
let (_, _, values) = split_buffer(page)?;

let mut lengths_iter = delta_length_byte_array::Decoder::new(values);
let mut lengths_iter = delta_length_byte_array::Decoder::try_new(values)?;

#[allow(clippy::needless_collect)] // we need to consume it to get the values
let lengths = lengths_iter
.by_ref()
.map(|x| x as usize)
.collect::<Vec<_>>();
.map(|x| x.map(|x| x as usize).map_err(Error::from))
.collect::<Result<Vec<_>>>()?;

let values = lengths_iter.into_values();
Ok(Self {
Expand Down Expand Up @@ -405,20 +405,26 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
}
State::OptionalDictionary(page_validity, page_values) => {
let page_dict = &page_values.dict;
let op = move |index: u32| page_dict[index as usize].as_ref();
utils::extend_from_decoder(
validity,
page_validity,
Some(additional),
values,
&mut page_values.values.by_ref().map(op),
&mut page_values
.values
.by_ref()
.map(|index| page_dict[index.unwrap() as usize].as_ref()),
)
}
State::RequiredDictionary(page) => {
let page_dict = &page.dict;
let op = move |index: u32| page_dict[index as usize].as_ref();

for x in page.values.by_ref().map(op).take(additional) {
for x in page
.values
.by_ref()
.map(|index| page_dict[index.unwrap() as usize].as_ref())
.take(additional)
{
values.push(x)
}
}
Expand All @@ -442,21 +448,26 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
}
State::FilteredRequiredDictionary(page) => {
let page_dict = &page.dict;
let op = move |index: u32| page_dict[index as usize].as_ref();

for x in page.values.by_ref().map(op).take(additional) {
for x in page
.values
.by_ref()
.map(|index| page_dict[index.unwrap() as usize].as_ref())
.take(additional)
{
values.push(x)
}
}
State::FilteredOptionalDictionary(page_validity, page_values) => {
let page_dict = &page_values.dict;
let op = move |index: u32| page_dict[index as usize].as_ref();
utils::extend_from_decoder(
validity,
page_validity,
Some(additional),
values,
&mut page_values.values.by_ref().map(op),
&mut page_values
.values
.by_ref()
.map(|index| page_dict[index.unwrap() as usize].as_ref()),
)
}
}
Expand Down
17 changes: 12 additions & 5 deletions src/io/parquet/read/deserialize/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder<O> {
)
}

fn push_valid(&self, state: &mut Self::State, decoded: &mut Self::DecodedState) {
fn push_valid(&self, state: &mut Self::State, decoded: &mut Self::DecodedState) -> Result<()> {
let (values, validity) = decoded;
match state {
State::Optional(page) => {
Expand All @@ -104,18 +104,25 @@ impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder<O> {
}
State::RequiredDictionary(page) => {
let dict_values = &page.dict;
let op = move |index: u32| dict_values[index as usize].as_ref();
let item = page.values.next().map(op).unwrap_or_default();
let item = page
.values
.next()
.map(|index| dict_values[index.unwrap() as usize].as_ref())
.unwrap_or_default();
values.push(item);
}
State::OptionalDictionary(page) => {
let dict_values = &page.dict;
let op = move |index: u32| dict_values[index as usize].as_ref();
let item = page.values.next().map(op).unwrap_or_default();
let item = page
.values
.next()
.map(|index| dict_values[index.unwrap() as usize].as_ref())
.unwrap_or_default();
values.push(item);
validity.push(true);
}
}
Ok(())
}

fn push_null(&self, decoded: &mut Self::DecodedState) {
Expand Down
3 changes: 2 additions & 1 deletion src/io/parquet/read/deserialize/boolean/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl<'a> NestedDecoder<'a> for BooleanDecoder {
)
}

fn push_valid(&self, state: &mut State, decoded: &mut Self::DecodedState) {
fn push_valid(&self, state: &mut State, decoded: &mut Self::DecodedState) -> Result<()> {
let (values, validity) = decoded;
match state {
State::Optional(page_values) => {
Expand All @@ -95,6 +95,7 @@ impl<'a> NestedDecoder<'a> for BooleanDecoder {
values.push(value);
}
}
Ok(())
}

fn push_null(&self, decoded: &mut Self::DecodedState) {
Expand Down
12 changes: 8 additions & 4 deletions src/io/parquet/read/deserialize/dictionary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ where
Some(remaining),
values,
&mut page.values.by_ref().map(|x| {
let x: usize = x.try_into().unwrap();
// todo: rm unwrap
let x: usize = x.unwrap().try_into().unwrap();
match x.try_into() {
Ok(key) => key,
// todo: convert this to an error.
Expand All @@ -176,7 +177,8 @@ where
page.values
.by_ref()
.map(|x| {
let x: usize = x.try_into().unwrap();
// todo: rm unwrap
let x: usize = x.unwrap().try_into().unwrap();
let x: K = match x.try_into() {
Ok(key) => key,
// todo: convert this to an error.
Expand All @@ -195,7 +197,8 @@ where
Some(remaining),
values,
&mut page_values.by_ref().map(|x| {
let x: usize = x.try_into().unwrap();
// todo: rm unwrap
let x: usize = x.unwrap().try_into().unwrap();
let x: K = match x.try_into() {
Ok(key) => key,
// todo: convert this to an error.
Expand All @@ -211,7 +214,8 @@ where
page.values
.by_ref()
.map(|x| {
let x: usize = x.try_into().unwrap();
// todo: rm unwrap
let x: usize = x.unwrap().try_into().unwrap();
let x: K = match x.try_into() {
Ok(key) => key,
// todo: convert this to an error.
Expand Down
Loading

0 comments on commit cfb0d58

Please sign in to comment.