Skip to content

Commit

Permalink
closing up type checks
Browse files Browse the repository at this point in the history
  • Loading branch information
Jiayu Liu committed Jun 4, 2021
1 parent e713bc3 commit 023509d
Show file tree
Hide file tree
Showing 12 changed files with 340 additions and 47 deletions.
2 changes: 1 addition & 1 deletion ballista/rust/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ futures = "0.3"
log = "0.4"
prost = "0.7"
serde = {version = "1", features = ["derive"]}
sqlparser = "0.8"
sqlparser = "0.9.0"
tokio = "1.0"
tonic = "0.4"
uuid = { version = "0.8", features = ["v4"] }
Expand Down
6 changes: 3 additions & 3 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,9 @@ message WindowExprNode {
// repeated LogicalExprNode partition_by = 5;
repeated LogicalExprNode order_by = 6;
// repeated LogicalExprNode filter = 7;
// oneof window_frame {
// WindowFrame frame = 8;
// }
oneof window_frame {
WindowFrame frame = 8;
}
}

message BetweenNode {
Expand Down
35 changes: 19 additions & 16 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,6 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
.iter()
.map(|expr| expr.try_into())
.collect::<Result<Vec<_>, _>>()?;

// let partition_by_expr = window
// .partition_by_expr
// .iter()
// .map(|expr| expr.try_into())
// .collect::<Result<Vec<_>, _>>()?;
// let order_by_expr = window
// .order_by_expr
// .iter()
// .map(|expr| expr.try_into())
// .collect::<Result<Vec<_>, _>>()?;
// // FIXME: add filter by expr
// // FIXME: parse the window_frame data
// let window_frame = None;
LogicalPlanBuilder::from(&input)
.window(window_expr)?
.build()
Expand Down Expand Up @@ -928,6 +914,15 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
.map(|e| e.try_into())
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
let window_frame: Option<WindowFrame> = expr
.window_frame
.as_ref()
.map::<Result<WindowFrame, _>, _>(|e| match e {
window_expr_node::WindowFrame::Frame(frame) => {
frame.clone().try_into()
}
})
.transpose()?;
match window_function {
window_expr_node::WindowFunction::AggrFunction(i) => {
let aggr_function = protobuf::AggregateFunction::from_i32(*i)
Expand All @@ -944,6 +939,7 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
),
args: vec![parse_required_expr(&expr.expr)?],
order_by,
window_frame,
})
}
window_expr_node::WindowFunction::BuiltInFunction(i) => {
Expand All @@ -963,6 +959,7 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
),
args: vec![parse_required_expr(&expr.expr)?],
order_by,
window_frame,
})
}
}
Expand Down Expand Up @@ -1332,8 +1329,14 @@ impl TryFrom<protobuf::WindowFrame> for WindowFrame {
)
})?
.try_into()?;
// FIXME parse end bound
let end_bound = None;
let end_bound = window
.end_bound
.map(|end_bound| match end_bound {
protobuf::window_frame::EndBound::Bound(end_bound) => {
end_bound.try_into()
}
})
.transpose()?;
Ok(WindowFrame {
units,
start_bound,
Expand Down
46 changes: 32 additions & 14 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,7 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
ref fun,
ref args,
ref order_by,
ref window_frame,
..
} => {
let window_function = match fun {
Expand All @@ -1025,10 +1026,15 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
.iter()
.map(|e| e.try_into())
.collect::<Result<Vec<_>, _>>()?;
let window_frame = window_frame.as_ref().map(|window_frame| {
let window_frame: protobuf::WindowFrame = window_frame.clone().into();
protobuf::window_expr_node::WindowFrame::Frame(window_frame)
});
let window_expr = Box::new(protobuf::WindowExprNode {
expr: Some(Box::new(arg.try_into()?)),
window_function: Some(window_function),
order_by,
window_frame,
});
Ok(protobuf::LogicalExprNode {
expr_type: Some(ExprType::WindowExpr(window_expr)),
Expand Down Expand Up @@ -1255,23 +1261,35 @@ impl From<WindowFrameUnits> for protobuf::WindowFrameUnits {
}
}

impl TryFrom<WindowFrameBound> for protobuf::WindowFrameBound {
type Error = BallistaError;

fn try_from(_bound: WindowFrameBound) -> Result<Self, Self::Error> {
Err(BallistaError::NotImplemented(
"WindowFrameBound => protobuf::WindowFrameBound".to_owned(),
))
impl From<WindowFrameBound> for protobuf::WindowFrameBound {
fn from(bound: WindowFrameBound) -> Self {
match bound {
WindowFrameBound::CurrentRow => protobuf::WindowFrameBound {
window_frame_bound_type: protobuf::WindowFrameBoundType::CurrentRow
.into(),
bound_value: None,
},
WindowFrameBound::Preceding(v) => protobuf::WindowFrameBound {
window_frame_bound_type: protobuf::WindowFrameBoundType::Preceding.into(),
bound_value: v.map(protobuf::window_frame_bound::BoundValue::Value),
},
WindowFrameBound::Following(v) => protobuf::WindowFrameBound {
window_frame_bound_type: protobuf::WindowFrameBoundType::Following.into(),
bound_value: v.map(protobuf::window_frame_bound::BoundValue::Value),
},
}
}
}

impl TryFrom<WindowFrame> for protobuf::WindowFrame {
type Error = BallistaError;

fn try_from(_window: WindowFrame) -> Result<Self, Self::Error> {
Err(BallistaError::NotImplemented(
"WindowFrame => protobuf::WindowFrame".to_owned(),
))
impl From<WindowFrame> for protobuf::WindowFrame {
fn from(window: WindowFrame) -> Self {
protobuf::WindowFrame {
window_frame_units: protobuf::WindowFrameUnits::from(window.units).into(),
start_bound: Some(window.start_bound.into()),
end_bound: window.end_bound.map(|end_bound| {
protobuf::window_frame::EndBound::Bound(end_bound.into())
}),
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
fun,
args,
order_by,
window_frame,
} => {
let arg = df_planner
.create_physical_expr(
Expand All @@ -250,6 +251,9 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
if !order_by.is_empty() {
return Err(BallistaError::NotImplemented("Window function with order by is not yet implemented".to_owned()));
}
if window_frame.is_some() {
return Err(BallistaError::NotImplemented("Window function with window frame is not yet implemented".to_owned()));
}
let window_expr = create_window_expr(
&fun,
&[arg],
Expand Down
16 changes: 9 additions & 7 deletions datafusion/src/logical_plan/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,20 @@
//! such as `col = 5` or `SUM(col)`. See examples on the [`Expr`] struct.
pub use super::Operator;

use std::fmt;
use std::sync::Arc;

use aggregates::{AccumulatorFunctionImplementation, StateTypeFunction};
use arrow::{compute::can_cast_types, datatypes::DataType};

use crate::error::{DataFusionError, Result};
use crate::logical_plan::{DFField, DFSchema};
use crate::physical_plan::{
aggregates, expressions::binary_operator_data_type, functions, udf::ScalarUDF,
window_functions,
};
use crate::{physical_plan::udaf::AggregateUDF, scalar::ScalarValue};
use aggregates::{AccumulatorFunctionImplementation, StateTypeFunction};
use arrow::{compute::can_cast_types, datatypes::DataType};
use functions::{ReturnTypeFunction, ScalarFunctionImplementation, Signature};
use sqlparser::ast::WindowFrame;
use std::collections::HashSet;
use std::fmt;
use std::sync::Arc;

/// `Expr` is a central struct of DataFusion's query API, and
/// represent logical expressions such as `A + 1`, or `CAST(c1 AS
Expand Down Expand Up @@ -199,6 +197,8 @@ pub enum Expr {
args: Vec<Expr>,
/// List of order by expressions
order_by: Vec<Expr>,
/// Window frame
window_frame: Option<WindowFrame>,
},
/// aggregate function
AggregateUDF {
Expand Down Expand Up @@ -735,10 +735,12 @@ impl Expr {
args,
fun,
order_by,
window_frame,
} => Expr::WindowFunction {
args: rewrite_vec(args, rewriter)?,
fun,
order_by: rewrite_vec(order_by, rewriter)?,
window_frame,
},
Expr::AggregateFunction {
args,
Expand Down
5 changes: 4 additions & 1 deletion datafusion/src/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,9 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
fun: fun.clone(),
args: expressions.to_vec(),
}),
Expr::WindowFunction { fun, .. } => {
Expr::WindowFunction {
fun, window_frame, ..
} => {
let index = expressions
.iter()
.position(|expr| {
Expand All @@ -353,6 +355,7 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
fun: fun.clone(),
args: expressions[..index].to_vec(),
order_by: expressions[index + 1..].to_vec(),
window_frame: window_frame.clone(),
})
}
Expr::AggregateFunction { fun, distinct, .. } => Ok(Expr::AggregateFunction {
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,5 +617,6 @@ pub mod udf;
#[cfg(feature = "unicode_expressions")]
pub mod unicode_expressions;
pub mod union;
pub mod window_frames;
pub mod window_functions;
pub mod windows;
9 changes: 8 additions & 1 deletion datafusion/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sort::SortExec;
use crate::physical_plan::udf;
use crate::physical_plan::window_frames;
use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::{hash_utils, Partitioning};
use crate::physical_plan::{
Expand Down Expand Up @@ -746,7 +747,12 @@ impl DefaultPhysicalPlanner {
};

match e {
Expr::WindowFunction { fun, args, .. } => {
Expr::WindowFunction {
fun,
args,
window_frame,
..
} => {
let args = args
.iter()
.map(|e| {
Expand All @@ -758,6 +764,7 @@ impl DefaultPhysicalPlanner {
// "Window function with order by is not yet implemented".to_owned(),
// ));
// }
let _window_frame = window_frames::validate_window_frame(window_frame)?;
windows::create_window_expr(fun, &args, physical_input_schema, name)
}
other => Err(DataFusionError::Internal(format!(
Expand Down
Loading

0 comments on commit 023509d

Please sign in to comment.