Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make remote setting updates support diff strategies #47891

Merged
merged 25 commits into from
Oct 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;

import java.net.InetAddress;
import java.net.InetSocketAddress;
Expand Down Expand Up @@ -103,6 +102,17 @@ protected RemoteClusterAware(Settings settings) {
this.clusterNameResolver = new ClusterNameExpressionResolver();
}

/**
* Returns remote clusters that are enabled in these settings
*/
protected static Set<String> getEnabledRemoteClusters(final Settings settings) {
final Stream<Setting<List<String>>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings);
return allConcreteSettings
.map(REMOTE_CLUSTERS_SEEDS::getNamespace)
.filter(clusterAlias -> RemoteConnectionStrategy.isConnectionEnabled(clusterAlias, settings))
.collect(Collectors.toSet());
}

/**
* Builds the dynamic per-cluster config from the given settings. This is a map keyed by the cluster alias that points to a tuple
* (ProxyAddresss, [SeedNodeSuppliers]). If a cluster is configured with a proxy address all seed nodes will point to
Expand Down Expand Up @@ -177,29 +187,17 @@ protected Map<String, List<String>> groupClusterIndices(Set<String> remoteCluste
return perClusterIndices;
}

void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy) {
Boolean compress = TransportSettings.TRANSPORT_COMPRESS.get(settings);
TimeValue pingSchedule = TransportSettings.PING_SCHEDULE.get(settings);
updateRemoteCluster(clusterAlias, addresses, proxy, compress, pingSchedule);
}

void updateRemoteCluster(String clusterAlias, Settings settings) {
String proxy = REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(settings);
List<String> addresses = REMOTE_CLUSTERS_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings);
Boolean compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings);
TimeValue pingSchedule = RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE
.getConcreteSettingForNamespace(clusterAlias)
.get(settings);

updateRemoteCluster(clusterAlias, addresses, proxy, compress, pingSchedule);
void validateAndUpdateRemoteCluster(String clusterAlias, Settings settings) {
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) {
throw new IllegalArgumentException("remote clusters must not have the empty string as its key");
}
updateRemoteCluster(clusterAlias, settings);
}

/**
* Subclasses must implement this to receive information about updated cluster aliases. If the given address list is
* empty the cluster alias is unregistered and should be removed.
* Subclasses must implement this to receive information about updated cluster aliases.
*/
protected abstract void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy, boolean compressionEnabled,
TimeValue pingSchedule);
protected abstract void updateRemoteCluster(String clusterAlias, Settings settings);

