diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md
index 096521b975..b9fcbdc58a 100644
--- a/docs/source/user-guide/latest/configs.md
+++ b/docs/source/user-guide/latest/configs.md
@@ -268,6 +268,7 @@ These settings can be used to determine which parts of the plan are accelerated
 | `spark.comet.expression.IsNull.enabled` | Enable Comet acceleration for `IsNull` | true |
 | `spark.comet.expression.JsonToStructs.enabled` | Enable Comet acceleration for `JsonToStructs` | true |
 | `spark.comet.expression.KnownFloatingPointNormalized.enabled` | Enable Comet acceleration for `KnownFloatingPointNormalized` | true |
+| `spark.comet.expression.LastDay.enabled` | Enable Comet acceleration for `LastDay` | true |
 | `spark.comet.expression.Length.enabled` | Enable Comet acceleration for `Length` | true |
 | `spark.comet.expression.LessThan.enabled` | Enable Comet acceleration for `LessThan` | true |
 | `spark.comet.expression.LessThanOrEqual.enabled` | Enable Comet acceleration for `LessThanOrEqual` | true |
diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index 75c53198b8..56569bc69c 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -44,6 +44,7 @@ use datafusion_spark::function::bitwise::bit_get::SparkBitGet;
 use datafusion_spark::function::bitwise::bitwise_not::SparkBitwiseNot;
 use datafusion_spark::function::datetime::date_add::SparkDateAdd;
 use datafusion_spark::function::datetime::date_sub::SparkDateSub;
+use datafusion_spark::function::datetime::last_day::SparkLastDay;
 use datafusion_spark::function::hash::sha1::SparkSha1;
 use datafusion_spark::function::hash::sha2::SparkSha2;
 use datafusion_spark::function::math::expm1::SparkExpm1;
@@ -345,6 +346,7 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) {
     session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitGet::default()));
     session_ctx.register_udf(ScalarUDF::new_from_impl(SparkDateAdd::default()));
     session_ctx.register_udf(ScalarUDF::new_from_impl(SparkDateSub::default()));
+    session_ctx.register_udf(ScalarUDF::new_from_impl(SparkLastDay::default()));
     session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha1::default()));
     session_ctx.register_udf(ScalarUDF::new_from_impl(SparkConcat::default()));
     session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitwiseNot::default()));
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 627a5e42c0..37645c3e86 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -190,6 +190,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
     classOf[DateSub] -> CometDateSub,
     classOf[UnixDate] -> CometUnixDate,
     classOf[FromUnixTime] -> CometFromUnixTime,
+    classOf[LastDay] -> CometLastDay,
     classOf[Hour] -> CometHour,
     classOf[Minute] -> CometMinute,
     classOf[Second] -> CometSecond,
diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala
index 5e7e9c68b9..a623146916 100644
--- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala
@@ -21,7 +21,7 @@ package org.apache.comet.serde
 
 import java.util.Locale
 
-import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, LastDay, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
 import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampType}
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -310,6 +310,8 @@ object CometDateAdd extends CometScalarFunction[DateAdd]("date_add")
 
 object CometDateSub extends CometScalarFunction[DateSub]("date_sub")
 
+object CometLastDay extends CometScalarFunction[LastDay]("last_day")
+
 object CometDateDiff extends CometScalarFunction[DateDiff]("date_diff")
 
 /**
diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
index 0ccc210771..1ae6926e05 100644
--- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
@@ -182,6 +182,32 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH
     FuzzDataGenerator.generateDataFrame(r, spark, schema, 1000, DataGenOptions())
   }
 
+  test("last_day") {
+    val r = new Random(42)
+    val schema = StructType(Seq(StructField("c0", DataTypes.DateType, true)))
+    val df = FuzzDataGenerator.generateDataFrame(r, spark, schema, 1000, DataGenOptions())
+    df.createOrReplaceTempView("tbl")
+
+    // Basic test with random dates
+    checkSparkAnswerAndOperator("SELECT c0, last_day(c0) FROM tbl ORDER BY c0")
+
+    // Disable constant folding to ensure literal expressions are executed by Comet
+    withSQLConf(
+      SQLConf.OPTIMIZER_EXCLUDED_RULES.key ->
+        "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") {
+      // Test with literal dates - various months
+      checkSparkAnswerAndOperator(
+        "SELECT last_day(DATE('2024-01-15')), last_day(DATE('2024-02-15')), last_day(DATE('2024-12-01'))")
+
+      // Test leap year handling (February)
+      checkSparkAnswerAndOperator(
+        "SELECT last_day(DATE('2024-02-01')), last_day(DATE('2023-02-01'))")
+
+      // Test null handling
+      checkSparkAnswerAndOperator("SELECT last_day(NULL)")
+    }
+  }
+
   test("datediff") {
     val r = new Random(42)
     val schema = StructType(