Notify affixMap settings when any under the registered prefix matches (#28317)

* Notify affixMap settings when any under the registered prefix matches

Previously, if an affixMap setting was registered and then a completely
different setting was applied, the affixMap update consumer would be notified
with an empty map. This caused settings that had previously been set to be
unset in the local state of a consumer that assumed it would only be called
when the affixMap setting itself changed.

This commit changes the behavior so that if a prefix `foo.` is registered, the
update consumer is notified only when the applied changes include settings
starting with `foo.`.

Resolves #28316
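
A minimal sketch of the scenario (setting names and values are illustrative; the pattern follows the unit test added in this commit):

Setting.AffixSetting<Integer> fooSetting = Setting.prefixKeySetting("foo.",
        (k) -> Setting.intSetting(k, 1, Property.Dynamic, Property.NodeScope));
Setting.AffixSetting<Integer> otherSetting = Setting.prefixKeySetting("other.",
        (k) -> Setting.intSetting(k, 1, Property.Dynamic, Property.NodeScope));
AbstractScopedSettings service =
        new ClusterSettings(Settings.EMPTY, new HashSet<>(Arrays.asList(fooSetting, otherSetting)));

// Consumer registered only for the "foo." prefix.
service.addAffixMapUpdateConsumer(fooSetting, map -> { /* copy map into local state */ }, (s, k) -> {}, false);

// Applying an unrelated setting: before this change the consumer was invoked with an
// empty map (clearing its local state); after this change it is not invoked at all.
service.applySettings(Settings.builder().put("other.thing", 3).build());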

* Add unit test

* Address feedback
dakrone committed Jan 22, 2018
1 parent f10335d commit 0b56f41
Showing 3 changed files with 104 additions and 2 deletions.
@@ -597,7 +597,7 @@ AbstractScopedSettings.SettingUpdater<Map<String, T>> newAffixMapUpdater(Consume

@Override
public boolean hasChanged(Settings current, Settings previous) {
- return Stream.concat(matchStream(current), matchStream(previous)).findAny().isPresent();
+ return current.filter(k -> match(k)).equals(previous.filter(k -> match(k))) == false;
}

@Override
@@ -612,7 +612,7 @@ public Map<String, T> getValue(Settings current, Settings previous) {
if (updater.hasChanged(current, previous)) {
// only the ones that have changed otherwise we might get too many updates
// the hasChanged above checks only if there are any changes
- T value = updater.getValue(current, previous);
+ T value = updater.getValue(current, previous);
if ((omitDefaults && value.equals(concreteSetting.getDefault(current))) == false) {
result.put(namespace, value);
}
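
The effect of the new `hasChanged` check above: both the current and the previous settings are filtered down to the keys matching the registered prefix before being compared, so changes to unrelated keys no longer count as a change. A rough worked example (values made up):

// registered prefix: "foo."
// previous = { foo.a = 1 }
// current  = { foo.a = 1, bar.b = 2 }   <- only an unrelated key changed
//
// old check: matchStream(...) finds "foo.a", so hasChanged was true and the
//            consumer was invoked with an empty map (nothing under "foo." changed)
// new check: current.filter(match) = { foo.a = 1 } equals previous.filter(match) = { foo.a = 1 },
//            so hasChanged is false and the consumer is not invoked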
@@ -21,11 +21,14 @@

import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
@@ -34,7 +37,9 @@
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static org.hamcrest.Matchers.equalTo;

@@ -156,5 +161,58 @@ public void testInvalidIPFilterClusterSettings() {
.execute().actionGet());
assertEquals("invalid IP address [192.168.1.1.] for [" + filterSetting.getKey() + ipKey + "]", e.getMessage());
}

public void testTransientSettingsStillApplied() throws Exception {
List<String> nodes = internalCluster().startNodes(6);
Set<String> excludeNodes = new HashSet<>(nodes.subList(0, 3));
Set<String> includeNodes = new HashSet<>(nodes.subList(3, 6));
logger.info("--> exclude: [{}], include: [{}]",
Strings.collectionToCommaDelimitedString(excludeNodes),
Strings.collectionToCommaDelimitedString(includeNodes));
ensureStableCluster(6);
client().admin().indices().prepareCreate("test").get();
ensureGreen("test");

Settings exclude = Settings.builder().put("cluster.routing.allocation.exclude._name",
Strings.collectionToCommaDelimitedString(excludeNodes)).build();

logger.info("--> updating settings");
client().admin().cluster().prepareUpdateSettings().setTransientSettings(exclude).get();

logger.info("--> waiting for relocation");
waitForRelocation(ClusterHealthStatus.GREEN);

ClusterState state = client().admin().cluster().prepareState().get().getState();

for (ShardRouting shard : state.getRoutingTable().shardsWithState(ShardRoutingState.STARTED)) {
String node = state.getRoutingNodes().node(shard.currentNodeId()).node().getName();
logger.info("--> shard on {} - {}", node, shard);
assertTrue("shard on " + node + " but should only be on the include node list: " +
Strings.collectionToCommaDelimitedString(includeNodes),
includeNodes.contains(node));
}

Settings other = Settings.builder().put("cluster.info.update.interval", "45s").build();

logger.info("--> updating settings with random persistent setting");
client().admin().cluster().prepareUpdateSettings()
.setPersistentSettings(other).setTransientSettings(exclude).get();

logger.info("--> waiting for relocation");
waitForRelocation(ClusterHealthStatus.GREEN);

state = client().admin().cluster().prepareState().get().getState();

// The transient settings still exist in the state
assertThat(state.metaData().transientSettings(), equalTo(exclude));

for (ShardRouting shard : state.getRoutingTable().shardsWithState(ShardRoutingState.STARTED)) {
String node = state.getRoutingNodes().node(shard.currentNodeId()).node().getName();
logger.info("--> shard on {} - {}", node, shard);
assertTrue("shard on " + node + " but should only be on the include node list: " +
Strings.collectionToCommaDelimitedString(includeNodes),
includeNodes.contains(node));
}
}
}

@@ -261,6 +261,21 @@ public void testAddConsumerAffixMap() {
assertEquals(2, listResults.size());
assertEquals(2, intResults.size());

service.applySettings(Settings.builder()
.put("foo.test.bar", 2)
.put("foo.test_1.bar", 7)
.putList("foo.test_list.list", "16", "17")
.putList("foo.test_list_1.list", "18", "19", "20")
.build());

assertEquals(2, intResults.get("test").intValue());
assertEquals(7, intResults.get("test_1").intValue());
assertEquals(Arrays.asList(16, 17), listResults.get("test_list"));
assertEquals(Arrays.asList(18, 19, 20), listResults.get("test_list_1"));
assertEquals(2, listResults.size());
assertEquals(2, intResults.size());


listResults.clear();
intResults.clear();

@@ -286,6 +301,35 @@

}

public void testAffixMapConsumerNotCalledWithNull() {
Setting.AffixSetting<Integer> prefixSetting = Setting.prefixKeySetting("eggplant.",
(k) -> Setting.intSetting(k, 1, Property.Dynamic, Property.NodeScope));
Setting.AffixSetting<Integer> otherSetting = Setting.prefixKeySetting("other.",
(k) -> Setting.intSetting(k, 1, Property.Dynamic, Property.NodeScope));
AbstractScopedSettings service = new ClusterSettings(Settings.EMPTY,new HashSet<>(Arrays.asList(prefixSetting, otherSetting)));
Map<String, Integer> affixResults = new HashMap<>();

Consumer<Map<String,Integer>> consumer = (map) -> {
logger.info("--> consuming settings {}", map);
affixResults.clear();
affixResults.putAll(map);
};
service.addAffixMapUpdateConsumer(prefixSetting, consumer, (s, k) -> {}, randomBoolean());
assertEquals(0, affixResults.size());
service.applySettings(Settings.builder()
.put("eggplant._name", 2)
.build());
assertThat(affixResults.size(), equalTo(1));
assertThat(affixResults.get("_name"), equalTo(2));

service.applySettings(Settings.builder()
.put("eggplant._name", 2)
.put("other.thing", 3)
.build());

assertThat(affixResults.get("_name"), equalTo(2));
}

public void testApply() {
Setting<Integer> testSetting = Setting.intSetting("foo.bar", 1, Property.Dynamic, Property.NodeScope);
Setting<Integer> testSetting2 = Setting.intSetting("foo.bar.baz", 1, Property.Dynamic, Property.NodeScope);
