Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4.x: Add optional fallback for ControlConnection#reconnect() #341

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,15 @@ public enum DefaultDriverOption implements DriverOption {
*/
CONTROL_CONNECTION_AGREEMENT_WARN("advanced.control-connection.schema-agreement.warn-on-failure"),

/**
* Whether to forcibly add original contact points held by MetadataManager to the reconnection
* plan, in case there is no live nodes available according to LBP. Experimental.
*
* <p>Value-type: boolean
*/
CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS(
"advanced.control-connection.reconnection.fallback-to-original-contact-points"),

/**
* Whether `Session.prepare` calls should be sent to all nodes in the cluster.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ protected static void fillWithDriverDefaults(OptionsMap map) {
map.put(TypedDriverOption.CONTROL_CONNECTION_AGREEMENT_INTERVAL, Duration.ofMillis(200));
map.put(TypedDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT, Duration.ofSeconds(10));
map.put(TypedDriverOption.CONTROL_CONNECTION_AGREEMENT_WARN, true);
map.put(TypedDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS, false);
map.put(TypedDriverOption.PREPARE_ON_ALL_NODES, true);
map.put(TypedDriverOption.REPREPARE_ENABLED, true);
map.put(TypedDriverOption.REPREPARE_CHECK_SYSTEM_TABLE, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,10 @@ public String toString() {
public static final TypedDriverOption<Boolean> CONTROL_CONNECTION_AGREEMENT_WARN =
new TypedDriverOption<>(
DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_WARN, GenericType.BOOLEAN);
/** Whether to forcibly try original contacts if no live nodes are available */
public static final TypedDriverOption<Boolean> CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS =
new TypedDriverOption<>(
DefaultDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS, GenericType.BOOLEAN);
/** Whether `Session.prepare` calls should be sent to all nodes in the cluster. */
public static final TypedDriverOption<Boolean> PREPARE_ON_ALL_NODES =
new TypedDriverOption<>(DefaultDriverOption.PREPARE_ON_ALL_NODES, GenericType.BOOLEAN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,8 @@ private void init(
.withOwnerLogPrefix(logPrefix + "|control")
.build();

Queue<Node> nodes = context.getLoadBalancingPolicyWrapper().newQueryPlan();
Queue<Node> nodes =
context.getLoadBalancingPolicyWrapper().newControlReconnectionQueryPlan();

connect(
nodes,
Expand Down Expand Up @@ -336,7 +337,7 @@ private void init(

private CompletionStage<Boolean> reconnect() {
assert adminExecutor.inEventLoop();
Queue<Node> nodes = context.getLoadBalancingPolicyWrapper().newQueryPlan();
Queue<Node> nodes = context.getLoadBalancingPolicyWrapper().newControlReconnectionQueryPlan();
CompletableFuture<Boolean> result = new CompletableFuture<>();
connect(
nodes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package com.datastax.oss.driver.internal.core.metadata;

import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy;
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance;
Expand Down Expand Up @@ -161,8 +162,25 @@ public Queue<Node> newQueryPlan(
}

@NonNull
public Queue<Node> newQueryPlan() {
return newQueryPlan(null, DriverExecutionProfile.DEFAULT_NAME, null);
public Queue<Node> newControlReconnectionQueryPlan() {
// First try the original way
Queue<Node> regularQueryPlan = newQueryPlan(null, DriverExecutionProfile.DEFAULT_NAME, null);
if (!regularQueryPlan.isEmpty()) return regularQueryPlan;

if (context
.getConfig()
.getDefaultProfile()
.getBoolean(DefaultDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS)) {
Set<DefaultNode> originalNodes = context.getMetadataManager().getContactPoints();
List<Node> nodes = new ArrayList<>();
for (DefaultNode node : originalNodes) {
nodes.add(new DefaultNode(node.getEndPoint(), context));
}
Collections.shuffle(nodes);
return new ConcurrentLinkedQueue<>(nodes);
} else {
return regularQueryPlan;
}
}

// when it comes in from the outside
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2113,6 +2113,17 @@ datastax-java-driver {
# Overridable in a profile: no
warn-on-failure = true
}

reconnection {
# Whether to forcibly add original contact points held by MetadataManager to the reconnection plan,
# in case there is no live nodes available according to LBP.
# Experimental.
#
# Required: yes
# Modifiable at runtime: yes, the new value will be used for checks issued after the change.
# Overridable in a profile: no
fallback-to-original-contact-points = false
}
}

advanced.prepared-statements {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,25 @@ public void setup() {
when(defaultProfile.getBoolean(DefaultDriverOption.CONNECTION_WARN_INIT_ERROR))
.thenReturn(false);

when(context.getConfig()).thenReturn(config);
when(config.getDefaultProfile()).thenReturn(defaultProfile);
when(defaultProfile.getBoolean(DefaultDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS))
.thenReturn(false);

controlConnection = new ControlConnection(context);
}

protected void mockQueryPlan(Node... nodes) {
when(loadBalancingPolicyWrapper.newQueryPlan())
when(loadBalancingPolicyWrapper.newControlReconnectionQueryPlan())
.thenAnswer(
i -> {
ConcurrentLinkedQueue<Node> queryPlan = new ConcurrentLinkedQueue<>();
for (Node node : nodes) {
queryPlan.offer(node);
}
return queryPlan;
});
when(loadBalancingPolicyWrapper.newControlReconnectionQueryPlan())
.thenAnswer(
i -> {
ConcurrentLinkedQueue<Node> queryPlan = new ConcurrentLinkedQueue<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,22 @@ public void setup() {
policy3));
}

@Test
public void should_build_control_connection_query_plan_from_contact_points_before_init() {
// When
Queue<Node> queryPlan = wrapper.newControlReconnectionQueryPlan();

// Then
for (LoadBalancingPolicy policy : ImmutableList.of(policy1, policy2, policy3)) {
verify(policy, never()).newQueryPlan(null, null);
}
assertThat(queryPlan).hasSameElementsAs(contactPoints);
}

@Test
public void should_build_query_plan_from_contact_points_before_init() {
// When
Queue<Node> queryPlan = wrapper.newQueryPlan();
Queue<Node> queryPlan = wrapper.newQueryPlan(null, DriverExecutionProfile.DEFAULT_NAME, null);

// Then
for (LoadBalancingPolicy policy : ImmutableList.of(policy1, policy2, policy3)) {
Expand All @@ -139,7 +151,24 @@ public void should_fetch_query_plan_from_policy_after_init() {
}

// When
Queue<Node> queryPlan = wrapper.newQueryPlan();
Queue<Node> queryPlan = wrapper.newControlReconnectionQueryPlan();

// Then
// no-arg newQueryPlan() uses the default profile
verify(policy1).newQueryPlan(null, null);
assertThat(queryPlan).isEqualTo(defaultPolicyQueryPlan);
}

@Test
public void should_fetch_control_connection_query_plan_from_policy_after_init() {
// Given
wrapper.init();
for (LoadBalancingPolicy policy : ImmutableList.of(policy1, policy2, policy3)) {
verify(policy).init(anyMap(), any(DistanceReporter.class));
}

// When
Queue<Node> queryPlan = wrapper.newQueryPlan(null, DriverExecutionProfile.DEFAULT_NAME, null);

// Then
// no-arg newQueryPlan() uses the default profile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
package com.datastax.oss.driver.core.resolver;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand All @@ -33,11 +34,14 @@
import com.datastax.oss.driver.api.core.config.TypedDriverOption;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.SimpleStatementBuilder;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.testinfra.ccm.CcmBridge;
import com.datastax.oss.driver.categories.IsolatedTests;
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultProgrammaticDriverConfigLoaderBuilder;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -237,4 +241,176 @@ public void run_replace_test_20_times() {
replace_cluster_test();
}
}

// This is too long to run during CI, but is useful for manual investigations.
@SuppressWarnings("unused")
public void cannot_reconnect_with_resolved_socket() {
DriverConfigLoader loader =
new DefaultProgrammaticDriverConfigLoaderBuilder()
.withBoolean(TypedDriverOption.RESOLVE_CONTACT_POINTS.getRawOption(), false)
.withBoolean(TypedDriverOption.RECONNECT_ON_INIT.getRawOption(), true)
.withStringList(
TypedDriverOption.CONTACT_POINTS.getRawOption(),
Collections.singletonList("test.cluster.fake:9042"))
.build();

CqlSessionBuilder builder = new CqlSessionBuilder().withConfigLoader(loader);
CqlSession session;
Collection<Node> nodes;
Set<Node> filteredNodes;
try (CcmBridge ccmBridge = CcmBridge.builder().withNodes(3).withIpPrefix("127.0.1.").build()) {
MultimapHostResolverProvider.removeResolverEntries("test.cluster.fake");
MultimapHostResolverProvider.addResolverEntry(
"test.cluster.fake", ccmBridge.getNodeIpAddress(1));
MultimapHostResolverProvider.addResolverEntry(
"test.cluster.fake", ccmBridge.getNodeIpAddress(2));
MultimapHostResolverProvider.addResolverEntry(
"test.cluster.fake", ccmBridge.getNodeIpAddress(3));
ccmBridge.create();
ccmBridge.start();
session = builder.build();
long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000;
while (System.currentTimeMillis() < endTime) {
try {
nodes = session.getMetadata().getNodes().values();
int upNodes = 0;
for (Node node : nodes) {
if (node.getUpSinceMillis() > 0) {
upNodes++;
}
}
if (upNodes == 3) {
break;
}
// session.refreshSchema();
SimpleStatement statement =
new SimpleStatementBuilder("SELECT * FROM system.local")
.setTimeout(Duration.ofSeconds(3))
.build();
session.executeAsync(statement);
Thread.sleep(3000);
} catch (InterruptedException e) {
break;
}
}
ResultSet rs = session.execute("SELECT * FROM system.local");
assertThat(rs).isNotNull();
Row row = rs.one();
assertThat(row).isNotNull();
nodes = session.getMetadata().getNodes().values();
assertThat(nodes).hasSize(3);
Iterator<Node> iterator = nodes.iterator();
while (iterator.hasNext()) {
LOG.trace("Metadata node: " + iterator.next().toString());
}
filteredNodes =
nodes.stream()
.filter(x -> x.toString().contains("test.cluster.fake"))
.collect(Collectors.toSet());
assertThat(filteredNodes).hasSize(1);
}
int counter = 0;
while (filteredNodes.size() == 1) {
counter++;
if (counter == 255) {
LOG.error("Completed 254 runs. Breaking.");
break;
}
LOG.warn(
"Launching another cluster until we lose resolved socket from metadata (run {}).",
counter);
try (CcmBridge ccmBridge =
CcmBridge.builder().withNodes(3).withIpPrefix("127.0." + counter + ".").build()) {
MultimapHostResolverProvider.removeResolverEntries("test.cluster.fake");
MultimapHostResolverProvider.addResolverEntry(
"test.cluster.fake", ccmBridge.getNodeIpAddress(1));
MultimapHostResolverProvider.addResolverEntry(
"test.cluster.fake", ccmBridge.getNodeIpAddress(2));
MultimapHostResolverProvider.addResolverEntry(
"test.cluster.fake", ccmBridge.getNodeIpAddress(3));
ccmBridge.create();
ccmBridge.start();
long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000;
while (System.currentTimeMillis() < endTime) {
try {
nodes = session.getMetadata().getNodes().values();
int upNodes = 0;
for (Node node : nodes) {
if (node.getUpSinceMillis() > 0) {
upNodes++;
}
}
if (upNodes == 3) {
break;
}
SimpleStatement statement =
new SimpleStatementBuilder("SELECT * FROM system.local")
.setTimeout(Duration.ofSeconds(3))
.build();
session.executeAsync(statement);
Thread.sleep(3000);
} catch (InterruptedException e) {
break;
}
}
nodes = session.getMetadata().getNodes().values();
assertThat(nodes).hasSize(3);
Iterator<Node> iterator = nodes.iterator();
while (iterator.hasNext()) {
LOG.trace("Metadata node: " + iterator.next().toString());
}
filteredNodes =
nodes.stream()
.filter(x -> x.toString().contains("test.cluster.fake"))
.collect(Collectors.toSet());
if (filteredNodes.size() > 1) {
fail(
"Somehow there is more than 1 node in metadata with unresolved hostname. This should not ever happen.");
}
}
}
Iterator<Node> iterator = nodes.iterator();
while (iterator.hasNext()) {
InetSocketAddress address = (InetSocketAddress) iterator.next().getEndPoint().resolve();
assertFalse(address.isUnresolved());
}
try (CcmBridge ccmBridge = CcmBridge.builder().withNodes(3).withIpPrefix("127.1.1.").build()) {
MultimapHostResolverProvider.removeResolverEntries("test.cluster.fake");
MultimapHostResolverProvider.addResolverEntry(
"test.cluster.fake", ccmBridge.getNodeIpAddress(1));
MultimapHostResolverProvider.addResolverEntry(
"test.cluster.fake", ccmBridge.getNodeIpAddress(2));
MultimapHostResolverProvider.addResolverEntry(
"test.cluster.fake", ccmBridge.getNodeIpAddress(3));
// Now the driver should fail to reconnect since unresolved hostname is gone.
ccmBridge.create();
ccmBridge.start();
long endTime = System.currentTimeMillis() + CLUSTER_WAIT_SECONDS * 1000;
while (System.currentTimeMillis() < endTime) {
try {
nodes = session.getMetadata().getNodes().values();
int upNodes = 0;
for (Node node : nodes) {
if (node.getUpSinceMillis() > 0) {
upNodes++;
}
}
if (upNodes == 3) {
break;
}
// session.refreshSchema();
SimpleStatement statement =
new SimpleStatementBuilder("SELECT * FROM system.local")
.setTimeout(Duration.ofSeconds(3))
.build();
session.executeAsync(statement);
Thread.sleep(3000);
} catch (InterruptedException e) {
break;
}
}
session.execute("SELECT * FROM system.local");
}
session.close();
}
}
Loading