Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-9627. Reset RaftPeer priorities after transfer leadership #5549

Merged
merged 4 commits into from
Nov 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.function.BiFunction;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
Expand All @@ -52,6 +53,7 @@
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.GroupInfoReply;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
Expand Down Expand Up @@ -88,6 +90,11 @@ public final class RatisHelper {
private static final RaftGroup EMPTY_GROUP = RaftGroup.valueOf(DUMMY_GROUP_ID,
Collections.emptyList());

// Used for OM/SCM HA transfer leadership
@VisibleForTesting
public static final int NEUTRAL_PRIORITY = 0;
private static final int HIGHER_PRIORITY = 1;

private RatisHelper() {
}

Expand Down Expand Up @@ -491,60 +498,98 @@ public static void debug(ByteBuf buf, String name, Logger log) {
log.debug("{}: {}\n {}", name, buf, builder);
}

static RaftPeer newRaftPeer(RaftPeer peer, RaftPeerId target) {
final int priority = peer.getId().equals(target) ?
HIGHER_PRIORITY : NEUTRAL_PRIORITY;
return RaftPeer.newBuilder(peer).setPriority(priority).build();
}


/**
* Use raft client to send admin request, transfer the leadership.
* 1. Set priority and send setConfiguration request
* 2. Trigger transferLeadership API.
*
* @param raftGroup the Raft group
* @param group the Raft group
* @param targetPeerId the target expected leader
*/
public static void transferRatisLeadership(ConfigurationSource conf,
RaftGroup raftGroup, RaftPeerId targetPeerId, GrpcTlsConfig tlsConfig)
RaftGroup group, RaftPeerId targetPeerId, GrpcTlsConfig tlsConfig)
throws IOException {
// TODO: need a common raft client related conf.
try (RaftClient raftClient = newRaftClient(SupportedRpcType.GRPC, null,
null, raftGroup, createRetryPolicy(conf), tlsConfig, conf)) {
if (raftGroup.getPeer(targetPeerId) == null) {
throw new IOException("Cannot choose the target leader. The expected " +
"leader RaftPeerId is " + targetPeerId + " and the peers are " +
raftGroup.getPeers().stream().map(RaftPeer::getId)
.collect(Collectors.toList()) + ".");
if (group.getPeer(targetPeerId) == null) {
throw new IOException("Target " + targetPeerId + " not found in group "
+ group.getPeers().stream().map(RaftPeer::getId)
.collect(Collectors.toList()) + ".");
}

LOG.info("Start transferring leadership to {}", targetPeerId);
try (RaftClient client = newRaftClient(SupportedRpcType.GRPC, null,
null, group, createRetryPolicy(conf), tlsConfig, conf)) {
final GroupInfoReply info = client.getGroupManagementApi(targetPeerId)
.info(group.getGroupId());
if (!info.isSuccess()) {
throw new IOException("Failed to get info for " + group.getGroupId()
+ " from " + targetPeerId);
}
LOG.info("Chosen the targetLeaderId {} to transfer leadership",
targetPeerId);

// Set priority
List<RaftPeer> peersWithNewPriorities = new ArrayList<>();
for (RaftPeer peer : raftGroup.getPeers()) {
peersWithNewPriorities.add(
RaftPeer.newBuilder(peer)
.setPriority(peer.getId().equals(targetPeerId) ? 2 : 1)
.build()
);

final RaftGroup remote = info.getGroup();
if (!group.equals(remote)) {
throw new IOException("Group mismatched: the given group " + group
+ " and the remote group from " + targetPeerId + " are not equal."
+ "\n Given: " + group
+ "\n Remote: " + remote);
}
RaftClientReply reply;
// Set new configuration
reply = raftClient.admin().setConfiguration(peersWithNewPriorities);
if (reply.isSuccess()) {
LOG.info("Successfully set new priority for division: {}",
peersWithNewPriorities);
} else {
LOG.warn("Failed to set new priority for division: {}." +
" Ratis reply: {}", peersWithNewPriorities, reply);
throw new IOException(reply.getException());

RaftClientReply setConf = null;
try {
// Set priority
final List<RaftPeer> peersWithNewPriorities = group.getPeers().stream()
.map(peer -> newRaftPeer(peer, targetPeerId))
.collect(Collectors.toList());
// Set new configuration
setConf = client.admin().setConfiguration(peersWithNewPriorities);
if (setConf.isSuccess()) {
LOG.info("Successfully set priority: {}", peersWithNewPriorities);
} else {
throw new IOException("Failed to set priority.",
setConf.getException());
}

// Trigger the transferLeadership
final RaftClientReply reply = client.admin()
.transferLeadership(targetPeerId, 60_000);
if (reply.isSuccess()) {
LOG.info("Successfully transferred leadership to {}.", targetPeerId);
} else {
LOG.warn("Failed to transfer leadership to {}. Ratis reply: {}",
targetPeerId, reply);
throw new IOException(reply.getException());
}
} finally {
// Reset peers regardless of the result of transfer leadership
if (setConf != null && setConf.isSuccess()) {
resetPriorities(remote, client);
}
}
}
}

// Trigger the transferLeadership
reply = raftClient.admin().transferLeadership(targetPeerId, 60000);
private static void resetPriorities(RaftGroup original, RaftClient client) {
final List<RaftPeer> resetPeers = original.getPeers().stream()
.map(originalPeer -> RaftPeer.newBuilder(originalPeer)
.setPriority(NEUTRAL_PRIORITY).build())
.collect(Collectors.toList());
LOG.info("Resetting Raft peers priorities to {}", resetPeers);
try {
RaftClientReply reply = client.admin().setConfiguration(resetPeers);
if (reply.isSuccess()) {
LOG.info("Successfully transferred leadership to {}.", targetPeerId);
LOG.info("Successfully reset priorities: {}", original);
} else {
LOG.warn("Failed to transfer leadership to {}. Ratis reply: {}",
targetPeerId, reply);
throw new IOException(reply.getException());
LOG.warn("Failed to reset priorities: {}, reply: {}", original, reply);
}
} catch (IOException e) {
LOG.error("Failed to reset priorities for " + original, e);
// Not re-thrown in order to keep the main exception, if there is any.
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,23 @@

import org.apache.hadoop.hdds.cli.OzoneAdmin;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;

Expand Down Expand Up @@ -96,12 +101,14 @@ public void testOmTransfer() throws Exception {
ozoneAdmin.execute(args1);
Thread.sleep(3000);
Assertions.assertEquals(newLeader, cluster.getOMLeader());
assertOMResetPriorities();

oldLeader = cluster.getOMLeader();
String[] args3 = {"om", "transfer", "-r"};
ozoneAdmin.execute(args3);
Thread.sleep(3000);
Assertions.assertNotSame(oldLeader, cluster.getOMLeader());
assertOMResetPriorities();
}

@Test
Expand All @@ -118,12 +125,45 @@ public void testScmTransfer() throws Exception {
ozoneAdmin.execute(args1);
cluster.waitForClusterToBeReady();
Assertions.assertEquals(newLeader, getScmLeader(cluster));
assertSCMResetPriorities();

oldLeader = getScmLeader(cluster);
String[] args3 = {"scm", "transfer", "-r"};
ozoneAdmin.execute(args3);
cluster.waitForClusterToBeReady();
Assertions.assertNotSame(oldLeader, getScmLeader(cluster));
assertSCMResetPriorities();
}

private void assertOMResetPriorities() throws IOException {
OzoneManagerRatisServer ratisServer = cluster.getOMLeader()
.getOmRatisServer();
RaftGroupId raftGroupId = ratisServer.getRaftGroupId();
Collection<RaftPeer> raftPeers = ratisServer
.getServer()
.getDivision(raftGroupId)
.getGroup()
.getPeers();

for (RaftPeer raftPeer: raftPeers) {
Assertions.assertEquals(RatisHelper.NEUTRAL_PRIORITY,
raftPeer.getPriority());
}
}

private void assertSCMResetPriorities() {
StorageContainerManager scm = getScmLeader(cluster);
Assertions.assertNotNull(scm);
Collection<RaftPeer> raftPeers = scm
.getScmHAManager()
.getRatisServer()
.getDivision()
.getGroup()
.getPeers();
for (RaftPeer raftPeer: raftPeers) {
Assertions.assertEquals(RatisHelper.NEUTRAL_PRIORITY,
raftPeer.getPriority());
}
}

static StorageContainerManager getScmLeader(MiniOzoneHAClusterImpl impl) {
Expand Down