Avoid generating duplicate sort keys from Window Expressions, fix bug when deciding Window Expressions ordering #4643
Conversation
@yahoNanJing @Ted-Jiang @alamb @jackwener
datafusion/expr/src/utils.rs
} => Ok(Expr::Sort {
    expr: expr.clone(),
    asc: true,
    nulls_first: false,
I think here it should be normalized_order_by_keys: the order_by_keys are used to sort inside each hashed partition, and we only care that rows with the same value are adjacent. So there is no need to care about the sort order? Am I right 🤔
No, here the normalized_order_by_keys are for dedup purposes. The generated sort keys for window expressions are partition_by_keys + sort_keys, and there might be overlap between partition_by_keys and sort_keys. The original implementation leveraged the contains() method to do the dedup, but that is not enough.
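To make that concrete, here is a minimal sketch (simplified stand-in types, not the actual DataFusion signatures) of why a plain contains() check misses duplicates that differ only in their sort options, while deduplicating on the normalized expression catches them:

#[derive(Clone, Debug, PartialEq)]
struct SortKey {
    expr: String, // stand-in for the sort expression
    asc: bool,
    nulls_first: bool,
}

fn generate_sort_key(partition_by: &[SortKey], order_by: &[SortKey]) -> Vec<SortKey> {
    let mut keys: Vec<SortKey> = partition_by.to_vec();
    for key in order_by {
        // keys.contains(key) would only catch an exact match (same expr, asc,
        // and nulls_first); comparing on the normalized expression also catches
        // "c2 ASC NULLS FIRST" vs "c2 ASC NULLS LAST".
        if !keys.iter().any(|k| k.expr == key.expr) {
            keys.push(key.clone());
        }
    }
    keys
}

fn main() {
    let partition_by = vec![SortKey { expr: "c2".into(), asc: true, nulls_first: true }];
    let order_by = vec![SortKey { expr: "c2".into(), asc: true, nulls_first: false }];
    // One "c2" key instead of two.
    assert_eq!(generate_sort_key(&partition_by, &order_by).len(), 1);
}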
Thanks for pinging me ❤️, I will carefully review this today.
    },
];
let result = generate_sort_key(partition_by, order_by)?;
assert_eq!(expected, result);
I checked that with this commit it will change:
[age ASC NULLS FIRST, name ASC NULLS FIRST, created_at ASC NULLS FIRST, age ASC NULLS LAST, name ASC NULLS LAST]
to:
[age DESC NULLS FIRST, name DESC NULLS FIRST, created_at ASC NULLS LAST]
👍
Yes, this PR does additional dedup to avoid generating duplicate sort keys.
let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
register_aggregate_csv(&ctx).await?;

let sql = "SELECT c2, MAX(c9) OVER (ORDER BY c2), SUM(c9) OVER (), MIN(c9) OVER (ORDER BY c2, c9) from aggregate_test_100";
I think without this commit, this test runs well. Does the Enforcement physical rule eliminate the ORDER BY c2? 🤔 @mingmwang
Yes.
"ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as COUNT(UInt8(1))]", | ||
" WindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]", | ||
" SortExec: [c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST]", | ||
" CoalesceBatchesExec: target_batch_size=4096", |
Here I found this also eliminates the RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }, Column { name: \"c2\", index: 1 }], 2)
But this PR only deals with sort, so who did this? 🤔
Yes, without this PR, the original physical plan added additional RepartitionExecs. This was because the additional Sort meant the two WindowAggExecs could not be collapsed.
Thanks for explaining. Printing the original plan for anyone who needs it:
[
"ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as COUNT(UInt8(1))]",
" WindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
" SortExec: [c1@1 ASC,c2@2 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 1 }], 2)",
" WindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }]",
" SortExec: [c1@0 ASC,c2@1 ASC,c2@1 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }, Column { name: \"c2\", index: 1 }], 2)",
" RepartitionExec: partitioning=RoundRobinBatch(2)",
]
After this PR, let mut groups = group_window_expr_by_sort_keys(&window_exprs)?; will generate one group, which is why the WindowAggExecs are collapsed.
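A rough illustration of that grouping step (string sort keys as stand-ins; this is not the real group_window_expr_by_sort_keys signature): once the duplicate c2 is removed, both window expressions resolve to the same sort key and land in one group, so a single WindowAggExec can serve them.

use std::collections::BTreeMap;

fn group_by_sort_key(window_exprs: &[(&str, Vec<&str>)]) -> BTreeMap<Vec<String>, Vec<String>> {
    let mut groups: BTreeMap<Vec<String>, Vec<String>> = BTreeMap::new();
    for (name, sort_key) in window_exprs {
        let key: Vec<String> = sort_key.iter().map(|s| s.to_string()).collect();
        groups.entry(key).or_default().push(name.to_string());
    }
    groups
}

fn main() {
    // With the dedup, both expressions share the sort key [c1, c2]; without it,
    // the first one carried [c1, c2, c2] (see the SortExec in the plan above)
    // and forced a second WindowAggExec plus the extra RepartitionExec.
    let window_exprs = vec![
        ("SUM(c4) PARTITION BY c1, c2 ORDER BY c2", vec!["c1", "c2"]),
        ("COUNT(1) PARTITION BY c1 ORDER BY c2", vec!["c1", "c2"]),
    ];
    assert_eq!(group_by_sort_key(&window_exprs).len(), 1);
}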
@mingmwang This improvement makes sense to me. 👍 Only some questions about the optimizer, and the clippy errors need to be fixed.
Thanks. Those clippy errors are not caused by this PR. I'm waiting for someone else to fix them in master, and then I will rebase.
I believe @jackwener has fixed them in #4652
cc @metesynnada and @retikulum, as I believe you are interested in and working on window expressions as well
LGTM, nice job, and it considers many details carefully. 👍
// FIXME sort on LargeUtf8 String has bug.
// let sql =
//     "SELECT d3, row_number() OVER (partition by d3) as rn1 FROM test";
// let actual = execute_to_batches(&ctx, sql).await;
// let expected = vec![
//     "+-------+-----+",
//     "| d3    | rn1 |",
//     "+-------+-----+",
//     "|       | 1   |",
//     "| One   | 1   |",
//     "| Three | 1   |",
//     "+-------+-----+",
// ];
// assert_batches_eq!(expected, &actual);
👍
BTW, we can add some tests in sqllogictest
❤️ -- if anyone wants to help, we are working on porting tests as part of #4495
// To align with the behavior of PostgreSQL, we want the sort_keys sorted using the same rule as PostgreSQL: first
// we compare the sort keys themselves, and if one window's sort keys are a prefix of another's,
// put the window with more sort keys first, so more deeply sorted plans get nested further down as children.
// The sort_by() implementation here is a stable sort.
// Note that by this rule, if there's an empty OVER, it'll be at the top level.
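For anyone skimming, a minimal sketch of that ordering rule (sort keys shown as plain strings rather than Expr::Sort; not the exact comparator in this PR): compare the shared prefix element by element, and when one key list is a prefix of the other, the longer (more deeply sorted) list sorts first. Because sort_by is stable and the empty key list of an OVER () sorts last, that window stays at the top level, as the comment notes.

use std::cmp::Ordering;

fn compare_sort_keys(a: &[&str], b: &[&str]) -> Ordering {
    // Compare the shared prefix element by element first.
    for (x, y) in a.iter().zip(b.iter()) {
        match x.cmp(y) {
            Ordering::Equal => continue,
            other => return other,
        }
    }
    // One list is a prefix of the other: the longer list sorts first,
    // so the more deeply sorted window gets nested further down.
    b.len().cmp(&a.len())
}

fn main() {
    let mut windows = vec![
        vec![],             // empty OVER ()
        vec!["c1", "c2"],
        vec!["c1"],
    ];
    // Stable sort: equal keys keep their original relative order.
    windows.sort_by(|a, b| compare_sort_keys(a, b));
    assert_eq!(windows, vec![vec!["c1", "c2"], vec!["c1"], vec![]]);
}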
👍
Thanks @mingmwang @jackwener and @Ted-Jiang
Benchmark runs are scheduled for baseline = dba34fc and contender = 891a800. 891a800 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Which issue does this PR close?
Closes: #4635
Closes: #4641
Rationale for this change
The current implementation might generate duplicate sort keys from Window Expressions, and the Window Expressions ordering is not consistent with PostgreSQL.
This PR addresses those issues.
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?