diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java index b67202e29fb3a..5d9dc112dbf3d 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java @@ -1743,15 +1743,24 @@ public void testPollThrowsInterruptExceptionIfInterrupted() { * to an invalid topic. */ @ClusterTest - public void testSubscribeOnInvalidTopicThrowsInvalidTopicException() { + public void testSubscribeOnInvalidTopicThrowsInvalidTopicException() throws InterruptedException { alterShareAutoOffsetReset("group1", "earliest"); try (ShareConsumer shareConsumer = createShareConsumer("group1")) { shareConsumer.subscribe(Set.of("topic abc")); - // The exception depends upon a metadata response which arrives asynchronously. If the delay is - // too short, the poll might return before the error is known. - assertThrows(InvalidTopicException.class, () -> shareConsumer.poll(Duration.ofMillis(10000))); + // The exception depends upon a metadata response which arrives asynchronously. + InvalidTopicException[] exception = {null}; + waitForCondition(() -> { + try { + shareConsumer.poll(Duration.ofMillis(500)); + } catch (InvalidTopicException e) { + exception[0] = e; + } catch (Throwable e) { + fail("An InvalidTopicException should be thrown. But " + e.getClass() + " is thrown"); + } + return exception[0] != null; + }, DEFAULT_MAX_WAIT_MS, DEFAULT_POLL_INTERVAL_MS, () -> "An InvalidTopicException should be thrown."); } }