Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add partition by constructs in window functions and modify logical planning #501

Merged
merged 2 commits into from
Jun 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ message WindowExprNode {
// udaf = 3
}
LogicalExprNode expr = 4;
// repeated LogicalExprNode partition_by = 5;
repeated LogicalExprNode partition_by = 5;
repeated LogicalExprNode order_by = 6;
// repeated LogicalExprNode filter = 7;
oneof window_frame {
Expand Down
8 changes: 8 additions & 0 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,12 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
.window_function
.as_ref()
.ok_or_else(|| proto_error("Received empty window function"))?;
let partition_by = expr
.partition_by
.iter()
.map(|e| e.try_into())
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
let order_by = expr
.order_by
.iter()
Expand Down Expand Up @@ -940,6 +946,7 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
AggregateFunction::from(aggr_function),
),
args: vec![parse_required_expr(&expr.expr)?],
partition_by,
order_by,
window_frame,
})
Expand All @@ -960,6 +967,7 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
BuiltInWindowFunction::from(built_in_function),
),
args: vec![parse_required_expr(&expr.expr)?],
partition_by,
order_by,
window_frame,
})
Expand Down
6 changes: 6 additions & 0 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
Expr::WindowFunction {
ref fun,
ref args,
ref partition_by,
ref order_by,
ref window_frame,
..
Expand All @@ -1023,6 +1024,10 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
}
};
let arg = &args[0];
let partition_by = partition_by
.iter()
.map(|e| e.try_into())
.collect::<Result<Vec<_>, _>>()?;
let order_by = order_by
.iter()
.map(|e| e.try_into())
Expand All @@ -1035,6 +1040,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
let window_expr = Box::new(protobuf::WindowExprNode {
expr: Some(Box::new(arg.try_into()?)),
window_function: Some(window_function),
partition_by,
order_by,
window_frame,
});
Expand Down
8 changes: 8 additions & 0 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,9 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
Expr::WindowFunction {
fun,
args,
partition_by,
order_by,
window_frame,
..
} => {
let arg = df_planner
Expand All @@ -248,9 +250,15 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
.map_err(|e| {
BallistaError::General(format!("{:?}", e))
})?;
if !partition_by.is_empty() {
return Err(BallistaError::NotImplemented("Window function with partition by is not yet implemented".to_owned()));
}
if !order_by.is_empty() {
return Err(BallistaError::NotImplemented("Window function with order by is not yet implemented".to_owned()));
}
if window_frame.is_some() {
return Err(BallistaError::NotImplemented("Window function with window frame is not yet implemented".to_owned()));
}
let window_expr = create_window_expr(
&fun,
&[arg],
Expand Down
14 changes: 13 additions & 1 deletion datafusion/src/logical_plan/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ pub enum Expr {
fun: window_functions::WindowFunction,
/// List of expressions to feed to the functions as arguments
args: Vec<Expr>,
/// List of partition by expressions
partition_by: Vec<Expr>,
/// List of order by expressions
order_by: Vec<Expr>,
/// Window frame
Expand Down Expand Up @@ -588,10 +590,18 @@ impl Expr {
Expr::ScalarUDF { args, .. } => args
.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor)),
Expr::WindowFunction { args, order_by, .. } => {
Expr::WindowFunction {
args,
partition_by,
order_by,
..
} => {
let visitor = args
.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor))?;
let visitor = partition_by
.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor))?;
let visitor = order_by
.iter()
.try_fold(visitor, |visitor, arg| arg.accept(visitor))?;
Expand Down Expand Up @@ -733,11 +743,13 @@ impl Expr {
Expr::WindowFunction {
args,
fun,
partition_by,
order_by,
window_frame,
} => Expr::WindowFunction {
args: rewrite_vec(args, rewriter)?,
fun,
partition_by: rewrite_vec(partition_by, rewriter)?,
order_by: rewrite_vec(order_by, rewriter)?,
window_frame,
},
Expand Down
6 changes: 1 addition & 5 deletions datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,11 +687,7 @@ impl LogicalPlan {
LogicalPlan::Window {
ref window_expr, ..
} => {
write!(
f,
"WindowAggr: windowExpr=[{:?}] partitionBy=[]",
window_expr
)
write!(f, "WindowAggr: windowExpr=[{:?}]", window_expr)
}
LogicalPlan::Aggregate {
ref group_expr,
Expand Down
46 changes: 38 additions & 8 deletions datafusion/src/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::{

// Sentinel strings that are inserted as string literals into a flattened list of
// sub-expressions so the list can later be split back into its original groups.
// NOTE(review): the CASE markers are not used in the code visible here; presumably
// they delimit the parts of a CASE expression in the same way -- confirm.
const CASE_EXPR_MARKER: &str = "__DATAFUSION_CASE_EXPR__";
const CASE_ELSE_MARKER: &str = "__DATAFUSION_CASE_ELSE__";
// Pushed by `expr_sub_expressions` between a window function's `args` and its
// `partition_by` expressions; `rewrite_expression` searches for it to find where
// `args` ends and `partition_by` begins.
const WINDOW_PARTITION_MARKER: &str = "__DATAFUSION_WINDOW_PARTITION__";
// Pushed by `expr_sub_expressions` between a window function's `partition_by` and
// `order_by` expressions; `rewrite_expression` splits the list at this position
// to recover `order_by`.
const WINDOW_SORT_MARKER: &str = "__DATAFUSION_WINDOW_SORT__";

/// Recursively walk a list of expression trees, collecting the unique set of column
Expand Down Expand Up @@ -258,9 +259,16 @@ pub fn expr_sub_expressions(expr: &Expr) -> Result<Vec<Expr>> {
Expr::IsNotNull(e) => Ok(vec![e.as_ref().to_owned()]),
Expr::ScalarFunction { args, .. } => Ok(args.clone()),
Expr::ScalarUDF { args, .. } => Ok(args.clone()),
Expr::WindowFunction { args, order_by, .. } => {
Expr::WindowFunction {
args,
partition_by,
order_by,
..
} => {
let mut expr_list: Vec<Expr> = vec![];
expr_list.extend(args.clone());
expr_list.push(lit(WINDOW_PARTITION_MARKER));
expr_list.extend(partition_by.clone());
expr_list.push(lit(WINDOW_SORT_MARKER));
expr_list.extend(order_by.clone());
Ok(expr_list)
Expand Down Expand Up @@ -340,7 +348,20 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
Expr::WindowFunction {
fun, window_frame, ..
} => {
let index = expressions
let partition_index = expressions
.iter()
.position(|expr| {
matches!(expr, Expr::Literal(ScalarValue::Utf8(Some(str)))
if str == WINDOW_PARTITION_MARKER)
})
.ok_or_else(|| {
DataFusionError::Internal(
"Ill-formed window function expressions: unexpected marker"
.to_owned(),
)
})?;

let sort_index = expressions
.iter()
.position(|expr| {
matches!(expr, Expr::Literal(ScalarValue::Utf8(Some(str)))
Expand All @@ -351,12 +372,21 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
"Ill-formed window function expressions".to_owned(),
)
})?;
Ok(Expr::WindowFunction {
fun: fun.clone(),
args: expressions[..index].to_vec(),
order_by: expressions[index + 1..].to_vec(),
window_frame: *window_frame,
})

if partition_index >= sort_index {
Err(DataFusionError::Internal(
"Ill-formed window function expressions: partition index too large"
.to_owned(),
))
} else {
Ok(Expr::WindowFunction {
fun: fun.clone(),
args: expressions[..partition_index].to_vec(),
partition_by: expressions[partition_index + 1..sort_index].to_vec(),
order_by: expressions[sort_index + 1..].to_vec(),
window_frame: *window_frame,
})
}
}
Expr::AggregateFunction { fun, distinct, .. } => Ok(Expr::AggregateFunction {
fun: fun.clone(),
Expand Down
Loading