This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Improved performance of reading utf8 required from parquet (-15%) #670

Merged: 2 commits, Dec 10, 2021
29 changes: 20 additions & 9 deletions benches/read_parquet.rs
```diff
@@ -6,16 +6,23 @@ use criterion::{criterion_group, criterion_main, Criterion};
 use arrow2::error::Result;
 use arrow2::io::parquet::read;

-fn to_buffer(size: usize, dict: bool, multi_page: bool, compressed: bool) -> Vec<u8> {
+fn to_buffer(
+    size: usize,
+    nullable: bool,
+    dict: bool,
+    multi_page: bool,
+    compressed: bool,
+) -> Vec<u8> {
     let dir = env!("CARGO_MANIFEST_DIR");

     let dict = if dict { "dict/" } else { "" };
     let multi_page = if multi_page { "multi/" } else { "" };
     let compressed = if compressed { "snappy/" } else { "" };
+    let nullable = if nullable { "" } else { "_required" };

     let path = PathBuf::from(dir).join(format!(
-        "fixtures/pyarrow3/v1/{}{}{}benches_{}.parquet",
-        dict, multi_page, compressed, size
+        "fixtures/pyarrow3/v1/{}{}{}benches{}_{}.parquet",
+        dict, multi_page, compressed, nullable, size
     ));

     let metadata = fs::metadata(&path).expect("unable to read metadata");
```
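To make the flag-to-path mapping concrete, here is a small self-contained sketch (an illustration, not code from the PR; `fixture_name` is a hypothetical helper) that mirrors the naming scheme above and shows which fixture each flag combination resolves to:

```rust
// Hypothetical helper mirroring the naming scheme in `to_buffer` above.
fn fixture_name(size: usize, nullable: bool, dict: bool, multi_page: bool, compressed: bool) -> String {
    let dict = if dict { "dict/" } else { "" };
    let multi_page = if multi_page { "multi/" } else { "" };
    let compressed = if compressed { "snappy/" } else { "" };
    // `nullable == false` selects the new `_required` fixtures added by this PR.
    let nullable = if nullable { "" } else { "_required" };
    format!(
        "fixtures/pyarrow3/v1/{}{}{}benches{}_{}.parquet",
        dict, multi_page, compressed, nullable, size
    )
}

fn main() {
    // Existing nullable fixtures keep their old names:
    assert_eq!(
        fixture_name(1024, true, false, false, false),
        "fixtures/pyarrow3/v1/benches_1024.parquet"
    );
    // The new required fixtures gain a `_required` suffix:
    assert_eq!(
        fixture_name(1024, false, false, false, false),
        "fixtures/pyarrow3/v1/benches_required_1024.parquet"
    );
    // The other flags compose as path segments:
    assert_eq!(
        fixture_name(1024, true, true, false, true),
        "fixtures/pyarrow3/v1/dict/snappy/benches_1024.parquet"
    );
}
```

The remaining hunks thread the new flag through every call site and register the new required-utf8 benchmark: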
```diff
@@ -40,7 +47,7 @@ fn read_batch(buffer: &[u8], size: usize, column: usize) -> Result<()> {
 fn add_benchmark(c: &mut Criterion) {
     (10..=20).step_by(2).for_each(|i| {
         let size = 2usize.pow(i);
-        let buffer = to_buffer(size, false, false, false);
+        let buffer = to_buffer(size, true, false, false, false);
         let a = format!("read i64 2^{}", i);
         c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));

@@ -53,25 +60,29 @@ fn add_benchmark(c: &mut Criterion) {
         let a = format!("read bool 2^{}", i);
         c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 3).unwrap()));

-        let buffer = to_buffer(size, true, false, false);
+        let buffer = to_buffer(size, true, true, false, false);
         let a = format!("read utf8 dict 2^{}", i);
         c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));

-        let buffer = to_buffer(size, false, false, true);
+        let buffer = to_buffer(size, true, false, false, true);
         let a = format!("read i64 snappy 2^{}", i);
         c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));

-        let buffer = to_buffer(size, false, true, false);
+        let buffer = to_buffer(size, true, false, true, false);
         let a = format!("read utf8 multi 2^{}", i);
         c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));

-        let buffer = to_buffer(size, false, true, true);
+        let buffer = to_buffer(size, true, false, true, true);
         let a = format!("read utf8 multi snappy 2^{}", i);
         c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));

-        let buffer = to_buffer(size, false, true, true);
+        let buffer = to_buffer(size, true, false, true, true);
         let a = format!("read i64 multi snappy 2^{}", i);
         c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
+
+        let buffer = to_buffer(size, false, false, false, false);
+        let a = format!("read required utf8 2^{}", i);
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
     });
 }
```
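The tail of the file is elided above. For context, a Criterion benchmark function like `add_benchmark` is wired up with the `criterion_group!` and `criterion_main!` macros; this is a generic sketch of standard Criterion usage, not necessarily the exact elided lines:

```rust
use criterion::{criterion_group, criterion_main, Criterion};

fn add_benchmark(c: &mut Criterion) {
    // Registers one benchmark; the real file registers one per fixture and size.
    c.bench_function("example", |b| b.iter(|| (0..1024u64).sum::<u64>()));
}

// Generates a `main` that runs every registered benchmark;
// `cargo bench --bench read_parquet` then picks up this bench target.
criterion_group!(benches, add_benchmark);
criterion_main!(benches);
```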

10 changes: 10 additions & 0 deletions parquet_integration/write_parquet.py
```diff
@@ -284,12 +284,22 @@ def case_benches(size):
     return data, schema, f"benches_{size}.parquet"


+def case_benches_required(size):
+    assert size % 8 == 0
+    data, schema, _ = case_basic_required(1)
+    for k in data:
+        data[k] = data[k][:8] * (size // 8)
+    return data, schema, f"benches_required_{size}.parquet"
+
+
 # for read benchmarks
 for i in range(10, 22, 2):
     # two pages (dict)
     write_pyarrow(case_benches, 2 ** i, 1, True, False, False)
     # single page
     write_pyarrow(case_benches, 2 ** i, 1, False, False, False)
     # single page required
+    write_pyarrow(case_benches_required, 2 ** i, 1, False, False, False)
     # multiple pages
     write_pyarrow(case_benches, 2 ** i, 1, False, True, False)
     # multiple compressed pages
```
6 changes: 5 additions & 1 deletion src/io/parquet/read/binary/basic.rs
```diff
@@ -214,19 +214,23 @@ fn read_plain_optional<O: Offset>(

 pub(super) fn read_plain_required<O: Offset>(
     buffer: &[u8],
-    _length: usize,
+    additional: usize,
     offsets: &mut MutableBuffer<O>,
     values: &mut MutableBuffer<u8>,
 ) {
     let mut last_offset = *offsets.as_mut_slice().last().unwrap();

     let values_iterator = utils::BinaryIter::new(buffer);

+    // each value occupies 4 bytes + len declared in 4 bytes => reserve accordingly.
+    values.reserve(buffer.len() - 4 * additional);
+    let a = values.capacity();
     for value in values_iterator {
         last_offset += O::from_usize(value.len()).unwrap();
         values.extend_from_slice(value);
         offsets.push(last_offset);
     }
+    debug_assert_eq!(a, values.capacity());
 }

 pub(super) fn extend_from_page<O: Offset>(
```
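The reservation arithmetic follows from the PLAIN layout of BYTE_ARRAY pages: each value is stored as a 4-byte little-endian length prefix followed by its bytes, so for `additional` values the concatenated payload is exactly `buffer.len() - 4 * additional`, and the debug assertion checks that the single up-front reserve makes the loop reallocation-free. A self-contained sketch of that reasoning (an illustration, not PR code):

```rust
use std::convert::TryInto;

fn main() {
    let values: &[&[u8]] = &[b"parquet", b"", b"arrow2"];

    // Build a PLAIN-encoded BYTE_ARRAY buffer: [len: u32 LE][bytes] per value.
    let mut buffer = Vec::new();
    for v in values {
        buffer.extend_from_slice(&(v.len() as u32).to_le_bytes());
        buffer.extend_from_slice(v);
    }

    // Each of the `additional` values contributes exactly 4 bytes of length
    // prefix, so the payload is the remainder of the buffer.
    let additional = values.len();
    let payload = buffer.len() - 4 * additional;
    assert_eq!(payload, values.iter().map(|v| v.len()).sum::<usize>());

    // Mirror the PR: reserve once, then verify capacity never grows.
    let mut out: Vec<u8> = Vec::new();
    out.reserve(payload);
    let cap = out.capacity();
    let mut pos = 0;
    while pos < buffer.len() {
        let len = u32::from_le_bytes(buffer[pos..pos + 4].try_into().unwrap()) as usize;
        pos += 4;
        out.extend_from_slice(&buffer[pos..pos + len]);
        pos += len;
    }
    assert_eq!(cap, out.capacity());
}
```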
6 changes: 4 additions & 2 deletions src/io/parquet/read/utils.rs
```diff
@@ -1,4 +1,6 @@
-use parquet2::encoding::{get_length, Encoding};
+use std::convert::TryInto;
+
+use parquet2::encoding::Encoding;
 use parquet2::metadata::ColumnDescriptor;
 use parquet2::page::{split_buffer as _split_buffer, DataPage, DataPageHeader};

@@ -22,7 +24,7 @@ impl<'a> Iterator for BinaryIter<'a> {
         if self.values.is_empty() {
             return None;
         }
-        let length = get_length(self.values) as usize;
+        let length = u32::from_le_bytes(self.values[0..4].try_into().unwrap()) as usize;
         self.values = &self.values[4..];
         let result = &self.values[..length];
         self.values = &self.values[length..];
```
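For readers who want to try the change in isolation, here is a minimal self-contained re-creation of the iterator (an illustrative sketch, not arrow2's actual module): the length prefix of each PLAIN value is decoded inline with a fixed-size `u32::from_le_bytes` conversion rather than parquet2's `get_length` helper, which presumably gives the optimizer a better view of the hot loop.

```rust
use std::convert::TryInto;

/// Yields each PLAIN-encoded value: a 4-byte LE length prefix, then the bytes.
struct BinaryIter<'a> {
    values: &'a [u8],
}

impl<'a> Iterator for BinaryIter<'a> {
    type Item = &'a [u8];

    fn next(&mut self) -> Option<Self::Item> {
        if self.values.is_empty() {
            return None;
        }
        // Inline fixed-size length decode, as in the hunk above.
        let length = u32::from_le_bytes(self.values[0..4].try_into().unwrap()) as usize;
        self.values = &self.values[4..];
        let result = &self.values[..length];
        self.values = &self.values[length..];
        Some(result)
    }
}

fn main() {
    // "hi" and "rust" encoded as [len: u32 LE][bytes]:
    let buffer = [2, 0, 0, 0, b'h', b'i', 4, 0, 0, 0, b'r', b'u', b's', b't'];
    let decoded: Vec<&[u8]> = BinaryIter { values: &buffer[..] }.collect();
    assert_eq!(decoded, vec![&b"hi"[..], &b"rust"[..]]);
}
```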