diff --git a/build.gradle b/build.gradle index 43d421dbd7c7a..0306e203d944c 100644 --- a/build.gradle +++ b/build.gradle @@ -3434,7 +3434,6 @@ project(':jmh-benchmarks') { implementation project(':connect:transforms') implementation project(':connect:json') implementation project(':clients').sourceSets.test.output - implementation project(':core').sourceSets.test.output implementation project(':server-common').sourceSets.test.output implementation project(':metadata').sourceSets.test.output diff --git a/checkstyle/import-control-jmh-benchmarks.xml b/checkstyle/import-control-jmh-benchmarks.xml index 03943f45512f9..7cafa743e154f 100644 --- a/checkstyle/import-control-jmh-benchmarks.xml +++ b/checkstyle/import-control-jmh-benchmarks.xml @@ -54,6 +54,9 @@ + + + diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java index ce5c813ab1ff8..febac7d6a5853 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java @@ -19,6 +19,7 @@ import kafka.cluster.Partition; import kafka.log.LogManager; +import kafka.server.AlterPartitionManager; import kafka.server.BrokerBlockingSender; import kafka.server.FailedPartitions; import kafka.server.InitialFetchState; @@ -31,7 +32,6 @@ import kafka.server.ReplicaQuota; import kafka.server.builders.LogManagerBuilder; import kafka.server.builders.ReplicaManagerBuilder; -import kafka.utils.TestUtils; import org.apache.kafka.clients.FetchSessionHandler; import org.apache.kafka.common.DirectoryId; @@ -51,6 +51,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.jmh.util.BenchmarkConfigUtils; import org.apache.kafka.metadata.KRaftMetadataCache; import org.apache.kafka.metadata.LeaderRecoveryState; import org.apache.kafka.metadata.MockConfigRepository; @@ -120,10 +121,8 @@ public class ReplicaFetcherThreadBenchmark { @Setup(Level.Trial) public void setup() throws IOException { scheduler.startup(); - KafkaConfig config = KafkaConfig.fromProps(TestUtils.createBrokerConfig( - 0, true, true, 9092, Option.empty(), Option.empty(), - Option.empty(), true, false, 0, false, 0, false, 0, Option.empty(), 1, true, 1, - (short) 1, false)); + Properties configs = BenchmarkConfigUtils.createDummyBrokerConfig(); + KafkaConfig config = KafkaConfig.fromProps(configs); LogConfig logConfig = createLogConfig(); BrokerTopicStats brokerTopicStats = new BrokerTopicStats(false); @@ -147,6 +146,7 @@ public void setup() throws IOException { setTime(Time.SYSTEM). build(); + AlterPartitionManager alterPartitionManager = Mockito.mock(AlterPartitionManager.class); replicaManager = new ReplicaManagerBuilder(). setConfig(config). setMetrics(metrics). @@ -157,7 +157,7 @@ public void setup() throws IOException { setBrokerTopicStats(brokerTopicStats). setMetadataCache(new KRaftMetadataCache(config.nodeId(), () -> KRAFT_VERSION_1)). setLogDirFailureChannel(new LogDirFailureChannel(logDirs.size())). - setAlterPartitionManager(TestUtils.createAlterIsrManager()). + setAlterPartitionManager(alterPartitionManager). build(); LinkedHashMap initialFetched = new LinkedHashMap<>(); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/StressTestLog.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/StressTestLog.java index 7815d49d27ecb..aa37b15bca074 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/StressTestLog.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/log/StressTestLog.java @@ -16,8 +16,6 @@ */ package org.apache.kafka.jmh.log; -import kafka.utils.TestUtils; - import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; import org.apache.kafka.common.compress.Compression; import org.apache.kafka.common.config.TopicConfig; @@ -26,6 +24,7 @@ import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.server.common.RequestLocal; @@ -41,6 +40,7 @@ import org.apache.kafka.storage.internals.log.UnifiedLog; import org.apache.kafka.storage.internals.log.VerificationGuard; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; +import org.apache.kafka.test.TestUtils; import java.io.File; import java.nio.charset.StandardCharsets; @@ -57,7 +57,8 @@ public class StressTestLog { private static final AtomicBoolean RUNNING = new AtomicBoolean(true); public static void main(String[] args) throws Exception { - File dir = TestUtils.randomPartitionLogDir(TestUtils.tempDir()); + File tmp = TestUtils.tempDirectory(); + File dir = TestUtils.randomPartitionLogDir(tmp); MockTime time = new MockTime(); Properties logProperties = new Properties(); logProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, 64 * 1024 * 1024); @@ -157,11 +158,10 @@ public WriterThread(UnifiedLog log) { @Override protected void work() throws Exception { byte[] value = Long.toString(currentOffset).getBytes(StandardCharsets.UTF_8); - MemoryRecords records = TestUtils.singletonRecords(value, - null, + MemoryRecords records = MemoryRecords.withRecords( + RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, - RecordBatch.NO_TIMESTAMP, - RecordBatch.CURRENT_MAGIC_VALUE); + new SimpleRecord(RecordBatch.NO_TIMESTAMP, null, value)); LogAppendInfo logAppendInfo = log.appendAsLeader(records, 0, AppendOrigin.CLIENT, diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java index 12ca5e4a0c441..10325e8ba3f85 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java @@ -22,17 +22,18 @@ import kafka.server.KafkaConfig; import kafka.server.QuotaFactory; import kafka.server.ReplicaManager; +import kafka.server.builders.LogManagerBuilder; import kafka.server.builders.ReplicaManagerBuilder; -import kafka.utils.TestUtils; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.jmh.util.BenchmarkConfigUtils; import org.apache.kafka.metadata.KRaftMetadataCache; import org.apache.kafka.metadata.MetadataCache; import org.apache.kafka.metadata.MockConfigRepository; -import org.apache.kafka.server.config.ServerLogConfigs; import org.apache.kafka.server.util.KafkaScheduler; import org.apache.kafka.server.util.MockTime; import org.apache.kafka.server.util.Scheduler; @@ -42,6 +43,7 @@ import org.apache.kafka.storage.internals.log.LogDirFailureChannel; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; +import org.mockito.Mockito; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Fork; import org.openjdk.jmh.annotations.Level; @@ -100,17 +102,31 @@ public class CheckpointBench { @Setup(Level.Trial) public void setup() { this.scheduler = new KafkaScheduler(1, true, "scheduler-thread"); - this.brokerProperties = KafkaConfig.fromProps(TestUtils.createBrokerConfig( - 0, true, true, 9092, Option.empty(), Option.empty(), - Option.empty(), true, false, 0, false, 0, false, 0, Option.empty(), 1, true, 1, - (short) 1, false)); + Properties configs = BenchmarkConfigUtils.createDummyBrokerConfig(); + this.brokerProperties = KafkaConfig.fromProps(configs); this.metrics = new Metrics(); this.time = new MockTime(); this.failureChannel = new LogDirFailureChannel(brokerProperties.logDirs().size()); final List files = brokerProperties.logDirs().stream().map(File::new).toList(); - this.logManager = TestUtils.createLogManager(CollectionConverters.asScala(files), - new LogConfig(new Properties()), new MockConfigRepository(), new CleanerConfig(1, 4 * 1024 * 1024L, 0.9d, - 1024 * 1024, 32 * 1024 * 1024, Double.MAX_VALUE, 15 * 1000, true), time, 4, false, Option.empty(), false, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT); + this.logManager = new LogManagerBuilder(). + setLogDirs(files). + setInitialOfflineDirs(List.of()). + setConfigRepository(new MockConfigRepository()). + setInitialDefaultConfig(new LogConfig(new Properties())). + setCleanerConfig(new CleanerConfig(1, 4 * 1024 * 1024L, 0.9d, + 1024 * 1024, 32 * 1024 * 1024, Double.MAX_VALUE, 15 * 1000, true)). + setRecoveryThreadsPerDataDir(1). + setFlushCheckMs(1000L). + setFlushRecoveryOffsetCheckpointMs(10000L). + setFlushStartOffsetCheckpointMs(10000L). + setRetentionCheckMs(1000L). + setProducerStateManagerConfig(60000, false). + setScheduler(scheduler). + setBrokerTopicStats(new BrokerTopicStats(false)). + setLogDirFailureChannel(failureChannel). + setTime(Time.SYSTEM). + build(); + scheduler.startup(); final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(false); final MetadataCache metadataCache = @@ -120,7 +136,7 @@ public void setup() { this.metrics, this.time, "", ""); - this.alterPartitionManager = TestUtils.createAlterIsrManager(); + this.alterPartitionManager = Mockito.mock(AlterPartitionManager.class); this.replicaManager = new ReplicaManagerBuilder(). setConfig(brokerProperties). setMetrics(metrics). diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java index 0128f62f4f024..d071fa3a08efd 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java @@ -24,7 +24,6 @@ import kafka.server.ReplicaManager; import kafka.server.builders.LogManagerBuilder; import kafka.server.builders.ReplicaManagerBuilder; -import kafka.utils.TestUtils; import org.apache.kafka.common.DirectoryId; import org.apache.kafka.common.TopicPartition; @@ -32,6 +31,7 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.jmh.util.BenchmarkConfigUtils; import org.apache.kafka.metadata.ConfigRepository; import org.apache.kafka.metadata.KRaftMetadataCache; import org.apache.kafka.metadata.LeaderRecoveryState; @@ -45,6 +45,7 @@ import org.apache.kafka.storage.internals.log.LogDirFailureChannel; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; +import org.mockito.Mockito; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; @@ -107,10 +108,8 @@ public void setup() { topicId = Option.empty(); this.scheduler = new KafkaScheduler(1, true, "scheduler-thread"); - this.brokerProperties = KafkaConfig.fromProps(TestUtils.createBrokerConfig( - 0, true, true, 9092, Option.empty(), Option.empty(), - Option.empty(), true, false, 0, false, 0, false, 0, Option.empty(), 1, true, 1, - (short) 1, false)); + Properties configs = BenchmarkConfigUtils.createDummyBrokerConfig(); + this.brokerProperties = KafkaConfig.fromProps(configs); this.metrics = new Metrics(); this.time = Time.SYSTEM; this.failureChannel = new LogDirFailureChannel(brokerProperties.logDirs().size()); @@ -141,7 +140,8 @@ public void setup() { build(); scheduler.startup(); this.quotaManagers = QuotaFactory.instantiate(this.brokerProperties, this.metrics, this.time, "", ""); - this.alterPartitionManager = TestUtils.createAlterIsrManager(); + this.alterPartitionManager = Mockito.mock(AlterPartitionManager.class); + this.replicaManager = new ReplicaManagerBuilder(). setConfig(brokerProperties). setMetrics(metrics). diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/BenchmarkConfigUtils.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/BenchmarkConfigUtils.java new file mode 100644 index 0000000000000..2e058ba707e1f --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/BenchmarkConfigUtils.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.jmh.util; + +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; +import org.apache.kafka.network.SocketServerConfigs; +import org.apache.kafka.raft.KRaftConfigs; +import org.apache.kafka.raft.QuorumConfig; +import org.apache.kafka.server.config.ReplicationConfigs; +import org.apache.kafka.server.config.ServerConfigs; +import org.apache.kafka.server.config.ServerLogConfigs; +import org.apache.kafka.storage.internals.log.CleanerConfig; +import org.apache.kafka.test.TestUtils; + +import java.io.File; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +public class BenchmarkConfigUtils { + + public static Properties createDummyBrokerConfig() { + Properties props = new Properties(); + + props.put(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG, "true"); + props.put(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true"); + props.setProperty(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG, String.valueOf(TimeUnit.MINUTES.toMillis(10))); + props.put(KRaftConfigs.NODE_ID_CONFIG, "0"); + props.put(ServerConfigs.BROKER_ID_CONFIG, "0"); + + props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "PLAINTEXT://localhost:9092"); + props.put(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://localhost:9092"); + props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER"); + props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"); + + File dir = TestUtils.tempDirectory(); + props.put(ServerLogConfigs.LOG_DIR_CONFIG, dir.getAbsolutePath()); + + props.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker"); + props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, "0@localhost:0"); + + props.put(ReplicationConfigs.REPLICA_SOCKET_TIMEOUT_MS_CONFIG, "1500"); + props.put(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG, "1500"); + + props.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, "true"); + props.put(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, "true"); + + props.put(ServerLogConfigs.LOG_DELETE_DELAY_MS_CONFIG, "1000"); + props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152"); + props.put(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, "100"); + props.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, "1"); + props.put(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "1"); + + props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1"); + props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "5"); + props.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0"); + + props.put(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG, "2"); + props.put(ServerConfigs.BACKGROUND_THREADS_CONFIG, "2"); + + return props; + } +}