Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
8a9f876
- weighted cluster seleciton
atakavci Jun 27, 2025
d514ecf
- add builder for ClusterConfig
atakavci Jul 9, 2025
df66b1e
- fix naming
atakavci Jul 9, 2025
13757f5
clean up and mark override methods
atakavci Jul 10, 2025
ef5d83a
fix link in javadoc
atakavci Jul 10, 2025
a15fc64
fix formatting
atakavci Jul 10, 2025
cf38240
- fix double registered listeners in healtstatusmgr
atakavci Jul 14, 2025
c2fb34c
Update src/main/java/redis/clients/jedis/mcf/EchoStrategy.java
atakavci Jul 16, 2025
ade866d
- add remove endpoints
atakavci Jul 11, 2025
ca3378d
- replace cluster disabled with failbackCandidate
atakavci Jul 15, 2025
ddcec73
- remove failback candidate
atakavci Jul 16, 2025
c1b6d5f
- fix remove logic
atakavci Jul 16, 2025
ff16330
- periodic failback checks
atakavci Jul 17, 2025
c39fda1
- introduce StatusTracker with purpose of waiting initial healthcheck…
atakavci Jul 19, 2025
975ab78
- introduce forceActiveCluster by duration
atakavci Jul 19, 2025
405101e
- fix failing tests by waiting on clusters to get healthy
atakavci Jul 23, 2025
607c66d
- fix failing scenario test
atakavci Jul 23, 2025
aaac8f7
- adressing reviews and feedback
atakavci Jul 23, 2025
2ffffef
- fix formatting
atakavci Jul 23, 2025
e6e1121
- fix formatting
atakavci Jul 23, 2025
b8d4e87
- get rid of the queue and event ordering for healthstatus change in …
atakavci Jul 24, 2025
1ae7219
- replace use of reflection with helper methods
atakavci Jul 24, 2025
397f437
- introduce clusterSwitchEvent and drop clusterFailover post processor
atakavci Jul 31, 2025
ab05e6c
- introduce fastfailover using objectMaker injection into connection…
atakavci Jul 31, 2025
de034f4
- polish
atakavci Jul 31, 2025
df3d555
- cleanup
atakavci Jul 31, 2025
3352260
- improve healtcheck thread visibility
atakavci Jul 31, 2025
812979a
- introduce TrackingConnectionPool with FailFastConnectionFactory
atakavci Aug 4, 2025
0ad3bbe
- return broken source as usual
atakavci Aug 5, 2025
13cc8db
- unblock waiting threads
atakavci Aug 5, 2025
1c2b549
- failover by closing the pool
atakavci Aug 6, 2025
21a95a2
- formatting
atakavci Aug 6, 2025
984db94
- check waiters and active/idle connections to force disconnect
atakavci Aug 6, 2025
5350cfc
- add builder to trackingconnectionpool
atakavci Aug 6, 2025
03ac208
- fix failing tests due to mocked ctor for trackingConnectionPool
atakavci Aug 6, 2025
e31f1ca
- replace initTracker with split initializtion of conn
atakavci Aug 8, 2025
7b52502
- refactor on builders and ctors
atakavci Aug 8, 2025
db19340
- undo format
atakavci Aug 8, 2025
3dd83c0
- clena up
atakavci Aug 8, 2025
a4d130d
- add exceptions to logs
atakavci Aug 11, 2025
cc38722
- add max wait duration and minConsecutiveSuccessCount to healthCheck…
atakavci Aug 11, 2025
25c6375
- replace 'sleep' with 'await', feedback from @a-TODO-rov
atakavci Aug 11, 2025
d4909ab
Merge branch 'feature/automatic-failover' into ali/aa-failover-improv…
atakavci Aug 14, 2025
5032ce3
- fix status tracker tests
atakavci Aug 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ tags
redis-git
appendonlydir/
.DS_Store
.vscode/settings.json
54 changes: 51 additions & 3 deletions src/main/java/redis/clients/jedis/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,39 @@

