diff --git a/src/main/java/net/spy/memcached/MemcachedConnection.java b/src/main/java/net/spy/memcached/MemcachedConnection.java index a8c2318b0..ed9f24a93 100644 --- a/src/main/java/net/spy/memcached/MemcachedConnection.java +++ b/src/main/java/net/spy/memcached/MemcachedConnection.java @@ -453,7 +453,7 @@ private void updateReplConnections(List addrs) throws IOExcep if (oldGroup.isDelayedSwitchover()) { delayedSwitchoverGroups.remove(oldGroup); - switchoverMemcachedReplGroup(oldGroup.getMasterNode(), true); + switchoverMemcachedReplGroup(oldGroup, true); } MemcachedNode oldMasterNode = oldGroup.getMasterNode(); @@ -587,8 +587,15 @@ private Set getSlaveAddrsFromGroupAddrs( /* ENABLE_REPLICATION end */ /* ENABLE_REPLICATION if */ - private void switchoverMemcachedReplGroup(MemcachedNode node, boolean cancelNonIdempotent) { - MemcachedReplicaGroup group = node.getReplicaGroup(); + private void switchoverMemcachedReplGroup(MemcachedReplicaGroup group, + boolean cancelNonIdempotent) { + + if (group.getMasterCandidate() == null) { + getLogger().warn("Delay switchover because invalid group state : " + group); + return; + } + + MemcachedNode oldMaster = group.getMasterNode(); /* must keep the following execution order when switchover * - first moveOperations @@ -596,17 +603,13 @@ private void switchoverMemcachedReplGroup(MemcachedNode node, boolean cancelNonI * * because moves all operations */ - if (group.getMasterNode() != null && group.getMasterCandidate() != null) { - if (((ArcusReplNodeAddress) node.getSocketAddress()).isMaster()) { - ((ArcusReplKetamaNodeLocator) locator).switchoverReplGroup(group); - } - node.moveOperations(group.getMasterNode(), cancelNonIdempotent); - addedQueue.offer(group.getMasterNode()); - queueReconnect(node, ReconnDelay.IMMEDIATE, - "Discarded all pending reading state operation to move operations."); - } else { - getLogger().warn("Delay switchover because invalid group state : " + group); - } + ((ArcusReplKetamaNodeLocator) locator).switchoverReplGroup(group); + MemcachedNode newMaster = group.getMasterNode(); + + oldMaster.moveOperations(newMaster, cancelNonIdempotent); + addedQueue.offer(newMaster); + queueReconnect(oldMaster, ReconnDelay.IMMEDIATE, + "Discarded all pending reading state operation to move operations."); } /* ENABLE_REPLICATION end */ @@ -1015,8 +1018,9 @@ private void handleReads(MemcachedNode qa) /* ENABLE_REPLICATION if */ if (currentOp != null && currentOp.getState() == OperationState.MOVING) { ((Buffer) rbuf).clear(); - delayedSwitchoverGroups.remove(qa.getReplicaGroup()); - switchoverMemcachedReplGroup(qa, false); + MemcachedReplicaGroup group = qa.getReplicaGroup(); + delayedSwitchoverGroups.remove(group); + switchoverMemcachedReplGroup(group, false); break; } /* ENABLE_REPLICATION end */ @@ -1031,10 +1035,10 @@ private void handleReads(MemcachedNode qa) /* ENABLE_REPLICATION if */ if (arcusReplEnabled) { if (currentOp == null) { // readQ is empty - if (qa.getReplicaGroup().isDelayedSwitchover() && - qa.getReplicaGroup().masterNode == qa) { - delayedSwitchoverGroups.remove(qa.getReplicaGroup()); - switchoverMemcachedReplGroup(qa, false); + MemcachedReplicaGroup group = qa.getReplicaGroup(); + if (group.isDelayedSwitchover() && group.getMasterNode() == qa) { + delayedSwitchoverGroups.remove(group); + switchoverMemcachedReplGroup(group, false); } } } @@ -1179,10 +1183,10 @@ private void queueReconnect(MemcachedNode qa, ReconnDelay type, String cause) { /* ENABLE_REPLICATION if */ if (arcusReplEnabled) { - if (qa.getReplicaGroup().isDelayedSwitchover() && - qa.getReplicaGroup().getMasterNode() == qa) { - delayedSwitchoverGroups.remove(qa.getReplicaGroup()); - switchoverMemcachedReplGroup(qa, true); + MemcachedReplicaGroup group = qa.getReplicaGroup(); + if (group.isDelayedSwitchover() && group.getMasterNode() == qa) { + delayedSwitchoverGroups.remove(group); + switchoverMemcachedReplGroup(group, true); return; } } @@ -1800,12 +1804,15 @@ public void switchover() { Iterator> iterator = groups.entrySet().iterator(); while (iterator.hasNext()) { Entry entry = iterator.next(); - if (now < entry.getKey()) { + long switchoverTime = entry.getKey(); + MemcachedReplicaGroup group = entry.getValue(); + + if (now < switchoverTime) { return; } else { iterator.remove(); - entry.getValue().setDelayedSwitchover(false); - switchoverMemcachedReplGroup(entry.getValue().getMasterNode(), true); + group.setDelayedSwitchover(false); + switchoverMemcachedReplGroup(group, true); } } }