Skip to content

Commit

Permalink
Add flat_skew setting to node overload decider (#3563) (#3582)
Browse files Browse the repository at this point in the history
* Add flat_skew setting to node overload decider

Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
  • Loading branch information
opensearch-trigger-bot[bot] committed Jun 15, 2022
1 parent 00fb2b7 commit f69c34c
Show file tree
Hide file tree
Showing 4 changed files with 338 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,17 @@
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.opensearch.cluster.routing.ShardRoutingState.STARTED;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.empty;
Expand Down Expand Up @@ -351,4 +354,140 @@ public void testAwarenessZonesIncrementalNodes() {
assertThat(counts.get(B_1), equalTo(2));
assertThat(counts.get(noZoneNode), equalTo(2));
}

public void testThreeZoneOneReplicaWithForceZoneValueAndLoadAwareness() throws Exception {
int nodeCountPerAZ = 5;
int numOfShards = 30;
int numOfReplica = 1;
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.put("cluster.routing.allocation.load_awareness.skew_factor", "0.0")
.put("cluster.routing.allocation.load_awareness.provisioned_capacity", Integer.toString(nodeCountPerAZ * 3))
.build();

logger.info("--> starting 15 nodes on zones 'a' & 'b' & 'c'");
List<String> nodes_in_zone_a = internalCluster().startNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()
);
List<String> nodes_in_zone_b = internalCluster().startNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()
);
List<String> nodes_in_zone_c = internalCluster().startNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
);

// Creating index with 30 primary and 1 replica
createIndex(
"test-1",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numOfShards)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplica)
.build()
);

ClusterHealthResponse health = client().admin()
.cluster()
.prepareHealth()
.setIndices("test-1")
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNodes(Integer.toString(nodeCountPerAZ * 3))
.setWaitForNoRelocatingShards(true)
.setWaitForNoInitializingShards(true)
.execute()
.actionGet();
assertFalse(health.isTimedOut());

ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
ObjectIntHashMap<String> counts = new ObjectIntHashMap<>();

for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
for (ShardRouting shardRouting : indexShardRoutingTable) {
counts.addTo(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1);
}
}
}

assertThat(counts.size(), equalTo(nodeCountPerAZ * 3));
// All shards should be started
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(numOfShards * (numOfReplica + 1)));

// stopping half nodes in zone a
int nodesToStop = nodeCountPerAZ / 2;
List<Settings> nodeDataPathSettings = new ArrayList<>();
for (int i = 0; i < nodesToStop; i++) {
nodeDataPathSettings.add(internalCluster().dataPathSettings(nodes_in_zone_a.get(i)));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodes_in_zone_a.get(i)));
}

client().admin().cluster().prepareReroute().setRetryFailed(true).get();
health = client().admin()
.cluster()
.prepareHealth()
.setIndices("test-1")
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes(Integer.toString(nodeCountPerAZ * 3 - nodesToStop))
.setWaitForNoRelocatingShards(true)
.setWaitForNoInitializingShards(true)
.execute()
.actionGet();
assertFalse(health.isTimedOut());

// Creating another index with 30 primary and 1 replica
createIndex(
"test-2",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numOfShards)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplica)
.build()
);

health = client().admin()
.cluster()
.prepareHealth()
.setIndices("test-1", "test-2")
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes(Integer.toString(nodeCountPerAZ * 3 - nodesToStop))
.setWaitForNoRelocatingShards(true)
.setWaitForNoInitializingShards(true)
.execute()
.actionGet();
assertFalse(health.isTimedOut());

// Restarting the nodes back
for (int i = 0; i < nodesToStop; i++) {
internalCluster().startNode(
Settings.builder()
.put("node.name", nodes_in_zone_a.get(i))
.put(nodeDataPathSettings.get(i))
.put(commonSettings)
.put("node.attr.zone", "a")
.build()
);
}
client().admin().cluster().prepareReroute().setRetryFailed(true).get();

health = client().admin()
.cluster()
.prepareHealth()
.setIndices("test-1", "test-2")
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes(Integer.toString(nodeCountPerAZ * 3))
.setWaitForGreenStatus()
.setWaitForActiveShards(2 * numOfShards * (numOfReplica + 1))
.setWaitForNoRelocatingShards(true)
.setWaitForNoInitializingShards(true)
.execute()
.actionGet();
clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();

