diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 5f4ffa151d19b..1f540878e3415 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -2009,6 +2009,45 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assertDataStructuresEmpty() } + test("accumulator not calculated for resubmitted task in shuffle map stage") { + val accum = AccumulatorSuite.createLongAccum("a") + // Create a shuffleMapRdd with 2 partition + val shuffleMapRdd = new MyRDD(sc, 2, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1)) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker) + submit(reduceRdd, Array(0)) + val shuffleMapTaskSet = taskSets.head + + // finish the first shuffle map task + runEvent(makeCompletionEvent( + shuffleMapTaskSet.tasks(0), + Success, + makeMapStatus("hostA", 1), + Seq(AccumulatorSuite.createLongAccum("", initValue = 1, id = accum.id)))) + // finish the first shuffle map task again in different host + runEvent(makeCompletionEvent( + shuffleMapTaskSet.tasks(0), + Success, + makeMapStatus("hostB", 1), + Seq(AccumulatorSuite.createLongAccum("", initValue = 1, id = accum.id)))) + // finish the second shuffle map task + runEvent(makeCompletionEvent( + shuffleMapTaskSet.tasks(1), + Success, + makeMapStatus("hostC", 1))) + // map status overwrite by last success one + assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === + HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostC"))) + // check accumulator only updated once + assert(accum.value === 1) + + // complete reduce task and job finished success + val reduceTaskSet = taskSets(1) + complete(reduceTaskSet, Seq((Success, 42))) + assertDataStructuresEmpty() + } + test("accumulators are updated on exception failures and task killed") { val acc1 = AccumulatorSuite.createLongAccum("ingenieur") val acc2 = AccumulatorSuite.createLongAccum("boulanger")