Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ public void setup() throws Exception {
@After
public void close() {
// stop all Connect, Kafka and Zk threads.
connect.stop();
Block.resetBlockLatch();
connect.stop();
}

@Test
Expand Down Expand Up @@ -349,8 +349,9 @@ private void assertRequestTimesOut(String requestDescription, ThrowingRunnable r
connect.requestTimeout(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
}

private static class Block {
public static class Block {
private static CountDownLatch blockLatch;
private static CountDownLatch clearLatch;

private final String block;

Expand Down Expand Up @@ -392,6 +393,10 @@ public static void resetBlockLatch() {
blockLatch.countDown();
blockLatch = null;
}
if (clearLatch != null) {
clearLatch.countDown();
clearLatch = null;
}
}
}

Expand All @@ -406,6 +411,7 @@ public Block(String block) {
blockLatch.countDown();
}
blockLatch = new CountDownLatch(1);
clearLatch = new CountDownLatch(1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @gharris1727 - I had a question regarding lines 410 to 412.

When is it used and do we need to do the same for clearLatch ?

}
}

Expand All @@ -415,7 +421,8 @@ public void maybeBlockOn(String block) {
blockLatch.countDown();
while (true) {
try {
Thread.sleep(Long.MAX_VALUE);
clearLatch.await(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
return;
} catch (InterruptedException e) {
// No-op. Just keep blocking.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ public void testAlterSinkConnectorOffsetsZombieSinkTasks() throws Exception {
ConnectRestException e = assertThrows(ConnectRestException.class,
() -> connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter)));
assertThat(e.getMessage(), containsString("zombie sink task"));
BlockingConnectorTest.Block.resetBlockLatch();
}

@Test
Expand Down Expand Up @@ -785,6 +786,7 @@ public void testResetSinkConnectorOffsetsZombieSinkTasks() throws Exception {
// Try to reset the offsets
ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.resetConnectorOffsets(CONNECTOR_NAME));
assertThat(e.getMessage(), containsString("zombie sink task"));
BlockingConnectorTest.Block.resetBlockLatch();
}

@Test
Expand Down