Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify ClusterLogAllocation #474

Merged
merged 7 commits into from
Jul 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.stream.Collectors;
import org.astraea.app.admin.TopicPartition;
import org.astraea.app.balancer.log.ClusterLogAllocation;
import org.astraea.app.balancer.log.LayeredClusterLogAllocation;

/** Execute every possible migration immediately. */
public class StraightPlanExecutor implements RebalancePlanExecutor {
Expand All @@ -33,7 +32,7 @@ public StraightPlanExecutor() {}
@Override
public void run(RebalanceAdmin rebalanceAdmin, ClusterLogAllocation logAllocation) {
final var clusterInfo = rebalanceAdmin.clusterInfo();
final var currentLogAllocation = LayeredClusterLogAllocation.of(clusterInfo);
final var currentLogAllocation = ClusterLogAllocation.of(clusterInfo);
final var migrationTargets =
ClusterLogAllocation.findNonFulfilledAllocation(currentLogAllocation, logAllocation);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.astraea.app.admin.ClusterInfo;
import org.astraea.app.balancer.RebalancePlanProposal;
import org.astraea.app.balancer.log.ClusterLogAllocation;
import org.astraea.app.balancer.log.LayeredClusterLogAllocation;

/** */
public interface RebalancePlanGenerator {
Expand All @@ -35,7 +34,7 @@ public interface RebalancePlanGenerator {
* @return a {@link Stream} generating rebalance plan regarding the given {@link ClusterInfo}
*/
default Stream<RebalancePlanProposal> generate(ClusterInfo clusterInfo) {
return generate(clusterInfo, LayeredClusterLogAllocation.of(clusterInfo));
return generate(clusterInfo, ClusterLogAllocation.of(clusterInfo));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.astraea.app.admin.ClusterInfo;
import org.astraea.app.admin.NodeInfo;
import org.astraea.app.admin.TopicPartition;
import org.astraea.app.balancer.RebalancePlanProposal;
import org.astraea.app.balancer.log.ClusterLogAllocation;
import org.astraea.app.balancer.log.LayeredClusterLogAllocation;
import org.astraea.app.balancer.log.LogPlacement;

/**
Expand Down Expand Up @@ -60,21 +60,6 @@ public ShufflePlanGenerator(Supplier<Integer> numberOfShuffle) {
this.numberOfShuffle = numberOfShuffle;
}

private int sourceTopicPartitionSelector(List<TopicPartition> migrationCandidates) {
return ThreadLocalRandom.current().nextInt(0, migrationCandidates.size());
}

private int sourceLogPlacementSelector(List<LogPlacement> migrationCandidates) {
return ThreadLocalRandom.current().nextInt(0, migrationCandidates.size());
}

private <T> T randomElement(Collection<T> collection) {
return collection.stream()
.skip(ThreadLocalRandom.current().nextInt(0, collection.size()))
.findFirst()
.orElseThrow();
}

@Override
public Stream<RebalancePlanProposal> generate(
ClusterInfo clusterInfo, ClusterLogAllocation baseAllocation) {
Expand Down Expand Up @@ -105,92 +90,104 @@ public Stream<RebalancePlanProposal> generate(
.build();

final var shuffleCount = numberOfShuffle.get();
final var newAllocation = LayeredClusterLogAllocation.of(baseAllocation);
final var pickingList =
newAllocation.topicPartitionStream().collect(Collectors.toUnmodifiableList());

rebalancePlanBuilder.addInfo(
"Make " + shuffleCount + (shuffleCount > 0 ? " shuffles." : " shuffle."));
for (int i = 0; i < shuffleCount; i++) {
final var sourceTopicPartitionIndex = sourceTopicPartitionSelector(pickingList);
final var sourceTopicPartition = pickingList.get(sourceTopicPartitionIndex);
final var sourceLogPlacements = newAllocation.logPlacements(sourceTopicPartition);
final var sourceLogPlacementIndex = sourceLogPlacementSelector(sourceLogPlacements);
final var sourceLogPlacement = sourceLogPlacements.get(sourceLogPlacementIndex);
final var sourceIsLeader = sourceLogPlacementIndex == 0;
final var sourceBroker = sourceLogPlacement.broker();

Consumer<Integer> replicaSetMigration =
(targetBroker) -> {
var destDir = randomElement(clusterInfo.dataDirectories(targetBroker));
newAllocation.migrateReplica(
sourceTopicPartition, sourceBroker, targetBroker, destDir);
rebalancePlanBuilder.addInfo(
String.format(
"Change replica set of topic %s partition %d, from %d to %d at %s.",
sourceTopicPartition.topic(),
sourceTopicPartition.partition(),
sourceLogPlacement.broker(),
targetBroker,
destDir));
};
Consumer<LogPlacement> leaderFollowerMigration =
(newLeaderReplicaCandidate) -> {
newAllocation.letReplicaBecomeLeader(
sourceTopicPartition, newLeaderReplicaCandidate.broker());
rebalancePlanBuilder.addInfo(
String.format(
"Change the log identity of topic %s partition %d replica at broker %d, from %s to %s",
sourceTopicPartition.topic(),
sourceTopicPartition.partition(),
sourceLogPlacement.broker(),
sourceIsLeader ? "leader" : "follower",
sourceIsLeader ? "follower" : "leader"));
};

// generate a set of valid migration broker for given placement.
final var validMigrationCandidates = new ArrayList<Movement>();
// [Valid movement 1] add all brokers and remove all broker in current replica set
brokerIds.stream()
.filter(
broker -> sourceLogPlacements.stream().noneMatch(log -> log.broker() == broker))
.map(targetBroker -> (Runnable) () -> replicaSetMigration.accept(targetBroker))
.map(Movement::replicaSetMovement)
.forEach(validMigrationCandidates::add);
// [Valid movement 2] add all leader/follower change candidate
if (sourceLogPlacements.size() > 1) {
if (sourceIsLeader) {
// leader can migrate its identity to follower.
sourceLogPlacements.stream()
.skip(1)
.map(
followerReplica ->
(Runnable) () -> leaderFollowerMigration.accept(followerReplica))
.map(Movement::replicaSetMovement)
.forEach(validMigrationCandidates::add);
} else {
// follower can migrate its identity to leader.
validMigrationCandidates.add(
Movement.replicaSetMovement(
() -> leaderFollowerMigration.accept(sourceLogPlacement)));
}
}

// pick a migration and execute
final var selectedMigrationIndex = randomElement(validMigrationCandidates);
selectedMigrationIndex.run();
}

return rebalancePlanBuilder.withRebalancePlan(newAllocation).build();

var candidates =
IntStream.range(0, shuffleCount)
.mapToObj(i -> allocationGenerator(clusterInfo, rebalancePlanBuilder))
.collect(Collectors.toUnmodifiableList());

var currentAllocation = baseAllocation;
for (var candidate : candidates) currentAllocation = candidate.apply(currentAllocation);

return rebalancePlanBuilder.withRebalancePlan(currentAllocation).build();
});
}

/** Action to trigger upon this migration */
interface Movement extends Runnable {
static ReplicaSetMovement replicaSetMovement(Runnable runnable) {
return runnable::run;
}
private static Function<ClusterLogAllocation, ClusterLogAllocation> allocationGenerator(
ClusterInfo clusterInfo, RebalancePlanProposal.Build rebalancePlanBuilder) {
return currentAllocation -> {
var brokerIds =
clusterInfo.nodes().stream().map(NodeInfo::id).collect(Collectors.toUnmodifiableSet());
var pickingList = new ArrayList<>(currentAllocation.topicPartitions());
final var sourceTopicPartitionIndex = sourceTopicPartitionSelector(pickingList);
final var sourceTopicPartition = pickingList.get(sourceTopicPartitionIndex);
final var sourceLogPlacements = currentAllocation.logPlacements(sourceTopicPartition);
final var sourceLogPlacementIndex = sourceLogPlacementSelector(sourceLogPlacements);
final var sourceLogPlacement = sourceLogPlacements.get(sourceLogPlacementIndex);
final var sourceIsLeader = sourceLogPlacementIndex == 0;
final var sourceBroker = sourceLogPlacement.broker();
var targetPlacements =
sourceLogPlacements.size() > 1
? sourceIsLeader
? sourceLogPlacements.stream().skip(1).collect(Collectors.toUnmodifiableList())
: List.of(sourceLogPlacement)
: List.<LogPlacement>of();
// generate a set of valid migration broker for given placement.
final var validMigrationCandidates =
Stream.concat(

// [Valid movement 1] add all brokers and remove all
// broker in current replica set
brokerIds.stream()
.filter(
broker ->
sourceLogPlacements.stream().noneMatch(log -> log.broker() == broker))
.map(
targetBroker ->
(Supplier<ClusterLogAllocation>)
() -> {
var destDir =
randomElement(clusterInfo.dataDirectories(targetBroker));
rebalancePlanBuilder.addInfo(
String.format(
"Change replica set of topic %s partition %d, from %d to %d at %s.",
sourceTopicPartition.topic(),
sourceTopicPartition.partition(),
sourceLogPlacement.broker(),
targetBroker,
destDir));
return currentAllocation.migrateReplica(
sourceTopicPartition, sourceBroker, targetBroker, destDir);
}),
// [Valid movement 2] add all leader/follower change
// candidate
targetPlacements.stream()
.map(
followerReplica ->
() -> {
rebalancePlanBuilder.addInfo(
String.format(
"Change the log identity of topic %s partition %d replica at broker %d, from %s to %s",
sourceTopicPartition.topic(),
sourceTopicPartition.partition(),
sourceLogPlacement.broker(),
sourceIsLeader ? "leader" : "follower",
sourceIsLeader ? "follower" : "leader"));
return currentAllocation.letReplicaBecomeLeader(
sourceTopicPartition, followerReplica.broker());
}))
.collect(Collectors.toUnmodifiableList());
// pick a migration and execute
final var selectedMigrationIndex = randomElement(validMigrationCandidates);
return selectedMigrationIndex.get();
};
}

interface ReplicaSetMovement extends Movement {}
private static int sourceTopicPartitionSelector(List<TopicPartition> migrationCandidates) {
return ThreadLocalRandom.current().nextInt(0, migrationCandidates.size());
}

private static int sourceLogPlacementSelector(List<LogPlacement> migrationCandidates) {
return ThreadLocalRandom.current().nextInt(0, migrationCandidates.size());
}

private static <T> T randomElement(Collection<T> collection) {
return collection.stream()
.skip(ThreadLocalRandom.current().nextInt(0, collection.size()))
.findFirst()
.orElseThrow();
}
}
Loading