Skip to content

Commit bde9803

Browse files
authored
FIX: some benchmarks are failing (#15367)
* fix distinct_query_sql and topk_aggregate benchmarks
* run cargo clippy
* run cargo fmt
* share a single Tokio runtime across benchmarks
1 parent 615fba5 commit bde9803

File tree

2 files changed

+63
-63
lines changed

2 files changed

+63
-63
lines changed

datafusion/core/benches/distinct_query_sql.rs

Lines changed: 21 additions & 35 deletions
Original file line number | Diff line number | Diff line change
@@ -118,12 +118,9 @@ async fn distinct_with_limit(
118118
Ok(())
119119
}
120120

121-
fn run(plan: Arc<dyn ExecutionPlan>, ctx: Arc<TaskContext>) {
122-
let rt = Runtime::new().unwrap();
123-
criterion::black_box(
124-
rt.block_on(async { distinct_with_limit(plan.clone(), ctx.clone()).await }),
125-
)
126-
.unwrap();
121+
fn run(rt: &Runtime, plan: Arc<dyn ExecutionPlan>, ctx: Arc<TaskContext>) {
122+
criterion::black_box(rt.block_on(distinct_with_limit(plan.clone(), ctx.clone())))
123+
.unwrap();
127124
}
128125

129126
pub async fn create_context_sampled_data(
@@ -145,58 +142,47 @@ pub async fn create_context_sampled_data(
145142

146143
fn criterion_benchmark_limited_distinct_sampled(c: &mut Criterion) {
147144
let rt = Runtime::new().unwrap();
148-
149145
let limit = 10;
150146
let partitions = 100;
151147
let samples = 100_000;
152148
let sql =
153149
format!("select DISTINCT trace_id from traces group by trace_id limit {limit};");
154-
155-
let distinct_trace_id_100_partitions_100_000_samples_limit_100 = rt.block_on(async {
156-
create_context_sampled_data(sql.as_str(), partitions, samples)
157-
.await
158-
.unwrap()
159-
});
160-
161150
c.bench_function(
162151
format!("distinct query with {} partitions and {} samples per partition with limit {}", partitions, samples, limit).as_str(),
163-
|b| b.iter(|| run(distinct_trace_id_100_partitions_100_000_samples_limit_100.0.clone(),
164-
distinct_trace_id_100_partitions_100_000_samples_limit_100.1.clone())),
152+
|b| b.iter(|| {
153+
let (plan, ctx) = rt.block_on(
154+
create_context_sampled_data(sql.as_str(), partitions, samples)
155+
).unwrap();
156+
run(&rt, plan.clone(), ctx.clone())
157+
}),
165158
);
166159

167160
let partitions = 10;
168161
let samples = 1_000_000;
169162
let sql =
170163
format!("select DISTINCT trace_id from traces group by trace_id limit {limit};");
171-
172-
let distinct_trace_id_10_partitions_1_000_000_samples_limit_10 = rt.block_on(async {
173-
create_context_sampled_data(sql.as_str(), partitions, samples)
174-
.await
175-
.unwrap()
176-
});
177-
178164
c.bench_function(
179165
format!("distinct query with {} partitions and {} samples per partition with limit {}", partitions, samples, limit).as_str(),
180-
|b| b.iter(|| run(distinct_trace_id_10_partitions_1_000_000_samples_limit_10.0.clone(),
181-
distinct_trace_id_10_partitions_1_000_000_samples_limit_10.1.clone())),
166+
|b| b.iter(|| {
167+
let (plan, ctx) = rt.block_on(
168+
create_context_sampled_data(sql.as_str(), partitions, samples)
169+
).unwrap();
170+
run(&rt, plan.clone(), ctx.clone())
171+
}),
182172
);
183173

184174
let partitions = 1;
185175
let samples = 10_000_000;
186176
let sql =
187177
format!("select DISTINCT trace_id from traces group by trace_id limit {limit};");
188-
189-
let rt = Runtime::new().unwrap();
190-
let distinct_trace_id_1_partition_10_000_000_samples_limit_10 = rt.block_on(async {
191-
create_context_sampled_data(sql.as_str(), partitions, samples)
192-
.await
193-
.unwrap()
194-
});
195-
196178
c.bench_function(
197179
format!("distinct query with {} partitions and {} samples per partition with limit {}", partitions, samples, limit).as_str(),
198-
|b| b.iter(|| run(distinct_trace_id_1_partition_10_000_000_samples_limit_10.0.clone(),
199-
distinct_trace_id_1_partition_10_000_000_samples_limit_10.1.clone())),
180+
|b| b.iter(|| {
181+
let (plan, ctx) = rt.block_on(
182+
create_context_sampled_data(sql.as_str(), partitions, samples)
183+
).unwrap();
184+
run(&rt, plan.clone(), ctx.clone())
185+
}),
200186
);
201187
}
202188

datafusion/core/benches/topk_aggregate.rs

Lines changed: 42 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,7 @@ async fn create_context(
5555
Ok((physical_plan, ctx.task_ctx()))
5656
}
5757

58-
fn run(plan: Arc<dyn ExecutionPlan>, ctx: Arc<TaskContext>, asc: bool) {
59-
let rt = Runtime::new().unwrap();
58+
fn run(rt: &Runtime, plan: Arc<dyn ExecutionPlan>, ctx: Arc<TaskContext>, asc: bool) {
6059
criterion::black_box(
6160
rt.block_on(async { aggregate(plan.clone(), ctx.clone(), asc).await }),
6261
)
@@ -99,40 +98,37 @@ async fn aggregate(
9998
}
10099

101100
fn criterion_benchmark(c: &mut Criterion) {
101+
let rt = Runtime::new().unwrap();
102102
let limit = 10;
103103
let partitions = 10;
104104
let samples = 1_000_000;
105105

106-
let rt = Runtime::new().unwrap();
107-
let topk_real = rt.block_on(async {
108-
create_context(limit, partitions, samples, false, true)
109-
.await
110-
.unwrap()
111-
});
112-
let topk_asc = rt.block_on(async {
113-
create_context(limit, partitions, samples, true, true)
114-
.await
115-
.unwrap()
116-
});
117-
let real = rt.block_on(async {
118-
create_context(limit, partitions, samples, false, false)
119-
.await
120-
.unwrap()
121-
});
122-
let asc = rt.block_on(async {
123-
create_context(limit, partitions, samples, true, false)
124-
.await
125-
.unwrap()
126-
});
127-
128106
c.bench_function(
129107
format!("aggregate {} time-series rows", partitions * samples).as_str(),
130-
|b| b.iter(|| run(real.0.clone(), real.1.clone(), false)),
108+
|b| {
109+
b.iter(|| {
110+
let real = rt.block_on(async {
111+
create_context(limit, partitions, samples, false, false)
112+
.await
113+
.unwrap()
114+
});
115+
run(&rt, real.0.clone(), real.1.clone(), false)
116+
})
117+
},
131118
);
132119

133120
c.bench_function(
134121
format!("aggregate {} worst-case rows", partitions * samples).as_str(),
135-
|b| b.iter(|| run(asc.0.clone(), asc.1.clone(), true)),
122+
|b| {
123+
b.iter(|| {
124+
let asc = rt.block_on(async {
125+
create_context(limit, partitions, samples, true, false)
126+
.await
127+
.unwrap()
128+
});
129+
run(&rt, asc.0.clone(), asc.1.clone(), true)
130+
})
131+
},
136132
);
137133

138134
c.bench_function(
@@ -141,7 +137,16 @@ fn criterion_benchmark(c: &mut Criterion) {
141137
partitions * samples
142138
)
143139
.as_str(),
144-
|b| b.iter(|| run(topk_real.0.clone(), topk_real.1.clone(), false)),
140+
|b| {
141+
b.iter(|| {
142+
let topk_real = rt.block_on(async {
143+
create_context(limit, partitions, samples, false, true)
144+
.await
145+
.unwrap()
146+
});
147+
run(&rt, topk_real.0.clone(), topk_real.1.clone(), false)
148+
})
149+
},
145150
);
146151

147152
c.bench_function(
@@ -150,7 +155,16 @@ fn criterion_benchmark(c: &mut Criterion) {
150155
partitions * samples
151156
)
152157
.as_str(),
153-
|b| b.iter(|| run(topk_asc.0.clone(), topk_asc.1.clone(), true)),
158+
|b| {
159+
b.iter(|| {
160+
let topk_asc = rt.block_on(async {
161+
create_context(limit, partitions, samples, true, true)
162+
.await
163+
.unwrap()
164+
});
165+
run(&rt, topk_asc.0.clone(), topk_asc.1.clone(), true)
166+
})
167+
},
154168
);
155169
}
156170

0 commit comments

Comments (0)