diff --git a/datafusion-examples/examples/to_unixtime.rs b/datafusion-examples/examples/to_unixtime.rs new file mode 100644 index 0000000000000..2e9a64ac46999 --- /dev/null +++ b/datafusion-examples/examples/to_unixtime.rs @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use datafusion::arrow::array::StringArray; +use datafusion::arrow::datatypes::{DataType, Field, Schema}; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::error::Result; +use datafusion::prelude::*; + +/// This example demonstrates how to use the to_date series +/// of functions in the DataFrame API as well as via sql. +#[tokio::main] +async fn main() -> Result<()> { + // define a schema. + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)])); + + // define data. + let batch = RecordBatch::try_new( + schema, + vec![Arc::new(StringArray::from(vec![ + "2020-09-08T13:42:29Z", + "2020-09-08T13:42:29.190855-05:00", + "2020-08-09 12:13:29", + "2020-01-02", + ]))], + )?; + + // declare a new context. In spark API, this corresponds to a new spark SQLsession + let ctx = SessionContext::new(); + + // declare a table in memory. In spark API, this corresponds to createDataFrame(...). + ctx.register_batch("t", batch)?; + let df = ctx.table("t").await?; + + // use to_date function to convert col 'a' to timestamp type using the default parsing + let df = df.with_column("a", to_unixtime(vec![col("a")]))?; + + let df = df.select_columns(&["a"])?; + + // print the results + df.show().await?; + + Ok(()) +} diff --git a/datafusion/functions/src/datetime/mod.rs b/datafusion/functions/src/datetime/mod.rs index 233e8b2cdbb4a..6c9ea1944b1bf 100644 --- a/datafusion/functions/src/datetime/mod.rs +++ b/datafusion/functions/src/datetime/mod.rs @@ -24,9 +24,11 @@ use datafusion_expr::ScalarUDF; mod common; mod to_date; mod to_timestamp; +mod to_unixtime; // create UDFs make_udf_function!(to_date::ToDateFunc, TO_DATE, to_date); +make_udf_function!(to_unixtime::ToUnixtimeFunc, TO_UNIXTIME, to_unixtime); make_udf_function!(to_timestamp::ToTimestampFunc, TO_TIMESTAMP, to_timestamp); make_udf_function!( to_timestamp::ToTimestampSecondsFunc, @@ -68,7 +70,7 @@ pub mod expr_fn { /// # use datafusion_expr::col; /// # use datafusion::prelude::*; /// # use datafusion_functions::expr_fn::to_date; - /// + /// /// // define a schema. /// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)])); /// @@ -105,6 +107,11 @@ pub mod expr_fn { super::to_date().call(args) } + #[doc = "converts a string and optional formats to a Unixtime"] + pub fn to_unixtime(args: Vec) -> Expr { + super::to_unixtime().call(args) + } + #[doc = "converts a string and optional formats to a `Timestamp(Nanoseconds, None)`"] pub fn to_timestamp(args: Vec) -> Expr { super::to_timestamp().call(args) @@ -135,6 +142,7 @@ pub mod expr_fn { pub fn functions() -> Vec> { vec![ to_date(), + to_unixtime(), to_timestamp(), to_timestamp_seconds(), to_timestamp_millis(), diff --git a/datafusion/functions/src/datetime/to_unixtime.rs b/datafusion/functions/src/datetime/to_unixtime.rs new file mode 100644 index 0000000000000..db46a334d4aa8 --- /dev/null +++ b/datafusion/functions/src/datetime/to_unixtime.rs @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; + +use arrow::datatypes::{DataType, TimeUnit}; + +use crate::datetime::common::*; +use datafusion_common::{exec_err, Result}; +use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; + +use super::to_timestamp::ToTimestampSecondsFunc; + +#[derive(Debug)] +pub(super) struct ToUnixtimeFunc { + signature: Signature, +} + +impl ToUnixtimeFunc { + pub fn new() -> Self { + Self { + signature: Signature::variadic_any(Volatility::Immutable), + } + } +} + +impl ScalarUDFImpl for ToUnixtimeFunc { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "to_unixtime" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Int64) + } + + fn invoke(&self, args: &[ColumnarValue]) -> Result { + if args.is_empty() { + return exec_err!("to_unixtime function requires 1 or more arguments, got 0"); + } + + // validate that any args after the first one are Utf8 + if args.len() > 1 { + if let Some(value) = validate_data_types(args, "to_unixtime") { + return value; + } + } + + match args[0].data_type() { + DataType::Int32 | DataType::Int64 | DataType::Null | DataType::Float64 => { + args[0].cast_to(&DataType::Int64, None) + } + DataType::Date64 | DataType::Date32 | DataType::Timestamp(_, None) => args[0] + .cast_to(&DataType::Timestamp(TimeUnit::Second, None), None)? + .cast_to(&DataType::Int64, None), + DataType::Utf8 => ToTimestampSecondsFunc::new() + .invoke(args)? + .cast_to(&DataType::Int64, None), + other => { + exec_err!("Unsupported data type {:?} for function to_unixtime", other) + } + } + } +} diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 6700877f2610e..faffe57c071a1 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1,5 +1,6 @@ /* + * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -684,6 +685,7 @@ enum ScalarFunction { /// 135 is RegexpLike ToChar = 136; /// 137 was ToDate + /// 138 was ToUnixtime } message ScalarFunctionNode { diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index c12d04f33f9e4..4c4b17f1a80bb 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -2773,6 +2773,7 @@ pub enum ScalarFunction { /// / 135 is RegexpLike /// /// / 137 was ToDate + /// / 138 was ToUnixtime ToChar = 136, } impl ScalarFunction { diff --git a/datafusion/sqllogictest/test_files/timestamps.slt b/datafusion/sqllogictest/test_files/timestamps.slt index f0483aec8946b..0b8284a45a04b 100644 --- a/datafusion/sqllogictest/test_files/timestamps.slt +++ b/datafusion/sqllogictest/test_files/timestamps.slt @@ -2682,3 +2682,72 @@ SELECT to_char(null, '%d-%m-%Y'); statement ok drop table formats; + +########## +## to_unixtime tests +########## + +query I +select to_unixtime('2020-09-08T12:00:00+00:00'); +---- +1599566400 + +query I +select to_unixtime('01-14-2023 01:01:30+05:30', '%q', '%d-%m-%Y %H/%M/%S', '%+', '%m-%d-%Y %H:%M:%S%#z'); +---- +1673638290 + +query I +select to_unixtime('03:59:00.123456789 05-17-2023', '%c', '%+', '%H:%M:%S%.f %m-%d-%Y'); +---- +1684295940 + +query I +select to_unixtime(arrow_cast('2020-09-08T12:00:00+00:00', 'Date64')); +---- +1599566400 + +query I +select to_unixtime(arrow_cast('2020-09-08', 'Date32')); +---- +1599523200 + +query I +select to_unixtime(to_timestamp('2020-09-08')); +---- +1599523200 + +query I +select to_unixtime(to_timestamp_seconds('2020-09-08')); +---- +1599523200 + +query I +select to_unixtime(to_timestamp_millis('2020-09-08')); +---- +1599523200 + +query I +select to_unixtime(to_timestamp_micros('2020-09-08')); +---- +1599523200 + +query I +select to_unixtime(to_timestamp_nanos('2020-09-08')); +---- +1599523200 + +query I +select to_unixtime(arrow_cast(1599523200, 'Int32')); +---- +1599523200 + +query I +select to_unixtime(arrow_cast(1599523200, 'Int64')); +---- +1599523200 + +query I +select to_unixtime(arrow_cast(1599523200.414, 'Float64')); +---- +1599523200