Skip to content

Commit

Permalink
Ensure remote strategy settings can be updated (elastic#49772)
Browse files Browse the repository at this point in the history
This is related to elastic#49067. As part of this work a new sniff number of
node connections setting, a simple addresses setting, and a simple
number of sockets setting have been added. This commit ensures that
these settings are properly hooked up to support dynamic updates.
  • Loading branch information
Tim-Brooks authored and SivagurunathanV committed Jan 21, 2020
1 parent cd90088 commit 3afe182
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
import org.elasticsearch.common.util.concurrent.CountDown;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -74,6 +76,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {

private final int maxNumConnections;
private final AtomicLong counter = new AtomicLong(0);
private final List<String> configuredAddresses;
private final List<Supplier<TransportAddress>> addresses;
private final AtomicReference<ClusterName> remoteClusterName = new AtomicReference<>();
private final ConnectionProfile profile;
Expand All @@ -100,6 +103,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy {
int maxNumConnections, List<String> configuredAddresses, List<Supplier<TransportAddress>> addresses) {
super(clusterAlias, transportService, connectionManager);
this.maxNumConnections = maxNumConnections;
this.configuredAddresses = configuredAddresses;
assert addresses.isEmpty() == false : "Cannot use simple connection strategy with no configured addresses";
this.addresses = addresses;
// TODO: Move into the ConnectionManager
Expand Down Expand Up @@ -134,7 +138,9 @@ protected boolean shouldOpenMoreConnections() {

@Override
protected boolean strategyMustBeRebuilt(Settings newSettings) {
return false;
List<String> addresses = REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
int numOfSockets = REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
return numOfSockets != maxNumConnections || addressesChanged(configuredAddresses, addresses);
}

@Override
Expand Down Expand Up @@ -223,4 +229,13 @@ private TransportAddress nextAddress(List<TransportAddress> resolvedAddresses) {
private static TransportAddress resolveAddress(String address) {
return new TransportAddress(parseSeedAddress(address));
}

private boolean addressesChanged(final List<String> oldAddresses, final List<String> newAddresses) {
if (oldAddresses.size() != newAddresses.size()) {
return true;
}
Set<String> oldSeeds = new HashSet<>(oldAddresses);
Set<String> newSeeds = new HashSet<>(newAddresses);
return oldSeeds.equals(newSeeds) == false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,9 @@ protected boolean shouldOpenMoreConnections() {
protected boolean strategyMustBeRebuilt(Settings newSettings) {
String proxy = REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
List<String> addresses = REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
return seedsChanged(configuredSeedNodes, addresses) || proxyChanged(proxyAddress, proxy);
int nodeConnections = REMOTE_NODE_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
return nodeConnections != maxNumRemoteConnections || seedsChanged(configuredSeedNodes, addresses) ||
proxyChanged(proxyAddress, proxy);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.AbstractScopedSettings;
import org.elasticsearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -303,6 +304,59 @@ numOfConnections, addresses(address), Collections.singletonList(addressSupplier
}
}

public void testSimpleStrategyWillNeedToBeRebuiltIfNumOfSocketsOrAddressesChange() {
try (MockTransportService transport1 = startTransport("node1", Version.CURRENT);
MockTransportService transport2 = startTransport("node2", Version.CURRENT)) {
TransportAddress address1 = transport1.boundAddress().publishAddress();
TransportAddress address2 = transport2.boundAddress().publishAddress();

try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) {
localService.start();
localService.acceptIncomingRequests();

ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport);
int numOfConnections = randomIntBetween(4, 8);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
SimpleConnectionStrategy strategy = new SimpleConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, addresses(address1, address2))) {
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
strategy.connect(connectFuture);
connectFuture.actionGet();

assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
assertEquals(numOfConnections, connectionManager.size());
assertTrue(strategy.assertNoRunningConnections());

Setting<?> modeSetting = RemoteConnectionStrategy.REMOTE_CONNECTION_MODE
.getConcreteSettingForNamespace("cluster-alias");
Setting<?> addressesSetting = SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES
.getConcreteSettingForNamespace("cluster-alias");
Setting<?> socketConnections = SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS
.getConcreteSettingForNamespace("cluster-alias");

Settings noChange = Settings.builder()
.put(modeSetting.getKey(), "simple")
.put(addressesSetting.getKey(), Strings.arrayToCommaDelimitedString(addresses(address1, address2).toArray()))
.put(socketConnections.getKey(), numOfConnections)
.build();
assertFalse(strategy.shouldRebuildConnection(noChange));
Settings addressesChanged = Settings.builder()
.put(modeSetting.getKey(), "simple")
.put(addressesSetting.getKey(), Strings.arrayToCommaDelimitedString(addresses(address1).toArray()))
.build();
assertTrue(strategy.shouldRebuildConnection(addressesChanged));
Settings socketsChanged = Settings.builder()
.put(modeSetting.getKey(), "simple")
.put(addressesSetting.getKey(), Strings.arrayToCommaDelimitedString(addresses(address1, address2).toArray()))
.put(socketConnections.getKey(), numOfConnections + 1)
.build();
assertTrue(strategy.shouldRebuildConnection(socketsChanged));
}
}
}
}

public void testModeSettingsCannotBeUsedWhenInDifferentMode() {
List<Tuple<Setting.AffixSetting<?>, String>> restrictedSettings = Arrays.asList(
new Tuple<>(SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES, "192.168.0.1:8080"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ public void testConfiguredProxyAddressModeWillReplaceNodeAddress() {
}
}

public void testSniffStrategyWillNeedToBeRebuiltIfSeedsOrProxyChange() {
public void testSniffStrategyWillNeedToBeRebuiltIfNumOfConnectionsOrSeedsOrProxyChange() {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
Expand Down Expand Up @@ -516,9 +516,12 @@ public void testSniffStrategyWillNeedToBeRebuiltIfSeedsOrProxyChange() {

Setting<?> seedSetting = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("cluster-alias");
Setting<?> proxySetting = SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace("cluster-alias");
Setting<?> numConnections = SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS
.getConcreteSettingForNamespace("cluster-alias");

Settings noChange = Settings.builder()
.put(seedSetting.getKey(), Strings.arrayToCommaDelimitedString(seedNodes(seedNode).toArray()))
.put(numConnections.getKey(), 3)
.build();
assertFalse(strategy.shouldRebuildConnection(noChange));
Settings seedsChanged = Settings.builder()
Expand All @@ -530,6 +533,11 @@ public void testSniffStrategyWillNeedToBeRebuiltIfSeedsOrProxyChange() {
.put(proxySetting.getKey(), "proxy_address:9300")
.build();
assertTrue(strategy.shouldRebuildConnection(proxyChanged));
Settings connectionsChanged = Settings.builder()
.put(seedSetting.getKey(), Strings.arrayToCommaDelimitedString(seedNodes(seedNode).toArray()))
.put(numConnections.getKey(), 4)
.build();
assertTrue(strategy.shouldRebuildConnection(connectionsChanged));
}
}
}
Expand Down

0 comments on commit 3afe182

Please sign in to comment.