Skip to content

Commit

Permalink
[WEB] Provide extra failure information from Balancer response (#1309)
Browse files Browse the repository at this point in the history
  • Loading branch information
garyparrot authored Dec 21, 2022
1 parent 4eccc64 commit dbd4113
Show file tree
Hide file tree
Showing 12 changed files with 159 additions and 90 deletions.
51 changes: 31 additions & 20 deletions app/src/main/java/org/astraea/app/web/BalancerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ public CompletionStage<Response> post(Channel channel) {
.retryOffer(currentClusterInfo, request.executionTime));
var changes =
bestPlan
.solution()
.map(
p ->
ClusterInfo.findNonFulfilledAllocation(
Expand All @@ -187,11 +188,15 @@ public CompletionStage<Response> post(Channel channel) {
.orElse(List.of());
var report =
new Report(
bestPlan.map(p -> p.initialClusterCost().value()),
bestPlan.map(p -> p.proposalClusterCost().value()),
bestPlan.solution().isPresent(),
bestPlan.initialClusterCost().value(),
bestPlan.solution().map(p -> p.proposalClusterCost().value()),
request.algorithmConfig.clusterCostFunction().toString(),
changes,
bestPlan.map(p -> migrationCosts(p.moveCost())).orElseGet(List::of));
bestPlan
.solution()
.map(p -> migrationCosts(p.moveCost()))
.orElseGet(List::of));
return new PlanInfo(report, bestPlan);
})
.whenComplete(
Expand Down Expand Up @@ -229,10 +234,10 @@ private static List<MigrationCost> migrationCosts(MoveCost cost) {
.collect(Collectors.toList());
}

private Optional<Balancer.Plan> metricContext(
private Balancer.Plan metricContext(
Collection<Fetcher> fetchers,
Collection<MetricSensor> metricSensors,
Function<Supplier<ClusterBean>, Optional<Balancer.Plan>> execution) {
Function<Supplier<ClusterBean>, Balancer.Plan> execution) {
// TODO: use a global metric collector when we are ready to enable long-run metric sampling
// https://github.com/skiptests/astraea/pull/955#discussion_r1026491162
try (var collector = MetricCollector.builder().interval(sampleInterval).build()) {
Expand Down Expand Up @@ -407,20 +412,24 @@ public CompletionStage<Response> put(Channel channel) {
// already scheduled, nothing to do
if (executedPlans.containsKey(thePlanId))
return new PutPlanResponse(thePlanId);
// one plan at a time
if (lastExecutionId.get() != null
&& !executedPlans.get(lastExecutionId.get()).isDone())
throw new IllegalStateException(
"There is another on-going rebalance: " + lastExecutionId.get());
// the plan entry exists, but the balancer found no solution for it
if (thePlanInfo.associatedPlan.solution().isEmpty())
throw new IllegalStateException(
"The specified balancer plan didn't generate a useful plan: "
+ thePlanId);
// schedule the actual execution
thePlanInfo.associatedPlan.ifPresent(
p -> {
executedPlans.put(
thePlanId,
executor
.run(admin, p.proposal(), Duration.ofHours(1))
.toCompletableFuture());
lastExecutionId.set(thePlanId);
});
var proposedPlan = thePlanInfo.associatedPlan.solution().get();
executedPlans.put(
thePlanId,
executor
.run(admin, proposedPlan.proposal(), Duration.ofHours(1))
.toCompletableFuture());
lastExecutionId.set(thePlanId);
return new PutPlanResponse(thePlanId);
},
schedulingExecutor)
Expand Down Expand Up @@ -555,8 +564,8 @@ static class MigrationCost {
}

static class Report implements Response {
// initial cost might be unavailable due to unable to evaluate cost function
final Optional<Double> cost;
final boolean isPlanGenerated;
final double cost;

// don't generate new cost if there is no best plan
final Optional<Double> newCost;
Expand All @@ -566,12 +575,14 @@ static class Report implements Response {
final List<MigrationCost> migrationCosts;

Report(
Optional<Double> cost,
boolean isPlanGenerated,
double initialCost,
Optional<Double> newCost,
String function,
List<Change> changes,
List<MigrationCost> migrationCosts) {
this.cost = cost;
this.isPlanGenerated = isPlanGenerated;
this.cost = initialCost;
this.newCost = newCost;
this.function = function;
this.changes = changes;
Expand All @@ -581,9 +592,9 @@ static class Report implements Response {

static class PlanInfo {
private final Report report;
private final Optional<Balancer.Plan> associatedPlan;
private final Balancer.Plan associatedPlan;

PlanInfo(Report report, Optional<Balancer.Plan> associatedPlan) {
PlanInfo(Report report, Balancer.Plan associatedPlan) {
this.report = report;
this.associatedPlan = associatedPlan;
}
Expand Down
60 changes: 50 additions & 10 deletions app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ public class BalancerHandlerTest extends RequireBrokerCluster {
static final String BALANCER_CONFIGURATION_KEY = "balancerConfig";
static final int TIMEOUT_DEFAULT = 3;

private static final List<BalancerHandler.CostWeight> defaultIncreasing =
List.of(costWeight(IncreasingCost.class.getName(), 1));
private static final List<BalancerHandler.CostWeight> defaultDecreasing =
List.of(costWeight(DecreasingCost.class.getName(), 1));

Expand Down Expand Up @@ -110,7 +112,7 @@ void testReport() {
var report = progress.report;
Assertions.assertNotNull(progress.id);
Assertions.assertNotEquals(0, report.changes.size());
Assertions.assertTrue(report.cost.get() >= report.newCost.get());
Assertions.assertTrue(report.cost >= report.newCost.get());
// "before" should record size
report.changes.forEach(
c ->
Expand All @@ -125,7 +127,7 @@ void testReport() {
report.changes.stream()
.flatMap(c -> c.after.stream())
.forEach(p -> Assertions.assertEquals(Optional.empty(), p.size));
Assertions.assertTrue(report.cost.get() >= report.newCost.get());
Assertions.assertTrue(report.cost >= report.newCost.get());
var sizeMigration =
report.migrationCosts.stream()
.filter(x -> x.name.equals(BalancerHandler.MOVED_SIZE))
Expand Down Expand Up @@ -160,7 +162,7 @@ void testTopics() {
report.changes.stream().map(x -> x.topic).allMatch(allowedTopics::contains),
"Only allowed topics been altered");
Assertions.assertTrue(
report.cost.get() >= report.newCost.get(),
report.cost >= report.newCost.get(),
"The proposed plan should has better score then the current one");
var sizeMigration =
report.migrationCosts.stream()
Expand Down Expand Up @@ -294,7 +296,8 @@ void testBestPlan() {
.clusterInfo(admin.topicNames(false).toCompletableFuture().join())
.toCompletableFuture()
.join(),
Duration.ofSeconds(3)));
Duration.ofSeconds(3))
.solution());

// test move cost predicate
Assertions.assertEquals(
Expand All @@ -312,7 +315,8 @@ void testBestPlan() {
.clusterInfo(admin.topicNames(false).toCompletableFuture().join())
.toCompletableFuture()
.join(),
Duration.ofSeconds(3)));
Duration.ofSeconds(3))
.solution());
}
}

Expand Down Expand Up @@ -366,8 +370,7 @@ void testNoReport() {
.post(
Channel.ofRequest(
JsonConverter.defaultConverter()
.toJson(
Map.of(BALANCER_CONFIGURATION_KEY, Map.of("iteration", "0")))))
.toJson(Map.of(COST_WEIGHT_KEY, defaultIncreasing))))
.toCompletableFuture()
.join());
Utils.sleep(Duration.ofSeconds(5));
Expand All @@ -377,8 +380,26 @@ void testNoReport() {
handler.get(Channel.ofTarget(post.id)).toCompletableFuture().join());
Assertions.assertNotNull(post.id);
Assertions.assertEquals(post.id, progress.id);
Assertions.assertFalse(progress.generated);
Assertions.assertNotNull(progress.exception);
Assertions.assertTrue(progress.generated, "Plan is calculated");
Assertions.assertTrue(progress.report.newCost.isEmpty(), "No proposal");
Assertions.assertNotNull(progress.report.function);
Assertions.assertEquals(0, progress.report.changes.size(), "No proposal");
Assertions.assertEquals(0, progress.report.migrationCosts.size(), "No proposal");
Assertions.assertNull(progress.exception, "No exception occurred during this process");
Assertions.assertInstanceOf(
IllegalStateException.class,
Assertions.assertThrows(
CompletionException.class,
() ->
handler
.put(
Channel.ofRequest(
JsonConverter.defaultConverter()
.toJson(Map.of("id", progress.id))))
.toCompletableFuture()
.join())
.getCause(),
"Cannot execute a plan with no proposal available");
}
}

Expand Down Expand Up @@ -1107,6 +1128,25 @@ public synchronized ClusterCost clusterCost(
}
}

/**
 * Cluster cost function whose reported cost keeps growing.
 *
 * <p>The first {@link ClusterInfo} passed to {@link #clusterCost} is remembered as the baseline.
 * Any cluster info for which {@code ClusterInfo.findNonFulfilledAllocation(baseline, clusterInfo)}
 * is empty is scored as {@code 1}. Every other evaluation returns the current internal value
 * (starting at {@code 1.0}) and then multiplies that value by {@code 1.002} for the next call.
 *
 * <p>NOTE(review): presumably this makes every altered allocation look no better than the
 * baseline, so a balancer cannot find a solution; confirm against the balancer's acceptance rule.
 */
public static class IncreasingCost implements HasClusterCost {

  // The first cluster info ever evaluated; set lazily on the first call.
  private ClusterInfo<Replica> original;

  // The cost handed out for the next non-baseline evaluation.
  private double value0 = 1.0;

  /**
   * @param configuration not used by this cost function
   */
  public IncreasingCost(Configuration configuration) {}

  /**
   * Scores the given cluster info.
   *
   * <p>Synchronized because each call may read and update the fields of this instance.
   *
   * @param clusterInfo the allocation to score
   * @param clusterBean not used by this cost function
   * @return a cost of {@code 1} when no non-fulfilled allocation is found against the baseline,
   *     otherwise the current growing value
   */
  @Override
  public synchronized ClusterCost clusterCost(
      ClusterInfo<Replica> clusterInfo, ClusterBean clusterBean) {
    if (original == null) {
      original = clusterInfo;
    }

    final boolean sameAsBaseline =
        ClusterInfo.findNonFulfilledAllocation(original, clusterInfo).isEmpty();
    if (sameAsBaseline) {
      return () -> 1;
    }

    // Capture the value before growing it so the returned cost stays fixed.
    final double snapshot = value0;
    value0 *= 1.002;
    return () -> snapshot;
  }
}

public static class FetcherAndCost extends DecreasingCost {

static AtomicReference<Consumer<ClusterBean>> callback = new AtomicReference<>();
Expand Down Expand Up @@ -1148,7 +1188,7 @@ public SpyBalancer(AlgorithmConfig algorithmConfig) {
}

@Override
public Optional<Plan> offer(ClusterInfo<Replica> currentClusterInfo, Duration timeout) {
public Plan offer(ClusterInfo<Replica> currentClusterInfo, Duration timeout) {
offerCallbacks.forEach(Runnable::run);
offerCallbacks.clear();
return super.offer(currentClusterInfo, timeout);
Expand Down
44 changes: 29 additions & 15 deletions common/src/main/java/org/astraea/common/balancer/Balancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public interface Balancer {
* Execute {@link Balancer#offer(ClusterInfo, Duration)}. Retry the plan generation if a {@link
* NoSufficientMetricsException} exception occurred.
*/
default Optional<Plan> retryOffer(ClusterInfo<Replica> currentClusterInfo, Duration timeout) {
default Plan retryOffer(ClusterInfo<Replica> currentClusterInfo, Duration timeout) {
final var timeoutMs = System.currentTimeMillis() + timeout.toMillis();
while (System.currentTimeMillis() < timeoutMs) {
try {
Expand Down Expand Up @@ -65,7 +65,7 @@ default Optional<Plan> retryOffer(ClusterInfo<Replica> currentClusterInfo, Durat
/**
* @return a rebalance plan
*/
Optional<Plan> offer(ClusterInfo<Replica> currentClusterInfo, Duration timeout);
Plan offer(ClusterInfo<Replica> currentClusterInfo, Duration timeout);

@SuppressWarnings("unchecked")
static Balancer create(String classpath, AlgorithmConfig config) {
Expand Down Expand Up @@ -95,14 +95,8 @@ static <T extends Balancer> T create(Class<T> balancerClass, AlgorithmConfig con
}

class Plan {
final ClusterInfo<Replica> proposal;
final ClusterCost initialClusterCost;
final ClusterCost proposalClusterCost;
final MoveCost moveCost;

public ClusterInfo<Replica> proposal() {
return proposal;
}
final Solution solution;

/**
* The {@link ClusterCost} score of the original {@link ClusterInfo} when this plan is start
Expand All @@ -112,6 +106,30 @@ public ClusterCost initialClusterCost() {
return initialClusterCost;
}

public Optional<Solution> solution() {
return Optional.ofNullable(solution);
}

public Plan(ClusterCost initialClusterCost) {
this(initialClusterCost, null);
}

public Plan(ClusterCost initialClusterCost, Solution solution) {
this.initialClusterCost = initialClusterCost;
this.solution = solution;
}
}

class Solution {

final ClusterInfo<Replica> proposal;
final ClusterCost proposalClusterCost;
final MoveCost moveCost;

public ClusterInfo<Replica> proposal() {
return proposal;
}

/** The {@link ClusterCost} score of the proposed new allocation. */
public ClusterCost proposalClusterCost() {
return proposalClusterCost;
Expand All @@ -121,13 +139,9 @@ public MoveCost moveCost() {
return moveCost;
}

public Plan(
ClusterInfo<Replica> proposal,
ClusterCost initialClusterCost,
ClusterCost proposalClusterCost,
MoveCost moveCost) {
public Solution(
ClusterCost proposalClusterCost, MoveCost moveCost, ClusterInfo<Replica> proposal) {
this.proposal = proposal;
this.initialClusterCost = initialClusterCost;
this.proposalClusterCost = proposalClusterCost;
this.moveCost = moveCost;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public GreedyBalancer(AlgorithmConfig algorithmConfig) {
}

@Override
public Optional<Plan> offer(ClusterInfo<Replica> currentClusterInfo, Duration timeout) {
public Plan offer(ClusterInfo<Replica> currentClusterInfo, Duration timeout) {
final var allocationTweaker = new ShuffleTweaker(minStep, maxStep);
final var metrics = config.metricSource().get();
final var clusterCostFunction = config.clusterCostFunction();
Expand All @@ -90,7 +90,7 @@ public Optional<Plan> offer(ClusterInfo<Replica> currentClusterInfo, Duration ti
final var executionTime = timeout.toMillis();
Supplier<Boolean> moreRoom =
() -> System.currentTimeMillis() - start < executionTime && loop.getAndDecrement() > 0;
BiFunction<ClusterInfo<Replica>, ClusterCost, Optional<Balancer.Plan>> next =
BiFunction<ClusterInfo<Replica>, ClusterCost, Optional<Solution>> next =
(currentAllocation, currentCost) ->
allocationTweaker
.generate(currentAllocation)
Expand All @@ -99,11 +99,10 @@ public Optional<Plan> offer(ClusterInfo<Replica> currentClusterInfo, Duration ti
newAllocation -> {
var newClusterInfo =
ClusterInfo.update(currentClusterInfo, newAllocation::replicas);
return new Balancer.Plan(
newAllocation,
initialCost,
return new Solution(
clusterCostFunction.clusterCost(newClusterInfo, metrics),
moveCostFunction.moveCost(currentClusterInfo, newClusterInfo, metrics));
moveCostFunction.moveCost(currentClusterInfo, newClusterInfo, metrics),
newAllocation);
})
.filter(
plan ->
Expand All @@ -112,7 +111,7 @@ public Optional<Plan> offer(ClusterInfo<Replica> currentClusterInfo, Duration ti
.findFirst();
var currentCost = initialCost;
var currentAllocation = ClusterInfo.masked(currentClusterInfo, config.topicFilter());
var currentPlan = Optional.<Balancer.Plan>empty();
var currentSolution = Optional.<Solution>empty();

// register JMX
var currentIteration = new LongAdder();
Expand All @@ -132,10 +131,10 @@ public Optional<Plan> offer(ClusterInfo<Replica> currentClusterInfo, Duration ti
currentMinCost.accumulate(currentCost.value());
var newPlan = next.apply(currentAllocation, currentCost);
if (newPlan.isEmpty()) break;
currentPlan = newPlan;
currentCost = currentPlan.get().proposalClusterCost();
currentAllocation = currentPlan.get().proposal();
currentSolution = newPlan;
currentCost = currentSolution.get().proposalClusterCost();
currentAllocation = currentSolution.get().proposal();
}
return currentPlan;
return new Plan(initialCost, currentSolution.orElse(null));
}
}
Loading

0 comments on commit dbd4113

Please sign in to comment.