Skip to content

Commit

Permalink
Prioritize primary shard movement during shard allocation (#1445)
Browse files Browse the repository at this point in the history
When a node or set of nodes is excluded (via a cluster setting),
BalancedShardsAllocator iterates over them in breadth-first order, picking one shard from
each node and repeating the process until all shards are balanced. Since the shards on
each node are picked randomly, it is possible that both the primary (p) and replica (r) of
shard1 are relocated first, leaving both the p and r of shard2 behind on the excluded nodes.
If the excluded nodes were to go down at that point, the cluster would become red.

This commit introduces a new setting, "cluster.routing.allocation.move.primary_first",
that relocates the primaries of both shard1 and shard2 first, so the cluster does not become
red if the excluded nodes go down before the remaining shards are relocated. Note that
with this setting enabled, the performance of shard allocation is a direct function of the
number of indices, shards, replicas, and nodes: the larger the indices, replicas, and
distribution scale, the slower allocation becomes. This setting should be used with care.

Signed-off-by: Ankit Jain <jain.ankitk@gmail.com>
  • Loading branch information
jainankitk authored Jan 20, 2022
1 parent 8b8d041 commit 6eb8f6f
Show file tree
Hide file tree
Showing 7 changed files with 530 additions and 71 deletions.
130 changes: 103 additions & 27 deletions server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.Nullable;
import org.opensearch.common.collect.Tuple;
import org.opensearch.index.Index;
import org.opensearch.index.shard.ShardId;

Expand All @@ -48,63 +49,140 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
* A {@link RoutingNode} represents a cluster node associated with a single {@link DiscoveryNode} including all shards
* that are hosted on that node. Each {@link RoutingNode} has a unique node id that can be used to identify the node.
*/
public class RoutingNode implements Iterable<ShardRouting> {

/**
 * Container for the shards of a node, kept in two separate buckets: one for primaries and one for replicas.
 * <p>
 * A shard id is stored in at most one bucket at a time; {@link #put} moves an id between buckets when the
 * primary flag of the new routing differs from the stored one. Iteration yields every primary (in insertion
 * order) before any replica (in insertion order).
 */
static class BucketedShards implements Iterable<ShardRouting> {
    // v1 holds the primaries, v2 holds the replicas; LinkedHashMap to preserve order within each bucket
    private final Tuple<LinkedHashMap<ShardId, ShardRouting>, LinkedHashMap<ShardId, ShardRouting>> shardTuple;

    /**
     * Wraps the two given maps. The maps are stored by reference, not copied, so entries added to them
     * by the caller afterwards are visible through this container.
     *
     * @param primaryShards bucket backing the primary shards
     * @param replicaShards bucket backing the replica shards
     */
    BucketedShards(LinkedHashMap<ShardId, ShardRouting> primaryShards, LinkedHashMap<ShardId, ShardRouting> replicaShards) {
        // diamond instead of the raw Tuple type, which caused an unchecked conversion
        this.shardTuple = new Tuple<>(primaryShards, replicaShards);
    }

    /**
     * @return {@code true} if neither bucket holds a shard
     */
    public boolean isEmpty() {
        return this.shardTuple.v1().isEmpty() && this.shardTuple.v2().isEmpty();
    }

    /**
     * @return the total number of shards across both buckets
     */
    public int size() {
        return this.shardTuple.v1().size() + this.shardTuple.v2().size();
    }

    /**
     * @param shardId the shard id to look up
     * @return {@code true} if either bucket holds an entry for {@code shardId}
     */
    public boolean containsKey(ShardId shardId) {
        return this.shardTuple.v1().containsKey(shardId) || this.shardTuple.v2().containsKey(shardId);
    }

    /**
     * Looks up a shard by id, checking the primary bucket before the replica bucket.
     *
     * @param shardId the shard id to look up
     * @return the stored routing, or {@code null} if neither bucket holds {@code shardId}
     */
    public ShardRouting get(ShardId shardId) {
        // put() dereferences the routing it stores, so it never stores null: a null result means "absent"
        final ShardRouting primary = this.shardTuple.v1().get(shardId);
        return primary != null ? primary : this.shardTuple.v2().get(shardId);
    }

    /**
     * Stores the given routing under its own shard id.
     *
     * @param shardRouting the routing to store
     * @return the routing previously stored for the same shard id, or {@code null} if there was none
     */
    public ShardRouting add(ShardRouting shardRouting) {
        return put(shardRouting.shardId(), shardRouting);
    }

    /**
     * Stores {@code shardRouting} in the bucket matching its primary flag and drops any entry for
     * {@code shardId} from the other bucket, so an id never lives in both buckets.
     *
     * @param shardId      key under which the routing is stored
     * @param shardRouting the routing to store
     * @return the entry evicted from the other bucket if there was one, otherwise the entry replaced in the
     *         target bucket, otherwise {@code null}
     */
    public ShardRouting put(ShardId shardId, ShardRouting shardRouting) {
        final boolean primary = shardRouting.primary();
        final LinkedHashMap<ShardId, ShardRouting> target = primary ? this.shardTuple.v1() : this.shardTuple.v2();
        final LinkedHashMap<ShardId, ShardRouting> other = primary ? this.shardTuple.v2() : this.shardTuple.v1();

        final ShardRouting replaced = target.put(shardId, shardRouting);
        final ShardRouting evicted = other.remove(shardId);
        // an entry evicted from the opposite bucket takes precedence as the reported previous value
        return evicted != null ? evicted : replaced;
    }

    /**
     * Removes the entry for {@code shardId}, checking the primary bucket before the replica bucket.
     *
     * @param shardId the shard id to remove
     * @return the removed routing, or {@code null} if neither bucket held {@code shardId}
     */
    public ShardRouting remove(ShardId shardId) {
        final ShardRouting removedPrimary = this.shardTuple.v1().remove(shardId);
        return removedPrimary != null ? removedPrimary : this.shardTuple.v2().remove(shardId);
    }

    /**
     * Returns an iterator that yields all primaries first and then all replicas. The iterator does not
     * override {@link Iterator#remove()}, so removal through it is not supported.
     */
    @Override
    public Iterator<ShardRouting> iterator() {
        final Iterator<ShardRouting> primaryIterator = Collections.unmodifiableCollection(this.shardTuple.v1().values()).iterator();
        final Iterator<ShardRouting> replicaIterator = Collections.unmodifiableCollection(this.shardTuple.v2().values()).iterator();
        return new Iterator<ShardRouting>() {
            @Override
            public boolean hasNext() {
                return primaryIterator.hasNext() || replicaIterator.hasNext();
            }

            @Override
            public ShardRouting next() {
                // drain the primaries before handing out any replica
                if (primaryIterator.hasNext()) {
                    return primaryIterator.next();
                }
                return replicaIterator.next();
            }
        };
    }
}

private final String nodeId;

private final DiscoveryNode node;

private final LinkedHashMap<ShardId, ShardRouting> shards; // LinkedHashMap to preserve order
private final BucketedShards shards;

private final LinkedHashSet<ShardRouting> initializingShards;

private final LinkedHashSet<ShardRouting> relocatingShards;

private final HashMap<Index, LinkedHashSet<ShardRouting>> shardsByIndex;

public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shards) {
this(nodeId, node, buildShardRoutingMap(shards));
}

RoutingNode(String nodeId, DiscoveryNode node, LinkedHashMap<ShardId, ShardRouting> shards) {
public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shardRoutings) {
this.nodeId = nodeId;
this.node = node;
this.shards = shards;
final LinkedHashMap<ShardId, ShardRouting> primaryShards = new LinkedHashMap<>();
final LinkedHashMap<ShardId, ShardRouting> replicaShards = new LinkedHashMap<>();
this.shards = new BucketedShards(primaryShards, replicaShards);
this.relocatingShards = new LinkedHashSet<>();
this.initializingShards = new LinkedHashSet<>();
this.shardsByIndex = new LinkedHashMap<>();
for (ShardRouting shardRouting : shards.values()) {

for (ShardRouting shardRouting : shardRoutings) {
if (shardRouting.initializing()) {
initializingShards.add(shardRouting);
} else if (shardRouting.relocating()) {
relocatingShards.add(shardRouting);
}
shardsByIndex.computeIfAbsent(shardRouting.index(), k -> new LinkedHashSet<>()).add(shardRouting);
}
assert invariant();
}

private static LinkedHashMap<ShardId, ShardRouting> buildShardRoutingMap(ShardRouting... shardRoutings) {
final LinkedHashMap<ShardId, ShardRouting> shards = new LinkedHashMap<>();
for (ShardRouting shardRouting : shardRoutings) {
ShardRouting previousValue = shards.put(shardRouting.shardId(), shardRouting);
ShardRouting previousValue;
if (shardRouting.primary()) {
previousValue = primaryShards.put(shardRouting.shardId(), shardRouting);
} else {
previousValue = replicaShards.put(shardRouting.shardId(), shardRouting);
}

if (previousValue != null) {
throw new IllegalArgumentException(
"Cannot have two different shards with same shard id " + shardRouting.shardId() + " on same node "
);
}
}
return shards;

assert invariant();
}

@Override
public Iterator<ShardRouting> iterator() {
return Collections.unmodifiableCollection(shards.values()).iterator();
return shards.iterator();
}

/**
Expand Down Expand Up @@ -139,7 +217,7 @@ public int size() {
*/
void add(ShardRouting shard) {
assert invariant();
if (shards.containsKey(shard.shardId())) {
if (shards.add(shard) != null) {
throw new IllegalStateException(
"Trying to add a shard "
+ shard.shardId()
Expand All @@ -152,7 +230,6 @@ void add(ShardRouting shard) {
+ "]"
);
}
shards.put(shard.shardId(), shard);

if (shard.initializing()) {
initializingShards.add(shard);
Expand Down Expand Up @@ -322,7 +399,7 @@ public int numberOfOwningShardsForIndex(final Index index) {
public String prettyPrint() {
StringBuilder sb = new StringBuilder();
sb.append("-----node_id[").append(nodeId).append("][").append(node == null ? "X" : "V").append("]\n");
for (ShardRouting entry : shards.values()) {
for (ShardRouting entry : shards) {
sb.append("--------").append(entry.shortSummary()).append('\n');
}
return sb.toString();
Expand All @@ -345,7 +422,9 @@ public String toString() {
}

public List<ShardRouting> copyShards() {
return new ArrayList<>(shards.values());
List<ShardRouting> result = new ArrayList<>();
shards.forEach(result::add);
return result;
}

public boolean isEmpty() {
Expand All @@ -355,23 +434,20 @@ public boolean isEmpty() {
private boolean invariant() {

// initializingShards must consistent with that in shards
Collection<ShardRouting> shardRoutingsInitializing = shards.values()
.stream()
Collection<ShardRouting> shardRoutingsInitializing = StreamSupport.stream(shards.spliterator(), false)
.filter(ShardRouting::initializing)
.collect(Collectors.toList());
assert initializingShards.size() == shardRoutingsInitializing.size();
assert initializingShards.containsAll(shardRoutingsInitializing);

// relocatingShards must consistent with that in shards
Collection<ShardRouting> shardRoutingsRelocating = shards.values()
.stream()
Collection<ShardRouting> shardRoutingsRelocating = StreamSupport.stream(shards.spliterator(), false)
.filter(ShardRouting::relocating)
.collect(Collectors.toList());
assert relocatingShards.size() == shardRoutingsRelocating.size();
assert relocatingShards.containsAll(shardRoutingsRelocating);

final Map<Index, Set<ShardRouting>> shardRoutingsByIndex = shards.values()
.stream()
final Map<Index, Set<ShardRouting>> shardRoutingsByIndex = StreamSupport.stream(shards.spliterator(), false)
.collect(Collectors.groupingBy(ShardRouting::index, Collectors.toSet()));
assert shardRoutingsByIndex.equals(shardsByIndex);

Expand Down
Loading

0 comments on commit 6eb8f6f

Please sign in to comment.