+ }
+}
+
+private[ui] class SqlStatsTableRow(
+ val jobTag: String,
+ val jobId: Seq[String],
+ val sqlExecId: Seq[String],
+ val duration: Long,
+ val executionTime: Long,
+ val sparkSessionTags: Seq[String],
+ val executionInfo: ExecutionInfo)
+
+private[ui] class SqlStatsTableDataSource(
+ info: Seq[ExecutionInfo],
+ pageSize: Int,
+ sortColumn: String,
+ desc: Boolean)
+ extends PagedDataSource[SqlStatsTableRow](pageSize) {
+
+  // Convert ExecutionInfo to SqlStatsTableRow, which holds the final contents shown in the
+  // table, so that we avoid recomputing those contents while sorting the data.
+ private val data = info.map(sqlStatsTableRow).sorted(ordering(sortColumn, desc))
+
+ override def dataSize: Int = data.size
+
+ override def sliceData(from: Int, to: Int): Seq[SqlStatsTableRow] = data.slice(from, to)
+
+ private def sqlStatsTableRow(executionInfo: ExecutionInfo): SqlStatsTableRow = {
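+    // Duration runs from start to close time; execution time from start to finish time
+    // (matching the Duration and Execution Time column tooltips).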
+ val duration = executionInfo.totalTime(executionInfo.closeTimestamp)
+ val executionTime = executionInfo.totalTime(executionInfo.finishTimestamp)
+ val jobId = executionInfo.jobId.toSeq.sorted
+ val sqlExecId = executionInfo.sqlExecId.toSeq.sorted
+ val sparkSessionTags = executionInfo.sparkSessionTags.toSeq.sorted
+
+ new SqlStatsTableRow(
+ executionInfo.jobTag,
+ jobId,
+ sqlExecId,
+ duration,
+ executionTime,
+ sparkSessionTags,
+ executionInfo)
+ }
+
+ /**
+ * Return Ordering according to sortColumn and desc.
+ */
+ private def ordering(sortColumn: String, desc: Boolean): Ordering[SqlStatsTableRow] = {
+ val ordering: Ordering[SqlStatsTableRow] = sortColumn match {
+ case "User" => Ordering.by(_.executionInfo.userId)
+ case "Operation ID" => Ordering.by(_.executionInfo.operationId)
+ case "Job ID" => Ordering.by(_.jobId.headOption)
+ case "SQL Query ID" => Ordering.by(_.sqlExecId.headOption)
+ case "Session ID" => Ordering.by(_.executionInfo.sessionId)
+ case "Start Time" => Ordering.by(_.executionInfo.startTimestamp)
+ case "Finish Time" => Ordering.by(_.executionInfo.finishTimestamp)
+ case "Close Time" => Ordering.by(_.executionInfo.closeTimestamp)
+ case "Execution Time" => Ordering.by(_.executionTime)
+ case "Duration" => Ordering.by(_.duration)
+ case "Statement" => Ordering.by(_.executionInfo.statement)
+ case "State" => Ordering.by(_.executionInfo.state)
+ case "Detail" => Ordering.by(_.executionInfo.detail)
+ case "Job Tag" => Ordering.by(_.executionInfo.jobTag)
+ case "Spark Session Tags" => Ordering.by(_.sparkSessionTags.headOption)
+ case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
+ }
+ if (desc) {
+ ordering.reverse
+ } else {
+ ordering
+ }
+ }
+}
+
+private[ui] class SessionStatsTableDataSource(
+ info: Seq[SessionInfo],
+ pageSize: Int,
+ sortColumn: String,
+ desc: Boolean)
+ extends PagedDataSource[SessionInfo](pageSize) {
+
+ // Sorting SessionInfo data
+ private val data = info.sorted(ordering(sortColumn, desc))
+
+ override def dataSize: Int = data.size
+
+ override def sliceData(from: Int, to: Int): Seq[SessionInfo] = data.slice(from, to)
+
+ /**
+ * Return Ordering according to sortColumn and desc.
+ */
+ private def ordering(sortColumn: String, desc: Boolean): Ordering[SessionInfo] = {
+ val ordering: Ordering[SessionInfo] = sortColumn match {
+ case "User" => Ordering.by(_.userId)
+ case "Session ID" => Ordering.by(_.sessionId)
+ case "Start Time" => Ordering.by(_.startTimestamp)
+ case "Finish Time" => Ordering.by(_.finishTimestamp)
+ case "Duration" => Ordering.by(_.totalTime)
+ case "Total Execute" => Ordering.by(_.totalExecution)
+ case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
+ }
+ if (desc) {
+ ordering.reverse
+ } else {
+ ordering
+ }
+ }
+}
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerSessionPage.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerSessionPage.scala
new file mode 100644
index 000000000000..fde6e8da8b63
--- /dev/null
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerSessionPage.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.ui
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.ui._
+import org.apache.spark.ui.UIUtils._
+import org.apache.spark.util.Utils
+
+/** Page for Spark UI that contains information pertaining to a single Spark Connect session */
+private[ui] class SparkConnectServerSessionPage(parent: SparkConnectServerTab)
+ extends WebUIPage("session")
+ with Logging {
+
+ val store = parent.store
+ private val startTime = parent.startTime
+
+ /** Render the page */
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val sessionId = request.getParameter("id")
+ require(sessionId != null && sessionId.nonEmpty, "Missing id parameter")
+
+ val content = store.synchronized { // make sure all parts in this page are consistent
+ store
+ .getSession(sessionId)
+ .map { sessionStat =>
+          val table = generateSQLStatsTable(request, sessionStat.sessionId)
+          generateBasicStats() ++
+            <br/> ++
+            <h4>
+              User
+              {sessionStat.userId}
+              ,
+              Session created at
+              {formatDate(sessionStat.startTimestamp)}
+              ,
+              Total run
+              {sessionStat.totalExecution}
+              Request(s)
+            </h4> ++
+            <span id="sqlsessionstat" class="collapse-aggregated-sqlsessionstat collapse-table"
+                onClick="collapseTable('collapse-aggregated-sqlsessionstat',
+                'aggregated-sqlsessionstat')">
+              <h4><a>Request Statistics</a></h4>
+            </span> ++
+            <div class="aggregated-sqlsessionstat collapsible-table">
+              {table.getOrElse("No statistics have been generated yet.")}
+            </div>
+        }
+        .getOrElse(Seq.empty)
+    }
+
+    UIUtils.headerSparkPage(request, "Spark Connect Session", content, parent)
+  }
+}
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerTab.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerTab.scala
new file mode 100644
index 000000000000..4d1820a994c1
--- /dev/null
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/SparkConnectServerTab.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.ui
+
+import java.util.Date
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.ui.{SparkUI, SparkUITab}
+
+private[connect] class SparkConnectServerTab(
+ val store: SparkConnectServerAppStatusStore,
+ sparkUI: SparkUI)
+ extends SparkUITab(sparkUI, "connect")
+ with Logging {
+
+ override val name = "Connect"
+
+ val parent = sparkUI
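+  // Use the application attempt's start time; fall back to "now" if it is not yet available.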
+ val startTime =
+ try {
+ sparkUI.store.applicationInfo().attempts.head.startTime
+ } catch {
+ case _: NoSuchElementException => new Date(System.currentTimeMillis())
+ }
+
+ attachPage(new SparkConnectServerPage(this))
+ attachPage(new SparkConnectServerSessionPage(this))
+ parent.attachTab(this)
+ def detach(): Unit = {
+ parent.detachTab(this)
+ }
+}
+
+private[connect] object SparkConnectServerTab {
+ def getSparkUI(sparkContext: SparkContext): SparkUI = {
+ sparkContext.ui.getOrElse {
+ throw QueryExecutionErrors.parentSparkUIToAttachTabNotFoundError()
+ }
+ }
+}
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/ToolTips.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/ToolTips.scala
new file mode 100644
index 000000000000..9b51ace83c6c
--- /dev/null
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/ui/ToolTips.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.ui
+
+private[ui] object ToolTips {
+ val SPARK_CONNECT_SERVER_FINISH_TIME =
+ "Execution finish time, before fetching the results"
+
+ val SPARK_CONNECT_SERVER_CLOSE_TIME =
+ "Operation close time after fetching the results"
+
+ val SPARK_CONNECT_SERVER_EXECUTION =
+ "Difference between start time and finish time"
+
+ val SPARK_CONNECT_SERVER_DURATION =
+ "Difference between start time and close time"
+
+ val SPARK_CONNECT_SESSION_TOTAL_EXECUTE =
+ "Number of operations submitted in this session"
+
+ val SPARK_CONNECT_SESSION_DURATION =
+ "Elapsed time since session start, or until closed if the session was closed"
+
+}
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala
new file mode 100644
index 000000000000..9292e44f177b
--- /dev/null
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerListenerSuite.scala
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.ui
+
+import java.util.Properties
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SharedSparkContext, SparkConf, SparkContext, SparkEnv, SparkFunSuite}
+import org.apache.spark.internal.config.Status.{ASYNC_TRACKING_ENABLED, LIVE_ENTITY_UPDATE_PERIOD}
+import org.apache.spark.scheduler.SparkListenerJobStart
+import org.apache.spark.sql.connect.config.Connect.{CONNECT_UI_SESSION_LIMIT, CONNECT_UI_STATEMENT_LIMIT}
+import org.apache.spark.sql.connect.service._
+import org.apache.spark.status.ElementTrackingStore
+import org.apache.spark.util.kvstore.InMemoryStore
+
+class SparkConnectServerListenerSuite
+ extends SparkFunSuite
+ with BeforeAndAfter
+ with SharedSparkContext {
+
+ private var kvstore: ElementTrackingStore = _
+
+ after {
+ if (kvstore != null) {
+ kvstore.close()
+ kvstore = null
+ }
+ }
+
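+  // Exercise both the live UI path and the history-server replay path.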
+ Seq(true, false).foreach { live =>
+ test(s"listener events should store successfully (live = $live)") {
+ val (statusStore: SparkConnectServerAppStatusStore, listener: SparkConnectServerListener) =
+ createAppStatusStore(live)
+
+ listener.onOtherEvent(
+ SparkListenerConnectSessionStarted("sessionId", "user", System.currentTimeMillis()))
+ listener.onOtherEvent(
+ SparkListenerConnectOperationStarted(
+ ExecuteJobTag("sessionId", "userId", "operationId"),
+ "operationId",
+ System.currentTimeMillis(),
+ "sessionId",
+ "userId",
+ "userName",
+ "dummy query",
+ None,
+ Set()))
+ listener.onOtherEvent(
+ SparkListenerConnectOperationAnalyzed(
+ ExecuteJobTag("sessionId", "userId", "operationId"),
+ "operationId",
+ System.currentTimeMillis()))
+ listener.onJobStart(
+ SparkListenerJobStart(0, System.currentTimeMillis(), Nil, createProperties))
+ listener.onOtherEvent(
+ SparkListenerConnectOperationFinished(
+ ExecuteJobTag("sessionId", "userId", "operationId"),
+ "sessionId",
+ System.currentTimeMillis()))
+ listener.onOtherEvent(
+ SparkListenerConnectOperationClosed(
+ ExecuteJobTag("sessionId", "userId", "operationId"),
+ "sessionId",
+ System.currentTimeMillis()))
+
+ if (live) {
+ assert(statusStore.getOnlineSessionNum === 1)
+ }
+
+ listener.onOtherEvent(
+ SparkListenerConnectSessionClosed("sessionId", "userId", System.currentTimeMillis()))
+
+ if (!live) {
+        // Close the store to flush updates into the history store
+ kvstore.close(false)
+ }
+ assert(statusStore.getOnlineSessionNum === 0)
+ assert(statusStore.getExecutionList.size === 1)
+
+ val storeExecData = statusStore.getExecutionList.head
+
+ assert(storeExecData.jobTag === ExecuteJobTag("sessionId", "userId", "operationId"))
+ assert(storeExecData.sessionId === "sessionId")
+ assert(storeExecData.statement === "dummy query")
+ assert(storeExecData.jobId === Seq("0"))
+ assert(listener.noLiveData())
+ }
+ }
+
+ Seq(true, false).foreach { live =>
+ test(s"cleanup session if exceeds the threshold (live = $live)") {
+ val (statusStore: SparkConnectServerAppStatusStore, listener: SparkConnectServerListener) =
+ createAppStatusStore(live)
+ var time = 0
+ listener.onOtherEvent(
+ SparkListenerConnectSessionStarted("sessionId1", "user", System.currentTimeMillis()))
+ time += 1
+ listener.onOtherEvent(
+ SparkListenerConnectSessionStarted("sessionId2", "user", System.currentTimeMillis()))
+ time += 1
+ listener.onOtherEvent(SparkListenerConnectSessionClosed("sessionId1", "userId", time))
+ time += 1
+ listener.onOtherEvent(SparkListenerConnectSessionClosed("sessionId2", "userId", time))
+ listener.onOtherEvent(
+ SparkListenerConnectSessionStarted("sessionId3", "user", System.currentTimeMillis()))
+ time += 1
+ listener.onOtherEvent(SparkListenerConnectSessionClosed("sessionId3", "userId", time))
+
+ if (!live) {
+ kvstore.close(false)
+ }
+ assert(statusStore.getOnlineSessionNum === 0)
+ assert(statusStore.getSessionCount === 1)
+ assert(statusStore.getSession("sessionId1") === None)
+ assert(listener.noLiveData())
+ }
+ }
+
+ test("update execution info when jobstart event come after execution end event") {
+ val (statusStore: SparkConnectServerAppStatusStore, listener: SparkConnectServerListener) =
+ createAppStatusStore(true)
+
+ listener.onOtherEvent(
+ SparkListenerConnectSessionStarted("sessionId", "userId", System.currentTimeMillis()))
+ listener.onOtherEvent(
+ SparkListenerConnectOperationStarted(
+ ExecuteJobTag("sessionId", "userId", "operationId"),
+ "operationId",
+ System.currentTimeMillis(),
+ "sessionId",
+ "userId",
+ "userName",
+ "dummy query",
+ None,
+ Set()))
+ listener.onOtherEvent(
+ SparkListenerConnectOperationAnalyzed(
+ ExecuteJobTag("sessionId", "userId", "operationId"),
+ "operationId",
+ System.currentTimeMillis()))
+ listener.onOtherEvent(
+ SparkListenerConnectOperationFinished(
+ ExecuteJobTag("sessionId", "userId", "operationId"),
+ "operationId",
+ System.currentTimeMillis()))
+ listener.onOtherEvent(
+ SparkListenerConnectOperationClosed(
+ ExecuteJobTag("sessionId", "userId", "operationId"),
+ "operationId",
+ System.currentTimeMillis()))
+
+ listener.onJobStart(
+ SparkListenerJobStart(0, System.currentTimeMillis(), Nil, createProperties))
+ listener.onOtherEvent(
+ SparkListenerConnectSessionClosed("sessionId", "userId", System.currentTimeMillis()))
+ val exec = statusStore.getExecution(ExecuteJobTag("sessionId", "userId", "operationId"))
+ assert(exec.isDefined)
+ assert(exec.get.jobId === Seq("0"))
+ assert(listener.noLiveData())
+ }
+
+ test("SPARK-31387 - listener update methods should not throw exception with unknown input") {
+ val (statusStore: SparkConnectServerAppStatusStore, listener: SparkConnectServerListener) =
+ createAppStatusStore(true)
+
+ val unknownSession = "unknown_session"
+ val unknownJob = "unknown_job_tag"
+ listener.onOtherEvent(SparkListenerConnectSessionClosed(unknownSession, "userId", 0))
+ listener.onOtherEvent(
+ SparkListenerConnectOperationStarted(
+ ExecuteJobTag("sessionId", "userId", "operationId"),
+ "operationId",
+ System.currentTimeMillis(),
+ unknownSession,
+ "userId",
+ "userName",
+ "dummy query",
+ None,
+ Set()))
+ listener.onOtherEvent(
+ SparkListenerConnectOperationAnalyzed(
+ unknownJob,
+ "operationId",
+ System.currentTimeMillis()))
+ listener.onOtherEvent(SparkListenerConnectOperationCanceled(unknownJob, "userId", 0))
+ listener.onOtherEvent(
+ SparkListenerConnectOperationFailed(unknownJob, "operationId", 0, "msg"))
+ listener.onOtherEvent(SparkListenerConnectOperationFinished(unknownJob, "operationId", 0))
+ listener.onOtherEvent(SparkListenerConnectOperationClosed(unknownJob, "operationId", 0))
+ }
+
+ private def createProperties: Properties = {
+ val properties = new Properties()
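+    // Tag the job with the execute job tag so the listener can link it to the operation.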
+ properties.setProperty(
+ SparkContext.SPARK_JOB_TAGS,
+ ExecuteJobTag("sessionId", "userId", "operationId"))
+ properties
+ }
+
+ private def createAppStatusStore(live: Boolean) = {
+ val sparkConf = new SparkConf()
+ sparkConf
+ .set(ASYNC_TRACKING_ENABLED, false)
+ .set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
+ SparkEnv.get.conf
+ .set(CONNECT_UI_SESSION_LIMIT, 1)
+ .set(CONNECT_UI_STATEMENT_LIMIT, 10)
+ kvstore = new ElementTrackingStore(new InMemoryStore, sparkConf)
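+    // live = true exercises the live UI listener; live = false mimics history-server replay.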
+ if (live) {
+ val listener = new SparkConnectServerListener(kvstore, sparkConf)
+ (new SparkConnectServerAppStatusStore(kvstore), listener)
+ } else {
+ (
+ new SparkConnectServerAppStatusStore(kvstore),
+ new SparkConnectServerListener(kvstore, sparkConf, false))
+ }
+ }
+}
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPageSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPageSuite.scala
new file mode 100644
index 000000000000..99d0a14f1e8d
--- /dev/null
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ui/SparkConnectServerPageSuite.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.ui
+
+import java.util.{Calendar, Locale}
+import javax.servlet.http.HttpServletRequest
+
+import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SharedSparkContext, SparkConf, SparkFunSuite}
+import org.apache.spark.scheduler.SparkListenerJobStart
+import org.apache.spark.sql.connect.service._
+import org.apache.spark.status.ElementTrackingStore
+import org.apache.spark.util.kvstore.InMemoryStore
+
+class SparkConnectServerPageSuite
+ extends SparkFunSuite
+ with BeforeAndAfter
+ with SharedSparkContext {
+
+ private var kvstore: ElementTrackingStore = _
+
+ after {
+ if (kvstore != null) {
+ kvstore.close()
+ kvstore = null
+ }
+ }
+
+ /**
+ * Run a dummy session and return the store
+ */
+ private def getStatusStore: SparkConnectServerAppStatusStore = {
+ kvstore = new ElementTrackingStore(new InMemoryStore, new SparkConf())
+ val sparkConf = new SparkConf
+
+ val listener = new SparkConnectServerListener(kvstore, sparkConf)
+ val statusStore = new SparkConnectServerAppStatusStore(kvstore)
+
+ listener.onOtherEvent(
+ SparkListenerConnectSessionStarted("sessionId", "userId", System.currentTimeMillis()))
+ listener.onOtherEvent(
+ SparkListenerConnectOperationStarted(
+ "jobTag",
+ "operationId",
+ System.currentTimeMillis(),
+ "sessionId",
+ "userId",
+ "userName",
+ "dummy query",
+ None,
+ Set()))
+ listener.onOtherEvent(
+ SparkListenerConnectOperationAnalyzed("jobTag", "dummy plan", System.currentTimeMillis()))
+ listener.onOtherEvent(SparkListenerJobStart(0, System.currentTimeMillis(), Seq()))
+ listener.onOtherEvent(
+ SparkListenerConnectOperationFinished("jobTag", "operationId", System.currentTimeMillis()))
+ listener.onOtherEvent(
+ SparkListenerConnectOperationClosed("jobTag", "operationId", System.currentTimeMillis()))
+ listener.onOtherEvent(
+ SparkListenerConnectSessionClosed("sessionId", "userId", System.currentTimeMillis()))
+
+ statusStore
+ }
+
+ test("Spark Connect Server page should load successfully") {
+ val store = getStatusStore
+
+ val request = mock(classOf[HttpServletRequest])
+ val tab = mock(classOf[SparkConnectServerTab], RETURNS_SMART_NULLS)
+ when(tab.startTime).thenReturn(Calendar.getInstance().getTime)
+ when(tab.store).thenReturn(store)
+ when(tab.appName).thenReturn("testing")
+ when(tab.headerTabs).thenReturn(Seq.empty)
+ val page = new SparkConnectServerPage(tab)
+ val html = page.render(request).toString().toLowerCase(Locale.ROOT)
+
+    // session statistics and request statistics tables should load successfully
+ assert(html.contains("session statistics (1)"))
+ assert(html.contains("request statistics (1)"))
+ assert(html.contains("dummy query"))
+
+ // Pagination support
+ assert(html.contains(""))
+
+ // Hiding table support
+ assert(
+ html.contains("class=\"collapse-aggregated-sessionstat" +
+ " collapse-table\" onclick=\"collapsetable"))
+ }
+
+ test("Spark Connect Server session page should load successfully") {
+ val store = getStatusStore
+
+ val request = mock(classOf[HttpServletRequest])
+ when(request.getParameter("id")).thenReturn("sessionId")
+ val tab = mock(classOf[SparkConnectServerTab], RETURNS_SMART_NULLS)
+ when(tab.startTime).thenReturn(Calendar.getInstance().getTime)
+ when(tab.store).thenReturn(store)
+ when(tab.appName).thenReturn("testing")
+ when(tab.headerTabs).thenReturn(Seq.empty)
+ val page = new SparkConnectServerSessionPage(tab)
+ val html = page.render(request).toString().toLowerCase(Locale.ROOT)
+
+    // the session's request statistics table should load successfully
+ assert(html.contains("request statistics"))
+ assert(html.contains("userid"))
+ assert(html.contains("jobtag"))
+
+ // Pagination support
+ assert(html.contains(""))
+
+ // Hiding table support
+ assert(
+ html.contains("collapse-aggregated-sqlsessionstat collapse-table\"" +
+ " onclick=\"collapsetable"))
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f48cb32b3197..533ce28ab4d2 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -678,7 +678,7 @@ class SparkContext(config: SparkConf) extends Logging {
postApplicationStart()
// After application started, attach handlers to started server and start handler.
- _ui.foreach(_.attachAllHandler())
+ _ui.foreach(_.attachAllHandlers())
// Attach the driver metrics servlet handler to the web ui after the metrics system is started.
_env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index ac154b793856..685407c11208 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -67,15 +67,31 @@ private[spark] class SparkUI private (
createServletHandler("/", servlet, basePath)
}
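+  // Becomes true once the server is ready; handlers added earlier are buffered in `handlers`
+  // and attached by attachAllHandlers().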
+ private var readyToAttachHandlers = false
+
/**
* Attach all existing handlers to ServerInfo.
*/
- def attachAllHandler(): Unit = {
+ def attachAllHandlers(): Unit = {
+ // Attach all handlers that have been added already, but not yet attached.
serverInfo.foreach { server =>
server.removeHandler(initHandler)
handlers.foreach(server.addHandler(_, securityManager))
}
+    // Handlers added after this point are attached immediately.
+ readyToAttachHandlers = true
}
+
+  /**
+   * Attaches a handler to this UI.
+   * Note: the handler is not attached until readyToAttachHandlers is true;
+   * handlers added before that are attached by attachAllHandlers().
+   */
+ override def attachHandler(handler: ServletContextHandler): Unit = synchronized {
+ handlers += handler
+ if (readyToAttachHandlers) {
+ serverInfo.foreach(_.addHandler(handler, securityManager))
+ }
+ }
+
/** Initialize all components of the server. */
def initialize(): Unit = {
val jobsTab = new JobsTab(this, store)
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 352c51baa8ca..e7d57a6e6def 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -442,7 +442,7 @@ class UISuite extends SparkFunSuite {
sparkUI.bind()
assert(TestUtils.httpResponseMessage(new URL(sparkUI.webUrl + "/jobs"))
=== "Spark is starting up. Please wait a while until it's ready.")
- sparkUI.attachAllHandler()
+ sparkUI.attachAllHandlers()
assert(TestUtils.httpResponseMessage(new URL(sparkUI.webUrl + "/jobs")).contains(sc.appName))
sparkUI.stop()
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryHistoryServerPlugin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryHistoryServerPlugin.scala
index a127fa59b743..76f64dcb6445 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryHistoryServerPlugin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/StreamingQueryHistoryServerPlugin.scala
@@ -39,5 +39,5 @@ class StreamingQueryHistoryServerPlugin extends AppHistoryServerPlugin {
}
}
- override def displayOrder: Int = 1
+ override def displayOrder: Int = 2
}