diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java index 6655e5a01ca88..2e415d1e31e17 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java @@ -22,6 +22,8 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.connector.Task; @@ -49,10 +51,12 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Properties; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -64,6 +68,7 @@ import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG; import static org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -144,7 +149,8 @@ public void setup() throws Exception { public void close() { // stop all Connect, Kafka and Zk threads. connect.stop(); - Block.resetBlockLatch(); + // unblock everything so that we don't leak threads after each test run + Block.reset(); } @Test @@ -382,14 +388,19 @@ private void assertRequestTimesOut(String requestDescription, ThrowingRunnable r connect.requestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS); } - private static class Block { - private static CountDownLatch blockLatch; + public static class Block { + // All latches that blocking connectors/tasks are or will be waiting on during a test case + private static final Set BLOCK_LATCHES = new HashSet<>(); + // All threads that are or were at one point blocked + private static final Set BLOCKED_THREADS = new HashSet<>(); + // The latch that can be used to wait for a connector/task to reach the most-recently-registered blocking point + private static CountDownLatch awaitBlockLatch; private final String block; public static final String BLOCK_CONFIG = "block"; - private static ConfigDef config() { + public static ConfigDef config() { return new ConfigDef() .define( BLOCK_CONFIG, @@ -401,31 +412,71 @@ private static ConfigDef config() { ); } + /** + * {@link CountDownLatch#await() Wait} for the connector/task to reach the point in its lifecycle where + * it will block. + */ public static void waitForBlock() throws InterruptedException, TimeoutException { + Timer timer = Time.SYSTEM.timer(CONNECTOR_BLOCK_TIMEOUT_MS); + + CountDownLatch awaitBlockLatch; synchronized (Block.class) { - if (blockLatch == null) { - throw new IllegalArgumentException("No connector has been created yet"); + while (Block.awaitBlockLatch == null) { + timer.update(); + if (timer.isExpired()) { + throw new TimeoutException("Timed out waiting for connector to block."); + } + Block.class.wait(timer.remainingMs()); } + awaitBlockLatch = Block.awaitBlockLatch; } log.debug("Waiting for connector to block"); - if (!blockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) { + timer.update(); + if (!awaitBlockLatch.await(timer.remainingMs(), TimeUnit.MILLISECONDS)) { throw new TimeoutException("Timed out waiting for connector to block."); } log.debug("Connector should now be blocked"); } - // Note that there is only ever at most one global block latch at a time, which makes tests that + /** + * {@link CountDownLatch#countDown() Release} any latches allocated over the course of a test + * to either await a connector/task reaching a blocking point, or cause a connector/task to block. + */ + public static synchronized void reset() { + resetAwaitBlockLatch(); + BLOCK_LATCHES.forEach(CountDownLatch::countDown); + BLOCK_LATCHES.clear(); + BLOCKED_THREADS.forEach(t -> { + try { + t.join(30_000); + if (t.isAlive()) { + log.warn("Thread {} failed to finish in time", t); + } + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted while waiting for blocked thread " + t + " to finish"); + } + }); + BLOCKED_THREADS.clear(); + } + + // Note that there is only ever at most one global await-block latch at a time, which makes tests that // use blocks in multiple places impossible. If necessary, this can be addressed in the future by - // adding support for multiple block latches at a time, possibly identifiable by a connector/task + // adding support for multiple await-block latches at a time, possibly identifiable by a connector/task // ID, the location of the expected block, or both. - public static void resetBlockLatch() { + private static synchronized void resetAwaitBlockLatch() { + if (awaitBlockLatch != null) { + awaitBlockLatch.countDown(); + awaitBlockLatch = null; + } + } + + private static CountDownLatch newBlockLatch() { + CountDownLatch result = new CountDownLatch(1); synchronized (Block.class) { - if (blockLatch != null) { - blockLatch.countDown(); - blockLatch = null; - } + BLOCK_LATCHES.add(result); } + return result; } public Block(Map props) { @@ -434,23 +485,35 @@ public Block(Map props) { public Block(String block) { this.block = block; - synchronized (Block.class) { - if (blockLatch != null) { - blockLatch.countDown(); + if (block != null) { + synchronized (Block.class) { + resetAwaitBlockLatch(); + awaitBlockLatch = new CountDownLatch(1); + Block.class.notify(); } - blockLatch = new CountDownLatch(1); } } public void maybeBlockOn(String block) { if (block.equals(this.block)) { log.info("Will block on {}", block); - blockLatch.countDown(); + CountDownLatch blockLatch; + synchronized (Block.class) { + assertNotNull( + "Block was reset prematurely", + awaitBlockLatch + ); + awaitBlockLatch.countDown(); + blockLatch = newBlockLatch(); + BLOCKED_THREADS.add(Thread.currentThread()); + } while (true) { try { - Thread.sleep(Long.MAX_VALUE); + blockLatch.await(); + log.debug("Instructed to stop blocking; will resume normal execution"); + return; } catch (InterruptedException e) { - // No-op. Just keep blocking. + log.debug("Interrupted while blocking; will continue blocking until instructed to stop"); } } } else { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java index cc92effb778a1..f4c79ab44a3ac 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java @@ -468,6 +468,8 @@ public void testAlterSinkConnectorOffsetsZombieSinkTasks() throws Exception { ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.alterConnectorOffsets(connectorName, new ConnectorOffsets(offsetsToAlter))); assertThat(e.getMessage(), containsString("zombie sink task")); + + BlockingConnectorTest.Block.reset(); } @Test @@ -807,6 +809,8 @@ public void testResetSinkConnectorOffsetsZombieSinkTasks() throws Exception { // Try to reset the offsets ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.resetConnectorOffsets(connectorName)); assertThat(e.getMessage(), containsString("zombie sink task")); + + BlockingConnectorTest.Block.reset(); } @Test