Skip to content

Commit

Permalink
[FEAT] date and timestamp parsers (Eventual-Inc#2353)
Browse files Browse the repository at this point in the history
Resolves Eventual-Inc#2309
Resolves Eventual-Inc#2302

---------

Co-authored-by: Colin Ho <colin.ho99@gmail.com>
  • Loading branch information
murex971 and colin-ho authored Jun 18, 2024
1 parent f0b35e3 commit 042f483
Show file tree
Hide file tree
Showing 17 changed files with 643 additions and 4 deletions.
4 changes: 4 additions & 0 deletions daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1086,6 +1086,8 @@ class PyExpr:
def utf8_like(self, pattern: PyExpr) -> PyExpr: ...
def utf8_ilike(self, pattern: PyExpr) -> PyExpr: ...
def utf8_substr(self, start: PyExpr, length: PyExpr) -> PyExpr: ...
def utf8_to_date(self, format: str) -> PyExpr: ...
def utf8_to_datetime(self, format: str, timezone: str | None = None) -> PyExpr: ...
def image_decode(self, raise_error_on_failure: bool) -> PyExpr: ...
def image_encode(self, image_format: ImageFormat) -> PyExpr: ...
def image_resize(self, w: int, h: int) -> PyExpr: ...
Expand Down Expand Up @@ -1217,6 +1219,8 @@ class PySeries:
def utf8_like(self, pattern: PySeries) -> PySeries: ...
def utf8_ilike(self, pattern: PySeries) -> PySeries: ...
def utf8_substr(self, start: PySeries, length: PySeries | None = None) -> PySeries: ...
def utf8_to_date(self, format: str) -> PySeries: ...
def utf8_to_datetime(self, format: str, timezone: str | None = None) -> PySeries: ...
def is_nan(self) -> PySeries: ...
def is_inf(self) -> PySeries: ...
def not_nan(self) -> PySeries: ...
Expand Down
75 changes: 75 additions & 0 deletions daft/expressions/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1532,6 +1532,81 @@ def substr(self, start: int | Expression, length: int | Expression | None = None
length_expr = Expression._to_expression(length)
return Expression._from_pyexpr(self._expr.utf8_substr(start_expr._expr, length_expr._expr))

def to_date(self, format: str) -> Expression:
"""Converts a string to a date using the specified format
.. NOTE::
The format must be a valid date format string.
See: https://docs.rs/chrono/latest/chrono/format/strftime/index.html
Example:
>>> df = daft.from_pydict({"x": ["2021-01-01", "2021-01-02", None]})
>>> df = df.with_column("date", df["x"].str.to_date("%Y-%m-%d"))
>>> df.show()
╭────────────┬────────────╮
│ x ┆ date │
│ --- ┆ --- │
│ Utf8 ┆ Date │
╞════════════╪════════════╡
│ 2021-01-01 ┆ 2021-01-01 │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2021-01-02 ┆ 2021-01-02 │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┤
│ None ┆ None │
╰────────────┴────────────╯
Returns:
Expression: a Date expression which is parsed by given format
"""
return Expression._from_pyexpr(self._expr.utf8_to_date(format))

def to_datetime(self, format: str, timezone: str | None = None) -> Expression:
"""Converts a string to a datetime using the specified format and timezone
.. NOTE::
The format must be a valid datetime format string.
See: https://docs.rs/chrono/latest/chrono/format/strftime/index.html
Example:
>>> df = daft.from_pydict({"x": ["2021-01-01 00:00:00.123", "2021-01-02 12:30:00.456", None]})
>>> df = df.with_column("datetime", df["x"].str.to_datetime("%Y-%m-%d %H:%M:%S%.3f"))
>>> df.show()
╭─────────────────────────┬───────────────────────────────╮
│ x ┆ datetime │
│ --- ┆ --- │
│ Utf8 ┆ Timestamp(Milliseconds, None) │
╞═════════════════════════╪═══════════════════════════════╡
│ 2021-01-01 00:00:00.123 ┆ 2021-01-01 00:00:00.123 │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2021-01-02 12:30:00.456 ┆ 2021-01-02 12:30:00.456 │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ None ┆ None │
╰─────────────────────────┴───────────────────────────────╯
If a timezone is provided, the datetime will be parsed in that timezone
>>> df = daft.from_pydict({"x": ["2021-01-01 00:00:00.123 +0800", "2021-01-02 12:30:00.456 +0800", None]})
>>> df = df.with_column("datetime", df["x"].str.to_datetime("%Y-%m-%d %H:%M:%S%.3f %z", timezone="Asia/Shanghai"))
>>> df.show()
╭───────────────────────────────┬────────────────────────────────────────────────╮
│ x ┆ datetime │
│ --- ┆ --- │
│ Utf8 ┆ Timestamp(Milliseconds, Some("Asia/Shanghai")) │
╞═══════════════════════════════╪════════════════════════════════════════════════╡
│ 2021-01-01 00:00:00.123 +0800 ┆ 2021-01-01 00:00:00.123 CST │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2021-01-02 12:30:00.456 +0800 ┆ 2021-01-02 12:30:00.456 CST │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ None ┆ None │
╰───────────────────────────────┴────────────────────────────────────────────────╯
Returns:
Expression: a DateTime expression which is parsed by given format and timezone
"""
return Expression._from_pyexpr(self._expr.utf8_to_datetime(format, timezone))


class ExpressionListNamespace(ExpressionNamespace):
def join(self, delimiter: str | Expression) -> Expression:
Expand Down
14 changes: 14 additions & 0 deletions daft/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,20 @@ def ilike(self, pattern: Series) -> Series:
assert self._series is not None and pattern._series is not None
return Series._from_pyseries(self._series.utf8_ilike(pattern._series))

def to_date(self, format: str) -> Series:
if not isinstance(format, str):
raise ValueError(f"expected str for format but got {type(format)}")
assert self._series is not None
return Series._from_pyseries(self._series.utf8_to_date(format))

def to_datetime(self, format: str, timezone: str | None = None) -> Series:
if not isinstance(format, str):
raise ValueError(f"expected str for format but got {type(format)}")
if timezone is not None and not isinstance(timezone, str):
raise ValueError(f"expected str for timezone but got {type(timezone)}")
assert self._series is not None
return Series._from_pyseries(self._series.utf8_to_datetime(format, timezone))

def substr(self, start: Series, length: Series | None = None) -> Series:
if not isinstance(start, Series):
raise ValueError(f"expected another Series but got {type(start)}")
Expand Down
2 changes: 2 additions & 0 deletions docs/source/api_docs/expressions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ The following methods are available under the ``expr.str`` attribute.
Expression.str.like
Expression.str.ilike
Expression.str.substr
Expression.str.to_date
Expression.str.to_datetime

.. _api-float-expression-operations:

Expand Down
94 changes: 91 additions & 3 deletions src/daft-core/src/array/ops/utf8.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ use std::{
use crate::{
array::{DataArray, ListArray},
datatypes::{
BooleanArray, DaftIntegerType, DaftNumericType, DaftPhysicalType, Field, Int64Array,
UInt64Array, Utf8Array,
infer_timeunit_from_format_string,
logical::{DateArray, TimestampArray},
BooleanArray, DaftIntegerType, DaftNumericType, DaftPhysicalType, Field, Int32Array,
Int64Array, TimeUnit, UInt64Array, Utf8Array,
},
DataType, Series,
};
use arrow2::array::Array;
use arrow2::{array::Array, temporal_conversions};
use chrono::Datelike;
use common_error::{DaftError, DaftResult};
use itertools::Itertools;
use num_traits::NumCast;
Expand Down Expand Up @@ -890,6 +893,91 @@ impl Utf8Array {
Ok(result)
}

pub fn to_date(&self, format: &str) -> DaftResult<DateArray> {
let len = self.len();
let self_iter = self.as_arrow().iter();

let arrow_result = self_iter
.map(|val| match val {
Some(val) => {
let date = chrono::NaiveDate::parse_from_str(val, format).map_err(|e| {
DaftError::ComputeError(format!(
"Error in to_date: failed to parse date {val} with format {format} : {e}"
))
})?;
Ok(Some(
date.num_days_from_ce() - temporal_conversions::EPOCH_DAYS_FROM_CE,
))
}
_ => Ok(None),
})
.collect::<DaftResult<arrow2::array::Int32Array>>()?;

let result = Int32Array::from((self.name(), Box::new(arrow_result)));
let result = DateArray::new(Field::new(self.name(), DataType::Date), result);
assert_eq!(result.len(), len);
Ok(result)
}

pub fn to_datetime(&self, format: &str, timezone: Option<&str>) -> DaftResult<TimestampArray> {
let len = self.len();
let self_iter = self.as_arrow().iter();
let timeunit = infer_timeunit_from_format_string(format);

let arrow_result = self_iter
.map(|val| match val {
Some(val) => {
let timestamp = match timezone {
Some(tz) => {
let datetime = chrono::DateTime::parse_from_str(val, format).map_err(|e| {
DaftError::ComputeError(format!(
"Error in to_datetime: failed to parse datetime {val} with format {format} : {e}"
))
})?;
let datetime_with_timezone = datetime.with_timezone(&tz.parse::<chrono_tz::Tz>().map_err(|e| {
DaftError::ComputeError(format!(
"Error in to_datetime: failed to parse timezone {tz} : {e}"
))
})?);
match timeunit {
TimeUnit::Seconds => datetime_with_timezone.timestamp(),
TimeUnit::Milliseconds => datetime_with_timezone.timestamp_millis(),
TimeUnit::Microseconds => datetime_with_timezone.timestamp_micros(),
TimeUnit::Nanoseconds => datetime_with_timezone.timestamp_nanos_opt().ok_or_else(|| DaftError::ComputeError(format!("Error in to_datetime: failed to get nanoseconds for {val}")))?,
}
}
None => {
let naive_datetime = chrono::NaiveDateTime::parse_from_str(val, format).map_err(|e| {
DaftError::ComputeError(format!(
"Error in to_datetime: failed to parse datetime {val} with format {format} : {e}"
))
})?;
match timeunit {
TimeUnit::Seconds => naive_datetime.and_utc().timestamp(),
TimeUnit::Milliseconds => naive_datetime.and_utc().timestamp_millis(),
TimeUnit::Microseconds => naive_datetime.and_utc().timestamp_micros(),
TimeUnit::Nanoseconds => naive_datetime.and_utc().timestamp_nanos_opt().ok_or_else(|| DaftError::ComputeError(format!("Error in to_datetime: failed to get nanoseconds for {val}")))?,
}
}
};
Ok(Some(timestamp))
}
_ => Ok(None),
})
.collect::<DaftResult<arrow2::array::Int64Array>>()?;

let result = Int64Array::from((self.name(), Box::new(arrow_result)));
let result = TimestampArray::new(
Field::new(
self.name(),
DataType::Timestamp(timeunit, timezone.map(|tz| tz.to_string())),
),
result,
);
assert_eq!(result.len(), len);
Ok(result)
}

pub fn repeat<I>(&self, n: &DataArray<I>) -> DaftResult<Utf8Array>
where
I: DaftIntegerType,
Expand Down
1 change: 1 addition & 0 deletions src/daft-core/src/datatypes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub use image_mode::ImageMode;
use num_traits::{Bounded, Float, FromPrimitive, Num, NumCast, ToPrimitive, Zero};
use serde::Serialize;
use std::ops::{Add, Div, Mul, Rem, Sub};
pub use time_unit::infer_timeunit_from_format_string;
pub use time_unit::TimeUnit;

pub mod logical;
Expand Down
10 changes: 10 additions & 0 deletions src/daft-core/src/datatypes/time_unit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,13 @@ impl From<&ArrowTimeUnit> for TimeUnit {
}
}
}

pub fn infer_timeunit_from_format_string(format: &str) -> TimeUnit {
if format.contains("%9f") || format.contains("%.9f") {
TimeUnit::Nanoseconds
} else if format.contains("%3f") || format.contains("%.3f") {
TimeUnit::Milliseconds
} else {
TimeUnit::Microseconds
}
}
8 changes: 8 additions & 0 deletions src/daft-core/src/python/series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,14 @@ impl PySeries {
.into())
}

