Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,18 +232,6 @@ public UnifiedJedis(ConnectionProvider provider, int maxAttempts, Duration maxTo
this(new RetryableCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration), provider);
}

/**
* Constructor which supports multiple cluster/database endpoints each with their own isolated connection pool.
* <p>
* With this Constructor users can seamlessly failover to Disaster Recovery (DR), Backup, and Active-Active cluster(s)
* by using simple configuration which is passed through from Resilience4j - https://resilience4j.readme.io/docs
* <p>
*/
@Experimental
public UnifiedJedis(MultiDbConnectionProvider provider) {
this(new MultiDbCommandExecutor(provider), provider);
}

/**
* The constructor to use a custom {@link CommandExecutor}.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import redis.clients.jedis.EndpointConfig;
import redis.clients.jedis.HostAndPorts;
import redis.clients.jedis.JedisClientConfig;
import redis.clients.jedis.MultiDbClient;
import redis.clients.jedis.MultiDbConfig;
import redis.clients.jedis.UnifiedJedis;
import redis.clients.jedis.exceptions.JedisConnectionException;
Expand Down Expand Up @@ -58,7 +59,7 @@ public class FailoverIntegrationTest {
private static String JEDIS1_ID = "";
private static String JEDIS2_ID = "";
private MultiDbConnectionProvider provider;
private UnifiedJedis failoverClient;
private MultiDbClient failoverClient;

@BeforeAll
public static void setupAdminClients() throws IOException {
Expand Down Expand Up @@ -110,7 +111,7 @@ public void setup() throws IOException {

// Create default provider and client for most tests
provider = createProvider();
failoverClient = new UnifiedJedis(provider);
failoverClient = MultiDbClient.builder().connectionProvider(provider).build();
}

@AfterEach
Expand Down Expand Up @@ -272,7 +273,7 @@ public void testCircuitBreakerCountsEachConnectionErrorSeparately() throws IOExc
.build();

MultiDbConnectionProvider provider = new MultiDbConnectionProvider(failoverConfig);
try (UnifiedJedis client = new UnifiedJedis(provider)) {
try (MultiDbClient client = MultiDbClient.builder().connectionProvider(provider).build()) {
// Verify initial connection to first endpoint
assertThat(getNodeId(client.info("server")), equalTo(JEDIS1_ID));

Expand Down Expand Up @@ -321,7 +322,8 @@ public void testInflightCommandsAreRetriedAfterFailover() throws Exception {
builder -> builder.retryOnFailover(true));

// Create a custom client with retryOnFailover enabled for this specific test
try (UnifiedJedis customClient = new UnifiedJedis(customProvider)) {
try (MultiDbClient customClient = MultiDbClient.builder().connectionProvider(customProvider)
.build()) {

assertThat(getNodeId(customClient.info("server")), equalTo(JEDIS1_ID));
Thread.sleep(1000);
Expand Down Expand Up @@ -362,7 +364,8 @@ public void testInflightCommandsAreNotRetriedAfterFailover() throws Exception {
MultiDbConnectionProvider customProvider = createProvider(
builder -> builder.retryOnFailover(false));

try (UnifiedJedis customClient = new UnifiedJedis(customProvider)) {
try (MultiDbClient customClient = MultiDbClient.builder().connectionProvider(customProvider)
.build()) {

assertThat(getNodeId(customClient.info("server")), equalTo(JEDIS1_ID));
Future<List<String>> blpop = executor.submit(() -> customClient.blpop(500, "test-list-2"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import redis.clients.jedis.scenario.RecommendedSettings;
import redis.clients.jedis.scenario.FaultInjectionClient.TriggerActionResponse;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.util.ClientTestUtil;

import java.io.IOException;
import java.time.Duration;
Expand Down Expand Up @@ -153,13 +154,12 @@ public void accept(DatabaseSwitchEvent e) {
ensureEndpointAvailability(endpoint1.getHostAndPort(), config);
ensureEndpointAvailability(endpoint2.getHostAndPort(), config);

// Create the connection provider
MultiDbConnectionProvider provider = new MultiDbConnectionProvider(builder.build());
FailoverReporter reporter = new FailoverReporter();
provider.setDatabaseSwitchListener(reporter);
provider.setActiveDatabase(endpoint1.getHostAndPort());

UnifiedJedis client = new UnifiedJedis(provider);
MultiDbClient multiDbClient = MultiDbClient.builder().multiDbConfig(builder.build())
.databaseSwitchListener(reporter).build();

multiDbClient.setActiveDatabase(endpoint1.getHostAndPort());

AtomicLong retryingThreadsCounter = new AtomicLong(0);
AtomicLong failedCommandsAfterFailover = new AtomicLong(0);
Expand All @@ -169,33 +169,32 @@ public void accept(DatabaseSwitchEvent e) {
AtomicBoolean unexpectedErrors = new AtomicBoolean(false);
AtomicReference<Exception> lastException = new AtomicReference<Exception>();
AtomicLong stopRunningAt = new AtomicLong();
String database2Id = provider.getDatabase(endpoint2.getHostAndPort()).getCircuitBreaker()
.getName();
Endpoint db2Endpoint = endpoint2.getHostAndPort();

// Start thread that imitates an application that uses the client
// Start thread that imitates an application that uses the multiDbClient
RateLimiterConfig rateLimiterConfig = RateLimiterConfig.custom().limitForPeriod(100)
.limitRefreshPeriod(Duration.ofSeconds(1)).timeoutDuration(Duration.ofSeconds(1)).build();

MultiThreadedFakeApp fakeApp = new MultiThreadedFakeApp(client, (UnifiedJedis c) -> {
MultiThreadedFakeApp fakeApp = new MultiThreadedFakeApp(multiDbClient, (UnifiedJedis c) -> {

long threadId = Thread.currentThread().getId();

int attempt = 0;
int maxTries = 500;
int retryingDelay = 5;
String currentDatabaseId = null;
String currentDbKey = null;
while (true) {
try {
if (System.currentTimeMillis() > stopRunningAt.get()) break;
currentDatabaseId = provider.getDatabase().getCircuitBreaker().getName();
currentDbKey = dbKey(multiDbClient.getActiveDatabaseEndpoint());
Map<String, String> executionInfo = new HashMap<String, String>() {
{
put("threadId", String.valueOf(threadId));
put("database", reporter.getCurrentDatabaseName());
}
};

client.xadd("execution_log", StreamEntryID.NEW_ENTRY, executionInfo);
multiDbClient.xadd("execution_log", StreamEntryID.NEW_ENTRY, executionInfo);

if (attempt > 0) {
log.info("Thread {} recovered after {} ms. Threads still not recovered: {}", threadId,
Expand All @@ -204,7 +203,7 @@ public void accept(DatabaseSwitchEvent e) {

break;
} catch (JedisConnectionException e) {
if (database2Id.equals(currentDatabaseId)) {
if (dbKey(db2Endpoint).equals(currentDbKey)) {
break;
}
lastException.set(e);
Expand Down Expand Up @@ -232,7 +231,7 @@ public void accept(DatabaseSwitchEvent e) {
}
if (++attempt == maxTries) throw e;
} catch (Exception e) {
if (database2Id.equals(currentDatabaseId)) {
if (dbKey(db2Endpoint).equals(currentDbKey)) {
break;
}
lastException.set(e);
Expand Down Expand Up @@ -276,6 +275,7 @@ public boolean isCompleted(Duration checkInterval, Duration delayAfter, Duration
}
log.info("Fake app completed");

MultiDbConnectionProvider provider = ClientTestUtil.getConnectionProvider(multiDbClient);
ConnectionPool pool = provider.getDatabase(endpoint1.getHostAndPort()).getConnectionPool();

log.info("First connection pool state: active: {}, idle: {}", pool.getNumActive(),
Expand Down Expand Up @@ -306,7 +306,11 @@ public boolean isCompleted(Duration checkInterval, Duration delayAfter, Duration
}
assertFalse(unexpectedErrors.get());

client.close();
multiDbClient.close();
}

private String dbKey(Endpoint endpoint) {
return endpoint.getHost() + ":" + endpoint.getPort();
}

private static void ensureEndpointAvailability(HostAndPort endpoint, JedisClientConfig config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
import redis.clients.jedis.EndpointConfig;
import redis.clients.jedis.HostAndPorts;
import redis.clients.jedis.JedisClientConfig;
import redis.clients.jedis.MultiDbClient;
import redis.clients.jedis.MultiDbConfig;
import redis.clients.jedis.UnifiedJedis;
import redis.clients.jedis.MultiDbConfig.DatabaseConfig;
import redis.clients.jedis.MultiDbConfig.StrategySupplier;
import redis.clients.jedis.UnifiedJedis;
import redis.clients.jedis.mcf.ProbingPolicy.BuiltIn;
import redis.clients.jedis.scenario.RecommendedSettings;

Expand All @@ -32,8 +33,9 @@ public class HealthCheckIntegrationTest {
@Test
public void testDisableHealthCheck() {
// No health check strategy supplier means health check is disabled
MultiDbConnectionProvider customProvider = getMCCF(null);
try (UnifiedJedis customClient = new UnifiedJedis(customProvider)) {
MultiDbConfig multiDbConfig = getMCCF(null);
try (
MultiDbClient customClient = MultiDbClient.builder().multiDbConfig(multiDbConfig).build()) {
// Verify that the client can connect and execute commands
String result = customClient.ping();
assertEquals("PONG", result);
Expand All @@ -46,8 +48,9 @@ public void testDefaultStrategySupplier() {
MultiDbConfig.StrategySupplier defaultSupplier = (hostAndPort, jedisClientConfig) -> {
return new PingStrategy(hostAndPort, jedisClientConfig);
};
MultiDbConnectionProvider customProvider = getMCCF(defaultSupplier);
try (UnifiedJedis customClient = new UnifiedJedis(customProvider)) {
MultiDbConfig multiDbConfig = getMCCF(defaultSupplier);
try (
MultiDbClient customClient = MultiDbClient.builder().multiDbConfig(multiDbConfig).build()) {
// Verify that the client can connect and execute commands
String result = customClient.ping();
assertEquals("PONG", result);
Expand All @@ -70,15 +73,16 @@ public void testCustomStrategySupplier() {
});
};

MultiDbConnectionProvider customProvider = getMCCF(strategySupplier);
try (UnifiedJedis customClient = new UnifiedJedis(customProvider)) {
MultiDbConfig multiDbConfig = getMCCF(strategySupplier);
try (
MultiDbClient customClient = MultiDbClient.builder().multiDbConfig(multiDbConfig).build()) {
// Verify that the client can connect and execute commands
String result = customClient.ping();
assertEquals("PONG", result);
}
}

private MultiDbConnectionProvider getMCCF(MultiDbConfig.StrategySupplier strategySupplier) {
private MultiDbConfig getMCCF(MultiDbConfig.StrategySupplier strategySupplier) {
Function<DatabaseConfig.Builder, DatabaseConfig.Builder> modifier = builder -> strategySupplier == null
? builder.healthCheckEnabled(false)
: builder.healthCheckStrategySupplier(strategySupplier);
Expand All @@ -88,13 +92,13 @@ private MultiDbConnectionProvider getMCCF(MultiDbConfig.StrategySupplier strateg
.apply(MultiDbConfig.DatabaseConfig.builder(e.getHostAndPort(), clientConfig)).build())
.collect(Collectors.toList());

MultiDbConfig mccf = new MultiDbConfig.Builder(databaseConfigs)
MultiDbConfig multiDbConfig = new MultiDbConfig.Builder(databaseConfigs)
.commandRetry(MultiDbConfig.RetryConfig.builder().maxAttempts(1).waitDuration(1).build())
.failureDetector(MultiDbConfig.CircuitBreakerConfig.builder().slidingWindowSize(1)
.failureRateThreshold(100).build())
.build();

return new MultiDbConnectionProvider(mccf);
return multiDbConfig;
}

// ========== Probe Logic Integration Tests ==========
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void testDatabaseSwitchListener() {
isValidTest.set(true);
});

try (UnifiedJedis jedis = new UnifiedJedis(localProvider)) {
try (MultiDbClient jedis = MultiDbClient.builder().connectionProvider(localProvider).build()) {

// This will fail due to unable to connect and open the circuit which will trigger the post
// processor
Expand Down Expand Up @@ -236,7 +236,7 @@ public void userCommand_firstTemporary_thenPermanent_inOrder() {
.maxNumFailoverAttempts(2)
.commandRetry(MultiDbConfig.RetryConfig.builder().maxAttempts(1).build()).build());

try (UnifiedJedis jedis = new UnifiedJedis(testProvider)) {
try (MultiDbClient jedis = MultiDbClient.builder().connectionProvider(testProvider).build()) {
jedis.get("foo");

// Disable both databases so any attempt to switch results in 'no healthy database' path
Expand Down Expand Up @@ -278,7 +278,7 @@ public void userCommand_connectionExceptions_thenMultipleTemporary_thenPermanent
.build()) {
};

try (UnifiedJedis jedis = new UnifiedJedis(testProvider)) {
try (MultiDbClient jedis = MultiDbClient.builder().connectionProvider(testProvider).build()) {
jedis.get("foo");

// disable most weighted database so that it will fail on initial requests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void pipelineWithSwitch() {
getDatabaseConfigs(clientConfig, hostPortWithFailure, workingEndpoint.getHostAndPort()))
.build());

try (UnifiedJedis client = new UnifiedJedis(provider)) {
try (MultiDbClient client = MultiDbClient.builder().connectionProvider(provider).build()) {
AbstractPipeline pipe = client.pipelined();
pipe.set("pstr", "foobar");
pipe.hset("phash", "foo", "bar");
Expand All @@ -93,7 +93,7 @@ public void transactionWithSwitch() {
getDatabaseConfigs(clientConfig, hostPortWithFailure, workingEndpoint.getHostAndPort()))
.build());

try (UnifiedJedis client = new UnifiedJedis(provider)) {
try (MultiDbClient client = MultiDbClient.builder().connectionProvider(provider).build()) {
AbstractTransaction tx = client.multi();
tx.set("tstr", "foobar");
tx.hset("thash", "foo", "bar");
Expand Down Expand Up @@ -125,7 +125,7 @@ public void commandFailoverUnresolvableHost() {
builder.build());
connectionProvider.setDatabaseSwitchListener(failoverReporter);

UnifiedJedis jedis = new UnifiedJedis(connectionProvider);
MultiDbClient jedis = MultiDbClient.builder().connectionProvider(connectionProvider).build();

String key = "hash-" + System.nanoTime();
log.info("Starting calls to Redis");
Expand Down Expand Up @@ -168,7 +168,7 @@ public void commandFailover() {
builder.build());
connectionProvider.setDatabaseSwitchListener(failoverReporter);

UnifiedJedis jedis = new UnifiedJedis(connectionProvider);
MultiDbClient jedis = MultiDbClient.builder().connectionProvider(connectionProvider).build();

String key = "hash-" + System.nanoTime();
log.info("Starting calls to Redis");
Expand Down Expand Up @@ -208,7 +208,7 @@ public void pipelineFailover() {
builder.build());
cacheProvider.setDatabaseSwitchListener(failoverReporter);

UnifiedJedis jedis = new UnifiedJedis(cacheProvider);
MultiDbClient jedis = MultiDbClient.builder().connectionProvider(cacheProvider).build();

String key = "hash-" + System.nanoTime();
log.info("Starting calls to Redis");
Expand Down Expand Up @@ -243,7 +243,7 @@ public void failoverFromAuthError() {
builder.build());
cacheProvider.setDatabaseSwitchListener(failoverReporter);

UnifiedJedis jedis = new UnifiedJedis(cacheProvider);
MultiDbClient jedis = MultiDbClient.builder().connectionProvider(cacheProvider).build();

String key = "hash-" + System.nanoTime();
log.info("Starting calls to Redis");
Expand Down
Loading