Round robin polling between tied winners in sort preserving merge #13133
Conversation
I reverted the configuration change, so now if you prefer the old behaviour you can set the flag to false.
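For illustration, here is a minimal sketch of what opting out might look like, assuming the flag ends up exposed as a builder-style method on SortPreservingMergeExec; the method name below is an assumption, not something confirmed in this thread:

use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;

/// Opt back into the previous tie-breaking behaviour (smallest partition index wins).
/// Sketch only: with_round_robin_tie_breaker is assumed to be the builder-style
/// flag added by this PR; check the merged API for the real name.
fn with_legacy_tie_breaker(spm: SortPreservingMergeExec) -> SortPreservingMergeExec {
    spm.with_round_robin_tie_breaker(false)
}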
use tokio::time::timeout;

fn generate_task_ctx_for_round_robin_tie_breaker() -> Result<Arc<TaskContext>> {
    let mut pool_per_consumer = HashMap::new();
(Previous conversation for context: https://github.com/apache/datafusion/pull/13133/files#r1818369721)
I tried removing the per-operator memory limit and setting the total limit to 20M, then reran the two unit tests in this file.
let mut pool_per_consumer = HashMap::new();
// Bytes from 660_000 to 30_000_000 (or even more) are all valid limits
// pool_per_consumer.insert("RepartitionExec[0]".to_string(), 10_000_000);
// pool_per_consumer.insert("RepartitionExec[1]".to_string(), 10_000_000);
let runtime = RuntimeEnvBuilder::new()
    // Arbitrary large number for the total memory limit; we only care about RepartitionExec here
    .with_memory_limit_per_consumer(20_000_000, 1.0, pool_per_consumer)
    .build_arc()?;
Both test_round_robin_tie_breaker_fail and test_round_robin_tie_breaker_success passed. The error message for the fail case is: Expected error: ResourcesExhausted("Additional allocation failed with top memory consumers (across reservations) as: RepartitionExec[1] consumed 18206496 bytes, SortPreservingMergeExec[0] consumed 1446684 bytes, RepartitionExec[0] consumed 216744 bytes. Error: Failed to allocate additional 216744 bytes for SortPreservingMergeExec[0] with 433488 bytes already allocated for this reservation - 130076 bytes remain available for the total pool")
I think this error message is expected: SPM only reserves constant memory, while RepartitionExec's memory consumption indeed keeps growing.
If that's the case, the memory-pool-related changes are no longer necessary; I'd prefer to remove them from this PR.
I think setting per-consumer memory limits by name makes the API less user-friendly and increases the risk of implementation issues with future changes. Perhaps we can explore a better solution later if a more suitable use case arises.
Looks great, thank you. I found it quite hard to pick the right number, so I ended up measuring the specific Exec.
Makes sense -- I will make a PR to clarify the behavior
I agree it makes sense to use this as the default policy
It seems most of the issues are addressed. I plan to merge this today 🚀
I am taking a final look now
I think we are all agreed on the final state—thanks again @jayzhan211! I have just one minor issue with a comment and a potential improvement in the test.
I was considering whether this commit might bring us too close to the edge points of the test, but I concluded it shouldn't be a problem, as our test runs at both ends of the polling spectrum.
cc @ozankabak
// If the winner doesn't survive in the final match, it means the value has changed.
// The polls count are outdated (because the value advanced) but not yet cleaned-up at this point.
// Given the value is equal, we choose the smaller index if the value is the same.
self.update_winner(cmp_node, winner, challenger);
@jayzhan211 This comment confused me. This line is executed when the challenger index is smaller and the poll counts are equal. Does it also pass through here when the winner does not survive because its value has changed?
This line is executed when the challenger index is smaller and poll counts are equal
Poll counts may not be the same. When we have a new winner, it should have a different value than in the previous tie-breaker round. If the value were the same, we would find that the winner survives the final match, and then we would need to check the poll counts to select one.
When we reach the code here, the new winner has the same value as the challenger, but a different value than the original winner (self.loser_tree[0]). In this case, we just need to compare the indices, since this is a new round of the tie breaker and the poll counts don't change the result.
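To make that decision order concrete, here is a condensed model of the tie-breaking rules described above; this is not the actual loser-tree code, and the field and method names are illustrative only:

/// Illustrative model of the tie-breaking rules; not the real loser-tree implementation.
struct TieBreaker {
    // cursor value currently at the head of each partition (simplified to u64)
    head_value: Vec<u64>,
    // how many times each partition has been polled for the current value
    poll_count: Vec<usize>,
    // index of the current overall winner (loser_tree[0] in the real code)
    winner: usize,
}

impl TieBreaker {
    // Decide whether `challenger` should replace `current` in a match.
    fn challenger_wins(&self, current: usize, challenger: usize) -> bool {
        if self.head_value[challenger] != self.head_value[current] {
            // Not a tie: the ordinary value comparison decides the match.
            return self.head_value[challenger] < self.head_value[current];
        }
        if self.head_value[current] == self.head_value[self.winner] {
            // Same tie-breaker round as the current overall winner: the partition
            // polled fewer times wins, falling back to the smaller index on equal counts.
            (self.poll_count[challenger], challenger) < (self.poll_count[current], current)
        } else {
            // The value advanced, so this is a new round; stale poll counts are
            // ignored and the smaller partition index wins.
            challenger < current
        }
    }
}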
use criterion::async_executor::FuturesExecutor;
use criterion::{black_box, criterion_group, criterion_main, Criterion};

fn generate_spm_for_round_robin_tie_breaker(
I have suggested that @jayzhan211 take the first steps in creating operator-specific benchmarks. I believe there's already a goal for this (I recall an older issue related to it). Perhaps we should extract these benchmarks from core and port them here @alamb ?
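As a rough sketch of how such an operator-specific benchmark could be wired up with criterion: it reuses the helper names from the snippets quoted in this thread (their bodies are elided), and the exact wiring is an assumption rather than the PR's benchmark file:

use criterion::async_executor::FuturesExecutor;
use criterion::{criterion_group, criterion_main, Criterion};

// Sketch of operator-specific benchmark wiring; generate_spm_for_round_robin_tie_breaker
// and generate_task_ctx_for_round_robin_tie_breaker are the helpers quoted in this
// thread (bodies elided), and collect is datafusion_physical_plan::collect.
fn bench_spm_round_robin(c: &mut Criterion) {
    c.bench_function("spm_round_robin_tie_breaker", |b| {
        b.to_async(FuturesExecutor).iter(|| async {
            let task_ctx = generate_task_ctx_for_round_robin_tie_breaker().unwrap();
            let spm = generate_spm_for_round_robin_tie_breaker().unwrap();
            datafusion_physical_plan::collect(spm, task_ctx).await.unwrap()
        })
    });
}

criterion_group!(benches, bench_spm_round_robin);
criterion_main!(benches);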
}

fn generate_spm_for_round_robin_tie_breaker() -> Result<Arc<SortPreservingMergeExec>> {
    let target_batch_size = 12500;
These numbers and the memory limit in these tests are actually correlated constants and can’t be adjusted independently. Could we encapsulate or link them somehow to emphasize this dependency @jayzhan211 ?
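One possible way to make that coupling explicit, sketched here with hypothetical names and an illustrative relationship (not the PR's actual constants):

/// Hypothetical grouping of the coupled test constants so they can only change together.
struct RoundRobinTestParams {
    target_batch_size: usize,
    row_size_bytes: usize,
    batches_per_partition: usize,
}

impl RoundRobinTestParams {
    const fn new() -> Self {
        Self {
            target_batch_size: 12_500,
            row_size_bytes: 8,
            batches_per_partition: 100,
        }
    }

    // Derive the memory limit from the batch shape instead of hard-coding both,
    // so adjusting one constant automatically adjusts the other (factor is illustrative).
    fn memory_limit(&self) -> usize {
        2 * self.target_batch_size * self.row_size_bytes * self.batches_per_partition
    }
}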
I ran the benchmarks and got basically the same results.
I also verified that some of the queries that got faster actually use SortPreservingMerge, which they do.
I am just running the sort benchmarks now.
Not sure what to make of the sort benchmark results (it looks like this PR would slow things down in some cases). I will also run the same comparison against main/main to see how noisy it is.
I suspect they are noisy, but let's make sure we don't hurt anything. I think we can merge after we clarify that.
Here is the same benchmark run against a copy of main.
This looks a bit more stable. Given that this change is to the inner loop of SortPreservingMerge, it wouldn't surprise me if it got somewhat slower. That being said, I think we could also merge this PR and improve performance as a follow-on.
Given all the effort that has gone into this PR, I think we can merge it. I will try to do some follow-ons to add tests that mimic what we rely on in InfluxDB, to make sure this won't introduce regressions for us.
Thank you @jayzhan211 @berkaysynnada and @Dandandan
Thank you all so much 🚀
Which issue does this PR close?
Closes #12231.
Rationale for this change
To address the issue of unbalanced polling between partitions due to tie-breakers being based on partition index, especially in cases of low cardinality, we are making changes to the winner selection mechanism. Previously, partitions with smaller indices were consistently chosen as the winners, leading to an uneven distribution of polling. This caused upstream operator buffers for the other partitions to grow excessively, as they continued receiving data without consuming it.
For example, an upstream operator like RepartitionExec would keep sending data to certain partitions, but those partitions wouldn't consume the data unless they were selected as winners. This resulted in inefficient buffer usage.
To resolve this, we are modifying the tie-breaking logic. Instead of always choosing the partition with the smallest index, we now select the partition that has the fewest poll counts for the same value. This ensures that multiple partitions with the same value are chosen equally, distributing the polling load in a round-robin fashion. This approach balances the workload more effectively across partitions and avoids excessive buffer growth.
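As a simplified illustration of the policy change (the real logic lives in the merge's loser tree; the helpers below are only a model):

// Previous behaviour: among partitions whose values tie, the smallest index always wins.
fn pick_old(tied_partitions: &[usize]) -> usize {
    *tied_partitions.iter().min().unwrap()
}

// New behaviour: among tied partitions, prefer the one polled the fewest times,
// which spreads consumption across the tied inputs in round-robin fashion.
fn pick_new(tied_partitions: &[usize], poll_counts: &[usize]) -> usize {
    *tied_partitions
        .iter()
        .min_by_key(|&&p| (poll_counts[p], p))
        .unwrap()
}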
What changes are included in this PR?
Are these changes tested?
Tests in physical-plan/src/sorts/sort_preserving_merge.rs, test_round_robin_tie_breaker_success and test_round_robin_tie_breaker_fail, ensure the change reduces memory usage.
Benchmark
Existing benchmark
SPM benchmark
Summary
Performance
I ran the benchmark a couple of times since results vary each run, and I find there is a clear improvement for some queries like ClickBench q32 & q34. I think the difference only shows up with larger datasets and in extreme cases.
Memory usage
The memory issue is what we focused on, and we indeed observe lower memory usage. 👍