Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 0 additions & 127 deletions core/src/main/scala/org/apache/spark/InternalAccumulator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,11 @@

package org.apache.spark

import org.apache.spark.storage.{BlockId, BlockStatus}


/**
* A collection of fields and methods concerned with internal accumulators that represent
* task level metrics.
*/
private[spark] object InternalAccumulator {

import AccumulatorParam._

// Prefixes used in names of internal task level metrics
val METRICS_PREFIX = "internal.metrics."
val SHUFFLE_READ_METRICS_PREFIX = METRICS_PREFIX + "shuffle.read."
Expand Down Expand Up @@ -79,125 +73,4 @@ private[spark] object InternalAccumulator {
}

// scalastyle:on

/**
 * Create an internal [[Accumulator]] by name, which must begin with [[METRICS_PREFIX]].
 *
 * The value type of the accumulator (Long, Int, String, or a seq of block statuses) is
 * chosen by [[getParam]] from the metric name; any other param type is rejected.
 *
 * @param name fully-qualified internal metric name (must start with [[METRICS_PREFIX]])
 * @return a new zero-initialized internal accumulator for that metric
 * @throws IllegalArgumentException if the name lacks the prefix (via require) or the
 *                                  resolved param type is unsupported
 */
def create(name: String): Accumulator[_] = {
  require(name.startsWith(METRICS_PREFIX),
    s"internal accumulator name must start with '$METRICS_PREFIX': $name")
  getParam(name) match {
    // Bind each param object with `p @` so newMetric is called with the matching
    // concrete AccumulatorParam and zero value for that type.
    case p @ LongAccumulatorParam => newMetric[Long](0L, name, p)
    case p @ IntAccumulatorParam => newMetric[Int](0, name, p)
    case p @ StringAccumulatorParam => newMetric[String]("", name, p)
    case p @ UpdatedBlockStatusesAccumulatorParam =>
      newMetric[Seq[(BlockId, BlockStatus)]](Seq(), name, p)
    case p => throw new IllegalArgumentException(
      s"unsupported accumulator param '${p.getClass.getSimpleName}' for metric '$name'.")
  }
}

/**
 * Get the [[AccumulatorParam]] associated with the internal metric name,
 * which must begin with [[METRICS_PREFIX]].
 *
 * Updated block statuses use their dedicated param, the two shuffle-read block-fetch
 * counters are Int-typed, and every other internal metric defaults to Long.
 */
def getParam(name: String): AccumulatorParam[_] = {
  require(name.startsWith(METRICS_PREFIX),
    s"internal accumulator name must start with '$METRICS_PREFIX': $name")
  name match {
    case UPDATED_BLOCK_STATUSES =>
      UpdatedBlockStatusesAccumulatorParam
    case shuffleRead.LOCAL_BLOCKS_FETCHED | shuffleRead.REMOTE_BLOCKS_FETCHED =>
      IntAccumulatorParam
    case _ =>
      LongAccumulatorParam
  }
}

/**
 * Accumulators for tracking internal metrics.
 *
 * Builds one fresh accumulator per internal metric: the general task metrics, then the
 * shuffle read/write, input, and output metrics, plus a test-only accumulator when the
 * "spark.testing" system property is set.
 */
def createAll(): Seq[Accumulator[_]] = {
  val generalMetricNames = Seq(
    EXECUTOR_DESERIALIZE_TIME,
    EXECUTOR_RUN_TIME,
    RESULT_SIZE,
    JVM_GC_TIME,
    RESULT_SERIALIZATION_TIME,
    MEMORY_BYTES_SPILLED,
    DISK_BYTES_SPILLED,
    PEAK_EXECUTION_MEMORY,
    UPDATED_BLOCK_STATUSES)
  val generalAccums = generalMetricNames.map(create)
  val ioAccums =
    createShuffleReadAccums() ++
      createShuffleWriteAccums() ++
      createInputAccums() ++
      createOutputAccums()
  // Only materialized under "spark.testing"; created last to preserve creation order.
  val maybeTestAccum = sys.props.get("spark.testing").map(_ => create(TEST_ACCUM)).toSeq
  generalAccums ++ ioAccums ++ maybeTestAccum
}

/**
 * Accumulators for tracking shuffle read metrics.
 */
def createShuffleReadAccums(): Seq[Accumulator[_]] = {
  val metricNames = Seq(
    shuffleRead.REMOTE_BLOCKS_FETCHED,
    shuffleRead.LOCAL_BLOCKS_FETCHED,
    shuffleRead.REMOTE_BYTES_READ,
    shuffleRead.LOCAL_BYTES_READ,
    shuffleRead.FETCH_WAIT_TIME,
    shuffleRead.RECORDS_READ)
  metricNames.map(create)
}

/**
 * Accumulators for tracking shuffle write metrics.
 */
def createShuffleWriteAccums(): Seq[Accumulator[_]] = {
  val metricNames = Seq(
    shuffleWrite.BYTES_WRITTEN,
    shuffleWrite.RECORDS_WRITTEN,
    shuffleWrite.WRITE_TIME)
  metricNames.map(create)
}

/**
 * Accumulators for tracking input metrics.
 */
def createInputAccums(): Seq[Accumulator[_]] = {
  val metricNames = Seq(input.BYTES_READ, input.RECORDS_READ)
  metricNames.map(create)
}

/**
 * Accumulators for tracking output metrics.
 */
def createOutputAccums(): Seq[Accumulator[_]] = {
  val metricNames = Seq(output.BYTES_WRITTEN, output.RECORDS_WRITTEN)
  metricNames.map(create)
}

/**
 * Accumulators for tracking internal metrics.
 *
 * These accumulators are created with the stage such that all tasks in the stage will
 * add to the same set of accumulators. We do this to report the distribution of accumulator
 * values across all tasks within each stage. Each accumulator is registered globally and,
 * when the context has a cleaner, registered with it for eventual cleanup.
 */
def createAll(sc: SparkContext): Seq[Accumulator[_]] = {
  val internalAccums = createAll()
  internalAccums.foreach { internalAccum =>
    Accumulators.register(internalAccum)
    sc.cleaner.foreach(_.registerAccumulatorForCleanup(internalAccum))
  }
  internalAccums
}

/**
 * Create a new accumulator representing an internal task metric.
 *
 * Always constructs the accumulator with `internal = true` and `countFailedValues = true`
 * (presumably so metric values from failed tasks are still collected — confirm against
 * the Accumulator class).
 *
 * @param initialValue the zero value for this metric
 * @param name fully-qualified internal metric name, used as the accumulator's name
 * @param param the [[AccumulatorParam]] describing how values of type T are added/merged
 */
private def newMetric[T](
    initialValue: T,
    name: String,
    param: AccumulatorParam[T]): Accumulator[T] = {
  new Accumulator[T](initialValue, param, Some(name), internal = true, countFailedValues = true)
}

}
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/TaskContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,11 @@ object TaskContext {
protected[spark] def unset(): Unit = taskContext.remove()

/**
 * An empty task context that does not represent an actual task. This is only used in tests.
 */
private[spark] def empty(): TaskContextImpl =
  new TaskContextImpl(0, 0, 0, 0, null, new Properties, null)

}


