Skip to content

Commit

Permalink
Merge pull request #1961 from ygf11/feat_runningDifference_function
Browse files Browse the repository at this point in the history
ISSUE-1787: add running difference function
  • Loading branch information
BohuTANG authored Sep 30, 2021
2 parents db89915 + 594f2f7 commit 3f9533a
Show file tree
Hide file tree
Showing 8 changed files with 717 additions and 3 deletions.
2 changes: 2 additions & 0 deletions common/functions/src/scalars/function_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::scalars::Function;
use crate::scalars::HashesFunction;
use crate::scalars::LogicFunction;
use crate::scalars::NullableFunction;
use crate::scalars::OtherFunction;
use crate::scalars::StringFunction;
use crate::scalars::ToCastFunction;
use crate::scalars::UdfFunction;
Expand All @@ -52,6 +53,7 @@ lazy_static! {
ToCastFunction::register(map.clone()).unwrap();
ConditionalFunction::register(map.clone()).unwrap();
DateFunction::register(map.clone()).unwrap();
OtherFunction::register(map.clone()).unwrap();

map
};
Expand Down
7 changes: 4 additions & 3 deletions common/functions/src/scalars/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#[cfg(test)]
mod function_column_test;

mod arithmetics;
mod comparisons;
mod conditionals;
Expand All @@ -23,11 +20,14 @@ mod expressions;
mod function;
mod function_alias;
mod function_column;
#[cfg(test)]
mod function_column_test;
mod function_factory;
mod function_literal;
mod hashes;
mod logics;
mod nullables;
mod others;
mod strings;
mod udfs;

Expand All @@ -45,5 +45,6 @@ pub use function_literal::LiteralFunction;
pub use hashes::*;
pub use logics::*;
pub use nullables::*;
pub use others::*;
pub use strings::*;
pub use udfs::*;
20 changes: 20 additions & 0 deletions common/functions/src/scalars/others/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright 2020 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(test)]
mod running_difference_function_test;

mod other;
mod running_difference_function;
pub use other::OtherFunction;
pub use running_difference_function::RunningDifferenceFunction;
32 changes: 32 additions & 0 deletions common/functions/src/scalars/others/other.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2020 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_exception::Result;

use super::running_difference_function::RunningDifferenceFunction;
use crate::scalars::FactoryFuncRef;

#[derive(Clone)]
pub struct OtherFunction {}

impl OtherFunction {
pub fn register(map: FactoryFuncRef) -> Result<()> {
let mut map = map.write();
map.insert(
"runningDifference".into(),
RunningDifferenceFunction::try_create,
);
Ok(())
}
}
149 changes: 149 additions & 0 deletions common/functions/src/scalars/others/running_difference_function.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright 2020 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;

use common_arrow::arrow::array::Array;
use common_datavalues::columns::DataColumn;
use common_datavalues::prelude::DataColumnsWithField;
use common_datavalues::prelude::*;
use common_datavalues::DataSchema;
use common_datavalues::DataType;
use common_exception::ErrorCode;
use common_exception::Result;

use crate::scalars::Function;

#[derive(Clone)]
pub struct RunningDifferenceFunction {
display_name: String,
}

impl RunningDifferenceFunction {
pub fn try_create(display_name: &str) -> Result<Box<dyn Function>> {
Ok(Box::new(RunningDifferenceFunction {
display_name: display_name.to_string(),
}))
}
}

impl Function for RunningDifferenceFunction {
fn name(&self) -> &str {
self.display_name.as_str()
}

fn return_type(&self, args: &[DataType]) -> Result<DataType> {
match args[0] {
DataType::Int8 | DataType::UInt8 => Ok(DataType::Int16),
DataType::Int16 | DataType::UInt16 | DataType::Date16 => Ok(DataType::Int32),
DataType::Int32
| DataType::UInt32
| DataType::Int64
| DataType::UInt64
| DataType::Date32
| DataType::DateTime32(_) => Ok(DataType::Int64),
DataType::Float32 | DataType::Float64 => Ok(DataType::Float64),
_ => Result::Err(ErrorCode::IllegalDataType(
"Argument for function runningDifference must have numeric type",
)),
}
}

fn nullable(&self, _input_schema: &DataSchema) -> Result<bool> {
Ok(true)
}

fn eval(&self, columns: &DataColumnsWithField, input_rows: usize) -> Result<DataColumn> {
match columns[0].data_type() {
DataType::Int8 => compute_i8(columns[0].column(), input_rows),
DataType::UInt8 => compute_u8(columns[0].column(), input_rows),
DataType::Int16 => compute_i16(columns[0].column(), input_rows),
DataType::UInt16 | DataType::Date16 => compute_u16(columns[0].column(), input_rows),
DataType::Int32 => compute_i32(columns[0].column(), input_rows),
DataType::UInt32 | DataType::Date32 | DataType::DateTime32(_) => {
compute_u32(columns[0].column(), input_rows)
}
DataType::Int64 => compute_i64(columns[0].column(), input_rows),
DataType::UInt64 => compute_u64(columns[0].column(), input_rows),
DataType::Float32 => compute_f32(columns[0].column(), input_rows),
DataType::Float64 => compute_f64(columns[0].column(), input_rows),
_ => Result::Err(ErrorCode::IllegalDataType(
format!(
"Argument for function runningDifference must have numeric type.: While processing runningDifference({})",
columns[0].field().name(),
))),
}
}

fn is_deterministic(&self) -> bool {
false
}

fn num_arguments(&self) -> usize {
1
}
}

macro_rules! run_difference_compute {
($method_name:ident, $to_df_array:ident, $result_logic_type:ident, $result_primitive_type:ty) => {
fn $method_name(column: &DataColumn, input_rows: usize) -> Result<DataColumn> {
if let DataColumn::Constant(_, _) = column {
Ok(DataColumn::Constant(
DataValue::$result_logic_type(Some(0_i8 as $result_primitive_type)),
input_rows,
))
} else {
let series = column.to_array()?;
let array = series.$to_df_array()?.inner();

let mut result_vec = Vec::with_capacity(array.len());
for index in 0..array.len() {
match array.is_null(index) {
true => result_vec.push(None),
false => {
if index == 0 {
result_vec.push(Some(0_i8 as $result_primitive_type))
} else if array.is_null(index - 1) {
result_vec.push(None)
} else {
let diff = array.value(index) as $result_primitive_type
- array.value(index - 1) as $result_primitive_type;
result_vec.push(Some(diff))
}
}
}
}

Ok(Series::new(result_vec).into())
}
}
};
}

run_difference_compute!(compute_i8, i8, Int16, i16);
run_difference_compute!(compute_u8, u8, Int16, i16);
run_difference_compute!(compute_i16, i16, Int32, i32);
run_difference_compute!(compute_u16, u16, Int32, i32);
run_difference_compute!(compute_i32, i32, Int64, i64);
run_difference_compute!(compute_u32, u32, Int64, i64);
run_difference_compute!(compute_i64, i64, Int64, i64);
run_difference_compute!(compute_u64, u64, Int64, i64);
run_difference_compute!(compute_f32, f32, Float64, f64);
run_difference_compute!(compute_f64, f64, Float64, f64);

impl fmt::Display for RunningDifferenceFunction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}()", self.display_name)
}
}
Loading

0 comments on commit 3f9533a

Please sign in to comment.