2 changes: 2 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -51,6 +51,8 @@ private[spark] object SQLConf {

// This is only used for the thriftserver
val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
val THRIFTSERVER_UI_STATEMENT_LIMIT = "spark.sql.thriftserver.ui.retainedStatements"
val THRIFTSERVER_UI_SESSION_LIMIT = "spark.sql.thriftserver.ui.retainedSessions"

// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = "spark.sql.sources.default"
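The two new keys bound how many finished statements and sessions the listener (further down) retains for the web UI; both default to 200. A hedged sketch of raising those limits programmatically, assuming a HiveContext built the usual way (the same keys can equally be passed as --conf flags when launching the Thrift server):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    // Illustrative only: bump the UI retention limits before starting the server.
    val sc = new SparkContext(new SparkConf().setAppName("ui-conf-sketch").setMaster("local"))
    val hiveContext = new HiveContext(sc)
    hiveContext.setConf("spark.sql.thriftserver.ui.retainedStatements", "500")
    hiveContext.setConf("spark.sql.thriftserver.ui.retainedSessions", "500")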
12 changes: 12 additions & 0 deletions sql/hive-thriftserver/pom.xml
@@ -57,6 +57,18 @@
<groupId>${hive.group}</groupId>
<artifactId>hive-beeline</artifactId>
</dependency>
<!-- Added for selenium: -->
<dependency>
<groupId>org.seleniumhq.selenium</groupId>
<artifactId>selenium-java</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
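Selenium is pulled in so the new tab can be exercised from a headless browser in tests; the io.netty exclusion presumably avoids conflicting with the Netty artifacts Spark already depends on. A minimal hedged sketch of such a test — the suite name, URL, and page path are illustrative, not the PR's actual test:

    import org.openqa.selenium.htmlunit.HtmlUnitDriver
    import org.scalatest.FunSuite

    // Illustrative sketch of a Selenium-backed UI smoke test.
    class ThriftServerPageSketch extends FunSuite {
      test("SQL tab renders") {
        val driver = new HtmlUnitDriver()  // headless driver bundled with selenium-java
        try {
          driver.get("http://localhost:4040/sql")  // assumed UI address and tab path
          assert(driver.getPageSource.nonEmpty)
        } finally {
          driver.quit()
        }
      }
    }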
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -22,20 +22,27 @@ import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService}
import org.apache.hive.service.server.{HiveServer2, ServerOptionsProcessor}
import org.apache.spark.sql.SQLConf

import org.apache.spark.{SparkContext, SparkConf, Logging}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.scheduler.{SparkListenerJobStart, SparkListenerApplicationEnd, SparkListener}
import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab
import org.apache.spark.util.Utils

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
* The main entry point for the Spark SQL port of HiveServer2. Starts up a `SparkSQLContext` and a
* `HiveThriftServer2` thrift server.
*/
object HiveThriftServer2 extends Logging {
var LOG = LogFactory.getLog(classOf[HiveServer2])
var uiTab: Option[ThriftServerTab] = None  // None until the server starts, so the shutdown hook can't hit a null
var listener: HiveThriftServer2Listener = _

/**
* :: DeveloperApi ::
@@ -46,7 +53,13 @@ object HiveThriftServer2 extends Logging {
val server = new HiveThriftServer2(sqlContext)
server.init(sqlContext.hiveconf)
server.start()
listener = new HiveThriftServer2Listener(server, sqlContext.conf)
sqlContext.sparkContext.addSparkListener(listener)
uiTab = if (sqlContext.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) {
Some(new ThriftServerTab(sqlContext.sparkContext))
} else {
None
}
}

def main(args: Array[String]) {
@@ -58,30 +71,164 @@
logInfo("Starting SparkContext")
SparkSQLEnv.init()

Utils.addShutdownHook { () =>
SparkSQLEnv.stop()
uiTab.foreach(_.detach())
}

try {
val server = new HiveThriftServer2(SparkSQLEnv.hiveContext)
server.init(SparkSQLEnv.hiveContext.hiveconf)
server.start()
logInfo("HiveThriftServer2 started")
listener = new HiveThriftServer2Listener(server, SparkSQLEnv.hiveContext.conf)
SparkSQLEnv.sparkContext.addSparkListener(listener)
uiTab = if (SparkSQLEnv.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) {
Some(new ThriftServerTab(SparkSQLEnv.sparkContext))
} else {
None
}
} catch {
case e: Exception =>
logError("Error starting HiveThriftServer2", e)
System.exit(-1)
}
}

private[thriftserver] class SessionInfo(
val sessionId: String,
val startTimestamp: Long,
val ip: String,
val userName: String) {
var finishTimestamp: Long = 0L
var totalExecution: Int = 0
def totalTime: Long = {
if (finishTimestamp == 0L) {
System.currentTimeMillis - startTimestamp
} else {
finishTimestamp - startTimestamp
}
}
Contributor: According to this section of the Databricks Scala Guide, I think we prefer System.nanoTime for timing.

Contributor: Sorry, I didn't notice that we need to display wall-clock time in the web UI, so System.nanoTime isn't appropriate here.
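The distinction the thread settles on: System.nanoTime is monotonic and only meaningful for measuring durations, while the UI must show absolute start times, which only System.currentTimeMillis provides. A standalone sketch of the two uses:

    object TimingSketch {
      def doWork(): Long = (1L to 1000000L).sum  // hypothetical workload

      def main(args: Array[String]): Unit = {
        // Wall-clock timestamp: an absolute point in time, right for a
        // "start time" column in a web UI.
        val startedAt = System.currentTimeMillis()

        // Monotonic timer: only differences are meaningful, but it is
        // immune to system clock adjustments, so it is the better choice
        // for pure duration measurement.
        val t0 = System.nanoTime()
        doWork()
        val elapsedMs = (System.nanoTime() - t0) / 1000000L

        println(s"started at epoch ms $startedAt, took $elapsedMs ms")
      }
    }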

}

private[thriftserver] object ExecutionState extends Enumeration {
val STARTED, COMPILED, FAILED, FINISHED = Value
type ExecutionState = Value
}

private[thriftserver] class ExecutionInfo(
val statement: String,
val sessionId: String,
val startTimestamp: Long,
val userName: String) {
var finishTimestamp: Long = 0L
var executePlan: String = ""
var detail: String = ""
var state: ExecutionState.Value = ExecutionState.STARTED
val jobId: ArrayBuffer[String] = ArrayBuffer[String]()
var groupId: String = ""
def totalTime: Long = {
if (finishTimestamp == 0L) {
System.currentTimeMillis - startTimestamp
} else {
finishTimestamp - startTimestamp
}
}
}


/**
* An inner SparkListener that stops the HiveThriftServer2 when the application ends,
* and records session and statement activity for the web UI.
*/
private[thriftserver] class HiveThriftServer2Listener(
val server: HiveServer2,
val conf: SQLConf) extends SparkListener {

override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
server.stop()
}

val sessionList = new mutable.LinkedHashMap[String, SessionInfo]
val executionList = new mutable.LinkedHashMap[String, ExecutionInfo]
val retainedStatements =
conf.getConf(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT, "200").toInt
val retainedSessions =
conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT, "200").toInt
var totalRunning = 0

override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
for {
props <- Option(jobStart.properties)
groupId <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
(_, info) <- executionList if info.groupId == groupId
} {
info.jobId += jobStart.jobId.toString
info.groupId = groupId
}
}
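onJobStart can only attribute jobs to a statement because the statement's execution runs under a Spark job group whose ID the listener recorded in onStatementStart. A hedged sketch of the other half of that contract — the handler name and IDs are illustrative, not the patch's actual code:

    import java.util.UUID
    import org.apache.spark.SparkContext

    // Illustrative sketch of a statement handler tagging its jobs.
    def runStatement(
        sc: SparkContext,
        listener: HiveThriftServer2Listener,
        id: String,
        sessionId: String,
        statement: String): Unit = {
      val groupId = s"statement-${UUID.randomUUID()}"
      // Every job spawned after this call carries groupId in its properties,
      // which onJobStart above reads back via SparkContext.SPARK_JOB_GROUP_ID.
      sc.setJobGroup(groupId, statement)
      listener.onStatementStart(id, sessionId, statement, groupId)
      // ... actually run the query here ...
      listener.onStatementFinish(id)
    }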

def onSessionCreated(ip: String, sessionId: String, userName: String = "UNKNOWN"): Unit = {
val info = new SessionInfo(sessionId, System.currentTimeMillis, ip, userName)
sessionList.put(sessionId, info)
trimSessionIfNecessary()
}

def onSessionClosed(sessionId: String): Unit = {
sessionList(sessionId).finishTimestamp = System.currentTimeMillis
}

def onStatementStart(
id: String,
sessionId: String,
statement: String,
groupId: String,
userName: String = "UNKNOWN"): Unit = {
val info = new ExecutionInfo(statement, sessionId, System.currentTimeMillis, userName)
info.state = ExecutionState.STARTED
executionList.put(id, info)
trimExecutionIfNecessary()
sessionList(sessionId).totalExecution += 1
executionList(id).groupId = groupId
totalRunning += 1
}

def onStatementParsed(id: String, executionPlan: String): Unit = {
executionList(id).executePlan = executionPlan
executionList(id).state = ExecutionState.COMPILED
}

def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit = {
executionList(id).finishTimestamp = System.currentTimeMillis
executionList(id).detail = errorMessage
executionList(id).state = ExecutionState.FAILED
totalRunning -= 1
}

def onStatementFinish(id: String): Unit = {
executionList(id).finishTimestamp = System.currentTimeMillis
executionList(id).state = ExecutionState.FINISHED
totalRunning -= 1
}
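Taken together, these callbacks walk a statement through the ExecutionState lifecycle: STARTED on submission, COMPILED once the plan is produced, then FINISHED or FAILED. A hedged walkthrough with made-up IDs, assuming a listener constructed as in startWithContext above:

    // Illustrative only: one successful statement, start to finish.
    listener.onSessionCreated("10.0.0.1", "session-1", "alice")
    listener.onStatementStart("stmt-1", "session-1", "SELECT 1", "group-1", "alice")
    // state == STARTED, totalRunning == 1
    listener.onStatementParsed("stmt-1", "== Physical Plan == ...")
    // state == COMPILED
    listener.onStatementFinish("stmt-1")
    // state == FINISHED, finishTimestamp set, totalRunning back to 0
    listener.onSessionClosed("session-1")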

private def trimExecutionIfNecessary() = synchronized {
if (executionList.size > retainedStatements) {
val toRemove = math.max(retainedStatements / 10, 1)
executionList.take(toRemove).foreach { case (id, _) => executionList.remove(id) }
}
}

private def trimSessionIfNecessary() = synchronized {
if (sessionList.size > retainedSessions) {
val toRemove = math.max(retainedSessions / 10, 1)
sessionList.take(toRemove).foreach { case (sessionId, _) => sessionList.remove(sessionId) }
}
}
}
}
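Both trim methods depend on mutable.LinkedHashMap iterating in insertion order, so take(toRemove) selects the oldest entries and eviction is FIFO, dropping about a tenth of the retention limit at a time. A standalone sketch of that behavior, with the limit shrunk for illustration:

    import scala.collection.mutable

    object TrimSketch extends App {
      val retained = 5
      val sessions = new mutable.LinkedHashMap[String, Int]
      (1 to 7).foreach(i => sessions.put(s"id-$i", i))

      if (sessions.size > retained) {
        val toRemove = math.max(retained / 10, 1)  // always drop at least one
        // take() follows insertion order, so this removes the oldest entries;
        // it also builds a new map, so removing while iterating it is safe.
        sessions.take(toRemove).foreach { case (key, _) => sessions.remove(key) }
      }
      println(sessions.keys.mkString(", "))  // id-2 through id-7: id-1 was evicted
    }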

private[hive] class HiveThriftServer2(hiveContext: HiveContext)