Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
/**
* Class that keeps track of the location of the map output of
* a stage. This is abstract because different versions of MapOutputTracker
* (driver and worker) use different HashMap to store its metadata.
* (driver and executor) use different HashMap to store its metadata.
*/
private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging {
private val timeout = AkkaUtils.askTimeout(conf)
Expand All @@ -81,11 +81,11 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
var trackerActor: ActorRef = _

/**
* This HashMap has different behavior for the master and the workers.
* This HashMap has different behavior for the driver and the executors.
*
* On the master, it serves as the source of map outputs recorded from ShuffleMapTasks.
* On the workers, it simply serves as a cache, in which a miss triggers a fetch from the
* master's corresponding HashMap.
* On the driver, it serves as the source of map outputs recorded from ShuffleMapTasks.
* On the executors, it simply serves as a cache, in which a miss triggers a fetch from the
* driver's corresponding HashMap.
*
* Note: because mapStatuses is accessed concurrently, subclasses should make sure it's a
* thread-safe map.
Expand All @@ -99,7 +99,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
protected var epoch: Long = 0
protected val epochLock = new AnyRef

/** Remembers which map output locations are currently being fetched on a worker. */
/** Remembers which map output locations are currently being fetched on a executor. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor grammar nit: "a -> an".

private val fetching = new HashSet[Int]

/**
Expand Down Expand Up @@ -198,8 +198,8 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging

/**
* Called from executors to update the epoch number, potentially clearing old outputs
* because of a fetch failure. Each worker task calls this with the latest epoch
* number on the master at the time it was created.
* because of a fetch failure. Each executor task calls this with the latest epoch
* number on the driver at the time it was created.
*/
def updateEpoch(newEpoch: Long) {
epochLock.synchronized {
Expand Down Expand Up @@ -231,7 +231,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
private var cacheEpoch = epoch

/**
* Timestamp based HashMap for storing mapStatuses and cached serialized statuses in the master,
* Timestamp based HashMap for storing mapStatuses and cached serialized statuses in the driver,
* so that statuses are dropped only by explicit de-registering or by TTL-based cleaning (if set).
* Other than these two scenarios, nothing should be dropped from this HashMap.
*/
Expand Down Expand Up @@ -341,7 +341,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
}

/**
* MapOutputTracker for the workers, which fetches map output information from the driver's
* MapOutputTracker for the executors, which fetches map output information from the driver's
* MapOutputTrackerMaster.
*/
private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
Expand Down