diff --git a/cluster/src/main/scala/org/apache/spark/scheduler/SnappyTaskSchedulerImpl.scala b/cluster/src/main/scala/org/apache/spark/scheduler/SnappyTaskSchedulerImpl.scala index 1584b6865e..6980170e0e 100644 --- a/cluster/src/main/scala/org/apache/spark/scheduler/SnappyTaskSchedulerImpl.scala +++ b/cluster/src/main/scala/org/apache/spark/scheduler/SnappyTaskSchedulerImpl.scala @@ -16,8 +16,19 @@ */ package org.apache.spark.scheduler -import org.apache.spark.SparkContext -import org.apache.spark.sql.SnappyContext +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger +import java.util.function.{LongFunction, ToLongFunction} + +import scala.collection.mutable.ArrayBuffer + +import com.koloboke.function.{LongObjPredicate, ObjLongToLongFunction} +import io.snappydata.Property +import io.snappydata.collection.{LongObjectHashMap, ObjectLongHashMap} + +import org.apache.spark.scheduler.TaskLocality.TaskLocality +import org.apache.spark.sql.{BlockAndExecutorId, SnappyContext} +import org.apache.spark.{SparkContext, SparkException, TaskNotSerializableException} private[spark] class SnappyTaskSchedulerImpl(sc: SparkContext) extends TaskSchedulerImpl(sc) { @@ -25,4 +36,239 @@ private[spark] class SnappyTaskSchedulerImpl(sc: SparkContext) extends TaskSched SnappyContext.initGlobalSparkContext(sc) super.postStartHook() } + + private type CoresAndAttempts = (ObjectLongHashMap[String], LongObjectHashMap[TaskSetManager]) + + private val limitJobCores = Property.LimitJobCores.get(sc.conf) + + private val (maxExecutorTaskCores, numExecutors) = { + val map = new ConcurrentHashMap[String, Integer](16, 0.7f, 1) + if (limitJobCores) { + for ((executorId, blockId) <- SnappyContext.getAllBlockIds) { + addBlockId(executorId, blockId, map) + } + } + (map, new AtomicInteger(map.size())) + } + private val stageCoresAndAttempts = + LongObjectHashMap.withExpectedSize[CoresAndAttempts](32) + private val taskIdExecutorAndManager = + LongObjectHashMap.withExpectedSize[(String, TaskSetManager)](32) + + private val createNewStageMap = new LongFunction[CoresAndAttempts] { + override def apply(stageId: Long): CoresAndAttempts = + ObjectLongHashMap.withExpectedSize[String](8) -> + LongObjectHashMap.withExpectedSize[TaskSetManager](2) + } + private val lookupExecutorCores = new ToLongFunction[String] { + override def applyAsLong(executorId: String): Long = { + maxExecutorTaskCores.get(executorId) match { + case null => Int.MaxValue // no restriction + case c => c.intValue() + } + } + } + private val addCPUsOnTaskFinish = new ObjLongToLongFunction[String] { + override def applyAsLong(execId: String, availableCores: Long): Long = + availableCores + CPUS_PER_TASK + } + + private def addBlockId(executorId: String, blockId: BlockAndExecutorId, + map: ConcurrentHashMap[String, Integer]): Boolean = { + if (limitJobCores && blockId.executorCores > 0 && blockId.numProcessors > 0 && + blockId.numProcessors < blockId.executorCores) { + map.put(executorId, Int.box(blockId.numProcessors)) == null + } else false + } + + private[spark] def addBlockId(executorId: String, blockId: BlockAndExecutorId): Unit = { + if (addBlockId(executorId, blockId, maxExecutorTaskCores)) { + numExecutors.incrementAndGet() + } + } + + private[spark] def removeBlockId(executorId: String): Unit = { + maxExecutorTaskCores.remove(executorId) match { + case null => + case _ => numExecutors.decrementAndGet() + } + } + + override protected def getTaskSetManagerForSubmit(taskSet: TaskSet): TaskSetManager = { + val manager = createTaskSetManager(taskSet, maxTaskFailures) + val stage = taskSet.stageId + val (stageAvailableCores, stageTaskSets) = stageCoresAndAttempts.computeIfAbsent( + stage, createNewStageMap) + val conflictingTaskSet = !stageTaskSets.forEachWhile(new LongObjPredicate[TaskSetManager] { + override def test(attempt: Long, ts: TaskSetManager): Boolean = { + ts.taskSet == taskSet || ts.isZombie + } + }) + if (conflictingTaskSet) { + throw new IllegalStateException( + s"more than one active taskSet for stage $stage: $stageTaskSets") + } + if (stageAvailableCores.size() > 0) stageAvailableCores.clear() + stageTaskSets.justPut(taskSet.stageAttemptId, manager) + manager + } + + override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized { + logInfo(s"Cancelling stage $stageId") + stageCoresAndAttempts.get(stageId) match { + case null => + case (_, attempts) => attempts.forEachWhile(new LongObjPredicate[TaskSetManager] { + override def test(attempt: Long, tsm: TaskSetManager): Boolean = { + // There are two possible cases here: + // 1. The task set manager has been created and some tasks have been scheduled. + // In this case, send a kill signal to the executors to kill the task + // and then abort the stage. + // 2. The task set manager has been created but no tasks has been scheduled. + // In this case, simply abort the stage. + tsm.runningTasksSet.foreach { tid => + val execId = taskIdExecutorAndManager.get(tid)._1 + backend.killTask(tid, execId, interruptThread) + } + val msg = s"Stage $stageId cancelled" + tsm.abort(msg) + logInfo(msg) + true + } + }) + } + } + + override def taskSetFinished(manager: TaskSetManager): Unit = synchronized { + val taskSet = manager.taskSet + stageCoresAndAttempts.get(taskSet.stageId) match { + case null => + case (_, taskSetsForStage) => + taskSetsForStage.remove(taskSet.stageAttemptId) + if (taskSetsForStage.size() == 0) { + stageCoresAndAttempts.remove(taskSet.stageId) + } + } + manager.parent.removeSchedulable(manager) + if (isInfoEnabled) { + logInfo(s"Removed TaskSet ${taskSet.id}, whose tasks have all completed, from pool " + + manager.parent.name) + } + } + + /** + * Avoid giving all the available cores on a node to a single task. This serves two purposes: + * + * a) Keeps some cores free for any concurrent tasks that may be submitted after + * the first has been scheduled. + * + * b) Since the snappy executors use (2 * physical cores) to aid in more concurrency, + * it helps reduce disk activity for a single task and improves performance for + * disk intensive queries. + */ + override protected def resourceOfferSingleTaskSet( + taskSet: TaskSetManager, + maxLocality: TaskLocality, + shuffledOffers: Seq[WorkerOffer], + availableCpus: Array[Int], + tasks: IndexedSeq[ArrayBuffer[TaskDescription]]): Boolean = { + // reduce the available CPUs for a single taskSet if more than physical cores are exposed + val availableCores = if (numExecutors.get() > 0) { + val coresAndAttempts = stageCoresAndAttempts.get(taskSet.taskSet.stageId) + if (coresAndAttempts ne null) coresAndAttempts._1 else null + } else null + var launchedTask = false + for (i <- shuffledOffers.indices) { + val execId = shuffledOffers(i).executorId + if ((availableCpus(i) >= CPUS_PER_TASK) && + ((availableCores eq null) || + (availableCores.computeIfAbsent(execId, lookupExecutorCores) >= CPUS_PER_TASK))) { + try { + val host = shuffledOffers(i).host + for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { + tasks(i) += task + val tid = task.taskId + taskIdExecutorAndManager.justPut(tid, execId -> taskSet) + executorIdToRunningTaskIds(execId).add(tid) + if (availableCores ne null) { + availableCores.addValue(execId, -CPUS_PER_TASK) + } + availableCpus(i) -= CPUS_PER_TASK + assert(availableCpus(i) >= 0) + launchedTask = true + } + } catch { + case _: TaskNotSerializableException => + logError(s"Resource offer failed, task set ${taskSet.name} was not serializable") + // Do not offer resources for this task, but don't throw an error to allow other + // task sets to be submitted. + return launchedTask + } + } + } + launchedTask + } + + override protected[scheduler] def getTaskSetManager(taskId: Long): Option[TaskSetManager] = { + taskIdExecutorAndManager.get(taskId) match { + case null => None + case (_, manager) => Some(manager) + } + } + + override protected def getExecutorAndManager( + taskId: Long): Option[(() => String, TaskSetManager)] = { + taskIdExecutorAndManager.get(taskId) match { + case null => None + case (execId, manager) => Some(() => execId, manager) + } + } + + override def error(message: String): Unit = synchronized { + if (stageCoresAndAttempts.size() > 0) { + // Have each task set throw a SparkException with the error + stageCoresAndAttempts.forEachWhile(new LongObjPredicate[CoresAndAttempts] { + override def test(stageId: Long, p: CoresAndAttempts): Boolean = { + p._2.forEachWhile(new LongObjPredicate[TaskSetManager] { + override def test(attempt: Long, manager: TaskSetManager): Boolean = { + try { + manager.abort(message) + } catch { + case e: Exception => logError("Exception in error callback", e) + } + true + } + }) + } + }) + } + else { + // No task sets are active but we still got an error. Just exit since this + // must mean the error is during registration. + // It might be good to do something smarter here in the future. + throw new SparkException(s"Exiting due to error from cluster scheduler: $message") + } + } + + override protected def cleanupTaskState(tid: Long): Unit = { + taskIdExecutorAndManager.remove(tid) match { + case null => + case (executorId, taskSet) => + executorIdToRunningTaskIds.get(executorId) match { + case Some(s) => s.remove(tid) + case None => + } + stageCoresAndAttempts.get(taskSet.taskSet.stageId) match { + case null => + case (cores, _) => cores.computeIfPresent(executorId, addCPUsOnTaskFinish) + } + } + } + + override private[scheduler] def taskSetManagerForAttempt( + stageId: Int, stageAttemptId: Int): Option[TaskSetManager] = { + stageCoresAndAttempts.get(stageId) match { + case null => None + case (_, attempts) => Option(attempts.get(stageAttemptId)) + } + } } diff --git a/cluster/src/main/scala/org/apache/spark/scheduler/cluster/SnappyCoarseGrainedSchedulerBackend.scala b/cluster/src/main/scala/org/apache/spark/scheduler/cluster/SnappyCoarseGrainedSchedulerBackend.scala index bbd66eb9b9..9b2cee98bb 100644 --- a/cluster/src/main/scala/org/apache/spark/scheduler/cluster/SnappyCoarseGrainedSchedulerBackend.scala +++ b/cluster/src/main/scala/org/apache/spark/scheduler/cluster/SnappyCoarseGrainedSchedulerBackend.scala @@ -20,7 +20,7 @@ import com.pivotal.gemfirexd.internal.engine.Misc import org.apache.spark.SparkContext import org.apache.spark.rpc.{RpcEndpointAddress, RpcEnv} -import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerBlockManagerAdded, SparkListenerBlockManagerRemoved, SparkListenerExecutorAdded, SparkListenerExecutorRemoved, TaskSchedulerImpl} +import org.apache.spark.scheduler._ import org.apache.spark.sql.{BlockAndExecutorId, SnappyContext} class SnappyCoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, @@ -87,18 +87,22 @@ class BlockManagerIdListener(sc: SparkContext) override def onBlockManagerAdded( msg: SparkListenerBlockManagerAdded): Unit = synchronized { val executorId = msg.blockManagerId.executorId - SnappyContext.getBlockIdIfNull(executorId) match { + val blockId = SnappyContext.getBlockIdIfNull(executorId) match { case None => val numCores = sc.schedulerBackend.defaultParallelism() - SnappyContext.addBlockId(executorId, new BlockAndExecutorId( - msg.blockManagerId, numCores, numCores)) - case Some(b) => b._blockId = msg.blockManagerId + val bid = new BlockAndExecutorId(msg.blockManagerId, numCores, numCores) + SnappyContext.addBlockId(executorId, bid) + bid + case Some(b) => b._blockId = msg.blockManagerId; b } + sc.taskScheduler.asInstanceOf[SnappyTaskSchedulerImpl].addBlockId(executorId, blockId) } override def onBlockManagerRemoved( msg: SparkListenerBlockManagerRemoved): Unit = { - SnappyContext.removeBlockId(msg.blockManagerId.executorId) + val executorId = msg.blockManagerId.executorId + SnappyContext.removeBlockId(executorId) + sc.taskScheduler.asInstanceOf[SnappyTaskSchedulerImpl].removeBlockId(executorId) } override def onExecutorRemoved(msg: SparkListenerExecutorRemoved): Unit = diff --git a/core/src/main/scala/io/snappydata/Literals.scala b/core/src/main/scala/io/snappydata/Literals.scala index 75984185da..b4b9cfe519 100644 --- a/core/src/main/scala/io/snappydata/Literals.scala +++ b/core/src/main/scala/io/snappydata/Literals.scala @@ -225,6 +225,11 @@ object Property extends Enumeration { "If true then cluster startup will wait for Spark jobserver to be fully initialized " + "before marking lead as 'RUNNING'. Default is false.", Some(false), prefix = null) + val LimitJobCores = Val(s"${Constant.SPARK_PREFIX}scheduler.limitJobCores", + "If true then cores given to a single job will be limited to physical cores on a host." + + "This allows executors to be configured with more than physical cores to allow for some " + + "concurrency of long jobs with short jobs. Default is true.", Some(true), prefix = null) + val SnappyConnection = Val[String](s"${Constant.PROPERTY_PREFIX}connection", "Host and client port combination in the form [host:clientPort]. This " + "is used by smart connector to connect to SnappyData cluster using " + diff --git a/spark b/spark index 9f2322ac1f..630426db86 160000 --- a/spark +++ b/spark @@ -1 +1 @@ -Subproject commit 9f2322ac1fca64896b7f1e0032cc0a741f4fdea9 +Subproject commit 630426db8658b5b64c42d6cdd4b63004bb8c8bef