diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java index 45f4a26523590..c5c480dc25d6e 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java @@ -34,7 +34,6 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.util.CollectionUtil; -import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.opensearch.core.Assertions; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; @@ -1311,131 +1310,103 @@ private void ensureMutable() { } /** - * Creates an iterator over shards interleaving between nodes: The iterator returns the first shard from - * the first node, then the first shard of the second node, etc. until one shard from each node has been returned. - * The iterator then resumes on the first node by returning the second shard and continues until all shards from - * all the nodes have been returned. - * @param shardMovementStrategy if ShardMovementStrategy.PRIMARY_FIRST, all primary shards are iterated over before iterating replica for any node - * if ShardMovementStrategy.REPLICA_FIRST, all replica shards are iterated over before iterating primary for any node - * if ShardMovementStrategy.NO_PREFERENCE, order of replica and primary shards doesn't matter in iteration - * @return iterator of shard routings + * Returns iterator of shard routings used by {@link #nodeInterleavedShardIterator(ShardMovementStrategy)} + * @param primaryFirst true when ShardMovementStrategy = ShardMovementStrategy.PRIMARY_FIRST, false when it is ShardMovementStrategy.REPLICA_FIRST */ - public Iterator nodeInterleavedShardIterator(BalancedShardsAllocator.ShardMovementStrategy shardMovementStrategy) { + private Iterator buildIteratorForMovementStrategy(boolean primaryFirst) { final Queue> queue = new ArrayDeque<>(); for (Map.Entry entry : nodesToShards.entrySet()) { queue.add(entry.getValue().copyShards().iterator()); } - if (shardMovementStrategy == BalancedShardsAllocator.ShardMovementStrategy.PRIMARY_FIRST) { - return new Iterator() { - private Queue replicaShards = new ArrayDeque<>(); - private Queue> replicaIterators = new ArrayDeque<>(); - - public boolean hasNext() { - while (queue.isEmpty() == false) { - if (queue.peek().hasNext()) { - return true; - } - queue.poll(); - } - if (!replicaShards.isEmpty()) { + return new Iterator() { + private Queue shardRoutings = new ArrayDeque<>(); + private Queue> shardIterators = new ArrayDeque<>(); + + public boolean hasNext() { + while (queue.isEmpty() == false) { + if (queue.peek().hasNext()) { return true; } - while (!replicaIterators.isEmpty()) { - if (replicaIterators.peek().hasNext()) { - return true; - } - replicaIterators.poll(); + queue.poll(); + } + if (!shardRoutings.isEmpty()) { + return true; + } + while (!shardIterators.isEmpty()) { + if (shardIterators.peek().hasNext()) { + return true; } - return false; + shardIterators.poll(); } + return false; + } - public ShardRouting next() { - if (hasNext() == false) { - throw new NoSuchElementException(); - } - while (!queue.isEmpty()) { - Iterator iter = queue.poll(); + public ShardRouting next() { + if (hasNext() == false) { + throw new NoSuchElementException(); + } + while (!queue.isEmpty()) { + Iterator iter = queue.poll(); + if (primaryFirst) { if (iter.hasNext()) { ShardRouting result = iter.next(); if (result.primary()) { queue.offer(iter); return result; } - replicaShards.offer(result); - replicaIterators.offer(iter); + shardRoutings.offer(result); + shardIterators.offer(iter); + } + } else { + while (iter.hasNext()) { + ShardRouting result = iter.next(); + if (result.primary() == false) { + queue.offer(iter); + return result; + } + shardRoutings.offer(result); + shardIterators.offer(iter); } } - if (!replicaShards.isEmpty()) { - return replicaShards.poll(); - } - Iterator replicaIterator = replicaIterators.poll(); - ShardRouting replicaShard = replicaIterator.next(); - replicaIterators.offer(replicaIterator); - - assert !replicaShard.primary(); - return replicaShard; } - - public void remove() { - throw new UnsupportedOperationException(); + if (!shardRoutings.isEmpty()) { + return shardRoutings.poll(); } - }; - } else { - if (shardMovementStrategy == BalancedShardsAllocator.ShardMovementStrategy.REPLICA_FIRST) { - return new Iterator() { - private Queue primaryShards = new ArrayDeque<>(); - private Queue> primaryIterators = new ArrayDeque<>(); + Iterator replicaIterator = shardIterators.poll(); + ShardRouting replicaShard = replicaIterator.next(); + shardIterators.offer(replicaIterator); - public boolean hasNext() { - while (!queue.isEmpty()) { - if (queue.peek().hasNext()) { - return true; - } - queue.poll(); - } - if (!primaryShards.isEmpty()) { - return true; - } - while (!primaryIterators.isEmpty()) { - if (primaryIterators.peek().hasNext()) { - return true; - } - primaryIterators.poll(); - } - return false; - } + assert replicaShard.primary() != primaryFirst; + return replicaShard; + } - public ShardRouting next() { - if (hasNext() == false) { - throw new NoSuchElementException(); - } - while (!queue.isEmpty()) { - Iterator iter = queue.poll(); - while (iter.hasNext()) { - ShardRouting result = iter.next(); - if (result.primary() == false) { - queue.offer(iter); - return result; - } - primaryShards.offer(result); - primaryIterators.offer(iter); - } - } - if (primaryShards.isEmpty() == false) { - return primaryShards.poll(); - } - Iterator primaryIterator = primaryIterators.poll(); - ShardRouting primaryShard = primaryIterator.next(); - primaryIterators.offer(primaryIterator); + public void remove() { + throw new UnsupportedOperationException(); + } - assert primaryShard.primary(); - return primaryShard; - } + }; + } - public void remove() { - throw new UnsupportedOperationException(); - } - }; + /** + * Creates an iterator over shards interleaving between nodes: The iterator returns the first shard from + * the first node, then the first shard of the second node, etc. until one shard from each node has been returned. + * The iterator then resumes on the first node by returning the second shard and continues until all shards from + * all the nodes have been returned. + * @param shardMovementStrategy if ShardMovementStrategy.PRIMARY_FIRST, all primary shards are iterated over before iterating replica for any node + * if ShardMovementStrategy.REPLICA_FIRST, all replica shards are iterated over before iterating primary for any node + * if ShardMovementStrategy.NO_PREFERENCE, order of replica and primary shards doesn't matter in iteration + * @return iterator of shard routings + */ + public Iterator nodeInterleavedShardIterator(ShardMovementStrategy shardMovementStrategy) { + final Queue> queue = new ArrayDeque<>(); + for (Map.Entry entry : nodesToShards.entrySet()) { + queue.add(entry.getValue().copyShards().iterator()); + } + if (shardMovementStrategy == ShardMovementStrategy.PRIMARY_FIRST) { + return buildIteratorForMovementStrategy(true); + } else { + if (shardMovementStrategy == ShardMovementStrategy.REPLICA_FIRST) { + return buildIteratorForMovementStrategy(false); } else { return new Iterator() { @Override diff --git a/server/src/main/java/org/opensearch/cluster/routing/ShardMovementStrategy.java b/server/src/main/java/org/opensearch/cluster/routing/ShardMovementStrategy.java new file mode 100644 index 0000000000000..ef5b8bb928d9b --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/ShardMovementStrategy.java @@ -0,0 +1,55 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing; + +import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; + +import java.util.Locale; + +/** + * ShardMovementStrategy defines the order in which shard movement occurs. + * + * ShardMovementStrategy values or rather their string representation to be used with + * {@link BalancedShardsAllocator#SHARD_MOVEMENT_STRATEGY_SETTING} via cluster settings. + */ +public enum ShardMovementStrategy { + /** + * default behavior in which order of shard movement doesn't matter. + */ + NO_PREFERENCE, + + /** + * primary shards are moved first + */ + PRIMARY_FIRST, + + /** + * replica shards are moved first + */ + REPLICA_FIRST; + + public static ShardMovementStrategy parse(String strValue) { + if (strValue == null) { + return null; + } else { + strValue = strValue.toUpperCase(Locale.ROOT); + try { + return ShardMovementStrategy.valueOf(strValue); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Illegal allocation.shard_movement_strategy value [" + strValue + "]"); + } + } + } + + @Override + public String toString() { + return name().toLowerCase(Locale.ROOT); + } + +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index dd44c6f8973c7..19e0e318eb805 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -37,6 +37,7 @@ import org.apache.lucene.util.IntroSorter; import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.ShardMovementStrategy; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus; @@ -56,7 +57,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.Locale; import java.util.Map; import java.util.Set; @@ -112,43 +112,6 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.Deprecated ); - /** - * ShardMovementStrategy defines the order in which shard movement occurs. - *

- * Allocation settings can have the following values (non-casesensitive): - *

    - *
  • NO_PREFERENCE - default behavior in which order of shard movement doesn't matter. - *
  • PRIMARY_FIRST - primary shards are moved first. - *
  • REPLICA_FIRST - replica shards are moved first. - *
- * ShardMovementStrategy values or rather their string representation to be used with - * {@link BalancedShardsAllocator#SHARD_MOVEMENT_STRATEGY_SETTING} via cluster settings. - */ - public enum ShardMovementStrategy { - NO_PREFERENCE, - PRIMARY_FIRST, - REPLICA_FIRST; - - public static ShardMovementStrategy parse(String strValue) { - if (strValue == null) { - return null; - } else { - strValue = strValue.toUpperCase(Locale.ROOT); - try { - return ShardMovementStrategy.valueOf(strValue); - } catch (IllegalArgumentException e) { - throw new IllegalArgumentException("Illegal allocation.shard_movement_strategy value [" + strValue + "]"); - } - } - } - - @Override - public String toString() { - return name().toLowerCase(Locale.ROOT); - } - - } - /** * Decides order in which to move shards from node when shards can not stay on node anymore. {@link LocalShardsBalancer#moveShards()} * Encapsulates behavior of above SHARD_MOVE_PRIMARY_FIRST_SETTING. diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index 2538aed129544..e1e6b696e3ad2 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -16,6 +16,7 @@ import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.RoutingPool; +import org.opensearch.cluster.routing.ShardMovementStrategy; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.UnassignedInfo; @@ -58,7 +59,7 @@ public class LocalShardsBalancer extends ShardsBalancer { private final RoutingAllocation allocation; private final RoutingNodes routingNodes; private final boolean movePrimaryFirst; - private final BalancedShardsAllocator.ShardMovementStrategy shardMovementStrategy; + private final ShardMovementStrategy shardMovementStrategy; private final boolean preferPrimaryBalance; private final BalancedShardsAllocator.WeightFunction weight; @@ -75,7 +76,7 @@ public LocalShardsBalancer( Logger logger, RoutingAllocation allocation, boolean movePrimaryFirst, - BalancedShardsAllocator.ShardMovementStrategy shardMovementStrategy, + ShardMovementStrategy shardMovementStrategy, BalancedShardsAllocator.WeightFunction weight, float threshold, boolean preferPrimaryBalance @@ -536,14 +537,14 @@ private void checkAndAddInEligibleTargetNode(RoutingNode targetNode) { * In the event of changing ShardMovementStrategy setting from default setting NO_PREFERENCE to either PRIMARY_FIRST or REPLICA_FIRST, we want that * to have priority over values set in move_primary_first setting. */ - private BalancedShardsAllocator.ShardMovementStrategy getShardMovementStrategy() { - if (shardMovementStrategy != BalancedShardsAllocator.ShardMovementStrategy.NO_PREFERENCE) { + private ShardMovementStrategy getShardMovementStrategy() { + if (shardMovementStrategy != ShardMovementStrategy.NO_PREFERENCE) { return shardMovementStrategy; } if (movePrimaryFirst) { - return BalancedShardsAllocator.ShardMovementStrategy.PRIMARY_FIRST; + return ShardMovementStrategy.PRIMARY_FIRST; } - return BalancedShardsAllocator.ShardMovementStrategy.NO_PREFERENCE; + return ShardMovementStrategy.NO_PREFERENCE; } /** @@ -594,9 +595,7 @@ void moveShards() { } // Ensure that replicas don't relocate if primaries are being throttled and primary first shard movement strategy is enabled - if ((shardMovementStrategy == BalancedShardsAllocator.ShardMovementStrategy.PRIMARY_FIRST) - && primariesThrottled - && !shardRouting.primary()) { + if ((shardMovementStrategy == ShardMovementStrategy.PRIMARY_FIRST) && primariesThrottled && !shardRouting.primary()) { logger.info( "Cannot move any replica shard in the cluster as movePrimaryFirst is enabled and primary shards" + "are being throttled. Skipping shard iteration" diff --git a/server/src/test/java/org/opensearch/cluster/routing/RoutingNodesTests.java b/server/src/test/java/org/opensearch/cluster/routing/RoutingNodesTests.java index 209df9fd2e30b..1304fd5564a2c 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/RoutingNodesTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/RoutingNodesTests.java @@ -41,7 +41,6 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.allocation.AllocationService; -import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.opensearch.common.settings.Settings; @@ -149,7 +148,7 @@ public void testInterleavedShardIteratorPrimaryFirst() { // Get primary first shard iterator and assert primary shards are iterated over first final Iterator iterator = this.clusterState.getRoutingNodes() - .nodeInterleavedShardIterator(BalancedShardsAllocator.ShardMovementStrategy.PRIMARY_FIRST); + .nodeInterleavedShardIterator(ShardMovementStrategy.PRIMARY_FIRST); boolean iteratingPrimary = true; int shardCount = 0; while (iterator.hasNext()) { @@ -173,7 +172,7 @@ public void testInterleavedShardIteratorNoPreference() { startInitializingShards(TEST_INDEX_2); final Iterator iterator = this.clusterState.getRoutingNodes() - .nodeInterleavedShardIterator(BalancedShardsAllocator.ShardMovementStrategy.NO_PREFERENCE); + .nodeInterleavedShardIterator(ShardMovementStrategy.NO_PREFERENCE); int shardCount = 0; while (iterator.hasNext()) { final ShardRouting shard = iterator.next(); @@ -192,7 +191,7 @@ public void testInterleavedShardIteratorReplicaFirst() { // Get replica first shard iterator and assert replica shards are iterated over first final Iterator iterator = this.clusterState.getRoutingNodes() - .nodeInterleavedShardIterator(BalancedShardsAllocator.ShardMovementStrategy.REPLICA_FIRST); + .nodeInterleavedShardIterator(ShardMovementStrategy.REPLICA_FIRST); boolean iteratingReplica = true; int shardCount = 0; while (iterator.hasNext()) { diff --git a/server/src/test/java/org/opensearch/cluster/routing/ShardMovementStrategyTests.java b/server/src/test/java/org/opensearch/cluster/routing/ShardMovementStrategyTests.java index 50799c8eb94e2..12994bdfcf6d5 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/ShardMovementStrategyTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/ShardMovementStrategyTests.java @@ -11,7 +11,6 @@ import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.cluster.ClusterStateListener; -import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.opensearch.common.settings.Settings; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; @@ -49,43 +48,37 @@ protected void createAndIndex(String index, int replicaCount, int shardCount) { flushAndRefresh(index); } - private static Settings.Builder getSettings( - BalancedShardsAllocator.ShardMovementStrategy shardMovementStrategy, - boolean movePrimaryFirst - ) { + private static Settings.Builder getSettings(ShardMovementStrategy shardMovementStrategy, boolean movePrimaryFirst) { return Settings.builder() .put("cluster.routing.allocation.shard_movement_strategy", shardMovementStrategy) .put("cluster.routing.allocation.move.primary_first", movePrimaryFirst); } public void testClusterGreenAfterPartialRelocationPrimaryFirstShardMovementMovePrimarySettingEnabled() throws InterruptedException { - testClusterGreenAfterPartialRelocation(BalancedShardsAllocator.ShardMovementStrategy.PRIMARY_FIRST, true); + testClusterGreenAfterPartialRelocation(ShardMovementStrategy.PRIMARY_FIRST, true); } public void testClusterGreenAfterPartialRelocationPrimaryFirstShardMovementMovePrimarySettingDisabled() throws InterruptedException { - testClusterGreenAfterPartialRelocation(BalancedShardsAllocator.ShardMovementStrategy.PRIMARY_FIRST, false); + testClusterGreenAfterPartialRelocation(ShardMovementStrategy.PRIMARY_FIRST, false); } public void testClusterGreenAfterPartialRelocationReplicaFirstShardMovementPrimaryFirstEnabled() throws InterruptedException { - testClusterGreenAfterPartialRelocation(BalancedShardsAllocator.ShardMovementStrategy.REPLICA_FIRST, true); + testClusterGreenAfterPartialRelocation(ShardMovementStrategy.REPLICA_FIRST, true); } public void testClusterGreenAfterPartialRelocationReplicaFirstShardMovementPrimaryFirstDisabled() throws InterruptedException { - testClusterGreenAfterPartialRelocation(BalancedShardsAllocator.ShardMovementStrategy.REPLICA_FIRST, false); + testClusterGreenAfterPartialRelocation(ShardMovementStrategy.REPLICA_FIRST, false); } public void testClusterGreenAfterPartialRelocationNoPreferenceShardMovementPrimaryFirstEnabled() throws InterruptedException { - testClusterGreenAfterPartialRelocation(BalancedShardsAllocator.ShardMovementStrategy.NO_PREFERENCE, true); + testClusterGreenAfterPartialRelocation(ShardMovementStrategy.NO_PREFERENCE, true); } - private boolean shouldMovePrimaryShardsFirst( - BalancedShardsAllocator.ShardMovementStrategy shardMovementStrategy, - boolean movePrimaryFirst - ) { - if (shardMovementStrategy == BalancedShardsAllocator.ShardMovementStrategy.NO_PREFERENCE && movePrimaryFirst) { + private boolean shouldMovePrimaryShardsFirst(ShardMovementStrategy shardMovementStrategy, boolean movePrimaryFirst) { + if (shardMovementStrategy == ShardMovementStrategy.NO_PREFERENCE && movePrimaryFirst) { return true; } - return shardMovementStrategy == BalancedShardsAllocator.ShardMovementStrategy.PRIMARY_FIRST; + return shardMovementStrategy == ShardMovementStrategy.PRIMARY_FIRST; } /** @@ -95,10 +88,8 @@ private boolean shouldMovePrimaryShardsFirst( * nodes in zone1. Depending on the shard movement strategy, we check whether the * primary or replica shards are moved first, and zone2 nodes have all the shards */ - private void testClusterGreenAfterPartialRelocation( - BalancedShardsAllocator.ShardMovementStrategy shardMovementStrategy, - boolean movePrimaryFirst - ) throws InterruptedException { + private void testClusterGreenAfterPartialRelocation(ShardMovementStrategy shardMovementStrategy, boolean movePrimaryFirst) + throws InterruptedException { internalCluster().startClusterManagerOnlyNodes(1); final String z1 = "zone-1", z2 = "zone-2"; final int primaryShardCount = 6;