56 changes: 21 additions & 35 deletions datafusion/core/benches/distinct_query_sql.rs
@@ -118,12 +118,9 @@ async fn distinct_with_limit(
     Ok(())
 }
 
-fn run(plan: Arc<dyn ExecutionPlan>, ctx: Arc<TaskContext>) {
-    let rt = Runtime::new().unwrap();
-    criterion::black_box(
-        rt.block_on(async { distinct_with_limit(plan.clone(), ctx.clone()).await }),
-    )
-    .unwrap();
+fn run(rt: &Runtime, plan: Arc<dyn ExecutionPlan>, ctx: Arc<TaskContext>) {
+    criterion::black_box(rt.block_on(distinct_with_limit(plan.clone(), ctx.clone())))
+        .unwrap();
 }
 
 pub async fn create_context_sampled_data(
@@ -145,58 +142,47 @@ pub async fn create_context_sampled_data(
 fn criterion_benchmark_limited_distinct_sampled(c: &mut Criterion) {
     let rt = Runtime::new().unwrap();
 
     let limit = 10;
     let partitions = 100;
     let samples = 100_000;
     let sql =
         format!("select DISTINCT trace_id from traces group by trace_id limit {limit};");
 
-    let distinct_trace_id_100_partitions_100_000_samples_limit_100 = rt.block_on(async {
-        create_context_sampled_data(sql.as_str(), partitions, samples)
-            .await
-            .unwrap()
-    });
-
     c.bench_function(
         format!("distinct query with {} partitions and {} samples per partition with limit {}", partitions, samples, limit).as_str(),
-        |b| b.iter(|| run(distinct_trace_id_100_partitions_100_000_samples_limit_100.0.clone(),
-            distinct_trace_id_100_partitions_100_000_samples_limit_100.1.clone())),
+        |b| b.iter(|| {
+            let (plan, ctx) = rt.block_on(
+                create_context_sampled_data(sql.as_str(), partitions, samples)
+            ).unwrap();
+            run(&rt, plan.clone(), ctx.clone())
+        }),
     );
 
     let partitions = 10;
     let samples = 1_000_000;
     let sql =
         format!("select DISTINCT trace_id from traces group by trace_id limit {limit};");
 
-    let distinct_trace_id_10_partitions_1_000_000_samples_limit_10 = rt.block_on(async {
-        create_context_sampled_data(sql.as_str(), partitions, samples)
-            .await
-            .unwrap()
-    });
-
     c.bench_function(
         format!("distinct query with {} partitions and {} samples per partition with limit {}", partitions, samples, limit).as_str(),
-        |b| b.iter(|| run(distinct_trace_id_10_partitions_1_000_000_samples_limit_10.0.clone(),
-            distinct_trace_id_10_partitions_1_000_000_samples_limit_10.1.clone())),
+        |b| b.iter(|| {
+            let (plan, ctx) = rt.block_on(
+                create_context_sampled_data(sql.as_str(), partitions, samples)
+            ).unwrap();
+            run(&rt, plan.clone(), ctx.clone())
+        }),
     );
 
     let partitions = 1;
     let samples = 10_000_000;
     let sql =
         format!("select DISTINCT trace_id from traces group by trace_id limit {limit};");
 
-    let rt = Runtime::new().unwrap();
-    let distinct_trace_id_1_partition_10_000_000_samples_limit_10 = rt.block_on(async {
-        create_context_sampled_data(sql.as_str(), partitions, samples)
-            .await
-            .unwrap()
-    });
-
     c.bench_function(
         format!("distinct query with {} partitions and {} samples per partition with limit {}", partitions, samples, limit).as_str(),
-        |b| b.iter(|| run(distinct_trace_id_1_partition_10_000_000_samples_limit_10.0.clone(),
-            distinct_trace_id_1_partition_10_000_000_samples_limit_10.1.clone())),
+        |b| b.iter(|| {
+            let (plan, ctx) = rt.block_on(
+                create_context_sampled_data(sql.as_str(), partitions, samples)
+            ).unwrap();
+            run(&rt, plan.clone(), ctx.clone())
+        }),
     );
 }
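
Taken together, the hunks above change the benchmark to create one tokio Runtime per benchmark function and lend it to run(), instead of building a fresh Runtime on every call, and to rebuild the plan/context pair inside b.iter instead of reusing one pre-built pair across all iterations. A minimal standalone sketch of that pattern, assuming criterion and tokio as dev-dependencies; make_input and work are hypothetical stand-ins for create_context_sampled_data and distinct_with_limit, not DataFusion APIs. As in the diff, the per-iteration setup runs inside the timed closure:

use criterion::{criterion_group, criterion_main, Criterion};
use tokio::runtime::Runtime;

// Hypothetical async setup, standing in for create_context_sampled_data.
async fn make_input() -> Vec<u64> {
    (0..1_000).collect()
}

// Hypothetical async workload, standing in for distinct_with_limit.
async fn work(input: Vec<u64>) -> u64 {
    input.iter().sum()
}

fn run(rt: &Runtime, input: Vec<u64>) {
    // Borrow the caller's runtime instead of constructing one per call,
    // so runtime startup cost never sits inside the timed loop.
    criterion::black_box(rt.block_on(work(input)));
}

fn criterion_benchmark(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    c.bench_function("sum 1k rows", |b| {
        b.iter(|| {
            // Rebuilt every iteration, so earlier runs cannot leave the
            // input in a consumed state; note this setup time is measured.
            let input = rt.block_on(make_input());
            run(&rt, input)
        })
    });
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);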

70 changes: 42 additions & 28 deletions datafusion/core/benches/topk_aggregate.rs
@@ -55,8 +55,7 @@ async fn create_context(
     Ok((physical_plan, ctx.task_ctx()))
 }
 
-fn run(plan: Arc<dyn ExecutionPlan>, ctx: Arc<TaskContext>, asc: bool) {
-    let rt = Runtime::new().unwrap();
+fn run(rt: &Runtime, plan: Arc<dyn ExecutionPlan>, ctx: Arc<TaskContext>, asc: bool) {
     criterion::black_box(
         rt.block_on(async { aggregate(plan.clone(), ctx.clone(), asc).await }),
     )
@@ -99,40 +98,37 @@ async fn aggregate(
 }
 
 fn criterion_benchmark(c: &mut Criterion) {
+    let rt = Runtime::new().unwrap();
     let limit = 10;
     let partitions = 10;
     let samples = 1_000_000;
 
-    let rt = Runtime::new().unwrap();
-    let topk_real = rt.block_on(async {
-        create_context(limit, partitions, samples, false, true)
-            .await
-            .unwrap()
-    });
-    let topk_asc = rt.block_on(async {
-        create_context(limit, partitions, samples, true, true)
-            .await
-            .unwrap()
-    });
-    let real = rt.block_on(async {
-        create_context(limit, partitions, samples, false, false)
-            .await
-            .unwrap()
-    });
-    let asc = rt.block_on(async {
-        create_context(limit, partitions, samples, true, false)
-            .await
-            .unwrap()
-    });
-
     c.bench_function(
         format!("aggregate {} time-series rows", partitions * samples).as_str(),
-        |b| b.iter(|| run(real.0.clone(), real.1.clone(), false)),
+        |b| {
+            b.iter(|| {
+                let real = rt.block_on(async {
+                    create_context(limit, partitions, samples, false, false)
+                        .await
+                        .unwrap()
+                });
+                run(&rt, real.0.clone(), real.1.clone(), false)
+            })
+        },
     );
 
     c.bench_function(
         format!("aggregate {} worst-case rows", partitions * samples).as_str(),
-        |b| b.iter(|| run(asc.0.clone(), asc.1.clone(), true)),
+        |b| {
+            b.iter(|| {
+                let asc = rt.block_on(async {
+                    create_context(limit, partitions, samples, true, false)
+                        .await
+                        .unwrap()
+                });
+                run(&rt, asc.0.clone(), asc.1.clone(), true)
+            })
+        },
     );
 
     c.bench_function(
@@ -141,7 +137,16 @@ fn criterion_benchmark(c: &mut Criterion) {
             partitions * samples
         )
         .as_str(),
-        |b| b.iter(|| run(topk_real.0.clone(), topk_real.1.clone(), false)),
+        |b| {
+            b.iter(|| {
+                let topk_real = rt.block_on(async {
+                    create_context(limit, partitions, samples, false, true)
+                        .await
+                        .unwrap()
+                });
+                run(&rt, topk_real.0.clone(), topk_real.1.clone(), false)
+            })
+        },
     );
 
     c.bench_function(
@@ -150,7 +155,16 @@
             partitions * samples
         )
         .as_str(),
-        |b| b.iter(|| run(topk_asc.0.clone(), topk_asc.1.clone(), true)),
+        |b| {
+            b.iter(|| {
+                let topk_asc = rt.block_on(async {
+                    create_context(limit, partitions, samples, true, true)
+                        .await
+                        .unwrap()
+                });
+                run(&rt, topk_asc.0.clone(), topk_asc.1.clone(), true)
+            })
+        },
     );
 }
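
The same trade-off applies in both files: because create_context now runs inside b.iter, context creation is counted in the reported time. If untimed per-iteration setup were preferred instead, Criterion's iter_batched accepts a separate setup closure that runs outside the timer. A sketch reusing the hypothetical make_input and run helpers from the note above, not what this PR does:

use criterion::{BatchSize, Criterion};
use tokio::runtime::Runtime;

fn bench_with_untimed_setup(c: &mut Criterion, rt: &Runtime) {
    c.bench_function("sum 1k rows, setup excluded", |b| {
        b.iter_batched(
            || rt.block_on(make_input()), // setup closure: runs outside the timer
            |input| run(rt, input),       // routine: the only part that is timed
            BatchSize::SmallInput,
        )
    });
}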
