diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala index 2302007c5275e..300db0047b441 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala @@ -1127,27 +1127,26 @@ class ControllerIntegrationTest extends QuorumTestHarness { TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers) waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch, "failed to get expected partition state upon topic creation") - val topicIdAfterCreate = zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic()) - assertEquals(None, topicIdAfterCreate) - val emptyTopicId = controller.controllerContext.topicIds.get("t") - assertEquals(None, emptyTopicId) + assertEquals(None, zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic)) + assertEquals(None, controller.controllerContext.topicIds.get(tp.topic)) servers(controllerId).shutdown() servers(controllerId).awaitShutdown() servers = makeServers(1) TestUtils.waitUntilTrue(() => zkClient.getControllerId.isDefined, "failed to elect a controller") - val topicIdAfterUpgrade = zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic()) - assertNotEquals(emptyTopicId, topicIdAfterUpgrade) + + val (topicIdAfterUpgrade, _) = TestUtils.computeUntilTrue(zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic))(_.nonEmpty) + assertNotEquals(None, topicIdAfterUpgrade, s"topic id for ${tp.topic} not found in ZK") + val controller2 = getController().kafkaController - assertNotEquals(emptyTopicId, controller2.controllerContext.topicIds.get("t")) - val topicId = controller2.controllerContext.topicIds.get("t").get - assertEquals(topicIdAfterUpgrade.get, topicId) - assertEquals("t", controller2.controllerContext.topicNames(topicId)) + val topicId = controller2.controllerContext.topicIds.get(tp.topic) + assertEquals(topicIdAfterUpgrade, topicId) + assertEquals(tp.topic, controller2.controllerContext.topicNames(topicId.get)) TestUtils.waitUntilTrue(() => servers(0).logManager.getLog(tp).isDefined, "log was not created") val topicIdInLog = servers(0).logManager.getLog(tp).get.topicId - assertEquals(Some(topicId), topicIdInLog) + assertEquals(topicId, topicIdInLog) adminZkClient.deleteTopic(tp.topic) TestUtils.waitUntilTrue(() => !servers.head.kafkaController.controllerContext.allTopics.contains(tp.topic),