Skip to content

Commit

Permalink
Add optional fallback for ControlConnection#reconnect()
Browse files Browse the repository at this point in the history
Adds an experimental option to allow `ControlConnection` to try
reconnecting to the original contact points held by `MetadataManager`,
in case of getting empty query plan from the load balancing policy.

In order to separate this logic from query plans of other queries
`LoadBalancingPolicyWrapper#newControlReconnectionQueryPlan()` was introduced
and is called during reconnection in place of `newQueryPlan()`.
  • Loading branch information
Bouncheck committed Oct 8, 2024
1 parent 35bf41a commit 2b280a2
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,15 @@ public enum DefaultDriverOption implements DriverOption {
*/
CONTROL_CONNECTION_AGREEMENT_WARN("advanced.control-connection.schema-agreement.warn-on-failure"),

/**
* Whether to forcibly add original contact points held by MetadataManager to the reconnection
* plan, in case there is no live nodes available according to LBP. Experimental.
*
* <p>Value-type: boolean
*/
CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS(
"advanced.control-connection.reconnection.fallback-to-original-contact-points"),

/**
* Whether `Session.prepare` calls should be sent to all nodes in the cluster.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ protected static void fillWithDriverDefaults(OptionsMap map) {
map.put(TypedDriverOption.CONTROL_CONNECTION_AGREEMENT_INTERVAL, Duration.ofMillis(200));
map.put(TypedDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT, Duration.ofSeconds(10));
map.put(TypedDriverOption.CONTROL_CONNECTION_AGREEMENT_WARN, true);
map.put(TypedDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS, false);
map.put(TypedDriverOption.PREPARE_ON_ALL_NODES, true);
map.put(TypedDriverOption.REPREPARE_ENABLED, true);
map.put(TypedDriverOption.REPREPARE_CHECK_SYSTEM_TABLE, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,10 @@ public String toString() {
public static final TypedDriverOption<Boolean> CONTROL_CONNECTION_AGREEMENT_WARN =
new TypedDriverOption<>(
DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_WARN, GenericType.BOOLEAN);
/** Whether to forcibly try original contacts if no live nodes are available */
public static final TypedDriverOption<Boolean> CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS =
new TypedDriverOption<>(
DefaultDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS, GenericType.BOOLEAN);
/** Whether `Session.prepare` calls should be sent to all nodes in the cluster. */
public static final TypedDriverOption<Boolean> PREPARE_ON_ALL_NODES =
new TypedDriverOption<>(DefaultDriverOption.PREPARE_ON_ALL_NODES, GenericType.BOOLEAN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,8 @@ private void init(
.withOwnerLogPrefix(logPrefix + "|control")
.build();

Queue<Node> nodes = context.getLoadBalancingPolicyWrapper().newQueryPlan();
Queue<Node> nodes =
context.getLoadBalancingPolicyWrapper().newControlReconnectionQueryPlan();

connect(
nodes,
Expand Down Expand Up @@ -336,7 +337,7 @@ private void init(

private CompletionStage<Boolean> reconnect() {
assert adminExecutor.inEventLoop();
Queue<Node> nodes = context.getLoadBalancingPolicyWrapper().newQueryPlan();
Queue<Node> nodes = context.getLoadBalancingPolicyWrapper().newControlReconnectionQueryPlan();
CompletableFuture<Boolean> result = new CompletableFuture<>();
connect(
nodes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package com.datastax.oss.driver.internal.core.metadata;

import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy;
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance;
Expand Down Expand Up @@ -161,8 +162,25 @@ public Queue<Node> newQueryPlan(
}

@NonNull
public Queue<Node> newQueryPlan() {
return newQueryPlan(null, DriverExecutionProfile.DEFAULT_NAME, null);
public Queue<Node> newControlReconnectionQueryPlan() {
// First try the original way
Queue<Node> regularQueryPlan = newQueryPlan(null, DriverExecutionProfile.DEFAULT_NAME, null);
if (!regularQueryPlan.isEmpty()) return regularQueryPlan;

if (context
.getConfig()
.getDefaultProfile()
.getBoolean(DefaultDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS)) {
Set<DefaultNode> originalNodes = context.getMetadataManager().getContactPoints();
List<Node> nodes = new ArrayList<>();
for (DefaultNode node : originalNodes) {
nodes.add(new DefaultNode(node.getEndPoint(), context));
}
Collections.shuffle(nodes);
return new ConcurrentLinkedQueue<>(nodes);
} else {
return regularQueryPlan;
}
}

// when it comes in from the outside
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2113,6 +2113,17 @@ datastax-java-driver {
# Overridable in a profile: no
warn-on-failure = true
}

reconnection {
# Whether to forcibly add original contact points held by MetadataManager to the reconnection plan,
# in case there is no live nodes available according to LBP.
# Experimental.
#
# Required: yes
# Modifiable at runtime: yes, the new value will be used for checks issued after the change.
# Overridable in a profile: no
fallback-to-original-contact-points = false
}
}

advanced.prepared-statements {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,25 @@ public void setup() {
when(defaultProfile.getBoolean(DefaultDriverOption.CONNECTION_WARN_INIT_ERROR))
.thenReturn(false);

when(context.getConfig()).thenReturn(config);
when(config.getDefaultProfile()).thenReturn(defaultProfile);
when(defaultProfile.getBoolean(DefaultDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS))
.thenReturn(false);

controlConnection = new ControlConnection(context);
}

protected void mockQueryPlan(Node... nodes) {
when(loadBalancingPolicyWrapper.newQueryPlan())
when(loadBalancingPolicyWrapper.newControlReconnectionQueryPlan())
.thenAnswer(
i -> {
ConcurrentLinkedQueue<Node> queryPlan = new ConcurrentLinkedQueue<>();
for (Node node : nodes) {
queryPlan.offer(node);
}
return queryPlan;
});
when(loadBalancingPolicyWrapper.newControlReconnectionQueryPlan())
.thenAnswer(
i -> {
ConcurrentLinkedQueue<Node> queryPlan = new ConcurrentLinkedQueue<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,22 @@ public void setup() {
policy3));
}

@Test
public void should_build_control_connection_query_plan_from_contact_points_before_init() {
// When
Queue<Node> queryPlan = wrapper.newControlReconnectionQueryPlan();

// Then
for (LoadBalancingPolicy policy : ImmutableList.of(policy1, policy2, policy3)) {
verify(policy, never()).newQueryPlan(null, null);
}
assertThat(queryPlan).hasSameElementsAs(contactPoints);
}

@Test
public void should_build_query_plan_from_contact_points_before_init() {
// When
Queue<Node> queryPlan = wrapper.newQueryPlan();
Queue<Node> queryPlan = wrapper.newQueryPlan(null, DriverExecutionProfile.DEFAULT_NAME, null);

// Then
for (LoadBalancingPolicy policy : ImmutableList.of(policy1, policy2, policy3)) {
Expand All @@ -139,7 +151,24 @@ public void should_fetch_query_plan_from_policy_after_init() {
}

// When
Queue<Node> queryPlan = wrapper.newQueryPlan();
Queue<Node> queryPlan = wrapper.newControlReconnectionQueryPlan();

// Then
// no-arg newQueryPlan() uses the default profile
verify(policy1).newQueryPlan(null, null);
assertThat(queryPlan).isEqualTo(defaultPolicyQueryPlan);
}

@Test
public void should_fetch_control_connection_query_plan_from_policy_after_init() {
// Given
wrapper.init();
for (LoadBalancingPolicy policy : ImmutableList.of(policy1, policy2, policy3)) {
verify(policy).init(anyMap(), any(DistanceReporter.class));
}

// When
Queue<Node> queryPlan = wrapper.newQueryPlan(null, DriverExecutionProfile.DEFAULT_NAME, null);

// Then
// no-arg newQueryPlan() uses the default profile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,12 +353,6 @@ public void cannot_reconnect_with_resolved_socket() {
break;
}
}
/*
ResultSet rs = session.execute("SELECT * FROM system.local");
assertThat(rs).isNotNull();
Row row = rs.one();
assertThat(row).isNotNull();
*/
nodes = session.getMetadata().getNodes().values();
assertThat(nodes).hasSize(3);
Iterator<Node> iterator = nodes.iterator();
Expand Down Expand Up @@ -415,19 +409,6 @@ public void cannot_reconnect_with_resolved_socket() {
break;
}
}
/*
for (int i = 0; i < 15; i++) {
try {
nodes = session.getMetadata().getNodes().values();
if (nodes.size() == 3) {
break;
}
Thread.sleep(1000);
} catch (InterruptedException e) {
break;
}
}
*/
session.execute("SELECT * FROM system.local");
}
session.close();
Expand Down

0 comments on commit 2b280a2

Please sign in to comment.