diff --git a/build.gradle b/build.gradle
index 9b1971feed9d2..56434c524910c 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1971,6 +1971,7 @@ project(':clients:clients-integration-tests') {
}
dependencies {
+ testImplementation libs.metrics
testImplementation libs.slf4jApi
testImplementation project(':test-common:test-common-internal-api')
testImplementation project(':test-common:test-common-runtime')
diff --git a/checkstyle/import-control-clients-integration-tests.xml b/checkstyle/import-control-clients-integration-tests.xml
index 44cf0dba1fbec..752dcace7f254 100644
--- a/checkstyle/import-control-clients-integration-tests.xml
+++ b/checkstyle/import-control-clients-integration-tests.xml
@@ -28,6 +28,8 @@
+
+
diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/MetricsDuringTopicCreationDeletionTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/MetricsDuringTopicCreationDeletionTest.java
new file mode 100644
index 0000000000000..32a8e5192f9ba
--- /dev/null
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/MetricsDuringTopicCreationDeletionTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.server.config.ReplicationConfigs;
+import org.apache.kafka.server.config.ServerConfigs;
+import org.apache.kafka.server.config.ServerLogConfigs;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import com.yammer.metrics.core.Gauge;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class MetricsDuringTopicCreationDeletionTest {
+
+ private static final String TOPIC_NAME_PREFIX = "topic";
+ private static final int TOPIC_NUM = 2;
+ private static final int CREATE_DELETE_ITERATIONS = 3;
+ private static final short REPLICATION_FACTOR = 1;
+ private static final int PARTITION_NUM = 3;
+
+ private final ClusterInstance clusterInstance;
+ private final List topics;
+ private volatile boolean running = true;
+
+ public MetricsDuringTopicCreationDeletionTest(ClusterInstance clusterInstance) {
+ this.clusterInstance = clusterInstance;
+ this.topics = new ArrayList<>();
+ for (int n = 0; n < TOPIC_NUM; n++) {
+ topics.add(TOPIC_NAME_PREFIX + n);
+ }
+ }
+
+ /*
+ * Checking all metrics we care in a single test is faster though it would be more elegant to have 3 @Test methods
+ */
+ @ClusterTest(
+ types = {Type.KRAFT},
+ brokers = 1,
+ serverProperties = {
+ @ClusterConfigProperty(key = ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, value = "true"),
+ @ClusterConfigProperty(key = "log.initial.task.delay.ms", value = "100"),
+ @ClusterConfigProperty(key = "log.segment.delete.delay.ms", value = "1000"),
+ @ClusterConfigProperty(key = ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"),
+ // speed up the test for UnderReplicatedPartitions, which relies on the ISR expiry thread to execute concurrently with topic creation
+ // But the replica.lag.time.max.ms value still need to consider the slow testing environment
+ @ClusterConfigProperty(key = ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_CONFIG, value = "4000")
+ }
+ )
+ public void testMetricsDuringTopicCreateDelete() throws Exception {
+
+ final int initialOfflinePartitionsCount = getGauge("OfflinePartitionsCount").value();
+ final int initialPreferredReplicaImbalanceCount = getGauge("PreferredReplicaImbalanceCount").value();
+ final int initialUnderReplicatedPartitionsCount = getGauge("UnderReplicatedPartitions").value();
+
+ CompletableFuture future = CompletableFuture.runAsync(() -> {
+ while (running) {
+ int offlinePartitionsCount = getGauge("OfflinePartitionsCount").value();
+ int preferredReplicaImbalanceCount = getGauge("PreferredReplicaImbalanceCount").value();
+ int underReplicatedPartitionsCount = getGauge("UnderReplicatedPartitions").value();
+
+ if (offlinePartitionsCount != initialOfflinePartitionsCount ||
+ preferredReplicaImbalanceCount != initialPreferredReplicaImbalanceCount ||
+ underReplicatedPartitionsCount != initialUnderReplicatedPartitionsCount) {
+ running = false;
+ }
+
+ try {
+ // Avoid busy loop
+ TimeUnit.MILLISECONDS.sleep(100);
+ } catch (InterruptedException ignored) {
+
+ }
+ }
+ });
+
+ Closeable runThread = () -> {
+ running = false;
+ future.join();
+ };
+
+ try (runThread) {
+ createAndDeleteTopics();
+ }
+
+ final int finalOfflinePartitionsCount = getGauge("OfflinePartitionsCount").value();
+ final int finalPreferredReplicaImbalanceCount = getGauge("PreferredReplicaImbalanceCount").value();
+ final int finalUnderReplicatedPartitionsCount = getGauge("UnderReplicatedPartitions").value();
+
+ assertEquals(initialOfflinePartitionsCount, finalOfflinePartitionsCount,
+ "Expect offlinePartitionsCount to be " + initialOfflinePartitionsCount + ", but got: " + finalOfflinePartitionsCount);
+ assertEquals(initialPreferredReplicaImbalanceCount, finalPreferredReplicaImbalanceCount,
+ "Expect PreferredReplicaImbalanceCount to be " + initialPreferredReplicaImbalanceCount + ", but got: " + finalPreferredReplicaImbalanceCount);
+ assertEquals(initialUnderReplicatedPartitionsCount, finalUnderReplicatedPartitionsCount,
+ "Expect UnderReplicatedPartitionCount to be " + initialUnderReplicatedPartitionsCount + ", but got: " + finalUnderReplicatedPartitionsCount);
+ }
+
+ private void createAndDeleteTopics() {
+ for (int i = 1; i <= CREATE_DELETE_ITERATIONS && running; i++) {
+ for (String topic : topics) {
+ if (!running) return;
+ try {
+ clusterInstance.createTopic(topic, PARTITION_NUM, REPLICATION_FACTOR);
+ } catch (Exception ignored) { }
+ }
+
+ for (String topic : topics) {
+ if (!running) return;
+ try {
+ clusterInstance.deleteTopic(topic);
+ } catch (Exception ignored) { }
+ }
+ }
+ }
+
+ private Gauge getGauge(String metricName) {
+ return KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
+ .filter(entry -> entry.getKey().getName().endsWith(metricName))
+ .findFirst()
+ .map(entry -> (Gauge) entry.getValue())
+ .orElseThrow(() -> new AssertionError("Unable to find metric " + metricName));
+ }
+}
diff --git a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
deleted file mode 100644
index 4bfb5a7104b00..0000000000000
--- a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.integration
-
-import java.util.Properties
-import kafka.server.KafkaConfig
-import kafka.utils.{Logging, TestUtils}
-
-import scala.jdk.CollectionConverters._
-import org.junit.jupiter.api.{BeforeEach, TestInfo}
-import com.yammer.metrics.core.Gauge
-import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs}
-import org.apache.kafka.server.metrics.KafkaYammerMetrics
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
-
-class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with Logging {
-
- private val nodesNum = 3
- private val topicName = "topic"
- private val topicNum = 2
- private val replicationFactor = 3
- private val partitionNum = 3
- private val createDeleteIterations = 3
-
- private val overridingProps = new Properties
- overridingProps.put(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, "true")
- overridingProps.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false")
- // speed up the test for UnderReplicatedPartitions, which relies on the ISR expiry thread to execute concurrently with topic creation
- // But the replica.lag.time.max.ms value still need to consider the slow Jenkins testing environment
- overridingProps.put(ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_CONFIG, "4000")
-
- private val testedMetrics = List("OfflinePartitionsCount","PreferredReplicaImbalanceCount","UnderReplicatedPartitions")
- private val topics = List.tabulate(topicNum) (n => topicName + n)
-
- @volatile private var running = true
-
- override def generateConfigs = TestUtils.createBrokerConfigs(nodesNum)
- .map(KafkaConfig.fromProps(_, overridingProps))
-
- @BeforeEach
- override def setUp(testInfo: TestInfo): Unit = {
- // Do some Metrics Registry cleanup by removing the metrics that this test checks.
- // This is a test workaround to the issue that prior harness runs may have left a populated registry.
- // see https://issues.apache.org/jira/browse/KAFKA-4605
- for (m <- testedMetrics) {
- val metricName = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala.keys.find(_.getName.endsWith(m))
- metricName.foreach(KafkaYammerMetrics.defaultRegistry.removeMetric)
- }
-
- super.setUp(testInfo)
- }
-
- /*
- * checking all metrics we care in a single test is faster though it would be more elegant to have 3 @Test methods
- */
- @ParameterizedTest
- @ValueSource(strings = Array("kraft"))
- def testMetricsDuringTopicCreateDelete(quorum: String): Unit = {
-
- // For UnderReplicatedPartitions, because of https://issues.apache.org/jira/browse/KAFKA-4605
- // we can't access the metrics value of each server. So instead we directly invoke the method
- // replicaManager.underReplicatedPartitionCount() that defines the metrics value.
- @volatile var underReplicatedPartitionCount = 0
-
- // For OfflinePartitionsCount and PreferredReplicaImbalanceCount even with https://issues.apache.org/jira/browse/KAFKA-4605
- // the test has worked reliably because the metric that gets triggered is the one generated by the first started server (controller)
- val offlinePartitionsCountGauge = getGauge("OfflinePartitionsCount")
- @volatile var offlinePartitionsCount = offlinePartitionsCountGauge.value
- assert(offlinePartitionsCount == 0)
-
- val preferredReplicaImbalanceCountGauge = getGauge("PreferredReplicaImbalanceCount")
- @volatile var preferredReplicaImbalanceCount = preferredReplicaImbalanceCountGauge.value
- assert(preferredReplicaImbalanceCount == 0)
-
- // Thread checking the metric continuously
- running = true
- val thread = new Thread(() => {
- while (running) {
- for (s <- servers if running) {
- underReplicatedPartitionCount = s.replicaManager.underReplicatedPartitionCount
- if (underReplicatedPartitionCount > 0) {
- running = false
- }
- }
-
- preferredReplicaImbalanceCount = preferredReplicaImbalanceCountGauge.value
- if (preferredReplicaImbalanceCount > 0) {
- running = false
- }
-
- offlinePartitionsCount = offlinePartitionsCountGauge.value
- if (offlinePartitionsCount > 0) {
- running = false
- }
- }
- })
- thread.start()
-
- // breakable loop that creates and deletes topics
- createDeleteTopics()
-
- // if the thread checking the gauge is still run, stop it
- running = false
- thread.join()
-
- assert(offlinePartitionsCount==0, s"Expect offlinePartitionsCount to be 0, but got: $offlinePartitionsCount")
- assert(preferredReplicaImbalanceCount==0, s"Expect PreferredReplicaImbalanceCount to be 0, but got: $preferredReplicaImbalanceCount")
- assert(underReplicatedPartitionCount==0, s"Expect UnderReplicatedPartitionCount to be 0, but got: $underReplicatedPartitionCount")
- }
-
- private def getGauge(metricName: String) = {
- KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
- .find { case (k, _) => k.getName.endsWith(metricName) }
- .getOrElse(throw new AssertionError( "Unable to find metric " + metricName))
- ._2.asInstanceOf[Gauge[Int]]
- }
-
- private def createDeleteTopics(): Unit = {
- for (l <- 1 to createDeleteIterations if running) {
- // Create topics
- for (t <- topics if running) {
- try {
- createTopic(t, partitionNum, replicationFactor)
- } catch {
- case e: Exception => e.printStackTrace()
- }
- }
-
- // Delete topics
- for (t <- topics if running) {
- try {
- deleteTopic(t)
- TestUtils.verifyTopicDeletion(t, partitionNum, servers)
- } catch {
- case e: Exception => e.printStackTrace()
- }
- }
- }
- }
-}
diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
index 4b887eff9d68e..d07e3bd2ea2e0 100644
--- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
+++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
@@ -290,6 +290,19 @@ default void createTopic(String topicName, int partitions, short replicas, Map