Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

REFACTOR: methods in MemcachedConnection invoked with updateReplConnection. #829

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 7 additions & 61 deletions src/main/java/net/spy/memcached/MemcachedConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -367,38 +366,6 @@ private void updateConnections(List<InetSocketAddress> addrs) throws IOException
}

/* ENABLE_REPLICATION if */
private Set<String> findChangedGroups(List<InetSocketAddress> addrs,
Collection<MemcachedNode> nodes) {
Map<String, InetSocketAddress> addrMap = new HashMap<>();
for (InetSocketAddress each : addrs) {
addrMap.put(each.toString(), each);
}

Set<String> changedGroupSet = new HashSet<>();
for (MemcachedNode node : nodes) {
String nodeAddr = ((InetSocketAddress) node.getSocketAddress()).toString();
if (addrMap.remove(nodeAddr) == null) { // removed node
changedGroupSet.add(node.getReplicaGroup().getGroupName());
}
}
for (String addr : addrMap.keySet()) { // newly added node
ArcusReplNodeAddress a = (ArcusReplNodeAddress) addrMap.get(addr);
changedGroupSet.add(a.getGroupName());
}
return changedGroupSet;
}

private List<InetSocketAddress> findAddrsOfChangedGroups(List<InetSocketAddress> addrs,
Set<String> changedGroups) {
List<InetSocketAddress> changedGroupAddrs = new ArrayList<>();
for (InetSocketAddress addr : addrs) {
if (changedGroups.contains(((ArcusReplNodeAddress) addr).getGroupName())) {
changedGroupAddrs.add(addr);
}
}
return changedGroupAddrs;
}

