From 819ae03cad57660f38fdee039ff1f953f7fa230a Mon Sep 17 00:00:00 2001
From: Dong Wang
Date: Fri, 15 May 2015 19:40:30 -0700
Subject: [PATCH 1/9] [SPARK-6964][SQL][WIP] Support Cancellation in the Thrift Server

---
 .../SparkExecuteStatementOperation.scala  | 121 ++++++++++++++++--
 .../server/SparkSQLOperationManager.scala |   5 +-
 2 files changed, 112 insertions(+), 14 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index c0d1266212cdd..ba05be279e810 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -20,10 +20,26 @@ package org.apache.spark.sql.hive.thriftserver
 
 import java.sql.{Date, Timestamp}
 import java.util.{Map => JMap, UUID}
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.Executors
+import java.util.concurrent.Future
+import java.util.concurrent.RejectedExecutionException
+import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, UUID}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, Map => SMap}
+
+import org.apache.commons.logging.Log
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.metastore.api.FieldSchema
+import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.hadoop.hive.shims.ShimLoader
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.ExecuteStatementOperation
 import org.apache.hive.service.cli.session.HiveSession
+import org.apache.hive.service.cli.thrift.TProtocolVersion
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.execution.SetCommand
@@ -31,8 +47,6 @@ import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLConf}
 
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, Map => SMap}
 
 private[hive] class SparkExecuteStatementOperation(
     parentSession: HiveSession,
@@ -40,17 +54,19 @@ private[hive] class SparkExecuteStatementOperation(
     confOverlay: JMap[String, String],
     runInBackground: Boolean = true)
    (hiveContext: HiveContext, sessionToActivePool: SMap[SessionHandle, String])
-  // NOTE: `runInBackground` is set to `false` intentionally to disable asynchronous execution
-  extends ExecuteStatementOperation(parentSession, statement, confOverlay, false)
+  extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)
   with Logging {
 
   private var result: DataFrame = _
   private var iter: Iterator[SparkRow] = _
   private var dataTypes: Array[DataType] = _
+  private var statementId: String = _
 
   def close(): Unit = {
     // RDDs will be cleaned automatically upon garbage collection.
-    logDebug("CLOSING")
+    hiveContext.sparkContext.clearJobGroup()
+    logDebug(s"CLOSING $statementId")
+    cleanup(OperationState.CLOSED)
   }
 
   def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int) {
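For reference, the cancellation design in this patch hangs off Spark job groups: each statement runs under a job-group ID so that close() and cancel() can abort its jobs. A minimal, self-contained sketch of that mechanism, not part of the patch (the local master and the dummy workload are illustrative assumptions):

    import org.apache.spark.{SparkConf, SparkContext}

    object JobGroupSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("job-group-sketch").setMaster("local[2]"))
        val groupId = java.util.UUID.randomUUID().toString
        // Tag everything submitted from this thread with the group ID,
        // so another thread can cancel it later.
        sc.setJobGroup(groupId, "long running statement", interruptOnCancel = true)
        new Thread(new Runnable {
          def run(): Unit = { Thread.sleep(1000); sc.cancelJobGroup(groupId) }
        }).start()
        try {
          sc.parallelize(1 to 1000000).map { i => Thread.sleep(1); i }.count()
        } catch {
          case e: Exception => println(s"job group cancelled: ${e.getMessage}")
        } finally {
          sc.clearJobGroup() // the same call close() makes in the hunk above
          sc.stop()
        }
      }
    }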
- logDebug("CLOSING") + hiveContext.sparkContext.clearJobGroup() + logDebug(s"CLOSING $statementId") + cleanup(OperationState.CLOSED) } def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int) { @@ -114,10 +130,10 @@ private[hive] class SparkExecuteStatementOperation( } def getResultSetSchema: TableSchema = { - logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}") - if (result.queryExecution.analyzed.output.size == 0) { + if (result == null || result.queryExecution.analyzed.output.size == 0) { new TableSchema(new FieldSchema("Result", "string", "") :: Nil) } else { + logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}") val schema = result.queryExecution.analyzed.output.map { attr => new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "") } @@ -125,9 +141,65 @@ private[hive] class SparkExecuteStatementOperation( } } - def run(): Unit = { - val statementId = UUID.randomUUID().toString - logInfo(s"Running query '$statement'") + override def run(): Unit = { + setState(OperationState.PENDING) + setHasResultSet(true) // avoid no resultset for async run + + if (!runInBackground) { + runInternal() + } else { + val parentSessionState = SessionState.get() + val hiveConf = new HiveConf(getParentSession().getHiveConf()) + val sparkServiceUGI = ShimLoader.getHadoopShims.getUGIForConf(hiveConf) + + // Runnable impl to call runInternal asynchronously, + // from a different thread + val backgroundOperation = new Runnable() { + + override def run(): Unit = { + val doAsAction = new PrivilegedExceptionAction[Object]() { + override def run(): Object = { + + // User information is part of the metastore client member in Hive + SessionState.setCurrentSessionState(parentSessionState) + try { + runInternal() + } catch { + case e: HiveSQLException => + setOperationException(e) + log.error("Error running hive query: ", e) + } + return null + } + } + + try { + ShimLoader.getHadoopShims().doAs(sparkServiceUGI, doAsAction) + } catch { + case e: Exception => + setOperationException(new HiveSQLException(e)) + logError("Error running hive query as user : " + + sparkServiceUGI.getShortUserName(), e) + } + } + } + try { + // This submit blocks if no background threads are available to run this operation + val backgroundHandle = + getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation) + setBackgroundHandle(backgroundHandle) + } catch { + case rejected: RejectedExecutionException => + setState(OperationState.ERROR); + throw new HiveSQLException("The background threadpool cannot accept" + + " new task for execution, please retry the operation", rejected) + } + } + } + + private def runInternal(): Unit = { + statementId = UUID.randomUUID().toString + logInfo(s"Running query '$statement' with $statementId") setState(OperationState.RUNNING) HiveThriftServer2.listener.onStatementStart( statementId, @@ -159,18 +231,43 @@ private[hive] class SparkExecuteStatementOperation( } } dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray - setHasResultSet(true) } catch { + case e: HiveSQLException => + if (getStatus().getState() == OperationState.CANCELED) { + return + } else { + setState(OperationState.ERROR); + throw e + } // Actually do need to catch Throwable as some failures don't inherit from Exception and // HiveServer will silently swallow them. 
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
index 9c0bf02391e0e..0a72f9ec156d5 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
@@ -44,9 +44,10 @@ private[thriftserver] class SparkSQLOperationManager(hiveContext: HiveContext)
       confOverlay: JMap[String, String],
       async: Boolean): ExecuteStatementOperation = synchronized {
 
-    val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay)(
-      hiveContext, sessionToActivePool)
+    val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
+      async)(hiveContext, sessionToActivePool)
     handleToOperation.put(operation.getHandle, operation)
+    logDebug(s"Created Operation for $statement with session=$parentSession, async=$async")
     operation
   }
 }

From 184ec353c86e52211f13e3fdbbb2c2651a955d6c Mon Sep 17 00:00:00 2001
From: Dong Wang
Date: Mon, 18 May 2015 17:13:28 -0700
Subject: [PATCH 2/9] keep hive conf

---
 .../SparkExecuteStatementOperation.scala | 44 ++++++++++++++++++-
 1 file changed, 43 insertions(+), 1 deletion(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index ba05be279e810..66f26f018f818 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -33,6 +33,8 @@ import org.apache.commons.logging.Log
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.metastore.api.FieldSchema
+import org.apache.hadoop.hive.ql.metadata.Hive
+import org.apache.hadoop.hive.ql.metadata.HiveException
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.shims.ShimLoader
 import org.apache.hadoop.security.UserGroupInformation
@@ -149,8 +151,9 @@ private[hive] class SparkExecuteStatementOperation(
       runInternal()
     } else {
       val parentSessionState = SessionState.get()
-      val hiveConf = new HiveConf(getParentSession().getHiveConf())
+      val hiveConf = getConfigForOperation()
       val sparkServiceUGI = ShimLoader.getHadoopShims.getUGIForConf(hiveConf)
+      val sessionHive = getCurrentHive()
 
       // Runnable impl to call runInternal asynchronously,
       // from a different thread
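getConfigForOperation(), introduced later in this patch, exists because the session's HiveConf is shared and mutable: a SET issued while an async statement is running must not change that statement's view of the config. A toy illustration of snapshot-versus-shared semantics in plain Scala (not the patch's code):

    object ConfSnapshotSketch extends App {
      val sessionConf = scala.collection.mutable.Map("spark.sql.shuffle.partitions" -> "200")

      // Shared reference: a later SET leaks into the running query.
      val shared = sessionConf
      // Snapshot: what the patch does by cloning the HiveConf per operation.
      val snapshot = sessionConf.toMap

      sessionConf("spark.sql.shuffle.partitions") = "8" // client runs SET mid-flight
      println(shared("spark.sql.shuffle.partitions"))   // 8   -- execution drifted
      println(snapshot("spark.sql.shuffle.partitions")) // 200 -- execution isolated
    }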
@@ -161,6 +164,7 @@ private[hive] class SparkExecuteStatementOperation(
           override def run(): Object = {
 
             // User information is part of the metastore client member in Hive
+            Hive.set(sessionHive)
             SessionState.setCurrentSessionState(parentSessionState)
             try {
               runInternal()
@@ -270,4 +274,42 @@ private[hive] class SparkExecuteStatementOperation(
       }
     }
   }
+
+  /**
+   * If there are query specific settings to overlay, then create a copy of config
+   * There are two cases we need to clone the session config that's being passed to hive driver
+   * 1. Async query -
+   *    If the client changes a config setting, that shouldn't reflect in the execution already underway
+   * 2. confOverlay -
+   *    The query specific settings should only be applied to the query config and not session
+   * @return new configuration
+   * @throws HiveSQLException
+   */
+  private def getConfigForOperation(): HiveConf = {
+    var sqlOperationConf = getParentSession().getHiveConf()
+    if (!getConfOverlay().isEmpty() || runInBackground) {
+      // clone the parent session config for this query
+      sqlOperationConf = new HiveConf(sqlOperationConf)
+
+      // apply overlay query specific settings, if any
+      getConfOverlay().foreach { case (k, v) =>
+        try {
+          sqlOperationConf.verifyAndSet(k, v)
+        } catch {
+          case e: IllegalArgumentException =>
+            throw new HiveSQLException("Error applying statement specific settings", e)
+        }
+      }
+    }
+    return sqlOperationConf
+  }
+
+  private def getCurrentHive(): Hive = {
+    try {
+      return Hive.get()
+    } catch {
+      case e: HiveException =>
+        throw new HiveSQLException("Failed to get current Hive object", e);
+    }
+  }
 }

From 04142c388a8e91264315df2f663da6d8531c27f8 Mon Sep 17 00:00:00 2001
From: Dong Wang
Date: Wed, 20 May 2015 14:03:23 -0700
Subject: [PATCH 3/9] set SQLSession for async execution

---
 .../org/apache/spark/sql/SQLContext.scala |  5 +++
 .../SparkExecuteStatementOperation.scala  |  2 ++
 .../HiveThriftServer2Suites.scala         | 31 +++++++++++++++++--
 3 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 0aab7fa8709b8..ddb54025baa24 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -916,6 +916,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
     tlSession.remove()
   }
 
+  protected[sql] def setSession(session: SQLSession): Unit = {
+    detachSession()
+    tlSession.set(session)
+  }
+
   protected[sql] class SQLSession {
     // Note that this is a lazy val so we can override the default value in subclasses.
     protected[sql] lazy val conf: SQLConf = new SQLConf
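setSession() is needed because SQLContext keeps the active SQLSession in a thread-local (tlSession); a statement that moves to a pool thread would otherwise see a fresh default session instead of the one the client configured. A minimal sketch of that hand-off, assuming a hypothetical ThreadLocal-backed holder shaped like tlSession (the names are illustrative):

    object SessionHandOffSketch extends App {
      final class SQLSessionLike(val conf: java.util.Properties = new java.util.Properties)

      val tlSession = new ThreadLocal[SQLSessionLike]() {
        override def initialValue(): SQLSessionLike = new SQLSessionLike
      }

      // What the patch's setSession does: detach, then install the caller's session.
      def setSession(s: SQLSessionLike): Unit = { tlSession.remove(); tlSession.set(s) }

      val clientSession = tlSession.get()
      clientSession.conf.setProperty("spark.sql.shuffle.partitions", "8")

      val worker = new Thread(new Runnable {
        override def run(): Unit = {
          setSession(clientSession) // without this, get() builds a default session
          println(tlSession.get().conf.getProperty("spark.sql.shuffle.partitions")) // 8
        }
      })
      worker.start()
      worker.join()
    }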
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 66f26f018f818..9fa2126041fd3 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -154,6 +154,7 @@ private[hive] class SparkExecuteStatementOperation(
       val hiveConf = getConfigForOperation()
       val sparkServiceUGI = ShimLoader.getHadoopShims.getUGIForConf(hiveConf)
       val sessionHive = getCurrentHive()
+      val currentSqlSession = hiveContext.currentSession
 
       // Runnable impl to call runInternal asynchronously,
       // from a different thread
@@ -164,6 +165,7 @@ private[hive] class SparkExecuteStatementOperation(
           override def run(): Object = {
 
             // User information is part of the metastore client member in Hive
+            hiveContext.setSession(currentSqlSession)
             Hive.set(sessionHive)
             SessionState.setCurrentSessionState(parentSessionState)
             try {
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index f57c7083ea504..906d1827cfa2d 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -19,11 +19,14 @@ package org.apache.spark.sql.hive.thriftserver
 
 import java.io.File
 import java.net.URL
-import java.sql.{Date, DriverManager, Statement}
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Paths}
+import java.sql.{Date, DriverManager, SQLException, Statement}
 
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
-import scala.concurrent.{Await, Promise}
+import scala.concurrent.{Await, Promise, future}
+import scala.concurrent.ExecutionContext.Implicits.global
 import scala.sys.process.{Process, ProcessLogger}
 import scala.util.{Random, Try}
 
@@ -338,6 +341,30 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       }
     )
   }
+
+  test("test jdbc cancel") {
+    withJdbcStatement { statement =>
+      val queries = Seq(
+        "DROP TABLE IF EXISTS test_map",
+        "CREATE TABLE test_map(key INT, value STRING)",
+        s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map")
+
+      queries.foreach(statement.execute)
+
+      val f = future { Thread.sleep(3000); statement.cancel(); }
+
+      val join = "SELECT COUNT(*) FROM test_map " + List.fill(10)("join test_map").mkString(" ")
+      val e = intercept[SQLException] {
+        statement.executeQuery(join)
+      }
+      assert(e.getMessage contains "cancelled")
+      Await.result(f, Duration.Inf)
+
+      val rs1 = statement.executeQuery("SELECT COUNT(*) FROM test_map")
+      rs1.next()
+      assert(5 == rs1.getInt(1))
+    }
+  }
 }
 
 class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
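The new test drives cancellation exactly the way a client would: issue a slow query on one thread, call Statement.cancel() from another. A standalone sketch of the same interaction against a Thrift server, using only standard JDBC (the URL, credentials, and table name are placeholders):

    import java.sql.DriverManager

    object JdbcCancelSketch extends App {
      Class.forName("org.apache.hive.jdbc.HiveDriver")
      val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
      val stmt = conn.createStatement()

      // Cancel from a second thread once the query is underway.
      new Thread(new Runnable {
        override def run(): Unit = { Thread.sleep(100); stmt.cancel() }
      }).start()

      try stmt.executeQuery("SELECT COUNT(*) FROM big_table a JOIN big_table b")
      catch { case e: java.sql.SQLException => println(s"query cancelled: ${e.getMessage}") }
      finally { stmt.close(); conn.close() }
    }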
From 3d8ebf8adb2babb7beb3abf83196c5cc7ccc4344 Mon Sep 17 00:00:00 2001
From: Dong Wang
Date: Thu, 21 May 2015 17:16:27 -0700
Subject: [PATCH 4/9] add spark.sql.hive.thriftServer.async flag

---
 .../server/SparkSQLOperationManager.scala   |  6 +++--
 .../HiveThriftServer2Suites.scala           | 26 +++++++++++++------
 .../apache/spark/sql/hive/HiveContext.scala |  6 +++++
 3 files changed, 28 insertions(+), 10 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
index 0a72f9ec156d5..c8031ed0f3437 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
@@ -44,10 +44,12 @@ private[thriftserver] class SparkSQLOperationManager(hiveContext: HiveContext)
       confOverlay: JMap[String, String],
       async: Boolean): ExecuteStatementOperation = synchronized {
 
+    val runInBackground = async && hiveContext.hiveThriftServerAsync
     val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
-      async)(hiveContext, sessionToActivePool)
+      runInBackground)(hiveContext, sessionToActivePool)
     handleToOperation.put(operation.getHandle, operation)
-    logDebug(s"Created Operation for $statement with session=$parentSession, async=$async")
+    logDebug(s"Created Operation for $statement with session=$parentSession, " +
+      s"runInBackground=$runInBackground")
     operation
   }
 }
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 906d1827cfa2d..19d3e249512b6 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -351,18 +351,28 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
 
       queries.foreach(statement.execute)
 
-      val f = future { Thread.sleep(3000); statement.cancel(); }
-
-      val join = "SELECT COUNT(*) FROM test_map " + List.fill(10)("join test_map").mkString(" ")
+      val largeJoin = "SELECT COUNT(*) FROM test_map " + List.fill(10)("join test_map").mkString(" ")
+      val f = future { Thread.sleep(100); statement.cancel(); }
       val e = intercept[SQLException] {
-        statement.executeQuery(join)
+        statement.executeQuery(largeJoin)
       }
       assert(e.getMessage contains "cancelled")
       Await.result(f, Duration.Inf)
 
-      val rs1 = statement.executeQuery("SELECT COUNT(*) FROM test_map")
+      // cancel is a noop
+      statement.executeQuery("SET spark.sql.hive.thriftServer.async=false")
+      val sf = future { Thread.sleep(100); statement.cancel(); }
+      val smallJoin = "SELECT COUNT(*) FROM test_map " + List.fill(4)("join test_map").mkString(" ")
+      val rs1 = statement.executeQuery(smallJoin)
+      Await.result(sf, Duration.Inf)
       rs1.next()
-      assert(5 == rs1.getInt(1))
+      assert(5*5*5*5*5 == rs1.getInt(1))
+      rs1.close()
+
+      val rs2 = statement.executeQuery("SELECT COUNT(*) FROM test_map")
+      rs2.next()
+      assert(5 == rs2.getInt(1))
+      rs2.close()
     }
   }
 }
@@ -604,7 +614,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
   }
 
   override protected def afterAll(): Unit = {
-    stopThriftServer()
-    logInfo("HiveThriftServer2 stopped")
+    //stopThriftServer()
+    //logInfo("HiveThriftServer2 stopped")
   }
 }
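The assertions above rest on simple cross-product arithmetic: test_map holds 5 rows, so joining it with itself n times yields 5^(n+1) rows. The four-way smallJoin therefore counts 5^5 = 3125, while the ten-way largeJoin (5^11 rows) exists only to run long enough to be cancelled. The bookkeeping, as a one-off sketch:

    object JoinCountSketch extends App {
      val rows = 5
      def joinCount(selfJoins: Int): Long = math.pow(rows, selfJoins + 1).toLong
      assert(joinCount(4) == 3125L) // smallJoin: 5*5*5*5*5
      println(joinCount(10))        // largeJoin: 5^11 = 48828125
    }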
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 800f51c5e2e86..aa0c11128d649 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -144,6 +144,12 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     getConf("spark.sql.hive.metastore.barrierPrefixes", "")
       .split(",").filterNot(_ == "")
 
+  /*
+   * The Hive Thrift server uses the background Spark SQL thread pool to execute SQL queries.
+   */
+  protected[hive] def hiveThriftServerAsync: Boolean =
+    getConf("spark.sql.hive.thriftServer.async", "true") == "true"
+
   @transient
   protected[sql] lazy val substitutor = new VariableSubstitution()

From 341885b2035938685c07a8f6acfb02bb65c63f9e Mon Sep 17 00:00:00 2001
From: Dong Wang
Date: Thu, 21 May 2015 22:30:42 -0700
Subject: [PATCH 5/9] small fix

---
 .../hive/thriftserver/HiveThriftServer2Suites.scala | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 19d3e249512b6..e091798bda390 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -351,7 +351,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
 
       queries.foreach(statement.execute)
 
-      val largeJoin = "SELECT COUNT(*) FROM test_map " + List.fill(10)("join test_map").mkString(" ")
+      val largeJoin = "SELECT COUNT(*) FROM test_map " +
+        List.fill(10)("join test_map").mkString(" ")
       val f = future { Thread.sleep(100); statement.cancel(); }
       val e = intercept[SQLException] {
         statement.executeQuery(largeJoin)
@@ -362,7 +363,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       // cancel is a noop
       statement.executeQuery("SET spark.sql.hive.thriftServer.async=false")
       val sf = future { Thread.sleep(100); statement.cancel(); }
-      val smallJoin = "SELECT COUNT(*) FROM test_map " + List.fill(4)("join test_map").mkString(" ")
+      val smallJoin = "SELECT COUNT(*) FROM test_map " +
+        List.fill(4)("join test_map").mkString(" ")
       val rs1 = statement.executeQuery(smallJoin)
       Await.result(sf, Duration.Inf)
       rs1.next()
@@ -614,7 +616,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
   }
 
   override protected def afterAll(): Unit = {
-    //stopThriftServer()
-    //logInfo("HiveThriftServer2 stopped")
+    stopThriftServer()
+    logInfo("HiveThriftServer2 stopped")
   }
 }

From eb3e38517b41f6600156ed9c9497e4d516691ae4 Mon Sep 17 00:00:00 2001
From: Dong Wang
Date: Tue, 26 May 2015 09:09:39 -0700
Subject: [PATCH 6/9] small nit

---
 .../src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index aa0c11128d649..b8f294c262af7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -148,7 +148,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    * The Hive Thrift server uses the background Spark SQL thread pool to execute SQL queries.
    */
   protected[hive] def hiveThriftServerAsync: Boolean =
-    getConf("spark.sql.hive.thriftServer.async", "true") == "true"
+    getConf("spark.sql.hive.thriftServer.async", "true").toBoolean
 
   @transient
   protected[sql] lazy val substitutor = new VariableSubstitution()
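The "small nit" above swaps a string comparison for String.toBoolean, and the two differ at the edges: `== "true"` silently treats "TRUE" (or any typo) as false, while toBoolean parses case-insensitively and fails fast on junk. A quick sketch of the behaviour change:

    object FlagParsingSketch extends App {
      def byEquality(v: String): Boolean = v == "true"
      def byToBoolean(v: String): Boolean = v.toBoolean

      println(byEquality("TRUE"))  // false -- surprising for a config flag
      println(byToBoolean("TRUE")) // true  -- case-insensitive parse
      // byToBoolean("yes") would throw IllegalArgumentException instead of
      // silently disabling the feature.
    }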
"true").toBoolean @transient protected[sql] lazy val substitutor = new VariableSubstitution() From 380480f1f38059cded2f7ca0e880e3035b3a6e5c Mon Sep 17 00:00:00 2001 From: Dong Wang Date: Wed, 27 May 2015 09:01:10 -0700 Subject: [PATCH 7/9] fix for liancheng's comments --- .../thriftserver/SparkExecuteStatementOperation.scala | 9 +++++++-- .../sql/hive/thriftserver/HiveThriftServer2Suites.scala | 8 ++++---- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 9fa2126041fd3..a566b32e88a95 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -28,6 +28,7 @@ import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, UUID} import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, Map => SMap} +import scala.util.control.NonFatal import org.apache.commons.logging.Log import org.apache.hadoop.hive.conf.HiveConf @@ -196,9 +197,13 @@ private[hive] class SparkExecuteStatementOperation( setBackgroundHandle(backgroundHandle) } catch { case rejected: RejectedExecutionException => - setState(OperationState.ERROR); + setState(OperationState.ERROR) throw new HiveSQLException("The background threadpool cannot accept" + " new task for execution, please retry the operation", rejected) + case NonFatal(e) => + logError(s"Error executing query in background", e) + setState(OperationState.ERROR) + throw e } } } @@ -249,7 +254,7 @@ private[hive] class SparkExecuteStatementOperation( // HiveServer will silently swallow them. 
@@ -249,7 +254,7 @@ private[hive] class SparkExecuteStatementOperation(
       // Actually do need to catch Throwable as some failures don't inherit from Exception and
       // HiveServer will silently swallow them.
       case e: Throwable =>
         val currentState = getStatus().getState()
-        logError(s"Error executing query, currentState $currentState, :", e)
+        logError(s"Error executing query, currentState $currentState, ", e)
         setState(OperationState.ERROR)
         HiveThriftServer2.listener.onStatementError(
           statementId, e.getMessage, e.getStackTraceString)
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index e091798bda390..dc885fc52953d 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -358,7 +358,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
         statement.executeQuery(largeJoin)
       }
       assert(e.getMessage contains "cancelled")
-      Await.result(f, Duration.Inf)
+      Await.result(f, 3.minute)
 
       // cancel is a noop
       statement.executeQuery("SET spark.sql.hive.thriftServer.async=false")
@@ -366,14 +366,14 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       val smallJoin = "SELECT COUNT(*) FROM test_map " +
         List.fill(4)("join test_map").mkString(" ")
       val rs1 = statement.executeQuery(smallJoin)
-      Await.result(sf, Duration.Inf)
+      Await.result(sf, 3.minute)
       rs1.next()
-      assert(5*5*5*5*5 == rs1.getInt(1))
+      assert(rs1.getInt(1) === math.pow(5, 5))
       rs1.close()
 
       val rs2 = statement.executeQuery("SELECT COUNT(*) FROM test_map")
       rs2.next()
-      assert(5 == rs2.getInt(1))
+      assert(rs2.getInt(1) === 5)
       rs2.close()
     }
   }

From 7bfa2a7665679d441f925652c05e82ff6dcce2a3 Mon Sep 17 00:00:00 2001
From: Dong Wang
Date: Fri, 5 Jun 2015 10:01:36 -0700
Subject: [PATCH 8/9] fix merge

---
 .../SparkExecuteStatementOperation.scala            | 13 +++----------
 .../hive/thriftserver/HiveThriftServer2Suites.scala |  1 -
 2 files changed, 3 insertions(+), 11 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index a566b32e88a95..8589adf786db5 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -17,32 +17,25 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
-import java.sql.{Date, Timestamp}
-import java.util.{Map => JMap, UUID}
-
 import java.security.PrivilegedExceptionAction
-import java.util.concurrent.Executors
-import java.util.concurrent.Future
+import java.sql.{Date, Timestamp}
 import java.util.concurrent.RejectedExecutionException
-import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, UUID}
+import java.util.{Map => JMap, UUID}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, Map => SMap}
 import scala.util.control.NonFatal
 
-import org.apache.commons.logging.Log
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.metastore.api.FieldSchema
+import org.apache.hive.service.cli._
 import org.apache.hadoop.hive.ql.metadata.Hive
 import org.apache.hadoop.hive.ql.metadata.HiveException
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.shims.ShimLoader
 import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.ExecuteStatementOperation
 import org.apache.hive.service.cli.session.HiveSession
-import org.apache.hive.service.cli.thrift.TProtocolVersion
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.execution.SetCommand
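Patch 7 above also tightened the test waits from Duration.Inf to 3.minute, so a hung cancel thread fails the suite instead of wedging it forever. The idiom, sketched in isolation:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global

    object BoundedAwaitSketch extends App {
      val work = Future { Thread.sleep(100); 42 }
      // Bounded wait: throws java.util.concurrent.TimeoutException after
      // 3 minutes rather than blocking the test runner indefinitely.
      println(Await.result(work, 3.minutes))
    }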
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index dc885fc52953d..178bd1f5cb164 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.hive.thriftserver
 import java.io.File
 import java.net.URL
 import java.nio.charset.StandardCharsets
-import java.nio.file.{Files, Paths}
 import java.sql.{Date, DriverManager, SQLException, Statement}
 
 import scala.collection.mutable.ArrayBuffer

From 687c1138d2b190ad4ef87d3c5b45a887458c8d1a Mon Sep 17 00:00:00 2001
From: Dong Wang
Date: Fri, 5 Jun 2015 10:38:48 -0700
Subject: [PATCH 9/9] fix 100 characters

---
 .../sql/hive/thriftserver/SparkExecuteStatementOperation.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 8589adf786db5..e071103df925c 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -279,7 +279,8 @@ private[hive] class SparkExecuteStatementOperation(
    * If there are query specific settings to overlay, then create a copy of config
    * There are two cases we need to clone the session config that's being passed to hive driver
    * 1. Async query -
-   *    If the client changes a config setting, that shouldn't reflect in the execution already underway
+   *    If the client changes a config setting, that shouldn't reflect in the execution
+   *    already underway
    * 2. confOverlay -
    *    The query specific settings should only be applied to the query config and not session
    * @return new configuration
    * @throws HiveSQLException
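The Javadoc revised above describes the overlay semantics: per-statement settings from the client are verified and applied only to a cloned conf, never to the session's. A standalone sketch of that verify-and-set loop against a plain HiveConf (the key and values are illustrative, and this is not the patch's method):

    import org.apache.hadoop.hive.conf.HiveConf

    object ConfOverlaySketch extends App {
      val sessionConf = new HiveConf()
      val confOverlay = Map("hive.exec.parallel" -> "true")

      // Clone first, so the overlay never touches the session's conf.
      val operationConf = new HiveConf(sessionConf)
      confOverlay.foreach { case (k, v) =>
        operationConf.verifyAndSet(k, v) // rejects restricted keys
      }
      println(operationConf.get("hive.exec.parallel")) // overlaid value
      println(sessionConf.get("hive.exec.parallel"))   // unchanged default
    }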