@@ -58,11 +58,6 @@ class InputMetrics private (_bytesRead: Accumulator[Long], _recordsRead: Accumul
*/
def recordsRead: Long = _recordsRead.localValue

- /**
-  * Returns true if this metrics has been updated before.
-  */
- def isUpdated: Boolean = (bytesRead | recordsRead) != 0

private[spark] def incBytesRead(v: Long): Unit = _bytesRead.add(v)
private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
private[spark] def setBytesRead(v: Long): Unit = _bytesRead.setValue(v)
@@ -57,11 +57,6 @@ class OutputMetrics private (_bytesWritten: Accumulator[Long], _recordsWritten:
*/
def recordsWritten: Long = _recordsWritten.localValue

- /**
-  * Returns true if this metrics has been updated before.
-  */
- def isUpdated: Boolean = (bytesWritten | recordsWritten) != 0

private[spark] def setBytesWritten(v: Long): Unit = _bytesWritten.setValue(v)
private[spark] def setRecordsWritten(v: Long): Unit = _recordsWritten.setValue(v)
}
@@ -102,11 +102,6 @@ class ShuffleReadMetrics private (
*/
def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched

- /**
-  * Returns true if this metrics has been updated before.
-  */
- def isUpdated: Boolean = (totalBytesRead | totalBlocksFetched | recordsRead | fetchWaitTime) != 0

private[spark] def incRemoteBlocksFetched(v: Int): Unit = _remoteBlocksFetched.add(v)
private[spark] def incLocalBlocksFetched(v: Int): Unit = _localBlocksFetched.add(v)
private[spark] def incRemoteBytesRead(v: Long): Unit = _remoteBytesRead.add(v)
@@ -68,11 +68,6 @@ class ShuffleWriteMetrics private (
*/
def writeTime: Long = _writeTime.localValue

- /**
-  * Returns true if this metrics has been updated before.
-  */
- def isUpdated: Boolean = (writeTime | recordsWritten | bytesWritten) != 0

private[spark] def incBytesWritten(v: Long): Unit = _bytesWritten.add(v)
private[spark] def incRecordsWritten(v: Long): Unit = _recordsWritten.add(v)
private[spark] def incWriteTime(v: Long): Unit = _writeTime.add(v)
@@ -167,47 +167,32 @@ private[v1] object AllStagesResource {
// to make it a little easier to deal w/ all of the nested options. Mostly it lets us just
// implement one "build" method, which just builds the quantiles for each field.

- val inputMetrics: Option[InputMetricDistributions] =
+ val inputMetrics: InputMetricDistributions =
new MetricHelper[InternalInputMetrics, InputMetricDistributions](rawMetrics, quantiles) {
- def getSubmetrics(raw: InternalTaskMetrics): Option[InternalInputMetrics] = {
- if (raw.inputMetrics.isUpdated) {
- Some(raw.inputMetrics)
- } else {
- None
- }
- }
+ def getSubmetrics(raw: InternalTaskMetrics): InternalInputMetrics = raw.inputMetrics

def build: InputMetricDistributions = new InputMetricDistributions(
bytesRead = submetricQuantiles(_.bytesRead),
recordsRead = submetricQuantiles(_.recordsRead)
)
- }.metricOption
+ }.build

- val outputMetrics: Option[OutputMetricDistributions] =
+ val outputMetrics: OutputMetricDistributions =
new MetricHelper[InternalOutputMetrics, OutputMetricDistributions](rawMetrics, quantiles) {
- def getSubmetrics(raw: InternalTaskMetrics): Option[InternalOutputMetrics] = {
- if (raw.outputMetrics.isUpdated) {
- Some(raw.outputMetrics)
- } else {
- None
- }
- }
+ def getSubmetrics(raw: InternalTaskMetrics): InternalOutputMetrics = raw.outputMetrics

def build: OutputMetricDistributions = new OutputMetricDistributions(
bytesWritten = submetricQuantiles(_.bytesWritten),
recordsWritten = submetricQuantiles(_.recordsWritten)
)
- }.metricOption
+ }.build

- val shuffleReadMetrics: Option[ShuffleReadMetricDistributions] =
+ val shuffleReadMetrics: ShuffleReadMetricDistributions =
new MetricHelper[InternalShuffleReadMetrics, ShuffleReadMetricDistributions](rawMetrics,
quantiles) {
- def getSubmetrics(raw: InternalTaskMetrics): Option[InternalShuffleReadMetrics] = {
- if (raw.shuffleReadMetrics.isUpdated) {
- Some(raw.shuffleReadMetrics)
- } else {
- None
- }
- }
+ def getSubmetrics(raw: InternalTaskMetrics): InternalShuffleReadMetrics =
+ raw.shuffleReadMetrics

def build: ShuffleReadMetricDistributions = new ShuffleReadMetricDistributions(
readBytes = submetricQuantiles(_.totalBytesRead),
readRecords = submetricQuantiles(_.recordsRead),
@@ -217,24 +202,20 @@ private[v1] object AllStagesResource {
totalBlocksFetched = submetricQuantiles(_.totalBlocksFetched),
fetchWaitTime = submetricQuantiles(_.fetchWaitTime)
)
- }.metricOption
+ }.build

- val shuffleWriteMetrics: Option[ShuffleWriteMetricDistributions] =
+ val shuffleWriteMetrics: ShuffleWriteMetricDistributions =
new MetricHelper[InternalShuffleWriteMetrics, ShuffleWriteMetricDistributions](rawMetrics,
quantiles) {
- def getSubmetrics(raw: InternalTaskMetrics): Option[InternalShuffleWriteMetrics] = {
- if (raw.shuffleWriteMetrics.isUpdated) {
- Some(raw.shuffleWriteMetrics)
- } else {
- None
- }
- }
+ def getSubmetrics(raw: InternalTaskMetrics): InternalShuffleWriteMetrics =
+ raw.shuffleWriteMetrics

def build: ShuffleWriteMetricDistributions = new ShuffleWriteMetricDistributions(
writeBytes = submetricQuantiles(_.bytesWritten),
writeRecords = submetricQuantiles(_.recordsWritten),
writeTime = submetricQuantiles(_.writeTime)
)
- }.metricOption
+ }.build

new TaskMetricDistributions(
quantiles = quantiles,
@@ -273,84 +254,55 @@ private[v1] object AllStagesResource {
)
}

- def convertInputMetrics(internal: InternalInputMetrics): Option[InputMetrics] = {
- if (internal.isUpdated) {
- Some(new InputMetrics(
- bytesRead = internal.bytesRead,
- recordsRead = internal.recordsRead
- ))
- } else {
- None
- }
+ def convertInputMetrics(internal: InternalInputMetrics): InputMetrics = {
+ new InputMetrics(
+ bytesRead = internal.bytesRead,
+ recordsRead = internal.recordsRead
+ )
}

- def convertOutputMetrics(internal: InternalOutputMetrics): Option[OutputMetrics] = {
- if (internal.isUpdated) {
- Some(new OutputMetrics(
- bytesWritten = internal.bytesWritten,
- recordsWritten = internal.recordsWritten
- ))
- } else {
- None
- }
+ def convertOutputMetrics(internal: InternalOutputMetrics): OutputMetrics = {
+ new OutputMetrics(
+ bytesWritten = internal.bytesWritten,
+ recordsWritten = internal.recordsWritten
+ )
}

- def convertShuffleReadMetrics(
- internal: InternalShuffleReadMetrics): Option[ShuffleReadMetrics] = {
- if (internal.isUpdated) {
- Some(new ShuffleReadMetrics(
- remoteBlocksFetched = internal.remoteBlocksFetched,
- localBlocksFetched = internal.localBlocksFetched,
- fetchWaitTime = internal.fetchWaitTime,
- remoteBytesRead = internal.remoteBytesRead,
- totalBlocksFetched = internal.totalBlocksFetched,
- recordsRead = internal.recordsRead
- ))
- } else {
- None
- }
+ def convertShuffleReadMetrics(internal: InternalShuffleReadMetrics): ShuffleReadMetrics = {
+ new ShuffleReadMetrics(
+ remoteBlocksFetched = internal.remoteBlocksFetched,
+ localBlocksFetched = internal.localBlocksFetched,
+ fetchWaitTime = internal.fetchWaitTime,
+ remoteBytesRead = internal.remoteBytesRead,
+ localBytesRead = internal.localBytesRead,
+ recordsRead = internal.recordsRead
+ )
}

- def convertShuffleWriteMetrics(
- internal: InternalShuffleWriteMetrics): Option[ShuffleWriteMetrics] = {
- if ((internal.bytesWritten | internal.writeTime | internal.recordsWritten) == 0) {
- None
- } else {
- Some(new ShuffleWriteMetrics(
- bytesWritten = internal.bytesWritten,
- writeTime = internal.writeTime,
- recordsWritten = internal.recordsWritten
- ))
- }
+ def convertShuffleWriteMetrics(internal: InternalShuffleWriteMetrics): ShuffleWriteMetrics = {
+ new ShuffleWriteMetrics(
+ bytesWritten = internal.bytesWritten,
+ writeTime = internal.writeTime,
+ recordsWritten = internal.recordsWritten
+ )
}
}

/**
- * Helper for getting distributions from nested metric types. Many of the metrics we want are
- * contained in options inside TaskMetrics (eg., ShuffleWriteMetrics). This makes it easy to handle
- * the options (returning None if the metrics are all empty), and extract the quantiles for each
- * metric. After creating an instance, call metricOption to get the result type.
+ * Helper for getting distributions from nested metric types.
*/
private[v1] abstract class MetricHelper[I, O](
rawMetrics: Seq[InternalTaskMetrics],
quantiles: Array[Double]) {

- def getSubmetrics(raw: InternalTaskMetrics): Option[I]
+ def getSubmetrics(raw: InternalTaskMetrics): I

def build: O

- val data: Seq[I] = rawMetrics.flatMap(getSubmetrics)
+ val data: Seq[I] = rawMetrics.map(getSubmetrics)

/** applies the given function to all input metrics, and returns the quantiles */
def submetricQuantiles(f: I => Double): IndexedSeq[Double] = {
Distribution(data.map { d => f(d) }).get.getQuantiles(quantiles)
}

- def metricOption: Option[O] = {
- if (data.isEmpty) {
- None
- } else {
- Some(build)
- }
- }
}
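
For illustration, here is a minimal, self-contained Scala sketch of how the simplified MetricHelper pattern fits together after this change. The class and field names, the toy quantile logic, and the simplified metrics types are assumptions made up for this example; Spark's real code uses its own Distribution class and internal metrics types.

object MetricHelperSketch {
  // Hypothetical stand-in for Spark's internal task metrics; not the real class.
  case class TaskMetrics(bytesRead: Long, recordsRead: Long)

  // Same shape as the PR's MetricHelper: subclasses supply getSubmetrics and build.
  abstract class MetricHelper[I, O](rawMetrics: Seq[TaskMetrics], quantiles: Array[Double]) {
    def getSubmetrics(raw: TaskMetrics): I
    def build: O

    // After this change every task always has submetrics, so a plain map suffices.
    val data: Seq[I] = rawMetrics.map(getSubmetrics)

    // Toy quantile logic standing in for Spark's Distribution.getQuantiles.
    def submetricQuantiles(f: I => Double): IndexedSeq[Double] = {
      val sorted = data.map(f).sorted.toIndexedSeq
      quantiles.toIndexedSeq.map(q => sorted(((sorted.size - 1) * q).toInt))
    }
  }

  case class InputMetricDistributions(
    bytesRead: IndexedSeq[Double],
    recordsRead: IndexedSeq[Double])

  def main(args: Array[String]): Unit = {
    val raw = Seq(TaskMetrics(10, 1), TaskMetrics(20, 2), TaskMetrics(30, 3))

    // One anonymous subclass per metric family, finished with .build,
    // mirroring the pattern in AllStagesResource above.
    val dist = new MetricHelper[TaskMetrics, InputMetricDistributions](raw, Array(0.0, 0.5, 1.0)) {
      def getSubmetrics(raw: TaskMetrics): TaskMetrics = raw
      def build: InputMetricDistributions = InputMetricDistributions(
        bytesRead = submetricQuantiles(_.bytesRead),
        recordsRead = submetricQuantiles(_.recordsRead)
      )
    }.build

    println(dist)
    // InputMetricDistributions(Vector(10.0, 20.0, 30.0), Vector(1.0, 2.0, 3.0))
  }
}
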
core/src/main/scala/org/apache/spark/status/api/v1/api.scala (9 additions, 9 deletions)
@@ -172,10 +172,10 @@ class TaskMetrics private[spark](
val resultSerializationTime: Long,
val memoryBytesSpilled: Long,
val diskBytesSpilled: Long,
- val inputMetrics: Option[InputMetrics],
- val outputMetrics: Option[OutputMetrics],
- val shuffleReadMetrics: Option[ShuffleReadMetrics],
- val shuffleWriteMetrics: Option[ShuffleWriteMetrics])
+ val inputMetrics: InputMetrics,
Review comment (Contributor):
One thing - can you verify that this doesn't change the JSON output?

Reply (Contributor, Author):
I couldn't find the Jackson doc for it, so I tried it locally. For a Scala Option, Jackson treats it as a nullable field: if the value is Some, it unwraps the Some and serializes the underlying value; if it is None, it returns null for that field.

In this case, where we always return Some, it's OK to drop the Option.
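
A minimal sketch of the behavior described in that reply, assuming jackson-module-scala's DefaultScalaModule is registered on the ObjectMapper (which Spark's status API does); the Metrics case class here is a hypothetical stand-in, not the real v1 API class.

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical stand-in for a v1 API metrics class.
case class Metrics(bytesRead: Option[Long])

object OptionJsonDemo {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

    // Some is unwrapped and the underlying value is serialized:
    println(mapper.writeValueAsString(Metrics(Some(10L))))  // {"bytesRead":10}

    // None is rendered as a JSON null for the field:
    println(mapper.writeValueAsString(Metrics(None)))       // {"bytesRead":null}
  }
}
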

+ val outputMetrics: OutputMetrics,
+ val shuffleReadMetrics: ShuffleReadMetrics,
+ val shuffleWriteMetrics: ShuffleWriteMetrics)

class InputMetrics private[spark](
val bytesRead: Long,
@@ -190,7 +190,7 @@ class ShuffleReadMetrics private[spark](
val localBlocksFetched: Int,
val fetchWaitTime: Long,
val remoteBytesRead: Long,
- val totalBlocksFetched: Int,
+ val localBytesRead: Long,
val recordsRead: Long)

class ShuffleWriteMetrics private[spark](
@@ -209,10 +209,10 @@ class TaskMetricDistributions private[spark](
val memoryBytesSpilled: IndexedSeq[Double],
val diskBytesSpilled: IndexedSeq[Double],

- val inputMetrics: Option[InputMetricDistributions],
- val outputMetrics: Option[OutputMetricDistributions],
- val shuffleReadMetrics: Option[ShuffleReadMetricDistributions],
- val shuffleWriteMetrics: Option[ShuffleWriteMetricDistributions])
+ val inputMetrics: InputMetricDistributions,
+ val outputMetrics: OutputMetricDistributions,
+ val shuffleReadMetrics: ShuffleReadMetricDistributions,
+ val shuffleWriteMetrics: ShuffleWriteMetricDistributions)

class InputMetricDistributions private[spark](
val bytesRead: IndexedSeq[Double],
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala (5 additions, 17 deletions)
@@ -326,39 +326,27 @@ private[spark] object JsonProtocol {
}

def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = {
- val shuffleReadMetrics: JValue = if (taskMetrics.shuffleReadMetrics.isUpdated) {
+ val shuffleReadMetrics: JValue =
("Remote Blocks Fetched" -> taskMetrics.shuffleReadMetrics.remoteBlocksFetched) ~
("Local Blocks Fetched" -> taskMetrics.shuffleReadMetrics.localBlocksFetched) ~
("Fetch Wait Time" -> taskMetrics.shuffleReadMetrics.fetchWaitTime) ~
("Remote Bytes Read" -> taskMetrics.shuffleReadMetrics.remoteBytesRead) ~
("Local Bytes Read" -> taskMetrics.shuffleReadMetrics.localBytesRead) ~
("Total Records Read" -> taskMetrics.shuffleReadMetrics.recordsRead)
- } else {
- JNothing
- }
- val shuffleWriteMetrics: JValue = if (taskMetrics.shuffleWriteMetrics.isUpdated) {
+ val shuffleWriteMetrics: JValue =
("Shuffle Bytes Written" -> taskMetrics.shuffleWriteMetrics.bytesWritten) ~
("Shuffle Write Time" -> taskMetrics.shuffleWriteMetrics.writeTime) ~
("Shuffle Records Written" -> taskMetrics.shuffleWriteMetrics.recordsWritten)
- } else {
- JNothing
- }
- val inputMetrics: JValue = if (taskMetrics.inputMetrics.isUpdated) {
+ val inputMetrics: JValue =
("Bytes Read" -> taskMetrics.inputMetrics.bytesRead) ~
("Records Read" -> taskMetrics.inputMetrics.recordsRead)
- } else {
- JNothing
- }
- val outputMetrics: JValue = if (taskMetrics.outputMetrics.isUpdated) {
+ val outputMetrics: JValue =
("Bytes Written" -> taskMetrics.outputMetrics.bytesWritten) ~
("Records Written" -> taskMetrics.outputMetrics.recordsWritten)
- } else {
- JNothing
- }
val updatedBlocks =
JArray(taskMetrics.updatedBlockStatuses.toList.map { case (id, status) =>
("Block ID" -> id.toString) ~
("Status" -> blockStatusToJson(status))
("Status" -> blockStatusToJson(status))
})
("Executor Deserialize Time" -> taskMetrics.executorDeserializeTime) ~
("Executor Run Time" -> taskMetrics.executorRunTime) ~