From b9fcf6ab5209e78df321d3f7bd7d0bd01dda0593 Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Fri, 16 Jan 2026 07:53:27 +0900 Subject: [PATCH 1/2] KAFKA-20074: Fix flaky PlaintextAdminIntegrationTest#testDescribeStreamsGroupsNotReady. --- .../integration/kafka/api/IntegrationTestHarness.scala | 8 ++++++-- .../kafka/api/PlaintextAdminIntegrationTest.scala | 4 +++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index 2a5e8df40e498..ebd7fccaeea0f 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -241,7 +241,8 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { configsToRemove: List[String] = List(), inputTopics: Set[String], changelogTopics: Set[String] = Set(), - streamsGroupId: String): AsyncKafkaConsumer[K, V] = { + streamsGroupId: String, + replicationFactor: Optional[Short] = Optional.empty()): AsyncKafkaConsumer[K, V] = { val props = new Properties() props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) props.put(ConsumerConfig.GROUP_ID_CONFIG, streamsGroupId) @@ -251,6 +252,9 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { props ++= configOverrides configsToRemove.foreach(props.remove(_)) + val boxed: Optional[java.lang.Short] = + replicationFactor.map[java.lang.Short](s => java.lang.Short.valueOf(s)) + val streamsRebalanceData = new StreamsRebalanceData( UUID.randomUUID(), Optional.empty(), @@ -259,7 +263,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { inputTopics.asJava, util.Set.of(), util.Map.of(), - changelogTopics.map(c => (c, new StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.empty(), util.Map.of()))).toMap.asJava, + changelogTopics.map(c => (c, new StreamsRebalanceData.TopicInfo(Optional.empty(), boxed, util.Map.of()))).toMap.asJava, util.Set.of() )), Map.empty[String, String].asJava diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index efcf5139d7426..ff80fed52086a 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -4465,10 +4465,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val config = createConfig client = Admin.create(config) + val unavailableReplicationFactorInThisCluster = 9999.toShort val streams = createStreamsGroup( inputTopics = Set(testTopicName), changelogTopics = Set(testTopicName + "-changelog"), - streamsGroupId = streamsGroupId + streamsGroupId = streamsGroupId, + replicationFactor = Optional.of(unavailableReplicationFactorInThisCluster), ) streams.poll(JDuration.ofMillis(500L)) From e5013c90d64473a1f0c2c777e9f133a139dd6356 Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Fri, 16 Jan 2026 23:24:48 +0900 Subject: [PATCH 2/2] Addressing review. --- .../scala/integration/kafka/api/IntegrationTestHarness.scala | 1 - .../integration/kafka/api/PlaintextAdminIntegrationTest.scala | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index ebd7fccaeea0f..9271b96d563e3 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -254,7 +254,6 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { val boxed: Optional[java.lang.Short] = replicationFactor.map[java.lang.Short](s => java.lang.Short.valueOf(s)) - val streamsRebalanceData = new StreamsRebalanceData( UUID.randomUUID(), Optional.empty(), diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index ff80fed52086a..28202cb04d67d 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -4470,7 +4470,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { inputTopics = Set(testTopicName), changelogTopics = Set(testTopicName + "-changelog"), streamsGroupId = streamsGroupId, - replicationFactor = Optional.of(unavailableReplicationFactorInThisCluster), + replicationFactor = Optional.of(unavailableReplicationFactorInThisCluster) ) streams.poll(JDuration.ofMillis(500L))