Skip to content

Commit

Permalink
feat: Add date_bin built-in function
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartcarnie committed Aug 4, 2022
1 parent c57fc86 commit 2de4f26
Show file tree
Hide file tree
Showing 9 changed files with 242 additions and 5 deletions.
76 changes: 76 additions & 0 deletions datafusion/core/tests/sql/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1065,3 +1065,79 @@ async fn cast_to_timestamp_micros_twice() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn date_bin() {
let ctx = SessionContext::new();

let sql = "SELECT DATE_BIN(INTERVAL '15 minutes', TIMESTAMP '2022-08-03 14:38:50Z', TIMESTAMP '1970-01-01T00:00:00Z') AS res";
let results = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+---------------------+",
"| res |",
"+---------------------+",
"| 2022-08-03 14:30:00 |",
"+---------------------+",
];
assert_batches_eq!(expected, &results);

// Shift forward by 5 minutes
let sql = "SELECT DATE_BIN(INTERVAL '15 minutes', TIMESTAMP '2022-08-03 14:38:50Z', TIMESTAMP '1970-01-01T00:05:00Z') AS res";
let results = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+---------------------+",
"| res |",
"+---------------------+",
"| 2022-08-03 14:35:00 |",
"+---------------------+",
];
assert_batches_eq!(expected, &results);

// Shift backward by 5 minutes
let sql = "SELECT DATE_BIN(INTERVAL '15 minutes', TIMESTAMP '2022-08-03 14:38:50Z', TIMESTAMP '1970-01-01T23:55:00Z') AS res";
let results = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+---------------------+",
"| res |",
"+---------------------+",
"| 2022-08-03 14:25:00 |",
"+---------------------+",
];
assert_batches_eq!(expected, &results);

// origin after source, timestamp in previous bucket
let sql = "SELECT DATE_BIN(INTERVAL '15 minutes', TIMESTAMP '2022-08-03 14:38:50Z', TIMESTAMP '2022-08-03 14:40:00Z') AS res";
let results = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+---------------------+",
"| res |",
"+---------------------+",
"| 2022-08-03 14:25:00 |",
"+---------------------+",
];
assert_batches_eq!(expected, &results);

// stride by 7 days
let sql = "SELECT DATE_BIN(INTERVAL '7 days', TIMESTAMP '2022-08-03 14:38:50Z', TIMESTAMP '1970-01-01 00:00:00Z') AS res";
let results = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+---------------------+",
"| res |",
"+---------------------+",
"| 2022-07-28 00:00:00 |",
"+---------------------+",
];
assert_batches_eq!(expected, &results);

