diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index d521ead929d4..7da50ad10249 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -116,3 +116,7 @@ harness = false
 [[bench]]
 name = "physical_plan"
 harness = false
+
+[[bench]]
+name = "parquet_query_sql"
+harness = false
diff --git a/datafusion/benches/parquet_query_sql.rs b/datafusion/benches/parquet_query_sql.rs
new file mode 100644
index 000000000000..94cfd5f50846
--- /dev/null
+++ b/datafusion/benches/parquet_query_sql.rs
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Benchmarks of SQL queries against parquet data
+
+use arrow::array::{ArrayRef, DictionaryArray, PrimitiveArray, StringArray};
+use arrow::datatypes::{
+    ArrowPrimitiveType, DataType, Field, Float64Type, Int32Type, Int64Type, Schema,
+    SchemaRef,
+};
+use arrow::record_batch::RecordBatch;
+use criterion::{criterion_group, criterion_main, Criterion};
+use datafusion::prelude::ExecutionContext;
+use parquet::arrow::ArrowWriter;
+use parquet::file::properties::{WriterProperties, WriterVersion};
+use rand::distributions::uniform::SampleUniform;
+use rand::distributions::Alphanumeric;
+use rand::prelude::*;
+use std::fs::File;
+use std::io::Read;
+use std::ops::Range;
+use std::path::Path;
+use std::sync::Arc;
+use std::time::Instant;
+use tempfile::NamedTempFile;
+use tokio_stream::StreamExt;
+
+/// The number of batches to write
+const NUM_BATCHES: usize = 2048;
+/// The number of rows in each record batch to write
+const WRITE_RECORD_BATCH_SIZE: usize = 1024;
+/// The number of rows in a row group
+const ROW_GROUP_SIZE: usize = 1024 * 1024;
+/// The number of row groups expected
+const EXPECTED_ROW_GROUPS: usize = 2;
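+
+// NUM_BATCHES * WRITE_RECORD_BATCH_SIZE = 2048 * 1024 = 2,097,152 rows are
+// written in total; at ROW_GROUP_SIZE = 1,048,576 rows per row group the
+// writer should therefore produce exactly EXPECTED_ROW_GROUPS = 2 row groups,
+// which generate_file() asserts after closing the writer.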
+
+fn schema() -> SchemaRef {
+    let string_dictionary_type =
+        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
+
+    Arc::new(Schema::new(vec![
+        Field::new("dict_10_required", string_dictionary_type.clone(), false),
+        Field::new("dict_10_optional", string_dictionary_type.clone(), true),
+        Field::new("dict_100_required", string_dictionary_type.clone(), false),
+        Field::new("dict_100_optional", string_dictionary_type.clone(), true),
+        Field::new("dict_1000_required", string_dictionary_type.clone(), false),
+        Field::new("dict_1000_optional", string_dictionary_type, true),
+        Field::new("string_required", DataType::Utf8, false),
+        Field::new("string_optional", DataType::Utf8, true),
+        Field::new("i64_required", DataType::Int64, false),
+        Field::new("i64_optional", DataType::Int64, true),
+        Field::new("f64_required", DataType::Float64, false),
+        Field::new("f64_optional", DataType::Float64, true),
+    ]))
+}
+
+fn generate_batch() -> RecordBatch {
+    let schema = schema();
+    let len = WRITE_RECORD_BATCH_SIZE;
+    RecordBatch::try_new(
+        schema,
+        vec![
+            generate_string_dictionary("prefix", 10, len, 1.0),
+            generate_string_dictionary("prefix", 10, len, 0.5),
+            generate_string_dictionary("prefix", 100, len, 1.0),
+            generate_string_dictionary("prefix", 100, len, 0.5),
+            generate_string_dictionary("prefix", 1000, len, 1.0),
+            generate_string_dictionary("prefix", 1000, len, 0.5),
+            generate_strings(0..100, len, 1.0),
+            generate_strings(0..100, len, 0.5),
+            generate_primitive::<Int64Type>(len, 1.0, -2000..2000),
+            generate_primitive::<Int64Type>(len, 0.5, -2000..2000),
+            generate_primitive::<Float64Type>(len, 1.0, -1000.0..1000.0),
+            generate_primitive::<Float64Type>(len, 0.5, -1000.0..1000.0),
+        ],
+    )
+    .unwrap()
+}
+
+fn generate_string_dictionary(
+    prefix: &str,
+    cardinality: usize,
+    len: usize,
+    valid_percent: f64,
+) -> ArrayRef {
+    let mut rng = thread_rng();
+    let strings: Vec<_> = (0..cardinality)
+        .map(|x| format!("{}#{}", prefix, x))
+        .collect();
+
+    Arc::new(DictionaryArray::<Int32Type>::from_iter((0..len).map(
+        |_| {
+            rng.gen_bool(valid_percent)
+                .then(|| strings[rng.gen_range(0..cardinality)].as_str())
+        },
+    )))
+}
+
+fn generate_strings(
+    string_length_range: Range<usize>,
+    len: usize,
+    valid_percent: f64,
+) -> ArrayRef {
+    let mut rng = thread_rng();
+    Arc::new(StringArray::from_iter((0..len).map(|_| {
+        rng.gen_bool(valid_percent).then(|| {
+            let string_len = rng.gen_range(string_length_range.clone());
+            (0..string_len)
+                .map(|_| char::from(rng.sample(Alphanumeric)))
+                .collect::<String>()
+        })
+    })))
+}
+
+fn generate_primitive<T>(
+    len: usize,
+    valid_percent: f64,
+    range: Range<T::Native>,
+) -> ArrayRef
+where
+    T: ArrowPrimitiveType,
+    T::Native: SampleUniform,
+{
+    let mut rng = thread_rng();
+    Arc::new(PrimitiveArray::<T>::from_iter((0..len).map(|_| {
+        rng.gen_bool(valid_percent)
+            .then(|| rng.gen_range(range.clone()))
+    })))
+}
+
+fn generate_file() -> NamedTempFile {
+    let now = Instant::now();
+    let named_file = tempfile::Builder::new()
+        .prefix("parquet_query_sql")
+        .suffix(".parquet")
+        .tempfile()
+        .unwrap();
+
+    println!("Generating parquet file - {}", named_file.path().display());
+    let schema = schema();
+
+    let properties = WriterProperties::builder()
+        .set_writer_version(WriterVersion::PARQUET_2_0)
+        .set_max_row_group_size(ROW_GROUP_SIZE)
+        .build();
+
+    let file = named_file.as_file().try_clone().unwrap();
+    let mut writer = ArrowWriter::try_new(file, schema, Some(properties)).unwrap();
+
+    for _ in 0..NUM_BATCHES {
+        let batch = generate_batch();
+        writer.write(&batch).unwrap();
+    }
+
+    let metadata = writer.close().unwrap();
+    assert_eq!(
+        metadata.num_rows as usize,
+        WRITE_RECORD_BATCH_SIZE * NUM_BATCHES
+    );
+    assert_eq!(metadata.row_groups.len(), EXPECTED_ROW_GROUPS);
+
+    println!(
+        "Generated parquet file in {} seconds",
+        now.elapsed().as_secs_f32()
+    );
+
+    named_file
+}
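+
+// Everything below drives the benchmark run: each ';'-separated statement in
+// benches/parquet_query_sql.sql (with '--' comment lines stripped) becomes its
+// own criterion benchmark, executed to completion against the registered table
+// 't'. If the PARQUET_FILE environment variable is set it names an existing
+// parquet file to benchmark; otherwise a temporary file is generated above.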
+fn criterion_benchmark(c: &mut Criterion) {
+    let (file_path, temp_file) = match std::env::var("PARQUET_FILE") {
+        Ok(file) => (file, None),
+        Err(_) => {
+            let temp_file = generate_file();
+            (temp_file.path().display().to_string(), Some(temp_file))
+        }
+    };
+
+    assert!(Path::new(&file_path).exists(), "path not found");
+    println!("Using parquet file {}", file_path);
+
+    let mut context = ExecutionContext::new();
+
+    let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap();
+    rt.block_on(context.register_parquet("t", file_path.as_str()))
+        .unwrap();
+
+    // We read the queries from a file so they can be changed without recompiling the benchmark
+    let mut queries_file = File::open("benches/parquet_query_sql.sql").unwrap();
+    let mut queries = String::new();
+    queries_file.read_to_string(&mut queries).unwrap();
+
+    for query in queries.split(';') {
+        let query = query.trim();
+
+        // Remove comment lines
+        let query: Vec<_> = query.split('\n').filter(|x| !x.starts_with("--")).collect();
+        let query = query.join(" ");
+
+        // Ignore blank lines
+        if query.is_empty() {
+            continue;
+        }
+
+        let query = query.as_str();
+        c.bench_function(query, |b| {
+            b.iter(|| {
+                let mut context = context.clone();
+                rt.block_on(async move {
+                    let query = context.sql(query).await.unwrap();
+                    let mut stream = query.execute_stream().await.unwrap();
+                    while criterion::black_box(stream.next().await).is_some() {}
+                })
+            });
+        });
+    }
+
+    // Clean up temporary file if any
+    std::mem::drop(temp_file);
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/datafusion/benches/parquet_query_sql.sql b/datafusion/benches/parquet_query_sql.sql
new file mode 100644
index 000000000000..1efb04a18717
--- /dev/null
+++ b/datafusion/benches/parquet_query_sql.sql
@@ -0,0 +1,32 @@
+-- Test scanning out dictionary columns
+select dict_10_optional from t;
+select dict_100_optional from t;
+select dict_1000_optional from t;
+
+-- Test filtering dictionary columns
+select count(*) from t where dict_10_required = 'prefix#0';
+select count(*) from t where dict_100_required = 'prefix#0';
+select count(*) from t where dict_1000_required = 'prefix#0';
+
+-- Test select integers
+select i64_optional from t where dict_10_required = 'prefix#2' and dict_1000_required = 'prefix#10';
+select i64_required from t where dict_10_required = 'prefix#2' and dict_1000_required = 'prefix#10';
+
+-- Test select strings
+select string_optional from t where dict_10_required = 'prefix#1' and dict_1000_required = 'prefix#1';
+select string_required from t where dict_10_required = 'prefix#1' and dict_1000_required = 'prefix#1';
+
+-- Test select distinct
+select distinct dict_10_required from t where dict_1000_optional is not NULL and i64_optional > 0;
+select distinct dict_100_required from t where dict_1000_optional is not NULL and i64_optional > 0;
+select distinct dict_10_required from t where dict_1000_optional is not NULL and i64_required > 0;
+select distinct dict_100_required from t where dict_1000_optional is not NULL and i64_required > 0;
+
+-- Test basic aggregations
+select dict_10_optional, count(*) from t group by dict_10_optional;
+select dict_10_optional, dict_100_optional, count(*) from t group by dict_10_optional, dict_100_optional;
+
+-- Test float aggregations
+select dict_10_optional, dict_100_optional, MIN(f64_required), MAX(f64_required), AVG(f64_required) from t group by dict_10_optional, dict_100_optional;
+select dict_10_optional, dict_100_optional, MIN(f64_optional), MAX(f64_optional), AVG(f64_optional) from t group by dict_10_optional, dict_100_optional;
+select dict_10_required, dict_100_required, MIN(f64_optional), MAX(f64_optional), AVG(f64_optional) from t group by dict_10_required, dict_100_required;
diff --git a/dev/release/rat_exclude_files.txt b/dev/release/rat_exclude_files.txt
index 304c08e80899..745c2ba8a75f 100644
--- a/dev/release/rat_exclude_files.txt
+++ b/dev/release/rat_exclude_files.txt
@@ -116,6 +116,7 @@ ci/*
 **/*.svg
 **/*.csv
 **/*.json
+**/*.sql
 venv/*
 testing/*
 target/*
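Usage sketch (assuming a checkout with this change applied, run from the datafusion crate root): cargo bench --bench parquet_query_sql generates a temporary parquet file of 2,097,152 rows and benchmarks every query in benches/parquet_query_sql.sql against it. To benchmark an existing file instead, set the PARQUET_FILE environment variable, e.g. PARQUET_FILE=/path/to/data.parquet cargo bench --bench parquet_query_sql, where the path is a placeholder for your own data.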