Skip to content

Commit

Permalink
move queueReconnect line because of switchover
Browse files Browse the repository at this point in the history
delete unnecessary call setupResend
refactoring MoveOperationTask to Task interface
  • Loading branch information
whchoi83 committed Sep 8, 2016
1 parent b6c3f0e commit 2a61d7e
Showing 1 changed file with 80 additions and 30 deletions.
110 changes: 80 additions & 30 deletions src/main/java/net/spy/memcached/MemcachedConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ public void updateConnections(List<InetSocketAddress> addrs) throws IOException
/* ENABLE_REPLICATION if */
if (arcusReplEnabled) {
List<MemcachedReplicaGroup> changeRoleGroups = new ArrayList<MemcachedReplicaGroup>();
List<MoveOperationTask> taskList = new ArrayList<MoveOperationTask>();
// will do task after locator update
List<Task> taskList = new ArrayList<Task>();
Map<String, List<ArcusReplNodeAddress>> newAllGroups =
ArcusReplNodeAddress.makeGroupAddrsList(addrs);

Expand Down Expand Up @@ -300,24 +301,25 @@ public void updateConnections(List<InetSocketAddress> addrs) throws IOException
attachNodes.add(newMasterNode = attachMemcachedNode(newGroupAddrs.get(0)));

/* move operation old master -> new master */
oldGroup.getMasterNode().setupResend(false, "Discarded all pending reading state operation to move operations.");
taskList.add(new SetupResendTask(oldGroup.getMasterNode(), false, "Discarded all pending reading state operation to move operations."));
taskList.add(new MoveOperationTask(oldGroup.getMasterNode(), newMasterNode));
}
} else { /* Old group has both a master node and a slave node. */
if (newGroupAddrs.get(0).getIPPort().equals(oldMasterAddr.getIPPort())) {
/* The old slave has disappeared. */
removeNodes.add(oldGroup.getSlaveNode());

/* move operation slave -> master */
oldGroup.getSlaveNode().setupResend(false, "Discarded all pending reading state operation to move operations.");
/* move operation slave -> master
* Slave node have only read operations, then don't need call setupResend
*/
taskList.add(new MoveOperationTask(oldGroup.getSlaveNode(), oldGroup.getMasterNode()));
} else if (newGroupAddrs.get(0).getIPPort().equals(oldSlaveAddr.getIPPort())) {
/* The old slave has failovered to the master with new slave */
removeNodes.add(oldGroup.getMasterNode());
changeRoleGroups.add(oldGroup);

/* move operation master -> slave */
oldGroup.getMasterNode().setupResend(false, "Discarded all pending reading state operation to move operations.");
taskList.add(new SetupResendTask(oldGroup.getMasterNode(), false, "Discarded all pending reading state operation to move operations."));
taskList.add(new MoveOperationTask(oldGroup.getMasterNode(), oldGroup.getSlaveNode()));
} else {
MemcachedNode newMasterNode;
Expand All @@ -327,10 +329,12 @@ public void updateConnections(List<InetSocketAddress> addrs) throws IOException
attachNodes.add(newMasterNode = attachMemcachedNode(newGroupAddrs.get(0)));

/* move operation old master -> new master */
oldGroup.getMasterNode().setupResend(false, "Discarded all pending reading state operation to move operations.");
taskList.add(new SetupResendTask(oldGroup.getMasterNode(), false, "Discarded all pending reading state operation to move operations."));
taskList.add(new MoveOperationTask(oldGroup.getMasterNode(), newMasterNode));
/* move operation old slave -> new master */
oldGroup.getSlaveNode().setupResend(false, "Discarded all pending reading state operation to move operations.");

/* move operation old slave -> new master
* Slave node have only read operations, then don't need call setupResend
*/
taskList.add(new MoveOperationTask(oldGroup.getSlaveNode(), newMasterNode));
}
}
Expand All @@ -346,7 +350,7 @@ public void updateConnections(List<InetSocketAddress> addrs) throws IOException
attachNodes.add(newMasterNode = attachMemcachedNode(newGroupAddrs.get(0)));

