Skip to content

Commit 7beb227

Browse files
erenavsarogullarisquito
authored and committed
[SPARK-17663][CORE] SchedulableBuilder should handle invalid data access via scheduler.allocation.file
## What changes were proposed in this pull request? If `spark.scheduler.allocation.file` contains invalid `minShare` and/or `weight` values, the following happens: - A `NumberFormatException` is thrown by the `toInt` call. - `SparkContext` cannot be initialized. - No meaningful error message is shown to the user. In a nutshell, this functionality can be made more robust by choosing one of the following flows: **1-** Currently, if `schedulingMode` has an invalid value, a warning message is logged and the default value `FIFO` is used. The same pattern can be applied to `minShare` (default: 0) and `weight` (default: 1) as well. **2-** A meaningful error message can be shown to the user for all invalid cases. This PR offers: - `schedulingMode` previously handled only empty values. Surrounding **whitespace** and **non-uppercase** spellings (fair, FaIr, etc.) are now normalized before the lookup, and blank, unsupported, or `SchedulingMode.NONE` values fall back to the default (`FIFO`). - `minShare` and `weight` previously handled only empty values. **Non-integer** values now fall back to their defaults as well. - Some refactoring of `PoolSuite`. 
**Code to Reproduce :** ``` val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local") conf.set("spark.scheduler.mode", "FAIR") conf.set("spark.scheduler.allocation.file", "src/main/resources/fairscheduler-invalid-data.xml") val sc = new SparkContext(conf) ``` **fairscheduler-invalid-data.xml :** ``` <allocations> <pool name="production"> <schedulingMode>FIFO</schedulingMode> <weight>invalid_weight</weight> <minShare>2</minShare> </pool> </allocations> ``` **Stacktrace :** ``` Exception in thread "main" java.lang.NumberFormatException: For input string: "invalid_weight" at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) at java.lang.Integer.parseInt(Integer.java:580) at java.lang.Integer.parseInt(Integer.java:615) at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272) at scala.collection.immutable.StringOps.toInt(StringOps.scala:29) at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$org$apache$spark$scheduler$FairSchedulableBuilder$$buildFairSchedulerPool$1.apply(SchedulableBuilder.scala:127) at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$org$apache$spark$scheduler$FairSchedulableBuilder$$buildFairSchedulerPool$1.apply(SchedulableBuilder.scala:102) ``` ## How was this patch tested? Added Unit Test Case. Author: erenavsarogullari <erenavsarogullari@gmail.com> Closes #15237 from erenavsarogullari/SPARK-17663.
1 parent 7730426 commit 7beb227

File tree

3 files changed

+182
-51
lines changed

3 files changed

+182
-51
lines changed

core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala

Lines changed: 45 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@ package org.apache.spark.scheduler
2020
import java.io.{FileInputStream, InputStream}
2121
import java.util.{NoSuchElementException, Properties}
2222

23-
import scala.xml.XML
23+
import scala.xml.{Node, XML}
2424

2525
import org.apache.spark.SparkConf
2626
import org.apache.spark.internal.Logging
27+
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
2728
import org.apache.spark.util.Utils
2829

2930
/**
@@ -102,38 +103,57 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
102103
for (poolNode <- (xml \\ POOLS_PROPERTY)) {
103104

104105
val poolName = (poolNode \ POOL_NAME_PROPERTY).text
105-
var schedulingMode = DEFAULT_SCHEDULING_MODE
106-
var minShare = DEFAULT_MINIMUM_SHARE
107-
var weight = DEFAULT_WEIGHT
108-
109-
val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
110-
if (xmlSchedulingMode != "") {
111-
try {
112-
schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
113-
} catch {
114-
case e: NoSuchElementException =>
115-
logWarning(s"Unsupported schedulingMode: $xmlSchedulingMode, " +
116-
s"using the default schedulingMode: $schedulingMode")
117-
}
118-
}
119106

120-
val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
121-
if (xmlMinShare != "") {
122-
minShare = xmlMinShare.toInt
123-
}
107+
val schedulingMode = getSchedulingModeValue(poolNode, poolName, DEFAULT_SCHEDULING_MODE)
108+
val minShare = getIntValue(poolNode, poolName, MINIMUM_SHARES_PROPERTY, DEFAULT_MINIMUM_SHARE)
109+
val weight = getIntValue(poolNode, poolName, WEIGHT_PROPERTY, DEFAULT_WEIGHT)
124110

125-
val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
126-
if (xmlWeight != "") {
127-
weight = xmlWeight.toInt
128-
}
111+
rootPool.addSchedulable(new Pool(poolName, schedulingMode, minShare, weight))
129112

130-
val pool = new Pool(poolName, schedulingMode, minShare, weight)
131-
rootPool.addSchedulable(pool)
132113
logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
133114
poolName, schedulingMode, minShare, weight))
134115
}
135116
}
136117

118+
/**
 * Resolves the scheduling mode configured for a pool in the allocation file.
 *
 * The text of the `SCHEDULING_MODE_PROPERTY` child of `poolNode` is trimmed and upper-cased
 * before it is looked up in [[SchedulingMode]], so values such as `" fair "` are accepted.
 * If the value does not name a scheduling mode, or names `SchedulingMode.NONE`, a warning is
 * logged and `defaultValue` is returned instead.
 *
 * @param poolNode XML node describing the pool
 * @param poolName name of the pool, only used in the warning message
 * @param defaultValue scheduling mode to fall back to
 * @return the configured scheduling mode, or `defaultValue` when it is missing or unsupported
 */
