diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 65c08cff2ba11..66fe1d7f2ba13 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1597,7 +1597,8 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /**
-   * Get the max number of tasks that can be concurrent launched currently.
+   * Get the max number of tasks that can be launched concurrently based on the resources
+   * that could be used, even if some of them are being used at the moment.
    * Note that please don't cache the value returned by this method, because the number can change
    * due to add/remove executors.
    *
@@ -2776,8 +2777,9 @@ object SparkContext extends Logging {
     }
     // some cluster managers don't set the EXECUTOR_CORES config by default (standalone
     // and mesos coarse grained), so we can't rely on that config for those.
-    val shouldCheckExecCores = executorCores.isDefined || sc.conf.contains(EXECUTOR_CORES) ||
+    var shouldCheckExecCores = executorCores.isDefined || sc.conf.contains(EXECUTOR_CORES) ||
       (master.equalsIgnoreCase("yarn") || master.startsWith("k8s"))
+    shouldCheckExecCores &= !sc.conf.get(SKIP_VALIDATE_CORES_TESTING)
 
     // Number of cores per executor must meet at least one task requirement.
     if (shouldCheckExecCores && execCores < taskCores) {
@@ -2833,7 +2835,7 @@ object SparkContext extends Logging {
         limitingResourceName = taskReq.resourceName
       }
     }
-    if(!shouldCheckExecCores && Utils.isDynamicAllocationEnabled(sc.conf)) {
+    if(!shouldCheckExecCores) {
      // if we can't rely on the executor cores config throw a warning for user
      logWarning("Please ensure that the number of slots available on your " +
        "executors is limited by the number of cores to task cpus and not another " +
@@ -2857,7 +2859,7 @@ object SparkContext extends Logging {
        s"result in wasted resources due to resource ${limitingResourceName} limiting the " +
        s"number of runnable tasks per executor to: ${numSlots}. Please adjust " +
        s"your configuration."
-      if (Utils.isTesting) {
+      if (sc.conf.get(RESOURCES_WARNING_TESTING)) {
        throw new SparkException(message)
      } else {
        logWarning(message)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Tests.scala b/core/src/main/scala/org/apache/spark/internal/config/Tests.scala
index 232264d6c8e49..e328ed026bfb0 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Tests.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Tests.scala
@@ -61,4 +61,19 @@ private[spark] object Tests {
     .version("3.0.0")
     .intConf
     .createWithDefault(2)
+
+  val RESOURCES_WARNING_TESTING = ConfigBuilder("spark.resources.warnings.testing")
+    .version("3.0.1")
+    .booleanConf
+    .createWithDefault(false)
+
+  // This configuration is used for unit tests to skip the task cpus to cores validation, which
+  // allows emulating standalone mode behavior while running in local mode. Standalone mode
+  // by default doesn't specify a number of executor cores; it just uses all the ones available
+  // on the host.
+  val SKIP_VALIDATE_CORES_TESTING =
+    ConfigBuilder("spark.testing.skipValidateCores")
+      .version("3.0.1")
+      .booleanConf
+      .createWithDefault(false)
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala b/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala
index 2274e6898adf6..043c6b90384b4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala
@@ -60,6 +60,6 @@ private[spark] object BarrierJobAllocationFailed {
   val ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER =
     "[SPARK-24819]: Barrier execution mode does not allow run a barrier stage that requires " +
       "more slots than the total number of slots in the cluster currently. Please init a new " +
-      "cluster with more CPU cores or repartition the input RDD(s) to reduce the number of " +
-      "slots required to run this barrier stage."
+      "cluster with more resources (e.g. CPU, GPU) or repartition the input RDD(s) to reduce " +
+      "the number of slots required to run this barrier stage."
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 51445bf451bec..b483b52662270 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -453,10 +453,12 @@ private[spark] class DAGScheduler(
    * submission.
    */
   private def checkBarrierStageWithNumSlots(rdd: RDD[_]): Unit = {
-    val numPartitions = rdd.getNumPartitions
-    val maxNumConcurrentTasks = sc.maxNumConcurrentTasks
-    if (rdd.isBarrier() && numPartitions > maxNumConcurrentTasks) {
-      throw new BarrierJobSlotsNumberCheckFailed(numPartitions, maxNumConcurrentTasks)
+    if (rdd.isBarrier()) {
+      val numPartitions = rdd.getNumPartitions
+      val maxNumConcurrentTasks = sc.maxNumConcurrentTasks
+      if (numPartitions > maxNumConcurrentTasks) {
+        throw new BarrierJobSlotsNumberCheckFailed(numPartitions, maxNumConcurrentTasks)
+      }
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
index fd04db8c09d76..508c6cebd9fe3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
@@ -36,4 +36,5 @@ private[spark] class ExecutorResourceInfo(
   override protected def resourceName = this.name
   override protected def resourceAddresses = this.addresses
   override protected def slotsPerAddress: Int = numParts
+  def totalAddressAmount: Int = resourceAddresses.length * slotsPerAddress
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 9159d2a0158d5..7b76af2f489f9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -77,7 +77,8 @@ private[spark] trait SchedulerBackend {
   def getDriverAttributes: Option[Map[String, String]] = None
 
   /**
-   * Get the max number of tasks that can be concurrent launched currently.
+   * Get the max number of tasks that can be launched concurrently based on the resources
+   * that could be used, even if some of them are being used at the moment.
    * Note that please don't cache the value returned by this method, because the number can change
    * due to add/remove executors.
    *
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 97125f6238b22..46641e5bf5580 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -446,7 +446,17 @@ private[spark] class TaskSchedulerImpl(
     // of locality levels so that it gets a chance to launch local tasks on all of them.
     // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
     for (taskSet <- sortedTaskSets) {
-      val availableSlots = availableCpus.map(c => c / CPUS_PER_TASK).sum
+      // we only need to calculate available slots if using barrier scheduling, otherwise the
+      // value is -1
+      val availableSlots = if (taskSet.isBarrier) {
+        val availableResourcesAmount = availableResources.map { resourceMap =>
+          // note that the addresses here have been expanded according to the numParts
+          resourceMap.map { case (name, addresses) => (name, addresses.length) }
+        }
+        calculateAvailableSlots(this, availableCpus, availableResourcesAmount)
+      } else {
+        -1
+      }
       // Skip the barrier taskSet if the available slots are less than the number of pending tasks.
       if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
         // Skip the launch process.
@@ -933,6 +943,30 @@ private[spark] object TaskSchedulerImpl {
 
   val SCHEDULER_MODE_PROPERTY = SCHEDULER_MODE.key
 
+  /**
+   * Calculate the max available task slots given the `availableCpus` and `availableResources`
+   * from a collection of executors.
+   *
+   * @param scheduler the TaskSchedulerImpl instance
+   * @param availableCpus an Array of the number of available cpus from the executors.
+   * @param availableResources an Array of the resource maps from the executors, where each map
+   *                           maps from the resource name to its amount.
+   * @return the max number of task slots
+   */
+  def calculateAvailableSlots(
+      scheduler: TaskSchedulerImpl,
+      availableCpus: Array[Int],
+      availableResources: Array[Map[String, Int]]): Int = {
+    val cpusPerTask = scheduler.CPUS_PER_TASK
+    val resourcesReqsPerTask = scheduler.resourcesReqsPerTask
+    availableCpus.zip(availableResources).map { case (cpu, resources) =>
+      val cpuNum = cpu / cpusPerTask
+      resourcesReqsPerTask.map { req =>
+        resources.get(req.resourceName).map(_ / req.amount).getOrElse(0)
+      }.reduceOption(Math.min).map(_.min(cpuNum)).getOrElse(cpuNum)
+    }.sum
+  }
+
   /**
    * Used to balance containers across hosts.
    *
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 9d8fb8f83856d..8b55e2c7dbee5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -563,10 +563,26 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     !executorsPendingLossReason.contains(id)
   }
 
+  /**
+   * Get the max number of tasks that can be launched concurrently based on the resources
+   * that could be used, even if some of them are being used at the moment.
+   * Note that please don't cache the value returned by this method, because the number can change
+   * due to add/remove executors.
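+   * For example, with two active executors that each have 4 CPUs and 2 GPUs, and tasks that
+   * each require 1 CPU and 1 GPU, every executor contributes min(4 / 1, 2 / 1) = 2 slots,
+   * so this method returns 4.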
+   *
+   * @return The max number of tasks that can be launched concurrently.
+   */
   override def maxNumConcurrentTasks(): Int = synchronized {
-    executorDataMap.values.map { executor =>
-      executor.totalCores / scheduler.CPUS_PER_TASK
-    }.sum
+    val (cpus, resources) = {
+      executorDataMap
+        .filter { case (id, _) => isExecutorActive(id) }
+        .values.toArray.map { executor =>
+          (
+            executor.totalCores,
+            executor.resourcesInfo.map { case (name, rInfo) => (name, rInfo.totalAddressAmount) }
+          )
+        }.unzip
+    }
+    TaskSchedulerImpl.calculateAvailableSlots(scheduler, cpus, resources)
   }
 
   // this function is for testing only
diff --git a/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
index 435b927068e60..7052d1a2028bb 100644
--- a/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
@@ -19,9 +19,12 @@ package org.apache.spark
 
 import scala.concurrent.duration._
 
+import org.apache.spark.TestUtils.createTempScriptWithExpectedOutput
 import org.apache.spark.internal.config._
 import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
+import org.apache.spark.resource.TestResourceIDs.{EXECUTOR_GPU_ID, TASK_GPU_ID, WORKER_GPU_ID}
 import org.apache.spark.scheduler.BarrierJobAllocationFailed._
+import org.apache.spark.scheduler.BarrierJobSlotsNumberCheckFailed
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -259,4 +262,37 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
     testSubmitJob(sc, rdd,
       message = ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
   }
+
+  test("SPARK-32518: CoarseGrainedSchedulerBackend.maxNumConcurrentTasks should " +
+    "consider all kinds of resources for the barrier stage") {
+    withTempDir { dir =>
+      val discoveryScript = createTempScriptWithExpectedOutput(
+        dir, "gpuDiscoveryScript", """{"name": "gpu","addresses":["0"]}""")
+
+      val conf = new SparkConf()
+        .setMaster("local-cluster[1, 2, 1024]")
+        .setAppName("test-cluster")
+        .set(WORKER_GPU_ID.amountConf, "1")
+        .set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
+        .set(EXECUTOR_GPU_ID.amountConf, "1")
+        .set(TASK_GPU_ID.amountConf, "1")
+        // disable barrier stage retry to fail the application as soon as possible
+        .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES, 1)
+        // disable the check to simulate the behavior of Standalone in order to
+        // reproduce the issue.
+        .set(Tests.SKIP_VALIDATE_CORES_TESTING, true)
+      sc = new SparkContext(conf)
+      // setup an executor which will have 2 CPUs and 1 GPU
+      TestUtils.waitUntilExecutorsUp(sc, 1, 60000)
+
+      val exception = intercept[BarrierJobSlotsNumberCheckFailed] {
+        sc.parallelize(Range(1, 10), 2)
+          .barrier()
+          .mapPartitions { iter => iter }
+          .collect()
+      }
+      assert(exception.getMessage.contains("[SPARK-24819]: Barrier execution " +
+        "mode does not allow run a barrier stage that requires more slots"))
+    }
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index ce437a5e42b22..dc1c0451c628d 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -36,6 +36,7 @@ import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.TestUtils._
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests.RESOURCES_WARNING_TESTING
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.resource.ResourceAllocation
 import org.apache.spark.resource.ResourceUtils._
@@ -890,6 +891,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       .setAppName("test-cluster")
     conf.set(TASK_GPU_ID.amountConf, "2")
     conf.set(EXECUTOR_GPU_ID.amountConf, "4")
+    conf.set(RESOURCES_WARNING_TESTING, true)
     var error = intercept[SparkException] {
       sc = new SparkContext(conf)