Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 128 additions & 4 deletions datafusion/functions/src/datetime/date_part.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,22 @@ use std::any::Any;
use std::str::FromStr;
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Float64Array, Int32Array};
use arrow::array::timezone::Tz;
use arrow::array::{Array, ArrayRef, Float64Array, Int32Array, PrimitiveBuilder};
use arrow::compute::kernels::cast_utils::IntervalUnit;
use arrow::compute::{binary, date_part, DatePart};
use arrow::datatypes::DataType::{
Date32, Date64, Duration, Interval, Time32, Time64, Timestamp,
};
use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit};
use arrow::datatypes::{
ArrowTimestampType, DataType, Field, FieldRef, TimeUnit, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
};
use chrono::{DateTime, MappedLocalTime, Offset, TimeDelta, TimeZone, Utc};
use datafusion_common::cast::as_primitive_array;
use datafusion_common::types::{logical_date, NativeType};
use std::ops::Add;

use datafusion_common::{
cast::{
Expand All @@ -36,7 +43,7 @@ use datafusion_common::{
as_timestamp_microsecond_array, as_timestamp_millisecond_array,
as_timestamp_nanosecond_array, as_timestamp_second_array,
},
exec_err, internal_err, not_impl_err,
exec_err, internal_datafusion_err, internal_err, not_impl_err,
types::logical_string,
utils::take_function_args,
Result, ScalarValue,
Expand All @@ -56,7 +63,7 @@ use datafusion_macros::user_doc;
argument(
name = "part",
description = r#"Part of the date to return. The following date parts are supported:

- year
- quarter (emits value in inclusive range [1, 4] based on which quartile of the year the date is in)
- month
Expand Down Expand Up @@ -173,6 +180,7 @@ impl ScalarUDFImpl for DatePartFunc {
&self,
args: datafusion_expr::ScalarFunctionArgs,
) -> Result<ColumnarValue> {
let config = &args.config_options;
let args = args.args;
let [part, array] = take_function_args(self.name(), args)?;

Expand All @@ -193,6 +201,56 @@ impl ScalarUDFImpl for DatePartFunc {
ColumnarValue::Scalar(scalar) => scalar.to_array()?,
};

let (is_timezone_aware, tz_str_opt) = match array.data_type() {
Timestamp(_, Some(tz_str)) => (true, Some(Arc::clone(tz_str))),
_ => (false, None),
};

// Adjust timestamps for extraction
let array = if is_timezone_aware {
// For timezone-aware timestamps, extract in their own timezone
let tz_str = tz_str_opt.as_ref().unwrap();
let tz = match tz_str.parse::<Tz>() {
Ok(tz) => tz,
Err(_) => return exec_err!("Invalid timezone"),
};
match array.data_type() {
Timestamp(time_unit, _) => match time_unit {
Nanosecond => {
adjust_timestamp_array::<TimestampNanosecondType>(&array, tz)?
}
Microsecond => {
adjust_timestamp_array::<TimestampMicrosecondType>(&array, tz)?
}
Millisecond => {
adjust_timestamp_array::<TimestampMillisecondType>(&array, tz)?
}
Second => adjust_timestamp_array::<TimestampSecondType>(&array, tz)?,
},
_ => array,
}
} else if let Timestamp(time_unit, None) = array.data_type() {
// For naive timestamps, interpret in session timezone
let tz = match config.execution.time_zone.parse::<Tz>() {
Ok(tz) => tz,
Err(_) => return exec_err!("Invalid timezone"),
};
match time_unit {
Nanosecond => {
adjust_timestamp_array::<TimestampNanosecondType>(&array, tz)?
}
Microsecond => {
adjust_timestamp_array::<TimestampMicrosecondType>(&array, tz)?
}
Millisecond => {
adjust_timestamp_array::<TimestampMillisecondType>(&array, tz)?
}
Second => adjust_timestamp_array::<TimestampSecondType>(&array, tz)?,
}
} else {
array
};

let part_trim = part_normalization(&part);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that timezone-adjusting the input before computing parts will also affect epoch, yielding an offset result; epoch should not depend on timezone. Consider bypassing the adjustment when the requested part is "epoch".

🤖 React with 👍 or 👎 to let us know if the comment was useful.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:useful; category:bug; feedback:The Augment AI reviewer is correct that there shouldn't be adjusting of the timestamp when the user needs it as epoch seconds. The finding prevents wrong values for the epoch case.


// using IntervalUnit here means we hand off all the work of supporting plurals (like "seconds")
Expand Down Expand Up @@ -240,6 +298,72 @@ impl ScalarUDFImpl for DatePartFunc {
}
}

fn adjust_to_local_time<T: ArrowTimestampType>(ts: i64, tz: Tz) -> Result<i64> {
fn convert_timestamp<F>(ts: i64, converter: F) -> Result<DateTime<Utc>>
where
F: Fn(i64) -> MappedLocalTime<DateTime<Utc>>,
{
match converter(ts) {
MappedLocalTime::Ambiguous(earliest, latest) => exec_err!(
"Ambiguous timestamp. Do you mean {:?} or {:?}",
earliest,
latest
),
MappedLocalTime::None => exec_err!(
"The local time does not exist because there is a gap in the local time."
),
MappedLocalTime::Single(date_time) => Ok(date_time),
}
}

let date_time = match T::UNIT {
Nanosecond => Utc.timestamp_nanos(ts),
Microsecond => convert_timestamp(ts, |ts| Utc.timestamp_micros(ts))?,
Millisecond => convert_timestamp(ts, |ts| Utc.timestamp_millis_opt(ts))?,
Second => convert_timestamp(ts, |ts| Utc.timestamp_opt(ts, 0))?,
};

let offset_seconds: i64 = tz
.offset_from_utc_datetime(&date_time.naive_utc())
.fix()
.local_minus_utc() as i64;

let adjusted_date_time = date_time.add(
TimeDelta::try_seconds(offset_seconds)
.ok_or_else(|| internal_datafusion_err!("Offset seconds should be less than i64::MAX / 1_000 or greater than -i64::MAX / 1_000"))?,
);

// convert back to i64
match T::UNIT {
Nanosecond => adjusted_date_time.timestamp_nanos_opt().ok_or_else(|| {
internal_datafusion_err!(
"Failed to convert DateTime to timestamp in nanosecond. This error may occur if the date is out of range. The supported date ranges are between 1677-09-21T00:12:43.145224192 and 2262-04-11T23:47:16.854775807"
)
}),
Microsecond => Ok(adjusted_date_time.timestamp_micros()),
Millisecond => Ok(adjusted_date_time.timestamp_millis()),
Second => Ok(adjusted_date_time.timestamp()),
}
}

fn adjust_timestamp_array<T: ArrowTimestampType>(
array: &ArrayRef,
tz: Tz,
) -> Result<ArrayRef> {
let mut builder = PrimitiveBuilder::<T>::new();
let primitive_array = as_primitive_array::<T>(array)?;
for ts_opt in primitive_array.iter() {
match ts_opt {
None => builder.append_null(),
Some(ts) => {
let adjusted_ts = adjust_to_local_time::<T>(ts, tz)?;
builder.append_value(adjusted_ts);
}
}
}
Ok(Arc::new(builder.finish()))
}
Comment on lines +301 to +365
Copy link

@coderabbitai coderabbitai bot Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Refactor to eliminate code duplication.

The helper functions adjust_to_local_time and adjust_timestamp_array are duplicated across three files:

  • datafusion/functions/src/datetime/date_part.rs (this file)
  • datafusion/functions/src/datetime/to_local_time.rs (lines 295-343, 369-385)
  • datafusion/functions/src/datetime/extract.rs (lines 321-367, 369-385)

This violates the DRY principle and creates a maintenance burden. Consider extracting these functions into a shared utility module (e.g., datafusion/functions/src/datetime/utils.rs or similar) and importing them in all three locations.

// In a new shared module (e.g., datetime/timezone_utils.rs):
pub(crate) fn adjust_to_local_time<T: ArrowTimestampType>(ts: i64, tz: Tz) -> Result<i64> {
    // ... existing implementation
}

pub(crate) fn adjust_timestamp_array<T: ArrowTimestampType>(
    array: &ArrayRef,
    tz: Tz,
) -> Result<ArrayRef> {
    // ... existing implementation
}

Then import in each file:

use crate::datetime::timezone_utils::{adjust_to_local_time, adjust_timestamp_array};

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:useful; category:bug; feedback:The CodeRabbit AI reviewer is correct that these helper functions could be reused instead of copied in several files. This will prevent technical dept and maintenance cost.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@martin-augment, thank you for confirming this issue! You're absolutely right that extracting these helpers will prevent technical debt and reduce maintenance burden.

Since this refactoring affects multiple files in the datetime module, would you like to:

  1. Address it in this PR - I can help identify all the duplication points and suggest the refactoring structure
  2. Create a follow-up issue - Track this as a separate refactoring task to keep this PR focused on the timezone-aware functionality

If you'd like to go with option 2, I can open a GitHub issue for you with details about the duplicated functions and the suggested approach for consolidating them into a shared utility module.

What would you prefer?


fn is_epoch(part: &str) -> bool {
let part = part_normalization(part);
matches!(part.to_lowercase().as_str(), "epoch")
Expand Down
Loading
Loading