Skip to content

Commit e972c21

Browse files
authored
[automatic failover] Replace 'CircuitBreaker' with 'Cluster' for 'CircuitBreakerFailoverBase.clusterFailover' (#4275)
* Replace CircuitBreaker with Cluster as the parameter of CircuitBreakerFailoverBase.clusterFailover, and improve thread safety of provider initialization. * Formatting.
1 parent beb5e14 commit e972c21

File tree

4 files changed

+31
-28
lines changed

4 files changed

+31
-28
lines changed

src/main/java/redis/clients/jedis/mcf/CircuitBreakerCommandExecutor.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public <T> T executeCommand(CommandObject<T> commandObject) {
3838
supplier.withCircuitBreaker(cluster.getCircuitBreaker());
3939
supplier.withRetry(cluster.getRetry());
4040
supplier.withFallback(provider.getFallbackExceptionList(),
41-
e -> this.handleClusterFailover(commandObject, cluster.getCircuitBreaker()));
41+
e -> this.handleClusterFailover(commandObject, cluster));
4242

4343
return supplier.decorate().get();
4444
}
@@ -73,10 +73,9 @@ private boolean isActiveCluster(Cluster cluster) {
7373
* Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker
7474
* failure scenarios
7575
*/
76-
private <T> T handleClusterFailover(CommandObject<T> commandObject,
77-
CircuitBreaker circuitBreaker) {
76+
private <T> T handleClusterFailover(CommandObject<T> commandObject, Cluster cluster) {
7877

79-
clusterFailover(circuitBreaker);
78+
clusterFailover(cluster);
8079

8180
// Recursive call to the initiating method so the operation can be retried on the next cluster
8281
// connection

src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,10 @@ public void close() {
3838
* Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker
3939
* failure scenarios
4040
*/
41-
protected void clusterFailover(CircuitBreaker circuitBreaker) {
41+
protected void clusterFailover(Cluster cluster) {
4242
lock.lock();
4343

44+
CircuitBreaker circuitBreaker = cluster.getCircuitBreaker();
4445
try {
4546
// Check state to handle race conditions since iterateActiveCluster() is
4647
// non-idempotent
@@ -52,19 +53,17 @@ protected void clusterFailover(CircuitBreaker circuitBreaker) {
5253

5354
Cluster activeCluster = provider.getCluster();
5455
// This should be possible only if the active cluster was switched for some reason other than
55-
// circuit
56-
// breaker, just before circuit breaker triggers
57-
if (activeCluster.getCircuitBreaker() != circuitBreaker) {
56+
// circuit breaker, just before circuit breaker triggers
57+
if (activeCluster != cluster) {
5858
return;
5959
}
6060

61-
activeCluster.setGracePeriod();
61+
cluster.setGracePeriod();
6262
circuitBreaker.transitionToForcedOpenState();
6363

6464
// Iterating the active cluster will allow subsequent calls to the executeCommand() to use
6565
// the next
6666
// cluster's connection pool - according to the configuration's prioritization/order/weight
67-
// int activeMultiClusterIndex = provider.incrementActiveMultiClusterIndex1();
6867
provider.iterateActiveCluster(SwitchReason.CIRCUIT_BREAKER);
6968
}
7069
// this check relies on the fact that many failover attempts can hit with the same CB,
@@ -73,13 +72,12 @@ protected void clusterFailover(CircuitBreaker circuitBreaker) {
7372
// different than
7473
// active CB. If it's the same one and there are no more clusters to fail over to, then throw an
7574
// exception
76-
else if (circuitBreaker == provider.getCluster().getCircuitBreaker()
77-
&& !provider.canIterateOnceMore()) {
78-
throw new JedisConnectionException(
79-
"Cluster/database endpoint could not failover since the MultiClusterClientConfig was not "
80-
+ "provided with an additional cluster/database endpoint according to its prioritized sequence. "
81-
+ "If applicable, consider failing back OR restarting with an available cluster/database endpoint");
82-
}
75+
else if (cluster == provider.getCluster() && !provider.canIterateOnceMore()) {
76+
throw new JedisConnectionException(
77+
"Cluster/database endpoint could not failover since the MultiClusterClientConfig was not "
78+
+ "provided with an additional cluster/database endpoint according to its prioritized sequence. "
79+
+ "If applicable, consider failing back OR restarting with an available cluster/database endpoint");
80+
}
8381
// Ignore exceptions since we are already in a failure state
8482
} finally {
8583
lock.unlock();

src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverConnectionProvider.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public Connection getConnection() {
3131
supplier.withRetry(cluster.getRetry());
3232
supplier.withCircuitBreaker(cluster.getCircuitBreaker());
3333
supplier.withFallback(provider.getFallbackExceptionList(),
34-
e -> this.handleClusterFailover(cluster.getCircuitBreaker()));
34+
e -> this.handleClusterFailover(cluster));
3535

3636
return supplier.decorate().get();
3737
}
@@ -49,9 +49,9 @@ private Connection handleGetConnection(Cluster cluster) {
4949
* Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker
5050
* failure scenarios
5151
*/
52-
private Connection handleClusterFailover(CircuitBreaker circuitBreaker) {
52+
private Connection handleClusterFailover(Cluster cluster) {
5353

54-
clusterFailover(circuitBreaker);
54+
clusterFailover(cluster);
5555

5656
// Recursive call to the initiating method so the operation can be retried on the next cluster
5757
// connection

src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public class MultiClusterPooledConnectionProvider implements ConnectionProvider
8080
*/
8181
private volatile Cluster activeCluster;
8282

83-
private final Lock activeClusterIndexLock = new ReentrantLock(true);
83+
private final Lock activeClusterChangeLock = new ReentrantLock(true);
8484

8585
/**
8686
* Functional interface for listening to cluster switch events. The event args contain the reason
@@ -183,7 +183,13 @@ public MultiClusterPooledConnectionProvider(MultiClusterClientConfig multiCluste
183183
// Mark initialization as complete - handleHealthStatusChange can now process events
184184
initializationComplete = true;
185185
if (!activeCluster.isHealthy()) {
186-
activeCluster = waitForInitialHealthyCluster(statusTracker);
186+
// Race condition: Direct assignment to 'activeCluster' is not thread safe because
187+
// 'onHealthStatusChange' may execute concurrently once 'initializationComplete'
188+
// is set to true.
189+
// The simple rule: never assign a value to 'activeCluster' outside of
190+
// 'activeClusterChangeLock' once 'initializationComplete' has been set to true.
191+
waitForInitialHealthyCluster(statusTracker);
192+
iterateActiveCluster(SwitchReason.HEALTH_CHECK);
187193
}
188194
this.fallbackExceptionList = multiClusterClientConfig.getFallbackExceptionList();
189195

@@ -211,11 +217,11 @@ public void add(ClusterConfig clusterConfig) {
211217
"Endpoint " + endpoint + " already exists in the provider");
212218
}
213219

214-
activeClusterIndexLock.lock();
220+
activeClusterChangeLock.lock();
215221
try {
216222
addClusterInternal(multiClusterClientConfig, clusterConfig);
217223
} finally {
218-
activeClusterIndexLock.unlock();
224+
activeClusterChangeLock.unlock();
219225
}
220226
}
221227

@@ -240,7 +246,7 @@ public void remove(Endpoint endpoint) {
240246
}
241247
log.debug("Removing endpoint {}", endpoint);
242248

243-
activeClusterIndexLock.lock();
249+
activeClusterChangeLock.lock();
244250
try {
245251
Cluster clusterToRemove = multiClusterMap.get(endpoint);
246252
boolean isActiveCluster = (activeCluster == clusterToRemove);
@@ -273,7 +279,7 @@ public void remove(Endpoint endpoint) {
273279
clusterToRemove.close();
274280
}
275281
} finally {
276-
activeClusterIndexLock.unlock();
282+
activeClusterChangeLock.unlock();
277283
}
278284
}
279285

@@ -542,7 +548,7 @@ private boolean setActiveCluster(Cluster cluster, boolean validateConnection) {
542548
// Cluster cluster = clusterEntry.getValue();
543549
// Field-level synchronization is used to avoid the edge case in which
544550
// incrementActiveMultiClusterIndex() is called at the same time
545-
activeClusterIndexLock.lock();
551+
activeClusterChangeLock.lock();
546552
Cluster oldCluster;
547553
try {
548554

@@ -563,7 +569,7 @@ private boolean setActiveCluster(Cluster cluster, boolean validateConnection) {
563569
oldCluster = activeCluster;
564570
activeCluster = cluster;
565571
} finally {
566-
activeClusterIndexLock.unlock();
572+
activeClusterChangeLock.unlock();
567573
}
568574
boolean switched = oldCluster != cluster;
569575
if (switched && this.multiClusterClientConfig.isFastFailover()) {

0 commit comments

Comments
 (0)