diff --git a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
index 65fa38387b9e..2fc0259c39d0 100644
--- a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
@@ -31,7 +31,7 @@ import org.apache.spark.util.Utils
*
* @param pageSize the number of rows in a page
*/
-private[ui] abstract class PagedDataSource[T](val pageSize: Int) {
+private[spark] abstract class PagedDataSource[T](val pageSize: Int) {
if (pageSize <= 0) {
throw new IllegalArgumentException("Page size must be positive")
@@ -72,7 +72,7 @@ private[ui] case class PageData[T](totalPage: Int, data: Seq[T])
/**
* A paged table that will generate a HTML table for a specified page and also the page navigation.
*/
-private[ui] trait PagedTable[T] {
+private[spark] trait PagedTable[T] {
def tableId: String
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
index 1b2d8a821b36..1a25cd2a49e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
@@ -17,16 +17,17 @@
package org.apache.spark.sql.execution.ui
+import java.net.URLEncoder
import javax.servlet.http.HttpServletRequest
+import scala.collection.JavaConverters._
import scala.collection.mutable
-import scala.xml.{Node, NodeSeq}
-
-import org.apache.commons.lang3.StringEscapeUtils
+import scala.xml.{Node, NodeSeq, Unparsed}
import org.apache.spark.JobExecutionStatus
import org.apache.spark.internal.Logging
-import org.apache.spark.ui.{UIUtils, WebUIPage}
+import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils, WebUIPage}
+import org.apache.spark.util.Utils
private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with Logging {
@@ -55,8 +56,8 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L
val _content = mutable.ListBuffer[Node]()
if (running.nonEmpty) {
- val runningPageTable = new RunningExecutionTable(
- parent, currentTime, running.sortBy(_.submissionTime).reverse).toNodeSeq(request)
+ val runningPageTable =
+ executionsTable(request, "running", running, currentTime, true, true, true)
_content ++=
- Running Queries:
+ Running Queries:
{running.size}
}
@@ -129,7 +130,7 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L
{
if (completed.nonEmpty) {
- Completed Queries:
+ Completed Queries:
{completed.size}
}
@@ -137,50 +138,232 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L
{
if (failed.nonEmpty) {
- Failed Queries:
+ Failed Queries:
{failed.size}
}
}
+
UIUtils.headerSparkPage(request, "SQL", summary ++ content, parent, Some(5000))
}
+
+ private def executionsTable(
+ request: HttpServletRequest,
+ executionTag: String,
+ executionData: Seq[SQLExecutionUIData],
+ currentTime: Long,
+ showRunningJobs: Boolean,
+ showSucceededJobs: Boolean,
+ showFailedJobs: Boolean): Seq[Node] = {
+
+ // stripXSS is called to remove suspicious characters used in XSS attacks
+ val allParameters = request.getParameterMap.asScala.toMap.map { case (k, v) =>
+ UIUtils.stripXSS(k) -> v.map(UIUtils.stripXSS).toSeq
+ }
+ val parameterOtherTable = allParameters.filterNot(_._1.startsWith(executionTag))
+ .map(para => para._1 + "=" + para._2(0))
+
+ val parameterExecutionPage = UIUtils.stripXSS(request.getParameter(s"$executionTag.page"))
+ val parameterExecutionSortColumn = UIUtils.stripXSS(request
+ .getParameter(s"$executionTag.sort"))
+ val parameterExecutionSortDesc = UIUtils.stripXSS(request.getParameter(s"$executionTag.desc"))
+ val parameterExecutionPageSize = UIUtils.stripXSS(request
+ .getParameter(s"$executionTag.pageSize"))
+ val parameterExecutionPrevPageSize = UIUtils.stripXSS(request
+ .getParameter(s"$executionTag.prevPageSize"))
+
+ val executionPage = Option(parameterExecutionPage).map(_.toInt).getOrElse(1)
+ val executionSortColumn = Option(parameterExecutionSortColumn).map { sortColumn =>
+ UIUtils.decodeURLParameter(sortColumn)
+ }.getOrElse("ID")
+ val executionSortDesc = Option(parameterExecutionSortDesc).map(_.toBoolean).getOrElse(
+ // New executions should be shown above old executions by default.
+ executionSortColumn == "ID"
+ )
+ val executionPageSize = Option(parameterExecutionPageSize).map(_.toInt).getOrElse(100)
+ val executionPrevPageSize = Option(parameterExecutionPrevPageSize).map(_.toInt)
+ .getOrElse(executionPageSize)
+
+ // If the user has changed to a larger page size, then go to page 1 in order to avoid
+ // IndexOutOfBoundsException.
+ val page: Int = if (executionPageSize <= executionPrevPageSize) {
+ executionPage
+ } else {
+ 1
+ }
+ val tableHeaderId = executionTag // "running", "completed" or "failed"
+
+ try {
+ new ExecutionPagedTable(
+ request,
+ parent,
+ executionData,
+ tableHeaderId,
+ executionTag,
+ UIUtils.prependBaseUri(request, parent.basePath),
+ "SQL", // subPath
+ parameterOtherTable,
+ currentTime,
+ pageSize = executionPageSize,
+ sortColumn = executionSortColumn,
+ desc = executionSortDesc,
+ showRunningJobs,
+ showSucceededJobs,
+ showFailedJobs).table(page)
+ } catch {
+ case e@(_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
+
+
Error while rendering execution table:
+
+ {Utils.exceptionString(e)}
+
+
+ }
+ }
}
-private[ui] abstract class ExecutionTable(
+private[ui] class ExecutionPagedTable(
+ request: HttpServletRequest,
parent: SQLTab,
- tableId: String,
+ data: Seq[SQLExecutionUIData],
+ tableHeaderId: String,
+ executionTag: String,
+ basePath: String,
+ subPath: String,
+ parameterOtherTable: Iterable[String],
currentTime: Long,
- executionUIDatas: Seq[SQLExecutionUIData],
+ pageSize: Int,
+ sortColumn: String,
+ desc: Boolean,
showRunningJobs: Boolean,
showSucceededJobs: Boolean,
- showFailedJobs: Boolean) {
+ showFailedJobs: Boolean) extends PagedTable[ExecutionTableRowData] {
- protected def baseHeader: Seq[String] = Seq(
- "ID",
- "Description",
- "Submitted",
- "Duration")
+ override val dataSource = new ExecutionDataSource(
+ request,
+ parent,
+ data,
+ basePath,
+ currentTime,
+ pageSize,
+ sortColumn,
+ desc,
+ showRunningJobs,
+ showSucceededJobs,
+ showFailedJobs)
+
+ private val parameterPath = s"$basePath/$subPath/?${parameterOtherTable.mkString("&")}"
+
+ override def tableId: String = s"$executionTag-table"
+
+ override def tableCssClass: String =
+ "table table-bordered table-condensed table-striped " +
+ "table-head-clickable table-cell-width-limited"
+
+ override def prevPageSizeFormField: String = s"$executionTag.prevPageSize"
+
+ override def pageLink(page: Int): String = {
+ val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
+ parameterPath +
+ s"&$pageNumberFormField=$page" +
+ s"&$executionTag.sort=$encodedSortColumn" +
+ s"&$executionTag.desc=$desc" +
+ s"&$pageSizeFormField=$pageSize" +
+ s"#$tableHeaderId"
+ }
- protected def header: Seq[String]
+ override def pageSizeFormField: String = s"$executionTag.pageSize"
- protected def row(
- request: HttpServletRequest,
- currentTime: Long,
- executionUIData: SQLExecutionUIData): Seq[Node] = {
- val submissionTime = executionUIData.submissionTime
- val duration = executionUIData.completionTime.map(_.getTime()).getOrElse(currentTime) -
- submissionTime
+ override def pageNumberFormField: String = s"$executionTag.page"
+
+ override def goButtonFormPath: String = {
+ val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
+ s"$parameterPath&$executionTag.sort=$encodedSortColumn&$executionTag.desc=$desc#$tableHeaderId"
+ }
+
+ override def headers: Seq[Node] = {
+ // Information for each header: title, sortable
+ val executionHeadersAndCssClasses: Seq[(String, Boolean)] =
+ Seq(
+ ("ID", true),
+ ("Description", true),
+ ("Submitted", true),
+ ("Duration", true)) ++ {
+ if (showRunningJobs && showSucceededJobs && showFailedJobs) {
+ Seq(
+ ("Running Job IDs", true),
+ ("Succeeded Job IDs", true),
+ ("Failed Job IDs", true))
+ } else if (showSucceededJobs && showFailedJobs) {
+ Seq(
+ ("Succeeded Job IDs", true),
+ ("Failed Job IDs", true))
+ } else {
+ Seq(("Job IDs", true))
+ }
+ }
- def jobLinks(status: JobExecutionStatus): Seq[Node] = {
- executionUIData.jobs.flatMap { case (jobId, jobStatus) =>
- if (jobStatus == status) {
- [{jobId.toString}]
+ val sortableColumnHeaders = executionHeadersAndCssClasses.filter {
+ case (_, sortable) => sortable
+ }.map { case (title, _) => title }
+
+ require(sortableColumnHeaders.contains(sortColumn), s"Unknown column: $sortColumn")
+
+ val headerRow: Seq[Node] = {
+ executionHeadersAndCssClasses.map { case (header, sortable) =>
+ if (header == sortColumn) {
+ val headerLink = Unparsed(
+ parameterPath +
+ s"&$executionTag.sort=${URLEncoder.encode(header, "UTF-8")}" +
+ s"&$executionTag.desc=${!desc}" +
+ s"&$executionTag.pageSize=$pageSize" +
+ s"#$tableHeaderId")
+ val arrow = if (desc) "▾" else "▴" // UP or DOWN
+
+
+
+ {header}
+ {Unparsed(arrow)}
+
+
+ |
} else {
- None
+ if (sortable) {
+ val headerLink = Unparsed(
+ parameterPath +
+ s"&$executionTag.sort=${URLEncoder.encode(header, "UTF-8")}" +
+ s"&$executionTag.pageSize=$pageSize" +
+ s"#$tableHeaderId")
+
+
+
+ {header}
+
+ |
+ } else {
+
+ {header}
+ |
+ }
}
- }.toSeq
+ }
+ }
+
+ {headerRow}
+
+ }
+
+ override def row(executionTableRow: ExecutionTableRowData): Seq[Node] = {
+ val executionUIData = executionTableRow.executionUIData
+ val submissionTime = executionUIData.submissionTime
+ val duration = executionTableRow.duration
+
+ def jobLinks(jobData: Seq[Int]): Seq[Node] = {
+ jobData.map { jobId =>
+ [{jobId.toString}]
+ }
}
@@ -188,7 +371,7 @@ private[ui] abstract class ExecutionTable(
{executionUIData.executionId.toString}
|
- {descriptionCell(request, executionUIData)}
+ {descriptionCell(executionUIData)}
|
{UIUtils.formatDate(submissionTime)}
@@ -198,27 +381,26 @@ private[ui] abstract class ExecutionTable(
|
{if (showRunningJobs) {
- {jobLinks(JobExecutionStatus.RUNNING)}
+ {jobLinks(executionTableRow.runningJobData)}
|
}}
{if (showSucceededJobs) {
- {jobLinks(JobExecutionStatus.SUCCEEDED)}
+ {jobLinks(executionTableRow.completedJobData)}
|
}}
{if (showFailedJobs) {
- {jobLinks(JobExecutionStatus.FAILED)}
+ {jobLinks(executionTableRow.failedJobData)}
|
}}
}
- private def descriptionCell(
- request: HttpServletRequest,
- execution: SQLExecutionUIData): Seq[Node] = {
+ private def descriptionCell(execution: SQLExecutionUIData): Seq[Node] = {
val details = if (execution.details != null && execution.details.nonEmpty) {
-
+
+details
++
@@ -229,73 +411,107 @@ private[ui] abstract class ExecutionTable(
}
val desc = if (execution.description != null && execution.description.nonEmpty) {
-
{execution.description}
+
{execution.description}
} else {
-
{execution.executionId}
+
{execution.executionId}
}
-
{desc} {details}
- }
-
- def toNodeSeq(request: HttpServletRequest): Seq[Node] = {
- UIUtils.listingTable[SQLExecutionUIData](
- header, row(request, currentTime, _), executionUIDatas, id = Some(tableId))
+
{desc}{details}
}
private def jobURL(request: HttpServletRequest, jobId: Long): String =
"%s/jobs/job/?id=%s".format(UIUtils.prependBaseUri(request, parent.basePath), jobId)
- private def executionURL(request: HttpServletRequest, executionID: Long): String =
+ private def executionURL(executionID: Long): String =
s"${UIUtils.prependBaseUri(
request, parent.basePath)}/${parent.prefix}/execution/?id=$executionID"
}
-private[ui] class RunningExecutionTable(
- parent: SQLTab,
- currentTime: Long,
- executionUIDatas: Seq[SQLExecutionUIData])
- extends ExecutionTable(
- parent,
- "running-execution-table",
- currentTime,
- executionUIDatas,
- showRunningJobs = true,
- showSucceededJobs = true,
- showFailedJobs = true) {
- override protected def header: Seq[String] =
- baseHeader ++ Seq("Running Job IDs", "Succeeded Job IDs", "Failed Job IDs")
-}
+private[ui] class ExecutionTableRowData(
+ val submissionTime: Long,
+ val duration: Long,
+ val executionUIData: SQLExecutionUIData,
+ val runningJobData: Seq[Int],
+ val completedJobData: Seq[Int],
+ val failedJobData: Seq[Int])
+
-private[ui] class CompletedExecutionTable(
+private[ui] class ExecutionDataSource(
+ request: HttpServletRequest,
parent: SQLTab,
+ executionData: Seq[SQLExecutionUIData],
+ basePath: String,
currentTime: Long,
- executionUIDatas: Seq[SQLExecutionUIData])
- extends ExecutionTable(
- parent,
- "completed-execution-table",
- currentTime,
- executionUIDatas,
- showRunningJobs = false,
- showSucceededJobs = true,
- showFailedJobs = false) {
+ pageSize: Int,
+ sortColumn: String,
+ desc: Boolean,
+ showRunningJobs: Boolean,
+ showSucceededJobs: Boolean,
+ showFailedJobs: Boolean) extends PagedDataSource[ExecutionTableRowData](pageSize) {
- override protected def header: Seq[String] = baseHeader ++ Seq("Job IDs")
-}
+ // Convert ExecutionData to ExecutionTableRowData which contains the final contents to show
+ // in the table so that we can avoid creating duplicate contents during sorting the data
+ private val data = executionData.map(executionRow).sorted(ordering(sortColumn, desc))
-private[ui] class FailedExecutionTable(
- parent: SQLTab,
- currentTime: Long,
- executionUIDatas: Seq[SQLExecutionUIData])
- extends ExecutionTable(
- parent,
- "failed-execution-table",
- currentTime,
- executionUIDatas,
- showRunningJobs = false,
- showSucceededJobs = true,
- showFailedJobs = true) {
+ private var _sliceExecutionIds: Set[Int] = _
+
+ override def dataSize: Int = data.size
+
+ override def sliceData(from: Int, to: Int): Seq[ExecutionTableRowData] = {
+ val r = data.slice(from, to)
+ _sliceExecutionIds = r.map(_.executionUIData.executionId.toInt).toSet
+ r
+ }
- override protected def header: Seq[String] =
- baseHeader ++ Seq("Succeeded Job IDs", "Failed Job IDs")
+ private def executionRow(executionUIData: SQLExecutionUIData): ExecutionTableRowData = {
+ val submissionTime = executionUIData.submissionTime
+ val duration = executionUIData.completionTime.map(_.getTime())
+ .getOrElse(currentTime) - submissionTime
+
+ val runningJobData = if (showRunningJobs) {
+ executionUIData.jobs.filter {
+ case (_, jobStatus) => jobStatus == JobExecutionStatus.RUNNING
+ }.map { case (jobId, _) => jobId }.toSeq.sorted
+ } else Seq.empty
+
+ val completedJobData = if (showSucceededJobs) {
+ executionUIData.jobs.filter {
+ case (_, jobStatus) => jobStatus == JobExecutionStatus.SUCCEEDED
+ }.map { case (jobId, _) => jobId }.toSeq.sorted
+ } else Seq.empty
+
+ val failedJobData = if (showFailedJobs) {
+ executionUIData.jobs.filter {
+ case (_, jobStatus) => jobStatus == JobExecutionStatus.FAILED
+ }.map { case (jobId, _) => jobId }.toSeq.sorted
+ } else Seq.empty
+
+ new ExecutionTableRowData(
+ submissionTime,
+ duration,
+ executionUIData,
+ runningJobData,
+ completedJobData,
+ failedJobData)
+ }
+
+ /** Return Ordering according to sortColumn and desc. */
+ private def ordering(sortColumn: String, desc: Boolean): Ordering[ExecutionTableRowData] = {
+ val ordering: Ordering[ExecutionTableRowData] = sortColumn match {
+ case "ID" => Ordering.by(_.executionUIData.executionId)
+ case "Description" => Ordering.by(_.executionUIData.description)
+ case "Submitted" => Ordering.by(_.executionUIData.submissionTime)
+ case "Duration" => Ordering.by(_.duration)
+ case "Job IDs" | "Succeeded Job IDs" => Ordering by (_.completedJobData.headOption)
+ case "Running Job IDs" => Ordering.by(_.runningJobData.headOption)
+ case "Failed Job IDs" => Ordering.by(_.failedJobData.headOption)
+ case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
+ }
+ if (desc) {
+ ordering.reverse
+ } else {
+ ordering
+ }
+ }
}