diff --git a/app/src/main/java/org/astraea/app/web/BalancerHandler.java b/app/src/main/java/org/astraea/app/web/BalancerHandler.java index 4094f7447e..f0d183cc77 100644 --- a/app/src/main/java/org/astraea/app/web/BalancerHandler.java +++ b/app/src/main/java/org/astraea/app/web/BalancerHandler.java @@ -163,6 +163,7 @@ public CompletionStage post(Channel channel) { .retryOffer(currentClusterInfo, request.executionTime)); var changes = bestPlan + .solution() .map( p -> ClusterInfo.findNonFulfilledAllocation( @@ -187,11 +188,15 @@ public CompletionStage post(Channel channel) { .orElse(List.of()); var report = new Report( - bestPlan.map(p -> p.initialClusterCost().value()), - bestPlan.map(p -> p.proposalClusterCost().value()), + bestPlan.solution().isPresent(), + bestPlan.initialClusterCost().value(), + bestPlan.solution().map(p -> p.proposalClusterCost().value()), request.algorithmConfig.clusterCostFunction().toString(), changes, - bestPlan.map(p -> migrationCosts(p.moveCost())).orElseGet(List::of)); + bestPlan + .solution() + .map(p -> migrationCosts(p.moveCost())) + .orElseGet(List::of)); return new PlanInfo(report, bestPlan); }) .whenComplete( @@ -229,10 +234,10 @@ private static List migrationCosts(MoveCost cost) { .collect(Collectors.toList()); } - private Optional metricContext( + private Balancer.Plan metricContext( Collection fetchers, Collection metricSensors, - Function, Optional> execution) { + Function, Balancer.Plan> execution) { // TODO: use a global metric collector when we are ready to enable long-run metric sampling // https://github.com/skiptests/astraea/pull/955#discussion_r1026491162 try (var collector = MetricCollector.builder().interval(sampleInterval).build()) { @@ -407,20 +412,24 @@ public CompletionStage put(Channel channel) { // already scheduled, nothing to do if (executedPlans.containsKey(thePlanId)) return new PutPlanResponse(thePlanId); + // one plan at a time if (lastExecutionId.get() != null && !executedPlans.get(lastExecutionId.get()).isDone()) throw new IllegalStateException( "There is another on-going rebalance: " + lastExecutionId.get()); + // the plan exists but no plan generated + if (thePlanInfo.associatedPlan.solution().isEmpty()) + throw new IllegalStateException( + "The specified balancer plan didn't generate a useful plan: " + + thePlanId); // schedule the actual execution - thePlanInfo.associatedPlan.ifPresent( - p -> { - executedPlans.put( - thePlanId, - executor - .run(admin, p.proposal(), Duration.ofHours(1)) - .toCompletableFuture()); - lastExecutionId.set(thePlanId); - }); + var proposedPlan = thePlanInfo.associatedPlan.solution().get(); + executedPlans.put( + thePlanId, + executor + .run(admin, proposedPlan.proposal(), Duration.ofHours(1)) + .toCompletableFuture()); + lastExecutionId.set(thePlanId); return new PutPlanResponse(thePlanId); }, schedulingExecutor) @@ -555,8 +564,8 @@ static class MigrationCost { } static class Report implements Response { - // initial cost might be unavailable due to unable to evaluate cost function - final Optional cost; + final boolean isPlanGenerated; + final double cost; // don't generate new cost if there is no best plan final Optional newCost; @@ -566,12 +575,14 @@ static class Report implements Response { final List migrationCosts; Report( - Optional cost, + boolean isPlanGenerated, + double initialCost, Optional newCost, String function, List changes, List migrationCosts) { - this.cost = cost; + this.isPlanGenerated = isPlanGenerated; + this.cost = initialCost; this.newCost = newCost; this.function = function; this.changes = changes; @@ -581,9 +592,9 @@ static class Report implements Response { static class PlanInfo { private final Report report; - private final Optional associatedPlan; + private final Balancer.Plan associatedPlan; - PlanInfo(Report report, Optional associatedPlan) { + PlanInfo(Report report, Balancer.Plan associatedPlan) { this.report = report; this.associatedPlan = associatedPlan; } diff --git a/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java b/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java index 1d713e7df6..9647808eee 100644 --- a/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java +++ b/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java @@ -83,6 +83,8 @@ public class BalancerHandlerTest extends RequireBrokerCluster { static final String BALANCER_CONFIGURATION_KEY = "balancerConfig"; static final int TIMEOUT_DEFAULT = 3; + private static final List defaultIncreasing = + List.of(costWeight(IncreasingCost.class.getName(), 1)); private static final List defaultDecreasing = List.of(costWeight(DecreasingCost.class.getName(), 1)); @@ -110,7 +112,7 @@ void testReport() { var report = progress.report; Assertions.assertNotNull(progress.id); Assertions.assertNotEquals(0, report.changes.size()); - Assertions.assertTrue(report.cost.get() >= report.newCost.get()); + Assertions.assertTrue(report.cost >= report.newCost.get()); // "before" should record size report.changes.forEach( c -> @@ -125,7 +127,7 @@ void testReport() { report.changes.stream() .flatMap(c -> c.after.stream()) .forEach(p -> Assertions.assertEquals(Optional.empty(), p.size)); - Assertions.assertTrue(report.cost.get() >= report.newCost.get()); + Assertions.assertTrue(report.cost >= report.newCost.get()); var sizeMigration = report.migrationCosts.stream() .filter(x -> x.name.equals(BalancerHandler.MOVED_SIZE)) @@ -160,7 +162,7 @@ void testTopics() { report.changes.stream().map(x -> x.topic).allMatch(allowedTopics::contains), "Only allowed topics been altered"); Assertions.assertTrue( - report.cost.get() >= report.newCost.get(), + report.cost >= report.newCost.get(), "The proposed plan should has better score then the current one"); var sizeMigration = report.migrationCosts.stream() @@ -294,7 +296,8 @@ void testBestPlan() { .clusterInfo(admin.topicNames(false).toCompletableFuture().join()) .toCompletableFuture() .join(), - Duration.ofSeconds(3))); + Duration.ofSeconds(3)) + .solution()); // test move cost predicate Assertions.assertEquals( @@ -312,7 +315,8 @@ void testBestPlan() { .clusterInfo(admin.topicNames(false).toCompletableFuture().join()) .toCompletableFuture() .join(), - Duration.ofSeconds(3))); + Duration.ofSeconds(3)) + .solution()); } } @@ -366,8 +370,7 @@ void testNoReport() { .post( Channel.ofRequest( JsonConverter.defaultConverter() - .toJson( - Map.of(BALANCER_CONFIGURATION_KEY, Map.of("iteration", "0"))))) + .toJson(Map.of(COST_WEIGHT_KEY, defaultIncreasing)))) .toCompletableFuture() .join()); Utils.sleep(Duration.ofSeconds(5)); @@ -377,8 +380,26 @@ void testNoReport() { handler.get(Channel.ofTarget(post.id)).toCompletableFuture().join()); Assertions.assertNotNull(post.id); Assertions.assertEquals(post.id, progress.id); - Assertions.assertFalse(progress.generated); - Assertions.assertNotNull(progress.exception); + Assertions.assertTrue(progress.generated, "Plan is calculated"); + Assertions.assertTrue(progress.report.newCost.isEmpty(), "No proposal"); + Assertions.assertNotNull(progress.report.function); + Assertions.assertEquals(0, progress.report.changes.size(), "No proposal"); + Assertions.assertEquals(0, progress.report.migrationCosts.size(), "No proposal"); + Assertions.assertNull(progress.exception, "No exception occurred during this process"); + Assertions.assertInstanceOf( + IllegalStateException.class, + Assertions.assertThrows( + CompletionException.class, + () -> + handler + .put( + Channel.ofRequest( + JsonConverter.defaultConverter() + .toJson(Map.of("id", progress.id)))) + .toCompletableFuture() + .join()) + .getCause(), + "Cannot execute a plan with no proposal available"); } } @@ -1107,6 +1128,25 @@ public synchronized ClusterCost clusterCost( } } + public static class IncreasingCost implements HasClusterCost { + + private ClusterInfo original; + + public IncreasingCost(Configuration configuration) {} + + private double value0 = 1.0; + + @Override + public synchronized ClusterCost clusterCost( + ClusterInfo clusterInfo, ClusterBean clusterBean) { + if (original == null) original = clusterInfo; + if (ClusterInfo.findNonFulfilledAllocation(original, clusterInfo).isEmpty()) return () -> 1; + double theCost = value0; + value0 = value0 * 1.002; + return () -> theCost; + } + } + public static class FetcherAndCost extends DecreasingCost { static AtomicReference> callback = new AtomicReference<>(); @@ -1148,7 +1188,7 @@ public SpyBalancer(AlgorithmConfig algorithmConfig) { } @Override - public Optional offer(ClusterInfo currentClusterInfo, Duration timeout) { + public Plan offer(ClusterInfo currentClusterInfo, Duration timeout) { offerCallbacks.forEach(Runnable::run); offerCallbacks.clear(); return super.offer(currentClusterInfo, timeout); diff --git a/common/src/main/java/org/astraea/common/balancer/Balancer.java b/common/src/main/java/org/astraea/common/balancer/Balancer.java index 2a55fc9c33..5eb1d2ce5c 100644 --- a/common/src/main/java/org/astraea/common/balancer/Balancer.java +++ b/common/src/main/java/org/astraea/common/balancer/Balancer.java @@ -35,7 +35,7 @@ public interface Balancer { * Execute {@link Balancer#offer(ClusterInfo, Duration)}. Retry the plan generation if a {@link * NoSufficientMetricsException} exception occurred. */ - default Optional retryOffer(ClusterInfo currentClusterInfo, Duration timeout) { + default Plan retryOffer(ClusterInfo currentClusterInfo, Duration timeout) { final var timeoutMs = System.currentTimeMillis() + timeout.toMillis(); while (System.currentTimeMillis() < timeoutMs) { try { @@ -65,7 +65,7 @@ default Optional retryOffer(ClusterInfo currentClusterInfo, Durat /** * @return a rebalance plan */ - Optional offer(ClusterInfo currentClusterInfo, Duration timeout); + Plan offer(ClusterInfo currentClusterInfo, Duration timeout); @SuppressWarnings("unchecked") static Balancer create(String classpath, AlgorithmConfig config) { @@ -95,14 +95,8 @@ static T create(Class balancerClass, AlgorithmConfig con } class Plan { - final ClusterInfo proposal; final ClusterCost initialClusterCost; - final ClusterCost proposalClusterCost; - final MoveCost moveCost; - - public ClusterInfo proposal() { - return proposal; - } + final Solution solution; /** * The {@link ClusterCost} score of the original {@link ClusterInfo} when this plan is start @@ -112,6 +106,30 @@ public ClusterCost initialClusterCost() { return initialClusterCost; } + public Optional solution() { + return Optional.ofNullable(solution); + } + + public Plan(ClusterCost initialClusterCost) { + this(initialClusterCost, null); + } + + public Plan(ClusterCost initialClusterCost, Solution solution) { + this.initialClusterCost = initialClusterCost; + this.solution = solution; + } + } + + class Solution { + + final ClusterInfo proposal; + final ClusterCost proposalClusterCost; + final MoveCost moveCost; + + public ClusterInfo proposal() { + return proposal; + } + /** The {@link ClusterCost} score of the proposed new allocation. */ public ClusterCost proposalClusterCost() { return proposalClusterCost; @@ -121,13 +139,9 @@ public MoveCost moveCost() { return moveCost; } - public Plan( - ClusterInfo proposal, - ClusterCost initialClusterCost, - ClusterCost proposalClusterCost, - MoveCost moveCost) { + public Solution( + ClusterCost proposalClusterCost, MoveCost moveCost, ClusterInfo proposal) { this.proposal = proposal; - this.initialClusterCost = initialClusterCost; this.proposalClusterCost = proposalClusterCost; this.moveCost = moveCost; } diff --git a/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java b/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java index 77497e07b8..1304f3fce1 100644 --- a/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java +++ b/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java @@ -78,7 +78,7 @@ public GreedyBalancer(AlgorithmConfig algorithmConfig) { } @Override - public Optional offer(ClusterInfo currentClusterInfo, Duration timeout) { + public Plan offer(ClusterInfo currentClusterInfo, Duration timeout) { final var allocationTweaker = new ShuffleTweaker(minStep, maxStep); final var metrics = config.metricSource().get(); final var clusterCostFunction = config.clusterCostFunction(); @@ -90,7 +90,7 @@ public Optional offer(ClusterInfo currentClusterInfo, Duration ti final var executionTime = timeout.toMillis(); Supplier moreRoom = () -> System.currentTimeMillis() - start < executionTime && loop.getAndDecrement() > 0; - BiFunction, ClusterCost, Optional> next = + BiFunction, ClusterCost, Optional> next = (currentAllocation, currentCost) -> allocationTweaker .generate(currentAllocation) @@ -99,11 +99,10 @@ public Optional offer(ClusterInfo currentClusterInfo, Duration ti newAllocation -> { var newClusterInfo = ClusterInfo.update(currentClusterInfo, newAllocation::replicas); - return new Balancer.Plan( - newAllocation, - initialCost, + return new Solution( clusterCostFunction.clusterCost(newClusterInfo, metrics), - moveCostFunction.moveCost(currentClusterInfo, newClusterInfo, metrics)); + moveCostFunction.moveCost(currentClusterInfo, newClusterInfo, metrics), + newAllocation); }) .filter( plan -> @@ -112,7 +111,7 @@ public Optional offer(ClusterInfo currentClusterInfo, Duration ti .findFirst(); var currentCost = initialCost; var currentAllocation = ClusterInfo.masked(currentClusterInfo, config.topicFilter()); - var currentPlan = Optional.empty(); + var currentSolution = Optional.empty(); // register JMX var currentIteration = new LongAdder(); @@ -132,10 +131,10 @@ public Optional offer(ClusterInfo currentClusterInfo, Duration ti currentMinCost.accumulate(currentCost.value()); var newPlan = next.apply(currentAllocation, currentCost); if (newPlan.isEmpty()) break; - currentPlan = newPlan; - currentCost = currentPlan.get().proposalClusterCost(); - currentAllocation = currentPlan.get().proposal(); + currentSolution = newPlan; + currentCost = currentSolution.get().proposalClusterCost(); + currentAllocation = currentSolution.get().proposal(); } - return currentPlan; + return new Plan(initialCost, currentSolution.orElse(null)); } } diff --git a/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java b/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java index 668d4809d7..1fd98bf441 100644 --- a/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java +++ b/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java @@ -18,7 +18,6 @@ import java.time.Duration; import java.util.Comparator; -import java.util.Optional; import java.util.Set; import java.util.TreeSet; import org.astraea.common.Utils; @@ -68,7 +67,7 @@ public SingleStepBalancer(AlgorithmConfig algorithmConfig) { } @Override - public Optional offer(ClusterInfo currentClusterInfo, Duration timeout) { + public Plan offer(ClusterInfo currentClusterInfo, Duration timeout) { final var allocationTweaker = new ShuffleTweaker(minStep, maxStep); final var currentClusterBean = config.metricSource().get(); final var clusterCostFunction = config.clusterCostFunction(); @@ -86,15 +85,15 @@ public Optional offer(ClusterInfo currentClusterInfo, Du .map( newAllocation -> { var newClusterInfo = ClusterInfo.update(currentClusterInfo, newAllocation::replicas); - return new Balancer.Plan( - newAllocation, - currentCost, + return new Solution( clusterCostFunction.clusterCost(newClusterInfo, currentClusterBean), - moveCostFunction.moveCost( - currentClusterInfo, newClusterInfo, currentClusterBean)); + moveCostFunction.moveCost(currentClusterInfo, newClusterInfo, currentClusterBean), + newAllocation); }) .filter(plan -> config.clusterConstraint().test(currentCost, plan.proposalClusterCost())) .filter(plan -> config.movementConstraint().test(plan.moveCost())) - .min(Comparator.comparing(plan -> plan.proposalClusterCost().value())); + .min(Comparator.comparing(plan -> plan.proposalClusterCost().value())) + .map(solution -> new Plan(currentCost, solution)) + .orElse(new Plan(currentCost)); } } diff --git a/common/src/test/java/org/astraea/common/balancer/BalancerAlgorithmTest.java b/common/src/test/java/org/astraea/common/balancer/BalancerAlgorithmTest.java index c3f54b1e61..e7a7cc3338 100644 --- a/common/src/test/java/org/astraea/common/balancer/BalancerAlgorithmTest.java +++ b/common/src/test/java/org/astraea/common/balancer/BalancerAlgorithmTest.java @@ -93,6 +93,7 @@ void test() { .toCompletableFuture() .join(), Duration.ofSeconds(5)) + .solution() .get(); var plan = @@ -105,6 +106,7 @@ void test() { .toCompletableFuture() .join(), Duration.ofSeconds(5)) + .solution() .get(); Assertions.assertTrue( diff --git a/common/src/test/java/org/astraea/common/balancer/BalancerTest.java b/common/src/test/java/org/astraea/common/balancer/BalancerTest.java index 5065411c37..d5e2636114 100644 --- a/common/src/test/java/org/astraea/common/balancer/BalancerTest.java +++ b/common/src/test/java/org/astraea/common/balancer/BalancerTest.java @@ -19,7 +19,6 @@ import java.time.Duration; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; @@ -122,6 +121,7 @@ void testLeaderCountRebalance(Class theClass) { .toCompletableFuture() .join(), Duration.ofSeconds(10)) + .solution() .orElseThrow(); new StraightPlanExecutor() .run(admin, plan.proposal(), Duration.ofSeconds(10)) @@ -175,6 +175,7 @@ public ClusterCost clusterCost( .config(Configuration.of(Map.of("iteration", "500"))) .build()) .offer(clusterInfo, Duration.ofSeconds(3)) + .solution() .get() .proposal(); @@ -239,6 +240,7 @@ void testExecutionTime(Class theClass) { .toCompletableFuture() .join(), Duration.ofSeconds(3)) + .solution() .get() .proposal()); Utils.sleep(Duration.ofMillis(1000)); @@ -316,12 +318,12 @@ void testRetryOffer(int sampleTimeMs) { var balancer = new Balancer() { @Override - public Optional offer(ClusterInfo currentClusterInfo, Duration timeout) { + public Plan offer(ClusterInfo currentClusterInfo, Duration timeout) { if (System.currentTimeMillis() - startMs < sampleTimeMs) throw new NoSufficientMetricsException( costFunction, Duration.ofMillis(sampleTimeMs - (System.currentTimeMillis() - startMs))); - return Optional.of(new Plan(currentClusterInfo, () -> 0, () -> 0, MoveCost.EMPTY)); + return new Plan(() -> 0, new Solution(() -> 0, MoveCost.EMPTY, currentClusterInfo)); } }; @@ -342,7 +344,7 @@ void testRetryOfferTimeout() { var balancer = new Balancer() { @Override - public Optional offer(ClusterInfo currentClusterInfo, Duration timeout) { + public Plan offer(ClusterInfo currentClusterInfo, Duration timeout) { throw new NoSufficientMetricsException( costFunction, Duration.ofSeconds(999), "This will takes forever"); } diff --git a/common/src/test/java/org/astraea/common/balancer/algorithms/GreedyBalancerTest.java b/common/src/test/java/org/astraea/common/balancer/algorithms/GreedyBalancerTest.java index fd6a4a236a..c3c1ebaf17 100644 --- a/common/src/test/java/org/astraea/common/balancer/algorithms/GreedyBalancerTest.java +++ b/common/src/test/java/org/astraea/common/balancer/algorithms/GreedyBalancerTest.java @@ -69,7 +69,7 @@ void testJmx() { .forEach( run -> { var plan = balancer.offer(clusterInfo, Duration.ofMillis(300)); - Assertions.assertTrue(plan.isPresent()); + Assertions.assertTrue(plan.solution().isPresent()); var bean = Assertions.assertDoesNotThrow( () -> diff --git a/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java b/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java index 311d092035..9e01031c36 100644 --- a/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java +++ b/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java @@ -161,11 +161,12 @@ void testOptimization(HasClusterCost costFunction, TestCase testcase) { .build()) .offer(testcase.clusterInfo(), Duration.ofSeconds(1)); - Assertions.assertTrue(newPlan.isPresent()); - System.out.println("Initial cost: " + newPlan.get().initialClusterCost().value()); - System.out.println("New cost: " + newPlan.get().proposalClusterCost().value()); + Assertions.assertTrue(newPlan.solution().isPresent()); + System.out.println("Initial cost: " + newPlan.initialClusterCost().value()); + System.out.println("New cost: " + newPlan.solution().get().proposalClusterCost().value()); Assertions.assertTrue( - newPlan.get().initialClusterCost().value() > newPlan.get().proposalClusterCost().value()); + newPlan.initialClusterCost().value() + > newPlan.solution().get().proposalClusterCost().value()); } @Test diff --git a/docs/web_server/web_api_balancer_chinese.md b/docs/web_server/web_api_balancer_chinese.md index 78c3706098..047af21a28 100644 --- a/docs/web_server/web_api_balancer_chinese.md +++ b/docs/web_server/web_api_balancer_chinese.md @@ -30,12 +30,11 @@ curl -X POST http://localhost:8001/balancer \ "balancer": "org.astraea.common.balancer.algorithms.GreedyBalancer", "balancerConfig": { "shuffle.tweaker.min.step": "1", - "shuffle.tweaker.max.step": "30", - "iteration": "10000" + "shuffle.tweaker.max.step": "5" }, "costWeights": [ - { "cost": "org.astraea.common.cost.ReplicaSizeCost", "weight": 3}, - { "cost": "org.astraea.common.cost.ReplicaLeaderCost", "weight": 2} + { "cost": "org.astraea.common.cost.ReplicaSizeCost", "weight": 1 }, + { "cost": "org.astraea.common.cost.ReplicaLeaderCost", "weight": 1 } ], "maxMigratedSize": "300MB", "maxMigratedLeader": "3" @@ -125,8 +124,9 @@ JSON Response 範例 1. 搜尋負載平衡計劃的過程中發生錯誤 (此情境下 `generated` 會是 `false`) 2. 執行負載平衡計劃的過程中發生錯誤 (此情境下 `scheduled` 會是 `true` 但 `done` 為 `false`) * `info`: 此負載平衡計劃的詳細資訊,如果此計劃還沒生成,則此欄位會是 `null` - * `cost`: 目前叢集的成本 (越高越不好) - * `newCost`: 評估後比較好的成本 (<= `cost`) + * `isPlanGenerated`: 表示計劃是否成功生成,如果此欄位為 `false` 代表 Balancer 實作無法找到更好的計劃 + * `cost`: 目前叢集的分數 (越高越不好) + * `newCost`: 提出的新計劃之分數,當計劃沒有成功生成時,此欄位會是 `null` * `function`: 用來評估品質的方法 * `changes`: 新的 partitions 配置 * `topic`: topic 名稱 @@ -151,9 +151,10 @@ JSON Response 範例 "scheduled": true, "done": true, "info": { + "isPlanGenerated": true, "cost": 0.04948716593053935, "newCost": 0.04948716593053935, - "function": "ReplicaLeaderCost", + "function": "WeightCompositeClusterCost[{\"org.astraea.common.cost.ReplicaSizeCost@36835e87\" weight 1.0}, {\"org.astraea.common.cost.ReplicaLeaderCost@2d87f4d3\" weight 1.0}]", "changes": [ { "topic": "__consumer_offsets", diff --git a/gui/src/main/java/org/astraea/gui/tab/BalancerNode.java b/gui/src/main/java/org/astraea/gui/tab/BalancerNode.java index 54fcb80395..50ec518cba 100644 --- a/gui/src/main/java/org/astraea/gui/tab/BalancerNode.java +++ b/gui/src/main/java/org/astraea/gui/tab/BalancerNode.java @@ -59,7 +59,7 @@ public class BalancerNode { - static final AtomicReference LAST_PLAN = new AtomicReference<>(); + static final AtomicReference LAST_PLAN = new AtomicReference<>(); static final String TOPIC_NAME_KEY = "topic"; private static final String PARTITION_KEY = "partition"; private static final String MAX_MIGRATE_LOG_SIZE = "total max migrate log size"; @@ -88,7 +88,7 @@ public String toString() { } } - static List> costResult(Balancer.Plan plan) { + static List> costResult(Balancer.Solution plan) { var map = new HashMap>(); BiConsumer> process = @@ -112,12 +112,12 @@ static List> costResult(Balancer.Plan plan) { } static List> assignmentResult( - ClusterInfo clusterInfo, Balancer.Plan plan) { - return ClusterInfo.findNonFulfilledAllocation(clusterInfo, plan.proposal()).stream() + ClusterInfo clusterInfo, Balancer.Solution solution) { + return ClusterInfo.findNonFulfilledAllocation(clusterInfo, solution.proposal()).stream() .map( tp -> { var previousAssignments = clusterInfo.replicas(tp); - var newAssignments = plan.proposal().replicas(tp); + var newAssignments = solution.proposal().replicas(tp); var result = new LinkedHashMap(); result.put(TOPIC_NAME_KEY, tp.topic()); result.put(PARTITION_KEY, tp.partition()); @@ -223,10 +223,11 @@ static TableRefresher refresher(Context context) { }) .thenApply( entry -> { - entry.getValue().ifPresent(LAST_PLAN::set); + entry.getValue().solution().ifPresent(LAST_PLAN::set); var result = entry .getValue() + .solution() .map( plan -> Map.of( diff --git a/gui/src/test/java/org/astraea/gui/tab/BalancerNodeTest.java b/gui/src/test/java/org/astraea/gui/tab/BalancerNodeTest.java index bdf4521c89..ef174e8325 100644 --- a/gui/src/test/java/org/astraea/gui/tab/BalancerNodeTest.java +++ b/gui/src/test/java/org/astraea/gui/tab/BalancerNodeTest.java @@ -156,11 +156,10 @@ void testResult() { var results = BalancerNode.assignmentResult( beforeClusterInfo, - new Balancer.Plan( - ClusterInfo.of(allNodes, afterReplicas), + new Balancer.Solution( new ReplicaLeaderCost().clusterCost(beforeClusterInfo, ClusterBean.EMPTY), - new ReplicaLeaderCost().clusterCost(beforeClusterInfo, ClusterBean.EMPTY), - MoveCost.EMPTY)); + MoveCost.EMPTY, + ClusterInfo.of(allNodes, afterReplicas))); Assertions.assertEquals(results.size(), 1); Assertions.assertEquals(results.get(0).get("topic"), topic); Assertions.assertEquals(results.get(0).get("partition"), 0);