diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 07c4494225fec..f7bf8a5e74859 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -1267,7 +1267,7 @@ public RecordQueue createQueue(final TopicPartition partition) { final SourceNode source = topology.source(partition.topic()); if (source == null) { throw new TopologyException( - "Topic is unknown to the topology. " + + "Topic " + partition.topic() + " is unknown to the topology. " + "This may happen if different KafkaStreams instances of the same application execute different Topologies. " + "Note that Topologies are only identical if all operators are added in the same order." ); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java index 677633e9b01de..7c93b769f5814 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java @@ -55,8 +55,8 @@ import static java.util.Arrays.asList; import static java.util.Collections.singletonList; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; @Timeout(600) @Tag("integration") @@ -143,13 +143,13 @@ public void shouldDoRebalancingWithMaximumNumberOfClientTags() throws Exception createAndStart(clientTags2, clientTagKeys, numberOfStandbyReplicas); waitUntilAllKafkaStreamsClientsAreRunning(); - assertTrue(isIdealTaskDistributionReachedForTags(clientTagKeys)); + waitForCondition(() -> isIdealTaskDistributionReachedForTags(clientTagKeys), "not all tags are evenly distributed"); stopKafkaStreamsInstanceWithIndex(0); waitUntilAllKafkaStreamsClientsAreRunning(); - assertTrue(isIdealTaskDistributionReachedForTags(clientTagKeys)); + waitForCondition(() -> isIdealTaskDistributionReachedForTags(clientTagKeys), "not all tags are evenly distributed"); } @Test @@ -165,7 +165,7 @@ public void shouldDistributeStandbyReplicasWhenAllClientsAreLocatedOnASameCluste createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B, TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas); waitUntilAllKafkaStreamsClientsAreRunning(); - assertTrue(isIdealTaskDistributionReachedForTags(singletonList(TAG_ZONE))); + waitForCondition(() -> isIdealTaskDistributionReachedForTags(singletonList(TAG_ZONE)), "not all tags are evenly distributed"); } @Test @@ -186,7 +186,7 @@ public void shouldDistributeStandbyReplicasOverMultipleClientTags() throws Excep createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1C, TAG_VALUE_K8_CLUSTER_3), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas); waitUntilAllKafkaStreamsClientsAreRunning(); - assertTrue(isIdealTaskDistributionReachedForTags(asList(TAG_ZONE, TAG_CLUSTER))); + waitForCondition(() -> isIdealTaskDistributionReachedForTags(asList(TAG_ZONE, TAG_CLUSTER)), "not all tags are evenly distributed"); } @Test @@ -204,8 +204,8 @@ public void shouldDistributeStandbyReplicasWhenIdealDistributionCanNotBeAchieved waitUntilAllKafkaStreamsClientsAreRunning(); - assertTrue(isIdealTaskDistributionReachedForTags(singletonList(TAG_ZONE))); - assertTrue(isPartialTaskDistributionReachedForTags(singletonList(TAG_CLUSTER))); + waitForCondition(() -> isIdealTaskDistributionReachedForTags(singletonList(TAG_ZONE)), "not all tags are evenly distributed"); + waitForCondition(() -> isPartialTaskDistributionReachedForTags(singletonList(TAG_CLUSTER)), "not all tags are evenly distributed"); } private void stopKafkaStreamsInstanceWithIndex(final int index) {