diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java index a873129723577..3d7ba09c839fc 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java @@ -33,11 +33,14 @@ package org.opensearch.cluster.routing.allocation.decider; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Function; import com.carrotsearch.hppc.ObjectIntHashMap; +import com.carrotsearch.hppc.cursors.ObjectCursor; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.ShardRouting; @@ -207,12 +210,14 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout int numberOfAttributes = nodesPerAttribute.size(); List fullValues = forcedAwarenessAttributes.get(awarenessAttribute); + if (fullValues != null) { - for (String fullValue : fullValues) { - if (shardPerAttribute.containsKey(fullValue) == false) { - numberOfAttributes++; - } + // If forced awareness is enabled, numberOfAttributes = count(distinct((union(discovered_attributes, forced_attributes))) + Set attributesSet = new HashSet<>(fullValues); + for (ObjectCursor stringObjectCursor : nodesPerAttribute.keys()) { + attributesSet.add(stringObjectCursor.value); } + numberOfAttributes = attributesSet.size(); } // TODO should we remove ones that are not part of full list? diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessAllocationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessAllocationTests.java index c9e427a178515..b2adcd21cd8c9 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessAllocationTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessAllocationTests.java @@ -35,23 +35,32 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.Version; +import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.OpenSearchAllocationTestCase; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.allocation.command.AllocationCommands; import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand; import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; + +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import static java.util.Collections.singletonMap; @@ -971,4 +980,87 @@ public void testMultipleAwarenessAttributes() { assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0)); } + + public void testAllocationExplainForUnassignedShardsWithUnbalancedZones() { + Settings settings = Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) + .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + AllocationService strategy = createAllocationService(settings); + + logger.info("Building initial routing table for 'testAllocationExplainForUnassignedShardsWithUnbalancedZones'"); + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2)) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(initialRoutingTable) + .build(); + + logger.info("--> adding 3 nodes in different zones and do rerouting"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder() + .add(newNode("A-0", singletonMap("zone", "a"))) + .add(newNode("A-1", singletonMap("zone", "a"))) + .add(newNode("B-0", singletonMap("zone", "b"))) + ) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(0)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + + logger.info("--> start the shard (primary)"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + // One Shard is unassigned due to forced zone awareness + assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(1)); + + List unassignedShards = clusterState.getRoutingTable().shardsWithState(UNASSIGNED); + + ClusterSettings EMPTY_CLUSTER_SETTINGS = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + // Add a new node in zone c + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("C-0", singletonMap("zone", "c")))) + .build(); + + final AwarenessAllocationDecider decider = new AwarenessAllocationDecider(settings, EMPTY_CLUSTER_SETTINGS); + + final RoutingAllocation allocation = new RoutingAllocation( + new AllocationDeciders(Collections.singleton(decider)), + clusterState.getRoutingNodes(), + clusterState, + null, + null, + 0L + ); + allocation.debugDecision(true); + + Decision decision = null; + RoutingNodes nodes = clusterState.getRoutingNodes(); + + for (RoutingNode node : nodes) { + // Try to allocate unassigned shard to A-0, fails because of forced zone awareness + if (node.nodeId().equals("A-0")) { + decision = decider.canAllocate(unassignedShards.get(0), node, allocation); + assertEquals(Decision.Type.NO, decision.type()); + assertEquals( + decision.getExplanation(), + "there are too many copies of the shard allocated to nodes with attribute" + + " [zone], there are [3] total configured shard copies for this shard id and [3]" + + " total attribute values, expected the allocated shard count per attribute [2] to" + + " be less than or equal to the upper bound of the required number of shards per attribute [1]" + ); + } + + } + } }