Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1743,15 +1743,24 @@ public void testPollThrowsInterruptExceptionIfInterrupted() {
* to an invalid topic.
*/
@ClusterTest
public void testSubscribeOnInvalidTopicThrowsInvalidTopicException() {
public void testSubscribeOnInvalidTopicThrowsInvalidTopicException() throws InterruptedException {
alterShareAutoOffsetReset("group1", "earliest");
try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {

shareConsumer.subscribe(Set.of("topic abc"));

// The exception depends upon a metadata response which arrives asynchronously. If the delay is
// too short, the poll might return before the error is known.
assertThrows(InvalidTopicException.class, () -> shareConsumer.poll(Duration.ofMillis(10000)));
// The exception depends upon a metadata response which arrives asynchronously.
InvalidTopicException[] exception = {null};
waitForCondition(() -> {
try {
shareConsumer.poll(Duration.ofMillis(500));
} catch (InvalidTopicException e) {
exception[0] = e;
} catch (Throwable e) {
fail("An InvalidTopicException should be thrown. But " + e.getClass() + " is thrown");
}
return exception[0] != null;
}, DEFAULT_MAX_WAIT_MS, DEFAULT_POLL_INTERVAL_MS, () -> "An InvalidTopicException should be thrown.");
}
}

Expand Down