private def getSchedulingModeValue(
    poolNode: Node,
    poolName: String,
    defaultValue: SchedulingMode): SchedulingMode = {

  // Upper-case with Locale.ROOT: the no-arg toUpperCase follows the JVM default locale, and
  // e.g. under a Turkish locale "fair" becomes "FAİR", which would never match FAIR.
  val xmlSchedulingMode =
    (poolNode \ SCHEDULING_MODE_PROPERTY).text.trim.toUpperCase(java.util.Locale.ROOT)
  val warningMessage = s"Unsupported schedulingMode: $xmlSchedulingMode, using the default " +
    s"schedulingMode: $defaultValue for pool: $poolName"
  try {
    // withName throws NoSuchElementException for unknown (including blank) names.
    val configuredMode = SchedulingMode.withName(xmlSchedulingMode)
    if (configuredMode != SchedulingMode.NONE) {
      configuredMode
    } else {
      // NONE is a valid enumeration name but not a usable mode for a pool.
      logWarning(warningMessage)
      defaultValue
    }
  } catch {
    case _: NoSuchElementException =>
      logWarning(warningMessage)
      defaultValue
  }
}
139+
140+
/**
 * Reads an integer-valued pool property from the allocation file.
 *
 * The text of the `propertyName` child of `poolNode` is trimmed and parsed as an `Int`. When
 * the text is blank or not a valid integer, a warning is logged and `defaultValue` is used.
 *
 * @param poolNode XML node describing the pool
 * @param poolName name of the pool, only used in the warning message
 * @param propertyName name of the child element to read
 * @param defaultValue value to fall back to when parsing fails
 * @return the parsed value, or `defaultValue` when the text cannot be parsed
 */
private def getIntValue(
    poolNode: Node,
    poolName: String,
    propertyName: String, defaultValue: Int): Int = {

  val rawValue = (poolNode \ propertyName).text.trim
  // Only a failed integer parse triggers the fallback; anything else propagates.
  val parsedValue: Option[Int] =
    try {
      Some(rawValue.toInt)
    } catch {
      case _: NumberFormatException => None
    }

  parsedValue.getOrElse {
    logWarning(s"Error while loading scheduler allocation file. " +
      s"$propertyName is blank or invalid: $rawValue, using the default $propertyName: " +
      s"$defaultValue for pool: $poolName")
    defaultValue
  }
}
156+
137157
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
138158
var poolName = DEFAULT_POOL_NAME
139159
var parentPool = rootPool.getSchedulableByName(poolName)
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
<?xml version="1.0"?>
2+
<!--
3+
~ Licensed to the Apache Software Foundation (ASF) under one or more
4+
~ contributor license agreements. See the NOTICE file distributed with
5+
~ this work for additional information regarding copyright ownership.
6+
~ The ASF licenses this file to You under the Apache License, Version 2.0
7+
~ (the "License"); you may not use this file except in compliance with
8+
~ the License. You may obtain a copy of the License at
9+
~
10+
~ http://www.apache.org/licenses/LICENSE-2.0
11+
~
12+
~ Unless required by applicable law or agreed to in writing, software
13+
~ distributed under the License is distributed on an "AS IS" BASIS,
14+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
~ See the License for the specific language governing permissions and
16+
~ limitations under the License.
17+
-->
18+
19+
<allocations>
20+
<pool name="pool_with_invalid_min_share">
21+
<minShare>INVALID_MIN_SHARE</minShare>
22+
<weight>2</weight>
23+
<schedulingMode>FAIR</schedulingMode>
24+
</pool>
25+
<pool name="pool_with_invalid_weight">
26+
<minShare>1</minShare>
27+
<weight>INVALID_WEIGHT</weight>
28+
<schedulingMode>FAIR</schedulingMode>
29+
</pool>
30+
<pool name="pool_with_invalid_scheduling_mode">
31+
<minShare>3</minShare>
32+
<weight>2</weight>
33+
<schedulingMode>INVALID_SCHEDULING_MODE</schedulingMode>
34+
</pool>
35+
<pool name="pool_with_non_uppercase_scheduling_mode">
36+
<minShare>2</minShare>
37+
<weight>1</weight>
38+
<schedulingMode>fair</schedulingMode>
39+
</pool>
40+
<pool name="pool_with_NONE_scheduling_mode">
41+
<minShare>1</minShare>
42+
<weight>2</weight>
43+
<schedulingMode>NONE</schedulingMode>
44+
</pool>
45+
<pool name="pool_with_whitespace_min_share">
46+
<minShare> </minShare>
47+
<weight>2</weight>
48+
<schedulingMode>FAIR</schedulingMode>
49+
</pool>
50+
<pool name="pool_with_whitespace_weight">
51+
<minShare>1</minShare>
52+
<weight> </weight>
53+
<schedulingMode>FAIR</schedulingMode>
54+
</pool>
55+
<pool name="pool_with_whitespace_scheduling_mode">
56+
<minShare>3</minShare>
57+
<weight>2</weight>
58+
<schedulingMode> </schedulingMode>
59+
</pool>
60+
<pool name="pool_with_empty_min_share">
61+
<minShare></minShare>
62+
<weight>3</weight>
63+
<schedulingMode>FAIR</schedulingMode>
64+
</pool>
65+
<pool name="pool_with_empty_weight">
66+
<minShare>2</minShare>
67+
<weight></weight>
68+
<schedulingMode>FAIR</schedulingMode>
69+
</pool>
70+
<pool name="pool_with_empty_scheduling_mode">
71+
<minShare>2</minShare>
72+
<weight>2</weight>
73+
<schedulingMode></schedulingMode>
74+
</pool>
75+
<pool name="pool_with_surrounded_whitespace">
76+
<minShare> 3 </minShare>
77+
<weight> 2 </weight>
78+
<schedulingMode> FAIR </schedulingMode>
79+
</pool>
80+
</allocations>

