Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add order by construct in window function and logical plans #463

Merged
merged 1 commit into from
Jun 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,12 @@ message WindowExprNode {
// udaf = 3
}
LogicalExprNode expr = 4;
// repeated LogicalExprNode partition_by = 5;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can probably just delete the old fields in the protobuf files -- I suspect no one is using them in a way that requires backwards compatibility

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these commented out ones are not old, they are reminders of future fields.

repeated LogicalExprNode order_by = 6;
// repeated LogicalExprNode filter = 7;
// oneof window_frame {
// WindowFrame frame = 8;
// }
}

message BetweenNode {
Expand Down Expand Up @@ -317,14 +323,6 @@ message AggregateNode {
message WindowNode {
LogicalPlanNode input = 1;
repeated LogicalExprNode window_expr = 2;
repeated LogicalExprNode partition_by_expr = 3;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

turns out these are no longer useful

repeated LogicalExprNode order_by_expr = 4;
// "optional" keyword is stable in protoc 3.15 but prost is still on 3.14 (see https://github.com/danburkert/prost/issues/430)
// this syntax is ugly but is binary compatible with the "optional" keyword (see https://stackoverflow.com/questions/42622015/how-to-define-an-optional-field-in-protobuf-3)
oneof window_frame {
WindowFrame frame = 5;
}
// TODO add filter by expr
}

enum WindowFrameUnits {
Expand Down
12 changes: 9 additions & 3 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
// // FIXME: parse the window_frame data
// let window_frame = None;
LogicalPlanBuilder::from(&input)
.window(
window_expr, /* filter_by_expr, partition_by_expr, order_by_expr, window_frame*/
)?
.window(window_expr)?
.build()
.map_err(|e| e.into())
}
Expand Down Expand Up @@ -924,6 +922,12 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
.window_function
.as_ref()
.ok_or_else(|| proto_error("Received empty window function"))?;
let order_by = expr
.order_by
.iter()
.map(|e| e.try_into())
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
match window_function {
window_expr_node::WindowFunction::AggrFunction(i) => {
let aggr_function = protobuf::AggregateFunction::from_i32(*i)
Expand All @@ -939,6 +943,7 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
AggregateFunction::from(aggr_function),
),
args: vec![parse_required_expr(&expr.expr)?],
order_by,
})
}
window_expr_node::WindowFunction::BuiltInFunction(i) => {
Expand All @@ -957,6 +962,7 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
BuiltInWindowFunction::from(built_in_function),
),
args: vec![parse_required_expr(&expr.expr)?],
order_by,
})
}
}
Expand Down
39 changes: 13 additions & 26 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -761,38 +761,17 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
})
}
LogicalPlan::Window {
input,
window_expr,
// FIXME implement next
// filter_by_expr,
// FIXME implement next
// partition_by_expr,
// FIXME implement next
// order_by_expr,
// FIXME implement next
// window_frame,
..
input, window_expr, ..
} => {
let input: protobuf::LogicalPlanNode = input.as_ref().try_into()?;
// FIXME: implement
// let filter_by_expr = vec![];
// FIXME: implement
let partition_by_expr = vec![];
// FIXME: implement
let order_by_expr = vec![];
// FIXME: implement
let window_frame = None;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Window(Box::new(
protobuf::WindowNode {
input: Some(Box::new(input)),
window_expr: window_expr
.iter()
.map(|expr| expr.try_into())
.collect::<Result<Vec<_>, BallistaError>>()?,
partition_by_expr,
order_by_expr,
window_frame,
.collect::<Result<Vec<_>, _>>()?,
},
))),
})
Expand All @@ -811,11 +790,11 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
group_expr: group_expr
.iter()
.map(|expr| expr.try_into())
.collect::<Result<Vec<_>, BallistaError>>()?,
.collect::<Result<Vec<_>, _>>()?,
aggr_expr: aggr_expr
.iter()
.map(|expr| expr.try_into())
.collect::<Result<Vec<_>, BallistaError>>()?,
.collect::<Result<Vec<_>, _>>()?,
},
))),
})
Expand Down Expand Up @@ -1024,7 +1003,10 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
})
}
Expr::WindowFunction {
ref fun, ref args, ..
ref fun,
ref args,
ref order_by,
..
} => {
let window_function = match fun {
WindowFunction::AggregateFunction(fun) => {
Expand All @@ -1039,9 +1021,14 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
}
};
let arg = &args[0];
let order_by = order_by
.iter()
.map(|e| e.try_into())
.collect::<Result<Vec<_>, _>>()?;
let window_expr = Box::new(protobuf::WindowExprNode {
expr: Some(Box::new(arg.try_into()?)),
window_function: Some(window_function),
order_by,
});
Ok(protobuf::LogicalExprNode {
expr_type: Some(ExprType::WindowExpr(window_expr)),
Expand Down
14 changes: 11 additions & 3 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,11 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {

for (expr, name) in &window_agg_expr {
match expr {
Expr::WindowFunction { fun, args } => {
Expr::WindowFunction {
fun,
args,
order_by,
} => {
let arg = df_planner
.create_physical_expr(
&args[0],
Expand All @@ -243,12 +247,16 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
.map_err(|e| {
BallistaError::General(format!("{:?}", e))
})?;
physical_window_expr.push(create_window_expr(
if !order_by.is_empty() {
return Err(BallistaError::NotImplemented("Window function with order by is not yet implemented".to_owned()));
}
let window_expr = create_window_expr(
&fun,
&[arg],
&physical_schema,
name.to_owned(),
)?);
)?;
physical_window_expr.push(window_expr);
}
_ => {
return Err(BallistaError::General(
Expand Down
24 changes: 1 addition & 23 deletions datafusion/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,23 +297,7 @@ impl LogicalPlanBuilder {
/// - https://github.com/apache/arrow-datafusion/issues/299 with partition clause
/// - https://github.com/apache/arrow-datafusion/issues/360 with order by
/// - https://github.com/apache/arrow-datafusion/issues/361 with window frame
pub fn window(
&self,
window_expr: impl IntoIterator<Item = Expr>,
// FIXME: implement next
// filter_by_expr: impl IntoIterator<Item = Expr>,
// FIXME: implement next
// partition_by_expr: impl IntoIterator<Item = Expr>,
// FIXME: implement next
// order_by_expr: impl IntoIterator<Item = Expr>,
// FIXME: implement next
// window_frame: Option<WindowFrame>,
) -> Result<Self> {
let window_expr = window_expr.into_iter().collect::<Vec<_>>();
// FIXME: implement next
// let partition_by_expr = partition_by_expr.into_iter().collect::<Vec<Expr>>();
// FIXME: implement next
// let order_by_expr = order_by_expr.into_iter().collect::<Vec<Expr>>();
pub fn window(&self, window_expr: Vec<Expr>) -> Result<Self> {
let all_expr = window_expr.iter();
validate_unique_names("Windows", all_expr.clone(), self.plan.schema())?;

Expand All @@ -323,12 +307,6 @@ impl LogicalPlanBuilder {

Ok(Self::from(&LogicalPlan::Window {
input: Arc::new(self.plan.clone()),
// FIXME implement next
// partition_by_expr,
// FIXME implement next
// order_by_expr,
// FIXME implement next
// window_frame,
window_expr,
schema: Arc::new(DFSchema::new(window_fields)?),
}))
Expand Down
23 changes: 18 additions & 5 deletions datafusion/src/logical_plan/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ pub enum Expr {
fun: window_functions::WindowFunction,
/// List of expressions to feed to the functions as arguments
args: Vec<Expr>,
/// List of order by expressions
order_by: Vec<Expr>,
},
/// aggregate function
AggregateUDF {
Expand Down Expand Up @@ -587,9 +589,15 @@ impl Expr {
Expr::ScalarUDF { args, .. } => args
.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor)),
Expr::WindowFunction { args, .. } => args
.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor)),
Expr::WindowFunction { args, order_by, .. } => {
let visitor = args
.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor))?;
let visitor = order_by
.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor))?;
Ok(visitor)
}
Expr::AggregateFunction { args, .. } => args
.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor)),
Expand Down Expand Up @@ -723,9 +731,14 @@ impl Expr {
args: rewrite_vec(args, rewriter)?,
fun,
},
Expr::WindowFunction { args, fun } => Expr::WindowFunction {
Expr::WindowFunction {
args,
fun,
order_by,
} => Expr::WindowFunction {
args: rewrite_vec(args, rewriter)?,
fun,
order_by: rewrite_vec(order_by, rewriter)?,
},
Expr::AggregateFunction {
args,
Expand Down Expand Up @@ -1388,7 +1401,7 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
Expr::ScalarUDF { fun, args, .. } => {
create_function_name(&fun.name, false, args, input_schema)
}
Expr::WindowFunction { fun, args } => {
Expr::WindowFunction { fun, args, .. } => {
create_function_name(&fun.to_string(), false, args, input_schema)
}
Expr::AggregateFunction {
Expand Down
28 changes: 4 additions & 24 deletions datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ pub enum LogicalPlan {
// filter_by_expr: Vec<Expr>,
/// Partition by expressions
// partition_by_expr: Vec<Expr>,
/// Order by expressions
// order_by_expr: Vec<Expr>,
/// Window Frame
// window_frame: Option<WindowFrame>,
/// The schema description of the window output
Expand Down Expand Up @@ -306,25 +304,12 @@ impl LogicalPlan {
Partitioning::Hash(expr, _) => expr.clone(),
_ => vec![],
},
LogicalPlan::Window {
window_expr,
// FIXME implement next
// filter_by_expr,
// FIXME implement next
// partition_by_expr,
// FIXME implement next
// order_by_expr,
..
} => window_expr.clone(),
LogicalPlan::Window { window_expr, .. } => window_expr.clone(),
LogicalPlan::Aggregate {
group_expr,
aggr_expr,
..
} => {
let mut result = group_expr.clone();
result.extend(aggr_expr.clone());
result
}
} => group_expr.iter().chain(aggr_expr.iter()).cloned().collect(),
LogicalPlan::Join { on, .. } => {
on.iter().flat_map(|(l, r)| vec![col(l), col(r)]).collect()
}
Expand Down Expand Up @@ -698,16 +683,11 @@ impl LogicalPlan {
..
} => write!(f, "Filter: {:?}", expr),
LogicalPlan::Window {
ref window_expr,
// FIXME implement next
// ref partition_by_expr,
// FIXME implement next
// ref order_by_expr,
..
ref window_expr, ..
} => {
write!(
f,
"WindowAggr: windowExpr=[{:?}] partitionBy=[], orderBy=[]",
"WindowAggr: windowExpr=[{:?}] partitionBy=[]",
window_expr
)
}
Expand Down
45 changes: 20 additions & 25 deletions datafusion/src/optimizer/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::execution::context::ExecutionProps;
use crate::logical_plan::{DFField, DFSchema, DFSchemaRef, LogicalPlan, ToDFSchema};
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
use crate::sql::utils::find_sort_exprs;
use arrow::datatypes::Schema;
use arrow::error::Result as ArrowResult;
use std::{collections::HashSet, sync::Arc};
Expand Down Expand Up @@ -197,29 +198,29 @@ fn optimize_plan(
schema,
window_expr,
input,
// FIXME implement next
// filter_by_expr,
// FIXME implement next
// partition_by_expr,
// FIXME implement next
// order_by_expr,
// FIXME implement next
// window_frame,
..
} => {
// Gather all columns needed for expressions in this Window
let mut new_window_expr = Vec::new();
window_expr.iter().try_for_each(|expr| {
let name = &expr.name(&schema)?;
if required_columns.contains(name) {
new_window_expr.push(expr.clone());
new_required_columns.insert(name.clone());
// add to the new set of required columns
utils::expr_to_column_names(expr, &mut new_required_columns)
} else {
Ok(())
}
})?;
{
window_expr.iter().try_for_each(|expr| {
let name = &expr.name(&schema)?;
if required_columns.contains(name) {
new_window_expr.push(expr.clone());
new_required_columns.insert(name.clone());
// add to the new set of required columns
utils::expr_to_column_names(expr, &mut new_required_columns)
} else {
Ok(())
}
})?;
}

// for all the retained window expr, find their sort expressions if any, and retain these
utils::exprlist_to_column_names(
&find_sort_exprs(&new_window_expr),
&mut new_required_columns,
)?;

let new_schema = DFSchema::new(
schema
Expand All @@ -232,12 +233,6 @@ fn optimize_plan(

Ok(LogicalPlan::Window {
window_expr: new_window_expr,
// FIXME implement next
// partition_by_expr: partition_by_expr.clone(),
// FIXME implement next
// order_by_expr: order_by_expr.clone(),
// FIXME implement next
// window_frame: window_frame.clone(),
input: Arc::new(optimize_plan(
optimizer,
&input,
Expand Down
Loading