3535import org .elasticsearch .cluster .node .DiscoveryNode ;
3636import org .elasticsearch .cluster .node .DiscoveryNodes ;
3737import org .elasticsearch .common .component .AbstractComponent ;
38+ import org .elasticsearch .common .io .stream .StreamInput ;
3839import org .elasticsearch .common .settings .Settings ;
3940import org .elasticsearch .common .transport .TransportAddress ;
4041import org .elasticsearch .common .unit .TimeValue ;
8081final class RemoteClusterConnection extends AbstractComponent implements TransportConnectionListener , Closeable {
8182
8283 private final TransportService transportService ;
84+ private final ConnectionManager connectionManager ;
8385 private final ConnectionProfile remoteProfile ;
8486 private final ConnectedNodes connectedNodes ;
8587 private final String clusterAlias ;
8688 private final int maxNumRemoteConnections ;
8789 private final Predicate <DiscoveryNode > nodePredicate ;
90+ private final ThreadPool threadPool ;
8891 private volatile List <Supplier <DiscoveryNode >> seedNodes ;
8992 private volatile boolean skipUnavailable ;
9093 private final ConnectHandler connectHandler ;
9194 private SetOnce <ClusterName > remoteClusterName = new SetOnce <>();
92- private final ClusterName localClusterName ;
9395
9496 /**
9597 * Creates a new {@link RemoteClusterConnection}
9698 * @param settings the nodes settings object
9799 * @param clusterAlias the configured alias of the cluster to connect to
98100 * @param seedNodes a list of seed nodes to discover eligible nodes from
99101 * @param transportService the local nodes transport service
102+ * @param connectionManager the connection manager to use for this remote connection
100103 * @param maxNumRemoteConnections the maximum number of connections to the remote cluster
101104 * @param nodePredicate a predicate to filter eligible remote nodes to connect to
102105 */
103106 RemoteClusterConnection (Settings settings , String clusterAlias , List <Supplier <DiscoveryNode >> seedNodes ,
104- TransportService transportService , int maxNumRemoteConnections , Predicate <DiscoveryNode > nodePredicate ) {
107+ TransportService transportService , ConnectionManager connectionManager , int maxNumRemoteConnections ,
108+ Predicate <DiscoveryNode > nodePredicate ) {
105109 super (settings );
106- this .localClusterName = ClusterName .CLUSTER_NAME_SETTING .get (settings );
107110 this .transportService = transportService ;
108111 this .maxNumRemoteConnections = maxNumRemoteConnections ;
109112 this .nodePredicate = nodePredicate ;
@@ -122,7 +125,11 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
122125 this .skipUnavailable = RemoteClusterService .REMOTE_CLUSTER_SKIP_UNAVAILABLE
123126 .getConcreteSettingForNamespace (clusterAlias ).get (settings );
124127 this .connectHandler = new ConnectHandler ();
125- transportService .addConnectionListener (this );
128+ this .threadPool = transportService .threadPool ;
129+ this .connectionManager = connectionManager ;
130+ connectionManager .addListener (this );
131+ // we register the transport service here as a listener to make sure we notify handlers on disconnect etc.
132+ connectionManager .addListener (transportService );
126133 }
127134
128135 /**
@@ -183,8 +190,9 @@ public void ensureConnected(ActionListener<Void> voidActionListener) {
183190
184191 private void fetchShardsInternal (ClusterSearchShardsRequest searchShardsRequest ,
185192 final ActionListener <ClusterSearchShardsResponse > listener ) {
186- final DiscoveryNode node = connectedNodes .getAny ();
187- transportService .sendRequest (node , ClusterSearchShardsAction .NAME , searchShardsRequest ,
193+ final DiscoveryNode node = getAnyConnectedNode ();
194+ Transport .Connection connection = connectionManager .getConnection (node );
195+ transportService .sendRequest (connection , ClusterSearchShardsAction .NAME , searchShardsRequest , TransportRequestOptions .EMPTY ,
188196 new TransportResponseHandler <ClusterSearchShardsResponse >() {
189197
190198 @ Override
@@ -219,12 +227,16 @@ void collectNodes(ActionListener<Function<String, DiscoveryNode>> listener) {
219227 request .clear ();
220228 request .nodes (true );
221229 request .local (true ); // run this on the node that gets the request it's as good as any other
222- final DiscoveryNode node = connectedNodes .getAny ();
223- transportService .sendRequest (node , ClusterStateAction .NAME , request , TransportRequestOptions .EMPTY ,
230+ final DiscoveryNode node = getAnyConnectedNode ();
231+ Transport .Connection connection = connectionManager .getConnection (node );
232+ transportService .sendRequest (connection , ClusterStateAction .NAME , request , TransportRequestOptions .EMPTY ,
224233 new TransportResponseHandler <ClusterStateResponse >() {
234+
225235 @ Override
226- public ClusterStateResponse newInstance () {
227- return new ClusterStateResponse ();
236+ public ClusterStateResponse read (StreamInput in ) throws IOException {
237+ ClusterStateResponse response = new ClusterStateResponse ();
238+ response .readFrom (in );
239+ return response ;
228240 }
229241
230242 @ Override
@@ -261,11 +273,11 @@ public String executor() {
261273 * If such node is not connected, the returned connection will be a proxy connection that redirects to it.
262274 */
263275 Transport .Connection getConnection (DiscoveryNode remoteClusterNode ) {
264- if (transportService .nodeConnected (remoteClusterNode )) {
265- return transportService .getConnection (remoteClusterNode );
276+ if (connectionManager .nodeConnected (remoteClusterNode )) {
277+ return connectionManager .getConnection (remoteClusterNode );
266278 }
267- DiscoveryNode discoveryNode = connectedNodes . getAny ();
268- Transport .Connection connection = transportService .getConnection (discoveryNode );
279+ DiscoveryNode discoveryNode = getAnyConnectedNode ();
280+ Transport .Connection connection = connectionManager .getConnection (discoveryNode );
269281 return new ProxyConnection (connection , remoteClusterNode );
270282 }
271283
@@ -317,33 +329,18 @@ public Version getVersion() {
317329 }
318330
319331 Transport .Connection getConnection () {
320- return transportService .getConnection (getAnyConnectedNode ());
332+ return connectionManager .getConnection (getAnyConnectedNode ());
321333 }
322334
323335 @ Override
324336 public void close () throws IOException {
325- connectHandler .close ();
337+ IOUtils .close (connectHandler , connectionManager );
326338 }
327339
328340 public boolean isClosed () {
329341 return connectHandler .isClosed ();
330342 }
331343
332- private ConnectionProfile getRemoteProfile (ClusterName name ) {
333- // we can only compare the cluster name to make a decision if we should use a remote profile
334- // we can't use a cluster UUID here since we could be connecting to that remote cluster before
335- // the remote node has joined its cluster and have a cluster UUID. The fact that we just lose a
336- // rather smallish optimization on the connection layer under certain situations where remote clusters
337- // have the same name as the local one is minor here.
338- // the alternative here is to complicate the remote infrastructure to also wait until we formed a cluster,
339- // gained a cluster UUID and then start connecting etc. we rather use this simplification in order to maintain simplicity
340- if (this .localClusterName .equals (name )) {
341- return null ;
342- } else {
343- return remoteProfile ;
344- }
345- }
346-
347344 /**
348345 * The connect handler manages node discovery and the actual connect to the remote cluster.
349346 * There is at most one connect job running at any time. If such a connect job is triggered
@@ -387,7 +384,7 @@ private void connect(ActionListener<Void> connectListener, boolean forceRun) {
387384 final boolean runConnect ;
388385 final Collection <ActionListener <Void >> toNotify ;
389386 final ActionListener <Void > listener = connectListener == null ? null :
390- ContextPreservingActionListener .wrapPreservingContext (connectListener , transportService . getThreadPool () .getThreadContext ());
387+ ContextPreservingActionListener .wrapPreservingContext (connectListener , threadPool .getThreadContext ());
391388 synchronized (queue ) {
392389 if (listener != null && queue .offer (listener ) == false ) {
393390 listener .onFailure (new RejectedExecutionException ("connect queue is full" ));
@@ -415,7 +412,6 @@ private void connect(ActionListener<Void> connectListener, boolean forceRun) {
415412 }
416413
417414 private void forkConnect (final Collection <ActionListener <Void >> toNotify ) {
418- ThreadPool threadPool = transportService .getThreadPool ();
419415 ExecutorService executor = threadPool .executor (ThreadPool .Names .MANAGEMENT );
420416 executor .submit (new AbstractRunnable () {
421417 @ Override
@@ -452,13 +448,13 @@ protected void doRun() {
452448 maybeConnect ();
453449 }
454450 });
455- collectRemoteNodes (seedNodes .iterator (), transportService , listener );
451+ collectRemoteNodes (seedNodes .iterator (), transportService , connectionManager , listener );
456452 }
457453 });
458454 }
459455
460456 private void collectRemoteNodes (Iterator <Supplier <DiscoveryNode >> seedNodes ,
461- final TransportService transportService , ActionListener <Void > listener ) {
457+ final TransportService transportService , final ConnectionManager manager , ActionListener <Void > listener ) {
462458 if (Thread .currentThread ().isInterrupted ()) {
463459 listener .onFailure (new InterruptedException ("remote connect thread got interrupted" ));
464460 }
@@ -467,7 +463,7 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
467463 cancellableThreads .executeIO (() -> {
468464 final DiscoveryNode seedNode = seedNodes .next ().get ();
469465 final TransportService .HandshakeResponse handshakeResponse ;
470- Transport .Connection connection = transportService .openConnection (seedNode ,
466+ Transport .Connection connection = manager .openConnection (seedNode ,
471467 ConnectionProfile .buildSingleChannelProfile (TransportRequestOptions .Type .REG , null , null ));
472468 boolean success = false ;
473469 try {
@@ -482,7 +478,7 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
482478
483479 final DiscoveryNode handshakeNode = handshakeResponse .getDiscoveryNode ();
484480 if (nodePredicate .test (handshakeNode ) && connectedNodes .size () < maxNumRemoteConnections ) {
485- transportService .connectToNode (handshakeNode , getRemoteProfile ( handshakeResponse . getClusterName () ));
481+ manager .connectToNode (handshakeNode , remoteProfile , transportService . connectionValidator ( handshakeNode ));
486482 if (remoteClusterName .get () == null ) {
487483 assert handshakeResponse .getClusterName ().value () != null ;
488484 remoteClusterName .set (handshakeResponse .getClusterName ());
@@ -524,7 +520,7 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
524520 // ISE if we fail the handshake with an version incompatible node
525521 if (seedNodes .hasNext ()) {
526522 logger .debug (() -> new ParameterizedMessage ("fetching nodes from external cluster {} failed" , clusterAlias ), ex );
527- collectRemoteNodes (seedNodes , transportService , listener );
523+ collectRemoteNodes (seedNodes , transportService , manager , listener );
528524 } else {
529525 listener .onFailure (ex );
530526 }
@@ -552,7 +548,6 @@ final boolean isClosed() {
552548 /* This class handles the _state response from the remote cluster when sniffing nodes to connect to */
553549 private class SniffClusterStateResponseHandler implements TransportResponseHandler <ClusterStateResponse > {
554550
555- private final TransportService transportService ;
556551 private final Transport .Connection connection ;
557552 private final ActionListener <Void > listener ;
558553 private final Iterator <Supplier <DiscoveryNode >> seedNodes ;
@@ -561,7 +556,6 @@ private class SniffClusterStateResponseHandler implements TransportResponseHandl
561556 SniffClusterStateResponseHandler (TransportService transportService , Transport .Connection connection ,
562557 ActionListener <Void > listener , Iterator <Supplier <DiscoveryNode >> seedNodes ,
563558 CancellableThreads cancellableThreads ) {
564- this .transportService = transportService ;
565559 this .connection = connection ;
566560 this .listener = listener ;
567561 this .seedNodes = seedNodes ;
@@ -592,8 +586,8 @@ public void handleResponse(ClusterStateResponse response) {
592586 for (DiscoveryNode node : nodesIter ) {
593587 if (nodePredicate .test (node ) && connectedNodes .size () < maxNumRemoteConnections ) {
594588 try {
595- transportService .connectToNode (node , getRemoteProfile ( remoteClusterName . get ())); // noop if node is
596- // connected
589+ connectionManager .connectToNode (node , remoteProfile ,
590+ transportService . connectionValidator ( node )); // noop if node is connected
597591 connectedNodes .add (node );
598592 } catch (ConnectTransportException | IllegalStateException ex ) {
599593 // ISE if we fail the handshake with an version incompatible node
@@ -609,7 +603,7 @@ public void handleResponse(ClusterStateResponse response) {
609603 listener .onFailure (ex ); // we got canceled - fail the listener and step out
610604 } catch (Exception ex ) {
611605 logger .warn (() -> new ParameterizedMessage ("fetching nodes from external cluster {} failed" , clusterAlias ), ex );
612- collectRemoteNodes (seedNodes , transportService , listener );
606+ collectRemoteNodes (seedNodes , transportService , connectionManager , listener );
613607 }
614608 }
615609
@@ -620,7 +614,7 @@ public void handleException(TransportException exp) {
620614 IOUtils .closeWhileHandlingException (connection );
621615 } finally {
622616 // once the connection is closed lets try the next node
623- collectRemoteNodes (seedNodes , transportService , listener );
617+ collectRemoteNodes (seedNodes , transportService , connectionManager , listener );
624618 }
625619 }
626620
@@ -715,4 +709,8 @@ private synchronized void ensureIteratorAvailable() {
715709 }
716710 }
717711 }
712+
713+ ConnectionManager getConnectionManager () {
714+ return connectionManager ;
715+ }
718716}
0 commit comments