Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
commit 7fb3640
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 16:38:25 2021 +0800

    row number done

commit 1723926
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 16:05:50 2021 +0800

    add row number

commit bf5b8a5
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 15:04:49 2021 +0800

    save

commit d2ce852
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 14:53:05 2021 +0800

    add streams

commit 0a861a7
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Thu May 20 22:28:34 2021 +0800

    save stream

commit a9121af
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Thu May 20 22:01:51 2021 +0800

    update unit test

commit 2af2a27
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 14:25:12 2021 +0800

    fix unit test

commit bb57c76
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 14:23:34 2021 +0800

    use upper case

commit 5d96e52
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 14:16:16 2021 +0800

    fix unit test

commit 1ecae8f
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 12:27:26 2021 +0800

    fix unit test

commit bc2271d
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 10:04:29 2021 +0800

    fix error

commit 880b94f
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 08:24:00 2021 +0800

    fix unit test

commit 4e792e1
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 08:05:17 2021 +0800

    fix test

commit c36c04a
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 00:07:54 2021 +0800

    add more tests

commit f5e64de
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Thu May 20 23:41:36 2021 +0800

    update

commit a1eae86
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Thu May 20 23:36:15 2021 +0800

    enrich unit test

commit 0d2a214
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Thu May 20 23:25:43 2021 +0800

    adding filter by todo

commit 8b486d5
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Thu May 20 23:17:22 2021 +0800

    adding more built-in functions

commit abf08cd
Author: Jiayu Liu <Jimexist@users.noreply.github.com>
Date:   Thu May 20 22:36:27 2021 +0800

    Update datafusion/src/physical_plan/window_functions.rs

    Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

commit 0cbca53
Author: Jiayu Liu <Jimexist@users.noreply.github.com>
Date:   Thu May 20 22:34:57 2021 +0800

    Update datafusion/src/physical_plan/window_functions.rs

    Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

commit 831c069
Author: Jiayu Liu <Jimexist@users.noreply.github.com>
Date:   Thu May 20 22:34:04 2021 +0800

    Update datafusion/src/logical_plan/builder.rs

    Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

commit f70c739
Author: Jiayu Liu <Jimexist@users.noreply.github.com>
Date:   Thu May 20 22:33:04 2021 +0800

    Update datafusion/src/logical_plan/builder.rs

    Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

commit 3ee87aa
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Wed May 19 22:55:08 2021 +0800

    fix unit test

commit 5c4d92d
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Wed May 19 22:48:26 2021 +0800

    fix clippy

commit a0b7526
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Wed May 19 22:46:38 2021 +0800

    fix unused imports

commit 1d3b076
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Thu May 13 18:51:14 2021 +0800

    add window expr
  • Loading branch information
Jiayu Liu committed May 24, 2021
1 parent 7359e4b commit 6e4d662
Show file tree
Hide file tree
Showing 10 changed files with 566 additions and 66 deletions.
7 changes: 7 additions & 0 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1268,6 +1268,13 @@ mod tests {
Ok(())
}

/// Runs a query that applies `MAX` as a window function over an empty
/// `OVER ()` clause and checks that `execute` hands back exactly one result.
#[tokio::test]
async fn window() -> Result<()> {
    let result_count = execute("SELECT c1, MAX(c2) OVER () FROM test", 4)
        .await?
        .len();
    assert_eq!(result_count, 1);
    Ok(())
}

