Skip to content

Commit

Permalink
fix decimal add because arrow2 doesn't include decimal add in arithme…
Browse files Browse the repository at this point in the history
…tics::add
  • Loading branch information
Igosuki committed Feb 3, 2022
1 parent 5ad5f7c commit b8f9bc2
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 44 deletions.
2 changes: 1 addition & 1 deletion datafusion/benches/data_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ fn create_record_batch(
schema,
vec![
Arc::new(Utf8Array::<i32>::from_slice(keys)),
Arc::new(Float32Array::from_slice(&[i as f32; batch_size])),
Arc::new(Float32Array::from_slice(vec![i as f32; batch_size])),
Arc::new(Float64Array::from(values)),
Arc::new(UInt64Array::from(integer_values_wide)),
Arc::new(UInt64Array::from_slice(integer_values_narrow)),
Expand Down
70 changes: 44 additions & 26 deletions datafusion/src/physical_plan/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::{any::Any, convert::TryInto, sync::Arc};
use crate::record_batch::RecordBatch;
use arrow::array::*;
use arrow::compute;
use arrow::datatypes::DataType::Decimal;
use arrow::datatypes::{DataType, Schema};

use crate::error::{DataFusionError, Result};
Expand Down Expand Up @@ -247,9 +248,24 @@ fn evaluate_regex_case_insensitive<O: Offset>(

fn evaluate(lhs: &dyn Array, op: &Operator, rhs: &dyn Array) -> Result<Arc<dyn Array>> {
use Operator::*;
if matches!(op, Plus | Minus | Divide | Multiply | Modulo | BitwiseAnd) {
if matches!(op, Plus) {
let arr: ArrayRef = match (lhs.data_type(), rhs.data_type()) {
(Decimal(p1, s1), Decimal(p2, s2)) => {
let left_array =
lhs.as_any().downcast_ref::<PrimitiveArray<i128>>().unwrap();
let right_array =
rhs.as_any().downcast_ref::<PrimitiveArray<i128>>().unwrap();
Arc::new(if *p1 == *p2 && *s1 == *s2 {
compute::arithmetics::decimal::add(left_array, right_array)
} else {
compute::arithmetics::decimal::adaptive_add(left_array, right_array)?
})
}
_ => compute::arithmetics::add(lhs, rhs).into(),
};
Ok(arr)
} else if matches!(op, Minus | Divide | Multiply | Modulo) {
let arr = match op {
Operator::Plus => compute::arithmetics::add(lhs, rhs),
Operator::Minus => compute::arithmetics::sub(lhs, rhs),
Operator::Divide => compute::arithmetics::div(lhs, rhs),
Operator::Multiply => compute::arithmetics::mul(lhs, rhs),
Expand Down Expand Up @@ -828,6 +844,7 @@ mod tests {
use crate::error::Result;
use crate::field_util::SchemaExt;
use crate::physical_plan::expressions::{col, lit};
use crate::test_util::create_decimal_array;
use arrow::datatypes::{Field, SchemaRef};
use arrow::error::ArrowError;

Expand Down Expand Up @@ -1015,7 +1032,11 @@ mod tests {
}

fn add_decimal(left: &Int128Array, right: &Int128Array) -> Result<Int128Array> {
let mut decimal_builder = Int128Vec::with_capacity(left.len());
let mut decimal_builder = Int128Vec::from_data(
left.data_type().clone(),
Vec::<i128>::with_capacity(left.len()),
None,
);
for i in 0..left.len() {
if left.is_null(i) || right.is_null(i) {
decimal_builder.push(None);
Expand All @@ -1027,7 +1048,11 @@ mod tests {
}

fn subtract_decimal(left: &Int128Array, right: &Int128Array) -> Result<Int128Array> {
let mut decimal_builder = Int128Vec::with_capacity(left.len());
let mut decimal_builder = Int128Vec::from_data(
left.data_type().clone(),
Vec::<i128>::with_capacity(left.len()),
None,
);
for i in 0..left.len() {
if left.is_null(i) || right.is_null(i) {
decimal_builder.push(None);
Expand All @@ -1043,7 +1068,11 @@ mod tests {
right: &Int128Array,
scale: u32,
) -> Result<Int128Array> {
let mut decimal_builder = Int128Vec::with_capacity(left.len());
let mut decimal_builder = Int128Vec::from_data(
left.data_type().clone(),
Vec::<i128>::with_capacity(left.len()),
None,
);
let divide = 10_i128.pow(scale);
for i in 0..left.len() {
if left.is_null(i) || right.is_null(i) {
Expand All @@ -1061,7 +1090,11 @@ mod tests {
right: &Int128Array,
scale: i32,
) -> Result<Int128Array> {
let mut decimal_builder = Int128Vec::with_capacity(left.len());
let mut decimal_builder = Int128Vec::from_data(
left.data_type().clone(),
Vec::<i128>::with_capacity(left.len()),
None,
);
let mul = 10_f64.powi(scale);
for i in 0..left.len() {
if left.is_null(i) || right.is_null(i) {
Expand All @@ -1081,7 +1114,11 @@ mod tests {
}

fn modulus_decimal(left: &Int128Array, right: &Int128Array) -> Result<Int128Array> {
let mut decimal_builder = Int128Vec::with_capacity(left.len());
let mut decimal_builder = Int128Vec::from_data(
left.data_type().clone(),
Vec::<i128>::with_capacity(left.len()),
None,
);
for i in 0..left.len() {
if left.is_null(i) || right.is_null(i) {
decimal_builder.push(None);
Expand Down Expand Up @@ -2135,25 +2172,6 @@ mod tests {
assert_eq!(result.as_ref(), &expected as &dyn Array);
}

fn create_decimal_array(
array: &[Option<i128>],
_precision: usize,
_scale: usize,
) -> Result<Int128Array> {
let mut decimal_builder = Int128Vec::with_capacity(array.len());
for value in array {
match value {
None => {
decimal_builder.push(None);
}
Some(v) => {
decimal_builder.try_push(Some(*v))?;
}
}
}
Ok(decimal_builder.into())
}

#[test]
fn comparison_decimal_op_test() -> Result<()> {
let value_i128: i128 = 123;
Expand Down
18 changes: 9 additions & 9 deletions datafusion/src/physical_plan/expressions/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ pub fn cast_with_error(
) -> Result<Box<dyn Array>> {
let result = cast::cast(array, cast_type, options)?;
if result.null_count() != array.null_count() {
println!("{result:?} : {array:?}");
let casted_valids = result.validity().unwrap();
let failed_casts = match array.validity() {
Some(valids) => valids ^ casted_valids,
Expand Down Expand Up @@ -192,6 +191,7 @@ mod tests {
use crate::error::Result;
use crate::field_util::SchemaExt;
use crate::physical_plan::expressions::col;
use crate::test_util::create_decimal_array_from_slice;
use arrow::{array::*, datatypes::*};

type StringArray = Utf8Array<i32>;
Expand Down Expand Up @@ -298,7 +298,7 @@ mod tests {
#[test]
fn test_cast_decimal_to_decimal() -> Result<()> {
let array: Vec<i128> = vec![1234, 2222, 3, 4000, 5000];
let decimal_array = Int128Array::from_slice(&array);
let decimal_array = create_decimal_array_from_slice(&array, 10, 3)?;
generic_decimal_to_other_test_cast!(
decimal_array,
DataType::Decimal(10, 3),
Expand All @@ -315,7 +315,7 @@ mod tests {
DEFAULT_DATAFUSION_CAST_OPTIONS
);

let decimal_array = Int128Array::from_slice(&array);
let decimal_array = create_decimal_array_from_slice(&array, 10, 3)?;
generic_decimal_to_other_test_cast!(
decimal_array,
DataType::Decimal(10, 3),
Expand All @@ -339,7 +339,7 @@ mod tests {
fn test_cast_decimal_to_numeric() -> Result<()> {
let array: Vec<i128> = vec![1, 2, 3, 4, 5];
// decimal to i8
let decimal_array = Int128Array::from_slice(&array);
let decimal_array = create_decimal_array_from_slice(&array, 10, 3)?;
generic_decimal_to_other_test_cast!(
decimal_array,
DataType::Decimal(10, 0),
Expand All @@ -356,7 +356,7 @@ mod tests {
DEFAULT_DATAFUSION_CAST_OPTIONS
);
// decimal to i16
let decimal_array = Int128Array::from_slice(&array);
let decimal_array = create_decimal_array_from_slice(&array, 10, 3)?;
generic_decimal_to_other_test_cast!(
decimal_array,
DataType::Decimal(10, 0),
Expand All @@ -373,7 +373,7 @@ mod tests {
DEFAULT_DATAFUSION_CAST_OPTIONS
);
// decimal to i32
let decimal_array = Int128Array::from_slice(&array);
let decimal_array = create_decimal_array_from_slice(&array, 10, 3)?;
generic_decimal_to_other_test_cast!(
decimal_array,
DataType::Decimal(10, 0),
Expand All @@ -390,7 +390,7 @@ mod tests {
DEFAULT_DATAFUSION_CAST_OPTIONS
);
// decimal to i64
let decimal_array = Int128Array::from_slice(&array);
let decimal_array = create_decimal_array_from_slice(&array, 10, 3)?;
generic_decimal_to_other_test_cast!(
decimal_array,
DataType::Decimal(10, 0),
Expand All @@ -408,7 +408,7 @@ mod tests {
);
// decimal to float32
let array: Vec<i128> = vec![1234, 2222, 3, 4000, 5000];
let decimal_array = Int128Array::from_slice(&array);
let decimal_array = create_decimal_array_from_slice(&array, 10, 3)?;
generic_decimal_to_other_test_cast!(
decimal_array,
DataType::Decimal(10, 3),
Expand All @@ -425,7 +425,7 @@ mod tests {
DEFAULT_DATAFUSION_CAST_OPTIONS
);
// decimal to float64
let decimal_array = Int128Array::from_slice(&array);
let decimal_array = create_decimal_array_from_slice(&array, 10, 3)?;
generic_decimal_to_other_test_cast!(
decimal_array,
DataType::Decimal(20, 6),
Expand Down
17 changes: 9 additions & 8 deletions datafusion/src/physical_plan/expressions/try_cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ mod tests {
use crate::error::Result;
use crate::field_util::SchemaExt;
use crate::physical_plan::expressions::col;
use crate::test_util::create_decimal_array_from_slice;
use arrow::{array::*, datatypes::*};

type StringArray = Utf8Array<i32>;
Expand Down Expand Up @@ -234,7 +235,7 @@ mod tests {
fn test_try_cast_decimal_to_decimal() -> Result<()> {
// try cast one decimal data type to another decimal data type
let array: Vec<i128> = vec![1234, 2222, 3, 4000, 5000];
let decimal_array = Int128Array::from_slice(&array);
let decimal_array = create_decimal_array_from_slice(&array, 10, 3)?;
generic_decimal_to_other_test_cast!(
decimal_array,
DataType::Decimal(10, 3),
Expand All @@ -250,7 +251,7 @@ mod tests {
]
);

let decimal_array = Int128Array::from_slice(&array);
let decimal_array = create_decimal_array_from_slice(&array, 10, 3)?;
generic_decimal_to_other_test_cast!(
decimal_array,
DataType::Decimal(10, 3),
Expand All @@ -274,7 +275,7 @@ mod tests {
// TODO we should add function to create Int128Array with value and metadata
// https://github.com/apache/arrow-rs/issues/1009
let array: Vec<i128> = vec![1, 2, 3, 4, 5];
let decimal_array = Int128Array::from_slice(&array);
let decimal_array = create_decimal_array_from_slice(&array, 10, 3)?;
// decimal to i8
generic_decimal_to_other_test_cast!(
decimal_array,
Expand All @@ -292,7 +293,7 @@ mod tests {
);

// decimal to i16
let decimal_array = Int128Array::from_slice(&array);
let decimal_array = create_decimal_array_from_slice(&array, 10, 3)?;
generic_decimal_to_other_test_cast!(
decimal_array,
DataType::Decimal(10, 0),
Expand All @@ -309,7 +310,7 @@ mod tests {
);

// decimal to i32
let decimal_array = Int128Array::from_slice(&array);
let decimal_array = create_decimal_array_from_slice(&array, 10, 3)?;
generic_decimal_to_other_test_cast!(
decimal_array,
DataType::Decimal(10, 0),
Expand All @@ -326,7 +327,7 @@ mod tests {
);

// decimal to i64
let decimal_array = Int128Array::from_slice(&array);
let decimal_array = create_decimal_array_from_slice(&array, 10, 3)?;
generic_decimal_to_other_test_cast!(
decimal_array,
DataType::Decimal(10, 0),
Expand All @@ -344,7 +345,7 @@ mod tests {

// decimal to float32
let array: Vec<i128> = vec![1234, 2222, 3, 4000, 5000];
let decimal_array = Int128Array::from_slice(&array);
let decimal_array = create_decimal_array_from_slice(&array, 10, 3)?;
generic_decimal_to_other_test_cast!(
decimal_array,
DataType::Decimal(10, 3),
Expand All @@ -360,7 +361,7 @@ mod tests {
]
);
// decimal to float64
let decimal_array = Int128Array::from_slice(&array);
let decimal_array = create_decimal_array_from_slice(&array, 10, 3)?;
generic_decimal_to_other_test_cast!(
decimal_array,
DataType::Decimal(20, 6),
Expand Down
37 changes: 37 additions & 0 deletions datafusion/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,3 +334,40 @@ mod tests {
assert!(PathBuf::from(res).is_dir());
}
}

#[cfg(test)]
pub fn create_decimal_array(
array: &[Option<i128>],
precision: usize,
scale: usize,
) -> crate::error::Result<arrow::array::Int128Array> {
use arrow::array::{Int128Vec, TryPush};
let mut decimal_builder = Int128Vec::from_data(
DataType::Decimal(precision, scale),
Vec::<i128>::with_capacity(array.len()),
None,
);

for value in array {
match value {
None => {
decimal_builder.push(None);
}
Some(v) => {
decimal_builder.try_push(Some(*v))?;
}
}
}
Ok(decimal_builder.into())
}

#[cfg(test)]
pub fn create_decimal_array_from_slice(
array: &[i128],
precision: usize,
scale: usize,
) -> crate::error::Result<arrow::array::Int128Array> {
let decimal_array_values: Vec<Option<i128>> =
array.into_iter().map(|v| Some(*v)).collect();
create_decimal_array(&decimal_array_values, precision, scale)
}

0 comments on commit b8f9bc2

Please sign in to comment.