Skip to content

Commit

Permalink
Make balanced shards allocator timebound (opensearch-project#15239)
Browse files Browse the repository at this point in the history
* Make balanced shards allocator time bound to prioritise critical operations waiting in the pending task queue

Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
  • Loading branch information
imRishN authored and akolarkunnu committed Sep 10, 2024
1 parent ba8f3da commit f658f31
Show file tree
Hide file tree
Showing 8 changed files with 591 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Workload Management] QueryGroup resource tracking framework changes ([#13897](https://github.com/opensearch-project/OpenSearch/pull/13897))
- Support filtering on a large list encoded by bitmap ([#14774](https://github.com/opensearch-project/OpenSearch/pull/14774))
- Add slice execution listeners to SearchOperationListener interface ([#15153](https://github.com/opensearch-project/OpenSearch/pull/15153))
- Make balanced shards allocator timebound ([#15239](https://github.com/opensearch-project/OpenSearch/pull/15239))
- Add allowlist setting for ingest-geoip and ingest-useragent ([#15325](https://github.com/opensearch-project/OpenSearch/pull/15325))
- Adding access to noSubMatches and noOverlappingMatches in Hyphenation ([#13895](https://github.com/opensearch-project/OpenSearch/pull/13895))
- Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1439,7 +1439,9 @@ public void remove() {
*/
public Iterator<ShardRouting> nodeInterleavedShardIterator(ShardMovementStrategy shardMovementStrategy) {
final Queue<Iterator<ShardRouting>> queue = new ArrayDeque<>();
for (Map.Entry<String, RoutingNode> entry : nodesToShards.entrySet()) {
List<Map.Entry<String, RoutingNode>> nodesToShardsEntrySet = new ArrayList<>(nodesToShards.entrySet());
Randomness.shuffle(nodesToShardsEntrySet);
for (Map.Entry<String, RoutingNode> entry : nodesToShardsEntrySet) {
queue.add(entry.getValue().copyShards().iterator());
}
if (shardMovementStrategy == ShardMovementStrategy.PRIMARY_FIRST) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;

import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -87,6 +88,7 @@
public class BalancedShardsAllocator implements ShardsAllocator {

private static final Logger logger = LogManager.getLogger(BalancedShardsAllocator.class);
public static final TimeValue MIN_ALLOCATOR_TIMEOUT = TimeValue.timeValueSeconds(20);

public static final Setting<Float> INDEX_BALANCE_FACTOR_SETTING = Setting.floatSetting(
"cluster.routing.allocation.balance.index",
Expand Down Expand Up @@ -169,6 +171,23 @@ public class BalancedShardsAllocator implements ShardsAllocator {
Property.NodeScope
);

public static final Setting<TimeValue> ALLOCATOR_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.routing.allocation.balanced_shards_allocator.allocator_timeout",
TimeValue.MINUS_ONE,
TimeValue.MINUS_ONE,
timeValue -> {
if (timeValue.compareTo(MIN_ALLOCATOR_TIMEOUT) < 0 && timeValue.compareTo(TimeValue.MINUS_ONE) != 0) {
throw new IllegalArgumentException(
"Setting ["
+ "cluster.routing.allocation.balanced_shards_allocator.allocator_timeout"
+ "] should be more than 20s or -1ms to disable timeout"
);
}
},
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private volatile boolean movePrimaryFirst;
private volatile ShardMovementStrategy shardMovementStrategy;

Expand All @@ -181,6 +200,8 @@ public class BalancedShardsAllocator implements ShardsAllocator {
private volatile float threshold;

private volatile boolean ignoreThrottleInRestore;
private volatile TimeValue allocatorTimeout;
private long startTime;

public BalancedShardsAllocator(Settings settings) {
this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
Expand All @@ -197,6 +218,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings));
setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings));
setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings));
setAllocatorTimeout(ALLOCATOR_TIMEOUT_SETTING.get(settings));
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy);
Expand All @@ -206,6 +228,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_REBALANCE, this::setPreferPrimaryShardRebalance);
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
clusterSettings.addSettingsUpdateConsumer(IGNORE_THROTTLE_FOR_REMOTE_RESTORE, this::setIgnoreThrottleInRestore);
clusterSettings.addSettingsUpdateConsumer(ALLOCATOR_TIMEOUT_SETTING, this::setAllocatorTimeout);
}

/**
Expand Down Expand Up @@ -284,6 +307,20 @@ private void setThreshold(float threshold) {
this.threshold = threshold;
}

private void setAllocatorTimeout(TimeValue allocatorTimeout) {
this.allocatorTimeout = allocatorTimeout;
}

protected boolean allocatorTimedOut() {
if (allocatorTimeout.equals(TimeValue.MINUS_ONE)) {
if (logger.isTraceEnabled()) {
logger.trace("Allocator timeout is disabled. Will not short circuit allocator tasks");
}
return false;
}
return System.nanoTime() - this.startTime > allocatorTimeout.nanos();
}

@Override
public void allocate(RoutingAllocation allocation) {
if (allocation.routingNodes().size() == 0) {
Expand All @@ -298,8 +335,10 @@ public void allocate(RoutingAllocation allocation) {
threshold,
preferPrimaryShardBalance,
preferPrimaryShardRebalance,
ignoreThrottleInRestore
ignoreThrottleInRestore,
this::allocatorTimedOut
);
this.startTime = System.nanoTime();
localShardsBalancer.allocateUnassigned();
localShardsBalancer.moveShards();
localShardsBalancer.balance();
Expand All @@ -321,7 +360,8 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f
threshold,
preferPrimaryShardBalance,
preferPrimaryShardRebalance,
ignoreThrottleInRestore
ignoreThrottleInRestore,
() -> false // as we don't need to check if timed out or not while just understanding ShardAllocationDecision
);
AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN;
MoveDecision moveDecision = MoveDecision.NOT_TAKEN;
Expand Down Expand Up @@ -585,7 +625,7 @@ public Balancer(
float threshold,
boolean preferPrimaryBalance
) {
super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false, false);
super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false, false, () -> false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -71,6 +72,7 @@ public class LocalShardsBalancer extends ShardsBalancer {
private final float avgPrimaryShardsPerNode;
private final BalancedShardsAllocator.NodeSorter sorter;
private final Set<RoutingNode> inEligibleTargetNode;
private final Supplier<Boolean> timedOutFunc;
private int totalShardCount = 0;

public LocalShardsBalancer(
Expand All @@ -81,7 +83,8 @@ public LocalShardsBalancer(
float threshold,
boolean preferPrimaryBalance,
boolean preferPrimaryRebalance,
boolean ignoreThrottleInRestore
boolean ignoreThrottleInRestore,
Supplier<Boolean> timedOutFunc
) {
this.logger = logger;
this.allocation = allocation;
Expand All @@ -99,6 +102,7 @@ public LocalShardsBalancer(
this.preferPrimaryRebalance = preferPrimaryRebalance;
this.shardMovementStrategy = shardMovementStrategy;
this.ignoreThrottleInRestore = ignoreThrottleInRestore;
this.timedOutFunc = timedOutFunc;
}

/**
Expand Down Expand Up @@ -344,6 +348,14 @@ private void balanceByWeights() {
final BalancedShardsAllocator.ModelNode[] modelNodes = sorter.modelNodes;
final float[] weights = sorter.weights;
for (String index : buildWeightOrderedIndices()) {
// Terminate if the time allocated to the balanced shards allocator has elapsed
if (timedOutFunc != null && timedOutFunc.get()) {
logger.info(
"Cannot balance any shard in the cluster as time allocated to balanced shards allocator has elapsed"
+ ". Skipping indices iteration"
);
return;
}
IndexMetadata indexMetadata = metadata.index(index);

// find nodes that have a shard of this index or where shards of this index are allowed to be allocated to,
Expand All @@ -368,6 +380,14 @@ private void balanceByWeights() {
int lowIdx = 0;
int highIdx = relevantNodes - 1;
while (true) {
// break if the time allocated to the balanced shards allocator has elapsed
if (timedOutFunc != null && timedOutFunc.get()) {
logger.info(
"Cannot balance any shard in the cluster as time allocated to balanced shards allocator has elapsed"
+ ". Skipping relevant nodes iteration"
);
return;
}
final BalancedShardsAllocator.ModelNode minNode = modelNodes[lowIdx];
final BalancedShardsAllocator.ModelNode maxNode = modelNodes[highIdx];
advance_range: if (maxNode.numShards(index) > 0) {
Expand Down Expand Up @@ -572,6 +592,15 @@ void moveShards() {
return;
}

// Terminate if the time allocated to the balanced shards allocator has elapsed
if (timedOutFunc != null && timedOutFunc.get()) {
logger.info(
"Cannot move any shard in the cluster as time allocated to balanced shards allocator has elapsed"
+ ". Skipping shard iteration"
);
return;
}

ShardRouting shardRouting = it.next();

if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) {
Expand Down Expand Up @@ -799,8 +828,23 @@ void allocateUnassigned() {
int secondaryLength = 0;
int primaryLength = primary.length;
ArrayUtil.timSort(primary, comparator);
if (logger.isTraceEnabled()) {
logger.trace("Staring allocation of [{}] unassigned shards", primaryLength);
}
do {
for (int i = 0; i < primaryLength; i++) {
if (timedOutFunc != null && timedOutFunc.get()) {
// TODO - maybe check if we can allow wait for active shards thingy bypass this condition
logger.info(
"Ignoring [{}] unassigned shards for allocation as time allocated to balanced shards allocator has elapsed",
(primaryLength - i)
);
while (i < primaryLength) {
unassigned.ignoreShard(primary[i], UnassignedInfo.AllocationStatus.NO_ATTEMPT, allocation.changes());
i++;
}
return;
}
ShardRouting shard = primary[i];
final AllocateUnassignedDecision allocationDecision = decideAllocateUnassigned(shard);
final String assignedNodeId = allocationDecision.getTargetNode() != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ public void apply(Settings value, Settings current, Settings previous) {
BalancedShardsAllocator.SHARD_MOVEMENT_STRATEGY_SETTING,
BalancedShardsAllocator.THRESHOLD_SETTING,
BalancedShardsAllocator.IGNORE_THROTTLE_FOR_REMOTE_RESTORE,
BalancedShardsAllocator.ALLOCATOR_TIMEOUT_SETTING,
BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING,
BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING,
BreakerSettings.CIRCUIT_BREAKER_TYPE,
Expand Down
Loading

0 comments on commit f658f31

Please sign in to comment.