10 changes: 6 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1597,7 +1598,8 @@ class SparkContext(config: SparkConf) extends Logging {
}

/**
* Get the max number of tasks that can be concurrent launched currently.
* Get the max number of tasks that can be launched concurrently based on the resources
* that could be used, even if some of them are being used at the moment.
* Note: please don't cache the value returned by this method, because the number can change
* when executors are added or removed.
*
@@ -2776,8 +2777,9 @@ object SparkContext extends Logging {
}
// some cluster managers don't set the EXECUTOR_CORES config by default (standalone
// and mesos coarse grained), so we can't rely on that config for those.
val shouldCheckExecCores = executorCores.isDefined || sc.conf.contains(EXECUTOR_CORES) ||
var shouldCheckExecCores = executorCores.isDefined || sc.conf.contains(EXECUTOR_CORES) ||
(master.equalsIgnoreCase("yarn") || master.startsWith("k8s"))
shouldCheckExecCores &= !sc.conf.get(SKIP_VALIDATE_CORES_TESTING)

// Number of cores per executor must meet at least one task requirement.
if (shouldCheckExecCores && execCores < taskCores) {
@@ -2833,7 +2835,7 @@
limitingResourceName = taskReq.resourceName
}
}
if(!shouldCheckExecCores && Utils.isDynamicAllocationEnabled(sc.conf)) {
if(!shouldCheckExecCores) {
// if we can't rely on the executor cores config throw a warning for user
logWarning("Please ensure that the number of slots available on your " +
"executors is limited by the number of cores to task cpus and not another " +
@@ -2857,7 +2859,7 @@
s"result in wasted resources due to resource ${limitingResourceName} limiting the " +
s"number of runnable tasks per executor to: ${numSlots}. Please adjust " +
s"your configuration."
if (Utils.isTesting) {
if (sc.conf.get(RESOURCES_WARNING_TESTING)) {
throw new SparkException(message)
} else {
logWarning(message)
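To make the check above concrete, here is a minimal sketch (not part of the diff) of a configuration that would trip the executor-cores validation, assuming a local-cluster master with 1 core per executor:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical repro: each executor offers 1 core, but every task demands 2 cpus,
// so execCores < taskCores and SparkContext creation fails fast.
val conf = new SparkConf()
  .setMaster("local-cluster[1, 1, 1024]") // 1 executor, 1 core, 1024 MB
  .setAppName("cores-validation-sketch")
  .set("spark.task.cpus", "2")
// new SparkContext(conf) // would throw SparkException from the check above,
// unless spark.testing.skipValidateCores (introduced below) is set to true.
```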
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/Tests.scala
@@ -61,4 +61,19 @@ private[spark] object Tests {
.version("3.0.0")
.intConf
.createWithDefault(2)

val RESOURCES_WARNING_TESTING = ConfigBuilder("spark.resources.warnings.testing")
.version("3.0.1")
.booleanConf
.createWithDefault(false)

// This configuration is used in unit tests to skip the validation of task cpus against
// executor cores, emulating standalone mode behavior while running in local mode. Standalone
// mode doesn't specify a number of executor cores by default; it simply uses all the cores
// available on the host.
val SKIP_VALIDATE_CORES_TESTING =
ConfigBuilder("spark.testing.skipValidateCores")
.version("3.0.1")
.booleanConf
.createWithDefault(false)
Member Author: These 2 configs are backported from the master branch.

Member: ditto. This should be 3.0.1 when it comes to branch-3.0, @Ngone51 . Also, after merging this, please update the master branch consistently.

Member Author: Thank you @dongjoon-hyun for letting me know. I was wondering about it previously.

}
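As a rough usage sketch, a test might opt in to both escape hatches like this (mirroring the suites later in this diff; note the typed `.set(ConfigEntry, value)` overload is only accessible from code inside the `org.apache.spark` package):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.Tests

val conf = new SparkConf()
  // emulate standalone's "no executor cores set" behavior while in local mode
  .set(Tests.SKIP_VALIDATE_CORES_TESTING, true)
  // escalate the resource-waste warning into a SparkException so tests can assert on it
  .set(Tests.RESOURCES_WARNING_TESTING, true)
```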
@@ -60,6 +60,6 @@ private[spark] object BarrierJobAllocationFailed {
val ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER =
"[SPARK-24819]: Barrier execution mode does not allow run a barrier stage that requires " +
"more slots than the total number of slots in the cluster currently. Please init a new " +
"cluster with more CPU cores or repartition the input RDD(s) to reduce the number of " +
"slots required to run this barrier stage."
"cluster with more resources(e.g. CPU, GPU) or repartition the input RDD(s) to reduce " +
"the number of slots required to run this barrier stage."
}
@@ -453,10 +453,12 @@ private[spark] class DAGScheduler(
* submission.
*/
private def checkBarrierStageWithNumSlots(rdd: RDD[_]): Unit = {
val numPartitions = rdd.getNumPartitions
val maxNumConcurrentTasks = sc.maxNumConcurrentTasks
if (rdd.isBarrier() && numPartitions > maxNumConcurrentTasks) {
throw new BarrierJobSlotsNumberCheckFailed(numPartitions, maxNumConcurrentTasks)
if (rdd.isBarrier()) {
val numPartitions = rdd.getNumPartitions
val maxNumConcurrentTasks = sc.maxNumConcurrentTasks
if (numPartitions > maxNumConcurrentTasks) {
throw new BarrierJobSlotsNumberCheckFailed(numPartitions, maxNumConcurrentTasks)
}
}
}

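The refactor above is behavior-preserving; it just avoids computing `maxNumConcurrentTasks` for non-barrier stages. A minimal sketch of the path that trips the check, assuming a SparkContext `sc` whose backend currently reports 2 concurrent task slots:

```scala
// Hypothetical repro: 4 barrier partitions but only 2 slots, so submission
// fails fast with BarrierJobSlotsNumberCheckFailed(4, 2) instead of hanging.
val rdd = sc.parallelize(1 to 100, numSlices = 4)
  .barrier()
  .mapPartitions { iter => iter }
// rdd.collect() // would throw BarrierJobSlotsNumberCheckFailed here
```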
@@ -36,4 +36,5 @@ private[spark] class ExecutorResourceInfo(
override protected def resourceName = this.name
override protected def resourceAddresses = this.addresses
override protected def slotsPerAddress: Int = numParts
def totalAddressAmount: Int = resourceAddresses.length * slotsPerAddress
}
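`totalAddressAmount` counts logical task slots, not physical addresses. A self-contained sketch of the arithmetic (the numbers are illustrative; only the formula comes from the diff):

```scala
// A gpu resource with two physical addresses where each address is split into
// 2 parts (e.g. a fractional task amount of 0.5 gives numParts = 2):
val resourceAddresses = Seq("0", "1")
val slotsPerAddress = 2
val totalAddressAmount = resourceAddresses.length * slotsPerAddress
assert(totalAddressAmount == 4) // 4 logical gpu slots on this executor
```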
@@ -77,7 +77,8 @@ private[spark] trait SchedulerBackend {
def getDriverAttributes: Option[Map[String, String]] = None

/**
* Get the max number of tasks that can be concurrent launched currently.
* Get the max number of tasks that can be launched concurrently based on the resources
* that could be used, even if some of them are being used at the moment.
* Note: please don't cache the value returned by this method, because the number can change
* when executors are added or removed.
*
@@ -446,7 +446,17 @@ private[spark] class TaskSchedulerImpl(
// of locality levels so that it gets a chance to launch local tasks on all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
for (taskSet <- sortedTaskSets) {
val availableSlots = availableCpus.map(c => c / CPUS_PER_TASK).sum
// we only need to calculate available slots if using barrier scheduling, otherwise the
// value is -1
val availableSlots = if (taskSet.isBarrier) {
val availableResourcesAmount = availableResources.map { resourceMap =>
// note that the addresses here have been expanded according to the numParts
resourceMap.map { case (name, addresses) => (name, addresses.length) }
}
calculateAvailableSlots(this, availableCpus, availableResourcesAmount)
} else {
-1
}
// Skip the barrier taskSet if the available slots are less than the number of pending tasks.
if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
// Skip the launch process.
@@ -933,6 +943,30 @@

val SCHEDULER_MODE_PROPERTY = SCHEDULER_MODE.key

/**
* Calculate the max available task slots given the `availableCpus` and `availableResources`
* from a collection of executors.
*
* @param scheduler the TaskSchedulerImpl instance
* @param availableCpus an array of the number of available cpus on each executor.
* @param availableResources an array of the resource maps for the executors, where each map
*                           maps a resource name to its available amount.
* @return the max number of task slots
*/
def calculateAvailableSlots(
scheduler: TaskSchedulerImpl,
availableCpus: Array[Int],
availableResources: Array[Map[String, Int]]): Int = {
val cpusPerTask = scheduler.CPUS_PER_TASK
val resourcesReqsPerTask = scheduler.resourcesReqsPerTask
availableCpus.zip(availableResources).map { case (cpu, resources) =>
val cpuNum = cpu / cpusPerTask
resourcesReqsPerTask.map { req =>
resources.get(req.resourceName).map(_ / req.amount).getOrElse(0)
}.reduceOption(Math.min).map(_.min(cpuNum)).getOrElse(cpuNum)
}.sum
}

/**
* Used to balance containers across hosts.
*
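Here is a self-contained sketch of the min-over-resources computation `calculateAvailableSlots` performs, with hypothetical numbers (the real method reads `cpusPerTask` and the per-task resource requirements from the scheduler):

```scala
// Two executors, each with 2 free cores and 1 free gpu address;
// every task needs 1 cpu and 1 gpu.
val cpusPerTask = 1
val gpusPerTask = 1
val availableCpus = Array(2, 2)
val availableResources = Array(Map("gpu" -> 1), Map("gpu" -> 1))

val slots = availableCpus.zip(availableResources).map { case (cpu, res) =>
  val byCpu = cpu / cpusPerTask                     // 2 slots judged by cpus alone
  val byGpu = res.getOrElse("gpu", 0) / gpusPerTask // 1 slot judged by gpus
  math.min(byCpu, byGpu)                            // gpu is the limiting resource
}.sum
assert(slots == 2) // cores-only accounting would have claimed 4 slots
```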
@@ -563,10 +563,26 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
!executorsPendingLossReason.contains(id)
}

/**
* Get the max number of tasks that can be launched concurrently based on the resources
* that could be used, even if some of them are being used at the moment.
* Note: please don't cache the value returned by this method, because the number can change
* when executors are added or removed.
*
* @return the max number of tasks that can currently be launched concurrently.
*/
override def maxNumConcurrentTasks(): Int = synchronized {
executorDataMap.values.map { executor =>
executor.totalCores / scheduler.CPUS_PER_TASK
}.sum
val (cpus, resources) = {
executorDataMap
.filter { case (id, _) => isExecutorActive(id) }
.values.toArray.map { executor =>
(
executor.totalCores,
executor.resourcesInfo.map { case (name, rInfo) => (name, rInfo.totalAddressAmount) }
)
}.unzip
}
TaskSchedulerImpl.calculateAvailableSlots(scheduler, cpus, resources)
}

// this function is for testing only
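With this override, a resource other than cores can now be the limiting factor. A hedged walkthrough using the numbers from the regression test below (one active executor with 2 cores and 1 gpu; each task needs 1 cpu and 1 gpu):

```scala
// Shape of the snapshot the override builds from executorDataMap (hypothetical data):
val cpus = Array(2)                    // executor.totalCores
val resources = Array(Map("gpu" -> 1)) // name -> totalAddressAmount
// calculateAvailableSlots => min(2 / CPUS_PER_TASK, 1 / gpusPerTask) = 1 slot.
// The old cores-only code returned 2, letting a 2-partition barrier stage be
// submitted and then hang forever waiting for a second gpu.
```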
@@ -19,9 +19,12 @@ package org.apache.spark

import scala.concurrent.duration._

import org.apache.spark.TestUtils.createTempScriptWithExpectedOutput
import org.apache.spark.internal.config._
import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
import org.apache.spark.resource.TestResourceIDs.{EXECUTOR_GPU_ID, TASK_GPU_ID, WORKER_GPU_ID}
import org.apache.spark.scheduler.BarrierJobAllocationFailed._
import org.apache.spark.scheduler.BarrierJobSlotsNumberCheckFailed
import org.apache.spark.util.ThreadUtils

/**
@@ -259,4 +262,37 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
testSubmitJob(sc, rdd,
message = ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
}

test("SPARK-32518: CoarseGrainedSchedulerBackend.maxNumConcurrentTasks should " +
"consider all kinds of resources for the barrier stage") {
withTempDir { dir =>
val discoveryScript = createTempScriptWithExpectedOutput(
dir, "gpuDiscoveryScript", """{"name": "gpu","addresses":["0"]}""")

val conf = new SparkConf()
.setMaster("local-cluster[1, 2, 1024]")
.setAppName("test-cluster")
.set(WORKER_GPU_ID.amountConf, "1")
.set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
.set(EXECUTOR_GPU_ID.amountConf, "1")
.set(TASK_GPU_ID.amountConf, "1")
// disable barrier stage retry to fail the application as soon as possible
.set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES, 1)
// disable the check to simulate the behavior of Standalone in order to
// reproduce the issue.
.set(Tests.SKIP_VALIDATE_CORES_TESTING, true)
sc = new SparkContext(conf)
// set up an executor which will have 2 CPUs and 1 GPU
TestUtils.waitUntilExecutorsUp(sc, 1, 60000)

val exception = intercept[BarrierJobSlotsNumberCheckFailed] {
sc.parallelize(Range(1, 10), 2)
.barrier()
.mapPartitions { iter => iter }
.collect()
}
assert(exception.getMessage.contains("[SPARK-24819]: Barrier execution " +
"mode does not allow run a barrier stage that requires more slots"))
Contributor: I'm not sure if it's worth it, but it would be nice to perhaps print what the limiting resource is. If it's too much change or work to track, we may just skip it.

Member Author: This is actually a good idea. But as you mentioned, I'm afraid this needs many more changes. So, I'd like to skip it here.

Contributor: ok, we can revisit if it becomes an issue later.
}
}
}
2 changes: 2 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -36,6 +36,7 @@ import org.scalatest.concurrent.Eventually

import org.apache.spark.TestUtils._
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Tests.RESOURCES_WARNING_TESTING
import org.apache.spark.internal.config.UI._
import org.apache.spark.resource.ResourceAllocation
import org.apache.spark.resource.ResourceUtils._
@@ -890,6 +891,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually
.setAppName("test-cluster")
conf.set(TASK_GPU_ID.amountConf, "2")
conf.set(EXECUTOR_GPU_ID.amountConf, "4")
conf.set(RESOURCES_WARNING_TESTING, true)

var error = intercept[SparkException] {
sc = new SparkContext(conf)