// All shards should be started now and cluster health should be green
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2 * numOfShards * (numOfReplica + 1)));
assertThat(health.isTimedOut(), equalTo(false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@
* </pre>
* <p>
* and prevent allocation on the surviving nodes of the under capacity cluster
* based on overload factor defined as a percentage by
* based on overload factor defined as a percentage and flat skew as absolute allowed skewness by
* </p>
* <pre>
* cluster.routing.allocation.load_awareness.skew_factor: X
* cluster.routing.allocation.load_awareness.flat_skew: N
* </pre>
* The total limit per node based on skew_factor doesn't limit primaries that previously
* The total limit per node based on skew_factor and flat_skew doesn't limit primaries that previously
* existed on the disk as those shards are force allocated by
* {@link AllocationDeciders#canForceAllocatePrimary(ShardRouting, RoutingNode, RoutingAllocation)}
* however new primaries due to index creation, snapshot restore etc can be controlled via the below settings.
Expand Down Expand Up @@ -74,19 +76,29 @@ public class NodeLoadAwareAllocationDecider extends AllocationDecider {
Setting.Property.Dynamic,
Property.NodeScope
);
public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_FLAT_SKEW_SETTING = Setting.intSetting(
"cluster.routing.allocation.load_awareness.flat_skew",
2,
2,
Property.Dynamic,
Property.NodeScope
);

private volatile int provisionedCapacity;

private volatile double skewFactor;

private volatile boolean allowUnassignedPrimaries;

private volatile int flatSkew;

private static final Logger logger = LogManager.getLogger(NodeLoadAwareAllocationDecider.class);

public NodeLoadAwareAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
this.skewFactor = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.get(settings);
this.provisionedCapacity = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.get(settings);
this.allowUnassignedPrimaries = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.get(settings);
this.flatSkew = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_FLAT_SKEW_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING, this::setSkewFactor);
clusterSettings.addSettingsUpdateConsumer(
CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING,
Expand All @@ -96,6 +108,7 @@ public NodeLoadAwareAllocationDecider(Settings settings, ClusterSettings cluster
CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING,
this::setAllowUnassignedPrimaries
);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_FLAT_SKEW_SETTING, this::setFlatSkew);
}

private void setAllowUnassignedPrimaries(boolean allowUnassignedPrimaries) {
Expand All @@ -110,6 +123,10 @@ private void setProvisionedCapacity(int provisionedCapacity) {
this.provisionedCapacity = provisionedCapacity;
}

private void setFlatSkew(int flatSkew) {
this.flatSkew = flatSkew;
}

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return underCapacity(shardRouting, node, allocation, (count, limit) -> count >= limit);
Expand Down Expand Up @@ -146,7 +163,7 @@ private Decision underCapacity(
Metadata metadata = allocation.metadata();
float expectedAvgShardsPerNode = (float) metadata.getTotalNumberOfShards() / provisionedCapacity;
int nodeShardCount = node.numberOfOwningShards();
int limit = (int) Math.ceil(expectedAvgShardsPerNode * (1 + skewFactor / 100.0));
int limit = flatSkew + (int) Math.ceil(expectedAvgShardsPerNode * (1 + skewFactor / 100.0));
if (decider.test(nodeShardCount, limit)) {
logger.debug(
() -> new ParameterizedMessage(
Expand All @@ -163,10 +180,11 @@ private Decision underCapacity(
Decision.NO,
NAME,
"too many shards [%d] allocated to this node, limit per node [%d] considering"
+ " overload factor [%.2f] based on capacity [%d]",
+ " overload factor [%.2f] and flat skew [%d] based on capacity [%d]",
nodeShardCount,
limit,
skewFactor,
flatSkew,
provisionedCapacity
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,7 @@ public void apply(Settings value, Settings current, Settings previous) {
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING,
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING,
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING,
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_FLAT_SKEW_SETTING,
ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED,
ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED,
ShardIndexingPressureSettings.REQUEST_SIZE_WINDOW,
Expand Down
Loading

0 comments on commit f69c34c

Please sign in to comment.