[SPARK-20461][Core][SS]Use UninterruptibleThread for Executor and fix the potential hang in CachedKafkaConsumer #17761
…CachedKafkaConsumer
// Use UninterruptibleThread to run tasks so that we can allow running codes without being
// interrupted by `Thread.interrupt()`. Some issues, such as KAFKA-1894, HADOOP-10622,
// will hang forever if some methods are interrupted.
new UninterruptibleThread(r, "unused") // thread name will be set by ThreadFactoryBuilder
Most of the code is copied from ThreadUtils. This line is the only difference that matters.
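To make the idea in the snippet above concrete, here is a minimal sketch of a thread that defers `Thread.interrupt()` while a protected block runs, plus a thread factory that hands such threads to an executor. `SketchUninterruptibleThread`, `SketchThreads`, and `factory` are hypothetical names for illustration, not Spark's classes, and the sketch uses a plain JDK `ThreadFactory` rather than the `ThreadFactoryBuilder` mentioned in the code comment.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical, simplified stand-in for an uninterruptible thread (assumption,
// not Spark's implementation): interrupts that arrive while a protected block
// runs are remembered and delivered once the block has finished.
class SketchUninterruptibleThread(target: Runnable, name: String)
    extends Thread(target, name) {

  private val lock = new Object
  private var uninterruptible = false  // guarded by lock
  private var pendingInterrupt = false // guarded by lock

  def runUninterruptibly[T](body: => T): T = {
    require(Thread.currentThread() eq this, "call only from this thread")
    val nested = lock.synchronized(uninterruptible)
    if (nested) body // already protected by an outer call
    else {
      lock.synchronized {
        // Clear an interrupt that is already set and remember it for later.
        pendingInterrupt = Thread.interrupted() || pendingInterrupt
        uninterruptible = true
      }
      try body
      finally lock.synchronized {
        uninterruptible = false
        if (pendingInterrupt) {
          super.interrupt() // deliver the deferred interrupt
          pendingInterrupt = false
        }
      }
    }
  }

  override def interrupt(): Unit = lock.synchronized {
    if (uninterruptible) pendingInterrupt = true else super.interrupt()
  }
}

object SketchThreads {
  // Factory that creates daemon SketchUninterruptibleThreads named "<prefix>-<n>".
  def factory(prefix: String): ThreadFactory = new ThreadFactory {
    private val counter = new AtomicInteger(0)
    override def newThread(r: Runnable): Thread = {
      val t = new SketchUninterruptibleThread(r, s"$prefix-${counter.getAndIncrement()}")
      t.setDaemon(true)
      t
    }
  }
}
```

An executor built with `Executors.newCachedThreadPool(SketchThreads.factory("worker"))` would then run every task on such a thread, so task code can opt into `runUninterruptibly` for the sections that must not be interrupted.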
case class AvailableOffsetRange(earliest: Long, latest: Long)

private def runUninterruptiblyIfPossiable[T](body: => T): T = Thread.currentThread match {
Possiable -> Possible
nit: rename to runUninterruptiblyIfPossible
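For readers following the thread, the helper under review dispatches on the type of the current thread. Below is a minimal sketch of that pattern, assuming a fallback that warns and runs the body unprotected. `StubUninterruptibleThread`, `DispatchSketch`, and `unprotectedCalls` are hypothetical names, and the stub's `runUninterruptibly` only runs the body instead of deferring interrupts.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stub: just enough surface to pattern match on. A real
// uninterruptible thread would defer Thread.interrupt() while body runs.
class StubUninterruptibleThread(target: Runnable) extends Thread(target) {
  def runUninterruptibly[T](body: => T): T = body
}

object DispatchSketch {
  // Counts calls that took the unprotected fallback path (illustration only).
  val unprotectedCalls = new AtomicInteger(0)

  def runUninterruptiblyIfPossible[T](body: => T): T =
    Thread.currentThread() match {
      case ut: StubUninterruptibleThread =>
        // Protected path: the thread itself runs the body.
        ut.runUninterruptibly(body)
      case _ =>
        // Fallback path: any other thread runs the body as-is, so an
        // interrupt can still reach it.
        unprotectedCalls.incrementAndGet()
        System.err.println("warning: not on an uninterruptible thread; body may be interrupted")
        body
    }
}
```

Calling `DispatchSketch.runUninterruptiblyIfPossible` from the main thread, a user-created thread, or a `Future` on an ordinary pool takes the fallback branch. That is the case the PR description calls out as best effort only.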
Test build #76144 has finished for PR 17761 at commit
Test build #76145 has finished for PR 17761 at commit
Task interruption is part of our API, IIRC: kill with interrupt = true. Also, IIRC, @JoshRosen did a bunch of work on the task reaper, etc., which would be affected by this.
@mridulm this only affects code calling
@zsxwing Got it, thanks for clarifying.
LGTM. Merging this to master and 2.2
…x the potential hang in CachedKafkaConsumer

## What changes were proposed in this pull request?

This PR changes Executor's threads to `UninterruptibleThread` so that we can use `runUninterruptibly` in `CachedKafkaConsumer`. However, this is just best effort to avoid hanging forever. If the user uses `CachedKafkaConsumer` in another thread (e.g., create a new thread or Future), the potential hang may still happen.

## How was this patch tested?

The new added test.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #17761 from zsxwing/int.

(cherry picked from commit 01c999e)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
What changes were proposed in this pull request?
This PR changes Executor's threads to `UninterruptibleThread` so that we can use `runUninterruptibly` in `CachedKafkaConsumer`. However, this is just a best effort to avoid hanging forever. If the user uses `CachedKafkaConsumer` in another thread (e.g., by creating a new thread or Future), the potential hang may still happen.
How was this patch tested?
The newly added test.