diff --git a/datafusion/functions/src/datetime/date_part.rs b/datafusion/functions/src/datetime/date_part.rs index aa23a5028dd81..73efac6123745 100644 --- a/datafusion/functions/src/datetime/date_part.rs +++ b/datafusion/functions/src/datetime/date_part.rs @@ -19,15 +19,22 @@ use std::any::Any; use std::str::FromStr; use std::sync::Arc; -use arrow::array::{Array, ArrayRef, Float64Array, Int32Array}; +use arrow::array::timezone::Tz; +use arrow::array::{Array, ArrayRef, Float64Array, Int32Array, PrimitiveBuilder}; use arrow::compute::kernels::cast_utils::IntervalUnit; use arrow::compute::{binary, date_part, DatePart}; use arrow::datatypes::DataType::{ Date32, Date64, Duration, Interval, Time32, Time64, Timestamp, }; use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second}; -use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit}; +use arrow::datatypes::{ + ArrowTimestampType, DataType, Field, FieldRef, TimeUnit, TimestampMicrosecondType, + TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, +}; +use chrono::{DateTime, MappedLocalTime, Offset, TimeDelta, TimeZone, Utc}; +use datafusion_common::cast::as_primitive_array; use datafusion_common::types::{logical_date, NativeType}; +use std::ops::Add; use datafusion_common::{ cast::{ @@ -36,7 +43,7 @@ use datafusion_common::{ as_timestamp_microsecond_array, as_timestamp_millisecond_array, as_timestamp_nanosecond_array, as_timestamp_second_array, }, - exec_err, internal_err, not_impl_err, + exec_err, internal_datafusion_err, internal_err, not_impl_err, types::logical_string, utils::take_function_args, Result, ScalarValue, @@ -56,7 +63,7 @@ use datafusion_macros::user_doc; argument( name = "part", description = r#"Part of the date to return. 
The following date parts are supported: - + - year - quarter (emits value in inclusive range [1, 4] based on which quartile of the year the date is in) - month @@ -173,6 +180,7 @@ impl ScalarUDFImpl for DatePartFunc { &self, args: datafusion_expr::ScalarFunctionArgs, ) -> Result { + let config = &args.config_options; let args = args.args; let [part, array] = take_function_args(self.name(), args)?; @@ -193,6 +201,56 @@ impl ScalarUDFImpl for DatePartFunc { ColumnarValue::Scalar(scalar) => scalar.to_array()?, }; + let (is_timezone_aware, tz_str_opt) = match array.data_type() { + Timestamp(_, Some(tz_str)) => (true, Some(Arc::clone(tz_str))), + _ => (false, None), + }; + + // Adjust timestamps for extraction + let array = if is_timezone_aware { + // For timezone-aware timestamps, extract in their own timezone + let tz_str = tz_str_opt.as_ref().unwrap(); + let tz = match tz_str.parse::() { + Ok(tz) => tz, + Err(_) => return exec_err!("Invalid timezone"), + }; + match array.data_type() { + Timestamp(time_unit, _) => match time_unit { + Nanosecond => { + adjust_timestamp_array::(&array, tz)? + } + Microsecond => { + adjust_timestamp_array::(&array, tz)? + } + Millisecond => { + adjust_timestamp_array::(&array, tz)? + } + Second => adjust_timestamp_array::(&array, tz)?, + }, + _ => array, + } + } else if let Timestamp(time_unit, None) = array.data_type() { + // For naive timestamps, interpret in session timezone + let tz = match config.execution.time_zone.parse::() { + Ok(tz) => tz, + Err(_) => return exec_err!("Invalid timezone"), + }; + match time_unit { + Nanosecond => { + adjust_timestamp_array::(&array, tz)? + } + Microsecond => { + adjust_timestamp_array::(&array, tz)? + } + Millisecond => { + adjust_timestamp_array::(&array, tz)? 
+ } + Second => adjust_timestamp_array::(&array, tz)?, + } + } else { + array + }; + let part_trim = part_normalization(&part); // using IntervalUnit here means we hand off all the work of supporting plurals (like "seconds") @@ -240,6 +298,72 @@ impl ScalarUDFImpl for DatePartFunc { } } +fn adjust_to_local_time(ts: i64, tz: Tz) -> Result { + fn convert_timestamp(ts: i64, converter: F) -> Result> + where + F: Fn(i64) -> MappedLocalTime>, + { + match converter(ts) { + MappedLocalTime::Ambiguous(earliest, latest) => exec_err!( + "Ambiguous timestamp. Do you mean {:?} or {:?}", + earliest, + latest + ), + MappedLocalTime::None => exec_err!( + "The local time does not exist because there is a gap in the local time." + ), + MappedLocalTime::Single(date_time) => Ok(date_time), + } + } + + let date_time = match T::UNIT { + Nanosecond => Utc.timestamp_nanos(ts), + Microsecond => convert_timestamp(ts, |ts| Utc.timestamp_micros(ts))?, + Millisecond => convert_timestamp(ts, |ts| Utc.timestamp_millis_opt(ts))?, + Second => convert_timestamp(ts, |ts| Utc.timestamp_opt(ts, 0))?, + }; + + let offset_seconds: i64 = tz + .offset_from_utc_datetime(&date_time.naive_utc()) + .fix() + .local_minus_utc() as i64; + + let adjusted_date_time = date_time.add( + TimeDelta::try_seconds(offset_seconds) + .ok_or_else(|| internal_datafusion_err!("Offset seconds should be less than i64::MAX / 1_000 or greater than -i64::MAX / 1_000"))?, + ); + + // convert back to i64 + match T::UNIT { + Nanosecond => adjusted_date_time.timestamp_nanos_opt().ok_or_else(|| { + internal_datafusion_err!( + "Failed to convert DateTime to timestamp in nanosecond. This error may occur if the date is out of range. 
The supported date ranges are between 1677-09-21T00:12:43.145224192 and 2262-04-11T23:47:16.854775807" + ) + }), + Microsecond => Ok(adjusted_date_time.timestamp_micros()), + Millisecond => Ok(adjusted_date_time.timestamp_millis()), + Second => Ok(adjusted_date_time.timestamp()), + } +} + +fn adjust_timestamp_array( + array: &ArrayRef, + tz: Tz, +) -> Result { + let mut builder = PrimitiveBuilder::::new(); + let primitive_array = as_primitive_array::(array)?; + for ts_opt in primitive_array.iter() { + match ts_opt { + None => builder.append_null(), + Some(ts) => { + let adjusted_ts = adjust_to_local_time::(ts, tz)?; + builder.append_value(adjusted_ts); + } + } + } + Ok(Arc::new(builder.finish())) +} + fn is_epoch(part: &str) -> bool { let part = part_normalization(part); matches!(part.to_lowercase().as_str(), "epoch") diff --git a/datafusion/functions/src/datetime/extract.rs b/datafusion/functions/src/datetime/extract.rs new file mode 100644 index 0000000000000..bf495e259b7eb --- /dev/null +++ b/datafusion/functions/src/datetime/extract.rs @@ -0,0 +1,525 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::any::Any; +use std::str::FromStr; +use std::sync::Arc; + +use arrow::array::timezone::Tz; +use arrow::array::{Array, ArrayRef, Float64Array, Int32Array, PrimitiveBuilder}; +use arrow::compute::kernels::cast_utils::IntervalUnit; +use arrow::compute::{binary, date_part, DatePart}; +use arrow::datatypes::DataType::{ + Date32, Date64, Duration, Interval, Time32, Time64, Timestamp, +}; +use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second}; +use arrow::datatypes::{ + ArrowTimestampType, DataType, Field, FieldRef, Int32Type, TimeUnit, + TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, + TimestampSecondType, +}; +use chrono::{DateTime, MappedLocalTime, Offset, TimeDelta, TimeZone, Utc}; +use datafusion_common::cast::as_primitive_array; +use datafusion_common::types::{logical_date, NativeType}; +use std::ops::Add; + +use datafusion_common::{ + cast::{ + as_date32_array, as_date64_array, as_int32_array, as_time32_millisecond_array, + as_time32_second_array, as_time64_microsecond_array, as_time64_nanosecond_array, + as_timestamp_microsecond_array, as_timestamp_millisecond_array, + as_timestamp_nanosecond_array, as_timestamp_second_array, + }, + exec_err, internal_datafusion_err, internal_err, not_impl_err, + types::logical_string, + utils::take_function_args, + Result, ScalarValue, +}; +use datafusion_expr::{ + ColumnarValue, Documentation, ReturnFieldArgs, ScalarUDFImpl, Signature, + TypeSignature, Volatility, +}; +use datafusion_expr_common::signature::{Coercion, TypeSignatureClass}; +use datafusion_macros::user_doc; + +#[user_doc( + doc_section(label = "Time and Date Functions"), + description = "Returns the specified part of the date as an integer.", + syntax_example = "extract(field FROM source)", + argument( + name = "field", + description = r#"Part of the date to return. 
The following date parts are supported: + +- year +- quarter (emits value in inclusive range [1, 4] based on which quartile of the year the date is in) +- month +- week (week of the year) +- day (day of the month) +- hour +- minute +- second +- millisecond +- microsecond +- nanosecond +- dow (day of the week where Sunday is 0) +- doy (day of the year) +- epoch (seconds since Unix epoch) +- isodow (day of the week where Monday is 0) +"# + ), + argument( + name = "source", + description = "Time expression to operate on. Can be a constant, column, or function." + ) +)] +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct ExtractFunc { + signature: Signature, +} + +impl Default for ExtractFunc { + fn default() -> Self { + Self::new() + } +} + +impl ExtractFunc { + pub fn new() -> Self { + Self { + signature: Signature::one_of( + vec![ + TypeSignature::Coercible(vec![ + Coercion::new_exact(TypeSignatureClass::Native(logical_string())), + Coercion::new_implicit( + TypeSignatureClass::Timestamp, + // Not consistent with Postgres and DuckDB but to avoid regression we implicit cast string to timestamp + vec![TypeSignatureClass::Native(logical_string())], + NativeType::Timestamp(Nanosecond, None), + ), + ]), + TypeSignature::Coercible(vec![ + Coercion::new_exact(TypeSignatureClass::Native(logical_string())), + Coercion::new_exact(TypeSignatureClass::Native(logical_date())), + ]), + TypeSignature::Coercible(vec![ + Coercion::new_exact(TypeSignatureClass::Native(logical_string())), + Coercion::new_exact(TypeSignatureClass::Time), + ]), + TypeSignature::Coercible(vec![ + Coercion::new_exact(TypeSignatureClass::Native(logical_string())), + Coercion::new_exact(TypeSignatureClass::Interval), + ]), + TypeSignature::Coercible(vec![ + Coercion::new_exact(TypeSignatureClass::Native(logical_string())), + Coercion::new_exact(TypeSignatureClass::Duration), + ]), + ], + Volatility::Immutable, + ), + } + } +} + +impl ScalarUDFImpl for ExtractFunc { + fn as_any(&self) -> &dyn Any { + self + } 
+ + fn name(&self) -> &str { + "extract" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + internal_err!("return_field_from_args should be called instead") + } + + fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result { + let [field, _] = take_function_args(self.name(), args.scalar_arguments)?; + + field + .and_then(|sv| { + sv.try_as_str() + .flatten() + .filter(|s| !s.is_empty()) + .map(|part| { + if is_epoch(part) { + Field::new(self.name(), DataType::Float64, true) + } else { + Field::new(self.name(), DataType::Int32, true) + } + }) + }) + .map(Arc::new) + .map_or_else( + || exec_err!("{} requires non-empty constant string", self.name()), + Ok, + ) + } + + fn invoke_with_args( + &self, + args: datafusion_expr::ScalarFunctionArgs, + ) -> Result { + let config = &args.config_options; + let args = args.args; + let [part, array] = take_function_args(self.name(), args)?; + + let part = if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = part { + v + } else if let ColumnarValue::Scalar(ScalarValue::Utf8View(Some(v))) = part { + v + } else { + return exec_err!("First argument of `EXTRACT` must be non-null scalar Utf8"); + }; + + let is_scalar = matches!(array, ColumnarValue::Scalar(_)); + + let array = match array { + ColumnarValue::Array(array) => Arc::clone(&array), + ColumnarValue::Scalar(scalar) => scalar.to_array()?, + }; + + let (is_timezone_aware, tz_str_opt) = match array.data_type() { + Timestamp(_, Some(tz_str)) => (true, Some(Arc::clone(tz_str))), + _ => (false, None), + }; + + // Adjust timestamps for extraction + let array = if is_timezone_aware { + // For timezone-aware timestamps, extract in their own timezone + let tz_str = tz_str_opt.as_ref().unwrap(); + let tz = match tz_str.parse::() { + Ok(tz) => tz, + Err(_) => return exec_err!("Invalid timezone"), + }; + match array.data_type() { + Timestamp(time_unit, _) => match time_unit { + Nanosecond => { + 
adjust_timestamp_array::(&array, tz)? + } + Microsecond => { + adjust_timestamp_array::(&array, tz)? + } + Millisecond => { + adjust_timestamp_array::(&array, tz)? + } + Second => adjust_timestamp_array::(&array, tz)?, + }, + _ => array, + } + } else if let Timestamp(time_unit, None) = array.data_type() { + // For naive timestamps, interpret in session timezone + let tz = match config.execution.time_zone.parse::() { + Ok(tz) => tz, + Err(_) => return exec_err!("Invalid timezone"), + }; + match time_unit { + Nanosecond => { + adjust_timestamp_array::(&array, tz)? + } + Microsecond => { + adjust_timestamp_array::(&array, tz)? + } + Millisecond => { + adjust_timestamp_array::(&array, tz)? + } + Second => adjust_timestamp_array::(&array, tz)?, + } + } else { + array + }; + + let part_trim = part_normalization(&part); + + // using IntervalUnit here means we hand off all the work of supporting plurals (like "seconds") + // and synonyms ( like "ms,msec,msecond,millisecond") to Arrow + let mut arr = if let Ok(interval_unit) = IntervalUnit::from_str(part_trim) { + match interval_unit { + IntervalUnit::Year => date_part(array.as_ref(), DatePart::Year)?, + IntervalUnit::Month => date_part(array.as_ref(), DatePart::Month)?, + IntervalUnit::Week => date_part(array.as_ref(), DatePart::Week)?, + IntervalUnit::Day => date_part(array.as_ref(), DatePart::Day)?, + IntervalUnit::Hour => date_part(array.as_ref(), DatePart::Hour)?, + IntervalUnit::Minute => date_part(array.as_ref(), DatePart::Minute)?, + IntervalUnit::Second => seconds_as_i32(array.as_ref(), Second)?, + IntervalUnit::Millisecond => seconds_as_i32(array.as_ref(), Millisecond)?, + IntervalUnit::Microsecond => seconds_as_i32(array.as_ref(), Microsecond)?, + IntervalUnit::Nanosecond => seconds_as_i32(array.as_ref(), Nanosecond)?, + // century and decade are not supported by `DatePart`, although they are supported in postgres + _ => return exec_err!("Date part '{part}' not supported"), + } + } else { + // special cases that 
can be extracted (in postgres) but are not interval units
+            match part_trim.to_lowercase().as_str() {
+                "qtr" | "quarter" => date_part(array.as_ref(), DatePart::Quarter)?,
+                "doy" => date_part(array.as_ref(), DatePart::DayOfYear)?,
+                "dow" => date_part(array.as_ref(), DatePart::DayOfWeekSunday0)?,
+                "isodow" => date_part(array.as_ref(), DatePart::DayOfWeekMonday0)?,
+                "epoch" => epoch(array.as_ref())?,
+                _ => return exec_err!("Date part '{part}' not supported"),
+            }
+        };
+
+        // Special adjustment for hour extraction on timezone-aware timestamps.
+        // The part is recognised through `IntervalUnit` (not a literal "hour"
+        // comparison) so plural/synonym spellings take the same path.
+        let is_hour_part =
+            matches!(IntervalUnit::from_str(part_trim), Ok(IntervalUnit::Hour));
+        if is_timezone_aware && is_hour_part {
+            if let Some(tz_str) = &tz_str_opt {
+                // Zones that are not written as a fixed `±HH:MM` offset (e.g. `UTC`,
+                // `America/New_York`) get no extra shift: the hour computed above is
+                // returned unchanged rather than panicking while parsing the name.
+                let offset_hours = fixed_offset_hours(tz_str).unwrap_or(0);
+                let mut builder = PrimitiveBuilder::<Int32Type>::with_capacity(arr.len());
+                for hour in as_int32_array(&arr)?.iter() {
+                    // Wrap around midnight so the result stays within 0..=23.
+                    let shifted = hour.map(|h| (h + offset_hours).rem_euclid(24));
+                    builder.append_option(shifted);
+                }
+                arr = Arc::new(builder.finish());
+            }
+        }
+
+        // Scalar input yields scalar output; array input yields an array.
+        Ok(if is_scalar {
+            ColumnarValue::Scalar(ScalarValue::try_from_array(arr.as_ref(), 0)?)
+        } else {
+            ColumnarValue::Array(arr)
+        })
+    }
+
+    fn aliases(&self) -> &[String] {
+        &[]
+    }
+
+    fn documentation(&self) -> Option<&Documentation> {
+        self.doc()
+    }
+}
+
+/// Returns the signed whole-hour component of a fixed-offset time zone string
+/// such as `+02:00` or `-05:00`.
+///
+/// Returns `None` when `tz` does not start with `+`/`-` followed by two ASCII
+/// digits (for example a named zone). The minutes component is ignored.
+fn fixed_offset_hours(tz: &str) -> Option<i32> {
+    let sign = match tz.as_bytes().first()? {
+        b'+' => 1,
+        b'-' => -1,
+        _ => return None,
+    };
+    // `get` (not indexing) so short or non-ASCII input yields `None`, never a panic.
+    let digits = tz.get(1..3)?;
+    if !digits.bytes().all(|b| b.is_ascii_digit()) {
+        return None;
+    }
+    digits.parse::<i32>().ok().map(|hours| sign * hours)
+}
+
+/// Shifts the UTC timestamp `ts` (expressed in `T::UNIT`) by the UTC offset that
+/// `tz` has at that instant, returning the shifted value in the same unit.
+fn adjust_to_local_time<T: ArrowTimestampType>(ts: i64, tz: Tz) -> Result<i64> {
+    fn convert_timestamp<F>(ts: i64, converter: F) -> Result<DateTime<Utc>>
+    where
+        F: Fn(i64) -> MappedLocalTime<DateTime<Utc>>,
+    {
+        match converter(ts) {
+            MappedLocalTime::Ambiguous(earliest, latest) => exec_err!(
+                "Ambiguous timestamp. Do you mean {:?} or {:?}",
+                earliest,
+                latest
+            ),
+            MappedLocalTime::None => exec_err!(
+                "The local time does not exist because there is a gap in the local time."
+ ), + MappedLocalTime::Single(date_time) => Ok(date_time), + } + } + + let date_time = match T::UNIT { + Nanosecond => Utc.timestamp_nanos(ts), + Microsecond => convert_timestamp(ts, |ts| Utc.timestamp_micros(ts))?, + Millisecond => convert_timestamp(ts, |ts| Utc.timestamp_millis_opt(ts))?, + Second => convert_timestamp(ts, |ts| Utc.timestamp_opt(ts, 0))?, + }; + + let offset_seconds: i64 = tz + .offset_from_utc_datetime(&date_time.naive_utc()) + .fix() + .local_minus_utc() as i64; + + let adjusted_date_time = date_time.add( + TimeDelta::try_seconds(offset_seconds) + .ok_or_else(|| internal_datafusion_err!("Offset seconds should be less than i64::MAX / 1_000 or greater than -i64::MAX / 1_000"))?, + ); + + // convert back to i64 + match T::UNIT { + Nanosecond => adjusted_date_time.timestamp_nanos_opt().ok_or_else(|| { + internal_datafusion_err!( + "Failed to convert DateTime to timestamp in nanosecond. This error may occur if the date is out of range. The supported date ranges are between 1677-09-21T00:12:43.145224192 and 2262-04-11T23:47:16.854775807" + ) + }), + Microsecond => Ok(adjusted_date_time.timestamp_micros()), + Millisecond => Ok(adjusted_date_time.timestamp_millis()), + Second => Ok(adjusted_date_time.timestamp()), + } +} + +fn adjust_timestamp_array( + array: &ArrayRef, + tz: Tz, +) -> Result { + let mut builder = PrimitiveBuilder::::new(); + let primitive_array = as_primitive_array::(array)?; + for ts_opt in primitive_array.iter() { + match ts_opt { + None => builder.append_null(), + Some(ts) => { + let adjusted_ts = adjust_to_local_time::(ts, tz)?; + builder.append_value(adjusted_ts); + } + } + } + Ok(Arc::new(builder.finish())) +} + +fn is_epoch(part: &str) -> bool { + let part = part_normalization(part); + matches!(part.to_lowercase().as_str(), "epoch") +} + +// Try to remove quote if exist, if the quote is invalid, return original string and let the downstream function handle the error +fn part_normalization(part: &str) -> &str { + 
part.strip_prefix(|c| c == '\'' || c == '\"') + .and_then(|s| s.strip_suffix(|c| c == '\'' || c == '\"')) + .unwrap_or(part) +} + +/// Invoke [`date_part`] on an `array` (e.g. Timestamp) and convert the +/// result to a total number of seconds, milliseconds, microseconds or +/// nanoseconds +fn seconds_as_i32(array: &dyn Array, unit: TimeUnit) -> Result { + // Nanosecond is neither supported in Postgres nor DuckDB, to avoid dealing + // with overflow and precision issue we don't support nanosecond + if unit == Nanosecond { + return not_impl_err!("Date part {unit:?} not supported"); + } + + let conversion_factor = match unit { + Second => 1_000_000_000, + Millisecond => 1_000_000, + Microsecond => 1_000, + Nanosecond => 1, + }; + + let second_factor = match unit { + Second => 1, + Millisecond => 1_000, + Microsecond => 1_000_000, + Nanosecond => 1_000_000_000, + }; + + let secs = date_part(array, DatePart::Second)?; + // This assumes array is primitive and not a dictionary + let secs = as_int32_array(secs.as_ref())?; + let subsecs = date_part(array, DatePart::Nanosecond)?; + let subsecs = as_int32_array(subsecs.as_ref())?; + + // Special case where there are no nulls. + if subsecs.null_count() == 0 { + let r: Int32Array = binary(secs, subsecs, |secs, subsecs| { + secs * second_factor + (subsecs % 1_000_000_000) / conversion_factor + })?; + Ok(Arc::new(r)) + } else { + // Nulls in secs are preserved, nulls in subsecs are treated as zero to account for the case + // where the number of nanoseconds overflows. + let r: Int32Array = secs + .iter() + .zip(subsecs) + .map(|(secs, subsecs)| { + secs.map(|secs| { + let subsecs = subsecs.unwrap_or(0); + secs * second_factor + (subsecs % 1_000_000_000) / conversion_factor + }) + }) + .collect(); + Ok(Arc::new(r)) + } +} + +/// Invoke [`date_part`] on an `array` (e.g. 
Timestamp) and convert the +/// result to a total number of seconds, milliseconds, microseconds or +/// nanoseconds +/// +/// Given epoch return f64, this is a duplicated function to optimize for f64 type +fn seconds(array: &dyn Array, unit: TimeUnit) -> Result { + let sf = match unit { + Second => 1_f64, + Millisecond => 1_000_f64, + Microsecond => 1_000_000_f64, + Nanosecond => 1_000_000_000_f64, + }; + let secs = date_part(array, DatePart::Second)?; + // This assumes array is primitive and not a dictionary + let secs = as_int32_array(secs.as_ref())?; + let subsecs = date_part(array, DatePart::Nanosecond)?; + let subsecs = as_int32_array(subsecs.as_ref())?; + + // Special case where there are no nulls. + if subsecs.null_count() == 0 { + let r: Float64Array = binary(secs, subsecs, |secs, subsecs| { + (secs as f64 + ((subsecs % 1_000_000_000) as f64 / 1_000_000_000_f64)) * sf + })?; + Ok(Arc::new(r)) + } else { + // Nulls in secs are preserved, nulls in subsecs are treated as zero to account for the case + // where the number of nanoseconds overflows. 
+ let r: Float64Array = secs + .iter() + .zip(subsecs) + .map(|(secs, subsecs)| { + secs.map(|secs| { + let subsecs = subsecs.unwrap_or(0); + (secs as f64 + ((subsecs % 1_000_000_000) as f64 / 1_000_000_000_f64)) + * sf + }) + }) + .collect(); + Ok(Arc::new(r)) + } +} + +fn epoch(array: &dyn Array) -> Result { + const SECONDS_IN_A_DAY: f64 = 86400_f64; + + let f: Float64Array = match array.data_type() { + Timestamp(Second, _) => as_timestamp_second_array(array)?.unary(|x| x as f64), + Timestamp(Millisecond, _) => { + as_timestamp_millisecond_array(array)?.unary(|x| x as f64 / 1_000_f64) + } + Timestamp(Microsecond, _) => { + as_timestamp_microsecond_array(array)?.unary(|x| x as f64 / 1_000_000_f64) + } + Timestamp(Nanosecond, _) => { + as_timestamp_nanosecond_array(array)?.unary(|x| x as f64 / 1_000_000_000_f64) + } + Date32 => as_date32_array(array)?.unary(|x| x as f64 * SECONDS_IN_A_DAY), + Date64 => as_date64_array(array)?.unary(|x| x as f64 / 1_000_f64), + Time32(Second) => as_time32_second_array(array)?.unary(|x| x as f64), + Time32(Millisecond) => { + as_time32_millisecond_array(array)?.unary(|x| x as f64 / 1_000_f64) + } + Time64(Microsecond) => { + as_time64_microsecond_array(array)?.unary(|x| x as f64 / 1_000_000_f64) + } + Time64(Nanosecond) => { + as_time64_nanosecond_array(array)?.unary(|x| x as f64 / 1_000_000_000_f64) + } + Interval(_) | Duration(_) => return seconds(array, Second), + d => return exec_err!("Cannot convert {d:?} to epoch"), + }; + Ok(Arc::new(f)) +} diff --git a/datafusion/functions/src/datetime/mod.rs b/datafusion/functions/src/datetime/mod.rs index d80f14facf822..a842b6d7a9d53 100644 --- a/datafusion/functions/src/datetime/mod.rs +++ b/datafusion/functions/src/datetime/mod.rs @@ -27,6 +27,7 @@ pub mod current_time; pub mod date_bin; pub mod date_part; pub mod date_trunc; +pub mod extract; pub mod from_unixtime; pub mod make_date; pub mod now; @@ -43,6 +44,7 @@ make_udf_function!(current_time::CurrentTimeFunc, current_time); 
make_udf_function!(date_bin::DateBinFunc, date_bin); make_udf_function!(date_part::DatePartFunc, date_part); make_udf_function!(date_trunc::DateTruncFunc, date_trunc); +make_udf_function!(extract::ExtractFunc, extract); make_udf_function!(make_date::MakeDateFunc, make_date); make_udf_function!(from_unixtime::FromUnixtimeFunc, from_unixtime); make_udf_function!(to_char::ToCharFunc, to_char); @@ -265,6 +267,7 @@ pub fn functions() -> Vec> { date_bin(), date_part(), date_trunc(), + extract(), from_unixtime(), make_date(), now(&ConfigOptions::default()), diff --git a/datafusion/functions/src/datetime/planner.rs b/datafusion/functions/src/datetime/planner.rs index f4b64c3711e2c..20442d0205a2c 100644 --- a/datafusion/functions/src/datetime/planner.rs +++ b/datafusion/functions/src/datetime/planner.rs @@ -29,7 +29,7 @@ impl ExprPlanner for DatetimeFunctionPlanner { args: Vec, ) -> datafusion_common::Result>> { Ok(PlannerResult::Planned(Expr::ScalarFunction( - ScalarFunction::new_udf(crate::datetime::date_part(), args), + ScalarFunction::new_udf(crate::datetime::extract(), args), ))) } } diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs index fef0505e993f1..a016f28db417a 100644 --- a/datafusion/sql/src/expr/mod.rs +++ b/datafusion/sql/src/expr/mod.rs @@ -24,6 +24,7 @@ use sqlparser::ast::{ DictionaryField, Expr as SQLExpr, ExprWithAlias as SQLExprWithAlias, MapEntry, StructField, Subscript, TrimWhereField, TypedString, Value, ValueWithSpan, }; +use std::sync::Arc; use datafusion_common::{ internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema, Result, @@ -297,12 +298,23 @@ impl SqlToRel<'_, S> { data_type, value, uses_odbc_syntax: _, - }) => Ok(Expr::Cast(Cast::new( - Box::new(lit(value.into_string().unwrap())), - self.convert_data_type_to_field(&data_type)? + }) => { + let string_value = value.into_string().unwrap(); + let mut cast_data_type = self + .convert_data_type_to_field(&data_type)? 
.data_type()
-                    .clone(),
-            ))),
+                    .clone();
+                if let DataType::Timestamp(time_unit, None) = &cast_data_type {
+                    if let Some(tz) = extract_tz_from_string(&string_value) {
+                        cast_data_type =
+                            DataType::Timestamp(*time_unit, Some(Arc::from(tz)));
+                    }
+                }
+                Ok(Expr::Cast(Cast::new(
+                    Box::new(lit(string_value)),
+                    cast_data_type,
+                )))
+            }
 
             SQLExpr::IsNull(expr) => Ok(Expr::IsNull(Box::new(
                 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
@@ -1078,11 +1090,11 @@ impl SqlToRel<'_, S> {
                 // index can be a name, in which case it is a named field access
                 match index {
                     SQLExpr::Value(ValueWithSpan {
-                        value:
-                            Value::SingleQuotedString(s)
-                            | Value::DoubleQuotedString(s),
-                        span: _,
-                    }) => Ok(Some(GetFieldAccess::NamedStructField {
+                                       value:
+                                       Value::SingleQuotedString(s)
+                                       | Value::DoubleQuotedString(s),
+                                       span: _,
+                                   }) => Ok(Some(GetFieldAccess::NamedStructField {
                         name: ScalarValue::from(s),
                     })),
                     SQLExpr::JsonAccess { .. } => {
@@ -1146,9 +1158,9 @@ impl SqlToRel<'_, S> {
             }
             AccessExpr::Dot(expr) => match expr {
                 SQLExpr::Value(ValueWithSpan {
-                    value: Value::SingleQuotedString(s) | Value::DoubleQuotedString(s),
-                    span : _
-                }) => Ok(Some(GetFieldAccess::NamedStructField {
+                                   value: Value::SingleQuotedString(s) | Value::DoubleQuotedString(s),
+                                   span : _
+                               }) => Ok(Some(GetFieldAccess::NamedStructField {
                     name: ScalarValue::from(s),
                 })),
                 _ => {
@@ -1180,6 +1192,31 @@ impl SqlToRel<'_, S> {
     }
 }
 
+/// Extracts a UTC offset from the tail of a timestamp literal.
+///
+/// Returns `Some("+00:00")` when `s` ends with `Z`, `Some("±HH:MM")` when `s`
+/// ends with an explicit numeric offset of exactly that shape, and `None`
+/// otherwise (including literals without an offset, e.g. `2023-10-30 10:45:30`).
+fn extract_tz_from_string(s: &str) -> Option<String> {
+    // Check the `Z` suffix first: every date contains `-`, so searching for the
+    // last sign character before looking at `Z` would never reach this case.
+    if s.ends_with('Z') {
+        return Some("+00:00".to_string());
+    }
+
+    // Only the last six bytes can hold a `±HH:MM` offset. The bytes are validated
+    // as ASCII before slicing, so the slice always starts on a char boundary.
+    let start = s.len().checked_sub(6)?;
+    match &s.as_bytes()[start..] {
+        [b'+' | b'-', h1, h2, b':', m1, m2]
+            if [h1, h2, m1, m2].iter().all(|b| b.is_ascii_digit()) =>
+        {
+            Some(s[start..].to_string())
+        }
+        _ => None,
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::HashMap;
diff --git a/datafusion/sqllogictest/test_files/extract_tz.slt b/datafusion/sqllogictest/test_files/extract_tz.slt
new file mode 100644
index 0000000000000..32e6b0fbfbb6f
--- /dev/null
+++ 
b/datafusion/sqllogictest/test_files/extract_tz.slt @@ -0,0 +1,93 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Tests for timezone-aware extract SQL statement support. +# Test with different timezone +statement ok +SET datafusion.execution.time_zone = '-03:00'; + +query I +SELECT EXTRACT(HOUR FROM TIMESTAMP '2025-11-18 10:00:00'); +---- +7 + +query II +SELECT EXTRACT(MINUTE FROM TIMESTAMP '2023-10-30 10:45:30'), + EXTRACT(SECOND FROM TIMESTAMP '2023-10-30 10:45:30'); +---- +45 30 + +query III +SELECT EXTRACT(YEAR FROM DATE '2023-10-30'), + EXTRACT(MONTH FROM DATE '2023-10-30'), + EXTRACT(DAY FROM DATE '2023-10-30'); +---- +2023 10 30 + +query I +SELECT EXTRACT(HOUR FROM CAST(NULL AS TIMESTAMP)); +---- +NULL + +statement ok +SET datafusion.execution.time_zone = '+04:00'; + +query I +SELECT EXTRACT(HOUR FROM TIMESTAMP '2023-10-30 02:00:00'); +---- +6 + +query III +SELECT EXTRACT(HOUR FROM TIMESTAMP '2023-10-30 18:20:59'), + EXTRACT(MINUTE FROM TIMESTAMP '2023-10-30 18:20:59'), + EXTRACT(SECOND FROM TIMESTAMP '2023-10-30 18:20:59'); +---- +22 20 59 + +query II +SELECT EXTRACT(DOW FROM DATE '2025-11-01'), + EXTRACT(DOY FROM DATE '2026-12-31'); +---- +6 365 + +statement ok +SET datafusion.execution.time_zone = '+00:00'; + +query 
I +SELECT EXTRACT(HOUR FROM TIMESTAMP '2025-10-30 10:45:30+02:00'); +---- +12 + +query I +SELECT EXTRACT(HOUR FROM TIMESTAMP '2025-10-30 10:45:30-05:00'); +---- +5 + +query II +SELECT EXTRACT(YEAR FROM TIMESTAMP '2026-11-30 10:45:30Z'), + EXTRACT(MONTH FROM TIMESTAMP '2023-10-30 10:45:30Z'); +---- +2026 10 + +query III +SELECT EXTRACT(HOUR FROM TIMESTAMP '2023-10-30 18:20:59+04:00'), + EXTRACT(MINUTE FROM TIMESTAMP '2023-10-30 18:20:59+04:00'), + EXTRACT(SECOND FROM TIMESTAMP '2023-10-30 18:20:59+04:00'); +---- +22 20 59 + + diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index b72f73d44698f..7a9dfe151961e 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -4345,17 +4345,17 @@ EXPLAIN SELECT extract(month from ts) as months ---- logical_plan 01)Sort: months DESC NULLS FIRST, fetch=5 -02)--Projection: date_part(Utf8("MONTH"),csv_with_timestamps.ts) AS months -03)----Aggregate: groupBy=[[date_part(Utf8("MONTH"), csv_with_timestamps.ts)]], aggr=[[]] +02)--Projection: extract(Utf8("MONTH"),csv_with_timestamps.ts) AS months +03)----Aggregate: groupBy=[[extract(Utf8("MONTH"), csv_with_timestamps.ts)]], aggr=[[]] 04)------TableScan: csv_with_timestamps projection=[ts] physical_plan 01)SortPreservingMergeExec: [months@0 DESC], fetch=5 02)--SortExec: TopK(fetch=5), expr=[months@0 DESC], preserve_partitioning=[true] -03)----ProjectionExec: expr=[date_part(Utf8("MONTH"),csv_with_timestamps.ts)@0 as months] -04)------AggregateExec: mode=FinalPartitioned, gby=[date_part(Utf8("MONTH"),csv_with_timestamps.ts)@0 as date_part(Utf8("MONTH"),csv_with_timestamps.ts)], aggr=[] +03)----ProjectionExec: expr=[extract(Utf8("MONTH"),csv_with_timestamps.ts)@0 as months] +04)------AggregateExec: mode=FinalPartitioned, gby=[extract(Utf8("MONTH"),csv_with_timestamps.ts)@0 as extract(Utf8("MONTH"),csv_with_timestamps.ts)], aggr=[] 05)--------CoalesceBatchesExec: 
target_batch_size=2 -06)----------RepartitionExec: partitioning=Hash([date_part(Utf8("MONTH"),csv_with_timestamps.ts)@0], 8), input_partitions=8 -07)------------AggregateExec: mode=Partial, gby=[date_part(MONTH, ts@0) as date_part(Utf8("MONTH"),csv_with_timestamps.ts)], aggr=[] +06)----------RepartitionExec: partitioning=Hash([extract(Utf8("MONTH"),csv_with_timestamps.ts)@0], 8), input_partitions=8 +07)------------AggregateExec: mode=Partial, gby=[extract(MONTH, ts@0) as extract(Utf8("MONTH"),csv_with_timestamps.ts)], aggr=[] 08)--------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 09)----------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/timestamps.csv]]}, projection=[ts], output_ordering=[ts@0 DESC], file_type=csv, has_header=false diff --git a/datafusion/sqllogictest/test_files/optimizer_group_by_constant.slt b/datafusion/sqllogictest/test_files/optimizer_group_by_constant.slt index de6a153f58d98..9a666595ac572 100644 --- a/datafusion/sqllogictest/test_files/optimizer_group_by_constant.slt +++ b/datafusion/sqllogictest/test_files/optimizer_group_by_constant.slt @@ -90,7 +90,7 @@ FROM test_table t GROUP BY 1 ---- logical_plan -01)Projection: Boolean(true) AS NOT date_part(Utf8("MONTH"),now()) BETWEEN Int64(50) AND Int64(60), count(Int64(1)) +01)Projection: Boolean(true) AS NOT extract(Utf8("MONTH"),now()) BETWEEN Int64(50) AND Int64(60), count(Int64(1)) 02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] 03)----SubqueryAlias: t 04)------TableScan: test_table projection=[] diff --git a/datafusion/sqllogictest/test_files/table_functions.slt b/datafusion/sqllogictest/test_files/table_functions.slt index 0159abe8d06b7..484004c14e03e 100644 --- a/datafusion/sqllogictest/test_files/table_functions.slt +++ b/datafusion/sqllogictest/test_files/table_functions.slt @@ -353,8 +353,8 @@ SELECT * FROM generate_series(TIMESTAMP '2023-01-01T00:00:00', TIMESTAMP '2023-0 query P SELECT * FROM 
range(TIMESTAMP '2023-01-01T00:00:00+00:00', TIMESTAMP '2023-01-03T00:00:00+00:00', INTERVAL '1' DAY) ---- -2023-01-01T00:00:00 -2023-01-02T00:00:00 +2023-01-01T00:00:00Z +2023-01-02T00:00:00Z # Negative timestamp range (going backwards) query P diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q7.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q7.slt.part index 291d56e43f2df..12b06bb485fb6 100644 --- a/datafusion/sqllogictest/test_files/tpch/plans/q7.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/plans/q7.slt.part @@ -62,7 +62,7 @@ logical_plan 02)--Projection: shipping.supp_nation, shipping.cust_nation, shipping.l_year, sum(shipping.volume) AS revenue 03)----Aggregate: groupBy=[[shipping.supp_nation, shipping.cust_nation, shipping.l_year]], aggr=[[sum(shipping.volume)]] 04)------SubqueryAlias: shipping -05)--------Projection: n1.n_name AS supp_nation, n2.n_name AS cust_nation, date_part(Utf8("YEAR"), lineitem.l_shipdate) AS l_year, lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount) AS volume +05)--------Projection: n1.n_name AS supp_nation, n2.n_name AS cust_nation, extract(Utf8("YEAR"), lineitem.l_shipdate) AS l_year, lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount) AS volume 06)----------Inner Join: customer.c_nationkey = n2.n_nationkey Filter: n1.n_name = Utf8View("FRANCE") AND n2.n_name = Utf8View("GERMANY") OR n1.n_name = Utf8View("GERMANY") AND n2.n_name = Utf8View("FRANCE") 07)------------Projection: lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_shipdate, customer.c_nationkey, n1.n_name 08)--------------Inner Join: supplier.s_nationkey = n1.n_nationkey @@ -91,7 +91,7 @@ physical_plan 05)--------CoalesceBatchesExec: target_batch_size=8192 06)----------RepartitionExec: partitioning=Hash([supp_nation@0, cust_nation@1, l_year@2], 4), input_partitions=4 07)------------AggregateExec: mode=Partial, gby=[supp_nation@0 as supp_nation, cust_nation@1 as cust_nation, l_year@2 as 
l_year], aggr=[sum(shipping.volume)] -08)--------------ProjectionExec: expr=[n_name@3 as supp_nation, n_name@4 as cust_nation, date_part(YEAR, l_shipdate@2) as l_year, l_extendedprice@0 * (Some(1),20,0 - l_discount@1) as volume] +08)--------------ProjectionExec: expr=[n_name@3 as supp_nation, n_name@4 as cust_nation, extract(YEAR, l_shipdate@2) as l_year, l_extendedprice@0 * (Some(1),20,0 - l_discount@1) as volume] 09)----------------CoalesceBatchesExec: target_batch_size=8192 10)------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c_nationkey@3, n_nationkey@0)], filter=n_name@0 = FRANCE AND n_name@1 = GERMANY OR n_name@0 = GERMANY AND n_name@1 = FRANCE, projection=[l_extendedprice@0, l_discount@1, l_shipdate@2, n_name@4, n_name@6] 11)--------------------CoalesceBatchesExec: target_batch_size=8192 diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q8.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q8.slt.part index 50171c528db6d..a500f89f5f4bc 100644 --- a/datafusion/sqllogictest/test_files/tpch/plans/q8.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/plans/q8.slt.part @@ -60,7 +60,7 @@ logical_plan 02)--Projection: all_nations.o_year, CAST(CAST(sum(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END) AS Decimal128(12, 2)) / CAST(sum(all_nations.volume) AS Decimal128(12, 2)) AS Decimal128(15, 2)) AS mkt_share 03)----Aggregate: groupBy=[[all_nations.o_year]], aggr=[[sum(CASE WHEN all_nations.nation = Utf8View("BRAZIL") THEN all_nations.volume ELSE Decimal128(Some(0),38,4) END) AS sum(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END), sum(all_nations.volume)]] 04)------SubqueryAlias: all_nations -05)--------Projection: date_part(Utf8("YEAR"), orders.o_orderdate) AS o_year, lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount) AS volume, n2.n_name AS nation +05)--------Projection: extract(Utf8("YEAR"), orders.o_orderdate) AS 
o_year, lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount) AS volume, n2.n_name AS nation 06)----------Inner Join: n1.n_regionkey = region.r_regionkey 07)------------Projection: lineitem.l_extendedprice, lineitem.l_discount, orders.o_orderdate, n1.n_regionkey, n2.n_name 08)--------------Inner Join: supplier.s_nationkey = n2.n_nationkey @@ -97,7 +97,7 @@ physical_plan 05)--------CoalesceBatchesExec: target_batch_size=8192 06)----------RepartitionExec: partitioning=Hash([o_year@0], 4), input_partitions=4 07)------------AggregateExec: mode=Partial, gby=[o_year@0 as o_year], aggr=[sum(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END), sum(all_nations.volume)] -08)--------------ProjectionExec: expr=[date_part(YEAR, o_orderdate@2) as o_year, l_extendedprice@0 * (Some(1),20,0 - l_discount@1) as volume, n_name@3 as nation] +08)--------------ProjectionExec: expr=[extract(YEAR, o_orderdate@2) as o_year, l_extendedprice@0 * (Some(1),20,0 - l_discount@1) as volume, n_name@3 as nation] 09)----------------CoalesceBatchesExec: target_batch_size=8192 10)------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(n_regionkey@3, r_regionkey@0)], projection=[l_extendedprice@0, l_discount@1, o_orderdate@2, n_name@4] 11)--------------------CoalesceBatchesExec: target_batch_size=8192 diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q9.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q9.slt.part index 3b31c1bc2e8e3..611a05e7371e6 100644 --- a/datafusion/sqllogictest/test_files/tpch/plans/q9.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/plans/q9.slt.part @@ -56,7 +56,7 @@ logical_plan 02)--Projection: profit.nation, profit.o_year, sum(profit.amount) AS sum_profit 03)----Aggregate: groupBy=[[profit.nation, profit.o_year]], aggr=[[sum(profit.amount)]] 04)------SubqueryAlias: profit -05)--------Projection: nation.n_name AS nation, date_part(Utf8("YEAR"), orders.o_orderdate) AS o_year, 
lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount) - partsupp.ps_supplycost * lineitem.l_quantity AS amount +05)--------Projection: nation.n_name AS nation, extract(Utf8("YEAR"), orders.o_orderdate) AS o_year, lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount) - partsupp.ps_supplycost * lineitem.l_quantity AS amount 06)----------Inner Join: supplier.s_nationkey = nation.n_nationkey 07)------------Projection: lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, supplier.s_nationkey, partsupp.ps_supplycost, orders.o_orderdate 08)--------------Inner Join: lineitem.l_orderkey = orders.o_orderkey @@ -82,7 +82,7 @@ physical_plan 05)--------CoalesceBatchesExec: target_batch_size=8192 06)----------RepartitionExec: partitioning=Hash([nation@0, o_year@1], 4), input_partitions=4 07)------------AggregateExec: mode=Partial, gby=[nation@0 as nation, o_year@1 as o_year], aggr=[sum(profit.amount)] -08)--------------ProjectionExec: expr=[n_name@5 as nation, date_part(YEAR, o_orderdate@4) as o_year, l_extendedprice@1 * (Some(1),20,0 - l_discount@2) - ps_supplycost@3 * l_quantity@0 as amount] +08)--------------ProjectionExec: expr=[n_name@5 as nation, extract(YEAR, o_orderdate@4) as o_year, l_extendedprice@1 * (Some(1),20,0 - l_discount@2) - ps_supplycost@3 * l_quantity@0 as amount] 09)----------------CoalesceBatchesExec: target_batch_size=8192 10)------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(s_nationkey@3, n_nationkey@0)], projection=[l_quantity@0, l_extendedprice@1, l_discount@2, ps_supplycost@4, o_orderdate@5, n_name@7] 11)--------------------CoalesceBatchesExec: target_batch_size=8192 diff --git a/docs/source/user-guide/sql/scalar_functions.md b/docs/source/user-guide/sql/scalar_functions.md index d2e7066191f91..30e10a84fd8ed 100644 --- a/docs/source/user-guide/sql/scalar_functions.md +++ b/docs/source/user-guide/sql/scalar_functions.md @@ -2387,6 +2387,7 @@ Additional examples can be found 
[here](https://github.com/apache/datafusion/blo - [date_trunc](#date_trunc) - [datepart](#datepart) - [datetrunc](#datetrunc) +- [extract](#extract) - [from_unixtime](#from_unixtime) - [make_date](#make_date) - [now](#now) @@ -2570,6 +2571,36 @@ _Alias of [date_part](#date_part)._ _Alias of [date_trunc](#date_trunc)._ +### `extract` + +Returns the specified part of the date as an integer. + +```sql +extract(field FROM source) +``` + +#### Arguments + +- **field**: Part of the date to return. The following date parts are supported: + +- year +- quarter (emits value in inclusive range [1, 4] based on which quartile of the year the date is in) +- month +- week (week of the year) +- day (day of the month) +- hour +- minute +- second +- millisecond +- microsecond +- nanosecond +- dow (day of the week where Sunday is 0) +- doy (day of the year) +- epoch (seconds since Unix epoch) +- isodow (day of the week where Monday is 0) + +- **source**: Time expression to operate on. Can be a constant, column, or function. + ### `from_unixtime` Converts an integer to RFC3339 timestamp format (`YYYY-MM-DDT00:00:00.000000000Z`). Integers and unsigned integers are interpreted as seconds since the Unix epoch (`1970-01-01T00:00:00Z`), and the corresponding timestamp is returned.