Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Draft] Weighted Round Robin policy for shard coordination traffic routing #3738

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.opensearch.index.Index;
import org.opensearch.index.shard.ShardId;
import org.opensearch.node.ResponseCollectorService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -56,6 +55,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

import static java.util.Collections.emptyMap;
Expand All @@ -81,6 +81,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
final List<ShardRouting> assignedShards;
final Set<String> allAllocationIds;
final boolean allShardsStarted;
AtomicInteger lastSelectedShardWRR;

private volatile Map<AttributesKey, AttributesRoutings> activeShardsByAttributes = emptyMap();
private volatile Map<AttributesKey, AttributesRoutings> initializingShardsByAttributes = emptyMap();
Expand All @@ -96,6 +97,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
this.shardId = shardId;
this.shuffler = new RotationShardShuffler(Randomness.get().nextInt());
this.shards = Collections.unmodifiableList(shards);
this.lastSelectedShardWRR = new AtomicInteger(-2);

ShardRouting primary = null;
List<ShardRouting> replicas = new ArrayList<>();
Expand Down Expand Up @@ -292,6 +294,44 @@ public ShardIterator activeInitializingShardsRankedIt(
return new PlainShardIterator(shardId, ordered);
}

public ShardIterator activeInitializingShardsWRR(List<String> weightsWRR, DiscoveryNodes nodes) {
final int seed = shuffler.nextSeed();
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size());
// TODO : Handle initializing shards
List<WeightedRoundRobin.Entity<ShardRouting>> weightedShards = calculateShardWeight(activeShards, weightsWRR, nodes);
lastSelectedShardWRR.getAndIncrement();
WeightedRoundRobin<ShardRouting> wrr = new WeightedRoundRobin<>(-1);
List<WeightedRoundRobin.Entity<ShardRouting>> wrrShards = wrr.orderEntities(weightedShards);

for (WeightedRoundRobin.Entity<ShardRouting> shardRouting : wrrShards) {
ordered.add(shardRouting.getTarget());
}
return new PlainShardIterator(shardId, shuffler.shuffle(ordered, seed));
}

private List<WeightedRoundRobin.Entity<ShardRouting>> calculateShardWeight(
List<ShardRouting> activeShards,
List<String> weightsWRR,
DiscoveryNodes nodes
) {

Map<String, Double> zoneWeightMap = new HashMap<>();
// TODO: Store weights in settings as map Or create a new api for weights
for (String val : weightsWRR) {
String zone = val.split(":")[0];
String weight = val.split(":")[1];
zoneWeightMap.put(zone, Double.parseDouble(weight));
}
List<WeightedRoundRobin.Entity<ShardRouting>> weightedShards = new ArrayList<>();
for (ShardRouting shard : shards) {
shard.currentNodeId();
DiscoveryNode node = nodes.get(shard.currentNodeId());
String attVal = node.getAttributes().get("zone");
weightedShards.add(new WeightedRoundRobin.Entity<>(zoneWeightMap.get(attVal), shard));
}
return weightedShards;
}

