Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.expressions
import java.text.ParseException
import java.time.{DateTimeException, LocalDate, LocalDateTime, ZoneId}
import java.time.format.DateTimeParseException
import java.time.temporal.IsoFields
import java.util.Locale

import org.apache.commons.text.StringEscapeUtils
Expand Down Expand Up @@ -386,7 +385,7 @@ case class DayOfYear(child: Expression) extends GetDateField {
override val funcName = "getDayInYear"
}

abstract class NumberToTimestampBase extends UnaryExpression
abstract class IntegralToTimestampBase extends UnaryExpression
with ExpectsInputTypes with NullIntolerant {

protected def upScaleFactor: Long
Expand All @@ -408,19 +407,66 @@ abstract class NumberToTimestampBase extends UnaryExpression
}
}

// scalastyle:off line.size.limit
@ExpressionDescription(
usage = "_FUNC_(seconds) - Creates timestamp from the number of seconds since UTC epoch.",
usage = "_FUNC_(seconds) - Creates timestamp from the number of seconds (can be fractional) since UTC epoch.",
examples = """
Examples:
> SELECT _FUNC_(1230219000);
2008-12-25 07:30:00
> SELECT _FUNC_(1230219000.123);
2008-12-25 07:30:00.123
""",
group = "datetime_funcs",
since = "3.1.0")
case class SecondsToTimestamp(child: Expression)
extends NumberToTimestampBase {
// scalastyle:on line.size.limit
case class SecondsToTimestamp(child: Expression) extends UnaryExpression
with ExpectsInputTypes with NullIntolerant {

override def inputTypes: Seq[AbstractDataType] = Seq(NumericType)

override def dataType: DataType = TimestampType

override def upScaleFactor: Long = MICROS_PER_SECOND
override def nullable: Boolean = child.dataType match {
case _: FloatType | _: DoubleType => true
case _ => child.nullable
}

// Interpreted conversion from the child's numeric value (seconds) to microseconds,
// selected once from the child's data type:
//  - integral types: exact multiplication; ArithmeticException on long overflow.
//  - decimals: exact BigDecimal multiplication; `longValueExact` throws
//    ArithmeticException if the product has a fractional part or does not fit in a long.
//  - float/double: NaN and infinite inputs evaluate to null; any other value is
//    truncated by the conversion to long.
@transient
private lazy val evalFunc: Any => Any = child.dataType match {
  case _: IntegralType => input =>
    Math.multiplyExact(input.asInstanceOf[Number].longValue(), MICROS_PER_SECOND)
  case _: DecimalType => input =>
    val operand = new java.math.BigDecimal(MICROS_PER_SECOND)
    input.asInstanceOf[Decimal].toJavaBigDecimal.multiply(operand).longValueExact()
  case _: FloatType => input =>
    val f = input.asInstanceOf[Float]
    // Widen to double before scaling: `Float * Long` is computed in single precision,
    // which would round the microsecond value a second time and lose low-order digits.
    if (f.isNaN || f.isInfinite) null else (f.toDouble * MICROS_PER_SECOND).toLong
  case _: DoubleType => input =>
    val d = input.asInstanceOf[Double]
    if (d.isNaN || d.isInfinite) null else (d * MICROS_PER_SECOND).toLong
}

override def nullSafeEval(input: Any): Any = evalFunc(input)

// Generated-code counterpart of `evalFunc`; each branch must mirror the interpreted
// behavior for the same child data type.
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = child.dataType match {
  case _: IntegralType =>
    defineCodeGen(ctx, ev, c => s"java.lang.Math.multiplyExact($c, ${MICROS_PER_SECOND}L)")
  case _: DecimalType =>
    val operand = s"new java.math.BigDecimal($MICROS_PER_SECOND)"
    defineCodeGen(ctx, ev, c => s"$c.toJavaBigDecimal().multiply($operand).longValueExact()")
  case other =>
    // Float and double inputs share this branch; `typeStr` is the boxed Java type used
    // for the static isNaN/isInfinite checks.
    nullSafeCodeGen(ctx, ev, c => {
      val typeStr = CodeGenerator.boxedType(other)
      // The explicit (double) cast makes a float input scale in double precision,
      // matching `evalFunc`; for a double input the cast is a no-op.
      s"""
        |if ($typeStr.isNaN($c) || $typeStr.isInfinite($c)) {
        | ${ev.isNull} = true;
        |} else {
        | ${ev.value} = (long)((double)$c * $MICROS_PER_SECOND);
        |}
        |""".stripMargin
    })
}

override def prettyName: String = "timestamp_seconds"
}
Expand All @@ -437,7 +483,7 @@ case class SecondsToTimestamp(child: Expression)
since = "3.1.0")
// scalastyle:on line.size.limit
case class MillisToTimestamp(child: Expression)
extends NumberToTimestampBase {
extends IntegralToTimestampBase {

override def upScaleFactor: Long = MICROS_PER_MILLIS

Expand All @@ -456,7 +502,7 @@ case class MillisToTimestamp(child: Expression)
since = "3.1.0")
// scalastyle:on line.size.limit
case class MicrosToTimestamp(child: Expression)
extends NumberToTimestampBase {
extends IntegralToTimestampBase {

override def upScaleFactor: Long = 1L

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1142,28 +1142,6 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
}
}

test("SPARK-31710:Adds TIMESTAMP_SECONDS, " +
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is rewritten to increase test coverage.

"TIMESTAMP_MILLIS and TIMESTAMP_MICROS functions") {
checkEvaluation(SecondsToTimestamp(Literal(1230219000)), 1230219000L * MICROS_PER_SECOND)
checkEvaluation(SecondsToTimestamp(Literal(-1230219000)), -1230219000L * MICROS_PER_SECOND)
checkEvaluation(SecondsToTimestamp(Literal(null, IntegerType)), null)
checkEvaluation(MillisToTimestamp(Literal(1230219000123L)), 1230219000123L * MICROS_PER_MILLIS)
checkEvaluation(MillisToTimestamp(
Literal(-1230219000123L)), -1230219000123L * MICROS_PER_MILLIS)
checkEvaluation(MillisToTimestamp(Literal(null, IntegerType)), null)
checkEvaluation(MicrosToTimestamp(Literal(1230219000123123L)), 1230219000123123L)
checkEvaluation(MicrosToTimestamp(Literal(-1230219000123123L)), -1230219000123123L)
checkEvaluation(MicrosToTimestamp(Literal(null, IntegerType)), null)
checkExceptionInExpression[ArithmeticException](
SecondsToTimestamp(Literal(1230219000123123L)), "long overflow")
checkExceptionInExpression[ArithmeticException](
SecondsToTimestamp(Literal(-1230219000123123L)), "long overflow")
checkExceptionInExpression[ArithmeticException](
MillisToTimestamp(Literal(92233720368547758L)), "long overflow")
checkExceptionInExpression[ArithmeticException](
MillisToTimestamp(Literal(-92233720368547758L)), "long overflow")
}

test("Consistent error handling for datetime formatting and parsing functions") {

def checkException[T <: Exception : ClassTag](c: String): Unit = {
Expand Down Expand Up @@ -1194,4 +1172,118 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
new ParseToTimestamp(Literal("11:11 PM"), Literal("mm:ss a")).child,
Timestamp.valueOf("1970-01-01 12:11:11.0"))
}

/**
 * Runs `testFunc` over a fixed set of boundary values, each boxed into the narrowest
 * integral type (Byte, Short, Int or Long) that represents it exactly.
 *
 * @param testFunc check to execute for every boxed sample value, in order
 */
def testIntegralInput(testFunc: Number => Unit): Unit = {
  // Pick the smallest integral width whose round trip preserves the value.
  def boxNarrowest(v: Long): Number = v match {
    case b if b.toByte == b => Byte.box(b.toByte)
    case s if s.toShort == s => Short.box(s.toShort)
    case i if i.toInt == i => Int.box(i.toInt)
    case l => Long.box(l)
  }

  val samples = Seq[Long](
    0,
    Byte.MaxValue,
    Byte.MinValue,
    Short.MaxValue,
    Short.MinValue,
    Int.MaxValue,
    Int.MinValue,
    Int.MaxValue.toLong + 100,
    Int.MinValue.toLong - 100)

  samples.foreach(sample => testFunc(boxNarrowest(sample)))
}

test("TIMESTAMP_SECONDS") {
  // An integral number of seconds must map to the instant that many seconds from the epoch.
  def testIntegralFunc(value: Number): Unit = {
    checkEvaluation(
      SecondsToTimestamp(Literal(value)),
      Instant.ofEpochSecond(value.longValue()))
  }

  // test null input
  checkEvaluation(
    SecondsToTimestamp(Literal(null, IntegerType)),
    null)

  // test integral input
  testIntegralInput(testIntegralFunc)
  // test overflow
  checkExceptionInExpression[ArithmeticException](
    SecondsToTimestamp(Literal(Long.MaxValue, LongType)), EmptyRow, "long overflow")

  // The same textual value is checked as float, double and decimal input.
  def testFractionalInput(input: String): Unit = {
    Seq(input.toFloat, input.toDouble, Decimal(input)).foreach { value =>
      checkEvaluation(
        SecondsToTimestamp(Literal(value)),
        (input.toDouble * MICROS_PER_SECOND).toLong)
    }
  }

  testFractionalInput("1.0")
  testFractionalInput("-1.0")
  testFractionalInput("1.234567")
  testFractionalInput("-1.234567")

  // test overflow for decimal input
  checkExceptionInExpression[ArithmeticException](
    SecondsToTimestamp(Literal(Decimal("9" * 38))), "Overflow"
  )
  // test truncation error for decimal input
  checkExceptionInExpression[ArithmeticException](
    SecondsToTimestamp(Literal(Decimal("0.1234567"))), "Rounding necessary"
  )

  // test NaN
  checkEvaluation(
    SecondsToTimestamp(Literal(Double.NaN)),
    null)
  checkEvaluation(
    SecondsToTimestamp(Literal(Float.NaN)),
    null)
  // test infinity: like NaN, infinite float/double input evaluates to null.
  // Doubles and floats are kept in separate sequences so the floats are not widened.
  Seq(Double.PositiveInfinity, Double.NegativeInfinity).foreach { d =>
    checkEvaluation(SecondsToTimestamp(Literal(d)), null)
  }
  Seq(Float.PositiveInfinity, Float.NegativeInfinity).foreach { f =>
    checkEvaluation(SecondsToTimestamp(Literal(f)), null)
  }
  // double input can truncate
  checkEvaluation(
    SecondsToTimestamp(Literal(123.456789123)),
    Instant.ofEpochSecond(123, 456789000))
}

test("TIMESTAMP_MILLIS") {
  // A null input yields a null timestamp.
  checkEvaluation(MillisToTimestamp(Literal(null, IntegerType)), null)

  // Every integral width: the result is the instant that many milliseconds from the epoch.
  testIntegralInput { millis =>
    checkEvaluation(
      MillisToTimestamp(Literal(millis)),
      Instant.ofEpochMilli(millis.longValue()))
  }

  // Scaling Long.MaxValue milliseconds to microseconds overflows a long.
  checkExceptionInExpression[ArithmeticException](
    MillisToTimestamp(Literal(Long.MaxValue, LongType)),
    EmptyRow,
    "long overflow")
}

test("TIMESTAMP_MICROS") {
  // The expected result is the input value itself, as a long.
  val verify: Number => Unit = { micros =>
    checkEvaluation(
      MicrosToTimestamp(Literal(micros)),
      micros.longValue())
  }

  // A null input yields a null timestamp.
  checkEvaluation(MicrosToTimestamp(Literal(null, IntegerType)), null)

  // Integral inputs of every width.
  testIntegralInput(verify)

  // The extreme long values are expected to evaluate without error.
  Seq(Long.MaxValue, Long.MinValue).map(Long.box).foreach(verify)
}
}
7 changes: 6 additions & 1 deletion sql/core/src/test/resources/sql-tests/inputs/datetime.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@

