3 changes: 2 additions & 1 deletion datafusion/core/benches/sql_planner.rs
@@ -476,7 +476,8 @@ fn criterion_benchmark(c: &mut Criterion) {
         });
     });
 
-    for partitioning_columns in [4, 7, 8] {
+    // It was observed in production that queries with window functions sometimes partition over more than 30 columns
+    for partitioning_columns in [4, 7, 8, 12, 30] {
        c.bench_function(
            &format!(
                "physical_window_function_partition_by_{partitioning_columns}_on_values"
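The benchmark body is elided above. As a rough, standalone illustration (the helper name `window_partition_query` and table `t` are hypothetical, not taken from sql_planner.rs), this is the kind of query shape that planning with N partition-by columns exercises:

// Hypothetical sketch: build a query whose window function partitions over `n` columns,
// similar in shape to what the benchmark cases above plan.
fn window_partition_query(n: usize) -> String {
    let cols: Vec<String> = (1..=n).map(|i| format!("c{i}")).collect();
    format!(
        "SELECT SUM(c1) OVER (PARTITION BY {}) FROM t",
        cols.join(", ")
    )
}

fn main() {
    // 30 mirrors the widest case observed in production.
    println!("{}", window_partition_query(30));
}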
1 change: 1 addition & 0 deletions datafusion/datasource-parquet/src/opener.rs
@@ -154,6 +154,7 @@ impl FileOpener for ParquetOpener {
         let encryption_context = self.get_encryption_context();
 
         Ok(Box::pin(async move {
+            #[cfg(feature = "parquet_encryption")]
            let file_decryption_properties = encryption_context
                .get_file_decryption_properties(&file_location)
                .await?;
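The new attribute gates the decryption-properties lookup behind the `parquet_encryption` feature, so the binding is only compiled when that feature is enabled. A minimal, self-contained sketch of the same statement-level `#[cfg]` pattern (hypothetical feature name `encryption`, not the DataFusion code):

fn main() {
    // The binding only exists when the crate is built with `--features encryption`.
    #[cfg(feature = "encryption")]
    let key_material = "example-key";

    #[cfg(feature = "encryption")]
    println!("decrypting with {key_material}");

    #[cfg(not(feature = "encryption"))]
    println!("encryption feature disabled; skipping decryption setup");
}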
50 changes: 41 additions & 9 deletions datafusion/physical-plan/src/windows/mod.rs
@@ -375,9 +375,17 @@ pub(crate) fn window_equivalence_properties(
     // variations for them. Then, we will check each one whether it satisfies
     // the existing ordering provided by the input plan.
     let mut all_satisfied_lexs = vec![];
+    // TODO ok_to_trigger_exponential_planning_time_limit is a workaround for https://github.com/apache/datafusion/issues/17624
+    let mut ok_to_trigger_exponential_planning_time_limit = 4;
     for lex in partitioning_exprs
         .iter()
-        .map(|pb_order| sort_options_resolving_constant(Arc::clone(pb_order)))
+        .map(|pb_order| {
+            ok_to_trigger_exponential_planning_time_limit -= 1;
+            sort_options_resolving_constant(
+                Arc::clone(pb_order),
+                ok_to_trigger_exponential_planning_time_limit >= 0,
+            )
+        })
         .multi_cartesian_product()
         .filter_map(LexOrdering::new)
     {
@@ -409,9 +417,14 @@
             } else {
                 // Window function results in a partial constant value in
                 // some ordering. Adjust the ordering equivalences accordingly:
+                // TODO ok_to_trigger_exponential_planning_time_limit is a workaround for https://github.com/apache/datafusion/issues/17624
+                let mut ok_to_trigger_exponential_planning_time_limit = 4;
                 let new_lexs = all_satisfied_lexs.into_iter().flat_map(|lex| {
-                    let new_partial_consts =
-                        sort_options_resolving_constant(Arc::clone(&window_col));
+                    ok_to_trigger_exponential_planning_time_limit -= 1;
+                    let new_partial_consts = sort_options_resolving_constant(
+                        Arc::clone(&window_col),
+                        ok_to_trigger_exponential_planning_time_limit >= 0,
+                    );
 
                     new_partial_consts.into_iter().map(move |partial| {
                         let mut existing = lex.clone();
@@ -467,11 +480,19 @@
         // utilize set-monotonicity since the set shrinks as the frame
         // boundary starts "touching" the end of the table.
         else if frame.is_causal() {
+            // TODO ok_to_trigger_exponential_planning_time_limit is a workaround for https://github.com/apache/datafusion/issues/17624
+            let mut ok_to_trigger_exponential_planning_time_limit = 4;
             let args_all_lexs = sliding_expr
                 .get_aggregate_expr()
                 .expressions()
                 .into_iter()
-                .map(sort_options_resolving_constant)
+                .map(|expr| {
+                    ok_to_trigger_exponential_planning_time_limit -= 1;
+                    sort_options_resolving_constant(
+                        expr,
+                        ok_to_trigger_exponential_planning_time_limit >= 0,
+                    )
+                })
                 .multi_cartesian_product();
 
             let (mut asc, mut satisfied) = (false, false);
@@ -634,11 +655,22 @@ pub fn get_window_mode(
     Ok(None)
 }
 
-fn sort_options_resolving_constant(expr: Arc<dyn PhysicalExpr>) -> Vec<PhysicalSortExpr> {
-    vec![
-        PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)),
-        PhysicalSortExpr::new(expr, SortOptions::new(true, true)),
-    ]
+fn sort_options_resolving_constant(
+    expr: Arc<dyn PhysicalExpr>,
+    ok_to_trigger_exponential_planning_time: bool,
+) -> Vec<PhysicalSortExpr> {
+    if ok_to_trigger_exponential_planning_time {
+        vec![
+            PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)),
+            PhysicalSortExpr::new(expr, SortOptions::new(true, true)),
+        ]
+    } else {
+        vec![
+            // TODO (https://github.com/apache/datafusion/issues/17624) restore while avoiding exponential planning time
+            // PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)),
+            PhysicalSortExpr::new(expr, SortOptions::new(true, true)),
+        ]
+    }
 }
 
 #[cfg(test)]
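Some context on why the workaround caps the search at four columns: each PARTITION BY expression previously contributed two sort-option candidates, and multi_cartesian_product enumerates every combination, so planning explored 2^n candidate orderings for n partition columns. The standalone sketch below (not the DataFusion code; it uses the itertools crate, which the real code also relies on) reproduces that blow-up and shows how giving only the first four expressions both variations bounds the product at 2^4 = 16.

use itertools::Itertools;

// Two sort-option candidates per column while the budget lasts, one afterwards
// (mirroring the `ok_to_trigger_exponential_planning_time_limit` counter above).
fn candidates(col: usize, allow_both: bool) -> Vec<(usize, bool)> {
    if allow_both {
        vec![(col, false), (col, true)] // two sort-option variations, as before the workaround
    } else {
        vec![(col, true)] // single variation, avoiding the exponential product
    }
}

fn count_candidate_orderings(num_columns: usize, limit: i64) -> usize {
    let mut budget = limit;
    (0..num_columns)
        .map(|col| {
            budget -= 1;
            candidates(col, budget >= 0)
        })
        .multi_cartesian_product()
        .count()
}

fn main() {
    assert_eq!(count_candidate_orderings(3, i64::MAX), 8); // 2^3 without a cap
    assert_eq!(count_candidate_orderings(30, 4), 16); // capped at 2^4, even for 30 columns
    println!("ok");
}

Without the cap, the 30-column benchmark case above would enumerate 2^30 (over a billion) candidate orderings before any filtering.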
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/projection.slt
@@ -253,7 +253,7 @@
 statement ok
 drop table t;
 
-# Regression test for 
+# Regression test for
 # https://github.com/apache/datafusion/issues/17513
 
 query I
36 changes: 36 additions & 0 deletions datafusion/sqllogictest/test_files/window.slt
@@ -6034,3 +6034,39 @@ LIMIT 5
 0 2 NULL NULL 0 NULL NULL
 0 3 NULL NULL 0 NULL NULL
 0 4 NULL NULL 0 NULL NULL
+
+# regression test for https://github.com/apache/datafusion/issues/17401
+query I
+WITH source AS (
+  SELECT
+    1 AS n,
+    '' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7, '' AS a8,
+    '' AS a9, '' AS a10, '' AS a11, '' AS a12
+)
+SELECT
+  sum(n) OVER (PARTITION BY
+    a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12
+  )
+FROM source;
+----
+1
+
+# regression test for https://github.com/apache/datafusion/issues/17401
+query I
+WITH source AS (
+  SELECT
+    1 AS n,
+    '' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7, '' AS a8,
+    '' AS a9, '' AS a10, '' AS a11, '' AS a12, '' AS a13, '' AS a14, '' AS a15, '' AS a16,
+    '' AS a17, '' AS a18, '' AS a19, '' AS a20, '' AS a21, '' AS a22, '' AS a23, '' AS a24,
+    '' AS a25, '' AS a26, '' AS a27, '' AS a28, '' AS a29, '' AS a30, '' AS a31, '' AS a32,
+    '' AS a33, '' AS a34, '' AS a35, '' AS a36, '' AS a37, '' AS a38, '' AS a39, '' AS a40
+)
+SELECT
+  sum(n) OVER (PARTITION BY
+    a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20,
+    a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40
+  )
+FROM source;
+----
+1
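For a rough sense of scale under the 2-candidates-per-column enumeration shown above: the 12-column test would have produced 2^12 = 4096 candidate orderings and the 40-column test roughly 2^40 (on the order of 10^12), which is why planning time blew up; with the cap the product stays at 16, so both regression tests plan quickly.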