From b3a26e15ba1851669ffe5bf28e5bcb6f84a18316 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Sat, 19 Oct 2019 13:04:39 +0800
Subject: [PATCH 1/6] Test ThriftServerQueryTestSuite asynchronously

---
 .../hive/thriftserver/ThriftServerQueryTestSuite.scala | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
index 799f00a28fd49..a24c89ab2def0 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.hive.thriftserver

 import java.io.File
-import java.sql.{DriverManager, Statement, Timestamp}
+import java.sql.{DriverManager, SQLException, Statement, Timestamp}
 import java.util.{Locale, MissingFormatArgumentException}

 import scala.util.{Random, Try}
@@ -78,7 +78,7 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite {
   override def sparkConf: SparkConf = super.sparkConf
     // Hive Thrift server should not executes SQL queries in an asynchronous way
     // because we may set session configuration.
-    .set(HiveUtils.HIVE_THRIFT_SERVER_ASYNC, false)
+    .set(HiveUtils.HIVE_THRIFT_SERVER_ASYNC, true)

   override val isTestWithConfigSets = false
@@ -220,6 +220,12 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite {
             s"Exception did not match for query #$i\n${expected.sql}, " +
               s"expected: ${expected.output}, but got: ${output.output}")

+        // SQLException should not exactly match. We only assert the result contains Exception.
+        case _ if output.output.startsWith(classOf[SQLException].getName) =>
+          assert(expected.output.contains("Exception"),
+            s"Exception did not match for query #$i\n${expected.sql}, " +
+              s"expected: ${expected.output}, but got: ${output.output}")
+
         case _ =>
           assertResult(expected.output, s"Result did not match for query #$i\n${expected.sql}") {
             output.output
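Note on patch 1: once HIVE_THRIFT_SERVER_ASYNC is true, a failed statement reaches the JDBC client wrapped in java.sql.SQLException rather than as the original parse or analysis exception, so the golden-file check is relaxed to a substring match on "Exception". A minimal standalone sketch of that comparison rule (ExceptionMatch and matchesExpected are illustrative names, not part of the suite):

    import java.sql.SQLException

    object ExceptionMatch {
      // Relaxed golden-file comparison: async execution surfaces failures as
      // SQLException, whose class name and message differ from the locally
      // thrown exception, so only the presence of "Exception" is asserted;
      // ordinary results still have to match exactly.
      def matchesExpected(expected: String, actual: String): Boolean =
        if (actual.startsWith(classOf[SQLException].getName)) expected.contains("Exception")
        else expected == actual
    }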
From 5c896ced3d44446fce8d185a58aab32aacbb978e Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Sun, 20 Oct 2019 09:28:00 +0800
Subject: [PATCH 2/6] Add debug info

---
 .../spark/sql/catalyst/parser/ParseDriver.scala |  5 +++++
 .../SparkExecuteStatementOperation.scala        | 12 +++++++++++-
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
index 85998e33140d0..d09c8c313c6ba 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
@@ -88,6 +88,11 @@ abstract class AbstractSqlParser extends ParserInterface with Logging {
   protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
     logDebug(s"Parsing command: $command")
+    // scalastyle:off
+    if (command.startsWith("set") || command.startsWith("select i, left('ahoj', i)")) {
+      println(s"SQLConf.get.ansiEnabled: ${SQLConf.get.ansiEnabled}")
+    }
+    // scalastyle:on

     val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
     lexer.removeErrorListeners()
     lexer.addErrorListener(ParseErrorListener)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 68197a9de8566..0c64885a46a89 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -275,7 +275,17 @@ private[hive] class SparkExecuteStatementOperation(
       Thread.currentThread().setContextClassLoader(executionHiveClassLoader)

       sqlContext.sparkContext.setJobGroup(statementId, statement)
-      result = sqlContext.sql(statement)
+      // scalastyle:off
+      if (statement.startsWith("set") || statement.startsWith("select i, left('ahoj', i)")) {
+        println("=" * 96)
+        println(s"statement: ${statement}")
+        println("spark.sql.ansi.enabled before: " + sqlContext.conf.getConfString("spark.sql.ansi.enabled"))
+        result = sqlContext.sql(statement)
+        println("spark.sql.ansi.enabled after: " + sqlContext.conf.getConfString("spark.sql.ansi.enabled"))
+        // scalastyle:on
+      } else {
+        result = sqlContext.sql(statement)
+      }
       logDebug(result.queryExecution.toString())
       result.queryExecution.logical match {
         case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value)))) =>
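Note on patch 2: the temporary printlns probe whether the thread that parses and executes a statement can see the session's spark.sql.ansi.enabled value at all. SQLConf.get resolves through the thread-local active session, which is the suspected failure point under async execution. A minimal sketch of that failure mode (AnsiFlagProbe is an illustrative name; assumes a 2019-era Spark master build where spark.sql.ansi.enabled exists and SQLConf.get falls back to a default conf when no session is active on the thread):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.internal.SQLConf

    object AnsiFlagProbe {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("probe").getOrCreate()
        spark.conf.set("spark.sql.ansi.enabled", "true")

        SparkSession.setActiveSession(spark)
        println(s"with active session:    ${SQLConf.get.ansiEnabled}") // true

        SparkSession.clearActiveSession()
        // With no active session on this thread, SQLConf.get falls back to
        // defaults, so the session's ANSI setting is no longer visible: the
        // situation an async Thrift server pool thread can end up in.
        println(s"without active session: ${SQLConf.get.ansiEnabled}")

        spark.stop()
      }
    }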
From df58bb381a7d1e8114b62c8a08b5c81bd36c38e0 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Mon, 21 Oct 2019 14:38:51 +0800
Subject: [PATCH 3/6] Apply SPARK-29530

---
 .../spark/sql/catalyst/parser/ParseDriver.scala    |  5 -----
 .../scala/org/apache/spark/sql/SparkSession.scala  |  1 +
 .../SparkExecuteStatementOperation.scala           | 12 +-----------
 3 files changed, 2 insertions(+), 16 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
index d09c8c313c6ba..85998e33140d0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
@@ -88,11 +88,6 @@ abstract class AbstractSqlParser extends ParserInterface with Logging {
   protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
     logDebug(s"Parsing command: $command")
-    // scalastyle:off
-    if (command.startsWith("set") || command.startsWith("select i, left('ahoj', i)")) {
-      println(s"SQLConf.get.ansiEnabled: ${SQLConf.get.ansiEnabled}")
-    }
-    // scalastyle:on

     val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
     lexer.removeErrorListeners()
     lexer.addErrorListener(ParseErrorListener)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index bd2bc1c0ad5d7..e986e49aabe3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -600,6 +600,7 @@ class SparkSession private(
    * @since 2.0.0
    */
   def sql(sqlText: String): DataFrame = {
+    SparkSession.setActiveSession(this)
     val tracker = new QueryPlanningTracker
     val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
       sessionState.sqlParser.parsePlan(sqlText)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 0c64885a46a89..68197a9de8566 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -275,17 +275,7 @@ private[hive] class SparkExecuteStatementOperation(
       Thread.currentThread().setContextClassLoader(executionHiveClassLoader)

       sqlContext.sparkContext.setJobGroup(statementId, statement)
-      // scalastyle:off
-      if (statement.startsWith("set") || statement.startsWith("select i, left('ahoj', i)")) {
-        println("=" * 96)
-        println(s"statement: ${statement}")
-        println("spark.sql.ansi.enabled before: " + sqlContext.conf.getConfString("spark.sql.ansi.enabled"))
-        result = sqlContext.sql(statement)
-        println("spark.sql.ansi.enabled after: " + sqlContext.conf.getConfString("spark.sql.ansi.enabled"))
-        // scalastyle:on
-      } else {
-        result = sqlContext.sql(statement)
-      }
+      result = sqlContext.sql(statement)
       logDebug(result.queryExecution.toString())
       result.queryExecution.logical match {
         case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value)))) =>
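Note on patch 3: this applies the SPARK-29530 approach, pinning the active session at the top of SparkSession.sql() so that the thread-local lookup behind SQLConf.get resolves to the session actually running the statement, even on a Thrift server pool thread. A sketch of the general pattern (SessionUtil.withActiveSession is an illustrative wrapper; the patch simply inlines the setActiveSession call):

    import org.apache.spark.sql.SparkSession

    object SessionUtil {
      // Run `body` with `session` as the thread-local active session,
      // restoring whatever was active on this thread before.
      def withActiveSession[T](session: SparkSession)(body: => T): T = {
        val previous = SparkSession.getActiveSession
        SparkSession.setActiveSession(session)
        try body
        finally previous match {
          case Some(s) => SparkSession.setActiveSession(s)
          case None => SparkSession.clearActiveSession()
        }
      }
    }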
From 8fd1efd84a195bbc8966eaa13529cc3c00128808 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Mon, 21 Oct 2019 14:43:16 +0800
Subject: [PATCH 4/6] Remove custom SparkConf

---
 .../sql/hive/thriftserver/ThriftServerQueryTestSuite.scala | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
index a24c89ab2def0..d09a7840733e9 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala
@@ -75,11 +75,6 @@ class ThriftServerQueryTestSuite extends SQLQueryTestSuite {
     }
   }

-  override def sparkConf: SparkConf = super.sparkConf
-    // Hive Thrift server should not executes SQL queries in an asynchronous way
-    // because we may set session configuration.
-    .set(HiveUtils.HIVE_THRIFT_SERVER_ASYNC, true)
-
   override val isTestWithConfigSets = false

   /** List of test cases to ignore, in lower cases. */
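Note on patch 4: with the active session now propagated, the suite no longer needs to force synchronous execution, so the sparkConf override is dropped and the server runs with its default asynchronous behavior. If a suite ever needed the old behavior back, a mix-in along these lines would do it (a sketch only; it has to live under an org.apache.spark package because SparkConf.set(ConfigEntry, value) is package-private):

    package org.apache.spark.sql.hive.thriftserver

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SQLQueryTestSuite
    import org.apache.spark.sql.hive.HiveUtils

    // Illustrative mix-in forcing synchronous statement execution again.
    trait SynchronousThriftServer extends SQLQueryTestSuite {
      override def sparkConf: SparkConf =
        super.sparkConf.set(HiveUtils.HIVE_THRIFT_SERVER_ASYNC, false)
    }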
From 8f08b3ddadbad3bb3ca0eff3fd8c43cfc6f50377 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Tue, 22 Oct 2019 07:21:14 +0800
Subject: [PATCH 5/6] Apply SPARK-29530

---
 .../spark/sql/catalyst/parser/ParseDriver.scala | 14 +++++++-------
 .../spark/sql/execution/SparkSqlParser.scala    |  2 +-
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
index 85998e33140d0..a84d29b71ac42 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.types.{DataType, StructType}
 /**
  * Base SQL parsing infrastructure.
  */
-abstract class AbstractSqlParser extends ParserInterface with Logging {
+abstract class AbstractSqlParser(conf: SQLConf) extends ParserInterface with Logging {

   /** Creates/Resolves DataType for a given SQL string. */
   override def parseDataType(sqlText: String): DataType = parse(sqlText) { parser =>
@@ -91,16 +91,16 @@ abstract class AbstractSqlParser extends ParserInterface with Logging {
     val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
     lexer.removeErrorListeners()
     lexer.addErrorListener(ParseErrorListener)
-    lexer.legacy_setops_precedence_enbled = SQLConf.get.setOpsPrecedenceEnforced
-    lexer.ansi = SQLConf.get.ansiEnabled
+    lexer.legacy_setops_precedence_enbled = conf.setOpsPrecedenceEnforced
+    lexer.ansi = conf.ansiEnabled

     val tokenStream = new CommonTokenStream(lexer)
     val parser = new SqlBaseParser(tokenStream)
     parser.addParseListener(PostProcessor)
     parser.removeErrorListeners()
     parser.addErrorListener(ParseErrorListener)
-    parser.legacy_setops_precedence_enbled = SQLConf.get.setOpsPrecedenceEnforced
-    parser.ansi = SQLConf.get.ansiEnabled
+    parser.legacy_setops_precedence_enbled = conf.setOpsPrecedenceEnforced
+    parser.ansi = conf.ansiEnabled

     try {
       try {
@@ -134,12 +134,12 @@ abstract class AbstractSqlParser extends ParserInterface with Logging {
 /**
  * Concrete SQL parser for Catalyst-only SQL statements.
  */
-class CatalystSqlParser(conf: SQLConf) extends AbstractSqlParser {
+class CatalystSqlParser(conf: SQLConf) extends AbstractSqlParser(conf) {
   val astBuilder = new AstBuilder(conf)
 }

 /** For test-only. */
-object CatalystSqlParser extends AbstractSqlParser {
+object CatalystSqlParser extends AbstractSqlParser(SQLConf.get) {
   val astBuilder = new AstBuilder(SQLConf.get)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index df63787fa508b..1ee822b584943 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.types.StructType
 /**
  * Concrete parser for Spark SQL statements.
  */
-class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser {
+class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser(conf) {
   val astBuilder = new SparkSqlAstBuilder(conf)

   private val substitutor = new VariableSubstitution(conf)

From f291ed40e2a10f802b3aade04a8ced656fd48301 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Tue, 22 Oct 2019 07:33:37 +0800
Subject: [PATCH 6/6] Remove SparkSession.setActiveSession(this)

---
 sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index e986e49aabe3b..bd2bc1c0ad5d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -600,7 +600,6 @@ class SparkSession private(
    * @since 2.0.0
    */
   def sql(sqlText: String): DataFrame = {
-    SparkSession.setActiveSession(this)
     val tracker = new QueryPlanningTracker
     val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
       sessionState.sqlParser.parsePlan(sqlText)
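Note on patches 5 and 6, the net effect of the series: the parsers read their configuration from a constructor argument instead of the thread-local SQLConf.get at parse time, so the temporary SparkSession.setActiveSession(this) workaround from patch 3 can be reverted, and ThriftServerQueryTestSuite keeps running against the default asynchronous server from patch 4. A distilled sketch of the design change (ParserSketch and Settings are illustrative names, not Spark classes):

    // Settings are captured once at construction, so parsing behaves the same
    // on whichever thread runs it; no active-session lookup is required.
    final case class Settings(ansiEnabled: Boolean, setOpsPrecedenceEnforced: Boolean)

    abstract class ParserSketch(conf: Settings) {
      protected def parse(command: String): String = {
        val ansi = conf.ansiEnabled
        val legacySetOps = conf.setOpsPrecedenceEnforced
        s"[ansi=$ansi, legacySetOps=$legacySetOps] $command"
      }
    }

One wrinkle remains visible in the diff: object CatalystSqlParser passes SQLConf.get at object initialization, pinning whatever conf is active when the singleton first loads, which the patch flags as acceptable by marking the object "For test-only."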