From cb390e5a98d68052a310b903caef1f1e757165a1 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Tue, 18 Feb 2020 02:15:44 +0800
Subject: [PATCH 1/3] [SPARK-30808][SQL] Enable Java 8 time API in Thrift server

### What changes were proposed in this pull request?
- Set `spark.sql.datetime.java8API.enabled` to `true` in `hiveResultString()`, and restore it at the end of the call.
- Convert collected `java.time.Instant` & `java.time.LocalDate` to `java.sql.Timestamp` and `java.sql.Date` for correct formatting.

### Why are the changes needed?
Because the textual representation of timestamps/dates before the year 1582 is incorrect:
```shell
$ export TZ="America/Los_Angeles"
$ ./bin/spark-sql -S
```
```sql
spark-sql> set spark.sql.session.timeZone=America/Los_Angeles;
spark.sql.session.timeZone America/Los_Angeles
spark-sql> SELECT DATE_TRUNC('MILLENNIUM', DATE '1970-03-20');
1001-01-01 00:07:02
```
It must be 1001-01-01 00:**00:00**.

### Does this PR introduce any user-facing change?
Yes. After the changes:
```shell
$ export TZ="America/Los_Angeles"
$ ./bin/spark-sql -S
```
```sql
spark-sql> set spark.sql.session.timeZone=America/Los_Angeles;
spark.sql.session.timeZone America/Los_Angeles
spark-sql> SELECT DATE_TRUNC('MILLENNIUM', DATE '1970-03-20');
1001-01-01 00:00:00
```

### How was this patch tested?
By running hive-thriftserver tests. In particular:
```
./build/sbt -Phadoop-2.7 -Phive-2.3 -Phive-thriftserver "hive-thriftserver/test:testOnly *SparkThriftServerProtocolVersionsSuite"
```

Closes #27552 from MaxGekk/hive-thriftserver-java8-time-api.

Authored-by: Maxim Gekk
Signed-off-by: Wenchen Fan

# Conflicts:
#	sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
#	sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
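For context, the setting this patch toggles internally is also available to user code. Below is a minimal sketch using public APIs (`spark` is an assumed existing `SparkSession`; the patch itself clones the session and sets the conf internally, as the diff below shows):

```scala
import java.time.LocalDate

// Sketch only: enable the Java 8 time API on a fresh session.
val session = spark.newSession()
session.conf.set("spark.sql.datetime.java8API.enabled", "true")

// With the flag on, DATE values are collected as java.time.LocalDate
// (instead of java.sql.Date), which formats pre-1582 dates correctly.
val row = session.sql("SELECT DATE '1001-01-01' AS d").head()
assert(row.get(0).isInstanceOf[LocalDate])
```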
---
 .../spark/sql/execution/HiveResult.scala      | 65 ++++++++++++-------
 .../apache/spark/sql/SQLQueryTestSuite.scala  |  2 +-
 .../spark/sql/execution/HiveResultSuite.scala | 31 ++++-----
 .../sql/expressions/ExpressionInfoSuite.scala |  7 +-
 .../hive/thriftserver/SparkSQLDriver.scala    |  5 +-
 .../hive/execution/HiveComparisonTest.scala   |  4 +-
 6 files changed, 63 insertions(+), 51 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
index 73484a212c169..b906170db015e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
@@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 import java.time.{Instant, LocalDate}
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.execution.command.{DescribeCommandBase, ExecutedCommandExec, ShowTablesCommand, ShowViewsCommand}
 import org.apache.spark.sql.execution.datasources.v2.{DescribeTableExec, ShowTablesExec}
@@ -37,30 +37,45 @@ object HiveResult {
    * Returns the result as a hive compatible sequence of strings. This is used in tests and
    * `SparkSQLDriver` for CLI applications.
    */
-  def hiveResultString(executedPlan: SparkPlan): Seq[String] = executedPlan match {
-    case ExecutedCommandExec(_: DescribeCommandBase) =>
-      formatDescribeTableOutput(executedPlan.executeCollectPublic())
-    case _: DescribeTableExec =>
-      formatDescribeTableOutput(executedPlan.executeCollectPublic())
-    // SHOW TABLES in Hive only output table names while our v1 command outputs
-    // database, table name, isTemp.
-    case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
-      command.executeCollect().map(_.getString(1))
-    // SHOW TABLES in Hive only output table names while our v2 command outputs
-    // namespace and table name.
-    case command : ShowTablesExec =>
-      command.executeCollect().map(_.getString(1))
-    // SHOW VIEWS in Hive only outputs view names while our v1 command outputs
-    // namespace, viewName, and isTemporary.
-    case command @ ExecutedCommandExec(_: ShowViewsCommand) =>
-      command.executeCollect().map(_.getString(1))
-    case other =>
-      val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
-      // We need the types so we can output struct field names
-      val types = executedPlan.output.map(_.dataType)
-      // Reformat to match hive tab delimited output.
-      result.map(_.zip(types).map(e => toHiveString(e)))
-        .map(_.mkString("\t"))
+  def hiveResultString(ds: Dataset[_]): Seq[String] = {
+    val executedPlan = ds.queryExecution.executedPlan
+    executedPlan match {
+      case ExecutedCommandExec(_: DescribeCommandBase) =>
+        formatDescribeTableOutput(executedPlan.executeCollectPublic())
+      case _: DescribeTableExec =>
+        formatDescribeTableOutput(executedPlan.executeCollectPublic())
+      // SHOW TABLES in Hive only output table names while our v1 command outputs
+      // database, table name, isTemp.
+      case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
+        command.executeCollect().map(_.getString(1))
+      // SHOW TABLES in Hive only output table names while our v2 command outputs
+      // namespace and table name.
+      case command : ShowTablesExec =>
+        command.executeCollect().map(_.getString(1))
+      // SHOW VIEWS in Hive only outputs view names while our v1 command outputs
+      // namespace, viewName, and isTemporary.
+      case command @ ExecutedCommandExec(_: ShowViewsCommand) =>
+        command.executeCollect().map(_.getString(1))
+      case _ =>
+        val sessionWithJava8DatetimeEnabled = {
+          val cloned = ds.sparkSession.cloneSession()
+          cloned.conf.set(SQLConf.DATETIME_JAVA8API_ENABLED.key, true)
+          cloned
+        }
+        sessionWithJava8DatetimeEnabled.withActive {
+          // We cannot collect the original dataset because its encoders could be created
+          // with disabled Java 8 date-time API.
+          val result: Seq[Seq[Any]] = Dataset.ofRows(ds.sparkSession, ds.logicalPlan)
+            .queryExecution
+            .executedPlan
+            .executeCollectPublic().map(_.toSeq).toSeq
+          // We need the types so we can output struct field names
+          val types = executedPlan.output.map(_.dataType)
+          // Reformat to match hive tab delimited output.
+          result.map(_.zip(types).map(e => toHiveString(e)))
+            .map(_.mkString("\t"))
+        }
+    }
   }
 
   private def formatDescribeTableOutput(rows: Array[Row]): Seq[String] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index 92da58c27a141..f25aa1d9594ff 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -515,7 +515,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession {
     val schema = df.schema.catalogString
     // Get answer, but also get rid of the #1234 expression ids that show up in explain plans
     val answer = SQLExecution.withNewExecutionId(df.queryExecution, Some(sql)) {
-      hiveResultString(df.queryExecution.executedPlan).map(replaceNotIncludedMsg)
+      hiveResultString(df).map(replaceNotIncludedMsg)
     }
 
     // If the output is not pre-sorted, sort it.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
index 5e81c74420fd0..8bf245a331037 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
@@ -26,11 +26,10 @@ class HiveResultSuite extends SharedSparkSession {
   test("date formatting in hive result") {
     val dates = Seq("2018-12-28", "1582-10-03", "1582-10-04", "1582-10-15")
     val df = dates.toDF("a").selectExpr("cast(a as date) as b")
-    val executedPlan1 = df.queryExecution.executedPlan
-    val result = HiveResult.hiveResultString(executedPlan1)
+    val result = HiveResult.hiveResultString(df)
     assert(result == dates)
-    val executedPlan2 = df.selectExpr("array(b)").queryExecution.executedPlan
-    val result2 = HiveResult.hiveResultString(executedPlan2)
+    val df2 = df.selectExpr("array(b)")
+    val result2 = HiveResult.hiveResultString(df2)
     assert(result2 == dates.map(x => s"[$x]"))
   }
 
@@ -41,11 +40,10 @@
       "1582-10-04 01:02:03",
       "1582-10-15 01:02:03")
     val df = timestamps.toDF("a").selectExpr("cast(a as timestamp) as b")
-    val executedPlan1 = df.queryExecution.executedPlan
-    val result = HiveResult.hiveResultString(executedPlan1)
+    val result = HiveResult.hiveResultString(df)
     assert(result == timestamps)
-    val executedPlan2 = df.selectExpr("array(b)").queryExecution.executedPlan
-    val result2 = HiveResult.hiveResultString(executedPlan2)
+    val df2 = df.selectExpr("array(b)")
+    val result2 = HiveResult.hiveResultString(df2)
     assert(result2 == timestamps.map(x => s"[$x]"))
   }
 
@@ -58,15 +56,14 @@
   test("decimal formatting in hive result") {
     val df = Seq(new java.math.BigDecimal("1")).toDS()
     Seq(2, 6, 18).foreach { scala =>
-      val executedPlan =
-        df.selectExpr(s"CAST(value AS decimal(38, $scala))").queryExecution.executedPlan
-      val result = HiveResult.hiveResultString(executedPlan)
+      val decimalDf = df.selectExpr(s"CAST(value AS decimal(38, $scala))")
+      val result = HiveResult.hiveResultString(decimalDf)
       assert(result.head.split("\\.").last.length === scala)
     }
 
-    val executedPlan = Seq(java.math.BigDecimal.ZERO).toDS()
-      .selectExpr(s"CAST(value AS decimal(38, 8))").queryExecution.executedPlan
-    val result = HiveResult.hiveResultString(executedPlan)
+    val df2 = Seq(java.math.BigDecimal.ZERO).toDS()
+      .selectExpr(s"CAST(value AS decimal(38, 8))")
+    val result = HiveResult.hiveResultString(df2)
     assert(result.head === "0.00000000")
   }
 
@@ -77,8 +74,7 @@
     withTable(s"$ns.$tbl") {
       spark.sql(s"CREATE TABLE $ns.$tbl (id bigint) USING $source")
       val df = spark.sql(s"SHOW TABLES FROM $ns")
-      val executedPlan = df.queryExecution.executedPlan
-      assert(HiveResult.hiveResultString(executedPlan).head == tbl)
+      assert(HiveResult.hiveResultString(df).head == tbl)
     }
   }
 }
@@ -91,11 +87,10 @@
     withTable(s"$ns.$tbl") {
       spark.sql(s"CREATE TABLE $ns.$tbl (id bigint COMMENT 'col1') USING $source")
       val df = spark.sql(s"DESCRIBE $ns.$tbl")
-      val executedPlan = df.queryExecution.executedPlan
       val expected = "id " +
         "\tbigint " +
         "\tcol1 "
-      assert(HiveResult.hiveResultString(executedPlan).head == expected)
+      assert(HiveResult.hiveResultString(df).head == expected)
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
index 53f9757750735..c3a1e3109d0d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
@@ -144,15 +144,14 @@
       withClue(s"Function '${info.getName}', Expression class '$className'") {
         val example = info.getExamples
         checkExampleSyntax(example)
-        example.split("  > ").toList.foreach {
+        example.split("  > ").toList.foreach(_ match {
           case exampleRe(sql, output) =>
             val df = clonedSpark.sql(sql)
-            val actual = unindentAndTrim(
-              hiveResultString(df.queryExecution.executedPlan).mkString("\n"))
+            val actual = unindentAndTrim(hiveResultString(df).mkString("\n"))
             val expected = unindentAndTrim(output)
             assert(actual === expected)
           case _ =>
-        }
+        })
       }
     }
   }
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
index 12fba0eae6dce..64e91f405d613 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
@@ -60,9 +60,10 @@ private[hive] class SparkSQLDriver(val context: SQLContext = SparkSQLEnv.sqlCont
     // TODO unify the error code
     try {
       context.sparkContext.setJobDescription(command)
-      val execution = context.sessionState.executePlan(context.sql(command).logicalPlan)
+      val df = context.sql(command)
+      val execution = df.queryExecution
       hiveResponse = SQLExecution.withNewExecutionId(execution) {
-        hiveResultString(execution.executedPlan)
+        hiveResultString(df)
       }
       tableSchema = getResultSetSchema(execution)
       new CommandProcessorResponse(0)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 2e4c01830432f..a30fa576fc92d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -345,7 +345,9 @@ abstract class HiveComparisonTest extends SparkFunSuite with BeforeAndAfterAll {
     val catalystResults = queryList.zip(hiveResults).map { case (queryString, hive) =>
       val query = new TestHiveQueryExecution(queryString.replace("../../data", testDataPath))
       def getResult(): Seq[String] = {
-        SQLExecution.withNewExecutionId(query)(hiveResultString(query.executedPlan))
+        SQLExecution.withNewExecutionId(query) {
+          hiveResultString(Dataset.ofRows(query.sparkSession, query.logical))
+        }
       }
       try { (query, prepareAnswer(query, getResult())) } catch { case e: Throwable =>

From 4ea85add56af405fa99e64dcc321c7cc2087660b Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Tue, 2 Jun 2020 12:10:42 +0300
Subject: [PATCH 2/3] Re-gen date.sql.out

---
 .../test/resources/sql-tests/results/postgreSQL/date.sql.out | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/date.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/date.sql.out
index 151fa1e28d725..1d862ba8a41a8 100755
--- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/date.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/date.sql.out
@@ -584,7 +584,7 @@ select make_date(-44, 3, 15)
 -- !query schema
 struct<make_date(-44, 3, 15):date>
 -- !query output
--0045-03-15
+-0044-03-15
 
 
 -- !query
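The regenerated expected value follows ISO-8601 proleptic year numbering, which `java.time` uses directly. A standalone sanity check (not part of the patch) reproduces it:

```scala
import java.time.LocalDate

// Year -44 in the proleptic ISO calendar prints with a sign and zero padding.
println(LocalDate.of(-44, 3, 15))  // -0044-03-15
```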
From ee69a1b630ddba4cb5120a742d52b2956bc9a3f0 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Tue, 2 Jun 2020 20:32:37 +0300
Subject: [PATCH 3/3] Test with different jvm and session zone ids

---
 .../spark/sql/execution/HiveResultSuite.scala | 55 +++++++++++++------
 1 file changed, 37 insertions(+), 18 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
index 8bf245a331037..a040b22d22786 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
@@ -17,34 +17,53 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
 import org.apache.spark.sql.connector.InMemoryTableCatalog
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSparkSession}
 
 class HiveResultSuite extends SharedSparkSession {
   import testImplicits._
 
+  private def withOutstandingZoneIds(f: => Unit): Unit = {
+    for {
+      jvmZoneId <- outstandingZoneIds
+      sessionZoneId <- outstandingZoneIds
+    } {
+      withDefaultTimeZone(jvmZoneId) {
+        withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> sessionZoneId.getId) {
+          f
+        }
+      }
+    }
+  }
+
   test("date formatting in hive result") {
-    val dates = Seq("2018-12-28", "1582-10-03", "1582-10-04", "1582-10-15")
-    val df = dates.toDF("a").selectExpr("cast(a as date) as b")
-    val result = HiveResult.hiveResultString(df)
-    assert(result == dates)
-    val df2 = df.selectExpr("array(b)")
-    val result2 = HiveResult.hiveResultString(df2)
-    assert(result2 == dates.map(x => s"[$x]"))
+    withOutstandingZoneIds {
+      val dates = Seq("2018-12-28", "1582-10-03", "1582-10-04", "1582-10-15")
+      val df = dates.toDF("a").selectExpr("cast(a as date) as b")
+      val result = HiveResult.hiveResultString(df)
+      assert(result == dates)
+      val df2 = df.selectExpr("array(b)")
+      val result2 = HiveResult.hiveResultString(df2)
+      assert(result2 == dates.map(x => s"[$x]"))
+    }
   }
 
   test("timestamp formatting in hive result") {
-    val timestamps = Seq(
-      "2018-12-28 01:02:03",
-      "1582-10-03 01:02:03",
-      "1582-10-04 01:02:03",
-      "1582-10-15 01:02:03")
-    val df = timestamps.toDF("a").selectExpr("cast(a as timestamp) as b")
-    val result = HiveResult.hiveResultString(df)
-    assert(result == timestamps)
-    val df2 = df.selectExpr("array(b)")
-    val result2 = HiveResult.hiveResultString(df2)
-    assert(result2 == timestamps.map(x => s"[$x]"))
+    withOutstandingZoneIds {
+      val timestamps = Seq(
+        "2018-12-28 01:02:03",
+        "1582-10-03 01:02:03",
+        "1582-10-04 01:02:03",
+        "1582-10-15 01:02:03")
+      val df = timestamps.toDF("a").selectExpr("cast(a as timestamp) as b")
+      val result = HiveResult.hiveResultString(df)
+      assert(result == timestamps)
+      val df2 = df.selectExpr("array(b)")
+      val result2 = HiveResult.hiveResultString(df2)
+      assert(result2 == timestamps.map(x => s"[$x]"))
+    }
   }
 
   test("toHiveString correctly handles UDTs") {
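Taken together, the series changes `hiveResultString` to take a `Dataset[_]` instead of a `SparkPlan`, and formats datetime values through the Java 8 time API. A minimal caller-side sketch of the new shape (hypothetical demo code, not from the patch):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.HiveResult

object HiveResultDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical local session for demonstration purposes.
    val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()
    val df = spark.sql("SELECT DATE_TRUNC('MILLENNIUM', DATE '1970-03-20') AS d")
    // Callers now pass the Dataset itself; pre-1582 values format correctly.
    HiveResult.hiveResultString(df).foreach(println)  // expected: 1001-01-01 00:00:00
    spark.stop()
  }
}
```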