diff --git a/datafusion/core/benches/aggregate_query_sql.rs b/datafusion/core/benches/aggregate_query_sql.rs index ebe94450c1f8..b29bfc487340 100644 --- a/datafusion/core/benches/aggregate_query_sql.rs +++ b/datafusion/core/benches/aggregate_query_sql.rs @@ -29,8 +29,7 @@ use parking_lot::Mutex; use std::sync::Arc; use tokio::runtime::Runtime; -fn query(ctx: Arc>, sql: &str) { - let rt = Runtime::new().unwrap(); +fn query(ctx: Arc>, rt: &Runtime, sql: &str) { let df = rt.block_on(ctx.lock().sql(sql)).unwrap(); criterion::black_box(rt.block_on(df.collect()).unwrap()); } @@ -51,11 +50,13 @@ fn criterion_benchmark(c: &mut Criterion) { let array_len = 32768 * 2; // 2^16 let batch_size = 2048; // 2^11 let ctx = create_context(partitions_len, array_len, batch_size).unwrap(); + let rt = Runtime::new().unwrap(); c.bench_function("aggregate_query_no_group_by 15 12", |b| { b.iter(|| { query( ctx.clone(), + &rt, "SELECT MIN(f64), AVG(f64), COUNT(f64) \ FROM t", ) @@ -66,6 +67,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { query( ctx.clone(), + &rt, "SELECT MIN(f64), MAX(f64) \ FROM t", ) @@ -76,6 +78,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { query( ctx.clone(), + &rt, "SELECT COUNT(DISTINCT u64_wide) \ FROM t", ) @@ -86,6 +89,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { query( ctx.clone(), + &rt, "SELECT COUNT(DISTINCT u64_narrow) \ FROM t", ) @@ -96,6 +100,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { query( ctx.clone(), + &rt, "SELECT utf8, MIN(f64), AVG(f64), COUNT(f64) \ FROM t GROUP BY utf8", ) @@ -106,6 +111,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { query( ctx.clone(), + &rt, "SELECT utf8, MIN(f64), AVG(f64), COUNT(f64) \ FROM t \ WHERE f32 > 10 AND f32 < 20 GROUP BY utf8", @@ -117,6 +123,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { query( ctx.clone(), + &rt, "SELECT u64_narrow, MIN(f64), AVG(f64), COUNT(f64) \ FROM t GROUP BY u64_narrow", ) @@ -127,6 +134,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { query( ctx.clone(), + &rt, "SELECT u64_narrow, MIN(f64), AVG(f64), COUNT(f64) \ FROM t \ WHERE f32 > 10 AND f32 < 20 GROUP BY u64_narrow", @@ -138,6 +146,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { query( ctx.clone(), + &rt, "SELECT u64_wide, utf8, MIN(f64), AVG(f64), COUNT(f64) \ FROM t GROUP BY u64_wide, utf8", ) @@ -148,6 +157,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { query( ctx.clone(), + &rt, "SELECT utf8, approx_percentile_cont(u64_wide, 0.5, 2500) \ FROM t GROUP BY utf8", ) @@ -158,6 +168,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { query( ctx.clone(), + &rt, "SELECT utf8, approx_percentile_cont(f32, 0.5, 2500) \ FROM t GROUP BY utf8", ) @@ -168,6 +179,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { query( ctx.clone(), + &rt, "SELECT MEDIAN(DISTINCT u64_wide), MEDIAN(DISTINCT u64_narrow) \ FROM t", ) @@ -178,6 +190,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { query( ctx.clone(), + &rt, "SELECT first_value(u64_wide order by f64, u64_narrow, utf8),\ last_value(u64_wide order by f64, u64_narrow, utf8) \ FROM t GROUP BY u64_narrow", @@ -189,6 +202,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { query( ctx.clone(), + &rt, "SELECT first_value(u64_wide ignore nulls order by f64, u64_narrow, utf8), \ last_value(u64_wide ignore nulls order by f64, u64_narrow, utf8) \ FROM t GROUP BY u64_narrow", @@ -200,6 +214,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { query( ctx.clone(), + &rt, "SELECT first_value(u64_wide order by f64), \ last_value(u64_wide order by f64) \ FROM t GROUP BY u64_narrow", diff --git a/datafusion/core/benches/csv_load.rs b/datafusion/core/benches/csv_load.rs index 2d42121ec9b2..3f984757466d 100644 --- a/datafusion/core/benches/csv_load.rs +++ b/datafusion/core/benches/csv_load.rs @@ -32,8 +32,12 @@ use std::time::Duration; use test_utils::AccessLogGenerator; use tokio::runtime::Runtime; -fn load_csv(ctx: Arc>, path: &str, options: CsvReadOptions) { - let rt = Runtime::new().unwrap(); +fn load_csv( + ctx: Arc>, + rt: &Runtime, + path: &str, + options: CsvReadOptions, +) { let df = rt.block_on(ctx.lock().read_csv(path, options)).unwrap(); criterion::black_box(rt.block_on(df.collect()).unwrap()); } @@ -61,6 +65,7 @@ fn generate_test_file() -> TestCsvFile { fn criterion_benchmark(c: &mut Criterion) { let ctx = create_context().unwrap(); + let rt = Runtime::new().unwrap(); let test_file = generate_test_file(); let mut group = c.benchmark_group("load csv testing"); @@ -70,6 +75,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { load_csv( ctx.clone(), + &rt, test_file.path().to_str().unwrap(), CsvReadOptions::default(), ) @@ -80,6 +86,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { load_csv( ctx.clone(), + &rt, test_file.path().to_str().unwrap(), CsvReadOptions::default().null_regex(Some("^NULL$|^$".to_string())), ) diff --git a/datafusion/core/benches/dataframe.rs b/datafusion/core/benches/dataframe.rs index 03078e05e105..832553ebed82 100644 --- a/datafusion/core/benches/dataframe.rs +++ b/datafusion/core/benches/dataframe.rs @@ -44,9 +44,7 @@ fn create_context(field_count: u32) -> datafusion_common::Result) { - let rt = Runtime::new().unwrap(); - +fn run(column_count: u32, ctx: Arc, rt: &Runtime) { criterion::black_box(rt.block_on(async { let mut data_frame = ctx.table("t").await.unwrap(); @@ -67,11 +65,13 @@ fn run(column_count: u32, ctx: Arc) { } fn criterion_benchmark(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + for column_count in [10, 100, 200, 500] { let ctx = create_context(column_count).unwrap(); c.bench_function(&format!("with_column_{column_count}"), |b| { - b.iter(|| run(column_count, ctx.clone())) + b.iter(|| run(column_count, ctx.clone(), &rt)) }); } } diff --git a/datafusion/core/benches/distinct_query_sql.rs b/datafusion/core/benches/distinct_query_sql.rs index ccc6a0e74652..4992ae660766 100644 --- a/datafusion/core/benches/distinct_query_sql.rs +++ b/datafusion/core/benches/distinct_query_sql.rs @@ -33,8 +33,7 @@ use parking_lot::Mutex; use std::{sync::Arc, time::Duration}; use tokio::runtime::Runtime; -fn query(ctx: Arc>, sql: &str) { - let rt = Runtime::new().unwrap(); +fn query(ctx: Arc>, rt: &Runtime, sql: &str) { let df = rt.block_on(ctx.lock().sql(sql)).unwrap(); criterion::black_box(rt.block_on(df.collect()).unwrap()); } @@ -55,6 +54,7 @@ fn criterion_benchmark_limited_distinct(c: &mut Criterion) { let array_len = 1 << 26; // 64 M let batch_size = 8192; let ctx = create_context(partitions_len, array_len, batch_size).unwrap(); + let rt = Runtime::new().unwrap(); let mut group = c.benchmark_group("custom-measurement-time"); group.measurement_time(Duration::from_secs(40)); @@ -63,6 +63,7 @@ fn criterion_benchmark_limited_distinct(c: &mut Criterion) { b.iter(|| { query( ctx.clone(), + &rt, "SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT 10", ) }) @@ -72,6 +73,7 @@ fn criterion_benchmark_limited_distinct(c: &mut Criterion) { b.iter(|| { query( ctx.clone(), + &rt, "SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT 100", ) }) @@ -81,6 +83,7 @@ fn criterion_benchmark_limited_distinct(c: &mut Criterion) { b.iter(|| { query( ctx.clone(), + &rt, "SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT 1000", ) }) @@ -90,6 +93,7 @@ fn criterion_benchmark_limited_distinct(c: &mut Criterion) { b.iter(|| { query( ctx.clone(), + &rt, "SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT 10000", ) }) @@ -99,6 +103,7 @@ fn criterion_benchmark_limited_distinct(c: &mut Criterion) { b.iter(|| { query( ctx.clone(), + &rt, "SELECT u64_narrow, u64_wide, utf8, f64 FROM t GROUP BY 1, 2, 3, 4 LIMIT 10", ) }) diff --git a/datafusion/core/benches/filter_query_sql.rs b/datafusion/core/benches/filter_query_sql.rs index 0e09ae09d7c2..c82a1607184d 100644 --- a/datafusion/core/benches/filter_query_sql.rs +++ b/datafusion/core/benches/filter_query_sql.rs @@ -27,9 +27,7 @@ use futures::executor::block_on; use std::sync::Arc; use tokio::runtime::Runtime; -async fn query(ctx: &SessionContext, sql: &str) { - let rt = Runtime::new().unwrap(); - +async fn query(ctx: &SessionContext, rt: &Runtime, sql: &str) { // execute the query let df = rt.block_on(ctx.sql(sql)).unwrap(); criterion::black_box(rt.block_on(df.collect()).unwrap()); @@ -68,10 +66,11 @@ fn create_context(array_len: usize, batch_size: usize) -> Result fn criterion_benchmark(c: &mut Criterion) { let array_len = 524_288; // 2^19 let batch_size = 4096; // 2^12 + let rt = Runtime::new().unwrap(); c.bench_function("filter_array", |b| { let ctx = create_context(array_len, batch_size).unwrap(); - b.iter(|| block_on(query(&ctx, "select f32, f64 from t where f32 >= f64"))) + b.iter(|| block_on(query(&ctx, &rt, "select f32, f64 from t where f32 >= f64"))) }); c.bench_function("filter_scalar", |b| { @@ -79,6 +78,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { block_on(query( &ctx, + &rt, "select f32, f64 from t where f32 >= 250 and f64 > 250", )) }) @@ -89,6 +89,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { block_on(query( &ctx, + &rt, "select f32, f64 from t where f32 in (10, 20, 30, 40)", )) }) diff --git a/datafusion/core/benches/math_query_sql.rs b/datafusion/core/benches/math_query_sql.rs index 92c59d506640..76824850c114 100644 --- a/datafusion/core/benches/math_query_sql.rs +++ b/datafusion/core/benches/math_query_sql.rs @@ -36,9 +36,7 @@ use datafusion::datasource::MemTable; use datafusion::error::Result; use datafusion::execution::context::SessionContext; -fn query(ctx: Arc>, sql: &str) { - let rt = Runtime::new().unwrap(); - +fn query(ctx: Arc>, rt: &Runtime, sql: &str) { // execute the query let df = rt.block_on(ctx.lock().sql(sql)).unwrap(); rt.block_on(df.collect()).unwrap(); @@ -81,29 +79,31 @@ fn criterion_benchmark(c: &mut Criterion) { let array_len = 1048576; // 2^20 let batch_size = 512; // 2^9 let ctx = create_context(array_len, batch_size).unwrap(); + let rt = Runtime::new().unwrap(); + c.bench_function("sqrt_20_9", |b| { - b.iter(|| query(ctx.clone(), "SELECT sqrt(f32) FROM t")) + b.iter(|| query(ctx.clone(), &rt, "SELECT sqrt(f32) FROM t")) }); let array_len = 1048576; // 2^20 let batch_size = 4096; // 2^12 let ctx = create_context(array_len, batch_size).unwrap(); c.bench_function("sqrt_20_12", |b| { - b.iter(|| query(ctx.clone(), "SELECT sqrt(f32) FROM t")) + b.iter(|| query(ctx.clone(), &rt, "SELECT sqrt(f32) FROM t")) }); let array_len = 4194304; // 2^22 let batch_size = 4096; // 2^12 let ctx = create_context(array_len, batch_size).unwrap(); c.bench_function("sqrt_22_12", |b| { - b.iter(|| query(ctx.clone(), "SELECT sqrt(f32) FROM t")) + b.iter(|| query(ctx.clone(), &rt, "SELECT sqrt(f32) FROM t")) }); let array_len = 4194304; // 2^22 let batch_size = 16384; // 2^14 let ctx = create_context(array_len, batch_size).unwrap(); c.bench_function("sqrt_22_14", |b| { - b.iter(|| query(ctx.clone(), "SELECT sqrt(f32) FROM t")) + b.iter(|| query(ctx.clone(), &rt, "SELECT sqrt(f32) FROM t")) }); } diff --git a/datafusion/core/benches/physical_plan.rs b/datafusion/core/benches/physical_plan.rs index aae1457ab9e6..0a65c52f72de 100644 --- a/datafusion/core/benches/physical_plan.rs +++ b/datafusion/core/benches/physical_plan.rs @@ -42,6 +42,7 @@ use datafusion_physical_expr_common::sort_expr::LexOrdering; // as inputs. All record batches must have the same schema. fn sort_preserving_merge_operator( session_ctx: Arc, + rt: &Runtime, batches: Vec, sort: &[&str], ) { @@ -63,7 +64,6 @@ fn sort_preserving_merge_operator( .unwrap(); let merge = Arc::new(SortPreservingMergeExec::new(sort, exec)); let task_ctx = session_ctx.task_ctx(); - let rt = Runtime::new().unwrap(); rt.block_on(collect(merge, task_ctx)).unwrap(); } @@ -166,14 +166,16 @@ fn criterion_benchmark(c: &mut Criterion) { ]; let ctx = Arc::new(SessionContext::new()); + let rt = Runtime::new().unwrap(); + for (name, input) in benches { - let ctx_clone = ctx.clone(); - c.bench_function(name, move |b| { + c.bench_function(name, |b| { b.iter_batched( || input.clone(), |input| { sort_preserving_merge_operator( - ctx_clone.clone(), + ctx.clone(), + &rt, input, &["a", "b", "c", "d"], ); diff --git a/datafusion/core/benches/sort_limit_query_sql.rs b/datafusion/core/benches/sort_limit_query_sql.rs index cfd4b8bc4bba..e535a018161f 100644 --- a/datafusion/core/benches/sort_limit_query_sql.rs +++ b/datafusion/core/benches/sort_limit_query_sql.rs @@ -37,9 +37,7 @@ use datafusion::execution::context::SessionContext; use tokio::runtime::Runtime; -fn query(ctx: Arc>, sql: &str) { - let rt = Runtime::new().unwrap(); - +fn query(ctx: Arc>, rt: &Runtime, sql: &str) { // execute the query let df = rt.block_on(ctx.lock().sql(sql)).unwrap(); rt.block_on(df.collect()).unwrap(); @@ -104,11 +102,14 @@ fn create_context() -> Arc> { } fn criterion_benchmark(c: &mut Criterion) { + let ctx = create_context(); + let rt = Runtime::new().unwrap(); + c.bench_function("sort_and_limit_by_int", |b| { - let ctx = create_context(); b.iter(|| { query( ctx.clone(), + &rt, "SELECT c1, c13, c6, c10 \ FROM aggregate_test_100 \ ORDER BY c6 @@ -118,10 +119,10 @@ fn criterion_benchmark(c: &mut Criterion) { }); c.bench_function("sort_and_limit_by_float", |b| { - let ctx = create_context(); b.iter(|| { query( ctx.clone(), + &rt, "SELECT c1, c13, c12 \ FROM aggregate_test_100 \ ORDER BY c13 @@ -131,10 +132,10 @@ fn criterion_benchmark(c: &mut Criterion) { }); c.bench_function("sort_and_limit_lex_by_int", |b| { - let ctx = create_context(); b.iter(|| { query( ctx.clone(), + &rt, "SELECT c1, c13, c6, c10 \ FROM aggregate_test_100 \ ORDER BY c6 DESC, c10 DESC @@ -144,10 +145,10 @@ fn criterion_benchmark(c: &mut Criterion) { }); c.bench_function("sort_and_limit_lex_by_string", |b| { - let ctx = create_context(); b.iter(|| { query( ctx.clone(), + &rt, "SELECT c1, c13, c6, c10 \ FROM aggregate_test_100 \ ORDER BY c1, c13 diff --git a/datafusion/core/benches/sql_planner.rs b/datafusion/core/benches/sql_planner.rs index 2d79778d4d42..49cc830d58bc 100644 --- a/datafusion/core/benches/sql_planner.rs +++ b/datafusion/core/benches/sql_planner.rs @@ -45,14 +45,12 @@ const BENCHMARKS_PATH_2: &str = "./benchmarks/"; const CLICKBENCH_DATA_PATH: &str = "data/hits_partitioned/"; /// Create a logical plan from the specified sql -fn logical_plan(ctx: &SessionContext, sql: &str) { - let rt = Runtime::new().unwrap(); +fn logical_plan(ctx: &SessionContext, rt: &Runtime, sql: &str) { criterion::black_box(rt.block_on(ctx.sql(sql)).unwrap()); } /// Create a physical ExecutionPlan (by way of logical plan) -fn physical_plan(ctx: &SessionContext, sql: &str) { - let rt = Runtime::new().unwrap(); +fn physical_plan(ctx: &SessionContext, rt: &Runtime, sql: &str) { criterion::black_box(rt.block_on(async { ctx.sql(sql) .await @@ -104,9 +102,8 @@ fn register_defs(ctx: SessionContext, defs: Vec) -> SessionContext { ctx } -fn register_clickbench_hits_table() -> SessionContext { +fn register_clickbench_hits_table(rt: &Runtime) -> SessionContext { let ctx = SessionContext::new(); - let rt = Runtime::new().unwrap(); // use an external table for clickbench benchmarks let path = @@ -128,7 +125,11 @@ fn register_clickbench_hits_table() -> SessionContext { /// Target of this benchmark: control that placeholders replacing does not get slower, /// if the query does not contain placeholders at all. -fn benchmark_with_param_values_many_columns(ctx: &SessionContext, b: &mut Bencher) { +fn benchmark_with_param_values_many_columns( + ctx: &SessionContext, + rt: &Runtime, + b: &mut Bencher, +) { const COLUMNS_NUM: usize = 200; let mut aggregates = String::new(); for i in 0..COLUMNS_NUM { @@ -140,7 +141,6 @@ fn benchmark_with_param_values_many_columns(ctx: &SessionContext, b: &mut Benche // SELECT max(attr0), ..., max(attrN) FROM t1. let query = format!("SELECT {} FROM t1", aggregates); let statement = ctx.state().sql_to_statement(&query, "Generic").unwrap(); - let rt = Runtime::new().unwrap(); let plan = rt.block_on(async { ctx.state().statement_to_plan(statement).await.unwrap() }); b.iter(|| { @@ -230,33 +230,35 @@ fn criterion_benchmark(c: &mut Criterion) { } let ctx = create_context(); + let rt = Runtime::new().unwrap(); // Test simplest // https://github.com/apache/datafusion/issues/5157 c.bench_function("logical_select_one_from_700", |b| { - b.iter(|| logical_plan(&ctx, "SELECT c1 FROM t700")) + b.iter(|| logical_plan(&ctx, &rt, "SELECT c1 FROM t700")) }); // Test simplest // https://github.com/apache/datafusion/issues/5157 c.bench_function("physical_select_one_from_700", |b| { - b.iter(|| physical_plan(&ctx, "SELECT c1 FROM t700")) + b.iter(|| physical_plan(&ctx, &rt, "SELECT c1 FROM t700")) }); // Test simplest c.bench_function("logical_select_all_from_1000", |b| { - b.iter(|| logical_plan(&ctx, "SELECT * FROM t1000")) + b.iter(|| logical_plan(&ctx, &rt, "SELECT * FROM t1000")) }); // Test simplest c.bench_function("physical_select_all_from_1000", |b| { - b.iter(|| physical_plan(&ctx, "SELECT * FROM t1000")) + b.iter(|| physical_plan(&ctx, &rt, "SELECT * FROM t1000")) }); c.bench_function("logical_trivial_join_low_numbered_columns", |b| { b.iter(|| { logical_plan( &ctx, + &rt, "SELECT t1.a2, t2.b2 \ FROM t1, t2 WHERE a1 = b1", ) @@ -267,6 +269,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { logical_plan( &ctx, + &rt, "SELECT t1.a99, t2.b99 \ FROM t1, t2 WHERE a199 = b199", ) @@ -277,6 +280,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { logical_plan( &ctx, + &rt, "SELECT t1.a99, MIN(t2.b1), MAX(t2.b199), AVG(t2.b123), COUNT(t2.b73) \ FROM t1 JOIN t2 ON t1.a199 = t2.b199 GROUP BY t1.a99", ) @@ -293,7 +297,7 @@ fn criterion_benchmark(c: &mut Criterion) { } let query = format!("SELECT {} FROM t1", aggregates); b.iter(|| { - physical_plan(&ctx, &query); + physical_plan(&ctx, &rt, &query); }); }); @@ -302,6 +306,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { physical_plan( &ctx, + &rt, "SELECT t1.a7, t2.b8 \ FROM t1, t2 WHERE a7 = b7 \ ORDER BY a7", @@ -313,6 +318,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { physical_plan( &ctx, + &rt, "SELECT t1.a7, t2.b8 \ FROM t1, t2 WHERE a7 < b7 \ ORDER BY a7", @@ -324,6 +330,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { physical_plan( &ctx, + &rt, "SELECT ta.a9, tb.a10, tc.a11, td.a12, te.a13, tf.a14 \ FROM t1 AS ta, t1 AS tb, t1 AS tc, t1 AS td, t1 AS te, t1 AS tf \ WHERE ta.a9 = tb.a10 AND tb.a10 = tc.a11 AND tc.a11 = td.a12 AND \ @@ -336,6 +343,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { physical_plan( &ctx, + &rt, "SELECT t1.a7 \ FROM t1 WHERE a7 = (SELECT b8 FROM t2)", ); @@ -346,6 +354,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { physical_plan( &ctx, + &rt, "SELECT t1.a7 FROM t1 \ INTERSECT SELECT t2.b8 FROM t2", ); @@ -356,6 +365,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { logical_plan( &ctx, + &rt, "SELECT DISTINCT t1.a7 \ FROM t1, t2 WHERE t1.a7 = t2.b8", ); @@ -370,7 +380,7 @@ fn criterion_benchmark(c: &mut Criterion) { c.bench_function("physical_sorted_union_orderby", |b| { // SELECT ... UNION ALL ... let query = union_orderby_query(20); - b.iter(|| physical_plan(&ctx, &query)) + b.iter(|| physical_plan(&ctx, &rt, &query)) }); // --- TPC-H --- @@ -393,7 +403,7 @@ fn criterion_benchmark(c: &mut Criterion) { let sql = std::fs::read_to_string(format!("{benchmarks_path}queries/{q}.sql")).unwrap(); c.bench_function(&format!("physical_plan_tpch_{}", q), |b| { - b.iter(|| physical_plan(&tpch_ctx, &sql)) + b.iter(|| physical_plan(&tpch_ctx, &rt, &sql)) }); } @@ -407,7 +417,7 @@ fn criterion_benchmark(c: &mut Criterion) { c.bench_function("physical_plan_tpch_all", |b| { b.iter(|| { for sql in &all_tpch_sql_queries { - physical_plan(&tpch_ctx, sql) + physical_plan(&tpch_ctx, &rt, sql) } }) }); @@ -442,7 +452,7 @@ fn criterion_benchmark(c: &mut Criterion) { c.bench_function("physical_plan_tpcds_all", |b| { b.iter(|| { for sql in &all_tpcds_sql_queries { - physical_plan(&tpcds_ctx, sql) + physical_plan(&tpcds_ctx, &rt, sql) } }) }); @@ -468,7 +478,7 @@ fn criterion_benchmark(c: &mut Criterion) { .map(|l| l.expect("Could not parse line")) .collect_vec(); - let clickbench_ctx = register_clickbench_hits_table(); + let clickbench_ctx = register_clickbench_hits_table(&rt); // for (i, sql) in clickbench_queries.iter().enumerate() { // c.bench_function(&format!("logical_plan_clickbench_q{}", i + 1), |b| { @@ -478,7 +488,7 @@ fn criterion_benchmark(c: &mut Criterion) { for (i, sql) in clickbench_queries.iter().enumerate() { c.bench_function(&format!("physical_plan_clickbench_q{}", i + 1), |b| { - b.iter(|| physical_plan(&clickbench_ctx, sql)) + b.iter(|| physical_plan(&clickbench_ctx, &rt, sql)) }); } @@ -493,13 +503,13 @@ fn criterion_benchmark(c: &mut Criterion) { c.bench_function("physical_plan_clickbench_all", |b| { b.iter(|| { for sql in &clickbench_queries { - physical_plan(&clickbench_ctx, sql) + physical_plan(&clickbench_ctx, &rt, sql) } }) }); c.bench_function("with_param_values_many_columns", |b| { - benchmark_with_param_values_many_columns(&ctx, b); + benchmark_with_param_values_many_columns(&ctx, &rt, b); }); } diff --git a/datafusion/core/benches/struct_query_sql.rs b/datafusion/core/benches/struct_query_sql.rs index 3ef7292c6627..f9cc43d1ea2c 100644 --- a/datafusion/core/benches/struct_query_sql.rs +++ b/datafusion/core/benches/struct_query_sql.rs @@ -27,9 +27,7 @@ use futures::executor::block_on; use std::sync::Arc; use tokio::runtime::Runtime; -async fn query(ctx: &SessionContext, sql: &str) { - let rt = Runtime::new().unwrap(); - +async fn query(ctx: &SessionContext, rt: &Runtime, sql: &str) { // execute the query let df = rt.block_on(ctx.sql(sql)).unwrap(); criterion::black_box(rt.block_on(df.collect()).unwrap()); @@ -68,10 +66,11 @@ fn create_context(array_len: usize, batch_size: usize) -> Result fn criterion_benchmark(c: &mut Criterion) { let array_len = 524_288; // 2^19 let batch_size = 4096; // 2^12 + let ctx = create_context(array_len, batch_size).unwrap(); + let rt = Runtime::new().unwrap(); c.bench_function("struct", |b| { - let ctx = create_context(array_len, batch_size).unwrap(); - b.iter(|| block_on(query(&ctx, "select struct(f32, f64) from t"))) + b.iter(|| block_on(query(&ctx, &rt, "select struct(f32, f64) from t"))) }); } diff --git a/datafusion/core/benches/window_query_sql.rs b/datafusion/core/benches/window_query_sql.rs index 42a1e51be361..a55d17a7c5dc 100644 --- a/datafusion/core/benches/window_query_sql.rs +++ b/datafusion/core/benches/window_query_sql.rs @@ -29,8 +29,7 @@ use parking_lot::Mutex; use std::sync::Arc; use tokio::runtime::Runtime; -fn query(ctx: Arc>, sql: &str) { - let rt = Runtime::new().unwrap(); +fn query(ctx: Arc>, rt: &Runtime, sql: &str) { let df = rt.block_on(ctx.lock().sql(sql)).unwrap(); criterion::black_box(rt.block_on(df.collect()).unwrap()); } @@ -51,11 +50,13 @@ fn criterion_benchmark(c: &mut Criterion) { let array_len = 1024 * 1024; let batch_size = 8 * 1024; let ctx = create_context(partitions_len, array_len, batch_size).unwrap(); + let rt = Runtime::new().unwrap(); c.bench_function("window empty over, aggregate functions", |b| { b.iter(|| { query( ctx.clone(), + &rt, "SELECT \ MAX(f64) OVER (), \ MIN(f32) OVER (), \ @@ -69,6 +70,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { query( ctx.clone(), + &rt, "SELECT \ FIRST_VALUE(f64) OVER (), \ LAST_VALUE(f32) OVER (), \ @@ -82,6 +84,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { query( ctx.clone(), + &rt, "SELECT \ MAX(f64) OVER (ORDER BY u64_narrow), \ MIN(f32) OVER (ORDER BY u64_narrow DESC), \ @@ -95,6 +98,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { query( ctx.clone(), + &rt, "SELECT \ FIRST_VALUE(f64) OVER (ORDER BY u64_narrow), \ LAST_VALUE(f32) OVER (ORDER BY u64_narrow DESC), \ @@ -108,6 +112,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { query( ctx.clone(), + &rt, "SELECT \ MAX(f64) OVER (PARTITION BY u64_wide), \ MIN(f32) OVER (PARTITION BY u64_wide), \ @@ -123,6 +128,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { query( ctx.clone(), + &rt, "SELECT \ MAX(f64) OVER (PARTITION BY u64_narrow), \ MIN(f32) OVER (PARTITION BY u64_narrow), \ @@ -137,6 +143,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { query( ctx.clone(), + &rt, "SELECT \ FIRST_VALUE(f64) OVER (PARTITION BY u64_wide), \ LAST_VALUE(f32) OVER (PARTITION BY u64_wide), \ @@ -150,6 +157,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { query( ctx.clone(), + &rt, "SELECT \ FIRST_VALUE(f64) OVER (PARTITION BY u64_narrow), \ LAST_VALUE(f32) OVER (PARTITION BY u64_narrow), \ @@ -165,6 +173,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { query( ctx.clone(), + &rt, "SELECT \ MAX(f64) OVER (PARTITION BY u64_wide ORDER by f64), \ MIN(f32) OVER (PARTITION BY u64_wide ORDER by f64), \ @@ -181,6 +190,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { query( ctx.clone(), + &rt, "SELECT \ MAX(f64) OVER (PARTITION BY u64_narrow ORDER by f64), \ MIN(f32) OVER (PARTITION BY u64_narrow ORDER by f64), \ @@ -197,6 +207,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { query( ctx.clone(), + &rt, "SELECT \ FIRST_VALUE(f64) OVER (PARTITION BY u64_wide ORDER by f64), \ LAST_VALUE(f32) OVER (PARTITION BY u64_wide ORDER by f64), \ @@ -213,6 +224,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { query( ctx.clone(), + &rt, "SELECT \ FIRST_VALUE(f64) OVER (PARTITION BY u64_narrow ORDER by f64), \ LAST_VALUE(f32) OVER (PARTITION BY u64_narrow ORDER by f64), \