Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,10 @@ object FunctionRegistry {
expression[DayOfYear]("dayofyear"),
expression[DayOfMonth]("dayofmonth"),
expression[Hour]("hour"),
expression[Month]("month"),
expression[LastDay]("last_day"),
expression[Minute]("minute"),
expression[Month]("month"),
expression[NextDay]("next_day"),
expression[Quarter]("quarter"),
expression[Second]("second"),
expression[WeekOfYear]("weekofyear"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,3 +258,75 @@ case class DateFormatClass(left: Expression, right: Expression) extends BinaryEx
})
}
}

/**
* Returns the last day of the month which the date belongs to.
*/
case class LastDay(startDate: Expression) extends UnaryExpression with ImplicitCastInputTypes {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a prettyName last_day

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

override def child: Expression = startDate

override def inputTypes: Seq[AbstractDataType] = Seq(DateType)

override def dataType: DataType = DateType

override def prettyName: String = "last_day"

override def nullSafeEval(date: Any): Any = {
val days = date.asInstanceOf[Int]
DateTimeUtils.getLastDayOfMonth(days)
}

override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
defineCodeGen(ctx, ev, (sd) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nullSafeCodeGen

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defineCodeGen is a shortcut for nullSafeCodeGen which is used for one line codegen.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh ok - it is a little bit confusing with the naming.

s"$dtu.getLastDayOfMonth($sd)"
})
}
}

/**
* Returns the first date which is later than startDate and named as dayOfWeek.
* For example, NextDay(2015-07-27, Sunday) would return 2015-08-02, which is the first
* sunday later than 2015-07-27.
*/
case class NextDay(startDate: Expression, dayOfWeek: Expression)
extends BinaryExpression with ImplicitCastInputTypes {

override def left: Expression = startDate
override def right: Expression = dayOfWeek

override def inputTypes: Seq[AbstractDataType] = Seq(DateType, StringType)

override def dataType: DataType = DateType

override def nullSafeEval(start: Any, dayOfW: Any): Any = {
val dow = DateTimeUtils.getDayOfWeekFromString(dayOfW.asInstanceOf[UTF8String])
if (dow == -1) {
null
} else {
val sd = start.asInstanceOf[Int]
DateTimeUtils.getNextDateForDayOfWeek(sd, dow)
}
}

override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
nullSafeCodeGen(ctx, ev, (sd, dowS) => {
val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
val dow = ctx.freshName("dow")
val genDow = if (right.foldable) {
val dowVal = DateTimeUtils.getDayOfWeekFromString(
dayOfWeek.eval(InternalRow.empty).asInstanceOf[UTF8String])
s"int $dow = $dowVal;"
} else {
s"int $dow = $dtu.getDayOfWeekFromString($dowS);"
}
genDow + s"""
if ($dow == -1) {
${ev.isNull} = true;
} else {
${ev.primitive} = $dtu.getNextDateForDayOfWeek($sd, $dow);
}
"""
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -573,4 +573,50 @@ object DateTimeUtils {
dayInYear - 334
}
}

/**
* Returns Day of week from String. Starting from Thursday, marked as 0.
* (Because 1970-01-01 is Thursday).
*/
def getDayOfWeekFromString(string: UTF8String): Int = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you document inline why we choose Fri as 0, since it is a weird choice (usually Sun or Mon is considered the beginning of the week). Is it because 1970-1-1 is a Thu?

val dowString = string.toString.toUpperCase
dowString match {
case "SU" | "SUN" | "SUNDAY" => 3
case "MO" | "MON" | "MONDAY" => 4
case "TU" | "TUE" | "TUESDAY" => 5
case "WE" | "WED" | "WEDNESDAY" => 6
case "TH" | "THU" | "THURSDAY" => 0
case "FR" | "FRI" | "FRIDAY" => 1
case "SA" | "SAT" | "SATURDAY" => 2
case _ => -1
}
}

/**
* Returns the first date which is later than startDate and is of the given dayOfWeek.
* dayOfWeek is an integer ranges in [0, 6], and 0 is Thu, 1 is Fri, etc,.
*/
def getNextDateForDayOfWeek(startDate: Int, dayOfWeek: Int): Int = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should document what dayOfWeek here means, i.e. in this case, Fri is 0, Sat is 1 ...

startDate + 1 + ((dayOfWeek - 1 - startDate) % 7 + 7) % 7
}

/**
* number of days in a non-leap year.
*/
private[this] val daysInNormalYear = Array(31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31)

/**
* Returns last day of the month for the given date. The date is expressed in days
* since 1.1.1970.
*/
def getLastDayOfMonth(date: Int): Int = {
val dayOfMonth = getDayOfMonth(date)
val month = getMonth(date)
if (month == 2 && isLeapYear(getYear(date))) {
date + daysInNormalYear(month - 1) + 1 - dayOfMonth
} else {
date + daysInNormalYear(month - 1) - dayOfMonth
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.text.SimpleDateFormat
import java.util.Calendar

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types.{StringType, TimestampType, DateType}

class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
Expand Down Expand Up @@ -246,4 +247,31 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
}
}