-- [SPARK-31710] TIMESTAMP_SECONDS, TIMESTAMP_MILLIS and TIMESTAMP_MICROS: convert numbers to timestamps
select TIMESTAMP_SECONDS(1230219000),TIMESTAMP_SECONDS(-1230219000),TIMESTAMP_SECONDS(null);
select TIMESTAMP_SECONDS(1.23), TIMESTAMP_SECONDS(1.23d), TIMESTAMP_SECONDS(FLOAT(1.23));
select TIMESTAMP_MILLIS(1230219000123),TIMESTAMP_MILLIS(-1230219000123),TIMESTAMP_MILLIS(null);
select TIMESTAMP_MICROS(1230219000123123),TIMESTAMP_MICROS(-1230219000123123),TIMESTAMP_MICROS(null);
-- overflow exception:
-- overflow exception
select TIMESTAMP_SECONDS(1230219000123123);
select TIMESTAMP_SECONDS(-1230219000123123);
select TIMESTAMP_MILLIS(92233720368547758);
select TIMESTAMP_MILLIS(-92233720368547758);
-- truncate exception
select TIMESTAMP_SECONDS(0.1234567);
Copy link
Member

@dongjoon-hyun dongjoon-hyun Jul 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we also add a test case where truncation is allowed, since this PR allows truncation for the double type?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR aims to allow truncation for both ANSI and legacy mode. Did I understand correctly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, float/double are approximate values, so truncation should always be allowed.

