diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index e2f3314bc859..a3776b3ad756 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -352,10 +352,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val ui = SparkUI.create(None, new HistoryAppStatusStore(conf, kvstore), conf, secManager,
app.info.name, HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
attempt.info.startTime.getTime(), attempt.info.appSparkVersion)
- loadPlugins().foreach(_.setupUI(ui))
- val loadedUI = LoadedAppUI(ui)
+ // Place the tabs in the UI based on each plugin's display order.
+ loadPlugins().toSeq.sortBy(_.displayOrder).foreach(_.setupUI(ui))
+ val loadedUI = LoadedAppUI(ui)
synchronized {
activeUIs((appId, attemptId)) = loadedUI
}
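
The placement relies on Seq.sortBy being ascending and stable: plugins with a smaller displayOrder come first, and plugins that keep the default order retain their discovery order among themselves. A minimal, self-contained sketch of the semantics (toy trait, not the real AppHistoryServerPlugin):

    trait OrderedPlugin {
      def name: String
      def displayOrder: Int = Integer.MAX_VALUE // same default as AppHistoryServerPlugin
    }

    object DisplayOrderDemo extends App {
      val sql = new OrderedPlugin { val name = "SQL"; override val displayOrder = 0 }
      val thrift = new OrderedPlugin { val name = "JDBC/ODBC"; override val displayOrder = 1 }
      val other = new OrderedPlugin { val name = "Other" } // keeps the default order

      // Mirrors loadPlugins().toSeq.sortBy(_.displayOrder) above.
      println(Seq(other, thrift, sql).sortBy(_.displayOrder).map(_.name))
      // List(SQL, JDBC/ODBC, Other)
    }
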
diff --git a/core/src/main/scala/org/apache/spark/status/AppHistoryServerPlugin.scala b/core/src/main/scala/org/apache/spark/status/AppHistoryServerPlugin.scala
index d144a0e998fa..2e9a31d5ac69 100644
--- a/core/src/main/scala/org/apache/spark/status/AppHistoryServerPlugin.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppHistoryServerPlugin.scala
@@ -35,4 +35,9 @@ private[spark] trait AppHistoryServerPlugin {
* Sets up UI of this plugin to rebuild the history UI.
*/
def setupUI(ui: SparkUI): Unit
+
+ /**
+ * The position of a plugin tab relative to the other plugin tabs in the history UI.
+ * Plugins with a smaller value are placed earlier.
+ */
+ def displayOrder: Int = Integer.MAX_VALUE
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLHistoryServerPlugin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLHistoryServerPlugin.scala
index 522d0cf79bff..5bf1ce5eb8a9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLHistoryServerPlugin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLHistoryServerPlugin.scala
@@ -33,4 +33,7 @@ class SQLHistoryServerPlugin extends AppHistoryServerPlugin {
new SQLTab(sqlStatusStore, ui)
}
}
+
+ override def displayOrder: Int = 0
}
diff --git a/sql/hive-thriftserver/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin b/sql/hive-thriftserver/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin
new file mode 100644
index 000000000000..96d990372ee4
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin
@@ -0,0 +1 @@
+org.apache.spark.sql.hive.thriftserver.ui.HiveThriftServer2HistoryServerPlugin
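
This one-line registration is how the history server discovers the plugin: FsHistoryProvider.loadPlugins() looks implementations up through java.util.ServiceLoader. A hedged sketch of that mechanism with a toy trait (the class-loader choice is illustrative):

    import java.util.ServiceLoader
    import scala.collection.JavaConverters._

    trait DemoPlugin { def displayOrder: Int }

    object DemoPluginLoader {
      // Each line of META-INF/services/<fully.qualified.TraitName> names one
      // implementation; ServiceLoader instantiates them reflectively.
      def loadDemoPlugins(): Iterable[DemoPlugin] =
        ServiceLoader.load(classOf[DemoPlugin],
          Thread.currentThread().getContextClassLoader).asScala
    }
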
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index 9517a599be63..f15193b0dc3c 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -20,9 +20,6 @@ package org.apache.spark.sql.hive.thriftserver
import java.util.Locale
import java.util.concurrent.atomic.AtomicBoolean
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService}
@@ -32,12 +29,11 @@ import org.apache.spark.SparkContext
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.UI.UI_ENABLED
-import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
-import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.hive.thriftserver.ui._
+import org.apache.spark.status.ElementTrackingStore
import org.apache.spark.util.{ShutdownHookManager, Utils}
/**
@@ -47,6 +43,7 @@ import org.apache.spark.util.{ShutdownHookManager, Utils}
object HiveThriftServer2 extends Logging {
var uiTab: Option[ThriftServerTab] = None
var listener: HiveThriftServer2Listener = _
+ var eventManager: HiveThriftServer2EventManager = _
/**
* :: DeveloperApi ::
@@ -62,14 +59,21 @@ object HiveThriftServer2 extends Logging {
server.init(executionHive.conf)
server.start()
- listener = new HiveThriftServer2Listener(server, sqlContext.conf)
- sqlContext.sparkContext.addSparkListener(listener)
- uiTab = if (sqlContext.sparkContext.getConf.get(UI_ENABLED)) {
- Some(new ThriftServerTab(sqlContext.sparkContext))
+ createListenerAndUI(server, sqlContext.sparkContext)
+ server
+ }
+
+ private def createListenerAndUI(server: HiveThriftServer2, sc: SparkContext): Unit = {
+ val kvStore = sc.statusStore.store.asInstanceOf[ElementTrackingStore]
+ eventManager = new HiveThriftServer2EventManager(sc)
+ listener = new HiveThriftServer2Listener(kvStore, sc.conf, Some(server))
+ sc.listenerBus.addToStatusQueue(listener)
+ uiTab = if (sc.getConf.get(UI_ENABLED)) {
+ Some(new ThriftServerTab(new HiveThriftServer2AppStatusStore(kvStore, Some(listener)),
+ ThriftServerTab.getSparkUI(sc)))
} else {
None
}
- server
}
def main(args: Array[String]): Unit = {
@@ -101,13 +105,7 @@ object HiveThriftServer2 extends Logging {
server.init(executionHive.conf)
server.start()
logInfo("HiveThriftServer2 started")
- listener = new HiveThriftServer2Listener(server, SparkSQLEnv.sqlContext.conf)
- SparkSQLEnv.sparkContext.addSparkListener(listener)
- uiTab = if (SparkSQLEnv.sparkContext.getConf.get(UI_ENABLED)) {
- Some(new ThriftServerTab(SparkSQLEnv.sparkContext))
- } else {
- None
- }
+ createListenerAndUI(server, SparkSQLEnv.sparkContext)
// If application was killed before HiveThriftServer2 start successfully then SparkSubmit
// process can not exit, so check whether if SparkContext was stopped.
if (SparkSQLEnv.sparkContext.stopped.get()) {
@@ -121,179 +119,10 @@ object HiveThriftServer2 extends Logging {
}
}
- private[thriftserver] class SessionInfo(
- val sessionId: String,
- val startTimestamp: Long,
- val ip: String,
- val userName: String) {
- var finishTimestamp: Long = 0L
- var totalExecution: Int = 0
- def totalTime: Long = {
- if (finishTimestamp == 0L) {
- System.currentTimeMillis - startTimestamp
- } else {
- finishTimestamp - startTimestamp
- }
- }
- }
-
private[thriftserver] object ExecutionState extends Enumeration {
val STARTED, COMPILED, CANCELED, FAILED, FINISHED, CLOSED = Value
type ExecutionState = Value
}
-
- private[thriftserver] class ExecutionInfo(
- val statement: String,
- val sessionId: String,
- val startTimestamp: Long,
- val userName: String) {
- var finishTimestamp: Long = 0L
- var closeTimestamp: Long = 0L
- var executePlan: String = ""
- var detail: String = ""
- var state: ExecutionState.Value = ExecutionState.STARTED
- val jobId: ArrayBuffer[String] = ArrayBuffer[String]()
- var groupId: String = ""
- def totalTime(endTime: Long): Long = {
- if (endTime == 0L) {
- System.currentTimeMillis - startTimestamp
- } else {
- endTime - startTimestamp
- }
- }
- }
-
-
- /**
- * An inner sparkListener called in sc.stop to clean up the HiveThriftServer2
- */
- private[thriftserver] class HiveThriftServer2Listener(
- val server: HiveServer2,
- val conf: SQLConf) extends SparkListener {
-
- override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
- server.stop()
- }
- private val sessionList = new mutable.LinkedHashMap[String, SessionInfo]
- private val executionList = new mutable.LinkedHashMap[String, ExecutionInfo]
- private val retainedStatements = conf.getConf(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT)
- private val retainedSessions = conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT)
-
- def getOnlineSessionNum: Int = synchronized {
- sessionList.count(_._2.finishTimestamp == 0)
- }
-
- def isExecutionActive(execInfo: ExecutionInfo): Boolean = {
- !(execInfo.state == ExecutionState.FAILED ||
- execInfo.state == ExecutionState.CANCELED ||
- execInfo.state == ExecutionState.CLOSED)
- }
-
- /**
- * When an error or a cancellation occurs, we set the finishTimestamp of the statement.
- * Therefore, when we count the number of running statements, we need to exclude errors and
- * cancellations and count all statements that have not been closed so far.
- */
- def getTotalRunning: Int = synchronized {
- executionList.count {
- case (_, v) => isExecutionActive(v)
- }
- }
-
- def getSessionList: Seq[SessionInfo] = synchronized { sessionList.values.toSeq }
-
- def getSession(sessionId: String): Option[SessionInfo] = synchronized {
- sessionList.get(sessionId)
- }
-
- def getExecutionList: Seq[ExecutionInfo] = synchronized { executionList.values.toSeq }
-
- override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
- for {
- props <- Option(jobStart.properties)
- groupId <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
- (_, info) <- executionList if info.groupId == groupId
- } {
- info.jobId += jobStart.jobId.toString
- info.groupId = groupId
- }
- }
-
- def onSessionCreated(ip: String, sessionId: String, userName: String = "UNKNOWN"): Unit = {
- synchronized {
- val info = new SessionInfo(sessionId, System.currentTimeMillis, ip, userName)
- sessionList.put(sessionId, info)
- trimSessionIfNecessary()
- }
- }
-
- def onSessionClosed(sessionId: String): Unit = synchronized {
- sessionList(sessionId).finishTimestamp = System.currentTimeMillis
- trimSessionIfNecessary()
- }
-
- def onStatementStart(
- id: String,
- sessionId: String,
- statement: String,
- groupId: String,
- userName: String = "UNKNOWN"): Unit = synchronized {
- val info = new ExecutionInfo(statement, sessionId, System.currentTimeMillis, userName)
- info.state = ExecutionState.STARTED
- executionList.put(id, info)
- trimExecutionIfNecessary()
- sessionList(sessionId).totalExecution += 1
- executionList(id).groupId = groupId
- }
-
- def onStatementParsed(id: String, executionPlan: String): Unit = synchronized {
- executionList(id).executePlan = executionPlan
- executionList(id).state = ExecutionState.COMPILED
- }
-
- def onStatementCanceled(id: String): Unit = synchronized {
- executionList(id).finishTimestamp = System.currentTimeMillis
- executionList(id).state = ExecutionState.CANCELED
- trimExecutionIfNecessary()
- }
-
- def onStatementError(id: String, errorMsg: String, errorTrace: String): Unit = synchronized {
- executionList(id).finishTimestamp = System.currentTimeMillis
- executionList(id).detail = errorMsg
- executionList(id).state = ExecutionState.FAILED
- trimExecutionIfNecessary()
- }
-
- def onStatementFinish(id: String): Unit = synchronized {
- executionList(id).finishTimestamp = System.currentTimeMillis
- executionList(id).state = ExecutionState.FINISHED
- trimExecutionIfNecessary()
- }
-
- def onOperationClosed(id: String): Unit = synchronized {
- executionList(id).closeTimestamp = System.currentTimeMillis
- executionList(id).state = ExecutionState.CLOSED
- }
-
- private def trimExecutionIfNecessary() = {
- if (executionList.size > retainedStatements) {
- val toRemove = math.max(retainedStatements / 10, 1)
- executionList.filter(_._2.finishTimestamp != 0).take(toRemove).foreach { s =>
- executionList.remove(s._1)
- }
- }
- }
-
- private def trimSessionIfNecessary() = {
- if (sessionList.size > retainedSessions) {
- val toRemove = math.max(retainedSessions / 10, 1)
- sessionList.filter(_._2.finishTimestamp != 0).take(toRemove).foreach { s =>
- sessionList.remove(s._1)
- }
- }
-
- }
- }
}
private[hive] class HiveThriftServer2(sqlContext: SQLContext)
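
With this refactor both start() and main() share one wiring path: the event manager posts events, the listener consumes them from the status queue, and the UI reads from the kvstore. A rough sketch of that wiring (ignores the private[spark] visibility of statusStore, conf, and listenerBus, and omits the HiveServer2 handle):

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.hive.thriftserver.ui._
    import org.apache.spark.status.ElementTrackingStore

    object WiringSketch {
      def wireUp(sc: SparkContext): HiveThriftServer2AppStatusStore = {
        // Live applications back the status store with an ElementTrackingStore,
        // hence the cast below, mirroring createListenerAndUI.
        val kvStore = sc.statusStore.store.asInstanceOf[ElementTrackingStore]
        val listener = new HiveThriftServer2Listener(kvStore, sc.conf, None)
        sc.listenerBus.addToStatusQueue(listener)
        new HiveThriftServer2AppStatusStore(kvStore, Some(listener))
      }
    }
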
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 68197a9de856..76d07848f79a 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -77,7 +77,7 @@ private[hive] class SparkExecuteStatementOperation(
// RDDs will be cleaned automatically upon garbage collection.
logInfo(s"Close statement with $statementId")
cleanup(OperationState.CLOSED)
- HiveThriftServer2.listener.onOperationClosed(statementId)
+ HiveThriftServer2.eventManager.onOperationClosed(statementId)
}
def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int): Unit = {
@@ -195,7 +195,7 @@ private[hive] class SparkExecuteStatementOperation(
setState(OperationState.PENDING)
statementId = UUID.randomUUID().toString
logInfo(s"Submitting query '$statement' with $statementId")
- HiveThriftServer2.listener.onStatementStart(
+ HiveThriftServer2.eventManager.onStatementStart(
statementId,
parentSession.getSessionHandle.getSessionId.toString,
statement,
@@ -245,14 +245,14 @@ private[hive] class SparkExecuteStatementOperation(
case rejected: RejectedExecutionException =>
logError("Error submitting query in background, query rejected", rejected)
setState(OperationState.ERROR)
- HiveThriftServer2.listener.onStatementError(
+ HiveThriftServer2.eventManager.onStatementError(
statementId, rejected.getMessage, SparkUtils.exceptionString(rejected))
throw new HiveSQLException("The background threadpool cannot accept" +
" new task for execution, please retry the operation", rejected)
case NonFatal(e) =>
logError(s"Error executing query in background", e)
setState(OperationState.ERROR)
- HiveThriftServer2.listener.onStatementError(
+ HiveThriftServer2.eventManager.onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
throw new HiveSQLException(e)
}
@@ -284,7 +284,8 @@ private[hive] class SparkExecuteStatementOperation(
"in this session.")
case _ =>
}
- HiveThriftServer2.listener.onStatementParsed(statementId, result.queryExecution.toString())
+ HiveThriftServer2.eventManager.onStatementParsed(statementId,
+ result.queryExecution.toString())
iter = {
if (sqlContext.getConf(SQLConf.THRIFTSERVER_INCREMENTAL_COLLECT.key).toBoolean) {
resultList = None
@@ -315,12 +316,12 @@ private[hive] class SparkExecuteStatementOperation(
setState(OperationState.ERROR)
e match {
case hiveException: HiveSQLException =>
- HiveThriftServer2.listener.onStatementError(
+ HiveThriftServer2.eventManager.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
- HiveThriftServer2.listener.onStatementError(
+ HiveThriftServer2.eventManager.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error running query: " + root.toString, root)
}
@@ -329,7 +330,7 @@ private[hive] class SparkExecuteStatementOperation(
synchronized {
if (!getStatus.getState.isTerminal) {
setState(OperationState.FINISHED)
- HiveThriftServer2.listener.onStatementFinish(statementId)
+ HiveThriftServer2.eventManager.onStatementFinish(statementId)
}
}
sqlContext.sparkContext.clearJobGroup()
@@ -341,7 +342,7 @@ private[hive] class SparkExecuteStatementOperation(
if (!getStatus.getState.isTerminal) {
logInfo(s"Cancel query with $statementId")
cleanup(OperationState.CANCELED)
- HiveThriftServer2.listener.onStatementCanceled(statementId)
+ HiveThriftServer2.eventManager.onStatementCanceled(statementId)
}
}
}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala
index 6c8a5b00992d..2945cfd200e4 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala
@@ -44,7 +44,7 @@ private[hive] class SparkGetCatalogsOperation(
override def close(): Unit = {
super.close()
- HiveThriftServer2.listener.onOperationClosed(statementId)
+ HiveThriftServer2.eventManager.onOperationClosed(statementId)
}
override def runInternal(): Unit = {
@@ -56,7 +56,7 @@ private[hive] class SparkGetCatalogsOperation(
val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
- HiveThriftServer2.listener.onStatementStart(
+ HiveThriftServer2.eventManager.onStatementStart(
statementId,
parentSession.getSessionHandle.getSessionId.toString,
logMsg,
@@ -74,16 +74,16 @@ private[hive] class SparkGetCatalogsOperation(
setState(OperationState.ERROR)
e match {
case hiveException: HiveSQLException =>
- HiveThriftServer2.listener.onStatementError(
+ HiveThriftServer2.eventManager.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
- HiveThriftServer2.listener.onStatementError(
+ HiveThriftServer2.eventManager.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error getting catalogs: " + root.toString, root)
}
}
- HiveThriftServer2.listener.onStatementFinish(statementId)
+ HiveThriftServer2.eventManager.onStatementFinish(statementId)
}
}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
index f845a2285b9a..ff7cbfeae13b 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
@@ -63,7 +63,7 @@ private[hive] class SparkGetColumnsOperation(
override def close(): Unit = {
super.close()
- HiveThriftServer2.listener.onOperationClosed(statementId)
+ HiveThriftServer2.eventManager.onOperationClosed(statementId)
}
override def runInternal(): Unit = {
@@ -78,7 +78,7 @@ private[hive] class SparkGetColumnsOperation(
val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
- HiveThriftServer2.listener.onStatementStart(
+ HiveThriftServer2.eventManager.onStatementStart(
statementId,
parentSession.getSessionHandle.getSessionId.toString,
logMsg,
@@ -135,17 +135,17 @@ private[hive] class SparkGetColumnsOperation(
setState(OperationState.ERROR)
e match {
case hiveException: HiveSQLException =>
- HiveThriftServer2.listener.onStatementError(
+ HiveThriftServer2.eventManager.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
- HiveThriftServer2.listener.onStatementError(
+ HiveThriftServer2.eventManager.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error getting columns: " + root.toString, root)
}
}
- HiveThriftServer2.listener.onStatementFinish(statementId)
+ HiveThriftServer2.eventManager.onStatementFinish(statementId)
}
private def addToRowSet(
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
index 1cdd8918421b..d9c12b6ca9e6 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
@@ -54,7 +54,7 @@ private[hive] class SparkGetFunctionsOperation(
override def close(): Unit = {
super.close()
- HiveThriftServer2.listener.onOperationClosed(statementId)
+ HiveThriftServer2.eventManager.onOperationClosed(statementId)
}
override def runInternal(): Unit = {
@@ -81,7 +81,7 @@ private[hive] class SparkGetFunctionsOperation(
authorizeMetaGets(HiveOperationType.GET_FUNCTIONS, privObjs, cmdStr)
}
- HiveThriftServer2.listener.onStatementStart(
+ HiveThriftServer2.eventManager.onStatementStart(
statementId,
parentSession.getSessionHandle.getSessionId.toString,
logMsg,
@@ -110,16 +110,16 @@ private[hive] class SparkGetFunctionsOperation(
setState(OperationState.ERROR)
e match {
case hiveException: HiveSQLException =>
- HiveThriftServer2.listener.onStatementError(
+ HiveThriftServer2.eventManager.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
- HiveThriftServer2.listener.onStatementError(
+ HiveThriftServer2.eventManager.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error getting functions: " + root.toString, root)
}
}
- HiveThriftServer2.listener.onStatementFinish(statementId)
+ HiveThriftServer2.eventManager.onStatementFinish(statementId)
}
}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala
index 928610a6bcff..db19880d1b99 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala
@@ -50,7 +50,7 @@ private[hive] class SparkGetSchemasOperation(
override def close(): Unit = {
super.close()
- HiveThriftServer2.listener.onOperationClosed(statementId)
+ HiveThriftServer2.eventManager.onOperationClosed(statementId)
}
override def runInternal(): Unit = {
@@ -68,7 +68,7 @@ private[hive] class SparkGetSchemasOperation(
authorizeMetaGets(HiveOperationType.GET_TABLES, null, cmdStr)
}
- HiveThriftServer2.listener.onStatementStart(
+ HiveThriftServer2.eventManager.onStatementStart(
statementId,
parentSession.getSessionHandle.getSessionId.toString,
logMsg,
@@ -93,16 +93,16 @@ private[hive] class SparkGetSchemasOperation(
setState(OperationState.ERROR)
e match {
case hiveException: HiveSQLException =>
- HiveThriftServer2.listener.onStatementError(
+ HiveThriftServer2.eventManager.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
- HiveThriftServer2.listener.onStatementError(
+ HiveThriftServer2.eventManager.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error getting schemas: " + root.toString, root)
}
}
- HiveThriftServer2.listener.onStatementFinish(statementId)
+ HiveThriftServer2.eventManager.onStatementFinish(statementId)
}
}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala
index ec03f1e148e6..b4093e58d3c0 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala
@@ -45,7 +45,7 @@ private[hive] class SparkGetTableTypesOperation(
override def close(): Unit = {
super.close()
- HiveThriftServer2.listener.onOperationClosed(statementId)
+ HiveThriftServer2.eventManager.onOperationClosed(statementId)
}
override def runInternal(): Unit = {
@@ -61,7 +61,7 @@ private[hive] class SparkGetTableTypesOperation(
authorizeMetaGets(HiveOperationType.GET_TABLETYPES, null)
}
- HiveThriftServer2.listener.onStatementStart(
+ HiveThriftServer2.eventManager.onStatementStart(
statementId,
parentSession.getSessionHandle.getSessionId.toString,
logMsg,
@@ -80,16 +80,16 @@ private[hive] class SparkGetTableTypesOperation(
setState(OperationState.ERROR)
e match {
case hiveException: HiveSQLException =>
- HiveThriftServer2.listener.onStatementError(
+ HiveThriftServer2.eventManager.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
- HiveThriftServer2.listener.onStatementError(
+ HiveThriftServer2.eventManager.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error getting table types: " + root.toString, root)
}
}
- HiveThriftServer2.listener.onStatementFinish(statementId)
+ HiveThriftServer2.eventManager.onStatementFinish(statementId)
}
}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala
index bf9cf7ad46d9..45c6d980aac4 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala
@@ -59,7 +59,7 @@ private[hive] class SparkGetTablesOperation(
override def close(): Unit = {
super.close()
- HiveThriftServer2.listener.onOperationClosed(statementId)
+ HiveThriftServer2.eventManager.onOperationClosed(statementId)
}
override def runInternal(): Unit = {
@@ -85,7 +85,7 @@ private[hive] class SparkGetTablesOperation(
authorizeMetaGets(HiveOperationType.GET_TABLES, privObjs, cmdStr)
}
- HiveThriftServer2.listener.onStatementStart(
+ HiveThriftServer2.eventManager.onStatementStart(
statementId,
parentSession.getSessionHandle.getSessionId.toString,
logMsg,
@@ -124,17 +124,17 @@ private[hive] class SparkGetTablesOperation(
setState(OperationState.ERROR)
e match {
case hiveException: HiveSQLException =>
- HiveThriftServer2.listener.onStatementError(
+ HiveThriftServer2.eventManager.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
- HiveThriftServer2.listener.onStatementError(
+ HiveThriftServer2.eventManager.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error getting tables: " + root.toString, root)
}
}
- HiveThriftServer2.listener.onStatementFinish(statementId)
+ HiveThriftServer2.eventManager.onStatementFinish(statementId)
}
private def addToRowSet(
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala
index 0d263b09d57d..dd5668a93f82 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala
@@ -44,7 +44,7 @@ private[hive] class SparkGetTypeInfoOperation(
override def close(): Unit = {
super.close()
- HiveThriftServer2.listener.onOperationClosed(statementId)
+ HiveThriftServer2.eventManager.onOperationClosed(statementId)
}
override def runInternal(): Unit = {
@@ -60,7 +60,7 @@ private[hive] class SparkGetTypeInfoOperation(
authorizeMetaGets(HiveOperationType.GET_TYPEINFO, null)
}
- HiveThriftServer2.listener.onStatementStart(
+ HiveThriftServer2.eventManager.onStatementStart(
statementId,
parentSession.getSessionHandle.getSessionId.toString,
logMsg,
@@ -98,16 +98,16 @@ private[hive] class SparkGetTypeInfoOperation(
setState(OperationState.ERROR)
e match {
case hiveException: HiveSQLException =>
- HiveThriftServer2.listener.onStatementError(
+ HiveThriftServer2.eventManager.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
- HiveThriftServer2.listener.onStatementError(
+ HiveThriftServer2.eventManager.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error getting type info: " + root.toString, root)
}
}
- HiveThriftServer2.listener.onStatementFinish(statementId)
+ HiveThriftServer2.eventManager.onStatementFinish(statementId)
}
}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
index 41b324d70c31..b3171897141c 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -55,7 +55,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext:
super.openSession(protocol, username, passwd, ipAddress, sessionConf, withImpersonation,
delegationToken)
val session = super.getSession(sessionHandle)
- HiveThriftServer2.listener.onSessionCreated(
+ HiveThriftServer2.eventManager.onSessionCreated(
session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername)
val ctx = if (sqlContext.conf.hiveThriftServerSingleSession) {
sqlContext
@@ -74,7 +74,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext:
}
override def closeSession(sessionHandle: SessionHandle): Unit = {
- HiveThriftServer2.listener.onSessionClosed(sessionHandle.getSessionId.toString)
+ HiveThriftServer2.eventManager.onSessionClosed(sessionHandle.getSessionId.toString)
val ctx = sparkSqlOperationManager.sessionToContexts.getOrDefault(sessionHandle, sqlContext)
ctx.sparkSession.sessionState.catalog.getTempViewNames().foreach(ctx.uncacheTable)
super.closeSession(sessionHandle)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala
new file mode 100644
index 000000000000..5cb78f6e6465
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver.ui
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import com.fasterxml.jackson.annotation.JsonIgnore
+
+import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.ExecutionState
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.kvstore.{KVIndex, KVStore}
+
+/**
+ * Provides a view of a KVStore with methods that make it easy to query Thrift Server-specific
+ * state. There's no state kept in this class, so it's ok to have multiple instances of it in
+ * an application.
+ */
+class HiveThriftServer2AppStatusStore(
+ store: KVStore,
+ val listener: Option[HiveThriftServer2Listener] = None) {
+
+ def getSessionList: Seq[SessionInfo] = {
+ store.view(classOf[SessionInfo]).asScala.toSeq
+ }
+
+ def getExecutionList: Seq[ExecutionInfo] = {
+ store.view(classOf[ExecutionInfo]).asScala.toSeq
+ }
+
+ def getOnlineSessionNum: Int = {
+ store.view(classOf[SessionInfo]).asScala.count(_.finishTimestamp == 0)
+ }
+
+ def getSession(sessionId: String): Option[SessionInfo] = {
+ try {
+ Some(store.read(classOf[SessionInfo], sessionId))
+ } catch {
+ case _: NoSuchElementException => None
+ }
+ }
+
+ def getExecution(executionId: String): Option[ExecutionInfo] = {
+ try {
+ Some(store.read(classOf[ExecutionInfo], executionId))
+ } catch {
+ case _: NoSuchElementException => None
+ }
+ }
+
+ /**
+ * When an error or a cancellation occurs, we set the finishTimestamp of the statement.
+ * Therefore, when we count the number of running statements, we need to exclude errors and
+ * cancellations and count all statements that have not been closed so far.
+ */
+ def getTotalRunning: Int = {
+ store.view(classOf[ExecutionInfo]).asScala.count(_.isExecutionActive)
+ }
+
+ def getSessionCount: Long = {
+ store.count(classOf[SessionInfo])
+ }
+
+ def getExecutionCount: Long = {
+ store.count(classOf[ExecutionInfo])
+ }
+}
+
+private[thriftserver] class SessionInfo(
+ @KVIndexParam val sessionId: String,
+ val startTimestamp: Long,
+ val ip: String,
+ val userName: String,
+ val finishTimestamp: Long,
+ val totalExecution: Long) {
+ @JsonIgnore @KVIndex("finishTime")
+ private def finishTimeIndex: Long = if (finishTimestamp > 0L) finishTimestamp else -1L
+ def totalTime: Long = {
+ if (finishTimestamp == 0L) {
+ System.currentTimeMillis - startTimestamp
+ } else {
+ finishTimestamp - startTimestamp
+ }
+ }
+}
+
+private[thriftserver] class ExecutionInfo(
+ @KVIndexParam val execId: String,
+ val statement: String,
+ val sessionId: String,
+ val startTimestamp: Long,
+ val userName: String,
+ val finishTimestamp: Long,
+ val closeTimestamp: Long,
+ val executePlan: String,
+ val detail: String,
+ val state: ExecutionState.Value,
+ val jobId: ArrayBuffer[String],
+ val groupId: String) {
+ @JsonIgnore @KVIndex("finishTime")
+ private def finishTimeIndex: Long = if (finishTimestamp > 0L && !isExecutionActive) {
+ finishTimestamp
+ } else -1L
+
+ @JsonIgnore @KVIndex("isExecutionActive")
+ def isExecutionActive: Boolean = {
+ !(state == ExecutionState.FAILED ||
+ state == ExecutionState.CANCELED ||
+ state == ExecutionState.CLOSED)
+ }
+
+ def totalTime(endTime: Long): Long = {
+ if (endTime == 0L) {
+ System.currentTimeMillis - startTimestamp
+ } else {
+ endTime - startTimestamp
+ }
+ }
+}
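
Because the getters above are thin wrappers over KVStore views, the store can be exercised in isolation. A rough usage sketch, assuming an in-memory KVStore and ignoring the private[thriftserver] visibility of SessionInfo:

    import org.apache.spark.util.kvstore.InMemoryStore

    object StatusStoreDemo extends App {
      val kv = new InMemoryStore()
      // finishTimestamp == 0 marks the session as still online.
      kv.write(new SessionInfo("session-1", System.currentTimeMillis(), "127.0.0.1",
        "alice", finishTimestamp = 0L, totalExecution = 0L))

      val store = new HiveThriftServer2AppStatusStore(kv)
      assert(store.getOnlineSessionNum == 1)
      assert(store.getSession("session-1").isDefined)
    }
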
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2EventManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2EventManager.scala
new file mode 100644
index 000000000000..fa04c67896a6
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2EventManager.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.thriftserver.ui
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.SparkListenerEvent
+
+/**
+ * This class manages events generated by the Thrift Server application. It converts the
+ * operation and session events to listener events and posts them to the live listener bus.
+ */
+private[thriftserver] class HiveThriftServer2EventManager(sc: SparkContext) {
+
+ def postLiveListenerBus(event: SparkListenerEvent): Unit = {
+ sc.listenerBus.post(event)
+ }
+
+ def onSessionCreated(ip: String, sessionId: String, userName: String = "UNKNOWN"): Unit = {
+ postLiveListenerBus(SparkListenerThriftServerSessionCreated(ip, sessionId,
+ userName, System.currentTimeMillis()))
+ }
+
+ def onSessionClosed(sessionId: String): Unit = {
+ postLiveListenerBus(SparkListenerThriftServerSessionClosed(sessionId,
+ System.currentTimeMillis()))
+ }
+
+ def onStatementStart(
+ id: String,
+ sessionId: String,
+ statement: String,
+ groupId: String,
+ userName: String = "UNKNOWN"): Unit = {
+ postLiveListenerBus(SparkListenerThriftServerOperationStart(id, sessionId, statement, groupId,
+ System.currentTimeMillis(), userName))
+ }
+
+ def onStatementParsed(id: String, executionPlan: String): Unit = {
+ postLiveListenerBus(SparkListenerThriftServerOperationParsed(id, executionPlan))
+ }
+
+ def onStatementCanceled(id: String): Unit = {
+ postLiveListenerBus(SparkListenerThriftServerOperationCanceled(id, System.currentTimeMillis()))
+ }
+
+ def onStatementError(id: String, errorMsg: String, errorTrace: String): Unit = {
+ postLiveListenerBus(SparkListenerThriftServerOperationError(id, errorMsg, errorTrace,
+ System.currentTimeMillis()))
+ }
+
+ def onStatementFinish(id: String): Unit = {
+ postLiveListenerBus(SparkListenerThriftServerOperationFinish(id, System.currentTimeMillis()))
+ }
+
+ def onOperationClosed(id: String): Unit = {
+ postLiveListenerBus(SparkListenerThriftServerOperationClosed(id, System.currentTimeMillis()))
+ }
+}
+
+private[thriftserver] case class SparkListenerThriftServerSessionCreated(
+ ip: String,
+ sessionId: String,
+ userName: String,
+ startTime: Long) extends SparkListenerEvent
+
+private[thriftserver] case class SparkListenerThriftServerSessionClosed(
+ sessionId: String, finishTime: Long) extends SparkListenerEvent
+
+private[thriftserver] case class SparkListenerThriftServerOperationStart(
+ id: String,
+ sessionId: String,
+ statement: String,
+ groupId: String,
+ startTime: Long,
+ userName: String = "UNKNOWN") extends SparkListenerEvent
+
+private[thriftserver] case class SparkListenerThriftServerOperationParsed(
+ id: String,
+ executionPlan: String) extends SparkListenerEvent
+
+private[thriftserver] case class SparkListenerThriftServerOperationCanceled(
+ id: String, finishTime: Long) extends SparkListenerEvent
+
+private[thriftserver] case class SparkListenerThriftServerOperationError(
+ id: String,
+ errorMsg: String,
+ errorTrace: String,
+ finishTime: Long) extends SparkListenerEvent
+
+private[thriftserver] case class SparkListenerThriftServerOperationFinish(
+ id: String,
+ finishTime: Long) extends SparkListenerEvent
+
+private[thriftserver] case class SparkListenerThriftServerOperationClosed(
+ id: String,
+ closeTime: Long) extends SparkListenerEvent
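
These case classes are ordinary SparkListenerEvents, so the contract between the event manager and the listener is post-and-pattern-match. A toy sketch of that round trip (demo names, not Spark APIs):

    import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}

    case class DemoSessionCreated(sessionId: String) extends SparkListenerEvent

    class DemoListener extends SparkListener {
      override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
        case DemoSessionCreated(id) => println(s"session $id created")
        case _ => // not an event this listener tracks
      }
    }
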
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2HistoryServerPlugin.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2HistoryServerPlugin.scala
new file mode 100644
index 000000000000..aec4125801f6
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2HistoryServerPlugin.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver.ui
+
+import org.apache.spark.SparkConf
+import org.apache.spark.scheduler.SparkListener
+import org.apache.spark.status.{AppHistoryServerPlugin, ElementTrackingStore}
+import org.apache.spark.ui.SparkUI
+
+class HiveThriftServer2HistoryServerPlugin extends AppHistoryServerPlugin {
+
+ override def createListeners(conf: SparkConf, store: ElementTrackingStore): Seq[SparkListener] = {
+ Seq(new HiveThriftServer2Listener(store, conf, None, false))
+ }
+
+ override def setupUI(ui: SparkUI): Unit = {
+ val store = new HiveThriftServer2AppStatusStore(ui.store.store)
+ if (store.getSessionCount > 0) {
+ new ThriftServerTab(store, ui)
+ }
+ }
+
+ override def displayOrder: Int = 1
+}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
new file mode 100644
index 000000000000..6d0a506fa94d
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver.ui
+
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hive.service.server.HiveServer2
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD
+import org.apache.spark.scheduler._
+import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.ExecutionState
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.status.{ElementTrackingStore, KVUtils, LiveEntity}
+
+/**
+ * A [[SparkListener]] that writes Thrift Server session and operation events to the
+ * kvstore backing the UI. When running live, it also stops the HiveThriftServer2 on
+ * application end.
+ */
+private[thriftserver] class HiveThriftServer2Listener(
+ kvstore: ElementTrackingStore,
+ sparkConf: SparkConf,
+ server: Option[HiveServer2],
+ live: Boolean = true) extends SparkListener {
+
+ private val sessionList = new ConcurrentHashMap[String, LiveSessionData]()
+ private val executionList = new ConcurrentHashMap[String, LiveExecutionData]()
+
+ private val (retainedStatements: Int, retainedSessions: Int) = {
+ (sparkConf.get(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT),
+ sparkConf.get(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT))
+ }
+
+ // How often to update live entities. -1 means "never update" when replaying applications,
+ // meaning only the last write will happen. For live applications, this avoids a few
+ // operations that we can live without when rapidly processing incoming events.
+ private val liveUpdatePeriodNs = if (live) sparkConf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
+
+ // Returns true if this listener has no live data. Exposed for tests only.
+ private[thriftserver] def noLiveData(): Boolean = {
+ sessionList.isEmpty && executionList.isEmpty
+ }
+
+ kvstore.addTrigger(classOf[SessionInfo], retainedSessions) { count =>
+ cleanupSession(count)
+ }
+
+ kvstore.addTrigger(classOf[ExecutionInfo], retainedStatements) { count =>
+ cleanupExecutions(count)
+ }
+
+ kvstore.onFlush {
+ if (!live) {
+ flush((entity: LiveEntity) => updateStoreWithTriggerEnabled(entity))
+ }
+ }
+
+ override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+ if (live) {
+ server.foreach(_.stop())
+ }
+ }
+
+ override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+ val properties = jobStart.properties
+ if (properties != null) {
+ val groupId = properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+ if (groupId != null) {
+ updateJobDetails(jobStart.jobId.toString, groupId)
+ }
+ }
+ }
+
+ private def updateJobDetails(jobId: String, groupId: String): Unit = {
+ val execList = executionList.values().asScala.filter(_.groupId == groupId).toSeq
+ if (execList.nonEmpty) {
+ execList.foreach { exec =>
+ exec.jobId += jobId
+ updateLiveStore(exec)
+ }
+ } else {
+ // Event reordering can deliver a JobStart event after the corresponding execution has
+ // already ended and been flushed to the store (see SPARK-27019). In that case the
+ // execution is no longer in the live executionList, so recover it from the kvstore,
+ // attach the job id there, and write it back.
+ val storeExecInfo = kvstore.view(classOf[ExecutionInfo]).asScala.filter(_.groupId == groupId)
+ storeExecInfo.foreach { exec =>
+ val liveExec = getOrCreateExecution(exec.execId, exec.statement, exec.sessionId,
+ exec.startTimestamp, exec.userName)
+ liveExec.jobId += jobId
+ updateStoreWithTriggerEnabled(liveExec)
+ executionList.remove(liveExec.execId)
+ }
+ }
+ }
+
+ override def onOtherEvent(event: SparkListenerEvent): Unit = {
+ event match {
+ case e: SparkListenerThriftServerSessionCreated => onSessionCreated(e)
+ case e: SparkListenerThriftServerSessionClosed => onSessionClosed(e)
+ case e: SparkListenerThriftServerOperationStart => onOperationStart(e)
+ case e: SparkListenerThriftServerOperationParsed => onOperationParsed(e)
+ case e: SparkListenerThriftServerOperationCanceled => onOperationCanceled(e)
+ case e: SparkListenerThriftServerOperationError => onOperationError(e)
+ case e: SparkListenerThriftServerOperationFinish => onOperationFinished(e)
+ case e: SparkListenerThriftServerOperationClosed => onOperationClosed(e)
+ case _ => // Ignore
+ }
+ }
+
+ private def onSessionCreated(e: SparkListenerThriftServerSessionCreated): Unit = {
+ val session = getOrCreateSession(e.sessionId, e.startTime, e.ip, e.userName)
+ sessionList.put(e.sessionId, session)
+ updateLiveStore(session)
+ }
+
+ private def onSessionClosed(e: SparkListenerThriftServerSessionClosed): Unit = {
+ val session = sessionList.get(e.sessionId)
+ session.finishTimestamp = e.finishTime
+ updateStoreWithTriggerEnabled(session)
+ sessionList.remove(e.sessionId)
+ }
+
+ private def onOperationStart(e: SparkListenerThriftServerOperationStart): Unit = {
+ val info = getOrCreateExecution(
+ e.id,
+ e.statement,
+ e.sessionId,
+ e.startTime,
+ e.userName)
+
+ info.state = ExecutionState.STARTED
+ executionList.put(e.id, info)
+ sessionList.get(e.sessionId).totalExecution += 1
+ executionList.get(e.id).groupId = e.groupId
+ updateLiveStore(executionList.get(e.id))
+ updateLiveStore(sessionList.get(e.sessionId))
+ }
+
+ private def onOperationParsed(e: SparkListenerThriftServerOperationParsed): Unit = {
+ executionList.get(e.id).executePlan = e.executionPlan
+ executionList.get(e.id).state = ExecutionState.COMPILED
+ updateLiveStore(executionList.get(e.id))
+ }
+
+ private def onOperationCanceled(e: SparkListenerThriftServerOperationCanceled): Unit = {
+ executionList.get(e.id).finishTimestamp = e.finishTime
+ executionList.get(e.id).state = ExecutionState.CANCELED
+ updateLiveStore(executionList.get(e.id))
+ }
+
+ private def onOperationError(e: SparkListenerThriftServerOperationError): Unit = {
+ executionList.get(e.id).finishTimestamp = e.finishTime
+ executionList.get(e.id).detail = e.errorMsg
+ executionList.get(e.id).state = ExecutionState.FAILED
+ updateLiveStore(executionList.get(e.id))
+ }
+
+ private def onOperationFinished(e: SparkListenerThriftServerOperationFinish): Unit = {
+ executionList.get(e.id).finishTimestamp = e.finishTime
+ executionList.get(e.id).state = ExecutionState.FINISHED
+ updateLiveStore(executionList.get(e.id))
+ }
+
+ private def onOperationClosed(e: SparkListenerThriftServerOperationClosed): Unit = {
+ executionList.get(e.id).closeTimestamp = e.closeTime
+ executionList.get(e.id).state = ExecutionState.CLOSED
+ updateStoreWithTriggerEnabled(executionList.get(e.id))
+ executionList.remove(e.id)
+ }
+
+ // Update both live and history stores. Triggers are enabled by default, hence
+ // entities that exceed the retention threshold are cleaned up.
+ def updateStoreWithTriggerEnabled(entity: LiveEntity): Unit = {
+ entity.write(kvstore, System.nanoTime(), checkTriggers = true)
+ }
+
+ // Update only the live store. If the trigger is enabled, entities that exceed
+ // the retention threshold are cleaned up.
+ def updateLiveStore(entity: LiveEntity, trigger: Boolean = false): Unit = {
+ val now = System.nanoTime()
+ if (live && liveUpdatePeriodNs >= 0 && now - entity.lastWriteTime > liveUpdatePeriodNs) {
+ entity.write(kvstore, now, checkTriggers = trigger)
+ }
+ }
+
+ /** Go through all `LiveEntity`s and use `entityFlushFunc(entity)` to flush them. */
+ private def flush(entityFlushFunc: LiveEntity => Unit): Unit = {
+ sessionList.values.asScala.foreach(entityFlushFunc)
+ executionList.values.asScala.foreach(entityFlushFunc)
+ }
+
+ private def getOrCreateSession(
+ sessionId: String,
+ startTime: Long,
+ ip: String,
+ username: String): LiveSessionData = {
+ sessionList.computeIfAbsent(sessionId,
+ (_: String) => new LiveSessionData(sessionId, startTime, ip, username))
+ }
+
+ private def getOrCreateExecution(
+ execId: String,
+ statement: String,
+ sessionId: String,
+ startTimestamp: Long,
+ userName: String): LiveExecutionData = {
+ executionList.computeIfAbsent(execId,
+ (_: String) => new LiveExecutionData(execId, statement, sessionId, startTimestamp, userName))
+ }
+
+ private def cleanupExecutions(count: Long): Unit = {
+ val countToDelete = calculateNumberToRemove(count, retainedStatements)
+ if (countToDelete <= 0L) {
+ return
+ }
+ val view = kvstore.view(classOf[ExecutionInfo]).index("finishTime").first(0L)
+ val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { j =>
+ j.finishTimestamp != 0
+ }
+ toDelete.foreach { j => kvstore.delete(j.getClass, j.execId) }
+ }
+
+ private def cleanupSession(count: Long): Unit = {
+ val countToDelete = calculateNumberToRemove(count, retainedSessions)
+ if (countToDelete <= 0L) {
+ return
+ }
+ val view = kvstore.view(classOf[SessionInfo]).index("finishTime").first(0L)
+ val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { j =>
+ j.finishTimestamp != 0L
+ }
+
+ toDelete.foreach { j => kvstore.delete(j.getClass, j.sessionId) }
+ }
+
+ /**
+ * Remove at least (retainedSize / 10) items to reduce friction. Because tracking may be done
+ * asynchronously, this method may return 0 in case enough items have been deleted already.
+ */
+ private def calculateNumberToRemove(dataSize: Long, retainedSize: Long): Long = {
+ if (dataSize > retainedSize) {
+ math.max(retainedSize / 10L, dataSize - retainedSize)
+ } else {
+ 0L
+ }
+ }
+}
+
+private[thriftserver] class LiveExecutionData(
+ val execId: String,
+ val statement: String,
+ val sessionId: String,
+ val startTimestamp: Long,
+ val userName: String) extends LiveEntity {
+
+ var finishTimestamp: Long = 0L
+ var closeTimestamp: Long = 0L
+ var executePlan: String = ""
+ var detail: String = ""
+ var state: ExecutionState.Value = ExecutionState.STARTED
+ val jobId: ArrayBuffer[String] = ArrayBuffer[String]()
+ var groupId: String = ""
+
+ override protected def doUpdate(): Any = {
+ new ExecutionInfo(
+ execId,
+ statement,
+ sessionId,
+ startTimestamp,
+ userName,
+ finishTimestamp,
+ closeTimestamp,
+ executePlan,
+ detail,
+ state,
+ jobId,
+ groupId)
+ }
+}
+
+private[thriftserver] class LiveSessionData(
+ val sessionId: String,
+ val startTimestamp: Long,
+ val ip: String,
+ val username: String) extends LiveEntity {
+
+ var finishTimestamp: Long = 0L
+ var totalExecution: Int = 0
+
+ override protected def doUpdate(): Any = {
+ new SessionInfo(
+ sessionId,
+ startTimestamp,
+ ip,
+ username,
+ finishTimestamp,
+ totalExecution)
+ }
+}
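
The retention logic above hinges on ElementTrackingStore triggers: once the number of written instances of a class crosses the threshold, the registered action runs and deletes the oldest finished rows through the "finishTime" index. A small sketch of the trigger mechanism on its own (in-memory store, illustrative threshold, ignoring private[spark] visibility):

    import org.apache.spark.SparkConf
    import org.apache.spark.status.ElementTrackingStore
    import org.apache.spark.util.kvstore.InMemoryStore

    object TriggerDemo extends App {
      val store = new ElementTrackingStore(new InMemoryStore(), new SparkConf())
      store.addTrigger(classOf[SessionInfo], threshold = 2) { count =>
        // In the listener this is where cleanupSession(count) trims old sessions.
        println(s"trigger fired: $count sessions tracked")
      }
    }
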
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
index d3351f3d6ca1..adfda0c56585 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
@@ -28,7 +28,6 @@ import scala.xml.{Node, Unparsed}
import org.apache.commons.text.StringEscapeUtils
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.{ExecutionInfo, SessionInfo}
import org.apache.spark.sql.hive.thriftserver.ui.ToolTips._
import org.apache.spark.ui._
import org.apache.spark.ui.UIUtils._
@@ -36,23 +35,24 @@ import org.apache.spark.util.Utils
/** Page for Spark Web UI that shows statistics of the thrift server */
private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("") with Logging {
-
- private val listener = parent.listener
- private val startTime = Calendar.getInstance().getTime()
+ private val store = parent.store
+ private val startTime = parent.startTime
/** Render the page */
def render(request: HttpServletRequest): Seq[Node] = {
- val content =
- listener.synchronized { // make sure all parts in this page are consistent
- generateBasicStats() ++
- <br/> ++
+ val content = store.synchronized { // make sure all parts in this page are consistent
+ generateBasicStats() ++
+ <br/> ++