Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 24 additions & 5 deletions src/main/java/redis/clients/jedis/mcf/HealthCheckImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.Endpoint;
import redis.clients.jedis.annots.VisibleForTesting;

public class HealthCheckImpl implements HealthCheck {

Expand All @@ -36,15 +38,15 @@ public HealthStatus getStatus() {
}
}

private static final Logger log = LoggerFactory.getLogger(HealthCheck.class);
private static final Logger log = LoggerFactory.getLogger(HealthCheckImpl.class);

private Endpoint endpoint;
private HealthCheckStrategy strategy;
private AtomicReference<HealthCheckResult> resultRef = new AtomicReference<HealthCheckResult>();
private Consumer<HealthStatusChangeEvent> statusChangeCallback;

private ScheduledExecutorService scheduler;
private ExecutorService executor = Executors.newCachedThreadPool();
private final ScheduledExecutorService scheduler;
private final ExecutorService executor;

HealthCheckImpl(Endpoint endpoint, HealthCheckStrategy strategy,
Consumer<HealthStatusChangeEvent> statusChangeCallback) {
Expand All @@ -54,6 +56,11 @@ public HealthStatus getStatus() {
this.statusChangeCallback = statusChangeCallback;
resultRef.set(new HealthCheckResult(0L, HealthStatus.UNKNOWN));

executor = Executors.newCachedThreadPool(r -> {
Thread t = new Thread(r, "jedis-healthcheck-worker-" + this.endpoint);
t.setDaemon(true);
return t;
});
scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "jedis-healthcheck-" + this.endpoint);
t.setDaemon(true);
Expand Down Expand Up @@ -118,16 +125,28 @@ private void healthCheck() {
}
}

// -> 0 -> Unhealthy
// -> 1 ->
// -> 2 -> Healthy
// -> 1 -> Unhealthy

