Changes from all commits (44 commits):
42ca72d  Change types of some signatures  (Jan 18, 2016)
2c62000  Boiler plate for all the new internal accums  (Jan 18, 2016)
1167722  Squashed commit of the following:  (Jan 19, 2016)
144df46  Implement TaskMetrics using Accumulators  (Jan 19, 2016)
5ec17c1  Fix accums not set on the driver  (Jan 19, 2016)
362cde5  Fix test compile  (Jan 19, 2016)
e43e8be  Make accum updates read from all registered accums  (Jan 19, 2016)
2330a37  Fix metrics being double counted on driver  (Jan 19, 2016)
cca87cc  Merge branch 'master' of github.com:apache/spark into task-metrics-us…  (Jan 19, 2016)
4ead1ba  Miscellaneous updates; make diff smaller  (Jan 19, 2016)
0f40753  Fix JsonProtocolSuite  (Jan 19, 2016)
76b605c  Fix SQLQuerySuite  (Jan 19, 2016)
7b5d840  Fix style  (Jan 19, 2016)
2069a78  Fix MiMa  (Jan 19, 2016)
d9813b1  Add test on accum values being zero'ed out + cleanups  (Jan 19, 2016)
40fd853  Add tests for TaskMetrics, which uncovered a bug  (Jan 19, 2016)
cdb3279  Minor test changes  (Jan 19, 2016)
8e46ee3  Merge branch 'master' of github.com:apache/spark into task-metrics-us…  (Jan 19, 2016)
628076d  Fix style  (Jan 19, 2016)
2a3cd27  Fix MiMa  (Jan 19, 2016)
67a1bee  Fix InputOutputMetricsSuite  (Jan 20, 2016)
ec6ea44  Fix TaskContextSuite  (Jan 20, 2016)
6355dbd  Fix ReplayListenerSuite  (Jan 20, 2016)
ed81584  Fix SparkListenerSuite  (Jan 20, 2016)
641f736  Merge branch 'master' of github.com:apache/spark into task-metrics-us…  (Jan 20, 2016)
4ca7328  Minor comment correction  (Jan 20, 2016)
17db1c9  Add test to verify internal accums are cleaned up  (Jan 20, 2016)
308db4c  Add deprecated matching methods for Input/OutputMetrics  (Jan 20, 2016)
a591119  Merge branch 'task-metrics-use-accums' of github.com:andrewor14/spark…  (Jan 20, 2016)
393ec19  Add "countFailedValues" flag in Accumulable[Info]  (Jan 20, 2016)
4c8bc90  Send back accum updates only, not TaskMetrics  (Jan 20, 2016)
54d7a18  Fix test compile  (Jan 20, 2016)
cc46a99  Fix tests  (Jan 21, 2016)
7547ceb  Reconstruct TaskMetrics on driver  (Jan 21, 2016)
2e458cc  Send accum updates instead of TaskMetrics on heartbeat  (Jan 21, 2016)
17457fc  Clean up a few TODOs that were introduced in SPARK-12895  (Jan 21, 2016)
4ab261d  Fix style  (Jan 21, 2016)
c62078a  Fix JsonProtocolSuite  (Jan 21, 2016)
891eebc  Add extra tests to AccumulatorSuite  (Jan 21, 2016)
52496d9  Fix InternalAccumulatorSuite  (Jan 21, 2016)
830c280  Add more tests in TaskContextSuite  (Jan 21, 2016)
cc25cee  Fix PartitionBatchPruningSuite  (Jan 21, 2016)
abde0ed  Merge branch 'master' of github.com:apache/spark into dont-send-task-…  (Jan 21, 2016)
1a590e6  Fix MiMa  (Jan 21, 2016)
@@ -444,13 +444,7 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) th
@Override
public Option<MapStatus> stop(boolean success) {
try {
// Update task metrics from accumulators (null in UnsafeShuffleWriterSuite)
Map<String, Accumulator<Object>> internalAccumulators =
taskContext.internalMetricsToAccumulators();
if (internalAccumulators != null) {
internalAccumulators.apply(InternalAccumulator.PEAK_EXECUTION_MEMORY())
.add(getPeakMemoryUsedBytes());
}
taskContext.taskMetrics().incPeakExecutionMemory(getPeakMemoryUsedBytes());

if (stopping) {
return Option.apply(null);
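The shuffle-writer hunk above (and the Aggregator.scala change further down) replaces the old by-name lookup of the internal PEAK_EXECUTION_MEMORY accumulator with a direct call on TaskMetrics, which is now itself backed by accumulators. A minimal sketch of the new call-site pattern, assuming it runs inside a Spark task; note that these setters are private[spark], so this compiles only within Spark's own source tree:

```scala
// Sketch only: the call-site pattern this PR moves to. `peakBytes` is a hypothetical
// measurement supplied by the caller.
import org.apache.spark.TaskContext

def reportPeakMemory(peakBytes: Long): Unit = {
  val ctx = TaskContext.get()  // null when not running inside a task
  if (ctx != null) {
    // Replaces the old lookup:
    // ctx.internalMetricsToAccumulators()(InternalAccumulator.PEAK_EXECUTION_MEMORY).add(...)
    ctx.taskMetrics().incPeakExecutionMemory(peakBytes)
  }
}
```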
86 changes: 63 additions & 23 deletions core/src/main/scala/org/apache/spark/Accumulable.scala
@@ -35,40 +35,67 @@ import org.apache.spark.util.Utils
* [[org.apache.spark.Accumulator]]. They won't always be the same, though -- e.g., imagine you are
* accumulating a set. You will add items to the set, and you will union two sets together.
*
* All accumulators created on the driver to be used on the executors must be registered with
* [[Accumulators]]. This is already done automatically for accumulators created by the user.
* Internal accumulators must be explicitly registered by the caller.
*
* Operations are not thread-safe.
*
* @param id ID of this accumulator; for internal use only.
* @param initialValue initial value of accumulator
* @param param helper object defining how to add elements of type `R` and `T`
* @param name human-readable name for use in Spark's web UI
* @param internal if this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported
* to the driver via heartbeats. For internal [[Accumulable]]s, `R` must be
* thread safe so that they can be reported correctly.
* @param countFailedValues whether to accumulate values from failed tasks. This is set to true
* for system and time metrics like serialization time or bytes spilled,
* and false for things with absolute values like number of input rows.
* This should be used for internal metrics only.
* @tparam R the full accumulated data (result type)
* @tparam T partial data that can be added in
*/
class Accumulable[R, T] private[spark] (
initialValue: R,
class Accumulable[R, T] private (
val id: Long,
@transient initialValue: R,
param: AccumulableParam[R, T],
val name: Option[String],
internal: Boolean)
internal: Boolean,
val countFailedValues: Boolean)
extends Serializable {

private[spark] def this(
@transient initialValue: R, param: AccumulableParam[R, T], internal: Boolean) = {
this(initialValue, param, None, internal)
initialValue: R,
param: AccumulableParam[R, T],
name: Option[String],
internal: Boolean,
countFailedValues: Boolean) = {
this(Accumulators.newId(), initialValue, param, name, internal, countFailedValues)
}

def this(@transient initialValue: R, param: AccumulableParam[R, T], name: Option[String]) =
this(initialValue, param, name, false)
private[spark] def this(
initialValue: R,
param: AccumulableParam[R, T],
name: Option[String],
internal: Boolean) = {
this(initialValue, param, name, internal, false /* countFailedValues */)
}

def this(@transient initialValue: R, param: AccumulableParam[R, T]) =
this(initialValue, param, None)
def this(initialValue: R, param: AccumulableParam[R, T], name: Option[String]) =
this(initialValue, param, name, false /* internal */)

val id: Long = Accumulators.newId
def this(initialValue: R, param: AccumulableParam[R, T]) = this(initialValue, param, None)

@volatile @transient private var value_ : R = initialValue // Current value on master
val zero = param.zero(initialValue) // Zero value to be passed to workers
@volatile @transient private var value_ : R = initialValue // Current value on driver
val zero = param.zero(initialValue) // Zero value to be passed to executors
private var deserialized = false

Accumulators.register(this)
// In many places we create internal accumulators without access to the active context cleaner,
// so if we register them here then we may never unregister these accumulators. To avoid memory
// leaks, we require the caller to explicitly register internal accumulators elsewhere.
if (!internal) {
Accumulators.register(this)
}

/**
* If this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported to the driver
@@ -77,6 +104,17 @@ class Accumulable[R, T] private[spark] (
*/
private[spark] def isInternal: Boolean = internal

/**
* Return a copy of this [[Accumulable]].
*
* The copy will have the same ID as the original and will not be registered with
* [[Accumulators]] again. This method exists so that the caller can avoid passing the
* same mutable instance around.
*/
private[spark] def copy(): Accumulable[R, T] = {
new Accumulable[R, T](id, initialValue, param, name, internal, countFailedValues)
}

/**
* Add more data to this accumulator / accumulable
* @param term the data to add
@@ -106,7 +144,7 @@ class Accumulable[R, T] private[spark] (
def merge(term: R) { value_ = param.addInPlace(value_, term)}

/**
* Access the accumulator's current value; only allowed on master.
* Access the accumulator's current value; only allowed on driver.
*/
def value: R = {
if (!deserialized) {
@@ -128,7 +166,7 @@ class Accumulable[R, T] private[spark] (
def localValue: R = value_

/**
* Set the accumulator's value; only allowed on master.
* Set the accumulator's value; only allowed on driver.
*/
def value_= (newValue: R) {
if (!deserialized) {
@@ -139,22 +177,24 @@ class Accumulable[R, T] private[spark] (
}

/**
* Set the accumulator's value; only allowed on master
* Set the accumulator's value. For internal use only.
*/
def setValue(newValue: R) {
this.value = newValue
}
def setValue(newValue: R): Unit = { value_ = newValue }

/**
* Set the accumulator's value. For internal use only.
*/
private[spark] def setValueAny(newValue: Any): Unit = { setValue(newValue.asInstanceOf[R]) }

// Called by Java when deserializing an object
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
value_ = zero
deserialized = true

// Automatically register the accumulator when it is deserialized with the task closure.
//
// Note internal accumulators sent with task are deserialized before the TaskContext is created
// and are registered in the TaskContext constructor. Other internal accumulators, such SQL
// metrics, still need to register here.
// This is for external accumulators and internal ones that do not represent task level
// metrics, e.g. internal SQL metrics, which are per-operator.
val taskContext = TaskContext.get()
if (taskContext != null) {
taskContext.registerAccumulator(this)
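To make the new registration and copy semantics in Accumulable.scala concrete, here is a small standalone sketch using toy types (not Spark API): external accumulators register themselves on construction, internal ones are left to the caller, and copies keep the original ID so a duplicate registration is silently ignored. Spark's real registry stores WeakReferences; a plain map keeps the example short.

```scala
import scala.collection.mutable

object ToyRegistry {
  private val nextId = new java.util.concurrent.atomic.AtomicLong(0L)
  private val originals = mutable.Map[Long, ToyAccum]()
  def newId(): Long = nextId.getAndIncrement()
  // Like Accumulators.register: if the ID is already present (e.g. for a copy), do nothing.
  def register(a: ToyAccum): Unit = synchronized {
    if (!originals.contains(a.id)) originals(a.id) = a
  }
}

class ToyAccum private (val id: Long, val internal: Boolean, var value: Long) {
  def this(internal: Boolean) = this(ToyRegistry.newId(), internal, 0L)
  // External accumulators self-register; internal ones must be registered by their owner
  // (together with a cleanup hook) so entries in the global map are not leaked.
  if (!internal) ToyRegistry.register(this)
  // A copy keeps the original ID; any duplicate registration attempt is ignored above.
  def copy(): ToyAccum = new ToyAccum(id, internal, value)
}

object ToyDemo extends App {
  val external = new ToyAccum(internal = false)  // registers itself
  val internalAcc = new ToyAccum(internal = true)
  ToyRegistry.register(internalAcc)              // caller registers internal accums explicitly
  assert(external.copy().id == external.id)      // copies share the ID of the original
}
```

Under these rules the driver can hand each task a fresh copy of an accumulator without growing the registry, which is what the new copy() method is for.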
103 changes: 76 additions & 27 deletions core/src/main/scala/org/apache/spark/Accumulator.scala
@@ -17,9 +17,14 @@

package org.apache.spark

import scala.collection.{mutable, Map}
import java.util.concurrent.atomic.AtomicLong
import javax.annotation.concurrent.GuardedBy

import scala.collection.mutable
import scala.ref.WeakReference

import org.apache.spark.storage.{BlockId, BlockStatus}


/**
* A simpler value of [[Accumulable]] where the result type being accumulated is the same
@@ -49,14 +54,18 @@ import scala.ref.WeakReference
*
* @param initialValue initial value of accumulator
* @param param helper object defining how to add elements of type `T`
* @param name human-readable name associated with this accumulator
* @param internal whether this accumulator is used internally within Spark only
* @param countFailedValues whether to accumulate values from failed tasks
* @tparam T result type
*/
class Accumulator[T] private[spark] (
@transient private[spark] val initialValue: T,
param: AccumulatorParam[T],
name: Option[String],
internal: Boolean)
extends Accumulable[T, T](initialValue, param, name, internal) {
internal: Boolean,
override val countFailedValues: Boolean = false)
extends Accumulable[T, T](initialValue, param, name, internal, countFailedValues) {

def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
this(initialValue, param, name, false)
@@ -75,43 +84,65 @@ private[spark] object Accumulators extends Logging {
* This global map holds the original accumulator objects that are created on the driver.
* It keeps weak references to these objects so that accumulators can be garbage-collected
* once the RDDs and user-code that reference them are cleaned up.
* TODO: Don't use a global map; these should be tied to a SparkContext at the very least.
*/
@GuardedBy("Accumulators")
val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]()

private var lastId: Long = 0
private val nextId = new AtomicLong(0L)

def newId(): Long = synchronized {
lastId += 1
lastId
}
/**
* Return a globally unique ID for a new [[Accumulable]].
* Note: Once you copy the [[Accumulable]] the ID is no longer unique.
*/
def newId(): Long = nextId.getAndIncrement

/**
* Register an [[Accumulable]] created on the driver such that it can be used on the executors.
*
* All accumulators registered here can later be used as a container for accumulating partial
* values across multiple tasks. This is what [[org.apache.spark.scheduler.DAGScheduler]] does.
* Note: if an accumulator is registered here, it should also be registered with the active
* context cleaner for cleanup so as to avoid memory leaks.
*
* If an [[Accumulable]] with the same ID was already registered, do nothing instead of
* overwriting it. This happens when we copy accumulators, e.g. when we reconstruct
* [[org.apache.spark.executor.TaskMetrics]] from accumulator updates.
*/
def register(a: Accumulable[_, _]): Unit = synchronized {
originals(a.id) = new WeakReference[Accumulable[_, _]](a)
if (!originals.contains(a.id)) {
originals(a.id) = new WeakReference[Accumulable[_, _]](a)
}
}

def remove(accId: Long) {
synchronized {
originals.remove(accId)
}
/**
* Unregister the [[Accumulable]] with the given ID, if any.
*/
def remove(accId: Long): Unit = synchronized {
originals.remove(accId)
}

// Add values to the original accumulators with some given IDs
def add(values: Map[Long, Any]): Unit = synchronized {
for ((id, value) <- values) {
if (originals.contains(id)) {
// Since we are now storing weak references, we must check whether the underlying data
// is valid.
originals(id).get match {
case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
case None =>
throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
}
} else {
logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
/**
* Return the [[Accumulable]] registered with the given ID, if any.
*/
def get(id: Long): Option[Accumulable[_, _]] = synchronized {
originals.get(id).map { weakRef =>
// Since we are storing weak references, we must check whether the underlying data is valid.
weakRef.get match {
case Some(accum) => accum
case None =>
throw new IllegalAccessError(s"Attempted to access garbage collected accumulator $id")
}
}
}

/**
* Clear all registered [[Accumulable]]s. For testing only.
*/
def clear(): Unit = synchronized {
originals.clear()
}

}


@@ -156,5 +187,23 @@ object AccumulatorParam {
def zero(initialValue: Float): Float = 0f
}

// TODO: Add AccumulatorParams for other types, e.g. lists and strings
// Note: when merging values, this param just adopts the newer value. This is used only
// internally for things that shouldn't really be accumulated across tasks, like input
// read method, which should be the same across all tasks in the same stage.
private[spark] object StringAccumulatorParam extends AccumulatorParam[String] {
def addInPlace(t1: String, t2: String): String = t2
def zero(initialValue: String): String = ""
}

// Note: this is expensive as it makes a copy of the list every time the caller adds an item.
// A better way to use this is to first accumulate the values yourself, then add them all at once.
private[spark] class ListAccumulatorParam[T] extends AccumulatorParam[Seq[T]] {
def addInPlace(t1: Seq[T], t2: Seq[T]): Seq[T] = t1 ++ t2
def zero(initialValue: Seq[T]): Seq[T] = Seq.empty[T]
}

// For the internal metric that records what blocks are updated in a particular task
private[spark] object UpdatedBlockStatusesAccumulatorParam
extends ListAccumulatorParam[(BlockId, BlockStatus)]

}
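The two new params at the end of this file have deliberately unusual merge semantics. A quick standalone illustration (mirroring the addInPlace/zero definitions above, runnable outside Spark):

```scala
object AccumParamSemantics extends App {
  // StringAccumulatorParam: merging just adopts the newer value. Suitable only for values
  // expected to be identical across tasks in a stage, such as the input read method.
  def stringAddInPlace(t1: String, t2: String): String = t2
  // ListAccumulatorParam: merging concatenates, copying the list on every add, which is
  // why the comment above recommends batching values and adding them in one call.
  def listAddInPlace[T](t1: Seq[T], t2: Seq[T]): Seq[T] = t1 ++ t2

  println(stringAddInPlace("Memory", "Hadoop"))  // Hadoop (newer value wins)
  println(listAddInPlace(Seq(1, 2), Seq(3)))     // List(1, 2, 3)
}
```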
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -57,8 +57,7 @@ case class Aggregator[K, V, C] (
Option(context).foreach { c =>
c.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
c.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
c.internalMetricsToAccumulators(
InternalAccumulator.PEAK_EXECUTION_MEMORY).add(map.peakMemoryUsedBytes)
c.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes)
}
}
}
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -35,7 +35,7 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
*/
private[spark] case class Heartbeat(
executorId: String,
taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
accumUpdates: Array[(Long, Seq[AccumulableInfo])], // taskId -> accum updates
blockManagerId: BlockManagerId)

/**
@@ -119,14 +119,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
context.reply(true)

// Messages received from executors
case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId) =>
if (scheduler != null) {
if (executorLastSeen.contains(executorId)) {
executorLastSeen(executorId) = clock.getTimeMillis()
eventLoopThread.submit(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
val unknownExecutor = !scheduler.executorHeartbeatReceived(
executorId, taskMetrics, blockManagerId)
executorId, accumUpdates, blockManagerId)
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
context.reply(response)
}
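The HeartbeatReceiver change above switches the heartbeat payload from materialized TaskMetrics to raw per-task accumulator updates, which the driver then turns back into TaskMetrics (see the "Reconstruct TaskMetrics on driver" commit). A hedged sketch of a consumer of the new payload shape, using only types visible in this hunk; the object, method name, and logging are illustrative and not part of the PR:

```scala
import org.apache.spark.scheduler.AccumulableInfo

object HeartbeatPayloadSketch {
  // Illustrative only: walk the per-task accumulator updates carried by a Heartbeat.
  def summarize(executorId: String, accumUpdates: Array[(Long, Seq[AccumulableInfo])]): Unit = {
    accumUpdates.foreach { case (taskId, infos) =>
      // Each entry pairs a task ID with the accumulator values that task has reported so far;
      // in the PR, scheduler.executorHeartbeatReceived (shown above) is what consumes this.
      println(s"executor=$executorId task=$taskId updates=${infos.size}")
    }
  }
}
```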