Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance testing tool #11

Merged
merged 51 commits into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from 43 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
fb7eb21
Add my name to README in authors field
Sep 3, 2021
336eeee
Merge remote-tracking branch 'upstream/main' into main
Sep 3, 2021
a64c29e
Merge branch 'skiptests:main' into main
chinghongfang Sep 4, 2021
39920d3
Merge branch 'skiptests:main' into main
chinghongfang Sep 5, 2021
402c13e
Create producer/consumer executor.
Sep 5, 2021
721add0
Producer/Consumer works.
chinghongfang Sep 7, 2021
3aaf3fa
Record bytes read/write per second.
chinghongfang Sep 7, 2021
8dcaed5
Performance testing tool
chinghongfang Sep 8, 2021
2e19e26
Merge branch 'skiptests:main' into performance
chinghongfang Sep 11, 2021
541ce66
Merge branch 'skiptests:main' into main
chinghongfang Sep 12, 2021
110183c
Recompose structure to unit test convenient.
chinghongfang Sep 13, 2021
f038e70
Merge branch 'skiptests:main' into performance
chinghongfang Sep 13, 2021
cd27779
AvgLatency unit test.
Sep 13, 2021
e1df327
Format the code.
Sep 13, 2021
e7d2b85
Merge branch 'skiptests:main' into main
chinghongfang Sep 14, 2021
0768cdc
Write unit test
chinghongfang Sep 14, 2021
d2c31c3
Change new-line symbol in format output
chinghongfang Sep 14, 2021
7ec6148
Reconstruct and unit test
Sep 14, 2021
1eb07f7
Remove renamed file
Sep 14, 2021
1ff8073
Warm up, producing record to all partitions
chinghongfang Sep 16, 2021
3600b4d
Add warm up process.
chinghongfang Sep 26, 2021
4df3be2
Lengthen test wait time for auto closing
chinghongfang Sep 26, 2021
7dc08b1
Merge remote-tracking branch 'upstream/main' into performance
chinghongfang Sep 29, 2021
6713c87
Solve record lost problem
chinghongfang Oct 1, 2021
f40ea23
Add some comment
chinghongfang Oct 2, 2021
6a763e7
Add description Performance benchmark tool to README
chinghongfang Oct 2, 2021
d0126c1
Add some comment.
chinghongfang Oct 3, 2021
3a131bf
Merge remote-tracking branch 'upstream/main' into performance
chinghongfang Oct 3, 2021
7ceb8b0
Merge remote-tracking branch 'upstream/main' into performance
chinghongfang Oct 8, 2021
1da9e07
Parse argument by jcommander
chinghongfang Oct 8, 2021
8149b6e
Merge remote-tracking branch 'upstream/main' into performance
chinghongfang Oct 8, 2021
cc822dc
Parse short value
chinghongfang Oct 8, 2021
1fd45a4
Wait longer on testing consumer/producer close
chinghongfang Oct 8, 2021
75dfc40
Update README
chinghongfang Oct 8, 2021
6774127
Fix inconsistent names.
chinghongfang Oct 17, 2021
c727f84
Merge remote-tracking branch 'upstream/main' into performance
chinghongfang Oct 17, 2021
6a62c42
Update performance argument
chinghongfang Oct 18, 2021
6db1a6d
Reconstruct main logic
chinghongfang Oct 20, 2021
242494c
Merge remote-tracking branch 'upstream/main' into performance
chinghongfang Oct 20, 2021
968086c
Spotless Apply
chinghongfang Oct 20, 2021
92a3bd0
Merge remote-tracking branch 'upstream/main' into performance
chinghongfang Oct 20, 2021
186c956
Change string to variable
chinghongfang Oct 20, 2021
204c322
Merge remote-tracking branch 'upstream/main' into performance
chinghongfang Oct 20, 2021
0f65c95
Merge remote-tracking branch 'upstream/main' into performance
chinghongfang Oct 21, 2021
bbbae7b
Change unit test environment to real world
chinghongfang Oct 21, 2021
b5ca2f1
Metric thread-safe and consumer same topic
chinghongfang Oct 22, 2021
8ba0b5b
Add partitioner setting for producer
chinghongfang Oct 25, 2021
f978452
Spotless Apply
chinghongfang Oct 25, 2021
ba0503e
Merge remote-tracking branch 'upstream/main' into performance
chinghongfang Oct 25, 2021
3c78d3e
Consumer/producer configure property file
chinghongfang Oct 27, 2021
b7256a2
Throw exception rather than ignore
chinghongfang Oct 27, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ This project offers many kafka tools to simplify the life for kafka users.

