[SNAP-2231] Limit maximum cores for a job to physical cores on a node
- adds overrides in SnappyTaskSchedulerImpl to track the per-executor cores used by a job
  and cap them at the number of physical cores on a node
- combines some maps in TaskSchedulerImpl to recover the performance lost to the tracking
  above and to improve further over the base TaskSchedulerImpl
- the property "spark.scheduler.limitJobCores=false" can be set to revert to the previous behaviour
Sumedh Wale committed Feb 27, 2018
1 parent 1cf9817 commit 3df2364
Showing 4 changed files with 264 additions and 9 deletions.
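
For the revert switch mentioned in the last bullet above, a minimal sketch of disabling the cap before the scheduler is created (the SparkConf wiring and app name here are illustrative, not part of this commit):

import org.apache.spark.SparkConf

// Illustrative only: turn off the SNAP-2231 cap so a single job may again
// take all of the cores an executor exposes (the default is true).
val conf = new SparkConf()
  .setAppName("example") // hypothetical
  .set("spark.scheduler.limitJobCores", "false")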
@@ -16,13 +16,259 @@
*/
package org.apache.spark.scheduler

-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SnappyContext
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.function.{LongFunction, ToLongFunction}
+
+import scala.collection.mutable.ArrayBuffer
+
+import com.koloboke.function.{LongObjPredicate, ObjLongToLongFunction}
+import io.snappydata.Property
+import io.snappydata.collection.{LongObjectHashMap, ObjectLongHashMap}
+
+import org.apache.spark.scheduler.TaskLocality.TaskLocality
+import org.apache.spark.sql.{BlockAndExecutorId, SnappyContext}
+import org.apache.spark.{SparkContext, SparkException, TaskNotSerializableException}

private[spark] class SnappyTaskSchedulerImpl(sc: SparkContext) extends TaskSchedulerImpl(sc) {

  override def postStartHook(): Unit = {
    SnappyContext.initGlobalSparkContext(sc)
    super.postStartHook()
  }

  private type CoresAndAttempts = (ObjectLongHashMap[String], LongObjectHashMap[TaskSetManager])

  private val limitJobCores = Property.LimitJobCores.get(sc.conf)

  private val (maxExecutorTaskCores, numExecutors) = {
    val map = new ConcurrentHashMap[String, Integer](16, 0.7f, 1)
    if (limitJobCores) {
      for ((executorId, blockId) <- SnappyContext.getAllBlockIds) {
        addBlockId(executorId, blockId, map)
      }
    }
    (map, new AtomicInteger(map.size()))
  }
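  // The two maps below fold several separate maps of the base TaskSchedulerImpl
  // into combined ones (per the commit notes) to recover the cost of the extra
  // per-executor core tracking.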
  private val stageCoresAndAttempts =
    LongObjectHashMap.withExpectedSize[CoresAndAttempts](32)
  private val taskIdExecutorAndManager =
    LongObjectHashMap.withExpectedSize[(String, TaskSetManager)](32)

  private val createNewStageMap = new LongFunction[CoresAndAttempts] {
    override def apply(stageId: Long): CoresAndAttempts =
      ObjectLongHashMap.withExpectedSize[String](8) ->
        LongObjectHashMap.withExpectedSize[TaskSetManager](2)
  }
  private val lookupExecutorCores = new ToLongFunction[String] {
    override def applyAsLong(executorId: String): Long = {
      maxExecutorTaskCores.get(executorId) match {
        case null => Int.MaxValue // no restriction
        case c => c.intValue()
      }
    }
  }
  private val addCPUsOnTaskFinish = new ObjLongToLongFunction[String] {
    override def applyAsLong(execId: String, availableCores: Long): Long =
      availableCores + CPUS_PER_TASK
  }

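  // Record a physical-core cap for an executor only when it advertises more
  // cores than the host's physical processors (snappy executors expose
  // 2 * physical cores); returns true when a new cap entry was added.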
  private def addBlockId(executorId: String, blockId: BlockAndExecutorId,
      map: ConcurrentHashMap[String, Integer]): Boolean = {
    if (limitJobCores && blockId.executorCores > 0 && blockId.numProcessors > 0 &&
        blockId.numProcessors < blockId.executorCores) {
      map.put(executorId, Int.box(blockId.numProcessors)) == null
    } else false
  }

  private[spark] def addBlockId(executorId: String, blockId: BlockAndExecutorId): Unit = {
    if (addBlockId(executorId, blockId, maxExecutorTaskCores)) {
      numExecutors.incrementAndGet()
    }
  }

  private[spark] def removeBlockId(executorId: String): Unit = {
    maxExecutorTaskCores.remove(executorId) match {
      case null =>
      case _ => numExecutors.decrementAndGet()
    }
  }

  override protected def getTaskSetManagerForSubmit(taskSet: TaskSet): TaskSetManager = {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val (stageAvailableCores, stageTaskSets) = stageCoresAndAttempts.computeIfAbsent(
      stage, createNewStageMap)
    val conflictingTaskSet = !stageTaskSets.forEachWhile(new LongObjPredicate[TaskSetManager] {
      override def test(attempt: Long, ts: TaskSetManager): Boolean = {
        ts.taskSet == taskSet || ts.isZombie
      }
    })
    if (conflictingTaskSet) {
      throw new IllegalStateException(
        s"more than one active taskSet for stage $stage: $stageTaskSets")
    }
    if (stageAvailableCores.size() > 0) stageAvailableCores.clear()
    stageTaskSets.justPut(taskSet.stageAttemptId, manager)
    manager
  }

  override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
    logInfo(s"Cancelling stage $stageId")
    stageCoresAndAttempts.get(stageId) match {
      case null =>
      case (_, attempts) => attempts.forEachWhile(new LongObjPredicate[TaskSetManager] {
        override def test(attempt: Long, tsm: TaskSetManager): Boolean = {
          // There are two possible cases here:
          // 1. The task set manager has been created and some tasks have been scheduled.
          //    In this case, send a kill signal to the executors to kill the task
          //    and then abort the stage.
          // 2. The task set manager has been created but no tasks have been scheduled.
          //    In this case, simply abort the stage.
          tsm.runningTasksSet.foreach { tid =>
            val execId = taskIdExecutorAndManager.get(tid)._1
            backend.killTask(tid, execId, interruptThread)
          }
          val msg = s"Stage $stageId cancelled"
          tsm.abort(msg)
          logInfo(msg)
          true
        }
      })
    }
  }

  override def taskSetFinished(manager: TaskSetManager): Unit = synchronized {
    val taskSet = manager.taskSet
    stageCoresAndAttempts.get(taskSet.stageId) match {
      case null =>
      case (_, taskSetsForStage) =>
        taskSetsForStage.remove(taskSet.stageAttemptId)
        if (taskSetsForStage.size() == 0) {
          stageCoresAndAttempts.remove(taskSet.stageId)
        }
    }
    manager.parent.removeSchedulable(manager)
    if (isInfoEnabled) {
      logInfo(s"Removed TaskSet ${taskSet.id}, whose tasks have all completed, from pool " +
          manager.parent.name)
    }
  }

  /**
   * Avoid giving all the available cores on a node to a single job. This serves two purposes:
   *
   * a) Keeps some cores free for any concurrent tasks that may be submitted after
   *    the first has been scheduled.
   *
   * b) Since the snappy executors use (2 * physical cores) to aid in more concurrency,
   *    it helps reduce disk activity for a single task and improves performance for
   *    disk intensive queries.
   */
  override protected def resourceOfferSingleTaskSet(
      taskSet: TaskSetManager,
      maxLocality: TaskLocality,
      shuffledOffers: Seq[WorkerOffer],
      availableCpus: Array[Int],
      tasks: IndexedSeq[ArrayBuffer[TaskDescription]]): Boolean = {
    // reduce the available CPUs for a single taskSet if more than physical cores are exposed
    val availableCores = if (numExecutors.get() > 0) {
      val coresAndAttempts = stageCoresAndAttempts.get(taskSet.taskSet.stageId)
      if (coresAndAttempts ne null) coresAndAttempts._1 else null
    } else null
    var launchedTask = false
    for (i <- shuffledOffers.indices) {
      val execId = shuffledOffers(i).executorId
      if ((availableCpus(i) >= CPUS_PER_TASK) &&
          ((availableCores eq null) ||
              (availableCores.computeIfAbsent(execId, lookupExecutorCores) >= CPUS_PER_TASK))) {
        try {
          val host = shuffledOffers(i).host
          for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
            tasks(i) += task
            val tid = task.taskId
            taskIdExecutorAndManager.justPut(tid, execId -> taskSet)
            executorIdToRunningTaskIds(execId).add(tid)
            if (availableCores ne null) {
              availableCores.addValue(execId, -CPUS_PER_TASK)
            }
            availableCpus(i) -= CPUS_PER_TASK
            assert(availableCpus(i) >= 0)
            launchedTask = true
          }
        } catch {
          case _: TaskNotSerializableException =>
            logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
            // Do not offer resources for this task, but don't throw an error to allow other
            // task sets to be submitted.
            return launchedTask
        }
      }
    }
    launchedTask
  }

  override protected[scheduler] def getTaskSetManager(taskId: Long): Option[TaskSetManager] = {
    taskIdExecutorAndManager.get(taskId) match {
      case null => None
      case (_, manager) => Some(manager)
    }
  }

  override protected def getExecutorAndManager(
      taskId: Long): Option[(() => String, TaskSetManager)] = {
    taskIdExecutorAndManager.get(taskId) match {
      case null => None
      case (execId, manager) => Some(() => execId, manager)
    }
  }

  override def error(message: String): Unit = synchronized {
    if (stageCoresAndAttempts.size() > 0) {
      // Have each task set throw a SparkException with the error
      stageCoresAndAttempts.forEachWhile(new LongObjPredicate[CoresAndAttempts] {
        override def test(stageId: Long, p: CoresAndAttempts): Boolean = {
          p._2.forEachWhile(new LongObjPredicate[TaskSetManager] {
            override def test(attempt: Long, manager: TaskSetManager): Boolean = {
              try {
                manager.abort(message)
              } catch {
                case e: Exception => logError("Exception in error callback", e)
              }
              true
            }
          })
        }
      })
    } else {
      // No task sets are active but we still got an error. Just exit since this
      // must mean the error is during registration.
      // It might be good to do something smarter here in the future.
      throw new SparkException(s"Exiting due to error from cluster scheduler: $message")
    }
  }

  override protected def cleanupTaskState(tid: Long): Unit = {
    taskIdExecutorAndManager.remove(tid) match {
      case null =>
      case (executorId, taskSet) =>
        executorIdToRunningTaskIds.get(executorId) match {
          case Some(s) => s.remove(tid)
          case None =>
        }
        stageCoresAndAttempts.get(taskSet.taskSet.stageId) match {
          case null =>
          case (cores, _) => cores.computeIfPresent(executorId, addCPUsOnTaskFinish)
        }
    }
  }

  override private[scheduler] def taskSetManagerForAttempt(
      stageId: Int, stageAttemptId: Int): Option[TaskSetManager] = {
    stageCoresAndAttempts.get(stageId) match {
      case null => None
      case (_, attempts) => Option(attempts.get(stageAttemptId))
    }
  }
}
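
As an aside on the scheduling logic above: the per-stage bookkeeping that resourceOfferSingleTaskSet and cleanupTaskState perform can be summarized with a small self-contained sketch. It uses plain Scala collections instead of the Koloboke maps, and hypothetical numbers (4 physical cores, 8 exposed cores, CPUS_PER_TASK = 1), so it illustrates the idea rather than reproducing the scheduler code.

import scala.collection.mutable

object CoreCapSketch {
  // Hypothetical numbers: the executor advertises 8 cores (2 * physical)
  // while its host has 4 physical processors, so the recorded cap is 4.
  val CPUS_PER_TASK = 1
  val maxExecutorTaskCores = Map("exec-1" -> 4) // cap per executor, as addBlockId records

  def main(args: Array[String]): Unit = {
    // Remaining cores this stage may still take per executor, seeded lazily
    // from the cap (Int.MaxValue would mean "no restriction").
    val stageAvailableCores = mutable.Map.empty[String, Int]
    var availableCpus = 8 // raw cores the executor currently offers
    var launched = 0

    // Offer loop: launch a task only while both the raw CPUs and the
    // per-stage cap allow it (the two checks in resourceOfferSingleTaskSet).
    while (availableCpus >= CPUS_PER_TASK &&
        stageAvailableCores.getOrElseUpdate("exec-1",
          maxExecutorTaskCores.getOrElse("exec-1", Int.MaxValue)) >= CPUS_PER_TASK) {
      stageAvailableCores("exec-1") -= CPUS_PER_TASK // addValue(execId, -CPUS_PER_TASK)
      availableCpus -= CPUS_PER_TASK
      launched += 1
    }

    // Only 4 of the 8 exposed cores go to this job; the rest stay free for
    // concurrent jobs. When a task finishes, cleanupTaskState adds
    // CPUS_PER_TASK back into the stage's available count.
    println(s"tasks launched: $launched, raw cores still free: $availableCpus")
  }
}
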
@@ -20,7 +20,7 @@ import com.pivotal.gemfirexd.internal.engine.Misc

import org.apache.spark.SparkContext
import org.apache.spark.rpc.{RpcEndpointAddress, RpcEnv}
-import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerBlockManagerAdded, SparkListenerBlockManagerRemoved, SparkListenerExecutorAdded, SparkListenerExecutorRemoved, TaskSchedulerImpl}
+import org.apache.spark.scheduler._
import org.apache.spark.sql.{BlockAndExecutorId, SnappyContext}

class SnappyCoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl,
@@ -87,18 +87,22 @@ class BlockManagerIdListener(sc: SparkContext)
  override def onBlockManagerAdded(
      msg: SparkListenerBlockManagerAdded): Unit = synchronized {
    val executorId = msg.blockManagerId.executorId
-   SnappyContext.getBlockIdIfNull(executorId) match {
+   val blockId = SnappyContext.getBlockIdIfNull(executorId) match {
      case None =>
        val numCores = sc.schedulerBackend.defaultParallelism()
-       SnappyContext.addBlockId(executorId, new BlockAndExecutorId(
-         msg.blockManagerId, numCores, numCores))
-     case Some(b) => b._blockId = msg.blockManagerId
+       val bid = new BlockAndExecutorId(msg.blockManagerId, numCores, numCores)
+       SnappyContext.addBlockId(executorId, bid)
+       bid
+     case Some(b) => b._blockId = msg.blockManagerId; b
    }
+   sc.taskScheduler.asInstanceOf[SnappyTaskSchedulerImpl].addBlockId(executorId, blockId)
  }

  override def onBlockManagerRemoved(
      msg: SparkListenerBlockManagerRemoved): Unit = {
-   SnappyContext.removeBlockId(msg.blockManagerId.executorId)
+   val executorId = msg.blockManagerId.executorId
+   SnappyContext.removeBlockId(executorId)
+   sc.taskScheduler.asInstanceOf[SnappyTaskSchedulerImpl].removeBlockId(executorId)
  }

  override def onExecutorRemoved(msg: SparkListenerExecutorRemoved): Unit =
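
A note on the wiring above: the scheduler records a cap for an executor only when it advertises more cores than the host has physical processors. A rough sketch of that condition with hypothetical numbers (the real check lives in SnappyTaskSchedulerImpl.addBlockId shown earlier):

// Hypothetical values: an executor exposing 8 cores on a 4-processor host.
val executorCores = 8 // advertised cores (snappy executors use 2 * physical)
val numProcessors = 4 // physical cores on the host

// Mirrors the addBlockId condition: a cap is recorded only when the executor
// exposes more than the physical cores; otherwise scheduling is unrestricted.
val cap: Option[Int] =
  if (executorCores > 0 && numProcessors > 0 && numProcessors < executorCores) {
    Some(numProcessors)
  } else None

println(cap) // Some(4): a single job gets at most 4 cores on this executor
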
5 changes: 5 additions & 0 deletions core/src/main/scala/io/snappydata/Literals.scala
@@ -225,6 +225,11 @@ object Property extends Enumeration {
"If true then cluster startup will wait for Spark jobserver to be fully initialized " +
"before marking lead as 'RUNNING'. Default is false.", Some(false), prefix = null)

val LimitJobCores = Val(s"${Constant.SPARK_PREFIX}scheduler.limitJobCores",
"If true then cores given to a single job will be limited to physical cores on a host." +
"This allows executors to be configured with more than physical cores to allow for some " +
"concurrency of long jobs with short jobs. Default is true.", Some(true), prefix = null)

val SnappyConnection = Val[String](s"${Constant.PROPERTY_PREFIX}connection",
"Host and client port combination in the form [host:clientPort]. This " +
"is used by smart connector to connect to SnappyData cluster using " +
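
As the scheduler in the first file shows, this property is read once when the scheduler is constructed; a minimal sketch of the lookup (assuming sc is an existing SparkContext):

// Resolves spark.scheduler.limitJobCores against the SparkConf; defaults to true.
val limitJobCores: Boolean = Property.LimitJobCores.get(sc.conf)
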
2 changes: 1 addition & 1 deletion spark
Submodule spark updated from 9f2322 to 630426
