From bcd896261c02f25c51b3317413fdafab2c33d9dc Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Mon, 1 Aug 2022 18:50:05 -0700 Subject: [PATCH 1/2] fix flakiness --- .../kafka/streams/processor/internals/StreamTask.java | 2 +- .../integration/RackAwarenessIntegrationTest.java | 11 ++++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 07c4494225fec..f7bf8a5e74859 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -1267,7 +1267,7 @@ public RecordQueue createQueue(final TopicPartition partition) { final SourceNode source = topology.source(partition.topic()); if (source == null) { throw new TopologyException( - "Topic is unknown to the topology. " + + "Topic " + partition.topic() + " is unknown to the topology. " + "This may happen if different KafkaStreams instances of the same application execute different Topologies. " + "Note that Topologies are only identical if all operators are added in the same order." ); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java index 677633e9b01de..1503af0a66f09 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java @@ -55,6 +55,7 @@ import static java.util.Arrays.asList; import static java.util.Collections.singletonList; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -149,7 +150,7 @@ public void shouldDoRebalancingWithMaximumNumberOfClientTags() throws Exception waitUntilAllKafkaStreamsClientsAreRunning(); - assertTrue(isIdealTaskDistributionReachedForTags(clientTagKeys)); + waitForCondition(() -> isIdealTaskDistributionReachedForTags(clientTagKeys), "not all tags are evenly distributed"); } @Test @@ -165,7 +166,7 @@ public void shouldDistributeStandbyReplicasWhenAllClientsAreLocatedOnASameCluste createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B, TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas); waitUntilAllKafkaStreamsClientsAreRunning(); - assertTrue(isIdealTaskDistributionReachedForTags(singletonList(TAG_ZONE))); + waitForCondition(() -> isIdealTaskDistributionReachedForTags(singletonList(TAG_ZONE)), "not all tags are evenly distributed"); } @Test @@ -186,7 +187,7 @@ public void shouldDistributeStandbyReplicasOverMultipleClientTags() throws Excep createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1C, TAG_VALUE_K8_CLUSTER_3), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas); waitUntilAllKafkaStreamsClientsAreRunning(); - assertTrue(isIdealTaskDistributionReachedForTags(asList(TAG_ZONE, TAG_CLUSTER))); + waitForCondition(() -> isIdealTaskDistributionReachedForTags(asList(TAG_ZONE, TAG_CLUSTER)), "not all tags are evenly distributed"); } @Test @@ -204,8 +205,8 @@ public void shouldDistributeStandbyReplicasWhenIdealDistributionCanNotBeAchieved waitUntilAllKafkaStreamsClientsAreRunning(); - assertTrue(isIdealTaskDistributionReachedForTags(singletonList(TAG_ZONE))); - assertTrue(isPartialTaskDistributionReachedForTags(singletonList(TAG_CLUSTER))); + waitForCondition(() -> isIdealTaskDistributionReachedForTags(singletonList(TAG_ZONE)), "not all tags are evenly distributed"); + waitForCondition(() -> isIdealTaskDistributionReachedForTags(singletonList(TAG_CLUSTER)), "not all tags are evenly distributed"); } private void stopKafkaStreamsInstanceWithIndex(final int index) { From c9f65fb346590da4939fad7de9d2104819fc3013 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Tue, 2 Aug 2022 18:56:52 -0700 Subject: [PATCH 2/2] github comments --- .../streams/integration/RackAwarenessIntegrationTest.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java index 1503af0a66f09..7c93b769f5814 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java @@ -57,7 +57,6 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; @Timeout(600) @Tag("integration") @@ -144,7 +143,7 @@ public void shouldDoRebalancingWithMaximumNumberOfClientTags() throws Exception createAndStart(clientTags2, clientTagKeys, numberOfStandbyReplicas); waitUntilAllKafkaStreamsClientsAreRunning(); - assertTrue(isIdealTaskDistributionReachedForTags(clientTagKeys)); + waitForCondition(() -> isIdealTaskDistributionReachedForTags(clientTagKeys), "not all tags are evenly distributed"); stopKafkaStreamsInstanceWithIndex(0); @@ -206,7 +205,7 @@ public void shouldDistributeStandbyReplicasWhenIdealDistributionCanNotBeAchieved waitUntilAllKafkaStreamsClientsAreRunning(); waitForCondition(() -> isIdealTaskDistributionReachedForTags(singletonList(TAG_ZONE)), "not all tags are evenly distributed"); - waitForCondition(() -> isIdealTaskDistributionReachedForTags(singletonList(TAG_CLUSTER)), "not all tags are evenly distributed"); + waitForCondition(() -> isPartialTaskDistributionReachedForTags(singletonList(TAG_CLUSTER)), "not all tags are evenly distributed"); } private void stopKafkaStreamsInstanceWithIndex(final int index) {