Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance testing tool #11

Merged
merged 51 commits into from
Oct 27, 2021
Merged
Changes from 49 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
fb7eb21
Add my name to README in authors field
Sep 3, 2021
336eeee
Merge remote-tracking branch 'upstream/main' into main
Sep 3, 2021
a64c29e
Merge branch 'skiptests:main' into main
chinghongfang Sep 4, 2021
39920d3
Merge branch 'skiptests:main' into main
chinghongfang Sep 5, 2021
402c13e
Create producer/consumer executor.
Sep 5, 2021
721add0
Producer/Consumer works.
chinghongfang Sep 7, 2021
3aaf3fa
Record bytes read/write per second.
chinghongfang Sep 7, 2021
8dcaed5
Performance testing tool
chinghongfang Sep 8, 2021
2e19e26
Merge branch 'skiptests:main' into performance
chinghongfang Sep 11, 2021
541ce66
Merge branch 'skiptests:main' into main
chinghongfang Sep 12, 2021
110183c
Recompose structure to unit test convenient.
chinghongfang Sep 13, 2021
f038e70
Merge branch 'skiptests:main' into performance
chinghongfang Sep 13, 2021
cd27779
AvgLatency unit test.
Sep 13, 2021
e1df327
Format the code.
Sep 13, 2021
e7d2b85
Merge branch 'skiptests:main' into main
chinghongfang Sep 14, 2021
0768cdc
Write unit test
chinghongfang Sep 14, 2021
d2c31c3
Change new-line symbol in format output
chinghongfang Sep 14, 2021
7ec6148
Reconstruct and unit test
Sep 14, 2021
1eb07f7
Remove renamed file
Sep 14, 2021
1ff8073
Warm up, producing record to all partitions
chinghongfang Sep 16, 2021
3600b4d
Add warm up process.
chinghongfang Sep 26, 2021
4df3be2
Lengthen test wait time for auto closing
chinghongfang Sep 26, 2021
7dc08b1
Merge remote-tracking branch 'upstream/main' into performance
chinghongfang Sep 29, 2021
6713c87
Solve record lost problem
chinghongfang Oct 1, 2021
f40ea23
Add some comment
chinghongfang Oct 2, 2021
6a763e7
Add description Performance benchmark tool to README
chinghongfang Oct 2, 2021
d0126c1
Add some comment.
chinghongfang Oct 3, 2021
3a131bf
Merge remote-tracking branch 'upstream/main' into performance
chinghongfang Oct 3, 2021
7ceb8b0
Merge remote-tracking branch 'upstream/main' into performance
chinghongfang Oct 8, 2021
1da9e07
Parse argument by jcommander
chinghongfang Oct 8, 2021
8149b6e
Merge remote-tracking branch 'upstream/main' into performance
chinghongfang Oct 8, 2021
cc822dc
Parse short value
chinghongfang Oct 8, 2021
1fd45a4
Wait longer on testing consumer/producer close
chinghongfang Oct 8, 2021
75dfc40
Update README
chinghongfang Oct 8, 2021
6774127
Fix inconsistent names.
chinghongfang Oct 17, 2021
c727f84
Merge remote-tracking branch 'upstream/main' into performance
chinghongfang Oct 17, 2021
6a62c42
Update performance argument
chinghongfang Oct 18, 2021
6db1a6d
Reconstruct main logic
chinghongfang Oct 20, 2021
242494c
Merge remote-tracking branch 'upstream/main' into performance
chinghongfang Oct 20, 2021
968086c
Spotless Apply
chinghongfang Oct 20, 2021
92a3bd0
Merge remote-tracking branch 'upstream/main' into performance
chinghongfang Oct 20, 2021
186c956
Change string to variable
chinghongfang Oct 20, 2021
204c322
Merge remote-tracking branch 'upstream/main' into performance
chinghongfang Oct 20, 2021
0f65c95
Merge remote-tracking branch 'upstream/main' into performance
chinghongfang Oct 21, 2021
bbbae7b
Change unit test environment to real world
chinghongfang Oct 21, 2021
b5ca2f1
Metric thread-safe and consumer same topic
chinghongfang Oct 22, 2021
8ba0b5b
Add partitioner setting for producer
chinghongfang Oct 25, 2021
f978452
Spotless Apply
chinghongfang Oct 25, 2021
ba0503e
Merge remote-tracking branch 'upstream/main' into performance
chinghongfang Oct 25, 2021
3c78d3e
Consumer/producer configure property file
chinghongfang Oct 27, 2021
b7256a2
Throw exception rather than ignore
chinghongfang Oct 27, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -14,10 +14,11 @@ This project offers many kafka tools to simplify the life for kafka users.

