From eb31f0a5170b33e38f187d633e86643f5af01ca5 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 11 May 2018 10:09:50 -0600 Subject: [PATCH 1/5] Start work --- .../TransportClientNodesService.java | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index 109efb400bc93..d560e3aa48070 100644 --- a/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -21,7 +21,7 @@ import com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.logging.log4j.util.Supplier; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; @@ -56,6 +56,7 @@ import org.elasticsearch.transport.TransportService; import java.io.Closeable; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -89,6 +90,7 @@ final class TransportClientNodesService extends AbstractComponent implements Clo private final Object mutex = new Object(); private volatile List nodes = Collections.emptyList(); + // Filtered nodes are nodes whose cluster name does not match the configured cluster name private volatile List filteredNodes = Collections.emptyList(); private final AtomicInteger tempNodeIdGenerator = new AtomicInteger(); @@ -364,7 +366,7 @@ public void sample() { * validates a set of potentially newly discovered nodes and returns an immutable * list of the nodes that has passed. */ - protected List validateNewNodes(Set nodes) { + protected List establishNodeConnections(Set nodes) { for (Iterator it = nodes.iterator(); it.hasNext(); ) { DiscoveryNode node = it.next(); if (!transportService.nodeConnected(node)) { @@ -402,14 +404,16 @@ class SimpleNodeSampler extends NodeSampler { @Override protected void doSample() { HashSet newNodes = new HashSet<>(); - HashSet newFilteredNodes = new HashSet<>(); + ArrayList newFilteredNodes = new ArrayList<>(); for (DiscoveryNode listedNode : listedNodes) { try (Transport.Connection connection = transportService.openConnection(listedNode, LISTED_NODES_PROFILE)){ final PlainTransportFuture handler = new PlainTransportFuture<>( new FutureTransportResponseHandler() { @Override - public LivenessResponse newInstance() { - return new LivenessResponse(); + public LivenessResponse read(StreamInput in) throws IOException { + LivenessResponse response = new LivenessResponse(); + response.readFrom(in); + return response; } }); transportService.sendRequest(connection, TransportLivenessAction.NAME, new LivenessRequest(), @@ -435,8 +439,8 @@ public LivenessResponse newInstance() { } } - nodes = validateNewNodes(newNodes); - filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes)); + nodes = establishNodeConnections(newNodes); + filteredNodes = Collections.unmodifiableList(newFilteredNodes); } } @@ -557,7 +561,7 @@ public void handleException(TransportException e) { } } - nodes = validateNewNodes(newNodes); + nodes = establishNodeConnections(newNodes); filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes)); } } From 25c288e98acc11cc20c59295535b73408fde5dd8 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 18 May 2018 13:57:33 -0600 Subject: [PATCH 2/5] Remove validation --- .../TransportClientNodesService.java | 13 ++++++------- .../transport/TransportService.java | 19 ++++++++++++++++++- .../TransportServiceHandshakeTests.java | 15 +++++++++++++++ 3 files changed, 39 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index d560e3aa48070..191fa470eef20 100644 --- a/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -363,16 +363,16 @@ public void sample() { protected abstract void doSample(); /** - * validates a set of potentially newly discovered nodes and returns an immutable - * list of the nodes that has passed. + * Establishes the node connections. If validateInHandshake is set to true, the connection will fail if + * node returned in the handshake response is different than the discovery node. */ - protected List establishNodeConnections(Set nodes) { + List establishNodeConnections(Set nodes, boolean validateInHandshake) { for (Iterator it = nodes.iterator(); it.hasNext(); ) { DiscoveryNode node = it.next(); if (!transportService.nodeConnected(node)) { try { logger.trace("connecting to node [{}]", node); - transportService.connectToNode(node); + transportService.connectToNode(node, null, validateInHandshake); } catch (Exception e) { it.remove(); logger.debug(() -> new ParameterizedMessage("failed to connect to discovered node [{}]", node), e); @@ -382,7 +382,6 @@ protected List establishNodeConnections(Set nodes) return Collections.unmodifiableList(new ArrayList<>(nodes)); } - } class ScheduledNodeSampler implements Runnable { @@ -439,7 +438,7 @@ public LivenessResponse read(StreamInput in) throws IOException { } } - nodes = establishNodeConnections(newNodes); + nodes = establishNodeConnections(newNodes, false); filteredNodes = Collections.unmodifiableList(newFilteredNodes); } } @@ -561,7 +560,7 @@ public void handleException(TransportException e) { } } - nodes = establishNodeConnections(newNodes); + nodes = establishNodeConnections(newNodes, true); filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes)); } } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 44dac1d8eae8f..b0d8b97a6f6b7 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -314,6 +314,11 @@ public boolean nodeConnected(DiscoveryNode node) { return isLocalNode(node) || transport.nodeConnected(node); } + /** + * Connect to the specified node with the default connection profile + * + * @param node the node to connect to + */ public void connectToNode(DiscoveryNode node) throws ConnectTransportException { connectToNode(node, null); } @@ -325,13 +330,25 @@ public void connectToNode(DiscoveryNode node) throws ConnectTransportException { * @param connectionProfile the connection profile to use when connecting to this node */ public void connectToNode(final DiscoveryNode node, ConnectionProfile connectionProfile) { + connectToNode(node, connectionProfile, true); + } + + /** + * Connect to the specified node with the given connection profile. This method allows the caller to + * specify whether the node connection should be validated against the specified node. + * + * @param node the node to connect to + * @param connectionProfile the connection profile to use when connecting to this node + * @param validateNodeConnection boolean indicating if the node connection should be validated + */ + public void connectToNode(final DiscoveryNode node, ConnectionProfile connectionProfile, boolean validateNodeConnection) { if (isLocalNode(node)) { return; } transport.connectToNode(node, connectionProfile, (newConnection, actualProfile) -> { // We don't validate cluster names to allow for CCS connections. final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true); - if (node.equals(remote) == false) { + if (validateNodeConnection && node.equals(remote) == false) { throw new ConnectTransportException(node, "handshake failed. unexpected remote node " + remote); } }); diff --git a/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java b/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java index 08d88ad2e0486..e199246bee4fd 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java @@ -178,6 +178,21 @@ public void testNodeConnectWithDifferentNodeId() { assertFalse(handleA.transportService.nodeConnected(discoveryNode)); } + public void testNodeConnectWithDifferentNodeIdSucceedsIfNoValidation() { + Settings settings = Settings.builder().put("cluster.name", "test").build(); + NetworkHandle handleA = startServices("TS_A", settings, Version.CURRENT); + NetworkHandle handleB = startServices("TS_B", settings, Version.CURRENT); + DiscoveryNode discoveryNode = new DiscoveryNode( + randomAlphaOfLength(10), + handleB.discoveryNode.getAddress(), + emptyMap(), + emptySet(), + handleB.discoveryNode.getVersion()); + + handleA.transportService.connectToNode(discoveryNode, MockTcpTransport.LIGHT_PROFILE, false); + assertTrue(handleA.transportService.nodeConnected(discoveryNode)); + } + private static class NetworkHandle { private TransportService transportService; From 53c087bc6ef23e1ded3c7dced264e33f85615d69 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 18 May 2018 14:25:13 -0600 Subject: [PATCH 3/5] White space --- .../client/transport/TransportClientNodesService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index 191fa470eef20..063a244792eab 100644 --- a/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -270,7 +270,7 @@ public static class RetryListener implements ActionListener private volatile int i; RetryListener(NodeListenerCallback callback, ActionListener listener, - List nodes, int index, TransportClient.HostFailureListener hostFailureListener) { + List nodes, int index, TransportClient.HostFailureListener hostFailureListener) { this.callback = callback; this.listener = listener; this.nodes = nodes; From 2b2b0362cc0d3cd0de041950110d831ee1e809dc Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 29 May 2018 16:19:23 -0600 Subject: [PATCH 4/5] Changes based on review --- .../TransportClientNodesService.java | 2 +- .../transport/TransportService.java | 21 +++++------- .../TransportServiceHandshakeTests.java | 33 ++++++++++++++++--- 3 files changed, 37 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index 063a244792eab..d6a366fd30d83 100644 --- a/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -372,7 +372,7 @@ List establishNodeConnections(Set nodes, boolean v if (!transportService.nodeConnected(node)) { try { logger.trace("connecting to node [{}]", node); - transportService.connectToNode(node, null, validateInHandshake); + transportService.connectToNode(node); } catch (Exception e) { it.remove(); logger.debug(() -> new ParameterizedMessage("failed to connect to discovered node [{}]", node), e); diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index b0d8b97a6f6b7..9755898be5fef 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -21,6 +21,8 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction; @@ -124,6 +126,8 @@ protected boolean removeEldestEntry(Map.Entry eldest) { private final RemoteClusterService remoteClusterService; + private final boolean validateConnections; + /** if set will call requests sent to this id to shortcut and executed locally */ volatile DiscoveryNode localNode = null; private final Transport.Connection localNodeConnection = new Transport.Connection() { @@ -153,6 +157,9 @@ public TransportService(Settings settings, Transport transport, ThreadPool threa Function localNodeFactory, @Nullable ClusterSettings clusterSettings, Set taskHeaders) { super(settings); + // The only time we do not want to validate node connections is when this is a transport client using the simple node sampler + this.validateConnections = TransportClient.CLIENT_TYPE.equals(settings.get(Client.CLIENT_TYPE_SETTING_S.getKey())) == false || + TransportClient.CLIENT_TRANSPORT_SNIFF.get(settings); this.transport = transport; this.threadPool = threadPool; this.localNodeFactory = localNodeFactory; @@ -330,25 +337,13 @@ public void connectToNode(DiscoveryNode node) throws ConnectTransportException { * @param connectionProfile the connection profile to use when connecting to this node */ public void connectToNode(final DiscoveryNode node, ConnectionProfile connectionProfile) { - connectToNode(node, connectionProfile, true); - } - - /** - * Connect to the specified node with the given connection profile. This method allows the caller to - * specify whether the node connection should be validated against the specified node. - * - * @param node the node to connect to - * @param connectionProfile the connection profile to use when connecting to this node - * @param validateNodeConnection boolean indicating if the node connection should be validated - */ - public void connectToNode(final DiscoveryNode node, ConnectionProfile connectionProfile, boolean validateNodeConnection) { if (isLocalNode(node)) { return; } transport.connectToNode(node, connectionProfile, (newConnection, actualProfile) -> { // We don't validate cluster names to allow for CCS connections. final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true); - if (validateNodeConnection && node.equals(remote) == false) { + if (validateConnections && node.equals(remote) == false) { throw new ConnectTransportException(node, "handshake failed. unexpected remote node " + remote); } }); diff --git a/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java b/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java index e199246bee4fd..ed1dfded0782f 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java @@ -20,6 +20,8 @@ package org.elasticsearch.transport; import org.elasticsearch.Version; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; @@ -178,10 +180,11 @@ public void testNodeConnectWithDifferentNodeId() { assertFalse(handleA.transportService.nodeConnected(discoveryNode)); } - public void testNodeConnectWithDifferentNodeIdSucceedsIfNoValidation() { - Settings settings = Settings.builder().put("cluster.name", "test").build(); - NetworkHandle handleA = startServices("TS_A", settings, Version.CURRENT); - NetworkHandle handleB = startServices("TS_B", settings, Version.CURRENT); + public void testNodeConnectWithDifferentNodeIdSucceedsIfThisIsTransportClientOfSimpleNodeSampler() { + Settings.Builder settings = Settings.builder().put("cluster.name", "test"); + Settings transportClientSettings = settings.put(Client.CLIENT_TYPE_SETTING_S.getKey(), TransportClient.CLIENT_TYPE).build(); + NetworkHandle handleA = startServices("TS_A", transportClientSettings, Version.CURRENT); + NetworkHandle handleB = startServices("TS_B", settings.build(), Version.CURRENT); DiscoveryNode discoveryNode = new DiscoveryNode( randomAlphaOfLength(10), handleB.discoveryNode.getAddress(), @@ -189,10 +192,30 @@ public void testNodeConnectWithDifferentNodeIdSucceedsIfNoValidation() { emptySet(), handleB.discoveryNode.getVersion()); - handleA.transportService.connectToNode(discoveryNode, MockTcpTransport.LIGHT_PROFILE, false); + handleA.transportService.connectToNode(discoveryNode, MockTcpTransport.LIGHT_PROFILE); assertTrue(handleA.transportService.nodeConnected(discoveryNode)); } + public void testNodeConnectWithDifferentNodeIdFailsWhenSnifferTransportClient() { + Settings.Builder settings = Settings.builder().put("cluster.name", "test"); + Settings transportClientSettings = settings.put(Client.CLIENT_TYPE_SETTING_S.getKey(), TransportClient.CLIENT_TYPE) + .put(TransportClient.CLIENT_TRANSPORT_SNIFF.getKey(), true) + .build(); + NetworkHandle handleA = startServices("TS_A", transportClientSettings, Version.CURRENT); + NetworkHandle handleB = startServices("TS_B", settings.build(), Version.CURRENT); + DiscoveryNode discoveryNode = new DiscoveryNode( + randomAlphaOfLength(10), + handleB.discoveryNode.getAddress(), + emptyMap(), + emptySet(), + handleB.discoveryNode.getVersion()); + ConnectTransportException ex = expectThrows(ConnectTransportException.class, () -> { + handleA.transportService.connectToNode(discoveryNode, MockTcpTransport.LIGHT_PROFILE); + }); + assertThat(ex.getMessage(), containsString("unexpected remote node")); + assertFalse(handleA.transportService.nodeConnected(discoveryNode)); + } + private static class NetworkHandle { private TransportService transportService; From d93cb12dbbc297c9c46d205eb0096d087128a0a1 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 31 May 2018 09:48:53 -0600 Subject: [PATCH 5/5] Remove variable --- .../client/transport/TransportClientNodesService.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index d6a366fd30d83..aa0672d80ba1d 100644 --- a/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -366,7 +366,7 @@ public void sample() { * Establishes the node connections. If validateInHandshake is set to true, the connection will fail if * node returned in the handshake response is different than the discovery node. */ - List establishNodeConnections(Set nodes, boolean validateInHandshake) { + List establishNodeConnections(Set nodes) { for (Iterator it = nodes.iterator(); it.hasNext(); ) { DiscoveryNode node = it.next(); if (!transportService.nodeConnected(node)) { @@ -438,7 +438,7 @@ public LivenessResponse read(StreamInput in) throws IOException { } } - nodes = establishNodeConnections(newNodes, false); + nodes = establishNodeConnections(newNodes); filteredNodes = Collections.unmodifiableList(newFilteredNodes); } } @@ -560,7 +560,7 @@ public void handleException(TransportException e) { } } - nodes = establishNodeConnections(newNodes, true); + nodes = establishNodeConnections(newNodes); filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes)); } }