Skip to content

Commit

Permalink
Cache common referred expression at the window input (#9009)
Browse files Browse the repository at this point in the history
* Initial commit

* Update test

* Minor changes

* Tmp

* Retract some changes

* Add lias to window

* Fix name change issue

* Minor changes

* Minor changes

* Un comment new rule

* Open up new rules

* Minor changes

* Change test

* remove prints

* Update slt tests

* Remove leftover code

* Resolve linter errors

* Minor changes

* Remove group window rule

* Remove unnecessary changes

* Minor changes

* Update datafusion/optimizer/src/common_subexpr_eliminate.rs

Co-authored-by: Huaijin <haohuaijin@gmail.com>

* Update comment, add new test

---------

Co-authored-by: Huaijin <haohuaijin@gmail.com>
  • Loading branch information
mustafasrepo and haohuaijin committed Jan 29, 2024
1 parent fffc8be commit a57e270
Show file tree
Hide file tree
Showing 2 changed files with 211 additions and 127 deletions.
79 changes: 64 additions & 15 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,25 +166,74 @@ impl CommonSubexprEliminate {
window: &Window,
config: &dyn OptimizerConfig,
) -> Result<LogicalPlan> {
let Window {
input,
window_expr,
schema,
} = window;
let mut window_exprs = vec![];
let mut arrays_per_window = vec![];
let mut expr_set = ExprSet::new();

let input_schema = Arc::clone(input.schema());
let arrays =
to_arrays(window_expr, input_schema, &mut expr_set, ExprMask::Normal)?;
// Get all window expressions inside the consecutive window operators.
// Consecutive window expressions may refer to same complex expression.
// If same complex expression is referred more than once by subsequent `WindowAggr`s,
// we can cache complex expression by evaluating it with a projection before the
// first WindowAggr.
// This enables us to cache complex expression "c3+c4" for following plan:
// WindowAggr: windowExpr=[[SUM(c9) ORDER BY [c3 + c4] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
// --WindowAggr: windowExpr=[[SUM(c9) ORDER BY [c3 + c4] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
// where, it is referred once by each `WindowAggr` (total of 2) in the plan.
let mut plan = LogicalPlan::Window(window.clone());
while let LogicalPlan::Window(window) = plan {
let Window {
input, window_expr, ..
} = window;
plan = input.as_ref().clone();

let input_schema = Arc::clone(input.schema());
let arrays =
to_arrays(&window_expr, input_schema, &mut expr_set, ExprMask::Normal)?;

window_exprs.push(window_expr);
arrays_per_window.push(arrays);
}

let (mut new_expr, new_input) =
self.rewrite_expr(&[window_expr], &[&arrays], input, &expr_set, config)?;
let mut window_exprs = window_exprs
.iter()
.map(|expr| expr.as_slice())
.collect::<Vec<_>>();
let arrays_per_window = arrays_per_window
.iter()
.map(|arrays| arrays.as_slice())
.collect::<Vec<_>>();

Ok(LogicalPlan::Window(Window {
input: Arc::new(new_input),
window_expr: pop_expr(&mut new_expr)?,
schema: schema.clone(),
}))
assert_eq!(window_exprs.len(), arrays_per_window.len());
let (mut new_expr, new_input) = self.rewrite_expr(
&window_exprs,
&arrays_per_window,
&plan,
&expr_set,
config,
)?;
assert_eq!(window_exprs.len(), new_expr.len());

// Construct consecutive window operator, with their corresponding new window expressions.
plan = new_input;
while let Some(new_window_expr) = new_expr.pop() {
// Since `new_expr` and `window_exprs` length are same. We can safely `.unwrap` here.
let orig_window_expr = window_exprs.pop().unwrap();
assert_eq!(new_window_expr.len(), orig_window_expr.len());

// Rename new re-written window expressions with original name (by giving alias)
// Otherwise we may receive schema error, in subsequent operators.
let new_window_expr = new_window_expr
.into_iter()
.zip(orig_window_expr.iter())
.map(|(new_window_expr, window_expr)| {
let original_name = window_expr.name_for_alias()?;
new_window_expr.alias_if_changed(original_name)
})
.collect::<Result<Vec<_>>>()?;
plan = LogicalPlan::Window(Window::try_new(new_window_expr, Arc::new(plan))?);
}

Ok(plan)
}

fn try_optimize_aggregate(
Expand Down
Loading

0 comments on commit a57e270

Please sign in to comment.