
Commit a369cb1

Author: Andrew Or (committed)
Merge branch 'master' of github.com:apache/spark into conf-set-translate
2 parents c26a9e3 + 2df5f1f, commit a369cb1

File tree: 45 files changed (+907 additions, -342 deletions)


LICENSE

Lines changed: 16 additions & 0 deletions
@@ -771,6 +771,22 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 
+========================================================================
+For TestTimSort (core/src/test/java/org/apache/spark/util/collection/TestTimSort.java):
+========================================================================
+Copyright (C) 2015 Stijn de Gouw
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
 
 ========================================================================
 For LimitedInputStream

core/src/main/java/org/apache/spark/util/collection/TimSort.java

Lines changed: 4 additions & 5 deletions
@@ -425,15 +425,14 @@ private void pushRun(int runBase, int runLen) {
   private void mergeCollapse() {
     while (stackSize > 1) {
       int n = stackSize - 2;
-      if (n > 0 && runLen[n-1] <= runLen[n] + runLen[n+1]) {
+      if ( (n >= 1 && runLen[n-1] <= runLen[n] + runLen[n+1])
+        || (n >= 2 && runLen[n-2] <= runLen[n] + runLen[n-1])) {
         if (runLen[n - 1] < runLen[n + 1])
           n--;
-        mergeAt(n);
-      } else if (runLen[n] <= runLen[n + 1]) {
-        mergeAt(n);
-      } else {
+      } else if (runLen[n] > runLen[n + 1]) {
         break; // Invariant is established
       }
+      mergeAt(n);
     }
   }
 
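
The revised condition also looks one run deeper into the stack (runLen[n-2]), so mergeCollapse re-establishes the run-length invariant for the whole pending-run stack rather than only for its top entries. A minimal sketch, not part of the commit, of the invariant a test such as TestTimSort can assert after each push:

// Illustrative only: every pending run must be longer than the next one, and
// longer than the sum of the two runs stacked above it.
def runStackInvariantHolds(runLen: IndexedSeq[Int], stackSize: Int): Boolean = {
  val decreasing = (1 until stackSize).forall(i => runLen(i - 1) > runLen(i))
  val sumBounded = (2 until stackSize).forall(i => runLen(i - 2) > runLen(i - 1) + runLen(i))
  decreasing && sumBounded
}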

core/src/main/scala/org/apache/spark/Accumulators.scala

Lines changed: 23 additions & 17 deletions
@@ -280,15 +280,24 @@ object AccumulatorParam {
 
 // TODO: The multi-thread support in accumulators is kind of lame; check
 // if there's a more intuitive way of doing it right
-private[spark] object Accumulators {
-  // Store a WeakReference instead of a StrongReference because this way accumulators can be
-  // appropriately garbage collected during long-running jobs and release memory
-  type WeakAcc = WeakReference[Accumulable[_, _]]
-  val originals = Map[Long, WeakAcc]()
-  val localAccums = new ThreadLocal[Map[Long, WeakAcc]]() {
-    override protected def initialValue() = Map[Long, WeakAcc]()
+private[spark] object Accumulators extends Logging {
+  /**
+   * This global map holds the original accumulator objects that are created on the driver.
+   * It keeps weak references to these objects so that accumulators can be garbage-collected
+   * once the RDDs and user-code that reference them are cleaned up.
+   */
+  val originals = Map[Long, WeakReference[Accumulable[_, _]]]()
+
+  /**
+   * This thread-local map holds per-task copies of accumulators; it is used to collect the set
+   * of accumulator updates to send back to the driver when tasks complete. After tasks complete,
+   * this map is cleared by `Accumulators.clear()` (see Executor.scala).
+   */
+  private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
+    override protected def initialValue() = Map[Long, Accumulable[_, _]]()
   }
-  var lastId: Long = 0
+
+  private var lastId: Long = 0
 
   def newId(): Long = synchronized {
     lastId += 1
@@ -297,16 +306,16 @@ private[spark] object Accumulators {
 
   def register(a: Accumulable[_, _], original: Boolean): Unit = synchronized {
     if (original) {
-      originals(a.id) = new WeakAcc(a)
+      originals(a.id) = new WeakReference[Accumulable[_, _]](a)
     } else {
-      localAccums.get()(a.id) = new WeakAcc(a)
+      localAccums.get()(a.id) = a
     }
   }
 
   // Clear the local (non-original) accumulators for the current thread
   def clear() {
     synchronized {
-      localAccums.get.clear
+      localAccums.get.clear()
     }
   }
 
@@ -320,12 +329,7 @@ private[spark] object Accumulators {
   def values: Map[Long, Any] = synchronized {
     val ret = Map[Long, Any]()
     for ((id, accum) <- localAccums.get) {
-      // Since we are now storing weak references, we must check whether the underlying data
-      // is valid.
-      ret(id) = accum.get match {
-        case Some(values) => values.localValue
-        case None => None
-      }
+      ret(id) = accum.localValue
    }
    return ret
  }
@@ -341,6 +345,8 @@ private[spark] object Accumulators {
           case None =>
             throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
         }
+      } else {
+        logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
       }
     }
   }
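
Only the driver-side originals are now held through weak references; the per-task copies in localAccums are plain strong references that live only until `Accumulators.clear()` runs, which is why `values` no longer needs to unwrap an Option. A standalone sketch of the weak-reference registry pattern used for the originals (illustrative names, not Spark's code):

import scala.collection.mutable
import scala.ref.WeakReference

// Illustrative only: a registry that keeps weak references so an entry can be
// garbage-collected once no strong reference to its value remains.
object WeakRegistry {
  private val entries = mutable.Map[Long, WeakReference[AnyRef]]()

  def register(id: Long, value: AnyRef): Unit = synchronized {
    entries(id) = new WeakReference(value)
  }

  // None is returned both for unknown ids and for ids whose value was already
  // collected, so callers must handle the "already gone" case explicitly.
  def lookup(id: Long): Option[AnyRef] = synchronized {
    entries.get(id).flatMap(_.get)
  }
}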

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 59 additions & 6 deletions
@@ -17,33 +17,86 @@
 
 package org.apache.spark
 
-import akka.actor.Actor
+import scala.concurrent.duration._
+import scala.collection.mutable
+
+import akka.actor.{Actor, Cancellable}
+
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.scheduler.TaskScheduler
+import org.apache.spark.scheduler.{SlaveLost, TaskScheduler}
 import org.apache.spark.util.ActorLogReceive
 
 /**
  * A heartbeat from executors to the driver. This is a shared message used by several internal
- * components to convey liveness or execution information for in-progress tasks.
+ * components to convey liveness or execution information for in-progress tasks. It will also
+ * expire the hosts that have not heartbeated for more than spark.network.timeout.
  */
 private[spark] case class Heartbeat(
     executorId: String,
     taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
     blockManagerId: BlockManagerId)
 
+private[spark] case object ExpireDeadHosts
+
 private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
 
 /**
  * Lives in the driver to receive heartbeats from executors..
  */
-private[spark] class HeartbeatReceiver(scheduler: TaskScheduler)
+private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
   extends Actor with ActorLogReceive with Logging {
 
+  // executor ID -> timestamp of when the last heartbeat from this executor was received
+  private val executorLastSeen = new mutable.HashMap[String, Long]
+
+  private val executorTimeoutMs = sc.conf.getLong("spark.network.timeout",
+    sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120)) * 1000
+
+  private val checkTimeoutIntervalMs = sc.conf.getLong("spark.network.timeoutInterval",
+    sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60)) * 1000
+
+  private var timeoutCheckingTask: Cancellable = null
+
+  override def preStart(): Unit = {
+    import context.dispatcher
+    timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
+      checkTimeoutIntervalMs.milliseconds, self, ExpireDeadHosts)
+    super.preStart()
+  }
+
   override def receiveWithLogging = {
     case Heartbeat(executorId, taskMetrics, blockManagerId) =>
-      val response = HeartbeatResponse(
-        !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
+      val unknownExecutor = !scheduler.executorHeartbeatReceived(
+        executorId, taskMetrics, blockManagerId)
+      val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
+      executorLastSeen(executorId) = System.currentTimeMillis()
       sender ! response
+    case ExpireDeadHosts =>
+      expireDeadHosts()
+  }
+
+  private def expireDeadHosts(): Unit = {
+    logTrace("Checking for hosts with no recent heartbeats in HeartbeatReceiver.")
+    val now = System.currentTimeMillis()
+    for ((executorId, lastSeenMs) <- executorLastSeen) {
+      if (now - lastSeenMs > executorTimeoutMs) {
+        logWarning(s"Removing executor $executorId with no recent heartbeats: " +
+          s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
+        scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
+          "timed out after ${now - lastSeenMs} ms"))
+        if (sc.supportDynamicAllocation) {
+          sc.killExecutor(executorId)
+        }
+        executorLastSeen.remove(executorId)
+      }
+    }
+  }
+
+  override def postStop(): Unit = {
+    if (timeoutCheckingTask != null) {
+      timeoutCheckingTask.cancel()
+    }
+    super.postStop()
   }
 }
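
A self-contained sketch of the sweep that the new ExpireDeadHosts message triggers; a plain mutable map and explicit arguments stand in for the actor state and the SparkConf lookups (names here are illustrative, not Spark's):

import scala.collection.mutable

// Illustrative only: return the executors whose most recent heartbeat is older
// than the timeout and drop them from the last-seen map, mirroring expireDeadHosts.
def expiredExecutors(
    lastSeen: mutable.Map[String, Long],
    timeoutMs: Long,
    nowMs: Long = System.currentTimeMillis()): Seq[String] = {
  val expired = lastSeen.collect {
    case (executorId, seenMs) if nowMs - seenMs > timeoutMs => executorId
  }.toSeq
  expired.foreach(lastSeen.remove)  // forget them so the next sweep does not report them again
  expired
}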

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 1 addition & 1 deletion
@@ -407,7 +407,7 @@ private[spark] object SparkConf extends Logging {
    * @param warn Whether to print a warning if the key is deprecated. Warnings will be printed
    *             only once for each key.
    */
-  def translateConfKey(userKey: String, warn: Boolean = false): String = {
+  private def translateConfKey(userKey: String, warn: Boolean = false): String = {
     deprecatedConfigs.get(userKey)
       .map { deprecatedKey =>
         if (warn) {
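
With translateConfKey now private, deprecated-key handling stays inside SparkConf and callers such as FsHistoryProvider (below) simply read the keys they care about. A rough sketch of a translate-and-warn-once helper of this kind, with an assumed key mapping and names that are not Spark's internals:

import scala.collection.mutable

// Illustrative only: map a deprecated key to its current name, warning at most
// once per deprecated key that is actually used.
object ConfKeyTranslator {
  private val renamed = Map(
    "spark.history.fs.updateInterval" -> "spark.history.fs.update.interval.seconds")
  private val warned = mutable.Set[String]()

  def translate(userKey: String, warn: Boolean = true): String = renamed.get(userKey) match {
    case Some(newKey) =>
      if (warn && warned.add(userKey)) {
        println(s"Config key '$userKey' is deprecated; use '$newKey' instead.")
      }
      newKey
    case None => userKey
  }
}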

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 14 additions & 7 deletions
@@ -351,7 +351,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private[spark] var (schedulerBackend, taskScheduler) =
     SparkContext.createTaskScheduler(this, master)
   private val heartbeatReceiver = env.actorSystem.actorOf(
-    Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
+    Props(new HeartbeatReceiver(this, taskScheduler)), "HeartbeatReceiver")
   @volatile private[spark] var dagScheduler: DAGScheduler = _
   try {
     dagScheduler = new DAGScheduler(this)
@@ -398,7 +398,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private val dynamicAllocationTesting = conf.getBoolean("spark.dynamicAllocation.testing", false)
   private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =
     if (dynamicAllocationEnabled) {
-      assert(master.contains("yarn") || dynamicAllocationTesting,
+      assert(supportDynamicAllocation,
        "Dynamic allocation of executors is currently only supported in YARN mode")
       Some(new ExecutorAllocationManager(this, listenerBus, conf))
     } else {
@@ -1122,6 +1122,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     postEnvironmentUpdate()
   }
 
+  /**
+   * Return whether dynamically adjusting the amount of resources allocated to
+   * this application is supported. This is currently only available for YARN.
+   */
+  private[spark] def supportDynamicAllocation =
+    master.contains("yarn") || dynamicAllocationTesting
+
   /**
    * :: DeveloperApi ::
    * Register a listener to receive up-calls from events that happen during execution.
@@ -1155,7 +1162,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    */
   @DeveloperApi
   override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
-    assert(master.contains("yarn") || dynamicAllocationTesting,
+    assert(supportDynamicAllocation,
       "Requesting executors is currently only supported in YARN mode")
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
@@ -1173,7 +1180,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    */
   @DeveloperApi
   override def killExecutors(executorIds: Seq[String]): Boolean = {
-    assert(master.contains("yarn") || dynamicAllocationTesting,
+    assert(supportDynamicAllocation,
       "Killing executors is currently only supported in YARN mode")
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
@@ -1382,17 +1389,17 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       stopped = true
       env.metricsSystem.report()
       metadataCleaner.cancel()
-      env.actorSystem.stop(heartbeatReceiver)
       cleaner.foreach(_.stop())
       dagScheduler.stop()
       dagScheduler = null
+      listenerBus.stop()
+      eventLogger.foreach(_.stop())
+      env.actorSystem.stop(heartbeatReceiver)
       progressBar.foreach(_.stop())
       taskScheduler = null
       // TODO: Cache.stop()?
       env.stop()
       SparkEnv.set(null)
-      listenerBus.stop()
-      eventLogger.foreach(_.stop())
       logInfo("Successfully stopped SparkContext")
       SparkContext.clearActiveContext()
     } else {
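
The repeated master.contains("yarn") || dynamicAllocationTesting guard now lives in one place, supportDynamicAllocation, which the HeartbeatReceiver above also consults before killing a timed-out executor. A toy sketch of the same consolidation pattern, with assumed names rather than Spark's classes:

// Illustrative only: centralize a capability check so every caller agrees on
// the same condition and the error messages stay consistent.
class AllocationFrontend(master: String, testingFlag: Boolean) {
  def supportsDynamicAllocation: Boolean = master.contains("yarn") || testingFlag

  def requestExecutors(numAdditional: Int): Boolean = {
    require(supportsDynamicAllocation, "Requesting executors is only supported in YARN mode")
    numAdditional > 0  // stand-in for forwarding the request to a scheduler backend
  }

  def killExecutors(ids: Seq[String]): Boolean = {
    require(supportsDynamicAllocation, "Killing executors is only supported in YARN mode")
    ids.nonEmpty  // stand-in for forwarding the request to a scheduler backend
  }
}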

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 37 additions & 16 deletions
@@ -655,8 +655,7 @@ private[spark] object SparkSubmitUtils {
 
   /**
    * Extracts maven coordinates from a comma-delimited string. Coordinates should be provided
-   * in the format `groupId:artifactId:version` or `groupId/artifactId:version`. The latter provides
-   * simplicity for Spark Package users.
+   * in the format `groupId:artifactId:version` or `groupId/artifactId:version`.
    * @param coordinates Comma-delimited string of maven coordinates
    * @return Sequence of Maven coordinates
    */
@@ -747,6 +746,35 @@ private[spark] object SparkSubmitUtils {
       md.addDependency(dd)
     }
   }
+
+  /** Add exclusion rules for dependencies already included in the spark-assembly */
+  private[spark] def addExclusionRules(
+      ivySettings: IvySettings,
+      ivyConfName: String,
+      md: DefaultModuleDescriptor): Unit = {
+    // Add scala exclusion rule
+    val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*")
+    val scalaDependencyExcludeRule =
+      new DefaultExcludeRule(scalaArtifacts, ivySettings.getMatcher("glob"), null)
+    scalaDependencyExcludeRule.addConfiguration(ivyConfName)
+    md.addExcludeRule(scalaDependencyExcludeRule)
+
+    // We need to specify each component explicitly, otherwise we miss spark-streaming-kafka and
+    // other spark-streaming utility components. Underscore is there to differentiate between
+    // spark-streaming_2.1x and spark-streaming-kafka-assembly_2.1x
+    val components = Seq("bagel_", "catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
+      "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")
+
+    components.foreach { comp =>
+      val sparkArtifacts =
+        new ArtifactId(new ModuleId("org.apache.spark", s"spark-$comp*"), "*", "*", "*")
+      val sparkDependencyExcludeRule =
+        new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
+      sparkDependencyExcludeRule.addConfiguration(ivyConfName)
+
+      md.addExcludeRule(sparkDependencyExcludeRule)
+    }
+  }
 
   /** A nice function to use in tests as well. Values are dummy strings. */
   private[spark] def getModuleDescriptor = DefaultModuleDescriptor.newDefaultInstance(
@@ -768,6 +796,9 @@ private[spark] object SparkSubmitUtils {
     if (coordinates == null || coordinates.trim.isEmpty) {
       ""
     } else {
+      val sysOut = System.out
+      // To prevent ivy from logging to system out
+      System.setOut(printStream)
       val artifacts = extractMavenCoordinates(coordinates)
       // Default configuration name for ivy
       val ivyConfName = "default"
@@ -811,19 +842,9 @@ private[spark] object SparkSubmitUtils {
       val md = getModuleDescriptor
       md.setDefaultConf(ivyConfName)
 
-      // Add an exclusion rule for Spark and Scala Library
-      val sparkArtifacts = new ArtifactId(new ModuleId("org.apache.spark", "*"), "*", "*", "*")
-      val sparkDependencyExcludeRule =
-        new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
-      sparkDependencyExcludeRule.addConfiguration(ivyConfName)
-      val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*")
-      val scalaDependencyExcludeRule =
-        new DefaultExcludeRule(scalaArtifacts, ivySettings.getMatcher("glob"), null)
-      scalaDependencyExcludeRule.addConfiguration(ivyConfName)
-
-      // Exclude any Spark dependencies, and add all supplied maven artifacts as dependencies
-      md.addExcludeRule(sparkDependencyExcludeRule)
-      md.addExcludeRule(scalaDependencyExcludeRule)
+      // Add exclusion rules for Spark and Scala Library
+      addExclusionRules(ivySettings, ivyConfName, md)
+      // add all supplied maven artifacts as dependencies
       addDependenciesToIvy(md, artifacts, ivyConfName)
 
       // resolve dependencies
@@ -835,7 +856,7 @@ private[spark] object SparkSubmitUtils {
       ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
         packagesDirectory.getAbsolutePath + File.separator + "[artifact](-[classifier]).[ext]",
         retrieveOptions.setConfs(Array(ivyConfName)))
-
+      System.setOut(sysOut)
      resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
    }
  }
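
extractMavenCoordinates accepts comma-delimited coordinates in the groupId:artifactId:version (or groupId/artifactId:version) form. A standalone sketch of that parsing contract, not the SparkSubmitUtils implementation itself:

// Illustrative only: split the comma-delimited list and require exactly
// group, artifact and version for each entry.
case class MavenCoordinate(groupId: String, artifactId: String, version: String)

def extractCoordinates(coordinates: String): Seq[MavenCoordinate] = {
  coordinates.split(",").map { p =>
    val splits = p.replace("/", ":").split(":")
    require(splits.length == 3,
      s"Provided Maven coordinates must be in the form 'groupId:artifactId:version'. " +
        s"The coordinate provided is: $p")
    MavenCoordinate(splits(0), splits(1), splits(2))
  }.toSeq
}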

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 2 additions & 2 deletions
@@ -49,8 +49,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
 
   // Interval between each check for event log updates
   private val UPDATE_INTERVAL_MS = conf.getOption("spark.history.fs.update.interval.seconds")
-    .orElse(conf.getOption(SparkConf.translateConfKey("spark.history.fs.updateInterval", true)))
-    .orElse(conf.getOption(SparkConf.translateConfKey("spark.history.updateInterval", true)))
+    .orElse(conf.getOption("spark.history.fs.updateInterval"))
+    .orElse(conf.getOption("spark.history.updateInterval"))
     .map(_.toInt)
     .getOrElse(10) * 1000
 
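
The update interval is resolved through an orElse chain, so the newest key wins and older keys only apply when the newer ones are unset. A tiny worked example with a plain Map standing in for SparkConf (hypothetical values):

// Illustrative only: the first defined Option in the chain wins.
val conf = Map("spark.history.updateInterval" -> "30")  // only the oldest key is set

val updateIntervalMs = conf.get("spark.history.fs.update.interval.seconds")
  .orElse(conf.get("spark.history.fs.updateInterval"))
  .orElse(conf.get("spark.history.updateInterval"))
  .map(_.toInt)
  .getOrElse(10) * 1000  // 30000 here; 10000 if none of the keys were set

println(updateIntervalMs)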
