diff --git a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java index 839a1d19285b..890cdaf25387 100644 --- a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java @@ -32,7 +32,9 @@ import org.elasticsearch.common.util.concurrent.CountDown; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -74,6 +76,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy { private final int maxNumConnections; private final AtomicLong counter = new AtomicLong(0); + private final List configuredAddresses; private final List> addresses; private final AtomicReference remoteClusterName = new AtomicReference<>(); private final ConnectionProfile profile; @@ -100,6 +103,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy { int maxNumConnections, List configuredAddresses, List> addresses) { super(clusterAlias, transportService, connectionManager); this.maxNumConnections = maxNumConnections; + this.configuredAddresses = configuredAddresses; assert addresses.isEmpty() == false : "Cannot use simple connection strategy with no configured addresses"; this.addresses = addresses; // TODO: Move into the ConnectionManager @@ -134,7 +138,9 @@ protected boolean shouldOpenMoreConnections() { @Override protected boolean strategyMustBeRebuilt(Settings newSettings) { - return false; + List addresses = REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(newSettings); + int numOfSockets = REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(newSettings); + return numOfSockets != maxNumConnections || addressesChanged(configuredAddresses, addresses); } @Override @@ -223,4 +229,13 @@ private TransportAddress nextAddress(List resolvedAddresses) { private static TransportAddress resolveAddress(String address) { return new TransportAddress(parseSeedAddress(address)); } + + private boolean addressesChanged(final List oldAddresses, final List newAddresses) { + if (oldAddresses.size() != newAddresses.size()) { + return true; + } + Set oldSeeds = new HashSet<>(oldAddresses); + Set newSeeds = new HashSet<>(newAddresses); + return oldSeeds.equals(newSeeds) == false; + } } diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index ee56629ebf0a..9ec0f4afe999 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -202,7 +202,9 @@ protected boolean shouldOpenMoreConnections() { protected boolean strategyMustBeRebuilt(Settings newSettings) { String proxy = REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(newSettings); List addresses = REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(newSettings); - return seedsChanged(configuredSeedNodes, addresses) || proxyChanged(proxyAddress, proxy); + int nodeConnections = REMOTE_NODE_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(newSettings); + return nodeConnections != maxNumRemoteConnections || seedsChanged(configuredSeedNodes, addresses) || + proxyChanged(proxyAddress, proxy); } @Override diff --git a/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java index 35a6b7a6758a..4144cc856bd3 100644 --- a/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.AbstractScopedSettings; import org.elasticsearch.common.settings.ClusterSettings; @@ -303,6 +304,59 @@ numOfConnections, addresses(address), Collections.singletonList(addressSupplier } } + public void testSimpleStrategyWillNeedToBeRebuiltIfNumOfSocketsOrAddressesChange() { + try (MockTransportService transport1 = startTransport("node1", Version.CURRENT); + MockTransportService transport2 = startTransport("node2", Version.CURRENT)) { + TransportAddress address1 = transport1.boundAddress().publishAddress(); + TransportAddress address2 = transport2.boundAddress().publishAddress(); + + try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) { + localService.start(); + localService.acceptIncomingRequests(); + + ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); + int numOfConnections = randomIntBetween(4, 8); + try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + SimpleConnectionStrategy strategy = new SimpleConnectionStrategy(clusterAlias, localService, remoteConnectionManager, + numOfConnections, addresses(address1, address2))) { + PlainActionFuture connectFuture = PlainActionFuture.newFuture(); + strategy.connect(connectFuture); + connectFuture.actionGet(); + + assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); + assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); + assertEquals(numOfConnections, connectionManager.size()); + assertTrue(strategy.assertNoRunningConnections()); + + Setting modeSetting = RemoteConnectionStrategy.REMOTE_CONNECTION_MODE + .getConcreteSettingForNamespace("cluster-alias"); + Setting addressesSetting = SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES + .getConcreteSettingForNamespace("cluster-alias"); + Setting socketConnections = SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS + .getConcreteSettingForNamespace("cluster-alias"); + + Settings noChange = Settings.builder() + .put(modeSetting.getKey(), "simple") + .put(addressesSetting.getKey(), Strings.arrayToCommaDelimitedString(addresses(address1, address2).toArray())) + .put(socketConnections.getKey(), numOfConnections) + .build(); + assertFalse(strategy.shouldRebuildConnection(noChange)); + Settings addressesChanged = Settings.builder() + .put(modeSetting.getKey(), "simple") + .put(addressesSetting.getKey(), Strings.arrayToCommaDelimitedString(addresses(address1).toArray())) + .build(); + assertTrue(strategy.shouldRebuildConnection(addressesChanged)); + Settings socketsChanged = Settings.builder() + .put(modeSetting.getKey(), "simple") + .put(addressesSetting.getKey(), Strings.arrayToCommaDelimitedString(addresses(address1, address2).toArray())) + .put(socketConnections.getKey(), numOfConnections + 1) + .build(); + assertTrue(strategy.shouldRebuildConnection(socketsChanged)); + } + } + } + } + public void testModeSettingsCannotBeUsedWhenInDifferentMode() { List, String>> restrictedSettings = Arrays.asList( new Tuple<>(SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES, "192.168.0.1:8080"), diff --git a/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java index 721055a9c20f..30f30723c19a 100644 --- a/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java @@ -487,7 +487,7 @@ public void testConfiguredProxyAddressModeWillReplaceNodeAddress() { } } - public void testSniffStrategyWillNeedToBeRebuiltIfSeedsOrProxyChange() { + public void testSniffStrategyWillNeedToBeRebuiltIfNumOfConnectionsOrSeedsOrProxyChange() { List knownNodes = new CopyOnWriteArrayList<>(); try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { @@ -516,9 +516,12 @@ public void testSniffStrategyWillNeedToBeRebuiltIfSeedsOrProxyChange() { Setting seedSetting = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("cluster-alias"); Setting proxySetting = SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace("cluster-alias"); + Setting numConnections = SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS + .getConcreteSettingForNamespace("cluster-alias"); Settings noChange = Settings.builder() .put(seedSetting.getKey(), Strings.arrayToCommaDelimitedString(seedNodes(seedNode).toArray())) + .put(numConnections.getKey(), 3) .build(); assertFalse(strategy.shouldRebuildConnection(noChange)); Settings seedsChanged = Settings.builder() @@ -530,6 +533,11 @@ public void testSniffStrategyWillNeedToBeRebuiltIfSeedsOrProxyChange() { .put(proxySetting.getKey(), "proxy_address:9300") .build(); assertTrue(strategy.shouldRebuildConnection(proxyChanged)); + Settings connectionsChanged = Settings.builder() + .put(seedSetting.getKey(), Strings.arrayToCommaDelimitedString(seedNodes(seedNode).toArray())) + .put(numConnections.getKey(), 4) + .build(); + assertTrue(strategy.shouldRebuildConnection(connectionsChanged)); } } }