diff --git a/common/src/test/java/org/astraea/common/admin/AsyncAdminTest.java b/common/src/test/java/org/astraea/common/admin/AsyncAdminTest.java index d906d16ed9..d60d6eb8d4 100644 --- a/common/src/test/java/org/astraea/common/admin/AsyncAdminTest.java +++ b/common/src/test/java/org/astraea/common/admin/AsyncAdminTest.java @@ -18,14 +18,22 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedSet; import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.astraea.common.Utils; +import org.astraea.common.consumer.Consumer; +import org.astraea.common.consumer.ConsumerConfigs; +import org.astraea.common.consumer.Deserializer; +import org.astraea.common.producer.Producer; +import org.astraea.common.producer.Serializer; import org.astraea.it.RequireBrokerCluster; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -276,4 +284,405 @@ void testMigrateToOtherFolders() throws ExecutionException, InterruptedException } } } + + @Test + void testCreator() throws ExecutionException, InterruptedException { + var topic = Utils.randomString(); + try (var admin = AsyncAdmin.of(bootstrapServers())) { + admin + .creator() + .topic(topic) + .configs(Map.of(TopicConfigs.COMPRESSION_TYPE_CONFIG, "lz4")) + .run() + .toCompletableFuture() + .get(); + Utils.sleep(Duration.ofSeconds(2)); + + var config = admin.topics(Set.of(topic)).toCompletableFuture().get().get(0).config(); + config.raw().keySet().forEach(key -> Assertions.assertTrue(config.value(key).isPresent())); + Assertions.assertTrue(config.raw().containsValue("lz4")); + } + } + + @Test + void testPartitions() throws ExecutionException, InterruptedException { + var topic = Utils.randomString(); + try (var admin = AsyncAdmin.of(bootstrapServers())) { + var before = + brokerIds().stream() + .mapToInt( + id -> + Utils.packException( + () -> + admin + .topicPartitionReplicas(Set.of(id)) + .toCompletableFuture() + .get() + .size())) + .sum(); + + admin.creator().topic(topic).numberOfPartitions(10).run().toCompletableFuture().get(); + // wait for syncing topic creation + Utils.sleep(Duration.ofSeconds(5)); + Assertions.assertTrue(admin.topicNames(true).toCompletableFuture().get().contains(topic)); + var partitions = admin.replicas(Set.of(topic)).toCompletableFuture().get(); + Assertions.assertEquals(10, partitions.size()); + var logFolders = + logFolders().values().stream().flatMap(Collection::stream).collect(Collectors.toSet()); + partitions.forEach( + replica -> + Assertions.assertTrue(logFolders.stream().anyMatch(replica.dataFolder()::contains))); + brokerIds() + .forEach( + id -> { + try { + Assertions.assertNotEquals( + 0, + admin.topicPartitionReplicas(Set.of(id)).toCompletableFuture().get().size()); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + + var after = + brokerIds().stream() + .mapToInt( + id -> + Utils.packException( + () -> + admin + .topicPartitionReplicas(Set.of(id)) + .toCompletableFuture() + .get() + .size())) + .sum(); + + Assertions.assertEquals(before + 10, after); + } + } + + @Test + void testConsumerGroups() throws ExecutionException, InterruptedException { + var topic = Utils.randomString(); + var consumerGroup = Utils.randomString(); + try (var admin = AsyncAdmin.of(bootstrapServers())) { + admin.creator().topic(topic).numberOfPartitions(3).run().toCompletableFuture().get(); + try (var c1 = + Consumer.forTopics(Set.of(topic)) + .bootstrapServers(bootstrapServers()) + .config(ConsumerConfigs.GROUP_ID_CONFIG, consumerGroup) + .build()) { + // wait for syncing topic creation + Utils.sleep(Duration.ofSeconds(5)); + var consumerGroupMap = + admin.consumerGroups(Set.of(consumerGroup)).toCompletableFuture().get(); + Assertions.assertEquals(1, consumerGroupMap.size()); + Assertions.assertTrue( + consumerGroupMap.stream().anyMatch(cg -> cg.groupId().equals(consumerGroup))); + + try (var c2 = + Consumer.forTopics(Set.of(topic)) + .bootstrapServers(bootstrapServers()) + .config(ConsumerConfigs.GROUP_ID_CONFIG, "abc") + .build()) { + var count = + admin.consumerGroupIds().toCompletableFuture().get().stream() + .mapToInt( + t -> + Utils.packException( + () -> + admin + .consumerGroups(Set.of(t)) + .toCompletableFuture() + .get() + .size())) + .sum(); + Assertions.assertEquals( + count, + admin + .consumerGroups(admin.consumerGroupIds().toCompletableFuture().get()) + .toCompletableFuture() + .get() + .size()); + Assertions.assertEquals( + 1, admin.consumerGroups(Set.of("abc")).toCompletableFuture().get().size()); + } + } + } + } + + @Test + void testMigrateSinglePartition() throws ExecutionException, InterruptedException { + var topic = Utils.randomString(); + try (var admin = AsyncAdmin.of(bootstrapServers())) { + admin.creator().topic(topic).numberOfPartitions(1).run().toCompletableFuture().get(); + + Utils.sleep(Duration.ofSeconds(5)); + var broker = brokerIds().iterator().next(); + admin.moveToBrokers(Map.of(TopicPartition.of(topic, 0), List.of(broker))); + + Utils.waitFor( + () -> + Utils.packException( + () -> { + var partitionReplicas = + admin.replicas(Set.of(topic)).toCompletableFuture().get(); + return partitionReplicas.size() == 1 + && partitionReplicas.get(0).nodeInfo().id() == broker; + })); + + var currentBroker = + admin.replicas(Set.of(topic)).toCompletableFuture().get().stream() + .filter(replica -> replica.partition() == 0) + .findFirst() + .get() + .nodeInfo() + .id(); + var allPath = admin.brokerFolders().toCompletableFuture().get(); + var otherPath = + allPath.get(currentBroker).stream() + .filter( + i -> + Utils.packException( + () -> + !i.contains( + admin.replicas(Set.of(topic)).toCompletableFuture().get().stream() + .filter(replica -> replica.partition() == 0) + .findFirst() + .get() + .dataFolder()))) + .collect(Collectors.toSet()); + + admin.moveToFolders( + Map.of(TopicPartitionReplica.of(topic, 0, currentBroker), otherPath.iterator().next())); + Utils.waitFor( + () -> + Utils.packException( + () -> { + var partitionReplicas = + admin.replicas(Set.of(topic)).toCompletableFuture().get(); + return partitionReplicas.size() == 1 + && partitionReplicas + .get(0) + .dataFolder() + .equals(otherPath.iterator().next()); + })); + } + } + + @Test + void testIllegalMigrationArgument() throws ExecutionException, InterruptedException { + var topic = Utils.randomString(); + var topicParition = TopicPartition.of(topic, 0); + try (var admin = AsyncAdmin.of(bootstrapServers())) { + admin + .creator() + .topic(topic) + .numberOfPartitions(1) + .numberOfReplicas((short) 1) + .run() + .toCompletableFuture() + .get(); + Utils.sleep(Duration.ofSeconds(1)); + var currentReplica = + admin.replicas(Set.of(topic)).toCompletableFuture().get().stream() + .filter(replica -> replica.partition() == topicParition.partition()) + .findFirst() + .get(); + + var currentBroker = currentReplica.nodeInfo().id(); + var notExistReplica = (currentBroker + 1) % brokerIds().size(); + var nextDir = logFolders().get(notExistReplica).iterator().next(); + + Assertions.assertThrows( + ExecutionException.class, + () -> + admin + .moveToFolders(Map.of(TopicPartitionReplica.of(topic, 0, currentBroker), nextDir)) + .toCompletableFuture() + .get()); + } + } + + @Test + void testCompact() throws ExecutionException, InterruptedException { + var topic = Utils.randomString(); + try (var admin = AsyncAdmin.of(bootstrapServers())) { + admin + .creator() + .topic(topic) + .configs( + Map.of( + TopicConfigs.MAX_COMPACTION_LAG_MS_CONFIG, + "1000", + TopicConfigs.CLEANUP_POLICY_CONFIG, + TopicConfigs.CLEANUP_POLICY_COMPACT)) + .run() + .toCompletableFuture() + .get(); + + var key = "key"; + var anotherKey = "anotherKey"; + var value = "value"; + try (var producer = + Producer.builder() + .keySerializer(Serializer.STRING) + .valueSerializer(Serializer.STRING) + .bootstrapServers(bootstrapServers()) + .build()) { + IntStream.range(0, 10) + .forEach(i -> producer.sender().key(key).value(value).topic(topic).run()); + producer.flush(); + + Utils.sleep(Duration.ofSeconds(2)); + IntStream.range(0, 10) + .forEach(i -> producer.sender().key(anotherKey).value(value).topic(topic).run()); + producer.flush(); + } + + Utils.sleep(Duration.ofSeconds(3)); + + try (var consumer = + Consumer.forTopics(Set.of(topic)) + .keyDeserializer(Deserializer.STRING) + .valueDeserializer(Deserializer.STRING) + .config( + ConsumerConfigs.AUTO_OFFSET_RESET_CONFIG, + ConsumerConfigs.AUTO_OFFSET_RESET_EARLIEST) + .bootstrapServers(bootstrapServers()) + .build()) { + + var records = + IntStream.range(0, 5) + .mapToObj(i -> consumer.poll(Duration.ofSeconds(1))) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + + Assertions.assertEquals( + 1, records.stream().filter(record -> record.key().equals(key)).count()); + + Assertions.assertEquals( + 10, records.stream().filter(record -> record.key().equals(anotherKey)).count()); + } + } + } + + @Test + void testBrokers() throws ExecutionException, InterruptedException { + try (var admin = AsyncAdmin.of(bootstrapServers())) { + admin + .creator() + .topic(Utils.randomString()) + .numberOfPartitions(6) + .run() + .toCompletableFuture() + .get(); + Utils.sleep(Duration.ofSeconds(2)); + var brokers = admin.brokers().toCompletableFuture().get(); + Assertions.assertEquals(3, brokers.size()); + brokers.forEach(broker -> Assertions.assertNotEquals(0, broker.config().raw().size())); + Assertions.assertEquals(1, brokers.stream().filter(Broker::isController).count()); + brokers.forEach( + broker -> Assertions.assertNotEquals(0, broker.topicPartitionLeaders().size())); + } + } + + @Test + void testBrokerFolders() throws ExecutionException, InterruptedException { + try (var admin = AsyncAdmin.of(bootstrapServers())) { + Assertions.assertEquals( + brokerIds().size(), admin.brokers().toCompletableFuture().get().size()); + // list all + logFolders() + .forEach( + (id, ds) -> + Utils.packException( + () -> + Assertions.assertEquals( + admin.brokerFolders().toCompletableFuture().get().get(id).size(), + ds.size()))); + } + } + + @Test + void testReplicas() throws ExecutionException, InterruptedException { + var topic = Utils.randomString(); + try (var admin = AsyncAdmin.of(bootstrapServers())) { + admin.creator().topic(topic).numberOfPartitions(2).run().toCompletableFuture().get(); + Utils.sleep(Duration.ofSeconds(2)); + Assertions.assertEquals( + 2, + admin.replicas(Set.of(topic)).toCompletableFuture().get().stream() + .collect( + Collectors.groupingBy( + replica -> TopicPartition.of(replica.topic(), replica.partition()))) + .size()); + + var count = + admin + .replicas(admin.topicNames(true).toCompletableFuture().get()) + .toCompletableFuture() + .get() + .stream() + .collect( + Collectors.groupingBy( + replica -> TopicPartition.of(replica.topic(), replica.partition()))) + .size(); + Assertions.assertEquals( + count, + admin + .replicas(admin.topicNames(true).toCompletableFuture().get()) + .toCompletableFuture() + .get() + .stream() + .collect( + Collectors.groupingBy( + replica -> TopicPartition.of(replica.topic(), replica.partition()))) + .size()); + } + } + + @Test + void testReplicasPreferredLeaderFlag() throws ExecutionException, InterruptedException { + var topic = Utils.randomString(); + var partitionCount = 10; + try (var admin = AsyncAdmin.of(bootstrapServers())) { + admin + .creator() + .topic(topic) + .numberOfPartitions(partitionCount) + .numberOfReplicas((short) 3) + .run() + .toCompletableFuture() + .get(); + + Utils.sleep(Duration.ofSeconds(3)); + + var expectedPreferredLeader = + IntStream.range(0, partitionCount) + .mapToObj(p -> TopicPartition.of(topic, p)) + .collect(Collectors.toUnmodifiableMap(p -> p, p -> List.of(0))); + + var currentPreferredLeader = + (Supplier>>) + () -> + Utils.packException( + () -> + admin.replicas(Set.of(topic)).toCompletableFuture().get().stream() + .filter(Replica::isPreferredLeader) + .collect( + Collectors.groupingBy( + replica -> + TopicPartition.of(replica.topic(), replica.partition()), + Collectors.mapping( + replica -> replica.nodeInfo().id(), + Collectors.toList())))); + + IntStream.range(0, partitionCount) + .forEach(p -> admin.moveToBrokers(Map.of(TopicPartition.of(topic, p), List.of(0, 1, 2)))); + Utils.sleep(Duration.ofSeconds(3)); + + Assertions.assertEquals(expectedPreferredLeader, currentPreferredLeader.get()); + } + } }