1. [Kafka quick start](#kafka-cluster-quick-start): set up a true kafka cluster in one minute
2. [Kafka benchmark](#latency-benchmark): run producers/consumers to test the performance and consistency for kafka cluster
3. [Kafka offset explorer](#offset-explorer): check the start/end offsets of kafka topics
4. [Kafka official tool](#kafka-official-tool): run any one specific kafka official tool. All you have to prepare is the docker env.
5. [Kafka metric client](#kafka-metric-client): utility for accessing kafka Mbean metrics via JMX.
6. [Replica Collie](#replica-collie): move replicas from brokers to others. You can use this tool to obstruct specific brokers from hosting specific topics.
3. [Kafka performance](#Performance Benchmark): check producing/consuming performance.
chinghongfang marked this conversation as resolved.
Show resolved Hide resolved
4. [Kafka offset explorer](#offset-explorer): check the start/end offsets of kafka topics
5. [Kafka official tool](#kafka-official-tool): run any one specific kafka official tool. All you have to prepare is the docker env.
6[Kafka metric client](#kafka-metric-client): utility for accessing kafka Mbean metrics via JMX.
7[Replica Collie](#replica-collie): move replicas from brokers to others. You can use this tool to obstruct specific brokers from hosting specific topics.
chinghongfang marked this conversation as resolved.
Show resolved Hide resolved

[Release page](https://github.com/skiptests/astraea/releases) offers the uber jar including all tools.
```shell
Expand Down Expand Up @@ -94,6 +95,28 @@ java -jar app-0.0.1-SNAPSHOT-all.jar latency --bootstrap.servers 192.168.50.224:

---

## Performance Benchmark
chinghongfang marked this conversation as resolved.
Show resolved Hide resolved
This tool is used test to following metrics.
1. publish latency: the time of completing producer data request
2. E2E latency: the time for a record to travel through Kafka
3. input rate: sum of consumer inputs in MByte per second
4. output rate: sum of producer outputs in MByte per second

Run the benchmark from source
```shell
./gradlew run --args="Performance --bootstrap.servers localhost:9092 --topic topic --topicConfig partitions:10,replicationFactor:3 --producers 5 --consumers 1 --records 100000 --recordSize 10000"
```
### Performance Benchmark Configurations
1. --bootstrap.servers: the server to connect to
2. --topic: the topic name
3. --partitions: topic config when creating new topic. Default: 1
4. --replicas: topic config when creating new topic. Default: 1
5. --consumers: the number of consumers (threads). Default: 1
6. --records: the total number of records sent by the producers. Default: 1000
7. --record.size: the record size in byte. Default: 1024 byte

---

## Topic Explorer

This tool can expose both earliest offset and latest offset for all (public and private) topics.
Expand Down
4 changes: 3 additions & 1 deletion app/src/main/java/org/astraea/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.List;
import java.util.Map;
import org.astraea.metrics.kafka.KafkaMetricClientApp;
import org.astraea.performance.Performance;
import org.astraea.performance.latency.End2EndLatency;
import org.astraea.topic.ReplicaCollie;
import org.astraea.topic.TopicExplorer;
Expand All @@ -16,7 +17,8 @@ public class App {
"latency", End2EndLatency.class,
"offset", TopicExplorer.class,
"metrics", KafkaMetricClientApp.class,
"replica", ReplicaCollie.class);
"replica", ReplicaCollie.class,
"performance", Performance.class);

static void execute(Map<String, Class<?>> mains, List<String> args) throws Throwable {

Expand Down
7 changes: 7 additions & 0 deletions app/src/main/java/org/astraea/argument/ArgumentUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ public Boolean convert(String value) {
}
}

public static class ShortConverter implements IStringConverter<Short> {
@Override
public Short convert(String value) {
return Short.valueOf(value);
}
}

private static <C extends Collection<T>, T> C requireNonEmpty(C collection) {
if (collection.isEmpty()) throw new ParameterException("array type can't be empty");
return collection;
Expand Down
39 changes: 39 additions & 0 deletions app/src/main/java/org/astraea/performance/ComponentFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.astraea.performance;

import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

/** An interface used for creating producer, consumer, admin. */
public interface ComponentFactory {
Producer createProducer();

Consumer createConsumer(Collection<String> topic);

/**
* Used for creating Kafka producer, consumer, admin of the same Kafka server. The consumers
* generated by the same object from `fromKafka(brokers)` have the same groupID
*/
static ComponentFactory fromKafka(String brokers) {
return new ComponentFactory() {
private final String groupId = "groupId:" + System.currentTimeMillis();
/** Create Producer with KafkaProducer<byte[], byte[]> functions */
@Override
public Producer createProducer() {
Properties prop = new Properties();
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
return Producer.fromKafka(prop);
}

/** Create Consumer with KafkaConsumer<byte[], byte[]> functions */
@Override
public Consumer createConsumer(Collection<String> topic) {
Properties prop = new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
chinghongfang marked this conversation as resolved.
Show resolved Hide resolved
return Consumer.fromKafka(prop, topic);
}
};
}
}
43 changes: 43 additions & 0 deletions app/src/main/java/org/astraea/performance/Consumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.astraea.performance;

import java.time.Duration;
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

/** An interface for polling records. */
public interface Consumer {
ConsumerRecords<byte[], byte[]> poll(Duration timeout);

void wakeup();

void cleanup();

/** Create a Consumer with KafkaConsumer<byte[], byte[]> functionality */
static Consumer fromKafka(Properties prop, Collection<String> topics) {

var kafkaConsumer =
new KafkaConsumer<byte[], byte[]>(
prop, new ByteArrayDeserializer(), new ByteArrayDeserializer());
kafkaConsumer.subscribe(topics);
return new Consumer() {

@Override
public ConsumerRecords<byte[], byte[]> poll(Duration timeout) {
return kafkaConsumer.poll(timeout);
}

@Override
public void wakeup() {
kafkaConsumer.wakeup();
}

@Override
public void cleanup() {
kafkaConsumer.close();
}
};
}
}
61 changes: 61 additions & 0 deletions app/src/main/java/org/astraea/performance/Metrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package org.astraea.performance;

import java.util.concurrent.atomic.LongAdder;

/** Used to track statistics. */
public class Metrics {
chinghongfang marked this conversation as resolved.
Show resolved Hide resolved
private double avgLatency;
private long num;
private long max;
private long min;
private final LongAdder bytes;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

因為所有public methods都已經使用synchronized,似乎這裡也就不用LongAdder


public Metrics() {
avgLatency = 0;
num = 0;
max = 0;
min = Long.MAX_VALUE;
bytes = new LongAdder();
}
/** Add a new value to latency metric. */
public void putLatency(long latency) {
if (min > latency) min = latency;
if (max < latency) max = latency;
++num;
avgLatency += (((double) latency) - avgLatency) / (double) num;
}
/** Add a new value to bytes. */
public void addBytes(long bytes) {
this.bytes.add(bytes);
}

/** Get the number of latency put. */
public long num() {
return num;
}
/** Get the maximum of latency put. */
public long max() {
return max;
}
/** Get the minimum of latency put. */
public long min() {
return min;
}
/** Get the average latency. */
public double avgLatency() {
return avgLatency;
}
/** Reset to 0 and returns the old value of bytes */
public long bytesThenReset() {
return this.bytes.sumThenReset();
}
/** Set all attributes to default value */
public void reset() {
avgLatency = 0;
num = 0;
max = 0;
// 初始為最大的integer值
min = Long.MAX_VALUE;
bytes.reset();
}
}
Loading