Avoid generating duplicate sort keys from window expressions, fix bug when deciding window expression ordering #4643
@@ -1642,7 +1642,120 @@ async fn test_window_agg_sort() -> Result<()> {
     assert_eq!(
         expected, actual_trim_last,
         "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
-        expected, actual
+        expected, actual_trim_last
     );
     Ok(())
 }
+
+#[tokio::test]
+async fn over_order_by_sort_keys_sorting_prefix_compacting() -> Result<()> {
+    let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
+    register_aggregate_csv(&ctx).await?;
+
+    let sql = "SELECT c2, MAX(c9) OVER (ORDER BY c2), SUM(c9) OVER (), MIN(c9) OVER (ORDER BY c2, c9) from aggregate_test_100";
[Review comment] I think without the commit, this test runs well. Is …
[Reply] Yes.
+
+    let msg = format!("Creating logical plan for '{}'", sql);
+    let plan = ctx.create_logical_plan(sql).expect(&msg);
+    let state = ctx.state();
+    let logical_plan = state.optimize(&plan)?;
+    let physical_plan = state.create_physical_plan(&logical_plan).await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "ProjectionExec: expr=[c2@3 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@0 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as MIN(aggregate_test_100.c9)]",
+            " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
+            " WindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
+            " WindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
+            " SortExec: [c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]"
+        ]
+    };
+
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    let actual_len = actual.len();
+    let actual_trim_last = &actual[..actual_len - 1];
+    assert_eq!(
+        expected, actual_trim_last,
+        "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+        expected, actual_trim_last
+    );
+    Ok(())
+}
+
+/// FIXME: for now we are not detecting prefixes of sort keys, which would let us re-arrange with the global ORDER BY and save one SortExec
+#[tokio::test]
+async fn over_order_by_sort_keys_sorting_global_order_compacting() -> Result<()> {
+    let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
+    register_aggregate_csv(&ctx).await?;
+
+    let sql = "SELECT c2, MAX(c9) OVER (ORDER BY c9, c2), SUM(c9) OVER (), MIN(c9) OVER (ORDER BY c2, c9) from aggregate_test_100 ORDER BY c2";
+    let msg = format!("Creating logical plan for '{}'", sql);
+    let plan = ctx.create_logical_plan(sql).expect(&msg);
+    let state = ctx.state();
+    let logical_plan = state.optimize(&plan)?;
+    let physical_plan = state.create_physical_plan(&logical_plan).await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // 3 SortExec are added
+    let expected = {
+        vec![
+            "SortExec: [c2@0 ASC NULLS LAST]",
+            " CoalescePartitionsExec",
+            " ProjectionExec: expr=[c2@3 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as MAX(aggregate_test_100.c9), SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@0 as SUM(aggregate_test_100.c9), MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as MIN(aggregate_test_100.c9)]",
+            " RepartitionExec: partitioning=RoundRobinBatch(2)",
+            " WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]",
+            " WindowAggExec: wdw=[MAX(aggregate_test_100.c9): Ok(Field { name: \"MAX(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
+            " SortExec: [c9@2 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
+            " WindowAggExec: wdw=[MIN(aggregate_test_100.c9): Ok(Field { name: \"MIN(aggregate_test_100.c9)\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(UInt32(NULL)), end_bound: CurrentRow }]",
+            " SortExec: [c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
+        ]
+    };
+
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    let actual_len = actual.len();
+    let actual_trim_last = &actual[..actual_len - 1];
+    assert_eq!(
+        expected, actual_trim_last,
+        "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+        expected, actual_trim_last
+    );
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_window_partition_by_order_by() -> Result<()> {
+    let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
+    register_aggregate_csv(&ctx).await?;
+
+    let sql = "SELECT \
+        SUM(c4) OVER(PARTITION BY c1, c2 ORDER BY c2 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),\
+        COUNT(*) OVER(PARTITION BY c1 ORDER BY c2 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) \
+        FROM aggregate_test_100";
+
+    let msg = format!("Creating logical plan for '{}'", sql);
+    let plan = ctx.create_logical_plan(sql).expect(&msg);
+    let state = ctx.state();
+    let logical_plan = state.optimize(&plan)?;
+    let physical_plan = state.create_physical_plan(&logical_plan).await?;
+    let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+    // Only 1 SortExec was added
+    let expected = {
+        vec![
+            "ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as COUNT(UInt8(1))]",
+            " WindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
+            " SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]",
+            " CoalesceBatchesExec: target_batch_size=4096",
[Review comment] Here I found this also eliminates the …
[Reply] In this PR we only deal with …
[Reply] Yes, without this PR the original physical plan added additional RepartitionExecs; this was because the additional Sort meant the two WindowAggExecs could not be collapsed.
[Reply] Thanks for explaining. Printing the original plan for anyone who needs it: … After this PR, …
" RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2)", | ||
" RepartitionExec: partitioning=RoundRobinBatch(2)", | ||
] | ||
}; | ||
|
||
let actual: Vec<&str> = formatted.trim().lines().collect(); | ||
let actual_len = actual.len(); | ||
let actual_trim_last = &actual[..actual_len - 1]; | ||
assert_eq!( | ||
expected, actual_trim_last, | ||
"\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", | ||
expected, actual_trim_last | ||
); | ||
Ok(()) | ||
} |
@@ -22,7 +22,7 @@ use crate::expr_rewriter::{
     normalize_cols, rewrite_sort_cols_by_aggs,
 };
 use crate::type_coercion::binary::comparison_coercion;
-use crate::utils::{columnize_expr, exprlist_to_fields, from_plan};
+use crate::utils::{columnize_expr, compare_sort_expr, exprlist_to_fields, from_plan};
 use crate::{and, binary_expr, Operator};
 use crate::{
     logical_plan::{

@@ -43,6 +43,7 @@ use datafusion_common::{
     ToDFSchema,
 };
 use std::any::Any;
+use std::cmp::Ordering;
 use std::collections::{HashMap, HashSet};
 use std::convert::TryFrom;
 use std::sync::Arc;

@@ -250,16 +251,29 @@ impl LogicalPlanBuilder {
     ) -> Result<LogicalPlan> {
         let mut plan = input;
         let mut groups = group_window_expr_by_sort_keys(&window_exprs)?;
-        // sort by sort_key len descending, so that more deeply sorted plans gets nested further
-        // down as children; to further mimic the behavior of PostgreSQL, we want stable sort
-        // and a reverse so that tieing sort keys are reversed in order; note that by this rule
-        // if there's an empty over, it'll be at the top level
-        groups.sort_by(|(key_a, _), (key_b, _)| key_a.len().cmp(&key_b.len()));
-        groups.reverse();
+        // To align with the behavior of PostgreSQL, we sort the sort keys by the same rule as PostgreSQL:
+        // first we compare the sort keys themselves, and if one window's sort keys are a prefix of another's,
+        // we put the window with more sort keys first, so more deeply sorted plans get nested further down as children.
+        // The sort_by() implementation here is a stable sort.
+        // Note that by this rule, if there's an empty OVER, it'll be at the top level.
+        groups.sort_by(|(key_a, _), (key_b, _)| {
+            for (first, second) in key_a.iter().zip(key_b.iter()) {
+                let key_ordering = compare_sort_expr(first, second, plan.schema());
+                match key_ordering {
+                    Ordering::Less => {
+                        return Ordering::Less;
+                    }
+                    Ordering::Greater => {
+                        return Ordering::Greater;
+                    }
+                    Ordering::Equal => {}
+                }
+            }
+            key_b.len().cmp(&key_a.len())
+        });
         for (_, exprs) in groups {
             let window_exprs = exprs.into_iter().cloned().collect::<Vec<_>>();
-            // the partition and sort itself is done at physical level, see physical_planner's
-            // fn create_initial_plan
+            // the partition and sort itself is done at physical level, see the BasicEnforcement rule
             plan = LogicalPlanBuilder::from(plan)
                 .window(window_exprs)?
                 .build()?;
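To make the new grouping order concrete, here is a minimal, self-contained sketch of the comparison rule; it is not DataFusion code. Plain string lists stand in for each window's ORDER BY sort keys, whereas the real implementation above compares `Expr::Sort` values with `compare_sort_expr` against the plan's schema.

```rust
use std::cmp::Ordering;

fn main() {
    // Toy sort-key lists standing in for the ORDER BY keys of three OVER clauses.
    let mut groups: Vec<Vec<&str>> = vec![
        vec![],           // OVER ()
        vec!["c2"],       // OVER (ORDER BY c2)
        vec!["c2", "c9"], // OVER (ORDER BY c2, c9)
    ];

    groups.sort_by(|key_a, key_b| {
        // First compare the shared prefix element by element ...
        for (first, second) in key_a.iter().zip(key_b.iter()) {
            match first.cmp(second) {
                Ordering::Equal => {}
                other => return other,
            }
        }
        // ... and if one key list is a prefix of the other, put the longer
        // (more deeply sorted) list first.
        key_b.len().cmp(&key_a.len())
    });

    // Groups are applied to the plan in this order, so the most specific sort
    // keys end up nested deepest and the empty OVER () lands at the top level.
    assert_eq!(groups, vec![vec!["c2", "c9"], vec!["c2"], vec![]]);
}
```

With this ordering, the window sorted on (c2, c9) is planned deepest and the window sorted on c2 sits above it; since c2 is a prefix of (c2, c9), the ordering can be reused, which is why the first new test expects only one `SortExec: [c2@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]`.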
[Review] 👍
[Review] BTW, we can add some tests in sqllogictest.
[Reply] ❤️ -- if anyone wants to help, we are working on porting tests as part of #4495