Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions sql/hive-thriftserver/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@
<groupId>net.sf.jpam</groupId>
<artifactId>jpam</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, S
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab
import org.apache.spark.sql.hive.thriftserver.ui.{HiveThriftServer2AppStatusStore, LiveExecutionData, LiveSessionData, ThriftServerTab}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.status.{ElementTrackingStore, KVUtils, LiveEntity}
import org.apache.spark.util.{ShutdownHookManager, Utils}

/**
Expand All @@ -62,10 +63,13 @@ object HiveThriftServer2 extends Logging {

server.init(executionHive.conf)
server.start()
listener = new HiveThriftServer2Listener(server, sqlContext.conf)
val kvstore = sqlContext.sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore]
listener = new HiveThriftServer2Listener(kvstore, server, sqlContext.conf)
sqlContext.sparkContext.addSparkListener(listener)
uiTab = if (sqlContext.sparkContext.getConf.get(UI_ENABLED)) {
Some(new ThriftServerTab(sqlContext.sparkContext))
Some(new ThriftServerTab(
new HiveThriftServer2AppStatusStore(kvstore, Some(listener)),
sqlContext.sparkContext))
} else {
None
}
Expand Down Expand Up @@ -101,10 +105,16 @@ object HiveThriftServer2 extends Logging {
server.init(executionHive.conf)
server.start()
logInfo("HiveThriftServer2 started")
listener = new HiveThriftServer2Listener(server, SparkSQLEnv.sqlContext.conf)
val kvstore = SparkSQLEnv.sparkContext.statusStore.store
.asInstanceOf[ElementTrackingStore]
listener = new HiveThriftServer2Listener(
kvstore,
server,
SparkSQLEnv.sqlContext.conf)
SparkSQLEnv.sparkContext.addSparkListener(listener)
uiTab = if (SparkSQLEnv.sparkContext.getConf.get(UI_ENABLED)) {
Some(new ThriftServerTab(SparkSQLEnv.sparkContext))
Some(new ThriftServerTab(new HiveThriftServer2AppStatusStore(kvstore, Some(listener)),
SparkSQLEnv.sparkContext))
} else {
None
}
Expand All @@ -125,16 +135,20 @@ object HiveThriftServer2 extends Logging {
val sessionId: String,
val startTimestamp: Long,
val ip: String,
val userName: String) {
val userName: String) extends LiveEntity {
var finishTimestamp: Long = 0L
var totalExecution: Int = 0
def totalTime: Long = {
if (finishTimestamp == 0L) {
System.currentTimeMillis - startTimestamp
} else {
finishTimestamp - startTimestamp
}

/** Builds a LiveSessionData from this session's current field values. */
override protected def doUpdate(): Any = {
  val data = new LiveSessionData(
    sessionId, startTimestamp, ip, userName, finishTimestamp, totalExecution)
  data
}

}

