Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-10816: [Rust][DF] Operations with Intervals #9434

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ pub enum IntervalUnit {
/// Indicates the number of elapsed whole months, stored as 4-byte integers.
YearMonth,
/// Indicates the number of elapsed days and milliseconds,
/// stored as 2 contiguous 32-bit integers (8-bytes in total).
/// stored as 2 contiguous 32-bit integers (days, milliseconds) (8-bytes in total).
DayTime,
}

Expand Down Expand Up @@ -411,6 +411,7 @@ impl BooleanType {
pub const DATA_TYPE: DataType = DataType::Boolean;
}

#[macro_export]
macro_rules! make_type {
($name:ident, $native_ty:ty, $data_ty:expr) => {
#[derive(Debug)]
Expand Down Expand Up @@ -610,6 +611,7 @@ where
#[cfg(not(simd))]
pub trait ArrowNumericType: ArrowPrimitiveType {}

#[macro_export]
macro_rules! make_numeric_type {
($impl_ty:ty, $native_ty:ty, $simd_ty:ident, $simd_mask_ty:ident) => {
#[cfg(simd)]
Expand Down
33 changes: 33 additions & 0 deletions rust/arrow/src/datatypes/interval.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use super::{ArrowNumericType, ArrowPrimitiveType, DataType, IntervalUnit};
use crate::{make_numeric_type, make_type};

make_type!(
IntervalYearMonthType,
i32,
Copy link
Member

@jorgecarleitao jorgecarleitao Feb 17, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this comment has nothing to do with changes from this PR; this is something already in master.

fwiw, I do not think this is correct and I opened a issue for this. IMO this should be [i32;2] or something like that.

As it stands, we will be trying to read an i32 offseted by 4 bytes from an i64, which is asking us to write masks and other things to get the values.

So, I think that we should hold this PR until we have the interval implementation correct, or I think that this may end up implementing operations under the assumption of a different physical representation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jorgecarleitao You are right, it's will be better.

DataType::Interval(IntervalUnit::YearMonth)
);
make_type!(
IntervalDayTimeType,
i64,
DataType::Interval(IntervalUnit::DayTime)
);

make_numeric_type!(IntervalYearMonthType, i32, i32x16, m32x16);
make_numeric_type!(IntervalDayTimeType, i64, i64x8, m64x8);
22 changes: 22 additions & 0 deletions rust/arrow/src/datatypes/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

pub mod common;
pub mod interval;

pub use self::common::*;
pub use self::interval::*;
70 changes: 69 additions & 1 deletion rust/arrow/src/util/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
//! purposes. See the `pretty` crate for additional functions for
//! record batch pretty printing.

use crate::array;
use crate::array::Array;
use crate::datatypes::{
ArrowNativeType, ArrowPrimitiveType, DataType, Int16Type, Int32Type, Int64Type,
Int8Type, TimeUnit, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
};
use crate::{array, datatypes::IntervalUnit};

use array::DictionaryArray;

Expand All @@ -44,6 +44,66 @@ macro_rules! make_string {
}};
}

macro_rules! make_string_interval_year_month {
($column: ident, $row: ident) => {{
let array = $column
.as_any()
.downcast_ref::<array::IntervalYearMonthArray>()
.unwrap();

let s = if array.is_null($row) {
"NULL".to_string()
} else {
let interval = array.value($row) as f64;
let years = (interval / 12_f64).floor();
let month = interval - (years * 12_f64);

format!(
"{} years {} mons 0 days 0 hours 0 mins 0.00 secs",
years, month,
)
};

Ok(s)
}};
}

macro_rules! make_string_interval_day_time {
($column: ident, $row: ident) => {{
let array = $column
.as_any()
.downcast_ref::<array::IntervalDayTimeArray>()
.unwrap();

let s = if array.is_null($row) {
"NULL".to_string()
} else {
let value: u64 = array.value($row) as u64;

let days_parts: i32 = ((value & 0xFFFFFFFF00000000) >> 32) as i32;
let milliseconds_part: i32 = (value & 0xFFFFFFFF) as i32;

let secs = milliseconds_part / 1000;
let mins = secs / 60;
let hours = mins / 60;

let secs = secs - (mins * 60);
let mins = mins - (hours * 60);

format!(
"0 years 0 mons {} days {} hours {} mins {}.{:02} secs",
days_parts,
hours,
mins,
secs,
(milliseconds_part % 1000),
)
};

Ok(s)
}};
}

macro_rules! make_string_date {
($array_type:ty, $column: ident, $row: ident) => {{
let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
Expand Down Expand Up @@ -180,6 +240,14 @@ pub fn array_value_to_string(column: &array::ArrayRef, row: usize) -> Result<Str
DataType::Time64(unit) if *unit == TimeUnit::Nanosecond => {
make_string_time!(array::Time64NanosecondArray, column, row)
}
DataType::Interval(unit) => match unit {
IntervalUnit::DayTime => {
make_string_interval_day_time!(column, row)
}
IntervalUnit::YearMonth => {
make_string_interval_year_month!(column, row)
}
},
DataType::List(_) => make_string_from_list!(column, row),
DataType::Dictionary(index_type, _value_type) => match **index_type {
DataType::Int8 => dict_array_value_to_string::<Int8Type>(column, row),
Expand Down
21 changes: 19 additions & 2 deletions rust/datafusion/src/physical_plan/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use arrow::compute::kernels::comparison::{
eq_utf8_scalar, gt_eq_utf8_scalar, gt_utf8_scalar, lt_eq_utf8_scalar, lt_utf8_scalar,
neq_utf8_scalar,
};
use arrow::datatypes::{DataType, Schema, TimeUnit};
use arrow::datatypes::{DataType, IntervalUnit, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;

use crate::error::{DataFusionError, Result};
Expand All @@ -41,7 +41,10 @@ use crate::physical_plan::expressions::cast;
use crate::physical_plan::{ColumnarValue, PhysicalExpr};
use crate::scalar::ScalarValue;

use super::coercion::{eq_coercion, numerical_coercion, order_coercion, string_coercion};
use super::coercion::{
eq_coercion, interval_coercion, numerical_coercion, order_coercion, string_coercion,
temporal_coercion,
};

/// Binary expression
#[derive(Debug)]
Expand Down Expand Up @@ -201,6 +204,12 @@ macro_rules! binary_primitive_array_op {
DataType::UInt64 => compute_op!($LEFT, $RIGHT, $OP, UInt64Array),
DataType::Float32 => compute_op!($LEFT, $RIGHT, $OP, Float32Array),
DataType::Float64 => compute_op!($LEFT, $RIGHT, $OP, Float64Array),
DataType::Interval(IntervalUnit::YearMonth) => {
compute_op!($LEFT, $RIGHT, $OP, IntervalYearMonthArray)
}
DataType::Interval(IntervalUnit::DayTime) => {
compute_op!($LEFT, $RIGHT, $OP, IntervalDayTimeArray)
}
other => Err(DataFusionError::Internal(format!(
"Unsupported data type {:?}",
other
Expand Down Expand Up @@ -231,6 +240,12 @@ macro_rules! binary_array_op_scalar {
DataType::Date32 => {
compute_op_scalar!($LEFT, $RIGHT, $OP, Date32Array)
}
DataType::Interval(IntervalUnit::YearMonth) => {
compute_op_scalar!($LEFT, $RIGHT, $OP, IntervalYearMonthArray)
}
DataType::Interval(IntervalUnit::DayTime) => {
compute_op_scalar!($LEFT, $RIGHT, $OP, IntervalDayTimeArray)
}
other => Err(DataFusionError::Internal(format!(
"Unsupported data type {:?}",
other
Expand Down Expand Up @@ -315,6 +330,8 @@ fn common_binary_type(
// because coercion favours higher information types
Operator::Plus | Operator::Minus | Operator::Divide | Operator::Multiply => {
numerical_coercion(lhs_type, rhs_type)
.or_else(|| temporal_coercion(lhs_type, rhs_type))
.or_else(|| interval_coercion(lhs_type, rhs_type))
}
Operator::Modulus => {
return Err(DataFusionError::NotImplemented(
Expand Down
36 changes: 36 additions & 0 deletions rust/datafusion/src/physical_plan/expressions/coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,47 @@ pub fn string_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataT
/// casted to for the purpose of a date computation
pub fn temporal_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType> {
use arrow::datatypes::DataType::*;
use arrow::datatypes::IntervalUnit;
use arrow::datatypes::TimeUnit;

match (lhs_type, rhs_type) {
// Date32
(Utf8, Date32) => Some(Date32),
(Date32, Utf8) => Some(Date32),
// Date64
(Utf8, Date64) => Some(Date64),
(Date64, Utf8) => Some(Date64),
//
(
DataType::Timestamp(TimeUnit::Nanosecond, tz),
Interval(IntervalUnit::DayTime),
) => match tz {
Some(tz) => Some(DataType::Timestamp(TimeUnit::Nanosecond, Some(tz.clone()))),
None => Some(DataType::Timestamp(TimeUnit::Nanosecond, None)),
},
(
Interval(IntervalUnit::DayTime),
DataType::Timestamp(TimeUnit::Nanosecond, tz),
) => match tz {
Some(tz) => Some(DataType::Timestamp(TimeUnit::Nanosecond, Some(tz.clone()))),
None => Some(DataType::Timestamp(TimeUnit::Nanosecond, None)),
},
//
_ => None,
}
}

pub fn interval_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType> {
use arrow::datatypes::DataType::*;
use arrow::datatypes::IntervalUnit;

match (lhs_type, rhs_type) {
(Interval(IntervalUnit::YearMonth), Interval(IntervalUnit::YearMonth)) => {
Some(Interval(IntervalUnit::YearMonth))
}
(Interval(IntervalUnit::DayTime), Interval(IntervalUnit::DayTime)) => {
Some(Interval(IntervalUnit::DayTime))
}
_ => None,
}
}
Expand Down
46 changes: 41 additions & 5 deletions rust/datafusion/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,6 @@

use std::{convert::TryFrom, fmt, iter::repeat, sync::Arc};

use arrow::array::{
Array, BooleanArray, Date32Array, Float32Array, Float64Array, Int16Array, Int32Array,
Int64Array, Int8Array, LargeStringArray, ListArray, StringArray, UInt16Array,
UInt32Array, UInt64Array, UInt8Array,
};
use arrow::array::{
Int16Builder, Int32Builder, Int64Builder, Int8Builder, ListBuilder,
TimestampMicrosecondArray, TimestampNanosecondArray, UInt16Builder, UInt32Builder,
Expand All @@ -33,6 +28,15 @@ use arrow::{
array::ArrayRef,
datatypes::{DataType, Field},
};
use arrow::{
array::{
Array, BooleanArray, Date32Array, Float32Array, Float64Array, Int16Array,
Int32Array, Int64Array, Int8Array, IntervalDayTimeArray, IntervalYearMonthArray,
LargeStringArray, ListArray, StringArray, UInt16Array, UInt32Array, UInt64Array,
UInt8Array,
},
datatypes::IntervalUnit,
};

use crate::error::{DataFusionError, Result};
use arrow::datatypes::TimeUnit;
Expand Down Expand Up @@ -75,6 +79,10 @@ pub enum ScalarValue {
TimeMicrosecond(Option<i64>),
/// Timestamp Nanoseconds
TimeNanosecond(Option<i64>),
/// Interval with YearMonth unit
IntervalYearMonth(Option<i32>),
/// Interval with DayTime unit
IntervalDayTime(Option<i64>),
}

macro_rules! typed_cast {
Expand Down Expand Up @@ -148,6 +156,10 @@ impl ScalarValue {
DataType::List(Box::new(Field::new("item", data_type.clone(), true)))
}
ScalarValue::Date32(_) => DataType::Date32,
ScalarValue::IntervalYearMonth(_) => {
DataType::Interval(IntervalUnit::YearMonth)
}
ScalarValue::IntervalDayTime(_) => DataType::Interval(IntervalUnit::DayTime),
}
}

Expand Down Expand Up @@ -317,6 +329,22 @@ impl ScalarValue {
}
None => Arc::new(repeat(None).take(size).collect::<Date32Array>()),
},
ScalarValue::IntervalDayTime(e) => match e {
Some(value) => Arc::new(IntervalDayTimeArray::from_iter_values(
repeat(*value).take(size),
)),
None => {
Arc::new(repeat(None).take(size).collect::<IntervalDayTimeArray>())
}
},
ScalarValue::IntervalYearMonth(e) => match e {
Some(value) => Arc::new(IntervalYearMonthArray::from_iter_values(
repeat(*value).take(size),
)),
None => {
Arc::new(repeat(None).take(size).collect::<IntervalYearMonthArray>())
}
},
}
}

Expand Down Expand Up @@ -552,6 +580,8 @@ impl fmt::Display for ScalarValue {
None => write!(f, "NULL")?,
},
ScalarValue::Date32(e) => format_option!(f, e)?,
ScalarValue::IntervalDayTime(e) => format_option!(f, e)?,
ScalarValue::IntervalYearMonth(e) => format_option!(f, e)?,
};
Ok(())
}
Expand Down Expand Up @@ -579,6 +609,12 @@ impl fmt::Debug for ScalarValue {
ScalarValue::LargeUtf8(Some(_)) => write!(f, "LargeUtf8(\"{}\")", self),
ScalarValue::List(_, _) => write!(f, "List([{}])", self),
ScalarValue::Date32(_) => write!(f, "Date32(\"{}\")", self),
ScalarValue::IntervalDayTime(_) => {
write!(f, "IntervalDayTime(\"{}\")", self)
}
ScalarValue::IntervalYearMonth(_) => {
write!(f, "IntervalYearMonth(\"{}\")", self)
}
}
}
}
Expand Down
Loading