-- truncation is OK for float/double
select TIMESTAMP_SECONDS(0.1234567d), TIMESTAMP_SECONDS(FLOAT(0.1234567));

-- [SPARK-16836] current_date and current_timestamp literals
select current_date = current_date(), current_timestamp = current_timestamp();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 103
-- Number of queries: 106


-- !query
select TIMESTAMP_SECONDS(1230219000),TIMESTAMP_SECONDS(-1230219000),TIMESTAMP_SECONDS(null)
-- !query schema
struct<timestamp_seconds(1230219000):timestamp,timestamp_seconds(-1230219000):timestamp,timestamp_seconds(CAST(NULL AS INT)):timestamp>
struct<timestamp_seconds(1230219000):timestamp,timestamp_seconds(-1230219000):timestamp,timestamp_seconds(CAST(NULL AS DOUBLE)):timestamp>
-- !query output
2008-12-25 07:30:00 1931-01-07 00:30:00 NULL


-- !query
select TIMESTAMP_SECONDS(1.23), TIMESTAMP_SECONDS(1.23d), TIMESTAMP_SECONDS(FLOAT(1.23))
-- !query schema
struct<timestamp_seconds(1.23):timestamp,timestamp_seconds(1.23):timestamp,timestamp_seconds(CAST(1.23 AS FLOAT)):timestamp>
-- !query output
1969-12-31 16:00:01.23 1969-12-31 16:00:01.23 1969-12-31 16:00:01.23


