Skip to content

Commit 3d5f113

Browse files
authored
Ensure we don't use a remote profile if cluster name matches (#31331)
If we run into a race condition between a node being configured as a remote node for cross cluster search etc. and that node joining the local cluster, we might connect to that node with a remote profile. If that node then joins the cluster that has already connected to it as a CCS remote node, we use the wrong profile and can't use bulk connections etc. anymore. This change uses the remote profile only if we connect to a node that has a different cluster name than the local cluster. This is not a perfect fix for this situation, but it is the safe option: at worst we lose the small optimization of using fewer connections per node, which is minor anyway since we only connect to a small set of nodes. Closes #29321
1 parent 5b94afd commit 3d5f113

File tree

4 files changed

+153
-11
lines changed

4 files changed

+153
-11
lines changed

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
8787
private volatile boolean skipUnavailable;
8888
private final ConnectHandler connectHandler;
8989
private SetOnce<ClusterName> remoteClusterName = new SetOnce<>();
90+
private final ClusterName localClusterName;
9091

9192
/**
9293
* Creates a new {@link RemoteClusterConnection}
@@ -100,6 +101,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
100101
RemoteClusterConnection(Settings settings, String clusterAlias, List<DiscoveryNode> seedNodes,
101102
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate) {
102103
super(settings);
104+
this.localClusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
103105
this.transportService = transportService;
104106
this.maxNumRemoteConnections = maxNumRemoteConnections;
105107
this.nodePredicate = nodePredicate;
@@ -310,6 +312,21 @@ public boolean isClosed() {
310312
return connectHandler.isClosed();
311313
}
312314

315+
private ConnectionProfile getRemoteProfile(ClusterName name) {
316+
// we can only compare the cluster name to make a decision if we should use a remote profile
317+
// we can't use a cluster UUID here since we could be connecting to that remote cluster before
318+
// the remote node has joined its cluster and has a cluster UUID. The fact that we just lose a
319+
// rather smallish optimization on the connection layer under certain situations where remote clusters
320+
// have the same name as the local one is minor here.
321+
// the alternative here is to complicate the remote infrastructure to also wait until we formed a cluster,
322+
// gained a cluster UUID and then start connecting etc. We would rather use this simplification in order to keep things simple.
323+
if (this.localClusterName.equals(name)) {
324+
return null;
325+
} else {
326+
return remoteProfile;
327+
}
328+
}
329+
313330
/**
314331
* The connect handler manages node discovery and the actual connect to the remote cluster.
315332
* There is at most one connect job running at any time. If such a connect job is triggered
@@ -419,7 +436,6 @@ protected void doRun() {
419436
collectRemoteNodes(seedNodes.iterator(), transportService, listener);
420437
}
421438
});
422-
423439
}
424440

425441
void collectRemoteNodes(Iterator<DiscoveryNode> seedNodes,
@@ -431,21 +447,27 @@ void collectRemoteNodes(Iterator<DiscoveryNode> seedNodes,
431447
if (seedNodes.hasNext()) {
432448
cancellableThreads.executeIO(() -> {
433449
final DiscoveryNode seedNode = seedNodes.next();
434-
final DiscoveryNode handshakeNode;
450+
final TransportService.HandshakeResponse handshakeResponse;
435451
Transport.Connection connection = transportService.openConnection(seedNode,
436452
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null));
437453
boolean success = false;
438454
try {
439455
try {
440-
handshakeNode = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(),
456+
handshakeResponse = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(),
441457
(c) -> remoteClusterName.get() == null ? true : c.equals(remoteClusterName.get()));
442458
} catch (IllegalStateException ex) {
443459
logger.warn(() -> new ParameterizedMessage("seed node {} cluster name mismatch expected " +
444460
"cluster name {}", connection.getNode(), remoteClusterName.get()), ex);
445461
throw ex;
446462
}
463+
464+
final DiscoveryNode handshakeNode = handshakeResponse.getDiscoveryNode();
447465
if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) {
448-
transportService.connectToNode(handshakeNode, remoteProfile);
466+
transportService.connectToNode(handshakeNode, getRemoteProfile(handshakeResponse.getClusterName()));
467+
if (remoteClusterName.get() == null) {
468+
assert handshakeResponse.getClusterName().value() != null;
469+
remoteClusterName.set(handshakeResponse.getClusterName());
470+
}
449471
connectedNodes.add(handshakeNode);
450472
}
451473
ClusterStateRequest request = new ClusterStateRequest();
@@ -552,7 +574,8 @@ public void handleResponse(ClusterStateResponse response) {
552574
for (DiscoveryNode node : nodesIter) {
553575
if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) {
554576
try {
555-
transportService.connectToNode(node, remoteProfile); // noop if node is connected
577+
transportService.connectToNode(node, getRemoteProfile(remoteClusterName.get())); // noop if node is
578+
// connected
556579
connectedNodes.add(node);
557580
} catch (ConnectTransportException | IllegalStateException ex) {
558581
// ISE if we fail the handshake with a version-incompatible node

server/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,7 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection
342342
}
343343
transport.connectToNode(node, connectionProfile, (newConnection, actualProfile) -> {
344344
// We don't validate cluster names to allow for CCS connections.
345-
final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true);
345+
final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true).discoveryNode;
346346
if (validateConnections && node.equals(remote) == false) {
347347
throw new ConnectTransportException(node, "handshake failed. unexpected remote node " + remote);
348348
}
@@ -378,7 +378,7 @@ public Transport.Connection openConnection(final DiscoveryNode node, ConnectionP
378378
public DiscoveryNode handshake(
379379
final Transport.Connection connection,
380380
final long handshakeTimeout) throws ConnectTransportException {
381-
return handshake(connection, handshakeTimeout, clusterName::equals);
381+
return handshake(connection, handshakeTimeout, clusterName::equals).discoveryNode;
382382
}
383383

384384
/**
@@ -390,11 +390,11 @@ public DiscoveryNode handshake(
390390
* @param connection the connection to a specific node
391391
* @param handshakeTimeout handshake timeout
392392
* @param clusterNamePredicate cluster name validation predicate
393-
* @return the connected node
393+
* @return the handshake response
394394
* @throws ConnectTransportException if the connection failed
395395
* @throws IllegalStateException if the handshake failed
396396
*/
397-
public DiscoveryNode handshake(
397+
public HandshakeResponse handshake(
398398
final Transport.Connection connection,
399399
final long handshakeTimeout, Predicate<ClusterName> clusterNamePredicate) throws ConnectTransportException {
400400
final HandshakeResponse response;
@@ -420,7 +420,7 @@ public HandshakeResponse newInstance() {
420420
throw new IllegalStateException("handshake failed, incompatible version [" + response.version + "] - " + node);
421421
}
422422

423-
return response.discoveryNode;
423+
return response;
424424
}
425425

426426
static class HandshakeRequest extends TransportRequest {
@@ -461,6 +461,14 @@ public void writeTo(StreamOutput out) throws IOException {
461461
clusterName.writeTo(out);
462462
Version.writeVersion(version, out);
463463
}
464+
465+
public DiscoveryNode getDiscoveryNode() {
466+
return discoveryNode;
467+
}
468+
469+
public ClusterName getClusterName() {
470+
return clusterName;
471+
}
464472
}
465473

466474
public void disconnectFromNode(DiscoveryNode node) {

server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,102 @@ public static MockTransportService startTransport(
142142
}
143143
}
144144

145+
public void testLocalProfileIsUsedForLocalCluster() throws Exception {
146+
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
147+
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
148+
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
149+
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
150+
DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
151+
knownNodes.add(seedTransport.getLocalDiscoNode());
152+
knownNodes.add(discoverableTransport.getLocalDiscoNode());
153+
Collections.shuffle(knownNodes, random());
154+
155+
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
156+
service.start();
157+
service.acceptIncomingRequests();
158+
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
159+
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
160+
updateSeedNodes(connection, Arrays.asList(seedNode));
161+
assertTrue(service.nodeConnected(seedNode));
162+
assertTrue(service.nodeConnected(discoverableNode));
163+
assertTrue(connection.assertNoRunningConnections());
164+
PlainTransportFuture<ClusterSearchShardsResponse> futureHandler = new PlainTransportFuture<>(
165+
new FutureTransportResponseHandler<ClusterSearchShardsResponse>() {
166+
@Override
167+
public ClusterSearchShardsResponse read(StreamInput in) throws IOException {
168+
ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse();
169+
inst.readFrom(in);
170+
return inst;
171+
}
172+
});
173+
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK)
174+
.build();
175+
service.sendRequest(connection.getConnection(), ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(),
176+
options, futureHandler);
177+
futureHandler.txGet();
178+
}
179+
}
180+
}
181+
}
182+
183+
public void testRemoteProfileIsUsedForRemoteCluster() throws Exception {
184+
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
185+
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT, threadPool,
186+
Settings.builder().put("cluster.name", "foobar").build());
187+
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT,
188+
threadPool, Settings.builder().put("cluster.name", "foobar").build())) {
189+
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
190+
DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
191+
knownNodes.add(seedTransport.getLocalDiscoNode());
192+
knownNodes.add(discoverableTransport.getLocalDiscoNode());
193+
Collections.shuffle(knownNodes, random());
194+
195+
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
196+
service.start();
197+
service.acceptIncomingRequests();
198+
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
199+
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
200+
updateSeedNodes(connection, Arrays.asList(seedNode));
201+
assertTrue(service.nodeConnected(seedNode));
202+
assertTrue(service.nodeConnected(discoverableNode));
203+
assertTrue(connection.assertNoRunningConnections());
204+
PlainTransportFuture<ClusterSearchShardsResponse> futureHandler = new PlainTransportFuture<>(
205+
new FutureTransportResponseHandler<ClusterSearchShardsResponse>() {
206+
@Override
207+
public ClusterSearchShardsResponse read(StreamInput in) throws IOException {
208+
ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse();
209+
inst.readFrom(in);
210+
return inst;
211+
}
212+
});
213+
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK)
214+
.build();
215+
IllegalStateException ise = (IllegalStateException) expectThrows(SendRequestTransportException.class, () -> {
216+
service.sendRequest(discoverableNode,
217+
ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(), options, futureHandler);
218+
futureHandler.txGet();
219+
}).getCause();
220+
assertEquals(ise.getMessage(), "can't select channel size is 0 for types: [RECOVERY, BULK, STATE]");
221+
222+
PlainTransportFuture<ClusterSearchShardsResponse> handler = new PlainTransportFuture<>(
223+
new FutureTransportResponseHandler<ClusterSearchShardsResponse>() {
224+
@Override
225+
public ClusterSearchShardsResponse read(StreamInput in) throws IOException {
226+
ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse();
227+
inst.readFrom(in);
228+
return inst;
229+
}
230+
});
231+
TransportRequestOptions ops = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.REG)
232+
.build();
233+
service.sendRequest(connection.getConnection(), ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(),
234+
ops, handler);
235+
handler.txGet();
236+
}
237+
}
238+
}
239+
}
240+
145241
public void testDiscoverSingleNode() throws Exception {
146242
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
147243
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);

