Skip to content

Commit

Permalink
Fix #302: allow select during sentinel mode
Browse files Browse the repository at this point in the history
Signed-off-by: Paulo Lopes <pmlopes@gmail.com>
  • Loading branch information
pmlopes committed Jul 29, 2021
1 parent 75ccf10 commit ccb3742
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 20 deletions.
26 changes: 13 additions & 13 deletions src/main/java/io/vertx/redis/client/impl/RedisSentinelClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,17 @@ public Future<RedisConnection> connect() {

private void createConnectionInternal(RedisOptions options, RedisRole role, Handler<AsyncResult<RedisConnection>> onCreate) {

final Handler<AsyncResult<String>> createAndConnect = resolve -> {
final Handler<AsyncResult<RedisURI>> createAndConnect = resolve -> {
if (resolve.failed()) {
onCreate.handle(Future.failedFuture(resolve.cause()));
return;
}
// wrap a new client
if (role == RedisRole.SENTINEL) {// sentinel cannot select
final RedisURI uri = new RedisURI(resolve.result());
final RedisURI uri = resolve.result();
connectionManager.getConnection(getSentinelEndpoint(uri), null).onComplete(onCreate);
} else {
connectionManager.getConnection(resolve.result(), null).onComplete(onCreate);
connectionManager.getConnection(resolve.result().toString(), null).onComplete(onCreate);
}
};

Expand All @@ -150,14 +150,14 @@ private void createConnectionInternal(RedisOptions options, RedisRole role, Hand
* We use the algorithm from http://redis.io/topics/sentinel-clients
* to get a sentinel client and then do 'stuff' with it
*/
private static void resolveClient(final Resolver checkEndpointFn, final RedisOptions options, final Handler<AsyncResult<String>> callback) {
private static void resolveClient(final Resolver checkEndpointFn, final RedisOptions options, final Handler<AsyncResult<RedisURI>> callback) {
// Because finding the master is going to be an async list we will terminate
// when we find one then use promises...
iterate(0, checkEndpointFn, options, iterate -> {
if (iterate.failed()) {
callback.handle(Future.failedFuture(iterate.cause()));
} else {
final Pair<Integer, String> found = iterate.result();
final Pair<Integer, RedisURI> found = iterate.result();
// This is the endpoint that has responded so stick it on the top of
// the list
final List<String> endpoints = options.getEndpoints();
Expand All @@ -170,7 +170,7 @@ private static void resolveClient(final Resolver checkEndpointFn, final RedisOpt
});
}

private static void iterate(final int idx, final Resolver checkEndpointFn, final RedisOptions argument, final Handler<AsyncResult<Pair<Integer, String>>> resultHandler) {
private static void iterate(final int idx, final Resolver checkEndpointFn, final RedisOptions argument, final Handler<AsyncResult<Pair<Integer, RedisURI>>> resultHandler) {
// stop condition
final List<String> endpoints = argument.getEndpoints();

Expand All @@ -192,7 +192,7 @@ private static void iterate(final int idx, final Resolver checkEndpointFn, final

// begin endpoint check methods

private void isSentinelOk(String endpoint, RedisOptions argument, Handler<AsyncResult<String>> handler) {
private void isSentinelOk(String endpoint, RedisOptions argument, Handler<AsyncResult<RedisURI>> handler) {
// we can't use the endpoint as is, it should not contain a database selection,
// but can contain authentication
final RedisURI uri = new RedisURI(endpoint);
Expand All @@ -205,15 +205,15 @@ private void isSentinelOk(String endpoint, RedisOptions argument, Handler<AsyncR
if (ping.failed()) {
handler.handle(Future.failedFuture(ping.cause()));
} else {
handler.handle(Future.succeededFuture(endpoint));
handler.handle(Future.succeededFuture(uri));
}
// connection is not needed anymore
conn.close();
});
});
}

private void getMasterFromEndpoint(String endpoint, RedisOptions options, Handler<AsyncResult<String>> handler) {
private void getMasterFromEndpoint(String endpoint, RedisOptions options, Handler<AsyncResult<RedisURI>> handler) {
// we can't use the endpoint as is, it should not contain a database selection,
// but can contain authentication
final RedisURI uri = new RedisURI(endpoint);
Expand All @@ -229,16 +229,16 @@ private void getMasterFromEndpoint(String endpoint, RedisOptions options, Handle
// Test the response
final Response response = getMasterAddrByName.result();
final String host = response.get(0).toString().contains(":") ? "[" + response.get(0).toString() + "]" : response.get(0).toString();
handler.handle(
Future.succeededFuture(uri.protocol() + "://" + uri.userinfo() + host + ":" + response.get(1).toInteger()));
final Integer port = response.get(1).toInteger();
handler.handle(Future.succeededFuture(new RedisURI(uri, host, port)));
}
// we don't need this connection anymore
conn.close();
});
});
}

private void getReplicaFromEndpoint(String endpoint, RedisOptions options, Handler<AsyncResult<String>> handler) {
private void getReplicaFromEndpoint(String endpoint, RedisOptions options, Handler<AsyncResult<RedisURI>> handler) {
// we can't use the endpoint as is, it should not contain a database selection,
// but can contain authentication
final RedisURI uri = new RedisURI(endpoint);
Expand Down Expand Up @@ -277,7 +277,7 @@ private void getReplicaFromEndpoint(String endpoint, RedisOptions options, Handl
} else {
final String host = ip.contains(":") ? "[" + ip + "]" : ip;

handler.handle(Future.succeededFuture(uri.protocol() + "://" + uri.userinfo() + host + ":" + port));
handler.handle(Future.succeededFuture(new RedisURI(uri, host, port)));
}
}
}
Expand Down
18 changes: 12 additions & 6 deletions src/main/java/io/vertx/redis/client/impl/RedisURI.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ public final class RedisURI {
private static final String DEFAULT_HOST = "localhost";
private static final int DEFAULT_PORT = 6379;

/**
* Original address string
*/
private final String connectionString;
/**
* Address, including host and port
*/
Expand Down Expand Up @@ -67,8 +63,18 @@ public final class RedisURI {
*/
private final Map<String, String> params;

public RedisURI(RedisURI base, String host, int port) {
socketAddress = SocketAddress.inetSocketAddress(port, host);
unix = false;
// use the base data
user = base.user;
password = base.password;
select = base.select;
ssl = base.ssl;
params = base.params;
}

public RedisURI(String connectionString) {
this.connectionString = connectionString;
try {
final URI uri = new URI(connectionString);

Expand Down Expand Up @@ -216,6 +222,6 @@ public String protocol() {

@Override
public String toString() {
return connectionString;
return protocol() + "://" + socketAddress() + "/" + (select == null ? "" : select);
}
}
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/redis/client/impl/Resolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@
@FunctionalInterface
interface Resolver {

void resolve(String endpoint, RedisOptions parameter, Handler<AsyncResult<String>> callback);
void resolve(String endpoint, RedisOptions parameter, Handler<AsyncResult<RedisURI>> callback);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package io.vertx.redis.client.test;

import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.RunTestOnContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.redis.client.*;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(VertxUnitRunner.class)
public class RedisSentinelWithDBTest {

@Rule
public final RunTestOnContext rule = new RunTestOnContext();

@Test
public void testGetClientToMaster(TestContext should) {
final Async test = should.async();

Redis.createClient(
rule.vertx(),
new RedisOptions()
.setType(RedisClientType.SENTINEL)
.addConnectionString("redis://localhost:5000/0")
.addConnectionString("redis://localhost:5001/0")
.addConnectionString("redis://localhost:5002/0")
.setMasterName("sentinel7000")
.setRole(RedisRole.MASTER)
.setMaxPoolSize(4)
.setMaxPoolWaiting(16))
.connect(onCreate -> {
// get a connection to the master node
should.assertTrue(onCreate.succeeded());
// query the info
onCreate.result()
.send(Request.cmd(Command.CLIENT).arg("LIST"), info -> {
should.assertTrue(info.succeeded());
System.out.println(info.result());
test.complete();
});
});
}
}

0 comments on commit ccb3742

Please sign in to comment.