Skip to content

Commit

Permalink
Enhance search preference based routing for WRR (#6834)
Browse files Browse the repository at this point in the history
Signed-off-by: Varun Bansal <bansvaru@amazon.com>
  • Loading branch information
linuxpi authored Mar 30, 2023
1 parent 285b450 commit d8d6e73
Show file tree
Hide file tree
Showing 9 changed files with 597 additions and 64 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Segment Replication] Add new cluster setting to set replication strategy by default for all indices in cluster. ([#6791](https://github.com/opensearch-project/OpenSearch/pull/6791))
- Enable sort optimization for all NumericTypes ([#6464](https://github.com/opensearch-project/OpenSearch/pull/6464)
- Remove 'cluster_manager' role attachment when using 'node.master' deprecated setting ([#6331](https://github.com/opensearch-project/OpenSearch/pull/6331))
- Add new cluster settings to ignore weighted round-robin routing and fallback to default behaviour. ([#6834](https://github.com/opensearch-project/OpenSearch/pull/6834))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490))
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import java.util.List;

import static org.opensearch.cluster.routing.OperationRouting.IGNORE_WEIGHTED_SHARD_ROUTING;

/**
* This class contains logic to find next shard to retry search request in case of failure from other shard copy.
* This decides if retryable shard search requests can be tried on shard copies present in data
Expand Down Expand Up @@ -72,9 +74,13 @@ public SearchShardTarget findNext(
Runnable onShardSkipped
) {
SearchShardTarget next = shardIt.nextOrNull();
if (ignoreWeightedRouting(clusterState)) {
return next;
}

while (next != null && WeightedRoutingUtils.isWeighedAway(next.getNodeId(), clusterState)) {
SearchShardTarget nextShard = next;
if (canFailOpen(nextShard.getShardId(), exception, clusterState)) {
if (canFailOpen(nextShard.getShardId(), shardIt.size(), exception, clusterState)) {
logger.info(() -> new ParameterizedMessage("{}: Fail open executed due to exception", nextShard.getShardId()), exception);
getWeightedRoutingStats().updateFailOpenCount();
break;
Expand All @@ -98,10 +104,13 @@ public SearchShardTarget findNext(
*/
public ShardRouting findNext(final ShardsIterator shardsIt, ClusterState clusterState, Exception exception, Runnable onShardSkipped) {
ShardRouting next = shardsIt.nextOrNull();
if (ignoreWeightedRouting(clusterState)) {
return next;
}

while (next != null && WeightedRoutingUtils.isWeighedAway(next.currentNodeId(), clusterState)) {
ShardRouting nextShard = next;
if (canFailOpen(nextShard.shardId(), exception, clusterState)) {
if (canFailOpen(nextShard.shardId(), shardsIt.size(), exception, clusterState)) {
logger.info(() -> new ParameterizedMessage("{}: Fail open executed due to exception", nextShard.shardId()), exception);
getWeightedRoutingStats().updateFailOpenCount();
break;
Expand All @@ -117,8 +126,8 @@ public ShardRouting findNext(final ShardsIterator shardsIt, ClusterState cluster
* @return true if can fail open ie request shard copies present in nodes with weighted shard
* routing weight set to zero
*/
private boolean canFailOpen(ShardId shardId, Exception exception, ClusterState clusterState) {
return isInternalFailure(exception) || hasInActiveShardCopies(clusterState, shardId);
private boolean canFailOpen(ShardId shardId, int shardItSize, Exception exception, ClusterState clusterState) {
return shardItSize == 1 || isInternalFailure(exception) || hasInActiveShardCopies(clusterState, shardId);
}

private boolean hasInActiveShardCopies(ClusterState clusterState, ShardId shardId) {
Expand All @@ -131,6 +140,10 @@ private boolean hasInActiveShardCopies(ClusterState clusterState, ShardId shardI
return false;
}

private boolean ignoreWeightedRouting(ClusterState clusterState) {
return IGNORE_WEIGHTED_SHARD_ROUTING.get(clusterState.getMetadata().settings());
}

public WeightedRoutingStats getWeightedRoutingStats() {
return WeightedRoutingStats.getInstance();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,9 +324,12 @@ public ShardIterator activeInitializingShardsWeightedIt(
WeightedRouting weightedRouting,
DiscoveryNodes nodes,
double defaultWeight,
boolean isFailOpenEnabled
boolean isFailOpenEnabled,
@Nullable Integer seed
) {
final int seed = shufflerForWeightedRouting.nextSeed();
if (seed == null) {
seed = shufflerForWeightedRouting.nextSeed();
}
List<ShardRouting> ordered = activeInitializingShardsWithWeights(weightedRouting, nodes, defaultWeight, seed);

// append shards for attribute value with weight zero, so that shard search requests can be tried on
Expand All @@ -350,6 +353,7 @@ public ShardIterator activeInitializingShardsWeightedIt(
logger.debug("no shard copies found for shard id [{}] for node attribute with weight zero", shardId);
}
}

return new PlainShardIterator(shardId, ordered);
}

Expand All @@ -371,21 +375,6 @@ private List<ShardRouting> activeInitializingShardsWithWeights(
return orderedListWithDistinctShards;
}

/**
* Returns an iterator over active and initializing shards, shards are ordered by weighted
* round-robin scheduling policy. Uses the passed seed to shuffle the shards.
*
*/
public ShardIterator activeInitializingShardsSimpleWeightedIt(
WeightedRouting weightedRouting,
DiscoveryNodes nodes,
double defaultWeight,
int seed
) {
List<ShardRouting> ordered = activeInitializingShardsWithWeights(weightedRouting, nodes, defaultWeight, seed);
return new PlainShardIterator(shardId, ordered);
}

/**
* Returns a list containing shard routings ordered using weighted round-robin scheduling.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,30 @@ public class OperationRouting {

public static final Setting<Boolean> STRICT_WEIGHTED_SHARD_ROUTING_ENABLED = Setting.boolSetting(
"cluster.routing.weighted.strict",
true,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public static final Setting<Boolean> IGNORE_WEIGHTED_SHARD_ROUTING = Setting.boolSetting(
"cluster.routing.ignore_weighted_routing",
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private static final List<Preference> WEIGHTED_ROUTING_RESTRICTED_PREFERENCES = Arrays.asList(
Preference.ONLY_NODES,
Preference.PREFER_NODES
);

private volatile List<String> awarenessAttributes;
private volatile boolean useAdaptiveReplicaSelection;
private volatile boolean ignoreAwarenessAttr;
private volatile double weightedRoutingDefaultWeight;
private volatile boolean isFailOpenEnabled;
private volatile boolean isStrictWeightedShardRouting;
private volatile boolean ignoreWeightedRouting;

public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
// whether to ignore awareness attributes when routing requests
Expand All @@ -116,11 +130,13 @@ public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
this.weightedRoutingDefaultWeight = WEIGHTED_ROUTING_DEFAULT_WEIGHT.get(settings);
this.isFailOpenEnabled = WEIGHTED_ROUTING_FAILOPEN_ENABLED.get(settings);
this.isStrictWeightedShardRouting = STRICT_WEIGHTED_SHARD_ROUTING_ENABLED.get(settings);
this.ignoreWeightedRouting = IGNORE_WEIGHTED_SHARD_ROUTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(USE_ADAPTIVE_REPLICA_SELECTION_SETTING, this::setUseAdaptiveReplicaSelection);
clusterSettings.addSettingsUpdateConsumer(IGNORE_AWARENESS_ATTRIBUTES_SETTING, this::setIgnoreAwarenessAttributes);
clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_DEFAULT_WEIGHT, this::setWeightedRoutingDefaultWeight);
clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_FAILOPEN_ENABLED, this::setFailOpenEnabled);
clusterSettings.addSettingsUpdateConsumer(STRICT_WEIGHTED_SHARD_ROUTING_ENABLED, this::setStrictWeightedShardRouting);
clusterSettings.addSettingsUpdateConsumer(IGNORE_WEIGHTED_SHARD_ROUTING, this::setIgnoreWeightedRouting);
}

void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) {
Expand All @@ -143,6 +159,10 @@ void setStrictWeightedShardRouting(boolean strictWeightedShardRouting) {
this.isStrictWeightedShardRouting = strictWeightedShardRouting;
}

void setIgnoreWeightedRouting(boolean isWeightedRoundRobinEnabled) {
this.ignoreWeightedRouting = isWeightedRoundRobinEnabled;
}

public boolean isIgnoreAwarenessAttr() {
return ignoreAwarenessAttr;
}
Expand Down Expand Up @@ -314,11 +334,7 @@ private ShardIterator preferenceActiveShardIterator(
}
}
preferenceType = Preference.parse(preference);
if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet() && isStrictWeightedShardRouting) {
throw new PreferenceBasedSearchNotAllowedException(
"Preference type based routing not allowed with strict weighted shard routing enabled"
);
}
checkPreferenceBasedRoutingAllowed(preferenceType, weightedRoutingMetadata);
switch (preferenceType) {
case PREFER_NODES:
final Set<String> nodesIds = Arrays.stream(preference.substring(Preference.PREFER_NODES.type().length() + 1).split(","))
Expand All @@ -344,11 +360,16 @@ private ShardIterator preferenceActiveShardIterator(
// for a different element in the list by also incorporating the
// shard ID into the hash of the user-supplied preference key.
routingHash = 31 * routingHash + indexShard.shardId.hashCode();
if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet() && isStrictWeightedShardRouting) {
return indexShard.activeInitializingShardsSimpleWeightedIt(
if (WeightedRoutingUtils.shouldPerformStrictWeightedRouting(
isStrictWeightedShardRouting,
ignoreWeightedRouting,
weightedRoutingMetadata
)) {
return indexShard.activeInitializingShardsWeightedIt(
weightedRoutingMetadata.getWeightedRouting(),
nodes,
getWeightedRoutingDefaultWeight(),
isFailOpenEnabled,
routingHash
);
} else if (ignoreAwarenessAttributes()) {
Expand All @@ -365,12 +386,13 @@ private ShardIterator shardRoutings(
@Nullable Map<String, Long> nodeCounts,
@Nullable WeightedRoutingMetadata weightedRoutingMetadata
) {
if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet()) {
if (WeightedRoutingUtils.shouldPerformWeightedRouting(ignoreWeightedRouting, weightedRoutingMetadata)) {
return indexShard.activeInitializingShardsWeightedIt(
weightedRoutingMetadata.getWeightedRouting(),
nodes,
getWeightedRoutingDefaultWeight(),
isFailOpenEnabled
isFailOpenEnabled,
null
);
} else if (ignoreAwarenessAttributes()) {
if (useAdaptiveReplicaSelection) {
Expand Down Expand Up @@ -438,4 +460,15 @@ private static int calculateScaledShardId(IndexMetadata indexMetadata, String ef
return Math.floorMod(hash, indexMetadata.getRoutingNumShards()) / indexMetadata.getRoutingFactor();
}

private void checkPreferenceBasedRoutingAllowed(Preference preference, @Nullable WeightedRoutingMetadata weightedRoutingMetadata) {
if (WeightedRoutingUtils.shouldPerformStrictWeightedRouting(
isStrictWeightedShardRouting,
ignoreWeightedRouting,
weightedRoutingMetadata
) && WEIGHTED_ROUTING_RESTRICTED_PREFERENCES.contains(preference)) {
throw new PreferenceBasedSearchNotAllowedException(
"Preference type based routing not allowed with strict weighted shard routing enabled"
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,16 @@ public static boolean isWeighedAway(String nodeId, ClusterState clusterState) {
}
return false;
}

public static boolean shouldPerformWeightedRouting(boolean ignoreWeightedRouting, WeightedRoutingMetadata weightedRoutingMetadata) {
return !ignoreWeightedRouting && weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet();
}

public static boolean shouldPerformStrictWeightedRouting(
boolean isStrictWeightedShardRouting,
boolean ignoreWeightedRouting,
WeightedRoutingMetadata weightedRoutingMetadata
) {
return isStrictWeightedShardRouting && shouldPerformWeightedRouting(ignoreWeightedRouting, weightedRoutingMetadata);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,7 @@ public void apply(Settings value, Settings current, Settings previous) {
OperationRouting.WEIGHTED_ROUTING_DEFAULT_WEIGHT,
OperationRouting.WEIGHTED_ROUTING_FAILOPEN_ENABLED,
OperationRouting.STRICT_WEIGHTED_SHARD_ROUTING_ENABLED,
OperationRouting.IGNORE_WEIGHTED_SHARD_ROUTING,
IndexGraveyard.SETTING_MAX_TOMBSTONES,
PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING,
EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING,
Expand Down
Loading

0 comments on commit d8d6e73

Please sign in to comment.