[SNAP-2231] Limit maximum cores for a job to physical cores on a node #972

Open · wants to merge 1 commit into base: master
@@ -16,13 +16,259 @@
*/
package org.apache.spark.scheduler

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.{LongFunction, ToLongFunction}

import scala.collection.mutable.ArrayBuffer

import com.koloboke.function.{LongObjPredicate, ObjLongToLongFunction}
import io.snappydata.Property
import io.snappydata.collection.{LongObjectHashMap, ObjectLongHashMap}

import org.apache.spark.scheduler.TaskLocality.TaskLocality
import org.apache.spark.sql.{BlockAndExecutorId, SnappyContext}
import org.apache.spark.{SparkContext, SparkException, TaskNotSerializableException}

private[spark] class SnappyTaskSchedulerImpl(sc: SparkContext) extends TaskSchedulerImpl(sc) {

override def postStartHook(): Unit = {
SnappyContext.initGlobalSparkContext(sc)
super.postStartHook()
}

private type CoresAndAttempts = (ObjectLongHashMap[String], LongObjectHashMap[TaskSetManager])

private val limitJobCores = Property.LimitJobCores.get(sc.conf)

private val (maxExecutorTaskCores, numExecutors) = {
val map = new ConcurrentHashMap[String, Integer](16, 0.7f, 1)
if (limitJobCores) {
for ((executorId, blockId) <- SnappyContext.getAllBlockIds) {
addBlockId(executorId, blockId, map)
}
}
(map, new AtomicInteger(map.size()))
}
private val stageCoresAndAttempts =
LongObjectHashMap.withExpectedSize[CoresAndAttempts](32)
private val taskIdExecutorAndManager =
LongObjectHashMap.withExpectedSize[(String, TaskSetManager)](32)

private val createNewStageMap = new LongFunction[CoresAndAttempts] {
override def apply(stageId: Long): CoresAndAttempts =
ObjectLongHashMap.withExpectedSize[String](8) ->
LongObjectHashMap.withExpectedSize[TaskSetManager](2)
}
private val lookupExecutorCores = new ToLongFunction[String] {
override def applyAsLong(executorId: String): Long = {
maxExecutorTaskCores.get(executorId) match {
case null => Int.MaxValue // no restriction
Review comment (Contributor): Shouldn't defaultParallelism be a better choice here than Int.MaxValue?

Reply (Contributor, author): Null means that the cores configured for the executor are less than or equal to the physical cores on the machine, or that limitJobCores has been explicitly disabled. Both cases imply the same thing, namely that no limit should be put on tasks on a node, so this essentially falls back to Spark's TaskSchedulerImpl behaviour.

case c => c.intValue()
}
}
}
private val addCPUsOnTaskFinish = new ObjLongToLongFunction[String] {
override def applyAsLong(execId: String, availableCores: Long): Long =
availableCores + CPUS_PER_TASK
}

private def addBlockId(executorId: String, blockId: BlockAndExecutorId,
map: ConcurrentHashMap[String, Integer]): Boolean = {
if (limitJobCores && blockId.executorCores > 0 && blockId.numProcessors > 0 &&
blockId.numProcessors < blockId.executorCores) {
map.put(executorId, Int.box(blockId.numProcessors)) == null
} else false
}

private[spark] def addBlockId(executorId: String, blockId: BlockAndExecutorId): Unit = {
if (addBlockId(executorId, blockId, maxExecutorTaskCores)) {
numExecutors.incrementAndGet()
}
}

private[spark] def removeBlockId(executorId: String): Unit = {
maxExecutorTaskCores.remove(executorId) match {
case null =>
case _ => numExecutors.decrementAndGet()
}
}

override protected def getTaskSetManagerForSubmit(taskSet: TaskSet): TaskSetManager = {
val manager = createTaskSetManager(taskSet, maxTaskFailures)
val stage = taskSet.stageId
val (stageAvailableCores, stageTaskSets) = stageCoresAndAttempts.computeIfAbsent(
stage, createNewStageMap)
Review comment (Contributor): Should we not be setting the manager for the stage's task sets? I can see stageTaskSets(taskSet.stageAttemptId) = manager in the original TaskSchedulerImpl.

Reply (Contributor, author): Yes, that is done below at line 112 (the stageTaskSets.justPut call).

val conflictingTaskSet = !stageTaskSets.forEachWhile(new LongObjPredicate[TaskSetManager] {
override def test(attempt: Long, ts: TaskSetManager): Boolean = {
ts.taskSet == taskSet || ts.isZombie
}
})
if (conflictingTaskSet) {
throw new IllegalStateException(
s"more than one active taskSet for stage $stage: $stageTaskSets")
}
if (stageAvailableCores.size() > 0) stageAvailableCores.clear()
stageTaskSets.justPut(taskSet.stageAttemptId, manager)
manager
}

override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
logInfo(s"Cancelling stage $stageId")
stageCoresAndAttempts.get(stageId) match {
case null =>
case (_, attempts) => attempts.forEachWhile(new LongObjPredicate[TaskSetManager] {
override def test(attempt: Long, tsm: TaskSetManager): Boolean = {
// There are two possible cases here:
// 1. The task set manager has been created and some tasks have been scheduled.
// In this case, send a kill signal to the executors to kill the task
// and then abort the stage.
// 2. The task set manager has been created but no tasks have been scheduled.
// In this case, simply abort the stage.
tsm.runningTasksSet.foreach { tid =>
val execId = taskIdExecutorAndManager.get(tid)._1
backend.killTask(tid, execId, interruptThread)
}
val msg = s"Stage $stageId cancelled"
tsm.abort(msg)
logInfo(msg)
true
}
})
}
}

override def taskSetFinished(manager: TaskSetManager): Unit = synchronized {
val taskSet = manager.taskSet
stageCoresAndAttempts.get(taskSet.stageId) match {
case null =>
case (_, taskSetsForStage) =>
taskSetsForStage.remove(taskSet.stageAttemptId)
if (taskSetsForStage.size() == 0) {
stageCoresAndAttempts.remove(taskSet.stageId)
}
}
manager.parent.removeSchedulable(manager)
if (isInfoEnabled) {
logInfo(s"Removed TaskSet ${taskSet.id}, whose tasks have all completed, from pool " +
manager.parent.name)
}
}

/**
* Avoid giving all the available cores on a node to a single task. This serves two purposes:
*
* a) Keeps some cores free for any concurrent tasks that may be submitted after
* the first has been scheduled.
*
* b) Since the snappy executors use (2 * physical cores) to allow for more concurrency,
* it helps reduce disk activity for a single task and improves performance for
* disk intensive queries.
*/
override protected def resourceOfferSingleTaskSet(
taskSet: TaskSetManager,
maxLocality: TaskLocality,
shuffledOffers: Seq[WorkerOffer],
availableCpus: Array[Int],
tasks: IndexedSeq[ArrayBuffer[TaskDescription]]): Boolean = {
// reduce the available CPUs for a single taskSet if more than physical cores are exposed
val availableCores = if (numExecutors.get() > 0) {
val coresAndAttempts = stageCoresAndAttempts.get(taskSet.taskSet.stageId)
if (coresAndAttempts ne null) coresAndAttempts._1 else null
} else null
var launchedTask = false
for (i <- shuffledOffers.indices) {
val execId = shuffledOffers(i).executorId
if ((availableCpus(i) >= CPUS_PER_TASK) &&
((availableCores eq null) ||
(availableCores.computeIfAbsent(execId, lookupExecutorCores) >= CPUS_PER_TASK))) {
try {
val host = shuffledOffers(i).host
for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
tasks(i) += task
val tid = task.taskId
taskIdExecutorAndManager.justPut(tid, execId -> taskSet)
executorIdToRunningTaskIds(execId).add(tid)
if (availableCores ne null) {
availableCores.addValue(execId, -CPUS_PER_TASK)
Review comment (Contributor): Can we put an assertion here similar to assert(availableCpus(i) >= 0)? We might catch some of the erroneous updates.

Reply (Contributor, author): Will add.

}
availableCpus(i) -= CPUS_PER_TASK
assert(availableCpus(i) >= 0)
launchedTask = true
}
} catch {
case _: TaskNotSerializableException =>
logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
// Do not offer resources for this task, but don't throw an error to allow other
// task sets to be submitted.
return launchedTask
}
}
}
launchedTask
}

override protected[scheduler] def getTaskSetManager(taskId: Long): Option[TaskSetManager] = {
taskIdExecutorAndManager.get(taskId) match {
case null => None
case (_, manager) => Some(manager)
}
}

override protected def getExecutorAndManager(
taskId: Long): Option[(() => String, TaskSetManager)] = {
taskIdExecutorAndManager.get(taskId) match {
case null => None
case (execId, manager) => Some(() => execId, manager)
}
}

override def error(message: String): Unit = synchronized {
if (stageCoresAndAttempts.size() > 0) {
// Have each task set throw a SparkException with the error
stageCoresAndAttempts.forEachWhile(new LongObjPredicate[CoresAndAttempts] {
override def test(stageId: Long, p: CoresAndAttempts): Boolean = {
p._2.forEachWhile(new LongObjPredicate[TaskSetManager] {
override def test(attempt: Long, manager: TaskSetManager): Boolean = {
try {
manager.abort(message)
} catch {
case e: Exception => logError("Exception in error callback", e)
}
true
}
})
}
})
}
else {
// No task sets are active but we still got an error. Just exit since this
// must mean the error is during registration.
// It might be good to do something smarter here in the future.
throw new SparkException(s"Exiting due to error from cluster scheduler: $message")
}
}

override protected def cleanupTaskState(tid: Long): Unit = {
taskIdExecutorAndManager.remove(tid) match {
case null =>
case (executorId, taskSet) =>
executorIdToRunningTaskIds.get(executorId) match {
case Some(s) => s.remove(tid)
case None =>
}
stageCoresAndAttempts.get(taskSet.taskSet.stageId) match {
case null =>
case (cores, _) => cores.computeIfPresent(executorId, addCPUsOnTaskFinish)
}
}
}

override private[scheduler] def taskSetManagerForAttempt(
stageId: Int, stageAttemptId: Int): Option[TaskSetManager] = {
stageCoresAndAttempts.get(stageId) match {
case null => None
case (_, attempts) => Option(attempts.get(stageAttemptId))
}
}
}
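To make the limiting scheme above (the doc comment on resourceOfferSingleTaskSet and the Int.MaxValue fallback discussed in the review thread) easier to follow outside the scheduler internals, here is a minimal standalone sketch of the same idea. It has no Spark or SnappyData dependencies, and all names (CoreLimitSketch, Offer, schedule) are illustrative rather than part of the patch: executors may advertise more slots than physical cores, but a single stage is only allowed up to the physical-core count per executor, falling back to "no limit" when that count is unknown.

object CoreLimitSketch {

  final case class Offer(executorId: String, freeSlots: Int)

  // physical cores per executor; entries exist only when advertised slots exceed them
  val physicalCores: Map[String, Int] = Map("exec-1" -> 4, "exec-2" -> 4)

  /** How many tasks of a single stage may still be launched against each offer. */
  def schedule(offers: Seq[Offer], cpusPerTask: Int): Seq[(String, Int)] = {
    // per-stage budget starts at the physical core count, or "no limit" when unknown
    val stageBudget = scala.collection.mutable.Map.empty[String, Int]
    offers.map { offer =>
      val budget = stageBudget.getOrElseUpdate(
        offer.executorId, physicalCores.getOrElse(offer.executorId, Int.MaxValue))
      // a task needs both a free executor slot and remaining stage budget
      val launchable = math.min(offer.freeSlots, budget) / cpusPerTask
      stageBudget(offer.executorId) = budget - launchable * cpusPerTask
      offer.executorId -> launchable
    }
  }

  def main(args: Array[String]): Unit = {
    // exec-1 advertises 8 slots (2 x 4 physical cores), but a single stage is capped at 4 tasks
    println(schedule(Seq(Offer("exec-1", 8), Offer("exec-2", 8)), cpusPerTask = 1))
  }
}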
@@ -20,7 +20,7 @@ import com.pivotal.gemfirexd.internal.engine.Misc

import org.apache.spark.SparkContext
import org.apache.spark.rpc.{RpcEndpointAddress, RpcEnv}
import org.apache.spark.scheduler._
import org.apache.spark.sql.{BlockAndExecutorId, SnappyContext}

class SnappyCoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl,
@@ -87,18 +87,22 @@ class BlockManagerIdListener(sc: SparkContext)
override def onBlockManagerAdded(
msg: SparkListenerBlockManagerAdded): Unit = synchronized {
val executorId = msg.blockManagerId.executorId
val blockId = SnappyContext.getBlockIdIfNull(executorId) match {
case None =>
val numCores = sc.schedulerBackend.defaultParallelism()
val bid = new BlockAndExecutorId(msg.blockManagerId, numCores, numCores)
SnappyContext.addBlockId(executorId, bid)
bid
case Some(b) => b._blockId = msg.blockManagerId; b
}
sc.taskScheduler.asInstanceOf[SnappyTaskSchedulerImpl].addBlockId(executorId, blockId)
Review comment (Contributor): The SnappyTaskSchedulerImpl.addBlockId() method has the condition blockId.numProcessors < blockId.executorCores; coming from here it will never be satisfied, since both are set to numCores.

Reply (Contributor, author): The "case None" branch is for a corner case where the block manager gets added before the executor. In normal cases onExecutorAdded is invoked first, where the number of physical cores has been properly initialized, so addBlockId will work fine. Will add handling for that case in onExecutorAdded and invoke addBlockId from the Some() match case there.

Reply (Contributor, author): Will also add removal in onExecutorRemoved.

}

override def onBlockManagerRemoved(
msg: SparkListenerBlockManagerRemoved): Unit = {
val executorId = msg.blockManagerId.executorId
SnappyContext.removeBlockId(executorId)
sc.taskScheduler.asInstanceOf[SnappyTaskSchedulerImpl].removeBlockId(executorId)
}

override def onExecutorRemoved(msg: SparkListenerExecutorRemoved): Unit =
[remaining diff lines of this file collapsed]
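Per the review thread above, a hedged sketch of how the agreed follow-up might look inside BlockManagerIdListener: register the block with the Snappy scheduler from onExecutorAdded, where the physical core count is already initialized, and drop the entry in onExecutorRemoved. SnappyContext.getBlockId is assumed here as a lookup counterpart of getBlockIdIfNull, and the existing onExecutorRemoved body is collapsed in this diff, so only the added call is shown; the actual commit may differ.

// sketch only: methods as they might appear inside BlockManagerIdListener
override def onExecutorAdded(msg: SparkListenerExecutorAdded): Unit = synchronized {
  SnappyContext.getBlockId(msg.executorId) match {
    case Some(b) =>
      // physical cores are initialized by now, so the numProcessors < executorCores
      // check inside SnappyTaskSchedulerImpl.addBlockId can take effect
      sc.taskScheduler.asInstanceOf[SnappyTaskSchedulerImpl].addBlockId(msg.executorId, b)
    case None => // block manager not registered yet; onBlockManagerAdded handles it
  }
}

override def onExecutorRemoved(msg: SparkListenerExecutorRemoved): Unit = synchronized {
  // existing body of this override is collapsed in the diff; only the added call is shown
  sc.taskScheduler.asInstanceOf[SnappyTaskSchedulerImpl].removeBlockId(msg.executorId)
}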
5 changes: 5 additions & 0 deletions core/src/main/scala/io/snappydata/Literals.scala
@@ -225,6 +225,11 @@ object Property extends Enumeration {
"If true then cluster startup will wait for Spark jobserver to be fully initialized " +
"before marking lead as 'RUNNING'. Default is false.", Some(false), prefix = null)

val LimitJobCores = Val(s"${Constant.SPARK_PREFIX}scheduler.limitJobCores",
"If true then cores given to a single job will be limited to physical cores on a host." +
"This allows executors to be configured with more than physical cores to allow for some " +
"concurrency of long jobs with short jobs. Default is true.", Some(true), prefix = null)

val SnappyConnection = Val[String](s"${Constant.PROPERTY_PREFIX}connection",
"Host and client port combination in the form [host:clientPort]. This " +
"is used by smart connector to connect to SnappyData cluster using " +
[remaining diff lines of this file collapsed]
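As a usage note for the LimitJobCores property defined above: it can be set like any other Spark configuration at startup. A small sketch, assuming Constant.SPARK_PREFIX resolves to the usual "spark." prefix so that the full key is spark.scheduler.limitJobCores:

import org.apache.spark.SparkConf

// opt out of the per-job core limit and fall back to stock TaskSchedulerImpl behaviour
val conf = new SparkConf()
  .setAppName("limit-job-cores-example")
  .set("spark.scheduler.limitJobCores", "false") // default is true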
2 changes: 1 addition & 1 deletion spark
Submodule spark updated from 9f2322 to 630426