Expand Down
7 changes: 1 addition & 6 deletions core/src/main/scala/org/apache/spark/TaskContextImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,10 @@ private[spark] class TaskContextImpl(
override val taskMemoryManager: TaskMemoryManager,
localProperties: Properties,
@transient private val metricsSystem: MetricsSystem,
initialAccumulators: Seq[Accumulator[_]] = InternalAccumulator.createAll())
override val taskMetrics: TaskMetrics = new TaskMetrics)
extends TaskContext
with Logging {

/**
* Metrics associated with this task.
*/
override val taskMetrics: TaskMetrics = new TaskMetrics(initialAccumulators)

/** List of callback functions to execute when the task completes. */
@transient private val onCompleteCallbacks = new ArrayBuffer[TaskCompletionListener]

Expand Down
32 changes: 13 additions & 19 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -293,16 +293,14 @@ private[spark] class Executor(
val valueBytes = resultSer.serialize(value)
val afterSerialization = System.currentTimeMillis()

for (m <- task.metrics) {
// Deserialization happens in two parts: first, we deserialize a Task object, which
// includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
m.setExecutorDeserializeTime(
(taskStart - deserializeStartTime) + task.executorDeserializeTime)
// We need to subtract Task.run()'s deserialization time to avoid double-counting
m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
m.setJvmGCTime(computeTotalGcTime() - startGCTime)
m.setResultSerializationTime(afterSerialization - beforeSerialization)
}
// Deserialization happens in two parts: first, we deserialize a Task object, which
// includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
task.metrics.setExecutorDeserializeTime(
(taskStart - deserializeStartTime) + task.executorDeserializeTime)
// We need to subtract Task.run()'s deserialization time to avoid double-counting
task.metrics.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
task.metrics.setResultSerializationTime(afterSerialization - beforeSerialization)

// Note: accumulator updates must be collected after TaskMetrics is updated
val accumUpdates = task.collectAccumulatorUpdates()
Expand Down Expand Up @@ -357,10 +355,8 @@ private[spark] class Executor(
// Collect latest accumulator values to report back to the driver
val accumulatorUpdates: Seq[AccumulableInfo] =
if (task != null) {
task.metrics.foreach { m =>
m.setExecutorRunTime(System.currentTimeMillis() - taskStart)
m.setJvmGCTime(computeTotalGcTime() - startGCTime)
}
task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
task.collectAccumulatorUpdates(taskFailed = true)
} else {
Seq.empty[AccumulableInfo]
Expand Down Expand Up @@ -485,11 +481,9 @@ private[spark] class Executor(

for (taskRunner <- runningTasks.values().asScala) {
if (taskRunner.task != null) {
taskRunner.task.metrics.foreach { metrics =>
metrics.mergeShuffleReadMetrics()
metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
accumUpdates += ((taskRunner.taskId, metrics.accumulatorUpdates()))
}
taskRunner.task.metrics.mergeShuffleReadMetrics()
taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulatorUpdates()))
}
}

Expand Down
14 changes: 5 additions & 9 deletions core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.executor

import org.apache.spark.{Accumulator, InternalAccumulator}
import org.apache.spark.InternalAccumulator
import org.apache.spark.annotation.DeveloperApi


Expand All @@ -39,14 +39,11 @@ object DataReadMethod extends Enumeration with Serializable {
* A collection of accumulators that represents metrics about reading data from external systems.
*/
@DeveloperApi
class InputMetrics private (_bytesRead: Accumulator[Long], _recordsRead: Accumulator[Long])
extends Serializable {
class InputMetrics private[spark] () extends Serializable {
import InternalAccumulator._

private[executor] def this(accumMap: Map[String, Accumulator[_]]) {
this(
TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.input.BYTES_READ),
TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.input.RECORDS_READ))
}
private[executor] val _bytesRead = TaskMetrics.createLongAccum(input.BYTES_READ)
private[executor] val _recordsRead = TaskMetrics.createLongAccum(input.RECORDS_READ)

/**
* Total number of bytes read.
Expand All @@ -61,5 +58,4 @@ class InputMetrics private (_bytesRead: Accumulator[Long], _recordsRead: Accumul
private[spark] def incBytesRead(v: Long): Unit = _bytesRead.add(v)
private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
private[spark] def setBytesRead(v: Long): Unit = _bytesRead.setValue(v)

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.executor

import org.apache.spark.{Accumulator, InternalAccumulator}
import org.apache.spark.InternalAccumulator
import org.apache.spark.annotation.DeveloperApi


Expand All @@ -38,14 +38,11 @@ object DataWriteMethod extends Enumeration with Serializable {
* A collection of accumulators that represents metrics about writing data to external systems.
*/
@DeveloperApi
class OutputMetrics private (_bytesWritten: Accumulator[Long], _recordsWritten: Accumulator[Long])
extends Serializable {

private[executor] def this(accumMap: Map[String, Accumulator[_]]) {
this(
TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.output.BYTES_WRITTEN),
TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.output.RECORDS_WRITTEN))
}
class OutputMetrics private[spark] () extends Serializable {
import InternalAccumulator._

private[executor] val _bytesWritten = TaskMetrics.createLongAccum(output.BYTES_WRITTEN)
private[executor] val _recordsWritten = TaskMetrics.createLongAccum(output.RECORDS_WRITTEN)

/**
* Total number of bytes written.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.executor

import org.apache.spark.{Accumulator, InternalAccumulator}
import org.apache.spark.InternalAccumulator
import org.apache.spark.annotation.DeveloperApi


Expand All @@ -27,38 +27,21 @@ import org.apache.spark.annotation.DeveloperApi
* Operations are not thread-safe.
*/
@DeveloperApi
class ShuffleReadMetrics private (
_remoteBlocksFetched: Accumulator[Int],
_localBlocksFetched: Accumulator[Int],
_remoteBytesRead: Accumulator[Long],
_localBytesRead: Accumulator[Long],
_fetchWaitTime: Accumulator[Long],
_recordsRead: Accumulator[Long])
extends Serializable {

private[executor] def this(accumMap: Map[String, Accumulator[_]]) {
this(
TaskMetrics.getAccum[Int](accumMap, InternalAccumulator.shuffleRead.REMOTE_BLOCKS_FETCHED),
TaskMetrics.getAccum[Int](accumMap, InternalAccumulator.shuffleRead.LOCAL_BLOCKS_FETCHED),
TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.REMOTE_BYTES_READ),
TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.LOCAL_BYTES_READ),
TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.FETCH_WAIT_TIME),
TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.RECORDS_READ))
}

/**
* Create a new [[ShuffleReadMetrics]] that is not associated with any particular task.
*
* This mainly exists for legacy reasons, because we use dummy [[ShuffleReadMetrics]] in
* many places only to merge their values together later. In the future, we should revisit
* whether this is needed.
*
* A better alternative is [[TaskMetrics.createTempShuffleReadMetrics]] followed by
* [[TaskMetrics.mergeShuffleReadMetrics]].
*/
private[spark] def this() {
this(InternalAccumulator.createShuffleReadAccums().map { a => (a.name.get, a) }.toMap)
}
class ShuffleReadMetrics private[spark] () extends Serializable {
import InternalAccumulator._

private[executor] val _remoteBlocksFetched =
TaskMetrics.createIntAccum(shuffleRead.REMOTE_BLOCKS_FETCHED)
private[executor] val _localBlocksFetched =
TaskMetrics.createIntAccum(shuffleRead.LOCAL_BLOCKS_FETCHED)
private[executor] val _remoteBytesRead =
TaskMetrics.createLongAccum(shuffleRead.REMOTE_BYTES_READ)
private[executor] val _localBytesRead =
TaskMetrics.createLongAccum(shuffleRead.LOCAL_BYTES_READ)
private[executor] val _fetchWaitTime =
TaskMetrics.createLongAccum(shuffleRead.FETCH_WAIT_TIME)
private[executor] val _recordsRead =
TaskMetrics.createLongAccum(shuffleRead.RECORDS_READ)

/**
* Number of remote blocks fetched in this shuffle by this task.
Expand Down
Loading