diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
index f0a1174a71d3..d0cc8618dc62 100644
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
@@ -19,6 +19,9 @@ package org.apache.spark.ui.env
 
 import javax.servlet.http.HttpServletRequest
 
+import org.json4s.{JObject, JValue}
+import org.json4s.JsonDSL._
+
 import scala.xml.Node
 
 import org.apache.spark.ui.{UIUtils, WebUIPage}
@@ -26,6 +29,25 @@ import org.apache.spark.ui.{UIUtils, WebUIPage}
 private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") {
   private val listener = parent.listener
 
+  override def renderJson(request: HttpServletRequest): JValue = {
+    val jvmInfoJson =
+      ("Runtime Information" -> listener.jvmInformation.foldLeft(JObject())(_ ~ _))
+    val sparkPropertiesJson =
+      ("Spark Properties" -> listener.sparkProperties.foldLeft(JObject())(_ ~ _))
+    val systemPropertiesJson =
+      ("System Properties" -> listener.systemProperties.foldLeft(JObject())(_ ~ _))
+    val classPathEntriesJson =
+      ("Classpath Entries" -> listener.classpathEntries.foldLeft(JObject())(_ ~ _))
+
+    val environmentJson =
+      jvmInfoJson ~
+      sparkPropertiesJson ~
+      systemPropertiesJson ~
+      classPathEntriesJson
+
+    environmentJson
+  }
+
   def render(request: HttpServletRequest): Seq[Node] = {
     val runtimeInformationTable = UIUtils.listingTable(
       propertyHeader, jvmRow, listener.jvmInformation, fixedWidth = true)
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index b0e3bb3b552f..1779c8601455 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -21,6 +21,9 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.Node
 
+import org.json4s.JValue
+import org.json4s.JsonDSL._
+
 import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
 import org.apache.spark.util.Utils
 
@@ -44,6 +47,28 @@ private case class ExecutorSummaryInfo(
 private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
   private val listener = parent.listener
 
+  override def renderJson(request: HttpServletRequest): JValue = {
+    val storageStatusList = listener.storageStatusList
+    val execInfoJsonList = for (statusId <- 0 until storageStatusList.size) yield {
+      val execInfo = getExecInfo(statusId)
+      ("Executor ID" -> execInfo.id) ~
+      ("Address" -> execInfo.hostPort) ~
+      ("RDD Blocks" -> execInfo.rddBlocks) ~
+      ("Memory Used" -> execInfo.memoryUsed) ~
+      ("Disk Used" -> execInfo.diskUsed) ~
+      ("Active Tasks" -> execInfo.activeTasks) ~
+      ("Failed Tasks" -> execInfo.failedTasks) ~
+      ("Complete Tasks" -> execInfo.completedTasks) ~
+      ("Total Tasks" -> execInfo.totalTasks) ~
+      ("Task Time" -> execInfo.totalDuration) ~
+      ("Input" -> execInfo.totalInputBytes) ~
+      ("Shuffle Read" -> execInfo.totalShuffleRead) ~
+      ("Shuffle Write" -> execInfo.totalShuffleWrite)
+    }
+
+    execInfoJsonList
+  }
+
   def render(request: HttpServletRequest): Seq[Node] = {
     val storageStatusList = listener.storageStatusList
     val maxMem = storageStatusList.map(_.maxMem).sum
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
index a82f71ed0847..8ce8e8077287 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
@@ -19,6 +19,11 @@ package org.apache.spark.ui.jobs
 
 import javax.servlet.http.HttpServletRequest
 
+import org.apache.spark.util.JsonProtocol
+
+import org.json4s.JValue
+import org.json4s.JsonDSL._
+
 import scala.xml.{Node, NodeSeq}
 
 import org.apache.spark.scheduler.Schedulable
@@ -31,6 +36,29 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("")
   private val listener = parent.listener
   private lazy val isFairScheduler = parent.isFairScheduler
 
+  override def renderJson(request: HttpServletRequest): JValue = {
+    listener.synchronized {
+
+      val activeStageList =
+        listener.activeStages.values.map { info => JsonProtocol.stageInfoToJson(info) }
+      val activeStageJson = ("Active Stages" -> activeStageList)
+      val completedStageList =
+        listener.completedStages.reverse.map { info => JsonProtocol.stageInfoToJson(info) }
+      val completedStageJson = ("Completed Stages" -> completedStageList)
+      val failedStageList =
+        listener.failedStages.reverse.map { info => JsonProtocol.stageInfoToJson(info) }
+      val failedStageJson = ("Failed Stages" -> failedStageList)
+
+      val stageInfoJson =
+        ("Scheduling Mode" -> listener.schedulingMode.map(_.toString).getOrElse("Unknown")) ~
+        activeStageJson ~
+        completedStageJson ~
+        failedStageJson
+
+      stageInfoJson
+    }
+  }
+
   def render(request: HttpServletRequest): Seq[Node] = {
     listener.synchronized {
       val activeStages = listener.activeStages.values.toSeq
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
index 7a6c7d1a497e..58eb4df110dc 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
@@ -19,8 +19,13 @@ package org.apache.spark.ui.jobs
 
 import javax.servlet.http.HttpServletRequest
 
+import org.apache.spark.util.JsonProtocol
+
 import scala.xml.Node
 
+import org.json4s.JValue
+import org.json4s.JsonDSL._
+
 import org.apache.spark.scheduler.{Schedulable, StageInfo}
 import org.apache.spark.ui.{WebUIPage, UIUtils}
 
@@ -30,6 +35,34 @@ private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") {
   private val sc = parent.sc
   private val listener = parent.listener
 
+  override def renderJson(request: HttpServletRequest): JValue = {
+    listener.synchronized {
+      val poolName = request.getParameter("poolname")
+      val poolToActiveStages = listener.poolToActiveStages
+      val activeStages = poolToActiveStages.get(poolName) match {
+        case Some(s) => s.values.map {
+          case info: StageInfo =>
+            JsonProtocol.stageInfoToJson(info)
+        }
+        case None => Seq[JValue]()
+      }
+
+      val pools = if (live) Seq(sc.getPoolForName(poolName).get) else Seq[Schedulable]()
+
+      val poolListJson =
+        pools.map { schedulable =>
+          ("Pool Name" -> schedulable.name) ~
+          ("Minimum Share" -> schedulable.minShare) ~
+          ("Pool Weight" -> schedulable.weight) ~
+          ("Active Stages" -> activeStages) ~
+          ("Running Tasks" -> schedulable.runningTasks) ~
+          ("Scheduling Mode" -> schedulable.schedulingMode.toString)
+        }
+
+      poolListJson
+    }
+  }
+
   def render(request: HttpServletRequest): Seq[Node] = {
     listener.synchronized {
       val poolName = request.getParameter("poolname")
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index db01be596e07..8357ed79ee7f 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -22,15 +22,66 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.{Node, Unparsed}
 
+import org.json4s.{JNothing, JObject, JValue}
+import org.json4s.JsonDSL._
+
 import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils}
 import org.apache.spark.ui.jobs.UIData._
-import org.apache.spark.util.{Utils, Distribution}
+import org.apache.spark.util.{JsonProtocol, Utils, Distribution}
 import org.apache.spark.scheduler.AccumulableInfo
 
 /** Page showing statistics and task list for a given stage */
 private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
   private val listener = parent.listener
 
+  override def renderJson(request: HttpServletRequest): JValue = {
+    val stageId = request.getParameter("id").toInt
+    val stageAttemptId = request.getParameter("attempt").toInt
+    var stageSummary = ("Stage ID" -> stageId) ~ ("Stage Attempt ID" -> stageAttemptId)
+    val stageDataOpt = listener.stageIdToData.get((stageId, stageAttemptId))
+    var stageInfoJson: JValue = JNothing
+
+    if (stageDataOpt.isDefined && stageDataOpt.get.taskData.nonEmpty) {
+      val stageData = stageDataOpt.get
+
+      stageSummary ~= ("Executor Run Time" -> stageData.executorRunTime)
+      if (stageData.inputBytes > 0) stageSummary ~= ("Input Bytes" -> stageData.inputBytes)
+      if (stageData.shuffleReadBytes > 0) {
+        stageSummary ~= ("Shuffle Read Bytes" -> stageData.shuffleReadBytes)
+      }
+
+      if (stageData.shuffleWriteBytes > 0) {
+        stageSummary ~= ("Shuffle Write Bytes" -> stageData.shuffleWriteBytes)
+      }
+
+      if (stageData.memoryBytesSpilled > 0 && stageData.diskBytesSpilled > 0) {
+        stageSummary ~=
+          ("Memory Bytes Spilled" -> stageData.memoryBytesSpilled) ~
+          ("Disk Bytes Spilled" -> stageData.diskBytesSpilled)
+      }
+
+      val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime)
+
+      val taskList = tasks.map {
+        case uiData: TaskUIData =>
+          var jsonTaskInfo: JValue = JsonProtocol.taskInfoToJson(uiData.taskInfo)
+          val jsonTaskMetrics: JValue =
+            if (uiData.taskMetrics.isDefined) {
+              JsonProtocol.taskMetricsToJson(uiData.taskMetrics.get)
+            } else JNothing
+
+          if (jsonTaskInfo.isInstanceOf[JObject] && jsonTaskMetrics.isInstanceOf[JObject]) {
+            jsonTaskInfo =
+              jsonTaskInfo.asInstanceOf[JObject] ~ jsonTaskMetrics.asInstanceOf[JObject]
+          }
+          jsonTaskInfo
+      }
+
+      stageInfoJson = ("Stage Summary" -> stageSummary) ~ ("Tasks" -> taskList)
+    }
+    stageInfoJson
+  }
+
   def render(request: HttpServletRequest): Seq[Node] = {
     listener.synchronized {
       val stageId = request.getParameter("id").toInt
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index 8a0075ae8daf..4ba7a29b4b48 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -21,7 +21,10 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.Node
 
-import org.apache.spark.storage.{BlockId, BlockStatus, StorageStatus, StorageUtils}
+import org.json4s.{JNothing, JValue}
+import org.json4s.JsonDSL._
+
+import org.apache.spark.storage._
 import org.apache.spark.ui.{WebUIPage, UIUtils}
 import org.apache.spark.util.Utils
 
@@ -29,6 +32,61 @@ import org.apache.spark.util.Utils
 private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
   private val listener = parent.listener
 
+  override def renderJson(request: HttpServletRequest): JValue = {
+    val rddId = request.getParameter("id").toInt
+    val storageStatusList = listener.storageStatusList
+    val rddInfoOpt = listener.rddInfoList.find(_.id == rddId)
+
+    var rddInfoJson: JValue = JNothing
+
+    if (rddInfoOpt.isDefined) {
+      val rddInfo = rddInfoOpt.get
+
+      val rddSummaryJson = ("RDD Summary" ->
+        ("RDD ID" -> rddId) ~
+        ("Storage Level" -> rddInfo.storageLevel.description) ~
+        ("Cached Partitions" -> rddInfo.numCachedPartitions) ~
+        ("Total Partitions" -> rddInfo.numPartitions) ~
+        ("Memory Size" -> rddInfo.memSize) ~
+        ("Disk Size" -> rddInfo.diskSize))
+
+      val dataDistributionList =
+        storageStatusList.map { status =>
+          ("Host" -> (status.blockManagerId.host + ":" + status.blockManagerId.port)) ~
+          ("Memory Usage" -> status.memUsedByRdd(rddId)) ~
+          ("Memory Remaining" -> status.memRemaining) ~
+          ("Disk Usage" -> status.diskUsedByRdd(rddId))
+        }
+
+      val dataDistributionJson = ("Data Distribution" -> dataDistributionList)
+
+      val blockLocations = StorageUtils.getRddBlockLocations(rddId, storageStatusList)
+      val blocks = storageStatusList
+        .flatMap(_.rddBlocksById(rddId))
+        .sortWith(_._1.name < _._1.name)
+        .map { case (blockId, status) =>
+          (blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown")))
+        }
+
+      val partitionList =
+        blocks.map { case (id, block, locations) =>
+          ("Block Name" -> id.toString) ~
+          ("Storage Level" -> block.storageLevel.description) ~
+          ("Size in Memory" -> block.memSize) ~
+          ("Size on Disk" -> block.diskSize) ~
+          ("Executors" -> locations)
+        }
+
+      val partitionsJson = ("Partitions" -> partitionList)
+
+      rddInfoJson =
+        rddSummaryJson ~
+        dataDistributionJson ~
+        partitionsJson
+    }
+    rddInfoJson
+  }
+
   def render(request: HttpServletRequest): Seq[Node] = {
     val rddId = request.getParameter("id").toInt
     val storageStatusList = listener.storageStatusList
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
index 716591c9ed44..dcf851867bd7 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
@@ -21,14 +21,24 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.Node
 
+import org.json4s.JValue
+import org.json4s.JsonDSL._
+
 import org.apache.spark.storage.RDDInfo
 import org.apache.spark.ui.{WebUIPage, UIUtils}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{JsonProtocol, Utils}
 
 /** Page showing list of RDD's currently stored in the cluster */
 private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
   private val listener = parent.listener
 
+  override def renderJson(request: HttpServletRequest): JValue = {
+    val rddJsonList =
+      listener.rddInfoList.map { info => JsonProtocol.rddInfoToJson(info) }
+
+    rddJsonList
+  }
+
   def render(request: HttpServletRequest): Seq[Node] = {
     val rdds = listener.rddInfoList
     val content = UIUtils.listingTable(rddHeader, rddRow, rdds)
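
Not part of the patch itself: a minimal smoke-test sketch for exercising the new endpoints, assuming the driver's web UI listens on the default port 4040 and that WebUI.attachPage serves each page's renderJson output at the page's path with a "/json" suffix. The RenderJsonSmokeTest name and the endpoint list are illustrative only; parameterized pages (e.g. /stages/stage/json?id=0&attempt=0 or /storage/rdd/json?id=0) can be probed the same way.

import scala.io.Source

import org.json4s._
import org.json4s.jackson.JsonMethods._

// Hypothetical smoke test: fetch each page's JSON endpoint and verify that the
// response parses as JSON. Assumes a running application whose UI is bound to
// the default port 4040; adjust the host/port for your deployment.
object RenderJsonSmokeTest {
  def main(args: Array[String]): Unit = {
    val endpoints = Seq(
      "http://localhost:4040/environment/json",
      "http://localhost:4040/executors/json",
      "http://localhost:4040/stages/json",
      "http://localhost:4040/storage/json")

    endpoints.foreach { url =>
      val body = Source.fromURL(url).mkString
      val json: JValue = parse(body)  // throws if an endpoint emits malformed JSON
      println(s"$url -> ${compact(render(json))}")
    }
  }
}

Each response should parse back into the same JValue structures the pages build above; compact(render(_)) just round-trips it for a quick visual check.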