[SPARK-21115][Core] If the cores left are less than coresPerExecutor, the cores left will not be allocated, so it should not be checked in every schedule #18322
Conversation
Can one of the admins verify this patch?
According to my test, the current code of …
@jerryshao I have not seen any issue here, and I have tested this again using the latest master code; the problem still exists.
If you don't see any issue here, what's the problem you met? From my understanding, what you did here is only changing the code slightly to avoid an unnecessary check, is that right?
@jerryshao The problem is: if we start an app with the params --total-executor-cores=4 and spark.executor.cores=3, the check "app.coresLeft > 0" is always true in org.apache.spark.deploy.master.startExecutorsOnWorkers, so it will try to allocate executors for this app but allocate nothing. It is better to compare app.coresLeft with coresPerExecutor: if coresLeft is less than coresPerExecutor, it can return directly.
@jerryshao I have modified "app.coresLeft > 0" to "app.coresLeft >= coresPerExecutor.getOrElse(1)".
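To make the change concrete, here is a minimal, self-contained simulation of the old and new guards (App is a stand-in for the master's ApplicationInfo, not Spark's actual class):

```scala
// Stand-in for ApplicationInfo: only the fields the guard needs.
case class App(coresLeft: Int, coresPerExecutor: Option[Int])

// With --total-executor-cores=4 and spark.executor.cores=3, one 3-core executor
// is placed and 1 core remains.
val waitingApps = Seq(App(coresLeft = 1, coresPerExecutor = Some(3)))

// Old guard: the app is visited in every scheduling round, although no executor fits.
val visitedOld = waitingApps.filter(_.coresLeft > 0)                                      // 1 app
// New guard: the app is skipped once fewer cores remain than one executor needs.
val visitedNew = waitingApps.filter(a => a.coresLeft >= a.coresPerExecutor.getOrElse(1))  // 0 apps
```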
I see, I understand your changes now. IMO, because the user specifically requested 3 cores per executor (as an example), it is not so good to allocate 1 executor with only 1 core; this may break the user's purpose. Also, the original behavior of … Besides, I think it would be better to add some warning logs in SparkSubmit.
@jerryshao Ok, I will add warning logs in SparkSubmit, thanks.
@jerryshao I have added warning logs in SparkSubmit; would you like to review it again? Thanks.
```scala
SparkSubmit.printErrorAndExit("--py-files given but primary resource is not a Python script")
}
if (totalExecutorCores != null && executorCores != null) {
  val totalCores = Try(totalExecutorCores.toInt).getOrElse(-1)
```
What is the purpose of the Try block here? If it fails, the input is invalid, and proceeding with -1 can't be right.
Ok, I will remove the Try block, thanks.
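A minimal sketch of what is being suggested, assuming the string-typed fields of SparkSubmitArguments quoted above (the warning text here is illustrative, not the exact patch wording):

```scala
val totalExecutorCores: String = "4"
val executorCores: String = "3"

if (totalExecutorCores != null && executorCores != null) {
  // Without the Try wrapper, a malformed value fails fast with a
  // NumberFormatException instead of silently proceeding as -1.
  val totalCores = totalExecutorCores.toInt
  val coresPerExecutor = executorCores.toInt
  if (totalCores % coresPerExecutor != 0) {
    println(s"Warning: total executor cores $totalCores is not divisible by " +
      s"cores per executor $coresPerExecutor")
  }
}
```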
```diff
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
- for (app <- waitingApps if app.coresLeft > 0) {
+ for (app <- waitingApps) {
```
Shouldn't all of this change be reverted?
Wouldn't it be better to compare app.coresLeft with coresPerExecutor? If coresLeft is less than coresPerExecutor, it can return directly.
@srowen If the total cores are not divisible by cores per executor, the check app.coresLeft > 0 will always be true, so it is better to compare app.coresLeft with coresPerExecutor than with 0.
I see, Jerry was not saying that this part results in a logic change. This is OK.
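The divisibility argument in one worked example (values from this PR's description):

```scala
val totalExecutorCores = 4
val coresPerExecutor   = 3
// After one 3-core executor is placed, 4 % 3 == 1 core remains forever.
val coresLeft = totalExecutorCores % coresPerExecutor
assert(coresLeft > 0)                 // old guard: the app is revisited every round
assert(coresLeft < coresPerExecutor)  // new guard: the app is skipped
```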
```scala
SparkSubmit.printErrorAndExit("--py-files given but primary resource is not a Python script")
}
if (totalExecutorCores != null && executorCores != null
    && (totalExecutorCores.toInt % executorCores.toInt) != 0) {
```
Minor, but I think the parentheses are in the wrong place. They should go around the expression, or probably just be removed. You might avoid duplicating the mod expression for clarity.
Ok, I've extracted the repeated mod expression into a val and reused it, thanks.
```scala
allocateWorkerResourceToExecutors(
  app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
// If the cores left is less than the coresPerExecutor, the cores left will not be allocated
if (app.coresLeft >= coresPerExecutor.getOrElse(1)) {
```
Might avoid duplicating coresPerExecutor.getOrElse(1) by referring to it as requestedCoresPerExecutor or something
Ok, I've changed "val coresPerExecutor = app.desc.coresPerExecutor" to "val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)" and reused it, and then used "app.desc.coresPerExecutor" directly in the function allocateWorkerResourceToExecutors. Thanks.
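A runnable mini-version of that hoist (Desc and AppInfo are simplified stand-ins for Spark's ApplicationDescription and ApplicationInfo, and the allocate function below is a placeholder, not Spark's signature):

```scala
case class Desc(coresPerExecutor: Option[Int])
case class AppInfo(desc: Desc, coresLeft: Int)

def allocateWorkerResourceToExecutors(app: AppInfo, assignedCores: Int,
                                      coresPerExecutor: Option[Int]): Unit =
  println(s"allocating $assignedCores cores as " +
    s"${coresPerExecutor.getOrElse(assignedCores)}-core executors")

val app = AppInfo(Desc(Some(3)), coresLeft = 4)
val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)  // hoisted once, reused
if (app.coresLeft >= coresPerExecutor) {
  // The raw Option is still passed through, so the callee can distinguish
  // "no explicit setting" from an explicit 1-core request.
  allocateWorkerResourceToExecutors(app, assignedCores = 3, app.desc.coresPerExecutor)
}
```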
```scala
if (pyFiles != null && !isPython) {
  SparkSubmit.printErrorAndExit("--py-files given but primary resource is not a Python script")
}
if (totalExecutorCores != null && executorCores != null) {
```
Thinking about this again, I think this part of the code logic could be moved to SparkConf, in case the user sets these configurations via a SparkConf object in a programmatic way.
Ok, I have moved it to SparkConf; would you like to review it again? Thanks.
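A sketch of how the moved check might look, exercised against a standalone SparkConf (the exact warning wording in the merged patch may differ):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf(false)
  .set("spark.cores.max", "4")
  .set("spark.executor.cores", "3")

if (conf.contains("spark.cores.max") && conf.contains("spark.executor.cores")) {
  val totalCores = conf.getInt("spark.cores.max", 1)
  val coresPerExecutor = conf.getInt("spark.executor.cores", 1)
  val leftCores = totalCores % coresPerExecutor
  if (leftCores != 0) {
    println(s"Total executor cores $totalCores is not divisible by cores per " +
      s"executor $coresPerExecutor; the left $leftCores cores will not be allocated")
  }
}
```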
@eatoncys can you please add a unit test in MasterSuite?
@jerryshao, I have added a unit test in MasterSuite; would you like to review it again? Thanks.
```scala
  }
}

if (contains("spark.cores.max")) {
```
I think these checks for negative numbers are redundant with arg checking for spark-submit?
@srowen Users may set these configurations via a SparkConf object in a programmatic way after the arg checking in spark-submit. Should I move the checks from spark-submit to here, or remove them from here directly? Which is better?
```scala
}

if (contains("spark.cores.max")) {
  val totalCores = getInt("spark.cores.max", -1)
```
Please use Option instead; I think we don't need to check them if they're not set. Also, it looks like if we don't set this configuration, it will always lead to an exception, as the default value is -1.
@jerryshao I don't understand completely; if we don't set this configuration, the branch "if (contains("spark.cores.max"))" will not be entered.
Sorry my fault, I misunderstood the code.
```scala
  master.invokePrivate(_state())
}

test("Total cores is not divisible by cores per executor") {
```
IIUC I don't think this UT really reflects the code you changed in Master; even with the original code these two UTs would also pass. I think you should test whether the logic you changed is executed or not.
@jerryshao The result is the same before and after my change; how can I test them differently? Any suggestion? Thanks.
My original thinking was that it might be possible to test whether the if branch is executed or not using mock and verify. But it looks like it is not easy to test. Can you please investigate?
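For illustration, the mock-and-verify shape suggested here might look like the following; the Allocator/Scheduler types are hypothetical stand-ins, and Master's real scheduling loop is not factored this way, which is why the approach proved hard in practice:

```scala
import org.mockito.Mockito.{mock, never, verify}

trait Allocator { def allocate(appId: String): Unit }

class Scheduler(allocator: Allocator, coresPerExecutor: Int) {
  def schedule(coresLeft: Int): Unit =
    if (coresLeft >= coresPerExecutor) allocator.allocate("app-1")
}

val allocator = mock(classOf[Allocator])
new Scheduler(allocator, coresPerExecutor = 3).schedule(coresLeft = 1)
// Verify the guarded branch was NOT taken when the remaining core cannot fit an executor.
verify(allocator, never()).allocate("app-1")
```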
@jerryshao I don't have any good way to test it like this; any good suggestion? @srowen
I see; if there's no better way to verify it, I think it is not useful to add these two UTs.
@jerryshao Ok, I have removed them, thanks.
| s"(was ${get("spark.executor.cores")}) can only be a positive number") | ||
| } | ||
| } | ||
| if (contains("spark.cores.max") && contains("spark.executor.cores")) { |
I think we could move the negative check to here to simplify the code.
@jerryshao I put the negative check here first, but I think the app should exit directly if the cores are negative, so I moved them out. And @srowen thinks these checks for negative numbers are redundant with the arg checking for spark-submit; it may be a good way to move the checks from spark-submit to here.
```scala
if (Try(JavaUtils.byteStringAsBytes(executorMemory)).getOrElse(-1L) <= 0) {
  throw new IllegalArgumentException(s"spark.executor.memory " +
    s"(was ${executorMemory}) can only be a positive number")
}
```
The above two checks seem unnecessary; let's not change unrelated code.
@jerryshao Ok, I have removed them, and moved the checks back to spark-submit. Thanks.
```scala
}
if (numExecutors != null && Try(numExecutors.toInt).getOrElse(-1) <= 0) {
  SparkSubmit.printErrorAndExit("Number of executors must be a positive number")
}
```
The above changes are valid and useful; I'd suggest not changing them.
@jerryshao Ok, I have moved them back, thanks.
@jiangxb1987 can you please help to review this PR? This is a simple code improvement to avoid some unnecessary code execution when the cores left are not enough for one executor. I don't have a strong inclination on this PR, since the previous code also behaves correctly; I'd like to hear your thoughts.
cc @srowen
```scala
  }
}
if (contains("spark.cores.max") && contains("spark.executor.cores")) {
  val totalCores = getInt("spark.cores.max", 1)
```
@jerryshao I think most of the new argument checking code is redundant. I think the 7 lines from here are all that are needed
Let's just keep this checking branch, or move the previous negative checking code here.
@srowen @jiangxb1987 Ok, I have removed the argument checking code, thanks.
jiangxb1987 left a comment
The changes look good overall, only a few comments.
| if (contains("spark.cores.max")) { | ||
| val totalCores = getInt("spark.cores.max", -1) | ||
| if (totalCores <= 0) { | ||
| throw new IllegalArgumentException(s"spark.cores.max (was ${get("spark.cores.max")})" + |
${get("spark.cores.max")} => $totalCores ?
```scala
val executorCores = getInt("spark.executor.cores", -1)
if (executorCores <= 0) {
  throw new IllegalArgumentException(s"spark.executor.cores " +
    s"(was ${get("spark.executor.cores")}) can only be a positive number")
```
ditto
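The suggestion applied, as a minimal sketch (interpolating the already-parsed Int instead of re-reading the raw setting):

```scala
val totalCores = -1  // stand-in for getInt("spark.cores.max", -1)
if (totalCores <= 0) {
  throw new IllegalArgumentException(
    s"spark.cores.max (was $totalCores) can only be a positive number")
}
```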
LGTM!
jiangxb1987 left a comment
LGTM, cc @cloud-fan
LGTM, merging to master!
[SPARK-21115][Core] If the cores left are less than coresPerExecutor, the cores left will not be allocated, so it should not be checked in every schedule

Author: 10129659 <chen.yanshan@zte.com.cn>

Closes apache#18322 from eatoncys/leftcores.
What changes were proposed in this pull request?
If we start an app with the params --total-executor-cores=4 and spark.executor.cores=3, the cores left is always 1, so the function org.apache.spark.deploy.master.startExecutorsOnWorkers will try to allocate executors in every schedule round.
Another question: would it be better to allocate another executor with 1 core for the cores left?
How was this patch tested?
unit test