#[tokio::test]
async fn aggregate() -> Result<()> {
let results = execute("SELECT SUM(c1), SUM(c2) FROM test", 4).await?;
Expand Down
2 changes: 2 additions & 0 deletions datafusion/src/physical_plan/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ mod min_max;
mod negative;
mod not;
mod nullif;
mod row_number;
mod sum;
mod try_cast;

Expand All @@ -58,6 +59,7 @@ pub use min_max::{Max, Min};
pub use negative::{negative, NegativeExpr};
pub use not::{not, NotExpr};
pub use nullif::{nullif_func, SUPPORTED_NULLIF_TYPES};
pub use row_number::RowNumber;
pub use sum::{sum_return_type, Sum};
pub use try_cast::{try_cast, TryCastExpr};
/// returns the name of the state
Expand Down
58 changes: 58 additions & 0 deletions datafusion/src/physical_plan/expressions/row_number.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Defines physical expressions that can be evaluated at runtime during query execution
use crate::error::Result;
use crate::physical_plan::{BuiltInWindowFunctionExpr, PhysicalExpr};
use arrow::datatypes::{DataType, Field};
use std::any::Any;
use std::sync::Arc;

/// row_number expression
///
/// Implements `BuiltInWindowFunctionExpr`; the only state it carries is the
/// name reported by `name()` and used for the output field built in `field()`.
#[derive(Debug)]
pub struct RowNumber {
// name of this expression; also used as the name of its output field
name: String,
}

impl RowNumber {
/// Create a new `RowNumber` window function expression.
///
/// `name` is stored as-is; it is what `name()` returns and what `field()`
/// uses as the name of the output field.
pub fn new(name: String) -> Self {
Self { name }
}
}

impl BuiltInWindowFunctionExpr for RowNumber {
    /// Exposes this expression as `Any` so that callers can downcast it
    /// back to `RowNumber`.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Describes the output column: named after this expression, of type
    /// `UInt64`, and declared non-nullable.
    fn field(&self) -> Result<Field> {
        Ok(Field::new(self.name.as_str(), DataType::UInt64, false))
    }

    /// `RowNumber` has no input expressions, so this is always empty.
    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
        Vec::new()
    }

    /// The name this expression was created with.
    fn name(&self) -> &str {
        self.name.as_str()
    }
}
7 changes: 4 additions & 3 deletions datafusion/src/physical_plan/hash_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ impl GroupedHashAggregateStream {
tx.send(result)
});

