Skip to content

Commit

Permalink
Move from_unixtime, now, current_date, current_time functions to data…
Browse files Browse the repository at this point in the history
…fusion-functions (#9537)

* Move date_part, date_trunc, date_bin functions to datafusion-functions

* I do not understand why the logical plan changed but updating the explain text to reflect the change. The physical plan is unchanged.

* Fix fmt

* Improvements to remove datafusion-functions dependency from sq and physical-expr

* WIP

* Fix function arguments for date_bin, date_trunc and date_part.

* WIP

* Fix projection change. Add new test date_bin monotonicity

* Move now, current_date and current_time functions to datafusion-functions

* Force exact version of chrono

* Merge updates.

* Updates for chrono changes

* Merge fixes

* Removed make_now from incorrect merge.

* fmt fix.

* Updates after correcting merge conflicts.

* Only move the tests using now() function from optimizer_integration.rs to the core/tests folder, leave the rest in place.

---------

Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
  • Loading branch information
Omega359 and mustafasrepo authored Mar 13, 2024
1 parent a43938d commit 9d47dca
Show file tree
Hide file tree
Showing 21 changed files with 614 additions and 276 deletions.
4 changes: 2 additions & 2 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

198 changes: 198 additions & 0 deletions datafusion/core/tests/optimizer_integration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{plan_err, Result};
use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource, WindowUDF};
use datafusion_optimizer::analyzer::Analyzer;
use datafusion_optimizer::optimizer::Optimizer;
use datafusion_optimizer::{OptimizerConfig, OptimizerContext};
use datafusion_sql::planner::{ContextProvider, SqlToRel};
use datafusion_sql::sqlparser::ast::Statement;
use datafusion_sql::sqlparser::dialect::GenericDialect;
use datafusion_sql::sqlparser::parser::Parser;
use datafusion_sql::TableReference;

use chrono::DateTime;
use datafusion_functions::datetime;

#[cfg(test)]
#[ctor::ctor]
fn init() {
// enable logging so RUST_LOG works
let _ = env_logger::try_init();
}

#[test]
fn timestamp_nano_ts_none_predicates() -> Result<()> {
let sql = "SELECT col_int32
FROM test
WHERE col_ts_nano_none < (now() - interval '1 hour')";
let plan = test_sql(sql)?;
// a scan should have the now()... predicate folded to a single
// constant and compared to the column without a cast so it can be
// pushed down / pruned
let expected =
"Projection: test.col_int32\
\n Filter: test.col_ts_nano_none < TimestampNanosecond(1666612093000000000, None)\
\n TableScan: test projection=[col_int32, col_ts_nano_none]";
assert_eq!(expected, format!("{plan:?}"));
Ok(())
}

#[test]
fn timestamp_nano_ts_utc_predicates() {
let sql = "SELECT col_int32
FROM test
WHERE col_ts_nano_utc < (now() - interval '1 hour')";
let plan = test_sql(sql).unwrap();
// a scan should have the now()... predicate folded to a single
// constant and compared to the column without a cast so it can be
// pushed down / pruned
let expected =
"Projection: test.col_int32\n Filter: test.col_ts_nano_utc < TimestampNanosecond(1666612093000000000, Some(\"+00:00\"))\
\n TableScan: test projection=[col_int32, col_ts_nano_utc]";
assert_eq!(expected, format!("{plan:?}"));
}

fn test_sql(sql: &str) -> Result<LogicalPlan> {
// parse the SQL
let dialect = GenericDialect {}; // or AnsiDialect, or your own dialect ...
let ast: Vec<Statement> = Parser::parse_sql(&dialect, sql).unwrap();
let statement = &ast[0];

// create a logical query plan
let now_udf = datetime::functions()
.iter()
.find(|f| f.name() == "now")
.unwrap()
.to_owned();
let context_provider = MyContextProvider::default().with_udf(now_udf);
let sql_to_rel = SqlToRel::new(&context_provider);
let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap();

// hard code the return value of now()
let now_time = DateTime::from_timestamp(1666615693, 0).unwrap();
let config = OptimizerContext::new()
.with_skip_failing_rules(false)
.with_query_execution_start_time(now_time);
let analyzer = Analyzer::new();
let optimizer = Optimizer::new();
// analyze and optimize the logical plan
let plan = analyzer.execute_and_check(&plan, config.options(), |_, _| {})?;
optimizer.optimize(&plan, &config, |_, _| {})
}

#[derive(Default)]
struct MyContextProvider {
options: ConfigOptions,
udfs: HashMap<String, Arc<ScalarUDF>>,
}

impl MyContextProvider {
fn with_udf(mut self, udf: Arc<ScalarUDF>) -> Self {
self.udfs.insert(udf.name().to_string(), udf);
self
}
}

