-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Multi-raft-group setup rebalance without PD #176
Multi-raft-group setup rebalance without PD #176
Conversation
@SteNicholas 期望的是 multi raft group 的情况下这个 api 可以平衡每个节点上的 leader 数量,你的实现看起来只是对单个 group 随机 transfer 了一下 leader ? |
@fengjiachun 你的意思是对Multi Raft Group所有Group都要平衡每个节点上的 leader 数量吗? |
是的,就像 Benchmark client 代码里做的那样 |
@fengjiachun 已经修改成Multi Raft Group场景,帮忙Review。 |
Requires.requireTrue(!groupIds.isEmpty(), "Empty group id queue"); | ||
Requires.requireNonNull(conf, "Null configuration"); | ||
|
||
final int groupSizePerPeer = groupIds.size() / conf.size(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
conf 只 check 了是否为 null, 没有 check size 是否为0
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fengjiachun 这边check configuration的size是指Peer节点数量么?
@@ -383,6 +387,62 @@ public Status getLeader(final String groupId, final Configuration conf, final Pe | |||
return getPeers(groupId, conf, true); | |||
} | |||
|
|||
@Override | |||
public Status rebalance(final Queue<String> groupIds, final Configuration conf) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
建议 groupIds 改为 List,并且方法内不修改这个 list,需要 queue 方法内自己构建
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fengjiachun 我把参数类型改成List,rebalance方法内用此参数构造Deque。
if (StringUtil.isEmpty(groupId)) { | ||
break; | ||
} | ||
PeerId leaderId = new PeerId(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final PeerId leaderId
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fengjiachun 这边把final修饰词给遗漏了,我加上。
Requires.requireNonNull(conf, "Null configuration"); | ||
|
||
final int groupSizePerPeer = groupIds.size() / conf.size(); | ||
final Map<PeerId, Integer> peerMap = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
peerMap 为局部变量, CHM 没有意义
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fengjiachun 我修改成HashMap,确实没必要用ConcurrentHashMap。
continue; | ||
} | ||
try { | ||
transferLeader(groupId, conf, peerId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
transferLeader 失败情况下没有处理
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fengjiachun 如果transferLeader出现异常,目前我的做法是移除此节点,调用getAlivePeers方法就拿不到此PeerId节点。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我的意思是 transferLeader 需要考虑失败的情况,返回值 Status 不能忽略
continue; | ||
} | ||
if (size < groupSizePerPeer) { | ||
peerMap.put(leaderId, size + 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
groupSizePerPeer * size <= sum 而不是 groupSizePerPeer * size == sum
所以这里 if (size < groupSizePerPeer) 是不是有问题?
assertTrue(this.cliService.rebalance(groupIds, conf).isOk()); | ||
cluster.waitLeader(); | ||
assertEquals(leader, cluster.getLeader().getNodeId().getPeerId()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
需要更严谨的单测
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
测试不够
if (!this.cliClientService.connect(leaderId.getEndpoint())) { | ||
return new Status(-1, "Fail to init channel to leader %s", leaderId); | ||
} | ||
LOG.info("Group {} leader is {}", groupId, leaderId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
日志句尾加上句号吧
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fengjiachun 我把transferLeader运行日志末尾加句号。
groupIds.add(groupId); | ||
break; | ||
} catch (final Exception e) { | ||
LOG.error("Fail to transfer leader to {}", peerId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
同上
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fengjiachun 我把transferLeader异常日志末尾加句号。
也描述一下 rebalance 的具体算法或思路吧 @SteNicholas |
* Balance the number of leaders. | ||
* | ||
* @param groupIds the raft group id queue | ||
* @param conf current configuration |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
‘current configuration’ 要和上一行对齐
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fengjiachun 我修改下comment格式。
测试失败了,请看下。 |
@SteNicholas 单测还是需修改下,不要在单测里复制 rebalance 方法的逻辑,应该直接验证 rebalance 方法的输出 |
public Status rebalance(final List<String> groupIds, final Configuration conf, final Map<String, PeerId> leaderIds) { | ||
Requires.requireTrue(!groupIds.isEmpty(), "Empty group id queue"); | ||
Requires.requireNonNull(conf, "Null configuration"); | ||
Requires.requireTrue(conf.size() != 0, "No peers of configuration"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
用 !conf.isEmpty() 替换 conf.size() != 0 吧
@@ -383,6 +384,64 @@ public Status getLeader(final String groupId, final Configuration conf, final Pe | |||
return getPeers(groupId, conf, true); | |||
} | |||
|
|||
@Override | |||
public Status rebalance(final List<String> groupIds, final Configuration conf, final Map<String, PeerId> leaderIds) { | |||
Requires.requireTrue(!groupIds.isEmpty(), "Empty group id queue"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"Empty group id queue" 这个 msg 要改下
final Map<PeerId, Integer> peerMap = new HashMap<>(); | ||
for (;;) { | ||
final String groupId = groupDeque.poll(); | ||
if (StringUtil.isEmpty(groupId)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个类已经引入一个 apache 的 StringUtils 了,不要再使用类似的工具类,把 StringUtil 去掉吧
try { | ||
final Status status = getLeader(groupId, conf, leaderId); | ||
if (!status.isOk()) { | ||
throw new Exception("No leader in group: " + groupId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
抛出 JRaftException
continue; | ||
} | ||
try { | ||
transferLeader(groupId, conf, peerId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我的意思是 transferLeader 需要考虑失败的情况,返回值 Status 不能忽略
@@ -17,6 +17,8 @@ | |||
package com.alipay.sofa.jraft; | |||
|
|||
import java.util.List; | |||
import java.util.Map; | |||
import java.util.Queue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
多余的 import ? 检查一下,ide 一般都会有告警
@@ -16,8 +16,7 @@ | |||
*/ | |||
package com.alipay.sofa.jraft.core; | |||
|
|||
import java.util.ArrayList; | |||
import java.util.List; | |||
import java.util.*; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
禁止 inport *
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Set; | ||
import java.util.*; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
同上,修改一下你的 ide 设置,禁止 import *
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import com.alipay.sofa.jraft.*; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
同上
assertTrue(this.cliService.rebalance(groupIds, conf).isOk()); | ||
cluster.waitLeader(); | ||
assertEquals(leader, cluster.getLeader().getNodeId().getPeerId()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
测试不够
先 merge 到临时分支,一起看看怎么更好的测试 |
* Multi-raft-group setup rebalance without PD (#176) * Multi-raft-group setup rebalance without PD * Multi-raft-group setup rebalance without PD * Multi-raft-group setup rebalance without PD * Multi-raft-group setup rebalance without PD * multi group rebalance * multi group rebalance * improve rebalance logic * rebalance logic unittest * rebalance logic unittest * format rebalance imports * (fix) minor fix for rebalance * (fix) minor fix and more unit test * (fix) minor fix * (fix) rebalance refactoring * (fix) add rebalance log * (fix) add rebalance log
Motivation:
In the case of RheaKV multi raft group, the PD module is currently used to balance leaders between all nodes in the cluster, but there are several problems:
The PD relies on the status data carried by the heartbeat of each node to analyze and then send a 'transfer_leader' command, which causes a delay.
Some users don't want to deploy a PD cluster, but still want to use multi-raft-group, which causes the raft cluster have no ability to automatically balance the leaders.
PD is a module at the RheaKV level and cannot directly support jraft-core module.
Modification:
Provide a setup rebalance mechanism at the jraft-core level, which can be called by cli service.
Result:
Fixes #107