From 916838a3d43aeac59cdc799fed0de8d279b0ad66 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Wed, 12 Feb 2020 23:07:27 +0300
Subject: [PATCH 01/18] Enable Java 8 time API in thrift server

---
 .../org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
index 8944b93d9b697..233e6224a10d9 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -24,6 +24,7 @@ import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.Utils

 /** A singleton object for the master program. The slaves should not access this. */
@@ -45,6 +46,8 @@ private[hive] object SparkSQLEnv extends Logging {

     sparkConf
       .setAppName(maybeAppName.getOrElse(s"SparkSQL::${Utils.localHostName()}"))
+      .set(SQLConf.DATETIME_JAVA8API_ENABLED, true)
+
     val sparkSession = SparkSession.builder.config(sparkConf).enableHiveSupport().getOrCreate()
     sparkContext = sparkSession.sparkContext

From d98cdbcb85ba7df6d24429081da97f5619cfe54b Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 13 Feb 2020 11:56:48 +0300
Subject: [PATCH 02/18] Convert Instant/LocalDate to Timestamp/Date for
 correct formatting

---
 .../thriftserver/SparkExecuteStatementOperation.scala | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 76d07848f79a9..5d659c5670e85 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.thriftserver

 import java.security.PrivilegedExceptionAction
 import java.sql.{Date, Timestamp}
+import java.time.{Instant, LocalDate}
 import java.util.{Arrays, Map => JMap, UUID}
 import java.util.concurrent.RejectedExecutionException

@@ -178,7 +179,12 @@ private[hive] class SparkExecuteStatementOperation(
       }
       curCol += 1
     }
-    resultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
+    val resultRow = row.map {
+      case i: Instant => Timestamp.from(i).toString
+      case ld: LocalDate => Date.valueOf(ld)
+      case other => other
+    }.toArray.asInstanceOf[Array[Object]]
+    resultRowSet.addRow(resultRow)
     curRow += 1
     resultOffset += 1
   }

From 580dd09ca9e3b8df54fcf31806913e581e4da720 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 13 Feb 2020 12:59:37 +0300
Subject: [PATCH 03/18] Bug fix

---
 .../sql/hive/thriftserver/SparkExecuteStatementOperation.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 5d659c5670e85..82230a5799548 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -180,7 +180,7 @@ private[hive] class SparkExecuteStatementOperation(
       curCol += 1
     }
     val resultRow = row.map {
-      case i: Instant => Timestamp.from(i).toString
+      case i: Instant => Timestamp.from(i)
       case ld: LocalDate => Date.valueOf(ld)
       case other => other
     }.toArray.asInstanceOf[Array[Object]]

From deaef5837b0d7bbbb1eb0f6b6e90aef4b1eefc7d Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sun, 16 Feb 2020 21:31:10 +0300
Subject: [PATCH 04/18] Set DATETIME_JAVA8API_ENABLED in HiveResult

---
 .../spark/sql/execution/HiveResult.scala      | 58 ++++++++++++-------
 .../org/apache/spark/sql/SQLQuerySuite.scala  |  2 +-
 .../apache/spark/sql/SQLQueryTestSuite.scala  |  3 +-
 .../spark/sql/execution/HiveResultSuite.scala | 25 ++++----
 .../hive/thriftserver/SparkSQLDriver.scala    |  5 +-
 .../sql/hive/thriftserver/SparkSQLEnv.scala   |  2 -
 .../hive/execution/HiveComparisonTest.scala   |  2 +-
 .../apache/spark/sql/hive/test/TestHive.scala | 10 ++--
 8 files changed, 59 insertions(+), 48 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
index 5a2f16d8e1526..810fd009b5584 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
@@ -21,9 +21,10 @@ import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 import java.time.{Instant, LocalDate}

-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.execution.command.{DescribeCommandBase, ExecutedCommandExec, ShowTablesCommand}
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
@@ -36,27 +37,40 @@ object HiveResult {
    * Returns the result as a hive compatible sequence of strings. This is used in tests and
    * `SparkSQLDriver` for CLI applications.
    */
-  def hiveResultString(executedPlan: SparkPlan): Seq[String] = executedPlan match {
-    case ExecutedCommandExec(_: DescribeCommandBase) =>
-      // If it is a describe command for a Hive table, we want to have the output format
-      // be similar with Hive.
-      executedPlan.executeCollectPublic().map {
-        case Row(name: String, dataType: String, comment) =>
-          Seq(name, dataType,
-            Option(comment.asInstanceOf[String]).getOrElse(""))
-            .map(s => String.format(s"%-20s", s))
-            .mkString("\t")
-      }
-    // SHOW TABLES in Hive only output table names, while ours output database, table name, isTemp.
-    case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
-      command.executeCollect().map(_.getString(1))
-    case other =>
-      val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
-      // We need the types so we can output struct field names
-      val types = executedPlan.output.map(_.dataType)
-      // Reformat to match hive tab delimited output.
-      result.map(_.zip(types).map(e => toHiveString(e)))
-        .map(_.mkString("\t"))
+  def hiveResultString[T](ds: Dataset[T]): Seq[String] = {
+    val executedPlan = ds.queryExecution.executedPlan
+    executedPlan match {
+      case ExecutedCommandExec(_: DescribeCommandBase) =>
+        // If it is a describe command for a Hive table, we want to have the output format
+        // be similar with Hive.
+        executedPlan.executeCollectPublic().map {
+          case Row(name: String, dataType: String, comment) =>
+            Seq(name, dataType,
+              Option(comment.asInstanceOf[String]).getOrElse(""))
+              .map(s => String.format(s"%-20s", s))
+              .mkString("\t")
+        }
+      // SHOW TABLES in Hive only output table names,
+      // while ours output database, table name, isTemp.
+      case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
+        command.executeCollect().map(_.getString(1))
+      case _ =>
+        val java8TimeConf = SQLConf.get.getConf(SQLConf.DATETIME_JAVA8API_ENABLED)
+        try {
+          SQLConf.get.setConf(SQLConf.DATETIME_JAVA8API_ENABLED, true)
+          val result: Seq[Seq[Any]] = ds.filter(lit(true))
+            .queryExecution
+            .executedPlan
+            .executeCollectPublic().map(_.toSeq).toSeq
+          // We need the types so we can output struct field names
+          val types = executedPlan.output.map(_.dataType)
+          // Reformat to match hive tab delimited output.
+          result.map(_.zip(types).map(e => toHiveString(e)))
+            .map(_.mkString("\t"))
+        } finally {
+          SQLConf.get.setConf(SQLConf.DATETIME_JAVA8API_ENABLED, java8TimeConf)
+        }
+    }
   }

   private lazy val zoneId = DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 11f9724e587f2..6ea5e757a21f4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -190,7 +190,7 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       case exampleRe(sql, output) =>
         val df = clonedSpark.sql(sql)
         val actual = unindentAndTrim(
-          hiveResultString(df.queryExecution.executedPlan).mkString("\n"))
+          hiveResultString(df).mkString("\n"))
         val expected = unindentAndTrim(output)
         assert(actual === expected)
       case _ =>

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index 6b9e5bbd3c961..017bfc40da645 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -337,7 +337,6 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession {
         localSparkSession.conf.set(SQLConf.ANSI_ENABLED.key, true)
       case _ =>
     }
-    localSparkSession.conf.set(SQLConf.DATETIME_JAVA8API_ENABLED.key, true)

     if (configSet.nonEmpty) {
       // Execute the list of set operation in order to add the desired configs
@@ -471,7 +470,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession {
     val schema = df.schema.catalogString
     // Get answer, but also get rid of the #1234 expression ids that show up in explain plans
     val answer = SQLExecution.withNewExecutionId(session, df.queryExecution, Some(sql)) {
-      hiveResultString(df.queryExecution.executedPlan).map(replaceNotIncludedMsg)
+      hiveResultString(df).map(replaceNotIncludedMsg)
     }

     // If the output is not pre-sorted, sort it.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
index bb59b12e6f350..bddd15c6e25d6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
@@ -25,11 +25,10 @@ class HiveResultSuite extends SharedSparkSession {
   test("date formatting in hive result") {
     val dates = Seq("2018-12-28", "1582-10-13", "1582-10-14", "1582-10-15")
     val df = dates.toDF("a").selectExpr("cast(a as date) as b")
-    val executedPlan1 = df.queryExecution.executedPlan
-    val result = HiveResult.hiveResultString(executedPlan1)
+    val result = HiveResult.hiveResultString(df)
     assert(result == dates)
-    val executedPlan2 = df.selectExpr("array(b)").queryExecution.executedPlan
-    val result2 = HiveResult.hiveResultString(executedPlan2)
+    val df2 = df.selectExpr("array(b)")
+    val result2 = HiveResult.hiveResultString(df2)
     assert(result2 == dates.map(x => s"[$x]"))
   }

@@ -40,11 +39,10 @@ class HiveResultSuite extends SharedSparkSession {
       "1582-10-14 01:02:03",
       "1582-10-15 01:02:03")
     val df = timestamps.toDF("a").selectExpr("cast(a as timestamp) as b")
-    val executedPlan1 = df.queryExecution.executedPlan
-    val result = HiveResult.hiveResultString(executedPlan1)
+    val result = HiveResult.hiveResultString(df)
    assert(result == timestamps)
-    val executedPlan2 = df.selectExpr("array(b)").queryExecution.executedPlan
-    val result2 = HiveResult.hiveResultString(executedPlan2)
+    val df2 = df.selectExpr("array(b)")
+    val result2 = HiveResult.hiveResultString(df2)
     assert(result2 == timestamps.map(x => s"[$x]"))
   }

@@ -57,15 +55,14 @@ class HiveResultSuite extends SharedSparkSession {
   test("decimal formatting in hive result") {
     val df = Seq(new java.math.BigDecimal("1")).toDS()
     Seq(2, 6, 18).foreach { scala =>
-      val executedPlan =
-        df.selectExpr(s"CAST(value AS decimal(38, $scala))").queryExecution.executedPlan
-      val result = HiveResult.hiveResultString(executedPlan)
+      val decimalDf = df.selectExpr(s"CAST(value AS decimal(38, $scala))")
+      val result = HiveResult.hiveResultString(decimalDf)
       assert(result.head.split("\\.").last.length === scala)
     }

-    val executedPlan = Seq(java.math.BigDecimal.ZERO).toDS()
-      .selectExpr(s"CAST(value AS decimal(38, 8))").queryExecution.executedPlan
-    val result = HiveResult.hiveResultString(executedPlan)
+    val df2 = Seq(java.math.BigDecimal.ZERO).toDS()
+      .selectExpr(s"CAST(value AS decimal(38, 8))")
+    val result = HiveResult.hiveResultString(df2)
     assert(result.head === "0.00000000")
   }
 }

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
index 362ac362e9718..d2077801a397f 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
@@ -60,9 +60,10 @@ private[hive] class SparkSQLDriver(val context: SQLContext = SparkSQLEnv.sqlCont
     // TODO unify the error code
     try {
       context.sparkContext.setJobDescription(command)
-      val execution = context.sessionState.executePlan(context.sql(command).logicalPlan)
+      val df = context.sql(command)
+      val execution = context.sessionState.executePlan(df.logicalPlan)
       hiveResponse = SQLExecution.withNewExecutionId(context.sparkSession, execution) {
-        hiveResultString(execution.executedPlan)
+        hiveResultString(df)
       }
       tableSchema = getResultSetSchema(execution)
       new CommandProcessorResponse(0)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
index 233e6224a10d9..dd49f6392a47b 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -46,8 +46,6 @@ private[hive] object SparkSQLEnv extends Logging {

     sparkConf
       .setAppName(maybeAppName.getOrElse(s"SparkSQL::${Utils.localHostName()}"))
-      .set(SQLConf.DATETIME_JAVA8API_ENABLED, true)
-
     val sparkSession = SparkSession.builder.config(sparkConf).enableHiveSupport().getOrCreate()
     sparkContext = sparkSession.sparkContext

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 28e1db961f611..61d06c724b5f3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -347,7 +347,7 @@ abstract class HiveComparisonTest
           val query = new TestHiveQueryExecution(queryString.replace("../../data", testDataPath))
           def getResult(): Seq[String] = {
             SQLExecution.withNewExecutionId(
-              query.sparkSession, query)(hiveResultString(query.executedPlan))
+              query.sparkSession, query)(hiveResultString(query.df))
           }
           try { (query, prepareAnswer(query, getResult())) } catch {
             case e: Throwable =>

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
index cc4592a5caf68..883921685d2dc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -575,11 +575,11 @@ private[hive] class TestHiveSparkSession(

 private[hive] class TestHiveQueryExecution(
     sparkSession: TestHiveSparkSession,
-    logicalPlan: LogicalPlan)
-  extends QueryExecution(sparkSession, logicalPlan) with Logging {
+    val df: DataFrame)
+  extends QueryExecution(sparkSession, df.queryExecution.logical) with Logging {

   def this(sparkSession: TestHiveSparkSession, sql: String) {
-    this(sparkSession, sparkSession.sessionState.sqlParser.parsePlan(sql))
+    this(sparkSession, sparkSession.sql(sql))
   }

   def this(sql: String) {
@@ -642,7 +642,9 @@ private[sql] class TestHiveSessionStateBuilder(
   override def overrideConfs: Map[String, String] = TestHiveContext.overrideConfs

   override def createQueryExecution: (LogicalPlan) => QueryExecution = { plan =>
-    new TestHiveQueryExecution(session.asInstanceOf[TestHiveSparkSession], plan)
+    new TestHiveQueryExecution(
+      session.asInstanceOf[TestHiveSparkSession],
+      Dataset.ofRows(session, plan))
   }

   override protected def newBuilder: NewBuilder = new TestHiveSessionStateBuilder(_, _)

From 80f4c891d3da7bbefcd5738cb61a8d4dabfa1a96 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Mon, 17 Feb 2020 00:01:54 +0300
Subject: [PATCH 05/18] Fix StackOverflowError

---
 .../sql/hive/execution/HiveComparisonTest.scala   |  2 +-
 .../org/apache/spark/sql/hive/test/TestHive.scala | 12 ++++++------
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index b3c90a375ec36..d2f7ef635c1c4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -346,7 +346,7 @@ abstract class HiveComparisonTest
         val catalystResults = queryList.zip(hiveResults).map { case (queryString, hive) =>
           val query = new TestHiveQueryExecution(queryString.replace("../../data", testDataPath))
           def getResult(): Seq[String] = {
-            SQLExecution.withNewExecutionId(query)(hiveResultString(query.df))
+            SQLExecution.withNewExecutionId(query)(hiveResultString(query.dataset))
           }
           try { (query, prepareAnswer(query, getResult())) } catch {
             case e: Throwable =>

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 78a40e9a8ed5d..a917402a6fbbe 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -575,11 +575,11 @@ private[hive] class TestHiveSparkSession(

 private[hive] class TestHiveQueryExecution(
     sparkSession: TestHiveSparkSession,
-    val df: DataFrame)
-  extends QueryExecution(sparkSession, df.queryExecution.logical) with Logging {
+    logicalPlan: LogicalPlan)
+  extends QueryExecution(sparkSession, logicalPlan) with Logging {

   def this(sparkSession: TestHiveSparkSession, sql: String) {
-    this(sparkSession, sparkSession.sql(sql))
+    this(sparkSession, sparkSession.sessionState.sqlParser.parsePlan(sql))
   }

   def this(sql: String) {
@@ -605,6 +605,8 @@ private[hive] class TestHiveQueryExecution(
     // Proceed with analysis.
     sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
   }
+
+  lazy val dataset = Dataset.ofRows(sparkSession, logical)
 }

@@ -642,9 +644,7 @@ private[sql] class TestHiveSessionStateBuilder(
   override def overrideConfs: Map[String, String] = TestHiveContext.overrideConfs

   override def createQueryExecution: (LogicalPlan) => QueryExecution = { plan =>
-    new TestHiveQueryExecution(
-      session.asInstanceOf[TestHiveSparkSession],
-      Dataset.ofRows(session, plan))
+    new TestHiveQueryExecution(session.asInstanceOf[TestHiveSparkSession], plan)
   }

   override protected def newBuilder: NewBuilder = new TestHiveSessionStateBuilder(_, _)

From d3131a5e24baf217c06f0f2e2fff9dd9efaba2e7 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Mon, 17 Feb 2020 09:35:51 +0300
Subject: [PATCH 06/18] Dataset[T] -> Dataset[_]

---
 .../main/scala/org/apache/spark/sql/execution/HiveResult.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
index 810fd009b5584..904ce3f86161e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
@@ -37,7 +37,7 @@ object HiveResult {
    * Returns the result as a hive compatible sequence of strings. This is used in tests and
    * `SparkSQLDriver` for CLI applications.
    */
-  def hiveResultString[T](ds: Dataset[T]): Seq[String] = {
+  def hiveResultString(ds: Dataset[_]): Seq[String] = {
     val executedPlan = ds.queryExecution.executedPlan
     executedPlan match {
       case ExecutedCommandExec(_: DescribeCommandBase) =>

From e68e622ea4b0ce44f0d1b1fab677843c8e108afd Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Mon, 17 Feb 2020 09:38:12 +0300
Subject: [PATCH 07/18] Use Dataset.ofRows

---
 .../main/scala/org/apache/spark/sql/execution/HiveResult.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
index 904ce3f86161e..7caa2388bfe91 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
@@ -58,7 +58,7 @@ object HiveResult {
         val java8TimeConf = SQLConf.get.getConf(SQLConf.DATETIME_JAVA8API_ENABLED)
         try {
           SQLConf.get.setConf(SQLConf.DATETIME_JAVA8API_ENABLED, true)
-          val result: Seq[Seq[Any]] = ds.filter(lit(true))
+          val result: Seq[Seq[Any]] = Dataset.ofRows(ds.sparkSession, ds.queryExecution.logical)
             .queryExecution
             .executedPlan
             .executeCollectPublic().map(_.toSeq).toSeq

From e491f966b23a734358b222ed8486a547467a22ed Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Mon, 17 Feb 2020 09:47:49 +0300
Subject: [PATCH 08/18] Use withActive

---
 .../org/apache/spark/sql/execution/HiveResult.scala | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
index 7caa2388bfe91..1d8a755089567 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
@@ -55,9 +55,12 @@ object HiveResult {
       case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
         command.executeCollect().map(_.getString(1))
       case _ =>
-        val java8TimeConf = SQLConf.get.getConf(SQLConf.DATETIME_JAVA8API_ENABLED)
-        try {
-          SQLConf.get.setConf(SQLConf.DATETIME_JAVA8API_ENABLED, true)
+        val sessionWithJava8DatatimeEnabled = {
+          val cloned = ds.sparkSession.cloneSession()
+          cloned.conf.set(SQLConf.DATETIME_JAVA8API_ENABLED.key, true)
+          cloned
+        }
+        sessionWithJava8DatatimeEnabled.withActive {
           val result: Seq[Seq[Any]] = Dataset.ofRows(ds.sparkSession, ds.queryExecution.logical)
             .queryExecution
             .executedPlan
             .executeCollectPublic().map(_.toSeq).toSeq
           // We need the types so we can output struct field names
           val types = executedPlan.output.map(_.dataType)
           // Reformat to match hive tab delimited output.
           result.map(_.zip(types).map(e => toHiveString(e)))
             .map(_.mkString("\t"))
-        } finally {
-          SQLConf.get.setConf(SQLConf.DATETIME_JAVA8API_ENABLED, java8TimeConf)
         }
     }

From 95597a7244e2c99a9ae267e5705748ad8403a169 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Mon, 17 Feb 2020 10:03:08 +0300
Subject: [PATCH 09/18] Fix typo

---
 .../scala/org/apache/spark/sql/execution/HiveResult.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
index 1d8a755089567..b9501eef5ae5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
@@ -55,12 +55,12 @@ object HiveResult {
       case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
         command.executeCollect().map(_.getString(1))
       case _ =>
-        val sessionWithJava8DatatimeEnabled = {
+        val sessionWithJava8DatetimeEnabled = {
           val cloned = ds.sparkSession.cloneSession()
           cloned.conf.set(SQLConf.DATETIME_JAVA8API_ENABLED.key, true)
           cloned
         }
-        sessionWithJava8DatatimeEnabled.withActive {
+        sessionWithJava8DatetimeEnabled.withActive {
           val result: Seq[Seq[Any]] = Dataset.ofRows(ds.sparkSession, ds.queryExecution.logical)
             .queryExecution
             .executedPlan

From 2086bbf68405830556d25c9c6ec38fe877538405 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Mon, 17 Feb 2020 10:23:19 +0300
Subject: [PATCH 10/18] Remove unused import

---
 .../org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
index dd49f6392a47b..8944b93d9b697 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -24,7 +24,6 @@ import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.Utils

 /** A singleton object for the master program. The slaves should not access this. */

From 69f63b228b35fc6eadf69e53a70768f6d5ae743d Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Mon, 17 Feb 2020 10:24:35 +0300
Subject: [PATCH 11/18] Use df.queryExecution in SparkSQLDriver

---
 .../org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
index d1e975043607c..64e91f405d613 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
@@ -61,7 +61,7 @@ private[hive] class SparkSQLDriver(val context: SQLContext = SparkSQLEnv.sqlCont
     try {
       context.sparkContext.setJobDescription(command)
       val df = context.sql(command)
-      val execution = context.sessionState.executePlan(context.sql(command).logicalPlan)
+      val execution = df.queryExecution
       hiveResponse = SQLExecution.withNewExecutionId(execution) {
         hiveResultString(df)
       }

From cdb322d6366e5c92579a8eaa26ccd94626cdf3ee Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Mon, 17 Feb 2020 10:30:21 +0300
Subject: [PATCH 12/18] Embed dataset to HiveComparisonTest

---
 .../apache/spark/sql/hive/execution/HiveComparisonTest.scala | 3 ++-
 .../test/scala/org/apache/spark/sql/hive/test/TestHive.scala | 2 --
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index d2f7ef635c1c4..9b3857c3c66f0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -346,7 +346,8 @@ abstract class HiveComparisonTest
         val catalystResults = queryList.zip(hiveResults).map { case (queryString, hive) =>
           val query = new TestHiveQueryExecution(queryString.replace("../../data", testDataPath))
           def getResult(): Seq[String] = {
-            SQLExecution.withNewExecutionId(query)(hiveResultString(query.dataset))
+            val ds = Dataset.ofRows(query.sparkSession, query.logical)
+            SQLExecution.withNewExecutionId(query)(hiveResultString(ds))
           }
           try { (query, prepareAnswer(query, getResult())) } catch {
             case e: Throwable =>

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
index a917402a6fbbe..222244a04f5f5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -605,8 +605,6 @@ private[hive] class TestHiveQueryExecution(
     // Proceed with analysis.
     sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
   }
-
-  lazy val dataset = Dataset.ofRows(sparkSession, logical)
 }

From 5ac5e772a84b51c7038f7d2a819d2d428960b3a9 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Mon, 17 Feb 2020 10:35:58 +0300
Subject: [PATCH 13/18] Add a comment

---
 .../main/scala/org/apache/spark/sql/execution/HiveResult.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
index b9501eef5ae5b..be11493fa3540 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
@@ -61,6 +61,8 @@ object HiveResult {
           cloned
         }
         sessionWithJava8DatetimeEnabled.withActive {
+          // We cannot collect the original dataset because its encoders could be created
+          // with disabled Java 8 date-time API.
           val result: Seq[Seq[Any]] = Dataset.ofRows(ds.sparkSession, ds.queryExecution.logical)
             .queryExecution
             .executedPlan

From ce77628cad487d7d86cb33577a699ac37bf8f5fd Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Mon, 17 Feb 2020 10:47:24 +0300
Subject: [PATCH 14/18] Add a comment to SparkExecuteStatementOperation

---
 .../sql/hive/thriftserver/SparkExecuteStatementOperation.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 7ea9a336149ce..7bcd8032bd6af 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -179,6 +179,8 @@ private[hive] class SparkExecuteStatementOperation(
       }
       curCol += 1
     }
+    // Convert date-time instances to types that are acceptable by Hive libs
+    // used in conversions to strings.
     val resultRow = row.map {
       case i: Instant => Timestamp.from(i)
       case ld: LocalDate => Date.valueOf(ld)
       case other => other

From f9112b7ab9bb654d880a8bdf0ad28a78c3da0712 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Mon, 17 Feb 2020 11:06:52 +0300
Subject: [PATCH 15/18] Minor change

---
 .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 6ea5e757a21f4..d6efb2f91ca0b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -189,8 +189,7 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     example.split(" > ").toList.foreach(_ match {
       case exampleRe(sql, output) =>
         val df = clonedSpark.sql(sql)
-        val actual = unindentAndTrim(
-          hiveResultString(df).mkString("\n"))
+        val actual = unindentAndTrim(hiveResultString(df).mkString("\n"))
         val expected = unindentAndTrim(output)
         assert(actual === expected)
       case _ =>

From 2129f30c8f54c669405db302dbfb52f78631fc05 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Mon, 17 Feb 2020 02:31:47 -0800
Subject: [PATCH 16/18] Revert "Embed dataset to HiveComparisonTest"

This reverts commit cdb322d6366e5c92579a8eaa26ccd94626cdf3ee.
---
 .../apache/spark/sql/hive/execution/HiveComparisonTest.scala | 3 +--
 .../test/scala/org/apache/spark/sql/hive/test/TestHive.scala | 2 ++
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 9b3857c3c66f0..d2f7ef635c1c4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -346,8 +346,7 @@ abstract class HiveComparisonTest
         val catalystResults = queryList.zip(hiveResults).map { case (queryString, hive) =>
           val query = new TestHiveQueryExecution(queryString.replace("../../data", testDataPath))
           def getResult(): Seq[String] = {
-            val ds = Dataset.ofRows(query.sparkSession, query.logical)
-            SQLExecution.withNewExecutionId(query)(hiveResultString(ds))
+            SQLExecution.withNewExecutionId(query)(hiveResultString(query.dataset))
           }
           try { (query, prepareAnswer(query, getResult())) } catch {
             case e: Throwable =>

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 222244a04f5f5..a917402a6fbbe 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -605,6 +605,8 @@ private[hive] class TestHiveQueryExecution(
     // Proceed with analysis.
     sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
   }
+
+  lazy val dataset = Dataset.ofRows(sparkSession, logical)
 }

From 902f9d854e22b9a20c38b5ee0321c79bd977e33d Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Mon, 17 Feb 2020 15:43:10 +0300
Subject: [PATCH 17/18] Replace ds.queryExecution.logical by ds.logicalPlan

---
 .../main/scala/org/apache/spark/sql/execution/HiveResult.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
index be11493fa3540..b19184055268a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
@@ -63,7 +63,7 @@ object HiveResult {
         sessionWithJava8DatetimeEnabled.withActive {
           // We cannot collect the original dataset because its encoders could be created
           // with disabled Java 8 date-time API.
-          val result: Seq[Seq[Any]] = Dataset.ofRows(ds.sparkSession, ds.queryExecution.logical)
+          val result: Seq[Seq[Any]] = Dataset.ofRows(ds.sparkSession, ds.logicalPlan)
             .queryExecution
             .executedPlan
             .executeCollectPublic().map(_.toSeq).toSeq

From 880b1dec95595b58688a3d5f7a2e0637a090cb47 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Mon, 17 Feb 2020 15:49:31 +0300
Subject: [PATCH 18/18] Bug fix

---
 .../apache/spark/sql/hive/execution/HiveComparisonTest.scala | 4 +++-
 .../test/scala/org/apache/spark/sql/hive/test/TestHive.scala | 2 --
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index d2f7ef635c1c4..82fe274515505 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -346,7 +346,9 @@ abstract class HiveComparisonTest
        val catalystResults = queryList.zip(hiveResults).map { case (queryString, hive) =>
          val query = new TestHiveQueryExecution(queryString.replace("../../data", testDataPath))
          def getResult(): Seq[String] = {
-            SQLExecution.withNewExecutionId(query)(hiveResultString(query.dataset))
+            SQLExecution.withNewExecutionId(query) {
+              hiveResultString(Dataset.ofRows(query.sparkSession, query.logical))
+            }
          }
          try { (query, prepareAnswer(query, getResult())) } catch {
            case e: Throwable =>

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
index a917402a6fbbe..222244a04f5f5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -605,8 +605,6 @@ private[hive] class TestHiveQueryExecution(
     // Proceed with analysis.
     sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
   }
-
-  lazy val dataset = Dataset.ofRows(sparkSession, logical)
 }
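
Taken together, the series has two cooperating pieces: HiveResult.hiveResultString re-executes the plan under a session clone with spark.sql.datetime.java8API.enabled set to true (patches 08, 09, 13, 17), so collected rows carry java.time.Instant and java.time.LocalDate values; and SparkExecuteStatementOperation maps those values back to java.sql.Timestamp and java.sql.Date, which the Hive client libraries can format (patches 02, 03, 14). The following is a minimal, Spark-free sketch of that second mapping; the object name, the helper name toHiveFriendly, and the sample row are illustrative only and appear nowhere in the patches.

import java.sql.{Date, Timestamp}
import java.time.{Instant, LocalDate}

object Java8TimeRowConversion {
  // Map Java 8 date-time values onto the java.sql types that Hive's row
  // serialization already knows how to render; pass everything else through.
  def toHiveFriendly(row: Seq[Any]): Array[Object] =
    row.map {
      case i: Instant => Timestamp.from(i)    // keeps sub-second precision
      case ld: LocalDate => Date.valueOf(ld)  // same calendar day, default time zone
      case other => other
    }.toArray.asInstanceOf[Array[Object]]     // Array[Any] erases to Object[], so the cast is safe

  def main(args: Array[String]): Unit = {
    val row = Seq(Instant.parse("2020-02-17T10:00:00Z"), LocalDate.of(2020, 2, 17), 42)
    // Timestamp.toString renders in the JVM's default time zone.
    println(toHiveFriendly(row).mkString("[", ", ", "]"))
  }
}

Routing the values through java.sql types spares Hive's serializers from handling java.time classes at all, which is presumably why patch 02 describes the conversion as needed "for correct formatting".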