Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ required-features = ["parquet"]
harness = false
name = "sql_planner"

[[bench]]
harness = false
name = "sql_planner_extended"

[[bench]]
harness = false
name = "sql_query_with_io"
Expand Down
197 changes: 3 additions & 194 deletions datafusion/core/benches/sql_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,11 @@ mod data_utils;
use crate::criterion::Criterion;
use arrow::array::{ArrayRef, RecordBatch};
use arrow::datatypes::{DataType, Field, Fields, Schema};
use arrow_schema::TimeUnit::Nanosecond;
use criterion::Bencher;
use datafusion::datasource::MemTable;
use datafusion::execution::context::SessionContext;
use datafusion::prelude::DataFrame;
use datafusion_common::{config::Dialect, ScalarValue};
use datafusion_expr::Expr::Literal;
use datafusion_expr::{cast, col, lit, not, try_cast, when};
use datafusion_functions::expr_fn::{
btrim, length, regexp_like, regexp_replace, to_timestamp, upper,
};
use std::ops::Rem;
use datafusion_expr::col;
use std::path::PathBuf;
use std::sync::Arc;
use test_utils::tpcds::tpcds_schemas;
Expand Down Expand Up @@ -65,150 +58,6 @@ fn physical_plan(ctx: &SessionContext, rt: &Runtime, sql: &str) {
}));
}

/// Build a dataframe for testing logical plan optimization.
///
/// Registers the string table `t` (100 columns, 1000 rows) via
/// `register_string_table`, then layers a long sequence of `with_column`,
/// `filter` and `union_by_name` calls on top of it. The individual
/// expressions are only a sample of operations that could occur on a
/// dataframe; what matters is the size and nesting of the resulting plan.
///
/// Panics if any dataframe operation returns an error.
fn build_test_data_frame(ctx: &SessionContext, rt: &Runtime) -> DataFrame {
    register_string_table(ctx, 100, 1000);

    rt.block_on(async {
        let mut frame = ctx.table("t").await.unwrap();

        // c100..c149: extra columns holding a NULL string literal
        for idx in 100..150 {
            let null_text = Literal(ScalarValue::Utf8(None), None);
            frame = frame.with_column(&format!("c{idx}"), null_text).unwrap();
        }

        // c150..c174: extra columns holding a string encoded timestamp
        for idx in 150..175 {
            let timestamp_text =
                Literal(ScalarValue::Utf8(Some("2025-08-21 09:43:17".into())), None);
            frame = frame
                .with_column(&format!("c{idx}"), timestamp_text)
                .unwrap();
        }

        // first pass over every column: trim it in place
        for idx in 0..175 {
            let name = format!("c{idx}");
            let trimmed = btrim(vec![col(&name)]);
            frame = frame.with_column(&name, trimmed).unwrap();
        }

        // second pass: stack further rewrites on top of the trimmed columns
        for idx in 0..175 {
            let name = format!("c{idx}");
            let column = col(&name);

            // sub-expressions that recur in the rewrites below
            let as_int = || cast(column.clone(), DataType::Int32);
            let as_view = || cast(column.clone(), DataType::Utf8View);
            let not_null = || column.clone().is_not_null();

            // every fifth column among the first 150 gets a stack of CASE rewrites
            if idx % 5 == 0 && idx < 150 {
                frame = frame
                    .with_column(&name, cast(column.clone(), DataType::Utf8))
                    .unwrap();

                let shifted = when(
                    as_int().gt(lit(135)),
                    cast(as_int() - lit(idx + 3), DataType::Utf8),
                )
                .otherwise(column.clone())
                .unwrap();
                frame = frame.with_column(&name, shifted).unwrap();

                // nested CASE: the inner expression is the ELSE arm of the outer one
                let upper_if_matching = when(
                    not_null().and(regexp_like(as_view(), lit("[0-9]*"), None)),
                    upper(column.clone()),
                )
                .otherwise(column.clone())
                .unwrap();
                let null_or_upper = when(
                    not_null().and(as_int().between(lit(120), lit(130))),
                    Literal(ScalarValue::Utf8(None), None),
                )
                .otherwise(upper_if_matching)
                .unwrap();
                frame = frame.with_column(&name, null_or_upper).unwrap();

                let view_or_null = when(
                    not_null().and(as_int().between(lit(90), lit(100))),
                    as_view(),
                )
                .otherwise(Literal(ScalarValue::Date32(None), None))
                .unwrap();
                frame = frame.with_column(&name, view_or_null).unwrap();

                let replaced_or_null = when(
                    not_null().and((as_int() % lit(10)).gt(lit(7))),
                    regexp_replace(as_view(), lit("1"), lit("a"), None),
                )
                .otherwise(Literal(ScalarValue::Date32(None), None))
                .unwrap();
                frame = frame.with_column(&name, replaced_or_null).unwrap();
            }

            // the timestamp columns are parsed and then narrowed to a date
            if idx >= 150 {
                let parsed = try_cast(
                    to_timestamp(vec![column.clone(), lit("%Y-%m-%d %H:%M:%S")]),
                    DataType::Timestamp(Nanosecond, Some("UTC".into())),
                );
                frame = frame.with_column(&name, parsed).unwrap();
                frame = frame
                    .with_column(&name, try_cast(column.clone(), DataType::Date32))
                    .unwrap();
            }

            // add in a few unions: split on a length predicate, tag each half,
            // and glue the halves back together by column name
            if idx % 30 == 0 {
                let flag_name = format!("c{idx}_filtered");
                let is_long = length(column.clone()).gt(lit(2));

                let long_rows = frame
                    .clone()
                    .filter(is_long.clone())
                    .unwrap()
                    .with_column(&flag_name, lit(true))
                    .unwrap();
                let short_rows = frame
                    .filter(not(is_long))
                    .unwrap()
                    .with_column(&flag_name, lit(false))
                    .unwrap();

                frame = long_rows.union_by_name(short_rows).unwrap();
            }
        }

        frame
    })
}

