Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MockResolverIT#replace_cluster_test() #335

Merged
merged 4 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
*/
public class ResolverProvider {

private static boolean alreadyInUse = false;
private static boolean alreadySet = false;
private static AbstractResolverFactory defaultResolverFactoryImpl = new DefaultResolverFactory();

/**
Expand All @@ -16,7 +18,8 @@ public class ResolverProvider {
* @param clazz Class that is requesting the {@link Resolver}.
* @return new {@link Resolver}.
*/
public static Resolver getResolver(Class<?> clazz) {
public static synchronized Resolver getResolver(Class<?> clazz) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make it work correctly in 100% cases you better use ReadWriteLock, synchronized does not solve any problem here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it solves the problem I described. With synchronized you cannot have getResolver race with setDefaultResolverFactory and it's simpler

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually with synchronized i think we can have normal booleans instead

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it solves the problem I described. With synchronized you cannot have getResolver race with setDefaultResolverFactory and it's simpler

I am pretty sure it is not the case, can you make a test to test it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are correct, let's convert all of them to regular attributes

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

alreadyInUse = true;
return defaultResolverFactoryImpl.getResolver(clazz);
}

Expand All @@ -26,7 +29,20 @@ public static Resolver getResolver(Class<?> clazz) {
*
* @param resolverFactoryImpl new {@link Resolver} factory.
*/
public static void setDefaultResolverFactory(AbstractResolverFactory resolverFactoryImpl) {
public static synchronized void setDefaultResolverFactory(
AbstractResolverFactory resolverFactoryImpl) {
if (alreadyInUse) {
throw new IllegalStateException(
"Cannot change default resolver factory: ResolverProvider has already returned "
+ "an instance of a Resolver to use. Default resolver factory needs to be set up before first use by any "
+ "class.");
}
if (alreadySet) {
throw new IllegalStateException(
"Cannot change default resolver factory: this method has already been called. "
+ "You can set default resolver factory only once.");
}
alreadySet = true;
defaultResolverFactoryImpl = resolverFactoryImpl;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
Expand All @@ -34,7 +35,6 @@
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.testinfra.ccm.CcmBridge;
import com.datastax.oss.driver.api.testinfra.ccm.CustomCcmRule;
import com.datastax.oss.driver.categories.IsolatedTests;
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultProgrammaticDriverConfigLoaderBuilder;
import com.datastax.oss.driver.internal.core.resolver.ResolverProvider;
Expand All @@ -45,10 +45,11 @@
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.junit.ClassRule;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
Expand All @@ -58,52 +59,199 @@
public class MockResolverIT {

private static final Logger LOG = LoggerFactory.getLogger(MockResolverIT.class);
private static final MockResolverFactory RESOLVER_FACTORY = new MockResolverFactory();

@ClassRule
public static final CustomCcmRule CCM_RULE = CustomCcmRule.builder().withNodes(1).build();
private static final int CLUSTER_WAIT_SECONDS =
60; // Maximal wait time for cluster nodes to get up

@BeforeClass
public static void setUpResolver() {
ResolverProvider.setDefaultResolverFactory(RESOLVER_FACTORY);
}

@Test
public void should_connect_with_mocked_hostname() {
CcmBridge ccmBridge = CCM_RULE.getCcmBridge();
CcmBridge.Builder ccmBridgeBuilder = CcmBridge.builder().withNodes(1).withIpPrefix("127.0.1.");
try (CcmBridge ccmBridge = ccmBridgeBuilder.build()) {
RESOLVER_FACTORY.updateResponse(
"test.cluster.fake",
new ValidResponse(new InetAddress[] {getNodeInetAddress(ccmBridge, 1)}));
ccmBridge.create();
ccmBridge.start();

MockResolverFactory resolverFactory = new MockResolverFactory();
resolverFactory.updateResponse(
"node-1.cluster.fake",
new ValidResponse(new InetAddress[] {getNodeInetAddress(ccmBridge, 1)}));
ResolverProvider.setDefaultResolverFactory(resolverFactory);
DriverConfigLoader loader =
new DefaultProgrammaticDriverConfigLoaderBuilder()
.withBoolean(TypedDriverOption.RESOLVE_CONTACT_POINTS.getRawOption(), false)
.withBoolean(TypedDriverOption.RECONNECT_ON_INIT.getRawOption(), true)
.withStringList(
TypedDriverOption.CONTACT_POINTS.getRawOption(),
Collections.singletonList("test.cluster.fake:9042"))
.build();

CqlSessionBuilder builder = new CqlSessionBuilder().withConfigLoader(loader);
try (CqlSession session = builder.build()) {
ResultSet rs = session.execute("SELECT * FROM system.local");
List<Row> rows = rs.all();
assertThat(rows).hasSize(1);
LOG.trace("system.local contents: {}", rows.get(0).getFormattedContents());
Collection<Node> nodes = session.getMetadata().getNodes().values();
for (Node node : nodes) {
LOG.trace("Found metadata node: {}", node);
}
Set<Node> filteredNodes;
filteredNodes =
nodes.stream()
.filter(x -> x.toString().contains("test.cluster.fake"))
.collect(Collectors.toSet());
assertThat(filteredNodes).hasSize(1);
InetSocketAddress address =
(InetSocketAddress) filteredNodes.iterator().next().getEndPoint().resolve();
assertTrue(address.isUnresolved());
}
}
}

@Test
public void replace_cluster_test() {
final int numberOfNodes = 3;
DriverConfigLoader loader =
new DefaultProgrammaticDriverConfigLoaderBuilder()
.withBoolean(TypedDriverOption.RESOLVE_CONTACT_POINTS.getRawOption(), false)
.withBoolean(TypedDriverOption.RECONNECT_ON_INIT.getRawOption(), true)
.withStringList(
TypedDriverOption.CONTACT_POINTS.getRawOption(),
Collections.singletonList("node-1.cluster.fake:9042"))
Collections.singletonList("test.cluster.fake:9042"))
dkropachev marked this conversation as resolved.
Show resolved Hide resolved
.build();

CqlSessionBuilder builder = new CqlSessionBuilder().withConfigLoader(loader);
try (CqlSession session = builder.build()) {
CqlSession session;

try (CcmBridge ccmBridge =
CcmBridge.builder().withNodes(numberOfNodes).withIpPrefix("127.0.1.").build()) {
RESOLVER_FACTORY.updateResponse(
"test.cluster.fake",
new ValidResponse(
new InetAddress[] {
getNodeInetAddress(ccmBridge, 1),
getNodeInetAddress(ccmBridge, 2),
getNodeInetAddress(ccmBridge, 3)
}));
ccmBridge.create();
ccmBridge.start();
session = builder.build();
boolean allNodesUp = false;
int nodesUp = 0;
for (int i = 0; i < CLUSTER_WAIT_SECONDS; i++) {
try {
Collection<Node> nodes = session.getMetadata().getNodes().values();
nodesUp = 0;
for (Node node : nodes) {
if (node.getUpSinceMillis() > 0) {
nodesUp++;
}
}
if (nodesUp == numberOfNodes) {
allNodesUp = true;
break;
}
Thread.sleep(1000);
} catch (InterruptedException e) {
break;
}
}
if (!allNodesUp) {
LOG.error(
"Driver sees only {} nodes UP instead of {} after waiting {}s",
nodesUp,
numberOfNodes,
CLUSTER_WAIT_SECONDS);
}
ResultSet rs = session.execute("SELECT * FROM system.local");
List<Row> rows = rs.all();
assertThat(rows).hasSize(1);
LOG.trace("system.local contents: {}", rows.get(0).getFormattedContents());
assertThat(rs).isNotNull();
Row row = rs.one();
assertThat(row).isNotNull();
Collection<Node> nodes = session.getMetadata().getNodes().values();
for (Node node : nodes) {
LOG.trace("Found metadata node: {}", node);
assertThat(nodes).hasSize(numberOfNodes);
Iterator<Node> iterator = nodes.iterator();
while (iterator.hasNext()) {
LOG.trace("Metadata node: " + iterator.next().toString());
}
Set<Node> filteredNodes;
filteredNodes =
nodes.stream()
.filter(x -> x.toString().contains("node-1.cluster.fake"))
.filter(x -> x.toString().contains("test.cluster.fake"))
dkropachev marked this conversation as resolved.
Show resolved Hide resolved
.collect(Collectors.toSet());
assertThat(filteredNodes).hasSize(1);
InetSocketAddress address =
(InetSocketAddress) filteredNodes.iterator().next().getEndPoint().resolve();
assertTrue(address.isUnresolved());
}
try (CcmBridge ccmBridge =
CcmBridge.builder().withNodes(numberOfNodes).withIpPrefix("127.0.1.").build()) {
ccmBridge.create();
ccmBridge.start();
boolean allNodesUp = false;
int nodesUp = 0;
for (int i = 0; i < CLUSTER_WAIT_SECONDS; i++) {
try {
Collection<Node> nodes = session.getMetadata().getNodes().values();
nodesUp = 0;
for (Node node : nodes) {
if (node.getUpSinceMillis() > 0) {
nodesUp++;
}
}
if (nodesUp == numberOfNodes) {
allNodesUp = true;
break;
}
Thread.sleep(1000);
} catch (InterruptedException e) {
break;
}
}
if (!allNodesUp) {
LOG.error(
"Driver sees only {} nodes UP instead of {} after waiting {}s",
nodesUp,
numberOfNodes,
CLUSTER_WAIT_SECONDS);
}
ResultSet rs = session.execute("SELECT * FROM system.local");
assertThat(rs).isNotNull();
Row row = rs.one();
assertThat(row).isNotNull();

Collection<Node> nodes = session.getMetadata().getNodes().values();
assertThat(nodes).hasSize(numberOfNodes);
Iterator<Node> iterator = nodes.iterator();
while (iterator.hasNext()) {
LOG.trace("Metadata node: " + iterator.next().toString());
}
Set<Node> filteredNodes;
filteredNodes =
nodes.stream()
.filter(x -> x.toString().contains("test.cluster.fake"))
.collect(Collectors.toSet());
if (filteredNodes.size() == 0) {
LOG.error(
"No metadata node with \"test.cluster.fake\" substring. The unresolved endpoint socket was likely "
+ "replaced with resolved one.");
} else if (filteredNodes.size() > 1) {
fail(
"Somehow there is more than 1 node in metadata with unresolved hostname. This should not ever happen.");
}
}
session.close();
}

@SuppressWarnings("unused")
public void run_replace_test_20_times() {
for (int i = 1; i <= 20; i++) {
LOG.info(
"Running ({}/20}) {}", i, MockResolverIT.class.toString() + "#replace_cluster_test()");
replace_cluster_test();
}
}

private InetAddress getNodeInetAddress(CcmBridge ccmBridge, int nodeid) {
private static InetAddress getNodeInetAddress(CcmBridge ccmBridge, int nodeid) {
try {
return InetAddress.getByName(ccmBridge.getNodeIpAddress(nodeid));
} catch (UnknownHostException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ public void reloadCore(int node, String keyspace, String table, boolean reindex)
public void start() {
if (started.compareAndSet(false, true)) {
try {
execute("start", jvmArgs, "--wait-for-binary-proto");
execute("start", jvmArgs, "--wait-for-binary-proto", "--wait-other-notice");
} catch (RuntimeException re) {
// if something went wrong starting CCM, see if we can also dump the error
executeCheckLogError();
Expand Down Expand Up @@ -407,9 +407,11 @@ public void start(int n) {
"node" + n,
"start",
"--jvm_arg=-Dcassandra.allow_new_old_config_keys=true",
"--jvm_arg=-Dcassandra.allow_duplicate_config_keys=false");
"--jvm_arg=-Dcassandra.allow_duplicate_config_keys=false",
"--wait-other-notice",
"--wait-for-binary-proto");
} else {
execute("node" + n, "start");
execute("node" + n, "start", "--wait-other-notice", "--wait-for-binary-proto");
}
}

Expand Down
Loading