Skip to content

Commit

Permalink
row number done
Browse files Browse the repository at this point in the history
  • Loading branch information
Jiayu Liu committed May 21, 2021
1 parent 1723926 commit 7fb3640
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 45 deletions.
2 changes: 2 additions & 0 deletions datafusion/src/physical_plan/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ mod min_max;
mod negative;
mod not;
mod nullif;
mod row_number;
mod sum;
mod try_cast;

Expand All @@ -58,6 +59,7 @@ pub use min_max::{Max, Min};
pub use negative::{negative, NegativeExpr};
pub use not::{not, NotExpr};
pub use nullif::{nullif_func, SUPPORTED_NULLIF_TYPES};
pub use row_number::RowNumber;
pub use sum::{sum_return_type, Sum};
pub use try_cast::{try_cast, TryCastExpr};
/// returns the name of the state
Expand Down
13 changes: 8 additions & 5 deletions datafusion/src/physical_plan/expressions/row_number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use std::convert::TryFrom;
use std::sync::Arc;

use crate::error::{DataFusionError, Result};
use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
use crate::physical_plan::{
Accumulator, AggregateExpr, BuiltInWindowFunctionExpr, PhysicalExpr,
};
use crate::scalar::ScalarValue;
use arrow::compute;
use arrow::datatypes::{DataType, TimeUnit};
Expand All @@ -36,15 +38,16 @@ use arrow::{
datatypes::Field,
};

/// row_number expression
#[derive(Debug)]
pub struct RowNumber {
name: String,
expr: Arc<dyn PhysicalSortExpr>,
}

impl RowNumber {
/// Create a new ROW_NUMBER window function
pub fn new(expr: Arc<dyn PhysicalExpr>, name: String) -> Self {
Self { name, expr }
pub fn new(name: String) -> Self {
Self { name }
}
}

Expand All @@ -61,7 +64,7 @@ impl BuiltInWindowFunctionExpr for RowNumber {
}

fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.expr.clone()]
vec![]
}