/* move operation old master -> new master */
queueReconnect(oldGroup.getMasterNode(), ReconnDelay.IMMEDIATE, "Discarded all pending reading state operation to move operations.");
taskList.add(new QueueReconnectTask(oldGroup.getMasterNode(), ReconnDelay.IMMEDIATE, "Discarded all pending reading state operation to move operations."));
taskList.add(new MoveOperationTask(oldGroup.getMasterNode(), newMasterNode));
} else {
MemcachedNode newMasterNode;
Expand All @@ -356,7 +360,7 @@ public void updateConnections(List<InetSocketAddress> addrs) throws IOException
attachNodes.add(attachMemcachedNode(newGroupAddrs.get(1)));

/* move operation old master -> new master */
oldGroup.getMasterNode().setupResend(false, "Discarded all pending reading state operation to move operations.");
taskList.add(new SetupResendTask(oldGroup.getMasterNode(), false, "Discarded all pending reading state operation to move operations."));
taskList.add(new MoveOperationTask(oldGroup.getMasterNode(), newMasterNode));
}
} else { /* Old group has both a master node and a slave node. */
Expand All @@ -369,26 +373,33 @@ public void updateConnections(List<InetSocketAddress> addrs) throws IOException
removeNodes.add(oldGroup.getSlaveNode());
attachNodes.add(newSlaveNode = attachMemcachedNode(newGroupAddrs.get(1)));

/* move operation old slave -> new slave */
oldGroup.getSlaveNode().setupResend(false, "Discarded all pending reading state operation to move operations.");
/* move operation old slave -> new slave
* Slave node have only read operations, then don't need call setupResend
*/
taskList.add(new MoveOperationTask(oldGroup.getSlaveNode(), newSlaveNode));
}
} else if (newGroupAddrs.get(0).getIPPort().equals(oldSlaveAddr.getIPPort())) {
if (newGroupAddrs.get(1).getIPPort().equals(oldMasterAddr.getIPPort())) {
/* Switchover */
changeRoleGroups.add(oldGroup);

/* move operation master -> slave */
queueReconnect(oldGroup.getMasterNode(), ReconnDelay.IMMEDIATE, "Discarded all pending reading state operation to move operations.");
/* move operation master -> slave
* must keep the following execution order when switchover
* - first moveOperations
* - second, queueReconnect
*
* because moves all operations
*/
taskList.add(new MoveOperationTask(oldGroup.getMasterNode(), oldGroup.getSlaveNode()));
taskList.add(new QueueReconnectTask(oldGroup.getMasterNode(), ReconnDelay.IMMEDIATE, "Discarded all pending reading state operation to move operations."));
} else {
/* Failover. And, new slave has appeared */
removeNodes.add(oldGroup.getMasterNode());
changeRoleGroups.add(oldGroup);
attachNodes.add(attachMemcachedNode(newGroupAddrs.get(1)));

/* move operation old master -> old slave */
oldGroup.getMasterNode().setupResend(false, "Discarded all pending reading state operation to move operations.");
taskList.add(new SetupResendTask(oldGroup.getMasterNode(), false, "Discarded all pending reading state operation to move operations."));
taskList.add(new MoveOperationTask(oldGroup.getMasterNode(), oldGroup.getSlaveNode()));
}
} else {
Expand All @@ -401,11 +412,12 @@ public void updateConnections(List<InetSocketAddress> addrs) throws IOException
attachNodes.add(newMasterNode = attachMemcachedNode(newGroupAddrs.get(0)));

/* move operation old master -> new master */
queueReconnect(oldGroup.getMasterNode(), ReconnDelay.IMMEDIATE, "Discarded all pending reading state operation to move operations.");
taskList.add(new QueueReconnectTask(oldGroup.getMasterNode(), ReconnDelay.IMMEDIATE, "Discarded all pending reading state operation to move operations."));
taskList.add(new MoveOperationTask(oldGroup.getMasterNode(), newMasterNode));

/* move operation old slave -> old master(slave) */
oldGroup.getSlaveNode().setupResend(false, "Discarded all pending reading state operation to move operations.");
/* move operation old slave -> old master(slave)
* Slave node have only read operations, then don't need call setupResend
*/
taskList.add(new MoveOperationTask(oldGroup.getSlaveNode(), oldGroup.getMasterNode()));
} else if (newGroupAddrs.get(1).getIPPort().equals(oldSlaveAddr.getIPPort())) {
MemcachedNode newMasterNode;
Expand All @@ -414,7 +426,7 @@ public void updateConnections(List<InetSocketAddress> addrs) throws IOException
attachNodes.add(newMasterNode = attachMemcachedNode(newGroupAddrs.get(0)));

/* move operation old master -> new master */
oldGroup.getMasterNode().setupResend(false, "Discarded all pending reading state operation to move operations.");
taskList.add(new SetupResendTask(oldGroup.getMasterNode(), false, "Discarded all pending reading state operation to move operations."));
taskList.add(new MoveOperationTask(oldGroup.getMasterNode(), newMasterNode));
} else {
MemcachedNode newMasterNode;
Expand All @@ -426,11 +438,12 @@ public void updateConnections(List<InetSocketAddress> addrs) throws IOException
attachNodes.add(newSlaveNode = attachMemcachedNode(newGroupAddrs.get(1)));

/* move operation old master -> new master */
oldGroup.getMasterNode().setupResend(false, "Discarded all pending reading state operation to move operations.");
taskList.add(new SetupResendTask(oldGroup.getMasterNode(), false, "Discarded all pending reading state operation to move operations."));
taskList.add(new MoveOperationTask(oldGroup.getMasterNode(), newMasterNode));

/* move operation old slave -> new slave */
oldGroup.getSlaveNode().setupResend(false, "Discarded all pending reading state operation to move operations.");
/* move operation old slave -> new slave
* Slave node have only read operations, then don't need call setupResend
*/
taskList.add(new MoveOperationTask(oldGroup.getSlaveNode(), newSlaveNode));
}
}
Expand All @@ -449,11 +462,12 @@ public void updateConnections(List<InetSocketAddress> addrs) throws IOException
attachNodes.add(attachMemcachedNode(newGroupAddrs.get(1)));
}
}

