From b6a01adacdb98b963306a081518891247c47f1d4 Mon Sep 17 00:00:00 2001 From: brido4125 Date: Tue, 10 Dec 2024 13:55:12 +0900 Subject: [PATCH] FIX: Duplicated switchover event. --- .../spy/memcached/MemcachedConnection.java | 72 ++++++++++++++++--- .../spy/memcached/MemcachedReplicaGroup.java | 20 ++++++ .../memcached/protocol/BaseOperationImpl.java | 1 + 3 files changed, 82 insertions(+), 11 deletions(-) diff --git a/src/main/java/net/spy/memcached/MemcachedConnection.java b/src/main/java/net/spy/memcached/MemcachedConnection.java index a8c2318b0..9a513bc49 100644 --- a/src/main/java/net/spy/memcached/MemcachedConnection.java +++ b/src/main/java/net/spy/memcached/MemcachedConnection.java @@ -46,6 +46,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import net.spy.memcached.compat.SpyObject; import net.spy.memcached.compat.log.LoggerFactory; @@ -368,21 +369,69 @@ private void updateConnections(List addrs) throws IOException /* ENABLE_REPLICATION if */ private Set findChangedGroups(List addrs, - Collection nodes) { - Map addrMap = new HashMap<>(); + Collection oldGroups) { + Map newAddrMap = new HashMap<>(); + Map> newGroupMap = new HashMap<>(); + for (InetSocketAddress each : addrs) { - addrMap.put(each.toString(), each); + newAddrMap.put(each.toString(), each); + ArcusReplNodeAddress repl = (ArcusReplNodeAddress) each; + if (!newGroupMap.containsKey(repl.getGroupName())) { + newGroupMap.put(repl.getGroupName(), new ArrayList<>()); + } + newGroupMap.get(repl.getGroupName()).add(repl); } - Set changedGroupSet = new HashSet<>(); - for (MemcachedNode node : nodes) { - String nodeAddr = ((InetSocketAddress) node.getSocketAddress()).toString(); - if (addrMap.remove(nodeAddr) == null) { // removed node - changedGroupSet.add(node.getReplicaGroup().getGroupName()); + for (MemcachedReplicaGroup oldGroup : oldGroups) { + if (oldGroup.isAlreadySwitched()) { + // repl_slave or switchover 응답을 받은 경우 + List newGroup = newGroupMap.get(oldGroup.getGroupName()); + // ArcusReplNodeAddress의 toString : {groupName M[S] IP:Port} + // isAlreadySwitched인 그룹에 대한 변경 사항은 newGroupMap을 통해 변경 감지 하기에 + // 중복 변경감지 방지를 위해 newAddrMap 주소값 제거 + newAddrMap.keySet().removeIf(s -> s.startsWith("{" + oldGroup.getGroupName())); + if (newGroup != null) { + // old 형상 기준 group name이 변경되지 않은 경우 + Set newRepl = newGroup.stream() + .map(ArcusReplNodeAddress::getIPPort) + .collect(Collectors.toSet()); + Set oldRepl = oldGroup.getArcusReplNodeAddressList() + .stream() + .map(ArcusReplNodeAddress::getIPPort) + .collect(Collectors.toSet()); + + if (oldRepl.equals(newRepl)) { + // 노드 구성에 변화가 없고, 마스터 노드가 변경되지 않고, 슬레이브 노드들이 변경되지 않은 경우 + ArcusReplNodeAddress oldMaster + = (ArcusReplNodeAddress) oldGroup.getMasterNode().getSocketAddress(); + ArcusReplNodeAddress newMaster = newGroupMap.get(oldGroup.getGroupName()) + .stream() + .filter(ArcusReplNodeAddress::isMaster) + .findFirst() + .orElse(null); + if (newMaster != null && oldMaster.getIPPort().equals(newMaster.getIPPort())) { + // 현재 로케이터 기준의 형상을 따른 zk 이벤트 발생 -> flag 원복 + oldGroup.setAlreadySwitched(false); + } + // old 형상이 들어온 경우 flag 원복 skip -> 추후 시점에 원복 시킬 수 있는 이벤트 무조건 발생 + // newMaster가 null이더라도, 추후 시점엔 정상적인 이벤트 발생으로 flag 원복 + continue; + } + } + // groupname, ip port, 노드 구성 중 하나라도 변화가 있는 경우 + changedGroupSet.add(oldGroup.getGroupName()); + oldGroup.setAlreadySwitched(false); + } else { + List addrList = oldGroup.getArcusReplNodeAddressList(); + for (ArcusReplNodeAddress repl : addrList) { + if (newAddrMap.remove(repl.toString()) == null) { // removed node + changedGroupSet.add(oldGroup.getGroupName()); + } + } } } - for (String addr : addrMap.keySet()) { // newly added node - ArcusReplNodeAddress a = (ArcusReplNodeAddress) addrMap.get(addr); + for (String addr : newAddrMap.keySet()) { // newly added node + ArcusReplNodeAddress a = (ArcusReplNodeAddress) newAddrMap.get(addr); changedGroupSet.add(a.getGroupName()); } return changedGroupSet; @@ -416,7 +465,8 @@ private void updateReplConnections(List addrs) throws IOExcep * we find out the changed groups with the comparison of previous and current znode list, * and update the state of groups based on them. */ - Set changedGroups = findChangedGroups(addrs, locator.getAll()); + Set changedGroups = findChangedGroups(addrs, + ((ArcusReplKetamaNodeLocator) locator).getAllGroups().values()); Map> newAllGroups = ArcusReplNodeAddress.makeGroupAddrsList(findAddrsOfChangedGroups(addrs, changedGroups)); diff --git a/src/main/java/net/spy/memcached/MemcachedReplicaGroup.java b/src/main/java/net/spy/memcached/MemcachedReplicaGroup.java index 36ccbe367..14432150e 100644 --- a/src/main/java/net/spy/memcached/MemcachedReplicaGroup.java +++ b/src/main/java/net/spy/memcached/MemcachedReplicaGroup.java @@ -32,6 +32,7 @@ public abstract class MemcachedReplicaGroup extends SpyObject { protected MemcachedNode masterCandidate; private final StringBuilder sb = new StringBuilder(); private boolean delayedSwitchover = false; + private boolean alreadySwitched = false; public static final int MAX_REPL_SLAVE_SIZE = 2; public static final int MAX_REPL_GROUP_SIZE = MAX_REPL_SLAVE_SIZE + 1; @@ -43,6 +44,17 @@ protected MemcachedReplicaGroup(final String groupName) { this.group = groupName; } + public List getArcusReplNodeAddressList() { + List arcusReplNodeAddressList = new ArrayList<>(); + if (masterNode != null) { + arcusReplNodeAddressList.add((ArcusReplNodeAddress) masterNode.getSocketAddress()); + } + for (MemcachedNode slaveNode : slaveNodes) { + arcusReplNodeAddressList.add((ArcusReplNodeAddress) slaveNode.getSocketAddress()); + } + return arcusReplNodeAddressList; + } + @Override public String toString() { sb.setLength(0); @@ -56,6 +68,14 @@ public String toString() { return sb.toString(); } + public boolean isAlreadySwitched() { + return alreadySwitched; + } + + public void setAlreadySwitched(boolean alreadySwitched) { + this.alreadySwitched = alreadySwitched; + } + public boolean isEmptyGroup() { return masterNode == null && slaveNodes.isEmpty(); } diff --git a/src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java b/src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java index 08362d509..6a88f0b62 100644 --- a/src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java +++ b/src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java @@ -166,6 +166,7 @@ protected final void receivedMoveOperations(String cause) { getLogger().info("%s message received by %s operation from %s", cause, this, handlingNode); transitionState(OperationState.MOVING); + group.setAlreadySwitched(true); } /* ENABLE_REPLICATION end */