[SPARK-20564][Deploy] Reduce massive executor failures when executor count is large (>2000) #17854
Conversation
…tors to be launched simultaneously
Jenkins, ok to test

Test build #76494 has finished for PR 17854 at commit
It looks to me like this is actually making 2 behavior changes:

1. capping how many containers are requested from YARN in each allocation cycle, and
2. ignoring containers that YARN has already allocated beyond that cap.

Is that correct? Did you find that (2) was necessary as well? I understand the problem you are describing, but I'm surprised this really helps the driver scale up to more executors. Maybe this will let the executors start, but won't it just lead to the driver getting swamped when you've got 2500 executors sending heartbeats and task updates? I'm not saying it's bad to make this improvement, just trying to understand. I'd feel better about just doing (1) -- if you found (2) is necessary, I would want to think through the implications a bit more.

also cc @tgravescs @vanzin
To slow down launching you could just set spark.yarn.containerLauncherMaxThreads to be smaller. That isn't guaranteed, but neither is this, really. It's just an alternative, or something you can do immediately. I don't see any reason to drop the containers YARN gives you unless they are really slowing things down to the point of wasting a lot of resources; dropping them will just cause more overhead. Asking for less to start with could be OK, although again it's just going to slow down the entire thing. How long is it taking you to launch these? Also, can you put up some more details about exactly what you are seeing? I assume it's getting timeout exceptions? Exactly where is it timing out and why? It would be nice to really fix or improve that as well, longer term. What is your timeout set to? I want to see details so we can determine whether other things should be done, like making the registration retry more, whether the current timeouts are sufficient, etc.
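For concreteness, a minimal sketch of that alternative, assuming the existing spark.yarn.containerLauncherMaxThreads setting (the value 10 below is purely illustrative):

```scala
import org.apache.spark.SparkConf

// Throttle how many container-launch calls the AM sends to node managers in
// parallel. This setting defaults to 25 in the Spark versions I'm aware of;
// 10 here is only an illustration, not a recommendation.
val conf = new SparkConf()
  .set("spark.yarn.containerLauncherMaxThreads", "10")
```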
I took a quick look at the registerExecutor call in CoarseGrainedExecutorBackend and it's not retrying at all. We should change that to retry. We retry heartbeats and many other things, so it makes sense to retry this. If it has to retry and takes a bit longer, that's no worse than artificially delaying the launch, and it's better because if the executors can get registered it will be faster than delaying the launch. Between that and changing the max launcher threads, I would think that would be sufficient. If not, you would have problems at other times too. Thoughts?
What do you mean by "not retrying"? Do you mean this line: If that's what you mean, there's no need for retrying. No RPC calls retry anymore. See #16503 (comment) for an explanation.
I see. I guess with the way we have the RPC implemented, the message just sits in the outbox or the receiver's inbox anyway, so you are saying it makes more sense to just increase the timeout. I definitely agree with you that if things retry they need to be idempotent. I don't agree in general that we shouldn't retry; if the RPC layer is doing it for us, that is fine. There are also special cases where you may want to do something like an exponential backoff on the wait between tries, etc., but those would be on a case-by-case basis. Lots of things retry connections, from Hadoop to cell phones. Sometimes weird things happen in large clusters; in a large network, things might just take a different route. I try to connect once and it either fails or is slow, try again and it works fine because the connection took a different route. If you have more raw data that says the retries are bad, I would be interested.
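For illustration only, here is a generic retry-with-exponential-backoff helper of the kind being discussed. retryWithBackoff and registerWithDriver are hypothetical names, not Spark's RPC API, and whatever the helper wraps must be idempotent:

```scala
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical helper, not part of Spark: retry a block of code with
// exponentially growing delays between attempts. The wrapped operation
// must be idempotent, as noted in the discussion above.
def retryWithBackoff[T](attemptsLeft: Int, delay: FiniteDuration)(op: => T): T = {
  Try(op) match {
    case Success(result) => result
    case Failure(_) if attemptsLeft > 1 =>
      Thread.sleep(delay.toMillis)
      retryWithBackoff(attemptsLeft - 1, delay * 2)(op)  // double the wait each time
    case Failure(e) => throw e
  }
}

// Example use with a made-up registration call:
// val registered = retryWithBackoff(5, 1.second) { registerWithDriver() }
```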
Spark has never really handled network failure. If the connection between the driver and the executor is cut, Spark sees that as the executor dying.
Yes, code that wants to retry should do that explicitly. The old "retry" existed not because of the needs of the code making the call, but because Akka could lose messages. The new RPC layer doesn't lose messages (ignoring the TCP reset case), so that old-style retry is not needed anymore. The connection itself dying is a bigger issue that needs to be handled in the RPC layer if it's really a problem, and the caller retrying isn't really the solution (IMO).
@squito yes, I capped the number of resources in updateResourceRequests so that YarnAllocator asks for fewer resources in each iteration. When an allocation fails in one iteration, the request is added back and YarnAllocator will try to allocate the leftover (from the previous iteration) plus the new requests in the next iteration, which can result in a lot of allocated containers. The second change, as you pointed out, is used to address this possibility. On second thought, maybe it is a better solution to change AMRMClientImpl::allocate so that it does not add all resource requests from ask to askList. @tgravescs I tried reducing spark.yarn.containerLauncherMaxThreads but it didn't help much. My understanding is that these threads send container launch commands to node managers and return immediately, which is very lightweight and can be extremely fast; launching the container on the NM side is an async operation.
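A rough sketch of the capping idea described above, under stated assumptions: this is a simplification, not the actual YarnAllocator code, and maxPerCycle stands in for the proposed spark.yarn.launchContainer.count.simultaneously setting:

```scala
// Simplified illustration, not the actual YarnAllocator code: limit how many
// new containers are requested in a single allocation cycle, deferring the
// rest to later heartbeats instead of asking for everything at once.
def containersToRequestThisCycle(targetTotal: Int,
                                 runningOrPending: Int,
                                 maxPerCycle: Int): Int = {
  val stillMissing = math.max(targetTotal - runningOrPending, 0)
  math.min(stillMissing, maxPerCycle)
}

// e.g. containersToRequestThisCycle(2500, 150, 50) == 50
```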
Let me describe what I've seen when using 2500 executors.

In some cases, we got 5000 executor failures, and the application retried and eventually failed.
I re-ran the same application adding these configurations: "--conf spark.yarn.scheduler.heartbeat.interval-ms=15000 --conf spark.yarn.launchContainer.count.simultaneously=50". Though it took 50 iterations to get 2500 containers from YARN, it was faster to reach 2500 executors since there were far fewer executor failures and, as a result, little overhead from removing failed executors and fewer allocation requests to YARN.
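The same settings expressed programmatically; note that spark.yarn.launchContainer.count.simultaneously is the configuration introduced by this PR and does not exist in upstream Spark:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // slow down the AM <-> RM allocation loop
  .set("spark.yarn.scheduler.heartbeat.interval-ms", "15000")
  // cap containers launched per allocation cycle (added by this patch)
  .set("spark.yarn.launchContainer.count.simultaneously", "50")
```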
Now I can comfortably use 2500 executors. But when I pushed the executor count to 3000, I saw a lot of heartbeat timeout errors. That is something else we can improve, probably in another JIRA.
What is your network timeout (spark.network.timeout) set to?
Also, what is the exact error/stack trace you see when you say "failed to connect"?
I think the biggest improvements might be in your cluster setup. I'd ensure that the Spark jars (and all dependencies) are already on the local filesystems of each node, and keep the application jar as small as possible by also pushing your application's dependencies onto the local filesystems of each node. That usually keeps the application jar that needs to be shipped around pretty small. Even with it on HDFS, one of the copies is probably on the driver, which will still put a lot of pressure on that node.
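One way to do this is to point Spark at jars that are already present on every node, for example via spark.yarn.jars (or spark.yarn.archive). The path and glob below are assumptions for illustration, presuming the jars have been pre-staged at that location on every node:

```scala
import org.apache.spark.SparkConf

// Use jars already staged on every node's local filesystem (the local:
// scheme means "do not upload, read from the node itself"). The path is
// only an example.
val conf = new SparkConf()
  .set("spark.yarn.jars", "local:/opt/spark/jars/*")
```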
@tgravescs I used the default spark.network.timeout (120s). When an executor cannot connect to the driver, here is the executor log: Here is the related driver log: Container exited with a non-zero exit code 1
In Kubernetes/Spark, we see fairly similar behavior in the scenario described. When the simultaneous container launching is not throttled, it is capable of DOSing the system. Our solution so far is to allocate in rounds (similar to the throttling mechanism proposed here), and to wait for readiness of previously launched containers (registration of those executors) before proceeding.
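A sketch of that round-based approach, assuming two hypothetical callbacks (requestExecutors and registeredExecutorCount); this is not the actual Kubernetes scheduler backend code, and real code would also need a timeout and failure handling:

```scala
import scala.concurrent.duration._

// Illustrative only: request executors in fixed-size rounds and wait for the
// previous round to register with the driver before asking for more.
def allocateInRounds(targetTotal: Int,
                     roundSize: Int,
                     requestExecutors: Int => Unit,        // hypothetical callback
                     registeredExecutorCount: () => Int,   // hypothetical callback
                     pollInterval: FiniteDuration = 1.second): Unit = {
  var requested = 0
  while (requested < targetTotal) {
    val batch = math.min(roundSize, targetTotal - requested)
    requestExecutors(batch)
    requested += batch
    // Readiness gate: block until everything requested so far has registered.
    while (registeredExecutorCount() < requested) {
      Thread.sleep(pollInterval.toMillis)
    }
  }
}
```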
The reason why To achieve the second, you could do something like this change, which adds a new configuration (something I don't like) and also has the issues people have asked about (ignoring containers that YARN has allocated for you). Or you could change the way
@mariahualiu do you plan to address any of the feedback here? If not, this should probably be closed. |
## What changes were proposed in this pull request?

This PR proposes to close stale PRs, mostly the same instances with apache#18017

Closes apache#14085 - [SPARK-16408][SQL] SparkSQL Added file get Exception: is a directory …
Closes apache#14239 - [SPARK-16593] [CORE] [WIP] Provide a pre-fetch mechanism to accelerate shuffle stage.
Closes apache#14567 - [SPARK-16992][PYSPARK] Python Pep8 formatting and import reorganisation
Closes apache#14579 - [SPARK-16921][PYSPARK] RDD/DataFrame persist()/cache() should return Python context managers
Closes apache#14601 - [SPARK-13979][Core] Killed executor is re spawned without AWS key…
Closes apache#14830 - [SPARK-16992][PYSPARK][DOCS] import sort and autopep8 on Pyspark examples
Closes apache#14963 - [SPARK-16992][PYSPARK] Virtualenv for Pylint and pep8 in lint-python
Closes apache#15227 - [SPARK-17655][SQL]Remove unused variables declarations and definations in a WholeStageCodeGened stage
Closes apache#15240 - [SPARK-17556] [CORE] [SQL] Executor side broadcast for broadcast joins
Closes apache#15405 - [SPARK-15917][CORE] Added support for number of executors in Standalone [WIP]
Closes apache#16099 - [SPARK-18665][SQL] set statement state to "ERROR" after user cancel job
Closes apache#16445 - [SPARK-19043][SQL]Make SparkSQLSessionManager more configurable
Closes apache#16618 - [SPARK-14409][ML][WIP] Add RankingEvaluator
Closes apache#16766 - [SPARK-19426][SQL] Custom coalesce for Dataset
Closes apache#16832 - [SPARK-19490][SQL] ignore case sensitivity when filtering hive partition columns
Closes apache#17052 - [SPARK-19690][SS] Join a streaming DataFrame with a batch DataFrame which has an aggregation may not work
Closes apache#17267 - [SPARK-19926][PYSPARK] Make pyspark exception more user-friendly
Closes apache#17371 - [SPARK-19903][PYSPARK][SS] window operator miss the `watermark` metadata of time column
Closes apache#17401 - [SPARK-18364][YARN] Expose metrics for YarnShuffleService
Closes apache#17519 - [SPARK-15352][Doc] follow-up: add configuration docs for topology-aware block replication
Closes apache#17530 - [SPARK-5158] Access kerberized HDFS from Spark standalone
Closes apache#17854 - [SPARK-20564][Deploy] Reduce massive executor failures when executor count is large (>2000)
Closes apache#17979 - [SPARK-19320][MESOS][WIP]allow specifying a hard limit on number of gpus required in each spark executor when running on mesos
Closes apache#18127 - [SPARK-6628][SQL][Branch-2.1] Fix ClassCastException when executing sql statement 'insert into' on hbase table
Closes apache#18236 - [SPARK-21015] Check field name is not null and empty in GenericRowWit…
Closes apache#18269 - [SPARK-21056][SQL] Use at most one spark job to list files in InMemoryFileIndex
Closes apache#18328 - [SPARK-21121][SQL] Support changing storage level via the spark.sql.inMemoryColumnarStorage.level variable
Closes apache#18354 - [SPARK-18016][SQL][CATALYST][BRANCH-2.1] Code Generation: Constant Pool Limit - Class Splitting
Closes apache#18383 - [SPARK-21167][SS] Set kafka clientId while fetch messages
Closes apache#18414 - [SPARK-21169] [core] Make sure to update application status to RUNNING if executors are accepted and RUNNING after recovery
Closes apache#18432 - resolve com.esotericsoftware.kryo.KryoException
Closes apache#18490 - [SPARK-21269][Core][WIP] Fix FetchFailedException when enable maxReqSizeShuffleToMem and KryoSerializer
Closes apache#18585 - SPARK-21359
Closes apache#18609 - Spark SQL merge small files to big files Update InsertIntoHiveTable.scala

Added:
Closes apache#18308 - [SPARK-21099][Spark Core] INFO Log Message Using Incorrect Executor I…
Closes apache#18599 - [SPARK-21372] spark writes one log file even I set the number of spark_rotate_log to 0
Closes apache#18619 - [SPARK-21397][BUILD]Maven shade plugin adding dependency-reduced-pom.xml to …
Closes apache#18667 - Fix the simpleString used in error messages
Closes apache#18782 - Branch 2.1

Added:
Closes apache#17694 - [SPARK-12717][PYSPARK] Resolving race condition with pyspark broadcasts when using multiple threads

Added:
Closes apache#16456 - [SPARK-18994] clean up the local directories for application in future by annother thread
Closes apache#18683 - [SPARK-21474][CORE] Make number of parallel fetches from a reducer configurable
Closes apache#18690 - [SPARK-21334][CORE] Add metrics reporting service to External Shuffle Server

Added:
Closes apache#18827 - Merge pull request 1 from apache/master

## How was this patch tested?

N/A

Author: hyukjinkwon <gurwls223@gmail.com>

Closes apache#18780 from HyukjinKwon/close-prs.
## What changes were proposed in this pull request?
In applications that use over 2000 executors, we noticed a large number of failed executors because the driver was overloaded with too many executor RPCs within a short period of time (for example, retrieving Spark properties and executor registration). This patch adds an extra configuration, spark.yarn.launchContainer.count.simultaneously, which caps the maximum number of containers the driver can ask for and launch in each spark.yarn.scheduler.heartbeat.interval-ms interval. As a result, the number of executors grows steadily, the number of executor failures is reduced, and applications reach the desired number of executors faster.
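As a back-of-envelope check of the ramp-up this implies, using the cap of 50 and the 15 s heartbeat interval reported earlier in this thread (illustrative only):

```scala
// Rough ramp-up estimate for the proposed throttle; the inputs come from the
// settings reported earlier in this conversation.
val maxPerHeartbeat = 50        // spark.yarn.launchContainer.count.simultaneously
val heartbeatMs = 15000         // spark.yarn.scheduler.heartbeat.interval-ms
val targetExecutors = 2500

val rounds = math.ceil(targetExecutors.toDouble / maxPerHeartbeat).toInt  // 50 rounds
val rampUpSeconds = rounds * heartbeatMs / 1000                           // 750 s, roughly 12.5 minutes
```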
## How was this patch tested?
A gentle ping to the contributors of YarnAllocator: @srowen @foxish @jinxing64 @squito
A JIRA is opened: https://issues.apache.org/jira/browse/SPARK-20564