Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve cluster testing support #1171

Merged
merged 1 commit into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions src/test/java/io/nats/client/ConnectTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -568,16 +568,30 @@ public void testRunInJsCluster() throws Exception {
listeners[1] = new ListenerForTesting();
listeners[2] = new ListenerForTesting();

ThreeServerTestOptionsAppender appender = (ix, builder) ->
builder.connectionListener(listeners[ix]).errorListener(listeners[ix]);
ThreeServerTestOptions tstOpts = new ThreeServerTestOptions() {
@Override
public void append(int index, Options.Builder builder) {
builder.connectionListener(listeners[index]).errorListener(listeners[index]);
}

@Override
public boolean configureAccount() {
return true;
}

@Override
public boolean includeAllServers() {
return true;
}
};

runInJsCluster(ConnectTests::validateRunInJsCluster);

listeners[0] = new ListenerForTesting();
listeners[1] = new ListenerForTesting();
listeners[2] = new ListenerForTesting();

runInJsCluster(true, appender, ConnectTests::validateRunInJsCluster);
runInJsCluster(tstOpts, ConnectTests::validateRunInJsCluster);
}

private static void validateRunInJsCluster(Connection nc1, Connection nc2, Connection nc3) throws InterruptedException {
Expand Down
44 changes: 25 additions & 19 deletions src/test/java/io/nats/client/impl/ReconnectTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -717,30 +717,15 @@ private static Thread getReconnectOnConnectTestThread(AtomicReference<Connection
@Test
public void testForceReconnect() throws Exception {
ListenerForTesting listener = new ListenerForTesting();
ThreeServerTestOptionsAppender appender = makeOptionsAppender(listener);

runInJsCluster(appender, (nc0, nc1, nc2) -> {
_testForceReconnect(nc0, listener);
});
ThreeServerTestOptions tstOpts = makeThreeServerTestOptions(listener, false);
runInJsCluster(tstOpts, (nc0, nc1, nc2) -> _testForceReconnect(nc0, listener));
}

@Test
public void testForceReconnectWithAccount() throws Exception {
ListenerForTesting listener = new ListenerForTesting();
ThreeServerTestOptionsAppender appender = makeOptionsAppender(listener);
runInJsCluster(true, appender, (nc0, nc1, nc2) -> {
_testForceReconnect(nc0, listener);
});

}

private static ThreeServerTestOptionsAppender makeOptionsAppender(ListenerForTesting listener) {
ThreeServerTestOptionsAppender appender = (ix, builder) -> {
if (ix == 0) {
builder.connectionListener(listener).ignoreDiscoveredServers().noRandomize();
}
};
return appender;
ThreeServerTestOptions tstOpts = makeThreeServerTestOptions(listener, true);
runInJsCluster(tstOpts, (nc0, nc1, nc2) -> _testForceReconnect(nc0, listener));
}

private static void _testForceReconnect(Connection nc0, ListenerForTesting listener) throws IOException, InterruptedException {
Expand All @@ -756,6 +741,27 @@ private static void _testForceReconnect(Connection nc0, ListenerForTesting liste
assertTrue(listener.getConnectionEvents().contains(Events.RECONNECTED));
}

private static ThreeServerTestOptions makeThreeServerTestOptions(ListenerForTesting listener, final boolean configureAccount) {
return new ThreeServerTestOptions() {
@Override
public void append(int index, Options.Builder builder) {
if (index == 0) {
builder.connectionListener(listener).ignoreDiscoveredServers().noRandomize();
}
}

@Override
public boolean configureAccount() {
return configureAccount;
}

@Override
public boolean includeAllServers() {
return true;
}
};
}

@Test
public void testForceReconnectQueueBehaviorCheck() throws Exception {
runInJsCluster((nc0, nc1, nc2) -> {
Expand Down
81 changes: 44 additions & 37 deletions src/test/java/io/nats/client/utils/TestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,10 @@ public interface ThreeServerTest {
void test(Connection nc1, Connection nc2, Connection nc3) throws Exception;
}

public interface ThreeServerTestOptionsAppender {
void append(int index, Options.Builder builder);
public interface ThreeServerTestOptions {
default void append(int index, Options.Builder builder) {}
default boolean configureAccount() { return false; }
default boolean includeAllServers() { return false; }
}

public interface VersionCheck {
Expand Down Expand Up @@ -331,18 +333,10 @@ public static void runInJsHubLeaf(TwoServerTest twoServerTest) throws Exception
}

public static void runInJsCluster(ThreeServerTest threeServerTest) throws Exception {
runInJsCluster(false, null, threeServerTest);
runInJsCluster(null, threeServerTest);
}

public static void runInJsCluster(ThreeServerTestOptionsAppender appender, ThreeServerTest threeServerTest) throws Exception {
runInJsCluster(false, appender, threeServerTest);
}

public static void runInJsCluster(boolean account, ThreeServerTest threeServerTest) throws Exception {
runInJsCluster(account, null, threeServerTest);
}

public static void runInJsCluster(boolean account, ThreeServerTestOptionsAppender appender, ThreeServerTest threeServerTest) throws Exception {
public static void runInJsCluster(ThreeServerTestOptions tstOpts, ThreeServerTest threeServerTest) throws Exception {
int port1 = NatsTestServer.nextPort();
int port2 = NatsTestServer.nextPort();
int port3 = NatsTestServer.nextPort();
Expand All @@ -352,70 +346,83 @@ public static void runInJsCluster(boolean account, ThreeServerTestOptionsAppende
String dir1 = tempJsStoreDir();
String dir2 = tempJsStoreDir();
String dir3 = tempJsStoreDir();
String cluster = variant();
String serverPrefix = variant();
String cluster = "cluster_" + variant();
String serverPrefix = "server_" + variant() + "_";

if (tstOpts == null) {
tstOpts = new ThreeServerTestOptions() {};
}
boolean configureAccount = tstOpts.configureAccount();

String[] server1Inserts = makeInsert(cluster, serverPrefix, 1, dir1, listen1, listen2, listen3, account);
String[] server2Inserts = makeInsert(cluster, serverPrefix, 2, dir2, listen2, listen1, listen3, account);
String[] server3Inserts = makeInsert(cluster, serverPrefix, 3, dir3, listen3, listen1, listen2, account);
String[] server1Inserts = makeInsert(cluster, serverPrefix + 1, dir1, listen1, listen2, listen3, configureAccount);
String[] server2Inserts = makeInsert(cluster, serverPrefix + 2, dir2, listen2, listen1, listen3, configureAccount);
String[] server3Inserts = makeInsert(cluster, serverPrefix + 3, dir3, listen3, listen1, listen2, configureAccount);

try (NatsTestServer srv1 = new NatsTestServer(port1, false, true, null, server1Inserts, null);
NatsTestServer srv2 = new NatsTestServer(port2, false, true, null, server2Inserts, null);
NatsTestServer srv3 = new NatsTestServer(port3, false, true, null, server3Inserts, null);
Connection nc1 = standardConnection(makeOptions(0, account, appender, srv1, srv2, srv3));
Connection nc2 = standardConnection(makeOptions(1, account, appender, srv2, srv1, srv3));
Connection nc3 = standardConnection(makeOptions(2, account, appender, srv3, srv1, srv2))
Connection nc1 = standardConnection(makeOptions(0, tstOpts, srv1, srv2, srv3));
Connection nc2 = standardConnection(makeOptions(1, tstOpts, srv2, srv1, srv3));
Connection nc3 = standardConnection(makeOptions(2, tstOpts, srv3, srv1, srv2))
) {
try {
threeServerTest.test(nc1, nc2, nc3);
}
finally {
cleanupJs(nc1);
cleanupJs(nc2);
cleanupJs(nc3);
}
}
}

private static String[] makeInsert(String cluster, String serverPrefix, int serverId, String storeDir, int listen, int route1, int route2, boolean account) {
String[] serverInserts = new String[account ? 19 : 12];
private static String[] makeInsert(String clusterName, String serverName, String jsStoreDir, int listen, int route1, int route2, boolean configureAccount) {
String[] serverInserts = new String[configureAccount ? 19 : 12];
int x = -1;
serverInserts[++x] = "jetstream {";
serverInserts[++x] = " store_dir=" + storeDir;
serverInserts[++x] = " store_dir=" + jsStoreDir;
serverInserts[++x] = "}";
serverInserts[++x] = "server_name=" + serverPrefix + serverId;
serverInserts[++x] = "server_name=" + serverName;
serverInserts[++x] = "cluster {";
serverInserts[++x] = " name: " + cluster;
serverInserts[++x] = " name: " + clusterName;
serverInserts[++x] = " listen: 127.0.0.1:" + listen;
serverInserts[++x] = " routes: [";
serverInserts[++x] = " nats-route://127.0.0.1:" + route1;
serverInserts[++x] = " nats-route://127.0.0.1:" + route2;
serverInserts[++x] = " ]";
serverInserts[++x] = "}";
if (account) {
if (configureAccount) {
serverInserts[++x] = "accounts {";
serverInserts[++x] = " $SYS: {}";
serverInserts[++x] = " NVCF: {";
serverInserts[++x] = " jetstream: \"enabled\",";
serverInserts[++x] = " users: [ { nkey: UBAX6GCZQYLJDLSNPBDDPLY6KIBRO2JAUYNPW4HCWBRCZ4OU57YQQQS3 } ]";
serverInserts[++x] = " users: [ { nkey: " + USER_NKEY + " } ]";
serverInserts[++x] = " }";
serverInserts[++x] = "}";
}
return serverInserts;
}

private static final String USER_NKEY = "UBAX6GCZQYLJDLSNPBDDPLY6KIBRO2JAUYNPW4HCWBRCZ4OU57YQQQS3";
private static final String USER_SEED = "SUAIUIHFQNVWSMKYGC4E5H5IEQZHHND3DKHTRKZWPCDXB6LXVD5R2KROSA";
private static Options makeOptions(int id, boolean account, ThreeServerTestOptionsAppender appender, NatsTestServer... srvs) {
String[] servers = new String[srvs.length];
for (int i = 0; i < srvs.length; i++) {
NatsTestServer nts = srvs[i];
servers[i] = nts.getURI();

private static Options makeOptions(int id, ThreeServerTestOptions tstOpts, NatsTestServer... srvs) {
Options.Builder b = Options.builder();
if (tstOpts.includeAllServers()) {
String[] servers = new String[srvs.length];
for (int i = 0; i < srvs.length; i++) {
NatsTestServer nts = srvs[i];
servers[i] = nts.getURI();
}
b.servers(servers);
}
Options.Builder b = Options.builder().servers(servers);
if (account) {
b.authHandler(Nats.staticCredentials(null, USER_SEED.toCharArray()));
else {
b.server(srvs[0].getURI());
}
if (appender != null) {
appender.append(id, b);
if (tstOpts.configureAccount()) {
b.authHandler(Nats.staticCredentials(null, USER_SEED.toCharArray()));
}
tstOpts.append(id, b);
return b.build();
}

Expand Down
Loading