Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move --configs to base argument #327

Merged
merged 2 commits into from
May 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 34 additions & 21 deletions app/src/main/java/org/astraea/argument/Argument.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,23 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.CommonClientConfigs;

/** This basic argument defines the common property used by all kafka clients. */
public abstract class Argument {

static String[] filterEmpty(String[] args) {
return Arrays.stream(args)
.map(String::trim)
.filter(trim -> !trim.isEmpty())
.toArray(String[]::new);
}

/**
* Side effect: parse args into toolArgument
*
Expand All @@ -25,7 +34,8 @@ public static <T> T parse(T toolArgument, String[] args) {
JCommander jc = JCommander.newBuilder().addObject(toolArgument).build();
jc.setUsageFormatter(new UnixStyleUsageFormatter(jc));
try {
jc.parse(args);
// filter the empty string
jc.parse(filterEmpty(args));
} catch (ParameterException pe) {
var sb = new StringBuilder();
jc.getUsageFormatter().usage(sb);
Expand All @@ -39,38 +49,41 @@ public static <T> T parse(T toolArgument, String[] args) {
description = "String: server to connect to",
validateWith = NonEmptyStringField.class,
required = true)
public String brokers;
String bootstrapServers;

/**
* @param propsFile file path containing the properties to be passed to kafka
* @return the kafka props consists of bootstrap servers and all props from file (if it is
* existent)
* @return all configs from both "--configs" and "--prop.file". Other kafka-related configs are
* added also.
*/
Map<String, Object> properties(String propsFile) {
var props = new Properties();
if (propsFile != null) {
try (var input = new FileInputStream(propsFile)) {
public Map<String, String> configs() {
var all = new HashMap<>(configs);
if (propFile != null) {
var props = new Properties();
try (var input = new FileInputStream(propFile)) {
props.load(input);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
props.forEach((k, v) -> all.put(k.toString(), v.toString()));
}
props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokers);
return props.entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().toString(), Map.Entry::getValue));
all.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return Collections.unmodifiableMap(all);
}

public String bootstrapServers() {
return bootstrapServers;
}

@Parameter(
names = {"--prop.file"},
description = "the file path containing the properties to be passed to kafka admin",
validateWith = NonEmptyStringField.class)
public String propFile;
String propFile;

/**
* @return the kafka props consists of bootstrap servers and all props from file (if it is
* existent)
*/
public Map<String, Object> props() {
return properties(propFile);
}
@Parameter(
names = {"--configs"},
description = "Map: set configs by command-line. For example: --configs a=b,c=d",
converter = StringMapField.class,
validateWith = StringMapField.class)
Map<String, String> configs = Map.of();
}
5 changes: 3 additions & 2 deletions app/src/main/java/org/astraea/consumer/Builder.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ public Builder<Key, Value> configs(Map<String, String> configs) {
return this;
}

public Builder<Key, Value> brokers(String brokers) {
return config(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Objects.requireNonNull(brokers));
public Builder<Key, Value> bootstrapServers(String bootstrapServers) {
return config(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Objects.requireNonNull(bootstrapServers));
}

public Builder<Key, Value> consumerRebalanceListener(ConsumerRebalanceListener listener) {
Expand Down
4 changes: 2 additions & 2 deletions app/src/main/java/org/astraea/consumer/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ static Builder<byte[], byte[]> builder() {
return new Builder<>();
}

static Consumer<byte[], byte[]> of(String brokers) {
return builder().brokers(brokers).build();
static Consumer<byte[], byte[]> of(String bootstrapServers) {
return builder().bootstrapServers(bootstrapServers).build();
}

static Consumer<byte[], byte[]> of(Map<String, String> configs) {
Expand Down
32 changes: 7 additions & 25 deletions app/src/main/java/org/astraea/performance/Performance.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,15 @@
import java.nio.file.Path;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
Expand All @@ -27,7 +24,6 @@
import org.astraea.argument.PathField;
import org.astraea.argument.PositiveLongField;
import org.astraea.argument.PositiveShortField;
import org.astraea.argument.StringMapField;
import org.astraea.concurrent.Executor;
import org.astraea.concurrent.State;
import org.astraea.concurrent.ThreadPool;
Expand Down Expand Up @@ -96,14 +92,14 @@ static List<ProducerExecutor> producerExecutors(
argument.isolation() == Isolation.READ_COMMITTED ? argument.transactionSize : 1,
argument.isolation() == Isolation.READ_COMMITTED
? Producer.builder()
.configs(argument.allConfigs())
.brokers(argument.brokers)
.configs(argument.configs())
.bootstrapServers(argument.bootstrapServers())
.compression(argument.compression)
.partitionClassName(argument.partitioner)
.buildTransactional()
: Producer.builder()
.configs(argument.allConfigs())
.brokers(argument.brokers)
.configs(argument.configs())
.bootstrapServers(argument.bootstrapServers())
.compression(argument.compression)
.partitionClassName(argument.partitioner)
.build(),
Expand All @@ -116,7 +112,7 @@ static List<ProducerExecutor> producerExecutors(
public static Result execute(final Argument param)
throws InterruptedException, IOException, ExecutionException {
List<Integer> partitions;
try (var topicAdmin = TopicAdmin.of(param.props())) {
try (var topicAdmin = TopicAdmin.of(param.configs())) {
topicAdmin
.creator()
.numberOfReplicas(param.replicas)
Expand Down Expand Up @@ -167,10 +163,10 @@ public static Result execute(final Argument param)
i ->
consumerExecutor(
Consumer.builder()
.brokers(param.brokers)
.bootstrapServers(param.bootstrapServers())
.topics(Set.of(param.topic))
.groupId(groupId)
.configs(param.allConfigs())
.configs(param.configs())
.isolation(param.isolation())
.consumerRebalanceListener(
ignore -> consumerBalancerLatch.countDown())
Expand Down Expand Up @@ -252,13 +248,6 @@ private static boolean positiveSpecifyBroker(Argument param) {

public static class Argument extends org.astraea.argument.Argument {

Map<String, String> allConfigs() {
var all = new HashMap<>(configs);
props().forEach((k, v) -> all.put(k, v.toString()));
all.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression.nameOfKafka());
return all;
}

@Parameter(
names = {"--topic"},
description = "String: topic name",
Expand Down Expand Up @@ -315,13 +304,6 @@ Map<String, String> allConfigs() {
validateWith = NonEmptyStringField.class)
String partitioner = DefaultPartitioner.class.getName();

@Parameter(
names = {"--configs"},
description = "Map: set the configuration passed to producer/partitioner",
converter = StringMapField.class,
validateWith = StringMapField.class)
Map<String, String> configs = Map.of();

@Parameter(
names = {"--compression"},
description =
Expand Down
5 changes: 3 additions & 2 deletions app/src/main/java/org/astraea/producer/Builder.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ public Builder<Key, Value> configs(Map<String, String> configs) {
return this;
}

public Builder<Key, Value> brokers(String brokers) {
return config(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Objects.requireNonNull(brokers));
public Builder<Key, Value> bootstrapServers(String bootstrapServers) {
return config(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Objects.requireNonNull(bootstrapServers));
}

public Builder<Key, Value> partitionClassName(String partitionClassName) {
Expand Down
4 changes: 2 additions & 2 deletions app/src/main/java/org/astraea/producer/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ static Builder<byte[], byte[]> builder() {
return new Builder<>();
}

static Producer<byte[], byte[]> of(String brokers) {
return builder().brokers(brokers).build();
static Producer<byte[], byte[]> of(String bootstrapServers) {
return builder().bootstrapServers(bootstrapServers).build();
}

static Producer<byte[], byte[]> of(Map<String, String> configs) {
Expand Down
7 changes: 4 additions & 3 deletions app/src/main/java/org/astraea/topic/Builder.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@ public class Builder {

Builder() {}

public Builder brokers(String brokers) {
this.configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, Objects.requireNonNull(brokers));
public Builder bootstrapServers(String bootstrapServers) {
this.configs.put(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, Objects.requireNonNull(bootstrapServers));
return this;
}

public Builder configs(Map<String, Object> configs) {
public Builder configs(Map<String, String> configs) {
this.configs.putAll(configs);
return this;
}
Expand Down
5 changes: 2 additions & 3 deletions app/src/main/java/org/astraea/topic/ReplicaCollie.java
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ static Map<TopicPartition, MigratorInfo> execute(TopicAdmin admin, Argument args
public static void main(String[] args) throws IOException {
var argument = org.astraea.argument.Argument.parse(new Argument(), args);

try (var admin = TopicAdmin.of(argument.props())) {
try (var admin = TopicAdmin.of(argument.bootstrapServers())) {
execute(admin, argument)
.forEach(
(tp, assignments) ->
Expand Down Expand Up @@ -353,8 +353,7 @@ static class Argument extends org.astraea.argument.Argument {

@Parameter(
names = {"--verify"},
description =
"True if you just want to see the new assignment instead of executing the plan",
description = "add this flag if all you want to do is to review the plan",
validateWith = BooleanField.class,
converter = BooleanField.class)
boolean verify = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class ReplicaSyncingMonitor {

public static void main(String[] args) {
Argument argument = org.astraea.argument.Argument.parse(new Argument(), args);
try (TopicAdmin topicAdmin = TopicAdmin.of(argument.props())) {
try (TopicAdmin topicAdmin = TopicAdmin.of(argument.bootstrapServers())) {
execute(topicAdmin, argument);
}
}
Expand Down
6 changes: 3 additions & 3 deletions app/src/main/java/org/astraea/topic/TopicAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ static Builder builder() {
return new Builder();
}

static TopicAdmin of(String brokers) {
return builder().brokers(brokers).build();
static TopicAdmin of(String bootstrapServers) {
return builder().bootstrapServers(bootstrapServers).build();
}

static TopicAdmin of(Map<String, Object> configs) {
static TopicAdmin of(Map<String, String> configs) {
return builder().configs(configs).build();
}

Expand Down
2 changes: 1 addition & 1 deletion app/src/main/java/org/astraea/topic/TopicExplorer.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ static Result execute(TopicAdmin admin, Set<String> topics) {

public static void main(String[] args) throws IOException {
var argument = org.astraea.argument.Argument.parse(new Argument(), args);
try (var admin = TopicAdmin.of(argument.props())) {
try (var admin = TopicAdmin.of(argument.bootstrapServers())) {
var result = execute(admin, argument.topics.isEmpty() ? admin.topicNames() : argument.topics);
TreeOutput.print(result, System.out);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public static Map<Integer, Map<TopicPartition, Double>> execute(

public static void main(String[] args) {
var argument = org.astraea.argument.Argument.parse(new Argument(), args);
var admin = TopicAdmin.of(argument.brokers);
var admin = TopicAdmin.of(argument.bootstrapServers());
var score = execute(argument, admin);
printScore(score, argument);
}
Expand Down
53 changes: 53 additions & 0 deletions app/src/test/java/org/astraea/argument/ArgumentTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package org.astraea.argument;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class ArgumentTest {

@Test
void testFilterEmpty() {
var args0 = Argument.filterEmpty(new String[] {"", " ", " "});
Assertions.assertEquals(0, args0.length);

var args1 = Argument.filterEmpty(new String[] {"", "1 ", " "});
Assertions.assertEquals(1, args1.length);
Assertions.assertEquals("1", args1[0]);

var args2 = Argument.filterEmpty(new String[] {"", " 1", " 2 "});
Assertions.assertEquals(2, args2.length);
Assertions.assertEquals("1", args2[0]);
Assertions.assertEquals("2", args2[1]);
}

@Test
void testCommonProperties() throws IOException {
var file = Files.createTempFile("test_basic_argument", "");
try (var output = new BufferedWriter(new FileWriter(file.toFile()))) {
output.write("key1=value1");
output.newLine();
output.write("key2=value2");
}
var argument =
Argument.parse(
new DumbArgument(),
new String[] {
"--bootstrap.servers", "abc", "--prop.file", file.toString(), "--configs", "a=b"
});
Assertions.assertEquals(1, argument.configs.size());
Assertions.assertEquals("b", argument.configs.get("a"));
Assertions.assertEquals(file.toString(), argument.propFile);
// 2 (from prop file) + 1 (from configs) + 1 (bootstrap servers)
Assertions.assertEquals(4, argument.configs().size());
Assertions.assertEquals("abc", argument.bootstrapServers);
Assertions.assertEquals("abc", argument.bootstrapServers());
Assertions.assertEquals("value1", argument.configs().get("key1"));
Assertions.assertEquals("value2", argument.configs().get("key2"));
}

private static class DumbArgument extends Argument {}
}
Loading