pub fn utf8_to_date(&self, format: &str) -> PyResult<Self> {
Ok(self.series.utf8_to_date(format)?.into())
}

pub fn utf8_to_datetime(&self, format: &str, timezone: Option<&str>) -> PyResult<Self> {
Ok(self.series.utf8_to_datetime(format, timezone)?.into())
}

pub fn is_nan(&self) -> PyResult<Self> {
Ok(self.series.is_nan()?.into())
}
Expand Down
8 changes: 8 additions & 0 deletions src/daft-core/src/series/ops/utf8.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,4 +239,12 @@ impl Series {
}
})
}

pub fn utf8_to_date(&self, format: &str) -> DaftResult<Series> {
self.with_utf8_array(|arr| Ok(arr.to_date(format)?.into_series()))
}

pub fn utf8_to_datetime(&self, format: &str, timezone: Option<&str>) -> DaftResult<Series> {
self.with_utf8_array(|arr| Ok(arr.to_datetime(format, timezone)?.into_series()))
}
}
27 changes: 27 additions & 0 deletions src/daft-dsl/src/functions/utf8/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ mod rstrip;
mod split;
mod startswith;
mod substr;
mod to_date;
mod to_datetime;
mod upper;

use capitalize::CapitalizeEvaluator;
Expand All @@ -46,6 +48,8 @@ use serde::{Deserialize, Serialize};
use split::SplitEvaluator;
use startswith::StartswithEvaluator;
use substr::SubstrEvaluator;
use to_date::ToDateEvaluator;
use to_datetime::ToDatetimeEvaluator;
use upper::UpperEvaluator;

