-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Round robin polling between tied winners in sort preserving merge #13133
Merged
Merged
Changes from 49 commits
Commits
Show all changes
50 commits
Select commit
Hold shift + click to select a range
4926799
first draft
jayzhan211 666f9fe
add data
jayzhan211 449c930
fix benchmark
jayzhan211 2566bab
add more bencmark data
jayzhan211 6000d87
fix benchmark
jayzhan211 5cd8733
fmt
jayzhan211 74d3d9b
get max size
jayzhan211 e628764
add license
jayzhan211 4277eec
rm code for merge
jayzhan211 625b925
cleanup
jayzhan211 e4f9bfe
cleanup
jayzhan211 3fa2e32
update poll count only we have tie
jayzhan211 e8da793
upd comment
jayzhan211 d22ba25
fix logic
jayzhan211 aadf69c
Merge branch 'rrt-spm' into rrt-spm-for-merge
jayzhan211 4b2a4ac
configurable
jayzhan211 920fe6a
fmt
jayzhan211 d80286c
add mem limit test
jayzhan211 750261a
rm test
jayzhan211 79a6df0
escape bracket
jayzhan211 afdd981
add test
jayzhan211 6358843
rm per consumer record
jayzhan211 c9337c6
repartition limit
jayzhan211 dbc1037
Merge branch 'main' of https://github.com/apache/datafusion into rrt-…
jayzhan211 bc81681
add benchmark
jayzhan211 e4970f5
cleanup
jayzhan211 e7abf68
benchmark with parameter
jayzhan211 7cb1198
only calculate consumer pool if the limit is set
jayzhan211 5f4c83e
combine eq and gt
jayzhan211 e418d6c
review part 1
berkaysynnada 288e2fe
Update merge.rs
berkaysynnada bca3bde
Merge branch 'main' of https://github.com/apache/datafusion into rrt-…
jayzhan211 23d5fc7
upd doc
jayzhan211 2d02aec
no need index comparison
jayzhan211 010f869
combine handle tie and eq check
jayzhan211 3605f85
upd doc
jayzhan211 18f86e8
fmt
jayzhan211 8adf14e
add more comment
jayzhan211 8d6c0a6
remove flag
jayzhan211 a18cba8
upd comment
jayzhan211 d2f3a84
Revert "remove flag"
jayzhan211 905eea7
Revert "upd comment"
jayzhan211 98135ee
add more comment
jayzhan211 1b418c4
add more comment
jayzhan211 8297f58
fmt
jayzhan211 b652802
simpliy mem pool
jayzhan211 f68fa9b
clippy
jayzhan211 b63a631
Update merge.rs
berkaysynnada 666b3fe
minor
berkaysynnada 003fce3
add comment
jayzhan211 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
use std::sync::Arc; | ||
|
||
use arrow::record_batch::RecordBatch; | ||
use arrow_array::{ArrayRef, Int32Array, Int64Array, StringArray}; | ||
use datafusion_execution::TaskContext; | ||
use datafusion_physical_expr::expressions::col; | ||
use datafusion_physical_expr::PhysicalSortExpr; | ||
use datafusion_physical_plan::memory::MemoryExec; | ||
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; | ||
use datafusion_physical_plan::{collect, ExecutionPlan}; | ||
|
||
use criterion::async_executor::FuturesExecutor; | ||
use criterion::{black_box, criterion_group, criterion_main, Criterion}; | ||
|
||
fn generate_spm_for_round_robin_tie_breaker( | ||
has_same_value: bool, | ||
enable_round_robin_repartition: bool, | ||
batch_count: usize, | ||
partition_count: usize, | ||
) -> SortPreservingMergeExec { | ||
let row_size = 256; | ||
let rb = if has_same_value { | ||
let a: ArrayRef = Arc::new(Int32Array::from(vec![1; row_size])); | ||
let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("a"); row_size])); | ||
let c: ArrayRef = Arc::new(Int64Array::from_iter(vec![0; row_size])); | ||
RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap() | ||
} else { | ||
let v = (0i32..row_size as i32).collect::<Vec<_>>(); | ||
let a: ArrayRef = Arc::new(Int32Array::from(v)); | ||
|
||
// Use alphanumeric characters | ||
let charset: Vec<char> = | ||
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" | ||
.chars() | ||
.collect(); | ||
|
||
let mut strings = Vec::new(); | ||
for i in 0..256 { | ||
let mut s = String::new(); | ||
s.push(charset[i % charset.len()]); | ||
s.push(charset[(i / charset.len()) % charset.len()]); | ||
strings.push(Some(s)); | ||
} | ||
|
||
let b: ArrayRef = Arc::new(StringArray::from_iter(strings)); | ||
|
||
let v = (0i64..row_size as i64).collect::<Vec<_>>(); | ||
let c: ArrayRef = Arc::new(Int64Array::from_iter(v)); | ||
RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap() | ||
}; | ||
|
||
let rbs = (0..batch_count).map(|_| rb.clone()).collect::<Vec<_>>(); | ||
let partitiones = vec![rbs.clone(); partition_count]; | ||
|
||
let schema = rb.schema(); | ||
let sort = vec![ | ||
PhysicalSortExpr { | ||
expr: col("b", &schema).unwrap(), | ||
options: Default::default(), | ||
}, | ||
PhysicalSortExpr { | ||
expr: col("c", &schema).unwrap(), | ||
options: Default::default(), | ||
}, | ||
]; | ||
|
||
let exec = MemoryExec::try_new(&partitiones, schema, None).unwrap(); | ||
SortPreservingMergeExec::new(sort, Arc::new(exec)) | ||
.with_round_robin_repartition(enable_round_robin_repartition) | ||
} | ||
|
||
fn run_bench( | ||
c: &mut Criterion, | ||
has_same_value: bool, | ||
enable_round_robin_repartition: bool, | ||
batch_count: usize, | ||
partition_count: usize, | ||
description: &str, | ||
) { | ||
let task_ctx = TaskContext::default(); | ||
let task_ctx = Arc::new(task_ctx); | ||
|
||
let spm = Arc::new(generate_spm_for_round_robin_tie_breaker( | ||
has_same_value, | ||
enable_round_robin_repartition, | ||
batch_count, | ||
partition_count, | ||
)) as Arc<dyn ExecutionPlan>; | ||
|
||
c.bench_function(description, |b| { | ||
b.to_async(FuturesExecutor) | ||
.iter(|| black_box(collect(Arc::clone(&spm), Arc::clone(&task_ctx)))) | ||
}); | ||
} | ||
|
||
fn criterion_benchmark(c: &mut Criterion) { | ||
let params = [ | ||
(true, false, "low_card_without_tiebreaker"), // low cardinality, no tie breaker | ||
(true, true, "low_card_with_tiebreaker"), // low cardinality, with tie breaker | ||
(false, false, "high_card_without_tiebreaker"), // high cardinality, no tie breaker | ||
(false, true, "high_card_with_tiebreaker"), // high cardinality, with tie breaker | ||
]; | ||
|
||
let batch_counts = [1, 25, 625]; | ||
let partition_counts = [2, 8, 32]; | ||
|
||
for &(has_same_value, enable_round_robin_repartition, cardinality_label) in ¶ms { | ||
for &batch_count in &batch_counts { | ||
for &partition_count in &partition_counts { | ||
let description = format!( | ||
"{}_batch_count_{}_partition_count_{}", | ||
cardinality_label, batch_count, partition_count | ||
); | ||
run_bench( | ||
c, | ||
has_same_value, | ||
enable_round_robin_repartition, | ||
batch_count, | ||
partition_count, | ||
&description, | ||
); | ||
} | ||
} | ||
} | ||
} | ||
|
||
criterion_group!(benches, criterion_benchmark); | ||
criterion_main!(benches); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are already some SortPreservingMerge benchmarks in
datafusion/datafusion/core/benches/sort.rs
Line 161 in 223bb02
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we done this in follow on PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have suggested to @jayzhan211 to take the first steps in creating operator-specific benchmarks. I believe there's already a goal for this (I recall an older issue related to it). Perhaps we should extract these benchmarks from core and port them here @alamb ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Operator specific benchmarks sounds good to me -- I think it is very valuable to the have the benchmarks for the same operator together so they can be found / don't get duplicated