@@ -60,40 +60,6 @@ public class FailoverIntegrationTest {
6060 private static String JEDIS2_ID = "" ;
6161 private MultiClusterPooledConnectionProvider provider ;
6262 private UnifiedJedis failoverClient ;
63-
64- /**
65- * Creates a MultiClusterPooledConnectionProvider with standard configuration
66- * @return A configured provider
67- */
68- private MultiClusterPooledConnectionProvider createProvider () {
69- JedisClientConfig clientConfig = DefaultJedisClientConfig .builder ()
70- .socketTimeoutMillis (RecommendedSettings .DEFAULT_TIMEOUT_MS )
71- .connectionTimeoutMillis (RecommendedSettings .DEFAULT_TIMEOUT_MS ).build ();
72-
73- MultiClusterClientConfig failoverConfig = new MultiClusterClientConfig .Builder (
74- getClusterConfigs (clientConfig , endpoint1 , endpoint2 )).retryMaxAttempts (1 )
75- .retryWaitDuration (1 ).circuitBreakerSlidingWindowType (COUNT_BASED )
76- .circuitBreakerSlidingWindowSize (1 ).circuitBreakerFailureRateThreshold (100 )
77- .circuitBreakerSlidingWindowMinCalls (1 ).build ();
78-
79- return new MultiClusterPooledConnectionProvider (failoverConfig );
80- }
81-
82- /**
83- * Creates a UnifiedJedis client with customizable failover options
84- * @param provider The connection provider to use
85- * @param optionsCustomizer A function that customizes the failover options (can be null for defaults)
86- * @return A configured failover client
87- */
88- private UnifiedJedis createClient (MultiClusterPooledConnectionProvider provider ,
89- Function <FailoverOptions .Builder , FailoverOptions .Builder > optionsCustomizer ) {
90- FailoverOptions .Builder builder = FailoverOptions .builder ();
91- if (optionsCustomizer != null ) {
92- builder = optionsCustomizer .apply (builder );
93- }
94-
95- return new UnifiedJedis (provider , builder .build ());
96- }
9763
9864 @ BeforeAll
9965 public static void setupAdminClients () throws IOException {
@@ -119,33 +85,6 @@ public static void cleanupAdminClients() throws IOException {
11985 executor .shutdown ();
12086 }
12187
122- private static String getNodeId (UnifiedJedis client ) {
123-
124- return getNodeId (client .info ("server" ));
125- }
126-
127- private static String getNodeId (String info ) {
128-
129- Matcher m = pattern .matcher (info );
130- if (m .find ()) {
131- return m .group (1 );
132- }
133- return null ;
134- }
135-
136- /**
137- * Generates a string of a specific byte size.
138- * @param byteSize The desired size in bytes
139- * @return A string of the specified byte size
140- */
141- private static String generateTestValue (int byteSize ) {
142- StringBuilder value = new StringBuilder (byteSize );
143- for (int i = 0 ; i < byteSize ; i ++) {
144- value .append ('x' );
145- }
146- return value .toString ();
147- }
148-
14988 @ BeforeEach
15089 public void setup () throws IOException {
15190 tp .getProxies ().forEach (proxy -> {
@@ -184,12 +123,14 @@ public void cleanup() throws IOException {
184123
185124 /**
186125 * Tests the automatic failover behavior when a Redis server becomes unavailable. This test
187- * verifies: 1. Initial connection to the first Redis server works correctly 2. When the first
188- * server is disabled, the first command throws a JedisConnectionException 3. The circuit breaker
189- * for the first endpoint transitions to OPEN state 4. Subsequent commands are automatically
190- * routed to the second available endpoint 5. When the second server is also disabled, all
191- * commands fail with JedisConnectionException 6. The circuit breaker for the second endpoint also
192- * transitions to OPEN state
126+ * verifies:
127+ * <ol>
128+ * <li>Initial connection to the first Redis server works correctly</li>
129+ * <li>Disable access, the first command throws</li>
130+ * <li>Command failure is propagated to the caller</li>
131+ * <li>CB transitions to OPEN, failover is initiated and following commands are sent to the next endpoint</li>
132+ * <li>Second server is also disabled, all commands fail with JedisConnectionException and error is propagated to the caller</li>
133+ * </ol>
193134 */
194135 @ Test
195136 public void testAutomaticFailoverWhenServerBecomesUnavailable () throws Exception {
@@ -199,27 +140,26 @@ public void testAutomaticFailoverWhenServerBecomesUnavailable() throws Exception
199140 redisProxy1 .disable ();
200141
201142 // Endpoint 1 not available
202- // 1. First call should should throw JedisConnectionException and trigger failover
203- // 1.1. Endpoint1 CB transitions to open
204- // 2 . Subsequent calls should be routed to Endpoint2
143+ // 1. First call should throw JedisConnectionException and trigger failover
144+ // 2. Endpoint 1 CB transitions to OPEN
145+ // 3 . Subsequent calls should be routed to Endpoint 2
205146 assertThrows (JedisConnectionException .class , () -> failoverClient .info ("server" ));
206147
207- // Check that the circuit breaker for Endpoint 1 is open
208148 assertThat (provider .getCluster (1 ).getCircuitBreaker ().getState (),
209- equalTo (CircuitBreaker .State .OPEN ));
149+ equalTo (CircuitBreaker .State .OPEN ));
210150
211151 // Check that the failoverClient is now using Endpoint 2
212152 assertThat (getNodeId (failoverClient .info ("server" )), equalTo (JEDIS2_ID ));
213153
214- // Disable also second proxy
154+ // Disable second access to Endpoint 2
215155 redisProxy2 .disable ();
216156
217- // Endpoint1 and Endpoint2 are not available,
157+ // Endpoint1 and Endpoint2 are NOT available,
218158 assertThrows (JedisConnectionException .class , () -> failoverClient .info ("server" ));
219159 assertThat (provider .getCluster (2 ).getCircuitBreaker ().getState (),
220- equalTo (CircuitBreaker .State .OPEN ));
160+ equalTo (CircuitBreaker .State .OPEN ));
221161
222- // and since no other nodes are available, it should throw an exception for subsequent calls
162+ // and since no other nodes are available, it should propagate the errors to the caller subsequent calls
223163 assertThrows (JedisConnectionException .class , () -> failoverClient .info ("server" ));
224164 }
225165
@@ -246,46 +186,44 @@ public void testManualFailoverInflightCommandsCompleteGracefully()
246186 throws ExecutionException , InterruptedException {
247187
248188 assertThat (getNodeId (failoverClient .info ("server" )), equalTo (JEDIS1_ID ));
249- Future <List <String >> blpop = executor .submit (() -> {
250- try {
251- // This command will block until a value is pushed to the list or timeout occurs
252- // We will trigger failover while this command is blocking
253- return failoverClient .blpop (1000 , "test-list" );
254- } catch (Exception e ) {
255- throw new RuntimeException (e );
256- }
257- });
189+
190+ // We will trigger failover while this command is in-flight
191+ Future <List <String >> blpop = executor .submit (() -> failoverClient .blpop (1000 , "test-list" ));
258192
259193 provider .setActiveMultiClusterIndex (2 );
260194
261- // new command should be executed against the new endpoint
195+ // After the manual failover, commands should be executed against Endpoint 2
262196 assertThat (getNodeId (failoverClient .info ("server" )), equalTo (JEDIS2_ID ));
263- //Since failover was manually triggered and there were no errors
264- // previous endpoint CB should be still in CLOSED
197+
198+ // Failover was manually triggered, and there were no errors
199+ // previous endpoint CB should still be in CLOSED state
265200 assertThat (provider .getCluster (1 ).getCircuitBreaker ().getState (),
266- equalTo (CircuitBreaker .State .CLOSED ));
201+ equalTo (CircuitBreaker .State .CLOSED ));
267202
268203 jedis1 .rpush ("test-list" , "somevalue" );
269204
270205 assertThat (blpop .get (), equalTo (Arrays .asList ("test-list" , "somevalue" )));
271206 }
272207
273208 /**
274- * Verify that in-flight commands during manual failover fail gracefully with an error will
275- * propagate the error to the caller and will toggle CB to OPEN state.
209+ * Verify that in-flight commands that complete with error during manual failover will
210+ * propagate the error to the caller and toggle CB to OPEN state.
276211 */
277212 @ Test
278213 public void testManualFailoverInflightCommandsWithErrorsPropagateError () throws Exception {
279214 assertThat (getNodeId (failoverClient .info ("server" )), equalTo (JEDIS1_ID ));
280215
216+ // We will trigger failover while this command is in-flight
281217 Future <List <String >> blpop = executor .submit (() -> failoverClient .blpop (10000 , "test-list-1" ));
282218
283219 // trigger failover manually
284220 provider .setActiveMultiClusterIndex (2 );
285221 Future <String > infoCmd = executor .submit (() -> failoverClient .info ("server" ));
286- // new command should be executed against the new endpoint
222+
223+ // After the manual failover, commands should be executed against Endpoint 2
287224 assertThat (getNodeId (infoCmd .get ()), equalTo (JEDIS2_ID ));
288- // Disable redisProxy1 to simulate an error
225+
226+ // Disable redisProxy1 to drop active connections and trigger an error
289227 redisProxy1 .disable ();
290228
291229 // previously submitted command should fail with JedisConnectionException
@@ -294,9 +232,9 @@ public void testManualFailoverInflightCommandsWithErrorsPropagateError() throws
294232
295233 // Check that the circuit breaker for Endpoint 1 is open after the error
296234 assertThat (provider .getCluster (1 ).getCircuitBreaker ().getState (),
297- equalTo (CircuitBreaker .State .OPEN ));
235+ equalTo (CircuitBreaker .State .OPEN ));
298236
299- // Ensure that active cluster is still Endpoint 2
237+ // Ensure that the active cluster is still Endpoint 2
300238 assertThat (getNodeId (failoverClient .info ("server" )), equalTo (JEDIS2_ID ));
301239 }
302240
@@ -305,68 +243,130 @@ public void testManualFailoverInflightCommandsWithErrorsPropagateError() throws
305243 */
306244 @ Test
307245 public void testInflightCommandsAreRetriedAfterFailover () throws Exception {
308- // Create a custom provider and client with retry enabled for this specific test
246+
309247 MultiClusterPooledConnectionProvider customProvider = createProvider ();
310-
311- try (UnifiedJedis customClient = createClient (customProvider ,
312- builder -> builder .retryFailedInflightCommands (true ))) {
313-
248+
249+ // Create a custom client with retryFailedInflightCommands enabled for this specific test
250+ try (UnifiedJedis customClient = createClient (customProvider ,
251+ builder -> builder .retryFailedInflightCommands (true ))) {
252+
314253 assertThat (getNodeId (customClient .info ("server" )), equalTo (JEDIS1_ID ));
315- Future <List <String >> blpop = executor .submit (() -> {
316- try {
317- // This command will block until a value is pushed to the list or timeout occurs
318- // We will trigger failover while this command is blocking
319- return customClient .blpop (10000 , "test-list-1" );
320- } catch (Exception e ) {
321- throw new RuntimeException (e );
322- }
323- });
324254
325- // info command will return more than 100 bytes so we simulate connection dropped
326- redisProxy1 .toxics ().limitData ("simulate-socket-failure" , ToxicDirection .UPSTREAM , 100 );
327- assertThrows (JedisConnectionException .class , () -> customClient .set ("test-key" , generateTestValue (150 )));
255+ // We will trigger failover while this command is in-flight
256+ Future <List <String >> blpop = executor .submit (() -> customClient .blpop (10000 , "test-list-1" ));
328257
329- // Disable redisProxy1 to enforce current blpop command failure
330- redisProxy1 .disable ();
258+
259+ // Simulate error by sending more than 100 bytes. This causes the connection close, and triggers
260+ // failover
261+ redisProxy1 .toxics ().limitData ("simulate-socket-failure" , ToxicDirection .UPSTREAM , 100 );
262+ assertThrows (JedisConnectionException .class ,
263+ () -> customClient .set ("test-key" , generateTestValue (150 )));
331264
332265 // Check that the circuit breaker for Endpoint 1 is open
333266 assertThat (customProvider .getCluster (1 ).getCircuitBreaker ().getState (),
334- equalTo (CircuitBreaker .State .OPEN ));
267+ equalTo (CircuitBreaker .State .OPEN ));
268+
269+ // Disable redisProxy1 to enforce the current blpop command failure
270+ redisProxy1 .disable ();
335271
336272 customClient .rpush ("test-list-1" , "somevalue" );
337273 assertThat (blpop .get (), equalTo (Arrays .asList ("test-list-1" , "somevalue" )));
338274 }
339275 }
340-
276+
341277 /**
342278 * Tests that in-flight commands are not retried after automatic failover when retry is disabled.
343279 */
344280 @ Test
345281 public void testInflightCommandsAreNotRetriedAfterFailover () throws Exception {
346282 // Create a custom provider and client with retry disabled for this specific test
347283 MultiClusterPooledConnectionProvider customProvider = createProvider ();
348-
349- try (UnifiedJedis customClient = createClient (customProvider ,
350- builder -> builder .retryFailedInflightCommands (false ))) {
351-
284+
285+ try (UnifiedJedis customClient = createClient (customProvider ,
286+ builder -> builder .retryFailedInflightCommands (false ))) {
287+
352288 assertThat (getNodeId (customClient .info ("server" )), equalTo (JEDIS1_ID ));
353- Future <List <String >> blpop = executor .submit (() -> customClient .blpop (10000 , "test-list-2" ));
289+ Future <List <String >> blpop = executor .submit (() -> customClient .blpop (1000 , "test-list-2" ));
354290
355- // info command will return more than 100 bytes so we simulate connection dropped
291+ // Simulate error by sending more than 100 bytes. This causes connection close, and triggers
292+ // failover
356293 redisProxy1 .toxics ().limitData ("simulate-socket-failure" , ToxicDirection .UPSTREAM , 100 );
357- assertThrows (JedisConnectionException .class , () -> customClient .set ("test-key" , generateTestValue (150 )));
358-
359- // Disable redisProxy1 to enforce current blpop command failure
360- redisProxy1 .disable ();
294+ assertThrows (JedisConnectionException .class ,
295+ () -> customClient .set ("test-key" , generateTestValue (150 )));
361296
362297 // Check that the circuit breaker for Endpoint 1 is open
363298 assertThat (customProvider .getCluster (1 ).getCircuitBreaker ().getState (),
364- equalTo (CircuitBreaker .State .OPEN ));
299+ equalTo (CircuitBreaker .State .OPEN ));
300+
301+ // Disable redisProxy1 to enforce the current blpop command failure
302+ redisProxy1 .disable ();
365303
366- // The blpop command should fail since retry is disabled
367- customClient . rpush ( "test-list-2" , "somevalue" );
368- ExecutionException exception = assertThrows ( ExecutionException . class , () -> blpop .get (1 , TimeUnit .SECONDS ));
304+ // The in-flight command should fail since the retry is disabled
305+ ExecutionException exception = assertThrows ( ExecutionException . class ,
306+ () -> blpop .get (1 , TimeUnit .SECONDS ));
369307 assertThat (exception .getCause (), instanceOf (JedisConnectionException .class ));
370308 }
371309 }
310+
311+ private static String getNodeId (UnifiedJedis client ) {
312+
313+ return getNodeId (client .info ("server" ));
314+ }
315+
316+ private static String getNodeId (String info ) {
317+
318+ Matcher m = pattern .matcher (info );
319+ if (m .find ()) {
320+ return m .group (1 );
321+ }
322+ return null ;
323+ }
324+
325+ /**
326+ * Generates a string of a specific byte size.
327+ * @param byteSize The desired size in bytes
328+ * @return A string of the specified byte size
329+ */
330+ private static String generateTestValue (int byteSize ) {
331+ StringBuilder value = new StringBuilder (byteSize );
332+ for (int i = 0 ; i < byteSize ; i ++) {
333+ value .append ('x' );
334+ }
335+ return value .toString ();
336+ }
337+
338+ /**
339+ * Creates a MultiClusterPooledConnectionProvider with standard configuration
340+ * @return A configured provider
341+ */
342+ private MultiClusterPooledConnectionProvider createProvider () {
343+ JedisClientConfig clientConfig = DefaultJedisClientConfig .builder ()
344+ .socketTimeoutMillis (RecommendedSettings .DEFAULT_TIMEOUT_MS )
345+ .connectionTimeoutMillis (RecommendedSettings .DEFAULT_TIMEOUT_MS ).build ();
346+
347+ MultiClusterClientConfig failoverConfig = new MultiClusterClientConfig .Builder (
348+ getClusterConfigs (clientConfig , endpoint1 , endpoint2 )).retryMaxAttempts (1 )
349+ .retryWaitDuration (1 ).circuitBreakerSlidingWindowType (COUNT_BASED )
350+ .circuitBreakerSlidingWindowSize (1 ).circuitBreakerFailureRateThreshold (100 )
351+ .circuitBreakerSlidingWindowMinCalls (1 ).build ();
352+
353+ return new MultiClusterPooledConnectionProvider (failoverConfig );
354+ }
355+
356+ /**
357+ * Creates a UnifiedJedis client with customizable failover options
358+ * @param provider The connection provider to use
359+ * @param optionsCustomizer A function that customizes the failover options (can be null for
360+ * defaults)
361+ * @return A configured failover client
362+ */
363+ private UnifiedJedis createClient (MultiClusterPooledConnectionProvider provider ,
364+ Function <FailoverOptions .Builder , FailoverOptions .Builder > optionsCustomizer ) {
365+ FailoverOptions .Builder builder = FailoverOptions .builder ();
366+ if (optionsCustomizer != null ) {
367+ builder = optionsCustomizer .apply (builder );
368+ }
369+
370+ return new UnifiedJedis (provider , builder .build ());
371+ }
372372}
0 commit comments