// origin shifts bins forward 1 day
let sql = "SELECT DATE_BIN(INTERVAL '7 days', TIMESTAMP '2022-08-03 14:38:50Z', TIMESTAMP '1970-01-02 00:00:00Z') AS res";
let results = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+---------------------+",
"| res |",
"+---------------------+",
"| 2022-07-29 00:00:00 |",
"+---------------------+",
];
assert_batches_eq!(expected, &results);
}
4 changes: 4 additions & 0 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ pub enum BuiltinScalarFunction {
DatePart,
/// date_trunc
DateTrunc,
/// date_bin
DateBin,
/// initcap
InitCap,
/// left
Expand Down Expand Up @@ -210,6 +212,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ConcatWithSeparator => Volatility::Immutable,
BuiltinScalarFunction::DatePart => Volatility::Immutable,
BuiltinScalarFunction::DateTrunc => Volatility::Immutable,
BuiltinScalarFunction::DateBin => Volatility::Immutable,
BuiltinScalarFunction::InitCap => Volatility::Immutable,
BuiltinScalarFunction::Left => Volatility::Immutable,
BuiltinScalarFunction::Lpad => Volatility::Immutable,
Expand Down Expand Up @@ -303,6 +306,7 @@ impl FromStr for BuiltinScalarFunction {
"chr" => BuiltinScalarFunction::Chr,
"date_part" | "datepart" => BuiltinScalarFunction::DatePart,
"date_trunc" | "datetrunc" => BuiltinScalarFunction::DateTrunc,
"date_bin" | "datebin" => BuiltinScalarFunction::DateBin,
"initcap" => BuiltinScalarFunction::InitCap,
"left" => BuiltinScalarFunction::Left,
"length" => BuiltinScalarFunction::CharacterLength,
Expand Down
2 changes: 2 additions & 0 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ nary_scalar_expr!(Now, now_expr);
// date functions
scalar_expr!(DatePart, date_part, part, date);
scalar_expr!(DateTrunc, date_trunc, part, date);
scalar_expr!(DateBin, date_bin, stride, source, origin);
scalar_expr!(ToTimestampMillis, to_timestamp_millis, date);
scalar_expr!(ToTimestampMicros, to_timestamp_micros, date);
scalar_expr!(ToTimestampSeconds, to_timestamp_seconds, date);
Expand Down Expand Up @@ -604,6 +605,7 @@ mod test {

test_scalar_expr!(DatePart, date_part, part, date);
test_scalar_expr!(DateTrunc, date_trunc, part, date);
test_scalar_expr!(DateBin, date_bin, stride, source, origin);
test_scalar_expr!(FromUnixtime, from_unixtime, unixtime);
}

Expand Down
20 changes: 19 additions & 1 deletion datafusion/expr/src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
array_expressions, conditional_expressions, struct_expressions, Accumulator,
BuiltinScalarFunction, Signature, TypeSignature,
};
use arrow::datatypes::{DataType, Field, TimeUnit};
use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
use datafusion_common::{DataFusionError, Result};
use std::sync::Arc;

Expand Down Expand Up @@ -120,6 +120,9 @@ pub fn return_type(
BuiltinScalarFunction::DateTrunc => {
Ok(DataType::Timestamp(TimeUnit::Nanosecond, None))
}
BuiltinScalarFunction::DateBin => {
Ok(DataType::Timestamp(TimeUnit::Nanosecond, None))
}
BuiltinScalarFunction::InitCap => {
utf8_to_str_type(&input_expr_types[0], "initcap")
}
Expand Down Expand Up @@ -406,6 +409,21 @@ pub fn signature(fun: &BuiltinScalarFunction) -> Signature {
],
fun.volatility(),
),
BuiltinScalarFunction::DateBin => Signature::one_of(
vec![
TypeSignature::Exact(vec![
DataType::Interval(IntervalUnit::DayTime),
DataType::Timestamp(TimeUnit::Nanosecond, None),
DataType::Timestamp(TimeUnit::Nanosecond, None),
]),
TypeSignature::Exact(vec![
DataType::Utf8,
DataType::Timestamp(TimeUnit::Nanosecond, None),
DataType::Timestamp(TimeUnit::Nanosecond, None),
]),
],
fun.volatility(),
),
BuiltinScalarFunction::DatePart => Signature::one_of(
vec![
TypeSignature::Exact(vec![DataType::Utf8, DataType::Date32]),
Expand Down
132 changes: 130 additions & 2 deletions datafusion/physical-expr/src/datetime_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use arrow::{
array::{Array, ArrayRef, GenericStringArray, OffsetSizeTrait, PrimitiveArray},
compute::kernels::cast_utils::string_to_timestamp_nanos,
datatypes::{
ArrowPrimitiveType, DataType, TimestampMicrosecondType, TimestampMillisecondType,
TimestampNanosecondType, TimestampSecondType,
ArrowPrimitiveType, DataType, IntervalDayTimeType, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
},
};
use arrow::{
Expand Down Expand Up @@ -275,6 +275,90 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {
})
}

fn date_bin_single(stride: i64, source: i64, origin: i64) -> Result<i64> {
let time_diff = source - origin;
// distance to bin
let time_delta = time_diff - (time_diff % stride);

let time_delta = if time_diff < 0 && stride > 1 {
// The origin is later than the source timestamp, ∴ round down to the previous bin
time_delta - stride
} else {
time_delta
};

Ok(origin + time_delta)
}

/// DATE_BIN sql function
pub fn date_bin(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() != 3 {
return Err(DataFusionError::Execution(
"Expected three arguments for DATE_BIN".to_string(),
));
}

let (stride, array, origin) = (&args[0], &args[1], &args[2]);

let stride = match stride {
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(v))) => {
let (days, ms) = IntervalDayTimeType::to_parts(*v);
let nanos = (Duration::days(days as i64) + Duration::milliseconds(ms as i64))
.num_nanoseconds();
match nanos {
Some(v) => v,
_ => {
return Err(DataFusionError::Execution(
"stride of `DATE_BIN` is too large".to_string(),
))
}
}
}
_ => {
return Err(DataFusionError::Execution(
"stride of `DATE_BIN` is an invalid type".to_string(),
))
}
};

let origin = match origin {
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(v), _)) => *v,
_ => {
return Err(DataFusionError::Execution(
"origin of `DATE_BIN` must ".to_string(),
))
}
};

let f = |x: Option<i64>| x.map(|x| date_bin_single(stride, x, origin)).transpose();

Ok(match array {
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
(f)(*v)?,
tz_opt.clone(),
))
}
ColumnarValue::Array(array) => {
let array = array
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap();
let array = array
.iter()
.map(f)
.collect::<Result<TimestampNanosecondArray>>()?;

ColumnarValue::Array(Arc::new(array))
}
_ => {
return Err(DataFusionError::Execution(
"array of `DATE_BIN` must be non-null scalar timestamp".to_string(),
));
}
})
}

