@@ -191,8 +191,11 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(parentPool)
logInfo("Created pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
logWarning(s"A job was submitted with scheduler pool $poolName, which has not been " +
"configured. This can happen when the file that pools are read from isn't set, or " +
s"when that file doesn't contain $poolName. Created $poolName with default " +
s"configuration (schedulingMode: $DEFAULT_SCHEDULING_MODE, " +
s"minShare: $DEFAULT_MINIMUM_SHARE, weight: $DEFAULT_WEIGHT)")
}
}
parentPool.addSchedulable(manager)
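For context on when this new warning fires, here is a minimal sketch (not part of this diff) of an application that reaches this code path. The pool name "analytics" and the local-mode setup are assumptions for illustration; `spark.scheduler.mode`, `spark.scheduler.pool`, and `setLocalProperty` are the standard public API.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object UnconfiguredPoolExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("unconfigured-pool-example")
      .set("spark.scheduler.mode", "FAIR") // selects the fair scheduler
    val sc = new SparkContext(conf)

    // "analytics" is a hypothetical pool name that no allocation file declares,
    // so the builder creates it with defaults and logs the warning added above.
    sc.setLocalProperty("spark.scheduler.pool", "analytics")
    sc.parallelize(1 to 100).count()
    sc.stop()
  }
}
```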
97 changes: 91 additions & 6 deletions core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
@@ -31,6 +31,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
val LOCAL = "local"
val APP_NAME = "PoolSuite"
val SCHEDULER_ALLOCATION_FILE_PROPERTY = "spark.scheduler.allocation.file"
val TEST_POOL = "testPool"

def createTaskSetManager(stageId: Int, numTasks: Int, taskScheduler: TaskSchedulerImpl)
: TaskSetManager = {
@@ -40,7 +41,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
new TaskSetManager(taskScheduler, new TaskSet(tasks, stageId, 0, 0, null), 0)
}

-def scheduleTaskAndVerifyId(taskId: Int, rootPool: Pool, expectedStageId: Int) {
+def scheduleTaskAndVerifyId(taskId: Int, rootPool: Pool, expectedStageId: Int): Unit = {
val taskSetQueue = rootPool.getSortedTaskSetQueue
val nextTaskSetToSchedule =
taskSetQueue.find(t => (t.runningTasks + t.tasksSuccessful) < t.numTasks)
@@ -201,12 +202,96 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
verifyPool(rootPool, "pool_with_surrounded_whitespace", 3, 2, FAIR)
}

/**
* spark.scheduler.pool property should be ignored for the FIFO scheduler,
* because pools are only needed for fair scheduling.
*/
test("FIFO scheduler uses root pool and not spark.scheduler.pool property") {
sc = new SparkContext("local", "PoolSuite")
val taskScheduler = new TaskSchedulerImpl(sc)

val rootPool = new Pool("", SchedulingMode.FIFO, initMinShare = 0, initWeight = 0)
val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)

val taskSetManager0 = createTaskSetManager(stageId = 0, numTasks = 1, taskScheduler)
val taskSetManager1 = createTaskSetManager(stageId = 1, numTasks = 1, taskScheduler)

val properties = new Properties()
properties.setProperty("spark.scheduler.pool", TEST_POOL)

// When FIFO Scheduler is used and task sets are submitted, they should be added to
// the root pool, and no additional pools should be created
// (even though there's a configured default pool).
schedulableBuilder.addTaskSetManager(taskSetManager0, properties)
schedulableBuilder.addTaskSetManager(taskSetManager1, properties)

assert(rootPool.getSchedulableByName(TEST_POOL) === null)
assert(rootPool.schedulableQueue.size === 2)
assert(rootPool.getSchedulableByName(taskSetManager0.name) === taskSetManager0)
assert(rootPool.getSchedulableByName(taskSetManager1.name) === taskSetManager1)
}
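For readers unfamiliar with how the two builders exercised by these tests are chosen: the configured scheduling mode decides between them. The sketch below is a simplified rendering of that wiring; the real selection happens inside TaskSchedulerImpl, and these classes are package-private, so this is illustrative rather than directly compilable outside org.apache.spark.scheduler.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{FIFOSchedulableBuilder, FairSchedulableBuilder, Pool, SchedulableBuilder, SchedulingMode}

object BuilderSelectionSketch {
  // Simplified: pick a schedulable builder from spark.scheduler.mode (default FIFO).
  def chooseBuilder(rootPool: Pool, conf: SparkConf): SchedulableBuilder = {
    SchedulingMode.withName(conf.get("spark.scheduler.mode", "FIFO")) match {
      case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool)       // ignores spark.scheduler.pool
      case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, conf) // reads the allocation file in buildPools()
      case other => throw new IllegalArgumentException(s"Unsupported scheduling mode: $other")
    }
  }
}
```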

test("FAIR Scheduler uses default pool when spark.scheduler.pool property is not set") {
sc = new SparkContext("local", "PoolSuite")
val taskScheduler = new TaskSchedulerImpl(sc)

val rootPool = new Pool("", SchedulingMode.FAIR, initMinShare = 0, initWeight = 0)
val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
schedulableBuilder.buildPools()

// Submit a new task set manager with pool properties set to null. This should result
// in the task set manager getting added to the default pool.
val taskSetManager0 = createTaskSetManager(stageId = 0, numTasks = 1, taskScheduler)
schedulableBuilder.addTaskSetManager(taskSetManager0, null)

val defaultPool = rootPool.getSchedulableByName(schedulableBuilder.DEFAULT_POOL_NAME)
assert(defaultPool !== null)
assert(defaultPool.schedulableQueue.size === 1)
assert(defaultPool.getSchedulableByName(taskSetManager0.name) === taskSetManager0)

// When a task set manager is submitted with spark.scheduler.pool unset, it should be added to
// the default pool (as above).
val taskSetManager1 = createTaskSetManager(stageId = 1, numTasks = 1, taskScheduler)
schedulableBuilder.addTaskSetManager(taskSetManager1, new Properties())

assert(defaultPool.schedulableQueue.size === 2)
assert(defaultPool.getSchedulableByName(taskSetManager1.name) === taskSetManager1)
}
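From the application side, the behaviour covered by this test looks like the following sketch (the app name and local master are assumptions for illustration): with FAIR mode enabled and no spark.scheduler.pool local property set, jobs are scheduled in the default pool.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object DefaultPoolExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("default-pool-example")
      .set("spark.scheduler.mode", "FAIR")
    val sc = new SparkContext(conf)

    // No spark.scheduler.pool local property is set, so this job runs in the "default" pool.
    sc.parallelize(1 to 100).count()

    // Clearing the property (setting it to null) also falls back to the default pool.
    sc.setLocalProperty("spark.scheduler.pool", null)
    sc.parallelize(1 to 100).count()
    sc.stop()
  }
}
```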

test("FAIR Scheduler creates a new pool when spark.scheduler.pool property points to " +
"a non-existent pool") {
sc = new SparkContext("local", "PoolSuite")
val taskScheduler = new TaskSchedulerImpl(sc)

val rootPool = new Pool("", SchedulingMode.FAIR, initMinShare = 0, initWeight = 0)
val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
schedulableBuilder.buildPools()

assert(rootPool.getSchedulableByName(TEST_POOL) === null)

val taskSetManager = createTaskSetManager(stageId = 0, numTasks = 1, taskScheduler)

val properties = new Properties()
properties.setProperty(schedulableBuilder.FAIR_SCHEDULER_PROPERTIES, TEST_POOL)

// The fair scheduler should create a new pool with default values when spark.scheduler.pool
// points to a pool that doesn't exist yet (this can happen when the file that pools are read
// from isn't set, or when that file doesn't contain the pool name specified
// by spark.scheduler.pool).
schedulableBuilder.addTaskSetManager(taskSetManager, properties)

Review comment (Contributor): JOOC, does this result in a warning being printed to the user? (I know you can't test for that -- but it does seem like useful behavior to have.)

Reply (Member, Author): Currently this behavior is not highlighted in the logs; we just log the pool creation as "Created pool: ..., schedulingMode: ..., minShare: ..., weight: ...". I agree a warning would be useful for the user, and it is addressed in the latest commits.

Reply (Contributor): Thanks for updating this!

verifyPool(rootPool, TEST_POOL, schedulableBuilder.DEFAULT_MINIMUM_SHARE,
schedulableBuilder.DEFAULT_WEIGHT, schedulableBuilder.DEFAULT_SCHEDULING_MODE)
val testPool = rootPool.getSchedulableByName(TEST_POOL)
assert(testPool.getSchedulableByName(taskSetManager.name) === taskSetManager)
}
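Conversely, if the pool is declared in an allocation file before the job is submitted, buildPools() creates it up front and the default-configuration path (with its new warning) is never taken. A sketch, assuming the fairscheduler.xml schema documented for Spark; the temp-file plumbing and the pool parameters are made up for the example.

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import org.apache.spark.{SparkConf, SparkContext}

object ConfiguredPoolExample {
  def main(args: Array[String]): Unit = {
    // Declare "testPool" ahead of time in an allocation file.
    val allocations =
      """<?xml version="1.0"?>
        |<allocations>
        |  <pool name="testPool">
        |    <schedulingMode>FAIR</schedulingMode>
        |    <minShare>2</minShare>
        |    <weight>1</weight>
        |  </pool>
        |</allocations>
        |""".stripMargin
    val file = File.createTempFile("fairscheduler", ".xml")
    Files.write(file.toPath, allocations.getBytes(StandardCharsets.UTF_8))

    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("configured-pool-example")
      .set("spark.scheduler.mode", "FAIR")
      .set("spark.scheduler.allocation.file", file.getAbsolutePath)
    val sc = new SparkContext(conf)

    // The pool already exists under the root pool, so no "default configuration" warning is logged.
    sc.setLocalProperty("spark.scheduler.pool", "testPool")
    sc.parallelize(1 to 100).count()
    sc.stop()
  }
}
```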

private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int,
expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = {
-    assert(rootPool.getSchedulableByName(poolName) != null)
-    assert(rootPool.getSchedulableByName(poolName).minShare === expectedInitMinShare)
-    assert(rootPool.getSchedulableByName(poolName).weight === expectedInitWeight)
-    assert(rootPool.getSchedulableByName(poolName).schedulingMode === expectedSchedulingMode)
+    val selectedPool = rootPool.getSchedulableByName(poolName)
+    assert(selectedPool !== null)
+    assert(selectedPool.minShare === expectedInitMinShare)
+    assert(selectedPool.weight === expectedInitWeight)
+    assert(selectedPool.schedulingMode === expectedSchedulingMode)
}

}