Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
commit 7fb3640
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 16:38:25 2021 +0800

    row number done

commit 1723926
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 16:05:50 2021 +0800

    add row number

commit bf5b8a5
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 15:04:49 2021 +0800

    save

commit d2ce852
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 14:53:05 2021 +0800

    add streams

commit 0a861a7
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Thu May 20 22:28:34 2021 +0800

    save stream

commit a9121af
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Thu May 20 22:01:51 2021 +0800

    update unit test

commit 2af2a27
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 14:25:12 2021 +0800

    fix unit test

commit bb57c76
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 14:23:34 2021 +0800

    use upper case

commit 5d96e52
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 14:16:16 2021 +0800

    fix unit test

commit 1ecae8f
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 12:27:26 2021 +0800

    fix unit test

commit bc2271d
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 10:04:29 2021 +0800

    fix error

commit 880b94f
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 08:24:00 2021 +0800

    fix unit test

commit 4e792e1
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 08:05:17 2021 +0800

    fix test

commit c36c04a
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Fri May 21 00:07:54 2021 +0800

    add more tests

commit f5e64de
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Thu May 20 23:41:36 2021 +0800

    update

commit a1eae86
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Thu May 20 23:36:15 2021 +0800

    enrich unit test

commit 0d2a214
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Thu May 20 23:25:43 2021 +0800

    adding filter by todo

commit 8b486d5
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Thu May 20 23:17:22 2021 +0800

    adding more built-in functions

commit abf08cd
Author: Jiayu Liu <Jimexist@users.noreply.github.com>
Date:   Thu May 20 22:36:27 2021 +0800

    Update datafusion/src/physical_plan/window_functions.rs

    Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

commit 0cbca53
Author: Jiayu Liu <Jimexist@users.noreply.github.com>
Date:   Thu May 20 22:34:57 2021 +0800

    Update datafusion/src/physical_plan/window_functions.rs

    Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

commit 831c069
Author: Jiayu Liu <Jimexist@users.noreply.github.com>
Date:   Thu May 20 22:34:04 2021 +0800

    Update datafusion/src/logical_plan/builder.rs

    Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

commit f70c739
Author: Jiayu Liu <Jimexist@users.noreply.github.com>
Date:   Thu May 20 22:33:04 2021 +0800

    Update datafusion/src/logical_plan/builder.rs

    Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

commit 3ee87aa
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Wed May 19 22:55:08 2021 +0800

    fix unit test

commit 5c4d92d
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Wed May 19 22:48:26 2021 +0800

    fix clippy

commit a0b7526
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Wed May 19 22:46:38 2021 +0800

    fix unused imports

commit 1d3b076
Author: Jiayu Liu <jiayu.liu@airbnb.com>
Date:   Thu May 13 18:51:14 2021 +0800

    add window expr
  • Loading branch information
Jiayu Liu committed May 26, 2021
1 parent ad3d9c1 commit d1f53ce
Show file tree
Hide file tree
Showing 5 changed files with 372 additions and 52 deletions.
2 changes: 2 additions & 0 deletions datafusion/src/physical_plan/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ mod literal;
mod min_max;
mod negative;
mod not;
mod nth_value;
mod nullif;
mod row_number;
mod sum;
Expand All @@ -58,6 +59,7 @@ pub use literal::{lit, Literal};
pub use min_max::{Max, Min};
pub use negative::{negative, NegativeExpr};
pub use not::{not, NotExpr};
pub use nth_value::{FirstValue, LastValue, NthValue};
pub use nullif::{nullif_func, SUPPORTED_NULLIF_TYPES};
pub use row_number::RowNumber;
pub use sum::{sum_return_type, Sum};
Expand Down
223 changes: 223 additions & 0 deletions datafusion/src/physical_plan/expressions/nth_value.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Defines physical expressions that can be evaluated at runtime during query execution
use crate::error::{DataFusionError, Result};
use crate::physical_plan::{
window_functions::BuiltInWindowFunctionExpr, PhysicalExpr, WindowAccumulator,
};
use crate::scalar::ScalarValue;
use arrow::datatypes::{DataType, Field};
use std::any::Any;
use std::convert::TryFrom;
use std::sync::Arc;

/// Physical expression for the `FIRST_VALUE` built-in window function.
///
/// Its accumulator is an `NthValueAccumulator` created with `n = 1`.
#[derive(Debug)]
pub struct FirstValue {
/// Name returned by `name()` and used for the output field.
name: String,
/// Data type of the output field; also handed to the accumulator.
data_type: DataType,
/// Input expression, returned by `expressions()`.
expr: Arc<dyn PhysicalExpr>,
}

impl FirstValue {
/// Create a new FIRST_VALUE window aggregate function
pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
Self {
name,
data_type,
expr,
}
}
}

impl BuiltInWindowFunctionExpr for FirstValue {
    /// Exposes this expression as `Any` so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Output field: named after this expression, typed as `data_type`, always nullable.
    fn field(&self) -> Result<Field> {
        let output = Field::new(&self.name, self.data_type.clone(), true);
        Ok(output)
    }

    /// The single input expression this function reads.
    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
        vec![Arc::clone(&self.expr)]
    }

    /// Name given to this expression at construction time.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Creates an accumulator targeting position 1, i.e. the first value.
    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
        let accumulator = NthValueAccumulator::try_new(1, self.data_type.clone())?;
        Ok(Box::new(accumulator))
    }
}

