diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md index 0130923e694b..d6550c30b955 100644 --- a/docs/sql-migration-guide.md +++ b/docs/sql-migration-guide.md @@ -30,6 +30,8 @@ license: | - In Spark 3.1, `from_unixtime`, `unix_timestamp`,`to_unix_timestamp`, `to_timestamp` and `to_date` will fail if the specified datetime pattern is invalid. In Spark 3.0 or earlier, they result `NULL`. + - In Spark 3.1, casting numeric to timestamp is forbidden by default. It's strongly recommended to use the dedicated functions TIMESTAMP_SECONDS, TIMESTAMP_MILLIS and TIMESTAMP_MICROS instead. Alternatively, you can set `spark.sql.legacy.allowCastNumericToTimestamp` to `true` to restore the legacy casting behavior. See SPARK-31710 for more details. + ## Upgrading from Spark SQL 2.4 to 3.0 ### Dataset/DataFrame APIs diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 03e3b9ca4bd0..3ad899bcc367 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -534,7 +534,10 @@ def withWatermark(self, eventTime, delayThreshold): .. note:: Evolving - >>> sdf.select('name', sdf.time.cast('timestamp')).withWatermark('time', '10 minutes') + >>> from pyspark.sql.functions import timestamp_seconds + >>> sdf.select( + ... 'name', + ... timestamp_seconds(sdf.time).alias('time')).withWatermark('time', '10 minutes') DataFrame[name: string, time: timestamp] """ if not eventTime or type(eventTime) is not str: diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index de0d38e2aed1..0c8c34dd8799 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1427,6 +1427,19 @@ def to_utc_timestamp(timestamp, tz): return Column(sc._jvm.functions.to_utc_timestamp(_to_java_column(timestamp), tz)) +@since(3.1) +def timestamp_seconds(col): + """ + >>> from pyspark.sql.functions import timestamp_seconds + >>> time_df = spark.createDataFrame([(1230219000,)], ['unix_time']) + >>> time_df.select(timestamp_seconds(time_df.unix_time).alias('ts')).collect() + [Row(ts=datetime.datetime(2008, 12, 25, 7, 30))] + """ + + sc = SparkContext._active_spark_context + return Column(sc._jvm.functions.timestamp_seconds(_to_java_column(col))) + + @since(2.0) @ignore_unicode_prefix def window(timeColumn, windowDuration, slideDuration=None, startTime=None): diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py index 062e61663a33..30c3fd4c8d16 100644 --- a/python/pyspark/sql/tests/test_dataframe.py +++ b/python/pyspark/sql/tests/test_dataframe.py @@ -644,7 +644,7 @@ def test_to_pandas_from_mixed_dataframe(self): CAST(col6 AS DOUBLE) AS double, CAST(col7 AS BOOLEAN) AS boolean, CAST(col8 AS STRING) AS string, - CAST(col9 AS TIMESTAMP) AS timestamp + timestamp_seconds(col9) AS timestamp FROM VALUES (1, 1, 1, 1, 1, 1, 1, 1, 1), (NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL) """ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index ef70915a5c96..5576e71b5702 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -59,7 +59,8 @@ object Cast { case (StringType, TimestampType) => true case (BooleanType, TimestampType) => true case (DateType, TimestampType) => true - case (_: NumericType, TimestampType) => true + case (_: NumericType, TimestampType) => + 
SQLConf.get.getConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP) case (StringType, DateType) => true case (TimestampType, DateType) => true @@ -266,7 +267,15 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit TypeCheckResult.TypeCheckSuccess } else { TypeCheckResult.TypeCheckFailure( - s"cannot cast ${child.dataType.catalogString} to ${dataType.catalogString}") + if (child.dataType.isInstanceOf[NumericType] && dataType.isInstanceOf[TimestampType]) { + s"cannot cast ${child.dataType.catalogString} to ${dataType.catalogString}, " + + "you can enable the casting by setting " + + s"${SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP.key} to true, " + + "but we strongly recommend using functions " + + "TIMESTAMP_SECONDS/TIMESTAMP_MILLIS/TIMESTAMP_MICROS instead." + } else { + s"cannot cast ${child.dataType.catalogString} to ${dataType.catalogString}" + }) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index c5cf447c103b..b46c3fb349ee 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -1549,7 +1549,7 @@ case class ParseToDate(left: Expression, format: Option[Expression], child: Expr def this(left: Expression, format: Expression) { this(left, Option(format), - Cast(Cast(UnixTimestamp(left, format), TimestampType), DateType)) + Cast(SecondsToTimestamp(UnixTimestamp(left, format)), DateType)) } def this(left: Expression) = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 189740e31320..1bebdef1ebcb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2595,6 +2595,15 @@ object SQLConf { .checkValue(_ > 0, "The timeout value must be positive") .createWithDefault(10L) + val LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP = + buildConf("spark.sql.legacy.allowCastNumericToTimestamp") + .internal() + .doc("When true, allow casting numeric to timestamp; " + + "when false, forbid the cast. See SPARK-31710 for more details.") + .version("3.1.0") + .booleanConf + .createWithDefault(false) + /** * Holds information about keys that have been deprecated. * @@ -3181,6 +3190,9 @@ class SQLConf extends Serializable with Logging { def integerGroupingIdEnabled: Boolean = getConf(SQLConf.LEGACY_INTEGER_GROUPING_ID) + def legacyAllowCastNumericToTimestamp: Boolean = + getConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP) + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. 
*/ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala index 6af995cab64f..35b401798013 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala @@ -50,7 +50,9 @@ abstract class CastSuiteBase extends SparkFunSuite with ExpressionEvalHelper { } protected def checkNullCast(from: DataType, to: DataType): Unit = { - checkEvaluation(cast(Literal.create(null, from), to, UTC_OPT), null) + withSQLConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP.key -> "true") { + checkEvaluation(cast(Literal.create(null, from), to, UTC_OPT), null) + } } test("null cast") { @@ -239,7 +241,9 @@ abstract class CastSuiteBase extends SparkFunSuite with ExpressionEvalHelper { checkCast(1.5, 1.5f) checkCast(1.5, "1.5") - checkEvaluation(cast(cast(1.toDouble, TimestampType), DoubleType), 1.toDouble) + withSQLConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP.key -> "true") { + checkEvaluation(cast(cast(1.toDouble, TimestampType), DoubleType), 1.toDouble) + } } test("cast from string") { @@ -305,17 +309,20 @@ abstract class CastSuiteBase extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(cast(cast(cast(cast( cast(cast("5", ByteType), ShortType), IntegerType), FloatType), DoubleType), LongType), 5.toLong) - checkEvaluation( - cast(cast(cast(cast(cast(cast("5", ByteType), TimestampType), - DecimalType.SYSTEM_DEFAULT), LongType), StringType), ShortType), - 5.toShort) - checkEvaluation( - cast(cast(cast(cast(cast(cast("5", TimestampType, UTC_OPT), ByteType), - DecimalType.SYSTEM_DEFAULT), LongType), StringType), ShortType), - null) - checkEvaluation(cast(cast(cast(cast(cast(cast("5", DecimalType.SYSTEM_DEFAULT), - ByteType), TimestampType), LongType), StringType), ShortType), - 5.toShort) + + withSQLConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP.key -> "true") { + checkEvaluation( + cast(cast(cast(cast(cast(cast("5", ByteType), TimestampType), + DecimalType.SYSTEM_DEFAULT), LongType), StringType), ShortType), + 5.toShort) + checkEvaluation( + cast(cast(cast(cast(cast(cast("5", TimestampType, UTC_OPT), ByteType), + DecimalType.SYSTEM_DEFAULT), LongType), StringType), ShortType), + null) + checkEvaluation(cast(cast(cast(cast(cast(cast("5", DecimalType.SYSTEM_DEFAULT), + ByteType), TimestampType), LongType), StringType), ShortType), + 5.toShort) + } checkEvaluation(cast("23", DoubleType), 23d) checkEvaluation(cast("23", IntegerType), 23) @@ -376,29 +383,32 @@ abstract class CastSuiteBase extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(cast(ts, LongType), 15.toLong) checkEvaluation(cast(ts, FloatType), 15.003f) checkEvaluation(cast(ts, DoubleType), 15.003) - checkEvaluation(cast(cast(tss, ShortType), TimestampType), - DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND) - checkEvaluation(cast(cast(tss, IntegerType), TimestampType), - DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND) - checkEvaluation(cast(cast(tss, LongType), TimestampType), - DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND) - checkEvaluation( - cast(cast(millis.toFloat / MILLIS_PER_SECOND, TimestampType), FloatType), - millis.toFloat / MILLIS_PER_SECOND) - checkEvaluation( - cast(cast(millis.toDouble / MILLIS_PER_SECOND, TimestampType), DoubleType), - millis.toDouble / MILLIS_PER_SECOND) - checkEvaluation( - cast(cast(Decimal(1), 
TimestampType), DecimalType.SYSTEM_DEFAULT), - Decimal(1)) - // A test for higher precision than millis - checkEvaluation(cast(cast(0.000001, TimestampType), DoubleType), 0.000001) + withSQLConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP.key -> "true") { + checkEvaluation(cast(cast(tss, ShortType), TimestampType), + DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND) + checkEvaluation(cast(cast(tss, IntegerType), TimestampType), + DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND) + checkEvaluation(cast(cast(tss, LongType), TimestampType), + DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND) + checkEvaluation( + cast(cast(millis.toFloat / MILLIS_PER_SECOND, TimestampType), FloatType), + millis.toFloat / MILLIS_PER_SECOND) + checkEvaluation( + cast(cast(millis.toDouble / MILLIS_PER_SECOND, TimestampType), DoubleType), + millis.toDouble / MILLIS_PER_SECOND) + checkEvaluation( + cast(cast(Decimal(1), TimestampType), DecimalType.SYSTEM_DEFAULT), + Decimal(1)) - checkEvaluation(cast(Double.NaN, TimestampType), null) - checkEvaluation(cast(1.0 / 0.0, TimestampType), null) - checkEvaluation(cast(Float.NaN, TimestampType), null) - checkEvaluation(cast(1.0f / 0.0f, TimestampType), null) + // A test for higher precision than millis + checkEvaluation(cast(cast(0.000001, TimestampType), DoubleType), 0.000001) + + checkEvaluation(cast(Double.NaN, TimestampType), null) + checkEvaluation(cast(1.0 / 0.0, TimestampType), null) + checkEvaluation(cast(Float.NaN, TimestampType), null) + checkEvaluation(cast(1.0f / 0.0f, TimestampType), null) + } } test("cast from array") { @@ -1026,8 +1036,11 @@ class CastSuite extends CastSuiteBase { test("cast from int 2") { checkEvaluation(cast(1, LongType), 1.toLong) - checkEvaluation(cast(cast(1000, TimestampType), LongType), 1000.toLong) - checkEvaluation(cast(cast(-1200, TimestampType), LongType), -1200.toLong) + + withSQLConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP.key -> "true") { + checkEvaluation(cast(cast(1000, TimestampType), LongType), 1000.toLong) + checkEvaluation(cast(cast(-1200, TimestampType), LongType), -1200.toLong) + } checkEvaluation(cast(123, DecimalType.USER_DEFAULT), Decimal(123)) checkEvaluation(cast(123, DecimalType(3, 0)), Decimal(123)) @@ -1310,6 +1323,20 @@ class CastSuite extends CastSuiteBase { checkEvaluation(cast(negativeTs, LongType), expectedSecs) } } + + test("SPARK-31710: fail casting from numeric to timestamp by default") { + Seq(true, false).foreach { enable => + withSQLConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP.key -> enable.toString) { + assert(cast(2.toByte, TimestampType).resolved == enable) + assert(cast(10.toShort, TimestampType).resolved == enable) + assert(cast(3, TimestampType).resolved == enable) + assert(cast(10L, TimestampType).resolved == enable) + assert(cast(Decimal(1.2), TimestampType).resolved == enable) + assert(cast(1.7f, TimestampType).resolved == enable) + assert(cast(2.3d, TimestampType).resolved == enable) + } + } + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 0cca3e7b47c5..62ad5ea9b593 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -3358,6 +3358,15 @@ object functions { window(timeColumn, windowDuration, windowDuration, "0 second") } + /** + * Creates a timestamp from the number of seconds since the UTC epoch. 
+ * @group datetime_funcs + * @since 3.1.0 + */ + def timestamp_seconds(e: Column): Column = withExpr { + SecondsToTimestamp(e.expr) + } + ////////////////////////////////////////////////////////////////////////////////////////////// // Collection functions ////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/sql/core/src/test/resources/sql-tests/inputs/udf/udf-window.sql b/sql/core/src/test/resources/sql-tests/inputs/udf/udf-window.sql index bcbf87f8a04c..1659f1c81959 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/udf/udf-window.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/udf/udf-window.sql @@ -1,15 +1,15 @@ --This test file was converted from window.sql. -- Test data. CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES -(null, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"), -(1, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"), -(1, 2L, 2.5D, date("2017-08-02"), timestamp(1502000000), "a"), -(2, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "a"), -(1, null, 1.0D, date("2017-08-01"), timestamp(1501545600), "b"), -(2, 3L, 3.3D, date("2017-08-03"), timestamp(1503000000), "b"), -(3, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "b"), +(null, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "a"), +(1, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "a"), +(1, 2L, 2.5D, date("2017-08-02"), timestamp_seconds(1502000000), "a"), +(2, 2147483650L, 100.001D, date("2020-12-31"), timestamp_seconds(1609372800), "a"), +(1, null, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "b"), +(2, 3L, 3.3D, date("2017-08-03"), timestamp_seconds(1503000000), "b"), +(3, 2147483650L, 100.001D, date("2020-12-31"), timestamp_seconds(1609372800), "b"), (null, null, null, null, null, null), -(3, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), null) +(3, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), null) AS testData(val, val_long, val_double, val_date, val_timestamp, cate); -- RowsBetween diff --git a/sql/core/src/test/resources/sql-tests/inputs/window.sql b/sql/core/src/test/resources/sql-tests/inputs/window.sql index 3d05dfda6c3f..72d812d6a4e4 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/window.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/window.sql @@ -5,15 +5,15 @@ -- Test data. 
CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES -(null, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"), -(1, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"), -(1, 2L, 2.5D, date("2017-08-02"), timestamp(1502000000), "a"), -(2, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "a"), -(1, null, 1.0D, date("2017-08-01"), timestamp(1501545600), "b"), -(2, 3L, 3.3D, date("2017-08-03"), timestamp(1503000000), "b"), -(3, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "b"), +(null, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "a"), +(1, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "a"), +(1, 2L, 2.5D, date("2017-08-02"), timestamp_seconds(1502000000), "a"), +(2, 2147483650L, 100.001D, date("2020-12-31"), timestamp_seconds(1609372800), "a"), +(1, null, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "b"), +(2, 3L, 3.3D, date("2017-08-03"), timestamp_seconds(1503000000), "b"), +(3, 2147483650L, 100.001D, date("2020-12-31"), timestamp_seconds(1609372800), "b"), (null, null, null, null, null, null), -(3, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), null) +(3, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), null) AS testData(val, val_long, val_double, val_date, val_timestamp, cate); -- RowsBetween diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-window.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-window.sql.out index a915c1bd6c71..a84070535b65 100644 --- a/sql/core/src/test/resources/sql-tests/results/udf/udf-window.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-window.sql.out @@ -4,15 +4,15 @@ -- !query CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES -(null, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"), -(1, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"), -(1, 2L, 2.5D, date("2017-08-02"), timestamp(1502000000), "a"), -(2, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "a"), -(1, null, 1.0D, date("2017-08-01"), timestamp(1501545600), "b"), -(2, 3L, 3.3D, date("2017-08-03"), timestamp(1503000000), "b"), -(3, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "b"), +(null, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "a"), +(1, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "a"), +(1, 2L, 2.5D, date("2017-08-02"), timestamp_seconds(1502000000), "a"), +(2, 2147483650L, 100.001D, date("2020-12-31"), timestamp_seconds(1609372800), "a"), +(1, null, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "b"), +(2, 3L, 3.3D, date("2017-08-03"), timestamp_seconds(1503000000), "b"), +(3, 2147483650L, 100.001D, date("2020-12-31"), timestamp_seconds(1609372800), "b"), (null, null, null, null, null, null), -(3, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), null) +(3, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), null) AS testData(val, val_long, val_double, val_date, val_timestamp, cate) -- !query schema struct<> diff --git a/sql/core/src/test/resources/sql-tests/results/window.sql.out b/sql/core/src/test/resources/sql-tests/results/window.sql.out index 625088f90ced..ede044a44fda 100644 --- a/sql/core/src/test/resources/sql-tests/results/window.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/window.sql.out @@ -4,15 +4,15 @@ -- !query CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES -(null, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"), -(1, 1L, 1.0D, 
date("2017-08-01"), timestamp(1501545600), "a"), -(1, 2L, 2.5D, date("2017-08-02"), timestamp(1502000000), "a"), -(2, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "a"), -(1, null, 1.0D, date("2017-08-01"), timestamp(1501545600), "b"), -(2, 3L, 3.3D, date("2017-08-03"), timestamp(1503000000), "b"), -(3, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "b"), +(null, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "a"), +(1, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "a"), +(1, 2L, 2.5D, date("2017-08-02"), timestamp_seconds(1502000000), "a"), +(2, 2147483650L, 100.001D, date("2020-12-31"), timestamp_seconds(1609372800), "a"), +(1, null, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), "b"), +(2, 3L, 3.3D, date("2017-08-03"), timestamp_seconds(1503000000), "b"), +(3, 2147483650L, 100.001D, date("2020-12-31"), timestamp_seconds(1609372800), "b"), (null, null, null, null, null, null), -(3, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), null) +(3, 1L, 1.0D, date("2017-08-01"), timestamp_seconds(1501545600), null) AS testData(val, val_long, val_double, val_date, val_timestamp, cate) -- !query schema struct<> diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala index 5cc9e156db1b..9caa4c037700 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala @@ -639,7 +639,7 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { val now = sql("select unix_timestamp()").collect().head.getLong(0) checkAnswer( - sql(s"select cast ($now as timestamp)"), + sql(s"select timestamp_seconds($now)"), Row(new java.util.Date(TimeUnit.SECONDS.toMillis(now)))) } } @@ -716,7 +716,7 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { val df = Seq((date1, ts1, s1, ss1), (date2, ts2, s2, ss2)).toDF("d", "ts", "s", "ss") checkAnswer(df.select(to_timestamp(col("ss"))), - df.select(unix_timestamp(col("ss")).cast("timestamp"))) + df.select(timestamp_seconds(unix_timestamp(col("ss"))))) checkAnswer(df.select(to_timestamp(col("ss"))), Seq( Row(ts1), Row(ts2))) if (legacyParserPolicy == "legacy") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala index 708b98e8fe15..91ec1b5ab293 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.catalog.CatalogColumnStat import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils} +import org.apache.spark.sql.functions.timestamp_seconds import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.test.SQLTestData.ArrayData @@ -467,7 +468,7 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared withTable(table) { TimeZone.setDefault(srcTimeZone) spark.range(start, end) - .select('id.cast(TimestampType).cast(t).as(column)) + .select(timestamp_seconds($"id").cast(t).as(column)) .write.saveAsTable(table) sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS 
$column") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala index c228740df07c..c0f25e3a5053 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.streaming import org.scalatest.BeforeAndAfter -import org.apache.spark.sql.functions.{count, window} +import org.apache.spark.sql.functions.{count, timestamp_seconds, window} import org.apache.spark.sql.streaming.StreamTest class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter { @@ -33,7 +33,7 @@ class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter { test("SPARK-24156: do not plan a no-data batch again after it has already been planned") { val inputData = MemoryStream[Int] val df = inputData.toDF() - .withColumn("eventTime", $"value".cast("timestamp")) + .withColumn("eventTime", timestamp_seconds($"value")) .withWatermark("eventTime", "10 seconds") .groupBy(window($"eventTime", "5 seconds") as 'window) .agg(count("*") as 'count) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterSuite.scala index 0c17320acade..e87bd11f0dca 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterSuite.scala @@ -26,7 +26,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.SparkException import org.apache.spark.sql.ForeachWriter import org.apache.spark.sql.execution.streaming.MemoryStream -import org.apache.spark.sql.functions.{count, window} +import org.apache.spark.sql.functions.{count, timestamp_seconds, window} import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryException, StreamTest} import org.apache.spark.sql.test.SharedSparkSession @@ -163,7 +163,7 @@ class ForeachWriterSuite extends StreamTest with SharedSparkSession with BeforeA val inputData = MemoryStream[Int] val windowedAggregation = inputData.toDF() - .withColumn("eventTime", $"value".cast("timestamp")) + .withColumn("eventTime", timestamp_seconds($"value")) .withWatermark("eventTime", "10 seconds") .groupBy(window($"eventTime", "5 seconds") as 'window) .agg(count("*") as 'count) @@ -197,7 +197,7 @@ class ForeachWriterSuite extends StreamTest with SharedSparkSession with BeforeA val inputData = MemoryStream[Int] val windowedAggregation = inputData.toDF() - .withColumn("eventTime", $"value".cast("timestamp")) + .withColumn("eventTime", timestamp_seconds($"value")) .withWatermark("eventTime", "10 seconds") .groupBy(window($"eventTime", "5 seconds") as 'window) .agg(count("*") as 'count) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index 6486e1aee864..ae5d066566ed 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark import 
org.apache.spark.sql.catalyst.util.DateTimeTestUtils.UTC import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.sources.MemorySink -import org.apache.spark.sql.functions.{count, window} +import org.apache.spark.sql.functions.{count, timestamp_seconds, window} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.OutputMode._ import org.apache.spark.util.Utils @@ -129,7 +129,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche // No event time metrics when there is no watermarking val inputData1 = MemoryStream[Int] val aggWithoutWatermark = inputData1.toDF() - .withColumn("eventTime", $"value".cast("timestamp")) + .withColumn("eventTime", timestamp_seconds($"value")) .groupBy(window($"eventTime", "5 seconds") as 'window) .agg(count("*") as 'count) .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) @@ -146,7 +146,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche // All event time metrics where watermarking is set val inputData2 = MemoryStream[Int] val aggWithWatermark = inputData2.toDF() - .withColumn("eventTime", $"value".cast("timestamp")) + .withColumn("eventTime", timestamp_seconds($"value")) .withWatermark("eventTime", "10 seconds") .groupBy(window($"eventTime", "5 seconds") as 'window) .agg(count("*") as 'count) @@ -169,7 +169,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche // All event time metrics where watermarking is set val inputData = MemoryStream[Int] val aggWithWatermark = inputData.toDF() - .withColumn("eventTime", $"value".cast("timestamp")) + .withColumn("eventTime", timestamp_seconds($"value")) .withWatermark("eventTime", "10 seconds") .groupBy(window($"eventTime", "5 seconds") as 'window) .agg(count("*") as 'count) @@ -224,7 +224,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche // All event time metrics where watermarking is set val inputData = MemoryStream[Int] val aggWithWatermark = inputData.toDF() - .withColumn("eventTime", $"value".cast("timestamp")) + .withColumn("eventTime", timestamp_seconds($"value")) .withWatermark("eventTime", "10 seconds") .groupBy(window($"eventTime", "5 seconds") as 'window) .agg(count("*") as 'count) @@ -286,7 +286,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche val inputData = MemoryStream[Int] val windowedAggregation = inputData.toDF() - .withColumn("eventTime", $"value".cast("timestamp")) + .withColumn("eventTime", timestamp_seconds($"value")) .withWatermark("eventTime", "10 seconds") .groupBy(window($"eventTime", "5 seconds") as 'window) .agg(count("*") as 'count) @@ -309,7 +309,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche spark.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, "10") val windowedAggregation = inputData.toDF() - .withColumn("eventTime", $"value".cast("timestamp")) + .withColumn("eventTime", timestamp_seconds($"value")) .withWatermark("eventTime", "10 seconds") .groupBy(window($"eventTime", "5 seconds") as 'window) .agg(count("*") as 'count) @@ -336,7 +336,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche val input = MemoryStream[Long] val aggWithWatermark = input.toDF() - .withColumn("eventTime", $"value".cast("timestamp")) + .withColumn("eventTime", timestamp_seconds($"value")) .withWatermark("eventTime", "2 years 5 months") .groupBy(window($"eventTime", "5 seconds") as 'window) 
.agg(count("*") as 'count) @@ -368,7 +368,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche test("recovery") { val inputData = MemoryStream[Int] val df = inputData.toDF() - .withColumn("eventTime", $"value".cast("timestamp")) + .withColumn("eventTime", timestamp_seconds($"value")) .withWatermark("eventTime", "10 seconds") .groupBy(window($"eventTime", "5 seconds") as 'window) .agg(count("*") as 'count) @@ -403,14 +403,14 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche val first = MemoryStream[Int] val firstDf = first.toDF() - .withColumn("eventTime", $"value".cast("timestamp")) + .withColumn("eventTime", timestamp_seconds($"value")) .withWatermark("eventTime", "10 seconds") .select('value) val second = MemoryStream[Int] val secondDf = second.toDF() - .withColumn("eventTime", $"value".cast("timestamp")) + .withColumn("eventTime", timestamp_seconds($"value")) .withWatermark("eventTime", "5 seconds") .select('value) @@ -480,7 +480,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche val inputData = MemoryStream[Int] val windowedAggregation = inputData.toDF() - .withColumn("eventTime", $"value".cast("timestamp")) + .withColumn("eventTime", timestamp_seconds($"value")) .withWatermark("eventTime", "10 seconds") .groupBy(window($"eventTime", "5 seconds") as 'window) .agg(count("*") as 'count) @@ -505,7 +505,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche val inputData = MemoryStream[Int] val windowedAggregation = inputData.toDF() - .withColumn("eventTime", $"value".cast("timestamp")) + .withColumn("eventTime", timestamp_seconds($"value")) .withWatermark("eventTime", "10 seconds") .groupBy($"eventTime") .agg(count("*") as 'count) @@ -544,8 +544,8 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche test("the new watermark should override the old one") { val df = MemoryStream[(Long, Long)].toDF() - .withColumn("first", $"_1".cast("timestamp")) - .withColumn("second", $"_2".cast("timestamp")) + .withColumn("first", timestamp_seconds($"_1")) + .withColumn("second", timestamp_seconds($"_2")) .withWatermark("first", "1 minute") .withWatermark("second", "2 minutes") @@ -557,7 +557,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche test("EventTime watermark should be ignored in batch query.") { val df = testData - .withColumn("eventTime", $"key".cast("timestamp")) + .withColumn("eventTime", timestamp_seconds($"key")) .withWatermark("eventTime", "1 minute") .select("eventTime") .as[Long] @@ -596,7 +596,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche test("SPARK-27340 Alias on TimeWindow expression cause watermark metadata lost") { val inputData = MemoryStream[Int] val aliasWindow = inputData.toDF() - .withColumn("eventTime", $"value".cast("timestamp")) + .withColumn("eventTime", timestamp_seconds($"value")) .withWatermark("eventTime", "10 seconds") .select(window($"eventTime", "5 seconds") as 'aliasWindow) // Check the eventTime metadata is kept in the top level alias. 
@@ -626,7 +626,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche def testWithFlag(flag: Boolean): Unit = withClue(s"with $flagKey = $flag") { val inputData = MemoryStream[Int] val windowedAggregation = inputData.toDF() - .withColumn("eventTime", $"value".cast("timestamp")) + .withColumn("eventTime", timestamp_seconds($"value")) .withWatermark("eventTime", "10 seconds") .groupBy(window($"eventTime", "5 seconds") as 'window) .agg(count("*") as 'count) @@ -762,10 +762,10 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche input1: MemoryStream[Int], input2: MemoryStream[Int]): Dataset[_] = { val df1 = input1.toDF - .withColumn("eventTime", $"value".cast("timestamp")) + .withColumn("eventTime", timestamp_seconds($"value")) .withWatermark("eventTime", "10 seconds") val df2 = input2.toDF - .withColumn("eventTime", $"value".cast("timestamp")) + .withColumn("eventTime", timestamp_seconds($"value")) .withWatermark("eventTime", "15 seconds") df1.union(df2).select($"eventTime".cast("int")) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index 877965100f01..a25451bef62f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -210,7 +210,7 @@ abstract class FileStreamSinkSuite extends StreamTest { val inputData = MemoryStream[Long] val inputDF = inputData.toDF.toDF("time") val outputDf = inputDF - .selectExpr("CAST(time AS timestamp) AS timestamp") + .selectExpr("timestamp_seconds(time) AS timestamp") .withWatermark("timestamp", "10 seconds") .groupBy(window($"timestamp", "5 seconds")) .count() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala index b04f8b0d4d17..e2887e78b050 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._ import org.apache.spark.sql.execution.RDDScanExec import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.state.{FlatMapGroupsWithStateExecHelper, MemoryStateStore, StateStore, StateStoreId, StateStoreMetrics, UnsafeRowPair} +import org.apache.spark.sql.functions.timestamp_seconds import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.sql.types.{DataType, IntegerType} @@ -826,7 +827,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest { val inputData = MemoryStream[(String, Int)] val result = inputData.toDS - .select($"_1".as("key"), $"_2".cast("timestamp").as("eventTime")) + .select($"_1".as("key"), timestamp_seconds($"_2").as("eventTime")) .withWatermark("eventTime", "10 seconds") .as[(String, Long)] .groupByKey(_._1) @@ -901,7 +902,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest { val inputData = MemoryStream[(String, Int)] val result = inputData.toDS - .select($"_1".as("key"), $"_2".cast("timestamp").as("eventTime")) + .select($"_1".as("key"), timestamp_seconds($"_2").as("eventTime")) .withWatermark("eventTime", "10 seconds") 
.as[(String, Long)] .groupByKey(_._1) @@ -1111,7 +1112,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest { val inputData = MemoryStream[(String, Long)] val result = inputData.toDF().toDF("key", "time") - .selectExpr("key", "cast(time as timestamp) as timestamp") + .selectExpr("key", "timestamp_seconds(time) as timestamp") .withWatermark("timestamp", "10 second") .as[(String, Long)] .groupByKey(x => x._1) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index 85e1b85b84d2..cb69460ca158 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -188,7 +188,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions { testWithAllStateVersions("state metrics - append mode") { val inputData = MemoryStream[Int] val aggWithWatermark = inputData.toDF() - .withColumn("eventTime", $"value".cast("timestamp")) + .withColumn("eventTime", timestamp_seconds($"value")) .withWatermark("eventTime", "10 seconds") .groupBy(window($"eventTime", "5 seconds") as 'window) .agg(count("*") as 'count) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala index f63778aef5a7..164a00e4377b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala @@ -86,7 +86,7 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { test("deduplicate with watermark") { val inputData = MemoryStream[Int] val result = inputData.toDS() - .withColumn("eventTime", $"value".cast("timestamp")) + .withColumn("eventTime", timestamp_seconds($"value")) .withWatermark("eventTime", "10 seconds") .dropDuplicates() .select($"eventTime".cast("long").as[Long]) @@ -113,7 +113,7 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { test("deduplicate with aggregate - append mode") { val inputData = MemoryStream[Int] val windowedaggregate = inputData.toDS() - .withColumn("eventTime", $"value".cast("timestamp")) + .withColumn("eventTime", timestamp_seconds($"value")) .withWatermark("eventTime", "10 seconds") .dropDuplicates() .withWatermark("eventTime", "10 seconds") @@ -229,7 +229,7 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { test("SPARK-19841: watermarkPredicate should filter based on keys") { val input = MemoryStream[(Int, Int)] val df = input.toDS.toDF("time", "id") - .withColumn("time", $"time".cast("timestamp")) + .withColumn("time", timestamp_seconds($"time")) .withWatermark("time", "1 second") .dropDuplicates("id", "time") // Change the column positions .select($"id") @@ -248,7 +248,7 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { test("SPARK-21546: dropDuplicates should ignore watermark when it's not a key") { val input = MemoryStream[(Int, Int)] val df = input.toDS.toDF("id", "time") - .withColumn("time", $"time".cast("timestamp")) + .withColumn("time", timestamp_seconds($"time")) .withWatermark("time", "1 second") .dropDuplicates("id") .select($"id", $"time".cast("long")) @@ -264,7 +264,7 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { def testWithFlag(flag: Boolean): 
Unit = withClue(s"with $flagKey = $flag") { val inputData = MemoryStream[Int] val result = inputData.toDS() - .withColumn("eventTime", $"value".cast("timestamp")) + .withColumn("eventTime", timestamp_seconds($"value")) .withWatermark("eventTime", "10 seconds") .dropDuplicates() .select($"eventTime".cast("long").as[Long]) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala index 3f218c9cb7fd..fe5f34917113 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala @@ -87,11 +87,12 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with val input2 = MemoryStream[Int] val df1 = input1.toDF - .select('value as "key", 'value.cast("timestamp") as "timestamp", ('value * 2) as "leftValue") + .select('value as "key", timestamp_seconds($"value") as "timestamp", + ('value * 2) as "leftValue") .select('key, window('timestamp, "10 second"), 'leftValue) val df2 = input2.toDF - .select('value as "key", 'value.cast("timestamp") as "timestamp", + .select('value as "key", timestamp_seconds($"value") as "timestamp", ('value * 3) as "rightValue") .select('key, window('timestamp, "10 second"), 'rightValue) @@ -127,12 +128,13 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with val input2 = MemoryStream[Int] val df1 = input1.toDF - .select('value as "key", 'value.cast("timestamp") as "timestamp", ('value * 2) as "leftValue") + .select('value as "key", timestamp_seconds($"value") as "timestamp", + ('value * 2) as "leftValue") .withWatermark("timestamp", "10 seconds") .select('key, window('timestamp, "10 second"), 'leftValue) val df2 = input2.toDF - .select('value as "key", 'value.cast("timestamp") as "timestamp", + .select('value as "key", timestamp_seconds($"value") as "timestamp", ('value * 3) as "rightValue") .select('key, window('timestamp, "10 second"), 'rightValue) @@ -177,11 +179,12 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with val rightInput = MemoryStream[(Int, Int)] val df1 = leftInput.toDF.toDF("leftKey", "time") - .select('leftKey, 'time.cast("timestamp") as "leftTime", ('leftKey * 2) as "leftValue") + .select('leftKey, timestamp_seconds($"time") as "leftTime", ('leftKey * 2) as "leftValue") .withWatermark("leftTime", "10 seconds") val df2 = rightInput.toDF.toDF("rightKey", "time") - .select('rightKey, 'time.cast("timestamp") as "rightTime", ('rightKey * 3) as "rightValue") + .select('rightKey, timestamp_seconds($"time") as "rightTime", + ('rightKey * 3) as "rightValue") .withWatermark("rightTime", "10 seconds") val joined = @@ -235,11 +238,12 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with val rightInput = MemoryStream[(Int, Int)] val df1 = leftInput.toDF.toDF("leftKey", "time") - .select('leftKey, 'time.cast("timestamp") as "leftTime", ('leftKey * 2) as "leftValue") + .select('leftKey, timestamp_seconds($"time") as "leftTime", ('leftKey * 2) as "leftValue") .withWatermark("leftTime", "20 seconds") val df2 = rightInput.toDF.toDF("rightKey", "time") - .select('rightKey, 'time.cast("timestamp") as "rightTime", ('rightKey * 3) as "rightValue") + .select('rightKey, timestamp_seconds($"time") as "rightTime", + ('rightKey * 3) as "rightValue") .withWatermark("rightTime", "30 seconds") val condition = expr( @@ -425,7 +429,7 @@ class 
StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with test("SPARK-26187 restore the stream-stream inner join query from Spark 2.4") { val inputStream = MemoryStream[(Int, Long)] val df = inputStream.toDS() - .select(col("_1").as("value"), col("_2").cast("timestamp").as("timestamp")) + .select(col("_1").as("value"), timestamp_seconds($"_2").as("timestamp")) val leftStream = df.select(col("value").as("leftId"), col("timestamp").as("leftTime")) @@ -500,7 +504,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with val df = input.toDF .select( 'value as "key", - 'value.cast("timestamp") as s"${prefix}Time", + timestamp_seconds($"value") as s"${prefix}Time", ('value * multiplier) as s"${prefix}Value") .withWatermark(s"${prefix}Time", "10 seconds") @@ -682,11 +686,12 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with val rightInput = MemoryStream[(Int, Int)] val df1 = leftInput.toDF.toDF("leftKey", "time") - .select('leftKey, 'time.cast("timestamp") as "leftTime", ('leftKey * 2) as "leftValue") + .select('leftKey, timestamp_seconds($"time") as "leftTime", ('leftKey * 2) as "leftValue") .withWatermark("leftTime", "10 seconds") val df2 = rightInput.toDF.toDF("rightKey", "time") - .select('rightKey, 'time.cast("timestamp") as "rightTime", ('rightKey * 3) as "rightValue") + .select('rightKey, timestamp_seconds($"time") as "rightTime", + ('rightKey * 3) as "rightValue") .withWatermark("rightTime", "10 seconds") val joined = @@ -777,7 +782,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with val inputStream = MemoryStream[(Int, Long)] val df = inputStream.toDS() - .select(col("_1").as("value"), col("_2").cast("timestamp").as("timestamp")) + .select(col("_1").as("value"), timestamp_seconds($"_2").as("timestamp")) val leftStream = df.select(col("value").as("leftId"), col("timestamp").as("leftTime")) @@ -840,7 +845,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with val inputStream = MemoryStream[(Int, Long)] val df = inputStream.toDS() - .select(col("_1").as("value"), col("_2").cast("timestamp").as("timestamp")) + .select(col("_1").as("value"), timestamp_seconds($"_2").as("timestamp")) // we're just flipping "left" and "right" from left outer join and apply right outer join @@ -883,7 +888,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with test("SPARK-26187 restore the stream-stream outer join query from Spark 2.4") { val inputStream = MemoryStream[(Int, Long)] val df = inputStream.toDS() - .select(col("_1").as("value"), col("_2").cast("timestamp").as("timestamp")) + .select(col("_1").as("value"), timestamp_seconds($"_2").as("timestamp")) val leftStream = df.select(col("value").as("leftId"), col("timestamp").as("leftTime")) diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index db1f6fbd97d9..82af7dceb27f 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -39,6 +39,8 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { private val originalInMemoryPartitionPruning = TestHive.conf.inMemoryPartitionPruning private val originalCrossJoinEnabled = 
TestHive.conf.crossJoinEnabled private val originalSessionLocalTimeZone = TestHive.conf.sessionLocalTimeZone + private val originalLegacyAllowCastNumericToTimestamp = + TestHive.conf.legacyAllowCastNumericToTimestamp def testCases: Seq[(String, File)] = { hiveQueryDir.listFiles.map(f => f.getName.stripSuffix(".q") -> f) @@ -58,6 +60,8 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { // Fix session local timezone to America/Los_Angeles for those timezone sensitive tests // (timestamp_*) TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, "America/Los_Angeles") + // Ensure that casting numeric to timestamp is enabled so that we can test it + TestHive.setConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP, true) RuleExecutor.resetMetrics() } @@ -68,6 +72,8 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning) TestHive.setConf(SQLConf.CROSS_JOINS_ENABLED, originalCrossJoinEnabled) TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, originalSessionLocalTimeZone) + TestHive.setConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP, + originalLegacyAllowCastNumericToTimestamp) // For debugging dump some statistics about how much time was spent in various optimizer rules logWarning(RuleExecutor.dumpTimeSpent()) diff --git a/sql/hive/src/test/resources/golden/constant null testing-0-237a6af90a857da1efcbe98f6bbbf9d6 b/sql/hive/src/test/resources/golden/constant null testing-0-6a01a94ef1b0d29152c88cd3083fd70b similarity index 100% rename from sql/hive/src/test/resources/golden/constant null testing-0-237a6af90a857da1efcbe98f6bbbf9d6 rename to sql/hive/src/test/resources/golden/constant null testing-0-6a01a94ef1b0d29152c88cd3083fd70b diff --git a/sql/hive/src/test/resources/golden/timestamp cast #3-0-732ed232ac592c5e7f7c913a88874fd2 b/sql/hive/src/test/resources/golden/timestamp cast #3-0-732ed232ac592c5e7f7c913a88874fd2 deleted file mode 100644 index 5625e59da887..000000000000 --- a/sql/hive/src/test/resources/golden/timestamp cast #3-0-732ed232ac592c5e7f7c913a88874fd2 +++ /dev/null @@ -1 +0,0 @@ -1.2 diff --git a/sql/hive/src/test/resources/golden/timestamp cast #4-0-6d2da5cfada03605834e38bc4075bc79 b/sql/hive/src/test/resources/golden/timestamp cast #4-0-6d2da5cfada03605834e38bc4075bc79 deleted file mode 100644 index 1d94c8a014fb..000000000000 --- a/sql/hive/src/test/resources/golden/timestamp cast #4-0-6d2da5cfada03605834e38bc4075bc79 +++ /dev/null @@ -1 +0,0 @@ --1.2 diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index b10a8cb8bf2b..2b42444ceeaa 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -201,14 +201,17 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd |IF(TRUE, CAST(NULL AS BINARY), CAST("1" AS BINARY)) AS COL18, |IF(FALSE, CAST(NULL AS DATE), CAST("1970-01-01" AS DATE)) AS COL19, |IF(TRUE, CAST(NULL AS DATE), CAST("1970-01-01" AS DATE)) AS COL20, - |IF(TRUE, CAST(NULL AS TIMESTAMP), CAST(1 AS TIMESTAMP)) AS COL21, + |IF(TRUE, CAST(NULL AS TIMESTAMP), CAST('1969-12-31 16:00:01' AS TIMESTAMP)) AS COL21, |IF(FALSE, CAST(NULL AS DECIMAL), CAST(1 AS DECIMAL)) AS COL22, |IF(TRUE, CAST(NULL AS DECIMAL), CAST(1 AS DECIMAL)) AS COL23 |FROM src LIMIT 1""".stripMargin) 
test("constant null testing timestamp") { - val r1 = sql("SELECT IF(FALSE, CAST(NULL AS TIMESTAMP), CAST(1 AS TIMESTAMP)) AS COL20") - .collect().head + var r1 = sql( + """ + |SELECT IF(FALSE, CAST(NULL AS TIMESTAMP), + |CAST('1969-12-31 16:00:01' AS TIMESTAMP)) AS COL20 + """.stripMargin).collect().head assert(new Timestamp(1000) == r1.getTimestamp(0)) } @@ -552,28 +555,22 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd // Jdk version leads to different query output for double, so not use createQueryTest here test("timestamp cast #1") { - val res = sql("SELECT CAST(CAST(1 AS TIMESTAMP) AS DOUBLE) FROM src LIMIT 1").collect().head + val res = sql("SELECT CAST(TIMESTAMP_SECONDS(1) AS DOUBLE) FROM src LIMIT 1").collect().head assert(1 == res.getDouble(0)) } test("timestamp cast #2") { - val res = sql("SELECT CAST(CAST(-1 AS TIMESTAMP) AS DOUBLE) FROM src LIMIT 1").collect().head + val res = sql("SELECT CAST(TIMESTAMP_SECONDS(-1) AS DOUBLE) FROM src LIMIT 1").collect().head assert(-1 == res.get(0)) } - createQueryTest("timestamp cast #3", - "SELECT CAST(CAST(1.2 AS TIMESTAMP) AS DOUBLE) FROM src LIMIT 1") - - createQueryTest("timestamp cast #4", - "SELECT CAST(CAST(-1.2 AS TIMESTAMP) AS DOUBLE) FROM src LIMIT 1") - - test("timestamp cast #5") { - val res = sql("SELECT CAST(CAST(1200 AS TIMESTAMP) AS INT) FROM src LIMIT 1").collect().head + test("timestamp cast #3") { + val res = sql("SELECT CAST(TIMESTAMP_SECONDS(1200) AS INT) FROM src LIMIT 1").collect().head assert(1200 == res.getInt(0)) } - test("timestamp cast #6") { - val res = sql("SELECT CAST(CAST(-1200 AS TIMESTAMP) AS INT) FROM src LIMIT 1").collect().head + test("timestamp cast #4") { + val res = sql("SELECT CAST(TIMESTAMP_SECONDS(-1200) AS INT) FROM src LIMIT 1").collect().head assert(-1200 == res.getInt(0)) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index 7bca2af37993..057f2f4ce01b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -434,8 +434,8 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { withTempView("tab1") { Seq(Tuple1(1451400761)).toDF("test_date").createOrReplaceTempView("tab1") sql(s"CREATE TEMPORARY FUNCTION testUDFToDate AS '${classOf[GenericUDFToDate].getName}'") - val count = sql("select testUDFToDate(cast(test_date as timestamp))" + - " from tab1 group by testUDFToDate(cast(test_date as timestamp))").count() + val count = sql("select testUDFToDate(timestamp_seconds(test_date))" + + " from tab1 group by testUDFToDate(timestamp_seconds(test_date))").count() sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToDate") assert(count == 1) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index d12eae0e410b..2fe6a59a27c1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -1172,7 +1172,7 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi test("SPARK-6785: HiveQuerySuite - Date comparison test 2") { checkAnswer( - sql("SELECT CAST(CAST(0 AS timestamp) AS date) > CAST(0 AS timestamp) FROM src LIMIT 1"), + sql("SELECT 
CAST(timestamp_seconds(0) AS date) > timestamp_seconds(0) FROM src LIMIT 1"), Row(false)) } @@ -1182,10 +1182,10 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi sql( """ | SELECT - | CAST(CAST(0 AS timestamp) AS date), - | CAST(CAST(CAST(0 AS timestamp) AS date) AS string), - | CAST(0 AS timestamp), - | CAST(CAST(0 AS timestamp) AS string), + | CAST(timestamp_seconds(0) AS date), + | CAST(CAST(timestamp_seconds(0) AS date) AS string), + | timestamp_seconds(0), + | CAST(timestamp_seconds(0) AS string), | CAST(CAST(CAST('1970-01-01 23:00:00' AS timestamp) AS date) AS timestamp) | FROM src LIMIT 1 """.stripMargin),
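
To make the reviewed behavior concrete, here is a minimal spark-shell sketch of the semantics this patch introduces. It is not part of the diff; it assumes a stock shell where the `spark` session and the `$` column implicits are already in scope.

```scala
import org.apache.spark.sql.functions.timestamp_seconds

// Preferred from Spark 3.1 on: the dedicated conversion function added here.
spark.range(1230219000L, 1230219001L)
  .select(timestamp_seconds($"id").as("ts"))
  .show()  // 2008-12-25 07:30:00 when the session time zone is UTC

// The implicit numeric cast now fails analysis by default, with the new
// "cannot cast bigint to timestamp ..." message from Cast.checkInputDataTypes:
// spark.range(1).select($"id".cast("timestamp"))

// The legacy flag added to SQLConf restores the old behavior for the session.
spark.conf.set("spark.sql.legacy.allowCastNumericToTimestamp", "true")
spark.range(1).select($"id".cast("timestamp")).show()
```

The flag defaults to false and is marked internal, so the last two lines only succeed because of the explicit conf change; the test updates in this patch rely on `withSQLConf` for the same reason.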