1. [Kafka quick start](#kafka-cluster-quick-start): set up a true kafka cluster in one minute
2. [Kafka benchmark](#latency-benchmark): run producers/consumers to test the performance and consistency for kafka cluster
3. [Kafka offset explorer](#topic-explorer): check the start/end offsets of kafka topics
4. [Kafka official tool](#kafka-official-tool): run any one specific kafka official tool. All you have to prepare is the docker env.
5. [Kafka metric client](#kafka-metric-client): utility for accessing kafka Mbean metrics via JMX.
6. [Replica Collie](#replica-collie): move replicas from brokers to others. You can use this tool to obstruct specific brokers from hosting specific topics.
3. [Kafka performance](#Performance-Benchmark): check producing/consuming performance.
4. [Kafka offset explorer](#topic-explorer): check the start/end offsets of kafka topics
5. [Kafka official tool](#kafka-official-tool): run any one specific kafka official tool. All you have to prepare is the docker env.
6. [Kafka metric client](#kafka-metric-client): utility for accessing kafka Mbean metrics via JMX.
7. [Replica Collie](#replica-collie): move replicas from brokers to others. You can use this tool to obstruct specific brokers from hosting specific topics.

[Release page](https://github.com/skiptests/astraea/releases) offers the uber jar including all tools.
```shell
@@ -94,6 +95,28 @@ java -jar app-0.0.1-SNAPSHOT-all.jar latency --bootstrap.servers 192.168.50.224:

---

## Performance Benchmark
This tool is used test to following metrics.
1. publish latency: the time of completing producer data request
2. E2E latency: the time for a record to travel through Kafka
3. input rate: sum of consumer inputs in MByte per second
4. output rate: sum of producer outputs in MByte per second

Run the benchmark from source
```shell
./gradlew run --args="Performance --bootstrap.servers localhost:9092 --topic topic --topicConfig partitions:10,replicationFactor:3 --producers 5 --consumers 1 --records 100000 --recordSize 10000"
```
### Performance Benchmark Configurations
1. --bootstrap.servers: the server to connect to
2. --topic: the topic name
3. --partitions: topic config when creating new topic. Default: 1
4. --replicas: topic config when creating new topic. Default: 1
5. --consumers: the number of consumers (threads). Default: 1
6. --records: the total number of records sent by the producers. Default: 1000
7. --record.size: the record size in byte. Default: 1024 byte

---

## Topic Explorer

This tool can expose both earliest offset and latest offset for all (public and private) topics.
4 changes: 3 additions & 1 deletion app/src/main/java/org/astraea/App.java
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@
import java.util.List;
import java.util.Map;
import org.astraea.metrics.kafka.KafkaMetricClientApp;
import org.astraea.performance.Performance;
import org.astraea.performance.latency.End2EndLatency;
import org.astraea.topic.ReplicaCollie;
import org.astraea.topic.TopicExplorer;
@@ -16,7 +17,8 @@ public class App {
"latency", End2EndLatency.class,
"offset", TopicExplorer.class,
"metrics", KafkaMetricClientApp.class,
"replica", ReplicaCollie.class);
"replica", ReplicaCollie.class,
"performance", Performance.class);

static void execute(Map<String, Class<?>> mains, List<String> args) throws Throwable {

7 changes: 7 additions & 0 deletions app/src/main/java/org/astraea/argument/ArgumentUtil.java
Original file line number Diff line number Diff line change
@@ -96,6 +96,13 @@ public Boolean convert(String value) {
}
}

public static class ShortConverter implements IStringConverter<Short> {
@Override
public Short convert(String value) {
return Short.valueOf(value);
}
}

private static <C extends Collection<T>, T> C requireNonEmpty(C collection) {
if (collection.isEmpty()) throw new ParameterException("array type can't be empty");
return collection;
78 changes: 78 additions & 0 deletions app/src/main/java/org/astraea/performance/ComponentFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package org.astraea.performance;

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;

/** An interface used for creating producer, consumer. */
public interface ComponentFactory {
Producer createProducer();

Consumer createConsumer();

/**
* (Optional) Setting partitioner for producer. If not set, use default partitioner RoundRobin
*
* @param partitionerName The partitioner class to use
* @return This factory
*/
default ComponentFactory partitioner(String partitionerName) {
return this;
}

/**
* (Optional) JMX server for user defined partitioner. Default partitioner will not use this
* attribute
*
* @param JMXAddress The jmx address to connect to
* @return This factory
*/
default ComponentFactory JMXAddress(String JMXAddress) {
return this;
}

/**
* Used for creating Kafka producer, consumer of the same Kafka server and the same topic. The
* consumers generated by the same object from `fromKafka(brokers)` subscribe the same topic and
* have the same groupID.
*/
static ComponentFactory fromKafka(String brokers, String topic) {
return new ComponentFactory() {
private final String groupId = "groupId:" + System.currentTimeMillis();
private String partitionerName = DefaultPartitioner.class.getName();
private String JMXAddress = "0.0.0.0@0";
/** Create Producer with KafkaProducer<byte[], byte[]> functions */
@Override
public Producer createProducer() {
Properties prop = new Properties();
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, partitionerName);
prop.put("jmx_servers", JMXAddress);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這邊要考慮一下JMXAddress不存在的時候該怎麼辦?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這裡是要給partitioner使用的參數,也只有custom partitioner 會使用這項參數。這裡想交由partitioner做判斷,看partitioner是否接受無JMXAddress

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我沒表達清楚。

想像一下之後的情境,我們應該會測試poison partitioner和其他partitioners的比較,換言之,前者我們會設定jmx address,但後者我們不會設定。因此這邊應該要判斷是否有jmx address來決定要用哪個partitioner

當然這個可以到下一個PR再來處理,那麼我們應該先不要引入jmx address這個參數,下一隻PR在處理

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

瞭解,這裡就先不引入jmx address,之後再來處理。

return Producer.fromKafka(prop, topic);
}

/** Create Consumer with KafkaConsumer<byte[], byte[]> functions */
@Override
public Consumer createConsumer() {
Properties prop = new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return Consumer.fromKafka(prop, Collections.singleton(topic));
}

@Override
public ComponentFactory partitioner(String partitionerName) {
this.partitionerName = partitionerName;
return this;
}

@Override
public ComponentFactory JMXAddress(String JMXAddress) {
this.JMXAddress = JMXAddress;
return this;
}
};
}
}
43 changes: 43 additions & 0 deletions app/src/main/java/org/astraea/performance/Consumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.astraea.performance;

import java.time.Duration;
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

/** An interface for polling records. */
public interface Consumer {
ConsumerRecords<byte[], byte[]> poll(Duration timeout);

void wakeup();

void cleanup();

/** Create a Consumer with KafkaConsumer<byte[], byte[]> functionality */
static Consumer fromKafka(Properties prop, Collection<String> topics) {

var kafkaConsumer =
new KafkaConsumer<byte[], byte[]>(
prop, new ByteArrayDeserializer(), new ByteArrayDeserializer());
kafkaConsumer.subscribe(topics);
return new Consumer() {

@Override
public ConsumerRecords<byte[], byte[]> poll(Duration timeout) {
return kafkaConsumer.poll(timeout);
}

@Override
public void wakeup() {
kafkaConsumer.wakeup();
}

@Override
public void cleanup() {
kafkaConsumer.close();
}
};
}
}
67 changes: 67 additions & 0 deletions app/src/main/java/org/astraea/performance/Metrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package org.astraea.performance;

import java.util.concurrent.atomic.LongAdder;

/** Used to record statistics. This is thread safe. */
public class Metrics {
private double avgLatency;
private long num;
private long max;
private long min;
private final LongAdder bytes;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

因為所有public methods都已經使用synchronized,似乎這裡也就不用LongAdder


public Metrics() {
avgLatency = 0;
num = 0;
max = 0;
min = Long.MAX_VALUE;
bytes = new LongAdder();
}

/** Simultaneously add latency and bytes. */
public synchronized void put(long latency, long bytes) {
putLatency(latency);
addBytes(bytes);
}
/** Add a new value to latency metric. */
public synchronized void putLatency(long latency) {
if (min > latency) min = latency;
if (max < latency) max = latency;
++num;
avgLatency += (((double) latency) - avgLatency) / (double) num;
}
/** Add a new value to bytes. */
public synchronized void addBytes(long bytes) {
this.bytes.add(bytes);
}

/** Get the number of latency put. */
public synchronized long num() {
return num;
}
/** Get the maximum of latency put. */
public synchronized long max() {
return max;
}
/** Get the minimum of latency put. */
public synchronized long min() {
return min;
}
/** Get the average latency. */
public synchronized double avgLatency() {
return avgLatency;
}
/** Reset to 0 and returns the old value of bytes */
public synchronized long bytesThenReset() {
return this.bytes.sumThenReset();
}
/** Set all attributes to default value */
public synchronized void reset() {
avgLatency = 0;
num = 0;
max = 0;
// 初始為最大的integer值
min = Long.MAX_VALUE;
bytes.reset();
}
}
Loading