KAFKA-15827: Prevent KafkaBasedLog subclasses from leaking passed-in clients #14763
Signed-off-by: Greg Harris <greg.harris@aiven.io>
Can we add a comment to the class explaining why we're explicitly closing the clients even though it also looks like they're closed in the superclass's Also, is it worth adding a test (or augmenting one or more existing test cases) for this?
Signed-off-by: Greg Harris <greg.harris@aiven.io>
…opicPartition> Signed-off-by: Greg Harris <greg.harris@aiven.io>
I added a test to KafkaBasedLogTest, but found that it was difficult to set up the same test for the duplicate implementation in OffsetSyncStore. Instead, I eliminated the OffsetSyncStore implementation because I felt it duplicated the withExistingClients method. I think we had some discussion about why adding a new constructor to KafkaBasedLog was not viable, but would you consider changing the signature of the withExistingClients method?
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
C0urante
left a comment
LGTM, thanks Greg! Left one nit, feel free to address or ignore at your discretion.
I think it's fine to tweak the withExistingClients factory method. Hopefully nobody's relying on that in their connectors, and if they are, maybe it's time for a KIP where we officially declare part or all of this class as public API and have to start making future changes in other places or only after follow-up KIPs.
Signed-off-by: Greg Harris <greg.harris@aiven.io>
Hi @C0urante, I found another very similar problem, where InternalTopicsIntegrationTest was causing us to leak clients. There, the "bad topics" cause startServices to throw an exception, preventing stopServices from being called in halt(). I implemented nearly the same fix, by adding a stopServices call to the DistributedHerder::stop method, similar to what is already happening in the StandaloneHerder. PTAL, thanks!
ThreadUtils.shutdownExecutorServiceQuietly(herderExecutor, herderExecutorTimeoutMs(), TimeUnit.MILLISECONDS);
ThreadUtils.shutdownExecutorServiceQuietly(forwardRequestExecutor, FORWARD_REQUEST_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
ThreadUtils.shutdownExecutorServiceQuietly(startAndStopExecutor, START_AND_STOP_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
stopServices();
Would putting stopServices in the finally block in halt achieve the same thing? I'm a little worried about this exacerbating ungraceful shutdowns in scenarios other than the one we're trying to address with this change.
Would putting stopServices in the finally block in halt achieve the same thing?
Do you mean finally in DistributedHerder#run? halt is never called if startServices throws.
I considered putting stopServices in catch in DistributedHerder#run, but I saw that stopServices was already called in StandaloneHerder#stop and followed the same pattern.
I'm a little worried about this exacerbating ungraceful shutdowns in scenarios other than the one we're trying to address with this change.
For ungraceful shutdowns that end with the herder thread throwing an exception, it should still call exit and kill the process, without ever running this code.
For ungraceful shutdowns that don't throw an exception, I suppose the herder could block and ThreadUtils.shutdownExecutorServiceQuietly(herderExecutor, ...) would time out, and this stopServices would close the KafkaBasedLogs underneath the running herder, which could be bad.
Do you mean finally in DistributedHerder#run? halt is never called if startServices throws.
Ugh sorry, yes. This is my reward for rushing a review before I close the laptop for the day!
For ungraceful shutdowns that don't throw an exception, I suppose the herder could block and ThreadUtils.shutdownExecutorServiceQuietly(herderExecutor, ...) would time out, and this stopServices would close the KafkaBasedLogs underneath the running herder, which could be bad.
Yeah, this is what I'm afraid of.
Alternatives also include wrapping startServices with try/catch logic to automatically invoke stopServices if anything goes wrong (and then proceed to throw the original exception).
Also, looking at this once more: isn't the herderMetrics field also left unclosed in the same scenario that we're addressing with this PR?
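The try/catch alternative mentioned above could look roughly like the following. This is only a minimal sketch under stated assumptions: ServiceLifecycleSketch, Services, and startOrCleanUp are hypothetical names made up for illustration, not part of DistributedHerder or any Kafka API.

```java
// Hypothetical sketch; this is not the DistributedHerder code.
class ServiceLifecycleSketch {

    // Illustrative stand-in for whatever startServices/stopServices manage.
    interface Services {
        void start() throws Exception;
        void stop();
    }

    // Starts the services. If startup fails, stops them so that anything
    // already initialized is released, then rethrows the original exception.
    static void startOrCleanUp(Services services) throws Exception {
        try {
            services.start();
        } catch (Exception startFailure) {
            try {
                services.stop();
            } catch (RuntimeException stopFailure) {
                // Keep the startup failure as the primary error.
                startFailure.addSuppressed(stopFailure);
            }
            throw startFailure;
        }
    }
}
```

Attaching the cleanup failure as a suppressed exception keeps the original startup error visible to the caller, which is the "proceed to throw the original exception" part of the suggestion.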
BTW, this is starting to feel similar to the problem touched on in #11608. Generally, both involve failure to clean up resources when operations fail in the same scope that the resources were initialized in.
I changed this to move the stopServices call from stop() to the catch block in run. This is because finally doesn't execute if exit() is called in the catch block, which is typical in production.
I moved the stop/close for all of the non-started resources to closeResources, which is always called even if stopServices() throws. Previously, herderMetrics and member were only closed on the halt() happy path; now they are closed if an exception kills the herder thread too.
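As a rough illustration of the ordering described above, the failure path might be shaped like this. It is a sketch only: HerderRunSketch and its event list are hypothetical, exit() here merely records a call instead of terminating the JVM, and the real DistributedHerder#run differs in detail.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the failure path; not the actual DistributedHerder.
class HerderRunSketch {
    final List<String> events = new ArrayList<>();
    private final boolean failOnStart;
    private final boolean failOnStop;

    HerderRunSketch(boolean failOnStart, boolean failOnStop) {
        this.failOnStart = failOnStart;
        this.failOnStop = failOnStop;
    }

    void startServices() {
        events.add("startServices");
        if (failOnStart) throw new IllegalStateException("start failed");
    }

    void stopServices() {
        events.add("stopServices");
        if (failOnStop) throw new IllegalStateException("stop failed");
    }

    // Closes resources that are created up front and never "started".
    void closeResources() {
        events.add("closeResources");
    }

    // Stand-in for process exit. A real System.exit call would skip any
    // finally block around the outer try, hence the cleanup happens first.
    void exit() {
        events.add("exit");
    }

    void run() {
        try {
            startServices();
            events.add("work");
        } catch (Throwable t) {
            try {
                stopServices();
            } catch (Throwable stopFailure) {
                t.addSuppressed(stopFailure);
            } finally {
                // Runs whether or not stopServices() threw.
                closeResources();
            }
            exit();
        }
    }
}
```

With both flags set to true, the recorded order is startServices, stopServices, closeResources, exit, so cleanup completes before the simulated exit.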
Signed-off-by: Greg Harris <greg.harris@aiven.io>
C0urante
left a comment
Thanks Greg, the approach here looks really clean. There's one failing unit test, DistributedHerderTest.testHaltCleansUpWorker, which doesn't like that the happy path for DistributedHerder::halt invokes WorkerGroupMember::stop twice. That behavior feels a little strange, but since WorkerGroupMember::stop is idempotent (as long as it's invoked from the same thread), I don't think we have to block on changing it, so feel free to just tweak the tests with atLeastOnce() and possibly a comment on idempotency for WorkerGroupMember::stop if that's preferable.
LGTM as long as the failing unit test is fixed and the rest of the CI build looks good 👍
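For readers unfamiliar with the idempotency point, a stop method that tolerates being called twice can be sketched as below. IdempotentMemberSketch is a hypothetical class, not WorkerGroupMember, and it uses an AtomicBoolean guard, which is an assumption of this sketch rather than how the real class achieves idempotency.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; not the real WorkerGroupMember.
class IdempotentMemberSketch {
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final AtomicInteger releases = new AtomicInteger();

    // Releases resources on the first call; later calls are no-ops.
    void stop() {
        if (!stopped.compareAndSet(false, true)) {
            return;
        }
        releases.incrementAndGet(); // stands in for the actual cleanup work
    }

    // Number of times cleanup actually ran.
    int releaseCount() {
        return releases.get();
    }
}
```

When stop behaves this way, a test that only cares that cleanup happened can verify the call with Mockito's atLeastOnce() rather than an exact invocation count, as suggested above.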
Signed-off-by: Greg Harris <greg.harris@aiven.io>
Test failures appear unrelated, and the runtime tests pass locally for me.
…clients (apache#14763) Signed-off-by: Greg Harris <greg.harris@aiven.io> Reviewers: Chris Egerton <chrise@aiven.io>
The KafkaBasedLog normally creates clients during start() and closes them in stop().
Some KafkaBasedLog subclasses accept already-created clients, and close them in stop() if start() is called first.
These clients should also be closed if stop() is called without first calling start().
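The intended behavior can be sketched as follows. This is a simplified, hypothetical stand-in (ExistingClientLogSketch and RecordingClient are invented names, and the clients are plain AutoCloseable objects), not the real KafkaBasedLog or its withExistingClients factory.

```java
// Hypothetical, simplified stand-in for a log that is handed pre-built
// clients. It is NOT the real KafkaBasedLog.
class ExistingClientLogSketch {
    private final AutoCloseable producer;
    private final AutoCloseable consumer;
    private boolean started = false;

    ExistingClientLogSketch(AutoCloseable producer, AutoCloseable consumer) {
        this.producer = producer;
        this.consumer = consumer;
    }

    void start() {
        started = true; // a real log would begin reading the topic here
    }

    // Closes the passed-in clients whether or not start() was ever called.
    void stop() {
        started = false;
        closeQuietly(producer);
        closeQuietly(consumer);
    }

    boolean isStarted() {
        return started;
    }

    private static void closeQuietly(AutoCloseable client) {
        try {
            if (client != null) {
                client.close();
            }
        } catch (Exception e) {
            // Ignored in this sketch; real code would likely log the failure.
        }
    }
}

// Test double that records whether close() was invoked.
class RecordingClient implements AutoCloseable {
    boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }
}
```

Calling stop() on a fresh ExistingClientLogSketch, without start(), still marks both RecordingClient instances as closed, which is the leak-free behavior the description asks for.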