Skip to content

Commit

Permalink
4.x: Add optional fallback for ControlConnection#reconnect() (#341)
Browse files Browse the repository at this point in the history
* Add `MockResolverIT#cannot_reconnect_with_resolved_socket()`

Adds a method for testing the issues that surface after cluster
replacements. Due to the variable, sometimes long runtime it is not added
to any of the test groups.

* Add optional fallback for `ControlConnection#reconnect()`

Adds an experimental option to allow `ControlConnection` to try
reconnecting to the original contact points held by `MetadataManager`,
in case of getting empty query plan from the load balancing policy.

In order to separate this logic from query plans of other queries
`LoadBalancingPolicyWrapper#newControlReconnectionQueryPlan()` was introduced
and is called during reconnection in place of `newQueryPlan()`.
  • Loading branch information
Bouncheck authored Oct 8, 2024
1 parent d69bdd4 commit abe2811
Show file tree
Hide file tree
Showing 9 changed files with 270 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,15 @@ public enum DefaultDriverOption implements DriverOption {
*/
CONTROL_CONNECTION_AGREEMENT_WARN("advanced.control-connection.schema-agreement.warn-on-failure"),

/**
* Whether to forcibly add original contact points held by MetadataManager to the reconnection
* plan, in case there is no live nodes available according to LBP. Experimental.
*
* <p>Value-type: boolean
*/
CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS(
"advanced.control-connection.reconnection.fallback-to-original-contact-points"),

/**
* Whether `Session.prepare` calls should be sent to all nodes in the cluster.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ protected static void fillWithDriverDefaults(OptionsMap map) {
map.put(TypedDriverOption.CONTROL_CONNECTION_AGREEMENT_INTERVAL, Duration.ofMillis(200));
map.put(TypedDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT, Duration.ofSeconds(10));
map.put(TypedDriverOption.CONTROL_CONNECTION_AGREEMENT_WARN, true);
map.put(TypedDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS, false);
map.put(TypedDriverOption.PREPARE_ON_ALL_NODES, true);
map.put(TypedDriverOption.REPREPARE_ENABLED, true);
map.put(TypedDriverOption.REPREPARE_CHECK_SYSTEM_TABLE, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,10 @@ public String toString() {
public static final TypedDriverOption<Boolean> CONTROL_CONNECTION_AGREEMENT_WARN =
new TypedDriverOption<>(
DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_WARN, GenericType.BOOLEAN);
/** Whether to forcibly try original contacts if no live nodes are available */
public static final TypedDriverOption<Boolean> CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS =
new TypedDriverOption<>(
DefaultDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS, GenericType.BOOLEAN);
/** Whether `Session.prepare` calls should be sent to all nodes in the cluster. */
public static final TypedDriverOption<Boolean> PREPARE_ON_ALL_NODES =
new TypedDriverOption<>(DefaultDriverOption.PREPARE_ON_ALL_NODES, GenericType.BOOLEAN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,8 @@ private void init(
.withOwnerLogPrefix(logPrefix + "|control")
.build();

Queue<Node> nodes = context.getLoadBalancingPolicyWrapper().newQueryPlan();
Queue<Node> nodes =
context.getLoadBalancingPolicyWrapper().newControlReconnectionQueryPlan();

connect(
nodes,
Expand Down Expand Up @@ -336,7 +337,7 @@ private void init(

private CompletionStage<Boolean> reconnect() {
assert adminExecutor.inEventLoop();
Queue<Node> nodes = context.getLoadBalancingPolicyWrapper().newQueryPlan();
Queue<Node> nodes = context.getLoadBalancingPolicyWrapper().newControlReconnectionQueryPlan();
CompletableFuture<Boolean> result = new CompletableFuture<>();
connect(
nodes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package com.datastax.oss.driver.internal.core.metadata;

import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy;
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance;
Expand Down Expand Up @@ -161,8 +162,25 @@ public Queue<Node> newQueryPlan(
}

@NonNull
public Queue<Node> newQueryPlan() {
return newQueryPlan(null, DriverExecutionProfile.DEFAULT_NAME, null);
public Queue<Node> newControlReconnectionQueryPlan() {
// First try the original way
Queue<Node> regularQueryPlan = newQueryPlan(null, DriverExecutionProfile.DEFAULT_NAME, null);
if (!regularQueryPlan.isEmpty()) return regularQueryPlan;

if (context
.getConfig()
.getDefaultProfile()
.getBoolean(DefaultDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS)) {
Set<DefaultNode> originalNodes = context.getMetadataManager().getContactPoints();
List<Node> nodes = new ArrayList<>();
for (DefaultNode node : originalNodes) {
nodes.add(new DefaultNode(node.getEndPoint(), context));
}
Collections.shuffle(nodes);
return new ConcurrentLinkedQueue<>(nodes);
} else {
return regularQueryPlan;
}
}

// when it comes in from the outside
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2113,6 +2113,17 @@ datastax-java-driver {
# Overridable in a profile: no
warn-on-failure = true
}

reconnection {
# Whether to forcibly add original contact points held by MetadataManager to the reconnection plan,
# in case there is no live nodes available according to LBP.
# Experimental.
#
# Required: yes
# Modifiable at runtime: yes, the new value will be used for checks issued after the change.
# Overridable in a profile: no
fallback-to-original-contact-points = false
}
}

advanced.prepared-statements {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,25 @@ public void setup() {
when(defaultProfile.getBoolean(DefaultDriverOption.CONNECTION_WARN_INIT_ERROR))
.thenReturn(false);

when(context.getConfig()).thenReturn(config);
when(config.getDefaultProfile()).thenReturn(defaultProfile);
when(defaultProfile.getBoolean(DefaultDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS))
.thenReturn(false);

controlConnection = new ControlConnection(context);
}

protected void mockQueryPlan(Node... nodes) {
when(loadBalancingPolicyWrapper.newQueryPlan())
when(loadBalancingPolicyWrapper.newControlReconnectionQueryPlan())
.thenAnswer(
i -> {
ConcurrentLinkedQueue<Node> queryPlan = new ConcurrentLinkedQueue<>();
for (Node node : nodes) {
queryPlan.offer(node);
}
return queryPlan;
});
when(loadBalancingPolicyWrapper.newControlReconnectionQueryPlan())
.thenAnswer(
i -> {
ConcurrentLinkedQueue<Node> queryPlan = new ConcurrentLinkedQueue<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,22 @@ public void setup() {
policy3));
}

@Test
public void should_build_control_connection_query_plan_from_contact_points_before_init() {
// When
Queue<Node> queryPlan = wrapper.newControlReconnectionQueryPlan();

// Then
for (LoadBalancingPolicy policy : ImmutableList.of(policy1, policy2, policy3)) {
verify(policy, never()).newQueryPlan(null, null);
}
assertThat(queryPlan).hasSameElementsAs(contactPoints);
}

@Test
public void should_build_query_plan_from_contact_points_before_init() {
// When
Queue<Node> queryPlan = wrapper.newQueryPlan();
Queue<Node> queryPlan = wrapper.newQueryPlan(null, DriverExecutionProfile.DEFAULT_NAME, null);

// Then
for (LoadBalancingPolicy policy : ImmutableList.of(policy1, policy2, policy3)) {
Expand All @@ -139,7 +151,24 @@ public void should_fetch_query_plan_from_policy_after_init() {
}

// When
Queue<Node> queryPlan = wrapper.newQueryPlan();
Queue<Node> queryPlan = wrapper.newControlReconnectionQueryPlan();

// Then
// no-arg newQueryPlan() uses the default profile
verify(policy1).newQueryPlan(null, null);
assertThat(queryPlan).isEqualTo(defaultPolicyQueryPlan);
}

@Test
public void should_fetch_control_connection_query_plan_from_policy_after_init() {
// Given
wrapper.init();
for (LoadBalancingPolicy policy : ImmutableList.of(policy1, policy2, policy3)) {
verify(policy).init(anyMap(), any(DistanceReporter.class));
}

// When
Queue<Node> queryPlan = wrapper.newQueryPlan(null, DriverExecutionProfile.DEFAULT_NAME, null);

// Then
// no-arg newQueryPlan() uses the default profile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
package com.datastax.oss.driver.core.resolver;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand All @@ -33,11 +34,14 @@
import com.datastax.oss.driver.api.core.config.TypedDriverOption;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.SimpleStatementBuilder;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.testinfra.ccm.CcmBridge;
import com.datastax.oss.driver.categories.IsolatedTests;
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultProgrammaticDriverConfigLoaderBuilder;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -237,4 +241,176 @@ public void run_replace_test_20_times() {
replace_cluster_test();
}
}

// This is too long to run during CI, but is useful for manual investigations.
@SuppressWarnings("unused")
public void cannot_reconnect_with_resolved_socket() {
DriverConfigLoader loader =
new DefaultProgrammaticDriverConfigLoaderBuilder()
.withBoolean(TypedDriverOption.RESOLVE_CONTACT_POINTS.getRawOption(), false)
.withBoolean(TypedDriverOption.RECONNECT_ON_INIT.getRawOption(), true)
.withStringList(
TypedDriverOption.CONTACT_POINTS.getRawOption(),
Collections.singletonList("test.cluster.fake:9042"))
.build();

CqlSessionBuilder builder = new CqlSessionBuilder().withConfigLoader(loader);
CqlSession session;
Collection<Node> nodes;
Set<Node> filteredNodes;
try (CcmBridge ccmBridge = CcmBridge.builder().withNodes(3).withIpPrefix("127.0.1.").build()) {
MultimapHostResolverProvider.removeResolverEntries("test.cluster.fake");
MultimapHostResolverProvider.addResolverEntry(
"test.cluster.fake", ccmBridge.getNodeIpAddress(1));
MultimapHostResolverProvider.addResolverEntry(
"test.cluster.fake", ccmBridge.getNodeIpAddress(2));
MultimapHostResolverProvider.addResolverEntry(
"test.cluster.fake", ccmBridge.getNodeIpAddress(3));
ccmBridge.create();
ccmBridge.start();
session = builder.build();
long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000;
while (System.currentTimeMillis() < endTime) {
try {
nodes = session.getMetadata().getNodes().values();
int upNodes = 0;
for (Node node : nodes) {
if (node.getUpSinceMillis() > 0) {
upNodes++;
}
}
if (upNodes == 3) {
break;
}
// session.refreshSchema();
SimpleStatement statement =
new SimpleStatementBuilder("SELECT * FROM system.local")
.setTimeout(Duration.ofSeconds(3))
.build();
session.executeAsync(statement);
Thread.sleep(3000);
} catch (InterruptedException e) {
break;
}
}
ResultSet rs = session.execute("SELECT * FROM system.local");
assertThat(rs).isNotNull();
Row row = rs.one();
assertThat(row).isNotNull();
nodes = session.getMetadata().getNodes().values();
assertThat(nodes).hasSize(3);
Iterator<Node> iterator = nodes.iterator();
while (iterator.hasNext()) {
LOG.trace("Metadata node: " + iterator.next().toString());
}
filteredNodes =
nodes.stream()
.filter(x -> x.toString().contains("test.cluster.fake"))
.collect(Collectors.toSet());
assertThat(filteredNodes).hasSize(1);
}
int counter = 0;
while (filteredNodes.size() == 1) {
counter++;
if (counter == 255) {
LOG.error("Completed 254 runs. Breaking.");
break;
}
LOG.warn(
"Launching another cluster until we lose resolved socket from metadata (run {}).",
counter);
try (CcmBridge ccmBridge =
CcmBridge.builder().withNodes(3).withIpPrefix("127.0." + counter + ".").build()) {
MultimapHostResolverProvider.removeResolverEntries("test.cluster.fake");
MultimapHostResolverProvider.addResolverEntry(
"test.cluster.fake", ccmBridge.getNodeIpAddress(1));
MultimapHostResolverProvider.addResolverEntry(
"test.cluster.fake", ccmBridge.getNodeIpAddress(2));
MultimapHostResolverProvider.addResolverEntry(
"test.cluster.fake", ccmBridge.getNodeIpAddress(3));
ccmBridge.create();
ccmBridge.start();
long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000;
while (System.currentTimeMillis() < endTime) {
try {
nodes = session.getMetadata().getNodes().values();
int upNodes = 0;
for (Node node : nodes) {
if (node.getUpSinceMillis() > 0) {
upNodes++;
}
}
if (upNodes == 3) {
break;
}
SimpleStatement statement =
new SimpleStatementBuilder("SELECT * FROM system.local")
.setTimeout(Duration.ofSeconds(3))
.build();
session.executeAsync(statement);
Thread.sleep(3000);
} catch (InterruptedException e) {
break;
}
}
nodes = session.getMetadata().getNodes().values();
assertThat(nodes).hasSize(3);
Iterator<Node> iterator = nodes.iterator();
while (iterator.hasNext()) {
LOG.trace("Metadata node: " + iterator.next().toString());
}
filteredNodes =
nodes.stream()
.filter(x -> x.toString().contains("test.cluster.fake"))
.collect(Collectors.toSet());
if (filteredNodes.size() > 1) {
fail(
"Somehow there is more than 1 node in metadata with unresolved hostname. This should not ever happen.");
}
}
}
Iterator<Node> iterator = nodes.iterator();
while (iterator.hasNext()) {
InetSocketAddress address = (InetSocketAddress) iterator.next().getEndPoint().resolve();
assertFalse(address.isUnresolved());
}
try (CcmBridge ccmBridge = CcmBridge.builder().withNodes(3).withIpPrefix("127.1.1.").build()) {
MultimapHostResolverProvider.removeResolverEntries("test.cluster.fake");
MultimapHostResolverProvider.addResolverEntry(
"test.cluster.fake", ccmBridge.getNodeIpAddress(1));
MultimapHostResolverProvider.addResolverEntry(
"test.cluster.fake", ccmBridge.getNodeIpAddress(2));
MultimapHostResolverProvider.addResolverEntry(
"test.cluster.fake", ccmBridge.getNodeIpAddress(3));
// Now the driver should fail to reconnect since unresolved hostname is gone.
ccmBridge.create();
ccmBridge.start();
long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000;
while (System.currentTimeMillis() < endTime) {
try {
nodes = session.getMetadata().getNodes().values();
int upNodes = 0;
for (Node node : nodes) {
if (node.getUpSinceMillis() > 0) {
upNodes++;
}
}
if (upNodes == 3) {
break;
}
// session.refreshSchema();
SimpleStatement statement =
new SimpleStatementBuilder("SELECT * FROM system.local")
.setTimeout(Duration.ofSeconds(3))
.build();
session.executeAsync(statement);
Thread.sleep(3000);
} catch (InterruptedException e) {
break;
}
}
session.execute("SELECT * FROM system.local");
}
session.close();
}
}

0 comments on commit abe2811

Please sign in to comment.