impl ContextProvider for MyContextProvider {
fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
let table_name = name.table();
if table_name.starts_with("test") {
let schema = Schema::new_with_metadata(
vec![
Field::new("col_int32", DataType::Int32, true),
Field::new("col_uint32", DataType::UInt32, true),
Field::new("col_utf8", DataType::Utf8, true),
Field::new("col_date32", DataType::Date32, true),
Field::new("col_date64", DataType::Date64, true),
// timestamp with no timezone
Field::new(
"col_ts_nano_none",
DataType::Timestamp(TimeUnit::Nanosecond, None),
true,
),
// timestamp with UTC timezone
Field::new(
"col_ts_nano_utc",
DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
true,
),
],
HashMap::new(),
);

Ok(Arc::new(MyTableSource {
schema: Arc::new(schema),
}))
} else {
plan_err!("table does not exist")
}
}

fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
self.udfs.get(name).cloned()
}

fn get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>> {
None
}

fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
None
}

fn get_window_meta(&self, _name: &str) -> Option<Arc<WindowUDF>> {
None
}

fn options(&self) -> &ConfigOptions {
&self.options
}

fn udfs_names(&self) -> Vec<String> {
Vec::new()
}

fn udafs_names(&self) -> Vec<String> {
Vec::new()
}

fn udwfs_names(&self) -> Vec<String> {
Vec::new()
}
}

struct MyTableSource {
schema: SchemaRef,
}

impl TableSource for MyTableSource {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
34 changes: 26 additions & 8 deletions datafusion/core/tests/simplification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,6 @@ fn make_udf_add(volatility: Volatility) -> Arc<ScalarUDF> {
))
}

fn now_expr() -> Expr {
call_fn("now", vec![]).unwrap()
}

fn cast_to_int64_expr(expr: Expr) -> Expr {
Expr::Cast(Cast::new(expr.into(), DataType::Int64))
}
Expand Down Expand Up @@ -255,7 +251,7 @@ fn now_less_than_timestamp() -> Result<()> {
// cast(now() as int) < cast(to_timestamp(...) as int) + 50000_i64
let plan = LogicalPlanBuilder::from(table_scan)
.filter(
cast_to_int64_expr(now_expr())
cast_to_int64_expr(now())
.lt(cast_to_int64_expr(to_timestamp_expr(ts_string)) + lit(50000_i64)),
)?
.build()?;
Expand Down Expand Up @@ -368,14 +364,14 @@ fn test_const_evaluator_now() {
let time = chrono::Utc.timestamp_nanos(ts_nanos);
let ts_string = "2020-09-08T12:05:00+00:00";
// now() --> ts
test_evaluate_with_start_time(now_expr(), lit_timestamp_nano(ts_nanos), &time);
test_evaluate_with_start_time(now(), lit_timestamp_nano(ts_nanos), &time);

// CAST(now() as int64) + 100_i64 --> ts + 100_i64
let expr = cast_to_int64_expr(now_expr()) + lit(100_i64);
let expr = cast_to_int64_expr(now()) + lit(100_i64);
test_evaluate_with_start_time(expr, lit(ts_nanos + 100), &time);

// CAST(now() as int64) < cast(to_timestamp(...) as int64) + 50000_i64 ---> true
let expr = cast_to_int64_expr(now_expr())
let expr = cast_to_int64_expr(now())
.lt(cast_to_int64_expr(to_timestamp_expr(ts_string)) + lit(50000i64));
test_evaluate_with_start_time(expr, lit(true), &time);
}
Expand Down Expand Up @@ -413,3 +409,25 @@ fn test_evaluator_udfs() {
));
test_evaluate(expr, expected_expr);
}