private static Set<String> getAllNodeIds(final List<ShardRouting> shards) {
final Set<String> nodeIds = new HashSet<>();
for (ShardRouting shard : shards) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ public class OperationRouting {
Setting.Property.NodeScope
);

public static final Setting<Boolean> USE_WEIGHTED_ROUND_ROBIN = Setting.boolSetting(
"cluster.routing.use_weighted_round_robin",
true,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public static final String IGNORE_AWARENESS_ATTRIBUTES = "cluster.search.ignore_awareness_attributes";
public static final Setting<Boolean> IGNORE_AWARENESS_ATTRIBUTES_SETTING = Setting.boolSetting(
IGNORE_AWARENESS_ATTRIBUTES,
Expand All @@ -78,6 +85,9 @@ public class OperationRouting {
private volatile List<String> awarenessAttributes;
private volatile boolean useAdaptiveReplicaSelection;
private volatile boolean ignoreAwarenessAttr;
private List<String> weightsWRR;

private volatile boolean useWeightedRoundRobin;

public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
// whether to ignore awareness attributes when routing requests
Expand All @@ -88,8 +98,11 @@ public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
this::setAwarenessAttributes
);
this.useAdaptiveReplicaSelection = USE_ADAPTIVE_REPLICA_SELECTION_SETTING.get(settings);
this.useWeightedRoundRobin = USE_WEIGHTED_ROUND_ROBIN.get(settings);
clusterSettings.addSettingsUpdateConsumer(USE_ADAPTIVE_REPLICA_SELECTION_SETTING, this::setUseAdaptiveReplicaSelection);
clusterSettings.addSettingsUpdateConsumer(IGNORE_AWARENESS_ATTRIBUTES_SETTING, this::setIgnoreAwarenessAttributes);
this.weightsWRR = WeightedRoundRobin.WEIGHTS_WRR.get(settings);

}

void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) {
Expand All @@ -100,6 +113,14 @@ void setIgnoreAwarenessAttributes(boolean ignoreAwarenessAttributes) {
this.ignoreAwarenessAttr = ignoreAwarenessAttributes;
}

public boolean isUseWeightedRoundRobin() {
return useWeightedRoundRobin;
}

public void setUseWeightedRoundRobin(boolean useWeightedRoundRobin) {
this.useWeightedRoundRobin = useWeightedRoundRobin;
}

public boolean isIgnoreAwarenessAttr() {
return ignoreAwarenessAttr;
}
Expand Down Expand Up @@ -301,7 +322,10 @@ private ShardIterator shardRoutings(
@Nullable Map<String, Long> nodeCounts
) {
if (ignoreAwarenessAttributes()) {
if (useAdaptiveReplicaSelection) {
// TODO: move call for WRR to right call
if (useWeightedRoundRobin) {
return indexShard.activeInitializingShardsWRR(weightsWRR, nodes);
} else if (useAdaptiveReplicaSelection) {
return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts);
} else {
return indexShard.activeInitializingShardsRandomIt();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing;

import org.opensearch.common.settings.Setting;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

import static java.util.Collections.emptyList;

public class WeightedRoundRobin<T> implements Iterator<T>, Iterable<T> {

private List<WeightedRoundRobin.Entity<T>> entities;
private int turn;
private int lastSelectedEntity;
private double currentWeight = 0;

public static final Setting<List<String>> WEIGHTS_WRR = Setting.listSetting(
"cluster.routing.shard_routing.wrrweights",
emptyList(),
Function.identity(),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public WeightedRoundRobin(int lastSelectedEntity) {
this.entities = null;
this.turn = 0;
this.lastSelectedEntity = lastSelectedEntity;
}

/* (non-Javadoc)
* @see java.util.Iterator#hasNext()
*/
@Override
public boolean hasNext() {
return entities.size() > 0;
}

/* (non-Javadoc)
* @see java.util.Iterator#next()
*/
@Override
public T next() {
Entity<T> entity = entities.get(turn++);
return entity.getTarget();
}

/* (non-Javadoc)
* @see java.lang.Iterable#iterator()
*/
@Override
public Iterator<T> iterator() {
return this;
}

public List<WeightedRoundRobin.Entity<T>> orderEntities(List<WeightedRoundRobin.Entity<T>> entities) {
int size = entities.size();
List<WeightedRoundRobin.Entity<T>> orderedWeight = new ArrayList<>();
if (size <= 0) {
return null;
}
if (size == 1) {
return entities;
}

// Find maximum weight and greatest common divisor of weight across all entities
double maxWeight = 0;
double sumWeight = 0;
Double gcd = null;
for (WeightedRoundRobin.Entity<T> entity : entities) {
maxWeight = Math.max(maxWeight, entity.getWeight());
gcd = (gcd == null) ? entity.getWeight() : gcd(gcd, entity.getWeight());
sumWeight += entity.getWeight();
}
int count = 0;
while (count < sumWeight) {
lastSelectedEntity = (lastSelectedEntity + 1) % size;
if (lastSelectedEntity == 0) {
currentWeight = currentWeight - gcd;
if (currentWeight <= 0) {
currentWeight = maxWeight;
if (currentWeight == 0) {
return orderedWeight;
}
}
}
if (entities.get(lastSelectedEntity).getWeight() >= currentWeight) {
orderedWeight.add(entities.get(lastSelectedEntity));
count++;
}
}
return orderedWeight;

}

/**
* Return greatest common divisor for two integers
* https://en.wikipedia.org/wiki/Greatest_common_divisor#Using_Euclid.27s_algorithm
*
* @param a
* @param b
* @return greatest common divisor
*/
private double gcd(double a, double b) {
return (b == 0) ? a : gcd(b, a % b);
}

static final class Entity<T> {

private double weight;
private T target;

public Entity(double weight, T target) {
this.weight = weight;
this.target = target;
}

public T getTarget() {
return this.target;
}

public void setTarget(T target) {
this.target = target;
}

public double getWeight() {
return this.weight;
}

public void setWeight(double weight) {
this.weight = weight;
}
}

}
Loading