From 1c84b6bcaf563f8f804e24a2ff8fc136d0c7e07a Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Mon, 3 Aug 2020 13:56:19 +0800
Subject: [PATCH 1/7] fix

---
 .../scheduler/ExecutorResourceInfo.scala      |   4 +-
 .../spark/scheduler/TaskSchedulerImpl.scala   | 107 ++++++++++--------
 .../CoarseGrainedSchedulerBackend.scala       |  13 ++-
 .../spark/BarrierStageOnSubmittedSuite.scala  |  36 ++++++
 4 files changed, 107 insertions(+), 53 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
index fd04db8c09d7..f9e4ad4152f3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
@@ -34,6 +34,6 @@ private[spark] class ExecutorResourceInfo(
   extends ResourceInformation(name, addresses.toArray) with ResourceAllocator {
 
   override protected def resourceName = this.name
-  override protected def resourceAddresses = this.addresses
-  override protected def slotsPerAddress: Int = numParts
+  override def resourceAddresses: Seq[String] = this.addresses
+  override def slotsPerAddress: Int = numParts
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 2551e497a165..10433df035e6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -468,51 +468,6 @@ private[spark] class TaskSchedulerImpl(
     Some(localTaskReqAssign.toMap)
   }
 
-  // Use the resource that the resourceProfile has as the limiting resource to calculate the
-  // total number of slots available based on the current offers.
-  private def calculateAvailableSlots(
-      resourceProfileIds: Array[Int],
-      availableCpus: Array[Int],
-      availableResources: Array[Map[String, Buffer[String]]],
-      taskSet: TaskSetManager): Int = {
-    val resourceProfile = sc.resourceProfileManager.resourceProfileFromId(
-      taskSet.taskSet.resourceProfileId)
-    val offersForResourceProfile = resourceProfileIds.zipWithIndex.filter { case (id, _) =>
-      (id == resourceProfile.id)
-    }
-    val coresKnown = resourceProfile.isCoresLimitKnown
-    var limitingResource = resourceProfile.limitingResource(conf)
-    val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(resourceProfile, conf)
-
-    offersForResourceProfile.map { case (o, index) =>
-      val numTasksPerExecCores = availableCpus(index) / taskCpus
-      // if limiting resource is empty then we have no other resources, so it has to be CPU
-      if (limitingResource == ResourceProfile.CPUS || limitingResource.isEmpty) {
-        numTasksPerExecCores
-      } else {
-        val taskLimit = resourceProfile.taskResources.get(limitingResource).map(_.amount)
-          .getOrElse {
-            val errorMsg = "limitingResource returns from ResourceProfile " +
-              s"$resourceProfile doesn't actually contain that task resource!"
-            taskSet.abort(errorMsg)
-            throw new SparkException(errorMsg)
-          }
-        // available addresses already takes into account if there are fractional
-        // task resource requests
-        val availAddrs = availableResources(index).get(limitingResource).map(_.size).getOrElse(0)
-        val resourceLimit = (availAddrs / taskLimit).toInt
-        if (!coresKnown) {
-          // when executor cores config isn't set, we can't calculate the real limiting resource
-          // and number of tasks per executor ahead of time, so calculate it now based on what
-          // is available.
-          if (numTasksPerExecCores <= resourceLimit) numTasksPerExecCores else resourceLimit
-        } else {
-          resourceLimit
-        }
-      }
-    }.sum
-  }
-
   private def minTaskLocality(
       l1: Option[TaskLocality],
       l2: Option[TaskLocality]) : Option[TaskLocality] = {
@@ -591,9 +546,14 @@ private[spark] class TaskSchedulerImpl(
     // we only need to calculate available slots if using barrier scheduling, otherwise the
     // value is -1
     val numBarrierSlotsAvailable = if (taskSet.isBarrier) {
-      val slots = calculateAvailableSlots(resourceProfileIds, availableCpus, availableResources,
-        taskSet)
-      slots
+      val rpId = taskSet.taskSet.resourceProfileId
+      val availableResourcesAmount = availableResources.map { resourceMap =>
+        // available addresses already takes into account if there are fractional
+        // task resource requests
+        resourceMap.map { case (name, addresses) => (name, addresses.length) }
+      }
+      calculateAvailableSlots(this, conf, rpId, resourceProfileIds, availableCpus,
+        availableResourcesAmount)
     } else {
       -1
     }
@@ -1166,6 +1126,57 @@ private[spark] object TaskSchedulerImpl {
 
   val SCHEDULER_MODE_PROPERTY = SCHEDULER_MODE.key
 
+  /**
+   * Calculate the max available task slots given the `availableCpus` and `availableResources`
+   * from a collection of executors. Only the executors whose ResourceProfile id equals `rpId`
+   * are used to calculate the task slots.
+   *
+   * @param scheduler the TaskSchedulerImpl instance
+   * @param conf SparkConf used to calculate the limiting resource and get the cpu amount per task
+   * @param rpId the target ResourceProfile id. Only the executors with the same ResourceProfile
+   *             id are used to calculate the task slots.
+   * @param availableRPIds an Array of ids of the available ResourceProfiles from the executors.
+   * @param availableCpus an Array of the amount of available CPUs from the executors.
+   * @param availableResources an Array of the resource maps from the executors. Each map
+   *                           maps a resource name to its available amount.
+   * @return the max number of task slots
+   */
+  def calculateAvailableSlots(
+      scheduler: TaskSchedulerImpl,
+      conf: SparkConf,
+      rpId: Int,
+      availableRPIds: Array[Int],
+      availableCpus: Array[Int],
+      availableResources: Array[Map[String, Int]]): Int = {
+    val resourceProfile = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
+    val coresKnown = resourceProfile.isCoresLimitKnown
+    val limitingResource = resourceProfile.limitingResource(conf)
+    // if limiting resource is empty then we have no other resources, so it has to be CPU
+    val limitedByCpu = limitingResource == ResourceProfile.CPUS || limitingResource.isEmpty
+    val cpusPerTask = ResourceProfile.getTaskCpusOrDefaultForProfile(resourceProfile, conf)
+    val taskLimit = resourceProfile.taskResources.get(limitingResource).map(_.amount).get
+
+    availableCpus.zip(availableResources).zip(availableRPIds)
+      .filter { case (_, id) => id == rpId }
+      .map { case ((cpu, resources), _) =>
+        val numTasksPerExecCores = cpu / cpusPerTask
+        if (limitedByCpu) {
+          numTasksPerExecCores
+        } else {
+          val availAddrs = resources.getOrElse(limitingResource, 0)
+          val resourceLimit = (availAddrs / taskLimit).toInt
+          // when executor cores config isn't set, we can't calculate the real limiting resource
+          // and number of tasks per executor ahead of time, so calculate it now based on what
+          // is available.
+          if (!coresKnown && numTasksPerExecCores <= resourceLimit) {
+            numTasksPerExecCores
+          } else {
+            resourceLimit
+          }
+        }
+      }.sum
+  }
+
   /**
    * Used to balance containers across hosts.
   *
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 8fbefae58af1..b108a70d9220 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -633,9 +633,16 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   }
 
   override def maxNumConcurrentTasks(rp: ResourceProfile): Int = synchronized {
-    val cpusPerTask = ResourceProfile.getTaskCpusOrDefaultForProfile(rp, conf)
-    val executorsWithResourceProfile = executorDataMap.values.filter(_.resourceProfileId == rp.id)
-    executorsWithResourceProfile.map(_.totalCores / cpusPerTask).sum
+    val (rpIds, cpus, resources) = executorDataMap.values.toArray.map { executor =>
+      (
+        executor.resourceProfileId,
+        executor.totalCores,
+        executor.resourcesInfo.map { case (name, rInfo) =>
+          (name, rInfo.resourceAddresses.length * rInfo.slotsPerAddress)
+        }
+      )
+    }.unzip3
+    TaskSchedulerImpl.calculateAvailableSlots(scheduler, conf, rp.id, rpIds, cpus, resources)
   }
 
   // this function is for testing only
diff --git a/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
index 435b927068e6..1ba13c2ef189 100644
--- a/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
@@ -19,9 +19,12 @@ package org.apache.spark
 import scala.concurrent.duration._
 
+import org.apache.spark.TestUtils.createTempScriptWithExpectedOutput
 import org.apache.spark.internal.config._
 import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
+import org.apache.spark.resource.TestResourceIDs.{EXECUTOR_GPU_ID, TASK_GPU_ID, WORKER_GPU_ID}
 import org.apache.spark.scheduler.BarrierJobAllocationFailed._
+import org.apache.spark.scheduler.BarrierJobSlotsNumberCheckFailed
 import org.apache.spark.util.ThreadUtils
 
 /**
  *
@@ -259,4 +262,37 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
     testSubmitJob(sc, rdd,
       message = ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
   }
+
+  test("SPARK-32518: CoarseGrainedSchedulerBackend.maxNumConcurrentTasks should " +
+    "consider all kinds of resources for the barrier stage") {
+    withTempDir { dir =>
+      val discoveryScript = createTempScriptWithExpectedOutput(
+        dir, "gpuDiscoveryScript", """{"name": "gpu","addresses":["0"]}""")
+
+      val conf = new SparkConf()
+        // Set up a local cluster which has only one executor with 2 CPUs and 1 GPU.
+        .setMaster("local-cluster[1, 2, 1024]")
+        .setAppName("test-cluster")
+        .set(WORKER_GPU_ID.amountConf, "1")
+        .set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
+        .set(EXECUTOR_GPU_ID.amountConf, "1")
+        .set(TASK_GPU_ID.amountConf, "1")
+        // disable barrier stage retry to fail the application as soon as possible
+        .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES, 1)
+      sc = new SparkContext(conf)
+      TestUtils.waitUntilExecutorsUp(sc, 1, 60000)
+
+      val exception = intercept[BarrierJobSlotsNumberCheckFailed] {
+        // Set up a barrier stage which contains 2 tasks, each of which requires 1 CPU and 1 GPU.
+        // Therefore, the total resource requirement (2 CPUs and 2 GPUs) of this barrier stage
+        // cannot be satisfied since the cluster has only 2 CPUs and 1 GPU in total.
+        sc.parallelize(Range(1, 10), 2)
+          .barrier()
+          .mapPartitions { iter => iter }
+          .collect()
+      }
+      assert(exception.getMessage.contains("[SPARK-24819]: Barrier execution " +
+        "mode does not allow run a barrier stage that requires more slots"))
+    }
+  }
 }

From 3af932b2fe3db30bcca5a83bf6f783c98a591304 Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Mon, 3 Aug 2020 14:08:33 +0800
Subject: [PATCH 2/7] update error message

---
 .../apache/spark/scheduler/BarrierJobAllocationFailed.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala b/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala
index 2274e6898adf..043c6b90384b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala
@@ -60,6 +60,6 @@ private[spark] object BarrierJobAllocationFailed {
   val ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER =
     "[SPARK-24819]: Barrier execution mode does not allow run a barrier stage that requires " +
       "more slots than the total number of slots in the cluster currently. Please init a new " +
-      "cluster with more CPU cores or repartition the input RDD(s) to reduce the number of " +
-      "slots required to run this barrier stage."
+      "cluster with more resources (e.g. CPU, GPU) or repartition the input RDD(s) to reduce " +
+      "the number of slots required to run this barrier stage."
 }

From c25021551ff753a58cdf25541305bd60f58403df Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Tue, 4 Aug 2020 10:50:40 +0800
Subject: [PATCH 3/7] filter active executors

---
 .../CoarseGrainedSchedulerBackend.scala | 22 +++++++++++--------
 1 file changed, 13 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index b108a70d9220..0c99be5a5b6e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -633,15 +633,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   }
 
   override def maxNumConcurrentTasks(rp: ResourceProfile): Int = synchronized {
-    val (rpIds, cpus, resources) = executorDataMap.values.toArray.map { executor =>
-      (
-        executor.resourceProfileId,
-        executor.totalCores,
-        executor.resourcesInfo.map { case (name, rInfo) =>
-          (name, rInfo.resourceAddresses.length * rInfo.slotsPerAddress)
-        }
-      )
-    }.unzip3
+    val (rpIds, cpus, resources) = {
+      executorDataMap
+        .filter { case (id, _) => isExecutorActive(id) }
+        .values.toArray.map { executor =>
+          (
+            executor.resourceProfileId,
+            executor.totalCores,
+            executor.resourcesInfo.map { case (name, rInfo) =>
+              (name, rInfo.resourceAddresses.length * rInfo.slotsPerAddress)
+            }
+          )
+        }.unzip3
+    }
     TaskSchedulerImpl.calculateAvailableSlots(scheduler, conf, rp.id, rpIds, cpus, resources)
   }
 
From 5c51453c0c4cdaf84552afca63ecdd02bf771574 Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Tue, 4 Aug 2020 10:54:00 +0800
Subject: [PATCH 4/7] call after isBarrier

---
 .../org/apache/spark/scheduler/DAGScheduler.scala | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 6b376cdadc66..7641948ed4b3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -480,10 +480,12 @@ private[spark] class DAGScheduler(
    * submission.
    */
   private def checkBarrierStageWithNumSlots(rdd: RDD[_], rp: ResourceProfile): Unit = {
-    val numPartitions = rdd.getNumPartitions
-    val maxNumConcurrentTasks = sc.maxNumConcurrentTasks(rp)
-    if (rdd.isBarrier() && numPartitions > maxNumConcurrentTasks) {
-      throw new BarrierJobSlotsNumberCheckFailed(numPartitions, maxNumConcurrentTasks)
+    if (rdd.isBarrier()) {
+      val numPartitions = rdd.getNumPartitions
+      val maxNumConcurrentTasks = sc.maxNumConcurrentTasks(rp)
+      if (numPartitions > maxNumConcurrentTasks) {
+        throw new BarrierJobSlotsNumberCheckFailed(numPartitions, maxNumConcurrentTasks)
+      }
     }
   }
 
From b5c0bf220731941e179b1b237e7acc377991d890 Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Tue, 4 Aug 2020 10:59:37 +0800
Subject: [PATCH 5/7] add func totalAddressAmount

---
 .../org/apache/spark/scheduler/ExecutorResourceInfo.scala | 5 +++--
 .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 4 +---
 2 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
index f9e4ad4152f3..508c6cebd9fe 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
@@ -34,6 +34,7 @@ private[spark] class ExecutorResourceInfo(
   extends ResourceInformation(name, addresses.toArray) with ResourceAllocator {
 
   override protected def resourceName = this.name
-  override def resourceAddresses: Seq[String] = this.addresses
-  override def slotsPerAddress: Int = numParts
+  override protected def resourceAddresses = this.addresses
+  override protected def slotsPerAddress: Int = numParts
+  def totalAddressAmount: Int = resourceAddresses.length * slotsPerAddress
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 0c99be5a5b6e..8898ff78d61a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -640,9 +640,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           (
             executor.resourceProfileId,
             executor.totalCores,
-            executor.resourcesInfo.map { case (name, rInfo) =>
-              (name, rInfo.resourceAddresses.length * rInfo.slotsPerAddress)
-            }
+            executor.resourcesInfo.map { case (name, rInfo) => (name, rInfo.totalAddressAmount) }
           )
         }.unzip3
     }

From 83a5dff5d659cd6931e27bcce4419b212d2130db Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Tue, 4 Aug 2020 11:15:59 +0800
Subject: [PATCH 6/7] update comment

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +-
 .../org/apache/spark/scheduler/SchedulerBackend.scala   | 2 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala         | 9 +++++++++
 3 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 9ecf316beeaa..995ec7419e3e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1603,7 +1603,7 @@ class SparkContext(config: SparkConf) extends Logging {
 
   /**
    * Get the max number of tasks that can be concurrent launched based on the ResourceProfile
-   * being used.
+   * given, even if some of the resources are being used at the moment.
    * Note that please don't cache the value returned by this method, because the number can change
    * due to add/remove executors.
    *
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index a5bba645be14..a566d0a04387 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -83,7 +83,7 @@ private[spark] trait SchedulerBackend {
 
   /**
    * Get the max number of tasks that can be concurrent launched based on the ResourceProfile
-   * being used.
+   * given, even if some of the resources are being used at the moment.
    * Note that please don't cache the value returned by this method, because the number can change
    * due to add/remove executors.
    *
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 8898ff78d61a..3d7c76cde2b6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -632,6 +632,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   }
 
+  /**
+   * Get the max number of tasks that can be launched concurrently based on the given
+   * ResourceProfile, even if some of the resources are being used at the moment.
+   * Note that you should not cache the value returned by this method, because the number can
+   * change due to adding/removing executors.
+   *
+   * @param rp the ResourceProfile to use to calculate the max concurrent tasks.
+   * @return The max number of tasks that can currently be launched concurrently.
+   */
   override def maxNumConcurrentTasks(rp: ResourceProfile): Int = synchronized {
     val (rpIds, cpus, resources) = {
       executorDataMap

From 786b145771a12004668ea35afdc6e4054924bfed Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Wed, 5 Aug 2020 11:26:33 +0800
Subject: [PATCH 7/7] fix

---
 .../apache/spark/scheduler/TaskSchedulerImpl.scala | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 10433df035e6..a0c507e7f893 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -1150,9 +1150,15 @@ private[spark] object TaskSchedulerImpl {
       availableResources: Array[Map[String, Int]]): Int = {
     val resourceProfile = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
     val coresKnown = resourceProfile.isCoresLimitKnown
-    val limitingResource = resourceProfile.limitingResource(conf)
-    // if limiting resource is empty then we have no other resources, so it has to be CPU
-    val limitedByCpu = limitingResource == ResourceProfile.CPUS || limitingResource.isEmpty
+    val (limitingResource, limitedByCpu) = {
+      val limiting = resourceProfile.limitingResource(conf)
+      // if limiting resource is empty then we have no other resources, so it has to be CPU
+      if (limiting == ResourceProfile.CPUS || limiting.isEmpty) {
+        (ResourceProfile.CPUS, true)
+      } else {
+        (limiting, false)
+      }
+    }
     val cpusPerTask = ResourceProfile.getTaskCpusOrDefaultForProfile(resourceProfile, conf)
     val taskLimit = resourceProfile.taskResources.get(limitingResource).map(_.amount).get