Skip to content

Commit

Permalink
Use value object for topology caching.
Browse files Browse the repository at this point in the history
We now use a value object for caching the topology to avoid races in updating the cache timestamp.

Also, we set the cache timestamp after obtaining the topology to avoid that I/O latency expires the topology cache.

Closes: #2986
Original Pull Request: #2989
  • Loading branch information
mp911de authored and christophstrobl committed Sep 11, 2024
1 parent 297ee41 commit 1514461
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -805,13 +805,11 @@ public void returnResourceForSpecificNode(RedisClusterNode node, Object client)
*/
public static class JedisClusterTopologyProvider implements ClusterTopologyProvider {

private long time = 0;
private final JedisCluster cluster;

private final long cacheTimeMs;

private @Nullable ClusterTopology cached;

private final JedisCluster cluster;
private volatile @Nullable JedisClusterTopology cached;

/**
* Create new {@link JedisClusterTopologyProvider}. Uses a default cache timeout of 100 milliseconds.
Expand Down Expand Up @@ -842,12 +840,12 @@ public JedisClusterTopologyProvider(JedisCluster cluster, Duration cacheTimeout)
@Override
public ClusterTopology getTopology() {

if (cached != null && shouldUseCachedValue()) {
return cached;
JedisClusterTopology topology = cached;
if (shouldUseCachedValue(topology)) {
return topology;
}

Map<String, Exception> errors = new LinkedHashMap<>();

List<Entry<String, ConnectionPool>> list = new ArrayList<>(cluster.getClusterNodes().entrySet());

Collections.shuffle(list);
Expand All @@ -856,13 +854,10 @@ public ClusterTopology getTopology() {

try (Connection connection = entry.getValue().getResource()) {

time = System.currentTimeMillis();

Set<RedisClusterNode> nodes = Converters.toSetOfRedisClusterNodes(new Jedis(connection).clusterNodes());

cached = new ClusterTopology(nodes);

return cached;
topology = cached = new JedisClusterTopology(nodes, System.currentTimeMillis());
return topology;

} catch (Exception ex) {
errors.put(entry.getKey(), ex);
Expand All @@ -887,9 +882,38 @@ public ClusterTopology getTopology() {
* topology.
* @see #JedisClusterTopologyProvider(JedisCluster, Duration)
* @since 2.2
* @deprecated since 3.3.4, use {@link #shouldUseCachedValue(JedisClusterTopology)} instead.
*/
@Deprecated(since = "3.3.4")
protected boolean shouldUseCachedValue() {
return time + cacheTimeMs > System.currentTimeMillis();
return false;
}

/**
* Returns whether {@link #getTopology()} should return the cached {@link JedisClusterTopology}. Uses a time-based
* caching.
*
* @return {@literal true} to use the cached {@link ClusterTopology}; {@literal false} to fetch a new cluster
* topology.
* @see #JedisClusterTopologyProvider(JedisCluster, Duration)
* @since 3.3.4
*/
protected boolean shouldUseCachedValue(@Nullable JedisClusterTopology topology) {
return topology != null && topology.getTime() + cacheTimeMs > System.currentTimeMillis();
}
}

protected static class JedisClusterTopology extends ClusterTopology {

private final long time;

public JedisClusterTopology(Set<RedisClusterNode> nodes, long time) {
super(nodes);
this.time = time;
}

public long getTime() {
return time;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;

import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.domain.Range.Bound;
Expand All @@ -53,6 +54,7 @@
import org.springframework.data.redis.connection.BitFieldSubCommands;
import org.springframework.data.redis.connection.ClusterConnectionTests;
import org.springframework.data.redis.connection.ClusterSlotHashUtil;
import org.springframework.data.redis.connection.ClusterTopology;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.connection.DefaultSortParameters;
import org.springframework.data.redis.connection.Limit;
Expand All @@ -75,6 +77,7 @@
import org.springframework.data.redis.test.condition.EnabledOnRedisClusterAvailable;
import org.springframework.data.redis.test.extension.JedisExtension;
import org.springframework.data.redis.test.util.HexStringUtils;
import org.springframework.test.util.ReflectionTestUtils;

/**
* @author Christoph Strobl
Expand Down Expand Up @@ -2950,4 +2953,20 @@ void lPosNonExisting() {

assertThat(result).isEmpty();
}

@Test // GH-2986
void shouldUseCachedTopology() {

JedisClusterConnection.JedisClusterTopologyProvider provider = (JedisClusterConnection.JedisClusterTopologyProvider) clusterConnection
.getTopologyProvider();
ReflectionTestUtils.setField(provider, "cached", null);

ClusterTopology topology = provider.getTopology();
assertThat(topology).isInstanceOf(JedisClusterConnection.JedisClusterTopology.class);

assertThat(provider.shouldUseCachedValue(null)).isFalse();
assertThat(provider.shouldUseCachedValue(new JedisClusterConnection.JedisClusterTopology(Set.of(), 0))).isFalse();
assertThat(provider.shouldUseCachedValue(
new JedisClusterConnection.JedisClusterTopology(Set.of(), System.currentTimeMillis() + 100))).isTrue();
}
}

0 comments on commit 1514461

Please sign in to comment.