From 367020020af802c3aa8cec040bd7a53532320c6b Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Mon, 11 Jul 2022 15:07:01 +0800 Subject: [PATCH 1/6] Simplify ClusterLogAllocation --- .../executor/StraightPlanExecutor.java | 3 +- .../generator/RebalancePlanGenerator.java | 3 +- .../generator/ShufflePlanGenerator.java | 188 +++++++------ .../balancer/log/ClusterLogAllocation.java | 154 ++++++++++- .../log/LayeredClusterLogAllocation.java | 250 ------------------ .../balancer/RebalancePlanProposalTest.java | 15 +- .../executor/StraightPlanExecutorTest.java | 29 +- .../generator/ShufflePlanGeneratorTest.java | 10 +- ...est.java => ClusterLogAllocationTest.java} | 58 ++-- 9 files changed, 283 insertions(+), 427 deletions(-) delete mode 100644 app/src/main/java/org/astraea/app/balancer/log/LayeredClusterLogAllocation.java rename app/src/test/java/org/astraea/app/balancer/log/{LayeredClusterLogAllocationTest.java => ClusterLogAllocationTest.java} (80%) diff --git a/app/src/main/java/org/astraea/app/balancer/executor/StraightPlanExecutor.java b/app/src/main/java/org/astraea/app/balancer/executor/StraightPlanExecutor.java index e11ff360ba..ebb04c4cc1 100644 --- a/app/src/main/java/org/astraea/app/balancer/executor/StraightPlanExecutor.java +++ b/app/src/main/java/org/astraea/app/balancer/executor/StraightPlanExecutor.java @@ -23,7 +23,6 @@ import java.util.stream.Collectors; import org.astraea.app.admin.TopicPartition; import org.astraea.app.balancer.log.ClusterLogAllocation; -import org.astraea.app.balancer.log.LayeredClusterLogAllocation; /** Execute every possible migration immediately. */ public class StraightPlanExecutor implements RebalancePlanExecutor { @@ -33,7 +32,7 @@ public StraightPlanExecutor() {} @Override public void run(RebalanceAdmin rebalanceAdmin, ClusterLogAllocation logAllocation) { final var clusterInfo = rebalanceAdmin.clusterInfo(); - final var currentLogAllocation = LayeredClusterLogAllocation.of(clusterInfo); + final var currentLogAllocation = ClusterLogAllocation.of(clusterInfo); final var migrationTargets = ClusterLogAllocation.findNonFulfilledAllocation(currentLogAllocation, logAllocation); diff --git a/app/src/main/java/org/astraea/app/balancer/generator/RebalancePlanGenerator.java b/app/src/main/java/org/astraea/app/balancer/generator/RebalancePlanGenerator.java index abb7c0a5b3..26c46ca242 100644 --- a/app/src/main/java/org/astraea/app/balancer/generator/RebalancePlanGenerator.java +++ b/app/src/main/java/org/astraea/app/balancer/generator/RebalancePlanGenerator.java @@ -20,7 +20,6 @@ import org.astraea.app.admin.ClusterInfo; import org.astraea.app.balancer.RebalancePlanProposal; import org.astraea.app.balancer.log.ClusterLogAllocation; -import org.astraea.app.balancer.log.LayeredClusterLogAllocation; /** */ public interface RebalancePlanGenerator { @@ -35,7 +34,7 @@ public interface RebalancePlanGenerator { * @return a {@link Stream} generating rebalance plan regarding the given {@link ClusterInfo} */ default Stream generate(ClusterInfo clusterInfo) { - return generate(clusterInfo, LayeredClusterLogAllocation.of(clusterInfo)); + return generate(clusterInfo, ClusterLogAllocation.of(clusterInfo)); } /** diff --git a/app/src/main/java/org/astraea/app/balancer/generator/ShufflePlanGenerator.java b/app/src/main/java/org/astraea/app/balancer/generator/ShufflePlanGenerator.java index 2ce0b26958..521a03ede9 100644 --- a/app/src/main/java/org/astraea/app/balancer/generator/ShufflePlanGenerator.java +++ b/app/src/main/java/org/astraea/app/balancer/generator/ShufflePlanGenerator.java @@ -20,16 +20,16 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.ThreadLocalRandom; -import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; import org.astraea.app.admin.ClusterInfo; import org.astraea.app.admin.NodeInfo; import org.astraea.app.admin.TopicPartition; import org.astraea.app.balancer.RebalancePlanProposal; import org.astraea.app.balancer.log.ClusterLogAllocation; -import org.astraea.app.balancer.log.LayeredClusterLogAllocation; import org.astraea.app.balancer.log.LogPlacement; /** @@ -60,21 +60,6 @@ public ShufflePlanGenerator(Supplier numberOfShuffle) { this.numberOfShuffle = numberOfShuffle; } - private int sourceTopicPartitionSelector(List migrationCandidates) { - return ThreadLocalRandom.current().nextInt(0, migrationCandidates.size()); - } - - private int sourceLogPlacementSelector(List migrationCandidates) { - return ThreadLocalRandom.current().nextInt(0, migrationCandidates.size()); - } - - private T randomElement(Collection collection) { - return collection.stream() - .skip(ThreadLocalRandom.current().nextInt(0, collection.size())) - .findFirst() - .orElseThrow(); - } - @Override public Stream generate( ClusterInfo clusterInfo, ClusterLogAllocation baseAllocation) { @@ -105,92 +90,101 @@ public Stream generate( .build(); final var shuffleCount = numberOfShuffle.get(); - final var newAllocation = LayeredClusterLogAllocation.of(baseAllocation); - final var pickingList = - newAllocation.topicPartitionStream().collect(Collectors.toUnmodifiableList()); rebalancePlanBuilder.addInfo( "Make " + shuffleCount + (shuffleCount > 0 ? " shuffles." : " shuffle.")); - for (int i = 0; i < shuffleCount; i++) { - final var sourceTopicPartitionIndex = sourceTopicPartitionSelector(pickingList); - final var sourceTopicPartition = pickingList.get(sourceTopicPartitionIndex); - final var sourceLogPlacements = newAllocation.logPlacements(sourceTopicPartition); - final var sourceLogPlacementIndex = sourceLogPlacementSelector(sourceLogPlacements); - final var sourceLogPlacement = sourceLogPlacements.get(sourceLogPlacementIndex); - final var sourceIsLeader = sourceLogPlacementIndex == 0; - final var sourceBroker = sourceLogPlacement.broker(); - - Consumer replicaSetMigration = - (targetBroker) -> { - var destDir = randomElement(clusterInfo.dataDirectories(targetBroker)); - newAllocation.migrateReplica( - sourceTopicPartition, sourceBroker, targetBroker, destDir); - rebalancePlanBuilder.addInfo( - String.format( - "Change replica set of topic %s partition %d, from %d to %d at %s.", - sourceTopicPartition.topic(), - sourceTopicPartition.partition(), - sourceLogPlacement.broker(), - targetBroker, - destDir)); - }; - Consumer leaderFollowerMigration = - (newLeaderReplicaCandidate) -> { - newAllocation.letReplicaBecomeLeader( - sourceTopicPartition, newLeaderReplicaCandidate.broker()); - rebalancePlanBuilder.addInfo( - String.format( - "Change the log identity of topic %s partition %d replica at broker %d, from %s to %s", - sourceTopicPartition.topic(), - sourceTopicPartition.partition(), - sourceLogPlacement.broker(), - sourceIsLeader ? "leader" : "follower", - sourceIsLeader ? "follower" : "leader")); - }; - - // generate a set of valid migration broker for given placement. - final var validMigrationCandidates = new ArrayList(); - // [Valid movement 1] add all brokers and remove all broker in current replica set - brokerIds.stream() - .filter( - broker -> sourceLogPlacements.stream().noneMatch(log -> log.broker() == broker)) - .map(targetBroker -> (Runnable) () -> replicaSetMigration.accept(targetBroker)) - .map(Movement::replicaSetMovement) - .forEach(validMigrationCandidates::add); - // [Valid movement 2] add all leader/follower change candidate - if (sourceLogPlacements.size() > 1) { - if (sourceIsLeader) { - // leader can migrate its identity to follower. - sourceLogPlacements.stream() - .skip(1) - .map( - followerReplica -> - (Runnable) () -> leaderFollowerMigration.accept(followerReplica)) - .map(Movement::replicaSetMovement) - .forEach(validMigrationCandidates::add); - } else { - // follower can migrate its identity to leader. - validMigrationCandidates.add( - Movement.replicaSetMovement( - () -> leaderFollowerMigration.accept(sourceLogPlacement))); - } - } - - // pick a migration and execute - final var selectedMigrationIndex = randomElement(validMigrationCandidates); - selectedMigrationIndex.run(); - } - - return rebalancePlanBuilder.withRebalancePlan(newAllocation).build(); + + var candidates = + IntStream.range(0, shuffleCount) + .mapToObj(i -> allocationGenerator(clusterInfo, rebalancePlanBuilder)) + .collect(Collectors.toUnmodifiableList()); + + var currentAllocation = baseAllocation; + for (var candidate : candidates) currentAllocation = candidate.apply(currentAllocation); + + return rebalancePlanBuilder.withRebalancePlan(currentAllocation).build(); }); } - /** Action to trigger upon this migration */ - interface Movement extends Runnable { - static ReplicaSetMovement replicaSetMovement(Runnable runnable) { - return runnable::run; - } + private static Function allocationGenerator( + ClusterInfo clusterInfo, RebalancePlanProposal.Build rebalancePlanBuilder) { + return currentAllocation -> { + var brokerIds = + clusterInfo.nodes().stream().map(NodeInfo::id).collect(Collectors.toUnmodifiableSet()); + var pickingList = new ArrayList<>(currentAllocation.topicPartitions()); + final var sourceTopicPartitionIndex = sourceTopicPartitionSelector(pickingList); + final var sourceTopicPartition = pickingList.get(sourceTopicPartitionIndex); + final var sourceLogPlacements = currentAllocation.logPlacements(sourceTopicPartition); + final var sourceLogPlacementIndex = sourceLogPlacementSelector(sourceLogPlacements); + final var sourceLogPlacement = sourceLogPlacements.get(sourceLogPlacementIndex); + final var sourceIsLeader = sourceLogPlacementIndex == 0; + final var sourceBroker = sourceLogPlacement.broker(); + var targetPlacements = + sourceLogPlacements.size() > 1 + ? sourceIsLeader ? sourceLogPlacements.stream().skip(1) : Stream.of() + : Stream.of(sourceLogPlacement); + // generate a set of valid migration broker for given placement. + final var validMigrationCandidates = + Stream.concat( + + // [Valid movement 1] add all brokers and remove all + // broker in current replica set + brokerIds.stream() + .filter( + broker -> + sourceLogPlacements.stream().noneMatch(log -> log.broker() == broker)) + .map( + targetBroker -> + (Supplier) + () -> { + var destDir = + randomElement(clusterInfo.dataDirectories(targetBroker)); + rebalancePlanBuilder.addInfo( + String.format( + "Change replica set of topic %s partition %d, from %d to %d at %s.", + sourceTopicPartition.topic(), + sourceTopicPartition.partition(), + sourceLogPlacement.broker(), + targetBroker, + destDir)); + return currentAllocation.migrateReplica( + sourceTopicPartition, sourceBroker, targetBroker, destDir); + }), + // [Valid movement 2] add all leader/follower change + // candidate + targetPlacements.map( + followerReplica -> + () -> { + rebalancePlanBuilder.addInfo( + String.format( + "Change the log identity of topic %s partition %d replica at broker %d, from %s to %s", + sourceTopicPartition.topic(), + sourceTopicPartition.partition(), + sourceLogPlacement.broker(), + sourceIsLeader ? "leader" : "follower", + sourceIsLeader ? "follower" : "leader")); + return currentAllocation.letReplicaBecomeLeader( + sourceTopicPartition, followerReplica.broker()); + })) + .collect(Collectors.toUnmodifiableList()); + // pick a migration and execute + final var selectedMigrationIndex = randomElement(validMigrationCandidates); + return selectedMigrationIndex.get(); + }; } - interface ReplicaSetMovement extends Movement {} + private static int sourceTopicPartitionSelector(List migrationCandidates) { + return ThreadLocalRandom.current().nextInt(0, migrationCandidates.size()); + } + + private static int sourceLogPlacementSelector(List migrationCandidates) { + return ThreadLocalRandom.current().nextInt(0, migrationCandidates.size()); + } + + private static T randomElement(Collection collection) { + return collection.stream() + .skip(ThreadLocalRandom.current().nextInt(0, collection.size())) + .findFirst() + .orElseThrow(); + } } diff --git a/app/src/main/java/org/astraea/app/balancer/log/ClusterLogAllocation.java b/app/src/main/java/org/astraea/app/balancer/log/ClusterLogAllocation.java index 850bc70341..60e6275739 100644 --- a/app/src/main/java/org/astraea/app/balancer/log/ClusterLogAllocation.java +++ b/app/src/main/java/org/astraea/app/balancer/log/ClusterLogAllocation.java @@ -16,10 +16,18 @@ */ package org.astraea.app.balancer.log; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.OptionalInt; import java.util.Set; import java.util.stream.Collectors; -import java.util.stream.Stream; +import java.util.stream.IntStream; +import org.astraea.app.admin.ClusterInfo; +import org.astraea.app.admin.ReplicaInfo; import org.astraea.app.admin.TopicPartition; /** @@ -29,6 +37,42 @@ */ public interface ClusterLogAllocation { + static ClusterLogAllocation of(ClusterInfo clusterInfo) { + return of( + clusterInfo.topics().stream() + .map(clusterInfo::replicas) + .flatMap(Collection::stream) + .collect( + Collectors.groupingBy( + replica -> + TopicPartition.of(replica.topic(), Integer.toString(replica.partition())))) + .entrySet() + .stream() + .map( + (entry) -> { + // validate if the given log placements are valid + if (entry.getValue().stream().filter(ReplicaInfo::isLeader).count() != 1) + throw new IllegalArgumentException( + "The " + entry.getKey() + " leader count mismatch 1."); + + final var topicPartition = entry.getKey(); + final var logPlacements = + entry.getValue().stream() + .sorted(Comparator.comparingInt(replica -> replica.isLeader() ? 0 : 1)) + .map( + replica -> + LogPlacement.of( + replica.nodeInfo().id(), replica.dataFolder().orElse(null))) + .collect(Collectors.toList()); + + return Map.entry(topicPartition, logPlacements); + }) + .collect(Collectors.toConcurrentMap(Map.Entry::getKey, Map.Entry::getValue))); + } + + static ClusterLogAllocation of(Map> allocation) { + return new ClusterLogAllocationImpl(allocation); + } /** * let specific broker leave the replica set and let another broker join the replica set. Which * data directory the migrated replica will be is up to the Kafka broker implementation to decide. @@ -37,8 +81,9 @@ public interface ClusterLogAllocation { * @param atBroker the id of the broker about to remove * @param toBroker the id of the broker about to replace the removed broker */ - default void migrateReplica(TopicPartition topicPartition, int atBroker, int toBroker) { - migrateReplica(topicPartition, atBroker, toBroker, null); + default ClusterLogAllocation migrateReplica( + TopicPartition topicPartition, int atBroker, int toBroker) { + return migrateReplica(topicPartition, atBroker, toBroker, null); } // TODO: Revise the log argument by TopicPartitionReplica, once #411 is merged @@ -52,17 +97,18 @@ default void migrateReplica(TopicPartition topicPartition, int atBroker, int toB * the destination broker, if {@code null} is specified then the data directory choice is left * up to the Kafka broker implementation. */ - void migrateReplica(TopicPartition topicPartition, int atBroker, int toBroker, String toDir); + ClusterLogAllocation migrateReplica( + TopicPartition topicPartition, int atBroker, int toBroker, String toDir); // TODO: Revise the log argument by TopicPartitionReplica, once #411 is merged /** let specific follower log become the leader log of this topic/partition. */ - void letReplicaBecomeLeader(TopicPartition topicPartition, int followerReplica); + ClusterLogAllocation letReplicaBecomeLeader(TopicPartition topicPartition, int followerReplica); /** Retrieve the log placements of specific {@link TopicPartition}. */ List logPlacements(TopicPartition topicPartition); /** Retrieve the stream of all topic/partition pairs in allocation. */ - Stream topicPartitionStream(); + Set topicPartitions(); /** * Find a subset of topic/partitions in the source allocation, that has any non-fulfilled log @@ -73,10 +119,8 @@ default void migrateReplica(TopicPartition topicPartition, int atBroker, int toB static Set findNonFulfilledAllocation( ClusterLogAllocation source, ClusterLogAllocation target) { - final var sourceTopicPartition = - source.topicPartitionStream().collect(Collectors.toUnmodifiableSet()); - final var targetTopicPartition = - target.topicPartitionStream().collect(Collectors.toUnmodifiableSet()); + final var sourceTopicPartition = source.topicPartitions(); + final var targetTopicPartition = target.topicPartitions(); if (!sourceTopicPartition.equals(targetTopicPartition)) throw new IllegalArgumentException( @@ -90,8 +134,7 @@ static Set findNonFulfilledAllocation( static String toString(ClusterLogAllocation allocation) { StringBuilder stringBuilder = new StringBuilder(); - allocation - .topicPartitionStream() + allocation.topicPartitions().stream() .sorted() .forEach( tp -> { @@ -109,4 +152,91 @@ static String toString(ClusterLogAllocation allocation) { return stringBuilder.toString(); } + + class ClusterLogAllocationImpl implements ClusterLogAllocation { + + private final Map> allocation; + + private ClusterLogAllocationImpl(Map> allocation) { + this.allocation = Collections.unmodifiableMap(allocation); + } + + @Override + public ClusterLogAllocation migrateReplica( + TopicPartition topicPartition, int atBroker, int toBroker, String toDir) { + var sourceLogPlacements = this.logPlacements(topicPartition); + if (sourceLogPlacements == null) + throw new IllegalMigrationException( + topicPartition.topic() + "-" + topicPartition.partition() + " no such topic/partition"); + + int sourceLogIndex = indexOfBroker(sourceLogPlacements, atBroker).orElse(-1); + if (sourceLogIndex == -1) + throw new IllegalMigrationException( + atBroker + " is not part of the replica set for " + topicPartition); + + var newAllocations = new HashMap<>(allocation); + newAllocations.put( + topicPartition, + IntStream.range(0, sourceLogPlacements.size()) + .mapToObj( + index -> + index == sourceLogIndex + ? LogPlacement.of(toBroker, toDir) + : sourceLogPlacements.get(index)) + .collect(Collectors.toUnmodifiableList())); + return new ClusterLogAllocationImpl(newAllocations); + } + + @Override + public ClusterLogAllocation letReplicaBecomeLeader( + TopicPartition topicPartition, int followerReplica) { + final List sourceLogPlacements = this.logPlacements(topicPartition); + if (sourceLogPlacements == null) + throw new IllegalMigrationException( + topicPartition.topic() + "-" + topicPartition.partition() + " no such topic/partition"); + + int leaderLogIndex = 0; + if (sourceLogPlacements.size() == 0) + throw new IllegalStateException("This partition has no log"); + + int followerLogIndex = indexOfBroker(sourceLogPlacements, followerReplica).orElse(-1); + if (followerLogIndex == -1) + throw new IllegalArgumentException( + followerReplica + " is not part of the replica set for " + topicPartition); + + if (leaderLogIndex == followerLogIndex) return this; // nothing to do + + final var leaderLog = this.logPlacements(topicPartition).get(leaderLogIndex); + final var followerLog = this.logPlacements(topicPartition).get(followerLogIndex); + + var newAllocations = new HashMap<>(allocation); + newAllocations.put( + topicPartition, + IntStream.range(0, sourceLogPlacements.size()) + .mapToObj( + index -> + index == leaderLogIndex + ? followerLog + : index == followerLogIndex ? leaderLog : sourceLogPlacements.get(index)) + .collect(Collectors.toUnmodifiableList())); + + return new ClusterLogAllocationImpl(newAllocations); + } + + @Override + public List logPlacements(TopicPartition topicPartition) { + return allocation.getOrDefault(topicPartition, List.of()); + } + + @Override + public Set topicPartitions() { + return allocation.keySet(); + } + + private static OptionalInt indexOfBroker(List logPlacements, int targetBroker) { + return IntStream.range(0, logPlacements.size()) + .filter(index -> logPlacements.get(index).broker() == targetBroker) + .findFirst(); + } + } } diff --git a/app/src/main/java/org/astraea/app/balancer/log/LayeredClusterLogAllocation.java b/app/src/main/java/org/astraea/app/balancer/log/LayeredClusterLogAllocation.java deleted file mode 100644 index 3ae58fcaac..0000000000 --- a/app/src/main/java/org/astraea/app/balancer/log/LayeredClusterLogAllocation.java +++ /dev/null @@ -1,250 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.app.balancer.log; - -import java.util.Collection; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.OptionalInt; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import java.util.stream.Stream; -import org.astraea.app.admin.ClusterInfo; -import org.astraea.app.admin.ReplicaInfo; -import org.astraea.app.admin.TopicPartition; - -public class LayeredClusterLogAllocation implements ClusterLogAllocation { - - // set operation is guard by this - private final AtomicBoolean isLocked = new AtomicBoolean(false); - private final LayeredClusterLogAllocation upperLayer; - - // guard by this - private final Map> allocation; - - private LayeredClusterLogAllocation( - LayeredClusterLogAllocation upperLayer, Map> allocation) { - allocation.keySet().stream() - .collect(Collectors.groupingBy(TopicPartition::topic)) - .forEach( - (topic, tp) -> { - int maxPartitionId = - tp.stream().mapToInt(TopicPartition::partition).max().orElseThrow(); - if ((maxPartitionId + 1) != tp.size()) - throw new IllegalArgumentException( - "The partition size of " + topic + " is illegal"); - }); - allocation.forEach( - (tp, logs) -> { - long uniqueBrokers = logs.stream().map(LogPlacement::broker).distinct().count(); - if (uniqueBrokers != logs.size() || logs.size() == 0) - throw new IllegalArgumentException( - "The topic " - + tp.topic() - + " partition " - + tp.partition() - + " has illegal replica set " - + logs); - }); - this.upperLayer = upperLayer; - this.allocation = allocation; - } - - public static LayeredClusterLogAllocation of(ClusterInfo clusterInfo) { - final Map> allocation = - clusterInfo.topics().stream() - .map(clusterInfo::replicas) - .flatMap(Collection::stream) - .collect( - Collectors.groupingBy( - replica -> - TopicPartition.of(replica.topic(), Integer.toString(replica.partition())))) - .entrySet() - .stream() - .map( - (entry) -> { - // validate if the given log placements are valid - if (entry.getValue().stream().filter(ReplicaInfo::isLeader).count() != 1) - throw new IllegalArgumentException( - "The " + entry.getKey() + " leader count mismatch 1."); - - final var topicPartition = entry.getKey(); - final var logPlacements = - entry.getValue().stream() - .sorted(Comparator.comparingInt(replica -> replica.isLeader() ? 0 : 1)) - .map( - replica -> - LogPlacement.of( - replica.nodeInfo().id(), replica.dataFolder().orElse(null))) - .collect(Collectors.toList()); - - return Map.entry(topicPartition, logPlacements); - }) - .collect(Collectors.toConcurrentMap(Map.Entry::getKey, Map.Entry::getValue)); - return new LayeredClusterLogAllocation(null, allocation); - } - - public static LayeredClusterLogAllocation of( - Map> allocationMap) { - return new LayeredClusterLogAllocation(null, allocationMap); - } - - /** - * Create a new {@link LayeredClusterLogAllocation}. If the given {@link ClusterLogAllocation} is - * an instance of {@link LayeredClusterLogAllocation}. Then the given {@link - * LayeredClusterLogAllocation} will be locked and become unmodifiable. The new log allocation - * will continue from the unmodifiable allocation. Doing so can improve the performance of new log - * allocation creation as we avoid the heavy data copy work. - */ - public static LayeredClusterLogAllocation of(ClusterLogAllocation clusterLogAllocation) { - if (clusterLogAllocation instanceof LayeredClusterLogAllocation) { - return createNewLayer((LayeredClusterLogAllocation) clusterLogAllocation); - } else { - // no layer support, do the expensive copy work. - final var map = new ConcurrentHashMap>(); - clusterLogAllocation - .topicPartitionStream() - .forEach( - topicPartition -> - map.put(topicPartition, clusterLogAllocation.logPlacements(topicPartition))); - return of(map); - } - } - - /** - * Create a new {@link LayeredClusterLogAllocation} based on the given one. The given layered - * cluster log will become unmodifiable afterward. - */ - private static LayeredClusterLogAllocation createNewLayer( - LayeredClusterLogAllocation baseLogAllocation) { - if (!baseLogAllocation.isLocked()) { - baseLogAllocation.lockLayer(); - } - return new LayeredClusterLogAllocation(baseLogAllocation, new ConcurrentHashMap<>()); - } - - /** Make this {@link LayeredClusterLogAllocation} unmodifiable. */ - private synchronized void lockLayer() { - isLocked.set(true); - } - - /** Is this {@link LayeredClusterLogAllocation} unmodifiable. */ - private boolean isLocked() { - return isLocked.get(); - } - - private synchronized void ensureNotLocked() { - if (isLocked()) - throw new IllegalStateException("This layered cluster log allocation has been locked"); - } - - private static OptionalInt indexOfBroker(List logPlacements, int targetBroker) { - return IntStream.range(0, logPlacements.size()) - .filter(index -> logPlacements.get(index).broker() == targetBroker) - .findFirst(); - } - - @Override - public synchronized void migrateReplica( - TopicPartition topicPartition, int broker, int destinationBroker, String toDir) { - ensureNotLocked(); - - final List sourceLogPlacements = this.logPlacements(topicPartition); - if (sourceLogPlacements == null) - throw new IllegalMigrationException( - topicPartition.topic() + "-" + topicPartition.partition() + " no such topic/partition"); - - int sourceLogIndex = indexOfBroker(sourceLogPlacements, broker).orElse(-1); - if (sourceLogIndex == -1) - throw new IllegalMigrationException( - broker + " is not part of the replica set for " + topicPartition); - - this.allocation.put( - topicPartition, - IntStream.range(0, sourceLogPlacements.size()) - .mapToObj( - index -> - index == sourceLogIndex - ? LogPlacement.of(destinationBroker, toDir) - : sourceLogPlacements.get(index)) - .collect(Collectors.toUnmodifiableList())); - } - - @Override - public synchronized void letReplicaBecomeLeader( - TopicPartition topicPartition, int followerReplica) { - ensureNotLocked(); - - final List sourceLogPlacements = this.logPlacements(topicPartition); - if (sourceLogPlacements == null) - throw new IllegalMigrationException( - topicPartition.topic() + "-" + topicPartition.partition() + " no such topic/partition"); - - int leaderLogIndex = 0; - if (sourceLogPlacements.size() == 0) - throw new IllegalStateException("This partition has no log"); - - int followerLogIndex = indexOfBroker(sourceLogPlacements, followerReplica).orElse(-1); - if (followerLogIndex == -1) - throw new IllegalArgumentException( - followerReplica + " is not part of the replica set for " + topicPartition); - - if (leaderLogIndex == followerLogIndex) return; // nothing to do - - final var leaderLog = this.logPlacements(topicPartition).get(leaderLogIndex); - final var followerLog = this.logPlacements(topicPartition).get(followerLogIndex); - - this.allocation.put( - topicPartition, - IntStream.range(0, sourceLogPlacements.size()) - .mapToObj( - index -> - index == leaderLogIndex - ? followerLog - : index == followerLogIndex ? leaderLog : sourceLogPlacements.get(index)) - .collect(Collectors.toUnmodifiableList())); - } - - @Override - public List logPlacements(TopicPartition topicPartition) { - if (allocation.containsKey(topicPartition)) return allocation.get(topicPartition); - else if (upperLayer != null) return upperLayer.logPlacements(topicPartition); - else return null; - } - - @Override - public Stream topicPartitionStream() { - if (upperLayer == null) return this.allocation.keySet().stream(); - else { - LayeredClusterLogAllocation baseAllocationLookUp = this; - while (baseAllocationLookUp.upperLayer != null) - baseAllocationLookUp = baseAllocationLookUp.upperLayer; - // Ignore all the intermediate layers, just return the keys set of the base layer. - // THIS WILL WORK, AS LONG AS THE UPPER LAYERS DO NOT DO THE FOLLOWING. - // 1. delete topic (should balancer do this?) - // 2. create topic (should balancer do this?) - // 3. shrink partition size (impossible for the current Kafka) - // 4. expand partition size (should balancer do this?) - // Of course, we can consider every element in all the layers, but doing that hurt - // performance. - return baseAllocationLookUp.allocation.keySet().stream(); - } - } -} diff --git a/app/src/test/java/org/astraea/app/balancer/RebalancePlanProposalTest.java b/app/src/test/java/org/astraea/app/balancer/RebalancePlanProposalTest.java index 65d22c4209..e8e8235215 100644 --- a/app/src/test/java/org/astraea/app/balancer/RebalancePlanProposalTest.java +++ b/app/src/test/java/org/astraea/app/balancer/RebalancePlanProposalTest.java @@ -16,8 +16,7 @@ */ package org.astraea.app.balancer; -import java.util.stream.Collectors; -import org.astraea.app.balancer.log.LayeredClusterLogAllocation; +import org.astraea.app.balancer.log.ClusterLogAllocation; import org.astraea.app.cost.ClusterInfoProvider; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -27,7 +26,7 @@ class RebalancePlanProposalTest { @Test void testBuild() { final var fakeClusterInfo = ClusterInfoProvider.fakeClusterInfo(10, 10, 10, 10); - final var thisAllocation = LayeredClusterLogAllocation.of(fakeClusterInfo); + final var thisAllocation = ClusterLogAllocation.of(fakeClusterInfo); final var build = RebalancePlanProposal.builder() .withRebalancePlan(thisAllocation) @@ -41,10 +40,8 @@ void testBuild() { Assertions.assertTrue(build.rebalancePlan().isPresent()); final var thatAllocation = build.rebalancePlan().orElseThrow(); - final var thisTps = - thisAllocation.topicPartitionStream().collect(Collectors.toUnmodifiableSet()); - final var thatTps = - thatAllocation.topicPartitionStream().collect(Collectors.toUnmodifiableSet()); + final var thisTps = thisAllocation.topicPartitions(); + final var thatTps = thatAllocation.topicPartitions(); Assertions.assertEquals(thisTps, thatTps); thisTps.forEach( tp -> @@ -63,7 +60,7 @@ void testNoBuildTwice() { // A builder should only build once. If a builder can build multiple times then it will have to // do much copy work once a new build is requested. This will harm performance. final var fakeClusterInfo = ClusterInfoProvider.fakeClusterInfo(10, 10, 10, 10); - final var logAllocation = LayeredClusterLogAllocation.of(fakeClusterInfo); + final var logAllocation = ClusterLogAllocation.of(fakeClusterInfo); final var build = RebalancePlanProposal.builder().withRebalancePlan(logAllocation); Assertions.assertDoesNotThrow(build::build); @@ -73,7 +70,7 @@ void testNoBuildTwice() { @Test void testNoModifyAfterBuild() { final var fakeClusterInfo = ClusterInfoProvider.fakeClusterInfo(10, 10, 10, 10); - final var logAllocation = LayeredClusterLogAllocation.of(fakeClusterInfo); + final var logAllocation = ClusterLogAllocation.of(fakeClusterInfo); final var build = RebalancePlanProposal.builder().withRebalancePlan(logAllocation); RebalancePlanProposal proposal = build.build(); diff --git a/app/src/test/java/org/astraea/app/balancer/executor/StraightPlanExecutorTest.java b/app/src/test/java/org/astraea/app/balancer/executor/StraightPlanExecutorTest.java index 59873bfaa1..c4c4a9bcd0 100644 --- a/app/src/test/java/org/astraea/app/balancer/executor/StraightPlanExecutorTest.java +++ b/app/src/test/java/org/astraea/app/balancer/executor/StraightPlanExecutorTest.java @@ -24,7 +24,6 @@ import org.astraea.app.admin.Admin; import org.astraea.app.admin.TopicPartition; import org.astraea.app.balancer.log.ClusterLogAllocation; -import org.astraea.app.balancer.log.LayeredClusterLogAllocation; import org.astraea.app.balancer.log.LogPlacement; import org.astraea.app.common.Utils; import org.astraea.app.service.RequireBrokerCluster; @@ -40,8 +39,7 @@ void testRun() throws InterruptedException { final var topicName = "StraightPlanExecutorTest_" + Utils.randomString(8); admin.creator().topic(topicName).numberOfPartitions(10).numberOfReplicas((short) 2).create(); TimeUnit.SECONDS.sleep(2); - final var originalAllocation = - LayeredClusterLogAllocation.of(admin.clusterInfo(Set.of(topicName))); + final var originalAllocation = ClusterLogAllocation.of(admin.clusterInfo(Set.of(topicName))); TimeUnit.SECONDS.sleep(3); final var broker0 = 0; final var broker1 = 1; @@ -53,19 +51,16 @@ void testRun() throws InterruptedException { IntStream.range(0, 10) .mapToObj(i -> new TopicPartition(topicName, i)) .collect(Collectors.toUnmodifiableMap(tp -> tp, tp -> onlyPlacement)); - final var expectedAllocation = LayeredClusterLogAllocation.of(allocationMap); - final var expectedTopicPartition = - expectedAllocation.topicPartitionStream().collect(Collectors.toUnmodifiableSet()); + final var expectedAllocation = ClusterLogAllocation.of(allocationMap); + final var expectedTopicPartition = expectedAllocation.topicPartitions(); final var rebalanceAdmin = RebalanceAdmin.of(admin, (s) -> s.equals(topicName)); // act new StraightPlanExecutor().run(rebalanceAdmin, expectedAllocation); // assert - final var currentAllocation = - LayeredClusterLogAllocation.of(admin.clusterInfo(Set.of(topicName))); - final var currentTopicPartition = - currentAllocation.topicPartitionStream().collect(Collectors.toUnmodifiableSet()); + final var currentAllocation = ClusterLogAllocation.of(admin.clusterInfo(Set.of(topicName))); + final var currentTopicPartition = currentAllocation.topicPartitions(); Assertions.assertEquals(expectedTopicPartition, currentTopicPartition); expectedTopicPartition.forEach( topicPartition -> @@ -90,8 +85,7 @@ void testRunNoDirSpecified() throws InterruptedException { final var topicName = "StraightPlanExecutorTest_" + Utils.randomString(8); admin.creator().topic(topicName).numberOfPartitions(10).numberOfReplicas((short) 3).create(); TimeUnit.SECONDS.sleep(2); - final var originalAllocation = - LayeredClusterLogAllocation.of(admin.clusterInfo(Set.of(topicName))); + final var originalAllocation = ClusterLogAllocation.of(admin.clusterInfo(Set.of(topicName))); TimeUnit.SECONDS.sleep(3); final var broker0 = 0; final var broker1 = 1; @@ -102,19 +96,16 @@ void testRunNoDirSpecified() throws InterruptedException { IntStream.range(0, 10) .mapToObj(i -> new TopicPartition(topicName, i)) .collect(Collectors.toUnmodifiableMap(tp -> tp, tp -> onlyPlacement)); - final var expectedAllocation = LayeredClusterLogAllocation.of(allocationMap); - final var expectedTopicPartition = - expectedAllocation.topicPartitionStream().collect(Collectors.toUnmodifiableSet()); + final var expectedAllocation = ClusterLogAllocation.of(allocationMap); + final var expectedTopicPartition = expectedAllocation.topicPartitions(); final var rebalanceAdmin = RebalanceAdmin.of(admin, (s) -> s.equals(topicName)); // act new StraightPlanExecutor().run(rebalanceAdmin, expectedAllocation); // assert - final var currentAllocation = - LayeredClusterLogAllocation.of(admin.clusterInfo(Set.of(topicName))); - final var currentTopicPartition = - currentAllocation.topicPartitionStream().collect(Collectors.toUnmodifiableSet()); + final var currentAllocation = ClusterLogAllocation.of(admin.clusterInfo(Set.of(topicName))); + final var currentTopicPartition = currentAllocation.topicPartitions(); Assertions.assertEquals(expectedTopicPartition, currentTopicPartition); expectedTopicPartition.forEach( topicPartition -> diff --git a/app/src/test/java/org/astraea/app/balancer/generator/ShufflePlanGeneratorTest.java b/app/src/test/java/org/astraea/app/balancer/generator/ShufflePlanGeneratorTest.java index 575edab8ee..e526bac1f8 100644 --- a/app/src/test/java/org/astraea/app/balancer/generator/ShufflePlanGeneratorTest.java +++ b/app/src/test/java/org/astraea/app/balancer/generator/ShufflePlanGeneratorTest.java @@ -20,7 +20,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAdder; import java.util.stream.Collectors; -import org.astraea.app.balancer.log.LayeredClusterLogAllocation; +import org.astraea.app.balancer.log.ClusterLogAllocation; import org.astraea.app.cost.ClusterInfoProvider; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Disabled; @@ -47,7 +47,7 @@ void testRun() { @ValueSource(ints = {3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 301}) void testMovement(int shuffle) { final var fakeClusterInfo = ClusterInfoProvider.fakeClusterInfo(30, 30, 20, 5); - final var allocation = LayeredClusterLogAllocation.of(fakeClusterInfo); + final var allocation = ClusterLogAllocation.of(fakeClusterInfo); final var shufflePlanGenerator = new ShufflePlanGenerator(() -> shuffle); shufflePlanGenerator @@ -56,10 +56,8 @@ void testMovement(int shuffle) { .forEach( proposal -> { final var that = proposal.rebalancePlan().orElseThrow(); - final var thisTps = - allocation.topicPartitionStream().collect(Collectors.toUnmodifiableSet()); - final var thatTps = - that.topicPartitionStream().collect(Collectors.toUnmodifiableSet()); + final var thisTps = allocation.topicPartitions(); + final var thatTps = that.topicPartitions(); final var thisMap = thisTps.stream() .collect(Collectors.toUnmodifiableMap(x -> x, allocation::logPlacements)); diff --git a/app/src/test/java/org/astraea/app/balancer/log/LayeredClusterLogAllocationTest.java b/app/src/test/java/org/astraea/app/balancer/log/ClusterLogAllocationTest.java similarity index 80% rename from app/src/test/java/org/astraea/app/balancer/log/LayeredClusterLogAllocationTest.java rename to app/src/test/java/org/astraea/app/balancer/log/ClusterLogAllocationTest.java index 425561393a..4d567dda55 100644 --- a/app/src/test/java/org/astraea/app/balancer/log/LayeredClusterLogAllocationTest.java +++ b/app/src/test/java/org/astraea/app/balancer/log/ClusterLogAllocationTest.java @@ -24,23 +24,25 @@ import org.astraea.app.admin.TopicPartition; import org.astraea.app.cost.ClusterInfoProvider; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -class LayeredClusterLogAllocationTest { +@Disabled +class ClusterLogAllocationTest { @Test void creation() { // empty replica set var badAllocation0 = Map.of(TopicPartition.of("topic", "0"), List.of()); Assertions.assertThrows( - IllegalArgumentException.class, () -> LayeredClusterLogAllocation.of(badAllocation0)); + IllegalArgumentException.class, () -> ClusterLogAllocation.of(badAllocation0)); // partial topic/partition var badAllocation1 = Map.of(TopicPartition.of("topic", "999"), List.of(LogPlacement.of(1001))); Assertions.assertThrows( - IllegalArgumentException.class, () -> LayeredClusterLogAllocation.of(badAllocation1)); + IllegalArgumentException.class, () -> ClusterLogAllocation.of(badAllocation1)); // duplicate replica var badAllocation2 = @@ -48,14 +50,14 @@ void creation() { TopicPartition.of("topic", "0"), List.of(LogPlacement.of(1001), LogPlacement.of(1001), LogPlacement.of(1001))); Assertions.assertThrows( - IllegalArgumentException.class, () -> LayeredClusterLogAllocation.of(badAllocation2)); + IllegalArgumentException.class, () -> ClusterLogAllocation.of(badAllocation2)); } @Test void migrateReplica() { final var fakeClusterInfo = ClusterInfoProvider.fakeClusterInfo(3, 1, 1, 1, (i) -> Set.of("topic")); - final var clusterLogAllocation = LayeredClusterLogAllocation.of(fakeClusterInfo); + final var clusterLogAllocation = ClusterLogAllocation.of(fakeClusterInfo); final var sourceTopicPartition = TopicPartition.of("topic", "0"); clusterLogAllocation.migrateReplica(sourceTopicPartition, 0, 1); @@ -68,7 +70,6 @@ void migrateReplica() { .get(0) .logDirectory() .orElse(null)); - Assertions.assertDoesNotThrow(() -> LayeredClusterLogAllocation.of(clusterLogAllocation)); } @ParameterizedTest @@ -78,7 +79,7 @@ void migrateReplica(String dataDirectory) { dataDirectory = dataDirectory.equals("null") ? null : dataDirectory; final var fakeClusterInfo = ClusterInfoProvider.fakeClusterInfo(3, 1, 1, 1, (i) -> Set.of("topic")); - final var clusterLogAllocation = LayeredClusterLogAllocation.of(fakeClusterInfo); + final var clusterLogAllocation = ClusterLogAllocation.of(fakeClusterInfo); final var sourceTopicPartition0 = TopicPartition.of("topic", "0"); clusterLogAllocation.migrateReplica(sourceTopicPartition0, 0, 1, dataDirectory); @@ -104,15 +105,13 @@ void migrateReplica(String dataDirectory) { .get(0) .logDirectory() .orElse(null)); - - Assertions.assertDoesNotThrow(() -> LayeredClusterLogAllocation.of(clusterLogAllocation)); } @Test void letReplicaBecomeLeader() { final var fakeClusterInfo = ClusterInfoProvider.fakeClusterInfo(3, 1, 1, 2, (i) -> Set.of("topic")); - final var clusterLogAllocation = LayeredClusterLogAllocation.of(fakeClusterInfo); + final var clusterLogAllocation = ClusterLogAllocation.of(fakeClusterInfo); final var sourceTopicPartition = TopicPartition.of("topic", "0"); clusterLogAllocation.letReplicaBecomeLeader(sourceTopicPartition, 1); @@ -121,13 +120,12 @@ void letReplicaBecomeLeader() { 1, clusterLogAllocation.logPlacements(sourceTopicPartition).get(0).broker()); Assertions.assertEquals( 0, clusterLogAllocation.logPlacements(sourceTopicPartition).get(1).broker()); - Assertions.assertDoesNotThrow(() -> LayeredClusterLogAllocation.of(clusterLogAllocation)); } @Test void logPlacements() { final var allocation = - LayeredClusterLogAllocation.of( + ClusterLogAllocation.of( Map.of(TopicPartition.of("topic", "0"), List.of(LogPlacement.of(0, "/nowhere")))); Assertions.assertEquals(1, allocation.logPlacements(TopicPartition.of("topic", "0")).size()); @@ -147,8 +145,8 @@ void logPlacements() { @Test void topicPartitionStream() { final var fakeClusterInfo = ClusterInfoProvider.fakeClusterInfo(10, 10, 10, 3); - final var allocation0 = LayeredClusterLogAllocation.of(fakeClusterInfo); - final var allocation1 = LayeredClusterLogAllocation.of(allocation0); + final var allocation0 = ClusterLogAllocation.of(fakeClusterInfo); + final var allocation1 = allocation0; fakeClusterInfo.topics().stream() .flatMap(x -> fakeClusterInfo.replicas(x).stream()) .forEach( @@ -157,8 +155,7 @@ void topicPartitionStream() { TopicPartition.of(replica.topic(), Integer.toString(replica.partition())), 1)); final var allTopicPartitions = - allocation1 - .topicPartitionStream() + allocation1.topicPartitions().stream() .sorted( Comparator.comparing(TopicPartition::topic) .thenComparing(TopicPartition::partition)) @@ -179,15 +176,15 @@ void topicPartitionStream() { @Test void lockWorks() { final var fakeClusterInfo = ClusterInfoProvider.fakeClusterInfo(10, 10, 10, 3); - final var allocation = LayeredClusterLogAllocation.of(fakeClusterInfo); - final var toModify = allocation.topicPartitionStream().findFirst().orElseThrow(); + final var allocation = ClusterLogAllocation.of(fakeClusterInfo); + final var toModify = allocation.topicPartitions().iterator().next(); // can modify before lock Assertions.assertDoesNotThrow(() -> allocation.migrateReplica(toModify, 0, 9)); Assertions.assertDoesNotThrow(() -> allocation.migrateReplica(toModify, 1, 8, "somewhere")); Assertions.assertDoesNotThrow(() -> allocation.letReplicaBecomeLeader(toModify, 2)); - final var extended = LayeredClusterLogAllocation.of(allocation); + final var extended = allocation; // cannot modify after some other layer rely on it Assertions.assertThrows( @@ -206,33 +203,33 @@ void lockWorks() { @Test void findNonFulfilledAllocation() { final var clusterInfo = ClusterInfoProvider.fakeClusterInfo(3, 10, 10, 2); - final var a = LayeredClusterLogAllocation.of(clusterInfo); - final var b = LayeredClusterLogAllocation.of(clusterInfo); + final var a = ClusterLogAllocation.of(clusterInfo); + final var b = ClusterLogAllocation.of(clusterInfo); Assertions.assertEquals(Set.of(), ClusterLogAllocation.findNonFulfilledAllocation(a, b)); - final var source = LayeredClusterLogAllocation.of(clusterInfo); - final var oneTopicPartition = source.topicPartitionStream().findFirst().orElseThrow(); - final var twoTopicPartition = source.topicPartitionStream().skip(1).findFirst().orElseThrow(); + final var source = ClusterLogAllocation.of(clusterInfo); + final var oneTopicPartition = source.topicPartitions().iterator().next(); + final var twoTopicPartition = source.topicPartitions().iterator().next(); - final var target0 = LayeredClusterLogAllocation.of(a); + final var target0 = a; target0.migrateReplica(oneTopicPartition, 0, 0, "/somewhere"); Assertions.assertEquals( Set.of(oneTopicPartition), ClusterLogAllocation.findNonFulfilledAllocation(source, target0)); - final var target1 = LayeredClusterLogAllocation.of(a); + final var target1 = a; target1.migrateReplica(oneTopicPartition, 0, 2); Assertions.assertEquals( Set.of(oneTopicPartition), ClusterLogAllocation.findNonFulfilledAllocation(source, target1)); - final var target2 = LayeredClusterLogAllocation.of(a); + final var target2 = a; target2.letReplicaBecomeLeader(oneTopicPartition, 1); Assertions.assertEquals( Set.of(oneTopicPartition), ClusterLogAllocation.findNonFulfilledAllocation(source, target2)); - final var target3 = LayeredClusterLogAllocation.of(a); + final var target3 = a; target3.migrateReplica(oneTopicPartition, 0, 2); target3.migrateReplica(oneTopicPartition, 2, 2, "/somewhere"); target3.letReplicaBecomeLeader(twoTopicPartition, 1); @@ -240,9 +237,10 @@ void findNonFulfilledAllocation() { Set.of(oneTopicPartition, twoTopicPartition), ClusterLogAllocation.findNonFulfilledAllocation(source, target3)); - final var map4 = a.topicPartitionStream().collect(Collectors.toMap(x -> x, a::logPlacements)); + final var map4 = + a.topicPartitions().stream().collect(Collectors.toMap(x -> x, a::logPlacements)); map4.put(new TopicPartition("NewTopic", 0), List.of(LogPlacement.of(0, "?"))); - final var target4 = LayeredClusterLogAllocation.of(map4); + final var target4 = ClusterLogAllocation.of(map4); Assertions.assertThrows( IllegalArgumentException.class, () -> ClusterLogAllocation.findNonFulfilledAllocation(source, target4)); From 3664ae5dc55d4a1deb3300abde3d458bf8bb466d Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Mon, 11 Jul 2022 21:55:15 +0800 Subject: [PATCH 2/6] add simple test --- .../ClusterLogAllocationPerformanceTest.java | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 app/src/test/java/org/astraea/app/balancer/log/ClusterLogAllocationPerformanceTest.java diff --git a/app/src/test/java/org/astraea/app/balancer/log/ClusterLogAllocationPerformanceTest.java b/app/src/test/java/org/astraea/app/balancer/log/ClusterLogAllocationPerformanceTest.java new file mode 100644 index 0000000000..3a45475ca0 --- /dev/null +++ b/app/src/test/java/org/astraea/app/balancer/log/ClusterLogAllocationPerformanceTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.balancer.log; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.astraea.app.admin.TopicPartition; +import org.junit.jupiter.api.Test; + +public class ClusterLogAllocationPerformanceTest { + + @Test + void test() { + var count = 10000; + var allocation = + ClusterLogAllocation.of( + IntStream.range(0, count) + .boxed() + .collect( + Collectors.toMap( + i -> new TopicPartition("topic", i), + i -> List.of(LogPlacement.of(i, "/tmp/data-" + i))))); + + var start = System.currentTimeMillis(); + var current = allocation; + for (var i = 0; i != count; ++i) { + current = current.migrateReplica(new TopicPartition("topic", i), i, i + 1); + } + + var elapsed = System.currentTimeMillis() - start; + System.out.println("[CHIA] elapsed: " + elapsed); + } +} From b6b4f5c162a48112197ff74a29d93322bdefa69c Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Thu, 14 Jul 2022 01:55:36 +0800 Subject: [PATCH 3/6] fix failed build --- .../generator/ShufflePlanGenerator.java | 35 ++++++++++--------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/app/src/main/java/org/astraea/app/balancer/generator/ShufflePlanGenerator.java b/app/src/main/java/org/astraea/app/balancer/generator/ShufflePlanGenerator.java index 521a03ede9..0de7f2d778 100644 --- a/app/src/main/java/org/astraea/app/balancer/generator/ShufflePlanGenerator.java +++ b/app/src/main/java/org/astraea/app/balancer/generator/ShufflePlanGenerator.java @@ -121,8 +121,10 @@ private static Function allocationGe final var sourceBroker = sourceLogPlacement.broker(); var targetPlacements = sourceLogPlacements.size() > 1 - ? sourceIsLeader ? sourceLogPlacements.stream().skip(1) : Stream.of() - : Stream.of(sourceLogPlacement); + ? sourceIsLeader + ? sourceLogPlacements.stream().skip(1).collect(Collectors.toUnmodifiableList()) + : List.of(sourceLogPlacement) + : List.of(); // generate a set of valid migration broker for given placement. final var validMigrationCandidates = Stream.concat( @@ -152,20 +154,21 @@ private static Function allocationGe }), // [Valid movement 2] add all leader/follower change // candidate - targetPlacements.map( - followerReplica -> - () -> { - rebalancePlanBuilder.addInfo( - String.format( - "Change the log identity of topic %s partition %d replica at broker %d, from %s to %s", - sourceTopicPartition.topic(), - sourceTopicPartition.partition(), - sourceLogPlacement.broker(), - sourceIsLeader ? "leader" : "follower", - sourceIsLeader ? "follower" : "leader")); - return currentAllocation.letReplicaBecomeLeader( - sourceTopicPartition, followerReplica.broker()); - })) + targetPlacements.stream() + .map( + followerReplica -> + () -> { + rebalancePlanBuilder.addInfo( + String.format( + "Change the log identity of topic %s partition %d replica at broker %d, from %s to %s", + sourceTopicPartition.topic(), + sourceTopicPartition.partition(), + sourceLogPlacement.broker(), + sourceIsLeader ? "leader" : "follower", + sourceIsLeader ? "follower" : "leader")); + return currentAllocation.letReplicaBecomeLeader( + sourceTopicPartition, followerReplica.broker()); + })) .collect(Collectors.toUnmodifiableList()); // pick a migration and execute final var selectedMigrationIndex = randomElement(validMigrationCandidates); From ec0147bb9b46737e737e2cb224957045ebc63eae Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Sat, 16 Jul 2022 22:12:51 +0800 Subject: [PATCH 4/6] address comments --- .../ClusterLogAllocationPerformanceTest.java | 48 --------------- .../log/ClusterLogAllocationTest.java | 61 +++++++++---------- 2 files changed, 28 insertions(+), 81 deletions(-) delete mode 100644 app/src/test/java/org/astraea/app/balancer/log/ClusterLogAllocationPerformanceTest.java diff --git a/app/src/test/java/org/astraea/app/balancer/log/ClusterLogAllocationPerformanceTest.java b/app/src/test/java/org/astraea/app/balancer/log/ClusterLogAllocationPerformanceTest.java deleted file mode 100644 index 3a45475ca0..0000000000 --- a/app/src/test/java/org/astraea/app/balancer/log/ClusterLogAllocationPerformanceTest.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.app.balancer.log; - -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import org.astraea.app.admin.TopicPartition; -import org.junit.jupiter.api.Test; - -public class ClusterLogAllocationPerformanceTest { - - @Test - void test() { - var count = 10000; - var allocation = - ClusterLogAllocation.of( - IntStream.range(0, count) - .boxed() - .collect( - Collectors.toMap( - i -> new TopicPartition("topic", i), - i -> List.of(LogPlacement.of(i, "/tmp/data-" + i))))); - - var start = System.currentTimeMillis(); - var current = allocation; - for (var i = 0; i != count; ++i) { - current = current.migrateReplica(new TopicPartition("topic", i), i, i + 1); - } - - var elapsed = System.currentTimeMillis() - start; - System.out.println("[CHIA] elapsed: " + elapsed); - } -} diff --git a/app/src/test/java/org/astraea/app/balancer/log/ClusterLogAllocationTest.java b/app/src/test/java/org/astraea/app/balancer/log/ClusterLogAllocationTest.java index 4d567dda55..0f9e926895 100644 --- a/app/src/test/java/org/astraea/app/balancer/log/ClusterLogAllocationTest.java +++ b/app/src/test/java/org/astraea/app/balancer/log/ClusterLogAllocationTest.java @@ -24,12 +24,10 @@ import org.astraea.app.admin.TopicPartition; import org.astraea.app.cost.ClusterInfoProvider; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -@Disabled class ClusterLogAllocationTest { @Test @@ -57,10 +55,10 @@ void creation() { void migrateReplica() { final var fakeClusterInfo = ClusterInfoProvider.fakeClusterInfo(3, 1, 1, 1, (i) -> Set.of("topic")); - final var clusterLogAllocation = ClusterLogAllocation.of(fakeClusterInfo); + var clusterLogAllocation = ClusterLogAllocation.of(fakeClusterInfo); final var sourceTopicPartition = TopicPartition.of("topic", "0"); - clusterLogAllocation.migrateReplica(sourceTopicPartition, 0, 1); + clusterLogAllocation = clusterLogAllocation.migrateReplica(sourceTopicPartition, 0, 1); Assertions.assertEquals( 1, clusterLogAllocation.logPlacements(sourceTopicPartition).get(0).broker()); @@ -79,10 +77,11 @@ void migrateReplica(String dataDirectory) { dataDirectory = dataDirectory.equals("null") ? null : dataDirectory; final var fakeClusterInfo = ClusterInfoProvider.fakeClusterInfo(3, 1, 1, 1, (i) -> Set.of("topic")); - final var clusterLogAllocation = ClusterLogAllocation.of(fakeClusterInfo); + var clusterLogAllocation = ClusterLogAllocation.of(fakeClusterInfo); final var sourceTopicPartition0 = TopicPartition.of("topic", "0"); - clusterLogAllocation.migrateReplica(sourceTopicPartition0, 0, 1, dataDirectory); + clusterLogAllocation = + clusterLogAllocation.migrateReplica(sourceTopicPartition0, 0, 1, dataDirectory); Assertions.assertEquals( 1, clusterLogAllocation.logPlacements(sourceTopicPartition0).get(0).broker()); @@ -95,7 +94,8 @@ void migrateReplica(String dataDirectory) { .orElse(null)); final var sourceTopicPartition1 = TopicPartition.of("topic", "0"); - clusterLogAllocation.migrateReplica(sourceTopicPartition1, 1, 1, dataDirectory); + clusterLogAllocation = + clusterLogAllocation.migrateReplica(sourceTopicPartition1, 1, 1, dataDirectory); Assertions.assertEquals( 1, clusterLogAllocation.logPlacements(sourceTopicPartition1).get(0).broker()); Assertions.assertEquals( @@ -111,10 +111,10 @@ void migrateReplica(String dataDirectory) { void letReplicaBecomeLeader() { final var fakeClusterInfo = ClusterInfoProvider.fakeClusterInfo(3, 1, 1, 2, (i) -> Set.of("topic")); - final var clusterLogAllocation = ClusterLogAllocation.of(fakeClusterInfo); + var clusterLogAllocation = ClusterLogAllocation.of(fakeClusterInfo); final var sourceTopicPartition = TopicPartition.of("topic", "0"); - clusterLogAllocation.letReplicaBecomeLeader(sourceTopicPartition, 1); + clusterLogAllocation = clusterLogAllocation.letReplicaBecomeLeader(sourceTopicPartition, 1); Assertions.assertEquals( 1, clusterLogAllocation.logPlacements(sourceTopicPartition).get(0).broker()); @@ -145,17 +145,17 @@ void logPlacements() { @Test void topicPartitionStream() { final var fakeClusterInfo = ClusterInfoProvider.fakeClusterInfo(10, 10, 10, 3); - final var allocation0 = ClusterLogAllocation.of(fakeClusterInfo); - final var allocation1 = allocation0; - fakeClusterInfo.topics().stream() - .flatMap(x -> fakeClusterInfo.replicas(x).stream()) - .forEach( - replica -> - allocation1.letReplicaBecomeLeader( - TopicPartition.of(replica.topic(), Integer.toString(replica.partition())), 1)); + var allocation = ClusterLogAllocation.of(fakeClusterInfo); + for (var replica : + fakeClusterInfo.topics().stream() + .flatMap(x -> fakeClusterInfo.replicas(x).stream()) + .collect(Collectors.toUnmodifiableList())) + allocation = + allocation.letReplicaBecomeLeader( + TopicPartition.of(replica.topic(), Integer.toString(replica.partition())), 1); final var allTopicPartitions = - allocation1.topicPartitions().stream() + allocation.topicPartitions().stream() .sorted( Comparator.comparing(TopicPartition::topic) .thenComparing(TopicPartition::partition)) @@ -184,8 +184,6 @@ void lockWorks() { Assertions.assertDoesNotThrow(() -> allocation.migrateReplica(toModify, 1, 8, "somewhere")); Assertions.assertDoesNotThrow(() -> allocation.letReplicaBecomeLeader(toModify, 2)); - final var extended = allocation; - // cannot modify after some other layer rely on it Assertions.assertThrows( IllegalStateException.class, () -> allocation.migrateReplica(toModify, 9, 0)); @@ -195,9 +193,9 @@ void lockWorks() { IllegalStateException.class, () -> allocation.letReplicaBecomeLeader(toModify, 0)); // the extended one can modify - Assertions.assertDoesNotThrow(() -> extended.migrateReplica(toModify, 9, 0)); - Assertions.assertDoesNotThrow(() -> extended.migrateReplica(toModify, 8, 1, "somewhere")); - Assertions.assertDoesNotThrow(() -> extended.letReplicaBecomeLeader(toModify, 0)); + Assertions.assertDoesNotThrow(() -> allocation.migrateReplica(toModify, 9, 0)); + Assertions.assertDoesNotThrow(() -> allocation.migrateReplica(toModify, 8, 1, "somewhere")); + Assertions.assertDoesNotThrow(() -> allocation.letReplicaBecomeLeader(toModify, 0)); } @Test @@ -211,28 +209,25 @@ void findNonFulfilledAllocation() { final var oneTopicPartition = source.topicPartitions().iterator().next(); final var twoTopicPartition = source.topicPartitions().iterator().next(); - final var target0 = a; - target0.migrateReplica(oneTopicPartition, 0, 0, "/somewhere"); + final var target0 = a.migrateReplica(oneTopicPartition, 0, 0, "/somewhere"); Assertions.assertEquals( Set.of(oneTopicPartition), ClusterLogAllocation.findNonFulfilledAllocation(source, target0)); - final var target1 = a; - target1.migrateReplica(oneTopicPartition, 0, 2); + final var target1 = a.migrateReplica(oneTopicPartition, 0, 2); Assertions.assertEquals( Set.of(oneTopicPartition), ClusterLogAllocation.findNonFulfilledAllocation(source, target1)); - final var target2 = a; - target2.letReplicaBecomeLeader(oneTopicPartition, 1); + final var target2 = a.letReplicaBecomeLeader(oneTopicPartition, 1); Assertions.assertEquals( Set.of(oneTopicPartition), ClusterLogAllocation.findNonFulfilledAllocation(source, target2)); - final var target3 = a; - target3.migrateReplica(oneTopicPartition, 0, 2); - target3.migrateReplica(oneTopicPartition, 2, 2, "/somewhere"); - target3.letReplicaBecomeLeader(twoTopicPartition, 1); + final var target3 = + a.migrateReplica(oneTopicPartition, 0, 2) + .migrateReplica(oneTopicPartition, 2, 2, "/somewhere") + .letReplicaBecomeLeader(twoTopicPartition, 1); Assertions.assertEquals( Set.of(oneTopicPartition, twoTopicPartition), ClusterLogAllocation.findNonFulfilledAllocation(source, target3)); From d19d4f841f39d269f0d02144cd3d88910bc8f101 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Sat, 16 Jul 2022 22:43:06 +0800 Subject: [PATCH 5/6] fix failed tests --- .../balancer/log/ClusterLogAllocation.java | 24 ++++++++++ .../log/ClusterLogAllocationTest.java | 47 +++++-------------- 2 files changed, 36 insertions(+), 35 deletions(-) diff --git a/app/src/main/java/org/astraea/app/balancer/log/ClusterLogAllocation.java b/app/src/main/java/org/astraea/app/balancer/log/ClusterLogAllocation.java index 60e6275739..e0bfca349e 100644 --- a/app/src/main/java/org/astraea/app/balancer/log/ClusterLogAllocation.java +++ b/app/src/main/java/org/astraea/app/balancer/log/ClusterLogAllocation.java @@ -159,6 +159,30 @@ class ClusterLogAllocationImpl implements ClusterLogAllocation { private ClusterLogAllocationImpl(Map> allocation) { this.allocation = Collections.unmodifiableMap(allocation); + + this.allocation.keySet().stream() + .collect(Collectors.groupingBy(TopicPartition::topic)) + .forEach( + (topic, tp) -> { + int maxPartitionId = + tp.stream().mapToInt(TopicPartition::partition).max().orElseThrow(); + if ((maxPartitionId + 1) != tp.size()) + throw new IllegalArgumentException( + "The partition size of " + topic + " is illegal"); + }); + + this.allocation.forEach( + (tp, logs) -> { + long uniqueBrokers = logs.stream().map(LogPlacement::broker).distinct().count(); + if (uniqueBrokers != logs.size() || logs.size() == 0) + throw new IllegalArgumentException( + "The topic " + + tp.topic() + + " partition " + + tp.partition() + + " has illegal replica set " + + logs); + }); } @Override diff --git a/app/src/test/java/org/astraea/app/balancer/log/ClusterLogAllocationTest.java b/app/src/test/java/org/astraea/app/balancer/log/ClusterLogAllocationTest.java index 0f9e926895..3055f9ebe9 100644 --- a/app/src/test/java/org/astraea/app/balancer/log/ClusterLogAllocationTest.java +++ b/app/src/test/java/org/astraea/app/balancer/log/ClusterLogAllocationTest.java @@ -138,7 +138,7 @@ void logPlacements() { .get(0) .logDirectory() .orElseThrow()); - Assertions.assertNull(allocation.logPlacements(TopicPartition.of("no", "0"))); + Assertions.assertEquals(0, allocation.logPlacements(TopicPartition.of("no", "0")).size()); allocation.logPlacements(TopicPartition.of("no", "0")); } @@ -173,59 +173,36 @@ void topicPartitionStream() { Assertions.assertEquals(expectedTopicPartitions, allTopicPartitions); } - @Test - void lockWorks() { - final var fakeClusterInfo = ClusterInfoProvider.fakeClusterInfo(10, 10, 10, 3); - final var allocation = ClusterLogAllocation.of(fakeClusterInfo); - final var toModify = allocation.topicPartitions().iterator().next(); - - // can modify before lock - Assertions.assertDoesNotThrow(() -> allocation.migrateReplica(toModify, 0, 9)); - Assertions.assertDoesNotThrow(() -> allocation.migrateReplica(toModify, 1, 8, "somewhere")); - Assertions.assertDoesNotThrow(() -> allocation.letReplicaBecomeLeader(toModify, 2)); - - // cannot modify after some other layer rely on it - Assertions.assertThrows( - IllegalStateException.class, () -> allocation.migrateReplica(toModify, 9, 0)); - Assertions.assertThrows( - IllegalStateException.class, () -> allocation.migrateReplica(toModify, 0, 9, "somewhere")); - Assertions.assertThrows( - IllegalStateException.class, () -> allocation.letReplicaBecomeLeader(toModify, 0)); - - // the extended one can modify - Assertions.assertDoesNotThrow(() -> allocation.migrateReplica(toModify, 9, 0)); - Assertions.assertDoesNotThrow(() -> allocation.migrateReplica(toModify, 8, 1, "somewhere")); - Assertions.assertDoesNotThrow(() -> allocation.letReplicaBecomeLeader(toModify, 0)); - } - @Test void findNonFulfilledAllocation() { final var clusterInfo = ClusterInfoProvider.fakeClusterInfo(3, 10, 10, 2); - final var a = ClusterLogAllocation.of(clusterInfo); - final var b = ClusterLogAllocation.of(clusterInfo); - Assertions.assertEquals(Set.of(), ClusterLogAllocation.findNonFulfilledAllocation(a, b)); + Assertions.assertEquals( + Set.of(), + ClusterLogAllocation.findNonFulfilledAllocation( + ClusterLogAllocation.of(clusterInfo), ClusterLogAllocation.of(clusterInfo))); final var source = ClusterLogAllocation.of(clusterInfo); final var oneTopicPartition = source.topicPartitions().iterator().next(); - final var twoTopicPartition = source.topicPartitions().iterator().next(); + final var twoTopicPartition = source.topicPartitions().stream().skip(1).findFirst().get(); - final var target0 = a.migrateReplica(oneTopicPartition, 0, 0, "/somewhere"); + final var target0 = source.migrateReplica(oneTopicPartition, 0, 0, "/somewhere"); Assertions.assertEquals( Set.of(oneTopicPartition), ClusterLogAllocation.findNonFulfilledAllocation(source, target0)); - final var target1 = a.migrateReplica(oneTopicPartition, 0, 2); + final var target1 = source.migrateReplica(oneTopicPartition, 0, 2); Assertions.assertEquals( Set.of(oneTopicPartition), ClusterLogAllocation.findNonFulfilledAllocation(source, target1)); - final var target2 = a.letReplicaBecomeLeader(oneTopicPartition, 1); + final var target2 = source.letReplicaBecomeLeader(oneTopicPartition, 1); Assertions.assertEquals( Set.of(oneTopicPartition), ClusterLogAllocation.findNonFulfilledAllocation(source, target2)); final var target3 = - a.migrateReplica(oneTopicPartition, 0, 2) + source + .migrateReplica(oneTopicPartition, 0, 2) .migrateReplica(oneTopicPartition, 2, 2, "/somewhere") .letReplicaBecomeLeader(twoTopicPartition, 1); Assertions.assertEquals( @@ -233,7 +210,7 @@ void findNonFulfilledAllocation() { ClusterLogAllocation.findNonFulfilledAllocation(source, target3)); final var map4 = - a.topicPartitions().stream().collect(Collectors.toMap(x -> x, a::logPlacements)); + source.topicPartitions().stream().collect(Collectors.toMap(x -> x, source::logPlacements)); map4.put(new TopicPartition("NewTopic", 0), List.of(LogPlacement.of(0, "?"))); final var target4 = ClusterLogAllocation.of(map4); Assertions.assertThrows( From b9c91e7a2ad4cdceac1e3834de7e81d8bc4e6054 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Sat, 16 Jul 2022 23:24:33 +0800 Subject: [PATCH 6/6] address comments --- .../app/balancer/log/ClusterLogAllocation.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/app/src/main/java/org/astraea/app/balancer/log/ClusterLogAllocation.java b/app/src/main/java/org/astraea/app/balancer/log/ClusterLogAllocation.java index e0bfca349e..482418dfdf 100644 --- a/app/src/main/java/org/astraea/app/balancer/log/ClusterLogAllocation.java +++ b/app/src/main/java/org/astraea/app/balancer/log/ClusterLogAllocation.java @@ -104,7 +104,12 @@ ClusterLogAllocation migrateReplica( /** let specific follower log become the leader log of this topic/partition. */ ClusterLogAllocation letReplicaBecomeLeader(TopicPartition topicPartition, int followerReplica); - /** Retrieve the log placements of specific {@link TopicPartition}. */ + /** + * Retrieve the log placements of specific {@link TopicPartition}. + * + * @param topicPartition to query + * @return log placements or empty collection if there is no log placements + */ List logPlacements(TopicPartition topicPartition); /** Retrieve the stream of all topic/partition pairs in allocation. */ @@ -189,7 +194,7 @@ private ClusterLogAllocationImpl(Map> allocat public ClusterLogAllocation migrateReplica( TopicPartition topicPartition, int atBroker, int toBroker, String toDir) { var sourceLogPlacements = this.logPlacements(topicPartition); - if (sourceLogPlacements == null) + if (sourceLogPlacements.isEmpty()) throw new IllegalMigrationException( topicPartition.topic() + "-" + topicPartition.partition() + " no such topic/partition"); @@ -215,14 +220,11 @@ public ClusterLogAllocation migrateReplica( public ClusterLogAllocation letReplicaBecomeLeader( TopicPartition topicPartition, int followerReplica) { final List sourceLogPlacements = this.logPlacements(topicPartition); - if (sourceLogPlacements == null) + if (sourceLogPlacements.isEmpty()) throw new IllegalMigrationException( topicPartition.topic() + "-" + topicPartition.partition() + " no such topic/partition"); int leaderLogIndex = 0; - if (sourceLogPlacements.size() == 0) - throw new IllegalStateException("This partition has no log"); - int followerLogIndex = indexOfBroker(sourceLogPlacements, followerReplica).orElse(-1); if (followerLogIndex == -1) throw new IllegalArgumentException(