#[test]
fn multiple_now() -> Result<()> {
let table_scan = test_table_scan();
let time = Utc::now();
let proj = vec![now(), now().alias("t2")];
let plan = LogicalPlanBuilder::from(table_scan)
.project(proj)?
.build()?;

// expect the same timestamp appears in both exprs
let actual = get_optimized_plan_formatted(&plan, &time);
let expected = format!(
"Projection: TimestampNanosecond({}, Some(\"+00:00\")) AS now(), TimestampNanosecond({}, Some(\"+00:00\")) AS t2\
\n TableScan: test",
time.timestamp_nanos_opt().unwrap(),
time.timestamp_nanos_opt().unwrap()
);

assert_eq!(expected, actual);
Ok(())
}
33 changes: 0 additions & 33 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,14 +204,6 @@ pub enum BuiltinScalarFunction {
Substr,
/// to_hex
ToHex,
/// from_unixtime
FromUnixtime,
///now
Now,
///current_date
CurrentDate,
/// current_time
CurrentTime,
/// make_date
MakeDate,
/// translate
Expand Down Expand Up @@ -369,17 +361,11 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Translate => Volatility::Immutable,
BuiltinScalarFunction::Trim => Volatility::Immutable,
BuiltinScalarFunction::Upper => Volatility::Immutable,
BuiltinScalarFunction::FromUnixtime => Volatility::Immutable,
BuiltinScalarFunction::OverLay => Volatility::Immutable,
BuiltinScalarFunction::Levenshtein => Volatility::Immutable,
BuiltinScalarFunction::SubstrIndex => Volatility::Immutable,
BuiltinScalarFunction::FindInSet => Volatility::Immutable,

// Stable builtin functions
BuiltinScalarFunction::Now => Volatility::Stable,
BuiltinScalarFunction::CurrentDate => Volatility::Stable,
BuiltinScalarFunction::CurrentTime => Volatility::Stable,

// Volatile builtin functions
BuiltinScalarFunction::Random => Volatility::Volatile,
BuiltinScalarFunction::Uuid => Volatility::Volatile,
Expand All @@ -396,7 +382,6 @@ impl BuiltinScalarFunction {
/// 2. Deduce the output `DataType` based on the provided `input_expr_types`.
pub fn return_type(self, input_expr_types: &[DataType]) -> Result<DataType> {
use DataType::*;
use TimeUnit::*;

// Note that this function *must* return the same type that the respective physical expression returns
// or the execution panics.
Expand Down Expand Up @@ -544,12 +529,6 @@ impl BuiltinScalarFunction {
utf8_to_int_type(&input_expr_types[0], "find_in_set")
}
BuiltinScalarFunction::ToChar => Ok(Utf8),
BuiltinScalarFunction::FromUnixtime => Ok(Timestamp(Second, None)),
BuiltinScalarFunction::Now => {
Ok(Timestamp(Nanosecond, Some("+00:00".into())))
}
BuiltinScalarFunction::CurrentDate => Ok(Date32),
BuiltinScalarFunction::CurrentTime => Ok(Time64(Nanosecond)),
BuiltinScalarFunction::MakeDate => Ok(Date32),
BuiltinScalarFunction::Translate => {
utf8_to_str_type(&input_expr_types[0], "translate")
Expand Down Expand Up @@ -757,9 +736,6 @@ impl BuiltinScalarFunction {
],
self.volatility(),
),
BuiltinScalarFunction::FromUnixtime => {
Signature::uniform(1, vec![Int64], self.volatility())
}
BuiltinScalarFunction::Digest => Signature::one_of(
vec![
Exact(vec![Utf8, Utf8]),
Expand Down Expand Up @@ -904,11 +880,6 @@ impl BuiltinScalarFunction {
// will be as good as the number of digits in the number
Signature::uniform(1, vec![Float64, Float32], self.volatility())
}
BuiltinScalarFunction::Now
| BuiltinScalarFunction::CurrentDate
| BuiltinScalarFunction::CurrentTime => {
Signature::uniform(0, vec![], self.volatility())
}
BuiltinScalarFunction::MakeDate => Signature::uniform(
3,
vec![Int32, Int64, UInt32, UInt64, Utf8],
Expand Down Expand Up @@ -1032,12 +1003,8 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::FindInSet => &["find_in_set"],

// time/date functions
BuiltinScalarFunction::Now => &["now"],
BuiltinScalarFunction::CurrentDate => &["current_date", "today"],
BuiltinScalarFunction::CurrentTime => &["current_time"],
BuiltinScalarFunction::MakeDate => &["make_date"],
BuiltinScalarFunction::ToChar => &["to_char", "date_format"],
BuiltinScalarFunction::FromUnixtime => &["from_unixtime"],

// hashing functions
BuiltinScalarFunction::Digest => &["digest"],
Expand Down
11 changes: 0 additions & 11 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -788,15 +788,6 @@ scalar_expr!(
datetime format,
"converts a date, time, timestamp or duration to a string based on the provided format"
);
scalar_expr!(
FromUnixtime,
from_unixtime,
unixtime,
"returns the unix time in format"
);
scalar_expr!(CurrentDate, current_date, ,"returns current UTC date as a [`DataType::Date32`] value");
scalar_expr!(Now, now, ,"returns current timestamp in nanoseconds, using the same value for all instances of now() in same statement");
scalar_expr!(CurrentTime, current_time, , "returns current UTC time as a [`DataType::Time64`] value");
scalar_expr!(MakeDate, make_date, year month day, "make a date from year, month and day component parts");
scalar_expr!(Nanvl, nanvl, x y, "returns x if x is not NaN otherwise returns y");
scalar_expr!(
Expand Down Expand Up @@ -1258,8 +1249,6 @@ mod test {
test_scalar_expr!(Trim, trim, string);
test_scalar_expr!(Upper, upper, string);

test_scalar_expr!(FromUnixtime, from_unixtime, unixtime);

test_scalar_expr!(ArrayPopFront, array_pop_front, array);
test_scalar_expr!(ArrayPopBack, array_pop_back, array);
test_scalar_expr!(ArrayPosition, array_position, array, element, index);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/signature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub enum Volatility {
Immutable,
/// A stable function may return different values given the same input across different
/// queries but must return the same value for a given input within a query. An example of
/// this is [super::BuiltinScalarFunction::Now]. DataFusion
/// this is the `Now` function. DataFusion
/// will attempt to inline `Stable` functions during planning, when possible.
/// For query `select col1, now() from t1`, it might take a while to execute but
/// `now()` column will be the same for each output row, which is evaluated
Expand Down
Loading

0 comments on commit 9d47dca

Please sign in to comment.