Add parquet SQL benchmarks #1738

Merged
merged 6 commits on Feb 15, 2022
4 changes: 4 additions & 0 deletions datafusion/Cargo.toml
@@ -116,3 +116,7 @@ harness = false
[[bench]]
name = "physical_plan"
harness = false

[[bench]]
name = "parquet_query_sql"
harness = false
237 changes: 237 additions & 0 deletions datafusion/benches/parquet_query_sql.rs
@@ -0,0 +1,237 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Benchmarks of SQL queries against parquet data

use arrow::array::{ArrayRef, DictionaryArray, PrimitiveArray, StringArray};
use arrow::datatypes::{
ArrowPrimitiveType, DataType, Field, Float64Type, Int32Type, Int64Type, Schema,
SchemaRef,
};
use arrow::record_batch::RecordBatch;
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion::prelude::ExecutionContext;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::{WriterProperties, WriterVersion};
use rand::distributions::uniform::SampleUniform;
use rand::distributions::Alphanumeric;
use rand::prelude::*;
use std::fs::File;
use std::io::Read;
use std::ops::Range;
use std::path::Path;
use std::sync::Arc;
use std::time::Instant;
use tempfile::NamedTempFile;
use tokio_stream::StreamExt;

/// The number of batches to write
const NUM_BATCHES: usize = 2048;
/// The number of rows in each record batch to write
const WRITE_RECORD_BATCH_SIZE: usize = 1024;
/// The number of rows in a row group
const ROW_GROUP_SIZE: usize = 1024 * 1024;
/// The number of row groups expected
const EXPECTED_ROW_GROUPS: usize = 2;

fn schema() -> SchemaRef {
let string_dictionary_type =
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));

Arc::new(Schema::new(vec![
Field::new("dict_10_required", string_dictionary_type.clone(), false),
Field::new("dict_10_optional", string_dictionary_type.clone(), true),
Field::new("dict_100_required", string_dictionary_type.clone(), false),
Field::new("dict_100_optional", string_dictionary_type.clone(), true),
Field::new("dict_1000_required", string_dictionary_type.clone(), false),
Field::new("dict_1000_optional", string_dictionary_type.clone(), true),
Field::new("string_required", DataType::Utf8, false),
Field::new("string_optional", DataType::Utf8, true),
Field::new("i64_required", DataType::Int64, false),
Field::new("i64_optional", DataType::Int64, true),
Field::new("f64_required", DataType::Float64, false),
Field::new("f64_optional", DataType::Float64, true),
]))
}

fn generate_batch() -> RecordBatch {
let schema = schema();
let len = WRITE_RECORD_BATCH_SIZE;
RecordBatch::try_new(
schema,
vec![
generate_string_dictionary("prefix", 10, len, 1.0),
generate_string_dictionary("prefix", 10, len, 0.5),
generate_string_dictionary("prefix", 100, len, 1.0),
generate_string_dictionary("prefix", 100, len, 0.5),
generate_string_dictionary("prefix", 1000, len, 1.0),
generate_string_dictionary("prefix", 1000, len, 0.5),
generate_strings(0..100, len, 1.0),
generate_strings(0..100, len, 0.5),
generate_primitive::<Int64Type>(len, 1.0, -2000..2000),
generate_primitive::<Int64Type>(len, 0.5, -2000..2000),
generate_primitive::<Float64Type>(len, 1.0, -1000.0..1000.0),
generate_primitive::<Float64Type>(len, 0.5, -1000.0..1000.0),
],
)
.unwrap()
}

fn generate_string_dictionary(
prefix: &str,
cardinality: usize,
len: usize,
valid_percent: f64,
) -> ArrayRef {
let mut rng = thread_rng();
let strings: Vec<_> = (0..cardinality)
.map(|x| format!("{}#{}", prefix, x))
.collect();

Arc::new(DictionaryArray::<Int32Type>::from_iter((0..len).map(
|_| {
rng.gen_bool(valid_percent)
.then(|| strings[rng.gen_range(0..cardinality)].as_str())
},
)))
}

fn generate_strings(
string_length_range: Range<usize>,
len: usize,
valid_percent: f64,
) -> ArrayRef {
let mut rng = thread_rng();
Arc::new(StringArray::from_iter((0..len).map(|_| {
rng.gen_bool(valid_percent).then(|| {
let string_len = rng.gen_range(string_length_range.clone());
(0..string_len)
.map(|_| char::from(rng.sample(Alphanumeric)))
.collect::<String>()
})
})))
}

fn generate_primitive<T>(
len: usize,
valid_percent: f64,
range: Range<T::Native>,
) -> ArrayRef
where
T: ArrowPrimitiveType,
T::Native: SampleUniform,
{
let mut rng = thread_rng();
Arc::new(PrimitiveArray::<T>::from_iter((0..len).map(|_| {
rng.gen_bool(valid_percent)
.then(|| rng.gen_range(range.clone()))
})))
}

fn generate_file() -> NamedTempFile {
let now = Instant::now();
let named_file = tempfile::Builder::new()
.prefix("parquet_query_sql")
.suffix(".parquet")
.tempfile()
.unwrap();

println!("Generating parquet file - {}", named_file.path().display());
let schema = schema();

let properties = WriterProperties::builder()
.set_writer_version(WriterVersion::PARQUET_2_0)
.set_max_row_group_size(ROW_GROUP_SIZE)
.build();

let file = named_file.as_file().try_clone().unwrap();
let mut writer = ArrowWriter::try_new(file, schema, Some(properties)).unwrap();

for _ in 0..NUM_BATCHES {
let batch = generate_batch();
writer.write(&batch).unwrap();
}

let metadata = writer.close().unwrap();
assert_eq!(
metadata.num_rows as usize,
WRITE_RECORD_BATCH_SIZE * NUM_BATCHES
);
assert_eq!(metadata.row_groups.len(), EXPECTED_ROW_GROUPS);

println!(
"Generated parquet file in {} seconds",
now.elapsed().as_secs_f32()
);

named_file
}

fn criterion_benchmark(c: &mut Criterion) {
let (file_path, temp_file) = match std::env::var("PARQUET_FILE") {
Copy link
Contributor

This is a neat feature (being able to override the file being tested using the PARQUET_FILE environment variable).

I wonder if it would be possible to add a note about this in https://github.com/apache/arrow-datafusion/blob/master/DEVELOPERS.md somewhere? Perhaps a "how to run benchmarks" section?
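For example, such a note might look something like this (illustrative commands only, not part of this PR; the bench target name comes from the Cargo.toml change above, and the parquet path is a placeholder):

```shell
# From the datafusion crate directory: run against freshly generated data
cargo bench --bench parquet_query_sql

# Or point the benchmark at an existing parquet file (placeholder path)
PARQUET_FILE=/path/to/data.parquet cargo bench --bench parquet_query_sql
```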

Ok(file) => (file, None),
Err(_) => {
let temp_file = generate_file();
(temp_file.path().display().to_string(), Some(temp_file))
}
};

assert!(Path::new(&file_path).exists(), "path not found");
println!("Using parquet file {}", file_path);

let mut context = ExecutionContext::new();

let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap();
rt.block_on(context.register_parquet("t", file_path.as_str()))
.unwrap();

// We read the queries from a file so they can be changed without recompiling the benchmark
let mut queries_file = File::open("benches/parquet_query_sql.sql").unwrap();
let mut queries = String::new();
queries_file.read_to_string(&mut queries).unwrap();

for query in queries.split(';') {
let query = query.trim();

// Remove comment lines
let query: Vec<_> = query.split('\n').filter(|x| !x.starts_with("--")).collect();
let query = query.join(" ");

// Ignore blank lines
if query.is_empty() {
continue;
}

let query = query.as_str();
c.bench_function(query, |b| {
b.iter(|| {
let mut context = context.clone();
rt.block_on(async move {
let query = context.sql(query).await.unwrap();
let mut stream = query.execute_stream().await.unwrap();
while let Some(_) = criterion::black_box(stream.next().await) {}
})
});
});
}

// Clean up temporary file if any
std::mem::drop(temp_file);
Copy link
Contributor

Why do we need to drop the temp file explicitly? Won't it automatically happen when the variable goes out of scope?

Copy link
Contributor Author @tustvold Feb 15, 2022

It was intended as a hint that the lifetime of temp_file matters, i.e. it must live to the end of the benchmark block. In the past I've accidentally refactored tests with NamedTempFile and it's broken in odd ways that have boiled down to the temporary file getting cleaned up too early.

I'll clarify the comment
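For reference, a minimal sketch of that failure mode (hypothetical helper, not code from this PR): NamedTempFile removes the file on disk as soon as the handle is dropped, so keeping only the path around is not enough.

```rust
use std::path::PathBuf;
use tempfile::NamedTempFile;

// Hypothetical refactor that returns only the path.
fn generate_data() -> PathBuf {
    let file = NamedTempFile::new().expect("create temp file");
    // ... write benchmark data into `file` ...
    // `file` is dropped at the end of this function, which deletes the
    // temporary file from disk before anything gets to read it.
    file.path().to_path_buf()
}

fn main() {
    let path = generate_data();
    // Any benchmark that tries to read `path` now fails: the file is gone.
    assert!(!path.exists());
}
```

Keeping the NamedTempFile handle (here, the `Option<NamedTempFile>`) alive until the end of criterion_benchmark avoids this, which is what the explicit drop is signalling.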

}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
32 changes: 32 additions & 0 deletions datafusion/benches/parquet_query_sql.sql
@@ -0,0 +1,32 @@
-- Test scanning out dictionary columns
select dict_10_optional from t;
select dict_100_optional from t;
select dict_1000_optional from t;

-- Test filtering dictionary columns
select count(*) from t where dict_10_required = 'prefix#0';
select count(*) from t where dict_100_required = 'prefix#0';
select count(*) from t where dict_1000_required = 'prefix#0';

-- Test select integers
select i64_optional from t where dict_10_required = 'prefix#2' and dict_1000_required = 'prefix#10';
select i64_required from t where dict_10_required = 'prefix#2' and dict_1000_required = 'prefix#10';

-- Test select strings
select string_optional from t where dict_10_required = 'prefix#1' and dict_1000_required = 'prefix#1';
select string_required from t where dict_10_required = 'prefix#1' and dict_1000_required = 'prefix#1';

-- Test select distinct
select distinct dict_10_required from t where dict_1000_optional is not NULL and i64_optional > 0;
select distinct dict_10_required from t where dict_1000_optional is not NULL and i64_optional > 0;
select distinct dict_10_required from t where dict_1000_optional is not NULL and i64_required > 0;
select distinct dict_10_required from t where dict_1000_optional is not NULL and i64_required > 0;

-- Test basic aggregations
select dict_10_optional, count(*) from t group by dict_10_optional;
select dict_10_optional, dict_100_optional, count(*) from t group by dict_10_optional, dict_100_optional;

-- Test float aggregations
select dict_10_optional, dict_100_optional, MIN(f64_required), MAX(f64_required), AVG(f64_required) from t group by dict_10_optional, dict_100_optional;
select dict_10_optional, dict_100_optional, MIN(f64_optional), MAX(f64_optional), AVG(f64_optional) from t group by dict_10_optional, dict_100_optional;
select dict_10_required, dict_100_required, MIN(f64_optional), MAX(f64_optional), AVG(f64_optional) from t group by dict_10_required, dict_100_required;
1 change: 1 addition & 0 deletions dev/release/rat_exclude_files.txt
@@ -116,6 +116,7 @@ ci/*
**/*.svg
**/*.csv
**/*.json
**/*.sql
Copy link
Contributor Author

I think this is the correct thing to do, but someone should probably verify if RAT is needed for SQL files

Copy link
Contributor

This is fine in my opinion

venv/*
testing/*
target/*