use crate::{functions::utf8::match_::MatchEvaluator, Expr, ExprRef};
Expand Down Expand Up @@ -78,6 +82,8 @@ pub enum Utf8Expr {
Like,
Ilike,
Substr,
ToDate(String),
ToDatetime(String, Option<String>),
}

impl Utf8Expr {
Expand Down Expand Up @@ -109,6 +115,8 @@ impl Utf8Expr {
Like => &LikeEvaluator {},
Ilike => &IlikeEvaluator {},
Substr => &SubstrEvaluator {},
ToDate(_) => &ToDateEvaluator {},
ToDatetime(_, _) => &ToDatetimeEvaluator {},
}
}
}
Expand Down Expand Up @@ -304,3 +312,22 @@ pub fn substr(data: ExprRef, start: ExprRef, length: ExprRef) -> ExprRef {
}
.into()
}

pub fn to_date(data: ExprRef, format: &str) -> ExprRef {
Expr::Function {
func: super::FunctionExpr::Utf8(Utf8Expr::ToDate(format.to_string())),
inputs: vec![data],
}
.into()
}

pub fn to_datetime(data: ExprRef, format: &str, timezone: Option<&str>) -> ExprRef {
Expr::Function {
func: super::FunctionExpr::Utf8(Utf8Expr::ToDatetime(
format.to_string(),
timezone.map(|s| s.to_string()),
)),
inputs: vec![data],
}
.into()
}
Loading

0 comments on commit 042f483

Please sign in to comment.