GroupedHashAggregateStream {
Self {
schema,
output: rx,
finished: false,
Expand Down Expand Up @@ -825,7 +825,8 @@ fn aggregate_expressions(
}

pin_project! {
struct HashAggregateStream {
/// stream struct for hash aggregation
pub struct HashAggregateStream {
schema: SchemaRef,
#[pin]
output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
Expand Down Expand Up @@ -878,7 +879,7 @@ impl HashAggregateStream {
tx.send(result)
});

HashAggregateStream {
Self {
schema,
output: rx,
finished: false,
Expand Down
70 changes: 69 additions & 1 deletion datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,10 +457,41 @@ pub trait WindowExpr: Send + Sync + Debug {
fn name(&self) -> &str {
"WindowExpr: default name"
}

/// the accumulator used to accumulate values from the expressions.
/// the accumulator expects the same number of arguments as `expressions` and must
/// return states with the same description as `state_fields`
fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>>;

/// expressions that are passed to the WindowAccumulator.
/// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many.
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
}

/// A window expression that is a built-in window function
pub trait BuiltInWindowFunctionExpr: Send + Sync + Debug {
/// Returns the window function expression as [`Any`](std::any::Any) so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;

/// the field (name, data type and nullability) of the result of this window function.
fn field(&self) -> Result<Field>;

/// expressions associated with this window function.
/// NOTE(review): presumably these are the function's argument expressions that get
/// evaluated and handed to whatever computes the window values — confirm against the
/// executor. An implementation without arguments can return an empty vector.
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;

/// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default
/// implementation returns placeholder text.
fn name(&self) -> &str {
"BuiltInWindowFunctionExpr: default name"
}
}

/// An accumulator represents a stateful object that lives throughout the evaluation of multiple rows and
/// generically accumulates values. An accumulator knows how to:
/// generically accumulates values.
///
/// An accumulator knows how to:
/// * update its state from inputs via `update`
/// * convert its internal state to a vector of scalar values
/// * update its state from multiple accumulators' states via `merge`
Expand Down Expand Up @@ -509,6 +540,43 @@ pub trait Accumulator: Send + Sync + Debug {
fn evaluate(&self) -> Result<ScalarValue>;
}

/// A window accumulator is a stateful object that lives throughout the evaluation of
/// multiple rows and generically accumulates values.
///
/// A window accumulator knows how to:
/// * consume one row of input values via `scan`, optionally producing a value for it
/// * consume whole columns via `scan_batch`, which feeds them to `scan` row by row
/// * report a value from its current state via `evaluate`
pub trait WindowAccumulator: Send + Sync + Debug {
    /// updates the accumulator's state from one row of scalar values and, similar to
    /// an Accumulator, optionally generates a value for that row.
    fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>>;

    /// scans the accumulator's state from a vector of arrays (one array per argument).
    ///
    /// The number of rows is taken from the first array. Each row is converted to
    /// scalars and passed to `scan`, in order; the first error stops the scan and is
    /// returned.
    ///
    /// Returns `Ok(None)` when `values` is empty or when `scan` produced no value
    /// for at least one row; otherwise returns one value per row.
    fn scan_batch(&mut self, values: &[ArrayRef]) -> Result<Option<Vec<ScalarValue>>> {
        let num_rows = match values.first() {
            Some(column) => column.len(),
            None => return Ok(None),
        };

        // transpose columnar to row based so that we can apply window
        let mut per_row: Vec<Option<ScalarValue>> = Vec::with_capacity(num_rows);
        for row in 0..num_rows {
            let row_values = values
                .iter()
                .map(|column| ScalarValue::try_from_array(column, row))
                .collect::<Result<Vec<_>>>()?;
            per_row.push(self.scan(&row_values)?);
        }

        // every row is scanned before this point; a single `None` makes the whole
        // batch result `None`
        Ok(per_row.into_iter().collect::<Option<Vec<ScalarValue>>>())
    }

    /// returns its value based on its current state.
    fn evaluate(&self) -> Result<Option<ScalarValue>>;
}

pub mod aggregates;
pub mod array_expressions;
pub mod coalesce_batches;
Expand Down
4 changes: 3 additions & 1 deletion datafusion/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,10 @@ impl DefaultPhysicalPlanner {
// Initially need to perform the aggregate and then merge the partitions
let input_exec = self.create_initial_plan(input, ctx_state)?;
let input_schema = input_exec.schema();
let physical_input_schema = input_exec.as_ref().schema();

let logical_input_schema = input.as_ref().schema();
let physical_input_schema = input_exec.as_ref().schema();

let window_expr = window_expr
.iter()
.map(|e| {
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/physical_plan/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ fn sort_batches(
}

pin_project! {
/// stream for sort plan
struct SortStream {
#[pin]
output: futures::channel::oneshot::Receiver<ArrowResult<Option<RecordBatch>>>,
Expand Down
77 changes: 46 additions & 31 deletions datafusion/src/physical_plan/window_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,49 +143,64 @@ impl FromStr for BuiltInWindowFunction {

/// Returns the datatype produced by the window function `fun` for the given
/// argument types.
///
/// Aggregate functions are delegated to `aggregates::return_type`, built-in
/// window functions to `return_type_for_built_in`.
pub fn return_type(fun: &WindowFunction, arg_types: &[DataType]) -> Result<DataType> {
    match fun {
        WindowFunction::AggregateFunction(aggregate) => {
            aggregates::return_type(aggregate, arg_types)
        }
        WindowFunction::BuiltInWindowFunction(built_in) => {
            return_type_for_built_in(built_in, arg_types)
        }
    }
}

/// Returns the datatype of the built-in window function
pub(super) fn return_type_for_built_in(
fun: &BuiltInWindowFunction,
arg_types: &[DataType],
) -> Result<DataType> {
// Note that this function *must* return the same type that the respective physical expression returns
// or the execution panics.

// verify that this is a valid set of data types for this function
data_types(arg_types, &signature(fun))?;
data_types(arg_types, &signature_for_built_in(fun))?;

match fun {
WindowFunction::AggregateFunction(fun) => aggregates::return_type(fun, arg_types),
WindowFunction::BuiltInWindowFunction(fun) => match fun {
BuiltInWindowFunction::RowNumber
| BuiltInWindowFunction::Rank
| BuiltInWindowFunction::DenseRank => Ok(DataType::UInt64),
BuiltInWindowFunction::PercentRank | BuiltInWindowFunction::CumeDist => {
Ok(DataType::Float64)
}
BuiltInWindowFunction::Ntile => Ok(DataType::UInt32),
BuiltInWindowFunction::Lag
| BuiltInWindowFunction::Lead
| BuiltInWindowFunction::FirstValue
| BuiltInWindowFunction::LastValue
| BuiltInWindowFunction::NthValue => Ok(arg_types[0].clone()),
},
BuiltInWindowFunction::RowNumber
| BuiltInWindowFunction::Rank
| BuiltInWindowFunction::DenseRank => Ok(DataType::UInt64),
BuiltInWindowFunction::PercentRank | BuiltInWindowFunction::CumeDist => {
Ok(DataType::Float64)
}
BuiltInWindowFunction::Ntile => Ok(DataType::UInt32),
BuiltInWindowFunction::Lag
| BuiltInWindowFunction::Lead
| BuiltInWindowFunction::FirstValue
| BuiltInWindowFunction::LastValue
| BuiltInWindowFunction::NthValue => Ok(arg_types[0].clone()),
}
}

/// the signatures supported by the function `fun`.
fn signature(fun: &WindowFunction) -> Signature {
// note: the physical expression must accept the type returned by this function or the execution panics.
pub fn signature(fun: &WindowFunction) -> Signature {
match fun {
WindowFunction::AggregateFunction(fun) => aggregates::signature(fun),
WindowFunction::BuiltInWindowFunction(fun) => match fun {
BuiltInWindowFunction::RowNumber
| BuiltInWindowFunction::Rank
| BuiltInWindowFunction::DenseRank
| BuiltInWindowFunction::PercentRank
| BuiltInWindowFunction::CumeDist => Signature::Any(0),
BuiltInWindowFunction::Lag
| BuiltInWindowFunction::Lead
| BuiltInWindowFunction::FirstValue
| BuiltInWindowFunction::LastValue => Signature::Any(1),
BuiltInWindowFunction::Ntile => Signature::Exact(vec![DataType::UInt64]),
BuiltInWindowFunction::NthValue => Signature::Any(2),
},
WindowFunction::BuiltInWindowFunction(fun) => signature_for_built_in(fun),
}
}

/// the signatures supported by the built-in window function `fun`.
///
/// Every variant is listed explicitly (no catch-all arm), so adding a new
/// built-in window function forces this mapping to be revisited.
pub(super) fn signature_for_built_in(fun: &BuiltInWindowFunction) -> Signature {
    // note: the physical expression must accept the type returned by this function or the execution panics.
    match fun {
        BuiltInWindowFunction::Ntile => Signature::Exact(vec![DataType::UInt64]),
        BuiltInWindowFunction::NthValue => Signature::Any(2),
        BuiltInWindowFunction::Lag
        | BuiltInWindowFunction::Lead
        | BuiltInWindowFunction::FirstValue
        | BuiltInWindowFunction::LastValue => Signature::Any(1),
        BuiltInWindowFunction::RowNumber
        | BuiltInWindowFunction::Rank
        | BuiltInWindowFunction::DenseRank
        | BuiltInWindowFunction::PercentRank
        | BuiltInWindowFunction::CumeDist => Signature::Any(0),
    }
}

Expand Down
Loading

0 comments on commit 6e4d662

Please sign in to comment.