MINOR: Stop leaking threads in BlockingConnectorTest #12290
Conversation
@kkonstantine can you please review this?

@ijuma it appears that @kkonstantine is busy at the moment. @cadonna would you be willing to take a look, since you were previously in the neighborhood? This change is much less involved but touches on the same code.

@C0urante It's a bit difficult to review since there is no explanation of what each latch does. If you add some explanatory comments to the code, I can try to review it.
Force-pushed ddd55b2 to f936f46
Thanks @ijuma, I've updated the PR with some more details on the purpose of each latch type, and addressed a small bug that would have caused connectors/tasks to become unblocked incorrectly during testing.

@tombentley @viktorsomogyi if you have a moment, would you mind taking a look? Thanks!
viktorsomogyi left a comment:
@C0urante I'm not that familiar (yet) with this but I hope the comments make (some) sense 🙂
```diff
 while (true) {
     try {
-        Thread.sleep(Long.MAX_VALUE);
+        blockLatch.await();
+        log.debug("Instructed to stop blocking; will resume normal execution");
+        return;
     } catch (InterruptedException e) {
-        // No-op. Just keep blocking.
+        log.debug("Interrupted while blocking; will continue blocking until instructed to stop");
     }
 }
```
Wouldn't this while loop prevent the normal shutdown of Connect, given the order of calls in BlockingConnectorTest (you call connect.stop and then Block.reset)? For instance, the way the Worker shuts down, it expects WorkerConnectors to respond to an interrupt.
Code reference (connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java, line 267 at 6f7682d):
```java
ThreadUtils.shutdownExecutorServiceQuietly(executor, EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
```
Sort of: this prevents the Worker class's executor field from shutting down gracefully (i.e., when we invoke awaitTermination on it in ThreadUtils::shutdownExecutorServiceQuietly), but it doesn't prevent the Connect worker from shutting down, since we put a bound on how long we wait for the executor to shut down before moving on.
This is why the tests on trunk (which also use this kind of while (true) loop to simulate connector/task blocks) don't hang.
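For illustration, here is a minimal, self-contained sketch of that bounded-shutdown pattern using only standard java.util.concurrent APIs; the class name, constant, and timeout value are placeholders, not Worker's actual fields:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BoundedShutdownSketch {
    // Placeholder bound; the real Worker uses EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS
    private static final long SHUTDOWN_TIMEOUT_MS = 1_000L;

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true); // let this demo JVM exit despite the leaked thread
            return t;
        });
        Runnable blockForever = () -> {
            while (true) {
                try {
                    Thread.sleep(Long.MAX_VALUE); // simulates a blocked connector
                } catch (InterruptedException e) {
                    // swallow the interrupt and keep blocking, like the test connector
                }
            }
        };
        executor.submit(blockForever);

        executor.shutdownNow(); // interrupts the task, which ignores the interrupt
        // awaitTermination gives up after the bound elapses, so the caller
        // moves on instead of hanging forever; the blocked thread is leaked
        boolean terminated = executor.awaitTermination(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        System.out.println("Executor terminated cleanly: " + terminated);
    }
}
```

Running this prints `Executor terminated cleanly: false` once the timeout elapses: the caller proceeds, and the still-blocked thread is simply leaked, which is exactly the behavior this PR cleans up in the tests.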
```diff
 connect.stop();
-Block.resetBlockLatch();
+// unblock everything so that we don't leak threads after each test run
+Block.reset();
```
WDYT about resetting before stopping the workers, to allow a normal shutdown to happen?
It may be valuable to ensure that workers can shut down gracefully under these circumstances. Thoughts?
I think that should take place in a test then, not in the cleanup.
The reason I bring this up is that if I were to assert that the clients/threads are all stopped immediately after Block.reset() (as implemented in #14783), there's no synchronization to ensure that cleanup takes place before the assertion fires. The "asynchronous cleanup" initiated by Block.reset could exceed the lifetime of the test, still leaking the threads, but only temporarily.
> I think that should take place in a test then, not in the cleanup.

It's easier to handle in a single method rather than copy it over to 11 test cases, though. And I also don't necessarily see why @After-annotated methods need to be used exclusively for cleanup.
The concern about threads leaking (even if for a short period) beyond the scope of the test definitely seems valid. I've pushed a tweak that adds logic to wait for the blocked threads to complete in Block::reset. LMK if this seems clean enough; if not, I can bite the bullet, reverse the order of operations in BlockingConnectorTest::close, and then see about adding more explicit checks for graceful worker shutdown in other places.
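To make the discussion concrete, the tweak is roughly along the lines of this hypothetical sketch; the field names (blockedThreads), the registration method, and the timeout are illustrative, not the PR's exact identifiers:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;

class BlockResetSketch {
    private static final long CONNECTOR_BLOCK_TIMEOUT_MS = 60_000L;
    private static CountDownLatch blockLatch = new CountDownLatch(1);
    private static final Collection<Thread> blockedThreads = new ArrayList<>();

    // A connector/task thread would call this right before it starts blocking
    static synchronized void registerBlocked(Thread thread) {
        blockedThreads.add(thread);
    }

    // Release every blocked thread, then wait (with a bound) for each one to
    // finish, so that no thread outlives the test that created it
    static void reset() throws InterruptedException, TimeoutException {
        CountDownLatch latch;
        Collection<Thread> blocked;
        synchronized (BlockResetSketch.class) {
            latch = blockLatch;
            blocked = new ArrayList<>(blockedThreads);
            blockLatch = null;
            blockedThreads.clear();
        }
        if (latch != null) {
            latch.countDown(); // instruct all blocked threads to resume
        }
        for (Thread thread : blocked) {
            thread.join(CONNECTOR_BLOCK_TIMEOUT_MS); // bounded wait per thread
            if (thread.isAlive()) {
                throw new TimeoutException("Blocked thread did not finish in time");
            }
        }
    }
}
```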
> It's easier to handle in a single method rather than copy it over to 11 test cases

Oh I see, I thought it would be sufficient to add one test case that called stop() to verify that one type of blocked thread still allows shutdown to complete, rather than verifying it for all of the different ways of blocking threads. That would have less coverage than the current tests on trunk.

> I've pushed a tweak that adds logic to wait for the blocked threads to complete in Block::reset.

I think this is probably the better solution. The leak tester can separately verify that resources were closed properly now that the test ensures the threads stop. 👍
```diff
@@ -350,13 +353,16 @@ private void assertRequestTimesOut(String requestDescription, ThrowingRunnable r
     }

     private static class Block {
```
Can you make this public to allow OffsetsApiIntegrationTest to use the latch?
And do you think that maybe these connectors should be moved out of this test into a common reusable class?
> Can you make this public to allow OffsetsApiIntegrationTest to use the latch?

Yep, done 👍

> And do you think that maybe these connectors should be moved out of this test into a common reusable class?

I do think this would be cleaner, but it'd be fairly involved. Do you think it's alright to merge this as-is without blocking on that?
> Do you think it's alright to merge this as-is without blocking on that?

Yep, not a blocker.
```java
    }

    if (awaitBlockLatch == null) {
        throw new IllegalArgumentException("No connector has been created yet");
```
Is this an opportunity for a flaky failure, if the test thread advances before the connector is created? It seems very rare; I don't see any instances on the Gradle dashboard.
Good point: given that we're not guaranteed that, e.g., Connector::start has been invoked by the time a REST request to create a connector has returned, this does seem like a chance for a flaky failure.
I've tweaked this part to handle the case where awaitBlockLatch is null gracefully, without risking blocking forever.
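For illustration, handling the null latch gracefully might look something like the sketch below; the wait-notify details and names are assumptions based on this thread, not the PR's exact code:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class WaitForBlockSketch {
    private static final long CONNECTOR_BLOCK_TIMEOUT_MS = 60_000L;
    private static CountDownLatch awaitBlockLatch;

    // The Block constructor would do the counterpart under the same lock
    static synchronized void latchCreated(CountDownLatch latch) {
        awaitBlockLatch = latch;
        WaitForBlockSketch.class.notifyAll();
    }

    // Instead of throwing (or blocking forever) when the latch is null, wait
    // for a connector instance to create it, bounded by the overall timeout
    static void waitForBlock() throws InterruptedException, TimeoutException {
        long deadline = System.currentTimeMillis() + CONNECTOR_BLOCK_TIMEOUT_MS;
        CountDownLatch latch;
        synchronized (WaitForBlockSketch.class) {
            while (awaitBlockLatch == null) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    throw new TimeoutException("Timed out waiting for a connector to be created");
                }
                WaitForBlockSketch.class.wait(remaining);
            }
            latch = awaitBlockLatch;
        }
        if (!latch.await(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("Timed out waiting for connector to block");
        }
    }
}
```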
| log.debug("Waiting for connector to block"); | ||
| if (!blockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) { | ||
| if (!awaitBlockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) { | ||
| throw new TimeoutException("Timed out waiting for connector to block."); |
Since scanning creates connector instances, and validation caches the connector instance, how do you ensure that the right awaitBlockLatch is being waited on here?
Right now it's a matter of writing test code carefully, with the assumption that if any connector or task instance has hit the block, it's the one we're interested in. So far I believe this holds for all the tests; let me know if you've found any exceptions, though.
If you add this stanza before the `log.debug("Connector should now be blocked")` call, the tests still pass:

```java
boolean retry;
synchronized (Block.class) {
    retry = Block.awaitBlockLatch != null && Block.awaitBlockLatch != awaitBlockLatch;
}
if (retry) {
    log.debug("New blocking instance was created, retrying wait");
    waitForBlock();
}
```
For me, I see this being printed in:
- testBlockInSinkTaskStart
- testBlockInConnectorStart
- testWorkerRestartWithBlockInConnectorStart
- testBlockInSourceTaskStart
- testBlockInConnectorInitialize
This leads me to believe that this method normally returns before the last-instantiated instance actually reaches its blocking method.
I don't immediately see how this could cause flakiness, but it's at least an instance of the method not doing what it says it does.
WDYT about a Block.prepare() called before the test starts that creates the awaitBlockLatch, instead of having the Block constructor initialize it? That could eliminate the wait-notify mechanism, since only one thread (the test thread) would be responsible for setting/clearing the awaitBlockLatch. A sketch of the idea follows.
edit: Would this also allow you to block in methods used during plugin scanning, if you only started blocking when the awaitBlockLatch had been prepared first?
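Roughly, the suggestion amounts to something like this (entirely hypothetical; prepare() is the proposal above, not code from the PR):

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch of the suggested Block.prepare(): the test thread
// creates the latch up front, so the Block constructor never has to, and the
// wait-notify handshake in waitForBlock becomes unnecessary.
class BlockPrepareSketch {
    private static CountDownLatch awaitBlockLatch;

    // Called by the test thread before the connector is created
    static synchronized void prepare() {
        awaitBlockLatch = new CountDownLatch(1);
    }

    // Called from connector/task methods. Instances created during plugin
    // scanning (before prepare() has run) see a null latch and pass through.
    static void maybeBlock() throws InterruptedException {
        CountDownLatch latch;
        synchronized (BlockPrepareSketch.class) {
            latch = awaitBlockLatch;
        }
        if (latch != null) {
            latch.countDown(); // tell the test thread we have reached the block
            // ... then wait on a separate latch until the test releases us ...
        }
    }
}
```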
I don't believe these changes are necessary here since the portions they address are not affected by the PR. If you would like to do this cleanup in a separate PR, I'd be happy to review.
Force-pushed f936f46 to ef89fdb
```diff
-blockLatch.countDown();
+CountDownLatch blockLatch;
+synchronized (Block.class) {
+    awaitBlockLatch.countDown();
```
nit: small NPE here under this sequence of calls (see the sketch below):
1. new Block(s)
2. Block.reset()
3. block.maybeBlockOn(s)
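One possible guard (a sketch only; the actual fix in the PR may differ) is to treat a nulled-out latch as "nothing to count down" rather than dereferencing it:

```java
// Sketch: if reset() has already nulled out awaitBlockLatch, skip the
// countdown instead of hitting the NPE
synchronized (Block.class) {
    if (awaitBlockLatch != null) {
        awaitBlockLatch.countDown();
    }
}
```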
… purposes of each latch type
- Reset block latch in OffsetsApiIntegrationTest cases that use the blocking connector
- Harden Block::waitForBlock against possible race condition caused by connector creation latency
Force-pushed 4e9e7bc to a2fa402
Reviewers: Kvicii <kvicii.yu@gmail.com>, Viktor Somogyi-Vass <viktorsomogyi@gmail.com>, Greg Harris <greg.harris@aiven.io>
These tests currently create threads that block forever until the JVM is shut down. This change unblocks those threads once their respective test cases are finished.
This is valuable not only for general code hygiene and resource utilization, but also for laying the groundwork for reusing an embedded Connect cluster across each of these test cases, which would drastically reduce test time. That's left for a follow-up PR, though.