Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
9ad1754
(wip) rewrite MetricsDuringTopicCreationDeletionTest.scala with Clust…
yunchipang Apr 21, 2025
0ef952b
(wip) change metrics checking logic; add async topic deletion config
yunchipang Apr 22, 2025
bb5df87
cleanup commented code
yunchipang Apr 22, 2025
3ac6906
migrate scala-based test to CluterTest, set replica=1 and add log.ini…
yunchipang Apr 23, 2025
5791062
convert scala-based test inplace to java
yunchipang Apr 25, 2025
9932251
moved to clients-integration-tests, checkstyleTest not passed
yunchipang Apr 25, 2025
928f05d
run spotlessApply to format imports; allow com.yammer.metrics.core
yunchipang Apr 25, 2025
b39870d
replace old-school thread with Closeable
yunchipang Apr 25, 2025
ad861c8
remove forEach broker block
yunchipang Apr 28, 2025
fee1bcd
use List.of() in deleteTopics instead of Collections.singletonList()
yunchipang Apr 28, 2025
22984dc
throw exception instead of catching it in setUp()
yunchipang Apr 28, 2025
6a5ff36
replace int[] with just int
yunchipang Apr 28, 2025
4018bf4
merge closed with running (same purpose)
yunchipang Apr 29, 2025
8d303bf
move initial metrics fetch to test-level
yunchipang Apr 30, 2025
3cc96ae
move runThread to test-level
yunchipang Apr 30, 2025
5ecd2a1
extract createAndDeleteTopics; add sleep in thread
yunchipang Apr 30, 2025
ddb406e
replace 'com.yammer.metrics:metrics-core:2.2.0' with libs.metrics
yunchipang May 5, 2025
6676a48
use only one tab
yunchipang May 5, 2025
5aea56a
remove 'Jenkins' from comment
yunchipang May 5, 2025
967ceb5
revert indentation; remove redundant running=True
yunchipang May 5, 2025
df725ab
remove printStackprintStackTrace; make REPLICATION_FACTOR short
yunchipang May 5, 2025
81fce69
small fixes
yunchipang May 5, 2025
ddbe4c7
add brokers=1 to config
yunchipang May 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1971,6 +1971,7 @@ project(':clients:clients-integration-tests') {
}

