Skip to content

Commit

Permalink
Support readonly command on replicas node
Browse files Browse the repository at this point in the history
  • Loading branch information
jjz921024 committed Aug 8, 2024
1 parent e14b899 commit 2d33705
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 5 deletions.
5 changes: 5 additions & 0 deletions src/main/java/redis/clients/jedis/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,11 @@ private void initializeFromClientConfig(final JedisClientConfig config) {
}
}

// set readonly flag to ALL connections (including master nodes) when enable read from replica
if (config.isReadOnlyForReplica()) {
fireAndForgetMsg.add(new CommandArguments(Command.READONLY));
}

for (CommandArguments arg : fireAndForgetMsg) {
sendCommand(arg);
}
Expand Down
25 changes: 21 additions & 4 deletions src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ public final class DefaultJedisClientConfig implements JedisClientConfig {

private final ClientSetInfoConfig clientSetInfoConfig;

private final boolean readOnlyForReplica;

private DefaultJedisClientConfig(RedisProtocol protocol, int connectionTimeoutMillis, int soTimeoutMillis,
int blockingSocketTimeoutMillis, Supplier<RedisCredentials> credentialsProvider, int database,
String clientName, boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
HostnameVerifier hostnameVerifier, HostAndPortMapper hostAndPortMapper,
ClientSetInfoConfig clientSetInfoConfig) {
ClientSetInfoConfig clientSetInfoConfig, boolean readOnlyForReplica) {
this.redisProtocol = protocol;
this.connectionTimeoutMillis = connectionTimeoutMillis;
this.socketTimeoutMillis = soTimeoutMillis;
Expand All @@ -44,6 +46,7 @@ private DefaultJedisClientConfig(RedisProtocol protocol, int connectionTimeoutMi
this.hostnameVerifier = hostnameVerifier;
this.hostAndPortMapper = hostAndPortMapper;
this.clientSetInfoConfig = clientSetInfoConfig;
this.readOnlyForReplica = readOnlyForReplica;
}

@Override
Expand Down Expand Up @@ -122,6 +125,11 @@ public ClientSetInfoConfig getClientSetInfoConfig() {
return clientSetInfoConfig;
}

@Override
public boolean isReadOnlyForReplica() {
return readOnlyForReplica;
}

public static Builder builder() {
return new Builder();
}
Expand Down Expand Up @@ -149,6 +157,8 @@ public static class Builder {

private ClientSetInfoConfig clientSetInfoConfig = ClientSetInfoConfig.DEFAULT;

private boolean readOnlyForReplicas = false;

private Builder() {
}

Expand All @@ -160,7 +170,8 @@ public DefaultJedisClientConfig build() {

return new DefaultJedisClientConfig(redisProtocol, connectionTimeoutMillis, socketTimeoutMillis,
blockingSocketTimeoutMillis, credentialsProvider, database, clientName, ssl,
sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, clientSetInfoConfig);
sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, clientSetInfoConfig,
readOnlyForReplicas);
}

/**
Expand Down Expand Up @@ -255,6 +266,11 @@ public Builder clientSetInfoConfig(ClientSetInfoConfig setInfoConfig) {
this.clientSetInfoConfig = setInfoConfig;
return this;
}

public Builder readOnlyForReplicas() {
this.readOnlyForReplicas = true;
return this;
}
}

public static DefaultJedisClientConfig create(int connectionTimeoutMillis, int soTimeoutMillis,
Expand All @@ -264,7 +280,8 @@ public static DefaultJedisClientConfig create(int connectionTimeoutMillis, int s
return new DefaultJedisClientConfig(null,
connectionTimeoutMillis, soTimeoutMillis, blockingSocketTimeoutMillis,
new DefaultRedisCredentialsProvider(new DefaultRedisCredentials(user, password)), database,
clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, null);
clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, null,
false);
}

public static DefaultJedisClientConfig copyConfig(JedisClientConfig copy) {
Expand All @@ -273,6 +290,6 @@ public static DefaultJedisClientConfig copyConfig(JedisClientConfig copy) {
copy.getBlockingSocketTimeoutMillis(), copy.getCredentialsProvider(),
copy.getDatabase(), copy.getClientName(), copy.isSsl(), copy.getSslSocketFactory(),
copy.getSslParameters(), copy.getHostnameVerifier(), copy.getHostAndPortMapper(),
copy.getClientSetInfoConfig());
copy.getClientSetInfoConfig(), copy.isReadOnlyForReplica());
}
}
4 changes: 4 additions & 0 deletions src/main/java/redis/clients/jedis/JedisClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ default HostAndPortMapper getHostAndPortMapper() {
return null;
}

default boolean isReadOnlyForReplica() {
return false;
}

/**
* Modify the behavior of internally executing CLIENT SETINFO command.
* @return CLIENT SETINFO config
Expand Down
38 changes: 38 additions & 0 deletions src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public class JedisClusterInfoCache {
private final Map<String, ConnectionPool> nodes = new HashMap<>();
private final ConnectionPool[] slots = new ConnectionPool[Protocol.CLUSTER_HASHSLOTS];
private final HostAndPort[] slotNodes = new HostAndPort[Protocol.CLUSTER_HASHSLOTS];
private List<ConnectionPool>[] replicaSlots;
private List<HostAndPort>[] replicaSlotNodes;

private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
Expand Down Expand Up @@ -85,6 +87,10 @@ public JedisClusterInfoCache(final JedisClientConfig clientConfig,
topologyRefreshExecutor.scheduleWithFixedDelay(new TopologyRefreshTask(), topologyRefreshPeriod.toMillis(),
topologyRefreshPeriod.toMillis(), TimeUnit.MILLISECONDS);
}
if (clientConfig.isReadOnlyForReplica()) {
replicaSlots = new ArrayList[Protocol.CLUSTER_HASHSLOTS];
replicaSlotNodes = new ArrayList[Protocol.CLUSTER_HASHSLOTS];
}
}

/**
Expand Down Expand Up @@ -144,6 +150,8 @@ public void discoverClusterNodesAndSlots(Connection jedis) {
setupNodeIfNotExist(targetNode);
if (i == MASTER_NODE_INDEX) {
assignSlotsToNode(slotNums, targetNode);
} else if (clientConfig.isReadOnlyForReplica()) {
assignSlotsToReplicaNode(slotNums, targetNode);
}
}
}
Expand Down Expand Up @@ -236,6 +244,8 @@ private void discoverClusterSlots(Connection jedis) {
setupNodeIfNotExist(targetNode);
if (i == MASTER_NODE_INDEX) {
assignSlotsToNode(slotNums, targetNode);
} else if (clientConfig.isReadOnlyForReplica()) {
assignSlotsToReplicaNode(slotNums, targetNode);
}
}
}
Expand Down Expand Up @@ -307,6 +317,25 @@ public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode)
}
}

public void assignSlotsToReplicaNode(List<Integer> targetSlots, HostAndPort targetNode) {
w.lock();
try {
ConnectionPool targetPool = setupNodeIfNotExist(targetNode);
for (Integer slot : targetSlots) {
if (replicaSlots[slot] == null) {
replicaSlots[slot] = new ArrayList<>();
}
replicaSlots[slot].add(targetPool);
if (replicaSlotNodes[slot] == null) {
replicaSlotNodes[slot] = new ArrayList<>();
}
replicaSlotNodes[slot].add(targetNode);
}
} finally {
w.unlock();
}
}

public ConnectionPool getNode(String nodeKey) {
r.lock();
try {
Expand Down Expand Up @@ -338,6 +367,15 @@ public HostAndPort getSlotNode(int slot) {
}
}

public List<ConnectionPool> getSlotReplicaPools(int slot) {
r.lock();
try {
return replicaSlots[slot];
} finally {
r.unlock();
}
}

public Map<String, ConnectionPool> getNodes() {
r.lock();
try {
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,10 @@ public final <T> T executeCommand(CommandObject<T> commandObject) {
return executor.executeCommand(commandObject);
}

public final <T> T executeCommandToReplica(CommandObject<T> commandObject) {
return executor.executeCommandToReplica(commandObject);
}

public final <T> T broadcastCommand(CommandObject<T> commandObject) {
return executor.broadcastCommand(commandObject);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ public final <T> T broadcastCommand(CommandObject<T> commandObject) {

@Override
public final <T> T executeCommand(CommandObject<T> commandObject) {
return doExecuteCommand(commandObject, false);
}

@Override
public final <T> T executeCommandToReplica(CommandObject<T> commandObject) {
return doExecuteCommand(commandObject, true);
}

private <T> T doExecuteCommand(CommandObject<T> commandObject, boolean toReplica) {
Instant deadline = Instant.now().plus(maxTotalRetriesDuration);

JedisRedirectionException redirect = null;
Expand All @@ -88,7 +97,8 @@ public final <T> T executeCommand(CommandObject<T> commandObject) {
connection.executeCommand(Protocol.Command.ASKING);
}
} else {
connection = provider.getConnection(commandObject.getArguments());
connection = toReplica ? provider.getReplicaConnection(commandObject.getArguments())
: provider.getConnection(commandObject.getArguments());
}

return execute(connection, commandObject);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ public interface CommandExecutor extends AutoCloseable {

<T> T executeCommand(CommandObject<T> commandObject);

default <T> T executeCommandToReplica(CommandObject<T> commandObject) {
return executeCommand(commandObject);
}

default <T> T broadcastCommand(CommandObject<T> commandObject) {
return executeCommand(commandObject);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.ClusterCommandArguments;
Expand Down Expand Up @@ -102,6 +104,11 @@ public Connection getConnection(CommandArguments args) {
return slot >= 0 ? getConnectionFromSlot(slot) : getConnection();
}

public Connection getReplicaConnection(CommandArguments args) {
final int slot = ((ClusterCommandArguments) args).getCommandHashSlot();
return slot >= 0 ? getReplicaConnectionFromSlot(slot) : getConnection();
}

@Override
public Connection getConnection() {
// In antirez's redis-rb-cluster implementation, getRandomConnection always
Expand Down Expand Up @@ -158,6 +165,25 @@ public Connection getConnectionFromSlot(int slot) {
}
}

public Connection getReplicaConnectionFromSlot(int slot) {
List<ConnectionPool> connectionPools = cache.getSlotReplicaPools(slot);
ThreadLocalRandom random = ThreadLocalRandom.current();
if (connectionPools != null && !connectionPools.isEmpty()) {
// pick up randomly a connection
int idx = random.nextInt(connectionPools.size());
return connectionPools.get(idx).getResource();
}

renewSlotCache();
connectionPools = cache.getSlotReplicaPools(slot);
if (connectionPools != null && !connectionPools.isEmpty()) {
int idx = random.nextInt(connectionPools.size());
return connectionPools.get(idx).getResource();
}

return getConnectionFromSlot(slot);
}

@Override
public Map<String, ConnectionPool> getConnectionMap() {
return Collections.unmodifiableMap(getNodes());
Expand Down

0 comments on commit 2d33705

Please sign in to comment.