@@ -25,7 +25,7 @@ import akka.actor.{PoisonPill, ActorRef, Actor}
2525import org .apache .spark .Logging
2626import org .apache .spark .util .{AkkaUtils , ActorLogReceive }
2727
28- private [spark] sealed trait OutputCommitCoordinationMessage
28+ private [spark] sealed trait OutputCommitCoordinationMessage extends Serializable
2929
3030private [spark] case class StageStarted (stage : Int ) extends OutputCommitCoordinationMessage
3131private [spark] case class StageEnded (stage : Int ) extends OutputCommitCoordinationMessage
@@ -54,14 +54,14 @@ private[spark] class OutputCommitCoordinator extends Logging {
5454 // Initialized by SparkEnv
5555 var coordinatorActor : ActorRef = _
5656
57- // TODO: handling stage attempt ids?
5857 private type StageId = Int
5958 private type TaskId = Long
6059 private type TaskAttemptId = Long
6160
6261 private val authorizedCommittersByStage :
6362 mutable.Map [StageId , mutable.Map [TaskId , TaskAttemptId ]] = mutable.HashMap ()
6463
64+
6565 def stageStart (stage : StageId ) {
6666 coordinatorActor ! StageStarted (stage)
6767 }
@@ -102,7 +102,6 @@ private[spark] class OutputCommitCoordinator extends Logging {
102102 }
103103
104104 private def handleStageStart (stage : StageId ): Unit = {
105- // TODO: assert that we're not overwriting an existing entry?
106105 authorizedCommittersByStage(stage) = mutable.HashMap [TaskId , TaskAttemptId ]()
107106 }
108107
0 commit comments