Skip to content

Commit

Permalink
Merge pull request opensource4you#7 from qoo332001/workflows
Browse files Browse the repository at this point in the history
good
  • Loading branch information
garyparrot authored Mar 28, 2022
2 parents da7bed7 + 4d0d7fd commit b64d60c
Showing 1 changed file with 21 additions and 14 deletions.
35 changes: 21 additions & 14 deletions app/src/main/java/org/astraea/workload/RealtimeApplication.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import java.time.Duration;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
Expand All @@ -12,7 +14,7 @@ public static class Producer implements Workload {
/** @param argument topicName */
@Override
public void run(String bootstrapServer, String argument) {
final String[] split = argument.split(":");
final String[] split = argument.split(",");
var topicName = split[0];
final KafkaProducer<byte[], byte[]> kafkaProducer =
org.astraea.producer.Producer.of(bootstrapServer).kafkaProducer();
Expand All @@ -32,31 +34,36 @@ public static class Consumer implements Workload {
*/
@Override
public void run(String bootstrapServer, String argument) {
final String[] split = argument.split(":");
final String[] split = argument.split(",");
var topicName = split[0];
var groupName = split[1];
final KafkaConsumer<?, ?> kafkaConsumer =
new KafkaConsumer<>(
Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServer,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest"));

new KafkaConsumer<>(
Map.of(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServer,
ConsumerConfig.GROUP_ID_CONFIG,
groupName,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest"));
kafkaConsumer.subscribe(Set.of(topicName));
while (true) {
kafkaConsumer.poll(Duration.ofSeconds(1L));
}
}

@Override
public String explainArgument() {
return "(topic name)";
return "(topic name),(group id)";
}
}

public static void main(String[] args) throws InterruptedException {
Workload workloadProducer = new RealtimeApplication.Producer();
workloadProducer.run("192.168.103.39:11300", "test-1:10");
Workload workloadConsumer = new RealtimeApplication.Producer();
workloadConsumer.run("192.168.103.39:11300", "test-1:10");
Workload workloadProducer = new RealtimeApplication.Consumer();
workloadProducer.run("192.168.103.39:11300", "test-1,good");
}
}

0 comments on commit b64d60c

Please sign in to comment.