/**
* Registers this instance to listen to updates on the cluster settings.
Expand All @@ -208,7 +206,7 @@ public void listenForUpdates(ClusterSettings clusterSettings) {
List<Setting.AffixSetting<?>> remoteClusterSettings = Arrays.asList(RemoteClusterAware.REMOTE_CLUSTERS_PROXY,
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, RemoteClusterService.REMOTE_CLUSTER_COMPRESS,
RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE);
clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::updateRemoteCluster);
clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::validateAndUpdateRemoteCluster);
}

static InetSocketAddress parseSeedAddress(String remoteHost) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -36,11 +35,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
* Represents a connection to a single remote cluster. In contrast to a local cluster a remote cluster is not joined such that the
Expand All @@ -61,46 +56,29 @@ final class RemoteClusterConnection implements Closeable {
private final RemoteConnectionManager remoteConnectionManager;
private final RemoteConnectionStrategy connectionStrategy;
private final String clusterAlias;
private final int maxNumRemoteConnections;
private final ThreadPool threadPool;
private final List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes;
private final String proxyAddress;
private volatile boolean skipUnavailable;
private final TimeValue initialConnectionTimeout;

/**
* Creates a new {@link RemoteClusterConnection}
* @param settings the nodes settings object
* @param clusterAlias the configured alias of the cluster to connect to
* @param seedNodes a list of seed nodes to discover eligible nodes from
* @param transportService the local nodes transport service
* @param maxNumRemoteConnections the maximum number of connections to the remote cluster
* @param nodePredicate a predicate to filter eligible remote nodes to connect to
* @param proxyAddress the proxy address
* @param connectionProfile the connection profile to use
*/
RemoteClusterConnection(Settings settings, String clusterAlias, List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes,
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate,
String proxyAddress, ConnectionProfile connectionProfile) {
this(settings, clusterAlias, seedNodes, transportService, maxNumRemoteConnections, nodePredicate, proxyAddress,
createConnectionManager(connectionProfile, transportService));
RemoteClusterConnection(Settings settings, String clusterAlias, TransportService transportService) {
this(settings, clusterAlias, transportService,
createConnectionManager(buildConnectionProfileFromSettings(settings, clusterAlias), transportService));
}

// Public for tests to pass a StubbableConnectionManager
RemoteClusterConnection(Settings settings, String clusterAlias, List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes,
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate,
String proxyAddress, ConnectionManager connectionManager) {
RemoteClusterConnection(Settings settings, String clusterAlias, TransportService transportService,
ConnectionManager connectionManager) {
this.transportService = transportService;
this.maxNumRemoteConnections = maxNumRemoteConnections;
this.clusterAlias = clusterAlias;
this.remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
this.connectionStrategy = new SniffConnectionStrategy(clusterAlias, transportService, remoteConnectionManager,
proxyAddress, maxNumRemoteConnections, nodePredicate,
Collections.unmodifiableList(seedNodes));
this.connectionStrategy = new SniffConnectionStrategy(clusterAlias, transportService, remoteConnectionManager, settings);
ywelsch marked this conversation as resolved.
Show resolved Hide resolved
// we register the transport service here as a listener to make sure we notify handlers on disconnect etc.
connectionManager.addListener(transportService);
this.seedNodes = Collections.unmodifiableList(seedNodes);
this.proxyAddress = proxyAddress;
this.skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE
.getConcreteSettingForNamespace(clusterAlias).get(settings);
this.threadPool = transportService.threadPool;
Expand All @@ -125,11 +103,11 @@ boolean isSkipUnavailable() {
* Ensures that this cluster is connected. If the cluster is connected this operation
* will invoke the listener immediately.
*/
void ensureConnected(ActionListener<Void> voidActionListener) {
void ensureConnected(ActionListener<Void> listener) {
if (remoteConnectionManager.size() == 0) {
connectionStrategy.connect(voidActionListener);
connectionStrategy.connect(listener);
} else {
voidActionListener.onResponse(null);
listener.onResponse(null);
}
}

Expand Down Expand Up @@ -215,14 +193,6 @@ public boolean isClosed() {
return connectionStrategy.isClosed();
}

List<Tuple<String, Supplier<DiscoveryNode>>> getSeedNodes() {
return seedNodes;
}

String getProxyAddress() {
return proxyAddress;
}

// for testing only
boolean assertNoRunningConnections() {
return connectionStrategy.assertNoRunningConnections();
Expand All @@ -236,13 +206,24 @@ boolean isNodeConnected(final DiscoveryNode node) {
* Get the information about remote nodes to be rendered on {@code _remote/info} requests.
*/
public RemoteConnectionInfo getConnectionInfo() {
return new RemoteConnectionInfo(
if (connectionStrategy instanceof SniffConnectionStrategy) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of instanceof, can we call a method on the connectionStrategy object here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to work on this in follow-up. The RemoteConnectionInfo piece of work kind of needs it's own refactoring. It currently is tightly coupled to the "sniff" strategy and cross cluster search (the skip unavailable boolean). In the current PR, it retains the existing behavior.

SniffConnectionStrategy sniffStrategy = (SniffConnectionStrategy) this.connectionStrategy;
return new RemoteConnectionInfo(
clusterAlias,
sniffStrategy.getSeedNodes(),
sniffStrategy.getMaxConnections(),
getNumNodesConnected(),
initialConnectionTimeout,
skipUnavailable);
} else {
return new RemoteConnectionInfo(
clusterAlias,
seedNodes.stream().map(Tuple::v1).collect(Collectors.toList()),
maxNumRemoteConnections,
Collections.emptyList(),
0,
getNumNodesConnected(),
initialConnectionTimeout,
skipUnavailable);
}
}

int getNumNodesConnected() {
Expand All @@ -256,4 +237,22 @@ private static ConnectionManager createConnectionManager(ConnectionProfile conne
ConnectionManager getConnectionManager() {
return remoteConnectionManager.getConnectionManager();
}

public boolean shouldRebuildConnection(Settings newSettings) {
return connectionStrategy.shouldRebuildConnection(newSettings);
}

static ConnectionProfile buildConnectionProfileFromSettings(Settings settings, String clusterName) {
return new ConnectionProfile.Builder()
.setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings))
.setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings))
.addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING) // TODO make this configurable?
// we don't want this to be used for anything else but search
.addConnections(0, TransportRequestOptions.Type.BULK,
TransportRequestOptions.Type.STATE,
TransportRequestOptions.Type.RECOVERY)
.setCompressionEnabled(RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterName).get(settings))
.setPingInterval(RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterName).get(settings))
.build();
}
}
Loading