From 90e834d8a8af65fa649e79a21776090bb5b39cef Mon Sep 17 00:00:00 2001 From: Jia-Sheng Chen Date: Thu, 13 Oct 2022 13:16:05 +0800 Subject: [PATCH 1/3] add 3 more tests for AsyncAdmin new tests: testCreator, testPartitions and testConsumerGroups --- .../astraea/common/admin/AsyncAdminTest.java | 129 ++++++++++++++++++ 1 file changed, 129 insertions(+) diff --git a/common/src/test/java/org/astraea/common/admin/AsyncAdminTest.java b/common/src/test/java/org/astraea/common/admin/AsyncAdminTest.java index d906d16ed9..e9b5049694 100644 --- a/common/src/test/java/org/astraea/common/admin/AsyncAdminTest.java +++ b/common/src/test/java/org/astraea/common/admin/AsyncAdminTest.java @@ -18,6 +18,7 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -26,6 +27,8 @@ import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import org.astraea.common.Utils; +import org.astraea.common.consumer.Consumer; +import org.astraea.common.consumer.ConsumerConfigs; import org.astraea.it.RequireBrokerCluster; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -276,4 +279,130 @@ void testMigrateToOtherFolders() throws ExecutionException, InterruptedException } } } + + @Test + void testCreator() throws ExecutionException, InterruptedException { + var topic = Utils.randomString(); + try (var admin = AsyncAdmin.of(bootstrapServers())) { + admin + .creator() + .topic(topic) + .configs(Map.of(TopicConfigs.COMPRESSION_TYPE_CONFIG, "lz4")) + .run() + .toCompletableFuture() + .get(); + Utils.sleep(Duration.ofSeconds(2)); + + var config = admin.topics(Set.of(topic)).toCompletableFuture().get().get(0).config(); + config.raw().keySet().forEach(key -> Assertions.assertTrue(config.value(key).isPresent())); + Assertions.assertTrue(config.raw().containsValue("lz4")); + } + } + + @Test + void testPartitions() throws ExecutionException, InterruptedException { + var topic = Utils.randomString(); + try (var admin = AsyncAdmin.of(bootstrapServers())) { + var before = + brokerIds().stream() + .mapToInt( + id -> + Utils.packException( + () -> + admin + .topicPartitionReplicas(Set.of(id)) + .toCompletableFuture() + .get() + .size())) + .sum(); + + admin.creator().topic(topic).numberOfPartitions(10).run().toCompletableFuture().get(); + // wait for syncing topic creation + Utils.sleep(Duration.ofSeconds(5)); + Assertions.assertTrue(admin.topicNames(true).toCompletableFuture().get().contains(topic)); + var partitions = admin.replicas(Set.of(topic)).toCompletableFuture().get(); + Assertions.assertEquals(10, partitions.size()); + var logFolders = + logFolders().values().stream().flatMap(Collection::stream).collect(Collectors.toSet()); + partitions.forEach( + replica -> + Assertions.assertTrue(logFolders.stream().anyMatch(replica.dataFolder()::contains))); + brokerIds() + .forEach( + id -> { + try { + Assertions.assertNotEquals( + 0, + admin.topicPartitionReplicas(Set.of(id)).toCompletableFuture().get().size()); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + + var after = + brokerIds().stream() + .mapToInt( + id -> + Utils.packException( + () -> + admin + .topicPartitionReplicas(Set.of(id)) + .toCompletableFuture() + .get() + .size())) + .sum(); + + Assertions.assertEquals(before + 10, after); + } + } + + @Test + void testConsumerGroups() throws ExecutionException, InterruptedException { + var topic = Utils.randomString(); + var consumerGroup = Utils.randomString(); + try (var admin = AsyncAdmin.of(bootstrapServers())) { + admin.creator().topic(topic).numberOfPartitions(3).run().toCompletableFuture().get(); + try (var c1 = + Consumer.forTopics(Set.of(topic)) + .bootstrapServers(bootstrapServers()) + .config(ConsumerConfigs.GROUP_ID_CONFIG, consumerGroup) + .build()) { + // wait for syncing topic creation + Utils.sleep(Duration.ofSeconds(5)); + var consumerGroupMap = + admin.consumerGroups(Set.of(consumerGroup)).toCompletableFuture().get(); + Assertions.assertEquals(1, consumerGroupMap.size()); + Assertions.assertTrue( + consumerGroupMap.stream().anyMatch(cg -> cg.groupId().equals(consumerGroup))); + + try (var c2 = + Consumer.forTopics(Set.of(topic)) + .bootstrapServers(bootstrapServers()) + .config(ConsumerConfigs.GROUP_ID_CONFIG, "abc") + .build()) { + var count = + admin.consumerGroupIds().toCompletableFuture().get().stream() + .mapToInt( + t -> + Utils.packException( + () -> + admin + .consumerGroups(Set.of(t)) + .toCompletableFuture() + .get() + .size())) + .sum(); + Assertions.assertEquals( + count, + admin + .consumerGroups(admin.consumerGroupIds().toCompletableFuture().get()) + .toCompletableFuture() + .get() + .size()); + Assertions.assertEquals( + 1, admin.consumerGroups(Set.of("abc")).toCompletableFuture().get().size()); + } + } + } + } } From dcbb8ecc18246d18af5851239322efc376bd01ff Mon Sep 17 00:00:00 2001 From: Jia-Sheng Chen Date: Thu, 13 Oct 2022 13:17:00 +0800 Subject: [PATCH 2/3] add 5 more tests about Migration, Compact and Broker. new tests: testMigrateSinglePartition, testIllegalMigrationArgument, testCompact, testBrokers and testBrokerFolders --- .../astraea/common/admin/AsyncAdminTest.java | 198 ++++++++++++++++++ 1 file changed, 198 insertions(+) diff --git a/common/src/test/java/org/astraea/common/admin/AsyncAdminTest.java b/common/src/test/java/org/astraea/common/admin/AsyncAdminTest.java index e9b5049694..dac6ce8210 100644 --- a/common/src/test/java/org/astraea/common/admin/AsyncAdminTest.java +++ b/common/src/test/java/org/astraea/common/admin/AsyncAdminTest.java @@ -26,9 +26,14 @@ import java.util.SortedSet; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import java.util.stream.IntStream; + import org.astraea.common.Utils; import org.astraea.common.consumer.Consumer; import org.astraea.common.consumer.ConsumerConfigs; +import org.astraea.common.consumer.Deserializer; +import org.astraea.common.producer.Producer; +import org.astraea.common.producer.Serializer; import org.astraea.it.RequireBrokerCluster; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -405,4 +410,197 @@ void testConsumerGroups() throws ExecutionException, InterruptedException { } } } + + @Test + void testMigrateSinglePartition() throws ExecutionException, InterruptedException { + var topic = Utils.randomString(); + try (var admin = AsyncAdmin.of(bootstrapServers())) { + admin.creator().topic(topic).numberOfPartitions(1).run().toCompletableFuture().get(); + + Utils.sleep(Duration.ofSeconds(5)); + var broker = brokerIds().iterator().next(); + admin.moveToBrokers(Map.of(TopicPartition.of(topic, 0), List.of(broker))); + + Utils.waitFor( + () -> + Utils.packException( + () -> { + var partitionReplicas = + admin.replicas(Set.of(topic)).toCompletableFuture().get(); + return partitionReplicas.size() == 1 + && partitionReplicas.get(0).nodeInfo().id() == broker; + })); + + var currentBroker = + admin.replicas(Set.of(topic)).toCompletableFuture().get().stream() + .filter(replica -> replica.partition() == 0) + .findFirst() + .get() + .nodeInfo() + .id(); + var allPath = admin.brokerFolders().toCompletableFuture().get(); + var otherPath = + allPath.get(currentBroker).stream() + .filter( + i -> + Utils.packException( + () -> + !i.contains( + admin.replicas(Set.of(topic)).toCompletableFuture().get().stream() + .filter(replica -> replica.partition() == 0) + .findFirst() + .get() + .dataFolder()))) + .collect(Collectors.toSet()); + + admin.moveToFolders( + Map.of(TopicPartitionReplica.of(topic, 0, currentBroker), otherPath.iterator().next())); + Utils.waitFor( + () -> + Utils.packException( + () -> { + var partitionReplicas = + admin.replicas(Set.of(topic)).toCompletableFuture().get(); + return partitionReplicas.size() == 1 + && partitionReplicas + .get(0) + .dataFolder() + .equals(otherPath.iterator().next()); + })); + } + } + + @Test + void testIllegalMigrationArgument() throws ExecutionException, InterruptedException { + var topic = Utils.randomString(); + var topicParition = TopicPartition.of(topic, 0); + try (var admin = AsyncAdmin.of(bootstrapServers())) { + admin + .creator() + .topic(topic) + .numberOfPartitions(1) + .numberOfReplicas((short) 1) + .run() + .toCompletableFuture() + .get(); + Utils.sleep(Duration.ofSeconds(1)); + var currentReplica = + admin.replicas(Set.of(topic)).toCompletableFuture().get().stream() + .filter(replica -> replica.partition() == topicParition.partition()) + .findFirst() + .get(); + + var currentBroker = currentReplica.nodeInfo().id(); + var notExistReplica = (currentBroker + 1) % brokerIds().size(); + var nextDir = logFolders().get(notExistReplica).iterator().next(); + + Assertions.assertThrows( + ExecutionException.class, + () -> + admin + .moveToFolders(Map.of(TopicPartitionReplica.of(topic, 0, currentBroker), nextDir)) + .toCompletableFuture() + .get()); + } + } + + @Test + void testCompact() throws ExecutionException, InterruptedException { + var topic = Utils.randomString(); + try (var admin = AsyncAdmin.of(bootstrapServers())) { + admin + .creator() + .topic(topic) + .configs( + Map.of( + TopicConfigs.MAX_COMPACTION_LAG_MS_CONFIG, + "1000", + TopicConfigs.CLEANUP_POLICY_CONFIG, + TopicConfigs.CLEANUP_POLICY_COMPACT)) + .run() + .toCompletableFuture() + .get(); + + var key = "key"; + var anotherKey = "anotherKey"; + var value = "value"; + try (var producer = + Producer.builder() + .keySerializer(Serializer.STRING) + .valueSerializer(Serializer.STRING) + .bootstrapServers(bootstrapServers()) + .build()) { + IntStream.range(0, 10) + .forEach(i -> producer.sender().key(key).value(value).topic(topic).run()); + producer.flush(); + + Utils.sleep(Duration.ofSeconds(2)); + IntStream.range(0, 10) + .forEach(i -> producer.sender().key(anotherKey).value(value).topic(topic).run()); + producer.flush(); + } + + Utils.sleep(Duration.ofSeconds(3)); + + try (var consumer = + Consumer.forTopics(Set.of(topic)) + .keyDeserializer(Deserializer.STRING) + .valueDeserializer(Deserializer.STRING) + .config( + ConsumerConfigs.AUTO_OFFSET_RESET_CONFIG, + ConsumerConfigs.AUTO_OFFSET_RESET_EARLIEST) + .bootstrapServers(bootstrapServers()) + .build()) { + + var records = + IntStream.range(0, 5) + .mapToObj(i -> consumer.poll(Duration.ofSeconds(1))) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + + Assertions.assertEquals( + 1, records.stream().filter(record -> record.key().equals(key)).count()); + + Assertions.assertEquals( + 10, records.stream().filter(record -> record.key().equals(anotherKey)).count()); + } + } + } + + @Test + void testBrokers() throws ExecutionException, InterruptedException { + try (var admin = AsyncAdmin.of(bootstrapServers())) { + admin + .creator() + .topic(Utils.randomString()) + .numberOfPartitions(6) + .run() + .toCompletableFuture() + .get(); + Utils.sleep(Duration.ofSeconds(2)); + var brokers = admin.brokers().toCompletableFuture().get(); + Assertions.assertEquals(3, brokers.size()); + brokers.forEach(broker -> Assertions.assertNotEquals(0, broker.config().raw().size())); + Assertions.assertEquals(1, brokers.stream().filter(Broker::isController).count()); + brokers.forEach( + broker -> Assertions.assertNotEquals(0, broker.topicPartitionLeaders().size())); + } + } + + @Test + void testBrokerFolders() throws ExecutionException, InterruptedException { + try (var admin = AsyncAdmin.of(bootstrapServers())) { + Assertions.assertEquals( + brokerIds().size(), admin.brokers().toCompletableFuture().get().size()); + // list all + logFolders() + .forEach( + (id, ds) -> + Utils.packException( + () -> + Assertions.assertEquals( + admin.brokerFolders().toCompletableFuture().get().get(id).size(), + ds.size()))); + } + } } From a86d9c8f94b2f693d797a3055aac28fa9682bdc8 Mon Sep 17 00:00:00 2001 From: Jia-Sheng Chen Date: Thu, 13 Oct 2022 13:18:29 +0800 Subject: [PATCH 3/3] add 2 more tests about Replica. new tests: testReplicas and testReplicasPreferredLeaderFlag --- .../astraea/common/admin/AsyncAdminTest.java | 240 ++++++++++++------ 1 file changed, 161 insertions(+), 79 deletions(-) diff --git a/common/src/test/java/org/astraea/common/admin/AsyncAdminTest.java b/common/src/test/java/org/astraea/common/admin/AsyncAdminTest.java index dac6ce8210..d60d6eb8d4 100644 --- a/common/src/test/java/org/astraea/common/admin/AsyncAdminTest.java +++ b/common/src/test/java/org/astraea/common/admin/AsyncAdminTest.java @@ -25,9 +25,9 @@ import java.util.Set; import java.util.SortedSet; import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; - import org.astraea.common.Utils; import org.astraea.common.consumer.Consumer; import org.astraea.common.consumer.ConsumerConfigs; @@ -310,16 +310,16 @@ void testPartitions() throws ExecutionException, InterruptedException { try (var admin = AsyncAdmin.of(bootstrapServers())) { var before = brokerIds().stream() - .mapToInt( - id -> - Utils.packException( - () -> - admin - .topicPartitionReplicas(Set.of(id)) - .toCompletableFuture() - .get() - .size())) - .sum(); + .mapToInt( + id -> + Utils.packException( + () -> + admin + .topicPartitionReplicas(Set.of(id)) + .toCompletableFuture() + .get() + .size())) + .sum(); admin.creator().topic(topic).numberOfPartitions(10).run().toCompletableFuture().get(); // wait for syncing topic creation @@ -346,16 +346,16 @@ void testPartitions() throws ExecutionException, InterruptedException { var after = brokerIds().stream() - .mapToInt( - id -> - Utils.packException( - () -> - admin - .topicPartitionReplicas(Set.of(id)) - .toCompletableFuture() - .get() - .size())) - .sum(); + .mapToInt( + id -> + Utils.packException( + () -> + admin + .topicPartitionReplicas(Set.of(id)) + .toCompletableFuture() + .get() + .size())) + .sum(); Assertions.assertEquals(before + 10, after); } @@ -368,10 +368,10 @@ void testConsumerGroups() throws ExecutionException, InterruptedException { try (var admin = AsyncAdmin.of(bootstrapServers())) { admin.creator().topic(topic).numberOfPartitions(3).run().toCompletableFuture().get(); try (var c1 = - Consumer.forTopics(Set.of(topic)) - .bootstrapServers(bootstrapServers()) - .config(ConsumerConfigs.GROUP_ID_CONFIG, consumerGroup) - .build()) { + Consumer.forTopics(Set.of(topic)) + .bootstrapServers(bootstrapServers()) + .config(ConsumerConfigs.GROUP_ID_CONFIG, consumerGroup) + .build()) { // wait for syncing topic creation Utils.sleep(Duration.ofSeconds(5)); var consumerGroupMap = @@ -381,22 +381,22 @@ void testConsumerGroups() throws ExecutionException, InterruptedException { consumerGroupMap.stream().anyMatch(cg -> cg.groupId().equals(consumerGroup))); try (var c2 = - Consumer.forTopics(Set.of(topic)) - .bootstrapServers(bootstrapServers()) - .config(ConsumerConfigs.GROUP_ID_CONFIG, "abc") - .build()) { + Consumer.forTopics(Set.of(topic)) + .bootstrapServers(bootstrapServers()) + .config(ConsumerConfigs.GROUP_ID_CONFIG, "abc") + .build()) { var count = admin.consumerGroupIds().toCompletableFuture().get().stream() - .mapToInt( - t -> - Utils.packException( - () -> - admin - .consumerGroups(Set.of(t)) - .toCompletableFuture() - .get() - .size())) - .sum(); + .mapToInt( + t -> + Utils.packException( + () -> + admin + .consumerGroups(Set.of(t)) + .toCompletableFuture() + .get() + .size())) + .sum(); Assertions.assertEquals( count, admin @@ -433,25 +433,25 @@ void testMigrateSinglePartition() throws ExecutionException, InterruptedExceptio var currentBroker = admin.replicas(Set.of(topic)).toCompletableFuture().get().stream() - .filter(replica -> replica.partition() == 0) - .findFirst() - .get() - .nodeInfo() - .id(); + .filter(replica -> replica.partition() == 0) + .findFirst() + .get() + .nodeInfo() + .id(); var allPath = admin.brokerFolders().toCompletableFuture().get(); var otherPath = allPath.get(currentBroker).stream() - .filter( - i -> - Utils.packException( - () -> - !i.contains( - admin.replicas(Set.of(topic)).toCompletableFuture().get().stream() - .filter(replica -> replica.partition() == 0) - .findFirst() - .get() - .dataFolder()))) - .collect(Collectors.toSet()); + .filter( + i -> + Utils.packException( + () -> + !i.contains( + admin.replicas(Set.of(topic)).toCompletableFuture().get().stream() + .filter(replica -> replica.partition() == 0) + .findFirst() + .get() + .dataFolder()))) + .collect(Collectors.toSet()); admin.moveToFolders( Map.of(TopicPartitionReplica.of(topic, 0, currentBroker), otherPath.iterator().next())); @@ -463,9 +463,9 @@ void testMigrateSinglePartition() throws ExecutionException, InterruptedExceptio admin.replicas(Set.of(topic)).toCompletableFuture().get(); return partitionReplicas.size() == 1 && partitionReplicas - .get(0) - .dataFolder() - .equals(otherPath.iterator().next()); + .get(0) + .dataFolder() + .equals(otherPath.iterator().next()); })); } } @@ -486,9 +486,9 @@ void testIllegalMigrationArgument() throws ExecutionException, InterruptedExcept Utils.sleep(Duration.ofSeconds(1)); var currentReplica = admin.replicas(Set.of(topic)).toCompletableFuture().get().stream() - .filter(replica -> replica.partition() == topicParition.partition()) - .findFirst() - .get(); + .filter(replica -> replica.partition() == topicParition.partition()) + .findFirst() + .get(); var currentBroker = currentReplica.nodeInfo().id(); var notExistReplica = (currentBroker + 1) % brokerIds().size(); @@ -525,38 +525,38 @@ void testCompact() throws ExecutionException, InterruptedException { var anotherKey = "anotherKey"; var value = "value"; try (var producer = - Producer.builder() - .keySerializer(Serializer.STRING) - .valueSerializer(Serializer.STRING) - .bootstrapServers(bootstrapServers()) - .build()) { + Producer.builder() + .keySerializer(Serializer.STRING) + .valueSerializer(Serializer.STRING) + .bootstrapServers(bootstrapServers()) + .build()) { IntStream.range(0, 10) - .forEach(i -> producer.sender().key(key).value(value).topic(topic).run()); + .forEach(i -> producer.sender().key(key).value(value).topic(topic).run()); producer.flush(); Utils.sleep(Duration.ofSeconds(2)); IntStream.range(0, 10) - .forEach(i -> producer.sender().key(anotherKey).value(value).topic(topic).run()); + .forEach(i -> producer.sender().key(anotherKey).value(value).topic(topic).run()); producer.flush(); } Utils.sleep(Duration.ofSeconds(3)); try (var consumer = - Consumer.forTopics(Set.of(topic)) - .keyDeserializer(Deserializer.STRING) - .valueDeserializer(Deserializer.STRING) - .config( - ConsumerConfigs.AUTO_OFFSET_RESET_CONFIG, - ConsumerConfigs.AUTO_OFFSET_RESET_EARLIEST) - .bootstrapServers(bootstrapServers()) - .build()) { + Consumer.forTopics(Set.of(topic)) + .keyDeserializer(Deserializer.STRING) + .valueDeserializer(Deserializer.STRING) + .config( + ConsumerConfigs.AUTO_OFFSET_RESET_CONFIG, + ConsumerConfigs.AUTO_OFFSET_RESET_EARLIEST) + .bootstrapServers(bootstrapServers()) + .build()) { var records = IntStream.range(0, 5) - .mapToObj(i -> consumer.poll(Duration.ofSeconds(1))) - .flatMap(Collection::stream) - .collect(Collectors.toList()); + .mapToObj(i -> consumer.poll(Duration.ofSeconds(1))) + .flatMap(Collection::stream) + .collect(Collectors.toList()); Assertions.assertEquals( 1, records.stream().filter(record -> record.key().equals(key)).count()); @@ -603,4 +603,86 @@ void testBrokerFolders() throws ExecutionException, InterruptedException { ds.size()))); } } + + @Test + void testReplicas() throws ExecutionException, InterruptedException { + var topic = Utils.randomString(); + try (var admin = AsyncAdmin.of(bootstrapServers())) { + admin.creator().topic(topic).numberOfPartitions(2).run().toCompletableFuture().get(); + Utils.sleep(Duration.ofSeconds(2)); + Assertions.assertEquals( + 2, + admin.replicas(Set.of(topic)).toCompletableFuture().get().stream() + .collect( + Collectors.groupingBy( + replica -> TopicPartition.of(replica.topic(), replica.partition()))) + .size()); + + var count = + admin + .replicas(admin.topicNames(true).toCompletableFuture().get()) + .toCompletableFuture() + .get() + .stream() + .collect( + Collectors.groupingBy( + replica -> TopicPartition.of(replica.topic(), replica.partition()))) + .size(); + Assertions.assertEquals( + count, + admin + .replicas(admin.topicNames(true).toCompletableFuture().get()) + .toCompletableFuture() + .get() + .stream() + .collect( + Collectors.groupingBy( + replica -> TopicPartition.of(replica.topic(), replica.partition()))) + .size()); + } + } + + @Test + void testReplicasPreferredLeaderFlag() throws ExecutionException, InterruptedException { + var topic = Utils.randomString(); + var partitionCount = 10; + try (var admin = AsyncAdmin.of(bootstrapServers())) { + admin + .creator() + .topic(topic) + .numberOfPartitions(partitionCount) + .numberOfReplicas((short) 3) + .run() + .toCompletableFuture() + .get(); + + Utils.sleep(Duration.ofSeconds(3)); + + var expectedPreferredLeader = + IntStream.range(0, partitionCount) + .mapToObj(p -> TopicPartition.of(topic, p)) + .collect(Collectors.toUnmodifiableMap(p -> p, p -> List.of(0))); + + var currentPreferredLeader = + (Supplier>>) + () -> + Utils.packException( + () -> + admin.replicas(Set.of(topic)).toCompletableFuture().get().stream() + .filter(Replica::isPreferredLeader) + .collect( + Collectors.groupingBy( + replica -> + TopicPartition.of(replica.topic(), replica.partition()), + Collectors.mapping( + replica -> replica.nodeInfo().id(), + Collectors.toList())))); + + IntStream.range(0, partitionCount) + .forEach(p -> admin.moveToBrokers(Map.of(TopicPartition.of(topic, p), List.of(0, 1, 2)))); + Utils.sleep(Duration.ofSeconds(3)); + + Assertions.assertEquals(expectedPreferredLeader, currentPreferredLeader.get()); + } + } }