Skip to content

Commit

Permalink
add min/max for time (#3178)
Browse files Browse the repository at this point in the history
  • Loading branch information
waitingkuo authored Aug 16, 2022
1 parent c0d5e23 commit fe42dd7
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 4 deletions.
19 changes: 19 additions & 0 deletions datafusion/core/tests/sql/aggregates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1456,6 +1456,25 @@ async fn aggregate_timestamps_avg() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn aggregate_time_min_and_max() -> Result<()> {
let ctx = SessionContext::new();

let sql = "select min(t), max(t) from (select '00:00:00' as t union select '00:00:01' union select '00:00:02');";
let results = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+----------+----------+",
"| MIN(t) | MAX(t) |",
"+----------+----------+",
"| 00:00:00 | 00:00:02 |",
"+----------+----------+",
];

assert_batches_eq!(expected, &results);

Ok(())
}

#[tokio::test]
async fn aggregate_decimal_min() -> Result<()> {
let ctx = SessionContext::new();
Expand Down
8 changes: 8 additions & 0 deletions datafusion/expr/src/aggregate_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ pub static TIMESTAMPS: &[DataType] = &[

pub static DATES: &[DataType] = &[DataType::Date32, DataType::Date64];

pub static TIMES: &[DataType] = &[
DataType::Time32(TimeUnit::Second),
DataType::Time32(TimeUnit::Millisecond),
DataType::Time64(TimeUnit::Microsecond),
DataType::Time64(TimeUnit::Nanosecond),
];

/// Enum of all built-in aggregate functions
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum AggregateFunction {
Expand Down Expand Up @@ -354,6 +361,7 @@ pub fn signature(fun: &AggregateFunction) -> Signature {
.chain(NUMERICS.iter())
.chain(TIMESTAMPS.iter())
.chain(DATES.iter())
.chain(TIMES.iter())
.cloned()
.collect::<Vec<_>>();
Signature::uniform(1, valid, Volatility::Immutable)
Expand Down
41 changes: 37 additions & 4 deletions datafusion/physical-expr/src/aggregate/min_max.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ use arrow::{
array::{
ArrayRef, BasicDecimalArray, Date32Array, Date64Array, Float32Array,
Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray,
StringArray, TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array,
UInt64Array, UInt8Array,
StringArray, Time64NanosecondArray, TimestampMicrosecondArray,
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
UInt16Array, UInt32Array, UInt64Array, UInt8Array,
},
datatypes::Field,
};
Expand Down Expand Up @@ -257,6 +257,9 @@ macro_rules! min_max_batch {
),
DataType::Date32 => typed_min_max_batch!($VALUES, Date32Array, Date32, $OP),
DataType::Date64 => typed_min_max_batch!($VALUES, Date64Array, Date64, $OP),
DataType::Time64(TimeUnit::Nanosecond) => {
typed_min_max_batch!($VALUES, Time64NanosecondArray, Time64, $OP)
}
other => {
// This should have been handled before
return Err(DataFusionError::Internal(format!(
Expand Down Expand Up @@ -433,12 +436,18 @@ macro_rules! min_max {
) => {
typed_min_max!(lhs, rhs, Date32, $OP)
}
(
(
ScalarValue::Date64(lhs),
ScalarValue::Date64(rhs),
) => {
typed_min_max!(lhs, rhs, Date64, $OP)
}
(
ScalarValue::Time64(lhs),
ScalarValue::Time64(rhs),
) => {
typed_min_max!(lhs, rhs, Time64, $OP)
}
e => {
return Err(DataFusionError::Internal(format!(
"MIN/MAX is not expected to receive scalars of incompatible types {:?}",
Expand Down Expand Up @@ -1190,4 +1199,28 @@ mod tests {
DataType::Date64
)
}

#[test]
fn min_time64() -> Result<()> {
let a: ArrayRef = Arc::new(Time64NanosecondArray::from(vec![1, 2, 3, 4, 5]));
generic_test_op!(
a,
DataType::Time64(TimeUnit::Nanosecond),
Max,
ScalarValue::Time64(Some(5)),
DataType::Time64(TimeUnit::Nanosecond)
)
}

#[test]
fn max_time64() -> Result<()> {
let a: ArrayRef = Arc::new(Time64NanosecondArray::from(vec![1, 2, 3, 4, 5]));
generic_test_op!(
a,
DataType::Time64(TimeUnit::Nanosecond),
Max,
ScalarValue::Time64(Some(5)),
DataType::Time64(TimeUnit::Nanosecond)
)
}
}

0 comments on commit fe42dd7

Please sign in to comment.