
Commit a216aa5

Extract out super slow planning benchmark to its own benchmark (#18388)

## Which issue does this PR close?

- Closes #18366

## Rationale for this change

Speed up the running of the SQL planner benchmarks.

## What changes are included in this PR?

Extracted the 'logical_plan_optimize' benchmark into its own file.

## Are these changes tested?

Yes.

## Are there any user-facing changes?

No.

1 parent b6e5732 commit a216aa5
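
With the slow case split into its own bench target, the two suites can be run independently. Assuming the usual Cargo bench-target selection (the target names come from the Cargo.toml change below), the invocations would look like:

    cargo bench --bench sql_planner              # the regular SQL planner benchmarks
    cargo bench --bench sql_planner_extended     # only the slow logical_plan_optimize benchmark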

File tree

3 files changed (+241 / -194 lines)


datafusion/core/Cargo.toml

Lines changed: 4 additions & 0 deletions

@@ -241,6 +241,10 @@ required-features = ["parquet"]
 harness = false
 name = "sql_planner"
 
+[[bench]]
+harness = false
+name = "sql_planner_extended"
+
 [[bench]]
 harness = false
 name = "sql_query_with_io"
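The new benches/sql_planner_extended.rs registered above is not shown in this excerpt of the diff. As a rough, hypothetical sketch only (not the actual file contents), a standalone Criterion benchmark built with harness = false typically pairs criterion_group!/criterion_main! with the benchmark code removed from sql_planner.rs below, i.e. the logical_plan_optimize case and its build_test_data_frame helper:

    // Hypothetical sketch of a standalone Criterion benchmark target; the real
    // sql_planner_extended.rs added by this commit may differ.
    use criterion::{criterion_group, criterion_main, Criterion};
    use datafusion::execution::context::SessionContext;
    use tokio::runtime::Runtime;

    fn criterion_benchmark(c: &mut Criterion) {
        let ctx = SessionContext::new();
        let rt = Runtime::new().unwrap();

        // build_test_data_frame is assumed to be defined in this file as well,
        // moved verbatim from sql_planner.rs (see the removed code below). It
        // builds a wide, heavily transformed DataFrame; the benchmark measures
        // how long the logical optimizer takes on it.
        let df = build_test_data_frame(&ctx, &rt);

        c.bench_function("logical_plan_optimize", |b| {
            b.iter(|| {
                let df_clone = df.clone();
                criterion::black_box(
                    rt.block_on(async { df_clone.into_optimized_plan().unwrap() }),
                );
            })
        });
    }

    criterion_group!(benches, criterion_benchmark);
    criterion_main!(benches);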

datafusion/core/benches/sql_planner.rs

Lines changed: 3 additions & 194 deletions
@@ -25,18 +25,11 @@ mod data_utils;
 use crate::criterion::Criterion;
 use arrow::array::{ArrayRef, RecordBatch};
 use arrow::datatypes::{DataType, Field, Fields, Schema};
-use arrow_schema::TimeUnit::Nanosecond;
 use criterion::Bencher;
 use datafusion::datasource::MemTable;
 use datafusion::execution::context::SessionContext;
-use datafusion::prelude::DataFrame;
 use datafusion_common::{config::Dialect, ScalarValue};
-use datafusion_expr::Expr::Literal;
-use datafusion_expr::{cast, col, lit, not, try_cast, when};
-use datafusion_functions::expr_fn::{
-    btrim, length, regexp_like, regexp_replace, to_timestamp, upper,
-};
-use std::ops::Rem;
+use datafusion_expr::col;
 use std::path::PathBuf;
 use std::sync::Arc;
 use test_utils::tpcds::tpcds_schemas;
@@ -65,150 +58,6 @@ fn physical_plan(ctx: &SessionContext, rt: &Runtime, sql: &str) {
     }));
 }
 
-/// Build a dataframe for testing logical plan optimization
-fn build_test_data_frame(ctx: &SessionContext, rt: &Runtime) -> DataFrame {
-    register_string_table(ctx, 100, 1000);
-
-    rt.block_on(async {
-        let mut df = ctx.table("t").await.unwrap();
-        // add some columns in
-        for i in 100..150 {
-            df = df
-                .with_column(&format!("c{i}"), Literal(ScalarValue::Utf8(None), None))
-                .unwrap();
-        }
-        // add in some columns with string encoded timestamps
-        for i in 150..175 {
-            df = df
-                .with_column(
-                    &format!("c{i}"),
-                    Literal(ScalarValue::Utf8(Some("2025-08-21 09:43:17".into())), None),
-                )
-                .unwrap();
-        }
-        // do a bunch of ops on the columns
-        for i in 0..175 {
-            // trim the columns
-            df = df
-                .with_column(&format!("c{i}"), btrim(vec![col(format!("c{i}"))]))
-                .unwrap();
-        }
-
-        for i in 0..175 {
-            let c_name = format!("c{i}");
-            let c = col(&c_name);
-
-            // random ops
-            if i % 5 == 0 && i < 150 {
-                // the actual ops here are largely unimportant as they are just a sample
-                // of ops that could occur on a dataframe
-                df = df
-                    .with_column(&c_name, cast(c.clone(), DataType::Utf8))
-                    .unwrap()
-                    .with_column(
-                        &c_name,
-                        when(
-                            cast(c.clone(), DataType::Int32).gt(lit(135)),
-                            cast(
-                                cast(c.clone(), DataType::Int32) - lit(i + 3),
-                                DataType::Utf8,
-                            ),
-                        )
-                        .otherwise(c.clone())
-                        .unwrap(),
-                    )
-                    .unwrap()
-                    .with_column(
-                        &c_name,
-                        when(
-                            c.clone().is_not_null().and(
-                                cast(c.clone(), DataType::Int32)
-                                    .between(lit(120), lit(130)),
-                            ),
-                            Literal(ScalarValue::Utf8(None), None),
-                        )
-                        .otherwise(
-                            when(
-                                c.clone().is_not_null().and(regexp_like(
-                                    cast(c.clone(), DataType::Utf8View),
-                                    lit("[0-9]*"),
-                                    None,
-                                )),
-                                upper(c.clone()),
-                            )
-                            .otherwise(c.clone())
-                            .unwrap(),
-                        )
-                        .unwrap(),
-                    )
-                    .unwrap()
-                    .with_column(
-                        &c_name,
-                        when(
-                            c.clone().is_not_null().and(
-                                cast(c.clone(), DataType::Int32)
-                                    .between(lit(90), lit(100)),
-                            ),
-                            cast(c.clone(), DataType::Utf8View),
-                        )
-                        .otherwise(Literal(ScalarValue::Date32(None), None))
-                        .unwrap(),
-                    )
-                    .unwrap()
-                    .with_column(
-                        &c_name,
-                        when(
-                            c.clone().is_not_null().and(
-                                cast(c.clone(), DataType::Int32).rem(lit(10)).gt(lit(7)),
-                            ),
-                            regexp_replace(
-                                cast(c.clone(), DataType::Utf8View),
-                                lit("1"),
-                                lit("a"),
-                                None,
-                            ),
-                        )
-                        .otherwise(Literal(ScalarValue::Date32(None), None))
-                        .unwrap(),
-                    )
-                    .unwrap()
-            }
-            if i >= 150 {
-                df = df
-                    .with_column(
-                        &c_name,
-                        try_cast(
-                            to_timestamp(vec![c.clone(), lit("%Y-%m-%d %H:%M:%S")]),
-                            DataType::Timestamp(Nanosecond, Some("UTC".into())),
-                        ),
-                    )
-                    .unwrap()
-                    .with_column(&c_name, try_cast(c.clone(), DataType::Date32))
-                    .unwrap()
-            }
-
-            // add in a few unions
-            if i % 30 == 0 {
-                let df1 = df
-                    .clone()
-                    .filter(length(c.clone()).gt(lit(2)))
-                    .unwrap()
-                    .with_column(&format!("c{i}_filtered"), lit(true))
-                    .unwrap();
-                let df2 = df
-                    .filter(not(length(c.clone()).gt(lit(2))))
-                    .unwrap()
-                    .with_column(&format!("c{i}_filtered"), lit(false))
-                    .unwrap();
-
-                df = df1.union_by_name(df2).unwrap()
-            }
-        }
-
-        df
-    })
-}
-
 /// Create schema with the specified number of columns
 fn create_schema(column_prefix: &str, num_columns: usize) -> Schema {
     let fields: Fields = (0..num_columns)
@@ -334,33 +183,6 @@ fn register_union_order_table(ctx: &SessionContext, num_columns: usize, num_rows
     ctx.register_table("t", Arc::new(table)).unwrap();
 }
 
-/// Registers a table like this:
-/// c0,c1,c2...,c99
-/// "0","100"..."9900"
-/// "0","200"..."19800"
-/// "0","300"..."29700"
-fn register_string_table(ctx: &SessionContext, num_columns: usize, num_rows: usize) {
-    // ("c0", ["0", "0", ...])
-    // ("c1": ["100", "200", ...])
-    // etc
-    let iter = (0..num_columns).map(|i| i as u64).map(|i| {
-        let array: ArrayRef = Arc::new(arrow::array::StringViewArray::from_iter_values(
-            (0..num_rows)
-                .map(|j| format!("c{}", j as u64 * 100 + i))
-                .collect::<Vec<_>>(),
-        ));
-        (format!("c{i}"), array)
-    });
-    let batch = RecordBatch::try_from_iter(iter).unwrap();
-    let schema = batch.schema();
-    let partitions = vec![vec![batch]];
-
-    // create the table
-    let table = MemTable::try_new(schema, partitions).unwrap();
-
-    ctx.register_table("t", Arc::new(table)).unwrap();
-}
-
 /// return a query like
 /// ```sql
 /// select c1, 2 as c2, ... n as cn from t ORDER BY c1
@@ -579,7 +401,8 @@ fn criterion_benchmark(c: &mut Criterion) {
     });
 
     // -- Sorted Queries --
-    for column_count in [10, 50, 100, 200, 300] {
+    // 100, 200 && 300 is taking too long - https://github.com/apache/datafusion/issues/18366
+    for column_count in [10, 50 /* 100, 200, 300 */] {
         register_union_order_table(&ctx, column_count, 1000);
 
         // this query has many expressions in its sort order so stresses
@@ -596,20 +419,6 @@ fn criterion_benchmark(c: &mut Criterion) {
         let _ = ctx.deregister_table("t");
     }
 
-    // -- validate logical plan optimize performance
-    let df = build_test_data_frame(&ctx, &rt);
-
-    c.bench_function("logical_plan_optimize", |b| {
-        b.iter(|| {
-            let df_clone = df.clone();
-            criterion::black_box(
-                rt.block_on(async { df_clone.into_optimized_plan().unwrap() }),
-            );
-        })
-    });
-
-    let _ = ctx.deregister_table("t");
-
     // --- TPC-H ---
 
     let tpch_ctx = register_defs(SessionContext::new(), tpch_schemas());