core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala

Lines changed: 57 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,18 @@ package org.apache.spark.scheduler
2020
import java.util.Properties
2121

2222
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
23+
import org.apache.spark.scheduler.SchedulingMode._
2324

2425
/**
2526
* Tests that pools and the associated scheduling algorithms for FIFO and fair scheduling work
2627
* correctly.
2728
*/
2829
class PoolSuite extends SparkFunSuite with LocalSparkContext {
2930

31+
val LOCAL = "local"
32+
val APP_NAME = "PoolSuite"
33+
val SCHEDULER_ALLOCATION_FILE_PROPERTY = "spark.scheduler.allocation.file"
34+
3035
def createTaskSetManager(stageId: Int, numTasks: Int, taskScheduler: TaskSchedulerImpl)
3136
: TaskSetManager = {
3237
val tasks = Array.tabulate[Task[_]](numTasks) { i =>
@@ -45,12 +50,11 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
4550
}
4651

4752
test("FIFO Scheduler Test") {
48-
sc = new SparkContext("local", "TaskSchedulerImplSuite")
53+
sc = new SparkContext(LOCAL, APP_NAME)
4954
val taskScheduler = new TaskSchedulerImpl(sc)
5055

51-
val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0)
56+
val rootPool = new Pool("", FIFO, 0, 0)
5257
val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
53-
schedulableBuilder.buildPools()
5458

5559
val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler)
5660
val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler)
@@ -74,30 +78,24 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
7478
*/
7579
test("Fair Scheduler Test") {
7680
val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
77-
val conf = new SparkConf().set("spark.scheduler.allocation.file", xmlPath)
78-
sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
81+
val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE_PROPERTY, xmlPath)
82+
sc = new SparkContext(LOCAL, APP_NAME, conf)
7983
val taskScheduler = new TaskSchedulerImpl(sc)
8084

81-
val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
85+
val rootPool = new Pool("", FAIR, 0, 0)
8286
val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
8387
schedulableBuilder.buildPools()
8488

8589
// Ensure that the XML file was read in correctly.
86-
assert(rootPool.getSchedulableByName("default") != null)
87-
assert(rootPool.getSchedulableByName("1") != null)
88-
assert(rootPool.getSchedulableByName("2") != null)
89-
assert(rootPool.getSchedulableByName("3") != null)
90-
assert(rootPool.getSchedulableByName("1").minShare === 2)
91-
assert(rootPool.getSchedulableByName("1").weight === 1)
92-
assert(rootPool.getSchedulableByName("2").minShare === 3)
93-
assert(rootPool.getSchedulableByName("2").weight === 1)
94-
assert(rootPool.getSchedulableByName("3").minShare === 0)
95-
assert(rootPool.getSchedulableByName("3").weight === 1)
90+
verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO)
91+
verifyPool(rootPool, "1", 2, 1, FIFO)
92+
verifyPool(rootPool, "2", 3, 1, FIFO)
93+
verifyPool(rootPool, "3", 0, 1, FIFO)
9694

