Skip to content

Commit

Permalink
add shard movement strategy setting
Browse files Browse the repository at this point in the history
Signed-off-by: Poojita Raj <poojiraj@amazon.com>
  • Loading branch information
Poojita-Raj committed Jul 21, 2023
1 parent db838c4 commit e9308ca
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 29 deletions.
108 changes: 84 additions & 24 deletions server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.CollectionUtil;
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.opensearch.core.Assertions;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand Down Expand Up @@ -1314,21 +1315,23 @@ private void ensureMutable() {
* the first node, then the first shard of the second node, etc. until one shard from each node has been returned.
* The iterator then resumes on the first node by returning the second shard and continues until all shards from
* all the nodes have been returned.
* @param movePrimaryFirst if true, all primary shards are iterated over before iterating replica for any node
* @param shardMovementStrategy if ShardMovementStrategy.PRIMARY_FIRST, all primary shards are iterated over before iterating replica for any node
* if ShardMovementStrategy.REPLICA_FIRST, all replica shards are iterated over before iterating primary for any node
* if ShardMovementStrategy.NO_PREFERENCE, order of replica and primary shards doesn't matter in iteration
* @return iterator of shard routings
*/
public Iterator<ShardRouting> nodeInterleavedShardIterator(boolean movePrimaryFirst) {
public Iterator<ShardRouting> nodeInterleavedShardIterator(BalancedShardsAllocator.ShardMovementStrategy shardMovementStrategy) {
final Queue<Iterator<ShardRouting>> queue = new ArrayDeque<>();
for (Map.Entry<String, RoutingNode> entry : nodesToShards.entrySet()) {
queue.add(entry.getValue().copyShards().iterator());
}
if (movePrimaryFirst) {
if (shardMovementStrategy == BalancedShardsAllocator.ShardMovementStrategy.PRIMARY_FIRST) {
return new Iterator<ShardRouting>() {
private Queue<ShardRouting> replicaShards = new ArrayDeque<>();
private Queue<Iterator<ShardRouting>> replicaIterators = new ArrayDeque<>();

public boolean hasNext() {
while (!queue.isEmpty()) {
while (queue.isEmpty() == false) {
if (queue.peek().hasNext()) {
return true;
}
Expand Down Expand Up @@ -1378,32 +1381,89 @@ public void remove() {
}
};
} else {
return new Iterator<ShardRouting>() {
@Override
public boolean hasNext() {
while (!queue.isEmpty()) {
if (queue.peek().hasNext()) {
if (shardMovementStrategy == BalancedShardsAllocator.ShardMovementStrategy.REPLICA_FIRST) {
return new Iterator<ShardRouting>() {
private Queue<ShardRouting> primaryShards = new ArrayDeque<>();
private Queue<Iterator<ShardRouting>> primaryIterators = new ArrayDeque<>();

public boolean hasNext() {
while (!queue.isEmpty()) {
if (queue.peek().hasNext()) {
return true;
}
queue.poll();
}
if (!primaryShards.isEmpty()) {
return true;
}
queue.poll();
while (!primaryIterators.isEmpty()) {
if (primaryIterators.peek().hasNext()) {
return true;
}
primaryIterators.poll();
}
return false;
}
return false;
}

@Override
public ShardRouting next() {
if (hasNext() == false) {
throw new NoSuchElementException();
public ShardRouting next() {
if (hasNext() == false) {
throw new NoSuchElementException();
}
while (!queue.isEmpty()) {
Iterator<ShardRouting> iter = queue.poll();
while (iter.hasNext()) {
ShardRouting result = iter.next();
if (result.primary() == false) {
queue.offer(iter);
return result;
}
primaryShards.offer(result);
primaryIterators.offer(iter);
}
}
if (primaryShards.isEmpty() == false) {
return primaryShards.poll();
}
Iterator<ShardRouting> primaryIterator = primaryIterators.poll();
ShardRouting primaryShard = primaryIterator.next();
primaryIterators.offer(primaryIterator);

assert primaryShard.primary();
return primaryShard;
}
Iterator<ShardRouting> iter = queue.poll();
queue.offer(iter);
return iter.next();
}

public void remove() {
throw new UnsupportedOperationException();
}
};
public void remove() {
throw new UnsupportedOperationException();
}
};
} else {
return new Iterator<ShardRouting>() {
@Override
public boolean hasNext() {
while (!queue.isEmpty()) {
if (queue.peek().hasNext()) {
return true;
}
queue.poll();
}
return false;
}

@Override
public ShardRouting next() {
if (hasNext() == false) {
throw new NoSuchElementException();
}
Iterator<ShardRouting> iter = queue.poll();
queue.offer(iter);
return iter.next();
}

public void remove() {
throw new UnsupportedOperationException();
}
};
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

Expand Down Expand Up @@ -109,6 +110,14 @@ public class BalancedShardsAllocator implements ShardsAllocator {
Property.Dynamic,
Property.NodeScope
);

public static final Setting<ShardMovementStrategy> SHARD_MOVEMENT_STRATEGY_SETTING = new Setting<ShardMovementStrategy>(
"cluster.routing.allocation.shard_movement_strategy",
ShardMovementStrategy.NO_PREFERENCE.toString(),
ShardMovementStrategy::parse,
Property.Dynamic,
Property.NodeScope
);
public static final Setting<Float> THRESHOLD_SETTING = Setting.floatSetting(
"cluster.routing.allocation.balance.threshold",
1.0f,
Expand All @@ -130,7 +139,33 @@ public class BalancedShardsAllocator implements ShardsAllocator {
Property.NodeScope
);

public enum ShardMovementStrategy {
NO_PREFERENCE,
PRIMARY_FIRST,
REPLICA_FIRST;

public static ShardMovementStrategy parse(String strValue) {
if (strValue == null) {
return null;
} else {
strValue = strValue.toUpperCase(Locale.ROOT);
try {
return ShardMovementStrategy.valueOf(strValue);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Illegal allocation.shard_movement_strategy value [" + strValue + "]");
}
}
}

@Override
public String toString() {
return name().toLowerCase(Locale.ROOT);
}

}

private volatile boolean movePrimaryFirst;
private volatile ShardMovementStrategy shardMovementStrategy;

private volatile boolean preferPrimaryShardBalance;
private volatile WeightFunction weightFunction;
Expand All @@ -145,8 +180,10 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
setWeightFunction(INDEX_BALANCE_FACTOR_SETTING.get(settings), SHARD_BALANCE_FACTOR_SETTING.get(settings));
setThreshold(THRESHOLD_SETTING.get(settings));
setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings));
setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings));
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy);
clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, SHARD_BALANCE_FACTOR_SETTING, this::setWeightFunction);
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
}
Expand All @@ -155,6 +192,10 @@ private void setMovePrimaryFirst(boolean movePrimaryFirst) {
this.movePrimaryFirst = movePrimaryFirst;
}

private void setShardMovementStrategy(ShardMovementStrategy shardMovementStrategy) {
this.shardMovementStrategy = shardMovementStrategy;
}

private void setWeightFunction(float indexBalance, float shardBalanceFactor) {
weightFunction = new WeightFunction(indexBalance, shardBalanceFactor);
}
Expand Down Expand Up @@ -184,6 +225,7 @@ public void allocate(RoutingAllocation allocation) {
logger,
allocation,
movePrimaryFirst,
shardMovementStrategy,
weightFunction,
threshold,
preferPrimaryShardBalance
Expand All @@ -205,6 +247,7 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f
logger,
allocation,
movePrimaryFirst,
shardMovementStrategy,
weightFunction,
threshold,
preferPrimaryShardBalance
Expand Down Expand Up @@ -456,11 +499,12 @@ public Balancer(
Logger logger,
RoutingAllocation allocation,
boolean movePrimaryFirst,
ShardMovementStrategy shardMovementStrategy,
BalancedShardsAllocator.WeightFunction weight,
float threshold,
boolean preferPrimaryBalance
) {
super(logger, allocation, movePrimaryFirst, weight, threshold, preferPrimaryBalance);
super(logger, allocation, movePrimaryFirst, shardMovementStrategy, weight, threshold, preferPrimaryBalance);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class LocalShardsBalancer extends ShardsBalancer {
private final RoutingAllocation allocation;
private final RoutingNodes routingNodes;
private final boolean movePrimaryFirst;
private final BalancedShardsAllocator.ShardMovementStrategy shardMovementStrategy;

private final boolean preferPrimaryBalance;
private final BalancedShardsAllocator.WeightFunction weight;
Expand All @@ -74,6 +75,7 @@ public LocalShardsBalancer(
Logger logger,
RoutingAllocation allocation,
boolean movePrimaryFirst,
BalancedShardsAllocator.ShardMovementStrategy shardMovementStrategy,
BalancedShardsAllocator.WeightFunction weight,
float threshold,
boolean preferPrimaryBalance
Expand All @@ -93,6 +95,7 @@ public LocalShardsBalancer(
sorter = newNodeSorter();
inEligibleTargetNode = new HashSet<>();
this.preferPrimaryBalance = preferPrimaryBalance;
this.shardMovementStrategy = shardMovementStrategy;
}

/**
Expand Down Expand Up @@ -527,6 +530,22 @@ private void checkAndAddInEligibleTargetNode(RoutingNode targetNode) {
}
}

/**
* Returns the correct Shard movement strategy to use.
* If users are still using deprecated setting "move_primary_first", we want behavior to remain unchanged.
* In the event of changing ShardMovementStrategy setting from default setting NO_PREFERENCE to either PRIMARY_FIRST or REPLICA_FIRST, we want that
* to have priority over values set in move_primary_first setting.
*/
private BalancedShardsAllocator.ShardMovementStrategy getShardMovementStrategy() {
if (shardMovementStrategy != BalancedShardsAllocator.ShardMovementStrategy.NO_PREFERENCE) {
return shardMovementStrategy;
}
if (movePrimaryFirst) {
return BalancedShardsAllocator.ShardMovementStrategy.PRIMARY_FIRST;
}
return BalancedShardsAllocator.ShardMovementStrategy.NO_PREFERENCE;
}

/**
* Move started shards that can not be allocated to a node anymore
*
Expand All @@ -549,7 +568,8 @@ void moveShards() {
checkAndAddInEligibleTargetNode(currentNode.getRoutingNode());
}
boolean primariesThrottled = false;
for (Iterator<ShardRouting> it = allocation.routingNodes().nodeInterleavedShardIterator(movePrimaryFirst); it.hasNext();) {
for (Iterator<ShardRouting> it = allocation.routingNodes().nodeInterleavedShardIterator(getShardMovementStrategy()); it
.hasNext();) {
// Verify if the cluster concurrent recoveries have been reached.
if (allocation.deciders().canMoveAnyShard(allocation).type() != Decision.Type.YES) {
logger.info(
Expand All @@ -574,7 +594,9 @@ void moveShards() {
}

// Ensure that replicas don't relocate if primaries are being throttled and primary first is enabled
if (movePrimaryFirst && primariesThrottled && !shardRouting.primary()) {
if ((movePrimaryFirst || shardMovementStrategy == BalancedShardsAllocator.ShardMovementStrategy.PRIMARY_FIRST)
&& primariesThrottled
&& !shardRouting.primary()) {
logger.info(
"Cannot move any replica shard in the cluster as movePrimaryFirst is enabled and primary shards"
+ "are being throttled. Skipping shard iteration"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ public void apply(Settings value, Settings current, Settings previous) {
BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING,
BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE,
BalancedShardsAllocator.SHARD_MOVE_PRIMARY_FIRST_SETTING,
BalancedShardsAllocator.SHARD_MOVEMENT_STRATEGY_SETTING,
BalancedShardsAllocator.THRESHOLD_SETTING,
BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING,
BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING,
Expand Down
Loading

0 comments on commit e9308ca

Please sign in to comment.