// Guards against replacing the current status with an outdated result from another health check
private void safeUpdate(long owner, HealthStatus status) {
@VisibleForTesting
void safeUpdate(long owner, HealthStatus status) {
HealthCheckResult newResult = new HealthCheckResult(owner, status);
AtomicBoolean wasUpdated = new AtomicBoolean(false);

HealthCheckResult oldResult = resultRef.getAndUpdate(current -> {
if (current.getTimestamp() < owner) {
wasUpdated.set(true);
return newResult;
}
wasUpdated.set(false);
return current;
});
if (oldResult.getStatus() != status) {

if (wasUpdated.get() && oldResult.getStatus() != status) {
log.info("Health status changed for {} from {} to {}", endpoint, oldResult.getStatus(), status);
// notify listeners
notifyListeners(oldResult.getStatus(), status);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;

import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -35,6 +36,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand All @@ -47,6 +49,7 @@ public class ActiveActiveLocalFailoverTest {
private static final EndpointConfig endpoint1 = HostAndPorts.getRedisEndpoint("redis-failover-1");
private static final EndpointConfig endpoint2 = HostAndPorts.getRedisEndpoint("redis-failover-2");
private static final ToxiproxyClient tp = new ToxiproxyClient("localhost", 8474);
public static final int ENDPOINT_PAUSE_TIME = 10000;
private static Proxy redisProxy1;
private static Proxy redisProxy2;

Expand Down Expand Up @@ -81,6 +84,7 @@ public void setup() throws IOException {
throw new RuntimeException(e);
}
});

}

@ParameterizedTest
Expand Down Expand Up @@ -153,6 +157,13 @@ public void accept(ClusterSwitchEventArgs e) {
}
}

// Ensure endpoints are healthy
assertTrue(redisProxy1.isEnabled());
assertTrue(redisProxy2.isEnabled());
ensureEndpointAvailability(endpoint1.getHostAndPort(), config);
ensureEndpointAvailability(endpoint2.getHostAndPort(), config);

// Create the connection provider
MultiClusterPooledConnectionProvider provider = new MultiClusterPooledConnectionProvider(builder.build());
FailoverReporter reporter = new FailoverReporter();
provider.setClusterSwitchListener(reporter);
Expand All @@ -171,6 +182,11 @@ public void accept(ClusterSwitchEventArgs e) {
String cluster2Id = provider.getCluster(endpoint2.getHostAndPort()).getCircuitBreaker().getName();

// Start thread that imitates an application that uses the client
RateLimiterConfig rateLimiterConfig = RateLimiterConfig.custom()
.limitForPeriod(100)
.limitRefreshPeriod(Duration.ofSeconds(1))
.timeoutDuration(Duration.ofSeconds(1)).build();

MultiThreadedFakeApp fakeApp = new MultiThreadedFakeApp(client, (UnifiedJedis c) -> {

long threadId = Thread.currentThread().getId();
Expand Down Expand Up @@ -239,7 +255,7 @@ public void accept(ClusterSwitchEventArgs e) {
}
}
return true;
}, numberOfThreads);
}, numberOfThreads, rateLimiterConfig);
fakeApp.setKeepExecutingForSeconds(30);
Thread t = new Thread(fakeApp);
t.start();
Expand All @@ -248,7 +264,7 @@ public void accept(ClusterSwitchEventArgs e) {

log.info("Triggering issue on endpoint1");
try (Jedis jedis = new Jedis(endpoint1.getHostAndPort(), endpoint1.getClientConfigBuilder().build())) {
jedis.clientPause(20000);
jedis.clientPause(ENDPOINT_PAUSE_TIME);
}

fakeApp.setAction(new TriggerActionResponse(null) {
Expand Down Expand Up @@ -298,4 +314,15 @@ public boolean isCompleted(Duration checkInterval, Duration delayAfter, Duration
client.close();
}

/**
 * Blocks until the given endpoint answers {@code PING} with {@code PONG}.
 *
 * <p>Each poll opens a fresh {@link Jedis} connection; any exception raised while connecting or
 * pinging is treated as "not available yet" and the poll is retried. If the endpoint does not
 * respond within {@code ENDPOINT_PAUSE_TIME} milliseconds, Awaitility fails the wait.
 *
 * @param endpoint host and port of the Redis endpoint to probe
 * @param config client configuration used to open the probing connection
 */
private static void ensureEndpointAvailability(HostAndPort endpoint, JedisClientConfig config) {
  // ENDPOINT_PAUSE_TIME is expressed in milliseconds (it is the value handed to clientPause),
  // so the wait bound must be built with ofMillis; ofSeconds would allow ~2.8 hours of waiting.
  await().atMost(Duration.ofMillis(ENDPOINT_PAUSE_TIME)).until(() -> {
    try (Jedis jedis = new Jedis(endpoint, config)) {
      return "PONG".equals(jedis.ping());
    } catch (Exception e) {
      // Connection or command failure: the endpoint is still unavailable, keep polling.
      log.info("Waiting for endpoint {} to become available...", endpoint);
      return false;
    }
  });
}

}
36 changes: 35 additions & 1 deletion src/test/java/redis/clients/jedis/mcf/HealthCheckTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

Expand Down Expand Up @@ -146,7 +147,7 @@ void testHealthCheckCollectionClose() {

@Test
void testHealthCheckStatusUpdate() throws InterruptedException {
when(mockStrategy.getInterval()).thenReturn(100);
when(mockStrategy.getInterval()).thenReturn(1);
when(mockStrategy.getTimeout()).thenReturn(50);
when(mockStrategy.doHealthCheck(any(Endpoint.class))).thenReturn(HealthStatus.UNHEALTHY);

Expand All @@ -164,6 +165,39 @@ void testHealthCheckStatusUpdate() throws InterruptedException {
healthCheck.stop();
}

@Test
void testSafeUpdateChecksDoNotTriggerFalseNotifications() {
  // Every status-change event delivered to the callback bumps this counter.
  AtomicInteger notifications = new AtomicInteger();
  HealthCheckImpl healthCheck = new HealthCheckImpl(testEndpoint, mockStrategy,
      event -> notifications.incrementAndGet());

  // Results arrive out of order: the newer one (timestamp 2000) lands first,
  // then a stale one (timestamp 1000) that must be discarded.
  healthCheck.safeUpdate(2000, HealthStatus.HEALTHY);
  healthCheck.safeUpdate(1000, HealthStatus.UNHEALTHY);

  // Only the first update may notify; the stale result must neither notify nor change the status.
  assertEquals(1, notifications.get());
  assertEquals(HealthStatus.HEALTHY, healthCheck.getStatus());
}

@Test
void testSafeUpdateWithConcurrentResults() {
  // Every status-change event delivered to the callback bumps this counter.
  AtomicInteger notifications = new AtomicInteger();
  HealthCheckImpl healthCheck = new HealthCheckImpl(testEndpoint, mockStrategy,
      event -> notifications.incrementAndGet());

  // Step 1: the newer result is applied first; it must be stored and reported once.
  healthCheck.safeUpdate(2000, HealthStatus.HEALTHY);
  assertEquals(1, notifications.get());
  assertEquals(HealthStatus.HEALTHY, healthCheck.getStatus());

  // Step 2: an older result shows up afterwards; it must be dropped silently.
  healthCheck.safeUpdate(1000, HealthStatus.UNHEALTHY);
  assertEquals(1, notifications.get());
  assertEquals(HealthStatus.HEALTHY, healthCheck.getStatus());
}

@Test
void testHealthCheckStop() {
when(mockStrategy.getInterval()).thenReturn(1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;

import org.hamcrest.Matchers;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
Expand All @@ -24,15 +23,22 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
import static org.hamcrest.MatcherAssert.assertThat;
import static redis.clients.jedis.Protocol.DEFAULT_TIMEOUT;

@Tags({ @Tag("failover"), @Tag("scenario") })
public class ActiveActiveFailoverTest {
private static final Logger log = LoggerFactory.getLogger(ActiveActiveFailoverTest.class);
private static final int NUM_OF_THREADS = 18; //18;
private static final int SOCKET_TIMEOUT_MS = DEFAULT_TIMEOUT;
private static final int CONNECTION_TIMEOUT_MS = DEFAULT_TIMEOUT;
private static final long NETWORK_FAILURE_INTERVAL = 15L;

private static EndpointConfig endpoint;

Expand All @@ -54,8 +60,8 @@ public void testFailover() {
MultiClusterClientConfig.ClusterConfig[] clusterConfig = new MultiClusterClientConfig.ClusterConfig[2];

JedisClientConfig config = endpoint.getClientConfigBuilder()
.socketTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS)
.connectionTimeoutMillis(RecommendedSettings.DEFAULT_TIMEOUT_MS).build();
.socketTimeoutMillis(SOCKET_TIMEOUT_MS)
.connectionTimeoutMillis(CONNECTION_TIMEOUT_MS).build();

clusterConfig[0] = ClusterConfig.builder(endpoint.getHostAndPort(0), config)
.connectionPoolConfig(RecommendedSettings.poolConfig).weight(1.0f).build();
Expand All @@ -71,7 +77,7 @@ public void testFailover() {

builder.failbackSupported(true);
builder.failbackCheckInterval(1000);
builder.gracePeriod(10000);
builder.gracePeriod(2000);

builder.retryWaitDuration(10);
builder.retryMaxAttempts(1);
Expand Down Expand Up @@ -168,7 +174,7 @@ public void accept(ClusterSwitchEventArgs e) {
}
}
return true;
}, 18);
}, NUM_OF_THREADS);
fakeApp.setKeepExecutingForSeconds(30);
Thread t = new Thread(fakeApp);
t.start();
Expand All @@ -185,12 +191,12 @@ public void accept(ClusterSwitchEventArgs e) {

HashMap<String, Object> params = new HashMap<>();
params.put("bdb_id", endpoint.getBdbId());
params.put("delay", 15);
params.put("delay", NETWORK_FAILURE_INTERVAL);

FaultInjectionClient.TriggerActionResponse actionResponse = null;

try {
log.info("Triggering network_failure for ~15 seconds");
log.info("Triggering network_failure for ~{} seconds", NETWORK_FAILURE_INTERVAL);
actionResponse = faultClient.triggerAction("network_failure", params);
} catch (IOException e) {
fail("Fault Injection Server error:" + e.getMessage());
Expand All @@ -205,21 +211,27 @@ public void accept(ClusterSwitchEventArgs e) {
throw new RuntimeException(e);
}

ConnectionPool pool = provider.getCluster(endpoint.getHostAndPort(0)).getConnectionPool();

log.info("First connection pool state: active: {}, idle: {}", pool.getNumActive(), pool.getNumIdle());
ConnectionPool pool1 = provider.getCluster(endpoint.getHostAndPort(0)).getConnectionPool();
ConnectionPool pool2 = provider.getCluster(endpoint.getHostAndPort(1)).getConnectionPool();

await().atMost(Duration.ofSeconds(1)).until(() -> pool1.getNumActive() == 0);
await().atMost(Duration.ofSeconds(1)).until(() -> pool2.getNumActive() == 0);

log.info("Connection pool {}: active: {}, idle: {}", endpoint.getHostAndPort(0), pool1.getNumActive(), pool1.getNumIdle());
log.info("Connection pool {}: active: {}, idle: {}", endpoint.getHostAndPort(1), pool2.getNumActive(), pool2.getNumIdle());
log.info("Failover happened at: {}", reporter.failoverAt);
log.info("Failback happened at: {}", reporter.failbackAt);
log.info("Last failed command at: {}", lastFailedCommandAt.get());
log.info("Failed commands after failover: {}", failedCommandsAfterFailover.get());
Duration fullFailoverTime = Duration.between(reporter.failoverAt, lastFailedCommandAt.get());
log.info("Full failover time: {} s", fullFailoverTime.getSeconds());

assertEquals(0, pool.getNumActive());
assertEquals(0, pool1.getNumActive());
assertTrue(fakeApp.capturedExceptions().isEmpty());
assertTrue(reporter.failoverHappened);
assertTrue(reporter.failbackHappened);
assertThat(fullFailoverTime.getSeconds(), Matchers.greaterThanOrEqualTo(30L));
assertThat( Duration.between(reporter.failoverAt, reporter.failbackAt).getSeconds(), greaterThanOrEqualTo(NETWORK_FAILURE_INTERVAL));

client.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MultiThreadedFakeApp extends FakeApp {
Expand All @@ -22,8 +23,14 @@ public MultiThreadedFakeApp(UnifiedJedis client, FakeApp.ExecutedAction action,

public MultiThreadedFakeApp(UnifiedJedis client, FakeApp.ExecutedAction action, int numThreads, RateLimiterConfig config) {
super(client, action);
this.executorService = Executors.newFixedThreadPool(numThreads);

this.executorService = new ThreadPoolExecutor(
numThreads,
numThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(100000),
new ThreadPoolExecutor.CallerRunsPolicy()
);
if (config != null) {
this.rateLimiter = RateLimiterRegistry.of(config).rateLimiter("fakeAppLimiter");
} else {
Expand Down
Loading