// Update the hash.
((ArcusReplKetamaNodeLocator)locator).update(attachNodes, removeNodes, changeRoleGroups);
for (MoveOperationTask task : taskList)
task.moveOperations();
// do task after locator update
for (Task t : taskList)
t.doTask();
} else {
for (MemcachedNode node : locator.getAll()) {
if (addrs.contains((InetSocketAddress) node.getSocketAddress())) {
Expand Down Expand Up @@ -1249,17 +1263,53 @@ public int getAddedQueueSize() {
return addedQueue.size();
}
/* ENABLE_REPLICATION if */

private interface Task {
void doTask();
}

private class SetupResendTask implements Task {
private MemcachedNode node;
private boolean cancelWrite;
private String cause;

public SetupResendTask(MemcachedNode node, boolean cancelWrite, String cause) {
this.node = node;
this.cancelWrite = cancelWrite;
this.cause = cause;
}

public void doTask() {
node.setupResend(cancelWrite, cause);
}
}

private class QueueReconnectTask implements Task {
private MemcachedNode node;
private ReconnDelay delay;
private String cause;

public QueueReconnectTask(MemcachedNode node, ReconnDelay delay, String cause) {
this.node = node;
this.delay = delay;
this.cause = cause;
}

public void doTask() {
queueReconnect(node, delay, cause);
}
}

private class MoveOperationTask {
MemcachedNode fromNode;
MemcachedNode toNode;
private class MoveOperationTask implements Task {
private MemcachedNode fromNode;
private MemcachedNode toNode;

public MoveOperationTask(MemcachedNode from, MemcachedNode to) {
fromNode = from;
toNode = to;
}

public void moveOperations() {
public void doTask() {
if (fromNode.moveOperations(toNode) > 0)
addedQueue.offer(toNode);
}
Expand Down

0 comments on commit 2a61d7e

Please sign in to comment.