From 0a5c63f5f634fb07b912a3f95322d4e740d256fb Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Sat, 21 May 2022 10:33:20 -0400 Subject: [PATCH 1/5] KAFKA-12657: Stop leaking threads in BlockingConnectorTest --- .../integration/BlockingConnectorTest.java | 31 +++++++++++-------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java index 6655e5a01ca88..ceb07fe80cae3 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java @@ -383,7 +383,8 @@ private void assertRequestTimesOut(String requestDescription, ThrowingRunnable r } private static class Block { - private static CountDownLatch blockLatch; + private static CountDownLatch awaitBlockLatch; + private static CountDownLatch performBlockLatch; private final String block; @@ -403,13 +404,13 @@ private static ConfigDef config() { public static void waitForBlock() throws InterruptedException, TimeoutException { synchronized (Block.class) { - if (blockLatch == null) { + if (awaitBlockLatch == null) { throw new IllegalArgumentException("No connector has been created yet"); } } log.debug("Waiting for connector to block"); - if (!blockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) { + if (!awaitBlockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) { throw new TimeoutException("Timed out waiting for connector to block."); } log.debug("Connector should now be blocked"); @@ -421,9 +422,13 @@ public static void waitForBlock() throws InterruptedException, TimeoutException // ID, the location of the expected block, or both. public static void resetBlockLatch() { synchronized (Block.class) { - if (blockLatch != null) { - blockLatch.countDown(); - blockLatch = null; + if (awaitBlockLatch != null) { + awaitBlockLatch.countDown(); + awaitBlockLatch = null; + } + if (performBlockLatch != null) { + performBlockLatch.countDown(); + performBlockLatch = null; } } } @@ -435,22 +440,22 @@ public Block(Map props) { public Block(String block) { this.block = block; synchronized (Block.class) { - if (blockLatch != null) { - blockLatch.countDown(); - } - blockLatch = new CountDownLatch(1); + resetBlockLatch(); + awaitBlockLatch = new CountDownLatch(1); + performBlockLatch = new CountDownLatch(1); } } public void maybeBlockOn(String block) { if (block.equals(this.block)) { log.info("Will block on {}", block); - blockLatch.countDown(); + awaitBlockLatch.countDown(); while (true) { try { - Thread.sleep(Long.MAX_VALUE); + performBlockLatch.await(); + log.debug("Instructed to stop blocking; will resume normal execution"); } catch (InterruptedException e) { - // No-op. Just keep blocking. + log.debug("Interrupted while blocking; will continue blocking until instructed to stop"); } } } else { From c62b5476f646fc4a1fd8debe920d72163f92ab37 Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Tue, 16 Aug 2022 12:40:48 -0400 Subject: [PATCH 2/5] Track multiple block latches per test case, add comments to summarize purposes of each latch type --- .../integration/BlockingConnectorTest.java | 76 +++++++++++++------ 1 file changed, 53 insertions(+), 23 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java index ceb07fe80cae3..ba3c0a4b25cf2 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java @@ -49,10 +49,12 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Properties; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -144,7 +146,8 @@ public void setup() throws Exception { public void close() { // stop all Connect, Kafka and Zk threads. connect.stop(); - Block.resetBlockLatch(); + // unblock everything so that we don't leak threads after each test run + Block.reset(); } @Test @@ -383,14 +386,16 @@ private void assertRequestTimesOut(String requestDescription, ThrowingRunnable r } private static class Block { + // All latches that blocking connectors/tasks are or will be waiting on during a test case + private static final Set BLOCK_LATCHES = new HashSet<>(); + // The latch that can be used to wait for a connector/task to reach the most-recently-registered blocking point private static CountDownLatch awaitBlockLatch; - private static CountDownLatch performBlockLatch; private final String block; public static final String BLOCK_CONFIG = "block"; - private static ConfigDef config() { + public static ConfigDef config() { return new ConfigDef() .define( BLOCK_CONFIG, @@ -402,11 +407,18 @@ private static ConfigDef config() { ); } + /** + * {@link CountDownLatch#await() Wait} for the connector/task to reach the point in its lifecycle where + * it will block. + */ public static void waitForBlock() throws InterruptedException, TimeoutException { + CountDownLatch awaitBlockLatch; synchronized (Block.class) { - if (awaitBlockLatch == null) { - throw new IllegalArgumentException("No connector has been created yet"); - } + awaitBlockLatch = Block.awaitBlockLatch; + } + + if (awaitBlockLatch == null) { + throw new IllegalArgumentException("No connector has been created yet"); } log.debug("Waiting for connector to block"); @@ -416,21 +428,33 @@ public static void waitForBlock() throws InterruptedException, TimeoutException log.debug("Connector should now be blocked"); } - // Note that there is only ever at most one global block latch at a time, which makes tests that + /** + * {@link CountDownLatch#countDown() Release} any latches allocated over the course of a test + * to either await a connector/task reaching a blocking point, or cause a connector/task to block. + */ + public static synchronized void reset() { + resetAwaitBlockLatch(); + BLOCK_LATCHES.forEach(CountDownLatch::countDown); + BLOCK_LATCHES.clear(); + } + + // Note that there is only ever at most one global await-block latch at a time, which makes tests that // use blocks in multiple places impossible. If necessary, this can be addressed in the future by - // adding support for multiple block latches at a time, possibly identifiable by a connector/task + // adding support for multiple await-block latches at a time, possibly identifiable by a connector/task // ID, the location of the expected block, or both. - public static void resetBlockLatch() { + private static synchronized void resetAwaitBlockLatch() { + if (awaitBlockLatch != null) { + awaitBlockLatch.countDown(); + awaitBlockLatch = null; + } + } + + private static CountDownLatch newBlockLatch() { + CountDownLatch result = new CountDownLatch(1); synchronized (Block.class) { - if (awaitBlockLatch != null) { - awaitBlockLatch.countDown(); - awaitBlockLatch = null; - } - if (performBlockLatch != null) { - performBlockLatch.countDown(); - performBlockLatch = null; - } + BLOCK_LATCHES.add(result); } + return result; } public Block(Map props) { @@ -439,21 +463,27 @@ public Block(Map props) { public Block(String block) { this.block = block; - synchronized (Block.class) { - resetBlockLatch(); - awaitBlockLatch = new CountDownLatch(1); - performBlockLatch = new CountDownLatch(1); + if (block != null) { + synchronized (Block.class) { + resetAwaitBlockLatch(); + awaitBlockLatch = new CountDownLatch(1); + } } } public void maybeBlockOn(String block) { if (block.equals(this.block)) { log.info("Will block on {}", block); - awaitBlockLatch.countDown(); + CountDownLatch blockLatch; + synchronized (Block.class) { + awaitBlockLatch.countDown(); + blockLatch = newBlockLatch(); + } while (true) { try { - performBlockLatch.await(); + blockLatch.await(); log.debug("Instructed to stop blocking; will resume normal execution"); + return; } catch (InterruptedException e) { log.debug("Interrupted while blocking; will continue blocking until instructed to stop"); } From 32e0ea65a53a024e1055ecb24b298f3c922018ad Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Wed, 13 Dec 2023 09:54:55 -0500 Subject: [PATCH 3/5] Address review comments - Reset block latch in OffsetsApiIntegrationTest cases that use the blocking connector - Harden Block::waitForBlock against possible race condition caused by connector creation latency --- .../integration/BlockingConnectorTest.java | 21 +++++++++++++------ .../OffsetsApiIntegrationTest.java | 4 ++++ 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java index ba3c0a4b25cf2..1439e6f93a997 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java @@ -22,6 +22,8 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.connector.Task; @@ -385,7 +387,7 @@ private void assertRequestTimesOut(String requestDescription, ThrowingRunnable r connect.requestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS); } - private static class Block { + public static class Block { // All latches that blocking connectors/tasks are or will be waiting on during a test case private static final Set BLOCK_LATCHES = new HashSet<>(); // The latch that can be used to wait for a connector/task to reach the most-recently-registered blocking point @@ -412,17 +414,23 @@ public static ConfigDef config() { * it will block. */ public static void waitForBlock() throws InterruptedException, TimeoutException { + Timer timer = Time.SYSTEM.timer(CONNECTOR_BLOCK_TIMEOUT_MS); + CountDownLatch awaitBlockLatch; synchronized (Block.class) { + while (Block.awaitBlockLatch == null) { + timer.update(); + if (timer.isExpired()) { + throw new TimeoutException("Timed out waiting for connector to block."); + } + Block.class.wait(timer.remainingMs()); + } awaitBlockLatch = Block.awaitBlockLatch; } - if (awaitBlockLatch == null) { - throw new IllegalArgumentException("No connector has been created yet"); - } - log.debug("Waiting for connector to block"); - if (!awaitBlockLatch.await(CONNECTOR_BLOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) { + timer.update(); + if (!awaitBlockLatch.await(timer.remainingMs(), TimeUnit.MILLISECONDS)) { throw new TimeoutException("Timed out waiting for connector to block."); } log.debug("Connector should now be blocked"); @@ -467,6 +475,7 @@ public Block(String block) { synchronized (Block.class) { resetAwaitBlockLatch(); awaitBlockLatch = new CountDownLatch(1); + Block.class.notify(); } } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java index cc92effb778a1..f4c79ab44a3ac 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java @@ -468,6 +468,8 @@ public void testAlterSinkConnectorOffsetsZombieSinkTasks() throws Exception { ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.alterConnectorOffsets(connectorName, new ConnectorOffsets(offsetsToAlter))); assertThat(e.getMessage(), containsString("zombie sink task")); + + BlockingConnectorTest.Block.reset(); } @Test @@ -807,6 +809,8 @@ public void testResetSinkConnectorOffsetsZombieSinkTasks() throws Exception { // Try to reset the offsets ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.resetConnectorOffsets(connectorName)); assertThat(e.getMessage(), containsString("zombie sink task")); + + BlockingConnectorTest.Block.reset(); } @Test From 205fa45d265903bb2b326c5ce8ed87410b5e9718 Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Wed, 13 Dec 2023 14:28:28 -0500 Subject: [PATCH 4/5] Wait for blocked threads to complete in Block::reset --- .../connect/integration/BlockingConnectorTest.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java index 1439e6f93a997..a6ac58b6482cd 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java @@ -390,6 +390,8 @@ private void assertRequestTimesOut(String requestDescription, ThrowingRunnable r public static class Block { // All latches that blocking connectors/tasks are or will be waiting on during a test case private static final Set BLOCK_LATCHES = new HashSet<>(); + // All threads that are or were at one point blocked + private static final Set BLOCKED_THREADS = new HashSet<>(); // The latch that can be used to wait for a connector/task to reach the most-recently-registered blocking point private static CountDownLatch awaitBlockLatch; @@ -444,6 +446,17 @@ public static synchronized void reset() { resetAwaitBlockLatch(); BLOCK_LATCHES.forEach(CountDownLatch::countDown); BLOCK_LATCHES.clear(); + BLOCKED_THREADS.forEach(t -> { + try { + t.join(30_000); + if (t.isAlive()) { + log.warn("Thread {} failed to finish in time", t); + } + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted while waiting for blocked thread " + t + " to finish"); + } + }); + BLOCKED_THREADS.clear(); } // Note that there is only ever at most one global await-block latch at a time, which makes tests that @@ -487,6 +500,7 @@ public void maybeBlockOn(String block) { synchronized (Block.class) { awaitBlockLatch.countDown(); blockLatch = newBlockLatch(); + BLOCKED_THREADS.add(Thread.currentThread()); } while (true) { try { From a2fa402acafd1c79e959f798574314208c431a16 Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Fri, 12 Jan 2024 15:39:54 -0500 Subject: [PATCH 5/5] Address potential NPE in Block::maybeBlockOn --- .../kafka/connect/integration/BlockingConnectorTest.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java index a6ac58b6482cd..2e415d1e31e17 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java @@ -68,6 +68,7 @@ import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG; import static org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -498,6 +499,10 @@ public void maybeBlockOn(String block) { log.info("Will block on {}", block); CountDownLatch blockLatch; synchronized (Block.class) { + assertNotNull( + "Block was reset prematurely", + awaitBlockLatch + ); awaitBlockLatch.countDown(); blockLatch = newBlockLatch(); BLOCKED_THREADS.add(Thread.currentThread());