From fc7f320f264f747425d46c94bc8104ceeca18be3 Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Thu, 30 Oct 2025 13:16:04 -0400 Subject: [PATCH 1/3] Extract out super slow planning benchmark to it's own benchmark. #18366 --- datafusion/core/Cargo.toml | 4 + datafusion/core/benches/sql_planner.rs | 197 +-------------- .../core/benches/sql_planner_extended.rs | 225 ++++++++++++++++++ 3 files changed, 232 insertions(+), 194 deletions(-) create mode 100644 datafusion/core/benches/sql_planner_extended.rs diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 22c9f43a902e..f672e3a94681 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -241,6 +241,10 @@ required-features = ["parquet"] harness = false name = "sql_planner" +[[bench]] +harness = false +name = "sql_planner_extended" + [[bench]] harness = false name = "sql_query_with_io" diff --git a/datafusion/core/benches/sql_planner.rs b/datafusion/core/benches/sql_planner.rs index 83563099cad6..7bfe326aeff6 100644 --- a/datafusion/core/benches/sql_planner.rs +++ b/datafusion/core/benches/sql_planner.rs @@ -25,18 +25,11 @@ mod data_utils; use crate::criterion::Criterion; use arrow::array::{ArrayRef, RecordBatch}; use arrow::datatypes::{DataType, Field, Fields, Schema}; -use arrow_schema::TimeUnit::Nanosecond; use criterion::Bencher; use datafusion::datasource::MemTable; use datafusion::execution::context::SessionContext; -use datafusion::prelude::DataFrame; use datafusion_common::{config::Dialect, ScalarValue}; -use datafusion_expr::Expr::Literal; -use datafusion_expr::{cast, col, lit, not, try_cast, when}; -use datafusion_functions::expr_fn::{ - btrim, length, regexp_like, regexp_replace, to_timestamp, upper, -}; -use std::ops::Rem; +use datafusion_expr::col; use std::path::PathBuf; use std::sync::Arc; use test_utils::tpcds::tpcds_schemas; @@ -65,150 +58,6 @@ fn physical_plan(ctx: &SessionContext, rt: &Runtime, sql: &str) { })); } -/// Build a dataframe for testing logical plan optimization -fn build_test_data_frame(ctx: &SessionContext, rt: &Runtime) -> DataFrame { - register_string_table(ctx, 100, 1000); - - rt.block_on(async { - let mut df = ctx.table("t").await.unwrap(); - // add some columns in - for i in 100..150 { - df = df - .with_column(&format!("c{i}"), Literal(ScalarValue::Utf8(None), None)) - .unwrap(); - } - // add in some columns with string encoded timestamps - for i in 150..175 { - df = df - .with_column( - &format!("c{i}"), - Literal(ScalarValue::Utf8(Some("2025-08-21 09:43:17".into())), None), - ) - .unwrap(); - } - // do a bunch of ops on the columns - for i in 0..175 { - // trim the columns - df = df - .with_column(&format!("c{i}"), btrim(vec![col(format!("c{i}"))])) - .unwrap(); - } - - for i in 0..175 { - let c_name = format!("c{i}"); - let c = col(&c_name); - - // random ops - if i % 5 == 0 && i < 150 { - // the actual ops here are largely unimportant as they are just a sample - // of ops that could occur on a dataframe - df = df - .with_column(&c_name, cast(c.clone(), DataType::Utf8)) - .unwrap() - .with_column( - &c_name, - when( - cast(c.clone(), DataType::Int32).gt(lit(135)), - cast( - cast(c.clone(), DataType::Int32) - lit(i + 3), - DataType::Utf8, - ), - ) - .otherwise(c.clone()) - .unwrap(), - ) - .unwrap() - .with_column( - &c_name, - when( - c.clone().is_not_null().and( - cast(c.clone(), DataType::Int32) - .between(lit(120), lit(130)), - ), - Literal(ScalarValue::Utf8(None), None), - ) - .otherwise( - when( - c.clone().is_not_null().and(regexp_like( - cast(c.clone(), DataType::Utf8View), - lit("[0-9]*"), - None, - )), - upper(c.clone()), - ) - .otherwise(c.clone()) - .unwrap(), - ) - .unwrap(), - ) - .unwrap() - .with_column( - &c_name, - when( - c.clone().is_not_null().and( - cast(c.clone(), DataType::Int32) - .between(lit(90), lit(100)), - ), - cast(c.clone(), DataType::Utf8View), - ) - .otherwise(Literal(ScalarValue::Date32(None), None)) - .unwrap(), - ) - .unwrap() - .with_column( - &c_name, - when( - c.clone().is_not_null().and( - cast(c.clone(), DataType::Int32).rem(lit(10)).gt(lit(7)), - ), - regexp_replace( - cast(c.clone(), DataType::Utf8View), - lit("1"), - lit("a"), - None, - ), - ) - .otherwise(Literal(ScalarValue::Date32(None), None)) - .unwrap(), - ) - .unwrap() - } - if i >= 150 { - df = df - .with_column( - &c_name, - try_cast( - to_timestamp(vec![c.clone(), lit("%Y-%m-%d %H:%M:%S")]), - DataType::Timestamp(Nanosecond, Some("UTC".into())), - ), - ) - .unwrap() - .with_column(&c_name, try_cast(c.clone(), DataType::Date32)) - .unwrap() - } - - // add in a few unions - if i % 30 == 0 { - let df1 = df - .clone() - .filter(length(c.clone()).gt(lit(2))) - .unwrap() - .with_column(&format!("c{i}_filtered"), lit(true)) - .unwrap(); - let df2 = df - .filter(not(length(c.clone()).gt(lit(2)))) - .unwrap() - .with_column(&format!("c{i}_filtered"), lit(false)) - .unwrap(); - - df = df1.union_by_name(df2).unwrap() - } - } - - df - }) -} - /// Create schema with the specified number of columns fn create_schema(column_prefix: &str, num_columns: usize) -> Schema { let fields: Fields = (0..num_columns) @@ -334,33 +183,6 @@ fn register_union_order_table(ctx: &SessionContext, num_columns: usize, num_rows ctx.register_table("t", Arc::new(table)).unwrap(); } -/// Registers a table like this: -/// c0,c1,c2...,c99 -/// "0","100"..."9900" -/// "0","200"..."19800" -/// "0","300"..."29700" -fn register_string_table(ctx: &SessionContext, num_columns: usize, num_rows: usize) { - // ("c0", ["0", "0", ...]) - // ("c1": ["100", "200", ...]) - // etc - let iter = (0..num_columns).map(|i| i as u64).map(|i| { - let array: ArrayRef = Arc::new(arrow::array::StringViewArray::from_iter_values( - (0..num_rows) - .map(|j| format!("c{}", j as u64 * 100 + i)) - .collect::>(), - )); - (format!("c{i}"), array) - }); - let batch = RecordBatch::try_from_iter(iter).unwrap(); - let schema = batch.schema(); - let partitions = vec![vec![batch]]; - - // create the table - let table = MemTable::try_new(schema, partitions).unwrap(); - - ctx.register_table("t", Arc::new(table)).unwrap(); -} - /// return a query like /// ```sql /// select c1, 2 as c2, ... n as cn from t ORDER BY c1 @@ -579,7 +401,8 @@ fn criterion_benchmark(c: &mut Criterion) { }); // -- Sorted Queries -- - for column_count in [10, 50, 100, 200, 300] { + // 300 is taking too long - https://github.com/apache/datafusion/issues/18366 + for column_count in [10, 50, 100, 200 /*, 300 */] { register_union_order_table(&ctx, column_count, 1000); // this query has many expressions in its sort order so stresses @@ -596,20 +419,6 @@ fn criterion_benchmark(c: &mut Criterion) { let _ = ctx.deregister_table("t"); } - // -- validate logical plan optimize performance - let df = build_test_data_frame(&ctx, &rt); - - c.bench_function("logical_plan_optimize", |b| { - b.iter(|| { - let df_clone = df.clone(); - criterion::black_box( - rt.block_on(async { df_clone.into_optimized_plan().unwrap() }), - ); - }) - }); - - let _ = ctx.deregister_table("t"); - // --- TPC-H --- let tpch_ctx = register_defs(SessionContext::new(), tpch_schemas()); diff --git a/datafusion/core/benches/sql_planner_extended.rs b/datafusion/core/benches/sql_planner_extended.rs new file mode 100644 index 000000000000..e7355029bcfd --- /dev/null +++ b/datafusion/core/benches/sql_planner_extended.rs @@ -0,0 +1,225 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ArrayRef, RecordBatch}; +use arrow_schema::DataType; +use arrow_schema::TimeUnit::Nanosecond; +use criterion::{criterion_group, criterion_main, Criterion}; +use datafusion::prelude::{DataFrame, SessionContext}; +use datafusion_catalog::MemTable; +use datafusion_common::ScalarValue; +use datafusion_expr::Expr::Literal; +use datafusion_expr::{cast, col, lit, not, try_cast, when}; +use datafusion_functions::expr_fn::{ + btrim, length, regexp_like, regexp_replace, to_timestamp, upper, +}; +use std::ops::Rem; +use std::sync::Arc; +use tokio::runtime::Runtime; + +/// Registers a table like this: +/// c0,c1,c2...,c99 +/// "0","100"..."9900" +/// "0","200"..."19800" +/// "0","300"..."29700" +fn register_string_table(ctx: &SessionContext, num_columns: usize, num_rows: usize) { + // ("c0", ["0", "0", ...]) + // ("c1": ["100", "200", ...]) + // etc + let iter = (0..num_columns).map(|i| i as u64).map(|i| { + let array: ArrayRef = Arc::new(arrow::array::StringViewArray::from_iter_values( + (0..num_rows) + .map(|j| format!("c{}", j as u64 * 100 + i)) + .collect::>(), + )); + (format!("c{i}"), array) + }); + let batch = RecordBatch::try_from_iter(iter).unwrap(); + let schema = batch.schema(); + let partitions = vec![vec![batch]]; + + // create the table + let table = MemTable::try_new(schema, partitions).unwrap(); + + ctx.register_table("t", Arc::new(table)).unwrap(); +} + +/// Build a dataframe for testing logical plan optimization +fn build_test_data_frame(ctx: &SessionContext, rt: &Runtime) -> DataFrame { + register_string_table(ctx, 100, 1000); + + rt.block_on(async { + let mut df = ctx.table("t").await.unwrap(); + // add some columns in + for i in 100..150 { + df = df + .with_column(&format!("c{i}"), Literal(ScalarValue::Utf8(None), None)) + .unwrap(); + } + // add in some columns with string encoded timestamps + for i in 150..175 { + df = df + .with_column( + &format!("c{i}"), + Literal(ScalarValue::Utf8(Some("2025-08-21 09:43:17".into())), None), + ) + .unwrap(); + } + // do a bunch of ops on the columns + for i in 0..175 { + // trim the columns + df = df + .with_column(&format!("c{i}"), btrim(vec![col(format!("c{i}"))])) + .unwrap(); + } + + for i in 0..175 { + let c_name = format!("c{i}"); + let c = col(&c_name); + + // random ops + if i % 5 == 0 && i < 150 { + // the actual ops here are largely unimportant as they are just a sample + // of ops that could occur on a dataframe + df = df + .with_column(&c_name, cast(c.clone(), DataType::Utf8)) + .unwrap() + .with_column( + &c_name, + when( + cast(c.clone(), DataType::Int32).gt(lit(135)), + cast( + cast(c.clone(), DataType::Int32) - lit(i + 3), + DataType::Utf8, + ), + ) + .otherwise(c.clone()) + .unwrap(), + ) + .unwrap() + .with_column( + &c_name, + when( + c.clone().is_not_null().and( + cast(c.clone(), DataType::Int32) + .between(lit(120), lit(130)), + ), + Literal(ScalarValue::Utf8(None), None), + ) + .otherwise( + when( + c.clone().is_not_null().and(regexp_like( + cast(c.clone(), DataType::Utf8View), + lit("[0-9]*"), + None, + )), + upper(c.clone()), + ) + .otherwise(c.clone()) + .unwrap(), + ) + .unwrap(), + ) + .unwrap() + .with_column( + &c_name, + when( + c.clone().is_not_null().and( + cast(c.clone(), DataType::Int32) + .between(lit(90), lit(100)), + ), + cast(c.clone(), DataType::Utf8View), + ) + .otherwise(Literal(ScalarValue::Date32(None), None)) + .unwrap(), + ) + .unwrap() + .with_column( + &c_name, + when( + c.clone().is_not_null().and( + cast(c.clone(), DataType::Int32).rem(lit(10)).gt(lit(7)), + ), + regexp_replace( + cast(c.clone(), DataType::Utf8View), + lit("1"), + lit("a"), + None, + ), + ) + .otherwise(Literal(ScalarValue::Date32(None), None)) + .unwrap(), + ) + .unwrap() + } + if i >= 150 { + df = df + .with_column( + &c_name, + try_cast( + to_timestamp(vec![c.clone(), lit("%Y-%m-%d %H:%M:%S")]), + DataType::Timestamp(Nanosecond, Some("UTC".into())), + ), + ) + .unwrap() + .with_column(&c_name, try_cast(c.clone(), DataType::Date32)) + .unwrap() + } + + // add in a few unions + if i % 30 == 0 { + let df1 = df + .clone() + .filter(length(c.clone()).gt(lit(2))) + .unwrap() + .with_column(&format!("c{i}_filtered"), lit(true)) + .unwrap(); + let df2 = df + .filter(not(length(c.clone()).gt(lit(2)))) + .unwrap() + .with_column(&format!("c{i}_filtered"), lit(false)) + .unwrap(); + + df = df1.union_by_name(df2).unwrap() + } + } + + df + }) +} + +fn criterion_benchmark(c: &mut Criterion) { + let ctx = SessionContext::new(); + let rt = Runtime::new().unwrap(); + + // validate logical plan optimize performance + // https://github.com/apache/datafusion/issues/17261 + + let df = build_test_data_frame(&ctx, &rt); + + c.bench_function("logical_plan_optimize", |b| { + b.iter(|| { + let df_clone = df.clone(); + criterion::black_box( + rt.block_on(async { df_clone.into_optimized_plan().unwrap() }), + ); + }) + }); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); From c3701622efb53df6338cd8e4574f466e94a26f3a Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Thu, 30 Oct 2025 16:23:26 -0400 Subject: [PATCH 2/3] Adding comments to extended explaining its reason for existing. Commented out 100 && 200 columns in physical_sorted_union_order_by_ benchmarks. --- datafusion/core/benches/sql_planner.rs | 4 ++-- datafusion/core/benches/sql_planner_extended.rs | 9 +++++++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/datafusion/core/benches/sql_planner.rs b/datafusion/core/benches/sql_planner.rs index 7bfe326aeff6..a3044006cbb4 100644 --- a/datafusion/core/benches/sql_planner.rs +++ b/datafusion/core/benches/sql_planner.rs @@ -401,8 +401,8 @@ fn criterion_benchmark(c: &mut Criterion) { }); // -- Sorted Queries -- - // 300 is taking too long - https://github.com/apache/datafusion/issues/18366 - for column_count in [10, 50, 100, 200 /*, 300 */] { + // 100, 200 && 300 is taking too long - https://github.com/apache/datafusion/issues/18366 + for column_count in [10, 50 /* 100, 200, 300 */] { register_union_order_table(&ctx, column_count, 1000); // this query has many expressions in its sort order so stresses diff --git a/datafusion/core/benches/sql_planner_extended.rs b/datafusion/core/benches/sql_planner_extended.rs index e7355029bcfd..b55b2508f7c9 100644 --- a/datafusion/core/benches/sql_planner_extended.rs +++ b/datafusion/core/benches/sql_planner_extended.rs @@ -31,6 +31,15 @@ use std::ops::Rem; use std::sync::Arc; use tokio::runtime::Runtime; +/// This benchmark suite is designed to test the performance of +/// logical planning with a large plan containing unions, many columns +/// with a variety of operations in it. +/// +/// Since it is (currently) very slow to execute it has been separated +/// out from the sql_planner benchmark suite to this file. +/// +/// See https://github.com/apache/datafusion/issues/17261 for details. + /// Registers a table like this: /// c0,c1,c2...,c99 /// "0","100"..."9900" From 1029a6c99195e95bc0822f42e6cb770d6b10e24a Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Thu, 30 Oct 2025 17:27:49 -0400 Subject: [PATCH 3/3] We love clippy. doccomment -> comment. --- datafusion/core/benches/sql_planner_extended.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/datafusion/core/benches/sql_planner_extended.rs b/datafusion/core/benches/sql_planner_extended.rs index b55b2508f7c9..9e665ef40d2c 100644 --- a/datafusion/core/benches/sql_planner_extended.rs +++ b/datafusion/core/benches/sql_planner_extended.rs @@ -31,14 +31,14 @@ use std::ops::Rem; use std::sync::Arc; use tokio::runtime::Runtime; -/// This benchmark suite is designed to test the performance of -/// logical planning with a large plan containing unions, many columns -/// with a variety of operations in it. -/// -/// Since it is (currently) very slow to execute it has been separated -/// out from the sql_planner benchmark suite to this file. -/// -/// See https://github.com/apache/datafusion/issues/17261 for details. +// This benchmark suite is designed to test the performance of +// logical planning with a large plan containing unions, many columns +// with a variety of operations in it. +// +// Since it is (currently) very slow to execute it has been separated +// out from the sql_planner benchmark suite to this file. +// +// See https://github.com/apache/datafusion/issues/17261 for details. /// Registers a table like this: /// c0,c1,c2...,c99