test("last_day") {
checkEvaluation(LastDay(Literal(Date.valueOf("2015-02-28"))), Date.valueOf("2015-02-28"))
checkEvaluation(LastDay(Literal(Date.valueOf("2015-03-27"))), Date.valueOf("2015-03-31"))
checkEvaluation(LastDay(Literal(Date.valueOf("2015-04-26"))), Date.valueOf("2015-04-30"))
checkEvaluation(LastDay(Literal(Date.valueOf("2015-05-25"))), Date.valueOf("2015-05-31"))
checkEvaluation(LastDay(Literal(Date.valueOf("2015-06-24"))), Date.valueOf("2015-06-30"))
checkEvaluation(LastDay(Literal(Date.valueOf("2015-07-23"))), Date.valueOf("2015-07-31"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please add more unit tests?

you should test one for each month, and also test the leap years.

checkEvaluation(LastDay(Literal(Date.valueOf("2015-08-01"))), Date.valueOf("2015-08-31"))
checkEvaluation(LastDay(Literal(Date.valueOf("2015-09-02"))), Date.valueOf("2015-09-30"))
checkEvaluation(LastDay(Literal(Date.valueOf("2015-10-03"))), Date.valueOf("2015-10-31"))
checkEvaluation(LastDay(Literal(Date.valueOf("2015-11-04"))), Date.valueOf("2015-11-30"))
checkEvaluation(LastDay(Literal(Date.valueOf("2015-12-05"))), Date.valueOf("2015-12-31"))
checkEvaluation(LastDay(Literal(Date.valueOf("2016-01-06"))), Date.valueOf("2016-01-31"))
checkEvaluation(LastDay(Literal(Date.valueOf("2016-02-07"))), Date.valueOf("2016-02-29"))
}

test("next_day") {
checkEvaluation(
NextDay(Literal(Date.valueOf("2015-07-23")), Literal("Thu")),
DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-30")))
checkEvaluation(
NextDay(Literal(Date.valueOf("2015-07-23")), Literal("THURSDAY")),
DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-30")))
checkEvaluation(
NextDay(Literal(Date.valueOf("2015-07-23")), Literal("th")),
DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-30")))
}
}
17 changes: 17 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/functions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2377,6 +2377,13 @@ object functions {
*/
def hour(columnName: String): Column = hour(Column(columnName))

/**
* Returns the last day of the month which the given date belongs to.
* @group datetime_funcs
* @since 1.5.0
*/
def last_day(e: Column): Column = LastDay(e.expr)

/**
* Extracts the minutes as an integer from a given date/timestamp/string.
* @group datetime_funcs
Expand All @@ -2391,6 +2398,16 @@ object functions {
*/
def minute(columnName: String): Column = minute(Column(columnName))

/**
* Returns the first date which is later than given date sd and named as dow.
* For example, `next_day('2015-07-27', "Sunday")` would return 2015-08-02, which is the
* first Sunday later than 2015-07-27. The parameter dayOfWeek could be 2-letter, 3-letter,
* or full name of the day of the week (e.g. Mo, tue, FRIDAY).
* @group datetime_funcs
* @since 1.5.0
*/
def next_day(sd: Column, dayOfWeek: String): Column = NextDay(sd.expr, lit(dayOfWeek).expr)

/**
* Extracts the seconds as an integer from a given date/timestamp/string.
* @group datetime_funcs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,4 +184,26 @@ class DateFunctionsSuite extends QueryTest {
Row(15, 15, 15))
}

test("function last_day") {
val df1 = Seq((1, "2015-07-23"), (2, "2015-07-24")).toDF("i", "d")
val df2 = Seq((1, "2015-07-23 00:11:22"), (2, "2015-07-24 11:22:33")).toDF("i", "t")
checkAnswer(
df1.select(last_day(col("d"))),
Seq(Row(Date.valueOf("2015-07-31")), Row(Date.valueOf("2015-07-31"))))
checkAnswer(
df2.select(last_day(col("t"))),
Seq(Row(Date.valueOf("2015-07-31")), Row(Date.valueOf("2015-07-31"))))
}

test("function next_day") {
val df1 = Seq(("mon", "2015-07-23"), ("tuesday", "2015-07-20")).toDF("dow", "d")
val df2 = Seq(("th", "2015-07-23 00:11:22"), ("xx", "2015-07-24 11:22:33")).toDF("dow", "t")
checkAnswer(
df1.select(next_day(col("d"), "MONDAY")),
Seq(Row(Date.valueOf("2015-07-27")), Row(Date.valueOf("2015-07-27"))))
checkAnswer(
df2.select(next_day(col("t"), "th")),
Seq(Row(Date.valueOf("2015-07-30")), Row(null)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note that this actually fails unit tests. I thought it passed but it was from a previous commit.

Can you please pay a little bit more attention in the future and at least run your own unit tests locally? Thanks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for that...

}

}