@@ -22,6 +22,8 @@
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
@@ -49,10 +51,12 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -64,6 +68,7 @@
import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
import static org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

@@ -144,7 +149,8 @@ public void setup() throws Exception {
public void close() {
// stop all Connect, Kafka and Zk threads.
connect.stop();
Block.resetBlockLatch();
// unblock everything so that we don't leak threads after each test run
Block.reset();

Contributor:
WDYT about resetting before stopping the workers, to allow a normal shutdown to happen?

Contributor Author:
It may be valuable to ensure that workers can shut down gracefully under these circumstances. Thoughts?

Contributor:
I think that should take place in a test then, not in the cleanup.

The reason I bring this up is that if I were to assert that the clients/threads are all stopped immediately after Block.reset() (as implemented in #14783) there's no synchronization to ensure that cleanup takes place before the assertion fires. The "asynchronous cleanup" initiated by Block.reset could exceed the lifetime of the test, still leaking the threads but only temporarily.
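
A minimal sketch of the ordering being described, assuming a hypothetical assertNoLeakedThreads() check standing in for the leak tester in the referenced PR:

@After
public void close() {
    connect.stop();            // worker threads may still be parked on a block
    Block.reset();             // if this only releases latches without joining the threads...
    assertNoLeakedThreads();   // ...this hypothetical check could fire before the released threads actually exit
}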

Contributor Author:
> I think that should take place in a test then, not in the cleanup.

It's easier to handle in a single method rather than copy over to 11 test cases, though. And I also don't necessarily see why @After-annotated methods need to be used exclusively for cleanup.

The concern about threads leaking (even if for a short period) beyond the scope of the test definitely seems valid. I've pushed a tweak that adds logic to wait for the blocked threads to complete in Block::reset. LMK if this seems clean enough; if not, I can bite the bullet and reverse the order of operations in BlockingConnectorTest::close and then see about adding more explicit checks for graceful worker shutdown in other places.

Contributor:
> It's easier to handle in a single method rather than copy over to 11 test cases

Oh I see, I thought it would be sufficient to add one test case that called stop() to verify that one type of blocked thread still allows shutdown to complete, rather than verifying it for all of the different ways of blocking threads. That would have less coverage than the current tests on trunk.

> I've pushed a tweak that adds logic to wait for the blocked threads to complete in Block::reset.

I think this is probably the better solution. The leak tester can separately verify that resources were closed properly now that the test ensures the threads stop. 👍

}

@Test
@@ -382,14 +388,19 @@ private void assertRequestTimesOut(String requestDescription, ThrowingRunnable r
connect.requestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS);
}

private static class Block {
private static CountDownLatch blockLatch;
public static class Block {
// All latches that blocking connectors/tasks are or will be waiting on during a test case
private static final Set<CountDownLatch> BLOCK_LATCHES = new HashSet<>();
// All threads that are or were at one point blocked
private static final Set<Thread> BLOCKED_THREADS = new HashSet<>();
// The latch that can be used to wait for a connector/task to reach the most-recently-registered blocking point
private static CountDownLatch awaitBlockLatch;

private final String block;

public static final String BLOCK_CONFIG = "block";

private static ConfigDef config() {
public static ConfigDef config() {
return new ConfigDef()
.define(
BLOCK_CONFIG,
@@ -401,31 +412,71 @@ private static ConfigDef config() {
);
}

/**
* {@link CountDownLatch#await() Wait} for the connector/task to reach the point in its lifecycle where
* it will block.
*/
public static void waitForBlock() throws InterruptedException, TimeoutException {
Timer timer = Time.SYSTEM.timer(CONNECTOR_BLOCK_TIMEOUT_MS);

CountDownLatch awaitBlockLatch;
synchronized (Block.class) {
if (blockLatch == null) {
throw new IllegalArgumentException("No connector has been created yet");
while (Block.awaitBlockLatch == null) {
timer.update();
if (timer.isExpired()) {
throw new TimeoutException("Timed out waiting for connector to block.");
}
Block.class.wait(timer.remainingMs());
}
awaitBlockLatch = Block.awaitBlockLatch;
}

log.debug("Waiting for connector to block");
if (!blockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
timer.update();
if (!awaitBlockLatch.await(timer.remainingMs(), TimeUnit.MILLISECONDS)) {
throw new TimeoutException("Timed out waiting for connector to block.");

Contributor:
Since scanning creates connector instances, and validation caches the connector instance, how do you ensure that the right awaitBlockLatch is being waited on here?

Contributor Author:
Right now it's a matter of writing test code carefully, with the assumption that if any connector or task instance has hit the block, it's the one we're interested in. So far I believe this holds for all the tests; let me know if you've found any exceptions, though.

Contributor:
If you add this stanza before the log.debug("Connector should now be blocked"), the tests still pass:

boolean retry;
synchronized (Block.class) {
    retry = Block.awaitBlockLatch != null && Block.awaitBlockLatch != awaitBlockLatch;
}
if (retry) {
    log.debug("New blocking instance was created, retrying wait");
    waitForBlock();
}

For me, I see this being printed in:

  • testBlockInSinkTaskStart
  • testBlockInConnectorStart
  • testWorkerRestartWithBlockInConnectorStart
  • testBlockInSourceTaskStart
  • testBlockInConnectorInitialize

This leads me to believe that this function normally exits before the blocking method of the last-instantiated instance is reached.

I don't immediately see how this could cause flakiness, but it's at least an instance of the method not doing what it says it does.

Contributor (@gharris1727, Dec 13, 2023):
WDYT about a Block.prepare() called before the test starts that creates the awaitBlockLatch, instead of having the Block constructor initialize it? That could eliminate the wait-notify mechanism, since only one thread (the test thread) would be responsible for setting/clearing the awaitBlockLatch.

edit: Would this also allow you to block in methods used during plugin scanning, if you only started blocking if the awaitBlockLatch had been prepared first?
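
A rough sketch of that prepare-based alternative (prepare() is hypothetical and not part of this PR; the existing imports and fields of the test class are assumed), in which only the test thread ever sets the latch:

public static class Block {
    // Guarded by Block.class; set only by the test thread via prepare()
    private static CountDownLatch awaitBlockLatch;

    private final String block;

    public Block(String block) {
        this.block = block;
    }

    // Hypothetical: called by the test before creating the blocking connector/task
    public static void prepare() {
        synchronized (Block.class) {
            awaitBlockLatch = new CountDownLatch(1);
        }
    }

    public void maybeBlockOn(String block) {
        CountDownLatch latch;
        synchronized (Block.class) {
            latch = awaitBlockLatch;
        }
        // Instances created during plugin scanning find no prepared latch and skip blocking
        if (latch != null && block.equals(this.block)) {
            latch.countDown();
            // ... block until released, as in the existing implementation ...
        }
    }
}

Since the latch would already exist before waitForBlock() is called, the wait-notify loop on Block.class could then be dropped.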

Contributor Author:
I don't believe these changes are necessary here since the portions they address are not affected by the PR. If you would like to do this cleanup in a separate PR, I'd be happy to review.

}
log.debug("Connector should now be blocked");
}

// Note that there is only ever at most one global block latch at a time, which makes tests that
/**
* {@link CountDownLatch#countDown() Release} any latches allocated over the course of a test
* to either await a connector/task reaching a blocking point, or cause a connector/task to block.
*/
public static synchronized void reset() {
resetAwaitBlockLatch();
BLOCK_LATCHES.forEach(CountDownLatch::countDown);
BLOCK_LATCHES.clear();
BLOCKED_THREADS.forEach(t -> {
try {
t.join(30_000);
if (t.isAlive()) {
log.warn("Thread {} failed to finish in time", t);
}
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted while waiting for blocked thread " + t + " to finish");
}
});
BLOCKED_THREADS.clear();
}

// Note that there is only ever at most one global await-block latch at a time, which makes tests that
// use blocks in multiple places impossible. If necessary, this can be addressed in the future by
// adding support for multiple block latches at a time, possibly identifiable by a connector/task
// adding support for multiple await-block latches at a time, possibly identifiable by a connector/task
// ID, the location of the expected block, or both.
public static void resetBlockLatch() {
private static synchronized void resetAwaitBlockLatch() {
if (awaitBlockLatch != null) {
awaitBlockLatch.countDown();
awaitBlockLatch = null;
}
}

private static CountDownLatch newBlockLatch() {
CountDownLatch result = new CountDownLatch(1);
synchronized (Block.class) {
if (blockLatch != null) {
blockLatch.countDown();
blockLatch = null;
}
BLOCK_LATCHES.add(result);
}
return result;
}

public Block(Map<String, String> props) {
@@ -434,23 +485,35 @@ public Block(Map<String, String> props) {

public Block(String block) {
this.block = block;
synchronized (Block.class) {
if (blockLatch != null) {
blockLatch.countDown();
if (block != null) {
synchronized (Block.class) {
resetAwaitBlockLatch();
awaitBlockLatch = new CountDownLatch(1);
Block.class.notify();
}
blockLatch = new CountDownLatch(1);
}
}

public void maybeBlockOn(String block) {
if (block.equals(this.block)) {
log.info("Will block on {}", block);
blockLatch.countDown();
CountDownLatch blockLatch;
synchronized (Block.class) {
assertNotNull(
"Block was reset prematurely",
awaitBlockLatch
);
awaitBlockLatch.countDown();

Contributor:
nit: small NPE here under this sequence of calls:

  1. new Block(s)
  2. Block.reset()
  3. block.maybeBlockOn(s)

blockLatch = newBlockLatch();
BLOCKED_THREADS.add(Thread.currentThread());
}
while (true) {
try {
Thread.sleep(Long.MAX_VALUE);
blockLatch.await();
log.debug("Instructed to stop blocking; will resume normal execution");
return;
} catch (InterruptedException e) {
// No-op. Just keep blocking.
log.debug("Interrupted while blocking; will continue blocking until instructed to stop");
}
}

Contributor (on lines 510 to 518):
Wouldn't this while loop prevent the normal shutdown of Connect, given the order in BlockingConnectorTest (you call connect.stop and then Block.reset)? For instance, the way Worker shuts down, it expects WorkerConnectors to respond to an interrupt.

Code reference:

ThreadUtils.shutdownExecutorServiceQuietly(executor, EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS, TimeUnit.MILLISECONDS);

Contributor Author:
Sort of--this prevents the Worker class's executor field from shutting down gracefully (i.e., when we invoke awaitTermination on it in ThreadUtils::shutdownExecutorServiceQuietly), but it doesn't prevent the Connect worker from shutting down, since we put a bound on how long we wait for the executor to shut down before moving on.

This is why the tests on trunk (which also have this kind of while (true) loop to simulate connector/task blocks) don't hang.
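
As a standalone illustration of that bound (the class name, thread factory, and 5-second timeout here are arbitrary stand-ins; in Worker the bound comes from EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BoundedShutdownSketch {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "stubborn-task");
            t.setDaemon(true); // let the JVM exit even though the task never stops
            return t;
        });
        executor.submit(() -> {
            while (true) {
                try {
                    Thread.sleep(Long.MAX_VALUE); // simulated connector/task block
                } catch (InterruptedException e) {
                    // swallow the interrupt and keep blocking, like the test connector
                }
            }
        });
        executor.shutdownNow(); // the interrupt this sends is ignored above
        if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
            // Give up on the executor and carry on; the blocked task delays this
            // step by the timeout but cannot hang shutdown indefinitely
            System.out.println("Executor did not terminate in time; moving on");
        }
    }
}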

} else {
@@ -468,6 +468,8 @@ public void testAlterSinkConnectorOffsetsZombieSinkTasks() throws Exception {
ConnectRestException e = assertThrows(ConnectRestException.class,
() -> connect.alterConnectorOffsets(connectorName, new ConnectorOffsets(offsetsToAlter)));
assertThat(e.getMessage(), containsString("zombie sink task"));

BlockingConnectorTest.Block.reset();
}

@Test
@@ -807,6 +809,8 @@ public void testResetSinkConnectorOffsetsZombieSinkTasks() throws Exception {
// Try to reset the offsets
ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.resetConnectorOffsets(connectorName));
assertThat(e.getMessage(), containsString("zombie sink task"));

BlockingConnectorTest.Block.reset();
}

@Test