Skip to content

Commit

Permalink
skip zone awareness when auto-expand set to all
Browse files Browse the repository at this point in the history
Signed-off-by: amberzsy <xxamber998@gmail.com>
  • Loading branch information
amberzsy committed Sep 22, 2024
1 parent 34b8888 commit 0d54d66
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Implement WithFieldName interface in ValuesSourceAggregationBuilder & FieldSortBuilder ([#15916](https://github.com/opensearch-project/OpenSearch/pull/15916))
- Add successfulSearchShardIndices in searchRequestContext ([#15967](https://github.com/opensearch-project/OpenSearch/pull/15967))
- Remove identity-related feature flagged code from the RestController ([#15430](https://github.com/opensearch-project/OpenSearch/pull/15430))
- Skip zone awareness when auto-expand replicas is set to all

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.23.1 to 2.24.0 ([#15858](https://github.com/opensearch-project/OpenSearch/pull/15858))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,4 +500,68 @@ public void testThreeZoneOneReplicaWithForceZoneValueAndLoadAwareness() throws E
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2 * numOfShards * (numOfReplica + 1)));
assertThat(health.isTimedOut(), equalTo(false));
}

/**
 * Checks that an index configured with {@code auto_expand_replicas = "0-all"} becomes fully
 * allocated on a two-node cluster in which both nodes carry the same {@code zone} attribute,
 * while awareness replica balance, the {@code zone} awareness attribute and a forced zone
 * value of {@code "a"} are all configured.
 *
 * The test creates a two-shard index, randomly closes it, waits for green health and then
 * verifies the total number of started shard copies, the per-node distribution and the
 * number of replica (non-primary) copies.
 */
public void testAwarenessZonesWithAutoExpand() {
    Settings commonSettings = Settings.builder()
        .put(AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING.getKey(), true)
        .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone.values", "a")
        .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone")
        .build();

    logger.info("--> starting 2 nodes on same zone");
    List<String> nodes = internalCluster().startNodes(
        Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(),
        Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()
    );
    String A = nodes.get(0);
    String B = nodes.get(1);

    logger.info("--> waiting for nodes to form a cluster");
    ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet();
    assertThat(health.isTimedOut(), equalTo(false));

    createIndex(
        "test",
        Settings.builder()
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)
            .put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-all")
            .build()
    );

    // Exercise both the open and the closed index state.
    if (randomBoolean()) {
        assertAcked(client().admin().indices().prepareClose("test"));
    }

    logger.info("--> waiting for shards to be allocated");
    health = client().admin()
        .cluster()
        .prepareHealth()
        .setIndices("test")
        .setWaitForEvents(Priority.LANGUID)
        .setWaitForGreenStatus()
        .setWaitForNoRelocatingShards(true)
        .execute()
        .actionGet();
    assertThat(health.isTimedOut(), equalTo(false));

    ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
    // 2 shards, each expected as one primary plus one replica copy.
    assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(4));

    // Number of shard copies per node name.
    final Map<String, Integer> counts = new HashMap<>();
    int replicaCount = 0;

    for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
        for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
            for (ShardRouting shardRouting : indexShardRoutingTable) {
                // Count replica copies only; counting primaries here would make the final
                // assertion pass even if no replica had been created by auto-expand.
                if (shardRouting.primary() == false) {
                    replicaCount++;
                }
                counts.merge(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1, Integer::sum);
            }
        }
    }
    assertThat(counts.get(A), anyOf(equalTo(1), equalTo(2)));
    assertThat(counts.get(B), anyOf(equalTo(1), equalTo(2)));
    assertThat(replicaCount, equalTo(2));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ public boolean isEnabled() {
return enabled;
}

/**
 * Reports whether the configured upper bound of the auto-expand range is unbounded.
 *
 * @return {@code true} when {@code maxReplicas} equals {@link Integer#MAX_VALUE}
 *         (presumably the value used to represent an upper bound of "all" - confirm
 *         against the setting parser), {@code false} otherwise
 */
public boolean autoExpandToAll() {
    final boolean unboundedUpperLimit = Integer.MAX_VALUE == maxReplicas;
    return unboundedUpperLimit;
}

private OptionalInt getDesiredNumberOfReplicas(IndexMetadata indexMetadata, RoutingAllocation allocation) {
if (enabled) {
int numMatchingDataNodes = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.function.Function;

import static java.util.Collections.emptyList;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING;

/**
* This {@link AllocationDecider} controls shard allocation based on
Expand Down Expand Up @@ -160,6 +161,11 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout
}

IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index());

if (INDEX_AUTO_EXPAND_REPLICAS_SETTING.get(indexMetadata.getSettings()).autoExpandToAll()) {
return allocation.decision(Decision.YES, NAME, "allocation awareness is ignored, this index is set to auto-expand to all");
}

int shardCount = indexMetadata.getNumberOfReplicas() + 1; // 1 for primary
for (String awarenessAttribute : awarenessAttributes) {
// the node the shard exists on must be associated with an awareness attribute.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1063,4 +1063,39 @@ public void testAllocationExplainForUnassignedShardsWithUnbalancedZones() {

}
}

/**
 * Builds a four-node cluster (three nodes with {@code zone=a}, one with {@code zone=b}) that
 * has {@code zone} configured as an awareness attribute, adds a single-shard index whose
 * replicas auto-expand with {@code "0-all"}, and asserts that no shard copy is left
 * unassigned once allocation has settled.
 */
public void testIgnoredByAutoExpandReplicasToAll() {
    // Cluster-level configuration: awareness keyed on the "zone" node attribute.
    final Settings awarenessSettings = Settings.builder()
        .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone")
        .build();
    final AllocationService allocationService = createAllocationService(awarenessSettings);

    // One shard, an explicit replica count of 100, and auto-expand set to "0-all".
    final Metadata clusterMetadata = Metadata.builder()
        .put(
            IndexMetadata.builder("test")
                .settings(
                    settings(Version.CURRENT).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
                        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 100)
                        .put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-all")
                )
        )
        .build();

    // Unevenly populated zones: three nodes in zone "a", one node in zone "b".
    final DiscoveryNodes discoveryNodes = DiscoveryNodes.builder()
        .add(newNode("A-0", singletonMap("zone", "a")))
        .add(newNode("A-1", singletonMap("zone", "a")))
        .add(newNode("A-2", singletonMap("zone", "a")))
        .add(newNode("B-0", singletonMap("zone", "b")))
        .build();

    final RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(clusterMetadata.index("test")).build();
    final ClusterState initialState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(Settings.EMPTY))
        .metadata(clusterMetadata)
        .routingTable(initialRoutingTable)
        .nodes(discoveryNodes)
        .build();

    final ClusterState settledState = applyStartedShardsUntilNoChange(initialState, allocationService);

    final int unassignedShardCount = settledState.getRoutingNodes().shardsWithState(UNASSIGNED).size();
    assertThat(unassignedShardCount, equalTo(0));
}
}

0 comments on commit 0d54d66

Please sign in to comment.