bugfix: after scaling down a Raft cluster, the metadata still contains the removed node #6855

Merged 4 commits on Sep 16, 2024
2 changes: 1 addition & 1 deletion changes/en-us/2.x.md
@@ -45,7 +45,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6840](https://github.com/apache/incubator-seata/pull/6840)] Fix the issue of unsafe deserialization in ProcessorYaml.java
- [[#6843](https://github.com/apache/incubator-seata/pull/6843)] Fix 403 error when sending a POST request from the console
- [[#6850](https://github.com/apache/incubator-seata/pull/6850)] raft mode is backward compatible with version 2.0

- [[#6855](https://github.com/apache/incubator-seata/pull/6855)] after scaling down a Raft cluster, the metadata still contains the removed node


### optimize:
3 changes: 2 additions & 1 deletion changes/zh-cn/2.x.md
@@ -45,7 +45,8 @@
- [[#6845](https://github.com/apache/incubator-seata/pull/6845)] Fix the issue of opening the same RocksDB file multiple times
- [[#6840](https://github.com/apache/incubator-seata/pull/6840)] Fix unsafe deserialization in ProcessorYaml
- [[#6843](https://github.com/apache/incubator-seata/pull/6843)] Fix the 403 error when sending a POST request from the console
- [[#6850](https://github.com/apache/incubator-seata/pull/6850)] raft mode is backward compatible with version 2.0
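- [[#6850](https://github.com/apache/incubator-seata/pull/6850)] Raft mode is backward compatible with version 2.0
- [[#6855](https://github.com/apache/incubator-seata/pull/6855)] Fix the removed node remaining in the metadata after scaling down a Raft cluster (upgrade to 2.2 before scaling down)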


### optimize:
@@ -122,7 +122,6 @@ public boolean equals(Object o) {
return Objects.equals(control, node.control) && Objects.equals(transaction, node.transaction);
}


// convert to String
public String toJsonString(ObjectMapper objectMapper) {
try {
@@ -237,6 +237,9 @@ public void onLeaderStart(final long term) {
SeataClusterContext.unbindGroup();
}
});
Configuration conf = RouteTable.getInstance().getConfiguration(group);
// A member change might trigger a leader re-election. At this point, it’s necessary to filter out non-existent members and synchronize again.
changePeers(conf);
}
}
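
The reason for calling `changePeers(conf)` from `onLeaderStart` can be illustrated with a small simulation. This is only a sketch under stated assumptions: `LeaderStartResyncSketch` and its string-based peer lists are hypothetical stand-ins, not Seata or JRaft types. It models a node that is a follower when the membership change is committed, so it skips pruning, and is later elected leader while still holding the removed node in its metadata.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the state machine callbacks; peers are plain strings here.
final class LeaderStartResyncSketch {
    private final List<String> followers = new ArrayList<>();
    private List<String> committedPeers;
    private boolean leader;

    LeaderStartResyncSketch(List<String> initialFollowers) {
        followers.addAll(initialFollowers);
        committedPeers = new ArrayList<>(initialFollowers);
    }

    // Every node records the committed configuration, but only the leader prunes metadata.
    void onConfigurationCommitted(List<String> peers) {
        committedPeers = new ArrayList<>(peers);
        if (leader) {
            prune();
        }
    }

    // A newly elected leader prunes again, because it may have missed the prune above.
    void onLeaderStart() {
        leader = true;
        prune();
    }

    // Drop every follower that is no longer part of the committed configuration.
    private void prune() {
        followers.retainAll(committedPeers);
    }

    List<String> followers() {
        return new ArrayList<>(followers);
    }
}
```

Without the prune in `onLeaderStart`, the node in this model would keep the removed peer until the next membership change.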

@@ -262,28 +265,40 @@ public void onStartFollowing(final LeaderChangeContext ctx) {
public void onConfigurationCommitted(Configuration conf) {
LOGGER.info("groupId: {}, onConfigurationCommitted: {}.", group, conf);
RouteTable.getInstance().updateConfiguration(group, conf);
// After a member change, the metadata needs to be synchronized again.
initSync.compareAndSet(true, false);
if (isLeader()) {
lock.lock();
try {
List<PeerId> newFollowers = conf.getPeers();
Set<PeerId> newLearners = conf.getLearners();
List<Node> currentFollowers = raftClusterMetadata.getFollowers();
if (CollectionUtils.isNotEmpty(newFollowers)) {
raftClusterMetadata.setFollowers(currentFollowers.stream()
.filter(node -> contains(node, newFollowers)).collect(Collectors.toList()));
}
if (CollectionUtils.isNotEmpty(newLearners)) {
raftClusterMetadata.setLearner(raftClusterMetadata.getLearner().stream()
.filter(node -> contains(node, newLearners)).collect(Collectors.toList()));
}
syncMetadata();
} finally {
lock.unlock();
changePeers(conf);
}
}

private void changePeers(Configuration conf) {
lock.lock();
try {
List<PeerId> newFollowers = conf.getPeers();
Set<PeerId> newLearners = conf.getLearners();
List<Node> currentFollowers = raftClusterMetadata.getFollowers();
if (CollectionUtils.isNotEmpty(newFollowers)) {
raftClusterMetadata.setFollowers(currentFollowers.stream().filter(node -> contains(node, newFollowers))
.collect(Collectors.toList()));
}
if (CollectionUtils.isNotEmpty(newLearners)) {
raftClusterMetadata.setLearner(raftClusterMetadata.getLearner().stream()
.filter(node -> contains(node, newLearners)).collect(Collectors.toList()));
} else {
raftClusterMetadata.setLearner(Collections.emptyList());
}
CompletableFuture.runAsync(this::syncMetadata, RESYNC_METADATA_POOL);
} finally {
lock.unlock();
}
}

private boolean contains(Node node, Collection<PeerId> list) {
// This indicates that the node is of a lower version.
// When scaling up or down on a higher version
// you need to ensure that the cluster is consistent first
// otherwise, the lower version nodes may be removed.
if (node.getInternal() == null) {
return true;
}
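
The filtering rules in `changePeers` and `contains` can be sketched in isolation. The code below is an illustration, not the PR's implementation: `PeerFilterSketch`, `Member`, and the address-based membership check are assumptions, since the remainder of `contains` is not shown in the hunk. It keeps three behaviours visible in the diff: followers are left unchanged when the committed peer list is empty, learners are cleared when the committed learner set is empty, and nodes without internal endpoint information (older versions) are never filtered out.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

final class PeerFilterSketch {

    /** Hypothetical stand-in for a metadata node. */
    static final class Member {
        final String name;
        final String internalAddress; // null models a node registered by an older version

        Member(String name, String internalAddress) {
            this.name = name;
            this.internalAddress = internalAddress;
        }
    }

    // Nodes without internal endpoint data are always kept; others must be in the committed set.
    static boolean contains(Member member, Collection<String> committed) {
        if (member.internalAddress == null) {
            return true;
        }
        return committed.contains(member.internalAddress);
    }

    // An empty committed peer list leaves the followers untouched.
    static List<Member> retainFollowers(List<Member> current, Collection<String> committedPeers) {
        if (committedPeers.isEmpty()) {
            return current;
        }
        return current.stream().filter(m -> contains(m, committedPeers)).collect(Collectors.toList());
    }

    // An empty committed learner set clears the learners, matching the new else branch.
    static List<Member> retainLearners(List<Member> current, Collection<String> committedLearners) {
        if (committedLearners.isEmpty()) {
            return Collections.emptyList();
        }
        return current.stream().filter(m -> contains(m, committedLearners)).collect(Collectors.toList());
    }

    static List<String> names(List<Member> members) {
        return members.stream().map(m -> m.name).collect(Collectors.toList());
    }
}
```

Keeping legacy nodes unconditionally is why the changelog asks for the upgrade to finish before scaling down: in this model a removed legacy node would stay in the metadata.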
@@ -367,25 +382,24 @@ public void refreshClusterMetadata(RaftBaseMsg syncMsg) {
}

private void syncCurrentNodeInfo(String group) {
if (initSync.get()) {
return;
}
try {
RouteTable.getInstance().refreshLeader(RaftServerManager.getCliClientServiceInstance(), group, 1000);
PeerId peerId = RouteTable.getInstance().selectLeader(group);
if (peerId != null) {
syncCurrentNodeInfo(peerId);
if (initSync.compareAndSet(false, true)) {
try {
RouteTable.getInstance().refreshLeader(RaftServerManager.getCliClientServiceInstance(), group, 1000);
PeerId peerId = RouteTable.getInstance().selectLeader(group);
if (peerId != null) {
syncCurrentNodeInfo(peerId);
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}

private void syncCurrentNodeInfo(PeerId leaderPeerId) {
try {
// Ensure that the current leader must be version 2.1 or later to synchronize the operation
Node leader = raftClusterMetadata.getLeader();
if (leader != null && StringUtils.isNotBlank(leader.getVersion()) && initSync.compareAndSet(false, true)) {
if (leader != null && StringUtils.isNotBlank(leader.getVersion())) {
RaftServer raftServer = RaftServerManager.getRaftServer(group);
PeerId cureentPeerId = raftServer.getServerId();
Node node = raftClusterMetadata.createNode(XID.getIpAddress(), XID.getPort(), cureentPeerId.getPort(),
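
The `initSync` handling in these hunks follows a claim-then-run pattern: `compareAndSet(false, true)` lets one caller through, and `onConfigurationCommitted` resets the flag so the next call synchronizes again. A minimal sketch of that pattern, assuming a hypothetical `OneShotSyncSketch` class in which a counter replaces the real call to the leader:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class OneShotSyncSketch {
    private final AtomicBoolean initSync = new AtomicBoolean(false);
    private final AtomicInteger syncCalls = new AtomicInteger();

    // Only the caller that flips the flag from false to true performs the sync.
    void syncCurrentNodeInfo() {
        if (initSync.compareAndSet(false, true)) {
            syncCalls.incrementAndGet(); // placeholder for sending node info to the leader
        }
    }

    // A membership change re-arms the flag so the node registers itself again.
    void onConfigurationCommitted() {
        initSync.compareAndSet(true, false);
    }

    int syncCalls() {
        return syncCalls.get();
    }
}
```

Claiming the flag before doing the work prevents concurrent callers from synchronizing twice. The sketch does not model what happens when the synchronization itself fails, which the visible part of the hunk does not show either.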