From 7fb3640e733bfbbdbf18d58000896f378ba9644c Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Fri, 21 May 2021 16:38:25 +0800 Subject: [PATCH] row number done --- .../src/physical_plan/expressions/mod.rs | 2 + .../physical_plan/expressions/row_number.rs | 13 ++-- datafusion/src/physical_plan/mod.rs | 4 + .../src/physical_plan/window_functions.rs | 77 +++++++++++-------- datafusion/src/physical_plan/windows.rs | 43 ++++++++--- 5 files changed, 94 insertions(+), 45 deletions(-) diff --git a/datafusion/src/physical_plan/expressions/mod.rs b/datafusion/src/physical_plan/expressions/mod.rs index 4d57c39bb31cc..803870f3f7840 100644 --- a/datafusion/src/physical_plan/expressions/mod.rs +++ b/datafusion/src/physical_plan/expressions/mod.rs @@ -41,6 +41,7 @@ mod min_max; mod negative; mod not; mod nullif; +mod row_number; mod sum; mod try_cast; @@ -58,6 +59,7 @@ pub use min_max::{Max, Min}; pub use negative::{negative, NegativeExpr}; pub use not::{not, NotExpr}; pub use nullif::{nullif_func, SUPPORTED_NULLIF_TYPES}; +pub use row_number::RowNumber; pub use sum::{sum_return_type, Sum}; pub use try_cast::{try_cast, TryCastExpr}; /// returns the name of the state diff --git a/datafusion/src/physical_plan/expressions/row_number.rs b/datafusion/src/physical_plan/expressions/row_number.rs index d2339a420ae2b..6bcc10815c775 100644 --- a/datafusion/src/physical_plan/expressions/row_number.rs +++ b/datafusion/src/physical_plan/expressions/row_number.rs @@ -22,7 +22,9 @@ use std::convert::TryFrom; use std::sync::Arc; use crate::error::{DataFusionError, Result}; -use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr}; +use crate::physical_plan::{ + Accumulator, AggregateExpr, BuiltInWindowFunctionExpr, PhysicalExpr, +}; use crate::scalar::ScalarValue; use arrow::compute; use arrow::datatypes::{DataType, TimeUnit}; @@ -36,15 +38,16 @@ use arrow::{ datatypes::Field, }; +/// row_number expression +#[derive(Debug)] pub struct RowNumber { name: String, - expr: Arc, } impl RowNumber { /// Create a new MAX aggregate function - pub fn new(expr: Arc, name: String) -> Self { - Self { name, expr } + pub fn new(name: String) -> Self { + Self { name } } } @@ -61,7 +64,7 @@ impl BuiltInWindowFunctionExpr for RowNumber { } fn expressions(&self) -> Vec> { - vec![self.expr.clone()] + vec![] } fn name(&self) -> &str { diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index 1f04275c0141a..f77af811d5ebf 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -468,6 +468,10 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + Debug { /// the field of the final result of this aggregation. fn field(&self) -> Result; + /// expressions that are passed to the Accumulator. + /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many. + fn expressions(&self) -> Vec>; + /// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default /// implementation returns placeholder text. fn name(&self) -> &str { diff --git a/datafusion/src/physical_plan/window_functions.rs b/datafusion/src/physical_plan/window_functions.rs index 65d5373d54f47..e3693f310fa70 100644 --- a/datafusion/src/physical_plan/window_functions.rs +++ b/datafusion/src/physical_plan/window_functions.rs @@ -143,49 +143,64 @@ impl FromStr for BuiltInWindowFunction { /// Returns the datatype of the window function pub fn return_type(fun: &WindowFunction, arg_types: &[DataType]) -> Result { + match fun { + WindowFunction::AggregateFunction(fun) => aggregates::return_type(fun, arg_types), + WindowFunction::BuiltInWindowFunction(fun) => { + return_type_for_built_in(fun, arg_types) + } + } +} + +/// Returns the datatype of the built-in window function +pub(super) fn return_type_for_built_in( + fun: &BuiltInWindowFunction, + arg_types: &[DataType], +) -> Result { // Note that this function *must* return the same type that the respective physical expression returns // or the execution panics. // verify that this is a valid set of data types for this function - data_types(arg_types, &signature(fun))?; + data_types(arg_types, &signature_for_built_in(fun))?; match fun { - WindowFunction::AggregateFunction(fun) => aggregates::return_type(fun, arg_types), - WindowFunction::BuiltInWindowFunction(fun) => match fun { - BuiltInWindowFunction::RowNumber - | BuiltInWindowFunction::Rank - | BuiltInWindowFunction::DenseRank => Ok(DataType::UInt64), - BuiltInWindowFunction::PercentRank | BuiltInWindowFunction::CumeDist => { - Ok(DataType::Float64) - } - BuiltInWindowFunction::Ntile => Ok(DataType::UInt32), - BuiltInWindowFunction::Lag - | BuiltInWindowFunction::Lead - | BuiltInWindowFunction::FirstValue - | BuiltInWindowFunction::LastValue - | BuiltInWindowFunction::NthValue => Ok(arg_types[0].clone()), - }, + BuiltInWindowFunction::RowNumber + | BuiltInWindowFunction::Rank + | BuiltInWindowFunction::DenseRank => Ok(DataType::UInt64), + BuiltInWindowFunction::PercentRank | BuiltInWindowFunction::CumeDist => { + Ok(DataType::Float64) + } + BuiltInWindowFunction::Ntile => Ok(DataType::UInt32), + BuiltInWindowFunction::Lag + | BuiltInWindowFunction::Lead + | BuiltInWindowFunction::FirstValue + | BuiltInWindowFunction::LastValue + | BuiltInWindowFunction::NthValue => Ok(arg_types[0].clone()), } } /// the signatures supported by the function `fun`. -fn signature(fun: &WindowFunction) -> Signature { - // note: the physical expression must accept the type returned by this function or the execution panics. +pub fn signature(fun: &WindowFunction) -> Signature { match fun { WindowFunction::AggregateFunction(fun) => aggregates::signature(fun), - WindowFunction::BuiltInWindowFunction(fun) => match fun { - BuiltInWindowFunction::RowNumber - | BuiltInWindowFunction::Rank - | BuiltInWindowFunction::DenseRank - | BuiltInWindowFunction::PercentRank - | BuiltInWindowFunction::CumeDist => Signature::Any(0), - BuiltInWindowFunction::Lag - | BuiltInWindowFunction::Lead - | BuiltInWindowFunction::FirstValue - | BuiltInWindowFunction::LastValue => Signature::Any(1), - BuiltInWindowFunction::Ntile => Signature::Exact(vec![DataType::UInt64]), - BuiltInWindowFunction::NthValue => Signature::Any(2), - }, + WindowFunction::BuiltInWindowFunction(fun) => signature_for_built_in(fun), + } +} + +/// the signatures supported by the built-in window function `fun`. +pub(super) fn signature_for_built_in(fun: &BuiltInWindowFunction) -> Signature { + // note: the physical expression must accept the type returned by this function or the execution panics. + match fun { + BuiltInWindowFunction::RowNumber + | BuiltInWindowFunction::Rank + | BuiltInWindowFunction::DenseRank + | BuiltInWindowFunction::PercentRank + | BuiltInWindowFunction::CumeDist => Signature::Any(0), + BuiltInWindowFunction::Lag + | BuiltInWindowFunction::Lead + | BuiltInWindowFunction::FirstValue + | BuiltInWindowFunction::LastValue => Signature::Any(1), + BuiltInWindowFunction::Ntile => Signature::Exact(vec![DataType::UInt64]), + BuiltInWindowFunction::NthValue => Signature::Any(2), } } diff --git a/datafusion/src/physical_plan/windows.rs b/datafusion/src/physical_plan/windows.rs index a650cbd6016e8..40fddb30985a7 100644 --- a/datafusion/src/physical_plan/windows.rs +++ b/datafusion/src/physical_plan/windows.rs @@ -19,9 +19,15 @@ use crate::error::{DataFusionError, Result}; use crate::physical_plan::{ - aggregates, expressions::Column, window_functions::WindowFunction, AggregateExpr, - BuiltInWindowFunctionExpr, Distribution, ExecutionPlan, Partitioning, PhysicalExpr, - RecordBatchStream, SendableRecordBatchStream, WindowExpr, + aggregates, + expressions::RowNumber, + type_coercion::coerce, + window_functions::{ + return_type_for_built_in, signature_for_built_in, BuiltInWindowFunction, + WindowFunction, + }, + AggregateExpr, BuiltInWindowFunctionExpr, Distribution, ExecutionPlan, Partitioning, + PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, WindowExpr, }; use arrow::{ array::{Array, UInt32Builder}, @@ -69,12 +75,31 @@ pub fn create_window_expr( name, )?, })), - WindowFunction::BuiltInWindowFunction(fun) => { - Err(DataFusionError::NotImplemented(format!( - "window function with {:?} not implemented", - fun - ))) - } + WindowFunction::BuiltInWindowFunction(fun) => Ok(Arc::new(BuiltInWindowExpr { + window: create_built_in_window_expr(fun, args, input_schema, name)?, + })), + } +} + +fn create_built_in_window_expr( + fun: &BuiltInWindowFunction, + args: &[Arc], + input_schema: &Schema, + name: String, +) -> Result> { + // let arg_types = args + // .iter() + // .map(|e| e.data_type(input_schema)) + // .collect::>>()?; + + // let return_type = return_type_for_built_in(&fun, &arg_types)?; + + match fun { + BuiltInWindowFunction::RowNumber => Ok(Arc::new(RowNumber::new(name))), + _ => Err(DataFusionError::NotImplemented(format!( + "window function with {:?} not implemented", + fun + ))), } }