/// Create schema with the specified number of columns
fn create_schema(column_prefix: &str, num_columns: usize) -> Schema {
let fields: Fields = (0..num_columns)
Expand Down Expand Up @@ -334,33 +183,6 @@ fn register_union_order_table(ctx: &SessionContext, num_columns: usize, num_rows
ctx.register_table("t", Arc::new(table)).unwrap();
}

/// Registers an in-memory table named `t` made of `num_columns` string-view
/// columns (`c0`, `c1`, ...) with `num_rows` rows each.
///
/// The cell in column `i`, row `j` holds the text `c{j * 100 + i}`, so with
/// 100 columns the table looks like this:
///
/// ```text
/// c0,     c1,     ..., c99
/// "c0",   "c1",   ..., "c99"
/// "c100", "c101", ..., "c199"
/// "c200", "c201", ..., "c299"
/// ```
///
/// Panics if the record batch or the table cannot be created, or if
/// registering the table fails.
fn register_string_table(ctx: &SessionContext, num_columns: usize, num_rows: usize) {
    // one (name, array) pair per column
    let mut columns: Vec<(String, ArrayRef)> = Vec::with_capacity(num_columns);
    for col_idx in 0..num_columns as u64 {
        let values: Vec<String> = (0..num_rows)
            .map(|row_idx| format!("c{}", row_idx as u64 * 100 + col_idx))
            .collect();
        let array: ArrayRef =
            Arc::new(arrow::array::StringViewArray::from_iter_values(values));
        columns.push((format!("c{col_idx}"), array));
    }

    // a single batch in a single partition backs the whole table
    let batch = RecordBatch::try_from_iter(columns).unwrap();
    let schema = batch.schema();
    let table = MemTable::try_new(schema, vec![vec![batch]]).unwrap();

    ctx.register_table("t", Arc::new(table)).unwrap();
}

/// return a query like
/// ```sql
/// select c1, 2 as c2, ... n as cn from t ORDER BY c1
Expand Down Expand Up @@ -579,7 +401,8 @@ fn criterion_benchmark(c: &mut Criterion) {
});

// -- Sorted Queries --
for column_count in [10, 50, 100, 200, 300] {
// Column counts of 100, 200 and 300 take too long to run - https://github.com/apache/datafusion/issues/18366
for column_count in [10, 50 /* 100, 200, 300 */] {
register_union_order_table(&ctx, column_count, 1000);

// this query has many expressions in its sort order so stresses
Expand All @@ -596,20 +419,6 @@ fn criterion_benchmark(c: &mut Criterion) {
let _ = ctx.deregister_table("t");
}

// -- validate logical plan optimize performance
let df = build_test_data_frame(&ctx, &rt);

c.bench_function("logical_plan_optimize", |b| {
b.iter(|| {
let df_clone = df.clone();
criterion::black_box(
rt.block_on(async { df_clone.into_optimized_plan().unwrap() }),
);
})
});

let _ = ctx.deregister_table("t");

// --- TPC-H ---

let tpch_ctx = register_defs(SessionContext::new(), tpch_schemas());
Expand Down
Loading