Skip to content

Commit 4e77466

Browse files
authored
KAFKA-19170 Move MetricsDuringTopicCreationDeletionTest to client-integration-tests module (#19528)
rewrite `MetricsDuringTopicCreationDeletionTest` to `ClusterTest` infra and move it to clients-integration-tests module. Reviewers: PoAn Yang <payang@apache.org>, Ken Huang <s7133700@gmail.com>, Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
1 parent 54b3b3d commit 4e77466

File tree

5 files changed

+163
-156
lines changed

5 files changed

+163
-156
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1970,6 +1970,7 @@ project(':clients:clients-integration-tests') {
19701970
}
19711971

19721972
dependencies {
1973+
testImplementation libs.metrics
19731974
testImplementation libs.slf4jApi
19741975
testImplementation project(':test-common:test-common-internal-api')
19751976
testImplementation project(':test-common:test-common-runtime')

checkstyle/import-control-clients-integration-tests.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
<allow pkg="org.junit"/>
2929
<allow pkg="kafka"/>
3030

31+
<allow pkg="com.yammer.metrics.core" />
32+
3133
<subpackage name="clients.producer">
3234
<allow pkg="org.opentest4j"/>
3335
</subpackage>
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.clients;
18+
19+
import org.apache.kafka.common.test.ClusterInstance;
20+
import org.apache.kafka.common.test.api.ClusterConfigProperty;
21+
import org.apache.kafka.common.test.api.ClusterTest;
22+
import org.apache.kafka.common.test.api.Type;
23+
import org.apache.kafka.server.config.ReplicationConfigs;
24+
import org.apache.kafka.server.config.ServerConfigs;
25+
import org.apache.kafka.server.config.ServerLogConfigs;
26+
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
27+
28+
import com.yammer.metrics.core.Gauge;
29+
30+
import java.io.Closeable;
31+
import java.util.ArrayList;
32+
import java.util.List;
33+
import java.util.concurrent.CompletableFuture;
34+
import java.util.concurrent.TimeUnit;
35+
36+
import static org.junit.jupiter.api.Assertions.assertEquals;
37+
38+
public class MetricsDuringTopicCreationDeletionTest {
39+
40+
private static final String TOPIC_NAME_PREFIX = "topic";
41+
private static final int TOPIC_NUM = 2;
42+
private static final int CREATE_DELETE_ITERATIONS = 3;
43+
private static final short REPLICATION_FACTOR = 1;
44+
private static final int PARTITION_NUM = 3;
45+
46+
private final ClusterInstance clusterInstance;
47+
private final List<String> topics;
48+
private volatile boolean running = true;
49+
50+
public MetricsDuringTopicCreationDeletionTest(ClusterInstance clusterInstance) {
51+
this.clusterInstance = clusterInstance;
52+
this.topics = new ArrayList<>();
53+
for (int n = 0; n < TOPIC_NUM; n++) {
54+
topics.add(TOPIC_NAME_PREFIX + n);
55+
}
56+
}
57+
58+
/*
59+
* Checking all metrics we care in a single test is faster though it would be more elegant to have 3 @Test methods
60+
*/
61+
@ClusterTest(
62+
types = {Type.KRAFT},
63+
brokers = 1,
64+
serverProperties = {
65+
@ClusterConfigProperty(key = ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, value = "true"),
66+
@ClusterConfigProperty(key = "log.initial.task.delay.ms", value = "100"),
67+
@ClusterConfigProperty(key = "log.segment.delete.delay.ms", value = "1000"),
68+
@ClusterConfigProperty(key = ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"),
69+
// speed up the test for UnderReplicatedPartitions, which relies on the ISR expiry thread to execute concurrently with topic creation
70+
// But the replica.lag.time.max.ms value still need to consider the slow testing environment
71+
@ClusterConfigProperty(key = ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_CONFIG, value = "4000")
72+
}
73+
)
74+
public void testMetricsDuringTopicCreateDelete() throws Exception {
75+
76+
final int initialOfflinePartitionsCount = getGauge("OfflinePartitionsCount").value();
77+
final int initialPreferredReplicaImbalanceCount = getGauge("PreferredReplicaImbalanceCount").value();
78+
final int initialUnderReplicatedPartitionsCount = getGauge("UnderReplicatedPartitions").value();
79+
80+
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
81+
while (running) {
82+
int offlinePartitionsCount = getGauge("OfflinePartitionsCount").value();
83+
int preferredReplicaImbalanceCount = getGauge("PreferredReplicaImbalanceCount").value();
84+
int underReplicatedPartitionsCount = getGauge("UnderReplicatedPartitions").value();
85+
86+
if (offlinePartitionsCount != initialOfflinePartitionsCount ||
87+
preferredReplicaImbalanceCount != initialPreferredReplicaImbalanceCount ||
88+
underReplicatedPartitionsCount != initialUnderReplicatedPartitionsCount) {
89+
running = false;
90+
}
91+
92+
try {
93+
// Avoid busy loop
94+
TimeUnit.MILLISECONDS.sleep(100);
95+
} catch (InterruptedException ignored) {
96+
97+
}
98+
}
99+
});
100+
101+
Closeable runThread = () -> {
102+
running = false;
103+
future.join();
104+
};
105+
106+
try (runThread) {
107+
createAndDeleteTopics();
108+
}
109+
110+
final int finalOfflinePartitionsCount = getGauge("OfflinePartitionsCount").value();
111+
final int finalPreferredReplicaImbalanceCount = getGauge("PreferredReplicaImbalanceCount").value();
112+
final int finalUnderReplicatedPartitionsCount = getGauge("UnderReplicatedPartitions").value();
113+
114+
assertEquals(initialOfflinePartitionsCount, finalOfflinePartitionsCount,
115+
"Expect offlinePartitionsCount to be " + initialOfflinePartitionsCount + ", but got: " + finalOfflinePartitionsCount);
116+
assertEquals(initialPreferredReplicaImbalanceCount, finalPreferredReplicaImbalanceCount,
117+
"Expect PreferredReplicaImbalanceCount to be " + initialPreferredReplicaImbalanceCount + ", but got: " + finalPreferredReplicaImbalanceCount);
118+
assertEquals(initialUnderReplicatedPartitionsCount, finalUnderReplicatedPartitionsCount,
119+
"Expect UnderReplicatedPartitionCount to be " + initialUnderReplicatedPartitionsCount + ", but got: " + finalUnderReplicatedPartitionsCount);
120+
}
121+
122+
private void createAndDeleteTopics() {
123+
for (int i = 1; i <= CREATE_DELETE_ITERATIONS && running; i++) {
124+
for (String topic : topics) {
125+
if (!running) return;
126+
try {
127+
clusterInstance.createTopic(topic, PARTITION_NUM, REPLICATION_FACTOR);
128+
} catch (Exception ignored) { }
129+
}
130+
131+
for (String topic : topics) {
132+
if (!running) return;
133+
try {
134+
clusterInstance.deleteTopic(topic);
135+
} catch (Exception ignored) { }
136+
}
137+
}
138+
}
139+
140+
private Gauge<Integer> getGauge(String metricName) {
141+
return KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
142+
.filter(entry -> entry.getKey().getName().endsWith(metricName))
143+
.findFirst()
144+
.map(entry -> (Gauge<Integer>) entry.getValue())
145+
.orElseThrow(() -> new AssertionError("Unable to find metric " + metricName));
146+
}
147+
}

core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala

Lines changed: 0 additions & 156 deletions
This file was deleted.

test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,19 @@ default void createTopic(String topicName, int partitions, short replicas, Map<S
290290
}
291291
}
292292

293+
/**
294+
* Deletes a topic and waits for the deletion to complete.
295+
*
296+
* @param topicName The name of the topic to delete
297+
* @throws InterruptedException If the operation is interrupted
298+
*/
299+
default void deleteTopic(String topicName) throws InterruptedException {
300+
try (Admin admin = admin()) {
301+
admin.deleteTopics(List.of(topicName));
302+
waitTopicDeletion(topicName);
303+
}
304+
}
305+
293306
void waitForReadyBrokers() throws InterruptedException;
294307

295308
default void waitForTopic(String topic, int partitions) throws InterruptedException {

0 commit comments

Comments
 (0)