diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 372f80d4a8b16..4049a7655ef96 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -218,6 +218,8 @@ object FunctionRegistry {
     expression[NextDay]("next_day"),
     expression[Quarter]("quarter"),
     expression[Second]("second"),
+    expression[ToDate]("to_date"),
+    expression[Trunc]("trunc"),
     expression[WeekOfYear]("weekofyear"),
     expression[Year]("year"),
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala
index efecb771f2f5d..9ae5f9ced0a19 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala
@@ -272,6 +272,26 @@ case class LastDay(startDate: Expression) extends UnaryExpression with ImplicitC
   override def prettyName: String = "last_day"
 }
 
+/**
+ * Returns the date part of a timestamp string.
+ */
+case class ToDate(child: Expression) extends UnaryExpression with ImplicitCastInputTypes {
+
+  // Implicit casting will accept strings in both date and timestamp format,
+  // as well as TimestampType.
+  override def inputTypes: Seq[AbstractDataType] = Seq(DateType)
+
+  override def dataType: DataType = DateType
+
+  override def eval(input: InternalRow): Any = {
+    child.eval(input)
+  }
+
+  override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+    defineCodeGen(ctx, ev, (time) => time)
+  }
+}
+
 /**
  * Returns the first date which is later than startDate and named as dayOfWeek.
  * For example, NextDay(2015-07-27, Sunday) would return 2015-08-02, which is the first
@@ -283,6 +303,7 @@ case class NextDay(startDate: Expression, dayOfWeek: Expression)
   extends BinaryExpression with ImplicitCastInputTypes {
 
   override def left: Expression = startDate
+
   override def right: Expression = dayOfWeek
 
   override def inputTypes: Seq[AbstractDataType] = Seq(DateType, StringType)
@@ -330,3 +351,88 @@ case class NextDay(startDate: Expression, dayOfWeek: Expression)
 
   override def prettyName: String = "next_day"
 }
+
+/**
+ * Returns date truncated to the unit specified by the format.
+ */
+case class Trunc(date: Expression, format: Expression)
+  extends BinaryExpression with ImplicitCastInputTypes {
+  override def left: Expression = date
+  override def right: Expression = format
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(DateType, StringType)
+  override def dataType: DataType = DateType
+
+  lazy val constFmt = format.eval().asInstanceOf[UTF8String]
+
+  override def eval(input: InternalRow): Any = {
+    if (format.foldable) {
+      val minItem = DateTimeUtils.getFmt(constFmt)
+      if (minItem == -1) {
+        // unknown format
+        null
+      } else {
+        val d = date.eval(input)
+        if (d == null) {
+          null
+        } else {
+          DateTimeUtils.dateTrunc(d.asInstanceOf[Int], minItem)
+        }
+      }
+    } else {
+      val fmt = format.eval(input).asInstanceOf[UTF8String]
+      val d = date.eval(input)
+      if (d == null) {
+        null
+      } else {
+        val minItem = DateTimeUtils.getFmt(fmt)
+        if (minItem == -1) {
+          // unknown format
+          null
+        } else {
+          DateTimeUtils.dateTrunc(d.asInstanceOf[Int], minItem)
+        }
+      }
+    }
+  }
+
+  override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+    val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
+    if (format.foldable) {
+      val d = date.gen(ctx)
+      val minItem = DateTimeUtils.getFmt(constFmt)
+      if (minItem == -1) {
+        s"""
+          boolean ${ev.isNull} = true;
+          ${ctx.javaType(dataType)} ${ev.primitive} = ${ctx.defaultValue(dataType)};
+        """
+      } else {
+        s"""
+          ${d.code}
+          boolean ${ev.isNull} = ${d.isNull};
+          ${ctx.javaType(dataType)} ${ev.primitive} = ${ctx.defaultValue(dataType)};
+          if (!${ev.isNull}) {
+            if ($minItem == -1) {
+              ${ev.isNull} = true;
+            } else {
+              ${ev.primitive} = $dtu.dateTrunc(${d.primitive}, $minItem);
+            }
+          }
+        """
+      }
+    } else {
+      nullSafeCodeGen(ctx, ev, (dateVal, fmt) => {
+        val form = ctx.freshName("form")
+        s"""
+          int $form = $dtu.getFmt($fmt);
+          if ($form == -1) {
+            ${ev.isNull} = true;
+          } else {
+            ${ev.primitive} = $dtu.dateTrunc($dateVal, $form);
+          }
+        """
+      })
+    }
+  }
+
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 93966a503c27c..6214f3f64aacb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -640,4 +640,35 @@ object DateTimeUtils {
     }
     date + (lastDayOfMonthInYear - dayInYear)
   }
+
+  /**
+   * Returns the given date truncated to the given trunc level.
+   * The trunc level should be generated using `getFmt()`.
+   */
+  def dateTrunc(d: Int, minItem: Int): Int = {
+    if (minItem == 2) {
+      // truncate to year
+      d - DateTimeUtils.getDayInYear(d) + 1
+    } else {
+      // truncate to month
+      d - DateTimeUtils.getDayOfMonth(d) + 1
+    }
+  }
+
+  /**
+   * Returns the truncate level: 1 for month, 2 for year, or -1 for an invalid/null format.
+   * A result of -1 means the truncate level is unsupported.
+   */
+  def getFmt(string: UTF8String): Int = {
+    if (string == null) {
+      -1
+    } else {
+      val fmtString = string.toString.toUpperCase
+      fmtString match {
+        case "MON" | "MONTH" | "MM" => 1
+        case "YEAR" | "YYYY" | "YY" => 2
+        case _ => -1
+      }
+    }
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
index aca8d6eb3500c..2ef7442df1fe0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
@@ -303,4 +303,22 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     checkEvaluation(
       NextDay(Literal(Date.valueOf("2015-07-23")), Literal.create(null, StringType)), null)
   }
+
+  test("function to_date") {
+    checkEvaluation(
+      ToDate(Literal(Date.valueOf("2015-07-22"))),
+      DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-22")))
+  }
+
+  test("function trunc") {
+    checkEvaluation(EqualTo(
+      Trunc(Literal(Date.valueOf("2015-07-22")), Literal("YYYY")),
+      Trunc(Literal(Date.valueOf("2015-01-01")), Literal("YEAR"))), true)
+
+    checkEvaluation(EqualTo(
+      Trunc(Literal(Date.valueOf("2015-07-22")), Literal("MONTH")),
+      Trunc(Literal(Date.valueOf("2015-07-01")), Literal("mm"))), true)
+
+    checkEvaluation(Trunc(Literal(Date.valueOf("2015-07-22")), Literal("DD")), null)
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 4261a5e7cbeb5..c4db92ee393ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -1015,22 +1015,6 @@ object functions {
    */
   def cosh(columnName: String): Column = cosh(Column(columnName))
 
-  /**
-   * Returns the current date.
-   *
-   * @group datetime_funcs
-   * @since 1.5.0
-   */
-  def current_date(): Column = CurrentDate()
-
-  /**
-   * Returns the current timestamp.
-   *
-   * @group datetime_funcs
-   * @since 1.5.0
-   */
-  def current_timestamp(): Column = CurrentTimestamp()
-
   /**
    * Computes the exponential of the given value.
    *
@@ -1916,6 +1900,22 @@
   // DateTime functions
   //////////////////////////////////////////////////////////////////////////////////////////////
 
+  /**
+   * Returns the current date.
+   *
+   * @group datetime_funcs
+   * @since 1.5.0
+   */
+  def current_date(): Column = CurrentDate()
+
+  /**
+   * Returns the current timestamp.
+   *
+   * @group datetime_funcs
+   * @since 1.5.0
+   */
+  def current_timestamp(): Column = CurrentTimestamp()
+
   /**
    * Converts a date/timestamp/string to a value of string in the format specified by the date
    * format given by the second argument.
@@ -2099,6 +2099,22 @@
    */
   def weekofyear(columnName: String): Column = weekofyear(Column(columnName))
 
+  /**
+   * Converts the column into DateType.
+   *
+   * @group datetime_funcs
+   * @since 1.5.0
+   */
+  def to_date(e: Column): Column = ToDate(e.expr)
+
+  /**
+   * Returns date truncated to the unit specified by the format.
+   *
+   * @group datetime_funcs
+   * @since 1.5.0
+   */
+  def trunc(date: Column, format: Column): Column = Trunc(date.expr, format.expr)
+
   //////////////////////////////////////////////////////////////////////////////////////////////
   // Collection functions
   //////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
index 07eb6e4a8d8cd..a2e33c2ae0d93 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
@@ -228,4 +228,48 @@ class DateFunctionsSuite extends QueryTest {
       Seq(Row(Date.valueOf("2015-07-30")), Row(Date.valueOf("2015-07-30"))))
   }
 
+  test("function to_date") {
+    val d1 = Date.valueOf("2015-07-22")
+    val d2 = Date.valueOf("2015-07-01")
+    val t1 = Timestamp.valueOf("2015-07-22 10:00:00")
+    val t2 = Timestamp.valueOf("2014-12-31 23:59:59")
+    val s1 = "2015-07-22 10:00:00"
+    val s2 = "2014-12-31"
+    val df = Seq((d1, t1, s1), (d2, t2, s2)).toDF("d", "t", "s")
+
+    checkAnswer(
+      df.select(to_date(col("t"))),
+      Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2014-12-31"))))
+    checkAnswer(
+      df.select(to_date(col("d"))),
+      Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2015-07-01"))))
+    checkAnswer(
+      df.select(to_date(col("s"))),
+      Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2014-12-31"))))
+
+    checkAnswer(
+      df.selectExpr("to_date(t)"),
+      Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2014-12-31"))))
+    checkAnswer(
+      df.selectExpr("to_date(d)"),
+      Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2015-07-01"))))
+    checkAnswer(
+      df.selectExpr("to_date(s)"),
+      Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2014-12-31"))))
+  }
+
+  test("function trunc") {
+    val df = Seq(
+      (1, Timestamp.valueOf("2015-07-22 10:00:00")),
+      (2, Timestamp.valueOf("2014-12-31 00:00:00"))).toDF("i", "t")
+
+    checkAnswer(
+      df.select(trunc(col("t"), lit("YY"))),
+      Seq(Row(Date.valueOf("2015-01-01")), Row(Date.valueOf("2014-01-01"))))
+
+
+    checkAnswer(
+      df.selectExpr("trunc(t, 'Month')"),
+      Seq(Row(Date.valueOf("2015-07-01")), Row(Date.valueOf("2014-12-01"))))
+  }
 }
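For reviewers, a minimal usage sketch of the two functions added by this patch, mirroring the new DateFunctionsSuite tests. The `sqlContext`, its imported implicits, and the column names `i` and `t` are illustrative setup only, not part of the patch:

    // Assumes an existing SQLContext named `sqlContext`, as in the existing suites.
    import java.sql.Timestamp
    import org.apache.spark.sql.functions.{col, lit, to_date, trunc}
    import sqlContext.implicits._

    val df = Seq(
      (1, Timestamp.valueOf("2015-07-22 10:00:00")),
      (2, Timestamp.valueOf("2014-12-31 23:59:59"))).toDF("i", "t")

    // to_date keeps only the date part: 2015-07-22 and 2014-12-31.
    df.select(to_date(col("t"))).show()

    // trunc resets the date to the first day of the year or month;
    // "year"/"yyyy"/"yy" and "month"/"mon"/"mm" are accepted case-insensitively.
    df.select(trunc(col("t"), lit("yyyy")), trunc(col("t"), lit("mon"))).show()

    // Both functions are also registered for SQL / selectExpr.
    df.selectExpr("to_date(t)", "trunc(t, 'MM')").show()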