9795
val properties1 = new Properties()
98-
properties1.setProperty("spark.scheduler.pool", "1")
96+
properties1.setProperty(schedulableBuilder.FAIR_SCHEDULER_PROPERTIES, "1")
9997
val properties2 = new Properties()
100-
properties2.setProperty("spark.scheduler.pool", "2")
98+
properties2.setProperty(schedulableBuilder.FAIR_SCHEDULER_PROPERTIES, "2")
10199

102100
val taskSetManager10 = createTaskSetManager(0, 1, taskScheduler)
103101
val taskSetManager11 = createTaskSetManager(1, 1, taskScheduler)
@@ -134,22 +132,22 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
134132
}
135133

136134
test("Nested Pool Test") {
137-
sc = new SparkContext("local", "TaskSchedulerImplSuite")
135+
sc = new SparkContext(LOCAL, APP_NAME)
138136
val taskScheduler = new TaskSchedulerImpl(sc)
139137

140-
val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
141-
val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1)
142-
val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1)
138+
val rootPool = new Pool("", FAIR, 0, 0)
139+
val pool0 = new Pool("0", FAIR, 3, 1)
140+
val pool1 = new Pool("1", FAIR, 4, 1)
143141
rootPool.addSchedulable(pool0)
144142
rootPool.addSchedulable(pool1)
145143

146-
val pool00 = new Pool("00", SchedulingMode.FAIR, 2, 2)
147-
val pool01 = new Pool("01", SchedulingMode.FAIR, 1, 1)
144+
val pool00 = new Pool("00", FAIR, 2, 2)
145+
val pool01 = new Pool("01", FAIR, 1, 1)
148146
pool0.addSchedulable(pool00)
149147
pool0.addSchedulable(pool01)
150148

151-
val pool10 = new Pool("10", SchedulingMode.FAIR, 2, 2)
152-
val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1)
149+
val pool10 = new Pool("10", FAIR, 2, 2)
150+
val pool11 = new Pool("11", FAIR, 2, 1)
153151
pool1.addSchedulable(pool10)
154152
pool1.addSchedulable(pool11)
155153

@@ -178,4 +176,37 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
178176
scheduleTaskAndVerifyId(2, rootPool, 6)
179177
scheduleTaskAndVerifyId(3, rootPool, 2)
180178
}
179+
180+
test("SPARK-17663: FairSchedulableBuilder sets default values for blank or invalid datas") {
  // Build the pools from the fixture that contains blank, whitespace-only and invalid values.
  val allocationFile =
    getClass.getClassLoader.getResource("fairscheduler-with-invalid-data.xml").getFile()
  val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE_PROPERTY, allocationFile)

  val rootPool = new Pool("", FAIR, 0, 0)
  val schedulableBuilder = new FairSchedulableBuilder(rootPool, conf)
  schedulableBuilder.buildPools()

  // (pool name, expected minShare, expected weight, expected scheduling mode)
  val expectedPools = Seq(
    (schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO),
    ("pool_with_invalid_min_share", 0, 2, FAIR),
    ("pool_with_invalid_weight", 1, 1, FAIR),
    ("pool_with_invalid_scheduling_mode", 3, 2, FIFO),
    ("pool_with_non_uppercase_scheduling_mode", 2, 1, FAIR),
    ("pool_with_NONE_scheduling_mode", 1, 2, FIFO),
    ("pool_with_whitespace_min_share", 0, 2, FAIR),
    ("pool_with_whitespace_weight", 1, 1, FAIR),
    ("pool_with_whitespace_scheduling_mode", 3, 2, FIFO),
    ("pool_with_empty_min_share", 0, 3, FAIR),
    ("pool_with_empty_weight", 2, 1, FAIR),
    ("pool_with_empty_scheduling_mode", 2, 2, FIFO),
    ("pool_with_surrounded_whitespace", 3, 2, FAIR))

  expectedPools.foreach { case (name, minShare, weight, mode) =>
    verifyPool(rootPool, name, minShare, weight, mode)
  }
}
203+
204+
/**
 * Asserts that `rootPool` contains a schedulable named `poolName` whose minShare, weight and
 * scheduling mode match the expected values.
 */
private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int,
    expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = {
  val pool = rootPool.getSchedulableByName(poolName)
  // Check existence first so a missing pool fails here rather than on a property access.
  assert(pool != null)
  assert(pool.minShare === expectedInitMinShare)
  assert(pool.weight === expectedInitWeight)
  assert(pool.schedulingMode === expectedSchedulingMode)
}
211+
181212
}

0 commit comments

Comments
 (0)