[SPARK-54312][CORE] Avoid repeatedly scheduling tasks for SendHeartbeat/WorkDirClean in standalone worker #53054
…up after registration
cc @Ngone51 @LuciferYang @dongjoon-hyun can you please take a look? Thanks
Ngone51
left a comment
Good catch! LGTM.
private var registerMasterFutures: Array[JFuture[_]] = null
private var registrationRetryTimer: Option[JScheduledFuture[_]] = None
private[worker] var heartbeatTask: Option[JScheduledFuture[_]] = None
The private[worker] modifier on this field seems to exist only so that it can be accessed from test cases, right? Given that WorkerSuite already mixes in PrivateMethodTester, can we use invokePrivate for testing instead?
Thanks, updated.
cleanupThreadExecutor.shutdownNow()
metricsSystem.report()
cancelLastRegistrationRetry()
heartbeatTask.foreach(_.cancel(true))
handleRegisterResponse runs inside a synchronized block. Don't the operations on heartbeatTask and workDirCleanupTask in onStop also need synchronized protection?
The synchronized block was introduced by #9138 to avoid some race conditions in a very early implementation that used async callbacks.
It looks like it is no longer necessary.
Worker is a ThreadSafeRpcEndpoint already. The synchronized protection seems to be unnecessary today.
OK, so can we remove that unnecessary synchronized in a separate PR?
I will create a separate task to revisit the synchronized usage here.
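The argument in this thread is that an endpoint whose messages are handled one at a time does not need extra locking around its own state. A minimal sketch of that idea follows, assuming a single-threaded executor as a stand-in for the RPC inbox. `SerialEndpoint`, `send`, `count`, and the `"Registered"` message are hypothetical names for illustration and are not Spark's actual implementation.

```scala
import java.util.concurrent.{Callable, Executors}

// Hypothetical endpoint: every state change goes through one single-threaded inbox.
class SerialEndpoint {
  private val inbox = Executors.newSingleThreadExecutor()
  // Plain var with no lock: only the inbox thread ever reads or writes it.
  private var registrations = 0

  // Callers on any thread enqueue a message; handling happens serially on the inbox thread.
  def send(msg: String): Unit = inbox.execute(new Runnable {
    override def run(): Unit = msg match {
      case "Registered" => registrations += 1
      case _ => ()
    }
  })

  // Reads the counter on the inbox thread, after all previously enqueued messages.
  def count(): Int = inbox.submit(new Callable[Int] {
    override def call(): Int = registrations
  }).get()

  def stop(): Unit = inbox.shutdown()
}
```

Because every mutation is funneled through the same thread, concurrent senders cannot interleave inside the handler, which is the property the reviewers rely on when they call the synchronized block unnecessary.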
+1, LGTM for Apache Spark 4.1.0. Thank you, @ivoson , @Ngone51 , @LuciferYang .
Merged to master/4.1.
…at/WorkDirClean in standalone worker

### What changes were proposed in this pull request?

Currently, [worker](https://github.com/apache/spark/blob/87b3b94232436528f88c9a7aa7ee70758b85a33a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala#L495) will schedule tasks forwarding `SendHeartbeat` and `WorkDirCleanup` while `handleRegisterResponse`. While worker registration could happen multiple times in case of heartbeat timeout/disconnected from master, in these cases the tasks would be scheduled multiple times.

To fix the issue:
- Adding `heartbeatTask` and `workDirCleanupTask` in worker to tell whether these tasks have been scheduled
- `heartbeatTask` and `workDirCleanupTask` will be initialized after the 1st registration, and then skipped scheduling these tasks in later registration.
- Cancel the task and reset `heartbeatTask` and `workDirCleanupTask` when worker stops.

### Why are the changes needed?

Fix the issue repeatedly scheduling SendHeartbeat/WorkDirClean tasks after worker registration.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

UT added

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #53054 from ivoson/duplicate-worker-heartbeat.

Authored-by: Tengfei Huang <tengfei.h@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit d51b433)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
What changes were proposed in this pull request?
Currently, the worker schedules the tasks that forward `SendHeartbeat` and `WorkDirCleanup` in `handleRegisterResponse`. Because worker registration can happen multiple times (for example after a heartbeat timeout or a disconnect from the master), these tasks end up being scheduled multiple times.
To fix the issue:
- Add `heartbeatTask` and `workDirCleanupTask` to the worker to track whether these tasks have been scheduled.
- Initialize `heartbeatTask` and `workDirCleanupTask` after the first registration, and skip scheduling these tasks on later registrations.
- Cancel the tasks and reset `heartbeatTask` and `workDirCleanupTask` when the worker stops.
Why are the changes needed?
Fix the issue of repeatedly scheduling SendHeartbeat/WorkDirClean tasks after worker registration.
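The schedule-once guard listed under "To fix the issue" can be sketched roughly as follows. This is a simplified illustration and not the code from the PR: `MiniWorker`, `handleRegistered`, `scheduleCalls`, and `heartbeatsSent` are hypothetical names, and only the `heartbeatTask` field mirrors the diff shown in the review.

```scala
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the standalone worker; not Spark's Worker class.
class MiniWorker(heartbeatIntervalMs: Long) {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  // None until the first successful registration schedules the task.
  private var heartbeatTask: Option[ScheduledFuture[_]] = None
  // Counters exist only so the behaviour can be observed in this sketch.
  val scheduleCalls = new AtomicInteger(0)
  val heartbeatsSent = new AtomicInteger(0)

  // Called on every (re-)registration; schedules the heartbeat at most once.
  def handleRegistered(): Unit = {
    if (heartbeatTask.isEmpty) {
      scheduleCalls.incrementAndGet()
      heartbeatTask = Some(scheduler.scheduleAtFixedRate(
        new Runnable {
          override def run(): Unit = { heartbeatsSent.incrementAndGet(); () }
        },
        0L, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
    }
  }

  // Cancels the task and resets the field, so a restarted worker could schedule again.
  def stop(): Unit = {
    heartbeatTask.foreach(_.cancel(true))
    heartbeatTask = None
    scheduler.shutdownNow()
  }

  def isHeartbeatScheduled: Boolean = heartbeatTask.isDefined
}
```

Calling `handleRegistered()` several times on the same `MiniWorker` leaves `scheduleCalls` at 1, which is the behaviour the fix aims for when the worker re-registers. The same pattern would apply to `workDirCleanupTask`.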
Does this PR introduce any user-facing change?
No
How was this patch tested?
UT added
Was this patch authored or co-authored using generative AI tooling?
No