From 936d25598fdfb438b025566dd80f1f50d8e43661 Mon Sep 17 00:00:00 2001 From: Connor Sanders Date: Mon, 18 Aug 2025 13:45:50 -0500 Subject: [PATCH 1/2] Add benchmarks for arrow-avro writer - Introduce `avro_writer` benchmark suite in `arrow-avro/benches/avro_writer.rs`. - Test writing performance for various data types, including Boolean, Int32, Int64, Float32, Float64, Binary, TimestampMicrosecond, and Mixed schemas. - Update `Cargo.toml` to include the `avro_writer` benchmark target. --- arrow-avro/Cargo.toml | 4 + arrow-avro/benches/avro_writer.rs | 240 ++++++++++++++++++++++++++++++ 2 files changed, 244 insertions(+) create mode 100644 arrow-avro/benches/avro_writer.rs diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml index 5cdef83a2d45..dbe3fd8162bb 100644 --- a/arrow-avro/Cargo.toml +++ b/arrow-avro/Cargo.toml @@ -83,4 +83,8 @@ harness = false [[bench]] name = "decoder" +harness = false + +[[bench]] +name = "avro_writer" harness = false \ No newline at end of file diff --git a/arrow-avro/benches/avro_writer.rs b/arrow-avro/benches/avro_writer.rs new file mode 100644 index 000000000000..469ea8427341 --- /dev/null +++ b/arrow-avro/benches/avro_writer.rs @@ -0,0 +1,240 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Benchmarks for `arrow-avro` **Writer** +//! + +extern crate arrow_avro; +extern crate criterion; +extern crate once_cell; + +use arrow_array::{ + types::{Int32Type, Int64Type, TimestampMicrosecondType}, + ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, PrimitiveArray, RecordBatch, +}; +use arrow_avro::writer::AvroWriter; +use arrow_schema::{DataType, Field, Schema, TimeUnit}; +use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput}; +use once_cell::sync::Lazy; +use std::io::Cursor; +use std::sync::Arc; +use std::time::Duration; +use tempfile::tempfile; + +const SIZES: [usize; 3] = [100, 10_000, 1_000_000]; + +fn make_bool_array(n: usize) -> BooleanArray { + BooleanArray::from_iter((0..n).map(|i| Some(i % 2 == 0))) +} + +fn make_i32_array(n: usize) -> PrimitiveArray<Int32Type> { + PrimitiveArray::<Int32Type>::from_iter_values((0..n).map(|i| i as i32)) +} + +fn make_i64_array(n: usize) -> PrimitiveArray<Int64Type> { + PrimitiveArray::<Int64Type>::from_iter_values((0..n).map(|i| i as i64)) +} + +fn make_f32_array(n: usize) -> Float32Array { + Float32Array::from_iter_values((0..n).map(|i| i as f32 + 0.5678)) +} + +fn make_f64_array(n: usize) -> Float64Array { + Float64Array::from_iter_values((0..n).map(|i| i as f64 + 0.1234)) +} + +fn make_binary_array(n: usize) -> BinaryArray { + let payloads: Vec<Vec<u8>> = (0..n).map(|i| vec![(i & 0xFF) as u8; 16]).collect(); + let views: Vec<&[u8]> = payloads.iter().map(|v| v.as_slice()).collect(); + BinaryArray::from_vec(views) +} + +fn make_ts_micros_array(n: usize) -> PrimitiveArray<TimestampMicrosecondType> { + let base: i64 = 1_600_000_000_000_000; + PrimitiveArray::<TimestampMicrosecondType>::from_iter_values((0..n).map(|i| base + i as i64)) +} + +fn schema_single(name: &str, dt: DataType) -> Arc<Schema> { + Arc::new(Schema::new(vec![Field::new(name, dt, false)])) +} + +fn schema_mixed() -> Arc<Schema> { + Arc::new(Schema::new(vec![ + Field::new("f1", DataType::Int32, 
false), + Field::new("f2", DataType::Int64, false), + Field::new("f3", DataType::Binary, false), + Field::new("f4", DataType::Float64, false), + ])) +} + +static BOOLEAN_DATA: Lazy<Vec<RecordBatch>> = Lazy::new(|| { + let schema = schema_single("field1", DataType::Boolean); + SIZES + .iter() + .map(|&n| { + let col: ArrayRef = Arc::new(make_bool_array(n)); + RecordBatch::try_new(schema.clone(), vec![col]).unwrap() + }) + .collect() +}); + +static INT32_DATA: Lazy<Vec<RecordBatch>> = Lazy::new(|| { + let schema = schema_single("field1", DataType::Int32); + SIZES + .iter() + .map(|&n| { + let col: ArrayRef = Arc::new(make_i32_array(n)); + RecordBatch::try_new(schema.clone(), vec![col]).unwrap() + }) + .collect() +}); + +static INT64_DATA: Lazy<Vec<RecordBatch>> = Lazy::new(|| { + let schema = schema_single("field1", DataType::Int64); + SIZES + .iter() + .map(|&n| { + let col: ArrayRef = Arc::new(make_i64_array(n)); + RecordBatch::try_new(schema.clone(), vec![col]).unwrap() + }) + .collect() +}); + +static FLOAT32_DATA: Lazy<Vec<RecordBatch>> = Lazy::new(|| { + let schema = schema_single("field1", DataType::Float32); + SIZES + .iter() + .map(|&n| { + let col: ArrayRef = Arc::new(make_f32_array(n)); + RecordBatch::try_new(schema.clone(), vec![col]).unwrap() + }) + .collect() +}); + +static FLOAT64_DATA: Lazy<Vec<RecordBatch>> = Lazy::new(|| { + let schema = schema_single("field1", DataType::Float64); + SIZES + .iter() + .map(|&n| { + let col: ArrayRef = Arc::new(make_f64_array(n)); + RecordBatch::try_new(schema.clone(), vec![col]).unwrap() + }) + .collect() +}); + +static BINARY_DATA: Lazy<Vec<RecordBatch>> = Lazy::new(|| { + let schema = schema_single("field1", DataType::Binary); + SIZES + .iter() + .map(|&n| { + let col: ArrayRef = Arc::new(make_binary_array(n)); + RecordBatch::try_new(schema.clone(), vec![col]).unwrap() + }) + .collect() +}); + +static TIMESTAMP_US_DATA: Lazy<Vec<RecordBatch>> = Lazy::new(|| { + let schema = schema_single("field1", DataType::Timestamp(TimeUnit::Microsecond, None)); + SIZES + .iter() + .map(|&n| { + let col: ArrayRef = 
Arc::new(make_ts_micros_array(n)); + RecordBatch::try_new(schema.clone(), vec![col]).unwrap() + }) + .collect() +}); + +static MIXED_DATA: Lazy<Vec<RecordBatch>> = Lazy::new(|| { + let schema = schema_mixed(); + SIZES + .iter() + .map(|&n| { + let f1: ArrayRef = Arc::new(make_i32_array(n)); + let f2: ArrayRef = Arc::new(make_i64_array(n)); + let f3: ArrayRef = Arc::new(make_binary_array(n)); + let f4: ArrayRef = Arc::new(make_f64_array(n)); + RecordBatch::try_new(schema.clone(), vec![f1, f2, f3, f4]).unwrap() + }) + .collect() +}); + +fn ocf_size_for_batch(batch: &RecordBatch) -> usize { + let schema_owned: Schema = (*batch.schema()).clone(); + let cursor = Cursor::new(Vec::<u8>::with_capacity(1024)); + let mut writer = AvroWriter::new(cursor, schema_owned).expect("create writer"); + writer.write(batch).expect("write batch"); + writer.finish().expect("finish writer"); + let inner = writer.into_inner(); + inner.into_inner().len() +} + +fn bench_writer_scenario(c: &mut Criterion, name: &str, data_sets: &[RecordBatch]) { + let mut group = c.benchmark_group(name); + let schema_owned: Schema = (*data_sets[0].schema()).clone(); + for (idx, &rows) in SIZES.iter().enumerate() { + let batch = &data_sets[idx]; + let bytes = ocf_size_for_batch(batch); + group.throughput(Throughput::Bytes(bytes as u64)); + match rows { + 10_000 => { + group + .sample_size(25) + .measurement_time(Duration::from_secs(10)) + .warm_up_time(Duration::from_secs(3)); + } + 1_000_000 => { + group + .sample_size(10) + .measurement_time(Duration::from_secs(10)) + .warm_up_time(Duration::from_secs(3)); + } + _ => {} + } + group.bench_function(BenchmarkId::from_parameter(rows), |b| { + b.iter_batched_ref( + || { + let file = tempfile().expect("create temp file"); + AvroWriter::new(file, schema_owned.clone()).expect("create writer") + }, + |writer| { + writer.write(batch).unwrap(); + writer.finish().unwrap(); + }, + BatchSize::SmallInput, + ) + }); + } + group.finish(); +} + +fn criterion_benches(c: &mut Criterion) { + 
bench_writer_scenario(c, "write-Boolean", &BOOLEAN_DATA); + bench_writer_scenario(c, "write-Int32", &INT32_DATA); + bench_writer_scenario(c, "write-Int64", &INT64_DATA); + bench_writer_scenario(c, "write-Float32", &FLOAT32_DATA); + bench_writer_scenario(c, "write-Float64", &FLOAT64_DATA); + bench_writer_scenario(c, "write-Binary(Bytes)", &BINARY_DATA); + bench_writer_scenario(c, "write-TimestampMicros", &TIMESTAMP_US_DATA); + bench_writer_scenario(c, "write-Mixed", &MIXED_DATA); +} + +criterion_group! { + name = avro_writer; + config = Criterion::default().configure_from_args(); + targets = criterion_benches +} +criterion_main!(avro_writer); From 1c74ce16bf4ca4849cb25a35e33453f20649329e Mon Sep 17 00:00:00 2001 From: Connor Sanders Date: Fri, 22 Aug 2025 13:46:45 -0500 Subject: [PATCH 2/2] Address PR Comments --- arrow-avro/benches/avro_writer.rs | 128 +++++++++++++++++++++++++----- 1 file changed, 106 insertions(+), 22 deletions(-) diff --git a/arrow-avro/benches/avro_writer.rs b/arrow-avro/benches/avro_writer.rs index 469ea8427341..7143dff909c8 100644 --- a/arrow-avro/benches/avro_writer.rs +++ b/arrow-avro/benches/avro_writer.rs @@ -30,48 +30,126 @@ use arrow_avro::writer::AvroWriter; use arrow_schema::{DataType, Field, Schema, TimeUnit}; use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput}; use once_cell::sync::Lazy; +use rand::{ + distr::uniform::{SampleRange, SampleUniform}, + rngs::StdRng, + Rng, SeedableRng, +}; use std::io::Cursor; use std::sync::Arc; use std::time::Duration; use tempfile::tempfile; -const SIZES: [usize; 3] = [100, 10_000, 1_000_000]; +const SIZES: [usize; 4] = [4_096, 8_192, 100_000, 1_000_000]; +const BASE_SEED: u64 = 0x5EED_1234_ABCD_EF01; +const MIX_CONST_1: u64 = 0x9E37_79B1_85EB_CA87; +const MIX_CONST_2: u64 = 0xC2B2_AE3D_27D4_EB4F; -fn make_bool_array(n: usize) -> BooleanArray { - BooleanArray::from_iter((0..n).map(|i| Some(i % 2 == 0))) +#[inline] +fn rng_for(tag: u64, n: usize) -> StdRng 
{ + let seed = BASE_SEED ^ tag.wrapping_mul(MIX_CONST_1) ^ (n as u64).wrapping_mul(MIX_CONST_2); + StdRng::seed_from_u64(seed) } -fn make_i32_array(n: usize) -> PrimitiveArray<Int32Type> { - PrimitiveArray::<Int32Type>::from_iter_values((0..n).map(|i| i as i32)) +#[inline] +fn sample_in<T, Rg>(rng: &mut StdRng, range: Rg) -> T +where + T: SampleUniform, + Rg: SampleRange<T>, +{ + rng.random_range(range) } -fn make_i64_array(n: usize) -> PrimitiveArray<Int64Type> { - PrimitiveArray::<Int64Type>::from_iter_values((0..n).map(|i| i as i64)) +#[inline] +fn make_bool_array_with_tag(n: usize, tag: u64) -> BooleanArray { + let mut rng = rng_for(tag, n); + let values = (0..n).map(|_| rng.random_bool(0.5)); + BooleanArray::from_iter(values.map(Some)) } -fn make_f32_array(n: usize) -> Float32Array { - Float32Array::from_iter_values((0..n).map(|i| i as f32 + 0.5678)) +#[inline] +fn make_i32_array_with_tag(n: usize, tag: u64) -> PrimitiveArray<Int32Type> { + let mut rng = rng_for(tag, n); + let values = (0..n).map(|_| rng.random::<i32>()); + PrimitiveArray::<Int32Type>::from_iter_values(values) } -fn make_f64_array(n: usize) -> Float64Array { - Float64Array::from_iter_values((0..n).map(|i| i as f64 + 0.1234)) +#[inline] +fn make_i64_array_with_tag(n: usize, tag: u64) -> PrimitiveArray<Int64Type> { + let mut rng = rng_for(tag, n); + let values = (0..n).map(|_| rng.random::<i64>()); + PrimitiveArray::<Int64Type>::from_iter_values(values) } -fn make_binary_array(n: usize) -> BinaryArray { - let payloads: Vec<Vec<u8>> = (0..n).map(|i| vec![(i & 0xFF) as u8; 16]).collect(); - let views: Vec<&[u8]> = payloads.iter().map(|v| v.as_slice()).collect(); +#[inline] +fn make_f32_array_with_tag(n: usize, tag: u64) -> Float32Array { + let mut rng = rng_for(tag, n); + let values = (0..n).map(|_| rng.random::<f32>()); + Float32Array::from_iter_values(values) +} + +#[inline] +fn make_f64_array_with_tag(n: usize, tag: u64) -> Float64Array { + let mut rng = rng_for(tag, n); + let values = (0..n).map(|_| rng.random::<f64>()); + Float64Array::from_iter_values(values) +} + +#[inline] +fn make_binary_array_with_tag(n: usize, 
tag: u64) -> BinaryArray { + let mut rng = rng_for(tag, n); + let mut payloads: Vec<[u8; 16]> = vec![[0; 16]; n]; + for p in payloads.iter_mut() { + rng.fill(&mut p[..]); + } + let views: Vec<&[u8]> = payloads.iter().map(|p| &p[..]).collect(); BinaryArray::from_vec(views) } -fn make_ts_micros_array(n: usize) -> PrimitiveArray<TimestampMicrosecondType> { +#[inline] +fn make_ts_micros_array_with_tag(n: usize, tag: u64) -> PrimitiveArray<TimestampMicrosecondType> { + let mut rng = rng_for(tag, n); let base: i64 = 1_600_000_000_000_000; - PrimitiveArray::<TimestampMicrosecondType>::from_iter_values((0..n).map(|i| base + i as i64)) + let year_us: i64 = 31_536_000_000_000; + let values = (0..n).map(|_| base + sample_in::<i64, _>(&mut rng, 0..year_us)); + PrimitiveArray::<TimestampMicrosecondType>::from_iter_values(values) +} + +#[inline] +fn make_bool_array(n: usize) -> BooleanArray { + make_bool_array_with_tag(n, 0xB001) +} +#[inline] +fn make_i32_array(n: usize) -> PrimitiveArray<Int32Type> { + make_i32_array_with_tag(n, 0x1337_0032) +} +#[inline] +fn make_i64_array(n: usize) -> PrimitiveArray<Int64Type> { + make_i64_array_with_tag(n, 0x1337_0064) +} +#[inline] +fn make_f32_array(n: usize) -> Float32Array { + make_f32_array_with_tag(n, 0xF0_0032) +} +#[inline] +fn make_f64_array(n: usize) -> Float64Array { + make_f64_array_with_tag(n, 0xF0_0064) +} +#[inline] +fn make_binary_array(n: usize) -> BinaryArray { + make_binary_array_with_tag(n, 0xB1_0001) +} +#[inline] +fn make_ts_micros_array(n: usize) -> PrimitiveArray<TimestampMicrosecondType> { + make_ts_micros_array_with_tag(n, 0x7157_0001) } +#[inline] fn schema_single(name: &str, dt: DataType) -> Arc<Schema> { Arc::new(Schema::new(vec![Field::new(name, dt, false)])) } +#[inline] fn schema_mixed() -> Arc<Schema> { Arc::new(Schema::new(vec![ Field::new("f1", DataType::Int32, false), @@ -163,10 +241,10 @@ static MIXED_DATA: Lazy<Vec<RecordBatch>> = Lazy::new(|| { SIZES .iter() .map(|&n| { - let f1: ArrayRef = Arc::new(make_i32_array(n)); - let f2: ArrayRef = Arc::new(make_i64_array(n)); - let f3: ArrayRef = Arc::new(make_binary_array(n)); - let f4: ArrayRef = Arc::new(make_f64_array(n)); + let f1: ArrayRef = 
Arc::new(make_i32_array_with_tag(n, 0xA1)); + let f2: ArrayRef = Arc::new(make_i64_array_with_tag(n, 0xA2)); + let f3: ArrayRef = Arc::new(make_binary_array_with_tag(n, 0xA3)); + let f4: ArrayRef = Arc::new(make_f64_array_with_tag(n, 0xA4)); RecordBatch::try_new(schema.clone(), vec![f1, f2, f3, f4]).unwrap() }) .collect() @@ -190,9 +268,15 @@ fn bench_writer_scenario(c: &mut Criterion, name: &str, data_sets: &[RecordBatch let bytes = ocf_size_for_batch(batch); group.throughput(Throughput::Bytes(bytes as u64)); match rows { - 10_000 => { + 4_096 | 8_192 => { + group + .sample_size(40) + .measurement_time(Duration::from_secs(10)) + .warm_up_time(Duration::from_secs(3)); + } + 100_000 => { group - .sample_size(25) + .sample_size(20) .measurement_time(Duration::from_secs(10)) .warm_up_time(Duration::from_secs(3)); }