Skip to content

Commit 5a2f3d7

Browse files
committed
Add HandshakingTransportAddressConnector
The `PeerFinder`, introduced in elastic#32246, needs to be able to identify, and connect to, a remote master node using only its `TransportAddress`. This can be done by opening a single-channel connection to the address, performing a handshake, and only then forming a full-blown connection to the node. This change implements this logic.
1 parent 2176184 commit 5a2f3d7

File tree

3 files changed

+300
-5
lines changed

3 files changed

+300
-5
lines changed
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.discovery;
21+
22+
import org.elasticsearch.ElasticsearchException;
23+
import org.elasticsearch.Version;
24+
import org.elasticsearch.action.ActionListener;
25+
import org.elasticsearch.cluster.node.DiscoveryNode;
26+
import org.elasticsearch.common.component.AbstractComponent;
27+
import org.elasticsearch.common.settings.Setting;
28+
import org.elasticsearch.common.settings.Settings;
29+
import org.elasticsearch.common.transport.TransportAddress;
30+
import org.elasticsearch.common.unit.TimeValue;
31+
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
32+
import org.elasticsearch.core.internal.io.IOUtils;
33+
import org.elasticsearch.discovery.PeerFinder.TransportAddressConnector;
34+
import org.elasticsearch.transport.ConnectionProfile;
35+
import org.elasticsearch.transport.Transport.Connection;
36+
import org.elasticsearch.transport.TransportRequestOptions.Type;
37+
import org.elasticsearch.transport.TransportService;
38+
39+
import static java.util.Collections.emptyMap;
40+
import static java.util.Collections.emptySet;
41+
42+
public class HandshakingTransportAddressConnector extends AbstractComponent implements TransportAddressConnector {
43+
44+
// connection timeout for probes
45+
public static final Setting<TimeValue> PROBE_CONNECT_TIMEOUT_SETTING =
46+
Setting.timeSetting("discovery.probe.connect_timeout",
47+
TimeValue.timeValueMillis(3000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);
48+
// handshake timeout for probes
49+
public static final Setting<TimeValue> PROBE_HANDSHAKE_TIMEOUT_SETTING =
50+
Setting.timeSetting("discovery.probe.handshake_timeout",
51+
TimeValue.timeValueMillis(1000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);
52+
53+
private final TransportService transportService;
54+
private final TimeValue probeConnectTimeout;
55+
private final TimeValue probeHandshakeTimeout;
56+
57+
public HandshakingTransportAddressConnector(Settings settings, TransportService transportService) {
58+
super(settings);
59+
this.transportService = transportService;
60+
probeConnectTimeout = PROBE_CONNECT_TIMEOUT_SETTING.get(settings);
61+
probeHandshakeTimeout = PROBE_HANDSHAKE_TIMEOUT_SETTING.get(settings);
62+
}
63+
64+
@Override
65+
public void connectToRemoteMasterNode(TransportAddress transportAddress, ActionListener<DiscoveryNode> listener) {
66+
transportService.getThreadPool().generic().execute(new AbstractRunnable() {
67+
@Override
68+
protected void doRun() throws Exception {
69+
70+
// TODO if transportService is already connected to this address then skip the handshaking
71+
72+
final DiscoveryNode targetNode = new DiscoveryNode(transportAddress.toString(), transportAddress, emptyMap(),
73+
emptySet(), Version.CURRENT.minimumCompatibilityVersion());
74+
75+
logger.trace("[{}] opening probe connection", this);
76+
final Connection connection = transportService.openConnection(targetNode,
77+
ConnectionProfile.buildSingleChannelProfile(Type.REG, probeConnectTimeout, probeHandshakeTimeout));
78+
logger.trace("[{}] opened probe connection", this);
79+
80+
final DiscoveryNode remoteNode;
81+
try {
82+
remoteNode = transportService.handshake(connection, probeHandshakeTimeout.millis());
83+
// success means (amongst other things) that the cluster names match
84+
logger.trace("[{}] handshake successful: {}", this, remoteNode);
85+
} finally {
86+
IOUtils.closeWhileHandlingException(connection);
87+
}
88+
89+
// NOMERGE better exceptions for failure cases?
90+
if (remoteNode.equals(transportService.getLocalNode())) {
91+
// TODO cache this result for some time? forever?
92+
listener.onFailure(new ElasticsearchException("[" + transportAddress + "] is local node"));
93+
} else if (remoteNode.isMasterNode() == false) {
94+
// TODO cache this result for some time?
95+
listener.onFailure(new ElasticsearchException("[" + transportAddress + "] is not master eligible"));
96+
} else {
97+
transportService.connectToNode(remoteNode);
98+
logger.trace("[{}] full connection successful: {}", this, remoteNode);
99+
listener.onResponse(remoteNode);
100+
}
101+
}
102+
103+
@Override
104+
public void onFailure(Exception e) {
105+
listener.onFailure(e);
106+
}
107+
108+
@Override
109+
public String toString() {
110+
return "connectToRemoteMasterNode[" + transportAddress + "]";
111+
}
112+
});
113+
}
114+
}
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.discovery;
21+
22+
import org.apache.lucene.util.SetOnce;
23+
import org.elasticsearch.Version;
24+
import org.elasticsearch.action.ActionListener;
25+
import org.elasticsearch.cluster.ClusterName;
26+
import org.elasticsearch.cluster.node.DiscoveryNode;
27+
import org.elasticsearch.common.settings.Settings;
28+
import org.elasticsearch.test.ESTestCase;
29+
import org.elasticsearch.test.transport.CapturingTransport;
30+
import org.elasticsearch.threadpool.ThreadPool;
31+
import org.elasticsearch.transport.TransportRequest;
32+
import org.elasticsearch.transport.TransportService;
33+
import org.elasticsearch.transport.TransportService.HandshakeResponse;
34+
import org.junit.After;
35+
import org.junit.Before;
36+
37+
import java.util.concurrent.CountDownLatch;
38+
import java.util.concurrent.TimeUnit;
39+
40+
import static java.util.Collections.emptyMap;
41+
import static java.util.Collections.emptySet;
42+
import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING;
43+
import static org.elasticsearch.discovery.HandshakingTransportAddressConnector.PROBE_HANDSHAKE_TIMEOUT_SETTING;
44+
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
45+
import static org.hamcrest.Matchers.equalTo;
46+
47+
public class HandshakingTransportAddressConnectorTests extends ESTestCase {
48+
49+
private DiscoveryNode remoteNode;
50+
private TransportService transportService;
51+
private ThreadPool threadPool;
52+
private String remoteClusterName;
53+
private HandshakingTransportAddressConnector handshakingTransportAddressConnector;
54+
private DiscoveryNode localNode;
55+
56+
private boolean dropHandshake;
57+
58+
@Before
59+
public void startServices() {
60+
localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT);
61+
final Settings settings = Settings.builder()
62+
.put(NODE_NAME_SETTING.getKey(), "node")
63+
.put(CLUSTER_NAME_SETTING.getKey(), "local-cluster")
64+
.build();
65+
threadPool = new ThreadPool(settings);
66+
67+
remoteNode = null;
68+
remoteClusterName = null;
69+
dropHandshake = false;
70+
71+
final CapturingTransport capturingTransport = new CapturingTransport() {
72+
@Override
73+
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
74+
super.onSendRequest(requestId, action, request, node);
75+
assertThat(action, equalTo(TransportService.HANDSHAKE_ACTION_NAME));
76+
assertEquals(remoteNode.getAddress(), node.getAddress());
77+
assertNotEquals(remoteNode, node);
78+
if (dropHandshake == false) {
79+
handleResponse(requestId, new HandshakeResponse(remoteNode, new ClusterName(remoteClusterName), Version.CURRENT));
80+
}
81+
}
82+
};
83+
84+
transportService = new TransportService(settings, capturingTransport, threadPool,
85+
TransportService.NOOP_TRANSPORT_INTERCEPTOR, address -> localNode, null, emptySet());
86+
87+
transportService.start();
88+
transportService.acceptIncomingRequests();
89+
90+
handshakingTransportAddressConnector = new HandshakingTransportAddressConnector(settings, transportService);
91+
}
92+
93+
@After
94+
public void stopServices() {
95+
transportService.stop();
96+
threadPool.shutdown();
97+
}
98+
99+
public void testConnectsToMasterNode() throws InterruptedException {
100+
final CountDownLatch completionLatch = new CountDownLatch(1);
101+
final SetOnce<DiscoveryNode> receivedNode = new SetOnce<>();
102+
103+
remoteNode = new DiscoveryNode("remote-node", buildNewFakeTransportAddress(), Version.CURRENT);
104+
remoteClusterName = "local-cluster";
105+
106+
handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), new ActionListener<DiscoveryNode>() {
107+
@Override
108+
public void onResponse(DiscoveryNode discoveryNode) {
109+
receivedNode.set(discoveryNode);
110+
completionLatch.countDown();
111+
}
112+
113+
@Override
114+
public void onFailure(Exception e) {
115+
throw new AssertionError(e);
116+
}
117+
});
118+
119+
assertTrue(completionLatch.await(1, TimeUnit.SECONDS));
120+
assertEquals(remoteNode, receivedNode.get());
121+
}
122+
123+
public void testDoesNotConnectToNonMasterNode() throws InterruptedException {
124+
remoteNode = new DiscoveryNode("remote-node", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
125+
remoteClusterName = "local-cluster";
126+
127+
FailureListener failureListener = new FailureListener();
128+
handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), failureListener);
129+
failureListener.assertFailure();
130+
}
131+
132+
public void testDoesNotConnectToLocalNode() throws Exception {
133+
remoteNode = localNode;
134+
remoteClusterName = "local-cluster";
135+
136+
FailureListener failureListener = new FailureListener();
137+
handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), failureListener);
138+
failureListener.assertFailure();
139+
}
140+
141+
public void testDoesNotConnectToDifferentCluster() throws InterruptedException {
142+
remoteNode = new DiscoveryNode("remote-node", buildNewFakeTransportAddress(), Version.CURRENT);
143+
remoteClusterName = "another-cluster";
144+
145+
FailureListener failureListener = new FailureListener();
146+
handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), failureListener);
147+
failureListener.assertFailure();
148+
}
149+
150+
public void testHandshakeTimesOut() throws InterruptedException {
151+
remoteNode = new DiscoveryNode("remote-node", buildNewFakeTransportAddress(), Version.CURRENT);
152+
remoteClusterName = "local-cluster";
153+
dropHandshake = true;
154+
155+
FailureListener failureListener = new FailureListener();
156+
handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), failureListener);
157+
Thread.sleep(PROBE_HANDSHAKE_TIMEOUT_SETTING.get(Settings.EMPTY).millis());
158+
failureListener.assertFailure();
159+
}
160+
161+
private class FailureListener implements ActionListener<DiscoveryNode> {
162+
final CountDownLatch completionLatch = new CountDownLatch(1);
163+
164+
@Override
165+
public void onResponse(DiscoveryNode discoveryNode) {
166+
fail(discoveryNode.toString());
167+
}
168+
169+
@Override
170+
public void onFailure(Exception e) {
171+
completionLatch.countDown();
172+
}
173+
174+
void assertFailure() throws InterruptedException {
175+
assertTrue(completionLatch.await(1, TimeUnit.SECONDS));
176+
}
177+
}
178+
}

test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -204,18 +204,21 @@ public DiscoveryNode getNode() {
204204

205205
@Override
206206
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
207-
throws IOException, TransportException {
208-
requests.put(requestId, Tuple.tuple(node, action));
209-
capturedRequests.add(new CapturedRequest(node, requestId, action, request));
207+
throws TransportException {
208+
onSendRequest(requestId, action, request, node);
210209
}
211210

212211
@Override
213-
public void close() throws IOException {
214-
212+
public void close() {
215213
}
216214
};
217215
}
218216

217+
/**
 * Records an outgoing request: stores the target node and action name in {@code requests} under the request ID,
 * and appends a {@link CapturedRequest} describing it to {@code capturedRequests}. The connection's
 * {@code sendRequest} delegates here, and the method is {@code protected} so that a subclass can override it,
 * for instance to respond to a request as soon as it is sent.
 */
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
218+
requests.put(requestId, Tuple.tuple(node, action));
219+
capturedRequests.add(new CapturedRequest(node, requestId, action, request));
220+
}
221+
219222
@Override
220223
public TransportStats getStats() {
221224
throw new UnsupportedOperationException();

0 commit comments

Comments
 (0)