-- !query
select TIMESTAMP_MILLIS(1230219000123),TIMESTAMP_MILLIS(-1230219000123),TIMESTAMP_MILLIS(null)
-- !query schema
Expand Down Expand Up @@ -62,6 +70,23 @@ java.lang.ArithmeticException
long overflow


-- !query
select TIMESTAMP_SECONDS(0.1234567)
-- !query schema
struct<>
-- !query output
java.lang.ArithmeticException
Rounding necessary


-- !query
select TIMESTAMP_SECONDS(0.1234567d), TIMESTAMP_SECONDS(FLOAT(0.1234567))
-- !query schema
struct<timestamp_seconds(0.1234567):timestamp,timestamp_seconds(CAST(0.1234567 AS FLOAT)):timestamp>
-- !query output
1969-12-31 16:00:00.123456 1969-12-31 16:00:00.123456


-- !query
select current_date = current_date(), current_timestamp = current_timestamp()
-- !query schema
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 103
-- Number of queries: 106


-- !query
select TIMESTAMP_SECONDS(1230219000),TIMESTAMP_SECONDS(-1230219000),TIMESTAMP_SECONDS(null)
-- !query schema
struct<timestamp_seconds(1230219000):timestamp,timestamp_seconds(-1230219000):timestamp,timestamp_seconds(CAST(NULL AS INT)):timestamp>
struct<timestamp_seconds(1230219000):timestamp,timestamp_seconds(-1230219000):timestamp,timestamp_seconds(CAST(NULL AS DOUBLE)):timestamp>
-- !query output
2008-12-25 07:30:00 1931-01-07 00:30:00 NULL


-- !query
select TIMESTAMP_SECONDS(1.23), TIMESTAMP_SECONDS(1.23d), TIMESTAMP_SECONDS(FLOAT(1.23))
-- !query schema
struct<timestamp_seconds(1.23):timestamp,timestamp_seconds(1.23):timestamp,timestamp_seconds(CAST(1.23 AS FLOAT)):timestamp>
-- !query output
1969-12-31 16:00:01.23 1969-12-31 16:00:01.23 1969-12-31 16:00:01.23


-- !query
select TIMESTAMP_MILLIS(1230219000123),TIMESTAMP_MILLIS(-1230219000123),TIMESTAMP_MILLIS(null)
-- !query schema
Expand Down Expand Up @@ -62,6 +70,23 @@ java.lang.ArithmeticException
long overflow


-- !query
select TIMESTAMP_SECONDS(0.1234567)
-- !query schema
struct<>
-- !query output
java.lang.ArithmeticException
Rounding necessary


-- !query
select TIMESTAMP_SECONDS(0.1234567d), TIMESTAMP_SECONDS(FLOAT(0.1234567))
-- !query schema
struct<timestamp_seconds(0.1234567):timestamp,timestamp_seconds(CAST(0.1234567 AS FLOAT)):timestamp>
-- !query output
1969-12-31 16:00:00.123456 1969-12-31 16:00:00.123456


-- !query
select current_date = current_date(), current_timestamp = current_timestamp()
-- !query schema
Expand Down
Loading