Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add window frame constructs #492

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ballista/rust/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ futures = "0.3"
log = "0.4"
prost = "0.7"
serde = {version = "1", features = ["derive"]}
sqlparser = "0.8"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

sqlparser = "0.9.0"
tokio = "1.0"
tonic = "0.4"
uuid = { version = "0.8", features = ["v4"] }
Expand Down
6 changes: 3 additions & 3 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,9 @@ message WindowExprNode {
// repeated LogicalExprNode partition_by = 5;
repeated LogicalExprNode order_by = 6;
// repeated LogicalExprNode filter = 7;
// oneof window_frame {
// WindowFrame frame = 8;
// }
oneof window_frame {
WindowFrame frame = 8;
}
}

message BetweenNode {
Expand Down
35 changes: 19 additions & 16 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,6 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
.iter()
.map(|expr| expr.try_into())
.collect::<Result<Vec<_>, _>>()?;

// let partition_by_expr = window
// .partition_by_expr
// .iter()
// .map(|expr| expr.try_into())
// .collect::<Result<Vec<_>, _>>()?;
// let order_by_expr = window
// .order_by_expr
// .iter()
// .map(|expr| expr.try_into())
// .collect::<Result<Vec<_>, _>>()?;
// // FIXME: add filter by expr
// // FIXME: parse the window_frame data
// let window_frame = None;
LogicalPlanBuilder::from(&input)
.window(window_expr)?
.build()
Expand Down Expand Up @@ -929,6 +915,15 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
.map(|e| e.try_into())
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
let window_frame: Option<WindowFrame> = expr
.window_frame
.as_ref()
.map::<Result<WindowFrame, _>, _>(|e| match e {
window_expr_node::WindowFrame::Frame(frame) => {
frame.clone().try_into()
}
})
.transpose()?;
match window_function {
window_expr_node::WindowFunction::AggrFunction(i) => {
let aggr_function = protobuf::AggregateFunction::from_i32(*i)
Expand All @@ -945,6 +940,7 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
),
args: vec![parse_required_expr(&expr.expr)?],
order_by,
window_frame,
})
}
window_expr_node::WindowFunction::BuiltInFunction(i) => {
Expand All @@ -964,6 +960,7 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
),
args: vec![parse_required_expr(&expr.expr)?],
order_by,
window_frame,
})
}
}
Expand Down Expand Up @@ -1333,8 +1330,14 @@ impl TryFrom<protobuf::WindowFrame> for WindowFrame {
)
})?
.try_into()?;
// FIXME parse end bound
let end_bound = None;
let end_bound = window
.end_bound
.map(|end_bound| match end_bound {
protobuf::window_frame::EndBound::Bound(end_bound) => {
end_bound.try_into()
}
})
.transpose()?;
Ok(WindowFrame {
units,
start_bound,
Expand Down
46 changes: 32 additions & 14 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
ref fun,
ref args,
ref order_by,
ref window_frame,
..
} => {
let window_function = match fun {
Expand All @@ -1026,10 +1027,15 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
.iter()
.map(|e| e.try_into())
.collect::<Result<Vec<_>, _>>()?;
let window_frame = window_frame.as_ref().map(|window_frame| {
let window_frame: protobuf::WindowFrame = window_frame.clone().into();
protobuf::window_expr_node::WindowFrame::Frame(window_frame)
});
let window_expr = Box::new(protobuf::WindowExprNode {
expr: Some(Box::new(arg.try_into()?)),
window_function: Some(window_function),
order_by,
window_frame,
});
Ok(protobuf::LogicalExprNode {
expr_type: Some(ExprType::WindowExpr(window_expr)),
Expand Down Expand Up @@ -1256,23 +1262,35 @@ impl From<WindowFrameUnits> for protobuf::WindowFrameUnits {
}
}

impl TryFrom<WindowFrameBound> for protobuf::WindowFrameBound {
type Error = BallistaError;

fn try_from(_bound: WindowFrameBound) -> Result<Self, Self::Error> {
Err(BallistaError::NotImplemented(
"WindowFrameBound => protobuf::WindowFrameBound".to_owned(),
))
impl From<WindowFrameBound> for protobuf::WindowFrameBound {
    /// Converts a SQL-level window frame bound into its protobuf message.
    ///
    /// Each variant selects the matching `protobuf::WindowFrameBoundType`.
    /// `CurrentRow` never carries a bound value; `Preceding` and `Following`
    /// forward their optional offset, wrapped in `BoundValue::Value` when
    /// one is present.
    fn from(bound: WindowFrameBound) -> Self {
        // Work out the discriminant and the optional offset first so the
        // message itself is assembled in exactly one place.
        // NOTE(review): an absent offset presumably denotes an unbounded
        // frame edge — confirm against the sqlparser definition.
        let (bound_type, offset) = match bound {
            WindowFrameBound::CurrentRow => {
                (protobuf::WindowFrameBoundType::CurrentRow, None)
            }
            WindowFrameBound::Preceding(offset) => {
                (protobuf::WindowFrameBoundType::Preceding, offset)
            }
            WindowFrameBound::Following(offset) => {
                (protobuf::WindowFrameBoundType::Following, offset)
            }
        };

        protobuf::WindowFrameBound {
            window_frame_bound_type: bound_type.into(),
            bound_value: offset.map(protobuf::window_frame_bound::BoundValue::Value),
        }
    }
}

impl TryFrom<WindowFrame> for protobuf::WindowFrame {
type Error = BallistaError;

fn try_from(_window: WindowFrame) -> Result<Self, Self::Error> {
Err(BallistaError::NotImplemented(
"WindowFrame => protobuf::WindowFrame".to_owned(),
))
impl From<WindowFrame> for protobuf::WindowFrame {
    /// Converts a SQL-level window frame into its protobuf message.
    ///
    /// The frame units and the start bound are always populated; the end
    /// bound is emitted (wrapped in `EndBound::Bound`) only when the source
    /// frame has one.
    fn from(window: WindowFrame) -> Self {
        let units = protobuf::WindowFrameUnits::from(window.units);
        let start = protobuf::WindowFrameBound::from(window.start_bound);
        // Convert the optional end bound, then wrap it in the oneof variant.
        let end = window
            .end_bound
            .map(protobuf::WindowFrameBound::from)
            .map(protobuf::window_frame::EndBound::Bound);

        protobuf::WindowFrame {
            window_frame_units: units.into(),
            start_bound: Some(start),
            end_bound: end,
        }
    }
}

Expand Down
4 changes: 4 additions & 0 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
fun,
args,
order_by,
window_frame,
} => {
let arg = df_planner
.create_physical_expr(
Expand All @@ -250,6 +251,9 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
if !order_by.is_empty() {
return Err(BallistaError::NotImplemented("Window function with order by is not yet implemented".to_owned()));
}
if window_frame.is_some() {
return Err(BallistaError::NotImplemented("Window function with window frame is not yet implemented".to_owned()));
}
let window_expr = create_window_expr(
&fun,
&[arg],
Expand Down
16 changes: 9 additions & 7 deletions datafusion/src/logical_plan/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,20 @@
//! such as `col = 5` or `SUM(col)`. See examples on the [`Expr`] struct.

pub use super::Operator;

use std::fmt;
use std::sync::Arc;

use aggregates::{AccumulatorFunctionImplementation, StateTypeFunction};
use arrow::{compute::can_cast_types, datatypes::DataType};

use crate::error::{DataFusionError, Result};
use crate::logical_plan::{DFField, DFSchema};
use crate::physical_plan::{
aggregates, expressions::binary_operator_data_type, functions, udf::ScalarUDF,
window_functions,
};
use crate::{physical_plan::udaf::AggregateUDF, scalar::ScalarValue};
use aggregates::{AccumulatorFunctionImplementation, StateTypeFunction};
use arrow::{compute::can_cast_types, datatypes::DataType};
use functions::{ReturnTypeFunction, ScalarFunctionImplementation, Signature};
use sqlparser::ast::WindowFrame;
use std::collections::HashSet;
use std::fmt;
use std::sync::Arc;

/// `Expr` is a central struct of DataFusion's query API, and
/// represent logical expressions such as `A + 1`, or `CAST(c1 AS
Expand Down Expand Up @@ -199,6 +197,8 @@ pub enum Expr {
args: Vec<Expr>,
/// List of order by expressions
order_by: Vec<Expr>,
/// Window frame
window_frame: Option<WindowFrame>,
},
/// aggregate function
AggregateUDF {
Expand Down Expand Up @@ -735,10 +735,12 @@ impl Expr {
args,
fun,
order_by,
window_frame,
} => Expr::WindowFunction {
args: rewrite_vec(args, rewriter)?,
fun,
order_by: rewrite_vec(order_by, rewriter)?,
window_frame,
},
Expr::AggregateFunction {
args,
Expand Down
5 changes: 4 additions & 1 deletion datafusion/src/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,9 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
fun: fun.clone(),
args: expressions.to_vec(),
}),
Expr::WindowFunction { fun, .. } => {
Expr::WindowFunction {
fun, window_frame, ..
} => {
let index = expressions
.iter()
.position(|expr| {
Expand All @@ -353,6 +355,7 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
fun: fun.clone(),
args: expressions[..index].to_vec(),
order_by: expressions[index + 1..].to_vec(),
window_frame: window_frame.clone(),
})
}
Expr::AggregateFunction { fun, distinct, .. } => Ok(Expr::AggregateFunction {
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,5 +617,6 @@ pub mod udf;
#[cfg(feature = "unicode_expressions")]
pub mod unicode_expressions;
pub mod union;
pub mod window_frames;
pub mod window_functions;
pub mod windows;
9 changes: 8 additions & 1 deletion datafusion/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sort::SortExec;
use crate::physical_plan::udf;
use crate::physical_plan::window_frames;
use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::{hash_utils, Partitioning};
use crate::physical_plan::{
Expand Down Expand Up @@ -747,7 +748,12 @@ impl DefaultPhysicalPlanner {
};

match e {
Expr::WindowFunction { fun, args, .. } => {
Expr::WindowFunction {
fun,
args,
window_frame,
..
} => {
let args = args
.iter()
.map(|e| {
Expand All @@ -759,6 +765,7 @@ impl DefaultPhysicalPlanner {
// "Window function with order by is not yet implemented".to_owned(),
// ));
// }
let _window_frame = window_frames::validate_window_frame(window_frame)?;
windows::create_window_expr(fun, &args, physical_input_schema, name)
}
other => Err(DataFusionError::Internal(format!(
Expand Down
Loading