From c5e230ae9aff3a9480ccb6f016fb66fcfa5b6c04 Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Tue, 16 Jan 2024 13:45:05 -0500 Subject: [PATCH 01/14] Support to_timestamp with chrono formatting #5398 --- .../examples/dataframe_to_timestamp.rs | 90 +++ datafusion/expr/src/built_in_function.rs | 68 +- datafusion/expr/src/expr_fn.rs | 17 +- .../physical-expr/src/datetime_expressions.rs | 667 ++++++++++++++++-- .../proto/src/logical_plan/from_proto.rs | 29 +- .../sqllogictest/test_files/timestamps.slt | 90 ++- 6 files changed, 825 insertions(+), 136 deletions(-) create mode 100644 datafusion-examples/examples/dataframe_to_timestamp.rs diff --git a/datafusion-examples/examples/dataframe_to_timestamp.rs b/datafusion-examples/examples/dataframe_to_timestamp.rs new file mode 100644 index 000000000000..cc135aa9b363 --- /dev/null +++ b/datafusion-examples/examples/dataframe_to_timestamp.rs @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::sync::Arc; + +use datafusion::arrow::array::StringArray; +use datafusion::arrow::datatypes::{DataType, Field, Schema}; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::error::Result; +use datafusion::prelude::*; + +/// This example demonstrates how to use the DataFrame API against in-memory data. +#[tokio::main] +async fn main() -> Result<()> { + // define a schema. + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + ])); + + // define data. + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(StringArray::from(vec!["2020-09-08T13:42:29Z", "2020-09-08T13:42:29.190855-05:00", "2020-08-09 12:13:29", "2020-01-02"])), + Arc::new(StringArray::from(vec!["2020-09-08T13:42:29Z", "2020-09-08T13:42:29.190855-05:00", "08-09-2020 13/42/29", "09-27-2020 13:42:29-05:30"])), + ], + )?; + + // declare a new context. In spark API, this corresponds to a new spark SQLsession + let ctx = SessionContext::new(); + + // declare a table in memory. In spark API, this corresponds to createDataFrame(...). 
+ ctx.register_batch("t", batch)?; + let df = ctx.table("t").await?; + + // use to_timestamp function to convert col 'a' to timestamp type using the default parsing + let df = df.with_column("a", to_timestamp(vec![col("a")]))?; + // use to_timestamp_seconds function to convert col 'b' to timestamp(Seconds) type using a list of chrono formats to try + let df = df.with_column("b", to_timestamp_seconds(vec![col("b"), lit("%+"), lit("%d-%m-%Y %H/%M/%S"), lit("%m-%d-%Y %H:%M:%S%#z")]))?; + + let df = df.select_columns(&["a", "b"])?; + + // print the results + df.show().await?; + + // use sql to convert col 'a' to timestamp using the default parsing + let df = ctx.sql("select to_timestamp(a) from t").await?; + + // print the results + df.show().await?; + + // use sql to convert col 'b' to timestamp using a list of chrono formats to try + let df = ctx.sql("select to_timestamp(b, '%+', '%d-%m-%Y %H/%M/%S', '%m-%d-%Y %H:%M:%S%#z') from t").await?; + + // print the results + df.show().await?; + + // use sql to convert a static string to a timestamp using a list of chrono formats to try + let df = ctx.sql("select to_timestamp('01-14-2023 01:01:30+05:30', '%+', '%d-%m-%Y %H/%M/%S', '%m-%d-%Y %H:%M:%S%#z')").await?; + + // print the results + df.show().await?; + + // use sql to convert a static string to a timestamp using a non-matching chrono format to try + let result = ctx.sql("select to_timestamp('01-14-2023 01/01/30', '%d-%m-%Y %H:%M:%S')").await?.collect().await; + + if result.is_err() { + println!("Received the expected error: {:?}", result.err().unwrap()); + } + else { + panic!("timestamp parsing with no matching formats should fail") + } + + Ok(()) +} diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs index 6f64642f60d9..6219ffb2dcad 100644 --- a/datafusion/expr/src/built_in_function.rs +++ b/datafusion/expr/src/built_in_function.rs @@ -1053,67 +1053,13 @@ impl BuiltinScalarFunction { vec![Exact(vec![Utf8, Int64]), 
Exact(vec![LargeUtf8, Int64])], self.volatility(), ), - BuiltinScalarFunction::ToTimestamp => Signature::uniform( - 1, - vec![ - Int64, - Float64, - Timestamp(Nanosecond, None), - Timestamp(Microsecond, None), - Timestamp(Millisecond, None), - Timestamp(Second, None), - Utf8, - ], - self.volatility(), - ), - BuiltinScalarFunction::ToTimestampMillis => Signature::uniform( - 1, - vec![ - Int64, - Timestamp(Nanosecond, None), - Timestamp(Microsecond, None), - Timestamp(Millisecond, None), - Timestamp(Second, None), - Utf8, - ], - self.volatility(), - ), - BuiltinScalarFunction::ToTimestampMicros => Signature::uniform( - 1, - vec![ - Int64, - Timestamp(Nanosecond, None), - Timestamp(Microsecond, None), - Timestamp(Millisecond, None), - Timestamp(Second, None), - Utf8, - ], - self.volatility(), - ), - BuiltinScalarFunction::ToTimestampNanos => Signature::uniform( - 1, - vec![ - Int64, - Timestamp(Nanosecond, None), - Timestamp(Microsecond, None), - Timestamp(Millisecond, None), - Timestamp(Second, None), - Utf8, - ], - self.volatility(), - ), - BuiltinScalarFunction::ToTimestampSeconds => Signature::uniform( - 1, - vec![ - Int64, - Timestamp(Nanosecond, None), - Timestamp(Microsecond, None), - Timestamp(Millisecond, None), - Timestamp(Second, None), - Utf8, - ], - self.volatility(), - ), + BuiltinScalarFunction::ToTimestamp + | BuiltinScalarFunction::ToTimestampSeconds + | BuiltinScalarFunction::ToTimestampMillis + | BuiltinScalarFunction::ToTimestampMicros + | BuiltinScalarFunction::ToTimestampNanos => { + Signature::variadic_any(self.volatility()) + }, BuiltinScalarFunction::FromUnixtime => { Signature::uniform(1, vec![Int64], self.volatility()) } diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index 834420e413b0..68770459eabb 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -885,28 +885,29 @@ nary_scalar_expr!( scalar_expr!(DatePart, date_part, part date, "extracts a subfield from the date"); 
scalar_expr!(DateTrunc, date_trunc, part date, "truncates the date to a specified level of precision"); scalar_expr!(DateBin, date_bin, stride source origin, "coerces an arbitrary timestamp to the start of the nearest specified interval"); -scalar_expr!( +nary_scalar_expr!( + ToTimestamp, + to_timestamp, + "converts a string to a `Timestamp(Nanoseconds, None)`" +); +nary_scalar_expr!( ToTimestampMillis, to_timestamp_millis, - date, "converts a string to a `Timestamp(Milliseconds, None)`" ); -scalar_expr!( +nary_scalar_expr!( ToTimestampMicros, to_timestamp_micros, - date, "converts a string to a `Timestamp(Microseconds, None)`" ); -scalar_expr!( +nary_scalar_expr!( ToTimestampNanos, to_timestamp_nanos, - date, "converts a string to a `Timestamp(Nanoseconds, None)`" ); -scalar_expr!( +nary_scalar_expr!( ToTimestampSeconds, to_timestamp_seconds, - date, "converts a string to a `Timestamp(Seconds, None)`" ); scalar_expr!( diff --git a/datafusion/physical-expr/src/datetime_expressions.rs b/datafusion/physical-expr/src/datetime_expressions.rs index 589bbc8a952b..fef3a0792b62 100644 --- a/datafusion/physical-expr/src/datetime_expressions.rs +++ b/datafusion/physical-expr/src/datetime_expressions.rs @@ -17,7 +17,6 @@ //! DateTime expressions -use crate::datetime_expressions; use crate::expressions::cast_column; use arrow::compute::cast; use arrow::{ @@ -51,6 +50,83 @@ use datafusion_common::{ use datafusion_expr::ColumnarValue; use std::str::FromStr; use std::sync::Arc; +use arrow_array::GenericStringArray; +use chrono::LocalResult::Single; +use itertools::Either; + +/// Error message if nanosecond conversion request beyond supported interval +const ERR_NANOSECONDS_NOT_SUPPORTED: &str = "The dates that can be represented as nanoseconds have to be between 1677-09-21T00:12:44.0 and 2262-04-11T23:47:16.854775804"; + +/// Accepts a string with a `chrono` format and converts it to a +/// nanosecond precision timestamp. 
+/// +/// See [`chrono::format::strftime`] for the full set of supported formats. +/// +/// Implements the `to_timestamp` function to convert a string to a +/// timestamp, following the model of spark SQL’s `to_timestamp`. +/// +/// Internally, this function uses the `chrono` library for the +/// datetime parsing +/// +/// ## Timestamp Precision +/// +/// Function uses the maximum precision timestamps supported by +/// Arrow (nanoseconds stored as a 64-bit integer). This +/// means the range of dates that timestamps can represent is ~1677 AD +/// to 2262 AD +/// +/// ## Timezone / Offset Handling +/// +/// Numerical values of timestamps are stored compared to offset UTC. +/// +/// Any timestamp in the formatting string is handled according to the rules +/// defined by `chrono`. +/// +/// [`chrono::format::strftime`]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html +/// +#[inline] +pub(crate) fn string_to_timestamp_nanos_formatted(s: &str, format: &str) -> Result { + string_to_datetime_formatted(&Utc, s, format)?.naive_utc().timestamp_nanos_opt() + .ok_or_else(|| DataFusionError::Execution(ERR_NANOSECONDS_NOT_SUPPORTED.to_string())) +} + +/// Accepts a string and parses it using the [`chrono::format::strftime`] specifiers +/// relative to the provided `timezone` +/// +/// [IANA timezones] are only supported if the `arrow-array/chrono-tz` feature is enabled +/// +/// * `2023-01-01 040506 America/Los_Angeles` +/// +/// If a timestamp is ambiguous, for example as a result of daylight-savings time, an error +/// will be returned +/// +/// [`chrono::format::strftime`]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html +/// [IANA timezones]: https://www.iana.org/time-zones +pub(crate) fn string_to_datetime_formatted(timezone: &T, s: &str, format: &str) -> Result, DataFusionError> { + let err = |err_ctx: &str| + DataFusionError::Execution(format!("Error parsing timestamp from '{s}' using format '{format}': {err_ctx}")); + + //
attempt to parse the string assuming it has a timezone + let dt = DateTime::parse_from_str(s, format); + + if let Err(e) = &dt { + // no timezone or other failure, try without a timezone + let ndt = NaiveDateTime::parse_from_str(s, format); + if let Err(e) = &ndt { + return Err(err(&e.to_string())); + } + + if let Single(e) = &timezone.from_local_datetime(&ndt.unwrap()) { + Ok(e.to_owned()) + } + else { + return Err(err(&e.to_string())); + } + } + else { + Ok(dt.unwrap().with_timezone(timezone)) + } +} /// given a function `op` that maps a `&str` to a Result of an arrow native type, /// returns a `PrimitiveArray` after the application @@ -84,7 +160,100 @@ where array.iter().map(|x| x.map(&op).transpose()).collect() } -// given an function that maps a `&str` to a arrow native type, +/// given a function `op` that maps `&str`, `&str` to the first successful Result +/// of an arrow native type, returns a `PrimitiveArray` after the application of the +/// function to `args` and the subsequent application of the `op2` function to any +/// successful result. This function calls the `op` function with the first and second +/// argument and if not successful continues with first and third, first and fourth, +/// etc. until the result was successful or no more arguments are present.
+/// # Errors +/// This function errors iff: +/// * the number of arguments is not > 1 or +/// * the array arguments are not castable to a `GenericStringArray` or +/// * the function `op` errors for all input +pub(crate) fn strings_to_primitive_function<'a, T, O, F, F2>( + args: &'a [ColumnarValue], + op: F, + op2: F2, + name: &str, +) -> Result> + where + O: ArrowPrimitiveType, + T: OffsetSizeTrait, + F: Fn(&'a str, &'a str) -> Result, + F2: Fn(O::Native) -> O::Native, +{ + if args.len() < 2 { + return internal_err!( + "{:?} args were supplied but {} takes 2 or more arguments", + args.len(), + name + ); + } + + // this will throw the error if any of the array args are not castable to GenericStringArray + let data = args.iter() + .map(|a| { + match a { + ColumnarValue::Array(a) => Ok(Either::Left(as_generic_string_array::(a.as_ref())?)), + ColumnarValue::Scalar(s) => { + match s { + ScalarValue::Utf8(a) | ScalarValue::LargeUtf8(a) => { + Ok(Either::Right(a)) + }, + other => return internal_err!("Unexpected scalar type encountered '{other}' for function '{name}'") + } + } + } + }) + .collect::, &Option>>>>()?; + + let first_arg = &data.get(0).unwrap().left().unwrap(); + + first_arg.iter().enumerate().map(|(pos, x)| { + let mut val = None; + + match x { + Some(x) => { + let param_args = data.iter().skip(1); + + // go through the args and find the first successful result. Only the last + // failure will be returned if no successful result was received. 
+ for param_arg in param_args { + // param_arg is an array, use the corresponding index into the array as the arg + // we're currently parsing + let p = param_arg.clone(); + let r = if p.is_left() { + let p = p.left().unwrap(); + op(x, &p.value(pos)) + } + // args is a scalar, use it directly + else { + if let Some(p) = p.right().unwrap() { + op(x, p.as_str()) + } + else { + continue; + } + }; + + if r.is_ok() { + val = Some(Ok(op2(r.unwrap()))); + break; + } + else { + val = Some(r); + } + } + }, + None => () + } + + val.transpose() + }).collect() +} + +// given an function that maps a `&str` to an arrow native type, // returns a `ColumnarValue` where the function is applied to either a `ArrayRef` or `ScalarValue` // depending on the `args`'s variant. fn handle<'a, O, F, S>( @@ -99,20 +268,13 @@ where { match &args[0] { ColumnarValue::Array(a) => match a.data_type() { - DataType::Utf8 => Ok(ColumnarValue::Array(Arc::new( + DataType::Utf8 | DataType::LargeUtf8 => Ok(ColumnarValue::Array(Arc::new( unary_string_to_primitive_function::(&[a.as_ref()], op, name)?, ))), - DataType::LargeUtf8 => Ok(ColumnarValue::Array(Arc::new( - unary_string_to_primitive_function::(&[a.as_ref()], op, name)?, - ))), other => internal_err!("Unsupported data type {other:?} for function {name}"), }, ColumnarValue::Scalar(scalar) => match scalar { - ScalarValue::Utf8(a) => { - let result = a.as_ref().map(|x| (op)(x)).transpose()?; - Ok(ColumnarValue::Scalar(S::scalar(result))) - } - ScalarValue::LargeUtf8(a) => { + ScalarValue::Utf8(a) | ScalarValue::LargeUtf8(a) => { let result = a.as_ref().map(|x| (op)(x)).transpose()?; Ok(ColumnarValue::Scalar(S::scalar(result))) } @@ -121,58 +283,195 @@ where } } +// given an function that maps a `&str`, `&str` to an arrow native type, +// returns a `ColumnarValue` where the function is applied to either a `ArrayRef` or `ScalarValue` +// depending on the `args`'s variant. 
+fn handle_multiple<'a, O, F, S, M>( + args: &'a [ColumnarValue], + op: F, + op2: M, + name: &str, +) -> Result + where + O: ArrowPrimitiveType, + S: ScalarType, + F: Fn(&'a str, &'a str) -> Result, + M: Fn(O::Native) -> O::Native, +{ + match &args[0] { + ColumnarValue::Array(a) => match a.data_type() { + DataType::Utf8 | DataType::LargeUtf8 => { + // validate the column types + for (pos, arg) in args.iter().enumerate() { + match arg { + ColumnarValue::Array(arg) => match arg.data_type() { + DataType::Utf8 | DataType::LargeUtf8 => { + // all good + }, + other => return internal_err!("Unsupported data type {other:?} for function {name}, arg # {pos}"), + }, + ColumnarValue::Scalar(arg) => { match arg.data_type() { + DataType::Utf8 | DataType::LargeUtf8 => { + // all good + }, + other => return internal_err!("Unsupported data type {other:?} for function {name}, arg # {pos}"), + }} + } + } + + Ok(ColumnarValue::Array(Arc::new( + strings_to_primitive_function::(args, op, op2, name)?, + ))) + }, + other => return internal_err!("Unsupported data type {other:?} for function {name}"), + }, + // if the first argument is a scalar utf8 all arguments are expected to be scalar utf8 + ColumnarValue::Scalar(scalar) => match scalar { + ScalarValue::Utf8(a) | ScalarValue::LargeUtf8(a) => { + let mut val: Option> = None; + let mut err: Option = None; + + match a { + Some(a) => { + // enumerate all the values finding the first one that returns an Ok result + for (pos, &ref v) in args.iter().enumerate().skip(1) { + if let ColumnarValue::Scalar(s) = v { + if let ScalarValue::Utf8(x) | ScalarValue::LargeUtf8(x) = s { + if let Some(s) = x { + match op(a.as_str(), &s.as_str()) { + Ok(r) => { + val = Some(Ok(ColumnarValue::Scalar(S::scalar(Some(op2(r)))))); + break; + }, + Err(e) => { err = Some(e); }, + } + } + } else { + return internal_err!("Unsupported data type {s:?} for function {name}, arg # {pos}"); + } + } else { + return internal_err!("Unsupported data type {v:?} for function 
{name}, arg # {pos}"); + } + } + }, + None => (), + } + + if val.is_some() { + val.unwrap() + } else { + Err(err.unwrap()) + } + } + other => return internal_err!("Unsupported data type {other:?} for function {name}"), + }, + } +} + /// Calls string_to_timestamp_nanos and converts the error type fn string_to_timestamp_nanos_shim(s: &str) -> Result { string_to_timestamp_nanos(s).map_err(|e| e.into()) } +/// Calls string_to_timestamp_nanos_formatted and converts the error type +fn string_to_timestamp_nanos_with_format_shim(s: &str, f: &str) -> Result { + string_to_timestamp_nanos_formatted(s, f).map_err(|e| e.into()) +} + /// to_timestamp SQL function /// -/// Note: `to_timestamp` returns `Timestamp(Nanosecond)` though its arguments are interpreted as **seconds**. The supported range for integer input is between `-9223372037` and `9223372036`. +/// Note: `to_timestamp` returns `Timestamp(Nanosecond)` though its arguments are interpreted as **seconds**. +/// The supported range for integer input is between `-9223372037` and `9223372036`. /// Supported range for string input is between `1677-09-21T00:12:44.0` and `2262-04-11T23:47:16.0`. /// Please use `to_timestamp_seconds` for the input outside of supported bounds. 
pub fn to_timestamp(args: &[ColumnarValue]) -> Result { - handle::( - args, - string_to_timestamp_nanos_shim, - "to_timestamp", - ) + match args.len() { + 1 => handle::( + args, + string_to_timestamp_nanos_shim, + "to_timestamp", + ), + n if n >=2 => handle_multiple::( + args, + string_to_timestamp_nanos_with_format_shim, + |n| n, + "to_timestamp", + ), + _ => internal_err!("Unsupported 0 argument count for function to_timestamp"), + } } /// to_timestamp_millis SQL function pub fn to_timestamp_millis(args: &[ColumnarValue]) -> Result { - handle::( - args, - |s| string_to_timestamp_nanos_shim(s).map(|n| n / 1_000_000), - "to_timestamp_millis", - ) + match args.len() { + 1 => handle::( + args, + |s| string_to_timestamp_nanos_shim(s).map(|n| n / 1_000_000), + "to_timestamp_millis", + ), + n if n >= 2 => handle_multiple::( + args, + string_to_timestamp_nanos_with_format_shim, + |n| n / 1_000_000, + "to_timestamp_millis", + ), + _ => internal_err!("Unsupported 0 argument count for function to_timestamp_millis"), + } } /// to_timestamp_micros SQL function pub fn to_timestamp_micros(args: &[ColumnarValue]) -> Result { - handle::( - args, - |s| string_to_timestamp_nanos_shim(s).map(|n| n / 1_000), - "to_timestamp_micros", - ) + match args.len() { + 1 => handle::( + args, + |s| string_to_timestamp_nanos_shim(s).map(|n| n / 1_000), + "to_timestamp_micros", + ), + n if n >= 2 => handle_multiple::( + args, + string_to_timestamp_nanos_with_format_shim, + |n| n / 1_000, + "to_timestamp_micros", + ), + _ => internal_err!("Unsupported 0 argument count for function to_timestamp_micros"), + } } /// to_timestamp_nanos SQL function pub fn to_timestamp_nanos(args: &[ColumnarValue]) -> Result { - handle::( - args, - string_to_timestamp_nanos_shim, - "to_timestamp_nanos", - ) + match args.len() { + 1 => handle::( + args, + string_to_timestamp_nanos_shim, + "to_timestamp_nanos", + ), + n if n >= 2 => handle_multiple::( + args, + string_to_timestamp_nanos_with_format_shim, + |n| n, + 
"to_timestamp_nanos", + ), + _ => internal_err!("Unsupported 0 argument count for function to_timestamp_nanos"), + } } /// to_timestamp_seconds SQL function pub fn to_timestamp_seconds(args: &[ColumnarValue]) -> Result { - handle::( - args, - |s| string_to_timestamp_nanos_shim(s).map(|n| n / 1_000_000_000), - "to_timestamp_seconds", - ) + match args.len() { + 1 => handle::( + args, + |s| string_to_timestamp_nanos_shim(s).map(|n| n / 1_000_000_000), + "to_timestamp_seconds", + ), + n if n >= 2 => handle_multiple::( + args, + string_to_timestamp_nanos_with_format_shim, + |n| n / 1_000_000_000, + "to_timestamp_seconds", + ), + _ => internal_err!("Unsupported 0 argument count for function to_timestamp_seconds"), + } + } /// Create an implementation of `now()` that always returns the @@ -915,22 +1214,35 @@ where Ok(b) } -/// to_timestammp() SQL function implementation +/// to_timestamp() SQL function implementation pub fn to_timestamp_invoke(args: &[ColumnarValue]) -> Result { - if args.len() != 1 { + if args.len() == 0 { return internal_err!( - "to_timestamp function requires 1 arguments, got {}", + "to_timestamp function requires 1 or more arguments, got {}", args.len() ); } + // validate that any args after the first one are Utf8 + if args.len() > 1 { + for (idx, a) in args.into_iter().skip(1).enumerate() { + if a.data_type() != DataType::Utf8 { + return internal_err!( + "to_timestamp function unsupported data type at index {}: {}", + idx, + a.data_type() + ); + } + } + } + match args[0].data_type() { - DataType::Int64 => cast_column( + DataType::Int32 | DataType::Int64 => cast_column( &cast_column(&args[0], &DataType::Timestamp(TimeUnit::Second, None), None)?, &DataType::Timestamp(TimeUnit::Nanosecond, None), None, ), - DataType::Float64 => cast_column( + DataType::Null | DataType::Float64 => cast_column( &args[0], &DataType::Timestamp(TimeUnit::Nanosecond, None), None, @@ -940,7 +1252,7 @@ pub fn to_timestamp_invoke(args: &[ColumnarValue]) -> Result { 
&DataType::Timestamp(TimeUnit::Nanosecond, None), None, ), - DataType::Utf8 => datetime_expressions::to_timestamp(args), + DataType::Utf8 => to_timestamp(args), other => { internal_err!( "Unsupported data type {:?} for function to_timestamp", @@ -952,20 +1264,33 @@ pub fn to_timestamp_invoke(args: &[ColumnarValue]) -> Result { /// to_timestamp_millis() SQL function implementation pub fn to_timestamp_millis_invoke(args: &[ColumnarValue]) -> Result { - if args.len() != 1 { + if args.len() == 0 { return internal_err!( - "to_timestamp_millis function requires 1 argument, got {}", + "to_timestamp_millis function requires 1 or more arguments, got {}", args.len() ); } + // validate that any args after the first one are Utf8 + if args.len() > 1 { + for (idx, a) in args.into_iter().skip(1).enumerate() { + if a.data_type() != DataType::Utf8 { + return internal_err!( + "to_timestamp_millis function unsupported data type at index {}: {}", + idx, + a.data_type() + ); + } + } + } + match args[0].data_type() { - DataType::Int64 | DataType::Timestamp(_, None) => cast_column( + DataType::Null | DataType::Int32 | DataType::Int64 | DataType::Timestamp(_, None) => cast_column( &args[0], &DataType::Timestamp(TimeUnit::Millisecond, None), None, ), - DataType::Utf8 => datetime_expressions::to_timestamp_millis(args), + DataType::Utf8 => to_timestamp_millis(args), other => { internal_err!( "Unsupported data type {:?} for function to_timestamp_millis", @@ -977,20 +1302,33 @@ pub fn to_timestamp_millis_invoke(args: &[ColumnarValue]) -> Result Result { - if args.len() != 1 { + if args.len() == 0 { return internal_err!( - "to_timestamp_micros function requires 1 argument, got {}", + "to_timestamp_micros function requires 1 or more arguments, got {}", args.len() ); } + // validate that any args after the first one are Utf8 + if args.len() > 1 { + for (idx, a) in args.into_iter().skip(1).enumerate() { + if a.data_type() != DataType::Utf8 { + return internal_err!( + "to_timestamp_micros function 
unsupported data type at index {}: {}", + idx, + a.data_type() + ); + } + } + } + match args[0].data_type() { - DataType::Int64 | DataType::Timestamp(_, None) => cast_column( + DataType::Null | DataType::Int32 | DataType::Int64 | DataType::Timestamp(_, None) => cast_column( &args[0], &DataType::Timestamp(TimeUnit::Microsecond, None), None, ), - DataType::Utf8 => datetime_expressions::to_timestamp_micros(args), + DataType::Utf8 => to_timestamp_micros(args), other => { internal_err!( "Unsupported data type {:?} for function to_timestamp_micros", @@ -1002,20 +1340,33 @@ pub fn to_timestamp_micros_invoke(args: &[ColumnarValue]) -> Result Result { - if args.len() != 1 { + if args.len() == 0 { return internal_err!( - "to_timestamp_nanos function requires 1 argument, got {}", + "to_timestamp_nanos function requires 1 or more arguments, got {}", args.len() ); } + // validate that any args after the first one are Utf8 + if args.len() > 1 { + for (idx, a) in args.into_iter().skip(1).enumerate() { + if a.data_type() != DataType::Utf8 { + return internal_err!( + "to_timestamp_nanos function unsupported data type at index {}: {}", + idx, + a.data_type() + ); + } + } + } + match args[0].data_type() { - DataType::Int64 | DataType::Timestamp(_, None) => cast_column( + DataType::Null | DataType::Int32 | DataType::Int64 | DataType::Timestamp(_, None) => cast_column( &args[0], &DataType::Timestamp(TimeUnit::Nanosecond, None), None, ), - DataType::Utf8 => datetime_expressions::to_timestamp_nanos(args), + DataType::Utf8 => to_timestamp_nanos(args), other => { internal_err!( "Unsupported data type {:?} for function to_timestamp_nanos", @@ -1027,18 +1378,31 @@ pub fn to_timestamp_nanos_invoke(args: &[ColumnarValue]) -> Result Result { - if args.len() != 1 { + if args.len() == 0 { return internal_err!( - "to_timestamp_seconds function requires 1 argument, got {}", + "to_timestamp_seconds function requires 1 or more arguments, got {}", args.len() ); } + // validate that any args after the 
first one are Utf8 + if args.len() > 1 { + for (idx, a) in args.into_iter().skip(1).enumerate() { + if a.data_type() != DataType::Utf8 { + return internal_err!( + "to_timestamp_seconds function unsupported data type at index {}: {}", + idx, + a.data_type() + ); + } + } + } + match args[0].data_type() { - DataType::Int64 | DataType::Timestamp(_, None) => { + DataType::Null | DataType::Int32 | DataType::Int64 | DataType::Timestamp(_, None) => { cast_column(&args[0], &DataType::Timestamp(TimeUnit::Second, None), None) } - DataType::Utf8 => datetime_expressions::to_timestamp_seconds(args), + DataType::Utf8 => to_timestamp_seconds(args), other => { internal_err!( "Unsupported data type {:?} for function to_timestamp_seconds", @@ -1108,6 +1472,48 @@ mod tests { Ok(()) } + #[test] + fn to_timestamp_with_formats_arrays_and_nulls() -> Result<()> { + // ensure that arrow array implementation is wired up and handles nulls correctly + + let mut date_string_builder = StringBuilder::with_capacity(2, 1024); + let mut format1_builder = StringBuilder::with_capacity(2, 1024); + let mut format2_builder = StringBuilder::with_capacity(2, 1024); + let mut format3_builder = StringBuilder::with_capacity(2, 1024); + let mut ts_builder = TimestampNanosecondArray::builder(2); + + + date_string_builder.append_null(); + format1_builder.append_null(); + format2_builder.append_null(); + format3_builder.append_null(); + ts_builder.append_null(); + + date_string_builder.append_value("2020-09-08T13:42:29.19085Z"); + format1_builder.append_value("%s"); + format2_builder.append_value("%c"); + format3_builder.append_value("%+"); + ts_builder.append_value(1599572549190850000); + + let expected_timestamps = &ts_builder.finish() as &dyn Array; + + let string_array = [ + ColumnarValue::Array(Arc::new(date_string_builder.finish()) as ArrayRef), + ColumnarValue::Array(Arc::new(format1_builder.finish()) as ArrayRef), + ColumnarValue::Array(Arc::new(format2_builder.finish()) as ArrayRef), + 
ColumnarValue::Array(Arc::new(format3_builder.finish()) as ArrayRef), + ]; + let parsed_timestamps = to_timestamp(&string_array) + .expect("that to_timestamp with format args parsed values without error"); + if let ColumnarValue::Array(parsed_array) = parsed_timestamps { + assert_eq!(parsed_array.len(), 2); + assert_eq!(expected_timestamps, parsed_array.as_ref()); + } else { + panic!("Expected a columnar array") + } + Ok(()) + } + #[test] fn date_trunc_test() { let cases = vec![ @@ -1675,4 +2081,147 @@ mod tests { } Ok(()) } + + #[test] + fn to_timestamp_with_formats_invalid_input_type() -> Result<()> { + // pass the wrong type of input array to to_timestamp and test + // that we get an error. + + let mut builder = Int64Array::builder(1); + builder.append_value(1); + let int64array = [ + ColumnarValue::Array(Arc::new(builder.finish())), + ColumnarValue::Array(Arc::new(builder.finish())) + ]; + + let expected_err = + "Internal error: Unsupported data type Int64 for function to_timestamp"; + match to_timestamp(&int64array) { + Ok(_) => panic!("Expected error but got success"), + Err(e) => { + assert!( + e.to_string().contains(expected_err), + "Can not find expected error '{expected_err}'. Actual error '{e}'" + ); + } + } + Ok(()) + } + + #[test] + fn to_timestamp_with_unparseable_data() -> Result<()> { + let mut date_string_builder = StringBuilder::with_capacity(2, 1024); + + date_string_builder.append_null(); + + date_string_builder.append_value("2020-09-08 - 13:42:29.19085Z"); + + let string_array = ColumnarValue::Array(Arc::new(date_string_builder.finish()) as ArrayRef); + + let expected_err = + "Arrow error: Parser error: Error parsing timestamp from '2020-09-08 - 13:42:29.19085Z': error parsing time"; + match to_timestamp(&[string_array]) { + Ok(_) => panic!("Expected error but got success"), + Err(e) => { + assert!( + e.to_string().contains(expected_err), + "Can not find expected error '{expected_err}'. 
Actual error '{e}'" + ); + } + } + Ok(()) + } + + #[test] + fn to_timestamp_with_no_matching_formats() -> Result<()> { + let mut date_string_builder = StringBuilder::with_capacity(2, 1024); + let mut format1_builder = StringBuilder::with_capacity(2, 1024); + let mut format2_builder = StringBuilder::with_capacity(2, 1024); + let mut format3_builder = StringBuilder::with_capacity(2, 1024); + + date_string_builder.append_null(); + format1_builder.append_null(); + format2_builder.append_null(); + format3_builder.append_null(); + + date_string_builder.append_value("2020-09-08T13:42:29.19085Z"); + format1_builder.append_value("%s"); + format2_builder.append_value("%c"); + format3_builder.append_value("%H:%M:%S"); + + let string_array = [ + ColumnarValue::Array(Arc::new(date_string_builder.finish()) as ArrayRef), + ColumnarValue::Array(Arc::new(format1_builder.finish()) as ArrayRef), + ColumnarValue::Array(Arc::new(format2_builder.finish()) as ArrayRef), + ColumnarValue::Array(Arc::new(format3_builder.finish()) as ArrayRef), + ]; + + let expected_err = + "Execution error: Error parsing timestamp from '2020-09-08T13:42:29.19085Z' using format '%H:%M:%S': input contains invalid characters"; + match to_timestamp(&string_array) { + Ok(_) => panic!("Expected error but got success"), + Err(e) => { + assert!( + e.to_string().contains(expected_err), + "Can not find expected error '{expected_err}'. 
Actual error '{e}'" + ); + } + } + Ok(()) + } + + #[test] + fn string_to_timestamp_formatted() { + // Explicit timezone + assert_eq!( + 1599572549190855000, + parse_timestamp_formatted("2020-09-08T13:42:29.190855+00:00", "%+").unwrap() + ); + assert_eq!( + 1599572549190855000, + parse_timestamp_formatted("2020-09-08T13:42:29.190855Z", "%+").unwrap() + ); + assert_eq!( + 1599572549000000000, + parse_timestamp_formatted("2020-09-08T13:42:29Z", "%+").unwrap() + ); // no fractional part + assert_eq!( + 1599590549190855000, + parse_timestamp_formatted("2020-09-08T13:42:29.190855-05:00", "%+").unwrap() + ); + assert_eq!( + 1599590549000000000, + parse_timestamp_formatted("1599590549", "%s").unwrap() + ); + assert_eq!( + 1599572549000000000, + parse_timestamp_formatted("09-08-2020 13/42/29", "%m-%d-%Y %H/%M/%S").unwrap() + ); + } + + fn parse_timestamp_formatted(s: &str, format: & str) -> Result { + let result = string_to_timestamp_nanos_formatted(s, format); + if let Err(e) = &result { + eprintln!("Error parsing timestamp '{s}' using format '{format}': {e:?}"); + } + result + } + + #[test] + fn string_to_timestamp_formatted_invalid() { + // Test parsing invalid formats + let cases = [ + ("", "%Y%m%d %H%M%S", "premature end of input"), + ("SS", "%c", "premature end of input"), + ("Wed, 18 Feb 2015 23:16:09 GMT", "", "trailing input"), + ("Wed, 18 Feb 2015 23:16:09 GMT", "%XX", "input contains invalid characters"), + ("Wed, 18 Feb 2015 23:16:09 GMT", "%Y%m%d %H%M%S", "input contains invalid characters"), + ]; + + for (s, f, ctx) in cases { + let expected = format!("Execution error: Error parsing timestamp from '{s}' using format '{f}': {ctx}"); + let actual = string_to_datetime_formatted(&Utc, s, f).unwrap_err().to_string(); + assert_eq!(actual, expected) + } + } } diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index d15cf1db928b..d800caa5fb11 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ 
b/datafusion/proto/src/logical_plan/from_proto.rs @@ -1692,17 +1692,35 @@ pub fn parse_expr( parse_expr(&args[1], registry)?, )), ScalarFunction::ToHex => Ok(to_hex(parse_expr(&args[0], registry)?)), + ScalarFunction::ToTimestamp => { + Ok(to_timestamp_seconds(args.to_owned() + .iter() + .map(|expr| parse_expr(expr, registry)) + .collect::, _>>()?)) + } ScalarFunction::ToTimestampMillis => { - Ok(to_timestamp_millis(parse_expr(&args[0], registry)?)) + Ok(to_timestamp_millis(args.to_owned() + .iter() + .map(|expr| parse_expr(expr, registry)) + .collect::, _>>()?)) } ScalarFunction::ToTimestampMicros => { - Ok(to_timestamp_micros(parse_expr(&args[0], registry)?)) + Ok(to_timestamp_micros(args.to_owned() + .iter() + .map(|expr| parse_expr(expr, registry)) + .collect::, _>>()?)) } ScalarFunction::ToTimestampNanos => { - Ok(to_timestamp_nanos(parse_expr(&args[0], registry)?)) + Ok(to_timestamp_nanos(args.to_owned() + .iter() + .map(|expr| parse_expr(expr, registry)) + .collect::, _>>()?)) } ScalarFunction::ToTimestampSeconds => { - Ok(to_timestamp_seconds(parse_expr(&args[0], registry)?)) + Ok(to_timestamp_seconds(args.to_owned() + .iter() + .map(|expr| parse_expr(expr, registry)) + .collect::, _>>()?)) } ScalarFunction::Now => Ok(now()), ScalarFunction::Translate => Ok(translate( @@ -1744,9 +1762,6 @@ pub fn parse_expr( ScalarFunction::ArrowTypeof => { Ok(arrow_typeof(parse_expr(&args[0], registry)?)) } - ScalarFunction::ToTimestamp => { - Ok(to_timestamp_seconds(parse_expr(&args[0], registry)?)) - } ScalarFunction::Flatten => Ok(flatten(parse_expr(&args[0], registry)?)), ScalarFunction::StringToArray => Ok(string_to_array( parse_expr(&args[0], registry)?, diff --git a/datafusion/sqllogictest/test_files/timestamps.slt b/datafusion/sqllogictest/test_files/timestamps.slt index 2ab3dbdac61b..8069177a0416 100644 --- a/datafusion/sqllogictest/test_files/timestamps.slt +++ b/datafusion/sqllogictest/test_files/timestamps.slt @@ -331,6 +331,35 @@ SELECT COUNT(*) FROM 
ts_data_secs where ts > to_timestamp_seconds('2020-09-08T12 ---- 2 +# to_timestamp with formatting +query I +SELECT COUNT(*) FROM ts_data_nanos where ts > to_timestamp('2020-09-08T12:00:00+00:00', '2020-09-08 12/00/00+00:00', '%c', '%+', '%Y-%m-%d %H/%M/%s%#z') +---- +2 + +# to_timestamp_nanos with formatting +query I +SELECT COUNT(*) FROM ts_data_nanos where ts > to_timestamp_nanos('2020-09-08 12/00/00+00:00', '%c', '%+', '%Y-%m-%d %H/%M/%S%#z') +---- +2 + +# to_timestamp_millis with formatting +query I +SELECT COUNT(*) FROM ts_data_millis where ts > to_timestamp_millis('2020-09-08 12/00/00+00:00', '%c', '%+', '%Y-%m-%d %H/%M/%S%#z') +---- +2 + +# to_timestamp_micros with formatting +query I +SELECT COUNT(*) FROM ts_data_micros where ts > to_timestamp_micros('2020-09-08 12/00/00+00:00', '%c', '%+', '%Y-%m-%d %H/%M/%S%#z') +---- +2 + +# to_timestamp_seconds with formatting +query I +SELECT COUNT(*) FROM ts_data_secs where ts > to_timestamp_seconds('2020-09-08 12/00/00+00:00', '%c', '%+', '%Y-%m-%d %H/%M/%S%#z') +---- +2 # to_timestamp float inputs @@ -1880,7 +1909,7 @@ SELECT to_timestamp(null), to_timestamp(0), to_timestamp(1926632005), to_timesta ---- NULL 1970-01-01T00:00:00 2031-01-19T23:33:25 1970-01-01T00:00:01 1969-12-31T23:59:59 1969-12-31T23:59:59 -# verify timestamp syntax stlyes are consistent +# verify timestamp syntax styles are consistent query BBBBBBBBBBBBB SELECT to_timestamp(null) is null as c1, null::timestamp is null as c2, @@ -1922,6 +1951,65 @@ true true true true true true #---- #0001-04-25T00:00:00 +63022-07-16T12:59:37 0001-04-25T00:00:00 +63022-07-16T12:59:37 0001-04-25T00:00:00 +63022-07-16T12:59:37 +# verify timestamp data with formatting options +query PPPPPP +SELECT to_timestamp(null, '%+'), to_timestamp(0, '%s'), to_timestamp(1926632005, '%s'), to_timestamp(1, '%+', '%s'), to_timestamp(-1, '%c', '%+', '%s'), to_timestamp(0-1, '%c', '%+', '%s') +---- +NULL 1970-01-01T00:00:00 2031-01-19T23:33:25 1970-01-01T00:00:01 1969-12-31T23:59:59 
1969-12-31T23:59:59 + +# verify timestamp data with formatting options +query PPPPPP +SELECT to_timestamp(null, '%+'), to_timestamp(0, '%s'), to_timestamp(1926632005, '%s'), to_timestamp(1, '%+', '%s'), to_timestamp(-1, '%c', '%+', '%s'), to_timestamp(0-1, '%c', '%+', '%s') +---- +NULL 1970-01-01T00:00:00 2031-01-19T23:33:25 1970-01-01T00:00:01 1969-12-31T23:59:59 1969-12-31T23:59:59 + +# verify timestamp output types with formatting options +query TTT +SELECT arrow_typeof(to_timestamp(1, '%c', '%s')), arrow_typeof(to_timestamp(null, '%+', '%s')), arrow_typeof(to_timestamp('2023-01-10 12:34:56.000', '%Y-%m-%d %H:%M:%S%.f')) +---- +Timestamp(Nanosecond, None) Timestamp(Nanosecond, None) Timestamp(Nanosecond, None) + +# to_timestamp with invalid formatting +query error input contains invalid characters +SELECT to_timestamp('2020-09-08 12/00/00+00:00', '%c', '%+') + +# to_timestamp_nanos with invalid formatting +query error input contains invalid characters +SELECT to_timestamp_nanos('2020-09-08 12/00/00+00:00', '%c', '%+') + +# to_timestamp_millis with invalid formatting +query error input contains invalid characters +SELECT to_timestamp_millis('2020-09-08 12/00/00+00:00', '%c', '%+') + +# to_timestamp_micros with invalid formatting +query error input contains invalid characters +SELECT to_timestamp_micros('2020-09-08 12/00/00+00:00', '%c', '%+') + +# to_timestamp_seconds with invalid formatting +query error input contains invalid characters +SELECT to_timestamp_seconds('2020-09-08 12/00/00+00:00', '%c', '%+') + +# to_timestamp with broken formatting +query error bad or unsupported format string +SELECT to_timestamp('2020-09-08 12/00/00+00:00', '%q') + +# to_timestamp_nanos with broken formatting +query error bad or unsupported format string +SELECT to_timestamp_nanos('2020-09-08 12/00/00+00:00', '%q') + +# to_timestamp_millis with broken formatting +query error bad or unsupported format string +SELECT to_timestamp_millis('2020-09-08 12/00/00+00:00', '%q') + +# 
to_timestamp_micros with broken formatting +query error bad or unsupported format string +SELECT to_timestamp_micros('2020-09-08 12/00/00+00:00', '%q') + +# to_timestamp_seconds with broken formatting +query error bad or unsupported format string +SELECT to_timestamp_seconds('2020-09-08 12/00/00+00:00', '%q') + + ########## ## Test binary temporal coercion for Date and Timestamp ########## From a48ab54798c44c9e2e2b567acbe281b85bc692a5 Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Tue, 16 Jan 2024 14:45:22 -0500 Subject: [PATCH 02/14] Updated user guide's to_timestamp to include chrono formatting information #5398 --- .../source/user-guide/sql/scalar_functions.md | 46 ++++++++++++++----- 1 file changed, 34 insertions(+), 12 deletions(-) diff --git a/docs/source/user-guide/sql/scalar_functions.md b/docs/source/user-guide/sql/scalar_functions.md index 9dd008f8fc44..9d16393ecf3c 100644 --- a/docs/source/user-guide/sql/scalar_functions.md +++ b/docs/source/user-guide/sql/scalar_functions.md @@ -1471,84 +1471,106 @@ extract(field FROM source) Converts a value to a timestamp (`YYYY-MM-DDT00:00:00Z`). Supports strings, integer, unsigned integer, and double types as input. -Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00') +Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00') if no [Chrono formats] are provided. Integers, unsigned integers, and doubles are interpreted as seconds since the unix epoch (`1970-01-01T00:00:00Z`) return the corresponding timestamp. Note: `to_timestamp` returns `Timestamp(Nanosecond)`. The supported range for integer input is between `-9223372037` and `9223372036`. -Supported range for string input is between `1677-09-21T00:12:44.0` and `2262-04-11T23:47:16.0`. Please use `to_timestamp_seconds` for the input outside of supported bounds. +Supported range for string input is between `1677-09-21T00:12:44.0` and `2262-04-11T23:47:16.0`. Please use `to_timestamp_seconds` +for the input outside of supported bounds. 
``` -to_timestamp(expression) +to_timestamp(expression[, ..., format_n]) ``` #### Arguments - **expression**: Expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators. +- **format_n**: Optional [Chrono format] strings to use to parse the expression. Formats will be tried in the order + they appear with the first successful one being returned. If none of the formats successfully parse the expression + an error will be returned. + +[Chrono format]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html ### `to_timestamp_millis` Converts a value to a timestamp (`YYYY-MM-DDT00:00:00.000Z`). Supports strings, integer, and unsigned integer types as input. -Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00') +Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00') if no [Chrono format]s are provided. Integers and unsigned integers are interpreted as milliseconds since the unix epoch (`1970-01-01T00:00:00Z`) return the corresponding timestamp. ``` -to_timestamp_millis(expression) +to_timestamp_millis(expression[, ..., format_n]) ``` #### Arguments - **expression**: Expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators. +- **format_n**: Optional [Chrono format] strings to use to parse the expression. Formats will be tried in the order + they appear with the first successful one being returned. If none of the formats successfully parse the expression + an error will be returned. ### `to_timestamp_micros` Converts a value to a timestamp (`YYYY-MM-DDT00:00:00.000000Z`). Supports strings, integer, and unsigned integer types as input. -Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00') +Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00') if no [Chrono format]s are provided. Integers and unsigned integers are interpreted as microseconds since the unix epoch (`1970-01-01T00:00:00Z`) return the corresponding timestamp. 
``` -to_timestamp_nanos(expression) +to_timestamp_nanos(expression[, ..., format_n]) ``` +#### Arguments + +- **expression**: Expression to operate on. + Can be a constant, column, or function, and any combination of arithmetic operators. +- **format_n**: Optional [Chrono format] strings to use to parse the expression. Formats will be tried in the order + they appear with the first successful one being returned. If none of the formats successfully parse the expression + an error will be returned. + ### `to_timestamp_nanos` Converts a value to a timestamp (`YYYY-MM-DDT00:00:00.000000000Z`). Supports strings, integer, and unsigned integer types as input. -Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00') +Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00') if no [Chrono format]s are provided. Integers and unsigned integers are interpreted as nanoseconds since the unix epoch (`1970-01-01T00:00:00Z`) return the corresponding timestamp. ``` -to_timestamp_nanos(expression) +to_timestamp_nanos(expression[, ..., format_n]) ``` #### Arguments - **expression**: Expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators. - +- **format_n**: Optional [Chrono format] strings to use to parse the expression. Formats will be tried in the order + they appear with the first successful one being returned. If none of the formats successfully parse the expression + an error will be returned. ### `to_timestamp_seconds` Converts a value to a timestamp (`YYYY-MM-DDT00:00:00.000Z`). Supports strings, integer, and unsigned integer types as input. -Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00') +Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00') if no [Chrono format]s are provided. Integers and unsigned integers are interpreted as seconds since the unix epoch (`1970-01-01T00:00:00Z`) return the corresponding timestamp. 
``` -to_timestamp_seconds(expression) +to_timestamp_seconds(expression[, ..., format_n]) ``` #### Arguments - **expression**: Expression to operate on. Can be a constant, column, or function, and any combination of arithmetic operators. +- **format_n**: Optional [Chrono format] strings to use to parse the expression. Formats will be tried in the order + they appear with the first successful one being returned. If none of the formats successfully parse the expression + an error will be returned. ### `from_unixtime` From 312353912f49993483f2acec48955c8bb725f8a4 Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Tue, 16 Jan 2024 14:55:38 -0500 Subject: [PATCH 03/14] Minor comment update. --- datafusion-examples/examples/dataframe_to_timestamp.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion-examples/examples/dataframe_to_timestamp.rs b/datafusion-examples/examples/dataframe_to_timestamp.rs index cc135aa9b363..40cb17a897f4 100644 --- a/datafusion-examples/examples/dataframe_to_timestamp.rs +++ b/datafusion-examples/examples/dataframe_to_timestamp.rs @@ -23,7 +23,7 @@ use datafusion::arrow::record_batch::RecordBatch; use datafusion::error::Result; use datafusion::prelude::*; -/// This example demonstrates how to use the DataFrame API against in-memory data. +/// This example demonstrates how to use the to_timestamp function in the DataFrame API as well as via sql. #[tokio::main] async fn main() -> Result<()> { // define a schema. From 573456dd5cded67a6a5c5d40298d7bc119be6a5f Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Tue, 16 Jan 2024 15:04:38 -0500 Subject: [PATCH 04/14] Small documentation updates for to_timestamp functions. 
--- .../source/user-guide/sql/scalar_functions.md | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/docs/source/user-guide/sql/scalar_functions.md b/docs/source/user-guide/sql/scalar_functions.md index 9d16393ecf3c..9faf39627f7b 100644 --- a/docs/source/user-guide/sql/scalar_functions.md +++ b/docs/source/user-guide/sql/scalar_functions.md @@ -1472,8 +1472,8 @@ extract(field FROM source) Converts a value to a timestamp (`YYYY-MM-DDT00:00:00Z`). Supports strings, integer, unsigned integer, and double types as input. Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00') if no [Chrono formats] are provided. -Integers, unsigned integers, and doubles are interpreted as seconds since the unix epoch (`1970-01-01T00:00:00Z`) -return the corresponding timestamp. +Integers, unsigned integers, and doubles are interpreted as seconds since the unix epoch (`1970-01-01T00:00:00Z`). +Returns the corresponding timestamp. Note: `to_timestamp` returns `Timestamp(Nanosecond)`. The supported range for integer input is between `-9223372037` and `9223372036`. Supported range for string input is between `1677-09-21T00:12:44.0` and `2262-04-11T23:47:16.0`. Please use `to_timestamp_seconds` @@ -1498,8 +1498,8 @@ to_timestamp(expression[, ..., format_n]) Converts a value to a timestamp (`YYYY-MM-DDT00:00:00.000Z`). Supports strings, integer, and unsigned integer types as input. Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00') if no [Chrono format]s are provided. -Integers and unsigned integers are interpreted as milliseconds since the unix epoch (`1970-01-01T00:00:00Z`) -return the corresponding timestamp. +Integers and unsigned integers are interpreted as milliseconds since the unix epoch (`1970-01-01T00:00:00Z`). +Returns the corresponding timestamp. ``` to_timestamp_millis(expression[, ..., format_n]) @@ -1519,10 +1519,10 @@ Converts a value to a timestamp (`YYYY-MM-DDT00:00:00.000000Z`). 
Supports strings, integer, and unsigned integer types as input. Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00') if no [Chrono format]s are provided. Integers and unsigned integers are interpreted as microseconds since the unix epoch (`1970-01-01T00:00:00Z`) -return the corresponding timestamp. +Returns the corresponding timestamp. ``` -to_timestamp_nanos(expression[, ..., format_n]) +to_timestamp_micros(expression[, ..., format_n]) ``` #### Arguments @@ -1538,8 +1538,8 @@ to_timestamp_nanos(expression[, ..., format_n]) Converts a value to a timestamp (`YYYY-MM-DDT00:00:00.000000000Z`). Supports strings, integer, and unsigned integer types as input. Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00') if no [Chrono format]s are provided. -Integers and unsigned integers are interpreted as nanoseconds since the unix epoch (`1970-01-01T00:00:00Z`) -return the corresponding timestamp. +Integers and unsigned integers are interpreted as nanoseconds since the unix epoch (`1970-01-01T00:00:00Z`). +Returns the corresponding timestamp. ``` to_timestamp_nanos(expression[, ..., format_n]) @@ -1557,8 +1557,8 @@ to_timestamp_nanos(expression[, ..., format_n]) Converts a value to a timestamp (`YYYY-MM-DDT00:00:00.000Z`). Supports strings, integer, and unsigned integer types as input. Strings are parsed as RFC3339 (e.g. '2023-07-20T05:44:00') if no [Chrono format]s are provided. -Integers and unsigned integers are interpreted as seconds since the unix epoch (`1970-01-01T00:00:00Z`) -return the corresponding timestamp. +Integers and unsigned integers are interpreted as seconds since the unix epoch (`1970-01-01T00:00:00Z`). +Returns the corresponding timestamp. ``` to_timestamp_seconds(expression[, ..., format_n]) From a78a85aab8e263ac24ddd453aa625ccd52fbfc29 Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Tue, 16 Jan 2024 17:32:00 -0500 Subject: [PATCH 05/14] Cargo fmt and clippy improvements. 
--- .../examples/dataframe_to_timestamp.rs | 33 +- datafusion/expr/src/built_in_function.rs | 2 +- .../physical-expr/src/datetime_expressions.rs | 304 +++++++++++------- .../proto/src/logical_plan/from_proto.rs | 40 +-- 4 files changed, 230 insertions(+), 149 deletions(-) diff --git a/datafusion-examples/examples/dataframe_to_timestamp.rs b/datafusion-examples/examples/dataframe_to_timestamp.rs index 40cb17a897f4..b988378cafee 100644 --- a/datafusion-examples/examples/dataframe_to_timestamp.rs +++ b/datafusion-examples/examples/dataframe_to_timestamp.rs @@ -36,8 +36,18 @@ async fn main() -> Result<()> { let batch = RecordBatch::try_new( schema, vec![ - Arc::new(StringArray::from(vec!["2020-09-08T13:42:29Z", "2020-09-08T13:42:29.190855-05:00", "2020-08-09 12:13:29", "2020-01-02"])), - Arc::new(StringArray::from(vec!["2020-09-08T13:42:29Z", "2020-09-08T13:42:29.190855-05:00", "08-09-2020 13/42/29", "09-27-2020 13:42:29-05:30"])), + Arc::new(StringArray::from(vec![ + "2020-09-08T13:42:29Z", + "2020-09-08T13:42:29.190855-05:00", + "2020-08-09 12:13:29", + "2020-01-02", + ])), + Arc::new(StringArray::from(vec![ + "2020-09-08T13:42:29Z", + "2020-09-08T13:42:29.190855-05:00", + "08-09-2020 13/42/29", + "09-27-2020 13:42:29-05:30", + ])), ], )?; @@ -51,7 +61,15 @@ async fn main() -> Result<()> { // use to_timestamp function to convert col 'a' to timestamp type using the default parsing let df = df.with_column("a", to_timestamp(vec![col("a")]))?; // use to_timestamp_seconds function to convert col 'b' to timestamp(Seconds) type using a list of chrono formats to try - let df = df.with_column("b", to_timestamp_seconds(vec![col("b"), lit("%+"), lit("%d-%m-%Y %H/%M/%S"), lit("%m-%d-%Y %H:%M:%S%#z")]))?; + let df = df.with_column( + "b", + to_timestamp_seconds(vec![ + col("b"), + lit("%+"), + lit("%d-%m-%Y %H/%M/%S"), + lit("%m-%d-%Y %H:%M:%S%#z"), + ]), + )?; let df = df.select_columns(&["a", "b"])?; @@ -77,12 +95,15 @@ async fn main() -> Result<()> { df.show().await?; // use 
sql to convert a static string to a timestamp using a non-matching chrono format to try - let result = ctx.sql("select to_timestamp('01-14-2023 01/01/30', '%d-%m-%Y %H:%M:%S')").await?.collect().await; + let result = ctx + .sql("select to_timestamp('01-14-2023 01/01/30', '%d-%m-%Y %H:%M:%S')") + .await? + .collect() + .await; if result.is_err() { println!("Received the expected error: {:?}", result.err().unwrap()); - } - else { + } else { panic!("timestamp parsing with no matching formats should fail") } diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs index 6219ffb2dcad..b54cd68164c1 100644 --- a/datafusion/expr/src/built_in_function.rs +++ b/datafusion/expr/src/built_in_function.rs @@ -1059,7 +1059,7 @@ impl BuiltinScalarFunction { | BuiltinScalarFunction::ToTimestampMicros | BuiltinScalarFunction::ToTimestampNanos => { Signature::variadic_any(self.volatility()) - }, + } BuiltinScalarFunction::FromUnixtime => { Signature::uniform(1, vec![Int64], self.volatility()) } diff --git a/datafusion/physical-expr/src/datetime_expressions.rs b/datafusion/physical-expr/src/datetime_expressions.rs index fef3a0792b62..190f4d26623f 100644 --- a/datafusion/physical-expr/src/datetime_expressions.rs +++ b/datafusion/physical-expr/src/datetime_expressions.rs @@ -36,7 +36,9 @@ use arrow::{ use arrow_array::temporal_conversions::NANOSECONDS; use arrow_array::timezone::Tz; use arrow_array::types::ArrowTimestampType; +use arrow_array::GenericStringArray; use chrono::prelude::*; +use chrono::LocalResult::Single; use chrono::{Duration, Months, NaiveDate}; use datafusion_common::cast::{ as_date32_array, as_date64_array, as_generic_string_array, as_primitive_array, @@ -48,11 +50,9 @@ use datafusion_common::{ ScalarValue, }; use datafusion_expr::ColumnarValue; +use itertools::Either; use std::str::FromStr; use std::sync::Arc; -use arrow_array::GenericStringArray; -use chrono::LocalResult::Single; -use itertools::Either; /// Error message if 
nanosecond conversion request beyond supported interval const ERR_NANOSECONDS_NOT_SUPPORTED: &str = "The dates that can be represented as nanoseconds have to be between 1677-09-21T00:12:44.0 and 2262-04-11T23:47:16.854775804"; @@ -85,9 +85,16 @@ const ERR_NANOSECONDS_NOT_SUPPORTED: &str = "The dates that can be represented a /// [`chrono::format::strftime`]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html /// #[inline] -pub(crate) fn string_to_timestamp_nanos_formatted(s: &str, format: &str) -> Result { - string_to_datetime_formatted(&Utc, s, format)?.naive_utc().timestamp_nanos_opt() - .ok_or_else(|| DataFusionError::Execution(ERR_NANOSECONDS_NOT_SUPPORTED.to_string())) +pub(crate) fn string_to_timestamp_nanos_formatted( + s: &str, + format: &str, +) -> Result { + string_to_datetime_formatted(&Utc, s, format)? + .naive_utc() + .timestamp_nanos_opt() + .ok_or_else(|| { + DataFusionError::Execution(ERR_NANOSECONDS_NOT_SUPPORTED.to_string()) + }) } /// Accepts a string and parses it using the [`chrono::format::strftime`] specifiers @@ -102,9 +109,16 @@ pub(crate) fn string_to_timestamp_nanos_formatted(s: &str, format: &str) -> Resu /// /// [`chrono::format::strftime`]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html /// [IANA timezones]: https://www.iana.org/time-zones -pub(crate) fn string_to_datetime_formatted(timezone: &T, s: &str, format: &str) -> Result, DataFusionError> { - let err = |err_ctx: &str| - DataFusionError::Execution(format!("Error parsing timestamp from '{s}' using format '{format}': {err_ctx}")); +pub(crate) fn string_to_datetime_formatted( + timezone: &T, + s: &str, + format: &str, +) -> Result, DataFusionError> { + let err = |err_ctx: &str| { + DataFusionError::Execution(format!( + "Error parsing timestamp from '{s}' using format '{format}': {err_ctx}" + )) + }; // attempt to parse the string assuming it has a timezone let dt = DateTime::parse_from_str(s, format); @@ -118,12 +132,10 @@ pub(crate) fn 
string_to_datetime_formatted(timezone: &T, s: &str, f if let Single(e) = &timezone.from_local_datetime(&ndt.unwrap()) { Ok(e.to_owned()) + } else { + Err(err(&e.to_string())) } - else { - return Err(err(&e.to_string())); - } - } - else { + } else { Ok(dt.unwrap().with_timezone(timezone)) } } @@ -177,11 +189,11 @@ pub(crate) fn strings_to_primitive_function<'a, T, O, F, F2>( op2: F2, name: &str, ) -> Result> - where - O: ArrowPrimitiveType, - T: OffsetSizeTrait, - F: Fn(&'a str, &'a str) -> Result, - F2: Fn(O::Native) -> O::Native, +where + O: ArrowPrimitiveType, + T: OffsetSizeTrait, + F: Fn(&'a str, &'a str) -> Result, + F2: Fn(O::Native) -> O::Native, { if args.len() < 2 { return internal_err!( @@ -192,29 +204,30 @@ pub(crate) fn strings_to_primitive_function<'a, T, O, F, F2>( } // this will throw the error if any of the array args are not castable to GenericStringArray - let data = args.iter() - .map(|a| { - match a { - ColumnarValue::Array(a) => Ok(Either::Left(as_generic_string_array::(a.as_ref())?)), - ColumnarValue::Scalar(s) => { - match s { - ScalarValue::Utf8(a) | ScalarValue::LargeUtf8(a) => { - Ok(Either::Right(a)) - }, - other => return internal_err!("Unexpected scalar type encountered '{other}' for function '{name}'") - } - } + let data = args + .iter() + .map(|a| match a { + ColumnarValue::Array(a) => { + Ok(Either::Left(as_generic_string_array::(a.as_ref())?)) } + ColumnarValue::Scalar(s) => match s { + ScalarValue::Utf8(a) | ScalarValue::LargeUtf8(a) => Ok(Either::Right(a)), + other => internal_err!( + "Unexpected scalar type encountered '{other}' for function '{name}'" + ), + }, }) .collect::, &Option>>>>()?; - let first_arg = &data.get(0).unwrap().left().unwrap(); + let first_arg = &data.first().unwrap().left().unwrap(); - first_arg.iter().enumerate().map(|(pos, x)| { - let mut val = None; + first_arg + .iter() + .enumerate() + .map(|(pos, x)| { + let mut val = None; - match x { - Some(x) => { + if let Some(x) = x { let param_args = 
data.iter().skip(1); // go through the args and find the first successful result. Only the last @@ -222,35 +235,30 @@ pub(crate) fn strings_to_primitive_function<'a, T, O, F, F2>( for param_arg in param_args { // param_arg is an array, use the corresponding index into the array as the arg // we're currently parsing - let p = param_arg.clone(); + let p = *param_arg; let r = if p.is_left() { let p = p.left().unwrap(); - op(x, &p.value(pos)) + op(x, p.value(pos)) } // args is a scalar, use it directly - else { - if let Some(p) = p.right().unwrap() { - op(x, p.as_str()) - } - else { - continue; - } + else if let Some(p) = p.right().unwrap() { + op(x, p.as_str()) + } else { + continue; }; if r.is_ok() { val = Some(Ok(op2(r.unwrap()))); break; - } - else { + } else { val = Some(r); } } - }, - None => () - } + }; - val.transpose() - }).collect() + val.transpose() + }) + .collect() } // given an function that maps a `&str` to an arrow native type, @@ -292,11 +300,11 @@ fn handle_multiple<'a, O, F, S, M>( op2: M, name: &str, ) -> Result - where - O: ArrowPrimitiveType, - S: ScalarType, - F: Fn(&'a str, &'a str) -> Result, - M: Fn(O::Native) -> O::Native, +where + O: ArrowPrimitiveType, + S: ScalarType, + F: Fn(&'a str, &'a str) -> Result, + M: Fn(O::Native) -> O::Native, { match &args[0] { ColumnarValue::Array(a) => match a.data_type() { @@ -322,8 +330,12 @@ fn handle_multiple<'a, O, F, S, M>( Ok(ColumnarValue::Array(Arc::new( strings_to_primitive_function::(args, op, op2, name)?, ))) - }, - other => return internal_err!("Unsupported data type {other:?} for function {name}"), + } + other => { + internal_err!( + "Unsupported data type {other:?} for function {name}" + ) + } }, // if the first argument is a scalar utf8 all arguments are expected to be scalar utf8 ColumnarValue::Scalar(scalar) => match scalar { @@ -334,16 +346,22 @@ fn handle_multiple<'a, O, F, S, M>( match a { Some(a) => { // enumerate all the values finding the first one that returns an Ok result - for (pos, 
&ref v) in args.iter().enumerate().skip(1) { + for (pos, v) in args.iter().enumerate().skip(1) { if let ColumnarValue::Scalar(s) = v { - if let ScalarValue::Utf8(x) | ScalarValue::LargeUtf8(x) = s { + if let ScalarValue::Utf8(x) | ScalarValue::LargeUtf8(x) = + s + { if let Some(s) = x { - match op(a.as_str(), &s.as_str()) { + match op(a.as_str(), s.as_str()) { Ok(r) => { - val = Some(Ok(ColumnarValue::Scalar(S::scalar(Some(op2(r)))))); + val = Some(Ok(ColumnarValue::Scalar( + S::scalar(Some(op2(r))), + ))); break; - }, - Err(e) => { err = Some(e); }, + } + Err(e) => { + err = Some(e); + } } } } else { @@ -353,17 +371,21 @@ fn handle_multiple<'a, O, F, S, M>( return internal_err!("Unsupported data type {v:?} for function {name}, arg # {pos}"); } } - }, + } None => (), } - if val.is_some() { - val.unwrap() + if let Some(v) = val { + v } else { Err(err.unwrap()) } } - other => return internal_err!("Unsupported data type {other:?} for function {name}"), + other => { + internal_err!( + "Unsupported data type {other:?} for function {name}" + ) + } }, } } @@ -375,7 +397,7 @@ fn string_to_timestamp_nanos_shim(s: &str) -> Result { /// Calls string_to_timestamp_nanos_formatted and converts the error type fn string_to_timestamp_nanos_with_format_shim(s: &str, f: &str) -> Result { - string_to_timestamp_nanos_formatted(s, f).map_err(|e| e.into()) + string_to_timestamp_nanos_formatted(s, f) } /// to_timestamp SQL function @@ -387,16 +409,18 @@ fn string_to_timestamp_nanos_with_format_shim(s: &str, f: &str) -> Result { pub fn to_timestamp(args: &[ColumnarValue]) -> Result { match args.len() { 1 => handle::( - args, - string_to_timestamp_nanos_shim, - "to_timestamp", - ), - n if n >=2 => handle_multiple::( args, - string_to_timestamp_nanos_with_format_shim, - |n| n, + string_to_timestamp_nanos_shim, "to_timestamp", ), + n if n >= 2 => { + handle_multiple::( + args, + string_to_timestamp_nanos_with_format_shim, + |n| n, + "to_timestamp", + ) + } _ => internal_err!("Unsupported 0 
argument count for function to_timestamp"), } } @@ -409,13 +433,17 @@ pub fn to_timestamp_millis(args: &[ColumnarValue]) -> Result { |s| string_to_timestamp_nanos_shim(s).map(|n| n / 1_000_000), "to_timestamp_millis", ), - n if n >= 2 => handle_multiple::( - args, - string_to_timestamp_nanos_with_format_shim, - |n| n / 1_000_000, - "to_timestamp_millis", - ), - _ => internal_err!("Unsupported 0 argument count for function to_timestamp_millis"), + n if n >= 2 => { + handle_multiple::( + args, + string_to_timestamp_nanos_with_format_shim, + |n| n / 1_000_000, + "to_timestamp_millis", + ) + } + _ => { + internal_err!("Unsupported 0 argument count for function to_timestamp_millis") + } } } @@ -427,13 +455,17 @@ pub fn to_timestamp_micros(args: &[ColumnarValue]) -> Result { |s| string_to_timestamp_nanos_shim(s).map(|n| n / 1_000), "to_timestamp_micros", ), - n if n >= 2 => handle_multiple::( - args, - string_to_timestamp_nanos_with_format_shim, - |n| n / 1_000, - "to_timestamp_micros", - ), - _ => internal_err!("Unsupported 0 argument count for function to_timestamp_micros"), + n if n >= 2 => { + handle_multiple::( + args, + string_to_timestamp_nanos_with_format_shim, + |n| n / 1_000, + "to_timestamp_micros", + ) + } + _ => { + internal_err!("Unsupported 0 argument count for function to_timestamp_micros") + } } } @@ -445,13 +477,17 @@ pub fn to_timestamp_nanos(args: &[ColumnarValue]) -> Result { string_to_timestamp_nanos_shim, "to_timestamp_nanos", ), - n if n >= 2 => handle_multiple::( - args, - string_to_timestamp_nanos_with_format_shim, - |n| n, - "to_timestamp_nanos", - ), - _ => internal_err!("Unsupported 0 argument count for function to_timestamp_nanos"), + n if n >= 2 => { + handle_multiple::( + args, + string_to_timestamp_nanos_with_format_shim, + |n| n, + "to_timestamp_nanos", + ) + } + _ => { + internal_err!("Unsupported 0 argument count for function to_timestamp_nanos") + } } } @@ -469,9 +505,10 @@ pub fn to_timestamp_seconds(args: &[ColumnarValue]) -> Result 
{ |n| n / 1_000_000_000, "to_timestamp_seconds", ), - _ => internal_err!("Unsupported 0 argument count for function to_timestamp_seconds"), + _ => internal_err!( + "Unsupported 0 argument count for function to_timestamp_seconds" + ), } - } /// Create an implementation of `now()` that always returns the @@ -1216,7 +1253,7 @@ where /// to_timestamp() SQL function implementation pub fn to_timestamp_invoke(args: &[ColumnarValue]) -> Result { - if args.len() == 0 { + if args.is_empty() { return internal_err!( "to_timestamp function requires 1 or more arguments, got {}", args.len() @@ -1225,7 +1262,7 @@ pub fn to_timestamp_invoke(args: &[ColumnarValue]) -> Result { // validate that any args after the first one are Utf8 if args.len() > 1 { - for (idx, a) in args.into_iter().skip(1).enumerate() { + for (idx, a) in args.iter().skip(1).enumerate() { if a.data_type() != DataType::Utf8 { return internal_err!( "to_timestamp function unsupported data type at index {}: {}", @@ -1264,7 +1301,7 @@ pub fn to_timestamp_invoke(args: &[ColumnarValue]) -> Result { /// to_timestamp_millis() SQL function implementation pub fn to_timestamp_millis_invoke(args: &[ColumnarValue]) -> Result { - if args.len() == 0 { + if args.is_empty() { return internal_err!( "to_timestamp_millis function requires 1 or more arguments, got {}", args.len() @@ -1273,7 +1310,7 @@ pub fn to_timestamp_millis_invoke(args: &[ColumnarValue]) -> Result 1 { - for (idx, a) in args.into_iter().skip(1).enumerate() { + for (idx, a) in args.iter().skip(1).enumerate() { if a.data_type() != DataType::Utf8 { return internal_err!( "to_timestamp_millis function unsupported data type at index {}: {}", @@ -1285,7 +1322,10 @@ pub fn to_timestamp_millis_invoke(args: &[ColumnarValue]) -> Result cast_column( + DataType::Null + | DataType::Int32 + | DataType::Int64 + | DataType::Timestamp(_, None) => cast_column( &args[0], &DataType::Timestamp(TimeUnit::Millisecond, None), None, @@ -1302,7 +1342,7 @@ pub fn 
to_timestamp_millis_invoke(args: &[ColumnarValue]) -> Result Result { - if args.len() == 0 { + if args.is_empty() { return internal_err!( "to_timestamp_micros function requires 1 or more arguments, got {}", args.len() @@ -1311,7 +1351,7 @@ pub fn to_timestamp_micros_invoke(args: &[ColumnarValue]) -> Result 1 { - for (idx, a) in args.into_iter().skip(1).enumerate() { + for (idx, a) in args.iter().skip(1).enumerate() { if a.data_type() != DataType::Utf8 { return internal_err!( "to_timestamp_micros function unsupported data type at index {}: {}", @@ -1323,7 +1363,10 @@ pub fn to_timestamp_micros_invoke(args: &[ColumnarValue]) -> Result cast_column( + DataType::Null + | DataType::Int32 + | DataType::Int64 + | DataType::Timestamp(_, None) => cast_column( &args[0], &DataType::Timestamp(TimeUnit::Microsecond, None), None, @@ -1340,7 +1383,7 @@ pub fn to_timestamp_micros_invoke(args: &[ColumnarValue]) -> Result Result { - if args.len() == 0 { + if args.is_empty() { return internal_err!( "to_timestamp_nanos function requires 1 or more arguments, got {}", args.len() @@ -1349,7 +1392,7 @@ pub fn to_timestamp_nanos_invoke(args: &[ColumnarValue]) -> Result 1 { - for (idx, a) in args.into_iter().skip(1).enumerate() { + for (idx, a) in args.iter().skip(1).enumerate() { if a.data_type() != DataType::Utf8 { return internal_err!( "to_timestamp_nanos function unsupported data type at index {}: {}", @@ -1361,7 +1404,10 @@ pub fn to_timestamp_nanos_invoke(args: &[ColumnarValue]) -> Result cast_column( + DataType::Null + | DataType::Int32 + | DataType::Int64 + | DataType::Timestamp(_, None) => cast_column( &args[0], &DataType::Timestamp(TimeUnit::Nanosecond, None), None, @@ -1378,7 +1424,7 @@ pub fn to_timestamp_nanos_invoke(args: &[ColumnarValue]) -> Result Result { - if args.len() == 0 { + if args.is_empty() { return internal_err!( "to_timestamp_seconds function requires 1 or more arguments, got {}", args.len() @@ -1387,7 +1433,7 @@ pub fn to_timestamp_seconds_invoke(args: 
&[ColumnarValue]) -> Result 1 { - for (idx, a) in args.into_iter().skip(1).enumerate() { + for (idx, a) in args.iter().skip(1).enumerate() { if a.data_type() != DataType::Utf8 { return internal_err!( "to_timestamp_seconds function unsupported data type at index {}: {}", @@ -1399,7 +1445,10 @@ pub fn to_timestamp_seconds_invoke(args: &[ColumnarValue]) -> Result { + DataType::Null + | DataType::Int32 + | DataType::Int64 + | DataType::Timestamp(_, None) => { cast_column(&args[0], &DataType::Timestamp(TimeUnit::Second, None), None) } DataType::Utf8 => to_timestamp_seconds(args), @@ -1482,7 +1531,6 @@ mod tests { let mut format3_builder = StringBuilder::with_capacity(2, 1024); let mut ts_builder = TimestampNanosecondArray::builder(2); - date_string_builder.append_null(); format1_builder.append_null(); format2_builder.append_null(); @@ -2091,7 +2139,7 @@ mod tests { builder.append_value(1); let int64array = [ ColumnarValue::Array(Arc::new(builder.finish())), - ColumnarValue::Array(Arc::new(builder.finish())) + ColumnarValue::Array(Arc::new(builder.finish())), ]; let expected_err = @@ -2116,7 +2164,8 @@ mod tests { date_string_builder.append_value("2020-09-08 - 13:42:29.19085Z"); - let string_array = ColumnarValue::Array(Arc::new(date_string_builder.finish()) as ArrayRef); + let string_array = + ColumnarValue::Array(Arc::new(date_string_builder.finish()) as ArrayRef); let expected_err = "Arrow error: Parser error: Error parsing timestamp from '2020-09-08 - 13:42:29.19085Z': error parsing time"; @@ -2195,11 +2244,12 @@ mod tests { ); assert_eq!( 1599572549000000000, - parse_timestamp_formatted("09-08-2020 13/42/29", "%m-%d-%Y %H/%M/%S").unwrap() + parse_timestamp_formatted("09-08-2020 13/42/29", "%m-%d-%Y %H/%M/%S") + .unwrap() ); } - fn parse_timestamp_formatted(s: &str, format: & str) -> Result { + fn parse_timestamp_formatted(s: &str, format: &str) -> Result { let result = string_to_timestamp_nanos_formatted(s, format); if let Err(e) = &result { eprintln!("Error parsing 
timestamp '{s}' using format '{format}': {e:?}"); @@ -2214,13 +2264,23 @@ mod tests { ("", "%Y%m%d %H%M%S", "premature end of input"), ("SS", "%c", "premature end of input"), ("Wed, 18 Feb 2015 23:16:09 GMT", "", "trailing input"), - ("Wed, 18 Feb 2015 23:16:09 GMT", "%XX", "input contains invalid characters"), - ("Wed, 18 Feb 2015 23:16:09 GMT", "%Y%m%d %H%M%S", "input contains invalid characters"), + ( + "Wed, 18 Feb 2015 23:16:09 GMT", + "%XX", + "input contains invalid characters", + ), + ( + "Wed, 18 Feb 2015 23:16:09 GMT", + "%Y%m%d %H%M%S", + "input contains invalid characters", + ), ]; for (s, f, ctx) in cases { let expected = format!("Execution error: Error parsing timestamp from '{s}' using format '{f}': {ctx}"); - let actual = string_to_datetime_formatted(&Utc, s, f).unwrap_err().to_string(); + let actual = string_to_datetime_formatted(&Utc, s, f) + .unwrap_err() + .to_string(); assert_eq!(actual, expected) } } diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index 4eac78d415d3..b602d16f7fda 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -1689,36 +1689,36 @@ pub fn parse_expr( parse_expr(&args[1], registry)?, )), ScalarFunction::ToHex => Ok(to_hex(parse_expr(&args[0], registry)?)), - ScalarFunction::ToTimestamp => { - Ok(to_timestamp_seconds(args.to_owned() + ScalarFunction::ToTimestamp => Ok(to_timestamp_seconds( + args.to_owned() .iter() .map(|expr| parse_expr(expr, registry)) - .collect::, _>>()?)) - } - ScalarFunction::ToTimestampMillis => { - Ok(to_timestamp_millis(args.to_owned() + .collect::, _>>()?, + )), + ScalarFunction::ToTimestampMillis => Ok(to_timestamp_millis( + args.to_owned() .iter() .map(|expr| parse_expr(expr, registry)) - .collect::, _>>()?)) - } - ScalarFunction::ToTimestampMicros => { - Ok(to_timestamp_micros(args.to_owned() + .collect::, _>>()?, + )), + ScalarFunction::ToTimestampMicros => 
Ok(to_timestamp_micros( + args.to_owned() .iter() .map(|expr| parse_expr(expr, registry)) - .collect::, _>>()?)) - } - ScalarFunction::ToTimestampNanos => { - Ok(to_timestamp_nanos(args.to_owned() + .collect::, _>>()?, + )), + ScalarFunction::ToTimestampNanos => Ok(to_timestamp_nanos( + args.to_owned() .iter() .map(|expr| parse_expr(expr, registry)) - .collect::, _>>()?)) - } - ScalarFunction::ToTimestampSeconds => { - Ok(to_timestamp_seconds(args.to_owned() + .collect::, _>>()?, + )), + ScalarFunction::ToTimestampSeconds => Ok(to_timestamp_seconds( + args.to_owned() .iter() .map(|expr| parse_expr(expr, registry)) - .collect::, _>>()?)) - } + .collect::, _>>()?, + )), ScalarFunction::Now => Ok(now()), ScalarFunction::Translate => Ok(translate( parse_expr(&args[0], registry)?, From c06adcb32e960d0ec88deebfd94d07c66a992935 Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Tue, 16 Jan 2024 17:32:54 -0500 Subject: [PATCH 06/14] Switched to assert and unwrap_err based on feedback --- datafusion-examples/examples/dataframe_to_timestamp.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/datafusion-examples/examples/dataframe_to_timestamp.rs b/datafusion-examples/examples/dataframe_to_timestamp.rs index b988378cafee..0b64647cebd9 100644 --- a/datafusion-examples/examples/dataframe_to_timestamp.rs +++ b/datafusion-examples/examples/dataframe_to_timestamp.rs @@ -101,11 +101,7 @@ async fn main() -> Result<()> { .collect() .await; - if result.is_err() { - println!("Received the expected error: {:?}", result.err().unwrap()); - } else { - panic!("timestamp parsing with no matching formats should fail") - } + assert_eq!(result.unwrap_err(), "timestamp parsing with no matching formats should fail"); Ok(()) } From d14bb0ba324f420a9b42dd5a2f4d9dc980599229 Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Tue, 16 Jan 2024 17:48:24 -0500 Subject: [PATCH 07/14] Fixed assert, code compiles and runs as expected now. 
--- datafusion-examples/examples/dataframe_to_timestamp.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion-examples/examples/dataframe_to_timestamp.rs b/datafusion-examples/examples/dataframe_to_timestamp.rs index 0b64647cebd9..8caa9245596b 100644 --- a/datafusion-examples/examples/dataframe_to_timestamp.rs +++ b/datafusion-examples/examples/dataframe_to_timestamp.rs @@ -22,6 +22,7 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::arrow::record_batch::RecordBatch; use datafusion::error::Result; use datafusion::prelude::*; +use datafusion_common::assert_contains; /// This example demonstrates how to use the to_timestamp function in the DataFrame API as well as via sql. #[tokio::main] @@ -101,7 +102,8 @@ async fn main() -> Result<()> { .collect() .await; - assert_eq!(result.unwrap_err(), "timestamp parsing with no matching formats should fail"); + let expected = "Error parsing timestamp from '01-14-2023 01/01/30' using format '%d-%m-%Y %H:%M:%S': input contains invalid characters"; + assert_contains!(result.unwrap_err().to_string(), expected); Ok(()) } From a83b900bb5b7290c33684163b18cfc33f654b751 Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Tue, 16 Jan 2024 18:11:06 -0500 Subject: [PATCH 08/14] Fix fmt (again). 
--- datafusion/physical-expr/src/datetime_expressions.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-expr/src/datetime_expressions.rs b/datafusion/physical-expr/src/datetime_expressions.rs index 190f4d26623f..cc2fc0b0ffef 100644 --- a/datafusion/physical-expr/src/datetime_expressions.rs +++ b/datafusion/physical-expr/src/datetime_expressions.rs @@ -332,9 +332,7 @@ where ))) } other => { - internal_err!( - "Unsupported data type {other:?} for function {name}" - ) + internal_err!("Unsupported data type {other:?} for function {name}") } }, // if the first argument is a scalar utf8 all arguments are expected to be scalar utf8 @@ -382,9 +380,7 @@ where } } other => { - internal_err!( - "Unsupported data type {other:?} for function {name}" - ) + internal_err!("Unsupported data type {other:?} for function {name}") } }, } From db071cc50583f5b3c3b7df27a50871ac67b2d2ab Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Thu, 18 Jan 2024 16:24:43 -0500 Subject: [PATCH 09/14] Add additional to_timestamp tests covering usage with tables with and without valid formats. 
--- .../sqllogictest/test_files/timestamps.slt | 53 ++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/timestamps.slt b/datafusion/sqllogictest/test_files/timestamps.slt index 8069177a0416..5c7687aa27b2 100644 --- a/datafusion/sqllogictest/test_files/timestamps.slt +++ b/datafusion/sqllogictest/test_files/timestamps.slt @@ -18,7 +18,7 @@ ########## ## Common timestamp data # -# ts_data: Int64 nanosecods +# ts_data: Int64 nanoseconds # ts_data_nanos: Timestamp(Nanosecond, None) # ts_data_micros: Timestamp(Microsecond, None) # ts_data_millis: Timestamp(Millisecond, None) @@ -2009,6 +2009,57 @@ SELECT to_timestamp_micros('2020-09-08 12/00/00+00:00', '%q') query error bad or unsupported format string SELECT to_timestamp_seconds('2020-09-08 12/00/00+00:00', '%q') +# Create string timestamp table with different formats +# including a few very non-standard formats + +statement ok +create table ts_utf8_data(ts varchar(100), format varchar(100)) as values + ('2020-09-08 12/00/00+00:00', '%Y-%m-%d %H/%M/%S%#z'), + ('2031-01-19T23:33:25+05:00', '%+'), + ('08-09-2020 12:00:00+00:00', '%d-%m-%Y %H:%M:%S%#z'), + ('1926632005', '%s'), + ('2000-01-01T01:01:01+07:00', '%+'); + +# verify timestamp data using tables with formatting options +query P +SELECT to_timestamp(t.ts, t.format) from ts_utf8_data as t +---- +2020-09-08T12:00:00 +2031-01-19T18:33:25 +2020-09-08T12:00:00 +2031-01-19T23:33:25 +1999-12-31T18:01:01 + +# verify timestamp data using tables with formatting options +query P +SELECT to_timestamp(t.ts, '%Y-%m-%d %H/%M/%S%#z', '%+', '%s', '%d-%m-%Y %H:%M:%S%#z') from ts_utf8_data as t +---- +2020-09-08T12:00:00 +2031-01-19T18:33:25 +2020-09-08T12:00:00 +2031-01-19T23:33:25 +1999-12-31T18:01:01 + +# verify timestamp data using tables with formatting options where at least one column cannot be parsed +query error Error parsing timestamp from '1926632005' using format '%d-%m-%Y %H:%M:%S%#z': input contains 
invalid characters +SELECT to_timestamp(t.ts, '%Y-%m-%d %H/%M/%S%#z', '%+', '%d-%m-%Y %H:%M:%S%#z') from ts_utf8_data as t + +# verify timestamp data using tables with formatting options where one of the formats is invalid +query P +SELECT to_timestamp(t.ts, '%Y-%m-%d %H/%M/%S%#z', '%s', '%q', '%d-%m-%Y %H:%M:%S%#z', '%+') from ts_utf8_data as t +---- +2020-09-08T12:00:00 +2031-01-19T18:33:25 +2020-09-08T12:00:00 +2031-01-19T23:33:25 +1999-12-31T18:01:01 + +# timestamp data using tables with formatting options in an array is not supported at this time +query error function unsupported data type at index 1: +SELECT to_timestamp(t.ts, make_array('%Y-%m-%d %H/%M/%S%#z', '%s', '%q', '%d-%m-%Y %H:%M:%S%#z', '%+')) from ts_utf8_data as t + +statement ok +drop table ts_utf8_data ########## ## Test binary temporal coercion for Date and Timestamp From 40d932cde5c9b131c8012a7e20ae13aecc98e4eb Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Thu, 18 Jan 2024 16:26:09 -0500 Subject: [PATCH 10/14] to_timestamp documentation fixes. 
--- datafusion/expr/src/expr_fn.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index 68770459eabb..ae534f4bb44b 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -888,27 +888,27 @@ scalar_expr!(DateBin, date_bin, stride source origin, "coerces an arbitrary time nary_scalar_expr!( ToTimestamp, to_timestamp, - "converts a string to a `Timestamp(Nanoseconds, None)`" + "converts a string and optional formats to a `Timestamp(Nanoseconds, None)`" ); nary_scalar_expr!( ToTimestampMillis, to_timestamp_millis, - "converts a string to a `Timestamp(Milliseconds, None)`" + "converts a string and optional formats to a `Timestamp(Milliseconds, None)`" ); nary_scalar_expr!( ToTimestampMicros, to_timestamp_micros, - "converts a string to a `Timestamp(Microseconds, None)`" + "converts a string and optional formats to a `Timestamp(Microseconds, None)`" ); nary_scalar_expr!( ToTimestampNanos, to_timestamp_nanos, - "converts a string to a `Timestamp(Nanoseconds, None)`" + "converts a string and optional formats to a `Timestamp(Nanoseconds, None)`" ); nary_scalar_expr!( ToTimestampSeconds, to_timestamp_seconds, - "converts a string to a `Timestamp(Seconds, None)`" + "converts a string and optional formats to a `Timestamp(Seconds, None)`" ); scalar_expr!( FromUnixtime, From 77f08ff983a8a3604a78e8d9eb92cc2b16163862 Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Thu, 18 Jan 2024 16:30:41 -0500 Subject: [PATCH 11/14] - Changed internal_err! -> exec_err! for unsupported data type errors. - Extracted out to_timestamp_impl method to reduce code duplication as per PR feedback. - Extracted out validate_to_timestamp_data_types to reduce code duplication as per PR feedback. - Added additional tests for argument validation and invalid arguments. 
- Removed unnecessary shim function 'string_to_timestamp_nanos_with_format_shim' --- .../physical-expr/src/datetime_expressions.rs | 368 +++++++++++------- 1 file changed, 228 insertions(+), 140 deletions(-) diff --git a/datafusion/physical-expr/src/datetime_expressions.rs b/datafusion/physical-expr/src/datetime_expressions.rs index cc2fc0b0ffef..eb2dcbdd3424 100644 --- a/datafusion/physical-expr/src/datetime_expressions.rs +++ b/datafusion/physical-expr/src/datetime_expressions.rs @@ -279,14 +279,14 @@ where DataType::Utf8 | DataType::LargeUtf8 => Ok(ColumnarValue::Array(Arc::new( unary_string_to_primitive_function::(&[a.as_ref()], op, name)?, ))), - other => internal_err!("Unsupported data type {other:?} for function {name}"), + other => exec_err!("Unsupported data type {other:?} for function {name}"), }, ColumnarValue::Scalar(scalar) => match scalar { ScalarValue::Utf8(a) | ScalarValue::LargeUtf8(a) => { let result = a.as_ref().map(|x| (op)(x)).transpose()?; Ok(ColumnarValue::Scalar(S::scalar(result))) } - other => internal_err!("Unsupported data type {other:?} for function {name}"), + other => exec_err!("Unsupported data type {other:?} for function {name}"), }, } } @@ -316,13 +316,13 @@ where DataType::Utf8 | DataType::LargeUtf8 => { // all good }, - other => return internal_err!("Unsupported data type {other:?} for function {name}, arg # {pos}"), + other => return exec_err!("Unsupported data type {other:?} for function {name}, arg # {pos}"), }, ColumnarValue::Scalar(arg) => { match arg.data_type() { DataType::Utf8 | DataType::LargeUtf8 => { // all good }, - other => return internal_err!("Unsupported data type {other:?} for function {name}, arg # {pos}"), + other => return exec_err!("Unsupported data type {other:?} for function {name}, arg # {pos}"), }} } } @@ -332,7 +332,7 @@ where ))) } other => { - internal_err!("Unsupported data type {other:?} for function {name}") + exec_err!("Unsupported data type {other:?} for function {name}") } }, // if the first 
argument is a scalar utf8 all arguments are expected to be scalar utf8 @@ -363,10 +363,10 @@ where } } } else { - return internal_err!("Unsupported data type {s:?} for function {name}, arg # {pos}"); + return exec_err!("Unsupported data type {s:?} for function {name}, arg # {pos}"); } } else { - return internal_err!("Unsupported data type {v:?} for function {name}, arg # {pos}"); + return exec_err!("Unsupported data type {v:?} for function {name}, arg # {pos}"); } } } @@ -380,7 +380,7 @@ where } } other => { - internal_err!("Unsupported data type {other:?} for function {name}") + exec_err!("Unsupported data type {other:?} for function {name}") } }, } @@ -391,9 +391,31 @@ fn string_to_timestamp_nanos_shim(s: &str) -> Result { string_to_timestamp_nanos(s).map_err(|e| e.into()) } -/// Calls string_to_timestamp_nanos_formatted and converts the error type -fn string_to_timestamp_nanos_with_format_shim(s: &str, f: &str) -> Result { - string_to_timestamp_nanos_formatted(s, f) +fn to_timestamp_impl>( + args: &[ColumnarValue], + name: &str, +) -> Result { + let factor = match T::UNIT { + TimeUnit::Second => 1_000_000_000, + TimeUnit::Millisecond => 1_000_000, + TimeUnit::Microsecond => 1_000, + TimeUnit::Nanosecond => 1, + }; + + match args.len() { + 1 => handle::( + args, + |s| string_to_timestamp_nanos_shim(s).map(|n| n / factor), + name, + ), + n if n >= 2 => handle_multiple::( + args, + string_to_timestamp_nanos_formatted, + |n| n / factor, + name, + ), + _ => internal_err!("Unsupported 0 argument count for function {name}"), + } } /// to_timestamp SQL function @@ -403,108 +425,27 @@ fn string_to_timestamp_nanos_with_format_shim(s: &str, f: &str) -> Result { /// Supported range for string input is between `1677-09-21T00:12:44.0` and `2262-04-11T23:47:16.0`. /// Please use `to_timestamp_seconds` for the input outside of supported bounds. 
pub fn to_timestamp(args: &[ColumnarValue]) -> Result { - match args.len() { - 1 => handle::( - args, - string_to_timestamp_nanos_shim, - "to_timestamp", - ), - n if n >= 2 => { - handle_multiple::( - args, - string_to_timestamp_nanos_with_format_shim, - |n| n, - "to_timestamp", - ) - } - _ => internal_err!("Unsupported 0 argument count for function to_timestamp"), - } + to_timestamp_impl::(args, "to_timestamp") } /// to_timestamp_millis SQL function pub fn to_timestamp_millis(args: &[ColumnarValue]) -> Result { - match args.len() { - 1 => handle::( - args, - |s| string_to_timestamp_nanos_shim(s).map(|n| n / 1_000_000), - "to_timestamp_millis", - ), - n if n >= 2 => { - handle_multiple::( - args, - string_to_timestamp_nanos_with_format_shim, - |n| n / 1_000_000, - "to_timestamp_millis", - ) - } - _ => { - internal_err!("Unsupported 0 argument count for function to_timestamp_millis") - } - } + to_timestamp_impl::(args, "to_timestamp_millis") } /// to_timestamp_micros SQL function pub fn to_timestamp_micros(args: &[ColumnarValue]) -> Result { - match args.len() { - 1 => handle::( - args, - |s| string_to_timestamp_nanos_shim(s).map(|n| n / 1_000), - "to_timestamp_micros", - ), - n if n >= 2 => { - handle_multiple::( - args, - string_to_timestamp_nanos_with_format_shim, - |n| n / 1_000, - "to_timestamp_micros", - ) - } - _ => { - internal_err!("Unsupported 0 argument count for function to_timestamp_micros") - } - } + to_timestamp_impl::(args, "to_timestamp_micros") } /// to_timestamp_nanos SQL function pub fn to_timestamp_nanos(args: &[ColumnarValue]) -> Result { - match args.len() { - 1 => handle::( - args, - string_to_timestamp_nanos_shim, - "to_timestamp_nanos", - ), - n if n >= 2 => { - handle_multiple::( - args, - string_to_timestamp_nanos_with_format_shim, - |n| n, - "to_timestamp_nanos", - ) - } - _ => { - internal_err!("Unsupported 0 argument count for function to_timestamp_nanos") - } - } + to_timestamp_impl::(args, "to_timestamp_nanos") } /// 
to_timestamp_seconds SQL function pub fn to_timestamp_seconds(args: &[ColumnarValue]) -> Result { - match args.len() { - 1 => handle::( - args, - |s| string_to_timestamp_nanos_shim(s).map(|n| n / 1_000_000_000), - "to_timestamp_seconds", - ), - n if n >= 2 => handle_multiple::( - args, - string_to_timestamp_nanos_with_format_shim, - |n| n / 1_000_000_000, - "to_timestamp_seconds", - ), - _ => internal_err!( - "Unsupported 0 argument count for function to_timestamp_seconds" - ), - } + to_timestamp_impl::(args, "to_timestamp_seconds") } /// Create an implementation of `now()` that always returns the @@ -1247,6 +1188,28 @@ where Ok(b) } +fn validate_to_timestamp_data_types( + args: &[ColumnarValue], + name: &str, +) -> Option> { + for (idx, a) in args.iter().skip(1).enumerate() { + match a.data_type() { + DataType::Utf8 | DataType::LargeUtf8 => { + // all good + } + _ => { + return Some(internal_err!( + "{name} function unsupported data type at index {}: {}", + idx + 1, + a.data_type() + )); + } + } + } + + None +} + /// to_timestamp() SQL function implementation pub fn to_timestamp_invoke(args: &[ColumnarValue]) -> Result { if args.is_empty() { @@ -1258,14 +1221,8 @@ pub fn to_timestamp_invoke(args: &[ColumnarValue]) -> Result { // validate that any args after the first one are Utf8 if args.len() > 1 { - for (idx, a) in args.iter().skip(1).enumerate() { - if a.data_type() != DataType::Utf8 { - return internal_err!( - "to_timestamp function unsupported data type at index {}: {}", - idx, - a.data_type() - ); - } + if let Some(value) = validate_to_timestamp_data_types(args, "to_timestamp") { + return value; } } @@ -1306,14 +1263,9 @@ pub fn to_timestamp_millis_invoke(args: &[ColumnarValue]) -> Result 1 { - for (idx, a) in args.iter().skip(1).enumerate() { - if a.data_type() != DataType::Utf8 { - return internal_err!( - "to_timestamp_millis function unsupported data type at index {}: {}", - idx, - a.data_type() - ); - } + if let Some(value) = 
validate_to_timestamp_data_types(args, "to_timestamp_millis") + { + return value; } } @@ -1347,14 +1299,9 @@ pub fn to_timestamp_micros_invoke(args: &[ColumnarValue]) -> Result 1 { - for (idx, a) in args.iter().skip(1).enumerate() { - if a.data_type() != DataType::Utf8 { - return internal_err!( - "to_timestamp_micros function unsupported data type at index {}: {}", - idx, - a.data_type() - ); - } + if let Some(value) = validate_to_timestamp_data_types(args, "to_timestamp_micros") + { + return value; } } @@ -1388,14 +1335,9 @@ pub fn to_timestamp_nanos_invoke(args: &[ColumnarValue]) -> Result 1 { - for (idx, a) in args.iter().skip(1).enumerate() { - if a.data_type() != DataType::Utf8 { - return internal_err!( - "to_timestamp_nanos function unsupported data type at index {}: {}", - idx, - a.data_type() - ); - } + if let Some(value) = validate_to_timestamp_data_types(args, "to_timestamp_nanos") + { + return value; } } @@ -1429,14 +1371,10 @@ pub fn to_timestamp_seconds_invoke(args: &[ColumnarValue]) -> Result 1 { - for (idx, a) in args.iter().skip(1).enumerate() { - if a.data_type() != DataType::Utf8 { - return internal_err!( - "to_timestamp_seconds function unsupported data type at index {}: {}", - idx, - a.data_type() - ); - } + if let Some(value) = + validate_to_timestamp_data_types(args, "to_timestamp_seconds") + { + return value; } } @@ -1486,7 +1424,12 @@ mod tests { use arrow::array::{ as_primitive_array, ArrayRef, Int64Array, IntervalDayTimeArray, StringBuilder, }; - use arrow_array::TimestampNanosecondArray; + use arrow_array::types::Int64Type; + use arrow_array::{ + TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, + TimestampSecondArray, + }; + use datafusion_common::assert_contains; use super::*; @@ -2113,7 +2056,7 @@ mod tests { let int64array = ColumnarValue::Array(Arc::new(builder.finish())); let expected_err = - "Internal error: Unsupported data type Int64 for function to_timestamp"; + "Execution error: Unsupported data 
type Int64 for function to_timestamp"; match to_timestamp(&[int64array]) { Ok(_) => panic!("Expected error but got success"), Err(e) => { @@ -2139,7 +2082,7 @@ mod tests { ]; let expected_err = - "Internal error: Unsupported data type Int64 for function to_timestamp"; + "Execution error: Unsupported data type Int64 for function to_timestamp"; match to_timestamp(&int64array) { Ok(_) => panic!("Expected error but got success"), Err(e) => { @@ -2280,4 +2223,149 @@ mod tests { assert_eq!(actual, expected) } } + + #[test] + fn string_to_timestamp_invalid_arguments() { + // Test parsing invalid formats + let cases = [ + ("", "%Y%m%d %H%M%S", "premature end of input"), + ("SS", "%c", "premature end of input"), + ("Wed, 18 Feb 2015 23:16:09 GMT", "", "trailing input"), + ( + "Wed, 18 Feb 2015 23:16:09 GMT", + "%XX", + "input contains invalid characters", + ), + ( + "Wed, 18 Feb 2015 23:16:09 GMT", + "%Y%m%d %H%M%S", + "input contains invalid characters", + ), + ]; + + for (s, f, ctx) in cases { + let expected = format!("Execution error: Error parsing timestamp from '{s}' using format '{f}': {ctx}"); + let actual = string_to_datetime_formatted(&Utc, s, f) + .unwrap_err() + .to_string(); + assert_eq!(actual, expected) + } + } + + #[test] + fn test_to_timestamp_arg_validation() { + let mut date_string_builder = StringBuilder::with_capacity(2, 1024); + date_string_builder.append_value("2020-09-08T13:42:29.19085Z"); + + let data = date_string_builder.finish(); + let mut funcs = Vec::<( + Box Result>, + TimeUnit, + )>::new(); + funcs.push((Box::new(|a| to_timestamp(a)), TimeUnit::Nanosecond)); + funcs.push((Box::new(|a| to_timestamp_micros(a)), TimeUnit::Microsecond)); + funcs.push((Box::new(|a| to_timestamp_millis(a)), TimeUnit::Millisecond)); + funcs.push((Box::new(|a| to_timestamp_nanos(a)), TimeUnit::Nanosecond)); + funcs.push((Box::new(|a| to_timestamp_seconds(a)), TimeUnit::Second)); + + let mut nanos_builder = TimestampNanosecondArray::builder(2); + let mut millis_builder 
= TimestampMillisecondArray::builder(2); + let mut micros_builder = TimestampMicrosecondArray::builder(2); + let mut sec_builder = TimestampSecondArray::builder(2); + + nanos_builder.append_value(1599572549190850000); + millis_builder.append_value(1599572549190); + micros_builder.append_value(1599572549190850); + sec_builder.append_value(1599572549); + + let nanos_expected_timestamps = &nanos_builder.finish() as &dyn Array; + let millis_expected_timestamps = &millis_builder.finish() as &dyn Array; + let micros_expected_timestamps = &micros_builder.finish() as &dyn Array; + let sec_expected_timestamps = &sec_builder.finish() as &dyn Array; + + for (func, time_unit) in funcs { + // test UTF8 + let string_array = [ + ColumnarValue::Array(Arc::new(data.clone()) as ArrayRef), + ColumnarValue::Scalar(ScalarValue::Utf8(Some("%s".to_string()))), + ColumnarValue::Scalar(ScalarValue::Utf8(Some("%c".to_string()))), + ColumnarValue::Scalar(ScalarValue::Utf8(Some("%+".to_string()))), + ]; + let parsed_timestamps = func(&string_array) + .expect("that to_timestamp with format args parsed values without error"); + if let ColumnarValue::Array(parsed_array) = parsed_timestamps { + assert_eq!(parsed_array.len(), 1); + match time_unit { + TimeUnit::Nanosecond => { + assert_eq!(nanos_expected_timestamps, parsed_array.as_ref()) + } + TimeUnit::Millisecond => { + assert_eq!(millis_expected_timestamps, parsed_array.as_ref()) + } + TimeUnit::Microsecond => { + assert_eq!(micros_expected_timestamps, parsed_array.as_ref()) + } + TimeUnit::Second => { + assert_eq!(sec_expected_timestamps, parsed_array.as_ref()) + } + }; + } else { + panic!("Expected a columnar array") + } + + // test LargeUTF8 + let string_array = [ + ColumnarValue::Array(Arc::new(data.clone()) as ArrayRef), + ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some("%s".to_string()))), + ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some("%c".to_string()))), + ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some("%+".to_string()))), + ]; +
let parsed_timestamps = func(&string_array) + .expect("that to_timestamp with format args parsed values without error"); + if let ColumnarValue::Array(parsed_array) = parsed_timestamps { + assert_eq!(parsed_array.len(), 1); + match time_unit { + TimeUnit::Nanosecond => { + assert_eq!(nanos_expected_timestamps, parsed_array.as_ref()) + } + TimeUnit::Millisecond => { + assert_eq!(millis_expected_timestamps, parsed_array.as_ref()) + } + TimeUnit::Microsecond => { + assert_eq!(micros_expected_timestamps, parsed_array.as_ref()) + } + TimeUnit::Second => { + assert_eq!(sec_expected_timestamps, parsed_array.as_ref()) + } + }; + } else { + panic!("Expected a columnar array") + } + + // test other types + let string_array = [ + ColumnarValue::Array(Arc::new(data.clone()) as ArrayRef), + ColumnarValue::Scalar(ScalarValue::Int32(Some(1))), + ColumnarValue::Scalar(ScalarValue::Int32(Some(2))), + ColumnarValue::Scalar(ScalarValue::Int32(Some(3))), + ]; + + let expected = "Unsupported data type Int32 for function".to_string(); + let actual = func(&string_array).unwrap_err().to_string(); + assert_contains!(actual, expected); + + // test other types + let string_array = [ + ColumnarValue::Array(Arc::new(data.clone()) as ArrayRef), + ColumnarValue::Array(Arc::new(PrimitiveArray::<Int64Type>::new( + vec![1i64].into(), + None, + )) as ArrayRef), + ]; + + let expected = "Unsupported data type".to_string(); + let actual = func(&string_array).unwrap_err().to_string(); + assert_contains!(actual, expected); + } + } } From 951809a6ccac2c7725978e9d752d6541e7d4d663 Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Fri, 19 Jan 2024 13:18:44 -0500 Subject: [PATCH 12/14] Resolved merge conflict, updated toStringXXX methods to reflect upstream change --- .../proto/src/logical_plan/from_proto.rs | 71 ++++++++++++------- 1 file changed, 45 insertions(+), 26 deletions(-) diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index 
93a5f99810ff..aae19a15b89a 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -65,10 +65,9 @@ use datafusion_expr::{ radians, random, regexp_match, regexp_replace, repeat, replace, reverse, right, round, rpad, rtrim, sha224, sha256, sha384, sha512, signum, sin, sinh, split_part, sqrt, starts_with, string_to_array, strpos, struct_fun, substr, substr_index, - substring, tan, tanh, to_hex, to_timestamp_micros, to_timestamp_millis, - to_timestamp_nanos, to_timestamp_seconds, translate, trim, trunc, upper, uuid, - AggregateFunction, Between, BinaryExpr, BuiltInWindowFunction, BuiltinScalarFunction, - Case, Cast, Expr, GetFieldAccess, GetIndexedField, GroupingSet, + substring, tan, tanh, to_hex, translate, trim, trunc, upper, uuid, AggregateFunction, + Between, BinaryExpr, BuiltInWindowFunction, BuiltinScalarFunction, Case, Cast, Expr, + GetFieldAccess, GetIndexedField, GroupingSet, GroupingSet::GroupingSets, JoinConstraint, JoinType, Like, Operator, TryCast, WindowFrame, WindowFrameBound, WindowFrameUnits, @@ -476,7 +475,6 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction { ScalarFunction::Trim => Self::Trim, ScalarFunction::Ltrim => Self::Ltrim, ScalarFunction::Rtrim => Self::Rtrim, - ScalarFunction::ToTimestamp => Self::ToTimestamp, ScalarFunction::ArrayAppend => Self::ArrayAppend, ScalarFunction::ArraySort => Self::ArraySort, ScalarFunction::ArrayConcat => Self::ArrayConcat, @@ -523,7 +521,6 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction { ScalarFunction::Digest => Self::Digest, ScalarFunction::Encode => Self::Encode, ScalarFunction::Decode => Self::Decode, - ScalarFunction::ToTimestampMillis => Self::ToTimestampMillis, ScalarFunction::Log2 => Self::Log2, ScalarFunction::Signum => Self::Signum, ScalarFunction::Ascii => Self::Ascii, @@ -548,6 +545,8 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction { ScalarFunction::Strpos => Self::Strpos, 
ScalarFunction::Substr => Self::Substr, ScalarFunction::ToHex => Self::ToHex, + ScalarFunction::ToTimestamp => Self::ToTimestamp, + ScalarFunction::ToTimestampMillis => Self::ToTimestampMillis, ScalarFunction::ToTimestampMicros => Self::ToTimestampMicros, ScalarFunction::ToTimestampNanos => Self::ToTimestampNanos, ScalarFunction::ToTimestampSeconds => Self::ToTimestampSeconds, @@ -1689,36 +1688,56 @@ pub fn parse_expr( parse_expr(&args[1], registry)?, )), ScalarFunction::ToHex => Ok(to_hex(parse_expr(&args[0], registry)?)), - ScalarFunction::ToTimestamp => Ok(to_timestamp_seconds( - args.to_owned() + ScalarFunction::ToTimestamp => { + let args: Vec<_> = args .iter() .map(|expr| parse_expr(expr, registry)) - .collect::, _>>()?, - )), - ScalarFunction::ToTimestampMillis => Ok(to_timestamp_millis( - args.to_owned() + .collect::>()?; + Ok(Expr::ScalarFunction(expr::ScalarFunction::new( + BuiltinScalarFunction::ToTimestamp, + args, + ))) + } + ScalarFunction::ToTimestampMillis => { + let args: Vec<_> = args .iter() .map(|expr| parse_expr(expr, registry)) - .collect::, _>>()?, - )), - ScalarFunction::ToTimestampMicros => Ok(to_timestamp_micros( - args.to_owned() + .collect::>()?; + Ok(Expr::ScalarFunction(expr::ScalarFunction::new( + BuiltinScalarFunction::ToTimestampMillis, + args, + ))) + } + ScalarFunction::ToTimestampMicros => { + let args: Vec<_> = args .iter() .map(|expr| parse_expr(expr, registry)) - .collect::, _>>()?, - )), - ScalarFunction::ToTimestampNanos => Ok(to_timestamp_nanos( - args.to_owned() + .collect::>()?; + Ok(Expr::ScalarFunction(expr::ScalarFunction::new( + BuiltinScalarFunction::ToTimestampMicros, + args, + ))) + } + ScalarFunction::ToTimestampNanos => { + let args: Vec<_> = args .iter() .map(|expr| parse_expr(expr, registry)) - .collect::, _>>()?, - )), - ScalarFunction::ToTimestampSeconds => Ok(to_timestamp_seconds( - args.to_owned() + .collect::>()?; + Ok(Expr::ScalarFunction(expr::ScalarFunction::new( + 
BuiltinScalarFunction::ToTimestampNanos, + args, + ))) + } + ScalarFunction::ToTimestampSeconds => { + let args: Vec<_> = args .iter() .map(|expr| parse_expr(expr, registry)) - .collect::, _>>()?, - )), + .collect::>()?; + Ok(Expr::ScalarFunction(expr::ScalarFunction::new( + BuiltinScalarFunction::ToTimestampSeconds, + args, + ))) + } ScalarFunction::Now => Ok(now()), ScalarFunction::Translate => Ok(translate( parse_expr(&args[0], registry)?, From 406fd77b3b4e205671127268d521a80bfbbb1a09 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 19 Jan 2024 16:07:23 -0500 Subject: [PATCH 13/14] prettier --- docs/source/user-guide/sql/scalar_functions.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/docs/source/user-guide/sql/scalar_functions.md b/docs/source/user-guide/sql/scalar_functions.md index 9faf39627f7b..c72ef94f42ea 100644 --- a/docs/source/user-guide/sql/scalar_functions.md +++ b/docs/source/user-guide/sql/scalar_functions.md @@ -1476,7 +1476,7 @@ Integers, unsigned integers, and doubles are interpreted as seconds since the un Returns the corresponding timestamp. Note: `to_timestamp` returns `Timestamp(Nanosecond)`. The supported range for integer input is between `-9223372037` and `9223372036`. -Supported range for string input is between `1677-09-21T00:12:44.0` and `2262-04-11T23:47:16.0`. Please use `to_timestamp_seconds` +Supported range for string input is between `1677-09-21T00:12:44.0` and `2262-04-11T23:47:16.0`. Please use `to_timestamp_seconds` for the input outside of supported bounds. ``` @@ -1491,7 +1491,7 @@ to_timestamp(expression[, ..., format_n]) they appear with the first successful one being returned. If none of the formats successfully parse the expression an error will be returned. 
-[Chrono format]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html +[chrono format]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html ### `to_timestamp_millis` @@ -1552,6 +1552,7 @@ to_timestamp_nanos(expression[, ..., format_n]) - **format_n**: Optional [Chrono format] strings to use to parse the expression. Formats will be tried in the order they appear with the first successful one being returned. If none of the formats successfully parse the expression an error will be returned. + ### `to_timestamp_seconds` Converts a value to a timestamp (`YYYY-MM-DDT00:00:00.000Z`). @@ -1570,7 +1571,7 @@ to_timestamp_seconds(expression[, ..., format_n]) Can be a constant, column, or function, and any combination of arithmetic operators. - **format_n**: Optional [Chrono format] strings to use to parse the expression. Formats will be tried in the order they appear with the first successful one being returned. If none of the formats successfully parse the expression - an error will be returned. + an error will be returned. 
### `from_unixtime` From 3c01f75e7b167327a917df15b2d05ed4741f0c41 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 19 Jan 2024 16:37:06 -0500 Subject: [PATCH 14/14] Fix clippy --- .../physical-expr/src/datetime_expressions.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-expr/src/datetime_expressions.rs b/datafusion/physical-expr/src/datetime_expressions.rs index eb2dcbdd3424..d21d89c19d2e 100644 --- a/datafusion/physical-expr/src/datetime_expressions.rs +++ b/datafusion/physical-expr/src/datetime_expressions.rs @@ -1430,6 +1430,7 @@ mod tests { TimestampSecondArray, }; use datafusion_common::assert_contains; + use datafusion_expr::ScalarFunctionImplementation; use super::*; @@ -2258,15 +2259,14 @@ mod tests { date_string_builder.append_value("2020-09-08T13:42:29.19085Z"); let data = date_string_builder.finish(); - let mut funcs = Vec::<( - Box Result>, - TimeUnit, - )>::new(); - funcs.push((Box::new(|a| to_timestamp(a)), TimeUnit::Nanosecond)); - funcs.push((Box::new(|a| to_timestamp_micros(a)), TimeUnit::Microsecond)); - funcs.push((Box::new(|a| to_timestamp_millis(a)), TimeUnit::Millisecond)); - funcs.push((Box::new(|a| to_timestamp_nanos(a)), TimeUnit::Nanosecond)); - funcs.push((Box::new(|a| to_timestamp_seconds(a)), TimeUnit::Second)); + + let funcs: Vec<(ScalarFunctionImplementation, TimeUnit)> = vec![ + (Arc::new(to_timestamp), TimeUnit::Nanosecond), + (Arc::new(to_timestamp_micros), TimeUnit::Microsecond), + (Arc::new(to_timestamp_millis), TimeUnit::Millisecond), + (Arc::new(to_timestamp_nanos), TimeUnit::Nanosecond), + (Arc::new(to_timestamp_seconds), TimeUnit::Second), + ]; let mut nanos_builder = TimestampNanosecondArray::builder(2); let mut millis_builder = TimestampMillisecondArray::builder(2);