diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java index a8e30c00d2155..ded49f6028b20 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java @@ -30,7 +30,6 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.unit.TimeValue; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -103,6 +102,17 @@ protected RemoteClusterAware(Settings settings) { this.clusterNameResolver = new ClusterNameExpressionResolver(); } + /** + * Returns remote clusters that are enabled in these settings + */ + protected static Set getEnabledRemoteClusters(final Settings settings) { + final Stream>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings); + return allConcreteSettings + .map(REMOTE_CLUSTERS_SEEDS::getNamespace) + .filter(clusterAlias -> RemoteConnectionStrategy.isConnectionEnabled(clusterAlias, settings)) + .collect(Collectors.toSet()); + } + /** * Builds the dynamic per-cluster config from the given settings. This is a map keyed by the cluster alias that points to a tuple * (ProxyAddresss, [SeedNodeSuppliers]). If a cluster is configured with a proxy address all seed nodes will point to @@ -177,29 +187,17 @@ protected Map> groupClusterIndices(Set remoteCluste return perClusterIndices; } - void updateRemoteCluster(String clusterAlias, List addresses, String proxy) { - Boolean compress = TransportSettings.TRANSPORT_COMPRESS.get(settings); - TimeValue pingSchedule = TransportSettings.PING_SCHEDULE.get(settings); - updateRemoteCluster(clusterAlias, addresses, proxy, compress, pingSchedule); - } - - void updateRemoteCluster(String clusterAlias, Settings settings) { - String proxy = REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(settings); - List addresses = REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings); - Boolean compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings); - TimeValue pingSchedule = RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE - .getConcreteSettingForNamespace(clusterAlias) - .get(settings); - - updateRemoteCluster(clusterAlias, addresses, proxy, compress, pingSchedule); + void validateAndUpdateRemoteCluster(String clusterAlias, Settings settings) { + if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) { + throw new IllegalArgumentException("remote clusters must not have the empty string as its key"); + } + updateRemoteCluster(clusterAlias, settings); } /** - * Subclasses must implement this to receive information about updated cluster aliases. If the given address list is - * empty the cluster alias is unregistered and should be removed. + * Subclasses must implement this to receive information about updated cluster aliases. */ - protected abstract void updateRemoteCluster(String clusterAlias, List addresses, String proxy, boolean compressionEnabled, - TimeValue pingSchedule); + protected abstract void updateRemoteCluster(String clusterAlias, Settings settings); /** * Registers this instance to listen to updates on the cluster settings. @@ -208,7 +206,7 @@ public void listenForUpdates(ClusterSettings clusterSettings) { List> remoteClusterSettings = Arrays.asList(RemoteClusterAware.REMOTE_CLUSTERS_PROXY, RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, RemoteClusterService.REMOTE_CLUSTER_COMPRESS, RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE); - clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::updateRemoteCluster); + clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::validateAndUpdateRemoteCluster); } static InetSocketAddress parseSeedAddress(String remoteHost) { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index dd3cd5ea07a5a..d7f5896838b6d 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -25,7 +25,6 @@ import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -36,11 +35,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.Collections; -import java.util.List; import java.util.function.Function; -import java.util.function.Predicate; -import java.util.function.Supplier; -import java.util.stream.Collectors; /** * Represents a connection to a single remote cluster. In contrast to a local cluster a remote cluster is not joined such that the @@ -61,10 +56,7 @@ final class RemoteClusterConnection implements Closeable { private final RemoteConnectionManager remoteConnectionManager; private final RemoteConnectionStrategy connectionStrategy; private final String clusterAlias; - private final int maxNumRemoteConnections; private final ThreadPool threadPool; - private final List>> seedNodes; - private final String proxyAddress; private volatile boolean skipUnavailable; private final TimeValue initialConnectionTimeout; @@ -72,35 +64,21 @@ final class RemoteClusterConnection implements Closeable { * Creates a new {@link RemoteClusterConnection} * @param settings the nodes settings object * @param clusterAlias the configured alias of the cluster to connect to - * @param seedNodes a list of seed nodes to discover eligible nodes from * @param transportService the local nodes transport service - * @param maxNumRemoteConnections the maximum number of connections to the remote cluster - * @param nodePredicate a predicate to filter eligible remote nodes to connect to - * @param proxyAddress the proxy address - * @param connectionProfile the connection profile to use */ - RemoteClusterConnection(Settings settings, String clusterAlias, List>> seedNodes, - TransportService transportService, int maxNumRemoteConnections, Predicate nodePredicate, - String proxyAddress, ConnectionProfile connectionProfile) { - this(settings, clusterAlias, seedNodes, transportService, maxNumRemoteConnections, nodePredicate, proxyAddress, - createConnectionManager(connectionProfile, transportService)); + RemoteClusterConnection(Settings settings, String clusterAlias, TransportService transportService) { + this(settings, clusterAlias, transportService, + createConnectionManager(buildConnectionProfileFromSettings(settings, clusterAlias), transportService)); } - // Public for tests to pass a StubbableConnectionManager - RemoteClusterConnection(Settings settings, String clusterAlias, List>> seedNodes, - TransportService transportService, int maxNumRemoteConnections, Predicate nodePredicate, - String proxyAddress, ConnectionManager connectionManager) { + RemoteClusterConnection(Settings settings, String clusterAlias, TransportService transportService, + ConnectionManager connectionManager) { this.transportService = transportService; - this.maxNumRemoteConnections = maxNumRemoteConnections; this.clusterAlias = clusterAlias; this.remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); - this.connectionStrategy = new SniffConnectionStrategy(clusterAlias, transportService, remoteConnectionManager, - proxyAddress, maxNumRemoteConnections, nodePredicate, - Collections.unmodifiableList(seedNodes)); + this.connectionStrategy = new SniffConnectionStrategy(clusterAlias, transportService, remoteConnectionManager, settings); // we register the transport service here as a listener to make sure we notify handlers on disconnect etc. connectionManager.addListener(transportService); - this.seedNodes = Collections.unmodifiableList(seedNodes); - this.proxyAddress = proxyAddress; this.skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE .getConcreteSettingForNamespace(clusterAlias).get(settings); this.threadPool = transportService.threadPool; @@ -125,11 +103,11 @@ boolean isSkipUnavailable() { * Ensures that this cluster is connected. If the cluster is connected this operation * will invoke the listener immediately. */ - void ensureConnected(ActionListener voidActionListener) { + void ensureConnected(ActionListener listener) { if (remoteConnectionManager.size() == 0) { - connectionStrategy.connect(voidActionListener); + connectionStrategy.connect(listener); } else { - voidActionListener.onResponse(null); + listener.onResponse(null); } } @@ -215,14 +193,6 @@ public boolean isClosed() { return connectionStrategy.isClosed(); } - List>> getSeedNodes() { - return seedNodes; - } - - String getProxyAddress() { - return proxyAddress; - } - // for testing only boolean assertNoRunningConnections() { return connectionStrategy.assertNoRunningConnections(); @@ -236,13 +206,24 @@ boolean isNodeConnected(final DiscoveryNode node) { * Get the information about remote nodes to be rendered on {@code _remote/info} requests. */ public RemoteConnectionInfo getConnectionInfo() { - return new RemoteConnectionInfo( + if (connectionStrategy instanceof SniffConnectionStrategy) { + SniffConnectionStrategy sniffStrategy = (SniffConnectionStrategy) this.connectionStrategy; + return new RemoteConnectionInfo( + clusterAlias, + sniffStrategy.getSeedNodes(), + sniffStrategy.getMaxConnections(), + getNumNodesConnected(), + initialConnectionTimeout, + skipUnavailable); + } else { + return new RemoteConnectionInfo( clusterAlias, - seedNodes.stream().map(Tuple::v1).collect(Collectors.toList()), - maxNumRemoteConnections, + Collections.emptyList(), + 0, getNumNodesConnected(), initialConnectionTimeout, skipUnavailable); + } } int getNumNodesConnected() { @@ -256,4 +237,22 @@ private static ConnectionManager createConnectionManager(ConnectionProfile conne ConnectionManager getConnectionManager() { return remoteConnectionManager.getConnectionManager(); } + + public boolean shouldRebuildConnection(Settings newSettings) { + return connectionStrategy.shouldRebuildConnection(newSettings); + } + + static ConnectionProfile buildConnectionProfileFromSettings(Settings settings, String clusterName) { + return new ConnectionProfile.Builder() + .setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings)) + .setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings)) + .addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING) // TODO make this configurable? + // we don't want this to be used for anything else but search + .addConnections(0, TransportRequestOptions.Type.BULK, + TransportRequestOptions.Type.STATE, + TransportRequestOptions.Type.RECOVERY) + .setCompressionEnabled(RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterName).get(settings)) + .setPingInterval(RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterName).get(settings)) + .build(); + } } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 465ff575f8d63..fcb2c6fca61bc 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -21,20 +21,19 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.threadpool.ThreadPool; @@ -42,19 +41,15 @@ import java.io.Closeable; import java.io.IOException; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.BiFunction; import java.util.function.Function; -import java.util.function.Predicate; -import java.util.function.Supplier; -import java.util.stream.Collectors; import java.util.stream.Stream; import static org.elasticsearch.common.settings.Setting.boolSetting; @@ -74,20 +69,20 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl * seed node, other nodes will be discovered up to the given number of nodes in this setting. The default is 3. */ public static final Setting REMOTE_CONNECTIONS_PER_CLUSTER = - Setting.intSetting( - "cluster.remote.connections_per_cluster", - 3, - 1, - Setting.Property.NodeScope); + Setting.intSetting( + "cluster.remote.connections_per_cluster", + 3, + 1, + Setting.Property.NodeScope); /** * The initial connect timeout for remote cluster connections */ public static final Setting REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING = - Setting.positiveTimeSetting( - "cluster.remote.initial_connect_timeout", - TimeValue.timeValueSeconds(30), - Setting.Property.NodeScope); + Setting.positiveTimeSetting( + "cluster.remote.initial_connect_timeout", + TimeValue.timeValueSeconds(30), + Setting.Property.NodeScope); /** * The name of a node attribute to select nodes that should be connected to in the remote cluster. @@ -96,7 +91,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl * The value of the setting is expected to be a boolean, {@code true} for nodes that can become gateways, {@code false} otherwise. */ public static final Setting REMOTE_NODE_ATTRIBUTE = - Setting.simpleString("cluster.remote.node.attr", Setting.Property.NodeScope); + Setting.simpleString("cluster.remote.node.attr", Setting.Property.NodeScope); /** * If true connecting to remote clusters is supported on this node. If false this node will not establish @@ -104,127 +99,40 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl * will fail if remote cluster syntax is used as an index pattern. The default is true */ public static final Setting ENABLE_REMOTE_CLUSTERS = - Setting.boolSetting( - "cluster.remote.connect", - true, - Setting.Property.NodeScope); + Setting.boolSetting( + "cluster.remote.connect", + true, + Setting.Property.NodeScope); public static final Setting.AffixSetting REMOTE_CLUSTER_SKIP_UNAVAILABLE = - Setting.affixKeySetting( - "cluster.remote.", - "skip_unavailable", - key -> boolSetting( - key, - false, - Setting.Property.Dynamic, - Setting.Property.NodeScope), - REMOTE_CLUSTERS_SEEDS); - - public static final Setting.AffixSetting REMOTE_CLUSTER_PING_SCHEDULE = Setting.affixKeySetting( + Setting.affixKeySetting( "cluster.remote.", - "transport.ping_schedule", - key -> timeSetting(key, TransportSettings.PING_SCHEDULE, Setting.Property.Dynamic, Setting.Property.NodeScope), + "skip_unavailable", + key -> boolSetting( + key, + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope), REMOTE_CLUSTERS_SEEDS); + public static final Setting.AffixSetting REMOTE_CLUSTER_PING_SCHEDULE = Setting.affixKeySetting( + "cluster.remote.", + "transport.ping_schedule", + key -> timeSetting(key, TransportSettings.PING_SCHEDULE, Setting.Property.Dynamic, Setting.Property.NodeScope), + REMOTE_CLUSTERS_SEEDS); + public static final Setting.AffixSetting REMOTE_CLUSTER_COMPRESS = Setting.affixKeySetting( "cluster.remote.", "transport.compress", key -> boolSetting(key, TransportSettings.TRANSPORT_COMPRESS, Setting.Property.Dynamic, Setting.Property.NodeScope), REMOTE_CLUSTERS_SEEDS); - private static final Predicate DEFAULT_NODE_PREDICATE = (node) -> Version.CURRENT.isCompatible(node.getVersion()) - && (node.isMasterNode() == false || node.isDataNode() || node.isIngestNode()); - private final TransportService transportService; - private final int numRemoteConnections; - private volatile Map remoteClusters = Collections.emptyMap(); - private volatile Map remoteClusterConnectionProfiles = Collections.emptyMap(); + private final Map remoteClusters = ConcurrentCollections.newConcurrentMap(); RemoteClusterService(Settings settings, TransportService transportService) { super(settings); this.transportService = transportService; - numRemoteConnections = REMOTE_CONNECTIONS_PER_CLUSTER.get(settings); - } - - /** - * This method updates the list of remote clusters. It's intended to be used as an update consumer on the settings infrastructure - * @param seeds a cluster alias to discovery node mapping representing the remote clusters seeds nodes - * @param connectionListener a listener invoked once every configured cluster has been connected to - */ - private synchronized void updateRemoteClusters(Map>>>> seeds, - ActionListener connectionListener) { - if (seeds.containsKey(LOCAL_CLUSTER_GROUP_KEY)) { - throw new IllegalArgumentException("remote clusters must not have the empty string as its key"); - } - Map remoteClusters = new HashMap<>(); - if (seeds.isEmpty()) { - connectionListener.onResponse(null); - } else { - CountDown countDown = new CountDown(seeds.size()); - remoteClusters.putAll(this.remoteClusters); - for (Map.Entry>>>> entry : seeds.entrySet()) { - List>> seedList = entry.getValue().v2(); - String proxyAddress = entry.getValue().v1(); - - String clusterAlias = entry.getKey(); - RemoteClusterConnection remote = this.remoteClusters.get(clusterAlias); - ConnectionProfile connectionProfile = this.remoteClusterConnectionProfiles.get(clusterAlias); - if (seedList.isEmpty()) { // with no seed nodes we just remove the connection - try { - IOUtils.close(remote); - } catch (IOException e) { - logger.warn("failed to close remote cluster connections for cluster: " + clusterAlias, e); - } - remoteClusters.remove(clusterAlias); - continue; - } - - if (remote == null) { // this is a new cluster we have to add a new representation - remote = new RemoteClusterConnection(settings, clusterAlias, seedList, transportService, numRemoteConnections, - getNodePredicate(settings), proxyAddress, connectionProfile); - remoteClusters.put(clusterAlias, remote); - } else if (connectionProfileChanged(remote.getConnectionManager().getConnectionProfile(), connectionProfile) - || seedsChanged(remote.getSeedNodes(), seedList) || proxyChanged(proxyAddress, remote.getProxyAddress())) { - // New ConnectionProfile. Must tear down existing connection - try { - IOUtils.close(remote); - } catch (IOException e) { - logger.warn("failed to close remote cluster connections for cluster: " + clusterAlias, e); - } - remoteClusters.remove(clusterAlias); - remote = new RemoteClusterConnection(settings, clusterAlias, seedList, transportService, numRemoteConnections, - getNodePredicate(settings), proxyAddress, connectionProfile); - remoteClusters.put(clusterAlias, remote); - } - - // now update the seed nodes no matter if it's new or already existing - RemoteClusterConnection finalRemote = remote; - remote.ensureConnected(ActionListener.wrap( - response -> { - if (countDown.countDown()) { - connectionListener.onResponse(response); - } - }, - exception -> { - if (countDown.fastForward()) { - connectionListener.onFailure(exception); - } - if (finalRemote.isClosed() == false) { - logger.warn("failed to update seed list for cluster: " + clusterAlias, exception); - } - })); - } - } - this.remoteClusters = Collections.unmodifiableMap(remoteClusters); - } - - static Predicate getNodePredicate(Settings settings) { - if (REMOTE_NODE_ATTRIBUTE.exists(settings)) { - // nodes can be tagged with node.attr.remote_gateway: true to allow a node to be a gateway node for cross cluster search - String attribute = REMOTE_NODE_ATTRIBUTE.get(settings); - return DEFAULT_NODE_PREDICATE.and((node) -> Booleans.parseBoolean(node.getAttributes().getOrDefault(attribute, "false"))); - } - return DEFAULT_NODE_PREDICATE; } /** @@ -276,6 +184,7 @@ public Set getRegisteredRemoteClusterNames() { /** * Returns a connection to the given node on the given remote cluster + * * @throws IllegalArgumentException if the remote cluster is unknown */ public Transport.Connection getConnection(DiscoveryNode node, String cluster) { @@ -327,37 +236,69 @@ private synchronized void updateSkipUnavailable(String clusterAlias, Boolean ski } @Override - protected void updateRemoteCluster(String clusterAlias, List addresses, String proxyAddress, boolean compressionEnabled, - TimeValue pingSchedule) { + protected void updateRemoteCluster(String clusterAlias, Settings settings) { + if (remoteClusters.containsKey(clusterAlias) == false) { + CountDownLatch latch = new CountDownLatch(1); + updateRemoteCluster(clusterAlias, settings, ActionListener.wrap(latch::countDown)); + + try { + // Wait 10 seconds for a new cluster. We must use a latch instead of a future because we + // are on the cluster state thread and our custom future implementation will throw an + // assertion. + if (latch.await(10, TimeUnit.SECONDS) == false) { + logger.warn("failed to connect to new remote cluster {} within {}", clusterAlias, TimeValue.timeValueSeconds(10)); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } else { + updateRemoteCluster(clusterAlias, settings, noopListener); + } + } + + /** + * This method updates the list of remote clusters. It's intended to be used as an update consumer on the settings infrastructure + * + * @param clusterAlias a cluster alias to discovery node mapping representing the remote clusters seeds nodes + * @param newSettings the updated settings for the remote connection + * @param listener a listener invoked once every configured cluster has been connected to + */ + synchronized void updateRemoteCluster(String clusterAlias, Settings newSettings, ActionListener listener) { if (LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) { throw new IllegalArgumentException("remote clusters must not have the empty string as its key"); } - ConnectionProfile oldProfile = remoteClusterConnectionProfiles.get(clusterAlias); - ConnectionProfile newProfile; - if (oldProfile != null) { - ConnectionProfile.Builder builder = new ConnectionProfile.Builder(oldProfile); - builder.setCompressionEnabled(compressionEnabled); - builder.setPingInterval(pingSchedule); - newProfile = builder.build(); - } else { - ConnectionProfile.Builder builder = new ConnectionProfile.Builder(buildConnectionProfileFromSettings(clusterAlias)); - builder.setCompressionEnabled(compressionEnabled); - builder.setPingInterval(pingSchedule); - newProfile = builder.build(); + + RemoteClusterConnection remote = this.remoteClusters.get(clusterAlias); + if (RemoteConnectionStrategy.isConnectionEnabled(clusterAlias, newSettings) == false) { + try { + IOUtils.close(remote); + } catch (IOException e) { + logger.warn("failed to close remote cluster connections for cluster: " + clusterAlias, e); + } + remoteClusters.remove(clusterAlias); + listener.onResponse(null); + return; + } + + // this is a new cluster we have to add a new representation + if (remote == null) { + Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build(); + remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService); + remoteClusters.put(clusterAlias, remote); + } else if (remote.shouldRebuildConnection(newSettings)) { + // New ConnectionProfile. Must tear down existing connection + try { + IOUtils.close(remote); + } catch (IOException e) { + logger.warn("failed to close remote cluster connections for cluster: " + clusterAlias, e); + } + remoteClusters.remove(clusterAlias); + Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build(); + remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService); + remoteClusters.put(clusterAlias, remote); } - updateRemoteCluster(clusterAlias, addresses, proxyAddress, newProfile, noopListener); - } - void updateRemoteCluster(final String clusterAlias, final List addresses, final String proxyAddress, - final ConnectionProfile connectionProfile, final ActionListener connectionListener) { - HashMap connectionProfiles = new HashMap<>(remoteClusterConnectionProfiles); - connectionProfiles.put(clusterAlias, connectionProfile); - this.remoteClusterConnectionProfiles = Collections.unmodifiableMap(connectionProfiles); - final List>> nodes = - addresses.stream().>>map(address -> Tuple.tuple(address, () -> - buildSeedNode(clusterAlias, address, Strings.hasLength(proxyAddress))) - ).collect(Collectors.toList()); - updateRemoteClusters(Collections.singletonMap(clusterAlias, new Tuple<>(proxyAddress, nodes)), connectionListener); + remote.ensureConnected(listener); } /** @@ -366,11 +307,22 @@ void updateRemoteCluster(final String clusterAlias, final List addresses */ void initializeRemoteClusters() { final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings); - final PlainActionFuture future = new PlainActionFuture<>(); - Map>>>> seeds = - RemoteClusterAware.buildRemoteClustersDynamicConfig(settings); - initializeConnectionProfiles(seeds.keySet()); - updateRemoteClusters(seeds, future); + final PlainActionFuture> future = new PlainActionFuture<>(); + Set enabledClusters = RemoteClusterAware.getEnabledRemoteClusters(settings); + + if (enabledClusters.isEmpty()) { + return; + } + + GroupedActionListener listener = new GroupedActionListener<>(future, enabledClusters.size()); + for (String clusterAlias : enabledClusters) { + updateRemoteCluster(clusterAlias, settings, listener); + } + + if (enabledClusters.isEmpty()) { + future.onResponse(null); + } + try { future.get(timeValue.millis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { @@ -382,32 +334,6 @@ void initializeRemoteClusters() { } } - private synchronized void initializeConnectionProfiles(Set remoteClusters) { - Map connectionProfiles = new HashMap<>(remoteClusters.size()); - for (String clusterName : remoteClusters) { - connectionProfiles.put(clusterName, buildConnectionProfileFromSettings(clusterName)); - } - this.remoteClusterConnectionProfiles = Collections.unmodifiableMap(connectionProfiles); - } - - private ConnectionProfile buildConnectionProfileFromSettings(String clusterName) { - return buildConnectionProfileFromSettings(settings, clusterName); - } - - static ConnectionProfile buildConnectionProfileFromSettings(Settings settings, String clusterName) { - return new ConnectionProfile.Builder() - .setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings)) - .setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings)) - .addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING) // TODO make this configurable? - // we don't want this to be used for anything else but search - .addConnections(0, TransportRequestOptions.Type.BULK, - TransportRequestOptions.Type.STATE, - TransportRequestOptions.Type.RECOVERY) - .setCompressionEnabled(REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterName).get(settings)) - .setPingInterval(REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterName).get(settings)) - .build(); - } - @Override public void close() throws IOException { IOUtils.close(remoteClusters.values()); @@ -417,29 +343,6 @@ public Stream getRemoteConnectionInfos() { return remoteClusters.values().stream().map(RemoteClusterConnection::getConnectionInfo); } - private boolean connectionProfileChanged(ConnectionProfile oldProfile, ConnectionProfile newProfile) { - return Objects.equals(oldProfile.getCompressionEnabled(), newProfile.getCompressionEnabled()) == false - || Objects.equals(oldProfile.getPingInterval(), newProfile.getPingInterval()) == false; - } - - private boolean seedsChanged(final List>> oldSeedNodes, - final List>> newSeedNodes) { - if (oldSeedNodes.size() != newSeedNodes.size()) { - return true; - } - Set oldSeeds = oldSeedNodes.stream().map(Tuple::v1).collect(Collectors.toSet()); - Set newSeeds = newSeedNodes.stream().map(Tuple::v1).collect(Collectors.toSet()); - return oldSeeds.equals(newSeeds) == false; - } - - private boolean proxyChanged(String oldProxy, String newProxy) { - if (oldProxy == null || oldProxy.isEmpty()) { - return (newProxy == null || newProxy.isEmpty()) == false; - } - - return Objects.equals(oldProxy, newProxy) == false; - } - /** * Collects all nodes of the given clusters and returns / passes a (clusterAlias, nodeId) to {@link DiscoveryNode} * function on success. @@ -466,7 +369,7 @@ public void onResponse(Function nodeLookup) { } if (countDown.countDown()) { listener.onResponse((clusterAlias, nodeId) - -> clusterMap.getOrDefault(clusterAlias, nullFunction).apply(nodeId)); + -> clusterMap.getOrDefault(clusterAlias, nullFunction).apply(nodeId)); } } @@ -482,9 +385,9 @@ public void onFailure(Exception e) { /** * Returns a client to the remote cluster if the given cluster alias exists. - * @param threadPool the {@link ThreadPool} for the client - * @param clusterAlias the cluster alias the remote cluster is registered under * + * @param threadPool the {@link ThreadPool} for the client + * @param clusterAlias the cluster alias the remote cluster is registered under * @throws IllegalArgumentException if the given clusterAlias doesn't exist */ public Client getRemoteClusterClient(ThreadPool threadPool, String clusterAlias) { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index d327a171920e0..19d89dee03d5b 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -26,6 +26,9 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.threadpool.ThreadPool; @@ -33,12 +36,27 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Locale; +import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; public abstract class RemoteConnectionStrategy implements TransportConnectionListener, Closeable { + enum ConnectionStrategy { + SNIFF, + SIMPLE + } + + public static final Setting.AffixSetting REMOTE_CONNECTION_MODE = Setting.affixKeySetting( + "cluster.remote.", "mode", key -> new Setting<>( + key, + ConnectionStrategy.SNIFF.name(), + value -> ConnectionStrategy.valueOf(value.toUpperCase(Locale.ROOT)), + Setting.Property.Dynamic)); + + private static final Logger logger = LogManager.getLogger(RemoteConnectionStrategy.class); private static final int MAX_LISTENERS = 100; @@ -111,6 +129,41 @@ public void onFailure(Exception e) { } } + public static boolean isConnectionEnabled(String clusterAlias, Settings settings) { + ConnectionStrategy mode = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).get(settings); + if (mode.equals(ConnectionStrategy.SNIFF)) { + List seeds = RemoteClusterAware.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings); + return seeds.isEmpty() == false; + } else { + return false; + } + } + + boolean shouldRebuildConnection(Settings newSettings) { + ConnectionStrategy newMode = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).get(newSettings); + if (newMode.equals(strategyType()) == false) { + return true; + } else { + Boolean compressionEnabled = RemoteClusterService.REMOTE_CLUSTER_COMPRESS + .getConcreteSettingForNamespace(clusterAlias) + .get(newSettings); + TimeValue pingSchedule = RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE + .getConcreteSettingForNamespace(clusterAlias) + .get(newSettings); + + ConnectionProfile oldProfile = connectionManager.getConnectionManager().getConnectionProfile(); + ConnectionProfile.Builder builder = new ConnectionProfile.Builder(oldProfile); + builder.setCompressionEnabled(compressionEnabled); + builder.setPingInterval(pingSchedule); + ConnectionProfile newProfile = builder.build(); + return connectionProfileChanged(oldProfile, newProfile) || strategyMustBeRebuilt(newSettings); + } + } + + protected abstract boolean strategyMustBeRebuilt(Settings newSettings); + + protected abstract ConnectionStrategy strategyType(); + @Override public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { if (shouldOpenMoreConnections()) { @@ -164,4 +217,9 @@ private List> getAndClearListeners() { } return result; } + + private boolean connectionProfileChanged(ConnectionProfile oldProfile, ConnectionProfile newProfile) { + return Objects.equals(oldProfile.getCompressionEnabled(), newProfile.getCompressionEnabled()) == false + || Objects.equals(oldProfile.getPingInterval(), newProfile.getPingInterval()) == false; + } } diff --git a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java index 24e9e18c8dc10..80260d909a0a1 100644 --- a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.CountDown; @@ -81,6 +82,16 @@ protected boolean shouldOpenMoreConnections() { return connectionManager.size() < maxNumRemoteConnections; } + @Override + protected boolean strategyMustBeRebuilt(Settings newSettings) { + return false; + } + + @Override + protected ConnectionStrategy strategyType() { + return ConnectionStrategy.SIMPLE; + } + @Override protected void connectImpl(ActionListener listener) { performSimpleConnectionProcess(addresses.iterator(), listener); @@ -125,7 +136,7 @@ public void onFailure(Exception e) { TransportAddress address = nextAddress(resolved); String id = clusterAlias + "#" + address; DiscoveryNode node = new DiscoveryNode(id, address, Version.CURRENT.minimumCompatibilityVersion()); - + connectionManager.connectToNode(node, profile, clusterNameValidator, new ActionListener<>() { @Override public void onResponse(Void v) { diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index f71ce576a3c22..c75c5ae13fc28 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -23,6 +23,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.SetOnce; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.StepListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; @@ -30,8 +31,11 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.common.Booleans; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.internal.io.IOUtils; @@ -39,40 +43,83 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Objects; +import java.util.Set; import java.util.function.Consumer; import java.util.function.Predicate; import java.util.function.Supplier; +import java.util.stream.Collectors; public class SniffConnectionStrategy extends RemoteConnectionStrategy { private static final Logger logger = LogManager.getLogger(SniffConnectionStrategy.class); - private final List>> seedNodes; + private static final Predicate DEFAULT_NODE_PREDICATE = (node) -> Version.CURRENT.isCompatible(node.getVersion()) + && (node.isMasterNode() == false || node.isDataNode() || node.isIngestNode()); + + + private final List configuredSeedNodes; + private final List> seedNodes; private final int maxNumRemoteConnections; private final Predicate nodePredicate; private final SetOnce remoteClusterName = new SetOnce<>(); private volatile String proxyAddress; + SniffConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager, + Settings settings) { + this( + clusterAlias, + transportService, + connectionManager, + RemoteClusterAware.REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(settings), + RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER.get(settings), + getNodePredicate(settings), + RemoteClusterAware.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings)); + } + SniffConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager, String proxyAddress, int maxNumRemoteConnections, Predicate nodePredicate, - List>> seedNodes) { + List configuredSeedNodes) { + this(clusterAlias, transportService, connectionManager, proxyAddress, maxNumRemoteConnections, nodePredicate, configuredSeedNodes, + configuredSeedNodes.stream().map(seedAddress -> + (Supplier) () -> resolveSeedNode(clusterAlias, seedAddress, proxyAddress)).collect(Collectors.toList())); + } + + SniffConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager, + String proxyAddress, int maxNumRemoteConnections, Predicate nodePredicate, + List configuredSeedNodes, List> seedNodes) { super(clusterAlias, transportService, connectionManager); this.proxyAddress = proxyAddress; this.maxNumRemoteConnections = maxNumRemoteConnections; this.nodePredicate = nodePredicate; + this.configuredSeedNodes = configuredSeedNodes; this.seedNodes = seedNodes; } @Override - protected void connectImpl(ActionListener listener) { - collectRemoteNodes(seedNodes.stream().map(Tuple::v2).iterator(), listener); + protected boolean shouldOpenMoreConnections() { + return connectionManager.size() < maxNumRemoteConnections; } @Override - protected boolean shouldOpenMoreConnections() { - return connectionManager.size() < maxNumRemoteConnections; + protected boolean strategyMustBeRebuilt(Settings newSettings) { + String proxy = RemoteClusterAware.REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(newSettings); + List addresses = RemoteClusterAware.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(newSettings); + return seedsChanged(configuredSeedNodes, addresses) || proxyChanged(proxyAddress, proxy); + } + + @Override + protected ConnectionStrategy strategyType() { + return ConnectionStrategy.SNIFF; + } + + @Override + protected void connectImpl(ActionListener listener) { + collectRemoteNodes(seedNodes.iterator(), listener); } private void collectRemoteNodes(Iterator> seedNodes, ActionListener listener) { @@ -98,7 +145,7 @@ private void collectRemoteNodes(Iterator> seedNodes, Act listener.onFailure(e); }; - final DiscoveryNode seedNode = maybeAddProxyAddress(proxyAddress, seedNodes.next().get()); + final DiscoveryNode seedNode = seedNodes.next().get(); logger.debug("[{}] opening connection to seed node: [{}] proxy address: [{}]", clusterAlias, seedNode, proxyAddress); final ConnectionProfile profile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG); @@ -168,6 +215,14 @@ private void collectRemoteNodes(Iterator> seedNodes, Act } } + List getSeedNodes() { + return configuredSeedNodes; + } + + int getMaxConnections() { + return maxNumRemoteConnections; + } + /* This class handles the _state response from the remote cluster when sniffing nodes to connect to */ private class SniffClusterStateResponseHandler implements TransportResponseHandler { @@ -261,6 +316,38 @@ public String toString() { }; } + private static DiscoveryNode resolveSeedNode(String clusterAlias, String address, String proxyAddress) { + if (proxyAddress == null || proxyAddress.isEmpty()) { + TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address)); + return new DiscoveryNode(clusterAlias + "#" + transportAddress.toString(), transportAddress, + Version.CURRENT.minimumCompatibilityVersion()); + } else { + TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(proxyAddress)); + String hostName = address.substring(0, indexOfPortSeparator(address)); + return new DiscoveryNode("", clusterAlias + "#" + address, UUIDs.randomBase64UUID(), hostName, address, + transportAddress, Collections.singletonMap("server_name", hostName), DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT.minimumCompatibilityVersion()); + } + } + + // Default visibility for tests + static Predicate getNodePredicate(Settings settings) { + if (RemoteClusterService.REMOTE_NODE_ATTRIBUTE.exists(settings)) { + // nodes can be tagged with node.attr.remote_gateway: true to allow a node to be a gateway node for cross cluster search + String attribute = RemoteClusterService.REMOTE_NODE_ATTRIBUTE.get(settings); + return DEFAULT_NODE_PREDICATE.and((node) -> Booleans.parseBoolean(node.getAttributes().getOrDefault(attribute, "false"))); + } + return DEFAULT_NODE_PREDICATE; + } + + private static int indexOfPortSeparator(String remoteHost) { + int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300 + if (portSeparator == -1 || portSeparator == remoteHost.length()) { + throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] instead"); + } + return portSeparator; + } + private static DiscoveryNode maybeAddProxyAddress(String proxyAddress, DiscoveryNode node) { if (proxyAddress == null || proxyAddress.isEmpty()) { return node; @@ -271,4 +358,21 @@ private static DiscoveryNode maybeAddProxyAddress(String proxyAddress, Discovery .getHostAddress(), new TransportAddress(proxyInetAddress), node.getAttributes(), node.getRoles(), node.getVersion()); } } + + private boolean seedsChanged(final List oldSeedNodes, final List newSeedNodes) { + if (oldSeedNodes.size() != newSeedNodes.size()) { + return true; + } + Set oldSeeds = new HashSet<>(oldSeedNodes); + Set newSeeds = new HashSet<>(newSeedNodes); + return oldSeeds.equals(newSeeds) == false; + } + + private boolean proxyChanged(String oldProxy, String newProxy) { + if (oldProxy == null || oldProxy.isEmpty()) { + return (newProxy == null || newProxy.isEmpty()) == false; + } + + return Objects.equals(oldProxy, newProxy) == false; + } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 6a4dab7608f27..889c54824999d 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -40,7 +40,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Strings; import org.elasticsearch.common.SuppressForbidden; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; @@ -58,7 +57,6 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.test.transport.StubbableConnectionManager; -import org.elasticsearch.test.transport.StubbableTransport; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -71,7 +69,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -80,7 +77,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; -import java.util.function.Supplier; import java.util.stream.Collectors; import static java.util.Collections.emptyMap; @@ -94,7 +90,6 @@ public class RemoteClusterConnectionTests extends ESTestCase { private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); - private final ConnectionProfile profile = RemoteClusterService.buildConnectionProfileFromSettings(Settings.EMPTY, "cluster"); @Override public void tearDown() throws Exception { @@ -198,8 +193,9 @@ public void run() { service.acceptIncomingRequests(); CountDownLatch listenerCalled = new CountDownLatch(1); AtomicReference exceptionReference = new AtomicReference<>(); - try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null, profile)) { + String clusterAlias = "test-cluster"; + Settings settings = buildSniffSettings(clusterAlias, seedNodes(seedNode)); + try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) { ActionListener listener = ActionListener.wrap(x -> { listenerCalled.countDown(); fail("expected exception"); @@ -223,16 +219,8 @@ public void run() { } } - private static List>> seedNodes(final DiscoveryNode... seedNodes) { - if (seedNodes.length == 0) { - return Collections.emptyList(); - } else if (seedNodes.length == 1) { - return Collections.singletonList(Tuple.tuple(seedNodes[0].toString(), () -> seedNodes[0])); - } else { - return Arrays.stream(seedNodes) - .map(s -> Tuple.tuple(s.toString(), (Supplier) () -> s)) - .collect(Collectors.toList()); - } + private static List seedNodes(final DiscoveryNode... seedNodes) { + return Arrays.stream(seedNodes).map(s -> s.getAddress().toString()).collect(Collectors.toList()); } public void testCloseWhileConcurrentlyConnecting() throws IOException, InterruptedException, BrokenBarrierException { @@ -246,14 +234,15 @@ public void testCloseWhileConcurrentlyConnecting() throws IOException, Interrupt knownNodes.add(discoverableTransport.getLocalDiscoNode()); knownNodes.add(seedTransport1.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); - List>> seedNodes = seedNodes(seedNode1, seedNode); + List seedNodes = seedNodes(seedNode1, seedNode); Collections.shuffle(seedNodes, random()); try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { service.start(); service.acceptIncomingRequests(); - try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true, null, profile)) { + String clusterAlias = "test-cluster"; + Settings settings = buildSniffSettings(clusterAlias, seedNodes); + try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) { int numThreads = randomIntBetween(4, 10); Thread[] threads = new Thread[numThreads]; CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); @@ -332,22 +321,24 @@ public void testGetConnectionInfo() throws Exception { knownNodes.add(transport3.getLocalDiscoNode()); knownNodes.add(transport2.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); - List>> seedNodes = seedNodes(node3, node1, node2); + List seedNodes = seedNodes(node3, node1, node2); Collections.shuffle(seedNodes, random()); try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { service.start(); service.acceptIncomingRequests(); int maxNumConnections = randomIntBetween(1, 5); - try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, maxNumConnections, n -> true, null, profile)) { + String clusterAlias = "test-cluster"; + Settings settings = Settings.builder().put(buildSniffSettings(clusterAlias, seedNodes)) + .put(RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER.getKey(), maxNumConnections).build(); + try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) { // test no nodes connected RemoteConnectionInfo remoteConnectionInfo = assertSerialization(connection.getConnectionInfo()); assertNotNull(remoteConnectionInfo); assertEquals(0, remoteConnectionInfo.numNodesConnected); assertEquals(3, remoteConnectionInfo.seedNodes.size()); assertEquals(maxNumConnections, remoteConnectionInfo.connectionsPerCluster); - assertEquals("test-cluster", remoteConnectionInfo.clusterAlias); + assertEquals(clusterAlias, remoteConnectionInfo.clusterAlias); } } } @@ -429,8 +420,9 @@ public void testCollectNodes() throws Exception { try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { service.start(); service.acceptIncomingRequests(); - try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null, profile)) { + String clusterAlias = "test-cluster"; + Settings settings = buildSniffSettings(clusterAlias, seedNodes(seedNode)); + try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) { CountDownLatch responseLatch = new CountDownLatch(1); AtomicReference> reference = new AtomicReference<>(); AtomicReference failReference = new AtomicReference<>(); @@ -461,21 +453,24 @@ public void testConnectedNodesConcurrentAccess() throws IOException, Interrupted List discoverableTransports = new CopyOnWriteArrayList<>(); try { final int numDiscoverableNodes = randomIntBetween(5, 20); - List>> discoverableNodes = new ArrayList<>(numDiscoverableNodes); + List discoverableNodes = new ArrayList<>(numDiscoverableNodes); for (int i = 0; i < numDiscoverableNodes; i++) { MockTransportService transportService = startTransport("discoverable_node" + i, knownNodes, Version.CURRENT); - discoverableNodes.add(Tuple.tuple("discoverable_node" + i, transportService::getLocalDiscoNode)); + discoverableNodes.add(transportService.getLocalNode()); discoverableTransports.add(transportService); } - List>> seedNodes = new CopyOnWriteArrayList<>(randomSubsetOf(discoverableNodes)); + List seedNodes = new CopyOnWriteArrayList<>(randomSubsetOf(discoverableNodes.stream() + .map(d -> d.getAddress().toString()).collect(Collectors.toList()))); Collections.shuffle(seedNodes, random()); try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { service.start(); service.acceptIncomingRequests(); - try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true, null, profile)) { + + String clusterAlias = "test-cluster"; + Settings settings = buildSniffSettings(clusterAlias, seedNodes); + try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) { final int numGetThreads = randomIntBetween(4, 10); final Thread[] getThreads = new Thread[numGetThreads]; final int numModifyingThreads = randomIntBetween(4, 10); @@ -507,7 +502,7 @@ public void testConnectedNodesConcurrentAccess() throws IOException, Interrupted try { barrier.await(); for (int j = 0; j < numDisconnects; j++) { - DiscoveryNode node = randomFrom(discoverableNodes).v2().get(); + DiscoveryNode node = randomFrom(discoverableNodes); try { connection.getConnectionManager().getConnection(node); } catch (NodeNotConnectedException e) { @@ -574,8 +569,9 @@ public void sendRequest(long requestId, String action, TransportRequest request, service.start(); service.acceptIncomingRequests(); - try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes(connectedNode), service, Integer.MAX_VALUE, n -> true, null, connectionManager)) { + String clusterAlias = "test-cluster"; + Settings settings = buildSniffSettings(clusterAlias, seedNodes(connectedNode)); + try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service, connectionManager)) { PlainActionFuture.get(fut -> connection.ensureConnected(ActionListener.map(fut, x -> null))); for (int i = 0; i < 10; i++) { //always a direct connection as the remote node is already connected @@ -599,88 +595,10 @@ public void sendRequest(long requestId, String action, TransportRequest request, } } - public void testLazyResolveTransportAddress() throws Exception { - List knownNodes = new CopyOnWriteArrayList<>(); - try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); - MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { - DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); - knownNodes.add(seedTransport.getLocalDiscoNode()); - knownNodes.add(discoverableTransport.getLocalDiscoNode()); - Collections.shuffle(knownNodes, random()); - - try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { - service.start(); - service.acceptIncomingRequests(); - CountDownLatch multipleResolveLatch = new CountDownLatch(2); - Tuple> seedSupplier = Tuple.tuple(seedNode.toString(), () -> { - multipleResolveLatch.countDown(); - return seedNode; - }); - try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedSupplier), service, Integer.MAX_VALUE, n -> true, null, profile)) { - PlainActionFuture firstConnect = PlainActionFuture.newFuture(); - connection.ensureConnected(firstConnect); - firstConnect.actionGet(); - // Closing connections leads to RemoteClusterConnection.ConnectHandler.collectRemoteNodes - connection.getConnectionManager().getConnection(seedNode).close(); - - assertTrue(multipleResolveLatch.await(30L, TimeUnit.SECONDS)); - } - } - } - } - - public static Transport getProxyTransport(ThreadPool threadPool, Map> nodeMap) { - if (nodeMap.isEmpty()) { - throw new IllegalArgumentException("nodeMap must be non-empty"); - } - - StubbableTransport stubbableTransport = new StubbableTransport(MockTransportService.newMockTransport(Settings.EMPTY, - Version.CURRENT, threadPool)); - stubbableTransport.setDefaultConnectBehavior((t, node, profile, listener) -> { - Map proxyMapping = nodeMap.get(node.getAddress().toString()); - if (proxyMapping == null) { - throw new IllegalStateException("no proxy mapping for node: " + node); - } - DiscoveryNode proxyNode = proxyMapping.get(node.getName()); - if (proxyNode == null) { - // this is a seednode - lets pick one randomly - assertEquals("seed node must not have a port in the hostname: " + node.getHostName(), - -1, node.getHostName().lastIndexOf(':')); - assertTrue("missing hostname: " + node, proxyMapping.containsKey(node.getHostName())); - // route by seed hostname - proxyNode = proxyMapping.get(node.getHostName()); - } - t.openConnection(proxyNode, profile, ActionListener.delegateFailure(listener, - (delegatedListener, connection) -> delegatedListener.onResponse( - new Transport.Connection() { - @Override - public DiscoveryNode getNode() { - return node; - } - - @Override - public void sendRequest(long requestId, String action, TransportRequest request, - TransportRequestOptions options) throws IOException { - connection.sendRequest(requestId, action, request, options); - } - - @Override - public void addCloseListener(ActionListener listener) { - connection.addCloseListener(listener); - } - - @Override - public boolean isClosed() { - return connection.isClosed(); - } - - @Override - public void close() { - connection.close(); - } - }))); - }); - return stubbableTransport; + private static Settings buildSniffSettings(String clusterAlias, List seedNodes) { + Settings.Builder builder = Settings.builder(); + builder.put(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(clusterAlias).getKey(), + Strings.collectionToCommaDelimitedString(seedNodes)); + return builder.build(); } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 0de8a6bdeb00b..050e81fad5c3b 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -22,8 +22,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.AbstractScopedSettings; @@ -33,7 +33,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -44,19 +43,15 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; -import java.util.function.Predicate; import java.util.function.Supplier; -import java.util.stream.Collectors; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.either; @@ -285,17 +280,33 @@ public void testIncrementallyAddClusters() throws IOException { assertFalse(service.isCrossClusterSearchEnabled()); service.initializeRemoteClusters(); assertFalse(service.isCrossClusterSearchEnabled()); - service.updateRemoteCluster("cluster_1", Collections.singletonList(cluster1Seed.getAddress().toString()), null); + Settings cluster1Settings = createSettings("cluster_1", + Collections.singletonList(cluster1Seed.getAddress().toString())); + PlainActionFuture clusterAdded = PlainActionFuture.newFuture(); + // Add the cluster on a different thread to test that we wait for a new cluster to + // connect before returning. + new Thread(() -> { + try { + service.validateAndUpdateRemoteCluster("cluster_1", cluster1Settings); + clusterAdded.onResponse(null); + } catch (Exception e) { + clusterAdded.onFailure(e); + } + }).start(); + clusterAdded.actionGet(); assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); - service.updateRemoteCluster("cluster_2", Collections.singletonList(cluster2Seed.getAddress().toString()), null); + Settings cluster2Settings = createSettings("cluster_2", + Collections.singletonList(cluster2Seed.getAddress().toString())); + service.validateAndUpdateRemoteCluster("cluster_2", cluster2Settings); assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); assertTrue(service.isRemoteClusterRegistered("cluster_2")); - service.updateRemoteCluster("cluster_2", Collections.emptyList(), null); + Settings cluster2SettingsDisabled = createSettings("cluster_2", Collections.emptyList()); + service.validateAndUpdateRemoteCluster("cluster_2", cluster2SettingsDisabled); assertFalse(service.isRemoteClusterRegistered("cluster_2")); IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, - () -> service.updateRemoteCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Collections.emptyList(), null)); + () -> service.validateAndUpdateRemoteCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Settings.EMPTY)); assertEquals("remote clusters must not have the empty string as its key", iae.getMessage()); } } @@ -325,7 +336,8 @@ public void testDefaultPingSchedule() throws IOException { assertFalse(service.isCrossClusterSearchEnabled()); service.initializeRemoteClusters(); assertTrue(service.isCrossClusterSearchEnabled()); - service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().toString()), null); + service.validateAndUpdateRemoteCluster("cluster_1", + createSettings("cluster_1", Collections.singletonList(seedNode.getAddress().toString()))); assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); @@ -397,7 +409,7 @@ public void testChangeSettings() throws Exception { boolean compressionEnabled = true; settingsChange.put("cluster.remote.cluster_1.transport.compress", compressionEnabled); settingsChange.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); - service.updateRemoteCluster("cluster_1", settingsChange.build()); + service.validateAndUpdateRemoteCluster("cluster_1", settingsChange.build()); assertBusy(remoteClusterConnection::isClosed); remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); @@ -453,15 +465,15 @@ public void testRemoteNodeAttribute() throws IOException, InterruptedException { final CountDownLatch firstLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_1", - Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), null, - genericProfile("cluster_1"), connectionListener(firstLatch)); + createSettings("cluster_1", Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString())), + connectionListener(firstLatch)); firstLatch.await(); final CountDownLatch secondLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_2", - Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), null, - genericProfile("cluster_2"), connectionListener(secondLatch)); + createSettings("cluster_2", Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString())), + connectionListener(secondLatch)); secondLatch.await(); assertTrue(service.isCrossClusterSearchEnabled()); @@ -518,15 +530,15 @@ public void testRemoteNodeRoles() throws IOException, InterruptedException { final CountDownLatch firstLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_1", - Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), null, - genericProfile("cluster_1"), connectionListener(firstLatch)); + createSettings("cluster_1", Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString())), + connectionListener(firstLatch)); firstLatch.await(); final CountDownLatch secondLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_2", - Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), null, - genericProfile("cluster_2"), connectionListener(secondLatch)); + createSettings("cluster_2", Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString())), + connectionListener(secondLatch)); secondLatch.await(); assertTrue(service.isCrossClusterSearchEnabled()); @@ -591,15 +603,15 @@ public void testCollectNodes() throws InterruptedException, IOException { final CountDownLatch firstLatch = new CountDownLatch(1); service.updateRemoteCluster("cluster_1", - Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString()), null, - genericProfile("cluster_1"), connectionListener(firstLatch)); + createSettings("cluster_1", Arrays.asList(c1N1Node.getAddress().toString(), c1N2Node.getAddress().toString())), + connectionListener(firstLatch)); firstLatch.await(); final CountDownLatch secondLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_2", - Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString()), null, - genericProfile("cluster_2"), connectionListener(secondLatch)); + createSettings("cluster_2", Arrays.asList(c2N1Node.getAddress().toString(), c2N2Node.getAddress().toString())), + connectionListener(secondLatch)); secondLatch.await(); CountDownLatch latch = new CountDownLatch(1); service.collectNodes(new HashSet<>(Arrays.asList("cluster_1", "cluster_2")), @@ -734,113 +746,7 @@ public void testRemoteClusterSkipIfDisconnectedSetting() { } } - public void testGetNodePredicateNodeRoles() { - TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0); - Predicate nodePredicate = RemoteClusterService.getNodePredicate(Settings.EMPTY); - { - DiscoveryNode all = new DiscoveryNode("id", address, Collections.emptyMap(), - DiscoveryNodeRole.BUILT_IN_ROLES, Version.CURRENT); - assertTrue(nodePredicate.test(all)); - } - { - DiscoveryNode dataMaster = new DiscoveryNode("id", address, Collections.emptyMap(), - Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE), Version.CURRENT); - assertTrue(nodePredicate.test(dataMaster)); - } - { - DiscoveryNode dedicatedMaster = new DiscoveryNode("id", address, Collections.emptyMap(), - Set.of(DiscoveryNodeRole.MASTER_ROLE), Version.CURRENT); - assertFalse(nodePredicate.test(dedicatedMaster)); - } - { - DiscoveryNode dedicatedIngest = new DiscoveryNode("id", address, Collections.emptyMap(), - Set.of(DiscoveryNodeRole.INGEST_ROLE), Version.CURRENT); - assertTrue(nodePredicate.test(dedicatedIngest)); - } - { - DiscoveryNode masterIngest = new DiscoveryNode("id", address, Collections.emptyMap(), - Set.of(DiscoveryNodeRole.INGEST_ROLE, DiscoveryNodeRole.MASTER_ROLE), Version.CURRENT); - assertTrue(nodePredicate.test(masterIngest)); - } - { - DiscoveryNode dedicatedData = new DiscoveryNode("id", address, Collections.emptyMap(), - Set.of(DiscoveryNodeRole.DATA_ROLE), Version.CURRENT); - assertTrue(nodePredicate.test(dedicatedData)); - } - { - DiscoveryNode ingestData = new DiscoveryNode("id", address, Collections.emptyMap(), - Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.INGEST_ROLE), Version.CURRENT); - assertTrue(nodePredicate.test(ingestData)); - } - { - DiscoveryNode coordOnly = new DiscoveryNode("id", address, Collections.emptyMap(), - Set.of(), Version.CURRENT); - assertTrue(nodePredicate.test(coordOnly)); - } - } - - public void testGetNodePredicateNodeVersion() { - TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0); - Set roles = DiscoveryNodeRole.BUILT_IN_ROLES; - Predicate nodePredicate = RemoteClusterService.getNodePredicate(Settings.EMPTY); - Version version = VersionUtils.randomVersion(random()); - DiscoveryNode node = new DiscoveryNode("id", address, Collections.emptyMap(), roles, version); - assertThat(nodePredicate.test(node), equalTo(Version.CURRENT.isCompatible(version))); - } - - public void testGetNodePredicateNodeAttrs() { - TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0); - Set roles = DiscoveryNodeRole.BUILT_IN_ROLES; - Settings settings = Settings.builder().put("cluster.remote.node.attr", "gateway").build(); - Predicate nodePredicate = RemoteClusterService.getNodePredicate(settings); - { - DiscoveryNode nonGatewayNode = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "false"), - roles, Version.CURRENT); - assertFalse(nodePredicate.test(nonGatewayNode)); - assertTrue(RemoteClusterService.getNodePredicate(Settings.EMPTY).test(nonGatewayNode)); - } - { - DiscoveryNode gatewayNode = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "true"), - roles, Version.CURRENT); - assertTrue(nodePredicate.test(gatewayNode)); - assertTrue(RemoteClusterService.getNodePredicate(Settings.EMPTY).test(gatewayNode)); - } - { - DiscoveryNode noAttrNode = new DiscoveryNode("id", address, Collections.emptyMap(), roles, Version.CURRENT); - assertFalse(nodePredicate.test(noAttrNode)); - assertTrue(RemoteClusterService.getNodePredicate(Settings.EMPTY).test(noAttrNode)); - } - } - - public void testGetNodePredicatesCombination() { - TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0); - Settings settings = Settings.builder().put("cluster.remote.node.attr", "gateway").build(); - Predicate nodePredicate = RemoteClusterService.getNodePredicate(settings); - Set allRoles = DiscoveryNodeRole.BUILT_IN_ROLES; - Set dedicatedMasterRoles = Set.of(DiscoveryNodeRole.MASTER_ROLE); - { - DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "true"), - dedicatedMasterRoles, Version.CURRENT); - assertFalse(nodePredicate.test(node)); - } - { - DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "false"), - dedicatedMasterRoles, Version.CURRENT); - assertFalse(nodePredicate.test(node)); - } - { - DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "false"), - dedicatedMasterRoles, Version.CURRENT); - assertFalse(nodePredicate.test(node)); - } - { - DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "true"), - allRoles, Version.CURRENT); - assertTrue(nodePredicate.test(node)); - } - } - - public void testReconnectWhenSeedsNodesOrProxyAreUpdated() throws Exception { + public void testReconnectWhenStrategySettingsUpdated() throws Exception { List knownNodes = new CopyOnWriteArrayList<>(); try (MockTransportService cluster_node_0 = startTransport("cluster_node_0", knownNodes, Version.CURRENT); MockTransportService cluster_node_1 = startTransport("cluster_node_1", knownNodes, Version.CURRENT)) { @@ -872,8 +778,8 @@ public void testReconnectWhenSeedsNodesOrProxyAreUpdated() throws Exception { final CountDownLatch firstLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_test", - Collections.singletonList(node0.getAddress().toString()), null, - genericProfile("cluster_test"), connectionListener(firstLatch)); + createSettings("cluster_test", Collections.singletonList(node0.getAddress().toString())), + connectionListener(firstLatch)); firstLatch.await(); assertTrue(service.isCrossClusterSearchEnabled()); @@ -893,8 +799,8 @@ public void testReconnectWhenSeedsNodesOrProxyAreUpdated() throws Exception { final CountDownLatch secondLatch = new CountDownLatch(1); service.updateRemoteCluster( "cluster_test", - newSeeds, null, - genericProfile("cluster_test"), connectionListener(secondLatch)); + createSettings("cluster_test", newSeeds), + connectionListener(secondLatch)); secondLatch.await(); assertTrue(service.isCrossClusterSearchEnabled()); @@ -910,105 +816,11 @@ public void testReconnectWhenSeedsNodesOrProxyAreUpdated() throws Exception { assertTrue(secondRemoteClusterConnection.isNodeConnected(node1)); assertEquals(2, secondRemoteClusterConnection.getNumNodesConnected()); assertFalse(secondRemoteClusterConnection.isClosed()); - - final CountDownLatch thirdLatch = new CountDownLatch(1); - service.updateRemoteCluster( - "cluster_test", - newSeeds, node1.getAddress().toString(), - genericProfile("cluster_test"), connectionListener(thirdLatch)); - thirdLatch.await(); - - assertBusy(() -> { - assertFalse(secondRemoteClusterConnection.isNodeConnected(node0)); - assertFalse(secondRemoteClusterConnection.isNodeConnected(node1)); - assertEquals(0, secondRemoteClusterConnection.getNumNodesConnected()); - assertTrue(secondRemoteClusterConnection.isClosed()); - }); - - final RemoteClusterConnection thirdRemoteClusterConnection = service.getRemoteClusterConnection("cluster_test"); - assertTrue(thirdRemoteClusterConnection.isNodeConnected(node1)); - // Will only successfully connect to node1 because the proxy address is to node1 and - // validation will fail when attempt to connect to node0 - assertFalse(thirdRemoteClusterConnection.isNodeConnected(node0)); - assertEquals(1, thirdRemoteClusterConnection.getNumNodesConnected()); - assertFalse(thirdRemoteClusterConnection.isClosed()); - } - } - } - } - - public void testRemoteClusterWithProxy() throws Exception { - List knownNodes = new CopyOnWriteArrayList<>(); - try (MockTransportService cluster_1_node0 = startTransport("cluster_1_node0", knownNodes, Version.CURRENT); - MockTransportService cluster_1_node_1 = startTransport("cluster_1_node1", knownNodes, Version.CURRENT); - MockTransportService cluster_2_node0 = startTransport("cluster_2_node0", Collections.emptyList(), Version.CURRENT)) { - knownNodes.add(cluster_1_node0.getLocalDiscoNode()); - knownNodes.add(cluster_1_node_1.getLocalDiscoNode()); - String cluster1Proxy = "1.1.1.1:99"; - String cluster2Proxy = "2.2.2.2:99"; - Map nodesCluster1 = new HashMap<>(); - nodesCluster1.put("cluster_1_node0", cluster_1_node0.getLocalDiscoNode()); - nodesCluster1.put("cluster_1_node1", cluster_1_node_1.getLocalDiscoNode()); - Map> mapping = new HashMap<>(); - mapping.put(cluster1Proxy, nodesCluster1); - mapping.put(cluster2Proxy, Collections.singletonMap("cluster_2_node0", cluster_2_node0.getLocalDiscoNode())); - - Collections.shuffle(knownNodes, random()); - Transport proxyTransport = RemoteClusterConnectionTests.getProxyTransport(threadPool, mapping); - try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, proxyTransport, - Version.CURRENT, threadPool, null, Collections.emptySet());) { - transportService.start(); - transportService.acceptIncomingRequests(); - Settings.Builder builder = Settings.builder(); - builder.putList("cluster.remote.cluster_1.seeds", "cluster_1_node0:8080"); - builder.put("cluster.remote.cluster_1.proxy", cluster1Proxy); - try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { - assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); - assertTrue(service.isCrossClusterSearchEnabled()); - updateRemoteCluster(service, "cluster_1", Collections.singletonList("cluster_1_node1:8081"), cluster1Proxy); - assertTrue(service.isCrossClusterSearchEnabled()); - assertTrue(service.isRemoteClusterRegistered("cluster_1")); - assertFalse(service.isRemoteClusterRegistered("cluster_2")); - updateRemoteCluster(service, "cluster_2", Collections.singletonList("cluster_2_node0:9300"), cluster2Proxy); - assertTrue(service.isCrossClusterSearchEnabled()); - assertTrue(service.isRemoteClusterRegistered("cluster_1")); - assertTrue(service.isRemoteClusterRegistered("cluster_2")); - List infos = service.getRemoteConnectionInfos().collect(Collectors.toList()); - for (RemoteConnectionInfo info : infos) { - switch (info.clusterAlias) { - case "cluster_1": - assertEquals(2, info.numNodesConnected); - break; - case "cluster_2": - assertEquals(1, info.numNodesConnected); - break; - default: - fail("unknown cluster: " + info.clusterAlias); - } - } - service.updateRemoteCluster("cluster_2", Collections.emptyList(), randomBoolean() ? cluster2Proxy : null); - assertFalse(service.isRemoteClusterRegistered("cluster_2")); } } } } - private static void updateRemoteCluster(RemoteClusterService service, String clusterAlias, List addresses, String proxyAddress) - throws Exception { - CountDownLatch latch = new CountDownLatch(1); - AtomicReference exceptionAtomicReference = new AtomicReference<>(); - ActionListener listener = ActionListener.wrap(x -> latch.countDown(), x -> { - exceptionAtomicReference.set(x); - latch.countDown(); - }); - service.updateRemoteCluster(clusterAlias, addresses, proxyAddress, genericProfile(clusterAlias), listener); - latch.await(); - if (exceptionAtomicReference.get() != null) { - throw exceptionAtomicReference.get(); - } - } - public static void updateSkipUnavailable(RemoteClusterService service, String clusterAlias, boolean skipUnavailable) { RemoteClusterConnection connection = service.getRemoteClusterConnection(clusterAlias); connection.updateSkipUnavailable(skipUnavailable); @@ -1045,7 +857,10 @@ public void testSkipUnavailable() { } } - private static ConnectionProfile genericProfile(String clusterName) { - return RemoteClusterService.buildConnectionProfileFromSettings(Settings.EMPTY, clusterName); + private static Settings createSettings(String clusterAlias, List seeds) { + Settings.Builder builder = Settings.builder(); + builder.put(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(clusterAlias).getKey(), + Strings.collectionToCommaDelimitedString(seeds)); + return builder.build(); } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java new file mode 100644 index 0000000000000..e669cfb1956a4 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java @@ -0,0 +1,105 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.elasticsearch.transport; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; + +import static org.mockito.Mockito.mock; + +public class RemoteConnectionStrategyTests extends ESTestCase { + + public void testStrategyChangeMeansThatStrategyMustBeRebuilt() { + ConnectionManager connectionManager = new ConnectionManager(Settings.EMPTY, mock(Transport.class)); + RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager("cluster-alias", connectionManager); + FakeConnectionStrategy first = new FakeConnectionStrategy("cluster-alias", mock(TransportService.class), remoteConnectionManager, + RemoteConnectionStrategy.ConnectionStrategy.SIMPLE); + Settings newSettings = Settings.builder() + .put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace("cluster-alias").getKey(), "sniff") + .build(); + assertTrue(first.shouldRebuildConnection(newSettings)); + } + + public void testSameStrategyChangeMeansThatStrategyDoesNotNeedToBeRebuilt() { + ConnectionManager connectionManager = new ConnectionManager(Settings.EMPTY, mock(Transport.class)); + RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager("cluster-alias", connectionManager); + FakeConnectionStrategy first = new FakeConnectionStrategy("cluster-alias", mock(TransportService.class), remoteConnectionManager, + RemoteConnectionStrategy.ConnectionStrategy.SIMPLE); + Settings newSettings = Settings.builder() + .put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace("cluster-alias").getKey(), "simple") + .build(); + assertFalse(first.shouldRebuildConnection(newSettings)); + } + + public void testChangeInConnectionProfileMeansTheStrategyMustBeRebuilt() { + ConnectionManager connectionManager = new ConnectionManager(TestProfiles.LIGHT_PROFILE, mock(Transport.class)); + assertEquals(TimeValue.MINUS_ONE, connectionManager.getConnectionProfile().getPingInterval()); + assertEquals(false, connectionManager.getConnectionProfile().getCompressionEnabled()); + RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager("cluster-alias", connectionManager); + FakeConnectionStrategy first = new FakeConnectionStrategy("cluster-alias", mock(TransportService.class), remoteConnectionManager, + RemoteConnectionStrategy.ConnectionStrategy.SIMPLE); + + ConnectionProfile profile = connectionManager.getConnectionProfile(); + + Settings.Builder newBuilder = Settings.builder(); + newBuilder.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace("cluster-alias").getKey(), "simple"); + if (randomBoolean()) { + newBuilder.put(RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace("cluster-alias").getKey(), + TimeValue.timeValueSeconds(5)); + } else { + newBuilder.put(RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("cluster-alias").getKey(), true); + } + assertTrue(first.shouldRebuildConnection(newBuilder.build())); + } + + private static class FakeConnectionStrategy extends RemoteConnectionStrategy { + + private final ConnectionStrategy strategy; + + FakeConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager, + RemoteConnectionStrategy.ConnectionStrategy strategy) { + super(clusterAlias, transportService, connectionManager); + this.strategy = strategy; + } + + @Override + protected boolean strategyMustBeRebuilt(Settings newSettings) { + return false; + } + + @Override + protected ConnectionStrategy strategyType() { + return this.strategy; + } + + @Override + protected boolean shouldOpenMoreConnections() { + return false; + } + + @Override + protected void connectImpl(ActionListener listener) { + + } + } +} diff --git a/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java index 68e2622c040fd..207f7ad0e3da7 100644 --- a/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java @@ -40,7 +40,7 @@ public class SimpleConnectionStrategyTests extends ESTestCase { private final String clusterAlias = "cluster-alias"; - private final ConnectionProfile profile = RemoteClusterService.buildConnectionProfileFromSettings(Settings.EMPTY, "cluster"); + private final ConnectionProfile profile = RemoteClusterConnection.buildConnectionProfileFromSettings(Settings.EMPTY, "cluster"); private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); @Override diff --git a/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java index 5e901032b26fe..22dd57e105322 100644 --- a/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java @@ -27,11 +27,14 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.test.transport.StubbableTransport; import org.elasticsearch.threadpool.TestThreadPool; @@ -40,21 +43,24 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; +import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.Matchers.allOf; -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.endsWith; -import static org.hamcrest.Matchers.startsWith; +import static org.hamcrest.Matchers.equalTo; public class SniffConnectionStrategyTests extends ESTestCase { private final String clusterAlias = "cluster-alias"; - private final ConnectionProfile profile = RemoteClusterService.buildConnectionProfileFromSettings(Settings.EMPTY, "cluster"); + private final ConnectionProfile profile = RemoteClusterConnection.buildConnectionProfileFromSettings(Settings.EMPTY, "cluster"); private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); @Override @@ -130,6 +136,44 @@ public void testSniffStrategyWillConnectToAndDiscoverNodes() { } } + public void testSniffStrategyWillResolveDiscoveryNodesEachConnect() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalNode(); + DiscoveryNode discoverableNode = discoverableTransport.getLocalNode(); + knownNodes.add(seedNode); + knownNodes.add(discoverableNode); + Collections.shuffle(knownNodes, random()); + + CountDownLatch multipleResolveLatch = new CountDownLatch(2); + Supplier seedNodeSupplier = () -> { + multipleResolveLatch.countDown(); + return seedNode; + }; + + try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) { + localService.start(); + localService.acceptIncomingRequests(); + + ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); + try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager, + null, 3, n -> true, seedNodes(seedNode), Collections.singletonList(seedNodeSupplier))) { + PlainActionFuture connectFuture = PlainActionFuture.newFuture(); + strategy.connect(connectFuture); + connectFuture.actionGet(); + + // Closing connections leads to RemoteClusterConnection.ConnectHandler.collectRemoteNodes + connectionManager.getConnection(seedNode).close(); + + assertTrue(multipleResolveLatch.await(30L, TimeUnit.SECONDS)); + assertTrue(connectionManager.nodeConnected(discoverableNode)); + } + } + } + } + public void testSniffStrategyWillConnectToMaxAllowedNodesAndOpenNewConnectionsOnDisconnect() throws Exception { List knownNodes = new CopyOnWriteArrayList<>(); try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); @@ -317,8 +361,7 @@ public void testClusterNameValidationPreventConnectingToDifferentClusters() thro strategy.connect(newConnect); IllegalStateException ise = expectThrows(IllegalStateException.class, newConnect::actionGet); assertThat(ise.getMessage(), allOf( - startsWith("handshake with [{other_seed}"), - containsString(otherSeedNode.toString()), + startsWith("handshake with [{cluster-alias#"), endsWith(" failed: remote cluster name [otherCluster] " + "does not match expected remote cluster name [" + clusterAlias + "]"))); @@ -414,8 +457,7 @@ public void testConfiguredProxyAddressModeWillReplaceNodeAddress() { } }); - Tuple> tuple = Tuple.tuple(accessibleNode.toString(), () -> unaddressableSeedNode); - List>> seedNodes = Collections.singletonList(tuple); + List seedNodes = Collections.singletonList(accessibleNode.toString()); TransportAddress proxyAddress = accessibleNode.getAddress(); ConnectionManager connectionManager = new ConnectionManager(profile, transport); try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); @@ -439,15 +481,161 @@ public void testConfiguredProxyAddressModeWillReplaceNodeAddress() { } } - private static List>> seedNodes(final DiscoveryNode... seedNodes) { - if (seedNodes.length == 0) { - return Collections.emptyList(); - } else if (seedNodes.length == 1) { - return Collections.singletonList(Tuple.tuple(seedNodes[0].toString(), () -> seedNodes[0])); - } else { - return Arrays.stream(seedNodes) - .map(s -> Tuple.tuple(s.toString(), (Supplier) () -> s)) - .collect(Collectors.toList()); + public void testSniffStrategyWillNeedToBeRebuiltIfSeedsOrProxyChange() { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalNode(); + DiscoveryNode discoverableNode = discoverableTransport.getLocalNode(); + knownNodes.add(seedNode); + knownNodes.add(discoverableNode); + Collections.shuffle(knownNodes, random()); + + + try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) { + localService.start(); + localService.acceptIncomingRequests(); + + ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); + try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager, + null, 3, n -> true, seedNodes(seedNode))) { + PlainActionFuture connectFuture = PlainActionFuture.newFuture(); + strategy.connect(connectFuture); + connectFuture.actionGet(); + + assertTrue(connectionManager.nodeConnected(seedNode)); + assertTrue(connectionManager.nodeConnected(discoverableNode)); + assertTrue(strategy.assertNoRunningConnections()); + + Setting seedSetting = RemoteClusterService.REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace("cluster-alias"); + Setting proxySetting = RemoteClusterService.REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace("cluster-alias"); + + Settings noChange = Settings.builder() + .put(seedSetting.getKey(), Strings.arrayToCommaDelimitedString(seedNodes(seedNode).toArray())) + .build(); + assertFalse(strategy.shouldRebuildConnection(noChange)); + Settings seedsChanged = Settings.builder() + .put(seedSetting.getKey(), Strings.arrayToCommaDelimitedString(seedNodes(discoverableNode).toArray())) + .build(); + assertTrue(strategy.shouldRebuildConnection(seedsChanged)); + Settings proxyChanged = Settings.builder() + .put(seedSetting.getKey(), Strings.arrayToCommaDelimitedString(seedNodes(seedNode).toArray())) + .put(proxySetting.getKey(), "proxy_address:9300") + .build(); + assertTrue(strategy.shouldRebuildConnection(proxyChanged)); + } + } + } + } + + public void testGetNodePredicateNodeRoles() { + TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0); + Predicate nodePredicate = SniffConnectionStrategy.getNodePredicate(Settings.EMPTY); + { + DiscoveryNode all = new DiscoveryNode("id", address, Collections.emptyMap(), + DiscoveryNodeRole.BUILT_IN_ROLES, Version.CURRENT); + assertTrue(nodePredicate.test(all)); + } + { + DiscoveryNode dataMaster = new DiscoveryNode("id", address, Collections.emptyMap(), + Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE), Version.CURRENT); + assertTrue(nodePredicate.test(dataMaster)); + } + { + DiscoveryNode dedicatedMaster = new DiscoveryNode("id", address, Collections.emptyMap(), + Set.of(DiscoveryNodeRole.MASTER_ROLE), Version.CURRENT); + assertFalse(nodePredicate.test(dedicatedMaster)); } + { + DiscoveryNode dedicatedIngest = new DiscoveryNode("id", address, Collections.emptyMap(), + Set.of(DiscoveryNodeRole.INGEST_ROLE), Version.CURRENT); + assertTrue(nodePredicate.test(dedicatedIngest)); + } + { + DiscoveryNode masterIngest = new DiscoveryNode("id", address, Collections.emptyMap(), + Set.of(DiscoveryNodeRole.INGEST_ROLE, DiscoveryNodeRole.MASTER_ROLE), Version.CURRENT); + assertTrue(nodePredicate.test(masterIngest)); + } + { + DiscoveryNode dedicatedData = new DiscoveryNode("id", address, Collections.emptyMap(), + Set.of(DiscoveryNodeRole.DATA_ROLE), Version.CURRENT); + assertTrue(nodePredicate.test(dedicatedData)); + } + { + DiscoveryNode ingestData = new DiscoveryNode("id", address, Collections.emptyMap(), + Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.INGEST_ROLE), Version.CURRENT); + assertTrue(nodePredicate.test(ingestData)); + } + { + DiscoveryNode coordOnly = new DiscoveryNode("id", address, Collections.emptyMap(), + Set.of(), Version.CURRENT); + assertTrue(nodePredicate.test(coordOnly)); + } + } + + public void testGetNodePredicateNodeVersion() { + TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0); + Set roles = DiscoveryNodeRole.BUILT_IN_ROLES; + Predicate nodePredicate = SniffConnectionStrategy.getNodePredicate(Settings.EMPTY); + Version version = VersionUtils.randomVersion(random()); + DiscoveryNode node = new DiscoveryNode("id", address, Collections.emptyMap(), roles, version); + assertThat(nodePredicate.test(node), equalTo(Version.CURRENT.isCompatible(version))); + } + + public void testGetNodePredicateNodeAttrs() { + TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0); + Set roles = DiscoveryNodeRole.BUILT_IN_ROLES; + Settings settings = Settings.builder().put("cluster.remote.node.attr", "gateway").build(); + Predicate nodePredicate = SniffConnectionStrategy.getNodePredicate(settings); + { + DiscoveryNode nonGatewayNode = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "false"), + roles, Version.CURRENT); + assertFalse(nodePredicate.test(nonGatewayNode)); + assertTrue(SniffConnectionStrategy.getNodePredicate(Settings.EMPTY).test(nonGatewayNode)); + } + { + DiscoveryNode gatewayNode = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "true"), + roles, Version.CURRENT); + assertTrue(nodePredicate.test(gatewayNode)); + assertTrue(SniffConnectionStrategy.getNodePredicate(Settings.EMPTY).test(gatewayNode)); + } + { + DiscoveryNode noAttrNode = new DiscoveryNode("id", address, Collections.emptyMap(), roles, Version.CURRENT); + assertFalse(nodePredicate.test(noAttrNode)); + assertTrue(SniffConnectionStrategy.getNodePredicate(Settings.EMPTY).test(noAttrNode)); + } + } + + public void testGetNodePredicatesCombination() { + TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0); + Settings settings = Settings.builder().put("cluster.remote.node.attr", "gateway").build(); + Predicate nodePredicate = SniffConnectionStrategy.getNodePredicate(settings); + Set allRoles = DiscoveryNodeRole.BUILT_IN_ROLES; + Set dedicatedMasterRoles = Set.of(DiscoveryNodeRole.MASTER_ROLE); + { + DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "true"), + dedicatedMasterRoles, Version.CURRENT); + assertFalse(nodePredicate.test(node)); + } + { + DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "false"), + dedicatedMasterRoles, Version.CURRENT); + assertFalse(nodePredicate.test(node)); + } + { + DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "false"), + dedicatedMasterRoles, Version.CURRENT); + assertFalse(nodePredicate.test(node)); + } + { + DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "true"), + allRoles, Version.CURRENT); + assertTrue(nodePredicate.test(node)); + } + } + + private static List seedNodes(final DiscoveryNode... seedNodes) { + return Arrays.stream(seedNodes).map(s -> s.getAddress().toString()).collect(Collectors.toList()); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java index c241c7a9aa070..917308eea972e 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java @@ -12,8 +12,8 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.transport.RemoteConnectionStrategy; import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryAction; import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryRequest; import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryAction; @@ -21,7 +21,6 @@ import org.elasticsearch.xpack.ccr.repository.CcrRepository; import java.io.IOException; -import java.util.List; import java.util.Set; class CcrRepositoryManager extends AbstractLifecycleComponent { @@ -69,20 +68,19 @@ private RemoteSettingsUpdateListener(Settings settings) { } void init() { - Set clusterAliases = buildRemoteClustersDynamicConfig(settings).keySet(); + Set clusterAliases = getEnabledRemoteClusters(settings); for (String clusterAlias : clusterAliases) { putRepository(CcrRepository.NAME_PREFIX + clusterAlias); } } @Override - protected void updateRemoteCluster(String clusterAlias, List addresses, String proxy, boolean compressionEnabled, - TimeValue pingSchedule) { + protected void updateRemoteCluster(String clusterAlias, Settings settings) { String repositoryName = CcrRepository.NAME_PREFIX + clusterAlias; - if (addresses.isEmpty()) { - deleteRepository(repositoryName); - } else { + if (RemoteConnectionStrategy.isConnectionEnabled(clusterAlias, settings)) { putRepository(repositoryName); + } else { + deleteRepository(repositoryName); } } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java index 0316482571eb2..3a07ef9aa8852 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java @@ -67,11 +67,9 @@ public void setupLocalRemote() throws Exception { updateSettingsRequest.transientSettings(Settings.builder().put("cluster.remote.local.seeds", address)); assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); - assertBusy(() -> { - List infos = client().execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest()).get().getInfos(); - assertThat(infos.size(), equalTo(1)); - assertThat(infos.get(0).getNumNodesConnected(), equalTo(1)); - }); + List infos = client().execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest()).get().getInfos(); + assertThat(infos.size(), equalTo(1)); + assertThat(infos.get(0).getNumNodesConnected(), equalTo(1)); } @Before diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java index 4f6c0b041716f..0b419c4bb20b9 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java @@ -93,13 +93,10 @@ private void setupRemoteCluster() throws Exception { String address = getLeaderCluster().getMasterNodeInstance(TransportService.class).boundAddress().publishAddress().toString(); updateSettingsRequest.persistentSettings(Settings.builder().put("cluster.remote.leader_cluster.seeds", address)); assertAcked(followerClient().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); - - assertBusy(() -> { - List infos = - followerClient().execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest()).get().getInfos(); - assertThat(infos.size(), equalTo(1)); - assertThat(infos.get(0).getNumNodesConnected(), greaterThanOrEqualTo(1)); - }); + List infos = + followerClient().execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest()).get().getInfos(); + assertThat(infos.size(), equalTo(1)); + assertThat(infos.get(0).getNumNodesConnected(), greaterThanOrEqualTo(1)); } private void cleanRemoteCluster() throws Exception { diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java index 1e3e91871108a..7245bf7356611 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java @@ -23,10 +23,10 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.protocol.xpack.graph.GraphExploreRequest; import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.transport.RemoteConnectionStrategy; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.xpack.core.security.authz.ResolvedIndices; @@ -441,17 +441,16 @@ private static class RemoteClusterResolver extends RemoteClusterAware { private RemoteClusterResolver(Settings settings, ClusterSettings clusterSettings) { super(settings); - clusters = new CopyOnWriteArraySet<>(buildRemoteClustersDynamicConfig(settings).keySet()); + clusters = new CopyOnWriteArraySet<>(getEnabledRemoteClusters(settings)); listenForUpdates(clusterSettings); } @Override - protected void updateRemoteCluster(String clusterAlias, List addresses, String proxyAddress, boolean compressionEnabled, - TimeValue pingSchedule) { - if (addresses.isEmpty()) { - clusters.remove(clusterAlias); - } else { + protected void updateRemoteCluster(String clusterAlias, Settings settings) { + if (RemoteConnectionStrategy.isConnectionEnabled(clusterAlias, settings)) { clusters.add(clusterAlias); + } else { + clusters.remove(clusterAlias); } }