Skip to content

Commit 51458ab

Browse files
committed
Added stageId <--> jobId mapping in DAGScheduler
...and make sure that DAGScheduler data structures are cleaned up on job completion. Initial effort and discussion at mesos/spark#842
1 parent 58d9bbc commit 51458ab

File tree

9 files changed

+286
-88
lines changed

9 files changed

+286
-88
lines changed

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

+6 −2
Original file line numberDiff line numberDiff line change
@@ -244,12 +244,12 @@ private[spark] class MapOutputTrackerMaster extends MapOutputTracker {
 244 244        case Some(bytes) =>
 245 245          return bytes
 246 246        case None =>
 247     -        statuses = mapStatuses(shuffleId)
     247 +        statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
 248 248          epochGotten = epoch
 249 249      }
 250 250    }
 251 251    // If we got here, we failed to find the serialized locations in the cache, so we pulled
 252     -  // out a snapshot of the locations as "locs"; let's serialize and return that
     252 +  // out a snapshot of the locations as "statuses"; let's serialize and return that
 253 253    val bytes = MapOutputTracker.serializeMapStatuses(statuses)
 254 254    logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
 255 255    // Add them into the table only if the epoch hasn't changed while we were working
@@ -274,6 +274,10 @@ private[spark] class MapOutputTrackerMaster extends MapOutputTracker {
 274 274    override def updateEpoch(newEpoch: Long) {
 275 275      // This might be called on the MapOutputTrackerMaster if we're running in local mode.
 276 276    }
     277 +
     278 +  def has(shuffleId: Int): Boolean = {
     279 +    cachedSerializedStatuses.get(shuffleId).isDefined || mapStatuses.contains(shuffleId)
     280 +  }
 277 281  }
 278 282
 279 283  private[spark] object MapOutputTracker {

0 commit comments

Comments (0)