Skip to content

Commit

Permalink
adding more built-in functions
Browse files Browse the repository at this point in the history
  • Loading branch information
Jiayu Liu committed May 21, 2021
1 parent abf08cd commit 8b486d5
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 19 deletions.
12 changes: 8 additions & 4 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,14 @@ enum BuiltInWindowFunction {
ROW_NUMBER = 0;
RANK = 1;
DENSE_RANK = 2;
LAG = 3;
LEAD = 4;
FIRST_VALUE = 5;
LAST_VALUE = 6;
PERCENT_RANK = 3;
CUME_DIST = 4;
NTILE = 5;
LAG = 6;
LEAD = 7;
FIRST_VALUE = 8;
LAST_VALUE = 9;
NTH_VALUE = 10;
}

message WindowExprNode {
Expand Down
6 changes: 6 additions & 0 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1356,6 +1356,9 @@ impl From<protobuf::BuiltInWindowFunction> for BuiltInWindowFunction {
BuiltInWindowFunction::RowNumber
}
protobuf::BuiltInWindowFunction::Rank => BuiltInWindowFunction::Rank,
protobuf::BuiltInWindowFunction::PercentRank => {
BuiltInWindowFunction::PercentRank
}
protobuf::BuiltInWindowFunction::DenseRank => {
BuiltInWindowFunction::DenseRank
}
Expand All @@ -1364,6 +1367,9 @@ impl From<protobuf::BuiltInWindowFunction> for BuiltInWindowFunction {
protobuf::BuiltInWindowFunction::FirstValue => {
BuiltInWindowFunction::FirstValue
}
protobuf::BuiltInWindowFunction::CumeDist => BuiltInWindowFunction::CumeDist,
protobuf::BuiltInWindowFunction::Ntile => BuiltInWindowFunction::Ntile,
protobuf::BuiltInWindowFunction::NthValue => BuiltInWindowFunction::NthValue,
protobuf::BuiltInWindowFunction::LastValue => {
BuiltInWindowFunction::LastValue
}
Expand Down
4 changes: 4 additions & 0 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1255,6 +1255,10 @@ impl From<&BuiltInWindowFunction> for protobuf::BuiltInWindowFunction {
match value {
BuiltInWindowFunction::FirstValue => Self::FirstValue,
BuiltInWindowFunction::LastValue => Self::LastValue,
BuiltInWindowFunction::NthValue => Self::NthValue,
BuiltInWindowFunction::Ntile => Self::Ntile,
BuiltInWindowFunction::CumeDist => Self::CumeDist,
BuiltInWindowFunction::PercentRank => Self::PercentRank,
BuiltInWindowFunction::RowNumber => Self::RowNumber,
BuiltInWindowFunction::Rank => Self::Rank,
BuiltInWindowFunction::Lag => Self::Lag,
Expand Down
18 changes: 15 additions & 3 deletions datafusion/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,18 +289,30 @@ impl LogicalPlanBuilder {
}

/// Apply a window
///
/// NOTE: this feature is under development and this API will be changing
///
/// NOTE: this feature is under development and this API will be changing
///
/// - https://github.com/apache/arrow-datafusion/issues/359 basic structure
/// - https://github.com/apache/arrow-datafusion/issues/298 empty over clause
/// - https://github.com/apache/arrow-datafusion/issues/299 with partition clause
/// - https://github.com/apache/arrow-datafusion/issues/360 with order by
/// - https://github.com/apache/arrow-datafusion/issues/361 with window frame
pub fn window(
&self,
window_expr: impl IntoIterator<Item = Expr>,
// filter: impl IntoIterator<Item = Expr>,
// FIXME: implement next
// filter_by_expr: impl IntoIterator<Item = Expr>,
// FIXME: implement next
// partition_by_expr: impl IntoIterator<Item = Expr>,
// FIXME: implement next
// order_by_expr: impl IntoIterator<Item = Expr>,
// FIXME: implement next
// window_frame: Option<WindowFrame>,
) -> Result<Self> {
let window_expr = window_expr.into_iter().collect::<Vec<Expr>>();
// FIXME: implement next
// let partition_by_expr = partition_by_expr.into_iter().collect::<Vec<Expr>>();
// FIXME: implement next
// let order_by_expr = order_by_expr.into_iter().collect::<Vec<Expr>>();
let all_expr = window_expr.iter();
validate_unique_names("Windows", all_expr.clone(), self.plan.schema())?;
Expand Down
78 changes: 66 additions & 12 deletions datafusion/src/physical_plan/window_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,33 +72,51 @@ impl fmt::Display for WindowFunction {
/// An aggregate function that is part of a built-in window function
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BuiltInWindowFunction {
/// row number
/// number of the current row within its partition, counting from 1
RowNumber,
/// rank
/// rank of the current row with gaps; same as row_number of its first peer
Rank,
/// dense rank
/// ank of the current row without gaps; this function counts peer groups
DenseRank,
/// lag
/// relative rank of the current row: (rank - 1) / (total rows - 1)
PercentRank,
/// relative rank of the current row: (number of rows preceding or peer with current row) / (total rows)
CumeDist,
/// integer ranging from 1 to the argument value, dividing the partition as equally as possible
Ntile,
/// returns value evaluated at the row that is offset rows before the current row within the partition;
/// if there is no such row, instead return default (which must be of the same type as value).
/// Both offset and default are evaluated with respect to the current row.
/// If omitted, offset defaults to 1 and default to null
Lag,
/// lead
/// returns value evaluated at the row that is offset rows after the current row within the partition;
/// if there is no such row, instead return default (which must be of the same type as value).
/// Both offset and default are evaluated with respect to the current row.
/// If omitted, offset defaults to 1 and default to null
Lead,
/// first value
/// returns value evaluated at the row that is the first row of the window frame
FirstValue,
/// last value
/// returns value evaluated at the row that is the last row of the window frame
LastValue,
/// returns value evaluated at the row that is the nth row of the window frame (counting from 1); null if no such row
NthValue,
}

impl FromStr for BuiltInWindowFunction {
type Err = DataFusionError;
fn from_str(name: &str) -> Result<BuiltInWindowFunction> {
Ok(match name {
Ok(match name.to_lowercase().as_str() {
"row_number" => BuiltInWindowFunction::RowNumber,
"rank" => BuiltInWindowFunction::Rank,
"dense_rank" => BuiltInWindowFunction::DenseRank,
"first_value" => BuiltInWindowFunction::FirstValue,
"last_value" => BuiltInWindowFunction::LastValue,
"percent_rank" => BuiltInWindowFunction::PercentRank,
"cume_dist" => BuiltInWindowFunction::CumeDist,
"ntile" => BuiltInWindowFunction::Ntile,
"lag" => BuiltInWindowFunction::Lag,
"lead" => BuiltInWindowFunction::Lead,
"first_value" => BuiltInWindowFunction::FirstValue,
"last_value" => BuiltInWindowFunction::LastValue,
"nth_value" => BuiltInWindowFunction::NthValue,
_ => {
return Err(DataFusionError::Plan(format!(
"There is no built-in window function named {}",
Expand All @@ -123,10 +141,15 @@ pub fn return_type(fun: &WindowFunction, arg_types: &[DataType]) -> Result<DataT
BuiltInWindowFunction::RowNumber
| BuiltInWindowFunction::Rank
| BuiltInWindowFunction::DenseRank => Ok(DataType::UInt64),
BuiltInWindowFunction::PercentRank | BuiltInWindowFunction::CumeDist => {
Ok(DataType::Float64)
}
BuiltInWindowFunction::Ntile => Ok(DataType::UInt32),
BuiltInWindowFunction::Lag
| BuiltInWindowFunction::Lead
| BuiltInWindowFunction::FirstValue
| BuiltInWindowFunction::LastValue => Ok(arg_types[0].clone()),
| BuiltInWindowFunction::LastValue
| BuiltInWindowFunction::NthValue => Ok(arg_types[0].clone()),
},
}
}
Expand All @@ -139,11 +162,42 @@ fn signature(fun: &WindowFunction) -> Signature {
WindowFunction::BuiltInWindowFunction(fun) => match fun {
BuiltInWindowFunction::RowNumber
| BuiltInWindowFunction::Rank
| BuiltInWindowFunction::DenseRank => Signature::Any(0),
| BuiltInWindowFunction::DenseRank
| BuiltInWindowFunction::PercentRank
| BuiltInWindowFunction::CumeDist => Signature::Any(0),
BuiltInWindowFunction::Lag
| BuiltInWindowFunction::Lead
| BuiltInWindowFunction::FirstValue
| BuiltInWindowFunction::LastValue => Signature::Any(1),
BuiltInWindowFunction::Ntile => Signature::Exact(vec![DataType::UInt64]),
BuiltInWindowFunction::NthValue => Signature::Any(2),
},
}
}

#[cfg(test)]
mod tests {
use super::*;
use arrow::datatypes::{DataType, Field};

#[test]
fn test_window_function_from_str() -> Result<()> {
assert_eq!(
WindowFunction::from_str("max")?,
WindowFunction::AggregateFunction(AggregateFunction::Max)
);
assert_eq!(
WindowFunction::from_str("min")?,
WindowFunction::AggregateFunction(AggregateFunction::Min)
);
assert_eq!(
WindowFunction::from_str("avg")?,
WindowFunction::AggregateFunction(AggregateFunction::Avg)
);
assert_eq!(
WindowFunction::from_str("cum_dist")?,
WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::CumeDist)
);
Ok(())
}
}

0 comments on commit 8b486d5

Please sign in to comment.