Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi-raft-group setup rebalance without PD #176

Merged
11 changes: 11 additions & 0 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/CliService.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.alipay.sofa.jraft;

import java.util.List;
import java.util.Map;

import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.PeerId;
Expand Down Expand Up @@ -118,4 +119,14 @@ public interface CliService extends Lifecycle<CliOptions> {
* @return all alive peers of the replication group
*/
List<PeerId> getAlivePeers(final String groupId, final Configuration conf);

/**
 * Balance the number of leaders among the peers for the given raft groups.
 *
 * @param groupIds  the ids of the raft groups whose leaders should be balanced
 * @param conf      current configuration
 * @param leaderIds output parameter: a caller-supplied map that the implementation
 *                  fills with raft group id to leader peer id entries
 *                  (NOTE(review): exactly which groups get recorded is decided by
 *                  the implementation; confirm against CliServiceImpl)
 * @return operation status
 */
Status rebalance(final List<String> groupIds, final Configuration conf, final Map<String, PeerId> leaderIds);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@
*/
package com.alipay.sofa.jraft.core;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -330,7 +334,7 @@ public Status getLeader(final String groupId, final Configuration conf, final Pe
final Status st = new Status(-1, "Fail to get leader of group %s", groupId);
for (final PeerId peer : conf) {
if (!this.cliClientService.connect(peer.getEndpoint())) {
LOG.error("Fail to connect peer {} to get leader for group {}.", groupId);
LOG.error("Fail to connect peer {} to get leader for group {}.", peer, groupId);
fengjiachun marked this conversation as resolved.
Show resolved Hide resolved
continue;
}

Expand Down Expand Up @@ -383,6 +387,66 @@ public List<PeerId> getAlivePeers(final String groupId, final Configuration conf
return getPeers(groupId, conf, true);
}

@Override
public Status rebalance(final List<String> groupIds, final Configuration conf, final Map<String, PeerId> leaderIds) {
Requires.requireTrue(!groupIds.isEmpty(), "Empty group id list");
Requires.requireNonNull(conf, "Null configuration");
Requires.requireTrue(!conf.isEmpty(), "No peers of configuration");

final int groupSizePerPeer = groupIds.size() / conf.size();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

conf 只 check 了是否为 null, 没有 check size 是否为0

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fengjiachun 这边check configuration的size是指Peer节点数量么?

final Queue<String> groupDeque = new ArrayDeque<>(groupIds);
final Map<PeerId, Integer> peerMap = new HashMap<>();
for (;;) {
fengjiachun marked this conversation as resolved.
Show resolved Hide resolved
final String groupId = groupDeque.poll();
if (StringUtils.isEmpty(groupId)) {
break;
}
final PeerId leaderId = new PeerId();
try {
final Status status = getLeader(groupId, conf, leaderId);
if (!status.isOk()) {
throw new JRaftException("No leader in group: " + groupId);
}
if (leaderId.getEndpoint() == null) {
continue;
}
LOG.info("Group {} leader is {}.", groupId, leaderId);
} catch (final Exception e) {
groupDeque.add(groupId);
continue;
}
final Integer size = peerMap.get(leaderId);
if (size == null) {
peerMap.put(leaderId, 1);
continue;
}
if (size <= groupSizePerPeer) {
peerMap.put(leaderId, size + 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

groupSizePerPeer * size <= sum 而不是 groupSizePerPeer * size == sum

所以这里 if (size < groupSizePerPeer) 是不是有问题?

continue;
}
for (final PeerId peerId : getAlivePeers(groupId, conf)) {
final Integer pSize = peerMap.get(peerId);
if (pSize != null && pSize >= groupSizePerPeer) {
continue;
}
try {
final Status status = transferLeader(groupId, conf, peerId);
if (status.isOk()) {
LOG.info("Group {} transfer leader to {}.", groupId, peerId);
groupDeque.add(groupId);
break;
} else {
LOG.error("Fail to transfer leader to {}.", peerId);
}
} catch (final Exception e) {
LOG.error("Fail to transfer leader to {}.", peerId);
}
}
leaderIds.put(groupId, leaderId);
}
return Status.OK();
}

private List<PeerId> getPeers(final String groupId, final Configuration conf, final boolean onlyGetAlive) {
Requires.requireTrue(!StringUtils.isBlank(groupId), "Blank group id");
Requires.requireNonNull(conf, "Null conf");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,33 @@

import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import com.alipay.sofa.jraft.CliService;
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.NodeManager;
import com.alipay.sofa.jraft.RouteTable;
import com.alipay.sofa.jraft.Status;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import com.alipay.sofa.jraft.CliService;
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.NodeManager;
import com.alipay.sofa.jraft.RouteTable;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.option.CliOptions;
import com.alipay.sofa.jraft.test.TestUtils;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
Expand All @@ -51,14 +58,17 @@ public class CliServiceTest {
private String dataPath;

private TestCluster cluster;
private final String groupId = "CliServiceTest";

private CliService cliService;
private final String groupId = "CliServiceTest";

@Spy
private CliService cliService = new CliServiceImpl();

private Configuration conf;

@Before
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);

this.dataPath = TestUtils.mkTempDir();
FileUtils.forceMkdir(new File(this.dataPath));
Expand All @@ -71,7 +81,6 @@ public void setup() throws Exception {
}
cluster.waitLeader();

cliService = new CliServiceImpl();
this.conf = new Configuration(peers);
assertTrue(cliService.init(new CliOptions()));
}
Expand Down Expand Up @@ -242,4 +251,24 @@ public void testGetAlivePeers() throws Exception {
assertEquals("Fail to get leader of group " + this.groupId, e.getMessage());
}
}

/**
 * Calls {@code rebalance} on the spied CLI service with the same group id listed
 * three times, with {@code getAlivePeers} and {@code transferLeader} stubbed, and
 * checks that the call succeeds and the leader map ends up with a single entry.
 */
@Test
public void testRebalance() {
    final PeerId currentLeader = cluster.getLeader().getNodeId().getPeerId().copy();
    assertNotNull(currentLeader);

    // Stub the peer lookup on the spy so it simply returns the configured peers.
    Mockito.doReturn(this.conf.getPeers()).when(this.cliService).getAlivePeers(groupId, this.conf);
    // Stub every possible leadership transfer target to report success.
    for (final PeerId target : this.conf.getPeerSet()) {
        Mockito.doReturn(Status.OK()).when(this.cliService).transferLeader(groupId, this.conf, target);
    }

    // The same group id is submitted three times.
    final List<String> groups = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
        groups.add(groupId);
    }

    final Map<String, PeerId> rebalancedLeaders = new HashMap<>();
    final Status result = this.cliService.rebalance(groups, this.conf, rebalancedLeaders);
    assertTrue(result.isOk());
    assertEquals(1, rebalancedLeaders.size());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

需要更严谨的单测

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

测试不够

}