Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions datafusion/core/benches/aggregate_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ use parking_lot::Mutex;
use std::sync::Arc;
use tokio::runtime::Runtime;

fn query(ctx: Arc<Mutex<SessionContext>>, sql: &str) {
let rt = Runtime::new().unwrap();
fn query(ctx: Arc<Mutex<SessionContext>>, rt: &Runtime, sql: &str) {
let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
criterion::black_box(rt.block_on(df.collect()).unwrap());
}
Expand All @@ -51,11 +50,13 @@ fn criterion_benchmark(c: &mut Criterion) {
let array_len = 32768 * 2; // 2^16
let batch_size = 2048; // 2^11
let ctx = create_context(partitions_len, array_len, batch_size).unwrap();
let rt = Runtime::new().unwrap();

c.bench_function("aggregate_query_no_group_by 15 12", |b| {
b.iter(|| {
query(
ctx.clone(),
&rt,
"SELECT MIN(f64), AVG(f64), COUNT(f64) \
FROM t",
)
Expand All @@ -66,6 +67,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
&rt,
"SELECT MIN(f64), MAX(f64) \
FROM t",
)
Expand All @@ -76,6 +78,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
&rt,
"SELECT COUNT(DISTINCT u64_wide) \
FROM t",
)
Expand All @@ -86,6 +89,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
&rt,
"SELECT COUNT(DISTINCT u64_narrow) \
FROM t",
)
Expand All @@ -96,6 +100,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
&rt,
"SELECT utf8, MIN(f64), AVG(f64), COUNT(f64) \
FROM t GROUP BY utf8",
)
Expand All @@ -106,6 +111,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
&rt,
"SELECT utf8, MIN(f64), AVG(f64), COUNT(f64) \
FROM t \
WHERE f32 > 10 AND f32 < 20 GROUP BY utf8",
Expand All @@ -117,6 +123,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
&rt,
"SELECT u64_narrow, MIN(f64), AVG(f64), COUNT(f64) \
FROM t GROUP BY u64_narrow",
)
Expand All @@ -127,6 +134,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
&rt,
"SELECT u64_narrow, MIN(f64), AVG(f64), COUNT(f64) \
FROM t \
WHERE f32 > 10 AND f32 < 20 GROUP BY u64_narrow",
Expand All @@ -138,6 +146,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
&rt,
"SELECT u64_wide, utf8, MIN(f64), AVG(f64), COUNT(f64) \
FROM t GROUP BY u64_wide, utf8",
)
Expand All @@ -148,6 +157,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
&rt,
"SELECT utf8, approx_percentile_cont(u64_wide, 0.5, 2500) \
FROM t GROUP BY utf8",
)
Expand All @@ -158,6 +168,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
&rt,
"SELECT utf8, approx_percentile_cont(f32, 0.5, 2500) \
FROM t GROUP BY utf8",
)
Expand All @@ -168,6 +179,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
&rt,
"SELECT MEDIAN(DISTINCT u64_wide), MEDIAN(DISTINCT u64_narrow) \
FROM t",
)
Expand All @@ -178,6 +190,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
&rt,
"SELECT first_value(u64_wide order by f64, u64_narrow, utf8),\
last_value(u64_wide order by f64, u64_narrow, utf8) \
FROM t GROUP BY u64_narrow",
Expand All @@ -189,6 +202,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
&rt,
"SELECT first_value(u64_wide ignore nulls order by f64, u64_narrow, utf8), \
last_value(u64_wide ignore nulls order by f64, u64_narrow, utf8) \
FROM t GROUP BY u64_narrow",
Expand All @@ -200,6 +214,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
&rt,
"SELECT first_value(u64_wide order by f64), \
last_value(u64_wide order by f64) \
FROM t GROUP BY u64_narrow",
Expand Down
11 changes: 9 additions & 2 deletions datafusion/core/benches/csv_load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@ use std::time::Duration;
use test_utils::AccessLogGenerator;
use tokio::runtime::Runtime;