private void updateReplConnections(List<InetSocketAddress> addrs) throws IOException {
List<MemcachedNode> attachNodes = new ArrayList<>();
List<MemcachedNode> removeNodes = new ArrayList<>();
Expand All @@ -416,10 +383,11 @@ private void updateReplConnections(List<InetSocketAddress> addrs) throws IOExcep
* we find out the changed groups with the comparison of previous and current znode list,
* and update the state of groups based on them.
*/
Set<String> changedGroups = findChangedGroups(addrs, locator.getAll());
Set<String> changedGroups = MemcachedReplicaGroup.findChangedGroups(addrs, locator.getAll());

Map<String, List<ArcusReplNodeAddress>> newAllGroups =
ArcusReplNodeAddress.makeGroupAddrsList(findAddrsOfChangedGroups(addrs, changedGroups));
ArcusReplNodeAddress.makeGroupAddrsList(
MemcachedReplicaGroup.findAddrsOfChangedGroups(addrs, changedGroups));

// remove invalidated groups in changedGroups
for (Map.Entry<String, List<ArcusReplNodeAddress>> entry : newAllGroups.entrySet()) {
Expand Down Expand Up @@ -467,8 +435,10 @@ private void updateReplConnections(List<InetSocketAddress> addrs) throws IOExcep
assert oldMasterAddr != null : "invalid old rgroup";
assert newMasterAddr != null : "invalid new rgroup";

Set<ArcusReplNodeAddress> oldSlaveAddrs = getAddrsFromNodes(oldSlaveNodes);
Set<ArcusReplNodeAddress> newSlaveAddrs = getSlaveAddrsFromGroupAddrs(newGroupAddrs);
Set<ArcusReplNodeAddress> oldSlaveAddrs
= MemcachedReplicaGroup.getAddrsFromNodes(oldSlaveNodes);
Set<ArcusReplNodeAddress> newSlaveAddrs
= MemcachedReplicaGroup.getSlaveAddrsFromGroupAddrs(newGroupAddrs);

if (oldMasterAddr.isSameAddress(newMasterAddr)) {
// add newly added slave node
Expand Down Expand Up @@ -560,30 +530,6 @@ private void updateReplConnections(List<InetSocketAddress> addrs) throws IOExcep
// Remove the unavailable nodes.
handleNodesToRemove(removeNodes);
}

private Set<ArcusReplNodeAddress> getAddrsFromNodes(List<MemcachedNode> nodes) {
Set<ArcusReplNodeAddress> addrs = Collections.emptySet();
if (!nodes.isEmpty()) {
addrs = new HashSet<>((int) (nodes.size() / .75f) + 1);
for (MemcachedNode node : nodes) {
addrs.add((ArcusReplNodeAddress) node.getSocketAddress());
}
}
return addrs;
}

private Set<ArcusReplNodeAddress> getSlaveAddrsFromGroupAddrs(
List<ArcusReplNodeAddress> groupAddrs) {
Set<ArcusReplNodeAddress> slaveAddrs = Collections.emptySet();
int groupSize = groupAddrs.size();
if (groupSize > 1) {
slaveAddrs = new HashSet<>((int) ((groupSize - 1) / .75f) + 1);
for (int i = 1; i < groupSize; i++) {
slaveAddrs.add(groupAddrs.get(i));
}
}
return slaveAddrs;
}
/* ENABLE_REPLICATION end */

/* ENABLE_REPLICATION if */
Expand Down
48 changes: 48 additions & 0 deletions src/main/java/net/spy/memcached/MemcachedReplicaGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,15 @@
/* ENABLE_REPLICATION if */
package net.spy.memcached;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import net.spy.memcached.compat.SpyObject;

Expand Down Expand Up @@ -190,5 +196,47 @@ private MemcachedNode getNextActiveSlaveNodeNoRotate() {
public static String getGroupNameFromNode(final MemcachedNode node) {
return ((ArcusReplNodeAddress) node.getSocketAddress()).getGroupName();
}

static Set<String> findChangedGroups(List<InetSocketAddress> newAddrs,
Collection<MemcachedNode> oldNodes) {
Set<String> changedGroupSet = new HashSet<>();
Map<String, InetSocketAddress> addrMap = newAddrs.stream()
.collect(Collectors.toMap(InetSocketAddress::toString, addr -> addr));

for (MemcachedNode node : oldNodes) {
if (addrMap.remove(node.getSocketAddress().toString()) == null) {
changedGroupSet.add(node.getReplicaGroup().getGroupName());
}
}

addrMap.values().stream()
.map(addr -> ((ArcusReplNodeAddress) addr).getGroupName())
.forEach(changedGroupSet::add);

return changedGroupSet;
}

static List<InetSocketAddress> findAddrsOfChangedGroups(List<InetSocketAddress> newAddrs,
Set<String> changedGroups) {
List<InetSocketAddress> changedGroupAddrs = new ArrayList<>();
newAddrs.stream()
.filter(addr -> changedGroups.contains(((ArcusReplNodeAddress) addr).getGroupName()))
.forEach(changedGroupAddrs::add);
return changedGroupAddrs;
}

static Set<ArcusReplNodeAddress> getAddrsFromNodes(List<MemcachedNode> nodes) {
return nodes.stream()
.map(node -> (ArcusReplNodeAddress) node.getSocketAddress())
.collect(Collectors.toSet());
}

static Set<ArcusReplNodeAddress> getSlaveAddrsFromGroupAddrs(
List<ArcusReplNodeAddress> groupAddrs) {
if (groupAddrs.size() <= 1) {
return Collections.emptySet();
}
return new HashSet<>(groupAddrs.subList(1, groupAddrs.size()));
}
}
/* ENABLE_REPLICATION end */
76 changes: 76 additions & 0 deletions src/test/java/net/spy/memcached/MemcachedReplicaGroupTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package net.spy.memcached;


import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;


class MemcachedReplicaGroupTest {

@Test
void findChangedGroupsTest() {
List<ArcusReplNodeAddress> g0 = createReplList("g0", "192.168.0.1");
List<ArcusReplNodeAddress> g1 = createReplList("g1", "192.168.0.2");
List<MemcachedNode> old = new ArrayList<>();
setReplGroup(g0, old);
setReplGroup(g1, old);

List<InetSocketAddress> update = new ArrayList<>(g0);

Set<String> changedGroups = MemcachedReplicaGroup.findChangedGroups(update, old);
Assertions.assertEquals(1, changedGroups.size());
Assertions.assertTrue(changedGroups.contains("g1"));
}

@Test
void findAddrsOfChangedGroupsTest() {
List<ArcusReplNodeAddress> g0 = createReplList("g0", "192.168.0.1");
List<ArcusReplNodeAddress> g1 = createReplList("g1", "192.168.0.2");
List<MemcachedNode> old = new ArrayList<>();
setReplGroup(g0, old);
setReplGroup(g1, old);

List<InetSocketAddress> update = new ArrayList<>();
update.addAll(g0.subList(0, 2));
update.addAll(g1.subList(0, 2));

Set<String> changedGroups = MemcachedReplicaGroup.findChangedGroups(update, old);
List<InetSocketAddress> result
= MemcachedReplicaGroup.findAddrsOfChangedGroups(update, changedGroups);

Assertions.assertEquals(4, result.size());
Assertions.assertTrue(result.contains(g0.get(0)));
Assertions.assertTrue(result.contains(g0.get(1)));
Assertions.assertTrue(result.contains(g1.get(0)));
Assertions.assertTrue(result.contains(g1.get(1)));
}

private void setReplGroup(List<ArcusReplNodeAddress> group, List<MemcachedNode> old) {
List<MockMemcachedNode> collect = group.stream()
.map(MockMemcachedNode::new)
.collect(Collectors.toList());
MemcachedReplicaGroupImpl impl = null;
for (MockMemcachedNode node : collect) {
if (impl == null) {
impl = new MemcachedReplicaGroupImpl(node);
} else {
node.setReplicaGroup(impl);
}
}
old.addAll(collect);
}

private List<ArcusReplNodeAddress> createReplList(String group, String ip) {
List<ArcusReplNodeAddress> replList = new ArrayList<>();
replList.add(ArcusReplNodeAddress.create(group, true, ip + ":" + 11211));
replList.add(ArcusReplNodeAddress.create(group, false, ip + ":" + (11211 + 1)));
replList.add(ArcusReplNodeAddress.create(group, false, ip + ":" + (11211 + 2)));
return replList;
}
}
6 changes: 3 additions & 3 deletions src/test/java/net/spy/memcached/MockMemcachedNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

public class MockMemcachedNode implements MemcachedNode {
private final InetSocketAddress socketAddress;
private MemcachedReplicaGroup memcachedReplicaGroup;

public SocketAddress getSocketAddress() {
return socketAddress;
Expand Down Expand Up @@ -260,13 +261,12 @@ public String getOpQueueStatus() {

@Override
public void setReplicaGroup(MemcachedReplicaGroup g) {
// noop
this.memcachedReplicaGroup = g;
}

@Override
public MemcachedReplicaGroup getReplicaGroup() {
// noop
return null;
return memcachedReplicaGroup;
}

@Override
Expand Down