
Commit b5025d9

- Allow for finer control of the metadata cleaner
- Address review comments; move to incubator Spark
- Also includes a change to speculation, including preventing exceptions in rare cases
1 parent 232765f


12 files changed: +93 −29 lines


core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 2 additions & 2 deletions
@@ -32,7 +32,7 @@ import akka.util.Duration
 
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.{Utils, MetadataCleaner, TimeStampedHashMap}
+import org.apache.spark.util.{MetadataCleanerType, Utils, MetadataCleaner, TimeStampedHashMap}
 
 
 private[spark] sealed trait MapOutputTrackerMessage
@@ -71,7 +71,7 @@ private[spark] class MapOutputTracker extends Logging {
   var cacheEpoch = epoch
   private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]
 
-  val metadataCleaner = new MetadataCleaner("MapOutputTracker", this.cleanup)
+  val metadataCleaner = new MetadataCleaner(MetadataCleanerType.MAP_OUTPUT_TRACKER, this.cleanup)
 
   // Send a message to the trackerActor and get its result within a default timeout, or
   // throw a SparkException if this fails.
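
Every cleaner site in this commit follows the pattern above: the free-form string name is replaced by a MetadataCleanerType constant, which is what makes the "finer control" in the commit message possible. The enumeration itself is not part of this diff; a minimal sketch of what it could look like, assuming a plain scala.Enumeration. Only the constant names are attested by the diffs below; the systemProperty helper and the spark.cleaner.ttl.* naming scheme are illustrative assumptions.

// Hypothetical sketch; only the constant names actually appear in this commit.
object MetadataCleanerType extends Enumeration {
  type MetadataCleanerType = Value

  val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, DAG_SCHEDULER,
      RESULT_TASK, SHUFFLE_MAP_TASK = Value

  // One plausible route to per-cleaner control: a type-specific TTL property
  // (assumed name) that overrides the global spark.cleaner.ttl when set.
  def systemProperty(which: MetadataCleanerType): String =
    "spark.cleaner.ttl." + which.toString
}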

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 6 additions & 2 deletions
@@ -61,7 +61,11 @@ import org.apache.spark.scheduler.local.LocalScheduler
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import org.apache.spark.storage.{StorageUtils, BlockManagerSource}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ClosureCleaner, Utils, MetadataCleaner, TimeStampedHashMap}
+import org.apache.spark.util._
+import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.storage.RDDInfo
+import org.apache.spark.storage.StorageStatus
+import scala.Some
 import org.apache.spark.scheduler.StageInfo
 import org.apache.spark.storage.RDDInfo
 import org.apache.spark.storage.StorageStatus
@@ -116,7 +120,7 @@ class SparkContext(
 
   // Keeps track of all persisted RDDs
   private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]
-  private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup)
+  private[spark] val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup)
 
   // Initalize the Spark UI
   private[spark] val ui = new SparkUI(this)

core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala

Lines changed: 2 additions & 2 deletions
@@ -26,7 +26,7 @@ import scala.collection.mutable.{ListBuffer, Map, Set}
 import scala.math
 
 import org.apache.spark._
-import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.{BlockManager, StorageLevel}
 import org.apache.spark.util.Utils
 
 private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
@@ -36,7 +36,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
 
   def value = value_
 
-  def blockId: String = "broadcast_" + id
+  def blockId: String = BlockManager.toBroadcastId(id)
 
   MultiTracker.synchronized {
     SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
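
All three broadcast implementations touched below (BitTorrent, HTTP, Tree) make the same substitution: the inline "broadcast_" + id expression is replaced by a helper on BlockManager, so block-ID naming lives in one place. The helper's body is not shown in this diff; a plausible sketch, inferred directly from the expression it replaces:

// Hypothetical companion-object method matching the old inline expression.
object BlockManager {
  def toBroadcastId(id: Long): String = "broadcast_" + id
}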

core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala

Lines changed: 4 additions & 4 deletions
@@ -25,16 +25,16 @@ import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
 
 import org.apache.spark.{HttpServer, Logging, SparkEnv}
 import org.apache.spark.io.CompressionCodec
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{Utils, MetadataCleaner, TimeStampedHashSet}
+import org.apache.spark.storage.{BlockManager, StorageLevel}
+import org.apache.spark.util.{MetadataCleanerType, Utils, MetadataCleaner, TimeStampedHashSet}
 
 
 private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
   extends Broadcast[T](id) with Logging with Serializable {
 
   def value = value_
 
-  def blockId: String = "broadcast_" + id
+  def blockId: String = BlockManager.toBroadcastId(id)
 
   HttpBroadcast.synchronized {
     SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
@@ -82,7 +82,7 @@ private object HttpBroadcast extends Logging {
   private var server: HttpServer = null
 
   private val files = new TimeStampedHashSet[String]
-  private val cleaner = new MetadataCleaner("HttpBroadcast", cleanup)
+  private val cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup)
 
   private lazy val compressionCodec = CompressionCodec.createCodec()
core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala

Lines changed: 2 additions & 2 deletions
@@ -25,15 +25,15 @@ import scala.collection.mutable.{ListBuffer, Map, Set}
 import scala.math
 
 import org.apache.spark._
-import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.{BlockManager, StorageLevel}
 import org.apache.spark.util.Utils
 
 private[spark] class TreeBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
   extends Broadcast[T](id) with Logging with Serializable {
 
   def value = value_
 
-  def blockId = "broadcast_" + id
+  def blockId = BlockManager.toBroadcastId(id)
 
   MultiTracker.synchronized {
     SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)

core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala

Lines changed: 2 additions & 1 deletion
@@ -21,6 +21,7 @@ import java.io.File
 
 import org.apache.spark.Logging
 import org.apache.spark.util.Utils
+import org.apache.spark.storage.ShuffleBlockManager
 
 
 private[spark] class ShuffleSender(portIn: Int, val pResolver: PathResolver) extends Logging {
@@ -54,7 +55,7 @@ private[spark] object ShuffleSender {
 
     val pResovler = new PathResolver {
       override def getAbsolutePath(blockId: String): String = {
-        if (!blockId.startsWith("shuffle_")) {
+        if (!ShuffleBlockManager.isShuffle(blockId)) {
           throw new Exception("Block " + blockId + " is not a shuffle block")
         }
         // Figure out which local directory it hashes to, and which subdirectory in that
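
Symmetrically to the broadcast change, the hard-coded startsWith("shuffle_") test moves behind ShuffleBlockManager, so the shuffle block naming scheme is no longer duplicated at call sites. The predicate itself is not shown in this diff; a sketch inferred from the check it replaces:

// Hypothetical predicate mirroring the old inline check.
object ShuffleBlockManager {
  def isShuffle(blockId: String): Boolean = blockId.startsWith("shuffle_")
}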

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 2 additions & 2 deletions
@@ -29,7 +29,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.storage.{BlockManager, BlockManagerMaster}
-import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap}
+import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}
 
 /**
  * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
@@ -138,7 +138,7 @@ class DAGScheduler(
   val activeJobs = new HashSet[ActiveJob]
   val resultStageToJob = new HashMap[Stage, ActiveJob]
 
-  val metadataCleaner = new MetadataCleaner("DAGScheduler", this.cleanup)
+  val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup)
 
   // Start a thread to run the DAGScheduler event loop
   def start() {

core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala

Lines changed: 2 additions & 2 deletions
@@ -23,7 +23,7 @@ import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.RDDCheckpointData
-import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap}
+import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}
 
 private[spark] object ResultTask {
 
@@ -32,7 +32,7 @@ private[spark] object ResultTask {
   // expensive on the master node if it needs to launch thousands of tasks.
   val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]
 
-  val metadataCleaner = new MetadataCleaner("ResultTask", serializedInfoCache.clearOldValues)
+  val metadataCleaner = new MetadataCleaner(MetadataCleanerType.RESULT_TASK, serializedInfoCache.clearOldValues)
 
   def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] = {
     synchronized {

core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala

Lines changed: 2 additions & 2 deletions
@@ -25,7 +25,7 @@ import scala.collection.mutable.HashMap
 import org.apache.spark._
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.storage._
-import org.apache.spark.util.{TimeStampedHashMap, MetadataCleaner}
+import org.apache.spark.util.{MetadataCleanerType, TimeStampedHashMap, MetadataCleaner}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.RDDCheckpointData
 
@@ -37,7 +37,7 @@ private[spark] object ShuffleMapTask {
   // expensive on the master node if it needs to launch thousands of tasks.
   val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]
 
-  val metadataCleaner = new MetadataCleaner("ShuffleMapTask", serializedInfoCache.clearOldValues)
+  val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_MAP_TASK, serializedInfoCache.clearOldValues)
 
   def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_]): Array[Byte] = {
     synchronized {

core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala

Lines changed: 2 additions & 2 deletions
@@ -630,11 +630,11 @@ private[spark] class ClusterTaskSetManager(
     var foundTasks = false
     val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
     logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
-    if (tasksSuccessful >= minFinishedForSpeculation) {
+    if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
       val time = clock.getTime()
       val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
       Arrays.sort(durations)
-      val medianDuration = durations(min((0.5 * numTasks).round.toInt, durations.size - 1))
+      val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.size - 1))
       val threshold = max(SPECULATION_MULTIPLIER * medianDuration, 100)
       // TODO: Threshold should also look at standard deviation of task durations and have a lower
       // bound based on that.
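
This is the speculation change mentioned in the commit message, and it addresses two distinct problems (this reading is inferred from the diff itself, not stated in the commit). First, with a small SPECULATION_QUANTILE, minFinishedForSpeculation can floor to 0, so the old condition could be entered with zero successful tasks; durations would then be empty and durations(min(..., -1)) would throw, which the new tasksSuccessful > 0 guard prevents. Second, the median index was computed from numTasks rather than from the tasks that actually finished, so until at least half the tasks were done it clamped to durations.size - 1 and returned the longest duration instead of the median, inflating the speculation threshold. A toy, self-contained illustration of the indexing fix (not Spark code; all values made up):

import java.util.Arrays
import scala.math.min

object SpeculationMedianDemo {
  def main(args: Array[String]): Unit = {
    val numTasks = 100                               // tasks in the set
    val durations = Array(120L, 130L, 150L, 900L)    // only 4 have succeeded so far
    Arrays.sort(durations)
    val tasksSuccessful = durations.length

    // Old: 0.5 * numTasks = 50 clamps to the last index -> 900, the *longest* duration.
    val oldMedian = durations(min((0.5 * numTasks).round.toInt, durations.length - 1))
    // New: 0.5 * tasksSuccessful = 2 -> 150, the median of the tasks that finished.
    val newMedian = durations(min((0.5 * tasksSuccessful).round.toInt, durations.length - 1))

    println("old = " + oldMedian + ", new = " + newMedian)  // old = 900, new = 150

    // With zero successful tasks the old code indexed an empty array at -1 and
    // threw; the added `tasksSuccessful > 0` guard skips speculation instead.
  }
}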