private[thriftserver] object ExecutionState extends Enumeration {
Expand All @@ -143,23 +157,33 @@ object HiveThriftServer2 extends Logging {
}

private[thriftserver] class ExecutionInfo(
val execId: String,
val statement: String,
val sessionId: String,
val startTimestamp: Long,
val userName: String) {
val userName: String) extends LiveEntity {
var finishTimestamp: Long = 0L
var closeTimestamp: Long = 0L
var executePlan: String = ""
var detail: String = ""
var state: ExecutionState.Value = ExecutionState.STARTED
val jobId: ArrayBuffer[String] = ArrayBuffer[String]()
var groupId: String = ""
def totalTime(endTime: Long): Long = {
if (endTime == 0L) {
System.currentTimeMillis - startTimestamp
} else {
endTime - startTimestamp
}

/**
 * Builds a LiveExecutionData from this execution's current field values.
 * The jobId buffer is handed over as-is (no copy is made here).
 */
override protected def doUpdate(): Any = {
  val data = new LiveExecutionData(
    execId, statement, sessionId, startTimestamp, userName,
    finishTimestamp, closeTimestamp, executePlan, detail,
    state, jobId, groupId)
  data
}
}

Expand All @@ -168,6 +192,7 @@ object HiveThriftServer2 extends Logging {
* An inner sparkListener called in sc.stop to clean up the HiveThriftServer2
*/
private[thriftserver] class HiveThriftServer2Listener(
val kvstore: ElementTrackingStore,
val server: HiveServer2,
val conf: SQLConf) extends SparkListener {

Expand All @@ -179,35 +204,14 @@ object HiveThriftServer2 extends Logging {
private val retainedStatements = conf.getConf(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT)
private val retainedSessions = conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT)

def getOnlineSessionNum: Int = synchronized {
sessionList.count(_._2.finishTimestamp == 0)
kvstore.addTrigger(classOf[LiveSessionData], retainedSessions) { count =>
cleanupSession(count)
}

def isExecutionActive(execInfo: ExecutionInfo): Boolean = {
!(execInfo.state == ExecutionState.FAILED ||
execInfo.state == ExecutionState.CANCELED ||
execInfo.state == ExecutionState.CLOSED)
kvstore.addTrigger(classOf[LiveExecutionData], retainedStatements) { count =>
cleanupExecutions(count)
}

/**
 * Number of tracked statements that isExecutionActive still reports as active.
 *
 * finishTimestamp is also set when a statement fails or is cancelled, so it cannot be used
 * on its own to recognise running statements; the execution state is consulted instead.
 */
def getTotalRunning: Int = synchronized {
  executionList.values.count(isExecutionActive)
}

/** Returns the values of sessionList as a Seq, read inside a synchronized section. */
def getSessionList: Seq[SessionInfo] = {
  synchronized(sessionList.values.toSeq)
}

/**
 * Looks up the session stored under `sessionId`.
 *
 * @param sessionId key into sessionList
 * @return the result of `sessionList.get`, evaluated inside a synchronized section
 */
def getSession(sessionId: String): Option[SessionInfo] = {
  synchronized(sessionList.get(sessionId))
}

/** Returns the values of executionList as a Seq, read inside a synchronized section. */
def getExecutionList: Seq[ExecutionInfo] = {
  synchronized(executionList.values.toSeq)
}

override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
for {
props <- Option(jobStart.properties)
Expand All @@ -216,20 +220,22 @@ object HiveThriftServer2 extends Logging {
} {
info.jobId += jobStart.jobId.toString
info.groupId = groupId
updateLiveStore(info)
}
}

/**
 * Registers a newly created session.
 *
 * Inside one synchronized section, builds a SessionInfo stamped with the current
 * System.currentTimeMillis value, stores it in sessionList under `sessionId`, and passes it
 * to updateLiveStore.
 *
 * @param ip forwarded to SessionInfo as its ip
 * @param sessionId key under which the session is tracked
 * @param userName forwarded to SessionInfo; defaults to "UNKNOWN"
 */
def onSessionCreated(ip: String, sessionId: String, userName: String = "UNKNOWN"): Unit = {
synchronized {
val info = new SessionInfo(sessionId, System.currentTimeMillis, ip, userName)
sessionList.put(sessionId, info)
// NOTE(review): this text comes from a rendered diff; trimSessionIfNecessary() looks like the
// superseded in-memory trimming call and updateLiveStore(info) like its replacement — confirm
// against the real file which of the two is meant to remain.
trimSessionIfNecessary()
updateLiveStore(info)
}
}

/**
 * Marks a session as finished: stamps its finishTimestamp with System.currentTimeMillis,
 * passes the entry to updateLiveStore, and removes it from sessionList.
 *
 * @param sessionId key of the session in sessionList
 */
def onSessionClosed(sessionId: String): Unit = synchronized {
// NOTE(review): apply-style lookup; presumably throws for an id that is not tracked — confirm.
sessionList(sessionId).finishTimestamp = System.currentTimeMillis
// NOTE(review): the entry now has a non-zero finishTimestamp, so if trimSessionIfNecessary()
// is still present it may evict this very entry before the lookup on the next line — confirm
// whether this call survives in the real file (it looks like a removed diff line).
trimSessionIfNecessary()
updateLiveStore(sessionList(sessionId))
sessionList.remove(sessionId)
}

def onStatementStart(
Expand All @@ -238,60 +244,86 @@ object HiveThriftServer2 extends Logging {
statement: String,
groupId: String,
userName: String = "UNKNOWN"): Unit = synchronized {
val info = new ExecutionInfo(statement, sessionId, System.currentTimeMillis, userName)
val info = new ExecutionInfo(id, statement, sessionId, System.currentTimeMillis, userName)
info.state = ExecutionState.STARTED
executionList.put(id, info)
trimExecutionIfNecessary()
sessionList(sessionId).totalExecution += 1
executionList(id).groupId = groupId
updateLiveStore(sessionList(sessionId))
updateLiveStore(executionList(id))
}

/**
 * Stores the plan for a statement, moves it to the COMPILED state and passes the entry to
 * updateLiveStore.
 *
 * @param id key of the statement in executionList
 * @param executionPlan text stored as the entry's executePlan
 */
def onStatementParsed(id: String, executionPlan: String): Unit = synchronized {
  val execution = executionList(id)
  execution.executePlan = executionPlan
  execution.state = ExecutionState.COMPILED
  updateLiveStore(execution)
}

/**
 * Records a cancellation: stamps finishTimestamp with System.currentTimeMillis, sets the
 * state to CANCELED, and passes the entry to updateLiveStore.
 *
 * @param id key of the statement in executionList
 */
def onStatementCanceled(id: String): Unit = synchronized {
executionList(id).finishTimestamp = System.currentTimeMillis
executionList(id).state = ExecutionState.CANCELED
// NOTE(review): looks like a removed diff line; if it is still present, the entry stamped
// above is eligible for trimming and the lookup on the next line may then fail — confirm.
trimExecutionIfNecessary()
updateLiveStore(executionList(id))
}

/**
 * Records a failure: stamps finishTimestamp with System.currentTimeMillis, stores `errorMsg`
 * as the entry's detail, sets the state to FAILED, and passes the entry to updateLiveStore.
 *
 * @param id key of the statement in executionList
 * @param errorMsg message stored in the entry's detail field
 * @param errorTrace not read anywhere in this method body
 */
def onStatementError(id: String, errorMsg: String, errorTrace: String): Unit = synchronized {
executionList(id).finishTimestamp = System.currentTimeMillis
executionList(id).detail = errorMsg
executionList(id).state = ExecutionState.FAILED
// NOTE(review): looks like a removed diff line; if it is still present, the entry stamped
// above is eligible for trimming and the lookup on the next line may then fail — confirm.
trimExecutionIfNecessary()
updateLiveStore(executionList(id))
}

/**
 * Records normal completion: stamps finishTimestamp with System.currentTimeMillis, sets the
 * state to FINISHED, and passes the entry to updateLiveStore.
 *
 * @param id key of the statement in executionList
 */
def onStatementFinish(id: String): Unit = synchronized {
executionList(id).finishTimestamp = System.currentTimeMillis
executionList(id).state = ExecutionState.FINISHED
// NOTE(review): looks like a removed diff line; if it is still present, the entry stamped
// above is eligible for trimming and the lookup on the next line may then fail — confirm.
trimExecutionIfNecessary()
updateLiveStore(executionList(id))
}

/**
 * Closes an operation: stamps closeTimestamp with System.currentTimeMillis, sets the state to
 * CLOSED, passes the entry to updateLiveStore, and then removes it from executionList.
 *
 * @param id key of the statement in executionList
 */
def onOperationClosed(id: String): Unit = synchronized {
  val execution = executionList(id)
  execution.closeTimestamp = System.currentTimeMillis
  execution.state = ExecutionState.CLOSED
  updateLiveStore(execution)
  executionList.remove(id)
}

/**
 * Deletes stored LiveExecutionData entries when `count` exceeds retainedStatements.
 * Invoked from the kvstore trigger registered for LiveExecutionData.
 *
 * @param count number of stored entries as reported by the trigger
 */
private def cleanupExecutions(count: Long): Unit = {
  val excess = calculateNumberToRemove(count, retainedStatements)
  if (excess > 0L) {
    val candidates = kvstore.view(classOf[LiveExecutionData]).index("execId").first(0L)
    // Only entries that already carry a finish timestamp are eligible for deletion.
    val finished = KVUtils.viewToSeq(candidates, excess.toInt) { execution =>
      execution.finishTimestamp != 0
    }
    finished.foreach { execution => kvstore.delete(execution.getClass(), execution.execId) }
  }
}

private def trimExecutionIfNecessary() = {
if (executionList.size > retainedStatements) {
val toRemove = math.max(retainedStatements / 10, 1)
executionList.filter(_._2.finishTimestamp != 0).take(toRemove).foreach { s =>
executionList.remove(s._1)
}
/**
 * Deletes stored LiveSessionData entries when `count` exceeds retainedSessions.
 * Invoked from the kvstore trigger registered for LiveSessionData.
 *
 * @param count number of stored entries as reported by the trigger
 */
private def cleanupSession(count: Long): Unit = {
  val excess = calculateNumberToRemove(count, retainedSessions)
  if (excess > 0L) {
    val candidates = kvstore.view(classOf[LiveSessionData]).index("sessionId").first(0L)
    // Only sessions that already carry a finish timestamp are eligible for deletion.
    val finished = KVUtils.viewToSeq(candidates, excess.toInt) { session =>
      session.finishTimestamp != 0L
    }
    finished.foreach { session => kvstore.delete(session.getClass(), session.sessionId) }
  }
}

private def trimSessionIfNecessary() = {
if (sessionList.size > retainedSessions) {
val toRemove = math.max(retainedSessions / 10, 1)
sessionList.filter(_._2.finishTimestamp != 0).take(toRemove).foreach { s =>
sessionList.remove(s._1)
}
/**
 * Computes how many items to drop so the store gets back under its retention limit.
 *
 * When the limit is exceeded, the result is the larger of (retainedSize / 10) and the actual
 * excess, so that cleanup does not have to run for every single extra item. When the limit is
 * not exceeded the result is 0, which can happen because tracking may be asynchronous and
 * enough items may already have been deleted.
 *
 * @param dataSize current number of stored items
 * @param retainedSize maximum number of items to keep
 * @return number of items to remove, or 0 when `dataSize` does not exceed `retainedSize`
 */
private def calculateNumberToRemove(dataSize: Long, retainedSize: Long): Long = {
  if (dataSize <= retainedSize) {
    0L
  } else {
    val excess = dataSize - retainedSize
    val minimumBatch = retainedSize / 10L
    math.max(minimumBatch, excess)
  }
}

/**
 * Writes `entity` to kvstore, passing the current System.nanoTime() value as the timestamp.
 *
 * @param entity the live entity to write
 */
private def updateLiveStore(entity: LiveEntity): Unit = {
  entity.write(kvstore, System.nanoTime())
}
}
}
Expand Down
Loading