diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 9495cd2835f9..4dbb1e437a90 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -154,6 +154,11 @@ package object config {
     ConfigBuilder("spark.blacklist.application.fetchFailure.enabled")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val BLACKLIST_ALWAYSBLACKLISTEDNODES_CONF =
+    ConfigBuilder("spark.blacklist.alwaysBlacklistedNodes")
+      .stringConf
+      .createOptional
   // End blacklist confs
 
   private[spark] val UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE =
diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index cd8e61d6d020..b0a11f572109 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -93,6 +93,21 @@ private[scheduler] class BlacklistTracker (
    */
   val nodeToBlacklistedExecs = new HashMap[String, HashSet[String]]()
 
+  /**
+   * Permanently blacklists the nodes listed in spark.blacklist.alwaysBlacklistedNodes.
+   * The expiry time is set to Long.MaxValue, so these entries never expire.
+   */
+  private val permanentlyBlacklistedNodes = BlacklistTracker.getBlacklistedNodes(conf)
+  if (permanentlyBlacklistedNodes.nonEmpty) {
+    val now = clock.getTimeMillis()
+    for (nodeName <- permanentlyBlacklistedNodes) {
+      nodeIdToBlacklistExpiryTime.put(nodeName, Long.MaxValue)
+      listenerBus.post(SparkListenerNodeBlacklisted(now, nodeName, 0))
+      logWarning(s"Permanently blacklisted node $nodeName")
+    }
+    _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
+  }
+
   /**
    * Un-blacklists executors and nodes that have been blacklisted for at least
    * BLACKLIST_TIMEOUT_MILLIS
@@ -409,6 +424,15 @@ private[scheduler] object BlacklistTracker extends Logging {
     }
   }
 
+  // Returns the set of node names listed in spark.blacklist.alwaysBlacklistedNodes.
+  def getBlacklistedNodes(conf: SparkConf): Set[String] = {
+    val listNodes = conf.get(config.BLACKLIST_ALWAYSBLACKLISTEDNODES_CONF)
+    listNodes match {
+      case Some(nodes) => nodes.split(',').map(_.trim).filter(_.nonEmpty).toSet
+      case None => Set.empty
+    }
+  }
+
   /**
    * Verify that blacklist configurations are consistent; if not, throw an exception. Should only
    * be called if blacklisting is enabled.
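The parsing in `getBlacklistedNodes` reduces to splitting the comma-separated value, trimming whitespace, and dropping empty segments. A minimal standalone sketch of that behavior (`BlacklistParseDemo` and `parseNodes` are illustrative names, not part of the patch):

```scala
import org.apache.spark.SparkConf

object BlacklistParseDemo {
  // Same parsing as getBlacklistedNodes: split on commas, trim, drop empty segments.
  def parseNodes(conf: SparkConf): Set[String] =
    conf.getOption("spark.blacklist.alwaysBlacklistedNodes") match {
      case Some(nodes) => nodes.split(',').map(_.trim).filter(_.nonEmpty).toSet
      case None => Set.empty
    }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)
      .set("spark.blacklist.alwaysBlacklistedNodes", " hostA, hostB ,, hostA ")
    println(parseNodes(conf)) // Set(hostA, hostB)
  }
}
```

Deduplication falls out of the `Set`, so repeating a hostname in the config value is harmless.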
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
index 520d85a29892..84bf5b3e537a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
@@ -585,4 +585,16 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with MockitoSugar
       2000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
     assert(blacklist.nextExpiryTime === 1000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
   }
+
+  test("Nodes can be permanently blacklisted, SPARK-21829") {
+    val blacklistedNodes = "hostA, hostB"
+    conf.set("spark.blacklist.alwaysBlacklistedNodes", blacklistedNodes)
+
+    val allocationClientMock = mock[ExecutorAllocationClient]
+    blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock)
+    for (nodeName <- blacklistedNodes.split(',').map(_.trim)) {
+      assert(blacklist.nodeIdToBlacklistExpiryTime.contains(nodeName))
+      assert(blacklist.nodeIdToBlacklistExpiryTime(nodeName) === Long.MaxValue)
+    }
+  }
 }
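To exercise the new behavior end to end, an application would turn on blacklisting and set the new property; the tracker, and therefore this config, only takes effect when `spark.blacklist.enabled` is true. A sketch under those assumptions, with placeholder hostnames:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumes the patch above is applied; "badhost1" and "badhost2" are placeholders.
val conf = new SparkConf()
  .setAppName("permanent-blacklist-demo")
  .set("spark.blacklist.enabled", "true")
  .set("spark.blacklist.alwaysBlacklistedNodes", "badhost1, badhost2")
val sc = new SparkContext(conf)
// For the lifetime of this application, no tasks are scheduled on either host.
```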
diff --git a/docs/configuration.md b/docs/configuration.md
index e7c0306920e0..ccad7b97b2ed 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1528,6 +1528,17 @@ Apart from these, the following properties are also available, and may be useful
     blacklisted.
   </td>
 </tr>
+<tr>
+  <td><code>spark.blacklist.alwaysBlacklistedNodes</code></td>
+  <td>(none)</td>
+  <td>
+    (Experimental) A comma-separated list of node hostnames that are permanently
+    added to the blacklist, regardless of any task failures. Spark never schedules
+    tasks on these nodes, and the corresponding blacklist entries never expire.
+    This is intended for nodes that are known to be unreliable. Only takes effect
+    when <code>spark.blacklist.enabled</code> is true.
+  </td>
+</tr>
 <tr>
   <td><code>spark.speculation</code></td>
   <td>false</td>
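Because the tracker posts a `SparkListenerNodeBlacklisted` event for each configured node at startup, the permanent entries are observable from a listener; they arrive with an executor-failure count of 0, matching the `SparkListenerNodeBlacklisted(now, nodeName, 0)` call in the patch. A minimal sketch (`BlacklistAuditListener` is an illustrative name):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerNodeBlacklisted}

// Logs every node-blacklisted event; configuration-driven entries report
// executorFailures == 0.
class BlacklistAuditListener extends SparkListener {
  override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = {
    println(s"Node ${event.hostId} blacklisted at ${event.time} " +
      s"(executor failures: ${event.executorFailures})")
  }
}

// Register on an existing SparkContext:
//   sc.addSparkListener(new BlacklistAuditListener)
```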