Skip to content

Commit

Permalink
Multi-raft-group setup rebalance without PD (#176) (#192)
Browse files Browse the repository at this point in the history
* Multi-raft-group setup rebalance without PD (#176)

* Multi-raft-group setup rebalance without PD

* Multi-raft-group setup rebalance without PD

* Multi-raft-group setup rebalance without PD

* Multi-raft-group setup rebalance without PD

* multi group rebalance

* multi group rebalance

* improve rebalance logic

* rebalance logic unittest

* rebalance logic unittest

* format rebalance imports

* (fix) minor fix for rebalance

* (fix) minor fix and more unit test

* (fix) minor fix

* (fix) rebalance refactoring

* (fix) add rebalance log

* (fix) add rebalance log
  • Loading branch information
fengjiachun authored and killme2008 committed Jun 27, 2019
1 parent b7f6cfb commit 175a6e3
Show file tree
Hide file tree
Showing 3 changed files with 272 additions and 1 deletion.
13 changes: 13 additions & 0 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/CliService.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.alipay.sofa.jraft;

import java.util.List;
import java.util.Map;
import java.util.Set;

import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.PeerId;
Expand Down Expand Up @@ -118,4 +120,15 @@ public interface CliService extends Lifecycle<CliOptions> {
* @return all alive peers of the replication group
*/
List<PeerId> getAlivePeers(final String groupId, final Configuration conf);

    /**
     * Balance the number of leaders so that each peer in {@code conf} ends up
     * leading roughly the same number of the given raft groups.
     *
     * @param balanceGroupIds   ids of all raft groups to balance (non-null, non-empty)
     * @param conf              configuration containing the peers of every group (non-null, non-empty)
     * @param balancedLeaderIds output map filled with the resulting leader id of each group;
     *                          implementations may accept {@code null} when the caller does not
     *                          need the result — confirm with the concrete implementation
     * @return operation status: OK when every group is balanced, otherwise the first failure
     */
    Status rebalance(final Set<String> balanceGroupIds, final Configuration conf,
final Map<String, PeerId> balancedLeaderIds);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@
*/
package com.alipay.sofa.jraft.core;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -49,6 +54,7 @@
import com.alipay.sofa.jraft.rpc.RpcRequests.ErrorResponse;
import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.Utils;
import com.google.protobuf.Message;

/**
Expand Down Expand Up @@ -383,6 +389,87 @@ public List<PeerId> getAlivePeers(final String groupId, final Configuration conf
return getPeers(groupId, conf, true);
}

@Override
public Status rebalance(final Set<String> balanceGroupIds, final Configuration conf,
                        final Map<String, PeerId> rebalancedLeaderIds) {
    // Validate preconditions: both the group set and the peer configuration
    // must be non-null and non-empty.
    Requires.requireNonNull(balanceGroupIds, "Null balance group ids");
    Requires.requireTrue(!balanceGroupIds.isEmpty(), "Empty balance group ids");
    Requires.requireNonNull(conf, "Null configuration");
    Requires.requireTrue(!conf.isEmpty(), "No peers of configuration");

    LOG.info("Rebalance start with raft groups={}.", balanceGroupIds);

    final long start = Utils.monotonicMs();
    int transfers = 0;
    Status failedStatus = null;
    // Work queue of groups still to be checked; a group is re-queued after its
    // leader moves so the new placement is verified on a later pass.
    final Queue<String> groupDeque = new ArrayDeque<>(balanceGroupIds);
    final LeaderCounter leaderCounter = new LeaderCounter(balanceGroupIds.size(), conf.size());
    for (;;) {
        final String groupId = groupDeque.poll();
        if (groupId == null) { // queue drained: every group is balanced
            break;
        }

        final PeerId leaderId = new PeerId();
        final Status leaderStatus = getLeader(groupId, conf, leaderId);
        if (!leaderStatus.isOk()) {
            failedStatus = leaderStatus;
            break;
        }

        // Record the current leader; overwritten below if we transfer it away.
        if (rebalancedLeaderIds != null) {
            rebalancedLeaderIds.put(groupId, leaderId);
        }

        if (leaderCounter.incrementAndGet(leaderId) <= leaderCounter.getExpectedAverage()) {
            // This peer leads no more groups than the expected average,
            // so leave this group alone and deal with the others.
            continue;
        }

        // Find the target peer and try to transfer the leader to this peer
        final PeerId targetPeer = findTargetPeer(leaderId, groupId, conf, leaderCounter);
        if (!targetPeer.isEmpty()) {
            final Status transferStatus = transferLeader(groupId, conf, targetPeer);
            if (!transferStatus.isOk()) {
                // The failure of `transfer leader` usually means the node is busy,
                // so we return failure status and should try `rebalance` again later.
                failedStatus = transferStatus;
                break;
            }
            // Count only transfers that actually succeeded (the original code
            // incremented before the status check, inflating the metric on failure).
            transfers++;

            LOG.info("Group {} transfer leader to {}.", groupId, targetPeer);
            leaderCounter.decrementAndGet(leaderId);
            // Re-queue the group so the new leader is counted on a later pass.
            groupDeque.add(groupId);
            if (rebalancedLeaderIds != null) {
                rebalancedLeaderIds.put(groupId, targetPeer);
            }
        }
    }

    final Status status = failedStatus != null ? failedStatus : Status.OK();
    if (LOG.isInfoEnabled()) {
        LOG.info(
            "Rebalanced raft groups={}, status={}, number of transfers={}, elapsed time={} ms, rebalanced result={}.",
            balanceGroupIds, status, transfers, Utils.monotonicMs() - start, rebalancedLeaderIds);
    }
    return status;
}

/**
 * Picks the first alive peer — other than the current leader — whose leader
 * count is still below the expected average; returns an empty peer when no
 * such candidate exists.
 */
private PeerId findTargetPeer(final PeerId self, final String groupId, final Configuration conf,
                              final LeaderCounter leaderCounter) {
    final List<PeerId> alivePeers = getAlivePeers(groupId, conf);
    for (final PeerId candidate : alivePeers) {
        final boolean isCurrentLeader = candidate.equals(self);
        final boolean saturated = leaderCounter.get(candidate) >= leaderCounter.getExpectedAverage();
        if (!isCurrentLeader && !saturated) {
            return candidate;
        }
    }
    return PeerId.emptyPeer();
}

private List<PeerId> getPeers(final String groupId, final Configuration conf, final boolean onlyGetAlive) {
Requires.requireTrue(!StringUtils.isBlank(groupId), "Blank group id");
Requires.requireNonNull(conf, "Null conf");
Expand Down Expand Up @@ -429,4 +516,31 @@ private List<PeerId> getPeers(final String groupId, final Configuration conf, fi
/**
 * Returns the underlying RPC client service this CLI service sends its
 * requests through.
 *
 * @return the {@code CliClientService} instance held by this service
 */
public CliClientService getCliClientService() {
    return cliClientService;
}

/**
 * Book-keeping helper: tracks how many groups each peer currently leads and
 * exposes the expected average number of leaders per peer.
 */
private static class LeaderCounter {

    // Current leader count per peer.
    private final Map<PeerId, Integer> counts = new HashMap<>();
    // ceil(groupCount / peerCount): the ideal upper bound of leaders on one peer.
    private final int expectedAverage;

    public LeaderCounter(final int groupCount, final int peerCount) {
        this.expectedAverage = (int) Math.ceil((double) groupCount / peerCount);
    }

    public int getExpectedAverage() {
        return this.expectedAverage;
    }

    public int incrementAndGet(final PeerId peerId) {
        // Absent -> 1, present -> old + 1.
        return this.counts.merge(peerId, 1, Integer::sum);
    }

    public int decrementAndGet(final PeerId peerId) {
        // Absent -> 0 (and record it), present -> old - 1; mirrors the
        // original compute-based behavior exactly.
        final Integer current = this.counts.get(peerId);
        final int next = current == null ? 0 : current - 1;
        this.counts.put(peerId, next);
        return next;
    }

    public int get(final PeerId peerId) {
        return this.counts.getOrDefault(peerId, 0);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@

import java.io.File;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

Expand All @@ -33,6 +36,7 @@
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.NodeManager;
import com.alipay.sofa.jraft.RouteTable;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.entity.Task;
Expand All @@ -59,7 +63,6 @@ public class CliServiceTest {

@Before
public void setup() throws Exception {

this.dataPath = TestUtils.mkTempDir();
FileUtils.forceMkdir(new File(this.dataPath));
assertEquals(NodeImpl.GLOBAL_NUM_NODES.get(), 0);
Expand Down Expand Up @@ -242,4 +245,145 @@ public void testGetAlivePeers() throws Exception {
assertEquals("Fail to get leader of group " + this.groupId, e.getMessage());
}
}

/**
 * Eight groups spread over three peers: after rebalancing, no peer may lead
 * more than ceil(8 / 3) = 3 groups.
 */
@Test
public void testRebalance() {
    final Set<String> groupIds = new TreeSet<>();
    for (int i = 1; i <= 8; i++) {
        groupIds.add("group_" + i);
    }
    final Configuration conf = new Configuration();
    conf.addPeer(new PeerId("host_1", 8080));
    conf.addPeer(new PeerId("host_2", 8080));
    conf.addPeer(new PeerId("host_3", 8080));

    final Map<String, PeerId> rebalancedLeaderIds = new HashMap<>();

    // All groups start with host_1 as their leader.
    final CliService cliService = new MockCliService(rebalancedLeaderIds, new PeerId("host_1", 8080));

    assertTrue(cliService.rebalance(groupIds, conf, rebalancedLeaderIds).isOk());
    assertEquals(groupIds.size(), rebalancedLeaderIds.size());

    // Count how many groups each peer ended up leading.
    final Map<PeerId, Integer> leadersPerPeer = new HashMap<>();
    for (final Map.Entry<String, PeerId> entry : rebalancedLeaderIds.entrySet()) {
        leadersPerPeer.merge(entry.getValue(), 1, Integer::sum);
    }
    final int expectedAvgLeaderNum = (int) Math.ceil((double) groupIds.size() / conf.size());
    for (final Map.Entry<PeerId, Integer> entry : leadersPerPeer.entrySet()) {
        System.out.println(entry);
        assertTrue(entry.getValue() <= expectedAvgLeaderNum);
    }
}

/**
 * The mocked service always fails to resolve a leader, so rebalance must
 * surface that error message unchanged.
 */
@Test
public void testRebalanceOnLeaderFail() {
    final Set<String> groupIds = new TreeSet<>();
    for (int i = 1; i <= 4; i++) {
        groupIds.add("group_" + i);
    }
    final Configuration conf = new Configuration();
    conf.addPeer(new PeerId("host_1", 8080));
    conf.addPeer(new PeerId("host_2", 8080));
    conf.addPeer(new PeerId("host_3", 8080));

    final Map<String, PeerId> rebalancedLeaderIds = new HashMap<>();

    final CliService cliService = new MockLeaderFailCliService();

    final Status status = cliService.rebalance(groupIds, conf, rebalancedLeaderIds);
    assertEquals("Fail to get leader", status.getErrorMsg());
}

/**
 * Seven groups over three peers with a service whose leader transfer always
 * fails: rebalance must stop with the transfer error, and every leader it
 * recorded must still be the initial one (host_1).
 *
 * NOTE(review): the method name has a typo ("Relalance"); it is kept because
 * renaming would change the public test name.
 */
@Test
public void testRelalanceOnTransferLeaderFail() {
    final Set<String> groupIds = new TreeSet<>();
    for (int i = 1; i <= 7; i++) {
        groupIds.add("group_" + i);
    }
    final Configuration conf = new Configuration();
    conf.addPeer(new PeerId("host_1", 8080));
    conf.addPeer(new PeerId("host_2", 8080));
    conf.addPeer(new PeerId("host_3", 8080));

    final Map<String, PeerId> rebalancedLeaderIds = new HashMap<>();

    final CliService cliService = new MockTransferLeaderFailCliService(rebalancedLeaderIds,
        new PeerId("host_1", 8080));

    assertEquals("Fail to transfer leader",
        cliService.rebalance(groupIds, conf, rebalancedLeaderIds).getErrorMsg());
    assertTrue(groupIds.size() >= rebalancedLeaderIds.size());

    final Map<PeerId, Integer> leadersPerPeer = new HashMap<>();
    for (final Map.Entry<String, PeerId> entry : rebalancedLeaderIds.entrySet()) {
        leadersPerPeer.merge(entry.getValue(), 1, Integer::sum);
    }
    for (final Map.Entry<PeerId, Integer> entry : leadersPerPeer.entrySet()) {
        System.out.println(entry);
        assertEquals(new PeerId("host_1", 8080), entry.getKey());
    }
}

/**
 * A {@code CliServiceImpl} stub that keeps leadership in memory: the leader
 * of a group is read from the shared rebalanced-leader map (falling back to
 * a fixed initial leader), all configured peers are reported alive, and
 * leader transfers always succeed without any RPC.
 */
class MockCliService extends CliServiceImpl {

    // groupId -> leader recorded by rebalance(); shared with the test body.
    private final Map<String, PeerId> rebalancedLeaderIds;
    // Leader reported for any group not yet present in the map.
    private final PeerId initialLeaderId;

    MockCliService(Map<String, PeerId> rebalancedLeaderIds, PeerId initialLeaderId) {
        this.rebalancedLeaderIds = rebalancedLeaderIds;
        this.initialLeaderId = initialLeaderId;
    }

    @Override
    public Status getLeader(final String groupId, final Configuration conf, final PeerId leaderId) {
        final PeerId known = this.rebalancedLeaderIds.get(groupId);
        final PeerId current = known == null ? this.initialLeaderId : known;
        leaderId.parse(current.toString());
        return Status.OK();
    }

    @Override
    public List<PeerId> getAlivePeers(final String groupId, final Configuration conf) {
        // Every configured peer is considered alive.
        return conf.getPeers();
    }

    @Override
    public Status transferLeader(final String groupId, final Configuration conf, final PeerId peer) {
        // Pretend the transfer always succeeds.
        return Status.OK();
    }
}

/**
 * A mock whose {@code getLeader} always fails; used to verify that
 * rebalance propagates leader-lookup errors to the caller.
 */
class MockLeaderFailCliService extends MockCliService {

    MockLeaderFailCliService() {
        // No leader map or initial leader needed: getLeader never succeeds.
        super(null, null);
    }

    @Override
    public Status getLeader(final String groupId, final Configuration conf, final PeerId leaderId) {
        return new Status(-1, "Fail to get leader");
    }
}

/**
 * A mock whose {@code transferLeader} always fails; used to verify that
 * rebalance stops and reports the transfer error.
 */
class MockTransferLeaderFailCliService extends MockCliService {

    MockTransferLeaderFailCliService(Map<String, PeerId> rebalancedLeaderIds, PeerId initialLeaderId) {
        super(rebalancedLeaderIds, initialLeaderId);
    }

    @Override
    public Status transferLeader(final String groupId, final Configuration conf, final PeerId peer) {
        return new Status(-1, "Fail to transfer leader");
    }
}
}

0 comments on commit 175a6e3

Please sign in to comment.