macro_rules! extract_date_part {
($ARRAY: expr, $FN:expr) => {
match $ARRAY.data_type() {
Expand Down Expand Up @@ -502,6 +586,50 @@ mod tests {
});
}

#[test]
fn date_bin() {
use chrono::Duration;

let cases = vec![
(
(
Duration::minutes(15),
"2004-04-09T02:03:04.123456789Z",
"2001-01-01T00:00:00",
),
"2004-04-09T02:00:00Z",
),
(
(
Duration::minutes(15),
"2004-04-09T02:03:04.123456789Z",
"2001-01-01T00:02:30",
),
"2004-04-09T02:02:30Z",
),
(
(
Duration::minutes(15),
"2004-04-09T02:03:04.123456789Z",
"2005-01-01T00:02:30",
),
"2004-04-09T02:02:30Z",
),
];

cases
.iter()
.for_each(|((stride, source, origin), expected)| {
let stride1 = stride.num_nanoseconds().unwrap();
let source1 = string_to_timestamp_nanos(source).unwrap();
let origin1 = string_to_timestamp_nanos(origin).unwrap();

let expected1 = string_to_timestamp_nanos(expected).unwrap();
let result = date_bin_single(stride1, source1, origin1).unwrap();
assert_eq!(result, expected1, "{} = {}", source, expected);
})
}

#[test]
fn to_timestamp_invalid_input_type() -> Result<()> {
// pass the wrong type of input array to to_timestamp and test
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-expr/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ pub fn create_physical_fun(
}
BuiltinScalarFunction::DatePart => Arc::new(datetime_expressions::date_part),
BuiltinScalarFunction::DateTrunc => Arc::new(datetime_expressions::date_trunc),
BuiltinScalarFunction::DateBin => Arc::new(datetime_expressions::date_bin),
BuiltinScalarFunction::Now => {
// bind value for now at plan time
Arc::new(datetime_expressions::make_now(
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,7 @@ enum ScalarFunction {
StructFun=65;
FromUnixtime=66;
Atan2=67;
DateBin=68;
}

message ScalarFunctionNode {
Expand Down
10 changes: 8 additions & 2 deletions datafusion/proto/src/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use datafusion_expr::expr::GroupingSet;
use datafusion_expr::expr::GroupingSet::GroupingSets;
use datafusion_expr::{
abs, acos, array, ascii, asin, atan, atan2, bit_length, btrim, ceil,
character_length, chr, coalesce, concat_expr, concat_ws_expr, cos, date_part,
date_trunc, digest, exp, floor, from_unixtime, left, ln, log10, log2,
character_length, chr, coalesce, concat_expr, concat_ws_expr, cos, date_bin,
date_part, date_trunc, digest, exp, floor, from_unixtime, left, ln, log10, log2,
logical_plan::{PlanType, StringifiedPlan},
lower, lpad, ltrim, md5, now_expr, nullif, octet_length, power, random, regexp_match,
regexp_replace, repeat, replace, reverse, right, round, rpad, rtrim, sha224, sha256,
Expand Down Expand Up @@ -435,6 +435,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
ScalarFunction::NullIf => Self::NullIf,
ScalarFunction::DatePart => Self::DatePart,
ScalarFunction::DateTrunc => Self::DateTrunc,
ScalarFunction::DateBin => Self::DateBin,
ScalarFunction::Md5 => Self::MD5,
ScalarFunction::Sha224 => Self::SHA224,
ScalarFunction::Sha256 => Self::SHA256,
Expand Down Expand Up @@ -1002,6 +1003,11 @@ pub fn parse_expr(
parse_expr(&args[0], registry)?,
parse_expr(&args[1], registry)?,
)),
ScalarFunction::DateBin => Ok(date_bin(
parse_expr(&args[0], registry)?,
parse_expr(&args[1], registry)?,
parse_expr(&args[2], registry)?,
)),
ScalarFunction::Sha224 => Ok(sha224(parse_expr(&args[0], registry)?)),
ScalarFunction::Sha256 => Ok(sha256(parse_expr(&args[0], registry)?)),
ScalarFunction::Sha384 => Ok(sha384(parse_expr(&args[0], registry)?)),
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/src/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1085,6 +1085,7 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction {
BuiltinScalarFunction::NullIf => Self::NullIf,
BuiltinScalarFunction::DatePart => Self::DatePart,
BuiltinScalarFunction::DateTrunc => Self::DateTrunc,
BuiltinScalarFunction::DateBin => Self::DateBin,
BuiltinScalarFunction::MD5 => Self::Md5,
BuiltinScalarFunction::SHA224 => Self::Sha224,
BuiltinScalarFunction::SHA256 => Self::Sha256,
Expand Down

0 comments on commit 2de4f26

Please sign in to comment.