From 8b486d53b09ff1c7a6b9cf4687796ba1c13d6160 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Thu, 20 May 2021 23:17:22 +0800 Subject: [PATCH] adding more built-in functions --- ballista/rust/core/proto/ballista.proto | 12 ++- .../core/src/serde/logical_plan/from_proto.rs | 6 ++ .../core/src/serde/logical_plan/to_proto.rs | 4 + datafusion/src/logical_plan/builder.rs | 18 ++++- .../src/physical_plan/window_functions.rs | 78 ++++++++++++++++--- 5 files changed, 99 insertions(+), 19 deletions(-) diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index b5f3835f1f8c..926aefa6ab01 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -157,10 +157,14 @@ enum BuiltInWindowFunction { ROW_NUMBER = 0; RANK = 1; DENSE_RANK = 2; - LAG = 3; - LEAD = 4; - FIRST_VALUE = 5; - LAST_VALUE = 6; + PERCENT_RANK = 3; + CUME_DIST = 4; + NTILE = 5; + LAG = 6; + LEAD = 7; + FIRST_VALUE = 8; + LAST_VALUE = 9; + NTH_VALUE = 10; } message WindowExprNode { diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs index e963c9c3b225..7d0f429f1e4e 100644 --- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs @@ -1356,6 +1356,9 @@ impl From for BuiltInWindowFunction { BuiltInWindowFunction::RowNumber } protobuf::BuiltInWindowFunction::Rank => BuiltInWindowFunction::Rank, + protobuf::BuiltInWindowFunction::PercentRank => { + BuiltInWindowFunction::PercentRank + } protobuf::BuiltInWindowFunction::DenseRank => { BuiltInWindowFunction::DenseRank } @@ -1364,6 +1367,9 @@ impl From for BuiltInWindowFunction { protobuf::BuiltInWindowFunction::FirstValue => { BuiltInWindowFunction::FirstValue } + protobuf::BuiltInWindowFunction::CumeDist => BuiltInWindowFunction::CumeDist, + protobuf::BuiltInWindowFunction::Ntile => BuiltInWindowFunction::Ntile, + 
protobuf::BuiltInWindowFunction::NthValue => BuiltInWindowFunction::NthValue, protobuf::BuiltInWindowFunction::LastValue => { BuiltInWindowFunction::LastValue } diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index a2df1756c927..4050d0bea282 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -1255,6 +1255,10 @@ impl From<&BuiltInWindowFunction> for protobuf::BuiltInWindowFunction { match value { BuiltInWindowFunction::FirstValue => Self::FirstValue, BuiltInWindowFunction::LastValue => Self::LastValue, + BuiltInWindowFunction::NthValue => Self::NthValue, + BuiltInWindowFunction::Ntile => Self::Ntile, + BuiltInWindowFunction::CumeDist => Self::CumeDist, + BuiltInWindowFunction::PercentRank => Self::PercentRank, BuiltInWindowFunction::RowNumber => Self::RowNumber, BuiltInWindowFunction::Rank => Self::Rank, BuiltInWindowFunction::Lag => Self::Lag, diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index e5e9a0e51572..9515ac2ff373 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -289,18 +289,30 @@ impl LogicalPlanBuilder { } /// Apply a window - /// - /// NOTE: this feature is under development and this API will be changing + /// + /// NOTE: this feature is under development and this API will be changing + /// + /// - https://github.com/apache/arrow-datafusion/issues/359 basic structure + /// - https://github.com/apache/arrow-datafusion/issues/298 empty over clause + /// - https://github.com/apache/arrow-datafusion/issues/299 with partition clause + /// - https://github.com/apache/arrow-datafusion/issues/360 with order by + /// - https://github.com/apache/arrow-datafusion/issues/361 with window frame pub fn window( &self, window_expr: impl IntoIterator, - // filter: impl IntoIterator, + // FIXME: implement next + // 
filter_by_expr: impl IntoIterator, + // FIXME: implement next // partition_by_expr: impl IntoIterator, + // FIXME: implement next // order_by_expr: impl IntoIterator, + // FIXME: implement next // window_frame: Option, ) -> Result { let window_expr = window_expr.into_iter().collect::>(); + // FIXME: implement next // let partition_by_expr = partition_by_expr.into_iter().collect::>(); + // FIXME: implement next // let order_by_expr = order_by_expr.into_iter().collect::>(); let all_expr = window_expr.iter(); validate_unique_names("Windows", all_expr.clone(), self.plan.schema())?; diff --git a/datafusion/src/physical_plan/window_functions.rs b/datafusion/src/physical_plan/window_functions.rs index 1fac9709be4a..e6267250bdad 100644 --- a/datafusion/src/physical_plan/window_functions.rs +++ b/datafusion/src/physical_plan/window_functions.rs @@ -72,33 +72,51 @@ impl fmt::Display for WindowFunction { /// An aggregate function that is part of a built-in window function #[derive(Debug, Clone, PartialEq, Eq)] pub enum BuiltInWindowFunction { - /// row number + /// number of the current row within its partition, counting from 1 RowNumber, - /// rank + /// rank of the current row with gaps; same as row_number of its first peer Rank, - /// dense rank + /// rank of the current row without gaps; this function counts peer groups DenseRank, - /// lag + /// relative rank of the current row: (rank - 1) / (total rows - 1) + PercentRank, + /// relative rank of the current row: (number of rows preceding or peer with current row) / (total rows) + CumeDist, + /// integer ranging from 1 to the argument value, dividing the partition as equally as possible + Ntile, + /// returns value evaluated at the row that is offset rows before the current row within the partition; + /// if there is no such row, instead return default (which must be of the same type as value). + /// Both offset and default are evaluated with respect to the current row.
+ /// If omitted, offset defaults to 1 and default to null Lag, - /// lead + /// returns value evaluated at the row that is offset rows after the current row within the partition; + /// if there is no such row, instead return default (which must be of the same type as value). + /// Both offset and default are evaluated with respect to the current row. + /// If omitted, offset defaults to 1 and default to null Lead, - /// first value + /// returns value evaluated at the row that is the first row of the window frame FirstValue, - /// last value + /// returns value evaluated at the row that is the last row of the window frame LastValue, + /// returns value evaluated at the row that is the nth row of the window frame (counting from 1); null if no such row + NthValue, } impl FromStr for BuiltInWindowFunction { type Err = DataFusionError; fn from_str(name: &str) -> Result { - Ok(match name { + Ok(match name.to_lowercase().as_str() { "row_number" => BuiltInWindowFunction::RowNumber, "rank" => BuiltInWindowFunction::Rank, "dense_rank" => BuiltInWindowFunction::DenseRank, - "first_value" => BuiltInWindowFunction::FirstValue, - "last_value" => BuiltInWindowFunction::LastValue, + "percent_rank" => BuiltInWindowFunction::PercentRank, + "cume_dist" => BuiltInWindowFunction::CumeDist, + "ntile" => BuiltInWindowFunction::Ntile, "lag" => BuiltInWindowFunction::Lag, "lead" => BuiltInWindowFunction::Lead, + "first_value" => BuiltInWindowFunction::FirstValue, + "last_value" => BuiltInWindowFunction::LastValue, + "nth_value" => BuiltInWindowFunction::NthValue, _ => { return Err(DataFusionError::Plan(format!( "There is no built-in window function named {}", @@ -123,10 +141,15 @@ pub fn return_type(fun: &WindowFunction, arg_types: &[DataType]) -> Result Ok(DataType::UInt64), + BuiltInWindowFunction::PercentRank | BuiltInWindowFunction::CumeDist => { + Ok(DataType::Float64) + } + BuiltInWindowFunction::Ntile => Ok(DataType::UInt32), BuiltInWindowFunction::Lag | 
BuiltInWindowFunction::Lead | BuiltInWindowFunction::FirstValue - | BuiltInWindowFunction::LastValue => Ok(arg_types[0].clone()), + | BuiltInWindowFunction::LastValue + | BuiltInWindowFunction::NthValue => Ok(arg_types[0].clone()), }, } } @@ -139,11 +162,42 @@ fn signature(fun: &WindowFunction) -> Signature { WindowFunction::BuiltInWindowFunction(fun) => match fun { BuiltInWindowFunction::RowNumber | BuiltInWindowFunction::Rank - | BuiltInWindowFunction::DenseRank => Signature::Any(0), + | BuiltInWindowFunction::DenseRank + | BuiltInWindowFunction::PercentRank + | BuiltInWindowFunction::CumeDist => Signature::Any(0), BuiltInWindowFunction::Lag | BuiltInWindowFunction::Lead | BuiltInWindowFunction::FirstValue | BuiltInWindowFunction::LastValue => Signature::Any(1), + BuiltInWindowFunction::Ntile => Signature::Exact(vec![DataType::UInt64]), + BuiltInWindowFunction::NthValue => Signature::Any(2), }, } } + +#[cfg(test)] +mod tests { + use super::*; + use arrow::datatypes::{DataType, Field}; + + #[test] + fn test_window_function_from_str() -> Result<()> { + assert_eq!( + WindowFunction::from_str("max")?, + WindowFunction::AggregateFunction(AggregateFunction::Max) + ); + assert_eq!( + WindowFunction::from_str("min")?, + WindowFunction::AggregateFunction(AggregateFunction::Min) + ); + assert_eq!( + WindowFunction::from_str("avg")?, + WindowFunction::AggregateFunction(AggregateFunction::Avg) + ); + assert_eq!( + WindowFunction::from_str("cume_dist")?, + WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::CumeDist) + ); + Ok(()) + } +}