// SQL positions for nth_value start at 1, so 0 never names a real position and
// can be used as a sentinel meaning "last value" (see `NthValueAccumulator`).
const SPECIAL_SIZE_VALUE_FOR_LAST: u32 = 0;

/// Physical expression for the `LAST_VALUE` built-in window function.
///
/// Its accumulator is an `NthValueAccumulator` created with the
/// `SPECIAL_SIZE_VALUE_FOR_LAST` sentinel as `n`.
#[derive(Debug)]
pub struct LastValue {
/// Name returned by `name()` and used for the output field.
name: String,
/// Data type of the output field; also handed to the accumulator.
data_type: DataType,
/// Input expression, returned by `expressions()`.
expr: Arc<dyn PhysicalExpr>,
}

impl LastValue {
/// Create a new FIRST_VALUE window aggregate function
pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
Self {
name,
data_type,
expr,
}
}
}

impl BuiltInWindowFunctionExpr for LastValue {
    /// Exposes this expression as `Any` so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Output field: named after this expression, typed as `data_type`, always nullable.
    fn field(&self) -> Result<Field> {
        let output = Field::new(&self.name, self.data_type.clone(), true);
        Ok(output)
    }

    /// The single input expression this function reads.
    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
        vec![Arc::clone(&self.expr)]
    }

    /// Name given to this expression at construction time.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Creates an accumulator in "last value" mode by passing the sentinel as `n`.
    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
        let accumulator =
            NthValueAccumulator::try_new(SPECIAL_SIZE_VALUE_FOR_LAST, self.data_type.clone())?;
        Ok(Box::new(accumulator))
    }
}

/// Physical expression for the `NTH_VALUE` built-in window function.
#[derive(Debug)]
pub struct NthValue {
/// Name returned by `name()` and used for the output field.
name: String,
/// Target position handed to the accumulator; `try_new` rejects the value
/// `SPECIAL_SIZE_VALUE_FOR_LAST` (0), so this is always at least 1.
n: u32,
/// Data type of the output field; also handed to the accumulator.
data_type: DataType,
/// Input expression, returned by `expressions()`.
expr: Arc<dyn PhysicalExpr>,
}

impl NthValue {
/// Create a new NTH_VALUE window aggregate function
pub fn try_new(
expr: Arc<dyn PhysicalExpr>,
name: String,
n: u32,
data_type: DataType,
) -> Result<Self> {
if n == SPECIAL_SIZE_VALUE_FOR_LAST {
Err(DataFusionError::Execution(
"nth_value expect n to be > 0".to_owned(),
))
} else {
Ok(Self {
name,
n,
data_type,
expr,
})
}
}
}

impl BuiltInWindowFunctionExpr for NthValue {
    /// Exposes this expression as `Any` so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Output field: named after this expression, typed as `data_type`, always nullable.
    fn field(&self) -> Result<Field> {
        let output = Field::new(&self.name, self.data_type.clone(), true);
        Ok(output)
    }

    /// The single input expression this function reads.
    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
        vec![Arc::clone(&self.expr)]
    }

    /// Name given to this expression at construction time.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Creates an accumulator targeting the configured position `n`.
    fn create_accumulator(&self) -> Result<Box<dyn WindowAccumulator>> {
        let accumulator = NthValueAccumulator::try_new(self.n, self.data_type.clone())?;
        Ok(Box::new(accumulator))
    }
}

/// Accumulator shared by the `FirstValue`, `LastValue` and `NthValue` expressions.
#[derive(Debug)]
struct NthValueAccumulator {
// `n` is the target position for nth_value. The accumulator is reused for
// last_value: when `n == SPECIAL_SIZE_VALUE_FOR_LAST` (0) it means "keep the
// last value seen". It is valid for `n` to be larger than the number of input
// rows, in which case the stored value is never replaced and stays null.
n: u32,
// Number of rows counted so far in nth-value mode; `scan` stops incrementing
// it once it reaches `n`, and never touches it in last-value mode.
offset: u32,
// Value reported by `evaluate`; initialised from the data type in `try_new`.
value: ScalarValue,
}

impl NthValueAccumulator {
/// new count accumulator
pub fn try_new(n: u32, data_type: DataType) -> Result<Self> {
Ok(Self {
n,
offset: 0,
// null value of that data_type by default
value: ScalarValue::try_from(&data_type)?,
})
}
}

impl WindowAccumulator for NthValueAccumulator {
    /// Consumes one row of input values and always returns `Ok(None)`.
    ///
    /// In last-value mode every row overwrites the stored value. Otherwise rows
    /// are counted until the `n`-th one, whose value is captured; later rows are
    /// ignored.
    ///
    /// Only `values[0]` is read, and it is indexed directly, so an empty slice
    /// panics on any row whose value is captured.
    fn scan(&mut self, values: &[ScalarValue]) -> Result<Option<ScalarValue>> {
        if self.n == SPECIAL_SIZE_VALUE_FOR_LAST {
            // last_value: remember the most recent row
            self.value = values[0].clone();
            return Ok(None);
        }

        if self.offset >= self.n {
            // target row already captured; nothing left to track
            return Ok(None);
        }

        self.offset += 1;
        if self.offset == self.n {
            self.value = values[0].clone();
        }
        Ok(None)
    }

    /// Returns a copy of the stored value, wrapped in `Some`.
    fn evaluate(&self) -> Result<Option<ScalarValue>> {
        let current = self.value.clone();
        Ok(Some(current))
    }
}
Loading

0 comments on commit d1f53ce

Please sign in to comment.