This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Improved parquet read benches #533

Merged · 2 commits · Oct 16, 2021
53 changes: 37 additions & 16 deletions benches/read_parquet.rs
@@ -6,17 +6,26 @@ use criterion::{criterion_group, criterion_main, Criterion};
 use arrow2::error::Result;
 use arrow2::io::parquet::read;
 
-fn to_buffer(size: usize) -> Vec<u8> {
+fn to_buffer(size: usize, dict: bool, multi_page: bool, compressed: bool) -> Vec<u8> {
     let dir = env!("CARGO_MANIFEST_DIR");
-    let path = PathBuf::from(dir).join(format!("fixtures/pyarrow3/v1/benches_{}.parquet", size));
+
+    let dict = if dict { "dict/" } else { "" };
+    let multi_page = if multi_page { "multi/" } else { "" };
+    let compressed = if compressed { "snappy/" } else { "" };
+
+    let path = PathBuf::from(dir).join(format!(
+        "fixtures/pyarrow3/v1/{}{}{}benches_{}.parquet",
+        dict, multi_page, compressed, size
+    ));
 
     let metadata = fs::metadata(&path).expect("unable to read metadata");
     let mut file = fs::File::open(path).unwrap();
     let mut buffer = vec![0; metadata.len() as usize];
     file.read_exact(&mut buffer).expect("buffer overflow");
     buffer
 }
 
-fn read_decompressed_pages(buffer: &[u8], size: usize, column: usize) -> Result<()> {
+fn read_batch(buffer: &[u8], size: usize, column: usize) -> Result<()> {
     let file = Cursor::new(buffer);
 
     let reader = read::RecordReader::try_new(file, Some(vec![column]), None, None, None)?;
@@ -31,26 +40,38 @@ fn read_decompressed_pages(buffer: &[u8], size: usize, column: usize) -> Result<()>
 fn add_benchmark(c: &mut Criterion) {
     (10..=20).step_by(2).for_each(|i| {
         let size = 2usize.pow(i);
-        let buffer = to_buffer(size);
+        let buffer = to_buffer(size, false, false, false);
         let a = format!("read i64 2^{}", i);
-        c.bench_function(&a, |b| {
-            b.iter(|| read_decompressed_pages(&buffer, size * 8, 0).unwrap())
-        });
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
 
         let a = format!("read utf8 2^{}", i);
-        c.bench_function(&a, |b| {
-            b.iter(|| read_decompressed_pages(&buffer, size * 8, 2).unwrap())
-        });
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
 
         let a = format!("read utf8 large 2^{}", i);
-        c.bench_function(&a, |b| {
-            b.iter(|| read_decompressed_pages(&buffer, size * 8, 6).unwrap())
-        });
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 6).unwrap()));
 
         let a = format!("read bool 2^{}", i);
-        c.bench_function(&a, |b| {
-            b.iter(|| read_decompressed_pages(&buffer, size * 8, 3).unwrap())
-        });
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 3).unwrap()));
+
+        let buffer = to_buffer(size, true, false, false);
+        let a = format!("read utf8 dict 2^{}", i);
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
+
+        let buffer = to_buffer(size, false, false, true);
+        let a = format!("read i64 snappy 2^{}", i);
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
+
+        let buffer = to_buffer(size, false, true, false);
+        let a = format!("read utf8 multi 2^{}", i);
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
+
+        let buffer = to_buffer(size, false, true, true);
+        let a = format!("read utf8 multi snappy 2^{}", i);
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
+
+        let buffer = to_buffer(size, false, true, true);
+        let a = format!("read i64 multi snappy 2^{}", i);
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
     });
 }

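The four flags added to to_buffer map one-to-one onto subdirectories of the fixture tree, so each benchmark variant reads a pre-generated file. A minimal sketch of the path scheme (directory names taken from the diff above; the fixture set itself is generated by write_parquet.py, below):

def fixture_path(size: int, use_dict: bool, multi_page: bool, compressed: bool) -> str:
    # Each optional flag contributes one path segment, in this fixed order.
    parts = [
        "dict/" if use_dict else "",
        "multi/" if multi_page else "",
        "snappy/" if compressed else "",
    ]
    return f"fixtures/pyarrow3/v1/{''.join(parts)}benches_{size}.parquet"


# e.g. the "read utf8 multi snappy" bench at 2^10 rows reads:
assert (
    fixture_path(1024, False, True, True)
    == "fixtures/pyarrow3/v1/multi/snappy/benches_1024.parquet"
)
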
1 change: 1 addition & 0 deletions benchmarks/.gitignore
@@ -0,0 +1 @@
runs
49 changes: 49 additions & 0 deletions benchmarks/bench_read.py
@@ -0,0 +1,49 @@
import timeit
import io
import os
import json

import pyarrow.parquet


def _bench_single(log2_size: int, column: str, use_dict: bool) -> float:
    if use_dict:
        path = f"fixtures/pyarrow3/v1/dict/benches_{2**log2_size}.parquet"
    else:
        path = f"fixtures/pyarrow3/v1/benches_{2**log2_size}.parquet"
    with open(path, "rb") as f:
        data = f.read()
    data = io.BytesIO(data)

    def f():
        pyarrow.parquet.read_table(data, columns=[column])

    seconds = timeit.Timer(f).timeit(number=512) / 512
    ns = seconds * 1000 * 1000 * 1000
    return ns


def _report(name: str, result: float):
    path = f"benchmarks/runs/{name}/new"
    os.makedirs(path, exist_ok=True)
    with open(f"{path}/estimates.json", "w") as f:
        json.dump({"mean": {"point_estimate": result}}, f)


def _bench(size, ty):
    column, use_dict = {
        "i64": ("int64", False),
        "bool": ("bool", False),
        "utf8": ("string", False),
        "utf8 dict": ("string", True),
    }[ty]

    result = _bench_single(size, column, use_dict)
    print(result)
    _report(f"read {ty} 2_{size}", result)


for size in range(10, 22, 2):
    for ty in ["i64", "bool", "utf8", "utf8 dict"]:
        print(size, ty)
        _bench(size, ty)
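
The _report helper mirrors criterion's on-disk layout (one directory per benchmark, with new/estimates.json holding a mean point estimate in nanoseconds), which is what lets summarize.py below read pyarrow and arrow2 results through the same code path. A minimal sketch of reading one report back; the benchmark name used here is a hypothetical example:

import json

# "read i64 2_10" is a hypothetical report name; point estimates are in ns.
with open("benchmarks/runs/read i64 2_10/new/estimates.json") as f:
    estimates = json.load(f)

print(estimates["mean"]["point_estimate"])  # mean time per read, in nanoseconds
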
20 changes: 20 additions & 0 deletions benchmarks/run.py
@@ -0,0 +1,20 @@
import subprocess


# run pyarrow
subprocess.call(["python", "benchmarks/bench_read.py"])


for ty in ["i64", "bool", "utf8", "utf8 dict"]:
    args = [
        "cargo",
        "bench",
        "--features",
        "io_parquet,io_parquet_compression",
        "--bench",
        "read_parquet",
        "--",
        f"{ty} 2",
    ]

    subprocess.call(args)
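
The trailing f"{ty} 2" argument is a criterion name filter, so each cargo bench invocation only runs the benches whose ids contain that substring (e.g. "read i64 2^10"). Assuming the scripts are invoked from the repository root, the full comparison is two commands; a sketch in the same subprocess style:

import subprocess

# Sketch: run both engines' benches, then print the combined summary.
# Assumes the working directory is the repository root.
subprocess.call(["python", "benchmarks/run.py"])
subprocess.call(["python", "benchmarks/summarize.py"])
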
51 changes: 51 additions & 0 deletions benchmarks/summarize.py
@@ -0,0 +1,51 @@
import json
import os


def _read_reports(engine: str):
    root = {
        "arrow2": "target/criterion",
        "pyarrow": "benchmarks/runs",
    }[engine]

    result = []
    for item in os.listdir(root):
        if item == "report":
            continue

        with open(os.path.join(root, item, "new", "estimates.json")) as f:
            data = json.load(f)

        # point estimates are stored in nanoseconds; convert to microseconds
        us = data["mean"]["point_estimate"] / 1000
        task = item.split()[0]
        ty = " ".join(item.split()[1:-1])
        size = int(item.split()[-1].split("_")[1])
        result.append(
            {
                "engine": engine,
                "task": task,
                "type": ty,
                "size": size,
                "time": us,
            }
        )
    return result


def _print_report(result):
    for ty in ["i64", "bool", "utf8", "utf8 dict"]:
        print(ty)
        r = filter(lambda x: x["type"] == ty, result)
        r = sorted(r, key=lambda x: x["size"])
        for row in r:
            print(row["time"])


def print_report():
    for engine in ["arrow2", "pyarrow"]:
        print(engine)
        result = _read_reports(engine)
        _print_report(result)


print_report()
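
_read_reports relies on report directory names having the shape "<task> <type...> 2_<log2 size>", as written by bench_read.py (and produced by criterion for ids like "read i64 2^10"). A quick sketch of that parsing on a hypothetical directory name:

# Hypothetical report directory name:
item = "read utf8 dict 2_12"

task = item.split()[0]                      # "read"
ty = " ".join(item.split()[1:-1])           # "utf8 dict"
size = int(item.split()[-1].split("_")[1])  # 12, i.e. log2 of the row count

assert (task, ty, size) == ("read", "utf8 dict", 12)
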
26 changes: 0 additions & 26 deletions parquet_integration/bench_read.py

This file was deleted.

65 changes: 49 additions & 16 deletions parquet_integration/write_parquet.py
@@ -11,7 +11,9 @@ def case_basic_nullable(size=1):
     float64 = [0.0, 1.0, None, 3.0, None, 5.0, 6.0, 7.0, None, 9.0]
     string = ["Hello", None, "aa", "", None, "abc", None, None, "def", "aaa"]
     boolean = [True, None, False, False, None, True, None, None, True, True]
-    string_large = ["ABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCD😃🌚🕳👊"] * 10
+    string_large = [
+        "ABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCD😃🌚🕳👊"
+    ] * 10
     decimal = [Decimal(e) if e is not None else None for e in int64]
 
     fields = [
@@ -23,9 +25,9 @@ def case_basic_nullable(size=1):
         pa.field("uint32", pa.uint32()),
         pa.field("string_large", pa.utf8()),
         # decimal testing
-        pa.field("decimal_9", pa.decimal128(9,0)),
-        pa.field("decimal_18", pa.decimal128(18,0)),
-        pa.field("decimal_26", pa.decimal128(26,0)),
+        pa.field("decimal_9", pa.decimal128(9, 0)),
+        pa.field("decimal_18", pa.decimal128(18, 0)),
+        pa.field("decimal_26", pa.decimal128(26, 0)),
     ]
     schema = pa.schema(fields)
 
@@ -67,9 +69,9 @@ def case_basic_required(size=1):
             nullable=False,
         ),
         pa.field("uint32", pa.uint32(), nullable=False),
-        pa.field("decimal_9", pa.decimal128(9,0), nullable=False),
-        pa.field("decimal_18", pa.decimal128(18,0), nullable=False),
-        pa.field("decimal_26", pa.decimal128(26,0), nullable=False),
+        pa.field("decimal_9", pa.decimal128(9, 0), nullable=False),
+        pa.field("decimal_18", pa.decimal128(18, 0), nullable=False),
+        pa.field("decimal_26", pa.decimal128(26, 0), nullable=False),
     ]
     schema = pa.schema(fields)
 
@@ -156,42 +158,73 @@ def case_nested(size):
     )
 
 
-def write_pyarrow(case, size=1, page_version=1, use_dictionary=False):
+def write_pyarrow(
+    case,
+    size: int,
+    page_version: int,
+    use_dictionary: bool,
+    multiple_pages: bool,
+    compression: bool,
+):
     data, schema, path = case(size)
 
     base_path = f"{PYARROW_PATH}/v{page_version}"
     if use_dictionary:
         base_path = f"{base_path}/dict"
 
+    if multiple_pages:
+        base_path = f"{base_path}/multi"
+
+    if compression:
+        base_path = f"{base_path}/snappy"
+
+    if compression:
+        compression = "snappy"
+    else:
+        compression = None
+
+    if multiple_pages:
+        data_page_size = 2 ** 10  # i.e. a small number to ensure multiple pages
+    else:
+        data_page_size = 2 ** 40  # i.e. a large number to ensure a single page
+
     t = pa.table(data, schema=schema)
     os.makedirs(base_path, exist_ok=True)
     pa.parquet.write_table(
         t,
         f"{base_path}/{path}",
         row_group_size=2 ** 40,
         use_dictionary=use_dictionary,
-        compression=None,
+        compression=compression,
         write_statistics=True,
-        data_page_size=2 ** 40,  # i.e. a large number to ensure a single page
+        data_page_size=data_page_size,
         data_page_version=f"{page_version}.0",
     )
 
 
 for case in [case_basic_nullable, case_basic_required, case_nested]:
     for version in [1, 2]:
         for use_dict in [True, False]:
-            write_pyarrow(case, 1, version, use_dict)
+            write_pyarrow(case, 1, version, use_dict, False, False)
 
 
 def case_benches(size):
     assert size % 8 == 0
-    size //= 8
-    data, schema, path = case_basic_nullable(1)
+    data, schema, _ = case_basic_nullable(1)
     for k in data:
-        data[k] = data[k][:8] * size
+        data[k] = data[k][:8] * (size // 8)
     return data, schema, f"benches_{size}.parquet"
 
 
 # for read benchmarks
-for i in range(3 + 10, 3 + 22, 2):
-    write_pyarrow(case_benches, 2 ** i, 1)  # V1
+for i in range(10, 22, 2):
+    # two pages (dict)
+    write_pyarrow(case_benches, 2 ** i, 1, True, False, False)
+    # single page
+    write_pyarrow(case_benches, 2 ** i, 1, False, False, False)
+    # multiple pages
+    write_pyarrow(case_benches, 2 ** i, 1, False, True, False)
+    # multiple compressed pages
+    write_pyarrow(case_benches, 2 ** i, 1, False, True, True)
+    # single compressed page
+    write_pyarrow(case_benches, 2 ** i, 1, False, False, True)
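
For each size, the loop above therefore writes five fixture variants, matching the five to_buffer configurations benchmarked in read_parquet.rs. A sketch enumerating the expected files, assuming PYARROW_PATH == "fixtures/pyarrow3" (the constant is not shown in this diff; the value is inferred from the Rust bench paths):

# Enumerate the bench fixtures the loop above is expected to produce.
for i in range(10, 22, 2):
    size = 2 ** i
    for prefix in ["", "dict/", "multi/", "snappy/", "multi/snappy/"]:
        print(f"fixtures/pyarrow3/v1/{prefix}benches_{size}.parquet")
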