Skip to content

Commit

Permalink
FIX: Duplicated switchover event.
Browse files Browse the repository at this point in the history
  • Loading branch information
brido4125 committed Dec 18, 2024
1 parent 7b301af commit b6a01ad
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 11 deletions.
72 changes: 61 additions & 11 deletions src/main/java/net/spy/memcached/MemcachedConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import net.spy.memcached.compat.SpyObject;
import net.spy.memcached.compat.log.LoggerFactory;
Expand Down Expand Up @@ -368,21 +369,69 @@ private void updateConnections(List<InetSocketAddress> addrs) throws IOException

/* ENABLE_REPLICATION if */
private Set<String> findChangedGroups(List<InetSocketAddress> addrs,
Collection<MemcachedNode> nodes) {
Map<String, InetSocketAddress> addrMap = new HashMap<>();
Collection<MemcachedReplicaGroup> oldGroups) {
Map<String, InetSocketAddress> newAddrMap = new HashMap<>();
Map<String, List<ArcusReplNodeAddress>> newGroupMap = new HashMap<>();

for (InetSocketAddress each : addrs) {
addrMap.put(each.toString(), each);
newAddrMap.put(each.toString(), each);
ArcusReplNodeAddress repl = (ArcusReplNodeAddress) each;
if (!newGroupMap.containsKey(repl.getGroupName())) {
newGroupMap.put(repl.getGroupName(), new ArrayList<>());
}
newGroupMap.get(repl.getGroupName()).add(repl);
}

Set<String> changedGroupSet = new HashSet<>();
for (MemcachedNode node : nodes) {
String nodeAddr = ((InetSocketAddress) node.getSocketAddress()).toString();
if (addrMap.remove(nodeAddr) == null) { // removed node
changedGroupSet.add(node.getReplicaGroup().getGroupName());
for (MemcachedReplicaGroup oldGroup : oldGroups) {
if (oldGroup.isAlreadySwitched()) {
// repl_slave or switchover 응답을 받은 경우
List<ArcusReplNodeAddress> newGroup = newGroupMap.get(oldGroup.getGroupName());
// ArcusReplNodeAddress의 toString : {groupName M[S] IP:Port}
// isAlreadySwitched인 그룹에 대한 변경 사항은 newGroupMap을 통해 변경 감지 하기에
// 중복 변경감지 방지를 위해 newAddrMap 주소값 제거
newAddrMap.keySet().removeIf(s -> s.startsWith("{" + oldGroup.getGroupName()));
if (newGroup != null) {
// old 형상 기준 group name이 변경되지 않은 경우
Set<String> newRepl = newGroup.stream()
.map(ArcusReplNodeAddress::getIPPort)
.collect(Collectors.toSet());
Set<String> oldRepl = oldGroup.getArcusReplNodeAddressList()
.stream()
.map(ArcusReplNodeAddress::getIPPort)
.collect(Collectors.toSet());

if (oldRepl.equals(newRepl)) {
// 노드 구성에 변화가 없고, 마스터 노드가 변경되지 않고, 슬레이브 노드들이 변경되지 않은 경우
ArcusReplNodeAddress oldMaster
= (ArcusReplNodeAddress) oldGroup.getMasterNode().getSocketAddress();
ArcusReplNodeAddress newMaster = newGroupMap.get(oldGroup.getGroupName())
.stream()
.filter(ArcusReplNodeAddress::isMaster)
.findFirst()
.orElse(null);
if (newMaster != null && oldMaster.getIPPort().equals(newMaster.getIPPort())) {
// 현재 로케이터 기준의 형상을 따른 zk 이벤트 발생 -> flag 원복
oldGroup.setAlreadySwitched(false);
}
// old 형상이 들어온 경우 flag 원복 skip -> 추후 시점에 원복 시킬 수 있는 이벤트 무조건 발생
// newMaster가 null이더라도, 추후 시점엔 정상적인 이벤트 발생으로 flag 원복
continue;
}
}
// groupname, ip port, 노드 구성 중 하나라도 변화가 있는 경우
changedGroupSet.add(oldGroup.getGroupName());
oldGroup.setAlreadySwitched(false);
} else {
List<ArcusReplNodeAddress> addrList = oldGroup.getArcusReplNodeAddressList();
for (ArcusReplNodeAddress repl : addrList) {
if (newAddrMap.remove(repl.toString()) == null) { // removed node
changedGroupSet.add(oldGroup.getGroupName());
}
}
}
}
for (String addr : addrMap.keySet()) { // newly added node
ArcusReplNodeAddress a = (ArcusReplNodeAddress) addrMap.get(addr);
for (String addr : newAddrMap.keySet()) { // newly added node
ArcusReplNodeAddress a = (ArcusReplNodeAddress) newAddrMap.get(addr);
changedGroupSet.add(a.getGroupName());
}
return changedGroupSet;
Expand Down Expand Up @@ -416,7 +465,8 @@ private void updateReplConnections(List<InetSocketAddress> addrs) throws IOExcep
* we find out the changed groups with the comparison of previous and current znode list,
* and update the state of groups based on them.
*/
Set<String> changedGroups = findChangedGroups(addrs, locator.getAll());
Set<String> changedGroups = findChangedGroups(addrs,
((ArcusReplKetamaNodeLocator) locator).getAllGroups().values());

Map<String, List<ArcusReplNodeAddress>> newAllGroups =
ArcusReplNodeAddress.makeGroupAddrsList(findAddrsOfChangedGroups(addrs, changedGroups));
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/net/spy/memcached/MemcachedReplicaGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public abstract class MemcachedReplicaGroup extends SpyObject {
protected MemcachedNode masterCandidate;
private final StringBuilder sb = new StringBuilder();
private boolean delayedSwitchover = false;
private boolean alreadySwitched = false;

public static final int MAX_REPL_SLAVE_SIZE = 2;
public static final int MAX_REPL_GROUP_SIZE = MAX_REPL_SLAVE_SIZE + 1;
Expand All @@ -43,6 +44,17 @@ protected MemcachedReplicaGroup(final String groupName) {
this.group = groupName;
}

public List<ArcusReplNodeAddress> getArcusReplNodeAddressList() {
List<ArcusReplNodeAddress> arcusReplNodeAddressList = new ArrayList<>();
if (masterNode != null) {
arcusReplNodeAddressList.add((ArcusReplNodeAddress) masterNode.getSocketAddress());
}
for (MemcachedNode slaveNode : slaveNodes) {
arcusReplNodeAddressList.add((ArcusReplNodeAddress) slaveNode.getSocketAddress());
}
return arcusReplNodeAddressList;
}

@Override
public String toString() {
sb.setLength(0);
Expand All @@ -56,6 +68,14 @@ public String toString() {
return sb.toString();
}

public boolean isAlreadySwitched() {
return alreadySwitched;
}

public void setAlreadySwitched(boolean alreadySwitched) {
this.alreadySwitched = alreadySwitched;
}

public boolean isEmptyGroup() {
return masterNode == null && slaveNodes.isEmpty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ protected final void receivedMoveOperations(String cause) {

getLogger().info("%s message received by %s operation from %s", cause, this, handlingNode);
transitionState(OperationState.MOVING);
group.setAlreadySwitched(true);
}
/* ENABLE_REPLICATION end */

Expand Down

0 comments on commit b6a01ad

Please sign in to comment.