diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index d25c29113d6d..22784c8c2eff 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -21,12 +21,27 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.Node
 
-import org.apache.spark.ui.{WebUIPage, UIUtils}
+import org.apache.spark.ui.{UITableBuilder, UITable, WebUIPage, UIUtils}
 
 private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
 
   private val pageSize = 20
 
+  val appTable: UITable[ApplicationHistoryInfo] = {
+    val t = new UITableBuilder[ApplicationHistoryInfo]()
+    t.col("App ID") (identity) withMarkup { info =>
+      val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}"
+      <a href={uiAddress}>{info.id}</a>
+    }
+    t.col("App Name") { _.name }
+    t.epochDateCol("Started") { _.startTime }
+    t.epochDateCol("Completed") { _.endTime }
+    t.durationCol("Duration") { info => info.endTime - info.startTime }
+    t.col("Spark User") { _.sparkUser }
+    t.epochDateCol("Last Updated") { _.lastUpdated }
+    t.build()
+  }
+
   def render(request: HttpServletRequest): Seq[Node] = {
     val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt
     val requestedFirst = (requestedPage - 1) * pageSize
@@ -39,7 +54,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
     val last = Math.min(actualFirst + pageSize, allApps.size) - 1
     val pageCount = allApps.size / pageSize + (if (allApps.size % pageSize > 0) 1 else 0)
 
-    val appTable = UIUtils.listingTable(appHeader, appRow, apps)
+    val appTable = this.appTable.render(apps)
     val providerConfig = parent.getProviderConfig()
     val content =
@@ -65,30 +80,4 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
     UIUtils.basicSparkPage(content, "History Server")
   }
-
-  private val appHeader = Seq(
-    "App ID",
-    "App Name",
-    "Started",
-    "Completed",
-    "Duration",
-    "Spark User",
-    "Last Updated")
-
-  private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
-    val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}"
-    val startTime = UIUtils.formatDate(info.startTime)
-    val endTime = UIUtils.formatDate(info.endTime)
-    val duration = UIUtils.formatDuration(info.endTime - info.startTime)
-    val lastUpdated = UIUtils.formatDate(info.lastUpdated)
-    <tr>
-      <td><a href={uiAddress}>{info.id}</a></td>
-      <td>{info.name}</td>
-      <td>{startTime}</td>
-      <td>{endTime}</td>
-      <td>{duration}</td>
-      <td>{info.sparkUser}</td>
-      <td>{lastUpdated}</td>
-    </tr>
-  }
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 4588c130ef43..2e6de43c53b5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -28,7 +28,7 @@ import org.json4s.JValue
 import org.apache.spark.deploy.{ExecutorState, JsonProtocol}
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
 import org.apache.spark.deploy.master.ExecutorInfo
-import org.apache.spark.ui.{UIUtils, WebUIPage}
+import org.apache.spark.ui.{UITable, UITableBuilder, UIUtils, WebUIPage}
 import org.apache.spark.util.Utils
 
 private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app") {
@@ -47,6 +47,24 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app
     JsonProtocol.writeApplicationInfo(app)
   }
 
+  private val executorsTable: UITable[ExecutorInfo] = {
+    val t = new UITableBuilder[ExecutorInfo]()
+    t.col("ExecutorID") { _.id.toString }
+    t.col("Worker") (identity) withMarkup { executor =>
+      <a href={executor.worker.webUiAddress}>{executor.worker.id}</a>
+    }
+    t.col("Cores") { _.cores }
+    t.sizeCol("Memory") { _.memory }
+    t.col("State") { _.state.toString }
+    t.col("Logs") (identity) withMarkup { executor =>
+      <a href={"%s/logPage?appId=%s&executorId=%s&logType=stdout"
+        .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stdout</a>
+      <a href={"%s/logPage?appId=%s&executorId=%s&logType=stderr"
+        .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stderr</a>
+    } isUnsortable()
+    t.build()
+  }
+
   /** Executor details for a particular application */
   def render(request: HttpServletRequest): Seq[Node] = {
     val appId = request.getParameter("appId")
@@ -56,15 +74,14 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app
       state.completedApps.find(_.id == appId).getOrElse(null)
     })
 
-    val executorHeaders = Seq("ExecutorID", "Worker", "Cores", "Memory", "State", "Logs")
     val allExecutors = (app.executors.values ++ app.removedExecutors).toSet.toSeq
     // This includes executors that are either still running or have exited cleanly
     val executors = allExecutors.filter { exec =>
       !ExecutorState.isFinished(exec.state) || exec.state == ExecutorState.EXITED
     }
     val removedExecutors = allExecutors.diff(executors)
-    val executorsTable = UIUtils.listingTable(executorHeaders, executorRow, executors)
-    val removedExecutorsTable = UIUtils.listingTable(executorHeaders, executorRow, removedExecutors)
+    val executorsTable = this.executorsTable.render(executors)
+    val removedExecutorsTable = this.executorsTable.render(removedExecutors)
 
     val content =
@@ -108,22 +125,4 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app
     UIUtils.basicSparkPage(content, "Application: " + app.desc.name)
   }
-
-  private def executorRow(executor: ExecutorInfo): Seq[Node] = {
-    <tr>
-      <td>{executor.id}</td>
-      <td>
-        <a href={executor.worker.webUiAddress}>{executor.worker.id}</a>
-      </td>
-      <td>{executor.cores}</td>
-      <td>{executor.memory}</td>
-      <td>{executor.state}</td>
-      <td>
-        <a href={"%s/logPage?appId=%s&executorId=%s&logType=stdout"
-          .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stdout</a>
-        <a href={"%s/logPage?appId=%s&executorId=%s&logType=stderr"
-          .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stderr</a>
-      </td>
-    </tr>
-  }
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
index 7ca3b08a2872..a1e1ed075289 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -20,15 +20,15 @@ package org.apache.spark.deploy.master.ui
 import javax.servlet.http.HttpServletRequest
 
 import scala.concurrent.Await
-import scala.xml.Node
+import scala.xml.{Text, Node}
 
 import akka.pattern.ask
 import org.json4s.JValue
 
 import org.apache.spark.deploy.JsonProtocol
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
-import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
-import org.apache.spark.ui.{WebUIPage, UIUtils}
+import org.apache.spark.deploy.master.{WorkerInfo, ApplicationInfo, DriverInfo}
+import org.apache.spark.ui.{UITable, UITableBuilder, WebUIPage, UIUtils}
 import org.apache.spark.util.Utils
 
 private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
@@ -41,32 +41,69 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
     JsonProtocol.writeMasterState(state)
   }
 
+  private val workerTable: UITable[WorkerInfo] = {
+    val t = new UITableBuilder[WorkerInfo]()
+    t.col("ID") (identity) withMarkup { worker =>
+      <a href={worker.webUiAddress}>{worker.id}</a>
+    }
+    t.col("Address") { worker => s"${worker.host}:${worker.port}" }
+    t.col("State") { _.state.toString }
+    t.col("Cores") { _.coresUsed } formatWith { c: Int => s"$c Used" }
+    t.col("Memory") (identity) sortBy { worker =>
+      s"${worker.memory}:${worker.memoryUsed}"
+    } withMarkup { worker =>
+      Text(Utils.megabytesToString(worker.memory)) ++ <br/> ++
+      Text(s"(${Utils.megabytesToString(worker.memoryUsed)} Used)")
+    }
+    t.build()
+  }
+
+  private val appTable: UITable[ApplicationInfo] = {
+    val t = new UITableBuilder[ApplicationInfo]()
+    t.col("ID") (_.id) withMarkup { id =>
+      <a href={"app?appId=" + id}>{id}</a>
+    }
+    t.col("Name") { _.desc.name }
+    t.col("Cores") { _.coresGranted }
+    t.sizeCol("Memory per Node") { _.desc.memoryPerSlave }
+    t.dateCol("Submitted Time") { _.submitDate }
+    t.col("User") { _.desc.user }
+    t.col("State") { _.state.toString }
+    t.durationCol("Duration") { _.duration }
+    t.build()
+  }
+
+  private val driverTable: UITable[DriverInfo] = {
+    val t = new UITableBuilder[DriverInfo]()
+    t.col("ID") { _.id }
+    t.dateCol("Submitted Time") { _.submitDate }
+    t.col("Worker") (identity) withMarkup { driver =>
+      driver.worker.map(w => <a href={w.webUiAddress}>{w.id.toString}</a>).getOrElse(Text("None"))
+    }
+    t.col("State") { _.state.toString }
+    t.col("Cores") { _.desc.cores }
+    t.sizeCol("Memory") { _.desc.mem.toLong }
+    t.col("Main Class") { _.desc.command.arguments(1) }
+    t.build()
+  }
+
   /** Index view listing applications and executors */
   def render(request: HttpServletRequest): Seq[Node] = {
     val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
     val state = Await.result(stateFuture, timeout)
 
-    val workerHeaders = Seq("Id", "Address", "State", "Cores", "Memory")
-    val workers = state.workers.sortBy(_.id)
-    val workerTable = UIUtils.listingTable(workerHeaders, workerRow, workers)
+    val allWorkersTable = workerTable.render(state.workers.sortBy(_.id))
 
-    val appHeaders = Seq("ID", "Name", "Cores", "Memory per Node", "Submitted Time", "User",
-      "State", "Duration")
-    val activeApps = state.activeApps.sortBy(_.startTime).reverse
-    val activeAppsTable = UIUtils.listingTable(appHeaders, appRow, activeApps)
-    val completedApps = state.completedApps.sortBy(_.endTime).reverse
-    val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps)
+    val activeAppsTable = appTable.render(state.activeApps.sortBy(_.startTime).reverse)
+    val completedAppsTable = appTable.render(state.completedApps.sortBy(_.endTime).reverse)
 
-    val driverHeaders = Seq("ID", "Submitted Time", "Worker", "State", "Cores", "Memory",
-      "Main Class")
-    val activeDrivers = state.activeDrivers.sortBy(_.startTime).reverse
-    val activeDriversTable = UIUtils.listingTable(driverHeaders, driverRow, activeDrivers)
-    val completedDrivers = state.completedDrivers.sortBy(_.startTime).reverse
-    val completedDriversTable = UIUtils.listingTable(driverHeaders, driverRow, completedDrivers)
+    val activeDriversTable = driverTable.render(state.activeDrivers.sortBy(_.startTime).reverse)
+    val completedDriversTable =
+      driverTable.render(state.completedDrivers.sortBy(_.startTime).reverse)
 
     // For now we only show driver information if the user has submitted drivers to the cluster.
     // This is until we integrate the notion of drivers and applications in the UI.
-    def hasDrivers = activeDrivers.length > 0 || completedDrivers.length > 0
+    def hasDrivers = state.activeDrivers.length > 0 || state.completedDrivers.length > 0
 
     val content =
@@ -93,7 +130,7 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
         <div class="row-fluid">
           <div class="span12">
             <h4> Workers </h4>
-            {workerTable}
+            {allWorkersTable}
           </div>
         </div>
@@ -138,57 +175,4 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
 
     UIUtils.basicSparkPage(content, "Spark Master at " + state.uri)
   }
-
-  private def workerRow(worker: WorkerInfo): Seq[Node] = {
-    <tr>
-      <td>
-        <a href={worker.webUiAddress}>{worker.id}</a>
-      </td>
-      <td>{worker.host}:{worker.port}</td>
-      <td>{worker.state}</td>
-      <td>{worker.cores} ({worker.coresUsed} Used)</td>
-      <td>
-        {Utils.megabytesToString(worker.memory)}
-        ({Utils.megabytesToString(worker.memoryUsed)} Used)
-      </td>
-    </tr>
-  }
-
-  private def appRow(app: ApplicationInfo): Seq[Node] = {
-    <tr>
-      <td>
-        <a href={"app?appId=" + app.id}>{app.id}</a>
-      </td>
-      <td>
-        <a href={app.desc.appUiUrl}>{app.desc.name}</a>
-      </td>
-      <td>
-        {app.coresGranted}
-      </td>
-      <td>
-        {Utils.megabytesToString(app.desc.memoryPerSlave)}
-      </td>
-      <td>{UIUtils.formatDate(app.submitDate)}</td>
-      <td>{app.desc.user}</td>
-      <td>{app.state.toString}</td>
-      <td>{UIUtils.formatDuration(app.duration)}</td>
-    </tr>
-  }
-
-  private def driverRow(driver: DriverInfo): Seq[Node] = {
-    <tr>
-      <td>{driver.id}</td>
-      <td>{driver.submitDate}</td>
-      <td>{driver.worker.map(w => <a href={w.webUiAddress}>{w.id.toString}</a>).getOrElse("None")}
-      </td>
-      <td>{driver.state}</td>
-      <td>
-        {driver.desc.cores}
-      </td>
-      <td>
-        {Utils.megabytesToString(driver.desc.mem.toLong)}
-      </td>
-      <td>{driver.desc.command.arguments(1)}</td>
-    </tr>
-  }
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala
index 327b90503280..b6e0e38bd041 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala
@@ -28,7 +28,7 @@ import org.apache.spark.deploy.JsonProtocol
 import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse}
 import org.apache.spark.deploy.master.DriverState
 import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
-import org.apache.spark.ui.{WebUIPage, UIUtils}
+import org.apache.spark.ui.{UITable, UITableBuilder, WebUIPage, UIUtils}
 import org.apache.spark.util.Utils
 
 private[spark] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") {
@@ -42,23 +42,56 @@ private[spark] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") {
     JsonProtocol.writeWorkerState(workerState)
   }
 
+  private val executorTable: UITable[ExecutorRunner] = {
+    val t = new UITableBuilder[ExecutorRunner]()
+    t.col("Executor ID") { _.execId }
+    t.col("Cores") { _.cores }
+    t.col("State") { _.state.toString }
+    t.sizeCol("Memory") { _.memory }
+    t.col("Job Details") (identity) withMarkup { executor =>
+      <ul class="unstyled">
+        <li><strong>ID:</strong> {executor.appId}</li>
+        <li><strong>Name:</strong> {executor.appDesc.name}</li>
+        <li><strong>User:</strong> {executor.appDesc.user}</li>
+      </ul>
+    } isUnsortable()
+    t.col("Logs") (identity) withMarkup { executor =>
+      <a href={"logPage?appId=%s&executorId=%s&logType=stdout"
+        .format(executor.appId, executor.execId)}>stdout</a>
+      <a href={"logPage?appId=%s&executorId=%s&logType=stderr"
+        .format(executor.appId, executor.execId)}>stderr</a>
+    } isUnsortable()
+    t.build()
+  }
+
+  private val driverTable: UITable[DriverRunner] = {
+    val t = new UITableBuilder[DriverRunner]()
+    t.col("Driver ID") { _.driverId }
+    t.col("Main Class") { _.driverDesc.command.arguments(1) }
+    t.col("State") { _.finalState.getOrElse(DriverState.RUNNING).toString }
+    t.col("Cores") { _.driverDesc.cores }
+    t.sizeCol("Memory") { _.driverDesc.mem }
+    t.col("Logs") (identity) withMarkup { driver =>
+      <a href={s"logPage?driverId=${driver.driverId}&logType=stdout"}>stdout</a>
+      <a href={s"logPage?driverId=${driver.driverId}&logType=stderr"}>stderr</a>
+    } isUnsortable()
+    t.col("Notes") { _.finalException.getOrElse("").toString }
+    t.build()
+  }
+
   def render(request: HttpServletRequest): Seq[Node] = {
     val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
     val workerState = Await.result(stateFuture, timeout)
 
-    val executorHeaders = Seq("ExecutorID", "Cores", "State", "Memory", "Job Details", "Logs")
     val runningExecutors = workerState.executors
-    val runningExecutorTable =
-      UIUtils.listingTable(executorHeaders, executorRow, runningExecutors)
+    val runningExecutorTable = executorTable.render(runningExecutors)
     val finishedExecutors = workerState.finishedExecutors
-    val finishedExecutorTable =
-      UIUtils.listingTable(executorHeaders, executorRow, finishedExecutors)
+    val finishedExecutorTable = executorTable.render(finishedExecutors)
 
-    val driverHeaders = Seq("DriverID", "Main Class", "State", "Cores", "Memory", "Logs", "Notes")
     val runningDrivers = workerState.drivers.sortBy(_.driverId).reverse
-    val runningDriverTable = UIUtils.listingTable(driverHeaders, driverRow, runningDrivers)
+    val runningDriverTable = driverTable.render(runningDrivers)
     val finishedDrivers = workerState.finishedDrivers.sortBy(_.driverId).reverse
-    val finishedDriverTable = UIUtils.listingTable(driverHeaders, driverRow, finishedDrivers)
+    val finishedDriverTable = driverTable.render(finishedDrivers)
 
     // For now we only show driver information if the user has submitted drivers to the cluster.
     // This is until we integrate the notion of drivers and applications in the UI.
@@ -105,50 +138,4 @@ private[spark] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") {
     UIUtils.basicSparkPage(content, "Spark Worker at %s:%s".format(
       workerState.host, workerState.port))
   }
-
-  def executorRow(executor: ExecutorRunner): Seq[Node] = {
-    <tr>
-      <td>{executor.execId}</td>
-      <td>{executor.cores}</td>
-      <td>{executor.state}</td>
-      <td>
-        {Utils.megabytesToString(executor.memory)}
-      </td>
-      <td>
-        <ul class="unstyled">
-          <li><strong>ID:</strong> {executor.appId}</li>
-          <li><strong>Name:</strong> {executor.appDesc.name}</li>
-          <li><strong>User:</strong> {executor.appDesc.user}</li>
-        </ul>
-      </td>
-      <td>
-        <a href={"logPage?appId=%s&executorId=%s&logType=stdout"
-          .format(executor.appId, executor.execId)}>stdout</a>
-        <a href={"logPage?appId=%s&executorId=%s&logType=stderr"
-          .format(executor.appId, executor.execId)}>stderr</a>
-      </td>
-    </tr>
-  }
-
-  def driverRow(driver: DriverRunner): Seq[Node] = {
-    <tr>
-      <td>{driver.driverId}</td>
-      <td>{driver.driverDesc.command.arguments(1)}</td>
-      <td>{driver.finalState.getOrElse(DriverState.RUNNING)}</td>
-      <td>
-        {driver.driverDesc.cores.toString}
-      </td>
-      <td>
-        {Utils.megabytesToString(driver.driverDesc.mem)}
-      </td>
-      <td>
-        <a href={s"logPage?driverId=${driver.driverId}&logType=stdout"}>stdout</a>
-        <a href={s"logPage?driverId=${driver.driverId}&logType=stderr"}>stderr</a>
-      </td>
-      <td>{driver.finalException.getOrElse("")}</td>
-    </tr>
-  }
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/UITables.scala b/core/src/main/scala/org/apache/spark/ui/UITables.scala
new file mode 100644
index 000000000000..7be5d593691f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/UITables.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui
+
+import java.util.Date
+
+import scala.collection.mutable
+import scala.xml.{Node, Text}
+
+import org.apache.spark.util.Utils
+
+
+/**
+ * Describes how to render a column of values in a web UI table.
+ *
+ * @param name the name / title of this column
+ * @param fieldExtractor function for extracting this field's value from the table's row data type
+ * @tparam T the table's row data type
+ * @tparam V this column's value type
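+ *
+ * Columns are customized by chaining calls such as `sortBy`, `formatWith`, and `withMarkup`
+ * before `build()` is invoked on the owning builder. A minimal sketch (the `memUsed` field
+ * here is hypothetical):
+ *
+ * {{{
+ *   t.col("Memory Used") { _.memUsed } sortBy { _.toString } formatWith Utils.megabytesToString
+ * }}}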
+ */
+case class UITableColumn[T, V](
+    name: String,
+    fieldExtractor: T => V) {
+
+  private var sortable: Boolean = true
+  private var sortKey: Option[V => String] = None
+  private var formatter: V => String = x => x.toString
+  private var cellContentsRenderer: V => Seq[Node] = (data: V) => Text(formatter(data))
+
+  /**
+   * Optional method for sorting this table by a key other than the cell's text contents.
+   */
+  def sortBy(keyFunc: V => String): UITableColumn[T, V] = {
+    sortKey = Some(keyFunc)
+    this
+  }
+
+  /**
+   * Override the default cell formatting of the extracted value. By default, values are rendered
+   * by calling toString().
+   */
+  def formatWith(formatFunc: V => String): UITableColumn[T, V] = {
+    formatter = formatFunc
+    this
+  }
+
+  /**
+   * Make this column unsortable. This is useful for columns that display UI elements, such
+   * as buttons that link to logs.
+   */
+  def isUnsortable(): UITableColumn[T, V] = {
+    sortable = false
+    this
+  }
+
+  /**
+   * Customize the markup used to render this table cell. The markup should only describe how to
+   * render the contents of the TD tag, not the TD tag itself. This overrides `formatWith`.
+   */
+  def withMarkup(markupFunc: V => Seq[Node]): UITableColumn[T, V] = {
+    cellContentsRenderer = markupFunc
+    this
+  }
+
+  /** Render the TD tag for this row */
+  def _renderCell(row: T): Seq[Node] = {
+    val data = fieldExtractor(row)
+    val cellContents = cellContentsRenderer(data)
+    val cls = if (sortable) None else Some(Text("sorttable_nosort"))
+    <td sorttable_customkey={sortKey.map(k => Text(k(data)))} class={cls}>
+      {cellContents}
+    </td>
+  }
+}
+
+/**
+ * Describes how to render a table to display rows of type `T`.
+ * @param cols a sequence of UITableColumns that describe how each column should be rendered
+ * @param fixedWidth if true, all columns of this table will be displayed with the same width
+ * @tparam T the row data type
+ */
+private[spark] class UITable[T](cols: Seq[UITableColumn[T, _]], fixedWidth: Boolean) {
+
+  private val tableClass = if (fixedWidth) {
+    UIUtils.TABLE_CLASS + " table-fixed"
+  } else {
+    UIUtils.TABLE_CLASS
+  }
+
+  private val colWidthAttr = if (fixedWidth) Some(Text((100.toDouble / cols.size) + "%")) else None
+
+  private val headerRow: Seq[Node] = {
+    val headers = cols.map(_.name)
+    // if none of the headers have "\n" in them
+    if (headers.forall(!_.contains("\n"))) {
+      // represent header as simple text
+      headers.map(h => <th width={colWidthAttr}>{h}</th>)
+    } else {
+      // represent header text as list while respecting "\n"
+      headers.map { case h =>
+        <th width={colWidthAttr}>
+          <ul class="unstyled">
+            {h.split("\n").map { case t => <li>{t}</li> }}
+          </ul>
+        </th>
+      }
+    }
+  }
+
+  private def renderRow(row: T): Seq[Node] = {
+    val tds = cols.map(_._renderCell(row))
+    <tr>{ tds }</tr>
+  }
+
+  /** Render the table with the given data */
+  def render(data: Iterable[T]): Seq[Node] = {
+    val rows = data.map(renderRow)
+    <table class={tableClass}>
+      <thead>{headerRow}</thead>
+      <tbody>
+        {rows}
+      </tbody>
+    </table>
+  }
+}
+
+/**
+ * Builder for constructing web UI tables. This builder offers several advantages over constructing
+ * tables by hand using raw XML:
+ *
+ *  - All of the table's data and formatting logic can live in one place; the table headers and
+ *    rows aren't described in separate code. This prevents several common errors, like changing
+ *    the ordering of two column headers but forgetting to re-order the corresponding TD tags.
+ *
+ *  - No repetition of code for type-specific display rules: common column types like "memory",
+ *    "duration", and "time" have convenience methods that implement the right formatting logic.
+ *
+ *  - Details of our specific markup are generally abstracted away. For example, the markup for
+ *    setting a custom sort key on a column now lives in one place, rather than being repeated
+ *    in each table.
+ *
+ * The recommended way of using this class:
+ *
+ *  - Create a new builder that is parametrized by the type (`T`) of data that you want to render.
+ *    In many cases, there may be some record type like `WorkerInfo` that holds all of the
+ *    information needed to render a particular row. If the data for each table row comes from
+ *    several objects, you can combine those objects into a tuple or case class.
+ *
+ *  - Use the `col` methods to add columns to this builder. The final argument of each `col` method
+ *    is a function that extracts the column's field from a row object of type `T`. Columns are
+ *    displayed in the order that they are added to the builder. For most columns, you can write
+ *    code like
+ *
+ *      builder.col("Id") { _.id }
+ *      builder.sizeCol("Memory") { _.memory }
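+ *
+ *    For columns that need links or other markup, combine `col` with `withMarkup`; a sketch
+ *    (the `detailUrl` field is hypothetical):
+ *
+ *      builder.col("Id") (identity) withMarkup { row =>
+ *        <a href={row.detailUrl}>{row.id}</a>
+ *      }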
+ *
+ *    Columns have additional options, such as controlling their sort keys; see the individual
+ *    methods' documentation for more details.
+ *
+ *  - Call `build()` to construct an immutable object which can be used to render tables.
+ *
+ * There are many other features, including support for arbitrary markup in custom column types;
+ * see the actual uses in the web UI code for more details.
+ *
+ * @param fixedWidth if true, all columns will be rendered with the same width
+ * @tparam T the type of the data items that will be used to render individual rows
+ */
+private[spark] class UITableBuilder[T](fixedWidth: Boolean = false) {
+  private val cols = mutable.Buffer[UITableColumn[T, _]]()
+
+  /**
+   * General builder method for table columns. By default, this extracts a field and displays it
+   * as a string. You can call additional methods on the result of this method to customize this
+   * column's display.
+   */
+  def col[V](name: String)(fieldExtractor: T => V): UITableColumn[T, V] = {
+    val newCol = new UITableColumn[T, V](name, fieldExtractor)
+    cols.append(newCol)
+    newCol
+  }
+
+  /**
+   * Display a column of sizes, in megabytes, as human-readable strings, such as "4.0 MB".
+   */
+  def sizeCol(name: String)(fieldExtractor: T => Long) {
+    col[Long](name)(fieldExtractor) sortBy (x => x.toString) formatWith Utils.megabytesToString
+  }
+
+  /**
+   * Display a column of dates in yyyy/MM/dd HH:mm:ss format.
+   */
+  def dateCol(name: String)(fieldExtractor: T => Date) {
+    col[Date](name)(fieldExtractor) formatWith UIUtils.formatDate
+  }
+
+  /**
+   * Display a column of epoch-millisecond timestamps as dates in yyyy/MM/dd HH:mm:ss format.
+   */
+  def epochDateCol(name: String)(fieldExtractor: T => Long) {
+    col[Long](name)(fieldExtractor) formatWith UIUtils.formatDate
+  }
+
+  /**
+   * Display a column of durations, in milliseconds, as human-readable strings, such as "12 s".
+   */
+  def durationCol(name: String)(fieldExtractor: T => Long) {
+    col[Long](name)(fieldExtractor) sortBy (_.toString) formatWith UIUtils.formatDuration
+  }
+
+  def build(): UITable[T] = {
+    val immutableCols: Seq[UITableColumn[T, _]] = cols.toSeq
+    new UITable[T](immutableCols, fixedWidth)
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 32e6b15bb099..3d897df5c321 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -234,6 +234,14 @@ private[spark] object UIUtils extends Logging {
   }
 
+  /**
+   * Render key-value pairs as a two-column table.
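+   *
+   * A minimal usage sketch (the property name and value shown are illustrative):
+   *
+   * {{{
+   *   val table = UIUtils.stringPairTable("Name", "Value")
+   *   val html = table.render(Seq(("spark.master", "local[*]")))
+   * }}}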
+   */
+  def stringPairTable(col1Name: String, col2Name: String): UITable[(Any, Any)] = {
+    val t = new UITableBuilder[(Any, Any)](fixedWidth = true)
+    t.col(col1Name) { _._1.toString }
+    t.col(col2Name) { _._2.toString }
+    t.build()
+  }
+
   /** Returns an HTML table constructed by generating a row for each object in a sequence. */
   def listingTable[T](
       headers: Seq[String],
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
index f0a1174a71d3..1f9043918b66 100644
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
@@ -26,29 +26,18 @@ import org.apache.spark.ui.{UIUtils, WebUIPage}
 private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") {
   private val listener = parent.listener
 
+  private val propertyTable = UIUtils.stringPairTable("Name", "Value")
+  private val classpathTable = UIUtils.stringPairTable("Resource", "Source")
+
   def render(request: HttpServletRequest): Seq[Node] = {
-    val runtimeInformationTable = UIUtils.listingTable(
-      propertyHeader, jvmRow, listener.jvmInformation, fixedWidth = true)
-    val sparkPropertiesTable = UIUtils.listingTable(
-      propertyHeader, propertyRow, listener.sparkProperties, fixedWidth = true)
-    val systemPropertiesTable = UIUtils.listingTable(
-      propertyHeader, propertyRow, listener.systemProperties, fixedWidth = true)
-    val classpathEntriesTable = UIUtils.listingTable(
-      classPathHeaders, classPathRow, listener.classpathEntries, fixedWidth = true)
     val content =
-      <span>
-        <h4>Runtime Information</h4>
-        {runtimeInformationTable}
-        <h4>Spark Properties</h4>
-        {sparkPropertiesTable}
-        <h4>System Properties</h4>
-        {systemPropertiesTable}
-        <h4>Classpath Entries</h4>
-        {classpathEntriesTable}
-      </span>
+      <span>
+        <h4>Runtime Information</h4>
+        {propertyTable.render(listener.jvmInformation)}
+        <h4>Spark Properties</h4>
+        {propertyTable.render(listener.sparkProperties)}
+        <h4>System Properties</h4>
+        {propertyTable.render(listener.systemProperties)}
+        <h4>Classpath Entries</h4>
+        {classpathTable.render(listener.classpathEntries)}
+      </span>
 
     UIUtils.headerSparkPage("Environment", content, parent)
   }
-
-  private def propertyHeader = Seq("Name", "Value")
-  private def classPathHeaders = Seq("Resource", "Source")
-  private def jvmRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
-  private def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
-  private def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 2414e4c65237..2255d31fac73 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -20,17 +20,18 @@ package org.apache.spark.ui.jobs
 import java.util.Date
 import javax.servlet.http.HttpServletRequest
 
-import scala.xml.{Node, Unparsed}
+import scala.xml.{Text, Node, Unparsed}
 
-import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils}
+import org.apache.spark.ui._
 import org.apache.spark.ui.jobs.UIData._
 import org.apache.spark.util.{Utils, Distribution}
-import org.apache.spark.scheduler.AccumulableInfo
 
 /** Page showing statistics and task list for a given stage */
 private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
   private val listener = parent.listener
 
+  private val accumulableTable = UIUtils.stringPairTable("Accumulable", "Value")
+
   def render(request: HttpServletRequest): Seq[Node] = {
     listener.synchronized {
       val stageId = request.getParameter("id").toInt
@@ -57,6 +58,101 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
       val hasShuffleWrite = stageData.shuffleWriteBytes > 0
       val hasBytesSpilled = stageData.memoryBytesSpilled > 0 && stageData.diskBytesSpilled > 0
 
+      val taskTableRenderer: UITable[TaskUIData] = {
+        val t = new UITableBuilder[TaskUIData]()
+        t.col("Index") { _.taskInfo.index }
+        t.col("ID") { _.taskInfo.taskId }
+        t.col("Attempt") { _.taskInfo } formatWith { info =>
+          if (info.speculative) s"${info.attempt} (speculative)" else info.attempt.toString
+        } sortBy { info =>
+          info.taskId.toString
+        }
+        t.col("Status") { _.taskInfo.status }
+        t.col("Locality Level") { _.taskInfo.taskLocality }
+        t.col("Executor ID / Host") { case TaskUIData(info, _, _) =>
+          s"${info.executorId} / ${info.host}"
+        }
+        t.dateCol("Launch Time") { case TaskUIData(info, _, _) =>
+          new Date(info.launchTime)
+        }
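+        // The extractor below returns an (info, metrics, duration) tuple so that the
+        // formatter and the sort key can reuse the duration computed here.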
") + ) + } + if (hasInput) { + t.col("Input") { + _.taskMetrics.flatMap(_.inputMetrics) + } sortBy { + _.map(_.bytesRead.toString).getOrElse("") + } formatWith { _.map( + m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase})") + .getOrElse("") + } + } + if (hasShuffleRead) { + t.col("Shuffle Read") { + _.taskMetrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead) + } sortBy { + _.map(_.toString).getOrElse("") + } formatWith { + _.map(Utils.bytesToString).getOrElse("") + } + } + if (hasShuffleWrite) { + t.col("Write Time") { + _.taskMetrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime) + } sortBy { + _.map(_.toString).getOrElse("") + } formatWith { + _.map(t => t / (1000 * 1000)).map { ms => + if (ms == 0) "" else UIUtils.formatDuration(ms) + }.getOrElse("") + } + + t.col("Shuffle Write") { + _.taskMetrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten) + } sortBy { + _.map(_.toString).getOrElse("") + } formatWith { + _.map(Utils.bytesToString).getOrElse("") + } + } + if (hasBytesSpilled) { + t.col("Shuffle Spill (Memory)") { _.taskMetrics.map(_.memoryBytesSpilled) } sortBy { + _.map(_.toString).getOrElse("") + } formatWith { + _.map(Utils.bytesToString).getOrElse("") + } + + t.col("Shuffle Spill (Disk)") { _.taskMetrics.map(_.diskBytesSpilled) } sortBy { + _.map(_.toString).getOrElse("") + } formatWith { + _.map(Utils.bytesToString).getOrElse("") + } + } + t.col("Errors")(identity) withMarkup { case TaskUIData(_, _, errorMessage) => + errorMessage.map { e =>
{e}
}.getOrElse(Text("")) + } + t.build() + } + // scalastyle:off val summary =
@@ -96,23 +192,10 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
       // scalastyle:on
 
-      val accumulableHeaders: Seq[String] = Seq("Accumulable", "Value")
-      def accumulableRow(acc: AccumulableInfo) = <tr><td>{acc.name}</td><td>{acc.value}</td></tr>
-      val accumulableTable = UIUtils.listingTable(accumulableHeaders, accumulableRow,
-        accumulables.values.toSeq)
-
-      val taskHeaders: Seq[String] =
-        Seq(
-          "Index", "ID", "Attempt", "Status", "Locality Level", "Executor ID / Host",
-          "Launch Time", "Duration", "GC Time", "Accumulators") ++
-        {if (hasInput) Seq("Input") else Nil} ++
-        {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
-        {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
-        {if (hasBytesSpilled) Seq("Shuffle Spill (Memory)", "Shuffle Spill (Disk)") else Nil} ++
-        Seq("Errors")
-
-      val taskTable = UIUtils.listingTable(
-        taskHeaders, taskRow(hasInput, hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks)
+      val accumulableTable =
+        this.accumulableTable.render(accumulables.values.map(a => (a.name, a.value)))
+
+      val taskTable = taskTableRenderer.render(tasks)
 
       // Excludes tasks which failed and have incomplete metrics
       val validTasks = tasks.filter(t => t.taskInfo.status == "SUCCESS" && t.taskMetrics.isDefined)
@@ -230,107 +313,4 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
       UIUtils.headerSparkPage("Details for Stage %d".format(stageId), content, parent)
     }
   }
-
-  def taskRow(
-      hasInput: Boolean,
-      hasShuffleRead: Boolean,
-      hasShuffleWrite: Boolean,
-      hasBytesSpilled: Boolean)(taskData: TaskUIData): Seq[Node] = {
-    taskData match { case TaskUIData(info, metrics, errorMessage) =>
-      val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis())
-        else metrics.map(_.executorRunTime).getOrElse(1L)
-      val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration)
-        else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
-      val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
-      val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
-
-      val maybeInput = metrics.flatMap(_.inputMetrics)
-      val inputSortable = maybeInput.map(_.bytesRead.toString).getOrElse("")
-      val inputReadable = maybeInput
-        .map(m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase()})")
-        .getOrElse("")
-
-      val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead)
-      val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("")
-      val shuffleReadReadable = maybeShuffleRead.map(Utils.bytesToString).getOrElse("")
-
-      val maybeShuffleWrite =
-        metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten)
-      val shuffleWriteSortable = maybeShuffleWrite.map(_.toString).getOrElse("")
-      val shuffleWriteReadable = maybeShuffleWrite.map(Utils.bytesToString).getOrElse("")
-
-      val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime)
-      val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("")
-      val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map { ms =>
-        if (ms == 0) "" else UIUtils.formatDuration(ms)
-      }.getOrElse("")
-
-      val maybeMemoryBytesSpilled = metrics.map(_.memoryBytesSpilled)
-      val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.map(_.toString).getOrElse("")
-      val memoryBytesSpilledReadable =
-        maybeMemoryBytesSpilled.map(Utils.bytesToString).getOrElse("")
-
-      val maybeDiskBytesSpilled = metrics.map(_.diskBytesSpilled)
-      val diskBytesSpilledSortable = maybeDiskBytesSpilled.map(_.toString).getOrElse("")
-      val diskBytesSpilledReadable = maybeDiskBytesSpilled.map(Utils.bytesToString).getOrElse("")
-
-      <tr>
-        <td>{info.index}</td>
-        <td>{info.taskId}</td>
-        <td sorttable_customkey={info.taskId.toString}>{
-          if (info.speculative) s"${info.attempt} (speculative)" else info.attempt.toString
-        }</td>
-        <td>{info.status}</td>
-        <td>{info.taskLocality}</td>
-        <td>{info.executorId} / {info.host}</td>
-        <td>{UIUtils.formatDate(new Date(info.launchTime))}</td>
-        <td sorttable_customkey={duration.toString}>
-          {formatDuration}
-        </td>
-        <td sorttable_customkey={gcTime.toString}>
-          {if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
-        </td>
-        <td>
-          {Unparsed(
-            info.accumulables.map{acc => s"${acc.name}: ${acc.update.get}"}.mkString("<br/>")
-          )}
-        </td>
-        {if (hasInput) {
-          <td sorttable_customkey={inputSortable}>
-            {inputReadable}
-          </td>
-        }}
-        {if (hasShuffleRead) {
-          <td sorttable_customkey={shuffleReadSortable}>
-            {shuffleReadReadable}
-          </td>
-        }}
-        {if (hasShuffleWrite) {
-          <td sorttable_customkey={writeTimeSortable}>
-            {writeTimeReadable}
-          </td>
-          <td sorttable_customkey={shuffleWriteSortable}>
-            {shuffleWriteReadable}
-          </td>
-        }}
-        {if (hasBytesSpilled) {
-          <td sorttable_customkey={memoryBytesSpilledSortable}>
-            {memoryBytesSpilledReadable}
-          </td>
-          <td sorttable_customkey={diskBytesSpilledSortable}>
-            {diskBytesSpilledReadable}
-          </td>
-        }}
-        <td>
-          {errorMessage.map { e => <pre>{e}</pre> }.getOrElse("")}
-        </td>
-      </tr>
-    }
-  }
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index 8a0075ae8daf..b9d1b7a9d360 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -19,16 +19,45 @@ package org.apache.spark.ui.storage
 
 import javax.servlet.http.HttpServletRequest
 
-import scala.xml.Node
+import scala.xml.{Text, Node}
 
 import org.apache.spark.storage.{BlockId, BlockStatus, StorageStatus, StorageUtils}
-import org.apache.spark.ui.{WebUIPage, UIUtils}
+import org.apache.spark.ui.{UITableBuilder, UITable, WebUIPage, UIUtils}
 import org.apache.spark.util.Utils
 
 /** Page showing storage details for a given RDD */
 private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
   private val listener = parent.listener
 
+  private val workerTable: UITable[(Int, StorageStatus)] = {
+    val t = new UITableBuilder[(Int, StorageStatus)]()
+    t.col("Host") { case (_, status) =>
+      s"${status.blockManagerId.host}:${status.blockManagerId.port}"
+    }
+    def getMemUsed(x: (Int, StorageStatus)): String = x._2.memUsedByRdd(x._1).toString
+    t.col("Memory Usage") (identity) sortBy {
+      getMemUsed
+    } withMarkup { case (rddId, status) =>
+      val used = Utils.bytesToString(status.memUsedByRdd(rddId))
+      val remaining = Utils.bytesToString(status.memRemaining)
+      Text(s"$used ($remaining Remaining)")
+    }
+    t.sizeCol("Disk Usage") { case (rddId, status) => status.diskUsedByRdd(rddId) }
+    t.build()
+  }
+
+  private val blockTable: UITable[(BlockId, BlockStatus, Seq[String])] = {
+    val t = new UITableBuilder[(BlockId, BlockStatus, Seq[String])]()
+    t.col("Block Name") { case (id, block, locations) => id.toString }
+    t.col("Storage Level") { case (id, block, locations) => block.storageLevel.description }
+    t.sizeCol("Size in Memory") { case (id, block, locations) => block.memSize }
+    t.sizeCol("Size on Disk") { case (id, block, locations) => block.diskSize }
+    t.col("Executors") (identity) withMarkup { case (id, block, locations) =>
+      locations.map(l => <span>{l}<br/></span>)
+    }
+    t.build()
+  }
+
   def render(request: HttpServletRequest): Seq[Node] = {
     val rddId = request.getParameter("id").toInt
     val storageStatusList = listener.storageStatusList
@@ -39,7 +68,7 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
 
     // Worker table
     val workers = storageStatusList.map((rddId, _))
-    val workerTable = UIUtils.listingTable(workerHeader, workerRow, workers)
+    val workerTable = this.workerTable.render(workers)
 
     // Block table
     val blockLocations = StorageUtils.getRddBlockLocations(rddId, storageStatusList)
@@ -49,7 +78,7 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
       .map { case (blockId, status) =>
        (blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown")))
      }
-    val blockTable = UIUtils.listingTable(blockHeader, blockRow, blocks)
+    val blockTable = this.blockTable.render(blocks)
 
     val content =
@@ -95,51 +124,4 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
 
     UIUtils.headerSparkPage("RDD Storage Info for " + rddInfo.name, content, parent)
   }
-
-  /** Header fields for the worker table */
-  private def workerHeader = Seq(
-    "Host",
-    "Memory Usage",
-    "Disk Usage")
-
-  /** Header fields for the block table */
-  private def blockHeader = Seq(
-    "Block Name",
-    "Storage Level",
-    "Size in Memory",
-    "Size on Disk",
-    "Executors")
-
-  /** Render an HTML row representing a worker */
-  private def workerRow(worker: (Int, StorageStatus)): Seq[Node] = {
-    val (rddId, status) = worker
-    <tr>
-      <td>{status.blockManagerId.host + ":" + status.blockManagerId.port}</td>
-      <td>
-        {Utils.bytesToString(status.memUsedByRdd(rddId))}
-        ({Utils.bytesToString(status.memRemaining)} Remaining)
-      </td>
-      <td>{Utils.bytesToString(status.diskUsedByRdd(rddId))}</td>
-    </tr>
-  }
-
-  /** Render an HTML row representing a block */
-  private def blockRow(row: (BlockId, BlockStatus, Seq[String])): Seq[Node] = {
-    val (id, block, locations) = row
-    <tr>
-      <td>{id}</td>
-      <td>
-        {block.storageLevel.description}
-      </td>
-      <td>
-        {Utils.bytesToString(block.memSize)}
-      </td>
-      <td>
-        {Utils.bytesToString(block.diskSize)}
-      </td>
-      <td>
-        {locations.map(l => <span>{l}<br/></span>)}
-      </td>
-    </tr>
-  }
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
index 83489ca0679e..a72bc368ab3b 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
@@ -22,46 +22,32 @@ import javax.servlet.http.HttpServletRequest
 import scala.xml.Node
 
 import org.apache.spark.storage.RDDInfo
-import org.apache.spark.ui.{WebUIPage, UIUtils}
-import org.apache.spark.util.Utils
+import org.apache.spark.ui.{UITableBuilder, UITable, WebUIPage, UIUtils}
 
 /** Page showing list of RDD's currently stored in the cluster */
 private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
   private val listener = parent.listener
 
-  def render(request: HttpServletRequest): Seq[Node] = {
-    val rdds = listener.rddInfoList
-    val content = UIUtils.listingTable(rddHeader, rddRow, rdds)
-    UIUtils.headerSparkPage("Storage", content, parent)
+  val rddTable: UITable[RDDInfo] = {
+    val t = new UITableBuilder[RDDInfo]()
+    t.col("RDD Name") (identity) withMarkup { rdd =>
+      <a href={"%s/storage/rdd?id=%s".format(UIUtils.prependBaseUri(parent.basePath), rdd.id)}>
+        {rdd.name}
+      </a>
+    }
+    t.col("Storage Level") { _.storageLevel.description }
+    t.col("Cached Partitions") { _.numCachedPartitions }
+    t.col("Fraction Cached") { rdd =>
+      "%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)
+    }
+    t.sizeCol("Size in Memory") { _.memSize }
+    t.sizeCol("Size in Tachyon") { _.tachyonSize }
+    t.sizeCol("Size on Disk") { _.diskSize }
+    t.build()
   }
 
-  /** Header fields for the RDD table */
-  private def rddHeader = Seq(
-    "RDD Name",
-    "Storage Level",
-    "Cached Partitions",
-    "Fraction Cached",
-    "Size in Memory",
-    "Size in Tachyon",
-    "Size on Disk")
-
-  /** Render an HTML row representing an RDD */
-  private def rddRow(rdd: RDDInfo): Seq[Node] = {
-    // scalastyle:off
-    <tr>
-      <td>
-        <a href={"%s/storage/rdd?id=%s".format(UIUtils.prependBaseUri(parent.basePath), rdd.id)}>
-          {rdd.name}
-        </a>
-      </td>
-      <td>{rdd.storageLevel.description}</td>
-      <td>{rdd.numCachedPartitions}</td>
-      <td>{"%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)}</td>
-      <td>{Utils.bytesToString(rdd.memSize)}</td>
-      <td>{Utils.bytesToString(rdd.tachyonSize)}</td>
-      <td>{Utils.bytesToString(rdd.diskSize)}</td>
-    </tr>
-    // scalastyle:on
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val content = rddTable.render(listener.rddInfoList)
+    UIUtils.headerSparkPage("Storage", content, parent)
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 1353e487c72c..0aff0d84ed84 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -25,7 +25,8 @@ import scala.xml.Node
 import org.apache.spark.Logging
 import org.apache.spark.ui._
 import org.apache.spark.ui.UIUtils._
-import org.apache.spark.util.Distribution
+import org.apache.spark.streaming.scheduler.ReceiverInfo
+
 
 /** Page for Spark Web UI that shows statistics of a streaming job */
 private[ui] class StreamingPage(parent: StreamingTab)
@@ -33,7 +34,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
   private val listener = parent.listener
   private val startTime = Calendar.getInstance().getTime()
-  private val emptyCell = "-"
+  private val empty = "-"
 
   /** Render the page */
   def render(request: HttpServletRequest): Seq[Node] = {
@@ -45,6 +46,26 @@ private[ui] class StreamingPage(parent: StreamingTab)
     UIUtils.headerSparkPage("Streaming", content, parent, Some(5000))
   }
 
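+  // Each batch-statistics row is (metric name, value for the last completed batch,
+  // quantiles of that metric across all retained batches).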
+  private type BatchStatsRowType = (String, Option[Long], Option[Seq[Double]])
+
+  private val batchStatsTable: UITable[BatchStatsRowType] = {
+    val t = new UITableBuilder[BatchStatsRowType]()
+    def optDurationCol(name: String)(fieldExtractor: BatchStatsRowType => Option[Long]) {
+      t.col(name)(fieldExtractor) sortBy {
+        _.getOrElse("").toString
+      } formatWith {
+        _.map(UIUtils.formatDuration).getOrElse(empty)
+      }
+    }
+    t.col("Metric") { _._1 }
+    optDurationCol("Last batch") { _._2 }
+    optDurationCol("Minimum") { _._3.map(_(0).toLong) }
+    optDurationCol("25th percentile") { _._3.map(_(1).toLong) }
+    optDurationCol("Median") { _._3.map(_(2).toLong) }
+    optDurationCol("75th percentile") { _._3.map(_(3).toLong) }
+    optDurationCol("Maximum") { _._3.map(_(4).toLong) }
+    t.build()
+  }
+
   /** Generate basic stats of the streaming program */
   private def generateBasicStats(): Seq[Node] = {
     val timeSinceStart = System.currentTimeMillis() - startTime.getTime
@@ -75,37 +96,46 @@ private[ui] class StreamingPage(parent: StreamingTab)
     val receivedRecordDistributions = listener.receivedRecordsDistributions
     val lastBatchReceivedRecord = listener.lastReceivedBatchRecords
     val table = if (receivedRecordDistributions.size > 0) {
-      val headerRow = Seq(
-        "Receiver",
-        "Status",
-        "Location",
-        "Records in last batch\n[" + formatDate(Calendar.getInstance().getTime()) + "]",
-        "Minimum rate\n[records/sec]",
-        "Median rate\n[records/sec]",
-        "Maximum rate\n[records/sec]",
-        "Last Error"
-      )
-      val dataRows = (0 until listener.numReceivers).map { receiverId =>
-        val receiverInfo = listener.receiverInfo(receiverId)
-        val receiverName = receiverInfo.map(_.name).getOrElse(s"Receiver-$receiverId")
-        val receiverActive = receiverInfo.map { info =>
-          if (info.active) "ACTIVE" else "INACTIVE"
-        }.getOrElse(emptyCell)
-        val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell)
-        val receiverLastBatchRecords = formatNumber(lastBatchReceivedRecord(receiverId))
-        val receivedRecordStats = receivedRecordDistributions(receiverId).map { d =>
-          d.getQuantiles(Seq(0.0, 0.5, 1.0)).map(r => formatNumber(r.toLong))
-        }.getOrElse {
-          Seq(emptyCell, emptyCell, emptyCell, emptyCell, emptyCell)
+      val tableRenderer: UITable[(Int, Option[ReceiverInfo])] = {
+        val t = new UITableBuilder[(Int, Option[ReceiverInfo])]()
+        t.col("Receiver") { case (receiverId, receiverInfo) =>
+          receiverInfo.map(_.name).getOrElse(s"Receiver-$receiverId")
+        }
+        t.col("Status") { case (_, receiverInfo) =>
+          receiverInfo.map { info => if (info.active) "ACTIVE" else "INACTIVE" }.getOrElse(empty)
         }
-        val receiverLastError = listener.receiverInfo(receiverId).map { info =>
-          val msg = s"${info.lastErrorMessage} - ${info.lastError}"
-          if (msg.size > 100) msg.take(97) + "..." else msg
-        }.getOrElse(emptyCell)
-        Seq(receiverName, receiverActive, receiverLocation, receiverLastBatchRecords) ++
-          receivedRecordStats ++ Seq(receiverLastError)
+        t.col("Location") { case (_, receiverInfo) =>
+          receiverInfo.map(_.location).getOrElse(empty)
+        }
+        t.col("Records in last batch\n[" + formatDate(Calendar.getInstance().getTime()) + "]") {
+          case (receiverId, _) => formatNumber(lastBatchReceivedRecord(receiverId))
+        }
+        t.col("Minimum rate\n[records/sec]") { case (receiverId, _) =>
+          receivedRecordDistributions(receiverId).map {
+            _.getQuantiles(Seq(0.0)).map(formatNumber).head
+          }.getOrElse(empty)
+        }
+        t.col("Median rate\n[records/sec]") { case (receiverId, _) =>
+          receivedRecordDistributions(receiverId).map {
+            _.getQuantiles(Seq(0.5)).map(formatNumber).head
+          }.getOrElse(empty)
+        }
+        t.col("Maximum rate\n[records/sec]") { case (receiverId, _) =>
+          receivedRecordDistributions(receiverId).map {
+            _.getQuantiles(Seq(1.0)).map(formatNumber).head
+          }.getOrElse(empty)
+        }
+        t.col("Last Error") { case (_, receiverInfo) =>
+          receiverInfo.map { info =>
+            val msg = s"${info.lastErrorMessage} - ${info.lastError}"
+            if (msg.size > 100) msg.take(97) + "..." else msg
+          }.getOrElse(empty)
+        }
+        t.build()
       }
-      Some(listingTable(headerRow, dataRows))
+
+      val dataRows = (0 until listener.numReceivers).map { id => (id, listener.receiverInfo(id)) }
+      Some(tableRenderer.render(dataRows))
     } else {
       None
     }
@@ -121,33 +151,20 @@ private[ui] class StreamingPage(parent: StreamingTab)
   private def generateBatchStatsTable(): Seq[Node] = {
     val numBatches = listener.retainedCompletedBatches.size
     val lastCompletedBatch = listener.lastCompletedBatch
+
     val table = if (numBatches > 0) {
-      val processingDelayQuantilesRow = {
-        Seq(
-          "Processing Time",
-          formatDurationOption(lastCompletedBatch.flatMap(_.processingDelay))
-        ) ++ getQuantiles(listener.processingDelayDistribution)
-      }
-      val schedulingDelayQuantilesRow = {
-        Seq(
-          "Scheduling Delay",
-          formatDurationOption(lastCompletedBatch.flatMap(_.schedulingDelay))
-        ) ++ getQuantiles(listener.schedulingDelayDistribution)
-      }
-      val totalDelayQuantilesRow = {
-        Seq(
-          "Total Delay",
-          formatDurationOption(lastCompletedBatch.flatMap(_.totalDelay))
-        ) ++ getQuantiles(listener.totalDelayDistribution)
-      }
-      val headerRow = Seq("Metric", "Last batch", "Minimum", "25th percentile",
-        "Median", "75th percentile", "Maximum")
-      val dataRows: Seq[Seq[String]] = Seq(
-        processingDelayQuantilesRow,
-        schedulingDelayQuantilesRow,
-        totalDelayQuantilesRow
+      val rows: Seq[BatchStatsRowType] = Seq(
+        ("Processing Time",
+          lastCompletedBatch.flatMap(_.processingDelay),
+          listener.processingDelayDistribution.map(_.getQuantiles())),
+        ("Scheduling Delay",
+          lastCompletedBatch.flatMap(_.schedulingDelay),
+          listener.schedulingDelayDistribution.map(_.getQuantiles())),
+        ("Total Delay",
+          lastCompletedBatch.flatMap(_.totalDelay),
+          listener.totalDelayDistribution.map(_.getQuantiles()))
       )
-      Some(listingTable(headerRow, dataRows))
+      Some(batchStatsTable.render(rows))
     } else {
       None
     }
@@ -162,26 +179,4 @@ private[ui] class StreamingPage(parent: StreamingTab)
 
     content
   }
-
-  /**
-   * Returns a human-readable string representing a duration such as "5 second 35 ms"
-   */
-  private def formatDurationOption(msOption: Option[Long]): String = {
-    msOption.map(formatDurationVerbose).getOrElse(emptyCell)
-  }
-
-  /** Get quantiles for any time distribution */
-  private def getQuantiles(timeDistributionOption: Option[Distribution]) = {
-    timeDistributionOption.get.getQuantiles().map { ms => formatDurationVerbose(ms.toLong) }
-  }
-
-  /** Generate HTML table from string data */
-  private def listingTable(headers: Seq[String], data: Seq[Seq[String]]) = {
-    def generateDataRow(data: Seq[String]): Seq[Node] = {
-      <tr>{data.map(d => <td>{d}</td>)}</tr>
-    }
-    UIUtils.listingTable(headers, generateDataRow, data, fixedWidth = true)
-  }
 }