diff --git a/README.md b/README.md index d2974dd700..e0b32296fa 100644 --- a/README.md +++ b/README.md @@ -14,10 +14,11 @@ This project offers many kafka tools to simplify the life for kafka users. 1. [Kafka quick start](#kafka-cluster-quick-start): set up a true kafka cluster in one minute 2. [Kafka benchmark](#latency-benchmark): run producers/consumers to test the performance and consistency for kafka cluster -3. [Kafka offset explorer](#topic-explorer): check the start/end offsets of kafka topics -4. [Kafka official tool](#kafka-official-tool): run any one specific kafka official tool. All you have to prepare is the docker env. -5. [Kafka metric client](#kafka-metric-client): utility for accessing kafka Mbean metrics via JMX. -6. [Replica Collie](#replica-collie): move replicas from brokers to others. You can use this tool to obstruct specific brokers from hosting specific topics. +3. [Kafka performance](#performance-benchmark): check producing/consuming performance. +4. [Kafka offset explorer](#topic-explorer): check the start/end offsets of kafka topics +5. [Kafka official tool](#kafka-official-tool): run any one specific kafka official tool. All you have to prepare is the docker env. +6. [Kafka metric client](#kafka-metric-client): utility for accessing kafka Mbean metrics via JMX. +7. [Replica Collie](#replica-collie): move replicas from brokers to others. You can use this tool to obstruct specific brokers from hosting specific topics. [Release page](https://github.com/skiptests/astraea/releases) offers the uber jar including all tools. ```shell @@ -94,6 +95,29 @@ java -jar app-0.0.1-SNAPSHOT-all.jar latency --bootstrap.servers 192.168.50.224: --- +## Performance Benchmark +This tool is used to test the following metrics. +1. publish latency: the time to complete a producer send request +2. E2E latency: the time for a record to travel through Kafka +3. input rate: sum of consumer inputs in MByte per second +4. output rate: sum of producer outputs in MByte per second + +Run the benchmark from source: +```shell +./gradlew run --args="Performance --bootstrap.servers localhost:9092 --topic topic --partitions 10 --replicas 3 --producers 5 --consumers 1 --records 100000 --record.size 10000" +``` +### Performance Benchmark Configurations +1. --bootstrap.servers: the server to connect to +2. --topic: the topic name. A new topic is created when the given topic does not exist +3. --partitions: topic config when creating new topic. Default: 1 +4. --replicas: topic config when creating new topic. Default: 1 +5. --producers: the number of producers (threads). Default: 1 +6. --consumers: the number of consumers (threads). Default: 1 +7. --records: the total number of records sent by the producers. Default: 1000 +8. --record.size: the record size in bytes. Default: 1024 bytes + +--- + ## Topic Explorer This tool can expose both earliest offset and latest offset for all (public and private) topics.
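As context for the two latency metrics above: publish latency times a blocking `send()` on the producer side, while E2E latency compares the consumer's wall clock with the record's producer timestamp, which is what the `producerExecutor`/`consumerExecutor` added later in this patch do. A minimal self-contained sketch of the same measurements, not part of the patch itself; the topic name `demo`, the group id, and `localhost:9092` are placeholders:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class LatencySketch {
  public static void main(String[] args) throws Exception {
    var props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
    props.put("group.id", "latency-sketch");          // placeholder group id
    props.put("auto.offset.reset", "earliest");       // so the sketch can read the record it just sent

    try (var producer =
            new KafkaProducer<byte[], byte[]>(props, new ByteArraySerializer(), new ByteArraySerializer());
        var consumer =
            new KafkaConsumer<byte[], byte[]>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
      consumer.subscribe(List.of("demo")); // placeholder topic

      // publish latency: time until the broker acknowledges the send request
      long start = System.currentTimeMillis();
      producer.send(new ProducerRecord<>("demo", new byte[1024])).get();
      System.out.println("publish latency: " + (System.currentTimeMillis() - start) + " ms");

      // E2E latency: consumer wall clock minus the record's producer timestamp
      for (var record : consumer.poll(Duration.ofSeconds(10)))
        System.out.println("E2E latency: " + (System.currentTimeMillis() - record.timestamp()) + " ms");
    }
  }
}
```

In the real benchmark the consumers must already be subscribed and polling before the producers start, which is why `Performance.execute` below builds the consumer thread pool first and sleeps before starting the producer threads.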
diff --git a/app/src/main/java/org/astraea/App.java b/app/src/main/java/org/astraea/App.java index 27b6e61c3d..561e1d4968 100644 --- a/app/src/main/java/org/astraea/App.java +++ b/app/src/main/java/org/astraea/App.java @@ -6,6 +6,7 @@ import java.util.List; import java.util.Map; import org.astraea.metrics.kafka.KafkaMetricClientApp; +import org.astraea.performance.Performance; import org.astraea.performance.latency.End2EndLatency; import org.astraea.topic.ReplicaCollie; import org.astraea.topic.TopicExplorer; @@ -16,7 +17,8 @@ public class App { "latency", End2EndLatency.class, "offset", TopicExplorer.class, "metrics", KafkaMetricClientApp.class, - "replica", ReplicaCollie.class); + "replica", ReplicaCollie.class, + "performance", Performance.class); static void execute(Map<String, Class<?>> mains, List<String> args) throws Throwable { diff --git a/app/src/main/java/org/astraea/argument/ArgumentUtil.java b/app/src/main/java/org/astraea/argument/ArgumentUtil.java index 947db75250..f9ea031231 100644 --- a/app/src/main/java/org/astraea/argument/ArgumentUtil.java +++ b/app/src/main/java/org/astraea/argument/ArgumentUtil.java @@ -96,6 +96,13 @@ public Boolean convert(String value) { } } + public static class ShortConverter implements IStringConverter<Short> { + @Override + public Short convert(String value) { + return Short.valueOf(value); + } + } + private static <C extends Collection<T>, T> C requireNonEmpty(C collection) { if (collection.isEmpty()) throw new ParameterException("array type can't be empty"); return collection; diff --git a/app/src/main/java/org/astraea/performance/ComponentFactory.java b/app/src/main/java/org/astraea/performance/ComponentFactory.java new file mode 100644 index 0000000000..339789c5dc --- /dev/null +++ b/app/src/main/java/org/astraea/performance/ComponentFactory.java @@ -0,0 +1,57 @@ +package org.astraea.performance; + +import java.util.Collections; +import java.util.Map; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; + +/** An interface for creating producers and consumers. */ +public interface ComponentFactory { + Producer createProducer(); + + Consumer createConsumer(); + + /** + * (Optional) Set the partitioner for the producer. If not set, Kafka's default partitioner is used. + * + * @param partitionerName The partitioner class to use + * @return This factory + */ + default ComponentFactory partitioner(String partitionerName) { + return this; + } + + /** + * Used for creating Kafka producers and consumers for the same Kafka cluster and the same topic. The + * consumers generated by the same object from `fromKafka(brokers)` subscribe to the same topic and + * have the same groupID.
+ */ + static ComponentFactory fromKafka(String brokers, String topic, Map<String, Object> config) { + final Properties prop = new Properties(); + final String groupId = "groupId:" + System.currentTimeMillis(); + prop.putAll(config); + prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + + return new ComponentFactory() { + /** Create a Producer backed by a KafkaProducer */ + @Override + public Producer createProducer() { + return Producer.fromKafka(prop, topic); + } + + /** Create a Consumer backed by a KafkaConsumer */ + @Override + public Consumer createConsumer() { + return Consumer.fromKafka(prop, Collections.singleton(topic)); + } + + @Override + public ComponentFactory partitioner(String partitionerName) { + prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, partitionerName); + return this; + } + }; + } +} diff --git a/app/src/main/java/org/astraea/performance/Consumer.java b/app/src/main/java/org/astraea/performance/Consumer.java new file mode 100644 index 0000000000..6012bdcdaa --- /dev/null +++ b/app/src/main/java/org/astraea/performance/Consumer.java @@ -0,0 +1,43 @@ +package org.astraea.performance; + +import java.time.Duration; +import java.util.Collection; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; + +/** An interface for polling records. */ +public interface Consumer { + ConsumerRecords<byte[], byte[]> poll(Duration timeout); + + void wakeup(); + + void cleanup(); + + /** Create a Consumer backed by a KafkaConsumer */ + static Consumer fromKafka(Properties prop, Collection<String> topics) { + + var kafkaConsumer = + new KafkaConsumer<byte[], byte[]>( + prop, new ByteArrayDeserializer(), new ByteArrayDeserializer()); + kafkaConsumer.subscribe(topics); + return new Consumer() { + + @Override + public ConsumerRecords<byte[], byte[]> poll(Duration timeout) { + return kafkaConsumer.poll(timeout); + } + + @Override + public void wakeup() { + kafkaConsumer.wakeup(); + } + + @Override + public void cleanup() { + kafkaConsumer.close(); + } + }; + } +} diff --git a/app/src/main/java/org/astraea/performance/Metrics.java b/app/src/main/java/org/astraea/performance/Metrics.java new file mode 100644 index 0000000000..1d091d1e66 --- /dev/null +++ b/app/src/main/java/org/astraea/performance/Metrics.java @@ -0,0 +1,67 @@ +package org.astraea.performance; + +/** Used to record statistics. This is thread safe. */ +public class Metrics { + private double avgLatency; + private long num; + private long max; + private long min; + private long bytes; + + public Metrics() { + avgLatency = 0; + num = 0; + max = 0; + min = Long.MAX_VALUE; + bytes = 0; + } + + /** Simultaneously add latency and bytes. */ + public synchronized void put(long latency, long bytes) { + putLatency(latency); + addBytes(bytes); + } + /** Add a new value to the latency metric. */ + public synchronized void putLatency(long latency) { + if (min > latency) min = latency; + if (max < latency) max = latency; + ++num; + avgLatency += (((double) latency) - avgLatency) / (double) num; + } + /** Add a new value to bytes. */ + public synchronized void addBytes(long bytes) { + this.bytes += bytes; + } + + /** Get the number of latencies recorded. */ + public synchronized long num() { + return num; + } + /** Get the maximum latency recorded. */ + public synchronized long max() { + return max; + } + /** Get the minimum latency recorded.
*/ + public synchronized long min() { + return min; + } + /** Get the average latency. */ + public synchronized double avgLatency() { + return avgLatency; + } + /** Reset bytes to 0 and return the old value. */ + public synchronized long bytesThenReset() { + long tmp = this.bytes; + this.bytes = 0; + return tmp; + } + /** Reset all attributes to their default values. */ + public synchronized void reset() { + avgLatency = 0; + num = 0; + max = 0; + // reset min to the largest long value + min = Long.MAX_VALUE; + bytes = 0; + } +} diff --git a/app/src/main/java/org/astraea/performance/Performance.java b/app/src/main/java/org/astraea/performance/Performance.java new file mode 100644 index 0000000000..72c0100b24 --- /dev/null +++ b/app/src/main/java/org/astraea/performance/Performance.java @@ -0,0 +1,211 @@ +package org.astraea.performance; + +import com.beust.jcommander.Parameter; +import java.io.IOException; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.kafka.clients.producer.internals.DefaultPartitioner; +import org.apache.kafka.common.errors.WakeupException; +import org.astraea.argument.ArgumentUtil; +import org.astraea.argument.BasicArgument; +import org.astraea.concurrent.ThreadPool; +import org.astraea.topic.TopicAdmin; + +/** + * Performance benchmark which includes + * + *
    + *
  1. publish latency: the time of completing producer data request + *
  2. E2E latency: the time for a record to travel through Kafka + *
  3. input rate: sum of consumer inputs in MByte per second + *
  4. output rate: sum of producer outputs in MByte per second + *
+ * + * With configurations: + * + *
    + *
  1. --bootstrap.servers: the server to connect to + *
  2. --topic: the topic name. Create new topic when the given topic does not exist. Default: + * "testPerformance-" + System.currentTimeMillis() + *
  3. --partitions: topic config when creating new topic. Default: 1 + *
  4. --replicas: topic config when creating new topic. Default: 1 + *
  5. --producers: the number of producers (threads). Default: 1 + *
  6. --consumers: the number of consumers (threads). Default: 1 + *
  7. --records: the total number of records sent by the producers. Default: 1000 + *
  8. --record.size: the record size in bytes. Default: 1024 + *
+ * + * To avoid records being produced too fast, the producer waits for one millisecond after each send. + */ +public class Performance { + + public static void main(String[] args) { + final var param = ArgumentUtil.parseArgument(new Argument(), args); + + try { + execute( + param, + ComponentFactory.fromKafka(param.brokers, param.topic, param.perfProps()) + .partitioner(DefaultPartitioner.class.getName())); + } catch (InterruptedException ignore) { + } + } + + public static void execute(final Argument param, ComponentFactory componentFactory) + throws InterruptedException { + try (var topicAdmin = TopicAdmin.of(param.perfProps())) { + topicAdmin.createTopic(param.topic, param.partitions, param.replicas); + } catch (IOException ignore) { + } + + final Metrics[] consumerMetric = new Metrics[param.consumers]; + final Metrics[] producerMetric = new Metrics[param.producers]; + for (int i = 0; i < producerMetric.length; ++i) producerMetric[i] = new Metrics(); + + // Round the record count up so that every producer sends the same number of records. + param.records += param.producers - 1; + param.records -= param.records % param.producers; + + var complete = new CountDownLatch(1); + try (ThreadPool consumerThreads = + ThreadPool.builder() + .executors( + IntStream.range(0, param.consumers) + .mapToObj( + i -> + consumerExecutor( + componentFactory.createConsumer(), + consumerMetric[i] = new Metrics())) + .collect(Collectors.toList())) + .executor(new Tracker(producerMetric, consumerMetric, param.records, complete)) + .build()) { + + System.out.println("Wait for consumer startup"); + Thread.sleep(10000); + + // Close after all records are sent + try (ThreadPool producerThreads = + ThreadPool.builder() + .loop((int) (param.records / param.producers)) + .executors( + IntStream.range(0, param.producers) + .mapToObj( + i -> + producerExecutor( + componentFactory.createProducer(), param, producerMetric[i])) + .collect(Collectors.toList())) + .build()) { + complete.await(); + } + } + } + + static ThreadPool.Executor consumerExecutor(Consumer consumer, Metrics metrics) { + return new ThreadPool.Executor() { + @Override + public void execute() { + try { + for (var record : consumer.poll(Duration.ofSeconds(10))) { + // Record end-to-end latency and input bytes (headers and timestamps are not counted) + metrics.put( + System.currentTimeMillis() - record.timestamp(), + record.serializedKeySize() + record.serializedValueSize()); + } + } catch (WakeupException ignore) { + // Stop polling and get ready to clean up + } + } + + @Override + public void wakeup() { + consumer.wakeup(); + } + + @Override + public void cleanup() { + consumer.cleanup(); + } + }; + } + + static ThreadPool.Executor producerExecutor(Producer producer, Argument param, Metrics metrics) { + byte[] payload = new byte[param.recordSize]; + return new ThreadPool.Executor() { + @Override + public void execute() { + long start = System.currentTimeMillis(); + try { + producer.send(payload).get(); + metrics.put(System.currentTimeMillis() - start, payload.length); + Thread.sleep(1); + } catch (InterruptedException | ExecutionException ignored) { + } + } + + @Override + public void cleanup() { + producer.cleanup(); + } + }; + } + + static class Argument extends BasicArgument { + + @Parameter( + names = {"--topic"}, + description = "String: topic name", + validateWith = ArgumentUtil.NotEmptyString.class) + String topic = "testPerformance-" + System.currentTimeMillis(); + + @Parameter( + names = {"--partitions"}, + description = "Integer: number of partitions to create the topic", + validateWith =
ArgumentUtil.PositiveLong.class) + int partitions = 1; + + @Parameter( + names = {"--replicas"}, + description = "Integer: number of replicas to create the topic", + validateWith = ArgumentUtil.PositiveLong.class, + converter = ArgumentUtil.ShortConverter.class) + short replicas = 1; + + @Parameter( + names = {"--producers"}, + description = "Integer: number of producers to produce records", + validateWith = ArgumentUtil.PositiveLong.class) + int producers = 1; + + @Parameter( + names = {"--consumers"}, + description = "Integer: number of consumers to consume records", + validateWith = ArgumentUtil.NonNegativeLong.class) + int consumers = 1; + + @Parameter( + names = {"--records"}, + description = "Integer: number of records to send", + validateWith = ArgumentUtil.NonNegativeLong.class) + long records = 1000; + + @Parameter( + names = {"--record.size"}, + description = "Integer: size of each record", + validateWith = ArgumentUtil.PositiveLong.class) + int recordSize = 1024; + + @Parameter( + names = {"--prop.file"}, + description = "String: path to the properties file", + validateWith = ArgumentUtil.NotEmptyString.class) + String propFile; + + public Map<String, Object> perfProps() { + return properties(propFile); + } + } +} diff --git a/app/src/main/java/org/astraea/performance/Producer.java b/app/src/main/java/org/astraea/performance/Producer.java new file mode 100644 index 0000000000..7139f7a273 --- /dev/null +++ b/app/src/main/java/org/astraea/performance/Producer.java @@ -0,0 +1,39 @@ +package org.astraea.performance; + +import java.util.Properties; +import java.util.concurrent.Future; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.ByteArraySerializer; + +/** An interface for sending records. */ +public interface Producer { + Future<RecordMetadata> send(byte[] payload); + + void cleanup(); + + /** + * Create a Producer backed by a KafkaProducer. + * + * @param prop Properties used to create the KafkaProducer + * @param topic Topic to send to + * @return a Producer that wraps a KafkaProducer + */ + static Producer fromKafka(Properties prop, String topic) { + final KafkaProducer<byte[], byte[]> kafkaProducer = + new KafkaProducer<>(prop, new ByteArraySerializer(), new ByteArraySerializer()); + return new Producer() { + + @Override + public Future<RecordMetadata> send(byte[] payload) { + return kafkaProducer.send(new ProducerRecord<>(topic, payload)); + } + + @Override + public void cleanup() { + kafkaProducer.close(); + } + }; + } +} diff --git a/app/src/main/java/org/astraea/performance/Tracker.java b/app/src/main/java/org/astraea/performance/Tracker.java new file mode 100644 index 0000000000..a81fab1913 --- /dev/null +++ b/app/src/main/java/org/astraea/performance/Tracker.java @@ -0,0 +1,73 @@ +package org.astraea.performance; + +import java.util.concurrent.CountDownLatch; +import org.astraea.concurrent.ThreadPool; + +/** Print out the given metrics. Run until `close()` is called.
*/ +public class Tracker implements ThreadPool.Executor { + private final Metrics[] producerData; + private final Metrics[] consumerData; + private final long records; + private final CountDownLatch complete; + + public Tracker( + Metrics[] producerData, Metrics[] consumerData, long records, CountDownLatch complete) { + this.producerData = producerData; + this.consumerData = consumerData; + this.records = records; + this.complete = complete; + } + + @Override + public void execute() throws InterruptedException { + long completed = 0; + long bytes = 0L; + long max = 0; + long min = Long.MAX_VALUE; + + /* producer */ + for (Metrics data : producerData) { + completed += data.num(); + bytes += data.bytesThenReset(); + if (max < data.max()) max = data.max(); + if (min > data.min()) min = data.min(); + } + if (completed == 0) { + Thread.sleep(1000); + return; + } + System.out.printf("producers completion rate: %.2f%%%n", ((double) completed * 100.0 / (double) records)); + System.out.printf(" output: %.3fMB/second%n", ((double) bytes / (1 << 20))); + System.out.println(" publish max latency: " + max + "ms"); + System.out.println(" publish min latency: " + min + "ms"); + for (int i = 0; i < producerData.length; ++i) { + System.out.printf( + " producer[%d] average publish latency: %.3fms%n", i, producerData[i].avgLatency()); + } + /* consumer */ + completed = 0; + bytes = 0L; + max = 0; + min = Long.MAX_VALUE; + for (Metrics data : consumerData) { + completed += data.num(); + bytes += data.bytesThenReset(); + if (max < data.max()) max = data.max(); + if (min > data.min()) min = data.min(); + } + System.out.printf("consumers completion rate: %.2f%%%n", ((double) completed * 100.0 / (double) records)); + System.out.printf(" input: %.3fMB/second%n", ((double) bytes / (1 << 20))); + System.out.println(" end-to-end max latency: " + max + "ms"); + System.out.println(" end-to-end min latency: " + min + "ms"); + for (int i = 0; i < consumerData.length; ++i) { + System.out.printf( + " consumer[%d] average end-to-end latency: %.3fms%n", i, consumerData[i].avgLatency()); + } + + System.out.println("\n"); + // The consumers have consumed all the target records + if (completed >= records) complete.countDown(); + // wait one second before sampling and printing again + Thread.sleep(1000); + } +} diff --git a/app/src/test/java/org/astraea/performance/MetricsTest.java b/app/src/test/java/org/astraea/performance/MetricsTest.java new file mode 100644 index 0000000000..02828ee5f8 --- /dev/null +++ b/app/src/test/java/org/astraea/performance/MetricsTest.java @@ -0,0 +1,78 @@ +package org.astraea.performance; + +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.LongAdder; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class MetricsTest { + @Test + void testAverage() { + Random rand = new Random(); + final int num = 1000; + double avg = 0.0; + Metrics metrics = new Metrics(); + + Assertions.assertEquals(0, metrics.avgLatency()); + + for (int i = 0; i < num; ++i) { + long next = rand.nextInt(); + avg += ((double) next - avg) / (i + 1); + metrics.putLatency(next); + } + + Assertions.assertEquals(avg, metrics.avgLatency()); + } + + // Simultaneously add and get + @Test + void testBytes() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + final Metrics metrics = new Metrics(); + final LongAdder longAdder = new LongAdder(); + final long input = 100; + final int loopCount = 10000; + Thread adder = + new Thread( + () -> { + try { + countDownLatch.await(); + } catch (InterruptedException ignore) { + } + for (int i =
0; i < loopCount; ++i) { + metrics.addBytes(input); + } + }); + Thread getter = + new Thread( + () -> { + try { + countDownLatch.await(); + } catch (InterruptedException ignore) { + } + for (int i = 0; i < loopCount; ++i) { + longAdder.add(metrics.bytesThenReset()); + } + }); + adder.start(); + getter.start(); + countDownLatch.countDown(); + adder.join(); + longAdder.add(metrics.bytesThenReset()); + + Assertions.assertEquals(loopCount * input, longAdder.sum()); + } + + @Test + public void testReset() { + final Metrics metrics = new Metrics(); + metrics.addBytes(10); + metrics.putLatency(11); + + metrics.reset(); + + Assertions.assertEquals(0, metrics.bytesThenReset()); + Assertions.assertEquals(0, metrics.avgLatency()); + } +} diff --git a/app/src/test/java/org/astraea/performance/PerformanceTest.java b/app/src/test/java/org/astraea/performance/PerformanceTest.java new file mode 100644 index 0000000000..f9a78dcec7 --- /dev/null +++ b/app/src/test/java/org/astraea/performance/PerformanceTest.java @@ -0,0 +1,51 @@ +package org.astraea.performance; + +import java.util.Map; +import org.astraea.concurrent.ThreadPool; +import org.astraea.service.RequireBrokerCluster; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class PerformanceTest extends RequireBrokerCluster { + private final ComponentFactory factory = + ComponentFactory.fromKafka( + bootstrapServers(), "testing-" + System.currentTimeMillis(), Map.of()); + + @Test + public void testExecute() { + Performance.Argument param = new Performance.Argument(); + param.brokers = bootstrapServers(); + Assertions.assertDoesNotThrow(() -> Performance.execute(param, factory)); + } + + @Test + public void testProducerExecutor() throws InterruptedException { + Metrics metrics = new Metrics(); + ThreadPool.Executor executor = + Performance.producerExecutor(factory.createProducer(), new Performance.Argument(), metrics); + + executor.execute(); + + Assertions.assertEquals(1, metrics.num()); + Assertions.assertEquals(1024, metrics.bytesThenReset()); + + executor.cleanup(); + } + + @Test + public void testConsumerExecutor() throws InterruptedException { + Metrics metrics = new Metrics(); + ThreadPool.Executor executor = Performance.consumerExecutor(factory.createConsumer(), metrics); + + executor.execute(); + + Assertions.assertEquals(0, metrics.num()); + Assertions.assertEquals(0, metrics.bytesThenReset()); + + factory.createProducer().send(new byte[1024]); + executor.execute(); + + Assertions.assertEquals(1, metrics.num()); + Assertions.assertNotEquals(1024, metrics.bytesThenReset()); + } +}
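One detail worth noting about the math that `MetricsTest.testAverage` exercises: `Metrics.putLatency` maintains the average with the incremental update `avg += (x - avg) / n` instead of accumulating a sum, so the running mean matches the arithmetic mean (up to floating-point rounding) without risking overflow of a `long` sum on very long benchmark runs. A small standalone illustration, with a class name invented purely for this example:

```java
public class RunningMeanSketch {
  public static void main(String[] args) {
    long[] samples = {5, 7, 13, 2, 40};

    double avg = 0; // incremental mean, updated the same way Metrics.putLatency does
    long n = 0;
    long sum = 0;   // naive mean, kept only for comparison

    for (long x : samples) {
      ++n;
      avg += ((double) x - avg) / n; // avg_n = avg_{n-1} + (x_n - avg_{n-1}) / n
      sum += x;
    }

    // both print roughly 13.4 (they can differ in the last bits due to floating-point rounding)
    System.out.println("incremental mean = " + avg);
    System.out.println("arithmetic mean  = " + ((double) sum / n));
  }
}
```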