diff --git a/app/src/main/java/org/astraea/argument/Argument.java b/app/src/main/java/org/astraea/argument/Argument.java index 866b3627b4..30d7a0e41e 100644 --- a/app/src/main/java/org/astraea/argument/Argument.java +++ b/app/src/main/java/org/astraea/argument/Argument.java @@ -7,14 +7,23 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Properties; -import java.util.stream.Collectors; import org.apache.kafka.clients.CommonClientConfigs; /** This basic argument defines the common property used by all kafka clients. */ public abstract class Argument { + static String[] filterEmpty(String[] args) { + return Arrays.stream(args) + .map(String::trim) + .filter(trim -> !trim.isEmpty()) + .toArray(String[]::new); + } + /** * Side effect: parse args into toolArgument * @@ -25,7 +34,8 @@ public static T parse(T toolArgument, String[] args) { JCommander jc = JCommander.newBuilder().addObject(toolArgument).build(); jc.setUsageFormatter(new UnixStyleUsageFormatter(jc)); try { - jc.parse(args); + // filter the empty string + jc.parse(filterEmpty(args)); } catch (ParameterException pe) { var sb = new StringBuilder(); jc.getUsageFormatter().usage(sb); @@ -39,38 +49,41 @@ public static T parse(T toolArgument, String[] args) { description = "String: server to connect to", validateWith = NonEmptyStringField.class, required = true) - public String brokers; + String bootstrapServers; /** - * @param propsFile file path containing the properties to be passed to kafka - * @return the kafka props consists of bootstrap servers and all props from file (if it is - * existent) + * @return all configs from both "--configs" and "--prop.file". Other kafka-related configs are + * added also. */ - Map properties(String propsFile) { - var props = new Properties(); - if (propsFile != null) { - try (var input = new FileInputStream(propsFile)) { + public Map configs() { + var all = new HashMap<>(configs); + if (propFile != null) { + var props = new Properties(); + try (var input = new FileInputStream(propFile)) { props.load(input); } catch (IOException e) { throw new UncheckedIOException(e); } + props.forEach((k, v) -> all.put(k.toString(), v.toString())); } - props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokers); - return props.entrySet().stream() - .collect(Collectors.toMap(e -> e.getKey().toString(), Map.Entry::getValue)); + all.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + return Collections.unmodifiableMap(all); + } + + public String bootstrapServers() { + return bootstrapServers; } @Parameter( names = {"--prop.file"}, description = "the file path containing the properties to be passed to kafka admin", validateWith = NonEmptyStringField.class) - public String propFile; + String propFile; - /** - * @return the kafka props consists of bootstrap servers and all props from file (if it is - * existent) - */ - public Map props() { - return properties(propFile); - } + @Parameter( + names = {"--configs"}, + description = "Map: set configs by command-line. For example: --configs a=b,c=d", + converter = StringMapField.class, + validateWith = StringMapField.class) + Map configs = Map.of(); } diff --git a/app/src/main/java/org/astraea/consumer/Builder.java b/app/src/main/java/org/astraea/consumer/Builder.java index 2de5d8e72b..c609657b05 100644 --- a/app/src/main/java/org/astraea/consumer/Builder.java +++ b/app/src/main/java/org/astraea/consumer/Builder.java @@ -73,8 +73,9 @@ public Builder configs(Map configs) { return this; } - public Builder brokers(String brokers) { - return config(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Objects.requireNonNull(brokers)); + public Builder bootstrapServers(String bootstrapServers) { + return config( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Objects.requireNonNull(bootstrapServers)); } public Builder consumerRebalanceListener(ConsumerRebalanceListener listener) { diff --git a/app/src/main/java/org/astraea/consumer/Consumer.java b/app/src/main/java/org/astraea/consumer/Consumer.java index 5078a83bed..e223421354 100644 --- a/app/src/main/java/org/astraea/consumer/Consumer.java +++ b/app/src/main/java/org/astraea/consumer/Consumer.java @@ -36,8 +36,8 @@ static Builder builder() { return new Builder<>(); } - static Consumer of(String brokers) { - return builder().brokers(brokers).build(); + static Consumer of(String bootstrapServers) { + return builder().bootstrapServers(bootstrapServers).build(); } static Consumer of(Map configs) { diff --git a/app/src/main/java/org/astraea/performance/Performance.java b/app/src/main/java/org/astraea/performance/Performance.java index 085708ffe1..55905e1f57 100644 --- a/app/src/main/java/org/astraea/performance/Performance.java +++ b/app/src/main/java/org/astraea/performance/Performance.java @@ -5,10 +5,8 @@ import java.nio.file.Path; import java.time.Duration; import java.util.Collection; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -16,7 +14,6 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; @@ -27,7 +24,6 @@ import org.astraea.argument.PathField; import org.astraea.argument.PositiveLongField; import org.astraea.argument.PositiveShortField; -import org.astraea.argument.StringMapField; import org.astraea.concurrent.Executor; import org.astraea.concurrent.State; import org.astraea.concurrent.ThreadPool; @@ -96,14 +92,14 @@ static List producerExecutors( argument.isolation() == Isolation.READ_COMMITTED ? argument.transactionSize : 1, argument.isolation() == Isolation.READ_COMMITTED ? Producer.builder() - .configs(argument.allConfigs()) - .brokers(argument.brokers) + .configs(argument.configs()) + .bootstrapServers(argument.bootstrapServers()) .compression(argument.compression) .partitionClassName(argument.partitioner) .buildTransactional() : Producer.builder() - .configs(argument.allConfigs()) - .brokers(argument.brokers) + .configs(argument.configs()) + .bootstrapServers(argument.bootstrapServers()) .compression(argument.compression) .partitionClassName(argument.partitioner) .build(), @@ -116,7 +112,7 @@ static List producerExecutors( public static Result execute(final Argument param) throws InterruptedException, IOException, ExecutionException { List partitions; - try (var topicAdmin = TopicAdmin.of(param.props())) { + try (var topicAdmin = TopicAdmin.of(param.configs())) { topicAdmin .creator() .numberOfReplicas(param.replicas) @@ -167,10 +163,10 @@ public static Result execute(final Argument param) i -> consumerExecutor( Consumer.builder() - .brokers(param.brokers) + .bootstrapServers(param.bootstrapServers()) .topics(Set.of(param.topic)) .groupId(groupId) - .configs(param.allConfigs()) + .configs(param.configs()) .isolation(param.isolation()) .consumerRebalanceListener( ignore -> consumerBalancerLatch.countDown()) @@ -252,13 +248,6 @@ private static boolean positiveSpecifyBroker(Argument param) { public static class Argument extends org.astraea.argument.Argument { - Map allConfigs() { - var all = new HashMap<>(configs); - props().forEach((k, v) -> all.put(k, v.toString())); - all.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression.nameOfKafka()); - return all; - } - @Parameter( names = {"--topic"}, description = "String: topic name", @@ -315,13 +304,6 @@ Map allConfigs() { validateWith = NonEmptyStringField.class) String partitioner = DefaultPartitioner.class.getName(); - @Parameter( - names = {"--configs"}, - description = "Map: set the configuration passed to producer/partitioner", - converter = StringMapField.class, - validateWith = StringMapField.class) - Map configs = Map.of(); - @Parameter( names = {"--compression"}, description = diff --git a/app/src/main/java/org/astraea/producer/Builder.java b/app/src/main/java/org/astraea/producer/Builder.java index 82eabb9df4..10ca9a8189 100644 --- a/app/src/main/java/org/astraea/producer/Builder.java +++ b/app/src/main/java/org/astraea/producer/Builder.java @@ -47,8 +47,9 @@ public Builder configs(Map configs) { return this; } - public Builder brokers(String brokers) { - return config(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Objects.requireNonNull(brokers)); + public Builder bootstrapServers(String bootstrapServers) { + return config( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Objects.requireNonNull(bootstrapServers)); } public Builder partitionClassName(String partitionClassName) { diff --git a/app/src/main/java/org/astraea/producer/Producer.java b/app/src/main/java/org/astraea/producer/Producer.java index 21ae4b299b..5921cf3aee 100644 --- a/app/src/main/java/org/astraea/producer/Producer.java +++ b/app/src/main/java/org/astraea/producer/Producer.java @@ -29,8 +29,8 @@ static Builder builder() { return new Builder<>(); } - static Producer of(String brokers) { - return builder().brokers(brokers).build(); + static Producer of(String bootstrapServers) { + return builder().bootstrapServers(bootstrapServers).build(); } static Producer of(Map configs) { diff --git a/app/src/main/java/org/astraea/topic/Builder.java b/app/src/main/java/org/astraea/topic/Builder.java index 259465853b..92fabba87d 100644 --- a/app/src/main/java/org/astraea/topic/Builder.java +++ b/app/src/main/java/org/astraea/topic/Builder.java @@ -42,12 +42,13 @@ public class Builder { Builder() {} - public Builder brokers(String brokers) { - this.configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, Objects.requireNonNull(brokers)); + public Builder bootstrapServers(String bootstrapServers) { + this.configs.put( + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, Objects.requireNonNull(bootstrapServers)); return this; } - public Builder configs(Map configs) { + public Builder configs(Map configs) { this.configs.putAll(configs); return this; } diff --git a/app/src/main/java/org/astraea/topic/ReplicaCollie.java b/app/src/main/java/org/astraea/topic/ReplicaCollie.java index 4b1960bad9..f2966462af 100644 --- a/app/src/main/java/org/astraea/topic/ReplicaCollie.java +++ b/app/src/main/java/org/astraea/topic/ReplicaCollie.java @@ -298,7 +298,7 @@ static Map execute(TopicAdmin admin, Argument args public static void main(String[] args) throws IOException { var argument = org.astraea.argument.Argument.parse(new Argument(), args); - try (var admin = TopicAdmin.of(argument.props())) { + try (var admin = TopicAdmin.of(argument.bootstrapServers())) { execute(admin, argument) .forEach( (tp, assignments) -> @@ -353,8 +353,7 @@ static class Argument extends org.astraea.argument.Argument { @Parameter( names = {"--verify"}, - description = - "True if you just want to see the new assignment instead of executing the plan", + description = "add this flag if all you want to do is to review the plan", validateWith = BooleanField.class, converter = BooleanField.class) boolean verify = false; diff --git a/app/src/main/java/org/astraea/topic/ReplicaSyncingMonitor.java b/app/src/main/java/org/astraea/topic/ReplicaSyncingMonitor.java index fe602ef072..ac234496ca 100644 --- a/app/src/main/java/org/astraea/topic/ReplicaSyncingMonitor.java +++ b/app/src/main/java/org/astraea/topic/ReplicaSyncingMonitor.java @@ -28,7 +28,7 @@ public class ReplicaSyncingMonitor { public static void main(String[] args) { Argument argument = org.astraea.argument.Argument.parse(new Argument(), args); - try (TopicAdmin topicAdmin = TopicAdmin.of(argument.props())) { + try (TopicAdmin topicAdmin = TopicAdmin.of(argument.bootstrapServers())) { execute(topicAdmin, argument); } } diff --git a/app/src/main/java/org/astraea/topic/TopicAdmin.java b/app/src/main/java/org/astraea/topic/TopicAdmin.java index c39c516b79..f68a473837 100644 --- a/app/src/main/java/org/astraea/topic/TopicAdmin.java +++ b/app/src/main/java/org/astraea/topic/TopicAdmin.java @@ -13,11 +13,11 @@ static Builder builder() { return new Builder(); } - static TopicAdmin of(String brokers) { - return builder().brokers(brokers).build(); + static TopicAdmin of(String bootstrapServers) { + return builder().bootstrapServers(bootstrapServers).build(); } - static TopicAdmin of(Map configs) { + static TopicAdmin of(Map configs) { return builder().configs(configs).build(); } diff --git a/app/src/main/java/org/astraea/topic/TopicExplorer.java b/app/src/main/java/org/astraea/topic/TopicExplorer.java index ea1d35f022..e45680a841 100644 --- a/app/src/main/java/org/astraea/topic/TopicExplorer.java +++ b/app/src/main/java/org/astraea/topic/TopicExplorer.java @@ -113,7 +113,7 @@ static Result execute(TopicAdmin admin, Set topics) { public static void main(String[] args) throws IOException { var argument = org.astraea.argument.Argument.parse(new Argument(), args); - try (var admin = TopicAdmin.of(argument.props())) { + try (var admin = TopicAdmin.of(argument.bootstrapServers())) { var result = execute(admin, argument.topics.isEmpty() ? admin.topicNames() : argument.topics); TreeOutput.print(result, System.out); } diff --git a/app/src/main/java/org/astraea/topic/cost/PartitionScore.java b/app/src/main/java/org/astraea/topic/cost/PartitionScore.java index 9cae91b4b6..e4393f06c8 100644 --- a/app/src/main/java/org/astraea/topic/cost/PartitionScore.java +++ b/app/src/main/java/org/astraea/topic/cost/PartitionScore.java @@ -81,7 +81,7 @@ public static Map> execute( public static void main(String[] args) { var argument = org.astraea.argument.Argument.parse(new Argument(), args); - var admin = TopicAdmin.of(argument.brokers); + var admin = TopicAdmin.of(argument.bootstrapServers()); var score = execute(argument, admin); printScore(score, argument); } diff --git a/app/src/test/java/org/astraea/argument/ArgumentTest.java b/app/src/test/java/org/astraea/argument/ArgumentTest.java new file mode 100644 index 0000000000..b52fc952c5 --- /dev/null +++ b/app/src/test/java/org/astraea/argument/ArgumentTest.java @@ -0,0 +1,53 @@ +package org.astraea.argument; + +import java.io.BufferedWriter; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Files; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class ArgumentTest { + + @Test + void testFilterEmpty() { + var args0 = Argument.filterEmpty(new String[] {"", " ", " "}); + Assertions.assertEquals(0, args0.length); + + var args1 = Argument.filterEmpty(new String[] {"", "1 ", " "}); + Assertions.assertEquals(1, args1.length); + Assertions.assertEquals("1", args1[0]); + + var args2 = Argument.filterEmpty(new String[] {"", " 1", " 2 "}); + Assertions.assertEquals(2, args2.length); + Assertions.assertEquals("1", args2[0]); + Assertions.assertEquals("2", args2[1]); + } + + @Test + void testCommonProperties() throws IOException { + var file = Files.createTempFile("test_basic_argument", ""); + try (var output = new BufferedWriter(new FileWriter(file.toFile()))) { + output.write("key1=value1"); + output.newLine(); + output.write("key2=value2"); + } + var argument = + Argument.parse( + new DumbArgument(), + new String[] { + "--bootstrap.servers", "abc", "--prop.file", file.toString(), "--configs", "a=b" + }); + Assertions.assertEquals(1, argument.configs.size()); + Assertions.assertEquals("b", argument.configs.get("a")); + Assertions.assertEquals(file.toString(), argument.propFile); + // 2 (from prop file) + 1 (from configs) + 1 (bootstrap servers) + Assertions.assertEquals(4, argument.configs().size()); + Assertions.assertEquals("abc", argument.bootstrapServers); + Assertions.assertEquals("abc", argument.bootstrapServers()); + Assertions.assertEquals("value1", argument.configs().get("key1")); + Assertions.assertEquals("value2", argument.configs().get("key2")); + } + + private static class DumbArgument extends Argument {} +} diff --git a/app/src/test/java/org/astraea/argument/BasicArgumentTest.java b/app/src/test/java/org/astraea/argument/BasicArgumentTest.java deleted file mode 100644 index f5fcafabbe..0000000000 --- a/app/src/test/java/org/astraea/argument/BasicArgumentTest.java +++ /dev/null @@ -1,30 +0,0 @@ -package org.astraea.argument; - -import java.io.BufferedWriter; -import java.io.FileWriter; -import java.io.IOException; -import java.nio.file.Files; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -public class BasicArgumentTest { - - @Test - void testCommonProperties() throws IOException { - var file = Files.createTempFile("test_basic_argument", ""); - try (var output = new BufferedWriter(new FileWriter(file.toFile()))) { - output.write("key1=value1"); - output.newLine(); - output.write("key2=value2"); - } - var argument = - org.astraea.argument.Argument.parse( - new DumbArgument(), new String[] {"--bootstrap.servers", "abc"}); - Assertions.assertEquals(3, argument.properties(file.toString()).size()); - Assertions.assertEquals("abc", argument.brokers); - Assertions.assertEquals("value1", argument.properties(file.toString()).get("key1").toString()); - Assertions.assertEquals("value2", argument.properties(file.toString()).get("key2").toString()); - } - - private static class DumbArgument extends Argument {} -} diff --git a/app/src/test/java/org/astraea/consumer/ConsumerTest.java b/app/src/test/java/org/astraea/consumer/ConsumerTest.java index 2277550efe..52c66dfc96 100644 --- a/app/src/test/java/org/astraea/consumer/ConsumerTest.java +++ b/app/src/test/java/org/astraea/consumer/ConsumerTest.java @@ -16,7 +16,7 @@ public class ConsumerTest extends RequireBrokerCluster { private static void produceData(String topic, int size) { - try (var producer = Producer.builder().brokers(bootstrapServers()).build()) { + try (var producer = Producer.builder().bootstrapServers(bootstrapServers()).build()) { IntStream.range(0, size) .forEach( i -> @@ -37,7 +37,7 @@ void testFromBeginning() { try (var consumer = Consumer.builder() .topics(Set.of(topic)) - .brokers(bootstrapServers()) + .bootstrapServers(bootstrapServers()) .fromBeginning() .build()) { @@ -51,7 +51,11 @@ void testFromLatest() { var topic = "testFromLatest"; produceData(topic, 1); try (var consumer = - Consumer.builder().topics(Set.of(topic)).brokers(bootstrapServers()).fromLatest().build()) { + Consumer.builder() + .topics(Set.of(topic)) + .bootstrapServers(bootstrapServers()) + .fromLatest() + .build()) { Assertions.assertEquals(0, consumer.poll(Duration.ofSeconds(3)).size()); } @@ -62,7 +66,11 @@ void testFromLatest() { void testWakeup() throws InterruptedException { var topic = "testWakeup"; try (var consumer = - Consumer.builder().topics(Set.of(topic)).brokers(bootstrapServers()).fromLatest().build()) { + Consumer.builder() + .topics(Set.of(topic)) + .bootstrapServers(bootstrapServers()) + .fromLatest() + .build()) { var service = Executors.newSingleThreadExecutor(); service.execute( () -> { @@ -92,7 +100,7 @@ void testGroupId() { try (var consumer = Consumer.builder() .topics(Set.of(topic)) - .brokers(bootstrapServers()) + .bootstrapServers(bootstrapServers()) .fromBeginning() .groupId(id) .build()) { diff --git a/app/src/test/java/org/astraea/consumer/RebalanceListenerTest.java b/app/src/test/java/org/astraea/consumer/RebalanceListenerTest.java index a573747c0a..8cf6ac55bc 100644 --- a/app/src/test/java/org/astraea/consumer/RebalanceListenerTest.java +++ b/app/src/test/java/org/astraea/consumer/RebalanceListenerTest.java @@ -16,7 +16,7 @@ void testConsumerRebalanceListener() { var topicName = "testRebalanceListener-" + System.currentTimeMillis(); try (var consumer = Consumer.builder() - .brokers(bootstrapServers()) + .bootstrapServers(bootstrapServers()) .topics(Set.of(topicName)) .consumerRebalanceListener(ignore -> getAssignment.incrementAndGet()) .build()) { diff --git a/app/src/test/java/org/astraea/performance/PerformanceTest.java b/app/src/test/java/org/astraea/performance/PerformanceTest.java index 3d6db03971..e4368181a5 100644 --- a/app/src/test/java/org/astraea/performance/PerformanceTest.java +++ b/app/src/test/java/org/astraea/performance/PerformanceTest.java @@ -85,7 +85,10 @@ void testConsumerExecutor() throws InterruptedException, ExecutionException { param.sizeDistributionType = DistributionType.FIXED; try (Executor executor = Performance.consumerExecutor( - Consumer.builder().topics(Set.of(topicName)).brokers(bootstrapServers()).build(), + Consumer.builder() + .topics(Set.of(topicName)) + .bootstrapServers(bootstrapServers()) + .build(), metrics, new Manager(param, List.of(), List.of()), () -> false)) { @@ -94,7 +97,7 @@ void testConsumerExecutor() throws InterruptedException, ExecutionException { Assertions.assertEquals(0, metrics.num()); Assertions.assertEquals(0, metrics.bytes()); - try (var producer = Producer.builder().brokers(bootstrapServers()).build()) { + try (var producer = Producer.builder().bootstrapServers(bootstrapServers()).build()) { producer.sender().topic(topicName).value(new byte[1024]).run().toCompletableFuture().get(); } executor.execute(); @@ -146,7 +149,7 @@ void testArgument() { }; var arg = org.astraea.argument.Argument.parse(new Performance.Argument(), arguments1); - Assertions.assertEquals("value", arg.configs.get("key")); + Assertions.assertEquals("value", arg.configs().get("key")); String[] arguments2 = {"--bootstrap.servers", "localhost:9092", "--topic", ""}; Assertions.assertThrows( diff --git a/app/src/test/java/org/astraea/performance/ProducerExecutorTest.java b/app/src/test/java/org/astraea/performance/ProducerExecutorTest.java index 909b2a659a..21cafc4fb6 100644 --- a/app/src/test/java/org/astraea/performance/ProducerExecutorTest.java +++ b/app/src/test/java/org/astraea/performance/ProducerExecutorTest.java @@ -123,7 +123,7 @@ private static Stream offerProducerExecutors() { ProducerExecutor.of( normalTopic, 1, - Producer.builder().brokers(bootstrapServers()).build(), + Producer.builder().bootstrapServers(bootstrapServers()).build(), new Observer(), new MyPartitionSupplier(), new MyDataSupplier()))), @@ -133,7 +133,7 @@ private static Stream offerProducerExecutors() { ProducerExecutor.of( transactionalTopic, 10, - Producer.builder().brokers(bootstrapServers()).buildTransactional(), + Producer.builder().bootstrapServers(bootstrapServers()).buildTransactional(), new Observer(), new MyPartitionSupplier(), new MyDataSupplier())))); diff --git a/app/src/test/java/org/astraea/producer/ProducerTest.java b/app/src/test/java/org/astraea/producer/ProducerTest.java index 1717792133..c444da5942 100644 --- a/app/src/test/java/org/astraea/producer/ProducerTest.java +++ b/app/src/test/java/org/astraea/producer/ProducerTest.java @@ -32,7 +32,10 @@ void testSender() throws ExecutionException, InterruptedException { var timestamp = System.currentTimeMillis() + 10; var header = Header.of("a", "b".getBytes()); try (var producer = - Producer.builder().brokers(bootstrapServers()).keySerializer(Serializer.STRING).build()) { + Producer.builder() + .bootstrapServers(bootstrapServers()) + .keySerializer(Serializer.STRING) + .build()) { Assertions.assertFalse(producer.transactional()); var metadata = producer @@ -50,7 +53,7 @@ void testSender() throws ExecutionException, InterruptedException { try (var consumer = Consumer.builder() - .brokers(bootstrapServers()) + .bootstrapServers(bootstrapServers()) .fromBeginning() .topics(Set.of(topicName)) .keyDeserializer(Deserializer.STRING) @@ -75,7 +78,7 @@ void testTransaction() { var header = Header.of("a", "b".getBytes()); try (var producer = Producer.builder() - .brokers(bootstrapServers()) + .bootstrapServers(bootstrapServers()) .keySerializer(Serializer.STRING) .buildTransactional()) { Assertions.assertTrue(producer.transactional()); @@ -94,7 +97,7 @@ void testTransaction() { try (var consumer = Consumer.builder() - .brokers(bootstrapServers()) + .bootstrapServers(bootstrapServers()) .fromBeginning() .topics(Set.of(topicName)) .keyDeserializer(Deserializer.STRING) @@ -108,7 +111,8 @@ void testTransaction() { @SuppressWarnings("unchecked") @Test void testInvalidSender() { - try (var producer = Producer.builder().brokers(bootstrapServers()).buildTransactional()) { + try (var producer = + Producer.builder().bootstrapServers(bootstrapServers()).buildTransactional()) { Assertions.assertThrows( IllegalArgumentException.class, () -> producer.send(List.of((Sender) Mockito.mock(Sender.class)))); @@ -125,7 +129,7 @@ void testSingleSend(Producer producer) try (var consumer = Consumer.builder() - .brokers(bootstrapServers()) + .bootstrapServers(bootstrapServers()) .fromBeginning() .topics(Set.of(topic)) .isolation( @@ -152,7 +156,7 @@ void testMultiplesSend(Producer producer) throws InterruptedExce try (var consumer = Consumer.builder() - .brokers(bootstrapServers()) + .bootstrapServers(bootstrapServers()) .fromBeginning() .topics(Set.of(topic)) .isolation( @@ -165,10 +169,12 @@ void testMultiplesSend(Producer producer) throws InterruptedExce private static Stream offerProducers() { return Stream.of( Arguments.of( - Named.of("normal producer", Producer.builder().brokers(bootstrapServers()).build())), + Named.of( + "normal producer", + Producer.builder().bootstrapServers(bootstrapServers()).build())), Arguments.of( Named.of( "transactional producer", - Producer.builder().brokers(bootstrapServers()).buildTransactional()))); + Producer.builder().bootstrapServers(bootstrapServers()).buildTransactional()))); } } diff --git a/app/src/test/java/org/astraea/topic/ReplicaCollieTest.java b/app/src/test/java/org/astraea/topic/ReplicaCollieTest.java index 8679633a72..aa502e33e7 100644 --- a/app/src/test/java/org/astraea/topic/ReplicaCollieTest.java +++ b/app/src/test/java/org/astraea/topic/ReplicaCollieTest.java @@ -2,7 +2,6 @@ import static org.junit.jupiter.api.condition.OS.WINDOWS; -import java.io.IOException; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -12,6 +11,7 @@ import java.util.stream.Collectors; import org.apache.kafka.common.TopicPartition; import org.astraea.Utils; +import org.astraea.argument.Argument; import org.astraea.service.RequireBrokerCluster; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -21,19 +21,19 @@ public class ReplicaCollieTest extends RequireBrokerCluster { @Test @DisabledOnOs(WINDOWS) - void testVerify() throws IOException, InterruptedException { + void testVerify() throws InterruptedException { test(true); } @Test @DisabledOnOs(WINDOWS) - void testExecute() throws IOException, InterruptedException { + void testExecute() throws InterruptedException { test(false); } @Test @DisabledOnOs(WINDOWS) - void testBrokerMigrator() throws IOException, InterruptedException { + void testBrokerMigrator() throws InterruptedException { var topicName = "ReplicaCollieTest-Broker"; try (var topicAdmin = TopicAdmin.of(bootstrapServers())) { topicAdmin @@ -75,7 +75,7 @@ void testBrokerMigrator() throws IOException, InterruptedException { @Test @DisabledOnOs(WINDOWS) - void testPathMigrator() throws IOException, InterruptedException { + void testPathMigrator() throws InterruptedException { var topicName = "ReplicaCollieTest-Path"; try (var topicAdmin = TopicAdmin.of(bootstrapServers())) { topicAdmin @@ -118,7 +118,7 @@ void testPathMigrator() throws IOException, InterruptedException { } } - private void test(boolean verify) throws IOException, InterruptedException { + private void test(boolean verify) throws InterruptedException { var topicName = "ReplicaCollieTest-" + verify; try (var topicAdmin = TopicAdmin.of(bootstrapServers())) { topicAdmin @@ -141,13 +141,25 @@ private void test(boolean verify) throws IOException, InterruptedException { topicAdmin.brokerIds().stream() .filter(b -> !badBroker.contains(b)) .collect(Collectors.toList()); - var argument = new ReplicaCollie.Argument(); - argument.fromBrokers = badBroker; - argument.toBrokers = targetBroker.subList(0, 1); - argument.brokers = bootstrapServers(); - argument.topics = Set.of(topicName); - argument.partitions = Set.of(0); - argument.verify = verify; + + var argument = + Argument.parse( + new ReplicaCollie.Argument(), + new String[] { + "--from", + badBroker.stream().map(String::valueOf).collect(Collectors.joining(",")), + "--to", + targetBroker.subList(0, 1).stream() + .map(String::valueOf) + .collect(Collectors.joining(",")), + "--bootstrap.servers", + bootstrapServers(), + "--topics", + topicName, + "--partitions", + "0", + verify ? "--verify" : "" + }); var result = ReplicaCollie.execute(topicAdmin, argument); var assignment = result.get(new TopicPartition(topicName, 0)); Assertions.assertEquals(badBroker, assignment.brokerSource); diff --git a/app/src/test/java/org/astraea/topic/TopicAdminTest.java b/app/src/test/java/org/astraea/topic/TopicAdminTest.java index 4f29ce778b..aa3768d477 100644 --- a/app/src/test/java/org/astraea/topic/TopicAdminTest.java +++ b/app/src/test/java/org/astraea/topic/TopicAdminTest.java @@ -141,7 +141,7 @@ void testConsumerGroups() throws InterruptedException { try (var topicAdmin = TopicAdmin.of(bootstrapServers())) { topicAdmin.creator().topic(topicName).numberOfPartitions(3).create(); Consumer.builder() - .brokers(bootstrapServers()) + .bootstrapServers(bootstrapServers()) .topics(Set.of(topicName)) .groupId(consumerGroup) .build(); @@ -265,7 +265,7 @@ void testMigrateAllPartitions() throws InterruptedException { void testReplicaSize() throws ExecutionException, InterruptedException { var topicName = "testReplicaSize"; try (var topicAdmin = TopicAdmin.of(bootstrapServers()); - var producer = Producer.builder().brokers(bootstrapServers()).build()) { + var producer = Producer.builder().bootstrapServers(bootstrapServers()).build()) { producer.sender().topic(topicName).key(new byte[100]).run().toCompletableFuture().get(); var originSize = topicAdmin @@ -306,7 +306,7 @@ void testCompact() throws InterruptedException { Producer.builder() .keySerializer(Serializer.STRING) .valueSerializer(Serializer.STRING) - .brokers(bootstrapServers()) + .bootstrapServers(bootstrapServers()) .build()) { IntStream.range(0, 10) .forEach(i -> producer.sender().key(key).value(value).topic(topicName).run()); @@ -328,7 +328,7 @@ void testCompact() throws InterruptedException { .keyDeserializer(Deserializer.STRING) .valueDeserializer(Deserializer.STRING) .fromBeginning() - .brokers(bootstrapServers()) + .bootstrapServers(bootstrapServers()) .topics(Set.of(topicName)) .build()) { diff --git a/app/src/test/java/org/astraea/topic/TopicExplorerTest.java b/app/src/test/java/org/astraea/topic/TopicExplorerTest.java index 6edefa65f2..d57017548b 100644 --- a/app/src/test/java/org/astraea/topic/TopicExplorerTest.java +++ b/app/src/test/java/org/astraea/topic/TopicExplorerTest.java @@ -58,7 +58,7 @@ public Closeable start() { () -> { var consumer = Consumer.builder() - .brokers(bootstrapServers()) + .bootstrapServers(bootstrapServers()) .topics(topics) .groupId(groupName) .configs( diff --git a/app/src/test/java/org/astraea/topic/cost/GetPartitionInfTest.java b/app/src/test/java/org/astraea/topic/cost/GetPartitionInfTest.java index 5195377983..1c77011191 100644 --- a/app/src/test/java/org/astraea/topic/cost/GetPartitionInfTest.java +++ b/app/src/test/java/org/astraea/topic/cost/GetPartitionInfTest.java @@ -49,7 +49,10 @@ static void setup() throws ExecutionException, InterruptedException { e.printStackTrace(); } var producer = - Producer.builder().brokers(bootstrapServers()).keySerializer(Serializer.STRING).build(); + Producer.builder() + .bootstrapServers(bootstrapServers()) + .keySerializer(Serializer.STRING) + .build(); int size = 10000; for (int t = 0; t <= 2; t++) { for (int p = 0; p <= 3; p++) { diff --git a/app/src/test/java/org/astraea/topic/cost/PartitionScoreTest.java b/app/src/test/java/org/astraea/topic/cost/PartitionScoreTest.java index bca1a74455..cad9bfffa5 100644 --- a/app/src/test/java/org/astraea/topic/cost/PartitionScoreTest.java +++ b/app/src/test/java/org/astraea/topic/cost/PartitionScoreTest.java @@ -48,7 +48,10 @@ static void setup() throws ExecutionException, InterruptedException { e.printStackTrace(); } var producer = - Producer.builder().brokers(bootstrapServers()).keySerializer(Serializer.STRING).build(); + Producer.builder() + .bootstrapServers(bootstrapServers()) + .keySerializer(Serializer.STRING) + .build(); int size = 10000; for (int t = 0; t <= 1; t++) { for (int p = 0; p <= 3; p++) {