public class Connection implements Closeable {

public static class Builder {
private JedisSocketFactory socketFactory;
private JedisClientConfig clientConfig;

public Builder socketFactory(JedisSocketFactory socketFactory) {
this.socketFactory = socketFactory;
return this;
}

public Builder clientConfig(JedisClientConfig clientConfig) {
this.clientConfig = clientConfig;
return this;
}

public JedisSocketFactory getSocketFactory() {
return socketFactory;
}

public JedisClientConfig getClientConfig() {
return clientConfig;
}

public Connection build() {
Connection conn = new Connection(this);
conn.initializeFromClientConfig();
return conn;
}
}

public static Builder builder() {
return new Builder();
}

private ConnectionPool memberOf;
protected RedisProtocol protocol;
private final JedisSocketFactory socketFactory;
Expand All @@ -48,6 +81,7 @@ public class Connection implements Closeable {
protected String version;
private AtomicReference<RedisCredentials> currentCredentials = new AtomicReference<>(null);
private AuthXManager authXManager;
private JedisClientConfig clientConfig;

public Connection() {
this(Protocol.DEFAULT_HOST, Protocol.DEFAULT_PORT);
Expand All @@ -67,16 +101,19 @@ public Connection(final HostAndPort hostAndPort, final JedisClientConfig clientC

public Connection(final JedisSocketFactory socketFactory) {
this.socketFactory = socketFactory;
this.authXManager = null;
}

public Connection(final JedisSocketFactory socketFactory, JedisClientConfig clientConfig) {
this.socketFactory = socketFactory;
this.soTimeout = clientConfig.getSocketTimeoutMillis();
this.infiniteSoTimeout = clientConfig.getBlockingSocketTimeoutMillis();
this.clientConfig = clientConfig;
initializeFromClientConfig(clientConfig);
}

protected Connection(Builder builder) {
this.socketFactory = builder.getSocketFactory();
this.clientConfig = builder.getClientConfig();
}

@Override
public String toString() {
return getClass().getSimpleName() + "{" + socketFactory + "}";
Expand Down Expand Up @@ -288,6 +325,10 @@ public void disconnect() {
}
}

public void forceDisconnect() throws IOException {
socket.close();
}

public boolean isConnected() {
return socket != null && socket.isBound() && !socket.isClosed() && socket.isConnected()
&& !socket.isInputShutdown() && !socket.isOutputShutdown();
Expand Down Expand Up @@ -450,8 +491,15 @@ private static boolean validateClientInfo(String info) {
return true;
}

public void initializeFromClientConfig() {
this.initializeFromClientConfig(clientConfig);
}

protected void initializeFromClientConfig(final JedisClientConfig config) {
try {
this.soTimeout = config.getSocketTimeoutMillis();
this.infiniteSoTimeout = config.getBlockingSocketTimeoutMillis();

connect();

protocol = config.getRedisProtocol();
Expand Down
127 changes: 104 additions & 23 deletions src/main/java/redis/clients/jedis/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,56 +21,137 @@
*/
public class ConnectionFactory implements PooledObjectFactory<Connection> {

public static class Builder {
private JedisClientConfig clientConfig;
private Connection.Builder connectionBuilder;
private JedisSocketFactory jedisSocketFactory;
private Cache cache;
private HostAndPort hostAndPort;

// Fluent API methods (preferred)
public Builder clientConfig(JedisClientConfig clientConfig) {
this.clientConfig = clientConfig;
return this;
}

public Builder connectionBuilder(Connection.Builder connectionBuilder) {
this.connectionBuilder = connectionBuilder;
return this;
}

public Builder socketFactory(JedisSocketFactory jedisSocketFactory) {
this.jedisSocketFactory = jedisSocketFactory;
return this;
}

public Builder cache(Cache cache) {
this.cache = cache;
return this;
}

public Builder hostAndPort(HostAndPort hostAndPort) {
this.hostAndPort = hostAndPort;
return this;
}

public Connection.Builder getConnectionBuilder() {
return connectionBuilder;
}

public JedisSocketFactory getJedisSocketFactory() {
return jedisSocketFactory;
}

public JedisClientConfig getClientConfig() {
return clientConfig;
}

public Cache getCache() {
return cache;
}

public ConnectionFactory build() {
withDefaults();
return new ConnectionFactory(this);
}

private Builder withDefaults() {
if (jedisSocketFactory == null) {
this.jedisSocketFactory = createDefaultSocketFactory();
}
if (connectionBuilder == null) {
this.connectionBuilder = createDefaultConnectionBuilder();
}
return this;
}

private JedisSocketFactory createDefaultSocketFactory() {
if (clientConfig == null) {
clientConfig = DefaultJedisClientConfig.builder().build();
}
if (hostAndPort == null) {
throw new IllegalStateException("HostAndPort is required when no socketFactory is provided");
}
return new DefaultJedisSocketFactory(hostAndPort, clientConfig);
}

private Connection.Builder createDefaultConnectionBuilder() {
Connection.Builder connBuilder = cache == null ? Connection.builder() : CacheConnection.builder(cache);
connBuilder.socketFactory(jedisSocketFactory).clientConfig(clientConfig);
return connBuilder;
}
}

public static Builder builder() {
return new Builder();
}

private static final Logger logger = LoggerFactory.getLogger(ConnectionFactory.class);

private final JedisSocketFactory jedisSocketFactory;
private final JedisClientConfig clientConfig;
private final Cache clientSideCache;
private final Supplier<Connection> objectMaker;
private Supplier<Connection> objectMaker;
private Connection.Builder connectionBuilder;

private final AuthXEventListener authXEventListener;
private AuthXEventListener authXEventListener;

public ConnectionFactory(final HostAndPort hostAndPort) {
this(hostAndPort, DefaultJedisClientConfig.builder().build(), null);
this(builder().hostAndPort(hostAndPort).withDefaults());
}

public ConnectionFactory(final HostAndPort hostAndPort, final JedisClientConfig clientConfig) {
this(hostAndPort, clientConfig, null);
this(builder().hostAndPort(hostAndPort).clientConfig(clientConfig).withDefaults());
}

@Experimental
public ConnectionFactory(final HostAndPort hostAndPort, final JedisClientConfig clientConfig,
Cache csCache) {
this(new DefaultJedisSocketFactory(hostAndPort, clientConfig), clientConfig, csCache);
public ConnectionFactory(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, Cache csCache) {
this(builder().hostAndPort(hostAndPort).clientConfig(clientConfig).cache(csCache).withDefaults());
}

public ConnectionFactory(final JedisSocketFactory jedisSocketFactory,
final JedisClientConfig clientConfig) {
this(jedisSocketFactory, clientConfig, null);
public ConnectionFactory(final JedisSocketFactory jedisSocketFactory, final JedisClientConfig clientConfig) {
this(builder().socketFactory(jedisSocketFactory).clientConfig(clientConfig).withDefaults());
}

private ConnectionFactory(final JedisSocketFactory jedisSocketFactory,
final JedisClientConfig clientConfig, Cache csCache) {
public ConnectionFactory(Builder builder) {
this.clientConfig = builder.getClientConfig();
this.connectionBuilder = builder.getConnectionBuilder();

this.jedisSocketFactory = jedisSocketFactory;
this.clientSideCache = csCache;
this.clientConfig = clientConfig;
initAuthXManager();
}

private void initAuthXManager() {
AuthXManager authXManager = clientConfig.getAuthXManager();
if (authXManager == null) {
this.objectMaker = connectionSupplier();
this.objectMaker = () -> build();
this.authXEventListener = AuthXEventListener.NOOP_LISTENER;
} else {
Supplier<Connection> supplier = connectionSupplier();
this.objectMaker = () -> (Connection) authXManager.addConnection(supplier.get());
this.objectMaker = () -> (Connection) authXManager.addConnection(build());
this.authXEventListener = authXManager.getListener();
authXManager.start();
}
}

private Supplier<Connection> connectionSupplier() {
return clientSideCache == null ? () -> new Connection(jedisSocketFactory, clientConfig)
: () -> new CacheConnection(jedisSocketFactory, clientConfig, clientSideCache);
private Connection build() {
return connectionBuilder.build();
}

@Override
Expand Down
57 changes: 34 additions & 23 deletions src/main/java/redis/clients/jedis/ConnectionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.authentication.core.Token;
import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.authentication.AuthXManager;
import redis.clients.jedis.csc.Cache;
Expand All @@ -13,25 +14,32 @@ public class ConnectionPool extends Pool<Connection> {

private AuthXManager authXManager;

// Primary constructors using factory
public ConnectionPool(PooledObjectFactory<Connection> factory) {
super(factory);
}

public ConnectionPool(PooledObjectFactory<Connection> factory,
GenericObjectPoolConfig<Connection> poolConfig) {
super(factory, poolConfig);
}

// Convenience constructors
public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig) {
this(new ConnectionFactory(hostAndPort, clientConfig));
attachAuthenticationListener(clientConfig.getAuthXManager());
}

@Experimental
public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig,
Cache clientSideCache) {
this(new ConnectionFactory(hostAndPort, clientConfig, clientSideCache));
GenericObjectPoolConfig<Connection> poolConfig) {
this(new ConnectionFactory(hostAndPort, clientConfig), poolConfig);
attachAuthenticationListener(clientConfig.getAuthXManager());
}

public ConnectionPool(PooledObjectFactory<Connection> factory) {
super(factory);
}

@Experimental
public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig) {
this(new ConnectionFactory(hostAndPort, clientConfig), poolConfig);
Cache clientSideCache) {
this(new ConnectionFactory(hostAndPort, clientConfig, clientSideCache));
attachAuthenticationListener(clientConfig.getAuthXManager());
}

Expand All @@ -42,11 +50,6 @@ public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig,
attachAuthenticationListener(clientConfig.getAuthXManager());
}

public ConnectionPool(PooledObjectFactory<Connection> factory,
GenericObjectPoolConfig<Connection> poolConfig) {
super(factory, poolConfig);
}

@Override
public Connection getResource() {
Connection conn = super.getResource();
Expand All @@ -65,17 +68,25 @@ public void close() {
}
}

private void attachAuthenticationListener(AuthXManager authXManager) {
protected void attachAuthenticationListener(AuthXManager authXManager) {
this.authXManager = authXManager;
if (authXManager != null) {
authXManager.addPostAuthenticationHook(token -> {
try {
// this is to trigger validations on each connection via ConnectionFactory
evict();
} catch (Exception e) {
throw new JedisException("Failed to evict connections from pool", e);
}
});
authXManager.addPostAuthenticationHook(this::postAuthentication);
}
}

protected void detachAuthenticationListener() {
if (authXManager != null) {
authXManager.removePostAuthenticationHook(this::postAuthentication);
}
}

private void postAuthentication(Token token) {
try {
// this is to trigger validations on each connection via ConnectionFactory
evict();
} catch (Exception e) {
throw new JedisException("Failed to evict connections from pool", e);
}
}
}
Loading
Loading