fn name(&self) -> &str {
Expand Down
4 changes: 4 additions & 0 deletions datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,10 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + Debug {
/// the field of the final result of this aggregation.
fn field(&self) -> Result<Field>;

/// Expressions that are evaluated and passed to the accumulator.
/// Argument-less window functions such as `row_number` return an empty vector; functions that take arguments (e.g. `nth_value`) are expected to return them here.
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;

/// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default
/// implementation returns placeholder text.
fn name(&self) -> &str {
Expand Down
77 changes: 46 additions & 31 deletions datafusion/src/physical_plan/window_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,49 +143,64 @@ impl FromStr for BuiltInWindowFunction {

/// Resolves the output [`DataType`] of a window function call.
///
/// Aggregate functions used as window functions defer to
/// `aggregates::return_type`; built-in window functions defer to
/// `return_type_for_built_in`. Whatever the callee returns, including an
/// error, is passed back to the caller unchanged.
pub fn return_type(fun: &WindowFunction, arg_types: &[DataType]) -> Result<DataType> {
    match fun {
        WindowFunction::BuiltInWindowFunction(built_in) => {
            return_type_for_built_in(built_in, arg_types)
        }
        WindowFunction::AggregateFunction(aggregate) => {
            aggregates::return_type(aggregate, arg_types)
        }
    }
}

/// Returns the datatype of the built-in window function
pub(super) fn return_type_for_built_in(
fun: &BuiltInWindowFunction,
arg_types: &[DataType],
) -> Result<DataType> {
// Note that this function *must* return the same type that the respective physical expression returns
// or the execution panics.

// verify that this is a valid set of data types for this function
data_types(arg_types, &signature(fun))?;
data_types(arg_types, &signature_for_built_in(fun))?;

match fun {
WindowFunction::AggregateFunction(fun) => aggregates::return_type(fun, arg_types),
WindowFunction::BuiltInWindowFunction(fun) => match fun {
BuiltInWindowFunction::RowNumber
| BuiltInWindowFunction::Rank
| BuiltInWindowFunction::DenseRank => Ok(DataType::UInt64),
BuiltInWindowFunction::PercentRank | BuiltInWindowFunction::CumeDist => {
Ok(DataType::Float64)
}
BuiltInWindowFunction::Ntile => Ok(DataType::UInt32),
BuiltInWindowFunction::Lag
| BuiltInWindowFunction::Lead
| BuiltInWindowFunction::FirstValue
| BuiltInWindowFunction::LastValue
| BuiltInWindowFunction::NthValue => Ok(arg_types[0].clone()),
},
BuiltInWindowFunction::RowNumber
| BuiltInWindowFunction::Rank
| BuiltInWindowFunction::DenseRank => Ok(DataType::UInt64),
BuiltInWindowFunction::PercentRank | BuiltInWindowFunction::CumeDist => {
Ok(DataType::Float64)
}
BuiltInWindowFunction::Ntile => Ok(DataType::UInt32),
BuiltInWindowFunction::Lag
| BuiltInWindowFunction::Lead
| BuiltInWindowFunction::FirstValue
| BuiltInWindowFunction::LastValue
| BuiltInWindowFunction::NthValue => Ok(arg_types[0].clone()),
}
}

/// the signatures supported by the function `fun`.
fn signature(fun: &WindowFunction) -> Signature {
// note: the physical expression must accept the type returned by this function or the execution panics.
pub fn signature(fun: &WindowFunction) -> Signature {
match fun {
WindowFunction::AggregateFunction(fun) => aggregates::signature(fun),
WindowFunction::BuiltInWindowFunction(fun) => match fun {
BuiltInWindowFunction::RowNumber
| BuiltInWindowFunction::Rank
| BuiltInWindowFunction::DenseRank
| BuiltInWindowFunction::PercentRank
| BuiltInWindowFunction::CumeDist => Signature::Any(0),
BuiltInWindowFunction::Lag
| BuiltInWindowFunction::Lead
| BuiltInWindowFunction::FirstValue
| BuiltInWindowFunction::LastValue => Signature::Any(1),
BuiltInWindowFunction::Ntile => Signature::Exact(vec![DataType::UInt64]),
BuiltInWindowFunction::NthValue => Signature::Any(2),
},
WindowFunction::BuiltInWindowFunction(fun) => signature_for_built_in(fun),
}
}

/// Signature accepted by the built-in window function `fun`.
///
/// * `RowNumber`, `Rank`, `DenseRank`, `PercentRank`, `CumeDist` -> `Signature::Any(0)`
/// * `Lag`, `Lead`, `FirstValue`, `LastValue` -> `Signature::Any(1)`
/// * `NthValue` -> `Signature::Any(2)`
/// * `Ntile` -> `Signature::Exact(vec![DataType::UInt64])`
pub(super) fn signature_for_built_in(fun: &BuiltInWindowFunction) -> Signature {
    // note: the physical expression must accept the type returned by this function or the execution panics.
    let any_arity = match fun {
        // `Ntile` is the only function with a fixed argument type, so it
        // cannot be expressed as a plain arity and is returned directly.
        BuiltInWindowFunction::Ntile => {
            return Signature::Exact(vec![DataType::UInt64]);
        }
        BuiltInWindowFunction::RowNumber
        | BuiltInWindowFunction::Rank
        | BuiltInWindowFunction::DenseRank
        | BuiltInWindowFunction::PercentRank
        | BuiltInWindowFunction::CumeDist => 0,
        BuiltInWindowFunction::Lag
        | BuiltInWindowFunction::Lead
        | BuiltInWindowFunction::FirstValue
        | BuiltInWindowFunction::LastValue => 1,
        BuiltInWindowFunction::NthValue => 2,
    };
    Signature::Any(any_arity)
}

Expand Down
43 changes: 34 additions & 9 deletions datafusion/src/physical_plan/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,15 @@
use crate::error::{DataFusionError, Result};
use crate::physical_plan::{
aggregates, expressions::Column, window_functions::WindowFunction, AggregateExpr,
BuiltInWindowFunctionExpr, Distribution, ExecutionPlan, Partitioning, PhysicalExpr,
RecordBatchStream, SendableRecordBatchStream, WindowExpr,
aggregates,
expressions::RowNumber,
type_coercion::coerce,
window_functions::{
return_type_for_built_in, signature_for_built_in, BuiltInWindowFunction,
WindowFunction,
},
AggregateExpr, BuiltInWindowFunctionExpr, Distribution, ExecutionPlan, Partitioning,
PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, WindowExpr,
};
use arrow::{
array::{Array, UInt32Builder},
Expand Down Expand Up @@ -69,12 +75,31 @@ pub fn create_window_expr(
name,
)?,
})),
WindowFunction::BuiltInWindowFunction(fun) => {
Err(DataFusionError::NotImplemented(format!(
"window function with {:?} not implemented",
fun
)))
}
WindowFunction::BuiltInWindowFunction(fun) => Ok(Arc::new(BuiltInWindowExpr {
window: create_built_in_window_expr(fun, args, input_schema, name)?,
})),
}
}

/// Builds the physical expression for a built-in window function.
///
/// Only `BuiltInWindowFunction::RowNumber` is supported so far; every other
/// variant yields `DataFusionError::NotImplemented`.
///
/// `_args` and `_input_schema` are not consulted yet because `RowNumber` is
/// constructed from its name alone. They stay in the signature so callers do
/// not have to change once argument-taking functions are implemented.
fn create_built_in_window_expr(
    fun: &BuiltInWindowFunction,
    _args: &[Arc<dyn PhysicalExpr>],
    _input_schema: &Schema,
    name: String,
) -> Result<Arc<dyn BuiltInWindowFunctionExpr>> {
    // TODO: when functions that take arguments are added, resolve the
    // argument types against the input schema and validate them with
    // `return_type_for_built_in` before constructing the expression.
    match fun {
        BuiltInWindowFunction::RowNumber => Ok(Arc::new(RowNumber::new(name))),
        _ => Err(DataFusionError::NotImplemented(format!(
            "window function with {:?} not implemented",
            fun
        ))),
    }
}

Expand Down

0 comments on commit 7fb3640

Please sign in to comment.