diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 04a3fc4b63050..60c54dfc98a58 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -476,6 +476,7 @@ object SparkParallelTestGrouping {
     "org.apache.spark.ml.classification.LogisticRegressionSuite",
     "org.apache.spark.ml.classification.LinearSVCSuite",
     "org.apache.spark.sql.SQLQueryTestSuite",
+    "org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperationSuite",
     "org.apache.spark.sql.hive.thriftserver.ThriftServerQueryTestSuite",
     "org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite",
     "org.apache.spark.sql.hive.thriftserver.ui.ThriftServerPageSuite",
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 1404ece76449e..db88264685a54 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -342,8 +342,8 @@ private[hive] class SparkExecuteStatementOperation(
     synchronized {
       if (!getStatus.getState.isTerminal) {
         logInfo(s"Cancel query with $statementId")
-        cleanup()
         setState(OperationState.CANCELED)
+        cleanup()
         HiveThriftServer2.eventManager.onStatementCanceled(statementId)
       }
     }
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala
index 3da568cfa256e..59516dc26bae6 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala
@@ -46,8 +46,8 @@ private[hive] trait SparkOperation extends Operation with Logging {
   }
 
   abstract override def close(): Unit = {
-    cleanup()
     super.close()
+    cleanup()
     logInfo(s"Close statement with $statementId")
     HiveThriftServer2.eventManager.onOperationClosed(statementId)
   }
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala
index 13df3fabc4919..4c2f29e0bf394 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala
@@ -17,10 +17,25 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
+import java.util
+import java.util.concurrent.Semaphore
+
+import scala.concurrent.duration._
+
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hive.service.cli.OperationState
+import org.apache.hive.service.cli.session.{HiveSession, HiveSessionImpl}
+import org.mockito.Mockito.{doReturn, mock, spy, when, RETURNS_DEEP_STUBS}
+import org.mockito.invocation.InvocationOnMock
+
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.hive.thriftserver.ui.HiveThriftServer2EventManager
+import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, NullType, StringType, StructField, StructType}
 
-class SparkExecuteStatementOperationSuite extends SparkFunSuite {
+class SparkExecuteStatementOperationSuite extends SparkFunSuite with SharedSparkSession {
+
   test("SPARK-17112 `select null` via JDBC triggers IllegalArgumentException in ThriftServer") {
     val field1 = StructField("NULL", NullType)
     val field2 = StructField("(IF(true, NULL, NULL))", NullType)
@@ -42,4 +57,68 @@ class SparkExecuteStatementOperationSuite extends SparkFunSuite {
     assert(columns.get(1).getType().getName == "INT")
     assert(columns.get(1).getComment() == "")
   }
+
+  Seq(
+    (OperationState.CANCELED, (_: SparkExecuteStatementOperation).cancel()),
+    (OperationState.CLOSED, (_: SparkExecuteStatementOperation).close())
+  ).foreach { case (finalState, transition) =>
+    test("SPARK-32057 SparkExecuteStatementOperation should not transiently become ERROR " +
+      s"before being set to $finalState") {
+      val hiveSession = new HiveSessionImpl(ThriftserverShimUtils.testedProtocolVersions.head,
+        "username", "password", new HiveConf, "ip address")
+      hiveSession.open(new util.HashMap)
+
+      HiveThriftServer2.eventManager = mock(classOf[HiveThriftServer2EventManager])
+
+      val spySqlContext = spy(sqlContext)
+
+      // When cancel() is called on the operation, cleanup causes an exception to be thrown inside
+      // of execute(). This should not cause the state to become ERROR. The exception here will be
+      // triggered in our custom cleanup().
+      val signal = new Semaphore(0)
+      val dataFrame = mock(classOf[DataFrame], RETURNS_DEEP_STUBS)
+      when(dataFrame.collect()).thenAnswer((_: InvocationOnMock) => {
+        signal.acquire()
+        throw new RuntimeException("Operation was cancelled by test cleanup.")
+      })
+      val statement = "stmt"
+      doReturn(dataFrame, Nil: _*).when(spySqlContext).sql(statement)
+
+      val executeStatementOperation = new MySparkExecuteStatementOperation(spySqlContext,
+        hiveSession, statement, signal, finalState)
+
+      val run = new Thread() {
+        override def run(): Unit = executeStatementOperation.runInternal()
+      }
+      assert(executeStatementOperation.getStatus.getState === OperationState.INITIALIZED)
+      run.start()
+      eventually(timeout(5.seconds)) {
+        assert(executeStatementOperation.getStatus.getState === OperationState.RUNNING)
+      }
+      transition(executeStatementOperation)
+      run.join()
+      assert(executeStatementOperation.getStatus.getState === finalState)
+    }
+  }
+
+  private class MySparkExecuteStatementOperation(
+      sqlContext: SQLContext,
+      hiveSession: HiveSession,
+      statement: String,
+      signal: Semaphore,
+      finalState: OperationState)
+    extends SparkExecuteStatementOperation(sqlContext, hiveSession, statement,
+      new util.HashMap, false) {
+
+    override def cleanup(): Unit = {
+      super.cleanup()
+      signal.release()
+      // At this point, operation should already be in finalState (set by either close() or
+      // cancel()). We want to check if it stays in finalState after the exception thrown by
+      // releasing the semaphore propagates. We hence need to sleep for a short while.
+      Thread.sleep(1000)
+      // State should not be ERROR
+      assert(getStatus.getState === finalState)
+    }
+  }
 }
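
Note for reviewers: the invariant the two main-code hunks enforce is that the terminal state (CANCELED via setState, or CLOSED via super.close()) is recorded before cleanup() runs, since cleanup() provokes a failure in the still-executing statement and the error path may promote the operation to ERROR only while its state is not yet terminal. The standalone sketch below models that ordering; it is illustrative Scala only, and the names (StateOrderingSketch, onExecutionFailure) are invented for this example, not Spark APIs.

// Minimal model of the fix: mark the operation terminal first, then run the
// cleanup whose side effect makes the execution thread throw. With the old
// order (cleanup before setState), the failure would arrive while the state
// was still RUNNING and transiently flip it to ERROR.
object StateOrderingSketch {
  sealed trait State { def isTerminal: Boolean = this != Running }
  case object Running extends State
  case object Canceled extends State
  case object Error extends State

  @volatile private var state: State = Running

  // Models execute()'s error handler: a failure may move the operation to
  // ERROR only if the operation is not already in a terminal state.
  def onExecutionFailure(): Unit = synchronized {
    if (!state.isTerminal) state = Error
  }

  // Models the fixed cancel(): terminal state first, then cleanup.
  def cancel(): Unit = synchronized {
    if (!state.isTerminal) {
      state = Canceled
      // cleanup() would interrupt the execution thread here; the resulting
      // failure now finds a terminal state and leaves it untouched.
    }
  }

  def main(args: Array[String]): Unit = {
    cancel()
    onExecutionFailure() // the failure provoked by cleanup arrives afterwards
    assert(state == Canceled, s"expected CANCELED, observed $state")
    println(s"final state: $state") // prints "final state: Canceled"
  }
}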