test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,22 @@ protected MockChannel initiateChannel(InetSocketAddress address, ActionListener<
191191
@Override
192192
protected ConnectionProfile resolveConnectionProfile(ConnectionProfile connectionProfile) {
193193
ConnectionProfile connectionProfile1 = resolveConnectionProfile(connectionProfile, defaultConnectionProfile);
194-
ConnectionProfile.Builder builder = new ConnectionProfile.Builder(LIGHT_PROFILE);
194+
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
195+
Set<TransportRequestOptions.Type> allTypesWithConnection = new HashSet<>();
196+
Set<TransportRequestOptions.Type> allTypesWithoutConnection = new HashSet<>();
197+
for (ConnectionProfile.ConnectionTypeHandle handle : connectionProfile1.getHandles()) {
198+
Set<TransportRequestOptions.Type> types = handle.getTypes();
199+
if (handle.length > 0) {
200+
allTypesWithConnection.addAll(types);
201+
} else {
202+
allTypesWithoutConnection.addAll(types);
203+
}
204+
}
205+
// make sure we maintain at least the types that are supported by this profile even if we only use a single channel for them.
206+
builder.addConnections(1, allTypesWithConnection.toArray(new TransportRequestOptions.Type[0]));
207+
if (allTypesWithoutConnection.isEmpty() == false) {
208+
builder.addConnections(0, allTypesWithoutConnection.toArray(new TransportRequestOptions.Type[0]));
209+
}
195210
builder.setHandshakeTimeout(connectionProfile1.getHandshakeTimeout());
196211
builder.setConnectTimeout(connectionProfile1.getConnectTimeout());
197212
return builder.build();

0 commit comments

Comments
 (0)