-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Prevent exponential planning time for Window functions - v2 #17684
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Prevent exponential planning time for Window functions - v2 #17684
Conversation
@findepi can you please review this? I've implemented something similar to what I'd in my mind, and I hope it will solve your problems without sacrificing anything, I don't know how to measure the improvement on my computer though |
cc @alamb also, if you are interested in this and want to review |
🤖 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @berkaysynnada nice to see benches though
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't solve the original problem.
I checked out this PR and run cargo run --bin datafusion-cli
Partitioning over 4 columns
DataFusion CLI v50.0.0
> WITH source AS (
SELECT
1 AS n,
'' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7, '' AS a8,
'' AS a9, '' AS a10, '' AS a11, '' AS a12, '' AS a13, '' AS a14, '' AS a15, '' AS a16,
'' AS a17, '' AS a18, '' AS a19, '' AS a20, '' AS a21, '' AS a22, '' AS a23, '' AS a24,
'' AS a25, '' AS a26, '' AS a27, '' AS a28, '' AS a29, '' AS a30, '' AS a31, '' AS a32,
'' AS a33, '' AS a34, '' AS a35, '' AS a36, '' AS a37, '' AS a38, '' AS a39, '' AS a40
)
SELECT
sum(n) OVER (PARTITION BY
a1, a2, a3, a4
)
FROM source;
+----------------------------------------------------------------------------------------------------------------------------------+
| sum(source.n) PARTITION BY [source.a1, source.a2, source.a3, source.a4] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING |
+----------------------------------------------------------------------------------------------------------------------------------+
| 1 |
+----------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched.
Elapsed 0.673 seconds.
Partitioning over 5 columns
> WITH source AS (
SELECT
1 AS n,
'' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7, '' AS a8,
'' AS a9, '' AS a10, '' AS a11, '' AS a12, '' AS a13, '' AS a14, '' AS a15, '' AS a16,
'' AS a17, '' AS a18, '' AS a19, '' AS a20, '' AS a21, '' AS a22, '' AS a23, '' AS a24,
'' AS a25, '' AS a26, '' AS a27, '' AS a28, '' AS a29, '' AS a30, '' AS a31, '' AS a32,
'' AS a33, '' AS a34, '' AS a35, '' AS a36, '' AS a37, '' AS a38, '' AS a39, '' AS a40
)
SELECT
sum(n) OVER (PARTITION BY
a1, a2, a3, a4, a5
)
FROM source;
+---------------------------------------------------------------------------------------------------------------------------------------------+
| sum(source.n) PARTITION BY [source.a1, source.a2, source.a3, source.a4, source.a5] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING |
+---------------------------------------------------------------------------------------------------------------------------------------------+
| 1 |
+---------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched.
Elapsed 10.065 seconds.
Notice the time!
{ | ||
if window_eq_properties.ordering_satisfy(lex.clone())? { | ||
all_satisfied_lexs.push(lex); | ||
if !no_partitioning { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This condition looks redundant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This condition still looks redundant. IIUC, when partitioning_exprs.is_empty()
the code inside the THEN arm will not do anything
.into_iter() | ||
.map(sort_options_resolving_constant) | ||
.map(|expr| sort_options_resolving_constant(expr, false)) | ||
.multi_cartesian_product(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this looks exponential still. when is this code path taken?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this part is for calculating orderings of function results, and similar approach can also be applied here. If you think the partitioning expr update reasonable now, I'll apply the same here as well, in addition to the valid parts of your PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's maybe file issue / follow-up PR, so that we don't need to re-review code unnecessarily
// For each current partial ordering, try extending with each sort option | ||
for current in current_orderings.iter() { | ||
for sort_expr in sort_options.iter() { | ||
let mut extended = current.clone(); | ||
extended.push(sort_expr.clone()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this look potentially expensive?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In #17563 we have this example query
# regression test for https://github.com/apache/datafusion/issues/17401
query I
WITH source AS (
SELECT
1 AS n,
'' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7, '' AS a8,
'' AS a9, '' AS a10, '' AS a11, '' AS a12, '' AS a13, '' AS a14, '' AS a15, '' AS a16,
'' AS a17, '' AS a18, '' AS a19, '' AS a20, '' AS a21, '' AS a22, '' AS a23, '' AS a24,
'' AS a25, '' AS a26, '' AS a27, '' AS a28, '' AS a29, '' AS a30, '' AS a31, '' AS a32,
'' AS a33, '' AS a34, '' AS a35, '' AS a36, '' AS a37, '' AS a38, '' AS a39, '' AS a40
)
SELECT
sum(n) OVER (PARTITION BY
a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20,
a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40
)
FROM source;
with current PR code it never completes, but let's ignore this for a moment.
if i read the code correctly, it "prunes early" when next partitioning expression doesn't provide more benefits. So i thought that it won't prune in some cases. Perhaps when input is pre-partitioned?
Let's add a test query
WITH source AS (
SELECT
1 AS n,
'' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7, '' AS a8,
'' AS a9, '' AS a10, '' AS a11, '' AS a12, '' AS a13, '' AS a14, '' AS a15, '' AS a16,
'' AS a17, '' AS a18, '' AS a19, '' AS a20, '' AS a21, '' AS a22, '' AS a23, '' AS a24,
'' AS a25, '' AS a26, '' AS a27, '' AS a28, '' AS a29, '' AS a30, '' AS a31, '' AS a32,
'' AS a33, '' AS a34, '' AS a35, '' AS a36, '' AS a37, '' AS a38, '' AS a39, '' AS a40
)
SELECT
sum(n) OVER (PARTITION BY
a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20,
a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40
)
FROM (SELECT * FROM source ORDER BY a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20,;
a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
However, I am not sure if i understand correctly the pruning condition and what's the condition for not pruning. Please elaborate.
This seems to actually have made planning time worse -- the benchmark run from yesterday is still running
(note I think 696351.6s means 8 days!) |
I missed the case where there are no ordering constraints. In that scenario, ordering_satisfy() returns true for nearly all combinations, so we were still keeping O(4^n) orderings despite pruning. I've now updated the approach as greedy.
Can you check this updated version @findepi ? |
This comment was marked as outdated.
This comment was marked as outdated.
The PR is currently light on tests. Can you please consider pulling synnada-ai#74 ? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the changed piece of code no longer looks like exhibiting exponential complexity
thank you!
please include tests (#17684 (comment))
{ | ||
if window_eq_properties.ordering_satisfy(lex.clone())? { | ||
all_satisfied_lexs.push(lex); | ||
if !no_partitioning { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This condition still looks redundant. IIUC, when partitioning_exprs.is_empty()
the code inside the THEN arm will not do anything
let mut candidate_ordering = ordering.clone(); | ||
candidate_ordering.push(sort_expr.clone()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
ordering.clone()
is avoidable.
logically, it's an expression being pushed-tried-else-popped:
// Find a single valid ordering using a greedy approach
let mut candidate_ordering = vec![];
for partition_expr in partitioning_exprs.iter() {
let sort_options =
sort_options_resolving_constant(Arc::clone(partition_expr), true);
// Try each sort option and pick the first one that works
let mut found = false;
for sort_expr in sort_options.iter() {
candidate_ordering.push(sort_expr.clone());
if let Some(lex) = LexOrdering::new(candidate_ordering.clone()) {
if window_eq_properties.ordering_satisfy(lex)? {
found = true;
break;
}
} else {
candidate_ordering.pop();
}
}
// If no sort option works for this column, we can't build a valid ordering
if !found {
candidate_ordering.clear();
break;
}
}
// If we successfully built an ordering for all columns, use it
if candidate_ordering.len() == partitioning_exprs.len() {
if let Some(lex) = LexOrdering::new(ordering) {
all_satisfied_lexs.push(lex);
}
}
if let Some(lex) = LexOrdering::new(candidate_ordering.clone()) { | ||
if window_eq_properties.ordering_satisfy(lex)? { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With every new window sort expression iterated over, we seem to be processing all previous sort expressions. This feels quadratic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it's quadratic as we validate orderings of length 1, 2, 3, n. But that's unavoidable since we need to check that each partial ordering stays valid as we extend it.
Still way better than before. Old version is O(4^n), the new one is O(n^2 * 4)
The quadratic cost is the price of correctness
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No doubt quadratic is so much better than exponential.
What could make turn valid partial ordering into an invalid, when extended with a new expression?
Is it only about duplicates?
IIRC, LexOrdering::new
checks for dups, so something we could easily do O(n) overall. But I don't know yet what window_eq_properties.ordering_satisfy
does.
.into_iter() | ||
.map(sort_options_resolving_constant) | ||
.map(|expr| sort_options_resolving_constant(expr, false)) | ||
.multi_cartesian_product(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's maybe file issue / follow-up PR, so that we don't need to re-review code unnecessarily
🤖 |
🤖: Benchmark completed Details
|
🤖 |
🚀 -- it is not often we see a 173x speedup 🐈 🥳 |
I'll drive this to the finish line, probably tomorrow morning (all the stuff we've talked about). Thank you :) |
🤖: Benchmark completed Details
|
thanks @alamb . these lgtm |
…dering-calc tests copied from v1 pr
I think it's ready. @findepi feel free to commit directly if you'd like to make any changes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @berkaysynnada -- I went through this PR carefully and I think it is a nice improvement over what is on main. Thank you for working on this and thank you to @findepi and @comphead for the reviews
sort_options_resolving_constant(Arc::clone(expr), false); | ||
|
||
// Try each option and pick the first that works | ||
for sort_expr in sort_options.iter() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly here you could potentially use sort_options.into_iter()
and save the clone below. Here is what I used
@@ -505,13 +505,14 @@ pub(crate) fn window_equivalence_properties(
sort_options_resolving_constant(Arc::clone(expr), false);
// Try each option and pick the first that works
- for sort_expr in sort_options.iter() {
- candidate_order.push(sort_expr.clone());
+ for sort_expr in sort_options.into_iter() {
+ let is_asc = !sort_expr.options.descending;
+ candidate_order.push(sort_expr);
if let Some(lex) = LexOrdering::new(candidate_order.clone()) {
if window_eq_properties.ordering_satisfy(lex)? {
if idx == 0 {
- asc = !sort_expr.options.descending;
+ asc = is_asc;
}
found = true;
break;
asc = !f.options.descending; | ||
// Find one valid ordering for aggregate arguments instead of | ||
// checking all combinations | ||
let aggregate_exprs = sliding_expr.get_aggregate_expr().expressions(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW this seems very similar to the loop above, I wonder if there is some way (as a follow on PR) to factor it out to reduce the replication
0 3 NULL NULL 0 NULL NULL | ||
0 4 NULL NULL 0 NULL NULL | ||
|
||
# regression test for https://github.com/apache/datafusion/issues/17401 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I ran this locally, and found
both main and This branch -- took 2 seconds
Main
(venv) andrewlamb@Andrews-MacBook-Pro-3:~/Software/datafusion$ cargo test --profile=ci --test sqllogictests -- window.slt
Finished `ci` profile [unoptimized + debuginfo] target(s) in 0.20s
Running bin/sqllogictests.rs (target/ci/deps/sqllogictests-4dcb99f83e94c047)
Completed 2 test files in 2 seconds
This branch
(venv) andrewlamb@Andrews-MacBook-Pro-3:~/Software/datafusion$ cargo test --profile=ci --test sqllogictests -- window.slt
Finished `ci` profile [unoptimized + debuginfo] target(s) in 0.19s
Running bin/sqllogictests.rs (target/ci/deps/sqllogictests-fbba93e4275b7826)
Completed 2 test files in 2 seconds
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I ran this locally, and found
both main and This branch -- took 2 seconds
Main
Do you mean you run window.slt unmodified as currently in main
, or did you apply changes from this PR first?
I tried to reproduce the latter and for me execution "hangs" at
[00:00:01] #######################################- 442/454 "window.slt"
// If we successfully built an ordering for all columns, use it | ||
// When there are no partition expressions, candidate_ordering will be empty and won't be added | ||
if candidate_ordering.len() == partitioning_exprs.len() | ||
&& !candidate_ordering.is_empty() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
&& !candidate_ordering.is_empty()
is redundant.
in the empty case, the LexOrdering::new
returns None
&& !candidate_ordering.is_empty() |
if let Some(lex) = LexOrdering::new(candidate_order.clone()) { | ||
if window_eq_properties.ordering_satisfy(lex)? { | ||
if idx == 0 { | ||
asc = !sort_expr.options.descending; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why first is special? worth a code comment
(i know the logic is pre-existing and did not have a comment, but you seem to know what this is about and I do not)
.expressions() | ||
.into_iter() | ||
.map(sort_options_resolving_constant) | ||
.multi_cartesian_product(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for solving this exponential case too!
I am not sure the regression tests cover this case.
Do we need some more test queries?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> { |
SUM(c1 + c2 + c3 + c4 + c5 + c6 + c7 + c8 + c9 + c10 + ... + cN)
OVER (ORDER BY c1, c2, ... cN and a causal window)
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
….com/synnada-ai/datafusion-upstream into fix/exponential-window-ordering-calc
Are there any remaining issues preventing merging this PR? |
Ok, this looks good to me, so I'll merge it in now. Thank you @berkaysynnada and @findepi |
…7684) * fix * Update mod.rs * Update mod.rs * Update mod.rs * tests copied from v1 pr * test case from review comment apache#17684 (comment) * one more test case * Update mod.rs * Update datafusion/physical-plan/src/windows/mod.rs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> * Update datafusion/physical-plan/src/windows/mod.rs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> * Update mod.rs * Update mod.rs --------- Co-authored-by: Piotr Findeisen <piotr.findeisen@gmail.com> Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
…7684) * fix * Update mod.rs * Update mod.rs * Update mod.rs * tests copied from v1 pr * test case from review comment apache#17684 (comment) * one more test case * Update mod.rs * Update datafusion/physical-plan/src/windows/mod.rs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> * Update datafusion/physical-plan/src/windows/mod.rs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> * Update mod.rs * Update mod.rs --------- Co-authored-by: Piotr Findeisen <piotr.findeisen@gmail.com> Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
…17778) * fix * Update mod.rs * Update mod.rs * Update mod.rs * tests copied from v1 pr * test case from review comment #17684 (comment) * one more test case * Update mod.rs * Update datafusion/physical-plan/src/windows/mod.rs * Update datafusion/physical-plan/src/windows/mod.rs * Update mod.rs * Update mod.rs --------- Co-authored-by: Berkay Şahin <124376117+berkaysynnada@users.noreply.github.com> Co-authored-by: Piotr Findeisen <piotr.findeisen@gmail.com>
Which issue does this PR close?
Rationale for this change
There is a complexity issue with window ordering calculations. Rather than computing all possibilities and eliminating the failing ones, we incrementally refine the ordering, keeping only elements that satisfy requirements.
related content: #14813, #17401, #17563
What changes are included in this PR?
This PR implements the algorithm mentioned in the issue. There is also one minor change: the
sort_options_resolving_constant
function now generates candidates according to the intended usage. This enables a minor optimization possibility.Are these changes tested?
Are there any user-facing changes?