Skip to content

Commit

Permalink
Avoid lingering connections during topology refresh #1342
Browse files Browse the repository at this point in the history
We now ensure that we don't create duplicate connections during topology refresh. Previously, the set difference algorithm reported differences for items that weren't different and so the dynamic refresh sources setting caused duplicate connections to nodes that were already connected. Since connections are held in a map, the new connection object overwrote the previous one which was left open.
  • Loading branch information
mp911de committed Jul 29, 2020
1 parent 36e31f3 commit 4da1f86
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,17 @@ public Map<RedisURI, Partitions> loadViews(Iterable<RedisURI> seed, Duration con

long commandTimeoutNs = getCommandTimeoutNs(seed);

Connections connections = null;
Connections seedConnections = null;
Connections discoveredConnections = null;
try {
connections = getConnections(seed).get(commandTimeoutNs + connectTimeout.toNanos(), TimeUnit.NANOSECONDS);
seedConnections = getConnections(seed).get(commandTimeoutNs + connectTimeout.toNanos(), TimeUnit.NANOSECONDS);

if (!isEventLoopActive()) {
return Collections.emptyMap();
}

Requests requestedTopology = connections.requestTopology();
Requests requestedClients = connections.requestClients();
Requests requestedTopology = seedConnections.requestTopology();
Requests requestedClients = seedConnections.requestClients();

NodeTopologyViews nodeSpecificViews = getNodeSpecificViews(requestedTopology, requestedClients, commandTimeoutNs);

Expand All @@ -91,13 +92,13 @@ public Map<RedisURI, Partitions> loadViews(Iterable<RedisURI> seed, Duration con
Set<RedisURI> discoveredNodes = difference(allKnownUris, toSet(seed));

if (!discoveredNodes.isEmpty()) {
Connections discoveredConnections = getConnections(discoveredNodes).optionalGet(commandTimeoutNs,
discoveredConnections = getConnections(discoveredNodes).optionalGet(commandTimeoutNs,
TimeUnit.NANOSECONDS);
connections = connections.mergeWith(discoveredConnections);
Connections connections = seedConnections.mergeWith(discoveredConnections);

if (isEventLoopActive()) {
requestedTopology = requestedTopology.mergeWith(discoveredConnections.requestTopology());
requestedClients = requestedClients.mergeWith(discoveredConnections.requestClients());
requestedTopology = requestedTopology.mergeWith(connections.requestTopology());
requestedClients = requestedClients.mergeWith(connections.requestClients());

nodeSpecificViews = getNodeSpecificViews(requestedTopology, requestedClients, commandTimeoutNs);
}
Expand All @@ -120,9 +121,17 @@ public Map<RedisURI, Partitions> loadViews(Iterable<RedisURI> seed, Duration con
Thread.currentThread().interrupt();
throw new RedisCommandInterruptedException(e);
} finally {
if (connections != null) {
if (seedConnections != null) {
try {
connections.close();
seedConnections.close();
} catch (Exception e) {
logger.debug("Cannot close ClusterTopologyRefresh connections", e);
}
}

if (discoveredConnections != null) {
try {
discoveredConnections.close();
} catch (Exception e) {
logger.debug("Cannot close ClusterTopologyRefresh connections", e);
}
Expand Down Expand Up @@ -329,25 +338,16 @@ public RedisURI getViewedBy(Map<RedisURI, Partitions> map, Partitions partitions
return null;
}

private static <E> Set<E> difference(Set<E> set1, Set<E> set2) {
private static Set<RedisURI> difference(Set<RedisURI> allKnown, Set<RedisURI> seed) {

Set<E> result = new HashSet<>(set1.size());
Set<RedisURI> result = new TreeSet<>(TopologyComparators.RedisURIComparator.INSTANCE);

for (E e1 : set1) {
if (!set2.contains(e1)) {
result.add(e1);
for (RedisURI e : allKnown) {
if (!seed.contains(e)) {
result.add(e);
}
}

List<E> list = new ArrayList<>(set2.size());
for (E e : set2) {
if (!set1.contains(e)) {
list.add(e);
}
}

result.addAll(list);

return result;
}

Expand Down
16 changes: 15 additions & 1 deletion src/main/java/io/lettuce/core/cluster/topology/Connections.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.lettuce.core.protocol.CommandArgs;
import io.lettuce.core.protocol.CommandKeyword;
import io.lettuce.core.protocol.CommandType;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

/**
*
Expand All @@ -36,6 +38,8 @@
*/
class Connections {

private final static InternalLogger LOG = InternalLoggerFactory.getInstance(Connections.class);

private final Map<RedisURI, StatefulRedisConnection<String, String>> connections;

private volatile boolean closed = false;
Expand Down Expand Up @@ -187,7 +191,17 @@ public Connections mergeWith(Connections discoveredConnections) {
synchronized (discoveredConnections.connections) {

result.putAll(this.connections);
result.putAll(discoveredConnections.connections);

for (RedisURI redisURI : discoveredConnections.connections.keySet()) {

StatefulRedisConnection<String, String> existing = result.put(redisURI,
discoveredConnections.connections.get(redisURI));

if (existing != null) {
LOG.error("Duplicate topology refresh connection for " + redisURI);
existing.closeAsync();
}
}
}
}

Expand Down
Loading

0 comments on commit 4da1f86

Please sign in to comment.