Skip to content

Commit

Permalink
HDDS-9627. Reset RaftPeer priorities after transfer leadership (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
ivandika3 authored and Ivan Brusentsev committed Nov 14, 2023
1 parent 5317a5e commit 4fce597
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.function.BiFunction;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
Expand All @@ -52,6 +53,7 @@
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.GroupInfoReply;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
Expand Down Expand Up @@ -88,6 +90,11 @@ public final class RatisHelper {
private static final RaftGroup EMPTY_GROUP = RaftGroup.valueOf(DUMMY_GROUP_ID,
Collections.emptyList());

// Used for OM/SCM HA transfer leadership
@VisibleForTesting
public static final int NEUTRAL_PRIORITY = 0;
private static final int HIGHER_PRIORITY = 1;

private RatisHelper() {
}

Expand Down Expand Up @@ -491,60 +498,98 @@ public static void debug(ByteBuf buf, String name, Logger log) {
log.debug("{}: {}\n {}", name, buf, builder);
}

/**
 * Builds a copy of the given peer with the priority used while transferring
 * leadership: {@code HIGHER_PRIORITY} when the peer is the transfer target,
 * {@code NEUTRAL_PRIORITY} otherwise.
 *
 * @param peer the peer to copy
 * @param target the id of the peer that should become the leader
 * @return a new peer equal to {@code peer} except for its priority
 */
static RaftPeer newRaftPeer(RaftPeer peer, RaftPeerId target) {
  final boolean isTarget = peer.getId().equals(target);
  final int newPriority;
  if (isTarget) {
    newPriority = HIGHER_PRIORITY;
  } else {
    newPriority = NEUTRAL_PRIORITY;
  }
  return RaftPeer.newBuilder(peer)
      .setPriority(newPriority)
      .build();
}


/**
* Use raft client to send admin request, transfer the leadership.
* 1. Set priority and send setConfiguration request
* 2. Trigger transferLeadership API.
*
* @param raftGroup the Raft group
* @param group the Raft group
* @param targetPeerId the target expected leader
*/
public static void transferRatisLeadership(ConfigurationSource conf,
RaftGroup raftGroup, RaftPeerId targetPeerId, GrpcTlsConfig tlsConfig)
RaftGroup group, RaftPeerId targetPeerId, GrpcTlsConfig tlsConfig)
throws IOException {
// TODO: need a common raft client related conf.
try (RaftClient raftClient = newRaftClient(SupportedRpcType.GRPC, null,
null, raftGroup, createRetryPolicy(conf), tlsConfig, conf)) {
if (raftGroup.getPeer(targetPeerId) == null) {
throw new IOException("Cannot choose the target leader. The expected " +
"leader RaftPeerId is " + targetPeerId + " and the peers are " +
raftGroup.getPeers().stream().map(RaftPeer::getId)
.collect(Collectors.toList()) + ".");
if (group.getPeer(targetPeerId) == null) {
throw new IOException("Target " + targetPeerId + " not found in group "
+ group.getPeers().stream().map(RaftPeer::getId)
.collect(Collectors.toList()) + ".");
}

LOG.info("Start transferring leadership to {}", targetPeerId);
try (RaftClient client = newRaftClient(SupportedRpcType.GRPC, null,
null, group, createRetryPolicy(conf), tlsConfig, conf)) {
final GroupInfoReply info = client.getGroupManagementApi(targetPeerId)
.info(group.getGroupId());
if (!info.isSuccess()) {
throw new IOException("Failed to get info for " + group.getGroupId()
+ " from " + targetPeerId);
}
LOG.info("Chosen the targetLeaderId {} to transfer leadership",
targetPeerId);

// Set priority
List<RaftPeer> peersWithNewPriorities = new ArrayList<>();
for (RaftPeer peer : raftGroup.getPeers()) {
peersWithNewPriorities.add(
RaftPeer.newBuilder(peer)
.setPriority(peer.getId().equals(targetPeerId) ? 2 : 1)
.build()
);

final RaftGroup remote = info.getGroup();
if (!group.equals(remote)) {
throw new IOException("Group mismatched: the given group " + group
+ " and the remote group from " + targetPeerId + " are not equal."
+ "\n Given: " + group
+ "\n Remote: " + remote);
}
RaftClientReply reply;
// Set new configuration
reply = raftClient.admin().setConfiguration(peersWithNewPriorities);
if (reply.isSuccess()) {
LOG.info("Successfully set new priority for division: {}",
peersWithNewPriorities);
} else {
LOG.warn("Failed to set new priority for division: {}." +
" Ratis reply: {}", peersWithNewPriorities, reply);
throw new IOException(reply.getException());

RaftClientReply setConf = null;
try {
// Set priority
final List<RaftPeer> peersWithNewPriorities = group.getPeers().stream()
.map(peer -> newRaftPeer(peer, targetPeerId))
.collect(Collectors.toList());
// Set new configuration
setConf = client.admin().setConfiguration(peersWithNewPriorities);
if (setConf.isSuccess()) {
LOG.info("Successfully set priority: {}", peersWithNewPriorities);
} else {
throw new IOException("Failed to set priority.",
setConf.getException());
}

// Trigger the transferLeadership
final RaftClientReply reply = client.admin()
.transferLeadership(targetPeerId, 60_000);
if (reply.isSuccess()) {
LOG.info("Successfully transferred leadership to {}.", targetPeerId);
} else {
LOG.warn("Failed to transfer leadership to {}. Ratis reply: {}",
targetPeerId, reply);
throw new IOException(reply.getException());
}
} finally {
// Reset peers regardless of the result of transfer leadership
if (setConf != null && setConf.isSuccess()) {
resetPriorities(remote, client);
}
}
}
}

// Trigger the transferLeadership
reply = raftClient.admin().transferLeadership(targetPeerId, 60000);
private static void resetPriorities(RaftGroup original, RaftClient client) {
final List<RaftPeer> resetPeers = original.getPeers().stream()
.map(originalPeer -> RaftPeer.newBuilder(originalPeer)
.setPriority(NEUTRAL_PRIORITY).build())
.collect(Collectors.toList());
LOG.info("Resetting Raft peers priorities to {}", resetPeers);
try {
RaftClientReply reply = client.admin().setConfiguration(resetPeers);
if (reply.isSuccess()) {
LOG.info("Successfully transferred leadership to {}.", targetPeerId);
LOG.info("Successfully reset priorities: {}", original);
} else {
LOG.warn("Failed to transfer leadership to {}. Ratis reply: {}",
targetPeerId, reply);
throw new IOException(reply.getException());
LOG.warn("Failed to reset priorities: {}, reply: {}", original, reply);
}
} catch (IOException e) {
LOG.error("Failed to reset priorities for " + original, e);
// Not re-thrown in order to keep the main exception, if there is any.
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,23 @@

import org.apache.hadoop.hdds.cli.OzoneAdmin;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;

Expand Down Expand Up @@ -96,12 +101,14 @@ public void testOmTransfer() throws Exception {
ozoneAdmin.execute(args1);
Thread.sleep(3000);
Assertions.assertEquals(newLeader, cluster.getOMLeader());
assertOMResetPriorities();

oldLeader = cluster.getOMLeader();
String[] args3 = {"om", "transfer", "-r"};
ozoneAdmin.execute(args3);
Thread.sleep(3000);
Assertions.assertNotSame(oldLeader, cluster.getOMLeader());
assertOMResetPriorities();
}

@Test
Expand All @@ -118,12 +125,45 @@ public void testScmTransfer() throws Exception {
ozoneAdmin.execute(args1);
cluster.waitForClusterToBeReady();
Assertions.assertEquals(newLeader, getScmLeader(cluster));
assertSCMResetPriorities();

oldLeader = getScmLeader(cluster);
String[] args3 = {"scm", "transfer", "-r"};
ozoneAdmin.execute(args3);
cluster.waitForClusterToBeReady();
Assertions.assertNotSame(oldLeader, getScmLeader(cluster));
assertSCMResetPriorities();
}

/**
 * Asserts that every peer in the Raft group of the current OM leader has
 * {@link RatisHelper#NEUTRAL_PRIORITY}.
 *
 * @throws IOException if one of the Ratis server lookups fails
 */
private void assertOMResetPriorities() throws IOException {
  final OzoneManagerRatisServer omRatisServer =
      cluster.getOMLeader().getOmRatisServer();
  final RaftGroupId groupId = omRatisServer.getRaftGroupId();
  final Collection<RaftPeer> peers = omRatisServer.getServer()
      .getDivision(groupId)
      .getGroup()
      .getPeers();

  peers.forEach(peer -> Assertions.assertEquals(
      RatisHelper.NEUTRAL_PRIORITY, peer.getPriority()));
}

/**
 * Asserts that an SCM leader exists and that every peer in its Raft group
 * has {@link RatisHelper#NEUTRAL_PRIORITY}.
 */
private void assertSCMResetPriorities() {
  final StorageContainerManager leader = getScmLeader(cluster);
  Assertions.assertNotNull(leader);

  final Collection<RaftPeer> peers = leader.getScmHAManager()
      .getRatisServer()
      .getDivision()
      .getGroup()
      .getPeers();
  peers.forEach(peer -> Assertions.assertEquals(
      RatisHelper.NEUTRAL_PRIORITY, peer.getPriority()));
}

static StorageContainerManager getScmLeader(MiniOzoneHAClusterImpl impl) {
Expand Down

0 comments on commit 4fce597

Please sign in to comment.