From b75064dc15260f369f852ea05cdd5b9953aa5295 Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Fri, 15 Oct 2021 16:40:34 +0000
Subject: [PATCH 1/2] Added benches.

---
 benches/read_parquet.rs              | 53 +++++++++++++++++--------
 parquet_integration/write_parquet.py | 58 ++++++++++++++++++++++------
 2 files changed, 83 insertions(+), 28 deletions(-)

diff --git a/benches/read_parquet.rs b/benches/read_parquet.rs
index 55ce25fc96d..3e8584e0f6e 100644
--- a/benches/read_parquet.rs
+++ b/benches/read_parquet.rs
@@ -6,9 +6,18 @@ use criterion::{criterion_group, criterion_main, Criterion};
 use arrow2::error::Result;
 use arrow2::io::parquet::read;
 
-fn to_buffer(size: usize) -> Vec<u8> {
+fn to_buffer(size: usize, dict: bool, multi_page: bool, compressed: bool) -> Vec<u8> {
     let dir = env!("CARGO_MANIFEST_DIR");
-    let path = PathBuf::from(dir).join(format!("fixtures/pyarrow3/v1/benches_{}.parquet", size));
+
+    let dict = if dict { "dict/" } else { "" };
+    let multi_page = if multi_page { "multi/" } else { "" };
+    let compressed = if compressed { "snappy/" } else { "" };
+
+    let path = PathBuf::from(dir).join(format!(
+        "fixtures/pyarrow3/v1/{}{}{}benches_{}.parquet",
+        dict, multi_page, compressed, size
+    ));
+
     let metadata = fs::metadata(&path).expect("unable to read metadata");
     let mut file = fs::File::open(path).unwrap();
     let mut buffer = vec![0; metadata.len() as usize];
@@ -16,7 +25,7 @@ fn to_buffer(size: usize) -> Vec<u8> {
     buffer
 }
 
-fn read_decompressed_pages(buffer: &[u8], size: usize, column: usize) -> Result<()> {
+fn read_batch(buffer: &[u8], size: usize, column: usize) -> Result<()> {
     let file = Cursor::new(buffer);
 
     let reader = read::RecordReader::try_new(file, Some(vec![column]), None, None, None)?;
@@ -31,26 +40,38 @@ fn read_decompressed_pages(buffer: &[u8], size: usize, column: usize) -> Result<()> {
 fn add_benchmark(c: &mut Criterion) {
     (10..=20).step_by(2).for_each(|i| {
         let size = 2usize.pow(i);
-        let buffer = to_buffer(size);
+        let buffer = to_buffer(size, false, false, false);
         let a = format!("read i64 2^{}", i);
-        c.bench_function(&a, |b| {
-            b.iter(|| read_decompressed_pages(&buffer, size * 8, 0).unwrap())
-        });
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 0).unwrap()));
 
         let a = format!("read utf8 2^{}", i);
-        c.bench_function(&a, |b| {
-            b.iter(|| read_decompressed_pages(&buffer, size * 8, 2).unwrap())
-        });
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 2).unwrap()));
 
         let a = format!("read utf8 large 2^{}", i);
-        c.bench_function(&a, |b| {
-            b.iter(|| read_decompressed_pages(&buffer, size * 8, 6).unwrap())
-        });
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 6).unwrap()));
 
         let a = format!("read bool 2^{}", i);
-        c.bench_function(&a, |b| {
-            b.iter(|| read_decompressed_pages(&buffer, size * 8, 3).unwrap())
-        });
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 3).unwrap()));
+
+        let buffer = to_buffer(size, true, false, false);
+        let a = format!("read utf8 dict 2^{}", i);
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 2).unwrap()));
+
+        let buffer = to_buffer(size, false, false, true);
+        let a = format!("read i64 snappy 2^{}", i);
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 0).unwrap()));
+
+        let buffer = to_buffer(size, false, true, false);
+        let a = format!("read utf8 multi 2^{}", i);
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 2).unwrap()));
+
+        let buffer = to_buffer(size, false, true, true);
+        let a = format!("read utf8 multi snappy 2^{}", i);
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 2).unwrap()));
+
+        let buffer = to_buffer(size, false, true, true);
+        let a = format!("read i64 multi snappy 2^{}", i);
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 0).unwrap()));
    });
 }
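As a reference for the layout that to_buffer expects, here is a minimal Python sketch of the same flag-to-subdirectory mapping (the fixture_path helper is illustrative, not part of the patch; the subdirectories compose cumulatively):

    # Illustrative helper: mirrors how to_buffer above composes fixture paths.
    def fixture_path(size: int, use_dict: bool, multi_page: bool, compressed: bool) -> str:
        prefix = "fixtures/pyarrow3/v1/"
        prefix += "dict/" if use_dict else ""
        prefix += "multi/" if multi_page else ""
        prefix += "snappy/" if compressed else ""
        return f"{prefix}benches_{size}.parquet"

    assert fixture_path(1024, False, True, True) == "fixtures/pyarrow3/v1/multi/snappy/benches_1024.parquet"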
diff --git a/parquet_integration/write_parquet.py b/parquet_integration/write_parquet.py
index 0d9e556216d..165d916939f 100644
--- a/parquet_integration/write_parquet.py
+++ b/parquet_integration/write_parquet.py
@@ -11,7 +11,9 @@ def case_basic_nullable(size=1):
     float64 = [0.0, 1.0, None, 3.0, None, 5.0, 6.0, 7.0, None, 9.0]
     string = ["Hello", None, "aa", "", None, "abc", None, None, "def", "aaa"]
     boolean = [True, None, False, False, None, True, None, None, True, True]
-    string_large = ["ABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDšŸ˜ƒšŸŒššŸ•³šŸ‘Š"] * 10
+    string_large = [
+        "ABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDšŸ˜ƒšŸŒššŸ•³šŸ‘Š"
+    ] * 10
     decimal = [Decimal(e) if e is not None else None for e in int64]
 
     fields = [
@@ -23,9 +25,9 @@ def case_basic_nullable(size=1):
         pa.field("uint32", pa.uint32()),
         pa.field("string_large", pa.utf8()),
         # decimal testing
-        pa.field("decimal_9", pa.decimal128(9,0)),
-        pa.field("decimal_18", pa.decimal128(18,0)),
-        pa.field("decimal_26", pa.decimal128(26,0)),
+        pa.field("decimal_9", pa.decimal128(9, 0)),
+        pa.field("decimal_18", pa.decimal128(18, 0)),
+        pa.field("decimal_26", pa.decimal128(26, 0)),
     ]
 
     schema = pa.schema(fields)
@@ -67,9 +69,9 @@ def case_basic_required(size=1):
             nullable=False,
         ),
         pa.field("uint32", pa.uint32(), nullable=False),
-        pa.field("decimal_9", pa.decimal128(9,0), nullable=False),
-        pa.field("decimal_18", pa.decimal128(18,0), nullable=False),
-        pa.field("decimal_26", pa.decimal128(26,0), nullable=False),
+        pa.field("decimal_9", pa.decimal128(9, 0), nullable=False),
+        pa.field("decimal_18", pa.decimal128(18, 0), nullable=False),
+        pa.field("decimal_26", pa.decimal128(26, 0), nullable=False),
     ]
 
     schema = pa.schema(fields)
@@ -156,13 +158,36 @@ def case_nested(size):
     )
 
 
-def write_pyarrow(case, size=1, page_version=1, use_dictionary=False):
+def write_pyarrow(
+    case,
+    size: int,
+    page_version: int,
+    use_dictionary: bool,
+    multiple_pages: bool,
+    compression: bool,
+):
     data, schema, path = case(size)
 
     base_path = f"{PYARROW_PATH}/v{page_version}"
     if use_dictionary:
         base_path = f"{base_path}/dict"
 
+    if multiple_pages:
+        base_path = f"{base_path}/multi"
+
+    if compression:
+        base_path = f"{base_path}/snappy"
+
+    if compression:
+        compression = "snappy"
+    else:
+        compression = None
+
+    if multiple_pages:
+        data_page_size = 2 ** 10  # i.e. a small number to ensure multiple pages
+    else:
+        data_page_size = 2 ** 40  # i.e. a large number to ensure a single page
+
     t = pa.table(data, schema=schema)
     os.makedirs(base_path, exist_ok=True)
     pa.parquet.write_table(
@@ -170,9 +195,9 @@ def write_pyarrow(case, size=1, page_version=1, use_dictionary=False):
         f"{base_path}/{path}",
         row_group_size=2 ** 40,
         use_dictionary=use_dictionary,
-        compression=None,
+        compression=compression,
         write_statistics=True,
-        data_page_size=2 ** 40,  # i.e. a large number to ensure a single page
+        data_page_size=data_page_size,
         data_page_version=f"{page_version}.0",
     )
 
@@ -180,7 +205,7 @@ def write_pyarrow(case, size=1, page_version=1, use_dictionary=False):
 for case in [case_basic_nullable, case_basic_required, case_nested]:
     for version in [1, 2]:
         for use_dict in [True, False]:
-            write_pyarrow(case, 1, version, use_dict)
+            write_pyarrow(case, 1, version, use_dict, False, False)
 
 
 def case_benches(size):
@@ -194,4 +219,13 @@ def case_benches(size):
 
 # for read benchmarks
 for i in range(3 + 10, 3 + 22, 2):
-    write_pyarrow(case_benches, 2 ** i, 1)  # V1
+    # two pages (dict)
+    write_pyarrow(case_benches, 2 ** i, 1, True, False, False)
+    # single page
+    write_pyarrow(case_benches, 2 ** i, 1, False, False, False)
+    # multiple pages
+    write_pyarrow(case_benches, 2 ** i, 1, False, True, False)
+    # multiple compressed pages
+    write_pyarrow(case_benches, 2 ** i, 1, False, True, True)
+    # single compressed page
+    write_pyarrow(case_benches, 2 ** i, 1, False, False, True)
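The generated fixtures can be sanity-checked with pyarrow's metadata API. The snippet below is an illustrative check, assuming the script above has been run from the repository root:

    import pyarrow.parquet as pq

    # Illustrative sanity check: the "snappy" fixtures should report SNAPPY
    # compression on their column chunks, the plain ones UNCOMPRESSED.
    meta = pq.ParquetFile("fixtures/pyarrow3/v1/snappy/benches_1024.parquet").metadata
    print(meta.row_group(0).column(0).compression)  # expected: SNAPPY

    meta = pq.ParquetFile("fixtures/pyarrow3/v1/benches_1024.parquet").metadata
    print(meta.row_group(0).column(0).compression)  # expected: UNCOMPRESSED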
From c08e54c63a9728181ec01e69ae3f434445b133dc Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Sat, 16 Oct 2021 07:13:06 +0000
Subject: [PATCH 2/2] Improved bench reporting.

---
 benches/read_parquet.rs              | 18 +++++-----
 benchmarks/.gitignore                |  1 +
 benchmarks/bench_read.py             | 49 ++++++++++++++++++++++++++
 benchmarks/run.py                    | 20 +++++++++++
 benchmarks/summarize.py              | 51 ++++++++++++++++++++++++++++
 parquet_integration/bench_read.py    | 26 --------------
 parquet_integration/write_parquet.py |  7 ++--
 7 files changed, 133 insertions(+), 39 deletions(-)
 create mode 100644 benchmarks/.gitignore
 create mode 100644 benchmarks/bench_read.py
 create mode 100644 benchmarks/run.py
 create mode 100644 benchmarks/summarize.py
 delete mode 100644 parquet_integration/bench_read.py

diff --git a/benches/read_parquet.rs b/benches/read_parquet.rs
index 3e8584e0f6e..8f536ed6842 100644
--- a/benches/read_parquet.rs
+++ b/benches/read_parquet.rs
@@ -42,36 +42,36 @@ fn add_benchmark(c: &mut Criterion) {
         let size = 2usize.pow(i);
         let buffer = to_buffer(size, false, false, false);
         let a = format!("read i64 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 0).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
 
         let a = format!("read utf8 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 2).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
 
         let a = format!("read utf8 large 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 6).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 6).unwrap()));
 
         let a = format!("read bool 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 3).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 3).unwrap()));
 
         let buffer = to_buffer(size, true, false, false);
         let a = format!("read utf8 dict 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 2).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
 
         let buffer = to_buffer(size, false, false, true);
         let a = format!("read i64 snappy 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 0).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
 
         let buffer = to_buffer(size, false, true, false);
         let a = format!("read utf8 multi 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 2).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
 
         let buffer = to_buffer(size, false, true, true);
         let a = format!("read utf8 multi snappy 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 2).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
 
         let buffer = to_buffer(size, false, true, true);
         let a = format!("read i64 multi snappy 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 0).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
     });
 }

diff --git a/benchmarks/.gitignore b/benchmarks/.gitignore
new file mode 100644
index 00000000000..344f079e498
--- /dev/null
+++ b/benchmarks/.gitignore
@@ -0,0 +1 @@
+runs

diff --git a/benchmarks/bench_read.py b/benchmarks/bench_read.py
new file mode 100644
index 00000000000..cab190fe2cd
--- /dev/null
+++ b/benchmarks/bench_read.py
@@ -0,0 +1,49 @@
+import timeit
+import io
+import os
+import json
+
+import pyarrow.parquet
+
+
+def _bench_single(log2_size: int, column: str, use_dict: bool) -> float:
+    if use_dict:
+        path = f"fixtures/pyarrow3/v1/dict/benches_{2**log2_size}.parquet"
+    else:
+        path = f"fixtures/pyarrow3/v1/benches_{2**log2_size}.parquet"
+    with open(path, "rb") as f:
+        data = f.read()
+    data = io.BytesIO(data)
+
+    def f():
+        pyarrow.parquet.read_table(data, columns=[column])
+
+    seconds = timeit.Timer(f).timeit(number=512) / 512
+    ns = seconds * 1000 * 1000 * 1000
+    return ns
+
+
+def _report(name: str, result: float):
+    path = f"benchmarks/runs/{name}/new"
+    os.makedirs(path, exist_ok=True)
+    with open(f"{path}/estimates.json", "w") as f:
+        json.dump({"mean": {"point_estimate": result}}, f)
+
+
+def _bench(size, ty):
+    column, use_dict = {
+        "i64": ("int64", False),
+        "bool": ("bool", False),
+        "utf8": ("string", False),
+        "utf8 dict": ("string", True),
+    }[ty]
+
+    result = _bench_single(size, column, use_dict)
+    print(result)
+    _report(f"read {ty} 2_{size}", result)
+
+
+for size in range(10, 22, 2):
+    for ty in ["i64", "bool", "utf8", "utf8 dict"]:
+        print(size, ty)
+        _bench(size, ty)
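The layout written by _report mirrors criterion's target/criterion/<name>/new/estimates.json, which is what lets summarize.py (below) parse both engines with the same code. As an illustrative read-back, where the benchmark name is an example:

    import json

    # Load the mean estimate (in nanoseconds) written by _report above;
    # "read i64 2_20" is an illustrative benchmark name.
    with open("benchmarks/runs/read i64 2_20/new/estimates.json") as f:
        print(json.load(f)["mean"]["point_estimate"])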
diff --git a/benchmarks/run.py b/benchmarks/run.py
new file mode 100644
index 00000000000..a707f23f1bd
--- /dev/null
+++ b/benchmarks/run.py
@@ -0,0 +1,20 @@
+import subprocess
+
+
+# run pyarrow
+subprocess.call(["python", "benchmarks/bench_read.py"])
+
+
+for ty in ["i64", "bool", "utf8", "utf8 dict"]:
+    args = [
+        "cargo",
+        "bench",
+        "--features",
+        "io_parquet,io_parquet_compression",
+        "--bench",
+        "read_parquet",
+        "--",
+        f"{ty} 2",
+    ]
+
+    subprocess.call(args)

diff --git a/benchmarks/summarize.py b/benchmarks/summarize.py
new file mode 100644
index 00000000000..a44c0ac182f
--- /dev/null
+++ b/benchmarks/summarize.py
@@ -0,0 +1,51 @@
+import json
+import os
+
+
+def _read_reports(engine: str):
+    root = {
+        "arrow2": "target/criterion",
+        "pyarrow": "benchmarks/runs",
+    }[engine]
+
+    result = []
+    for item in os.listdir(root):
+        if item == "report":
+            continue
+
+        with open(os.path.join(root, item, "new", "estimates.json")) as f:
+            data = json.load(f)
+
+        us = data["mean"]["point_estimate"] / 1000  # estimates are in ns; /1000 yields microseconds
+        task = item.split()[0]
+        ty = " ".join(item.split()[1:-1])
+        size = int(item.split()[-1].split("_")[1])
+        result.append(
+            {
+                "engine": engine,
+                "task": task,
+                "type": ty,
+                "size": size,
+                "time": us,
+            }
+        )
+    return result
+
+
+def _print_report(result):
+    for ty in ["i64", "bool", "utf8", "utf8 dict"]:
+        print(ty)
+        r = filter(lambda x: x["type"] == ty, result)
+        r = sorted(r, key=lambda x: x["size"])
+        for row in r:
+            print(row["time"])
+
+
+def print_report():
+    for engine in ["arrow2", "pyarrow"]:
+        print(engine)
+        result = _read_reports(engine)
+        _print_report(result)
+
+
+print_report()
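Taken together, an illustrative end-to-end session, assuming the commands are run from the repository root:

    import subprocess

    # Illustrative flow: write fixtures, benchmark both engines, then print
    # the summary. Script paths are the ones used in this patch.
    subprocess.check_call(["python", "parquet_integration/write_parquet.py"])
    subprocess.check_call(["python", "benchmarks/run.py"])
    subprocess.check_call(["python", "benchmarks/summarize.py"])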
diff --git a/parquet_integration/bench_read.py b/parquet_integration/bench_read.py
deleted file mode 100644
index f1db81addee..00000000000
--- a/parquet_integration/bench_read.py
+++ /dev/null
@@ -1,26 +0,0 @@
-import timeit
-import io
-
-import pyarrow.parquet
-
-
-def bench(log2_size: int, datatype: str):
-    with open(f"fixtures/pyarrow3/v1/benches_{2**log2_size}.parquet", "rb") as f:
-        data = f.read()
-    data = io.BytesIO(data)
-
-    def f():
-        pyarrow.parquet.read_table(data, columns=[datatype])
-
-    seconds = timeit.Timer(f).timeit(number=512) / 512
-    microseconds = seconds * 1000 * 1000
-    print(f"read {datatype} 2^{log2_size} time: {microseconds:.2f} us")
-
-#for i in range(10, 22, 2):
-#    bench(i, "int64")
-
-for i in range(10, 22, 2):
-    bench(i, "string")
-
-for i in range(10, 22, 2):
-    bench(i, "bool")

diff --git a/parquet_integration/write_parquet.py b/parquet_integration/write_parquet.py
index 165d916939f..d97ff1edc9d 100644
--- a/parquet_integration/write_parquet.py
+++ b/parquet_integration/write_parquet.py
@@ -210,15 +210,14 @@ def write_pyarrow(
 
 def case_benches(size):
     assert size % 8 == 0
-    size //= 8
-    data, schema, path = case_basic_nullable(1)
+    data, schema, _ = case_basic_nullable(1)
     for k in data:
-        data[k] = data[k][:8] * size
+        data[k] = data[k][:8] * (size // 8)
     return data, schema, f"benches_{size}.parquet"
 
 
 # for read benchmarks
-for i in range(3 + 10, 3 + 22, 2):
+for i in range(10, 22, 2):
     # two pages (dict)
     write_pyarrow(case_benches, 2 ** i, 1, True, False, False)
     # single page