fn load_csv(ctx: Arc<Mutex<SessionContext>>, path: &str, options: CsvReadOptions) {
let rt = Runtime::new().unwrap();
fn load_csv(
ctx: Arc<Mutex<SessionContext>>,
rt: &Runtime,
path: &str,
options: CsvReadOptions,
) {
let df = rt.block_on(ctx.lock().read_csv(path, options)).unwrap();
criterion::black_box(rt.block_on(df.collect()).unwrap());
}
Expand Down Expand Up @@ -61,6 +65,7 @@ fn generate_test_file() -> TestCsvFile {

fn criterion_benchmark(c: &mut Criterion) {
let ctx = create_context().unwrap();
let rt = Runtime::new().unwrap();
let test_file = generate_test_file();

let mut group = c.benchmark_group("load csv testing");
Expand All @@ -70,6 +75,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
load_csv(
ctx.clone(),
&rt,
test_file.path().to_str().unwrap(),
CsvReadOptions::default(),
)
Expand All @@ -80,6 +86,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
load_csv(
ctx.clone(),
&rt,
test_file.path().to_str().unwrap(),
CsvReadOptions::default().null_regex(Some("^NULL$|^$".to_string())),
)
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/benches/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ fn create_context(field_count: u32) -> datafusion_common::Result<Arc<SessionCont
Ok(Arc::new(ctx))
}

fn run(column_count: u32, ctx: Arc<SessionContext>) {
let rt = Runtime::new().unwrap();

fn run(column_count: u32, ctx: Arc<SessionContext>, rt: &Runtime) {
criterion::black_box(rt.block_on(async {
let mut data_frame = ctx.table("t").await.unwrap();

Expand All @@ -67,11 +65,13 @@ fn run(column_count: u32, ctx: Arc<SessionContext>) {
}

fn criterion_benchmark(c: &mut Criterion) {
let rt = Runtime::new().unwrap();

for column_count in [10, 100, 200, 500] {
let ctx = create_context(column_count).unwrap();

c.bench_function(&format!("with_column_{column_count}"), |b| {
b.iter(|| run(column_count, ctx.clone()))
b.iter(|| run(column_count, ctx.clone(), &rt))
});
}
}
Expand Down
9 changes: 7 additions & 2 deletions datafusion/core/benches/distinct_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ use parking_lot::Mutex;
use std::{sync::Arc, time::Duration};
use tokio::runtime::Runtime;

fn query(ctx: Arc<Mutex<SessionContext>>, sql: &str) {
let rt = Runtime::new().unwrap();
fn query(ctx: Arc<Mutex<SessionContext>>, rt: &Runtime, sql: &str) {
let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
criterion::black_box(rt.block_on(df.collect()).unwrap());
}
Expand All @@ -55,6 +54,7 @@ fn criterion_benchmark_limited_distinct(c: &mut Criterion) {
let array_len = 1 << 26; // 64 M
let batch_size = 8192;
let ctx = create_context(partitions_len, array_len, batch_size).unwrap();
let rt = Runtime::new().unwrap();

let mut group = c.benchmark_group("custom-measurement-time");
group.measurement_time(Duration::from_secs(40));
Expand All @@ -63,6 +63,7 @@ fn criterion_benchmark_limited_distinct(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
&rt,
"SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT 10",
)
})
Expand All @@ -72,6 +73,7 @@ fn criterion_benchmark_limited_distinct(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
&rt,
"SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT 100",
)
})
Expand All @@ -81,6 +83,7 @@ fn criterion_benchmark_limited_distinct(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
&rt,
"SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT 1000",
)
})
Expand All @@ -90,6 +93,7 @@ fn criterion_benchmark_limited_distinct(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
&rt,
"SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT 10000",
)
})
Expand All @@ -99,6 +103,7 @@ fn criterion_benchmark_limited_distinct(c: &mut Criterion) {
b.iter(|| {
query(
ctx.clone(),
&rt,
"SELECT u64_narrow, u64_wide, utf8, f64 FROM t GROUP BY 1, 2, 3, 4 LIMIT 10",
)
})
Expand Down
9 changes: 5 additions & 4 deletions datafusion/core/benches/filter_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ use futures::executor::block_on;
use std::sync::Arc;
use tokio::runtime::Runtime;

async fn query(ctx: &SessionContext, sql: &str) {
let rt = Runtime::new().unwrap();

async fn query(ctx: &SessionContext, rt: &Runtime, sql: &str) {
// execute the query
let df = rt.block_on(ctx.sql(sql)).unwrap();
criterion::black_box(rt.block_on(df.collect()).unwrap());
Expand Down Expand Up @@ -68,17 +66,19 @@ fn create_context(array_len: usize, batch_size: usize) -> Result<SessionContext>
fn criterion_benchmark(c: &mut Criterion) {
let array_len = 524_288; // 2^19
let batch_size = 4096; // 2^12
let rt = Runtime::new().unwrap();

c.bench_function("filter_array", |b| {
let ctx = create_context(array_len, batch_size).unwrap();
b.iter(|| block_on(query(&ctx, "select f32, f64 from t where f32 >= f64")))
b.iter(|| block_on(query(&ctx, &rt, "select f32, f64 from t where f32 >= f64")))
});

c.bench_function("filter_scalar", |b| {
let ctx = create_context(array_len, batch_size).unwrap();
b.iter(|| {
block_on(query(
&ctx,
&rt,
"select f32, f64 from t where f32 >= 250 and f64 > 250",
))
})
Expand All @@ -89,6 +89,7 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
block_on(query(
&ctx,
&rt,
"select f32, f64 from t where f32 in (10, 20, 30, 40)",
))
})
Expand Down
14 changes: 7 additions & 7 deletions datafusion/core/benches/math_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::execution::context::SessionContext;

fn query(ctx: Arc<Mutex<SessionContext>>, sql: &str) {
let rt = Runtime::new().unwrap();

fn query(ctx: Arc<Mutex<SessionContext>>, rt: &Runtime, sql: &str) {
// execute the query
let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
rt.block_on(df.collect()).unwrap();
Expand Down Expand Up @@ -81,29 +79,31 @@ fn criterion_benchmark(c: &mut Criterion) {
let array_len = 1048576; // 2^20
let batch_size = 512; // 2^9
let ctx = create_context(array_len, batch_size).unwrap();
let rt = Runtime::new().unwrap();

c.bench_function("sqrt_20_9", |b| {
b.iter(|| query(ctx.clone(), "SELECT sqrt(f32) FROM t"))
b.iter(|| query(ctx.clone(), &rt, "SELECT sqrt(f32) FROM t"))
});

let array_len = 1048576; // 2^20
let batch_size = 4096; // 2^12
let ctx = create_context(array_len, batch_size).unwrap();
c.bench_function("sqrt_20_12", |b| {
b.iter(|| query(ctx.clone(), "SELECT sqrt(f32) FROM t"))
b.iter(|| query(ctx.clone(), &rt, "SELECT sqrt(f32) FROM t"))
});

let array_len = 4194304; // 2^22
let batch_size = 4096; // 2^12
let ctx = create_context(array_len, batch_size).unwrap();
c.bench_function("sqrt_22_12", |b| {
b.iter(|| query(ctx.clone(), "SELECT sqrt(f32) FROM t"))
b.iter(|| query(ctx.clone(), &rt, "SELECT sqrt(f32) FROM t"))
});

let array_len = 4194304; // 2^22
let batch_size = 16384; // 2^14
let ctx = create_context(array_len, batch_size).unwrap();
c.bench_function("sqrt_22_14", |b| {
b.iter(|| query(ctx.clone(), "SELECT sqrt(f32) FROM t"))
b.iter(|| query(ctx.clone(), &rt, "SELECT sqrt(f32) FROM t"))
});
}

Expand Down
10 changes: 6 additions & 4 deletions datafusion/core/benches/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use datafusion_physical_expr_common::sort_expr::LexOrdering;
// as inputs. All record batches must have the same schema.
fn sort_preserving_merge_operator(
session_ctx: Arc<SessionContext>,
rt: &Runtime,
batches: Vec<RecordBatch>,
sort: &[&str],
) {
Expand All @@ -63,7 +64,6 @@ fn sort_preserving_merge_operator(
.unwrap();
let merge = Arc::new(SortPreservingMergeExec::new(sort, exec));
let task_ctx = session_ctx.task_ctx();
let rt = Runtime::new().unwrap();
rt.block_on(collect(merge, task_ctx)).unwrap();
}

Expand Down Expand Up @@ -166,14 +166,16 @@ fn criterion_benchmark(c: &mut Criterion) {
];

let ctx = Arc::new(SessionContext::new());
let rt = Runtime::new().unwrap();

for (name, input) in benches {
let ctx_clone = ctx.clone();
c.bench_function(name, move |b| {
c.bench_function(name, |b| {
b.iter_batched(
|| input.clone(),
|input| {
sort_preserving_merge_operator(
ctx_clone.clone(),
ctx.clone(),
&rt,
input,
&["a", "b", "c", "d"],
);
Expand Down
Loading