@@ -187,7 +187,7 @@ private[spark] object InternalAccumulator {
* add to the same set of accumulators. We do this to report the distribution of accumulator
* values across all tasks within each stage.
*/
- def create(sc: SparkContext): Seq[Accumulator[_]] = {
+ def createAll(sc: SparkContext): Seq[Accumulator[_]] = {
val accums = createAll()
accums.foreach { accum =>
Accumulators.register(accum)
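
For reference, a minimal self-contained sketch of the behaviour the comment above describes, using the public accumulator API of this codebase's era rather than the `private[spark]` `InternalAccumulator` helpers (master URL, app name, and accumulator name are placeholders):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder local-mode context; any SparkContext behaves the same way.
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("accum-sketch"))
    // One driver-registered accumulator: every task adds into the same value,
    // which is the "same set of accumulators" behaviour createAll relies on
    // for reporting per-stage distributions.
    val recordCount = sc.accumulator(0L, "records")
    sc.parallelize(1 to 100, numSlices = 4).foreach(_ => recordCount += 1L)
    assert(recordCount.value == 100L)
    sc.stop()
  }
}
```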
@@ -950,13 +950,6 @@ class DAGScheduler(
// First figure out the indexes of partition ids to compute.
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

- // Create internal accumulators if the stage has no accumulators initialized.
- // Reset internal accumulators only if this stage is not partially submitted
- // Otherwise, we may override existing accumulator values from some tasks
- if (stage.internalAccumulators.isEmpty || stage.numPartitions == partitionsToCompute.size) {
-   stage.resetInternalAccumulators()
- }

// Use the scheduling pool, job group, description, etc. from an ActiveJob associated
// with this Stage
val properties = jobIdToActiveJob(jobId).properties
@@ -1036,7 +1029,7 @@ class DAGScheduler(
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
- taskBinary, part, locs, stage.internalAccumulators, properties)
+ taskBinary, part, locs, stage.latestInfo.internalAccumulators, properties)
}

case stage: ResultStage =>
@@ -1046,7 +1039,7 @@ class DAGScheduler(
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId,
- taskBinary, part, locs, id, properties, stage.internalAccumulators)
+ taskBinary, part, locs, id, properties, stage.latestInfo.internalAccumulators)
}
}
} catch {
19 changes: 2 additions & 17 deletions core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -75,22 +75,6 @@ private[scheduler] abstract class Stage(
val name: String = callSite.shortForm
val details: String = callSite.longForm

- private var _internalAccumulators: Seq[Accumulator[_]] = Seq.empty
-
- /** Internal accumulators shared across all tasks in this stage. */
- def internalAccumulators: Seq[Accumulator[_]] = _internalAccumulators
-
- /**
-  * Re-initialize the internal accumulators associated with this stage.
-  *
-  * This is called every time the stage is submitted, *except* when a subset of tasks
-  * belonging to this stage has already finished. Otherwise, reinitializing the internal
-  * accumulators here again will override partial values from the finished tasks.
-  */
- def resetInternalAccumulators(): Unit = {
-   _internalAccumulators = InternalAccumulator.create(rdd.sparkContext)
- }

/**
* Pointer to the [StageInfo] object for the most recent attempt. This needs to be initialized
* here, before any attempts have actually been created, because the DAGScheduler uses this
@@ -127,7 +111,8 @@ private[scheduler] abstract class Stage(
numPartitionsToCompute: Int,
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
_latestInfo = StageInfo.fromStage(
- this, nextAttemptId, Some(numPartitionsToCompute), taskLocalityPreferences)
+ this, nextAttemptId, Some(numPartitionsToCompute),
+ InternalAccumulator.createAll(rdd.sparkContext), taskLocalityPreferences)
nextAttemptId += 1
}

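A short sketch of the scheduler-side pattern after this change (hypothetical helper, scheduler-internal access assumed; `StageAttemptSketch` and `startAttempt` are illustrative names, not part of the patch):

```scala
package org.apache.spark.scheduler

import org.apache.spark.Accumulator

// Sketch only: starting a new stage attempt now creates the internal
// accumulators through StageInfo.fromStage (see makeNewStageAttempt above),
// so the current attempt's set is always reachable via latestInfo instead of
// through mutable state on Stage.
private[scheduler] object StageAttemptSketch {
  def startAttempt(stage: Stage, numPartitions: Int): Seq[Accumulator[_]] = {
    stage.makeNewStageAttempt(numPartitions)
    stage.latestInfo.internalAccumulators  // handed to ShuffleMapTask / ResultTask above
  }
}
```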
10 changes: 9 additions & 1 deletion core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler

import scala.collection.mutable.HashMap

+ import org.apache.spark.Accumulator
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.storage.RDDInfo

@@ -35,14 +36,19 @@ class StageInfo(
val rddInfos: Seq[RDDInfo],
val parentIds: Seq[Int],
val details: String,
+ val internalAccumulators: Seq[Accumulator[_]] = Seq.empty,
private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty) {
/** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
var submissionTime: Option[Long] = None
/** Time when all tasks in the stage completed or when the stage was cancelled. */
var completionTime: Option[Long] = None
/** If the stage failed, the reason why. */
var failureReason: Option[String] = None
- /** Terminal values of accumulables updated during this stage. */
+
+ /**
+  * Terminal values of accumulables updated during this stage, including all the user-defined
+  * accumulators.
+  */
val accumulables = HashMap[Long, AccumulableInfo]()

def stageFailed(reason: String) {
@@ -75,6 +81,7 @@ private[spark] object StageInfo {
stage: Stage,
attemptId: Int,
numTasks: Option[Int] = None,
+ internalAccumulators: Seq[Accumulator[_]] = Seq.empty,
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty
): StageInfo = {
val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
@@ -87,6 +94,7 @@
rddInfos,
stage.parents.map(_.id),
stage.details,
+ internalAccumulators,
taskLocalityPreferences)
}
}
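
For callers, the new `internalAccumulators` parameter sits between `details` and `taskLocalityPreferences` and defaults to `Seq.empty`. A minimal sketch with placeholder values (parameter order taken from the call sites in this diff; Spark-internal access assumed):

```scala
import org.apache.spark.scheduler.StageInfo

// Placeholder stage 42, attempt 0, no RDD infos, no parents. A caller with no
// accumulator information can rely on the Seq.empty default; call sites that
// also pass taskLocalityPreferences positionally (as in the test helper below)
// have to spell the Seq.empty out.
val placeholder = new StageInfo(42, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown")
```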
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -203,7 +203,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
// This could be empty if the JobProgressListener hasn't received information about the
// stage or if the stage information has been garbage collected
listener.stageIdToInfo.getOrElse(stageId,
- new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown"))
+ new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown", Seq.empty))
}

val activeStages = Buffer[StageInfo]()
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -578,7 +578,9 @@ private[spark] object JsonProtocol {
// The "Stage Infos" field was added in Spark 1.2.0
val stageInfos = Utils.jsonOption(json \ "Stage Infos")
.map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse {
- stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown"))
+ stageIds.map { id =>
+   new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown", Seq.empty)
+ }
}
SparkListenerJobStart(jobId, submissionTime, stageInfos, properties)
}
@@ -686,7 +688,7 @@ private[spark] object JsonProtocol {
}

val stageInfo = new StageInfo(
- stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details)
+ stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details, Seq.empty)
stageInfo.submissionTime = submissionTime
stageInfo.completionTime = completionTime
stageInfo.failureReason = failureReason
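The hunk above keeps old event logs readable by synthesizing placeholder StageInfos when the "Stage Infos" field is absent. A small, self-contained json4s sketch of that fallback pattern (generic code with a simplified JSON shape, not Spark's `Utils.jsonOption`; `StageInfosFallback` and `stageNames` are illustrative names):

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object StageInfosFallback {
  implicit val formats: Formats = DefaultFormats

  // If "Stage Infos" is present, read the stage names out of it;
  // otherwise fall back to a placeholder, mirroring the getOrElse above.
  def stageNames(event: JValue): Seq[String] = (event \ "Stage Infos") match {
    case JNothing | JNull => Seq("unknown")
    case arr => arr.extract[Seq[JValue]].map(v => (v \ "Stage Name").extract[String])
  }

  def main(args: Array[String]): Unit = {
    val withField = parse("""{"Stage Infos": [{"Stage Name": "map"}]}""")
    val withoutField = parse("""{"Job ID": 1}""")
    assert(stageNames(withField) == Seq("map"))
    assert(stageNames(withoutField) == Seq("unknown"))
  }
}
```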
@@ -928,8 +928,8 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
numTasks: Int,
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty
): StageInfo = {
- new StageInfo(
-   stageId, 0, "name", numTasks, Seq.empty, Seq.empty, "no details", taskLocalityPreferences)
+ new StageInfo(stageId, 0, "name", numTasks, Seq.empty, Seq.empty, "no details",
+   Seq.empty, taskLocalityPreferences)
}

private def createTaskInfo(taskId: Int, taskIndex: Int, executorId: String): TaskInfo = {
6 changes: 3 additions & 3 deletions core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -337,15 +337,15 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkContext
// first attempt -- its successful
val writer1 = manager.getWriter[Int, Int](shuffleHandle, 0,
new TaskContextImpl(0, 0, 0L, 0, taskMemoryManager, new Properties, metricsSystem,
- InternalAccumulator.create(sc)))
+ InternalAccumulator.createAll(sc)))
val data1 = (1 to 10).map { x => x -> x}

// second attempt -- also successful. We'll write out different data,
// just to simulate the fact that the records may get written differently
// depending on what gets spilled, what gets combined, etc.
val writer2 = manager.getWriter[Int, Int](shuffleHandle, 0,
new TaskContextImpl(0, 0, 1L, 0, taskMemoryManager, new Properties, metricsSystem,
- InternalAccumulator.create(sc)))
+ InternalAccumulator.createAll(sc)))
val data2 = (11 to 20).map { x => x -> x}

// interleave writes of both attempts -- we want to test that both attempts can occur
@@ -374,7 +374,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkContext

val reader = manager.getReader[Int, Int](shuffleHandle, 0, 1,
new TaskContextImpl(1, 0, 2L, 0, taskMemoryManager, new Properties, metricsSystem,
- InternalAccumulator.create(sc)))
+ InternalAccumulator.createAll(sc)))
val readData = reader.read().toIndexedSeq
assert(readData === data1.toIndexedSeq || readData === data2.toIndexedSeq)

@@ -1144,7 +1144,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
// SPARK-9809 -- this stage is submitted without a task for each partition (because some of
// the shuffle map output is still available from stage 0); make sure we've still got internal
// accumulators setup
- assert(scheduler.stageIdToStage(2).internalAccumulators.nonEmpty)
+ assert(scheduler.stageIdToStage(2).latestInfo.internalAccumulators.nonEmpty)
completeShuffleMapStageSuccessfully(2, 0, 2)
completeNextResultStageWithSuccess(3, 1, idx => idx + 1234)
assert(results === Map(0 -> 1234, 1 -> 1235))
@@ -114,7 +114,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
}
val taskMemoryManager = new TaskMemoryManager(sc.env.memoryManager, 0)
val taskContext = new TaskContextImpl(
- 0, 0, 0, 0, taskMemoryManager, new Properties, null, InternalAccumulator.create(sc))
+ 0, 0, 0, 0, taskMemoryManager, new Properties, null, InternalAccumulator.createAll(sc))

val sorter = new ExternalSorter[Int, UnsafeRow, UnsafeRow](
taskContext,