dependencies {
testImplementation libs.metrics
testImplementation libs.slf4jApi
testImplementation project(':test-common:test-common-internal-api')
testImplementation project(':test-common:test-common-runtime')
Expand Down
2 changes: 2 additions & 0 deletions checkstyle/import-control-clients-integration-tests.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
<allow pkg="org.junit"/>
<allow pkg="kafka"/>

<allow pkg="com.yammer.metrics.core" />

<subpackage name="clients.producer">
<allow pkg="org.opentest4j"/>
</subpackage>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients;

import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;

import com.yammer.metrics.core.Gauge;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class MetricsDuringTopicCreationDeletionTest {

private static final String TOPIC_NAME_PREFIX = "topic";
private static final int TOPIC_NUM = 2;
private static final int CREATE_DELETE_ITERATIONS = 3;
private static final short REPLICATION_FACTOR = 1;
private static final int PARTITION_NUM = 3;

private final ClusterInstance clusterInstance;
private final List<String> topics;
private volatile boolean running = true;

public MetricsDuringTopicCreationDeletionTest(ClusterInstance clusterInstance) {
this.clusterInstance = clusterInstance;
this.topics = new ArrayList<>();
for (int n = 0; n < TOPIC_NUM; n++) {
topics.add(TOPIC_NAME_PREFIX + n);
}
}

/*
* Checking all metrics we care in a single test is faster though it would be more elegant to have 3 @Test methods
*/
@ClusterTest(
types = {Type.KRAFT},
brokers = 1,
serverProperties = {
@ClusterConfigProperty(key = ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, value = "true"),
@ClusterConfigProperty(key = "log.initial.task.delay.ms", value = "100"),
@ClusterConfigProperty(key = "log.segment.delete.delay.ms", value = "1000"),
@ClusterConfigProperty(key = ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"),
// speed up the test for UnderReplicatedPartitions, which relies on the ISR expiry thread to execute concurrently with topic creation
// But the replica.lag.time.max.ms value still need to consider the slow testing environment
@ClusterConfigProperty(key = ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_CONFIG, value = "4000")
}
)
public void testMetricsDuringTopicCreateDelete() throws Exception {

final int initialOfflinePartitionsCount = getGauge("OfflinePartitionsCount").value();
final int initialPreferredReplicaImbalanceCount = getGauge("PreferredReplicaImbalanceCount").value();
final int initialUnderReplicatedPartitionsCount = getGauge("UnderReplicatedPartitions").value();

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
while (running) {
int offlinePartitionsCount = getGauge("OfflinePartitionsCount").value();
int preferredReplicaImbalanceCount = getGauge("PreferredReplicaImbalanceCount").value();
int underReplicatedPartitionsCount = getGauge("UnderReplicatedPartitions").value();

if (offlinePartitionsCount != initialOfflinePartitionsCount ||
preferredReplicaImbalanceCount != initialPreferredReplicaImbalanceCount ||
underReplicatedPartitionsCount != initialUnderReplicatedPartitionsCount) {
running = false;
}

try {
// Avoid busy loop
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException ignored) {

}
}
});

Closeable runThread = () -> {
running = false;
future.join();
};

try (runThread) {
createAndDeleteTopics();
}

final int finalOfflinePartitionsCount = getGauge("OfflinePartitionsCount").value();
final int finalPreferredReplicaImbalanceCount = getGauge("PreferredReplicaImbalanceCount").value();
final int finalUnderReplicatedPartitionsCount = getGauge("UnderReplicatedPartitions").value();

assertEquals(initialOfflinePartitionsCount, finalOfflinePartitionsCount,
"Expect offlinePartitionsCount to be " + initialOfflinePartitionsCount + ", but got: " + finalOfflinePartitionsCount);
assertEquals(initialPreferredReplicaImbalanceCount, finalPreferredReplicaImbalanceCount,
"Expect PreferredReplicaImbalanceCount to be " + initialPreferredReplicaImbalanceCount + ", but got: " + finalPreferredReplicaImbalanceCount);
assertEquals(initialUnderReplicatedPartitionsCount, finalUnderReplicatedPartitionsCount,
"Expect UnderReplicatedPartitionCount to be " + initialUnderReplicatedPartitionsCount + ", but got: " + finalUnderReplicatedPartitionsCount);
}

private void createAndDeleteTopics() {
for (int i = 1; i <= CREATE_DELETE_ITERATIONS && running; i++) {
for (String topic : topics) {
if (!running) return;
try {
clusterInstance.createTopic(topic, PARTITION_NUM, REPLICATION_FACTOR);
} catch (Exception ignored) { }
}

for (String topic : topics) {
if (!running) return;
try {
clusterInstance.deleteTopic(topic);
} catch (Exception ignored) { }
}
}
}

private Gauge<Integer> getGauge(String metricName) {
return KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
.filter(entry -> entry.getKey().getName().endsWith(metricName))
.findFirst()
.map(entry -> (Gauge<Integer>) entry.getValue())
.orElseThrow(() -> new AssertionError("Unable to find metric " + metricName));
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,19 @@ default void createTopic(String topicName, int partitions, short replicas, Map<S
}
}

/**
* Deletes a topic and waits for the deletion to complete.
*
* @param topicName The name of the topic to delete
* @throws InterruptedException If the operation is interrupted
*/
default void deleteTopic(String topicName) throws InterruptedException {
try (Admin admin = admin()) {
admin.deleteTopics(List.of(topicName));
waitTopicDeletion(topicName);
}
}

void waitForReadyBrokers() throws InterruptedException;

default void waitForTopic(String topic, int partitions) throws InterruptedException {
Expand Down