[SPARK-25231] : Fix synchronization of executor heartbeat receiver in TaskSchedulerImpl #22221
Conversation
…ize on the TaskSchedulerImpl object

The main reason for the heartbeat timeouts was that the heartbeat-receiver-event-loop-thread was blocked waiting on the TaskSchedulerImpl object, which was being held by one of the dispatcher-event-loop threads executing the method dequeueSpeculativeTasks() in TaskSetManager.scala. On further analysis of the heartbeat receiver method, it turns out there is no need to hold the lock on the whole object: the block of code in the method only uses one global HashMap, taskIdToTaskSetManager. By making that map a ConcurrentHashMap, we ensure atomicity of operations and speed up the heartbeat receiver thread.
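A minimal sketch of the idea, with simplified, hypothetical names (this is not the actual TaskSchedulerImpl source): the task-to-TaskSetManager map becomes a ConcurrentHashMap, so the heartbeat path can look up managers without taking a scheduler-wide lock.

```scala
import java.util.concurrent.ConcurrentHashMap

// Sketch only: TaskSetManagerStub stands in for Spark's TaskSetManager.
class TaskSetManagerStub

class SchedulerSketch {
  // Before: a plain mutable.HashMap guarded by `synchronized` on the scheduler object.
  // After: a ConcurrentHashMap, whose individual get/put/remove calls are thread-safe.
  private val taskIdToTaskSetManager = new ConcurrentHashMap[Long, TaskSetManagerStub]()

  // Heartbeat path: no scheduler lock is needed just to look up each task's manager.
  def executorHeartbeatReceived(taskIds: Seq[Long]): Seq[TaskSetManagerStub] =
    taskIds.flatMap(tid => Option(taskIdToTaskSetManager.get(tid)))
}
```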
ok to test
Test build #95216 has finished for PR 22221 at commit
```diff
 synchronized {
   try {
-    taskIdToTaskSetManager.get(tid) match {
+    Option(taskIdToTaskSetManager.get(tid)) match {
```
Good catch. I like this direction.
I have a question about the change in semantics. By removing synchronization at accumUpdatesWithTaskIds(), the pair of operations in this synchronized block, the get() here and the remove() in cleanupTaskState(), is no longer atomic with respect to the get() in accumUpdatesWithTaskIds().
Is this change ok?
I think your question is about the get() and remove() inside cleanupTaskState both being inside a single synchronized block, correct?
I don't see that as being a problem here, since taskIdToTaskSetManager is a ConcurrentHashMap. That keeps the individual operations atomic, and if you do a remove on an entry that isn't there, it does nothing. There is no other code that removes from that map, so I don't think that can happen anyway. With this change the only thing outside of a synchronized block is a get in accumUpdatesWithTaskIds, which is harmless even if the entry has already been removed.
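For illustration, a tiny hedged sketch (hypothetical names, not Spark code) of the two properties relied on above: each ConcurrentHashMap call is thread-safe on its own, and removing a key that is not present is simply a no-op.

```scala
import java.util.concurrent.ConcurrentHashMap

object RemoveNoOpSketch extends App {
  val map = new ConcurrentHashMap[Long, String]()
  map.put(1L, "tsm-1")

  map.remove(42L)                 // key not present: does nothing, throws nothing
  println(Option(map.get(1L)))    // Some(tsm-1) -- unaffected by the no-op remove
  println(Option(map.get(42L)))   // None -- get on a missing key returns null, wrapped here
}
```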
ConcurrentHashMap makes each individual operation, like get(), remove(), and others, atomic. Thus, I reviewed the places where more than one operation sits within a synchronized block, and this is such a place.
When we apply this PR, the get in accumUpdatesWithTaskIds can be executed between the get() and remove() here. My question is more of a confirmation of whether that is safe or not.
Yes, as far as I can see it's safe. If the get happens before the entry is removed, it calculates the accumulators; if it happens after the entry is removed, it just gets an empty array back. This isn't any different from when it was synchronized. Nothing in statusUpdate between the get and the call to cleanupTaskState (where the removal happens) depends on the accumulators or anything else, as far as I can see.
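A hedged sketch of the interleaving being discussed, with simplified, hypothetical names: the synchronized get()+remove() pair stays together, while the unsynchronized heartbeat-side get either sees the entry (before removal) or sees nothing (after removal); neither outcome breaks anything.

```scala
import java.util.concurrent.ConcurrentHashMap

object InterleavingSketch {
  private val taskIdToTaskSetManager = new ConcurrentHashMap[Long, String]()
  private val schedulerLock = new Object

  // statusUpdate-style path: the lookup and the cleanup removal stay in one synchronized block.
  def statusUpdate(tid: Long): Unit = schedulerLock.synchronized {
    Option(taskIdToTaskSetManager.get(tid)).foreach { _ =>
      taskIdToTaskSetManager.remove(tid) // cleanupTaskState-style removal
    }
  }

  // Heartbeat-side path, now outside the lock: it may run between the get and remove above.
  // Before removal it finds the manager; after removal it simply returns an empty result.
  def accumUpdatesForHeartbeat(tid: Long): Seq[String] =
    Option(taskIdToTaskSetManager.get(tid)).toSeq
}
```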
We would appreciate it if you could describe this in some more detail in the PR description.

@kiszk I have added some more details in the PR description. Thank you.
```diff
 accumUpdates.flatMap { case (id, updates) =>
   val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None))
-  taskIdToTaskSetManager.get(id).map { taskSetMgr =>
+  Option(taskIdToTaskSetManager.get(id)).map { taskSetMgr =>
```
Just leaving a small concern here: the original code held the lock over the whole scope of ids in accumUpdates. After this change, some id that could be found originally might not be found now, because taskIdToTaskSetManager can be changed by removeExecutor or statusUpdate. It's not a big problem if the executor has been removed.
I agree this could happen, but it shouldn't cause issues, because even before this change the executor could have been removed right before this function was called (it's all timing dependent), so this does not change the functionality. This is only to update accumulators for running tasks; if the tasks had finished, the accumulator updates would have been processed via the task end events.
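To make that concrete, a hedged, simplified sketch (hypothetical types; the update values are just stand-ins): when an id's TaskSetManager has already been removed, Option(...) yields None and flatMap silently drops that id, so the heartbeat merely carries fewer accumulator updates instead of failing.

```scala
import java.util.concurrent.ConcurrentHashMap

object AccumUpdateSketch {
  private val taskIdToTaskSetManager = new ConcurrentHashMap[Long, String]()

  def accumUpdatesWithTaskIds(accumUpdates: Seq[(Long, Seq[Long])]): Array[(Long, String, Seq[Long])] =
    accumUpdates.flatMap { case (id, updates) =>
      // Manager already removed (e.g. by removeExecutor or statusUpdate)? Then None,
      // and this id simply drops out of the result instead of breaking the heartbeat.
      Option(taskIdToTaskSetManager.get(id)).map(tsm => (id, tsm, updates))
    }.toArray
}
```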
xuanyuanking left a comment
I want to give a +1 for this; I left a small concern in the code. The screenshot is very helpful for understanding the scenario.
@pgandhi999 thank you for your comments.

+1
…askSchedulerImpl

Running a large Spark job with speculation turned on was causing executor heartbeats to time out on the driver end after some time and, eventually, after hitting the max number of executor failures, the job would fail.

## What changes were proposed in this pull request?

The main reason for the heartbeat timeouts was that the heartbeat-receiver-event-loop-thread was blocked waiting on the TaskSchedulerImpl object, which was being held by one of the dispatcher-event-loop threads executing the method dequeueSpeculativeTasks() in TaskSetManager.scala. On further analysis of the heartbeat receiver method executorHeartbeatReceived() in the TaskSchedulerImpl class, we found that instead of waiting to acquire the lock on the TaskSchedulerImpl object, we can remove that lock and make the operations on the global variables inside the code block atomic. The block of code in that method only uses one global HashMap, taskIdToTaskSetManager. By making that map a ConcurrentHashMap, we ensure atomicity of operations and speed up the heartbeat receiver thread.

## How was this patch tested?

Screenshots of the thread dump have been attached below:

**heartbeat-receiver-event-loop-thread:**
https://user-images.githubusercontent.com/22228190/44593413-e25df780-a788-11e8-9520-176a18401a59.png

**dispatcher-event-loop-thread:**
https://user-images.githubusercontent.com/22228190/44593484-13d6c300-a789-11e8-8d88-34b1d51d4541.png

Closes #22221 from pgandhi999/SPARK-25231.

Authored-by: pgandhi <pgandhi@oath.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
(cherry picked from commit 559b899)
Signed-off-by: Thomas Graves <tgraves@apache.org>
merged to master and 2.3, thanks @pgandhi999
Running a large Spark job with speculation turned on was causing executor heartbeats to time out on the driver end after some time and, eventually, after hitting the max number of executor failures, the job would fail.
What changes were proposed in this pull request?
The main reason for the heartbeat timeouts was that the heartbeat-receiver-event-loop-thread was blocked waiting on the TaskSchedulerImpl object, which was being held by one of the dispatcher-event-loop threads executing the method dequeueSpeculativeTasks() in TaskSetManager.scala. On further analysis of the heartbeat receiver method executorHeartbeatReceived() in the TaskSchedulerImpl class, we found that instead of waiting to acquire the lock on the TaskSchedulerImpl object, we can remove that lock and make the operations on the global variables inside the code block atomic. The block of code in that method only uses one global HashMap, taskIdToTaskSetManager. By making that map a ConcurrentHashMap, we ensure atomicity of operations and speed up the heartbeat receiver thread.
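One detail worth noting about the diff, as a hedged aside (not from the PR text itself): scala.collection.mutable.HashMap.get returns an Option, while java.util.concurrent.ConcurrentHashMap.get returns the value or null, which is why the lookups in the diff are now wrapped in Option(...).

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

object NullVsOptionSketch extends App {
  val scalaMap = mutable.HashMap[Long, String]()
  val javaMap  = new ConcurrentHashMap[Long, String]()

  // Scala's mutable map already returns an Option for missing keys...
  println(scalaMap.get(7L))          // None

  // ...while ConcurrentHashMap returns null, so callers wrap the result explicitly.
  println(javaMap.get(7L))           // null
  println(Option(javaMap.get(7L)))   // None
}
```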
How was this patch tested?
Screenshots of the thread dump have been attached below:

heartbeat-receiver-event-loop-thread:
https://user-images.githubusercontent.com/22228190/44593413-e25df780-a788-11e8-9520-176a18401a59.png

dispatcher-event-loop-thread:
https://user-images.githubusercontent.com/22228190/44593484-13d6c300-a789-11e8-8d88-34b1d51d4541.png