Skip to content

Commit

Permalink
fix window expression with alias (#463)
Browse files: browse the repository at this point in the history
  • Loading branch information
jimexist authored Jun 3, 2021
1 parent 28b0dad commit e82d053
Show file tree
Hide file tree
Showing 13 changed files with 508 additions and 170 deletions.
14 changes: 6 additions & 8 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,12 @@ message WindowExprNode {
// udaf = 3
}
LogicalExprNode expr = 4;
// repeated LogicalExprNode partition_by = 5;
repeated LogicalExprNode order_by = 6;
// repeated LogicalExprNode filter = 7;
// oneof window_frame {
// WindowFrame frame = 8;
// }
}

message BetweenNode {
Expand Down Expand Up @@ -317,14 +323,6 @@ message AggregateNode {
message WindowNode {
LogicalPlanNode input = 1;
repeated LogicalExprNode window_expr = 2;
repeated LogicalExprNode partition_by_expr = 3;
repeated LogicalExprNode order_by_expr = 4;
// "optional" keyword is stable in protoc 3.15 but prost is still on 3.14 (see https://github.com/danburkert/prost/issues/430)
// this syntax is ugly but is binary compatible with the "optional" keyword (see https://stackoverflow.com/questions/42622015/how-to-define-an-optional-field-in-protobuf-3)
oneof window_frame {
WindowFrame frame = 5;
}
// TODO add filter by expr
}

enum WindowFrameUnits {
Expand Down
12 changes: 9 additions & 3 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
// // FIXME: parse the window_frame data
// let window_frame = None;
LogicalPlanBuilder::from(&input)
.window(
window_expr, /* filter_by_expr, partition_by_expr, order_by_expr, window_frame*/
)?
.window(window_expr)?
.build()
.map_err(|e| e.into())
}
Expand Down Expand Up @@ -924,6 +922,12 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
.window_function
.as_ref()
.ok_or_else(|| proto_error("Received empty window function"))?;
let order_by = expr
.order_by
.iter()
.map(|e| e.try_into())
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
match window_function {
window_expr_node::WindowFunction::AggrFunction(i) => {
let aggr_function = protobuf::AggregateFunction::from_i32(*i)
Expand All @@ -939,6 +943,7 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
AggregateFunction::from(aggr_function),
),
args: vec![parse_required_expr(&expr.expr)?],
order_by,
})
}
window_expr_node::WindowFunction::BuiltInFunction(i) => {
Expand All @@ -957,6 +962,7 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
BuiltInWindowFunction::from(built_in_function),
),
args: vec![parse_required_expr(&expr.expr)?],
order_by,
})
}
}
Expand Down
39 changes: 13 additions & 26 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -761,38 +761,17 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
})
}
LogicalPlan::Window {
input,
window_expr,
// FIXME implement next
// filter_by_expr,
// FIXME implement next
// partition_by_expr,
// FIXME implement next
// order_by_expr,
// FIXME implement next
// window_frame,
..
input, window_expr, ..
} => {
let input: protobuf::LogicalPlanNode = input.as_ref().try_into()?;
// FIXME: implement
// let filter_by_expr = vec![];
// FIXME: implement
let partition_by_expr = vec![];
// FIXME: implement
let order_by_expr = vec![];
// FIXME: implement
let window_frame = None;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Window(Box::new(
protobuf::WindowNode {
input: Some(Box::new(input)),
window_expr: window_expr
.iter()
.map(|expr| expr.try_into())
.collect::<Result<Vec<_>, BallistaError>>()?,
partition_by_expr,
order_by_expr,
window_frame,
.collect::<Result<Vec<_>, _>>()?,
},
))),
})
Expand All @@ -811,11 +790,11 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
group_expr: group_expr
.iter()
.map(|expr| expr.try_into())
.collect::<Result<Vec<_>, BallistaError>>()?,
.collect::<Result<Vec<_>, _>>()?,
aggr_expr: aggr_expr
.iter()
.map(|expr| expr.try_into())
.collect::<Result<Vec<_>, BallistaError>>()?,
.collect::<Result<Vec<_>, _>>()?,
},
))),
})
Expand Down Expand Up @@ -1024,7 +1003,10 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
})
}
Expr::WindowFunction {
ref fun, ref args, ..
ref fun,
ref args,
ref order_by,
..
} => {
let window_function = match fun {
WindowFunction::AggregateFunction(fun) => {
Expand All @@ -1039,9 +1021,14 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
}
};
let arg = &args[0];
let order_by = order_by
.iter()
.map(|e| e.try_into())
.collect::<Result<Vec<_>, _>>()?;
let window_expr = Box::new(protobuf::WindowExprNode {
expr: Some(Box::new(arg.try_into()?)),
window_function: Some(window_function),
order_by,
});
Ok(protobuf::LogicalExprNode {
expr_type: Some(ExprType::WindowExpr(window_expr)),
Expand Down
14 changes: 11 additions & 3 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,11 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {

for (expr, name) in &window_agg_expr {
match expr {
Expr::WindowFunction { fun, args } => {
Expr::WindowFunction {
fun,
args,
order_by,
} => {
let arg = df_planner
.create_physical_expr(
&args[0],
Expand All @@ -243,12 +247,16 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
.map_err(|e| {
BallistaError::General(format!("{:?}", e))
})?;
physical_window_expr.push(create_window_expr(
if !order_by.is_empty() {
return Err(BallistaError::NotImplemented("Window function with order by is not yet implemented".to_owned()));
}
let window_expr = create_window_expr(
&fun,
&[arg],
&physical_schema,
name.to_owned(),
)?);
)?;
physical_window_expr.push(window_expr);
}
_ => {
return Err(BallistaError::General(
Expand Down
24 changes: 1 addition & 23 deletions datafusion/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,23 +297,7 @@ impl LogicalPlanBuilder {
/// - https://github.com/apache/arrow-datafusion/issues/299 with partition clause
/// - https://github.com/apache/arrow-datafusion/issues/360 with order by
/// - https://github.com/apache/arrow-datafusion/issues/361 with window frame
pub fn window(
&self,
window_expr: impl IntoIterator<Item = Expr>,
// FIXME: implement next
// filter_by_expr: impl IntoIterator<Item = Expr>,
// FIXME: implement next
// partition_by_expr: impl IntoIterator<Item = Expr>,
// FIXME: implement next
// order_by_expr: impl IntoIterator<Item = Expr>,
// FIXME: implement next
// window_frame: Option<WindowFrame>,
) -> Result<Self> {
let window_expr = window_expr.into_iter().collect::<Vec<_>>();
// FIXME: implement next
// let partition_by_expr = partition_by_expr.into_iter().collect::<Vec<Expr>>();
// FIXME: implement next
// let order_by_expr = order_by_expr.into_iter().collect::<Vec<Expr>>();
pub fn window(&self, window_expr: Vec<Expr>) -> Result<Self> {
let all_expr = window_expr.iter();
validate_unique_names("Windows", all_expr.clone(), self.plan.schema())?;

Expand All @@ -323,12 +307,6 @@ impl LogicalPlanBuilder {

Ok(Self::from(&LogicalPlan::Window {
input: Arc::new(self.plan.clone()),
// FIXME implement next
// partition_by_expr,
// FIXME implement next
// order_by_expr,
// FIXME implement next
// window_frame,
window_expr,
schema: Arc::new(DFSchema::new(window_fields)?),
}))
Expand Down
23 changes: 18 additions & 5 deletions datafusion/src/logical_plan/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ pub enum Expr {
fun: window_functions::WindowFunction,
/// List of expressions to feed to the functions as arguments
args: Vec<Expr>,
/// List of order by expressions
order_by: Vec<Expr>,
},
/// aggregate function
AggregateUDF {
Expand Down Expand Up @@ -587,9 +589,15 @@ impl Expr {
Expr::ScalarUDF { args, .. } => args
.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor)),
Expr::WindowFunction { args, .. } => args
.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor)),
Expr::WindowFunction { args, order_by, .. } => {
let visitor = args
.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor))?;
let visitor = order_by
.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor))?;
Ok(visitor)
}
Expr::AggregateFunction { args, .. } => args
.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor)),
Expand Down Expand Up @@ -723,9 +731,14 @@ impl Expr {
args: rewrite_vec(args, rewriter)?,
fun,
},
Expr::WindowFunction { args, fun } => Expr::WindowFunction {
Expr::WindowFunction {
args,
fun,
order_by,
} => Expr::WindowFunction {
args: rewrite_vec(args, rewriter)?,
fun,
order_by: rewrite_vec(order_by, rewriter)?,
},
Expr::AggregateFunction {
args,
Expand Down Expand Up @@ -1388,7 +1401,7 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
Expr::ScalarUDF { fun, args, .. } => {
create_function_name(&fun.name, false, args, input_schema)
}
Expr::WindowFunction { fun, args } => {
Expr::WindowFunction { fun, args, .. } => {
create_function_name(&fun.to_string(), false, args, input_schema)
}
Expr::AggregateFunction {
Expand Down
28 changes: 4 additions & 24 deletions datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ pub enum LogicalPlan {
// filter_by_expr: Vec<Expr>,
/// Partition by expressions
// partition_by_expr: Vec<Expr>,
/// Order by expressions
// order_by_expr: Vec<Expr>,
/// Window Frame
// window_frame: Option<WindowFrame>,
/// The schema description of the window output
Expand Down Expand Up @@ -306,25 +304,12 @@ impl LogicalPlan {
Partitioning::Hash(expr, _) => expr.clone(),
_ => vec![],
},
LogicalPlan::Window {
window_expr,
// FIXME implement next
// filter_by_expr,
// FIXME implement next
// partition_by_expr,
// FIXME implement next
// order_by_expr,
..
} => window_expr.clone(),
LogicalPlan::Window { window_expr, .. } => window_expr.clone(),
LogicalPlan::Aggregate {
group_expr,
aggr_expr,
..
} => {
let mut result = group_expr.clone();
result.extend(aggr_expr.clone());
result
}
} => group_expr.iter().chain(aggr_expr.iter()).cloned().collect(),
LogicalPlan::Join { on, .. } => {
on.iter().flat_map(|(l, r)| vec![col(l), col(r)]).collect()
}
Expand Down Expand Up @@ -698,16 +683,11 @@ impl LogicalPlan {
..
} => write!(f, "Filter: {:?}", expr),
LogicalPlan::Window {
ref window_expr,
// FIXME implement next
// ref partition_by_expr,
// FIXME implement next
// ref order_by_expr,
..
ref window_expr, ..
} => {
write!(
f,
"WindowAggr: windowExpr=[{:?}] partitionBy=[], orderBy=[]",
"WindowAggr: windowExpr=[{:?}] partitionBy=[]",
window_expr
)
}
Expand Down
45 changes: 20 additions & 25 deletions datafusion/src/optimizer/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::execution::context::ExecutionProps;
use crate::logical_plan::{DFField, DFSchema, DFSchemaRef, LogicalPlan, ToDFSchema};
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
use crate::sql::utils::find_sort_exprs;
use arrow::datatypes::Schema;
use arrow::error::Result as ArrowResult;
use std::{collections::HashSet, sync::Arc};
Expand Down Expand Up @@ -197,29 +198,29 @@ fn optimize_plan(
schema,
window_expr,
input,
// FIXME implement next
// filter_by_expr,
// FIXME implement next
// partition_by_expr,
// FIXME implement next
// order_by_expr,
// FIXME implement next
// window_frame,
..
} => {
// Gather all columns needed for expressions in this Window
let mut new_window_expr = Vec::new();
window_expr.iter().try_for_each(|expr| {
let name = &expr.name(&schema)?;
if required_columns.contains(name) {
new_window_expr.push(expr.clone());
new_required_columns.insert(name.clone());
// add to the new set of required columns
utils::expr_to_column_names(expr, &mut new_required_columns)
} else {
Ok(())
}
})?;
{
window_expr.iter().try_for_each(|expr| {
let name = &expr.name(&schema)?;
if required_columns.contains(name) {
new_window_expr.push(expr.clone());
new_required_columns.insert(name.clone());
// add to the new set of required columns
utils::expr_to_column_names(expr, &mut new_required_columns)
} else {
Ok(())
}
})?;
}

// for all the retained window expr, find their sort expressions if any, and retain these
utils::exprlist_to_column_names(
&find_sort_exprs(&new_window_expr),
&mut new_required_columns,
)?;

let new_schema = DFSchema::new(
schema
Expand All @@ -232,12 +233,6 @@ fn optimize_plan(

Ok(LogicalPlan::Window {
window_expr: new_window_expr,
// FIXME implement next
// partition_by_expr: partition_by_expr.clone(),
// FIXME implement next
// order_by_expr: order_by_expr.clone(),
// FIXME implement next
// window_frame: window_frame.clone(),
input: Arc::new(optimize_plan(
optimizer,
&input,
Expand Down
Loading

0 comments on commit e82d053

Please sign in to comment.