Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache common referred expression at the window input #9009

Merged
merged 26 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
8a1705b
Initial commit
mustafasrepo Jan 25, 2024
20d5aba
Update test
mustafasrepo Jan 25, 2024
ee34597
Minor changes
mustafasrepo Jan 25, 2024
001f1d5
Tmp
mustafasrepo Jan 25, 2024
b6379d2
Retract some changes
mustafasrepo Jan 25, 2024
6b89835
Add lias to window
mustafasrepo Jan 26, 2024
f019ce5
Fix name change issue
mustafasrepo Jan 26, 2024
c39d59c
Minor changes
mustafasrepo Jan 26, 2024
5d35279
Minor changes
mustafasrepo Jan 26, 2024
bff5989
Un comment new rule
mustafasrepo Jan 26, 2024
d3a8e9b
Open up new rules
mustafasrepo Jan 26, 2024
ebf6a0c
Minor changes
mustafasrepo Jan 26, 2024
c12af19
Change test
mustafasrepo Jan 26, 2024
660aaa0
remove prints
mustafasrepo Jan 26, 2024
e1d0126
Update slt tests
mustafasrepo Jan 26, 2024
1e5ffd4
Remove leftover code
mustafasrepo Jan 26, 2024
727abab
Resolve linter errors
mustafasrepo Jan 26, 2024
a2cfbf0
Minor changes
mustafasrepo Jan 26, 2024
c5f567c
Merge branch 'apache_main' into feature/window_expr_refer
mustafasrepo Jan 26, 2024
386fc25
Remove group window rule
mustafasrepo Jan 26, 2024
132771c
Remove unnecessary changes
mustafasrepo Jan 26, 2024
ea6cfac
Minor changes
mustafasrepo Jan 26, 2024
db697d0
Update datafusion/optimizer/src/common_subexpr_eliminate.rs
mustafasrepo Jan 29, 2024
f203c0e
Merge branch 'apache_main' into feature/window_expr_refer
mustafasrepo Jan 29, 2024
13f532c
Update comment, add new test
mustafasrepo Jan 29, 2024
d4a81d7
Merge branch 'apache_main' into feature/window_expr_refer
mustafasrepo Jan 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 64 additions & 15 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,25 +166,74 @@ impl CommonSubexprEliminate {
window: &Window,
config: &dyn OptimizerConfig,
) -> Result<LogicalPlan> {
let Window {
input,
window_expr,
schema,
} = window;
let mut window_exprs = vec![];
let mut arrays_per_window = vec![];
let mut expr_set = ExprSet::new();

let input_schema = Arc::clone(input.schema());
let arrays =
to_arrays(window_expr, input_schema, &mut expr_set, ExprMask::Normal)?;
// Get all window expressions inside the consecutive window operators.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we can add a comment here about why this is recursively looking down into all window operations (e.g. because they all get the same input schema and append on some window functions, but the window functions can't refer to previous window functions).

I think perhaps you could reuse the (very nicely written) description from this PR which explains it very well

// Consecutive window expressions may refer to same complex expression.
// If same complex expression is referred more than once by subsequent `WindowAggr`s,
// we can cache complex expression by evaluating it with a projection before the
// first WindowAggr.
// This enables us to cache complex expression "c3+c4" for following plan:
// WindowAggr: windowExpr=[[SUM(c9) ORDER BY [c3 + c4] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
// --WindowAggr: windowExpr=[[SUM(c9) ORDER BY [c3 + c4] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
// where, it is referred once by each `WindowAggr` (total of 2) in the plan.
let mut plan = LogicalPlan::Window(window.clone());
while let LogicalPlan::Window(window) = plan {
let Window {
input, window_expr, ..
} = window;
plan = input.as_ref().clone();

let input_schema = Arc::clone(input.schema());
let arrays =
to_arrays(&window_expr, input_schema, &mut expr_set, ExprMask::Normal)?;

window_exprs.push(window_expr);
arrays_per_window.push(arrays);
}

let (mut new_expr, new_input) =
self.rewrite_expr(&[window_expr], &[&arrays], input, &expr_set, config)?;
let mut window_exprs = window_exprs
.iter()
.map(|expr| expr.as_slice())
.collect::<Vec<_>>();
let arrays_per_window = arrays_per_window
.iter()
.map(|arrays| arrays.as_slice())
.collect::<Vec<_>>();

Ok(LogicalPlan::Window(Window {
input: Arc::new(new_input),
window_expr: pop_expr(&mut new_expr)?,
schema: schema.clone(),
}))
assert_eq!(window_exprs.len(), arrays_per_window.len());
let (mut new_expr, new_input) = self.rewrite_expr(
&window_exprs,
&arrays_per_window,
&plan,
&expr_set,
config,
)?;
assert_eq!(window_exprs.len(), new_expr.len());

// Construct consecutive window operator, with their corresponding new window expressions.
plan = new_input;
while let Some(new_window_expr) = new_expr.pop() {
// Since `new_expr` and `window_exprs` length are same. We can safely `.unwrap` here.
let orig_window_expr = window_exprs.pop().unwrap();
assert_eq!(new_window_expr.len(), orig_window_expr.len());

// Rename new re-written window expressions with original name (by giving alias)
// Otherwise we may receive schema error, in subsequent operators.
let new_window_expr = new_window_expr
.into_iter()
.zip(orig_window_expr.iter())
.map(|(new_window_expr, window_expr)| {
let original_name = window_expr.name_for_alias()?;
new_window_expr.alias_if_changed(original_name)
})
.collect::<Result<Vec<_>>>()?;
plan = LogicalPlan::Window(Window::try_new(new_window_expr, Arc::new(plan))?);
}

Ok(plan)
}

fn try_optimize_aggregate(
Expand Down
Loading
Loading