
Conversation

@Ngone51 (Member) commented Aug 3, 2020

What changes were proposed in this pull request?

  1. Make CoarseGrainedSchedulerBackend.maxNumConcurrentTasks() consider all kinds of resources when calculating the max number of concurrent tasks.

  2. Refactor calculateAvailableSlots() so that it can be used by both CoarseGrainedSchedulerBackend and TaskSchedulerImpl (a sketch of the resulting call shape follows this list).
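For intuition, here is a hedged sketch of the call shape this refactor enables. The parameter list and the unzip3 step are assumptions pieced together from the review hunks below, not the merged signature:

```scala
// Sketch only: CoarseGrainedSchedulerBackend gathers per-executor state and
// delegates to a shared helper, so the same slot math serves both callers.
// Context variables (executorDataMap, scheduler, conf, rp) belong to the backend.
val (rpIds, cpus, resources) = executorDataMap.values.toArray.map { executor =>
  (executor.resourceProfileId,
    executor.totalCores,
    executor.resourcesInfo.map { case (name, rInfo) =>
      (name, rInfo.resourceAddresses.length * rInfo.slotsPerAddress)
    })
}.unzip3

// Hypothetical shared helper on TaskSchedulerImpl's companion object.
val maxTasks = TaskSchedulerImpl.calculateAvailableSlots(
  scheduler, conf, rp.id, rpIds, cpus, resources)
```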

Why are the changes needed?

Currently, CoarseGrainedSchedulerBackend.maxNumConcurrentTasks() only considers CPUs when calculating the max concurrent tasks. This can cause the application to hang when a barrier stage requires extra custom resources but the cluster doesn't have enough of them. Without checking the other custom resources in maxNumConcurrentTasks, the barrier stage can be submitted to the TaskSchedulerImpl. But the TaskSchedulerImpl won't launch tasks for the barrier stage, because of the insufficient task slots computed by TaskSchedulerImpl.calculateAvailableSlots (which does check all kinds of resources).

The application hang issue can be reproduced by the added unit test.
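For intuition, a minimal self-contained sketch of a slot calculation that accounts for every resource kind. The names ExecutorInfo and maxConcurrentTasks are illustrative assumptions, not Spark's API:

```scala
// Hypothetical, simplified model of an executor's resources. Spark tracks this
// in ExecutorData/ExecutorResourceInfo; this is for illustration only.
case class ExecutorInfo(totalCores: Int, resources: Map[String, Int])

// Max concurrent tasks across the cluster: on each executor, every resource
// kind (CPU and custom) caps the slot count, and the tightest cap wins.
def maxConcurrentTasks(
    executors: Seq[ExecutorInfo],
    cpusPerTask: Int,
    taskResources: Map[String, Int]): Int = {
  executors.map { exec =>
    val cpuSlots = exec.totalCores / cpusPerTask
    val resourceSlots = taskResources.map { case (name, perTask) =>
      exec.resources.getOrElse(name, 0) / perTask
    }
    (resourceSlots.toSeq :+ cpuSlots).min
  }.sum
}
```

A CPU-only version of this computation overstates the slot count whenever a custom resource is scarcer than cores, which is exactly the hang scenario described above.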

Does this PR introduce any user-facing change?

Yes. If a barrier stage requires more custom resources than the cluster has, the application could hang before this PR; after this PR it fails with an insufficient-resources error instead.

How was this patch tested?

Added a unit test.

@Ngone51 (Member Author) commented Aug 3, 2020

ping @tgravescs @jiangxb1987 @mridulm Please take a look, thanks!

@SparkQA commented Aug 3, 2020

Test build #126958 has finished for PR 29332 at commit 3af932b.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

retest this please

@SparkQA commented Aug 3, 2020

Test build #126961 has started for PR 29332 at commit 3af932b.

val taskLimit = resourceProfile.taskResources.get(limitingResource).map(_.amount)
  .getOrElse {
    val errorMsg = "limitingResource returns from ResourceProfile " +
      s"$resourceProfile doesn't actually contain that task resource!"
@Ngone51 (Member Author):

@tgravescs This actually should not happen, right? According to:

if (taskResourcesToCheck.nonEmpty) {
  throw new SparkException("No executor resource configs were not specified for the " +
    s"following task configs: ${taskResourcesToCheck.keys.mkString(",")}")
}

@tgravescs (Contributor):

correct, I believe it was there just as a double check, to make sure nothing breaks in the future

@tgravescs (Contributor) left a comment

ah thanks for fixing. I think this got lost in the change where cores no longer had to be the limiting resource, and in thinking that barrier scheduling didn't work with dynamic allocation. I was assuming it would be covered when dynamic allocation support was added, but it obviously affects the default profile.


val cpusPerTask = ResourceProfile.getTaskCpusOrDefaultForProfile(rp, conf)
val executorsWithResourceProfile = executorDataMap.values.filter(_.resourceProfileId == rp.id)
executorsWithResourceProfile.map(_.totalCores / cpusPerTask).sum
val (rpIds, cpus, resources) = executorDataMap.values.toArray.map { executor =>
@tgravescs (Contributor):

I think this should first filter executorDataMap for active executors.

We are doing a lot of calculation here even when not in a barrier stage. Out in DAGScheduler.checkBarrierStageWithNumSlots we should move the call to this after the check for rdd.isBarrier (or make it last) so everyone doesn't have to pay for it; see the sketch after this thread. This is definitely doing more work than it was before, and we want to keep scheduler code as fast as possible.

@Ngone51 (Member Author):

makes sense.
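A minimal sketch of the reordering suggested above: run the cheap barrier check first so non-barrier stages never pay for the executor scan. The method and exception names follow the Spark codebase, but the exact body here is an assumption, not the merged code:

```scala
// Inside DAGScheduler: only barrier RDDs trigger the slot computation.
private def checkBarrierStageWithNumSlots(rdd: RDD[_]): Unit = {
  if (rdd.isBarrier()) {              // cheap check first
    val numPartitions = rdd.getNumPartitions
    // Potentially expensive: walks all registered executors and their resources.
    val maxNumConcurrentTasks = sc.maxNumConcurrentTasks()
    if (numPartitions > maxNumConcurrentTasks) {
      throw new BarrierJobSlotsNumberCheckFailed(numPartitions, maxNumConcurrentTasks)
    }
  }
}
```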

executor.resourceProfileId,
executor.totalCores,
executor.resourcesInfo.map { case (name, rInfo) =>
  (name, rInfo.resourceAddresses.length * rInfo.slotsPerAddress)
@tgravescs (Contributor):

To be clear, here we are trying to calculate the total number of concurrent tasks that could run, even if some of them are busy at the moment, so I think we should add some documentation there just to clarify.
Also, can we add a function to ExecutorResourceInfo to get this (rInfo.resourceAddresses.length * rInfo.slotsPerAddress)? A sketch follows.
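A simplified standalone sketch of such a helper. The real ExecutorResourceInfo extends ResourceAllocator, and the method name totalAddressAmount is an assumption here, not necessarily what was merged:

```scala
// Illustrative shape only; constructor and fields simplified for the sketch.
class ExecutorResourceInfo(
    val name: String,
    val resourceAddresses: Seq[String],
    val slotsPerAddress: Int) {

  // Total task slots this resource can back on one executor:
  // number of addresses times the slots each address can serve.
  def totalAddressAmount: Int = resourceAddresses.length * slotsPerAddress
}
```

Call sites would then read rInfo.totalAddressAmount instead of repeating the multiplication.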

@SparkQA commented Aug 4, 2020

Test build #127022 has finished for PR 29332 at commit 83a5dff.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs (Contributor) left a comment

lgtm, pending Jenkins

@tgravescs (Contributor)

test this please

@SparkQA commented Aug 4, 2020

Test build #127053 has started for PR 29332 at commit 83a5dff.

@SparkQA commented Aug 5, 2020

Test build #127074 has finished for PR 29332 at commit 786b145.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

thanks, merging to master!

@cloud-fan closed this in 7f275ee on Aug 6, 2020
@Ngone51 (Member Author) commented Aug 6, 2020

thanks all! BTW, shall we backport this to branch-3.0? It would be a slightly different fix, though, since branch-3.0 doesn't have ResourceProfile.

@cloud-fan (Contributor)

Yeah, let's create a new PR for 3.0.

@tgravescs (Contributor)

branch-3.0 doesn't need it because cores are forced to be the limiting resource.

@Ngone51 (Member Author) commented Aug 6, 2020

I think that's the problem. Consider the case where a barrier stage requires 2 CPUs and 2 GPUs, but the cluster only has 2 CPUs and 1 GPU. In 3.0, since the limiting resource is cores/CPUs, the barrier stage would get a chance to launch tasks. However, it could only launch one task, because the real limiting resource is the GPU. In this case, the barrier stage fails because of the partial task launch, but the error message is quite confusing for users: it suggests disabling delay scheduling, while the real cause is insufficient (custom) resources. If we backport this fix to 3.0, the barrier stage would fail early, before it is able to launch any task.
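To make the arithmetic concrete, a sketch reusing the illustrative ExecutorInfo/maxConcurrentTasks model from the PR description above (not Spark code; the numbers mirror the scenario just described):

```scala
// One executor with 2 cores and 1 GPU; each barrier task needs 1 CPU and 1 GPU.
val executors = Seq(ExecutorInfo(totalCores = 2, resources = Map("gpu" -> 1)))

val cpuOnlySlots = executors.map(_.totalCores / 1).sum  // 2: what 3.0 checks

val allResourceSlots =
  maxConcurrentTasks(executors, cpusPerTask = 1, Map("gpu" -> 1))
// min(2 CPU slots, 1 GPU slot) = 1

// A 2-task barrier stage passes the CPU-only check (2 >= 2) but can only ever
// launch 1 task, so it fails late with the confusing delay-scheduling error.
```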

@tgravescs (Contributor)

This change is checking what your executors have. Spark will not let you start the application if your task requirements are such that CPUs are not your limiting resource. So the only way you can get an executor with 2 cores and 1 GPU is if each task requires 2 cores and 1 GPU, and at that point the GPU is not your limiting resource; they are equivalent.

@Ngone51 (Member Author) commented Aug 6, 2020

That's not true in clusters where we don't need to check cores, though? For example, Standalone.

@tgravescs (Contributor)

Oh right, if you didn't specify the executor cores in standalone mode, you get them all by default. Which honestly causes lots of issues; I filed a JIRA about this for other things. That is left up to the user to properly configure their cluster and job.
There are other things in Spark 3.0 that expect cores to be the limiting resource. Dynamic allocation is a big one, which I know you don't care about, but I'm pretty sure there are other places throughout the code that do as well.

We should remove the isDynamicAllocation check here so that the warning gets printed for standalone as well:
https://github.com/apache/spark/blob/branch-3.0/core/src/main/scala/org/apache/spark/SparkContext.scala#L2836

If you want to go through and make sure everything is updated to schedule based on resources, I'm fine with it, but I'm pretty sure it needs to be more than this.

@Ngone51 (Member Author) commented Aug 10, 2020

I'm not sure about the other things (could you link the specific JIRA?). At least, I'd like to backport the fix to schedule based on the resources: #29395.
